PostgreSQL is the world's most advanced open source database. Also, PostgreSQL is suitable for Event Sourcing. This repository provides a sample of event sourced system that uses PostgreSQL as event store.

Overview

Event Sourcing with PostgreSQL

Introduction

PostgreSQL is the world's most advanced open source database. Also, PostgreSQL is suitable for Event Sourcing.

This repository provides a sample of event sourced system that uses PostgreSQL as event store.

PostgreSQL Logo

See also

Example Domain

This sample uses heavily simplified ride hailing domain model inspired by tech/uklon experience.

  • A rider can place an order for a ride along a route specifying a price.
  • A driver can accept and complete an order.
  • An order can be cancelled before completion.

Domain use case diagram

Domain state diagram

Event Sourcing and CQRS 101

State-Oriented Persistence

State-oriented persistence

Event Sourcing

Event sourcing persists the state of an entity as a sequence of immutable state-changing events.

Event sourcing

Whenever the state of an entity changes, a new event is appended to the list of events.

Event sourcing

Current state of an entity can be restored by replaying all its events.

Event sourcing is best suited for short-living entities with relatively small total number of event (like orders).

Restoring the state of the short-living entity by replaying all its events doesn't have any performance impact. Thus, no optimizations for restoring state are required for short-living entities.

For endlessly stored entities (like users or bank accounts) with thousands of events restoring state by replaying all events is not optimal and snapshotting should be considered.

Snapshotting is an optimization technique where a snapshot of the aggregate's state is also saved, so an application can restore the current state of an aggregate from the snapshot instead of from scratch.

Snapshotting in event souring

An entity in event sourcing is also referenced as an aggregate.

A sequence of events for the same aggregate are also referenced as a stream.

CQRS

CQRS (Command-query responsibility segregation) stands for segregating the responsibility between commands (write requests) and queries (read requests). The write requests and the read requests are processed by different handlers.

A command generates zero or more events or results in an error.

CQRS

CQRS is a self-sufficient architectural pattern and doesn't require event sourcing.

Event sourcing is usually used in conjunction with CQRS. Event store is used as a write database and SQL or NoSQL database as a read database.

CQRS

Events in event sourcing are a part of a bounded context and should not be used "as-is" for integration with other bounded contexts. Integration events representing the current state of an aggregate should be used for communication between bounded contexts instead of a raw event sourcing change events.

Advantages of CQRS

  • Independent scaling of the read and write databases.
  • Optimized data schema for the read database (e.g. the read databases can be denormalized).
  • Simpler queries (e.g. complex JOIN operations can be avoided).

Advantages of Event Sourcing

  • Having a true history of the system (audit and traceability).
  • Ability to put the system in any prior state (e.g. for debugging).
  • Read-side projections can be created as needed (later) from events. It allows responding to future needs and new requirements.

Requirements for Event Store

  • Permanent storage. Store events forever.
  • Optimistic concurrency control. Prevent lost update anomaly (write-write conflicts).
  • Loading current state. Loading all previous events for the particular aggregate ID from an event store.
  • Subscribe to all events by aggregate type. Instead of subscribing to a single event stream that represents an aggregate.
  • Checkpoints. Store the event offset (a position in the stream) after handling it. Subscribe from the last known position instead of the stream start after the application restart.

Solution Architecture

PostgreSQL can be used as an event store. It will natively support appending events, concurrency control and reading events. Subscribing on events requires additional implementation.

PostgreSQL event store ER diagram

Separate table ORDER_AGGREGATE keeps track of the latest versions of the aggregates. It is required for optimistic concurrency control.

PostgreSQL doesn't allow subscribing on changes, so the solution is Transactional outbox pattern. A service that uses a database inserts events into an outbox table as part of the local transaction. A separate Message Relay process publishes the events inserted into database to a message broker.

Transactional outbox pattern

With event sourcing database model classical Transaction outbox pattern can be simplified even more. An outbox table is used to keep track of handled events. Outbox handler (aka Message Relay and Polling Publisher) processes new events by polling the database's outbox table.

Simplified transactional outbox pattern

Event processing includes updating the read model and publishing integration events.

All parts together look like this

PostgreSQL event store

Permanent Storage

PostgreSQL stores all data permanently be default.

Optimistic concurrency control

Optimistic concurrency control is done by checking aggregate versions in the ORDER_AGGREGATE table.

Appending an event operation consists of 2 SQL statements in a single transaction:

  1. Check the actual and expected version match and increment version
    UPDATE ORDER_AGGREGATE SET VERSION = VERSION + 1 WHERE ID = ? AND VERSION = ?
  2. Insert new event
    INSERT INTO ORDER_EVENT(AGGREGATE_ID, VERSION, EVENT_TYPE, JSON_DATA) VALUES(?, ?, ?, ?)

Loading current state

Current state of an aggregate can be loaded using simple query that fetches all aggregate events order by version in the ascending order

SELECT ID, EVENT_TYPE, JSON_DATA FROM ORDER_EVENT WHERE AGGREGATE_ID = ? ORDER BY VERSION ASC

Subscribe to all events by aggregate type

PostgreSQL doesn't allow subscribing on changes, so the solution is Transactional outbox pattern or its variations.

ORDER_EVENT_OUTBOX table keeps track of all subscribers (consumer groups) and the last processed event ID.

The concept of consumer groups is required to deliver events to only one consumer from the group. This is achieved by acquiring a locking on the same record of ORDER_EVENT_OUTBOX table.

Outbox handler polls ORDER_EVENT_OUTBOX table every second for new events and processes them

  1. Read the last processed event ID and acquire lock
    SELECT LAST_ID FROM ORDER_EVENT_OUTBOX WHERE SUBSCRIPTION_GROUP = ? FOR UPDATE NOWAIT
  2. Fetch new events
    SELECT ID, EVENT_TYPE, JSON_DATA FROM ORDER_EVENT WHERE ID > ? ORDER BY ID ASC
  3. Update the ID of the last event processed by the subscription
    UPDATE ORDER_EVENT_OUTBOX SET LAST_ID = ? WHERE SUBSCRIPTION_GROUP = ?

Checkpoints

The last known position from where the subscription starts getting events is stored in LAST_ID column of ORDER_EVENT_OUTBOX table.

Drawbacks

  1. The asynchronous replication leads to the eventual consistency between the write and read models. But polling database's outbox table for new messages with a fixed delay introduces pretty big full consistency lag (greater than or equal to the fixed delay between polls).
  2. The Outbox handler might process an event more than once. It might crash after processing an event but before recording the fact that it has done so. When it restarts, it will then process the same event again (update the read model and send an integration event).

Integration events are delivered with at-least-once delivery guarantee. The exactly-once delivery guarantee is hard to achieve due to a dual-write. A dual-write describes a situation when you need to atomically update the database and publish messages and two-phase commit (2PC) is not an option.

Consumers of integration events should be idempotent and filter duplicates and unordered events.

How to Run the Sample?

  1. Download & installOpenJDK 11 (LTS) at AdoptOpenJDK.

  2. Download and install Docker and Docker Compose.

  3. Build Java project and Docker image

    ./gradlew clean build jibDockerBuild -i
  4. Run Kafka, ksqlDB and event-sourcing-app

    docker-compose up -d --scale event-sourcing-app=2
    # wait a few minutes
  5. Follow the logs of the application

    docker-compose logs -f event-sourcing-app
  6. Install curl and jq.

  7. Run test.sh script and see the output.

The test.sh script has the following instructions:

  1. Place new order.
    ORDER_ID=$(curl -s -X POST http://localhost:8080/orders/ -d '{"riderId":"63770803-38f4-4594-aec2-4c74918f7165","price":"123.45","route":[{"address":"Київ, вулиця Полярна, 17А","lat":50.51980052414157,"lon":30.467197278948536},{"address":"Київ, вулиця Новокостянтинівська, 18В","lat":50.48509161169076,"lon":30.485170724431292}]}' -H 'Content-Type: application/json' | jq -r .orderId)
    sleep 2s
  2. Get the placed order.
    curl -s -X GET http://localhost:8080/orders/$ORDER_ID | jq
    {
      "id": "466aafd1-288c-4299-be26-3be0c9c5aef1",
      "version": 1,
      "status": "PLACED",
      "riderId": "63770803-38f4-4594-aec2-4c74918f7165",
      "price": 123.45,
      "route": [
        {
          "address": "Київ, вулиця Полярна, 17А",
          "lat": 50.51980052414157,
          "lon": 30.467197278948536
        },
        {
          "address": "Київ, вулиця Новокостянтинівська, 18В",
          "lat": 50.48509161169076,
          "lon": 30.485170724431292
        }
      ],
      "placedDate": "2021-04-25T16:51:52.680374Z"
    }
  3. Accept the order.
    curl -s -X PATCH http://localhost:8080/orders/$ORDER_ID -d '{"status":"ACCEPTED","driverId":"2c068a1a-9263-433f-a70b-067d51b98378","version":1}' -H 'Content-Type: application/json'
    sleep 2s
  4. Get the accepted order.
    curl -s -X GET http://localhost:8080/orders/$ORDER_ID | jq
    {
      "id": "466aafd1-288c-4299-be26-3be0c9c5aef1",
      "version": 2,
      "status": "ACCEPTED",
      "riderId": "63770803-38f4-4594-aec2-4c74918f7165",
      "price": 123.45,
      "route": [
        {
          "address": "Київ, вулиця Полярна, 17А",
          "lat": 50.51980052414157,
          "lon": 30.467197278948536
        },
        {
          "address": "Київ, вулиця Новокостянтинівська, 18В",
          "lat": 50.48509161169076,
          "lon": 30.485170724431292
        }
      ],
      "driverId": "2c068a1a-9263-433f-a70b-067d51b98378",
      "placedDate": "2021-04-25T16:51:52.680374Z",
      "acceptedDate": "2021-04-25T16:51:55.114824Z"
    }
  5. Try to cancel an outdated version of the order to simulate lost update.
    curl -s -X PATCH http://localhost:8080/orders/$ORDER_ID -d '{"status":"CANCELLED","version":1}' -H 'Content-Type: application/json' | jq
    {
      "error": "Actual revision 1 doesn't match expected revision 0"
    }
  6. Try to cancel a version of the order 'from the future' to simulate unordering.
    curl -s -X PATCH http://localhost:8080/orders/$ORDER_ID -d '{"status":"CANCELLED","version":3}' -H 'Content-Type: application/json' | jq
    {
      "error": "Actual revision 1 doesn't match expected revision 2"
    }
  7. Complete the order.
    curl -s -X PATCH http://localhost:8080/orders/$ORDER_ID -d '{"status":"COMPLETED","version":2}' -H 'Content-Type: application/json'
    sleep 2s
  8. Get the completed order.
    curl -s -X GET http://localhost:8080/orders/$ORDER_ID | jq
    {
      "id": "466aafd1-288c-4299-be26-3be0c9c5aef1",
      "version": 3,
      "status": "COMPLETED",
      "riderId": "63770803-38f4-4594-aec2-4c74918f7165",
      "price": 123.45,
      "route": [
        {
          "address": "Київ, вулиця Полярна, 17А",
          "lat": 50.51980052414157,
          "lon": 30.467197278948536
        },
        {
          "address": "Київ, вулиця Новокостянтинівська, 18В",
          "lat": 50.48509161169076,
          "lon": 30.485170724431292
        }
      ],
      "driverId": "2c068a1a-9263-433f-a70b-067d51b98378",
      "placedDate": "2021-04-25T16:51:52.680374Z",
      "acceptedDate": "2021-04-25T16:51:55.114824Z",
      "completedDate": "2021-04-25T16:51:57.314153Z"
    }
  9. Try to cancel a completed order to simulate business rule violation.
    curl -s -X PATCH http://localhost:8080/orders/$ORDER_ID -d '{"status":"CANCELLED","version":3}' -H 'Content-Type: application/json' | jq
    {
      "error": "Order in status COMPLETED can't be cancelled"
    }
  10. Print integration events.
    docker exec -it kafka /bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic order-integration-events --from-beginning --property print.key=true --timeout-ms 3000
    466aafd1-288c-4299-be26-3be0c9c5aef1	{"order_id":"466aafd1-288c-4299-be26-3be0c9c5aef1","event_type":"OrderPlacedEvent","event_timestamp":1619369512680,"version":1,"status":"PLACED","rider_id":"63770803-38f4-4594-aec2-4c74918f7165","price":123.45,"route":[{"ADDRESS":"Київ, вулиця Полярна, 17А","LAT":50.51980052414157,"LON":30.467197278948536},{"ADDRESS":"Київ, вулиця Новокостянтинівська, 18В","LAT":50.48509161169076,"LON":30.485170724431292}]}
    466aafd1-288c-4299-be26-3be0c9c5aef1	{"order_id":"466aafd1-288c-4299-be26-3be0c9c5aef1","event_type":"OrderAcceptedEvent","event_timestamp":1619369515114,"version":2,"status":"ACCEPTED","rider_id":"63770803-38f4-4594-aec2-4c74918f7165","price":123.45,"route":[{"ADDRESS":"Київ, вулиця Полярна, 17А","LAT":50.51980052414157,"LON":30.467197278948536},{"ADDRESS":"Київ, вулиця Новокостянтинівська, 18В","LAT":50.48509161169076,"LON":30.485170724431292}],"driver_id":"2c068a1a-9263-433f-a70b-067d51b98378"}
    466aafd1-288c-4299-be26-3be0c9c5aef1	{"order_id":"466aafd1-288c-4299-be26-3be0c9c5aef1","event_type":"OrderCompletedEvent","event_timestamp":1619369517314,"version":3,"status":"COMPLETED","rider_id":"63770803-38f4-4594-aec2-4c74918f7165","price":123.45,"route":[{"ADDRESS":"Київ, вулиця Полярна, 17А","LAT":50.51980052414157,"LON":30.467197278948536},{"ADDRESS":"Київ, вулиця Новокостянтинівська, 18В","LAT":50.48509161169076,"LON":30.485170724431292}],"driver_id":"2c068a1a-9263-433f-a70b-067d51b98378"}
    
You might also like...

This is a clone of Mircosoft Paint that uses Java and its javax.swing library

This is a clone of Mircosoft Paint that uses Java and its javax.swing library

PaintClone This is a clone of Mircosoft Paint that uses Java and its javax.swing library You are able to select a RBG colors and creates a pallet of t

Feb 17, 2022

The open Transportation Control System

openTCS Homepage: https://www.opentcs.org/ Changelog: changelog.adoc openTCS (short for open Transportation Control System) is a free platform for con

Jan 8, 2023

HUAWEI 3D Modeling Kit project contains a sample app. Guided by this demo, you will be able to implement full 3D Modeling Kit capabilities, including 3D object reconstruction and material generation.

HUAWEI 3D Modeling Kit Sample English | 中文 Introduction This project includes apps developed based on HUAWEI 3D Modeling Kit. The project directory is

Jan 1, 2023

An open source application to make your own android applications without coding!

An open source application to make your own android applications without coding!

Stif An Open source project for building Android Application at a go both with and without coding. This project was inspired from Scratch and Sketchwa

Aug 28, 2021

This is an open source visualization for the C4 model for visualising software architecture.

c4viz: C4 Visualization This is an open source visualization for the C4 model for visualising software architecture. It expects input in the form of a

Dec 6, 2022

WavesFX an open-source Waves wallet for Windows, macOS and Linux

WavesFX an open-source Waves wallet for Windows, macOS and Linux

WavesFX WavesFX is an open-source Waves wallet for Windows, macOS and Linux. Telegram Chat Releases can be found on the release list. How to build Wav

Apr 15, 2022

Provides a Java API to use the JavaScript library d3.js with the JavaFx WebView

Provides a Java API to use the JavaScript library d3.js with the JavaFx WebView

javafx-d3 Provides a Java API for using the JavaScript library d3.js with JavaFx Applications. Many thanks to the authors of the projects gwt-d3 [1] a

Dec 19, 2022

JSilhouette provides additional shapes for Java applications

JSilhouette JSilhouette provides additional shapes for Java applications. Currently JavaFX is supported. Installing You can get the latest version of

Nov 7, 2022

Lib-Tile is a multi Maven project written in JavaFX and NetBeans IDE 8 and provides the functionalities to use and handle easily Tiles in your JavaFX application.

Lib-Tile is a multi Maven project written in JavaFX and NetBeans IDE 8 and provides the functionalities to use and handle easily Tiles in your JavaFX application.

Lib-Tile Intention Lib-Tile is a multi Maven project written in JavaFX and NetBeans IDE and provides the functionalities to use and handle easily Tile

Apr 13, 2022
Comments
  • Postgres does not support subscribing on changes?

    Postgres does not support subscribing on changes?

    How about:

    https://www.postgresql.org/docs/current/sql-notify.html

    ?

    When NOTIFY is used to signal the occurrence of changes to a particular table, a useful programming technique is to put the NOTIFY in a statement trigger that is triggered by table updates. In this way, notification happens automatically when the table is changed, and the application programmer cannot accidentally forget to do it.

    opened by graforlock 0
  • Does this work with two writers?

    Does this work with two writers?

    Let's say I have two instances of the service. Both services are creating events. When using postgres, it's possible for there to be gaps in the bigserial id used for the event table.

    For example, it's possible for there to be visible id's 0, 1, 3, 4, 5 while the other service is still in the process of getting an event committed with id 2.

    opened by hjohn 8
Owner
Evgeniy Khyst
Director of Engineering, Solutions Architect designing robust solutions, building engineering teams, and establishing engineering culture and operations.
Evgeniy Khyst
Lightweight installer written in java, made for minecraft mods, The installer uses JPanel and uses a URL to install to the specific area (Discord URL's work the best i've seen)

InstallerForJava Lightweight installer written in java, made for minecraft mods, The installer uses JPanel and uses a URL to install to the specific a

null 18 Dec 9, 2022
MaterialFX is an open source Java library which provides material design components for JavaFX

MaterialFX MaterialFX is an open source Java library which provides material design components for JavaFX Explore the wiki » Download Latest Demo · Re

Alessadro Parisi 744 Jan 3, 2023
The Most Powerful Swipe Layout!

Android Swipe Layout ![Gitter](https://badges.gitter.im/Join Chat.svg) This is the brother of AndroidViewHover. One year ago, I started to make an app

代码家 12.3k Dec 28, 2022
Aether - An advanced sync plugin for Minecraft.

Aether Aether is an advanced sync plugin for Minecraft. Aether uses MongoDB for storing player information. Found a issue(s)? Report them in our issue

 grape 6 Sep 6, 2022
An advanced KeyListener for Java Swing UI.

keystrokelistener An advanced KeyListener for Java Swing UI. In Swing, We don't have any default way of mapping a set of KeyStrokes to a specific task

omega ui 1 Jan 22, 2022
An advanced book explorer/catalog application written in Java and Kotlin.

Boomega An advanced book explorer/catalog application written in Java and Kotlin. ✨ Features Cross-platform Dark/Light theme, modern UI Multiple UI la

Daniel Gyoerffy 54 Nov 10, 2022
InstallRepos - Install GitHub Repository on any Operating System (Linux, MacOS or Windows).

Install Repos Install GitHub Repository on any Operating System (Linux, MacOS or Windows). Requires Java JRE 17.0.0.1 or later. Repository Includes: s

Tushar Chaurasia 2 Apr 21, 2022
Event-driven trigger + recording system for FFXIV

Triggevent Fully event driven trigger + overlay system for FFXIV. Makes triggers easier to develop and test. Allows triggers to have custom configurat

null 63 Dec 28, 2022
FOSSLight source code repository

[Kor] FOSSLight FOSSLight is an integrated system that can efficiently process the open source compliance process. Features Compliance Workflow It can

FOSSLight 116 Dec 7, 2022
A desktop application designed to serve the co-curricular uses of students, clubs and forums, and admins of United International University.

ECA Management System Made by "Team Apocalypse": S M Jishanul Islam Sadia Ahmmed Sahid Hossain Mustakim Description A desktop application designed to

S M Jishanul Islam 2 Jan 31, 2022