Reactive Streams Utilities

This is an exploration of what a utilities library for Reactive Streams in the JDK might look like.

Glossary:

A short glossary to avoid confusion and establish a shared vocabulary. For the purposes of this proposal we define:

Goals:

Non goals:

Approach

This proposal describes an API based on the builder pattern: operators such as map and filter are applied to builders, and the final result (e.g. a Publisher, Subscriber, Processor or complete graph) is built by invoking a build() method.

Such an API allows for flexible implementation. Not every stage of the graph needs to implement Reactive Streams itself; instead, stages can be fused together in a straightforward way. Other aspects such as context, monitoring and tracing, which require out-of-band transfer of state and signals (beyond what Reactive Streams provides), can be implemented by the engine that builds the streams from the graph.
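The idea that operators only record stages, while a separate build step decides how to run them, can be sketched roughly as follows. Every name here (SketchBuilder, Stage, the shape of build()) is hypothetical and is not the proposed API. As a simplifying assumption, the sketch fuses map and filter stages into a single function instead of producing a Reactive Streams component.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical sketch: operators only record stages; nothing runs until build().
final class SketchBuilder<In, Out> {
    // A stage maps one element to an Optional; empty means "dropped by a filter".
    interface Stage extends Function<Object, Optional<Object>> {}

    private final List<Stage> stages;

    private SketchBuilder(List<Stage> stages) { this.stages = stages; }

    static <T> SketchBuilder<T, T> builder() {
        return new SketchBuilder<>(new ArrayList<>());
    }

    // Records a map stage (null results are not supported, as in Reactive Streams).
    @SuppressWarnings("unchecked")
    <R> SketchBuilder<In, R> map(Function<? super Out, ? extends R> f) {
        return append(e -> Optional.of(f.apply((Out) e)));
    }

    // Records a filter stage.
    @SuppressWarnings("unchecked")
    SketchBuilder<In, Out> filter(Predicate<? super Out> p) {
        return append(e -> p.test((Out) e) ? Optional.of(e) : Optional.empty());
    }

    // Builders are immutable: each operator returns a new builder with one more stage.
    private <R> SketchBuilder<In, R> append(Stage stage) {
        List<Stage> next = new ArrayList<>(stages);
        next.add(stage);
        return new SketchBuilder<>(next);
    }

    // Stand-in for the engine: fuses all recorded stages into one function,
    // so no per-stage stream component is ever created.
    @SuppressWarnings("unchecked")
    Function<In, Optional<Out>> build() {
        List<Stage> fused = new ArrayList<>(stages);
        return in -> {
            Optional<Object> current = Optional.of(in);
            for (Stage s : fused) {
                if (!current.isPresent()) break; // element was filtered out
                current = s.apply(current.get());
            }
            return (Optional<Out>) (Optional<?>) current;
        };
    }

    public static void main(String[] args) {
        Function<Integer, Optional<String>> pipeline = SketchBuilder.<Integer>builder()
            .filter(i -> i % 2 == 0)
            .map(i -> "even:" + i)
            .build();
        System.out.println(pipeline.apply(4)); // passes both stages
        System.out.println(pipeline.apply(3)); // dropped by the filter
    }
}
```

Because the stages are plain data until build() is called, a real engine would be free to wrap each callback (for context propagation or tracing, say) before fusing them.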

Consider a use case: consuming the Twitter streaming API, which emits a stream of newline-separated JSON structures. To consume this stream, the JDK9 HTTP client requires application developers to supply a Subscriber<ByteBuffer> to consume response bodies. We can build this subscriber like so:

Subscriber<ByteBuffer> subscriber = 
  ReactiveStreams.<ByteBuffer>builder()
    // Assume parseLines is a utility that converts a
    // stream of arbitrarily chunked ByteBuffers to
    // ByteBuffers that represent one line
    .flatMapIterable(parseLines)
    // Assume parseJson is a function that takes
    // a ByteBuffer and returns a parsed JSON object
    .map(parseJson)
    // Assume saveToDatabase is a function that saves
    // the object to a database and returns a
    // CompletionStage of the result of the operation
    .flatMapCompletionStage(saveToDatabase)
    // And run by ignoring each element, since we've
    // handled them above already.
    .forEach(e -> {})
    .build().getSubscriber();

We now have a Subscriber<ByteBuffer> that we can wrap in a JDK9 HTTP client BodyProcessor, and pass that to the send() method.

To elaborate on the above API a little.

So, in all, we have four different types of builders:

Here's an example of using CompletionBuilder:

CompletionStage<List<MyObject>> result = 
  ReactiveStreams.fromPublisher(somePublisher)
    .collect(Collectors.toList())
    .build();

In this case, we have taken a Publisher provided by some other API, which gives us a PublisherBuilder. Using the collect method, we then collect the elements into a list. This returns a CompletionBuilder, which we build to run the stream and obtain the result. The type that collect accepts is a java.util.stream.Collector, so this API is compatible with all the synchronous collectors already supplied out of the box in the JDK.
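To illustrate why a synchronous java.util.stream.Collector fits an asynchronous stream, here is a minimal sketch of how one might be driven by stream signals. It assumes the JDK 9 java.util.concurrent.Flow interfaces rather than org.reactivestreams, and CollectSketch is a hypothetical name. It is not how any existing engine implements collect.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.stream.Collector;
import java.util.stream.Collectors;

// Hypothetical sketch: feeds stream elements into a Collector and
// completes a CompletionStage with the finished result.
final class CollectSketch {
    static <T, A, R> CompletionStage<R> collect(Flow.Publisher<T> publisher,
                                                Collector<? super T, A, R> collector) {
        CompletableFuture<R> result = new CompletableFuture<>();
        publisher.subscribe(new Flow.Subscriber<T>() {
            private A container;

            @Override public void onSubscribe(Flow.Subscription subscription) {
                container = collector.supplier().get();
                // Unbounded demand: this subscriber only accumulates.
                subscription.request(Long.MAX_VALUE);
            }

            @Override public void onNext(T item) {
                collector.accumulator().accept(container, item);
            }

            @Override public void onError(Throwable failure) {
                result.completeExceptionally(failure);
            }

            @Override public void onComplete() {
                result.complete(collector.finisher().apply(container));
            }
        });
        return result;
    }

    public static void main(String[] args) {
        SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
        CompletionStage<List<String>> result = collect(publisher, Collectors.toList());
        publisher.submit("a");
        publisher.submit("b");
        publisher.close(); // signals onComplete once buffered items are delivered
        System.out.println(result.toCompletableFuture().join());
    }
}
```

Only the supplier, accumulator and finisher are needed; the combiner is never called, because stream signals to one subscriber arrive sequentially.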

Implementation

Underneath, this API builds a graph of stages that describe the processing. When build is invoked, this graph is passed to a ReactiveStreamsEngine to build the Publisher, Subscriber, Processor or CompletionStage as necessary. During this phase, the underlying engine implementation can do any processing on the graph it wants - it can fuse stages together, it can wrap callbacks in context supplying wrappers, etc. The build method is overloaded: one variant takes an explicit ReactiveStreamsEngine, the other looks up the ReactiveStreamsEngine using the JDK ServiceLoader mechanism.
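The two build overloads might look roughly like the sketch below. SketchEngine and SketchGraph are hypothetical stand-ins for ReactiveStreamsEngine and the graph of stages, whose real shapes are not given here. The only JDK API assumed is java.util.ServiceLoader, including findFirst() from JDK 9.

```java
import java.util.ServiceLoader;
import java.util.function.Supplier;

// Hypothetical engine SPI; the real ReactiveStreamsEngine interface will differ.
interface SketchEngine {
    <T> T materialize(Supplier<T> graph);
}

// Hypothetical stand-in for a built-up graph of stages.
final class SketchGraph<T> {
    private final Supplier<T> graph;

    SketchGraph(Supplier<T> graph) { this.graph = graph; }

    // Overload 1: the caller supplies the engine explicitly.
    T build(SketchEngine engine) {
        return engine.materialize(graph);
    }

    // Overload 2: discover an engine registered as a service provider
    // (META-INF/services entry or a module-info "provides" clause).
    T build() {
        SketchEngine engine = ServiceLoader.load(SketchEngine.class)
            .findFirst()
            .orElseThrow(() -> new IllegalStateException("No SketchEngine provider found"));
        return build(engine);
    }

    public static void main(String[] args) {
        // A trivial engine that simply evaluates the graph.
        SketchEngine direct = new SketchEngine() {
            @Override public <R> R materialize(Supplier<R> g) { return g.get(); }
        };
        SketchGraph<String> graph = new SketchGraph<>(() -> "built");
        System.out.println(graph.build(direct));
    }
}
```

The explicit overload is handy for tests and for applications that run several engines side by side, while the no-argument overload keeps application code independent of any particular engine.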

An engine based on Akka Streams is already implemented, as is an engine based on RxJava. No work has been done on a zero-dependency reference implementation (RI) for the JDK, though there have been a few experimental efforts that could be used as a starting point for this.

TCK

A TCK has been implemented. At this stage it is very incomplete, but it does demonstrate how the Reactive Streams TCK provided by http://www.reactive-streams.org can be utilised to validate that the Reactive Streams interfaces built by this API are conforming Reactive Streams implementations.

Next steps

The following work needs to be done: