JSON Event Sourcing

Introduction

This library implements event-driven state management for JSON, a format used by a wide range of tools. It requires Apache Kafka for messaging and MongoDB for storing state.

An aggregate is a microservice with an interface that is composed of only Kafka topics. It receives the commands on a command topic. It emits events on two event topics, one which describes only the changes and another which also has the previous and the new state of the aggregate instance. The new state is also published separately on an aggregate topic. Finally, there is a reply topic to which either the new aggregate state is sent or the original command, annotated with errors.

The reply topic is meant for providing asynchronous feedback to the client. Clients can react to the messages on the reply topic, for example through Server-Sent Events. This combines the client and the server in one reactive loop, where data travels in only one direction.

Inside the aggregate there is a reducer, which is provided by the application developer. A reducer is a function that receives the current state of an aggregate instance and a command. Its task is to verify the command and to calculate the new state of the aggregate instance. The result is either the new state or the annotated command in case something was wrong with it.

For any particular aggregate instance all commands are processed sequentially. The commands will be consumed in the order they arrived.

Reducer.svg

Aggregates

An aggregate is a JSON document, which can have any structure plus the following technical fields:

| Field | Description | Mandatory |
| --- | --- | --- |
| _corr | The correlation identifier that was used by the last command. It is usually a UUID. | Yes |
| _deleted | This boolean marks the aggregate instance as deleted. This is a logical delete. | No |
| _id | The identifier of the aggregate instance. It is usually a UUID. | Yes |
| _jwt | The decoded JSON Web Token that was used by the last command. | No |
| _seq | A sequence number. This is the sequence number of the last event. | Yes |
| _type | The aggregate type, which is composed as <application>-<name>. | Yes |

Commands

A command is a JSON document, which has the following technical fields on top of whatever you put in it:

| Field | Description | Mandatory |
| --- | --- | --- |
| _command | The name of the command. | Yes |
| _corr | A correlation identifier. It is propagated throughout the flow. This is usually a UUID. | Yes |
| _error | This boolean indicates there is a problem with the command. It is set by a reducer. | No |
| _id | The identifier of the aggregate instance. It is usually a UUID. | Yes |
| _jwt | The decoded JSON Web Token. | No |
| _languages | An array of language tags in the order of preference. When a validator or some other component wishes to send messages to the user, it can use the proper language for it. | No |
| _seq | A sequence number. If this field is present it should have the same value as that field in the aggregate instance. Otherwise the command is ignored. | No |
| _type | The aggregate type, which is composed as <application>-<name>. | Yes |
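
The `_seq` rule in the table can be sketched as follows. This is only an illustration, not the library's code: `Map<String, Object>` stands in for a JSON object so that the sketch needs nothing but the JDK, and the names `SequenceCheck` and `accepted` are hypothetical.

```java
import java.util.Map;

final class SequenceCheck {
  private SequenceCheck() {}

  // Hypothetical helper: a command without _seq is always accepted. A command
  // with _seq is accepted only when it matches the _seq of the current state.
  static boolean accepted(final Map<String, Object> command, final Map<String, Object> state) {
    final Object expected = command.get("_seq");

    return expected == null || expected.equals(state.get("_seq"));
  }
}
```

A client can use this as a form of optimistic locking: it sends the `_seq` it last saw and the command is dropped if the instance has moved on in the meantime.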

There are three built-in commands called put, patch and delete. The put command replaces the entire contents of the aggregate instance. The patch command has the array field _ops, which is a JSON patch that is applied to the instance. The delete command performs a logical delete. It sets the field _deleted to true. The reducers for these commands can be replaced.

Events

An event is a JSON document. It is generated by the library, which compares the old state of the aggregate instance with the new state produced by the reducer. An event has the following technical fields:

| Field | Description |
| --- | --- |
| _after | An optional field that carries the new state of the aggregate instance. |
| _before | An optional field that carries the previous state of the aggregate instance. |
| _command | The name of the command that caused the event to be created. |
| _corr | The correlation identifier that was used by the last command. It is usually a UUID. |
| _id | The identifier of the aggregate instance. It is usually a UUID. |
| _jwt | The decoded JSON Web Token that was used by the last command. |
| _ops | An array of operations as described in RFC 6902. It describes how an aggregate instance has changed after the reduction of a command. |
| _seq | A sequence number. There should not be holes in the sequence. This would indicate corruption of the event log. |
| _timestamp | The timestamp in epoch millis. |
| _type | The aggregate type, which is composed as <application>-<name>. |
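
A consumer that wants to verify the "no holes" property of `_seq` could do something like the following. It is a minimal sketch that assumes the `_seq` values of one aggregate instance have already been collected in log order. The class and method names are hypothetical and not part of the library.

```java
import java.util.List;
import java.util.OptionalInt;

final class SequenceHoles {
  private SequenceHoles() {}

  // Returns the index of the first event whose _seq does not directly follow
  // its predecessor, or empty when the sequence is contiguous.
  static OptionalInt firstHole(final List<Integer> seqs) {
    for (int i = 1; i < seqs.size(); ++i) {
      if (seqs.get(i) != seqs.get(i - 1) + 1) {
        return OptionalInt.of(i);
      }
    }

    return OptionalInt.empty();
  }
}
```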

The Anatomy of an Application

Let's discuss the example in the code snippet below. An aggregate is implemented with Kafka Streams. You always create a streams topology first and then start it. That is why on line 77 a StreamsBuilder is created. That builder is passed to the Aggregate class, because its build method also creates a structure of streams. It is called on line 51.

On line 73 the default configuration is loaded from conf/application.conf. Configuration is discussed in a separate section below.

The environment on line 40 is used to make a distinction between, for example, production, testing or development environments. That is because resources for those environments may be created in the same cluster. The aggregate appends the environment to all Kafka topic names and MongoDB collections.

On lines 49 and 50 two reducers are registered. They take a command and the current state of the aggregate instance and produce the new state. The signature of a reducer is always the following:

CompletionStage<JsonObject> reducer(JsonObject command, JsonObject currentState);

This uses the Java JSON API. A reducer returns a CompletionStage, because it may access other resources and this should happen in a non-blocking way. If your reducer doesn't need external resources you can just wrap the result with the completedFuture method.

Instead of registering one reducer per command you can also register one reducer for everything and dispatch the commands internally.
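
The single-reducer approach could look roughly like this. To keep the sketch runnable with only the JDK it uses `Map<String, Object>` instead of `JsonObject`. The names `Dispatch`, `Reducer`, `dispatcher` and `add` are made up for the illustration, and leaving the state untouched for unknown commands is an assumption of the sketch, not documented library behaviour.

```java
import static java.util.concurrent.CompletableFuture.completedFuture;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletionStage;
import java.util.function.BiFunction;

final class Dispatch {
  private Dispatch() {}

  // Stand-in for the reducer signature, with Map instead of JsonObject.
  interface Reducer
      extends BiFunction<
          Map<String, Object>, Map<String, Object>, CompletionStage<Map<String, Object>>> {}

  // One reducer for everything: it looks up the real reducer by command name.
  static Reducer dispatcher(final Map<String, Reducer> reducers) {
    return (command, state) -> {
      final Object name = command.get("_command");
      final Reducer reducer = name != null ? reducers.get(name) : null;

      // Assumption: unknown commands leave the state as it is.
      return reducer != null ? reducer.apply(command, state) : completedFuture(state);
    };
  }

  // A reducer that adds delta to the "value" field, like plus and minus above.
  static Reducer add(final int delta) {
    return (command, state) -> {
      final Map<String, Object> result = new HashMap<>(state);

      result.put("value", ((Integer) state.getOrDefault("value", 0)) + delta);

      return completedFuture(result);
    };
  }
}
```

With the real library the dispatcher would have the `JsonObject` signature shown above and be registered for each command name it handles.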

Line 52 says that all log messages should go to a log Kafka topic. The messages are in the Elastic Common Schema format. For aggregates all commands and events are logged. Commands that are in error are logged as well.

Finally, the topology is built on line 77 and started on line 79. It will run with "at least once" Kafka semantics. The default serialiser for keys is the string serialiser. For values it uses the JSON serialiser. As a consequence, all streams are declared as KStream<String, JsonObject>.

This example is available in the repository pincette-plusminus.

     1	package net.pincette.plusminus;
      	
     2	import static com.mongodb.reactivestreams.client.MongoClients.create;
     3	import static java.lang.System.exit;
     4	import static java.util.concurrent.CompletableFuture.completedFuture;
     5	import static java.util.logging.Level.parse;
     6	import static java.util.logging.Logger.getLogger;
     7	import static javax.json.Json.createObjectBuilder;
     8	import static net.pincette.jes.elastic.Logging.logKafka;
     9	import static net.pincette.jes.util.Configuration.loadDefault;
    10	import static net.pincette.jes.util.Streams.start;
    11	import static net.pincette.util.Util.tryToDoWithRethrow;
    12	import static net.pincette.util.Util.tryToGetSilent;
      	
    13	import com.mongodb.reactivestreams.client.MongoClient;
    14	import com.typesafe.config.Config;
    15	import java.util.concurrent.CompletionStage;
    16	import java.util.function.IntUnaryOperator;
    17	import java.util.logging.Level;
    18	import javax.json.JsonObject;
    19	import net.pincette.jes.Aggregate;
    20	import net.pincette.jes.util.Streams;
    21	import org.apache.kafka.streams.StreamsBuilder;
    22	import org.apache.kafka.streams.Topology;
      	
    23	public class Application {
    24	  private static final String AGGREGATE_TYPE = "counter";
    25	  private static final String APP = "plusminus";
    26	  private static final String DEV = "dev";
    27	  private static final String ENVIRONMENT = "environment";
    28	  private static final String INFO = "INFO";
    29	  private static final String KAFKA = "kafka";
    30	  private static final String LOG_LEVEL = "logLevel";
    31	  private static final String LOG_TOPIC = "logTopic";
    32	  private static final String MINUS = "minus";
    33	  private static final String MONGODB_DATABASE = "mongodb.database";
    34	  private static final String MONGODB_URI = "mongodb.uri";
    35	  private static final String PLUS = "plus";
    36	  private static final String VALUE = "value";
    37	  private static final String VERSION = "1.0";
      	
    38	  static StreamsBuilder createApp(
    39	      final StreamsBuilder builder, final Config config, final MongoClient mongoClient) {
    40	    final String environment = getEnvironment(config);
    41	    final Level logLevel = getLogLevel(config);
    42	    final Aggregate aggregate =
    43	        new Aggregate()
    44	            .withApp(APP)
    45	            .withType(AGGREGATE_TYPE)
    46	            .withEnvironment(environment)
    47	            .withBuilder(builder)
    48	            .withMongoDatabase(mongoClient.getDatabase(config.getString(MONGODB_DATABASE)))
    49	            .withReducer(PLUS, (command, currentState) -> reduce(currentState, v -> v + 1))
    50	            .withReducer(MINUS, (command, currentState) -> reduce(currentState, v -> v - 1));
      	
    51	    aggregate.build();
    52	    tryToGetSilent(() -> config.getString(LOG_TOPIC))
    53	        .ifPresent(topic -> logKafka(aggregate, logLevel, VERSION, topic));
      	
    54	    return builder;
    55	  }
      	
    56	  static String getEnvironment(final Config config) {
    57	    return tryToGetSilent(() -> config.getString(ENVIRONMENT)).orElse(DEV);
    58	  }
      	
    59	  private static Level getLogLevel(final Config config) {
    60	    return parse(tryToGetSilent(() -> config.getString(LOG_LEVEL)).orElse(INFO));
    61	  }
      	
    62	  static MongoClient getMongoClient(final Config config) {
    63	    return create(config.getString(MONGODB_URI));
    64	  }
      	
    65	  private static CompletionStage<JsonObject> reduce(
    66	      final JsonObject currentState, final IntUnaryOperator op) {
    67	    return completedFuture(
    68	        createObjectBuilder(currentState)
    69	            .add(VALUE, op.applyAsInt(currentState.getInt(VALUE, 0)))
    70	            .build());
    71	  }
      	
    72	  public static void main(final String[] args) {
    73	    final Config config = loadDefault();
      	
    74	    tryToDoWithRethrow(
    75	        () -> getMongoClient(config),
    76	        client -> {
    77	          final Topology topology = createApp(new StreamsBuilder(), config, client).build();
      	
    78	          getLogger(APP).log(Level.INFO, "Topology:\n\n {0}", topology.describe());
      	
    79	          if (!start(topology, Streams.fromConfig(config, KAFKA))) {
    80	            exit(1);
    81	          }
    82	        });
    83	  }
    84	}

Uniqueness

Sometimes it is important that aggregate instances are unique according to some business criterion and not only the aggregate identifier. This can be achieved by giving the aggregate object a MongoDB expression that generates unique values from the commands. Commands with a different aggregate identifier but the same unique business value will map to the same aggregate instance. The aggregate identifier of the instance will be the aggregate identifier of the first command with the unique value.

Whether this is useful or not depends on the use-case. When you have a stream of objects that come from another system and where the desired uniqueness has no meaning then this feature makes it very easy to consolidate that stream correctly in aggregate instances. However, when duplicates could be created accidentally, this feature would promote the overwriting of data from different users. In that case it is better to add a unique index to the MongoDB aggregate collection.

When you use this feature the Kafka topic with the purpose "unique" should exist. You should also make sure all commands have the fields that constitute the unique expression.

Unique expressions involving only one scalar field are very simple. The expression is then "$<field-name>". Sometimes, however, uniqueness can depend on several fields. You could write an expression that turns this into some scalar value, but that wouldn't be efficient. You could not make use of a MongoDB index. Therefore, a unique expression is also allowed to be a JSON object. You would write the expression like this:

{
  "field1": "$field1",
  "field2": "$field2"  
}

As with scalar values the field references are used to extract the values from the command. The result is a plain equality expression for MongoDB.
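
To make the extraction step concrete, here is a small sketch of how field references in a unique expression could be resolved against a command. It handles only the two forms discussed here (a single "$field" reference and an object of such references), not the MongoDB expression language in general. `Map<String, Object>` stands in for a JSON object and `UniqueExpression.resolve` is a hypothetical name.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class UniqueExpression {
  private UniqueExpression() {}

  // Resolves "$field" references against a command. An object expression
  // yields an object of plain values, which can serve as an equality match.
  static Object resolve(final Object expression, final Map<String, Object> command) {
    if (expression instanceof String && ((String) expression).startsWith("$")) {
      return command.get(((String) expression).substring(1));
    }

    if (expression instanceof Map) {
      final Map<String, Object> result = new LinkedHashMap<>();

      ((Map<?, ?>) expression)
          .forEach((k, v) -> result.put(String.valueOf(k), resolve(v, command)));

      return result;
    }

    return expression; // Literal values are kept as they are.
  }
}
```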

The Kafka Topics

The external interface at runtime is a set of Kafka topics. Their names always have the form <app>-<type>-<purpose>-<environment>. The following topics are expected to exist (the names are the purpose):

| Name | Description |
| --- | --- |
| aggregate | On this topic the current state of the aggregate is emitted. |
| event | On this topic the events are emitted, which contain the changes between two subsequent aggregate versions. |
| command | Through this topic commands are received. It is the only input of the system. |
| event-full | The events are also emitted on this topic, but here they have two extra fields. The _before field contains the previous state of the aggregate instance, while _after contains the current one. This is for consumers that want to do other kinds of analysis than the plain difference. |
| reply | On this topic either the new aggregate state or the failed command is emitted. The topic is meant to be routed back to the end-user, for example through Server-Sent Events. |
| unique | This topic is required when a "unique" MongoDB expression is given to the aggregate object. Commands will be re-keyed on this topic using the key values generated by the expression. |

In the example of the previous section the reply topic would be called plusminus-counter-reply-dev.
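
If you need these names in your own code, for example in a script that creates the topics, the naming rule is easy to reproduce. The helper below is a hypothetical convenience, not something the library is known to expose.

```java
final class Topics {
  private Topics() {}

  // Builds <app>-<type>-<purpose>-<environment>.
  static String topic(
      final String app, final String type, final String purpose, final String environment) {
    return String.join("-", app, type, purpose, environment);
  }
}
```

For the example application, `Topics.topic("plusminus", "counter", "reply", "dev")` gives `plusminus-counter-reply-dev`.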

The number of topic partitions should be the same for all topics. This is the upper limit for the parallelism you can achieve for one aggregate. It is best to provide more partitions than the level of parallelism you want to start with. This allows you to scale out without having to increase the number of partitions, for which downtime would be needed.

Testing

An application like this is merely a set of reducers, which are JSON-in-JSON-out functions. This makes it quite easy to test. You only have to publish a sequence of commands on command topics, capture the replies from the reply topics and compare them with a set of expected results. The repository pincette-jes-test proposes this scheme. This is a generic test application that also uses Kafka Streams. In your project repository it expects to find the test/commands directory, containing the command files, and the test/replies directory with the expected result files. The actual results are compared with those.

Configuration

The configuration can come from anywhere. The above example uses the Lightbend Config package. By default it loads the file conf/application.conf. If you also have the file dev.conf in that directory, for example, then you can load it as follows:

> java -Dconfig.resource=dev.conf -jar app.jar

A configuration would look like this:

environment = "dev"
logLevel = "INFO"

kafka {
  application.id = "plusminus"
  bootstrap.servers = "localhost:9092"
  num.stream.threads = 1
  replication.factor = 3
}

mongodb {
  uri = "mongodb://localhost:27017"
  database = "es"
}

The property kafka.num.stream.threads is for Kafka Streams. This number, multiplied by the number of running microservice instances, should not be higher than the number of partitions in the topics. Otherwise there will be idle threads.
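
The rule of thumb for stream threads can be written down as a small calculation. The sketch simply applies the rule as stated here. The class and method names are hypothetical.

```java
final class Parallelism {
  private Parallelism() {}

  // Threads beyond the number of partitions have nothing to consume.
  static int idleThreads(final int threadsPerInstance, final int instances, final int partitions) {
    return Math.max(0, threadsPerInstance * instances - partitions);
  }
}
```

With two threads per instance, three instances and four partitions, two threads would sit idle. With one thread per instance none would.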

The property kafka.replication.factor is also used by Kafka Streams for the internal topics it creates.

The HTTP API

Until now we have microservices that are only reachable through Kafka. In order to use them from the outside we need an HTTP interface. It is not necessary to create one for each and every microservice. You can have one generic HTTP server, which can publish commands on Kafka topics. The repository pincette-jes-http is an example of such a server. It's a lightweight process you can spin up with a port.

Access Control

Next to role-based and function-based access control it is sometimes also necessary to manage access to individual aggregate instances. A user may, for example, be allowed to issue a certain command to a certain type of aggregate, but not necessarily to all instances of that type.

The Aggregate class checks if the field _acl is present in the current state of an aggregate instance. Without the field the command is allowed. Otherwise it should contain a subobject, where each field is the name of a command. The value is an array of role names, which is matched with the field /_jwt/roles in the command. If the intersection is not empty then the command is good to go. If there is no field in _acl that corresponds to the command then the fallback field write is tried. If that is also not available then the command is allowed.

There are two exceptions to these rules. The user system is always allowed and if the aggregate has the "breaking the glass" option turned on, the boolean field /_jwt/breakingTheGlass is checked.
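
The basic `_acl` rules can be summarised in a few lines. The sketch below covers only the role matching and the `write` fallback. The two exceptions (the system user and "breaking the glass") are left out on purpose. The `acl` map stands for the contents of `_acl`, `roles` for the values of `/_jwt/roles`, and the class and method names are hypothetical.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;

final class AccessControl {
  private AccessControl() {}

  static boolean allowed(
      final Map<String, List<String>> acl, final String command, final Set<String> roles) {
    if (acl == null) {
      return true; // No _acl field: the command is allowed.
    }

    // Use the entry for the command, else the fallback entry "write".
    final List<String> permitted =
        acl.containsKey(command) ? acl.get(command) : acl.get("write");

    // Neither entry present: allowed. Otherwise the roles must intersect.
    return permitted == null || !Collections.disjoint(permitted, roles);
  }
}
```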

Logging

All log messages are sent to Kafka. For aggregates there is the pre-baked function logKafka. This will convert commands and events to Elastic Common Schema messages and route them to a Kafka topic for logging. You can call it like this:

logKafka(myAggregate, INFO, "1.0", "log-dev");

Other kinds of logging can be integrated in your Kafka Streams statements. The class ElasticCommonSchema makes it easy to generate ECS messages. You can add additional Streams statements by splitting an original stream chain in two. The first part goes into a local constant. You then connect the second part to the constant and you also connect the log stream to it. This yields a kind of tee structure.

If you use Elasticsearch for your logs then you have two options. You can use Kafka Connect, but then you should add a Kafka Streams statement that copies the JSON messages to another topic as strings. This is because we use a binary serialiser for JsonObject objects.

The other option is to run the pincette-jes-indexer microservice, which takes care of everything.

Server-Sent Events

This protocol is a natural way to extend this event-driven architecture to the client. The reply topic is meant for that. It contains commands that failed validation and new aggregate instance versions when commands have an effect. Those messages can be forwarded to the client through SSE.

The pincette-jes-fanout microservice implements this with the fanout.io service. You have to configure the topics you want to forward.

Serialisation

All messages are serialised with the JSON Event Sourcing serialiser. It first encodes a JsonObject in CBOR. Then it is compressed in GZIP format (see also RFC 1951 and RFC 1952). The deserialiser falls back to JSON in string format.
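
The compression half of this scheme can be illustrated with the JDK alone. The sketch leaves out the CBOR step, because that needs a third-party library, and treats the encoded message as opaque bytes. Checking the GZIP magic bytes is one possible way to tell a compressed message from plain JSON text. Whether the library's deserialiser decides it that way is an assumption. All names are hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

final class Compression {
  private Compression() {}

  // Compresses already encoded bytes in GZIP format.
  static byte[] compress(final byte[] encoded) {
    final ByteArrayOutputStream out = new ByteArrayOutputStream();

    try (GZIPOutputStream gzip = new GZIPOutputStream(out)) {
      gzip.write(encoded);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }

    return out.toByteArray();
  }

  static byte[] decompress(final byte[] compressed) {
    try (GZIPInputStream gzip = new GZIPInputStream(new ByteArrayInputStream(compressed))) {
      return gzip.readAllBytes();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  // GZIP streams start with the magic bytes 0x1f 0x8b (RFC 1952).
  static boolean isGzip(final byte[] bytes) {
    return bytes.length >= 2 && (bytes[0] & 0xff) == 0x1f && (bytes[1] & 0xff) == 0x8b;
  }
}
```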

The command line tool pincette-jes-prodcon makes it easy to consume and produce messages using this serialisation.

Reacting to Change

Often one type of aggregate wants to react to changes that are published by another one. Possibly the former will not do this directly. There may be an intermediate microservice that makes such connections. In any case an event has to be converted to a command, perhaps based on some criteria. The function changed is handy in this case. You can write things like this:

final KStream<String, JsonObject> stream = builder.stream("plusminus-counter-event-dev");

stream
    .filter((k, v) -> changed(v, "/value"))
    .mapValues(v -> createCommand(v))
    .to("myapp-mytype-command-dev");

In general, however, the command has to be sent to several aggregate instances based on some criterion. So an event will be mapped to potentially more than one command, one for each destination aggregate instance. Moreover, if the query to obtain the destination results in a non-blocking asynchronous stream of many items then things can become complicated.

The Kafka Streams API provides the flatMap function where instead of mapping a value to another value you map it to a list of values, in the form of an Iterable. You can do the query synchronously and accumulate the result in a list, but this is not very efficient. The complete result would be held in memory and it would add more blocking I/O to the blocking Kafka Streams API.

The goal of the Reactor class is to make such a scenario simpler. You give it a source and destination aggregate type, a function to obtain the destination instances and a transformation function that transforms an event into a command. It will listen to the event-full topic of the source aggregate. For every event it calls your transformation function and pairs the result with every destination instance. The pairings produce complete commands that are sent to the command topic of the destination aggregate.

The destination function should return a Publisher<JsonObject>. You can use the functions aggregationPublisher and findPublisher for this. The JSON objects in the result should at least have the _id field. The transformation function should produce the command, with at least the field _command. The fields _id, _type, _corr and _timestamp will be set by the reactor. The correlation ID will be the same as the one on the incoming event.

Optionally you can provide a filter function that selects the events of interest. However, if the transformation function returns null or a JSON object without the _command field the result will be ignored, which is why the filter function is optional.
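
The pairing step can be pictured as follows. This is a simplified, synchronous sketch of the idea, not the Reactor implementation, which works with a reactive `Publisher`. `Map<String, Object>` stands in for a JSON object, the timestamp is passed in to keep the function deterministic, and all names are hypothetical.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

final class Pairing {
  private Pairing() {}

  // Combines the command produced by the transformation function with every
  // destination instance, which yields one complete command per destination.
  static List<Map<String, Object>> commands(
      final Map<String, Object> event,
      final Map<String, Object> template,
      final List<Map<String, Object>> destinations,
      final String destinationType,
      final long timestamp) {
    if (template == null || !template.containsKey("_command")) {
      return List.of(); // The result of the transformation is ignored.
    }

    return destinations.stream()
        .map(
            destination -> {
              final Map<String, Object> command = new HashMap<>(template);

              command.put("_id", destination.get("_id"));
              command.put("_type", destinationType);
              command.put("_corr", event.get("_corr")); // Same correlation ID as the event.
              command.put("_timestamp", timestamp);

              return command;
            })
        .collect(Collectors.toList());
  }
}
```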

The following is an example in the context of the plusminus aggregate shown above. Whenever the value of a counter goes from 9 to 10 the other counters will receive the plus command. So the source and destination aggregate types are the same in this case.

  private static StreamsBuilder createReactor(
      final StreamsBuilder builder, final Config config, final MongoClient mongoClient) {
    final String environment = getEnvironment(config);

    return new Reactor()
        .withBuilder(builder)
        .withEnvironment(environment)
        .withSourceType("plusminus-counter")
        .withDestinationType("plusminus-counter")
        .withDestinations(
            event ->
                getOthers(
                    event.getString(ID),
                    mongoClient.getDatabase(config.getString(MONGODB_DATABASE)),
                    environment))
        .withEventToCommand(Application::createCommand)
        .withFilter(event -> changed(event, "/value", createValue(9), createValue(10)))
        .build();
  }

  private static CompletionStage<JsonObject> createCommand(final JsonObject event) {
    return completedFuture(createObjectBuilder().add(COMMAND, "plus").build());
  }

  static String getEnvironment(final Config config) {
    return tryToGetSilent(() -> config.getString(ENVIRONMENT)).orElse(DEV);
  }

  private static Publisher<JsonObject> getOthers(
      final String id, final MongoDatabase database, final String environment) {
    return aggregationPublisher(
        database.getCollection("plusminus-counter-" + environment),
        list(match(addNotDeleted(ne(ID, id))), project(include(ID))));
  }

Validation

You may want to protect the integrity of your aggregates through validation of the commands that are sent to them. This can be done by splitting a reducer in two. Both parts have the same signature, i.e. they receive the command and the current state of the aggregate instance and produce the resulting JSON. The first part does the validation. It always returns the command, either with error annotations or not. Only in the latter case is the second part called. So the reply topic will get either a failed command or the new version of the aggregate instance. This scenario can be set up with the compose function.
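
Conceptually, composing a validator with a reducer works like the sketch below. It is not the library's compose function, only an illustration of the behaviour described here, with `Map<String, Object>` standing in for `JsonObject` and hypothetical names throughout.

```java
import static java.util.concurrent.CompletableFuture.completedFuture;

import java.util.Map;
import java.util.concurrent.CompletionStage;
import java.util.function.BiFunction;

final class Compose {
  private Compose() {}

  // Stand-in for the reducer signature, with Map instead of JsonObject.
  interface Reducer
      extends BiFunction<
          Map<String, Object>, Map<String, Object>, CompletionStage<Map<String, Object>>> {}

  // Runs the validator first. A command that comes back with _error set to
  // true is returned as it is. Otherwise the actual reducer is called.
  static Reducer compose(final Reducer validator, final Reducer reducer) {
    return (command, state) ->
        validator
            .apply(command, state)
            .thenCompose(
                checked ->
                    Boolean.TRUE.equals(checked.get("_error"))
                        ? completedFuture(checked)
                        : reducer.apply(checked, state));
  }
}
```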

It is possible to do the validation in a declarative way with the validator function. It uses the Mongo Validator, which allows you to write validation conditions in the MongoDB query language. The following is an example.

final net.pincette.mongo.Validator specs = new net.pincette.mongo.Validator();
final net.pincette.jes.Aggregate aggregate =
  new net.pincette.jes.Aggregate()
    .withReducer(
      "MyCommand",
      compose(
        net.pincette.jes.util.Validation.validator(
          "resource:/validators/my_command.json",
          specs),
        Application::myCommandReducer))
    ...

Using JSLT

It is not necessary to always write your reducers in Java. The JSLT language is an easy alternative to do all kinds of JSON transformations. In order to add it easily to an aggregate you can use the reducer function and combine it with a transformer. It would look like this:

final net.pincette.jes.Aggregate aggregate =
  new net.pincette.jes.Aggregate()
    .withReducer(
      "MyCommand",
      net.pincette.jes.Aggregate.reducer(
        net.pincette.json.Jslt.transformer("/reducers/my_command.jslt")))
    ...

The transformer is given a JSON object with the fields command and state. It should produce the new state. The following is a reducer for a command that only updates myfield. Everything else in the aggregate instance is kept as it is. The pipe operator moves the context to the state field.

let command = (.command) // Save the command field.

.state | {
  "myfield": $command.myfield,
  *: .  
}

Troubleshooting

When you have a number of connected microservices it may not always be easy to find where something goes wrong. Since everything is connected to Kafka things are very observable. All you have to do is tap some topics with pincette-jes-cli. This will tell you where an expected message didn't arrive or where a wrong one was produced.

Another handy tool is the correlation ID. Every command has one and they are propagated to the events and snapshots. Commands and events can be logged in Elastic Common Schema format. The correlation ID is always written in the field trace.id. Searching on that field will give you a trail of what has happened.

Sometimes you will need a debugger, but hopefully not on production. For debugging one microservice you don't need to have the whole environment running on your machine. Say you have an issue with some microservice on the test cluster. You can shut down that service in the cluster and launch it in the debugger on your machine, with exactly the same Kafka configuration. This makes sure all topic partitions are served by your instance.

API Documentation

See javadoc.

MongoDB

You need at least MongoDB 4.2 to work with this library.

The Artifact

groupId: net.pincette

artifactId: pincette-jes

The Maven Archetype

You can generate a new project with the following command:

mvn archetype:generate -B \
                       -DarchetypeGroupId=net.pincette \
                       -DarchetypeArtifactId=pincette-jes-archetype \
                       -DarchetypeVersion=2.0.3 \
                       -DgroupId=net.pincette \
                       -DartifactId=myapp \
                       -Dversion=1.0-SNAPSHOT

This will produce the above-mentioned demonstration project. You can build it with mvn clean package. This will also run the tests, so make sure Kafka and MongoDB are running and create the Kafka topics first with the create_topics.sh script.