Reactive Streams Utilities - Future standard utilities library for Reactive Streams.

Overview

Reactive Streams Utilities

This is an exploration of what a utilities library for Reactive Streams in the JDK might look like.

Glossary:

To avoid confusion and establish a shared vocabulary, this proposal defines the following terms:

  • Reactive Streams - for the purposes of this proposal, the execution semantics defined by the http://www.reactive-streams.org specification.
  • juc.Flow - the interfaces nested in the java.util.concurrent.Flow class, that is, Publisher, Subscriber, Processor and Subscription. These are a one-to-one mapping of the interfaces provided by the Reactive Streams specification.
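
As a quick illustration of the juc.Flow interfaces named above, here is a minimal sketch that wires a hand-written Subscriber to the JDK's SubmissionPublisher. The class and method names (FlowBasics, RecordingSubscriber, publishAndCollect) are made up for this example; only Flow and SubmissionPublisher are JDK types.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

// Hypothetical demo class; only Flow and SubmissionPublisher are JDK types.
final class FlowBasics {

  /** A Subscriber that requests one element at a time and records what it sees. */
  static final class RecordingSubscriber<T> implements Flow.Subscriber<T> {
    final List<T> received = new CopyOnWriteArrayList<>();
    final CountDownLatch done = new CountDownLatch(1);
    private Flow.Subscription subscription;

    @Override public void onSubscribe(Flow.Subscription s) {
      subscription = s;
      s.request(1);              // signal demand for the first element
    }
    @Override public void onNext(T item) {
      received.add(item);
      subscription.request(1);   // ask for the next element only after handling this one
    }
    @Override public void onError(Throwable t) { done.countDown(); }
    @Override public void onComplete() { done.countDown(); }
  }

  static List<String> publishAndCollect(List<String> items) {
    RecordingSubscriber<String> subscriber = new RecordingSubscriber<>();
    try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
      publisher.subscribe(subscriber);
      items.forEach(publisher::submit);
    } // close() signals onComplete after the buffered items are delivered
    try {
      subscriber.done.await();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
    return subscriber.received;
  }

  public static void main(String[] args) {
    System.out.println(publishAndCollect(List.of("a", "b", "c")));
  }
}
```

The Subscription is the back-pressure channel: the Subscriber only receives as many elements as it has requested.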

Goals:

  • To fill a gap in the JDK: a developer who wants to do even the simplest of things with a JDK9 j.u.c.Flow, such as map or filter, currently needs to bring in a third-party library that implements them.
    • 💡 Rationale: This matters to JDK end-users as well as implementers of various JDK components (such as WebSocket, HTTP Client, ADBC and other APIs that may want to consider exposing reactive streams interfaces)
  • To produce an API that can build Publishers, Subscribers, Processors, and complete graphs.
    • 💡 Rationale: This makes the API useful for building or consuming APIs that use reactive streams (for example, the JDK9 HTTP Client and future APIs making use of reactive streams).
  • To produce an API that aligns closely with j.u.stream.Stream, using it as inspiration for naming, scope, general API shape, and other aspects, while providing the alternative execution semantics defined by j.u.c.Flow.
    • 💡 Rationale: Ensure that the new API is familiar to Java developers and fits the JDK's established style of operators, limiting the number of concepts Java developers need to understand to do the different types of streaming offered by the JDK.
  • To produce a Service Provider Interface (SPI) that can be implemented by multiple providers (including a Reference Implementation in the JDK itself), using the ServiceLoader mechanism to provide and load a default implementation (while allowing custom implementations to be manually provided).
    • 💡 Rationale: There are a lot of concerns that each different streams implementation provides and implements beyond streaming, for example monitoring/tracing, concurrency modelling, buffering strategies, performance aspects of the streams handling including fusing, and context (e.g. thread local) propagation. This will allow libraries to use and provide contracts based on this API without depending on a particular implementation, and allows developers to select the implementation that meets their needs.
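
To make the first goal concrete, the following sketch shows roughly what a developer has to write today to get a single map stage out of plain j.u.c.Flow. The processor is modelled on the transform example in the SubmissionPublisher documentation; all class and method names here (HandWrittenMap, MapProcessor, ListSink, mapAll) are hypothetical and not part of this proposal.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.function.Function;

// Hypothetical names throughout; only Flow and SubmissionPublisher come from the JDK.
final class HandWrittenMap {

  /** A map stage written by hand: subscribes upstream, republishes mapped elements. */
  static final class MapProcessor<T, R> extends SubmissionPublisher<R>
      implements Flow.Processor<T, R> {
    private final Function<? super T, ? extends R> mapper;
    private Flow.Subscription upstream;

    MapProcessor(Function<? super T, ? extends R> mapper) { this.mapper = mapper; }

    @Override public void onSubscribe(Flow.Subscription s) { upstream = s; s.request(1); }
    @Override public void onNext(T item) {
      upstream.request(1);
      submit(mapper.apply(item));   // may block while downstream buffers are full
    }
    @Override public void onError(Throwable t) { closeExceptionally(t); }
    @Override public void onComplete() { close(); }
  }

  /** A sink that gathers every element and exposes the list as a future. */
  static final class ListSink<T> implements Flow.Subscriber<T> {
    final CompletableFuture<List<T>> result = new CompletableFuture<>();
    private final List<T> items = new ArrayList<>();
    @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
    @Override public void onNext(T item) { items.add(item); }
    @Override public void onError(Throwable t) { result.completeExceptionally(t); }
    @Override public void onComplete() { result.complete(items); }
  }

  static <T, R> List<R> mapAll(List<T> input, Function<? super T, ? extends R> mapper) {
    MapProcessor<T, R> map = new MapProcessor<>(mapper);
    ListSink<R> sink = new ListSink<>();
    map.subscribe(sink);
    try (SubmissionPublisher<T> source = new SubmissionPublisher<>()) {
      source.subscribe(map);
      input.forEach(source::submit);
    } // closing the source completes the processor, which completes the sink
    return sink.result.join();
  }

  public static void main(String[] args) {
    System.out.println(mapAll(List.of(1, 2, 3), x -> x * 2));
  }
}
```

Even this small example needs a dedicated processor class and a dedicated sink, which is the kind of boilerplate the proposed API is meant to remove.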

Non goals:

  • To produce a "rich" set of operations for working with Reactive Streams (a.k.a. "kitchen sink"). The operations defined here should be the minimal useful set.
    • 💡 Rationale: There already exist a number of Reactive Streams implementations that seek to meet this goal (e.g. Akka Streams, Reactor, RxJava), and once you go past the basics (map, filter, collect) and start dealing with things like fan in/out, cycles, restarting, etc., the different approaches to solving this start to vary greatly. The JDK should provide enough to be useful for typical everyday streaming use cases, with developers being able to select a third-party library for anything more advanced.

Approach

This proposal describes an API based on the builder pattern, where operators such as map and filter are applied to builders, and the final result (e.g. a Publisher, Subscriber, Processor or complete graph) is built by invoking a build() method.

Such an API allows for flexible implementation: not every stage of the graph needs to implement Reactive Streams itself. Instead, stages can be fused together in a straightforward way, and other aspects like context, monitoring and tracing, which require out-of-band (relative to what Reactive Streams provides) transfer of state and signals, can be implemented by the engine that builds the streams from the graph.
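
The idea of describing the graph as data and letting an engine fuse it can be sketched without any Reactive Streams machinery at all. In the following example every name (FusingSketch, Stage, MapStage, FilterStage, Builder, parser) is hypothetical; it is not the proposal's API, only an illustration of a builder that records stages and a build() step that fuses them into a single per-element function.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;
import java.util.function.Predicate;

// All names are hypothetical; this is not the proposal's API.
final class FusingSketch {

  /** A stage is plain data describing one step of the graph. */
  interface Stage {}
  static final class MapStage implements Stage {
    final Function<Object, Object> mapper;
    MapStage(Function<Object, Object> mapper) { this.mapper = mapper; }
  }
  static final class FilterStage implements Stage {
    final Predicate<Object> predicate;
    FilterStage(Predicate<Object> predicate) { this.predicate = predicate; }
  }

  /** Immutable builder: each operator returns a new builder with one more stage. */
  static final class Builder<T, R> {
    private final List<Stage> stages;
    private Builder(List<Stage> stages) { this.stages = stages; }
    static <T> Builder<T, T> create() { return new Builder<>(new ArrayList<>()); }

    @SuppressWarnings("unchecked")
    <S> Builder<T, S> map(Function<? super R, ? extends S> mapper) {
      return with(new MapStage(o -> mapper.apply((R) o)));
    }
    @SuppressWarnings("unchecked")
    Builder<T, R> filter(Predicate<? super R> predicate) {
      return with(new FilterStage(o -> predicate.test((R) o)));
    }
    private <S> Builder<T, S> with(Stage stage) {
      List<Stage> next = new ArrayList<>(stages);
      next.add(stage);
      return new Builder<>(next);
    }

    /** Stand-in for the engine: fuses all stages into one function run per element. */
    @SuppressWarnings("unchecked")
    Function<T, Optional<R>> build() {
      List<Stage> graph = new ArrayList<>(stages);
      return input -> {
        Object current = input;
        for (Stage stage : graph) {
          if (stage instanceof MapStage) {
            current = ((MapStage) stage).mapper.apply(current);
          } else if (!((FilterStage) stage).predicate.test(current)) {
            return Optional.empty();   // element dropped by a filter stage
          }
        }
        return Optional.of((R) current);
      };
    }
  }

  static Function<String, Optional<Integer>> parser() {
    return Builder.<String>create()
        .map(String::trim)
        .filter(s -> !s.isEmpty())
        .map(s -> Integer.parseInt(s))
        .build();
  }

  public static void main(String[] args) {
    System.out.println(parser().apply(" 42 "));
    System.out.println(parser().apply("   "));
  }
}
```

Because the stages are only data until build() is called, a real engine is free to wrap them (for example with tracing or context propagation) or to map them onto its own operators.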

Take a use case: say we are consuming the Twitter streaming API, which emits a stream of newline-separated JSON structures. To consume this stream, the JDK9 HTTP client requires application developers to supply a Subscriber<ByteBuffer> to consume response bodies. We can build this subscriber like so:

Subscriber<ByteBuffer> subscriber = 
  ReactiveStreams.<ByteBuffer>builder()
    // Assume parseLines is a utility that converts a
    // stream of arbitrarily chunked ByteBuffers to
    // ByteBuffers that represent one line
    .flatMapIterable(parseLines)
    // Assume parseJson is a function that takes
    // a ByteBuffer and returns a parsed JSON object
    .map(parseJson)
    // Assume saveToDatabase is a function that saves
    // the object to a database and returns a
    // CompletionStage of the result of the operation
    .flatMapCompletionStage(saveToDatabase)
    // And run by ignoring each element, since we've
    // handled them above already.
    .forEach(e -> {})
    .build().getSubscriber();

We now have a Subscriber<ByteBuffer> that we can wrap in a JDK9 HTTP client BodyProcessor, and pass that to the send() method.

To elaborate on the above API a little:

  • ReactiveStreams.builder() returns a ProcessorBuilder. At this stage we are building a graph that has both an inlet (ie is a Subscriber) and an outlet (ie is a Publisher). If we invoked .build() at this stage, we would build a Processor<ByteBuffer, ByteBuffer>.
  • flatMapIterable does a 1:n mapping of elements in memory, returning the results in an Iterable. It returns a new ProcessorBuilder.
  • map is implemented in the current proposal, and it returns a new ProcessorBuilder that outputs the new type that was mapped to.
  • flatMapCompletionStage does a 1:1 mapping of elements to elements asynchronously provided by a CompletionStage.
  • forEach handles each element, and in this case we've handled them all already in flatMapCompletionStage. An important thing to note here is that this method doesn't return a ProcessorBuilder: since we have now provided a sink to consume the elements, the shape of the graph has changed, and it returns a SubscriberBuilder.
  • The build method returns a SubscriberWithResult. Many subscribers have some form of result, for example, a toList subscriber will produce a result that is a list of all the elements received, or a reduce subscriber will produce a result that is the result of a reduction function being applied to all the elements. So, when we build a subscriber, we also want to be able to return the result that the subscriber produces. In this case we aren't actually interested in the result, though we could be: it would be a CompletionStage<Void> that is redeemed when the stream completes, either normally or with an error. The SubscriberWithResult naming is probably not good. Accumulator might be a better name.
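
The pairing of a Subscriber with the result it produces can be sketched on top of plain j.u.c.Flow. The SubscriberWithResult class below is a guess at the shape described above, not the proposal's actual type, and forEach and demo are hypothetical helpers; the result is a CompletionStage<Void> that completes when the stream ends.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.function.Consumer;

// Hypothetical sketch; the proposal's real SubscriberWithResult may look different.
final class SubscriberWithResultSketch {

  /** A Subscriber together with the result that it will eventually produce. */
  static final class SubscriberWithResult<T, R> {
    private final Flow.Subscriber<T> subscriber;
    private final CompletionStage<R> result;
    SubscriberWithResult(Flow.Subscriber<T> subscriber, CompletionStage<R> result) {
      this.subscriber = subscriber;
      this.result = result;
    }
    Flow.Subscriber<T> getSubscriber() { return subscriber; }
    CompletionStage<R> getResult() { return result; }
  }

  /** Runs the action on every element; the result completes when the stream ends. */
  static <T> SubscriberWithResult<T, Void> forEach(Consumer<? super T> action) {
    CompletableFuture<Void> result = new CompletableFuture<>();
    Flow.Subscriber<T> subscriber = new Flow.Subscriber<T>() {
      private Flow.Subscription subscription;
      @Override public void onSubscribe(Flow.Subscription s) {
        subscription = s;
        s.request(Long.MAX_VALUE);
      }
      @Override public void onNext(T item) {
        try {
          action.accept(item);
        } catch (RuntimeException e) {
          subscription.cancel();            // stop the stream on failure
          result.completeExceptionally(e);
        }
      }
      @Override public void onError(Throwable t) { result.completeExceptionally(t); }
      @Override public void onComplete() { result.complete(null); }
    };
    return new SubscriberWithResult<>(subscriber, result);
  }

  static List<Integer> demo() {
    List<Integer> seen = new CopyOnWriteArrayList<>();
    SubscriberWithResult<Integer, Void> built = forEach(item -> seen.add(item));
    try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
      publisher.subscribe(built.getSubscriber());
      publisher.submit(1);
      publisher.submit(2);
      publisher.submit(3);
    }
    built.getResult().toCompletableFuture().join();   // wait for completion
    return seen;
  }

  public static void main(String[] args) {
    System.out.println(demo());
  }
}
```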

So, in all, we have four different types of builders:

  • PublisherBuilder - for building a Publisher.
  • ProcessorBuilder - for building a Processor.
  • SubscriberBuilder - for building a SubscriberWithResult, which is a product of a Subscriber and a CompletionStage.
  • CompletionBuilder - for building/running complete graphs, it produces a CompletionStage that is redeemed when the stream completes.

Here's an example of using CompletionBuilder:

CompletionStage<List<MyObject>> result =
  ReactiveStreams.fromPublisher(somePublisher)
    .collect(Collectors.toList())
    .build();

In this case, we have taken a Publisher provided by some other API, which gives us a PublisherBuilder. Using the collect method, we collect its elements into a list. This returns a CompletionBuilder, which we then build to run the stream and give us the result. The type that collect accepts is a java.util.stream.Collector, so this API is compatible with all the synchronous collectors already supplied out of the box in the JDK.
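
To show why a java.util.stream.Collector fits naturally here, the sketch below drives a Collector from a Flow.Subscriber: the supplier creates the container, the accumulator is applied in onNext, and the finisher produces the result in onComplete. CollectSketch, CollectingSubscriber and the collect helper are hypothetical names for illustration; an engine implementing this proposal may do this differently.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.stream.Collector;
import java.util.stream.Collectors;

// Hypothetical sketch of a Collector-driven subscriber.
final class CollectSketch {

  static final class CollectingSubscriber<T, A, R> implements Flow.Subscriber<T> {
    final CompletableFuture<R> result = new CompletableFuture<>();
    private final Collector<? super T, A, R> collector;
    private final A container;

    CollectingSubscriber(Collector<? super T, A, R> collector) {
      this.collector = collector;
      this.container = collector.supplier().get();   // fresh mutable container
    }
    @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
    @Override public void onNext(T item) { collector.accumulator().accept(container, item); }
    @Override public void onError(Throwable t) { result.completeExceptionally(t); }
    @Override public void onComplete() {
      result.complete(collector.finisher().apply(container));
    }
  }

  /** Publishes the items through a SubmissionPublisher and collects them asynchronously. */
  static <T, A, R> CompletionStage<R> collect(List<T> items,
                                              Collector<? super T, A, R> collector) {
    CollectingSubscriber<T, A, R> subscriber = new CollectingSubscriber<>(collector);
    try (SubmissionPublisher<T> publisher = new SubmissionPublisher<>()) {
      publisher.subscribe(subscriber);
      items.forEach(publisher::submit);
    }
    return subscriber.result;
  }

  public static void main(String[] args) {
    CompletionStage<List<Integer>> list = collect(List.of(1, 2, 3), Collectors.toList());
    System.out.println(list.toCompletableFuture().join());
  }
}
```

The combiner of the Collector is never used, because a Reactive Streams subscriber receives its elements sequentially.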

Implementation

Underneath, this API builds a graph of stages that describe the processing. When build is invoked, this graph is passed to a ReactiveStreamsEngine to build the Publisher, Subscriber, Processor or CompletionStage as necessary. During this phase, the underlying engine implementation can do any processing on the graph it wants - it can fuse stages together, it can wrap callbacks in context-supplying wrappers, etc. The build method is overloaded: one variant takes an explicit ReactiveStreamsEngine, the other looks up the ReactiveStreamsEngine using the JDK ServiceLoader mechanism.
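
The overloaded build with a ServiceLoader lookup could look roughly like the sketch below. The Engine interface here is a deliberately tiny stand-in for ReactiveStreamsEngine, and DefaultEngine, lookup and the String return type are assumptions made so the example is self-contained; only ServiceLoader is a real JDK API.

```java
import java.util.ServiceLoader;

// Hypothetical stand-ins; the real SPI is ReactiveStreamsEngine, which builds
// Publishers, Subscribers, Processors and CompletionStages from a graph.
final class EngineLookupSketch {

  /** Minimal stand-in for the engine SPI. */
  interface Engine {
    String name();
  }

  /** Fallback used when no provider is registered. */
  static final class DefaultEngine implements Engine {
    @Override public String name() { return "default"; }
  }

  /**
   * Looks up the first registered provider. Providers would be declared in
   * META-INF/services (or with "provides" in a module descriptor).
   */
  static Engine lookup() {
    return ServiceLoader.load(Engine.class)
        .findFirst()
        .orElseGet(DefaultEngine::new);
  }

  /** Variant that discovers the engine through ServiceLoader. */
  static String build() {
    return build(lookup());
  }

  /** Variant that takes an explicitly supplied engine. */
  static String build(Engine engine) {
    return "built by " + engine.name();
  }

  public static void main(String[] args) {
    System.out.println(build());
    System.out.println(build(() -> "custom"));
  }
}
```

With no provider on the class path the lookup falls back to the default engine, while the second overload lets a caller pin a specific implementation.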

An engine based on Akka Streams is already implemented, as is an engine based on RxJava. No work has been done on a zero dependency RI for the JDK, though there have been a few experimental efforts that could be used as a starter for this.

TCK

A TCK has been implemented - at this stage it is very incomplete, but what it does demonstrate is how the Reactive Streams TCK provided by http://www.reactive-streams.org can be utilised to validate that the Reactive Streams interfaces built by this API are conforming Reactive Streams implementations.

Next steps

The following work needs to be done:

  • Decide on a set of stages/operators/generators specific to Reactive Streams that are needed, beyond what the JDK8 Streams API has provided. For example, asynchronous generator functions based on CompletionStage might be useful, and perhaps a stream split function, cancelled/ignore subscribers, and batching operators as well.
  • Implement a zero dependency reference implementation.

Comments

  • Reference implementation

    So, I had a go at implementing a reference implementation. I took an approach somewhat similar to Akka's graph stage - so all streams are completely fused.

    Most of the logic is in GraphLogic.java, which GitHub collapses by default because it's > 1000 loc, so don't miss that.

    • Concurrency/memory is managed using a serial executor.
    • Stages are plumbed together with inlets/outlets between them, the inlets/outlets have a similar (perhaps too similar) API to the Akka graph stage API.
    • Most calls on inlets/outlets are forwarded directly through the stream, with only a select few (mostly pushes and one pull) being unrolled to avoid infinite recursion. I think this differs from Akka, and it does lead to some gotchas when implementing stages: an inlet/outlet signal can result in a synchronous recursive signal back to the stage. Note that this API is not public, unlike Akka's, so those gotchas aren't as big a deal.
    • Sorry, the code is quite messy and poorly commented for now; I want to get some initial feedback before cleaning it up.
    • I implemented some benchmarks (in the table, lower numbers are better). Performance on a single long stream is comparable to Akka, materialization is 10x faster than Akka, and sub stream materialization is many times faster too. RxJava though is still a lot faster than both - I guess that's because they don't use a high-level push/pull abstraction for stages, but implement each feature purpose-built.
     Benchmark                           | RefImpl | Akka    | RxJava  |
    --------------------------------------------------------------------
     identity processor with long stream |    6158 |    8686 |    2072 |
     complex processor with long stream  |    2335 |    1828 |     285 |
     serial materialization              |     344 |    3359 |     167 |
     concurrent materialization          |     550 |    6576 |     252 |
     long concat map stream              |     102 |     776 |      62 |
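
For readers unfamiliar with the serial executor mentioned in the list above, here is a minimal sketch following the well-known pattern from the java.util.concurrent.Executor documentation. It is an assumption about the general technique, not the code in GraphLogic.java; SerialExecutorSketch and countSerially are hypothetical names.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical sketch; the reference implementation may be structured differently.
final class SerialExecutorSketch {

  /** Runs submitted tasks one at a time, in order, on top of another executor. */
  static final class SerialExecutor implements Executor {
    private final Queue<Runnable> tasks = new ArrayDeque<>();
    private final Executor delegate;
    private Runnable active;

    SerialExecutor(Executor delegate) { this.delegate = delegate; }

    @Override public synchronized void execute(Runnable task) {
      tasks.add(() -> {
        try { task.run(); } finally { scheduleNext(); }
      });
      if (active == null) scheduleNext();   // nothing running: start immediately
    }

    private synchronized void scheduleNext() {
      active = tasks.poll();
      if (active != null) delegate.execute(active);
    }
  }

  /** Increments an unsynchronized counter from many tasks; safe because they run serially. */
  static int countSerially(int taskCount) {
    ExecutorService pool = Executors.newFixedThreadPool(4);
    try {
      SerialExecutor serial = new SerialExecutor(pool);
      int[] counter = {0};
      CountDownLatch done = new CountDownLatch(taskCount);
      for (int i = 0; i < taskCount; i++) {
        serial.execute(() -> { counter[0]++; done.countDown(); });
      }
      done.await();
      return counter[0];
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    } finally {
      pool.shutdown();
    }
  }

  public static void main(String[] args) {
    System.out.println(countSerially(1000));
  }
}
```

Since only one task runs at a time and each hand-off goes through a synchronized method, stage state can be plain fields without further locking.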
    
    opened by jroper
  • Try structuring the proposal a bit more

    I tried to restructure it a bit so it's less of a wall of text. I also tried adding a glossary (perhaps not needed though), and rationale for the points which are explicitly separate from the goal, making the goal perhaps easier to grasp.

    I thought this would help get the message across more cleanly.

    opened by ktoso
  • Stateful variants of functions

    Currently, map, filter etc must be stateless, otherwise the returned builder can't be reused. We might consider adding stateful versions of them, ie, where the function is provided by a Supplier. This would then allow you to create things like a ProcessorBuilder<ByteBuffer, CharSequence> for decoding bytes into characters, that could be applied to many graphs, without having to create a new ProcessorBuilder each time.

    We might also consider making the SPI provide this, even if we don't offer it in the API, so that it can be added in future. The filter stage already allows stateful filtering, which is how drop and dropWhile are implemented.

    We could also have a general stateful stage constructor in the API, which is a Supplier<Graph>, that gets supplied when the graph is built. This wouldn't need support in the SPI.
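
The difference between sharing one stateful function and supplying a fresh one per build can be sketched with plain functions. Here run is a hypothetical stand-in for building and running one graph over in-memory elements, and runningTotal is an example stateful mapper; none of these names come from the proposal.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical sketch of why a Supplier makes stateful functions reusable.
final class StatefulMapSketch {

  /** A stateful mapper: emits the running total of the elements seen so far. */
  static Function<Integer, Integer> runningTotal() {
    int[] total = {0};
    return x -> total[0] += x;
  }

  /** Stand-in for building and running one graph: asks the supplier for a mapper per run. */
  static <T, R> List<R> run(List<T> input, Supplier<Function<T, R>> mapper) {
    Function<T, R> perRun = mapper.get();
    List<R> out = new ArrayList<>();
    for (T element : input) {
      out.add(perRun.apply(element));
    }
    return out;
  }

  public static void main(String[] args) {
    // Sharing one instance leaks state from the first run into the second.
    Function<Integer, Integer> shared = runningTotal();
    System.out.println(run(List.of(1, 2, 3), () -> shared));
    System.out.println(run(List.of(1, 2, 3), () -> shared));

    // Supplying a fresh instance per run keeps the runs independent.
    System.out.println(run(List.of(1, 2, 3), StatefulMapSketch::runningTotal));
    System.out.println(run(List.of(1, 2, 3), StatefulMapSketch::runningTotal));
  }
}
```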

    opened by jroper
  • Async collectors

    Currently we're using the JDK8 collector API for doing all terminal accumulations. It would be good to have an async alternative, eg AsyncCollector, where the accumulator and finisher functions return CompletionStage of a value when they are done. Then async alternatives of collect, reduce etc can be provided.

    Note this could probably be implemented by a stateful mapCompletionStage.
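
One possible shape for such an AsyncCollector is sketched below. The interface, the collect driver (which folds over an in-memory list rather than a real stream) and the asyncSum example are all hypothetical; the point is only that each accumulation step is chained with thenCompose, so the next element is not processed until the previous step has completed.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical sketch; no such type exists in the JDK or in the proposal yet.
final class AsyncCollectorSketch {

  /** Async counterpart of java.util.stream.Collector (without combiner/characteristics). */
  interface AsyncCollector<T, A, R> {
    Supplier<A> supplier();
    BiFunction<A, T, CompletionStage<A>> accumulator();
    Function<A, CompletionStage<R>> finisher();
  }

  /** Folds the items one by one, waiting for each async step before starting the next. */
  static <T, A, R> CompletionStage<R> collect(List<T> items,
                                              AsyncCollector<T, A, R> collector) {
    CompletionStage<A> state =
        CompletableFuture.completedFuture(collector.supplier().get());
    for (T item : items) {
      state = state.thenCompose(a -> collector.accumulator().apply(a, item));
    }
    return state.thenCompose(a -> collector.finisher().apply(a));
  }

  /** Example collector: sums integers asynchronously and formats the total. */
  static AsyncCollector<Integer, Integer, String> asyncSum() {
    return new AsyncCollector<Integer, Integer, String>() {
      @Override public Supplier<Integer> supplier() { return () -> 0; }
      @Override public BiFunction<Integer, Integer, CompletionStage<Integer>> accumulator() {
        return (total, x) -> CompletableFuture.supplyAsync(() -> total + x);
      }
      @Override public Function<Integer, CompletionStage<String>> finisher() {
        return total -> CompletableFuture.completedFuture("sum=" + total);
      }
    };
  }

  public static void main(String[] args) {
    System.out.println(collect(List.of(1, 2, 3, 4), asyncSum()).toCompletableFuture().join());
  }
}
```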

    opened by jroper
  • Stream splitting

    We might want to introduce some stream splitting functions. For example, a splitWhen function.

    This functionality is generally only useful when doing parsing, where you parse some sort of header out of the stream, followed by a sub stream, then another header, etc.

    Generally, the element that is split at needs to be accessed before consuming the sub stream. So the sub stream should make that element available, along with the rest of the stream. For example:

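    // Proposed method on PublisherBuilder<T>:
    PublisherBuilder<SubStream<T>> splitWhen(Predicate<? super T> predicate);

    class SubStream<T> {
      final T first;                  // the element at which the stream was split
      final PublisherBuilder<T> rest; // the elements that follow, up to the next split
      SubStream(T first, PublisherBuilder<T> rest) {
        this.first = first;
        this.rest = rest;
      }
    }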
    

    So, for each element where the predicate holds true (plus the first element), a SubStream is emitted, with first being that element, and rest being the remainder of the stream as long as the predicate holds false.
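
The splitting rule described above can be checked on an in-memory list. This sketch is synchronous and uses a List for rest instead of a PublisherBuilder, so it only illustrates the grouping semantics, not the streaming behaviour; SplitWhenSketch and its members are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical, synchronous illustration of the proposed splitWhen semantics.
final class SplitWhenSketch {

  static final class SubStream<T> {
    final T first;        // the element that started this sub stream
    final List<T> rest;   // the elements that follow, up to the next split
    SubStream(T first, List<T> rest) {
      this.first = first;
      this.rest = rest;
    }
    @Override public String toString() { return first + ":" + rest; }
  }

  static <T> List<SubStream<T>> splitWhen(List<T> input, Predicate<? super T> predicate) {
    List<SubStream<T>> result = new ArrayList<>();
    SubStream<T> current = null;
    for (T element : input) {
      // The very first element always opens a sub stream, as does every match.
      if (current == null || predicate.test(element)) {
        current = new SubStream<>(element, new ArrayList<>());
        result.add(current);
      } else {
        current.rest.add(element);
      }
    }
    return result;
  }

  public static void main(String[] args) {
    List<String> lines = List.of("H1", "a", "b", "H2", "c");
    System.out.println(splitWhen(lines, s -> s.startsWith("H")));
  }
}
```

In a parser, first would typically be the header and rest the body that belongs to it.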

    opened by jroper
Owner
Lightbend