Home

Awesome

Event Sourcing with Kafka and ksqlDB

<a name="0b79795d3efc95b9976c7c5b933afce2"></a>Introduction

Kafka is not for event sourcing, isn't it?

Kafka alone is not an event store, but Kafka and ksqlDB together allow building full-featured event stores.

This repository provides a sample of event sourced system that uses Kafka and ksqlDB as event store.

Kafka Logo ksqlDB Logo

See also

<a name="8753dff3c2879207fa06ef1844b1ea4d"></a>Example Domain

This sample uses heavily simplified ride hailing domain model inspired by tech/uklon experience.

Domain use case diagram

Domain state diagram

<a name="19025f75ca30ec4a46f55a6b9afdeba6"></a>Event Sourcing and CQRS 101

<a name="436b314e78fec59a76bad8b93b52ee75"></a>State-Oriented Persistence

State-oriented persistence

<a name="c4b3d1c8edab1825366ac1d541d8226f"></a>Event Sourcing

Event sourcing persists the state of an entity as a sequence of immutable state-changing events.

Event sourcing

Whenever the state of an entity changes, a new event is appended to the list of events.

Event sourcing

Current state of an entity can be restored by replaying all its events.

Event sourcing is best suited for short-living entities with relatively small total number of event (like orders).

Restoring the state of the short-living entity by replaying all its events doesn't have any performance impact. Thus, no optimizations for restoring state are required for short-living entities.

For endlessly stored entities (like users or bank accounts) with thousands of events restoring state by replaying all events is not optimal and snapshotting should be considered.

Snapshotting is an optimization technique where a snapshot of the aggregate's state is also saved, so an application can restore the current state of an aggregate from the snapshot instead of from scratch.

Snapshotting in event souring

An entity in event sourcing is also referenced as an aggregate.

A sequence of events for the same aggregate are also referenced as a stream.

<a name="b2cf9293622451d86574d2973398ca70"></a>CQRS

CQRS (Command-query responsibility segregation) stands for segregating the responsibility between commands (write requests) and queries (read requests). The write requests and the read requests are processed by different handlers.

A command generates zero or more events or results in an error.

CQRS

CQRS is a self-sufficient architectural pattern and doesn't require event sourcing.

Event sourcing is usually used in conjunction with CQRS. Event store is used as a write database and SQL or NoSQL database as a read database.

CQRS

Events in event sourcing are a part of a bounded context and should not be used "as-is" for integration with other bounded contexts. Integration events representing the current state of an aggregate should be used for communication between bounded contexts instead of a raw event sourcing change events.

<a name="cc00871be6276415cfb13eb24e97fe48"></a>Advantages of CQRS

<a name="845b7e034fb763fcdf57e9467c0a8707"></a>Advantages of Event Sourcing

<a name="70b356f41293ace9df0d04cd8175ac35"></a>Requirements for Event Store

<a name="9f6302143996033ebb94d536b860acc3"></a>Solution Architecture

<a name="7a5c6b7581644459b2452045e4b3584d"></a>Kafka Topic Architecture

Kafka topic architecture

<a name="942a78423f55c5226a507463acf7be49"></a>ksqlDB Streams vs Tables

A stream is a partitioned, immutable, append-only collection that represents a series of historical facts.

A table is a mutable, partitioned collection that models change over time and represents what is true as of "now".

ksqlDB stream vs table example

Both steams and tables are backed by a Kafka topic.

The current state of a table is stored locally and ephemerally on a specific server by using RocksDB. The series of changes that are applied to a table is stored durably in a Kafka topic.

ksqlDB streams and tables duality

<a name="41e46dfc4197bfe5135c9953cd4eb8b7"></a>ksqlDb Event Store

<a name="b55137335b62df2ead97b0ae0f1d7722"></a>Naive Solution (Not Working!)

  1. Commands are not persisted.
  2. ORDER_EVENTS stream for storing events with AGGREGATE_ID as a key.
  3. ORDER_AGGREGATES table aggregates events by AGGREGATE_ID (a key).
  4. Command handler reads by ID aggregate events from ORDER_AGGREGATES table and appends new events to ORDER_EVENTS stream.
CREATE STREAM ORDER_EVENTS (
  AGGREGATE_ID STRING KEY,
  TYPE STRING,
  JSON_DATA STRING
) WITH (
  KAFKA_TOPIC='order-events',
  PARTITIONS=10,
  REPLICAS=1,
  VALUE_FORMAT='JSON'
);

CREATE TABLE ORDER_AGGREGATES WITH (
  KAFKA_TOPIC='order-aggregates',
  PARTITIONS=10,
  REPLICAS=1,
  VALUE_FORMAT='JSON'
) AS SELECT
  AGGREGATE_ID,
  COLLECT_LIST(TYPE) AS TYPE_LIST,
  COLLECT_LIST(JSON_DATA) AS JSON_DATA_LIST
FROM ORDER_COMMANDS_AND_EVENTS
GROUP BY AGGREGATE_ID
EMIT CHANGES;

ksqlDB persistent query that does the stream aggregation into a table runs as a background process.

There is no guarantee that the command handler will read the latest version of the aggregate.

ksqlDB event store not working solution

<a name="c60b741f731650358e295c4598f5b9fd"></a>Working Solution

ksqlDB event store high-level

  1. Commands and events are persistent in the same stream ORDER_COMMANDS_AND_EVENTS with AGGREGATE_ID as a key to preserve ordering.
  2. ORDER_AGGREGATES table aggregates commands and events (COLLECT_LIST) by AGGREGATE_ID (a key).
  3. Command handler consumes commands from order-aggregates topic where changes in ORDER_AGGREGATES are published. Thus, processing command B that follows command A, it will be possible to read all previous events generated by command A.
  4. Event handler consumes events form order-aggregates topic, updates the read database and sends integration events.
CREATE STREAM IF NOT EXISTS ORDER_COMMANDS_AND_EVENTS (
  AGGREGATE_ID STRING KEY,
  IS_COMMAND BOOLEAN,
  TYPE STRING,
  JSON_DATA STRING
) WITH (
  KAFKA_TOPIC='order-commands-and-events',
  PARTITIONS=10,
  REPLICAS=1,
  VALUE_FORMAT='JSON'
);

CREATE TABLE IF NOT EXISTS ORDER_AGGREGATES WITH (
  KAFKA_TOPIC='order-aggregates',
  PARTITIONS=10,
  REPLICAS=1,
  VALUE_FORMAT='JSON'
) AS SELECT
  AGGREGATE_ID,
  COLLECT_LIST(IS_COMMAND) AS IS_COMMAND_LIST,
  COLLECT_LIST(TYPE) AS TYPE_LIST,
  COLLECT_LIST(JSON_DATA) AS JSON_DATA_LIST
FROM ORDER_COMMANDS_AND_EVENTS
GROUP BY AGGREGATE_ID
EMIT CHANGES;

All parts together look like this

ksqlDB event store

<a name="205928bf89c3012be2e11d1e5e7ad01f"></a>Permanent Storage

Events can be stored in Kafka topics permanently.

To set retention to "forever" set properties in kafka.properties

log.retention.ms=-1
log.retention.bytes=-1

or environment variables for confluentinc/cp-kafka image

KAFKA_LOG_RETENTION_MS: -1
KAFKA_LOG_RETENTION_BYTES: -1

For storing huge volumes of data in Kafka use Tired Storage.

<a name="6eec4db0e612f3a70dab7d96c463e8f6"></a>Optimistic concurrency control

Command must generate one or more events in order to implement optimistic concurrency control.

Commands and events for the same aggregate are processed sequentially because:

Thus, optimistic concurrency control based on version check can be implemented:

  1. Actual aggregate version must match the expected version specified in a command.
  2. The last entry from the stream must be a command, and the last but one must be an event. If both entries (the last and the last but one) are commands, it's a concurrent modification.
class OrderCommandHandler {

  // ...
  private boolean checkVersionMatches(
      Command latestCommand, List<Command> unprocessedCommands, Order order) {
    if (order.getBaseVersion() != latestCommand.getExpectedVersion()) {
      // Actual version doesn't match expected version
      return false;
    }
    if (unprocessedCommands.size() > 1) {
      // Concurrent modification
      return false;
    }
    return true;
  }
}

Also, set a Kafka producer property max.in.flight.requests.per.connection=1 to make sure that while a batch of messages is retrying, additional messages will not be sent.

It is possible that the broker will fail to write the first batch of messages, succeed to write the second, and then retry the first batch and succeed, thereby reversing the order.

<a name="323effe18de24bcc666f161931c903f3"></a>Loading current state

There is no need to do a remote call to fetch all events for an aggregate.

ORDER_AGGREGATES table records and messages in its changelog topic order-aggregates contain all aggregate events and commands (COLLECT_LIST(JSON_DATA)).

When a command, or an event is consumed from order-aggregates topic, the message already contains all aggregate events.

However, it is also possible to fetch all events for an aggregate with ksqlDB SQL query

SELECT * FROM ORDER_AGGREGATES WHERE AGGREGATE_ID='${orderId}' AND IS_COMMAND=false;
<a name="784ff5dca3b046266edf61637822bbff"></a>Subscribe to all events by aggregate type

ORDER_AGGREGATES table and in its changelog topic order-aggregates contain all aggregates of the same type.

Consumers of order-aggregates topic receive all commands and events related to the same aggregate type.

<a name="0b584912c4fa746206884e080303ed49"></a>Checkpoints

Consumer commits offset of the last message after processing it. Consumer will continue consuming messages from where it left off in the offset after a restart.

<a name="0cfc0523189294ac086e11c8e286ba2d"></a>Drawbacks
  1. Commands have to be persisted. It's easy to flood the system with invalid commands that will take a lot of space in the storage.
  2. Commands are processed asynchronously, so after submitting a command a result of processing can't be received synchronously. API should be designed with asynchrony in mind.
  3. Errors have to be modelled as events due to asynchronous command processing. A command resulting in error produces the ErrorEvent and increments the aggregate version.
  4. A command must generate one or more events (and never zero events). Otherwise, optimistic concurrency check implementation will work incorrectly.
  5. Adding event sourcing snapshotting is possible but will complicate the solution even more.

<a name="56a081f86b7ec6a7d523c7e6d186f1a3"></a>Why ksqlDB?

It is possible to build event sourced systems with Kafka Streams, but I find ksqlDB simpler and more convenient solution for event sourcing because Kafka Streams has a few limitations:

ksqlDB is actually uses Kafka Streams internally to provide higher level of abstraction. Thus, ksqlDB has the following advantages:

<a name="53af957fc9dc9f7083531a00fe3f364e"></a>How to Run the Sample?

  1. Download & installOpenJDK 11 (LTS) at AdoptOpenJDK.

  2. Download and install Docker and Docker Compose.

  3. Build Java project and Docker image

    ./gradlew clean build jibDockerBuild -i
    
  4. Run Kafka, ksqlDB and event-sourcing-app

    docker-compose up -d --scale event-sourcing-app=2
    # wait a few minutes
    
  5. Follow the logs of the application

    docker-compose logs -f event-sourcing-app
    
  6. Install curl and jq.

  7. Run test.sh script and see the output.

The test.sh script has the following instructions:

  1. Place new order.
    ORDER_ID=$(curl -s -X POST http://localhost:8080/orders/ -d '{"riderId":"63770803-38f4-4594-aec2-4c74918f7165","price":"123.45","route":[{"address":"Київ, вулиця Полярна, 17А","lat":50.51980052414157,"lon":30.467197278948536},{"address":"Київ, вулиця Новокостянтинівська, 18В","lat":50.48509161169076,"lon":30.485170724431292}]}' -H 'Content-Type: application/json' | jq -r .orderId)
    sleep 1s
    
  2. Get the placed order.
    curl -s -X GET http://localhost:8080/orders/$ORDER_ID | jq
    
    {
      "id": "827e3a63-d252-415f-af60-94c5a36bfcd6",
      "version": 1,
      "status": "PLACED",
      "riderId": "63770803-38f4-4594-aec2-4c74918f7165",
      "price": 123.45,
      "route": [
        {
          "address": "Київ, вулиця Полярна, 17А",
          "lat": 50.51980052414157,
          "lon": 30.467197278948536
        },
        {
          "address": "Київ, вулиця Новокостянтинівська, 18В",
          "lat": 50.48509161169076,
          "lon": 30.485170724431292
        }
      ],
      "placedDate": "2021-04-23T15:26:22.543938Z",
      "errors": []
    }
    
  3. Accept the order. Try to cancel the order concurrently to simulate a write-write conflict.
    curl -s -X PATCH http://localhost:8080/orders/$ORDER_ID -d '{"status":"ACCEPTED","driverId":"2c068a1a-9263-433f-a70b-067d51b98378","version":1}' -H 'Content-Type: application/json'
    curl -s -X PATCH http://localhost:8080/orders/$ORDER_ID -d '{"status":"CANCELLED","version":1}' -H 'Content-Type: application/json'
    sleep 1s
    
  4. Get the accepted order with optimistic concurrency control error.
    curl -s -X GET http://localhost:8080/orders/$ORDER_ID | jq
    
    {
      "id": "827e3a63-d252-415f-af60-94c5a36bfcd6",
      "version": 2,
      "status": "ACCEPTED",
      "riderId": "63770803-38f4-4594-aec2-4c74918f7165",
      "price": 123.45,
      "route": [
        {
          "address": "Київ, вулиця Полярна, 17А",
          "lat": 50.51980052414157,
          "lon": 30.467197278948536
        },
        {
          "address": "Київ, вулиця Новокостянтинівська, 18В",
          "lat": 50.48509161169076,
          "lon": 30.485170724431292
        }
      ],
      "driverId": "2c068a1a-9263-433f-a70b-067d51b98378",
      "placedDate": "2021-04-23T15:26:22.543938Z",
      "acceptedDate": "2021-04-23T15:26:23.542921Z",
      "errors": [
        {
          "command": "CancelOrderCommand",
          "expectedVersion": 1,
          "message": "Concurrent modification"
        }
      ]
    }
    
  5. Try to cancel an outdated version of the order to simulate lost update.
    curl -s -X PATCH http://localhost:8080/orders/$ORDER_ID -d '{"status":"CANCELLED","version":1}' -H 'Content-Type: application/json'
    sleep 1s
    
  6. Get the accepted order with optimistic concurrency control error.
    curl -s -X GET http://localhost:8080/orders/$ORDER_ID | jq
    
    {
      "id": "827e3a63-d252-415f-af60-94c5a36bfcd6",
      "version": 2,
      "status": "ACCEPTED",
      "riderId": "63770803-38f4-4594-aec2-4c74918f7165",
      "price": 123.45,
      "route": [
        {
          "address": "Київ, вулиця Полярна, 17А",
          "lat": 50.51980052414157,
          "lon": 30.467197278948536
        },
        {
          "address": "Київ, вулиця Новокостянтинівська, 18В",
          "lat": 50.48509161169076,
          "lon": 30.485170724431292
        }
      ],
      "driverId": "2c068a1a-9263-433f-a70b-067d51b98378",
      "placedDate": "2021-04-23T15:26:22.543938Z",
      "acceptedDate": "2021-04-23T15:26:23.542921Z",
      "errors": [
        {
          "command": "CancelOrderCommand",
          "expectedVersion": 1,
          "message": "Concurrent modification"
        },
        {
          "command": "CancelOrderCommand",
          "expectedVersion": 1,
          "message": "Actual version 2 doesn't match expected version 1"
        }
      ]
    }
    
  7. Try to cancel a version of the order 'from the future' to simulate unordering.
    curl -s -X PATCH http://localhost:8080/orders/$ORDER_ID -d '{"status":"CANCELLED","version":3}' -H 'Content-Type: application/json'
    sleep 1s
    
  8. Get the accepted order with optimistic concurrency control error.
    curl -s -X GET http://localhost:8080/orders/$ORDER_ID | jq
    
    {
      "id": "827e3a63-d252-415f-af60-94c5a36bfcd6",
      "version": 2,
      "status": "ACCEPTED",
      "riderId": "63770803-38f4-4594-aec2-4c74918f7165",
      "price": 123.45,
      "route": [
        {
          "address": "Київ, вулиця Полярна, 17А",
          "lat": 50.51980052414157,
          "lon": 30.467197278948536
        },
        {
          "address": "Київ, вулиця Новокостянтинівська, 18В",
          "lat": 50.48509161169076,
          "lon": 30.485170724431292
        }
      ],
      "driverId": "2c068a1a-9263-433f-a70b-067d51b98378",
      "placedDate": "2021-04-23T15:26:22.543938Z",
      "acceptedDate": "2021-04-23T15:26:23.542921Z",
      "errors": [
        {
          "command": "CancelOrderCommand",
          "expectedVersion": 1,
          "message": "Concurrent modification"
        },
        {
          "command": "CancelOrderCommand",
          "expectedVersion": 1,
          "message": "Actual version 2 doesn't match expected version 1"
        },
        {
          "command": "CancelOrderCommand",
          "expectedVersion": 3,
          "message": "Actual version 2 doesn't match expected version 3"
        }
      ]
    }
    
  9. Complete the order.
    curl -s -X PATCH http://localhost:8080/orders/$ORDER_ID -d '{"status":"COMPLETED","version":2}' -H 'Content-Type: application/json'
    sleep 1s
    
  10. Get the completed order.
    curl -s -X GET http://localhost:8080/orders/$ORDER_ID | jq
    
    {
      "id": "827e3a63-d252-415f-af60-94c5a36bfcd6",
      "version": 3,
      "status": "COMPLETED",
      "riderId": "63770803-38f4-4594-aec2-4c74918f7165",
      "price": 123.45,
      "route": [
        {
          "address": "Київ, вулиця Полярна, 17А",
          "lat": 50.51980052414157,
          "lon": 30.467197278948536
        },
        {
          "address": "Київ, вулиця Новокостянтинівська, 18В",
          "lat": 50.48509161169076,
          "lon": 30.485170724431292
        }
      ],
      "driverId": "2c068a1a-9263-433f-a70b-067d51b98378",
      "placedDate": "2021-04-23T15:26:22.543938Z",
      "acceptedDate": "2021-04-23T15:26:23.542921Z",
      "completedDate": "2021-04-23T15:26:26.791512Z",
      "errors": [
        {
          "command": "CancelOrderCommand",
          "expectedVersion": 1,
          "message": "Concurrent modification"
        },
        {
          "command": "CancelOrderCommand",
          "expectedVersion": 1,
          "message": "Actual version 2 doesn't match expected version 1"
        },
        {
          "command": "CancelOrderCommand",
          "expectedVersion": 3,
          "message": "Actual version 2 doesn't match expected version 3"
        }
      ]
    }
    
  11. Try to cancel a completed order to simulate business rule violation.
    curl -s -X PATCH http://localhost:8080/orders/$ORDER_ID -d '{"status":"CANCELLED","version":3}' -H 'Content-Type: application/json'
    sleep 1s
    
  12. Get the completed order with business rule validation error.
    curl -s -X GET http://localhost:8080/orders/$ORDER_ID | jq
    
    {
      "id": "827e3a63-d252-415f-af60-94c5a36bfcd6",
      "version": 3,
      "status": "COMPLETED",
      "riderId": "63770803-38f4-4594-aec2-4c74918f7165",
      "price": 123.45,
      "route": [
        {
          "address": "Київ, вулиця Полярна, 17А",
          "lat": 50.51980052414157,
          "lon": 30.467197278948536
        },
        {
          "address": "Київ, вулиця Новокостянтинівська, 18В",
          "lat": 50.48509161169076,
          "lon": 30.485170724431292
        }
      ],
      "driverId": "2c068a1a-9263-433f-a70b-067d51b98378",
      "placedDate": "2021-04-23T15:26:22.543938Z",
      "acceptedDate": "2021-04-23T15:26:23.542921Z",
      "completedDate": "2021-04-23T15:26:26.791512Z",
      "errors": [
        {
          "command": "CancelOrderCommand",
          "expectedVersion": 1,
          "message": "Concurrent modification"
        },
        {
          "command": "CancelOrderCommand",
          "expectedVersion": 1,
          "message": "Actual version 2 doesn't match expected version 1"
        },
        {
          "command": "CancelOrderCommand",
          "expectedVersion": 3,
          "message": "Actual version 2 doesn't match expected version 3"
        },
        {
          "command": "CancelOrderCommand",
          "expectedVersion": 3,
          "message": "Order in status COMPLETED can't be cancelled"
        }
      ]
    }
    
  13. Print integration events.
    docker exec -it kafka /bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic order-integration-events --from-beginning --property print.key=true --timeout-ms 3000
    
    827e3a63-d252-415f-af60-94c5a36bfcd6	{"order_id":"827e3a63-d252-415f-af60-94c5a36bfcd6","event_type":"OrderPlacedEvent","event_timestamp":1619191582543,"version":1,"status":"PLACED","rider_id":"63770803-38f4-4594-aec2-4c74918f7165","price":123.45,"route":[{"ADDRESS":"Київ, вулиця Полярна, 17А","LAT":50.51980052414157,"LON":30.467197278948536},{"ADDRESS":"Київ, вулиця Новокостянтинівська, 18В","LAT":50.48509161169076,"LON":30.485170724431292}]}
    827e3a63-d252-415f-af60-94c5a36bfcd6	{"order_id":"827e3a63-d252-415f-af60-94c5a36bfcd6","event_type":"OrderAcceptedEvent","event_timestamp":1619191583542,"version":2,"status":"ACCEPTED","rider_id":"63770803-38f4-4594-aec2-4c74918f7165","price":123.45,"route":[{"ADDRESS":"Київ, вулиця Полярна, 17А","LAT":50.51980052414157,"LON":30.467197278948536},{"ADDRESS":"Київ, вулиця Новокостянтинівська, 18В","LAT":50.48509161169076,"LON":30.485170724431292}],"driver_id":"2c068a1a-9263-433f-a70b-067d51b98378"}
    827e3a63-d252-415f-af60-94c5a36bfcd6	{"order_id":"827e3a63-d252-415f-af60-94c5a36bfcd6","event_type":"OrderCompletedEvent","event_timestamp":1619191586791,"version":3,"status":"COMPLETED","rider_id":"63770803-38f4-4594-aec2-4c74918f7165","price":123.45,"route":[{"ADDRESS":"Київ, вулиця Полярна, 17А","LAT":50.51980052414157,"LON":30.467197278948536},{"ADDRESS":"Київ, вулиця Новокостянтинівська, 18В","LAT":50.48509161169076,"LON":30.485170724431292}],"driver_id":"2c068a1a-9263-433f-a70b-067d51b98378"}