What can Reactive Streams offer EE4J?

By James Roper (@jroper), February 6, 2018

In my current role at Lightbend I’m investigating and pursuing opportunities where Reactive Streams can make the lives of EE4J (the new Java EE) developers better. In this blog post I’m going to share some of the ideas that we’ve had for Reactive Streams in EE4J, and how these ideas will benefit developers.

Reactive Streams was adopted by the JDK in the form of the java.util.concurrent.Flow API. It allows two different libraries that support asynchronous streaming to connect to each other, with well specified semantics about how each should behave, so that backpressure, completion, cancellation and error handling are predictably propagated between the two libraries. There is a rich ecosystem of open source libraries that support Reactive Streams, and since its inclusion in JDK9, a few in-development implementations targeting the JDK have also adopted it, including the incubating JDK9 HTTP Client and the Asynchronous Database Access (ADBA) effort.
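To make those semantics concrete, here is a minimal sketch using only the JDK's own Flow interfaces and SubmissionPublisher. The FlowDemo and CollectingSubscriber names are mine, made up for illustration. The subscriber asks for one element at a time, which is the backpressure signal, and is told when the stream completes or fails.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

class FlowDemo {

  // Illustrative subscriber: requests one element at a time. The publisher
  // may never deliver more elements than have been requested.
  static class CollectingSubscriber<T> implements Flow.Subscriber<T> {
    final List<T> received = new ArrayList<>();
    final CountDownLatch done = new CountDownLatch(1);
    volatile Throwable failure;
    private Flow.Subscription subscription;

    @Override public void onSubscribe(Flow.Subscription s) {
      subscription = s;
      s.request(1); // initial demand
    }

    @Override public void onNext(T item) {
      received.add(item);
      subscription.request(1); // ready for the next element
    }

    @Override public void onError(Throwable t) {
      failure = t;
      done.countDown();
    }

    @Override public void onComplete() {
      done.countDown();
    }
  }

  static List<Integer> run() {
    CollectingSubscriber<Integer> subscriber = new CollectingSubscriber<>();
    try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
      publisher.subscribe(subscriber);
      for (int i = 1; i <= 5; i++) {
        publisher.submit(i); // blocks if the subscriber's buffer is full
      }
    } // close() signals onComplete once the submitted items are delivered
    try {
      if (!subscriber.done.await(5, TimeUnit.SECONDS)) {
        throw new IllegalStateException("stream did not complete in time");
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
    return subscriber.received;
  }

  public static void main(String[] args) {
    System.out.println(run());
  }
}
```

Application code would not normally write a subscriber like this by hand. It is shown here only to make the request/onNext/onComplete protocol visible.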

High level use case

Before I jump into the specific parts of EE4J where Reactive Streams would be useful, it’s worth talking about the overall picture. Reactive Streams is an integration API, or more specifically, a Service Provider Interface (SPI). It’s not intended that application developers implement the Reactive Streams interfaces directly themselves. Rather, it is intended that the various streaming data sources and sinks provided by libraries, database connectors, clients and so on implement Reactive Streams, so that application developers can then easily plumb those sources and sinks together.

Doing a quick off the top of my head count based on my knowledge of existing specs that do streaming in EE4J and the JDK, there exist no fewer than 10 different APIs that are offered by the various specs for streaming data either synchronously or asynchronously. These range from, of course, InputStream and OutputStream in the JDK, to NIO Channels, to the Servlet 3.1 ReadListener and WriteListener extensions, to the JDBC ResultSet, JSR 356 @OnMessage annotations, Message Driven Beans and JMS, CDI events using @Observes, Java collection Stream and Iterator based APIs, and finally, the new JDK9 Flow API. Each of these APIs streams data in some way, and offers varying levels of capability - some are synchronous, some are asynchronous, some offer backpressure and some don’t, some have well defined error handling semantics and others don’t.

The problem arises when I want to connect two of these together. For example, if I want to emit CDI events on a WebSocket. Or if I want to plumb a stream of messages into a database. Or if I want to stream data from an HTTP client response to a servlet response. When the two APIs for streaming are different, I have to write non-trivial boilerplate code to connect them together. This even applies to connecting an InputStream and OutputStream. I can’t simply pass an InputStream to an OutputStream and say “here’s your source of data, write it”. I have to allocate a buffer, read and write in a loop making sure that I get the semantics and argument ordering correct, properly wrap things in try-with-resources blocks to ensure that things are cleaned up correctly even when there are errors, and so on. And all this for what should be the simplest of all use cases.
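For reference, this is roughly the boilerplate being described, as a sketch using only java.io. The StreamCopy class and its method names are mine, and the 8 KB buffer size is an arbitrary choice.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

class StreamCopy {

  // Copies everything from source to sink and closes both, even on failure.
  static long copy(InputStream source, OutputStream sink) throws IOException {
    try (InputStream in = source; OutputStream out = sink) {
      byte[] buffer = new byte[8192];
      long total = 0;
      int read;
      // read() returns -1 at end of stream, otherwise the number of bytes read
      while ((read = in.read(buffer)) != -1) {
        // Only the first `read` bytes of the buffer are valid
        out.write(buffer, 0, read);
        total += read;
      }
      return total;
    }
  }

  // Small in-memory round trip to exercise copy()
  static String roundTrip(String text) {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    try {
      copy(new ByteArrayInputStream(text.getBytes(StandardCharsets.UTF_8)), out);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return new String(out.toByteArray(), StandardCharsets.UTF_8);
  }

  public static void main(String[] args) {
    System.out.println(roundTrip("here's your source of data, write it"));
  }
}
```

Note that this loop is synchronous, so the only form of backpressure is that the calling thread blocks whenever the sink is slow.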

It gets even more complex when we start integrating asynchronous APIs. What if the source is producing too much data for the sink? How do I tell the source to slow down? In some cases, the APIs don’t even offer a backpressure mechanism. In others they do, but the mechanisms differ: we have interest based event APIs like NIO, on-ready callback based APIs like Servlet 3.1, and token based backpressure APIs like JDK9 Flow. Connecting these together requires concurrent programming, and it is unreasonable to expect application developers to implement and maintain that code for each permutation of APIs that need to interoperate.

And so the high level reason for supporting Reactive Streams in EE4J is in two parts. Firstly, it facilitates integration of EE4J APIs with each other. Secondly, it facilitates integration of EE4J APIs with third party libraries. To connect a Reactive Streams publisher to a Reactive Streams Subscriber is one line of code, publisher.subscribe(subscriber);, and in this single line of code, a developer can be confident that data flow, backpressure, completion handling and error handling are correctly handled.

So as we consider the individual specs within EE4J and the use cases associated with them that Reactive Streams helps with, we should remember that each additional place where Reactive Streams is used in EE4J increases the usefulness of Reactive Streams in other parts of the spec. In my use cases below I’ve tried to focus on use cases that are interesting today in and of themselves, assuming no other changes are made to support Reactive Streams. For each use case that does get implemented, the usefulness of Reactive Streams will multiply.

Servlet IO

It is a common use case for an application to store or transfer files, sometimes large files, perhaps hundreds of megabytes in size. And a very common place to store these is in an object storage service, such as Amazon S3. Let’s imagine you have an expense reporting application, and each expense must have an associated scan or photo of a receipt, so your application offers the ability to upload and download these receipts, using Amazon S3 to store them.

A unique thing about a request to upload or download large files is that such a request is long running. It can take minutes to upload or download a single file - consider someone trying to use the dodgy airport wifi to upload a receipt for a meal they purchased at the airport, uploading their 4MB image at 20kB/s. That single upload will take over 3 minutes. Using the existing blocking APIs offered by the servlet API requires each upload to consume a thread from the servlet container’s thread pool for the duration of the upload, 3 minutes. Threads are a very limited resource. Each thread requires up to a few megabytes of memory allocated for its stack (depending on configuration and use), so you can’t just go create more threads when you need them, because they are expensive. For this reason, servlet containers use pools of threads, and a typical configuration is to have a pool of 200 threads. With 200 threads, that server would only be able to serve one upload of a receipt on dodgy wifi per second before the server exhausts its thread pool and starts rejecting requests. And that’s not including any other requests the server has to handle. This is dismal performance, and a very inefficient use of resources.
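The arithmetic behind those numbers can be spelled out as a small sketch. It assumes decimal units (4MB as 4,000,000 bytes and 20kB/s as 20,000 bytes per second), and the UploadCapacity class is made up for illustration.

```java
class UploadCapacity {

  // How long one upload holds a thread when using blocking IO
  static long uploadSeconds(long fileBytes, long bytesPerSecond) {
    return fileBytes / bytesPerSecond;
  }

  // Sustainable arrival rate when each upload pins one pool thread for its
  // whole duration: pool size divided by seconds per upload
  static double uploadsPerSecond(int poolThreads, long secondsPerUpload) {
    return (double) poolThreads / secondsPerUpload;
  }

  public static void main(String[] args) {
    long seconds = uploadSeconds(4_000_000L, 20_000L);
    System.out.println(seconds + " seconds per upload");
    System.out.println(uploadsPerSecond(200, seconds) + " uploads per second");
  }
}
```

So each upload takes 200 seconds, a little over 3 minutes, and a pool of 200 threads saturates at one new slow upload per second.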

The solution to the problem is to use asynchronous IO to handle these file uploads and downloads. Asynchronous IO allows the server to only assign a thread to a connection when it actually needs it - when there’s data available to read or write. There are a number of asynchronous HTTP clients out there; for this use case we’ll choose the JDK9 HTTP client. Servlet 3.1 introduced asynchronous IO, so we could use that to receive the uploaded data. However, connecting the JDK9 HTTP client to the Servlet 3.1 asynchronous IO APIs is not at all trivial. These 180 lines of concurrent code are what it takes to write code that adapts Servlet IO to the Reactive Streams API offered by the JDK9 HTTP client. However, if HttpServletRequest offered a method that allowed getting a Publisher<ByteBuffer> for consuming a request, this is what a servlet that handled file uploads to S3 would look like:

public class S3UploadServlet extends HttpServlet {
  private final HttpClient client = HttpClient.newHttpClient();
  private final String S3_UPLOAD_URL = ...;
  public void doPost(HttpServletRequest req, HttpServletResponse resp) {
    AsyncContext ctx = req.startAsync();
    client.sendAsync(HttpRequest.newBuilder(S3_UPLOAD_URL)
      // pipe the incoming request bytes directly to S3
      .POST(BodyPublisher.fromPublisher(req.getPublisher()))
      .build()
    ).thenAccept(s3Response -> {
      resp.setStatus(s3Response.statusCode());
      ctx.complete();
    });
  }
}

As you can see, the line of code needed to connect the servlet request body stream to the HTTP client request body stream was .POST(BodyPublisher.fromPublisher(req.getPublisher())). That takes the 180 lines of non-trivial concurrent code in the example I linked to above down to one line of trivial code, with built in handling of backpressure and completion/error propagation. Furthermore, the code reads like the high level task that the developer is trying to achieve, literally “Post the body published from the request”.

Multipart request handling

Servlet 3.0 introduced support for handling multipart/form-data requests, however this support involves buffering to disk, and does not offer any asynchronous streaming capabilities. As an extension of the previous use case, a developer might like to use multipart/form-data to upload files. A reactive streams based API to handle this might expose the parts of the form as a stream of parts, and each part might be a ByteBuffer sub stream itself. This is what the code might look like, using Akka Streams to handle the outer stream:

public class S3UploadServlet extends HttpServlet {
  private final HttpClient client = HttpClient.newHttpClient();
  private final ActorSystem system = ActorSystem.create();
  private final Materializer materializer = ActorMaterializer.create(system);
  private final String S3_UPLOAD_URL = ...;

  public void doPost(HttpServletRequest req, HttpServletResponse resp) {
    AsyncContext ctx = req.startAsync();
    Source.fromPublisher(req.getPartPublisher())
      // Filter to only get the file part
      .filter(part -> part.getName().equals("file"))
      // Handle one part at a time asynchronously
      .mapAsync(1, part -> {
        // Post the file part to S3, returning the resulting CompletionStage
        return client.sendAsync(HttpRequest.newBuilder(S3_UPLOAD_URL)
          // Here we plumb the publisher for the part (containing the
          // bytes) to the HTTP client request body to S3.
          .POST(BodyPublisher.fromPublisher(part.getPublisher()))
          .build());
      })
      // Collect the result of the upload
      .runWith(Sink.head(), materializer)
      // Attach a callback to the resulting completion stage
      .thenAcceptAsync(s3Response -> {
        resp.setStatus(s3Response.statusCode());
        ctx.complete();
      });
  }
}

Similarly, other implementations of Reactive Streams can easily be used to handle the multipart request in the same way. The important thing that Reactive Streams allows here is that a developer can select whatever tool they want to handle whichever part of the stream they want, and be assured that end to end, all data, backpressure, completion and errors are consistently propagated.

Messaging/JMS

So far we’ve only looked at use cases that deal with streaming bytes for IO. Reactive Streams is also very useful for streaming high level messages, such as those produced and consumed by message brokers.

Here’s an example of what it might look like to subscribe to a queue using a Reactive Streams compatible messaging API in EE4J, using Akka Streams to handle the stream and save message content using an asynchronous database API:

@MessageSubscriber(topic = "mytopic")
public Subscriber<MessageEnvelope<MyEvent>> handleMyTopic() {
  // Create an Akka stream that will materialize into a Subscriber
  // that can subscribe to the events
  Subscriber<MessageEnvelope<MyEvent>> handler = Source.<MessageEnvelope<MyEvent>>asSubscriber()
    
    // Handle each message by saving it to the database
    .mapAsync(1, msg ->
      saveToDatabase(msg.data())
        
        // return the message rather than the result of the database op
        .thenApply(result -> msg)

    // Commit the message once handled
    ).map(msg -> msg.commit())
    
    // Feed into an ignoring sink, since everything is now handled
    .to(Sink.ignore())
    
    // And run it to get the subscriber
    .run(materializer);

  return handler;
}

This is a fairly trivial example, but using Akka Streams, we could have the messages fanned out to multiple other consumers, we could have cycles in the processing that aggregate state, etc.

WebSockets

WebSockets is another type of long lived connection where synchronous IO is not appropriate. The current EE4J spec for WebSockets, JSR-356, does offer asynchronous handling of messages, however it does not support backpressure on receiving (so if the other end is sending too much data, there is no way to tell it to slow down, you must buffer or fail). It is also a purpose built API just for WebSockets, so any integration with other asynchronous data sources or sinks must be implemented manually by the end user.

Now imagine we wanted to implement a chat room, perhaps using Apache Kafka as our chat room backend, with one single partition topic per room. To implement this today, a reasonable amount of boilerplate is required, not just to transfer messages from the client to Apache Kafka and back, but also to propagate errors. Furthermore, if a client was producing messages at a high rate - faster than Apache Kafka is willing to consume - these messages are going to buffer on the server, causing it to run out of memory.

There exist however a number of Reactive Streams implementations for Apache Kafka. If JSR-356 were to support Reactive Streams, perhaps in the form of an @OnStream annotation that can be used as an alternative to @OnMessage, this is what it might look like to implement such a chat room:

@OnStream
public Publisher<ChatMessage> joinRoom(
  @PathParam("room") String room,
  Publisher<ChatMessage> incomingMessages
) {
  // The Kafka subscriber will send any messages it receives to Kafka,
  // so it wraps a Kafka producer
  Subscriber<ChatMessage> kafkaSubscriber = createKafkaProducerForRoom(room);
  // The Kafka publisher will emit any messages it receives from Kafka,
  // so it wraps a Kafka consumer
  Publisher<ChatMessage> kafkaPublisher = createKafkaConsumerForRoom(room);
  
  // We now connect the incoming chat messages to the subscriber
  incomingMessages.subscribe(kafkaSubscriber);

  // And return the rooms publisher to be published to the WebSocket
  return kafkaPublisher;
}

The createKafka* methods would be code specific for connecting to Kafka, the only JSR-356 specific code would be the above method itself.

CDI

Contexts and Dependency Injection (CDI) could take advantage of Reactive Streams for event publishing and subscribing, with many use cases similar to the ones described above. But there is another opportunity for CDI that we think is important, one that would help not just CDI but asynchronous processing in general.

The problem comes with CDI implementations using thread locals to propagate context, for example, the context for the current request, which might include the current authenticated user, cached authorization information for the user, etc. The assumption in CDI is that the container will always be in control of the threads that operate within this context. However, with asynchronous processing, that is not the case.

A good example of this is in a chat room. Each user connected to the chat room has an active WebSocket request, which has a particular CDI context that goes with it. Processing of messages received from these users might happen in the right CDI context, but what happens when User A sends a message to the chat room, and then it has to be sent to User B? The thread that is propagating the message to User B now needs the request context for User B’s request, in order to correctly handle the message, but that thread will have User A’s request context, because it was the receipt of a message from User A that initiated the current processing.

The only answer to this in CDI currently is to not use CDI contexts, but rather to capture all context at the start of the request, and from then on use a non CDI mechanism (such as simply passing the context everywhere manually) to use that context when it’s needed. We think CDI can offer a better solution to this, by allowing context to be captured, and set up/torn down in another thread.
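As a rough illustration of what “capture, then set up/tear down in another thread” could mean, here is a sketch that uses a plain ThreadLocal as a stand-in for container-managed request context. None of this is a CDI API. The ContextCapture class, the CURRENT_USER thread local and withCapturedContext are all hypothetical names.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ContextCapture {

  // Hypothetical stand-in for thread-bound request context (not a CDI API)
  static final ThreadLocal<String> CURRENT_USER = new ThreadLocal<>();

  // Captures the calling thread's context now. The returned task sets that
  // context up on whichever thread runs it, and tears it down afterwards.
  static <T> Callable<T> withCapturedContext(Callable<T> task) {
    String captured = CURRENT_USER.get();
    return () -> {
      String previous = CURRENT_USER.get();
      CURRENT_USER.set(captured);
      try {
        return task.call();
      } finally {
        // Restore whatever the executing thread had before
        if (previous == null) {
          CURRENT_USER.remove();
        } else {
          CURRENT_USER.set(previous);
        }
      }
    };
  }

  static String demo() {
    ExecutorService pool = Executors.newSingleThreadExecutor();
    try {
      // User B's context is captured while it is active
      CURRENT_USER.set("userB");
      Callable<String> deliverToB = withCapturedContext(() -> CURRENT_USER.get());

      // Later, this thread is processing a message from User A
      CURRENT_USER.set("userA");

      // The delivery runs on another thread, but sees User B's context
      return pool.submit(deliverToB).get();
    } catch (Exception e) {
      throw new IllegalStateException(e);
    } finally {
      CURRENT_USER.remove();
      pool.shutdown();
    }
  }

  public static void main(String[] args) {
    System.out.println(demo());
  }
}
```

A real solution would have to capture whole CDI contexts rather than a single value, which is why this needs support from the container.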

JPA

Before JPA can support any asynchronous operations, a standard for asynchronous database drivers is needed. Fortunately, there is currently a lot of active development going on towards this, so I think we can expect to see progress here in the next 12 months. Once that support exists, here are some examples of how JPA could be modified to support asynchronous streaming:

Streamed queries

Database exports are one obvious example for query streaming where you don’t want to load the entire result set into memory, but I think more interesting use cases will arrive in the not too distant future, particularly around event logging, as different services may want to stream events from a database to be brought up to date on demand. Here’s an example of serving one such event stream through a Reactive Streams WebSocket API, using an imagined getResultPublisher method on TypedQuery:

@OnStream
public Publisher<Event> streamEvents(
  @QueryParam("since") Date since
) {

  Publisher<Event> events = entityManager.createQuery(
    "select e from Event e where e.timestamp >= :since",
    Event.class
  ).setParameter("since", since)
    .getResultPublisher();

  return events;
}

Once again, backpressure, completion and error handling are all done for you.

Streamed ingestion

An example of streamed ingestion might be persisting logs. Here’s an example where logs are pushed in via WebSockets, using an imagined persistSubscriber method on EntityManager:

@OnStream
public void ingestLogs(
  Publisher<Log> logs
) {
  Subscriber<Log> ingester = entityManager.persistSubscriber(Log.class);

  logs.subscribe(ingester);
}

Streaming combinators

In the use cases above, when plumbing streams, there’s often a need to transform the stream in some way, whether it’s a simple map or filter, or perhaps collecting a stream into a single value, or maybe concatenating, broadcasting or merging streams. For Reactive Streams, this functionality is provided by a number of implementations such as Akka Streams, Reactor and RxJava. While these do the job well, it may be desirable for EE4J developers to have common functionality available to them out of the box.

Furthermore, a standard library for streaming combinators allows other APIs to use this to provide more fluent APIs to developers than what the bare Reactive Streams Publisher and Subscriber interfaces offer, without having to depend on a third party library.

Finally, it would allow EE4J libraries to assume a basic set of streaming operators that they can use in their own implementations, since they could not and should not depend on 3rd party libraries. This is very important, as otherwise the burden of reimplementing many operators would fall into the hands of each and every library wanting to provide streaming capabilities, sidetracking them from the core issue they’re attempting to solve, be it database access, WebSockets or anything else.
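To give a feel for why reimplementing operators is a burden, here is a sketch of about the simplest combinator there is, a map stage, written directly against the JDK Flow API. It follows the transform-processor pattern shown in the SubmissionPublisher Javadoc. The MapCombinator and MapProcessor names are mine, and this is not a proposal for what a standard operator library would look like.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

class MapCombinator {

  // A map stage: subscribes upstream, applies fn, republishes downstream.
  // SubmissionPublisher supplies the downstream buffering and demand tracking.
  static class MapProcessor<S, T> extends SubmissionPublisher<T>
      implements Flow.Processor<S, T> {
    private final Function<? super S, ? extends T> fn;
    private Flow.Subscription upstream;

    MapProcessor(Function<? super S, ? extends T> fn) {
      this.fn = fn;
    }

    @Override public void onSubscribe(Flow.Subscription subscription) {
      upstream = subscription;
      upstream.request(1);
    }

    @Override public void onNext(S item) {
      // submit() blocks while the downstream buffer is full, so upstream
      // demand is only signalled once downstream has room
      submit(fn.apply(item));
      upstream.request(1);
    }

    @Override public void onError(Throwable t) { closeExceptionally(t); }

    @Override public void onComplete() { close(); }
  }

  static List<String> demo() {
    List<String> results = new CopyOnWriteArrayList<>();
    CountDownLatch done = new CountDownLatch(1);
    MapProcessor<Integer, String> map = new MapProcessor<>(i -> "item-" + i);

    // Attach the downstream subscriber first so that no elements are dropped
    map.subscribe(new Flow.Subscriber<String>() {
      private Flow.Subscription subscription;
      @Override public void onSubscribe(Flow.Subscription s) {
        subscription = s;
        s.request(1);
      }
      @Override public void onNext(String item) {
        results.add(item);
        subscription.request(1);
      }
      @Override public void onError(Throwable t) { done.countDown(); }
      @Override public void onComplete() { done.countDown(); }
    });

    try (SubmissionPublisher<Integer> source = new SubmissionPublisher<>()) {
      source.subscribe(map);
      for (int i = 1; i <= 3; i++) {
        source.submit(i);
      }
    } // closing the source completes the map stage, then the subscriber
    try {
      if (!done.await(5, TimeUnit.SECONDS)) {
        throw new IllegalStateException("stream did not complete in time");
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
    return results;
  }

  public static void main(String[] args) {
    System.out.println(demo());
  }
}
```

Even this leans on SubmissionPublisher for the hard parts. Filter, merge, concat and broadcast each need similar care, which is the work a common operator library would take off every spec’s hands.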

New possibilities

All of the use cases above only cover changes to existing streaming usages in EE4J, however, adoption of a robust streaming standard like Reactive Streams facilitates new possibilities for features and technologies that EE4J could support.

Here are some examples:

Event sourcing

Event sourcing itself may not be that interesting from a streaming perspective, but the ability to consume the event log as a stream of messages is incredibly interesting. Basing the event log stream on Reactive Streams would allow all sorts of consumers to be plugged in, such as message brokers, WebSockets, CDI event publishers, and databases.

Real time message distribution

Applications are tending to provide more responsive user interfaces, with updates being pushed to users as soon as they happen. While WebSockets may offer the mechanism to communicate with the client, EE4J doesn’t yet offer any backend solution for communicating pushes between the machines of a backend cluster. Message brokers can often be too heavyweight to achieve this, and instead a lighter-weight, inter-node communication mechanism would be better. Reactive Streams is the perfect API for building this on, as it then allows seamless plumbing to WebSockets and other stream sinks/sources.

Data processing pipelines

Current EE4J features tend to be focussed on short lived transactional processes, which have a definite start and definite end. There is a shift in the industry to more stream based approaches, which is seen in stream processing projects like Spark, Flink and Kafka Streams. EE4J could provide a mechanism for integrating EE4J applications into these pipelines, and Reactive Streams would be the perfect API to offer this integration.

Summary

In this article we’ve looked at a broad array of possibilities of what Reactive Streams brings to the table for EE4J. Each possibility brings value in its own right, but the more possibilities that are implemented overall, the more value each brings. It’s our hope at Lightbend that we can now start collaborating with the EE4J community in making all of these possibilities a reality.