Introduction to Reactive Streams for Java Developers

By Jacek Kunicki (@rucek) August 18, 2017

Jacek is a Senior Software Engineer at SoftwareMill, and a seasoned Java and Scala developer I’ve known for years. It’s a joy to see that he, and SoftwareMill itself, have become experts in the field of Akka, using it with both the Java and Scala APIs, and that they continue to deliver real projects with it and, more importantly, share their practical learnings with the wider community. – Konrad “ktoso” Malawski

Reactive Streams is a standard for asynchronous data processing in a streaming fashion. With their inclusion in Java 9, as the java.util.concurrent.Flow interfaces, Reactive Streams are becoming the go-to tool for building streaming components of applications for years to come. It’s worth pointing out that Reactive Streams are “just” a standard, and by themselves can’t do anything - we need to use an implementation thereof, and in our example today we’ll use Akka Streams - one of the leading implementations of Reactive Streams (RS) since their inception.

Background

A typical stream processing pipeline consists of a number of stages, each of which sends data to the next one (i.e. downstream).

Now if you take two subsequent stages and treat them as a producer and a consumer of data, the producer can either be slower or faster than the consumer. While it’s fine when the producer is the slower party, the situation gets complicated when the consumer is the slower one. The reason is that the consumer can then be flooded with data, which it has to handle more or less gracefully.

The simplest way to deal with too much data is simply to drop anything that cannot be handled - this is a common behavior e.g. in the world of networking hardware. But what if we don’t want to discard anything? Then backpressure comes to the rescue.

An important part of Reactive Streams, backpressure boils down to bounding the amount of data transmitted between the stages of the pipeline, so that no stage gets flooded. And since the reactive approach is about not blocking unless really necessary, the backpressure implementation in a reactive stream must be non-blocking as well.
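To make the idea of demand more concrete, here is a minimal sketch that uses only the JDK 9+ java.util.concurrent.Flow interfaces mentioned earlier, not Akka Streams. The names BackpressureSketch, OneAtATimeSubscriber and run are made up for this illustration. The subscriber requests a single element at a time, so it is never handed more elements than it has asked for:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

final class BackpressureSketch {

  /** A subscriber that signals demand for exactly one element at a time. */
  static final class OneAtATimeSubscriber implements Flow.Subscriber<Integer> {
    final List<Integer> received = new ArrayList<>();
    final CountDownLatch done = new CountDownLatch(1);
    private Flow.Subscription subscription;

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
      this.subscription = subscription;
      subscription.request(1); // ask for the first element
    }

    @Override
    public void onNext(Integer item) {
      received.add(item);
      subscription.request(1); // ask for the next one only after handling this one
    }

    @Override
    public void onError(Throwable throwable) {
      done.countDown();
    }

    @Override
    public void onComplete() {
      done.countDown();
    }
  }

  /** Publishes 1..count and returns the elements in the order the subscriber saw them. */
  static List<Integer> run(int count) {
    OneAtATimeSubscriber subscriber = new OneAtATimeSubscriber();
    try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
      publisher.subscribe(subscriber);
      for (int i = 1; i <= count; i++) {
        publisher.submit(i); // buffered until the subscriber requests it
      }
    } // close() completes the stream after the buffered elements are delivered
    try {
      if (!subscriber.done.await(5, TimeUnit.SECONDS)) {
        throw new IllegalStateException("stream did not complete in time");
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
    return subscriber.received;
  }
}
```

Note that SubmissionPublisher.submit blocks the calling thread when a subscriber’s buffer is full, so this sketch only illustrates the request(n) signalling. An implementation such as Akka Streams propagates that demand upstream without blocking threads.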

Ways to go

The Reactive Streams standard defines a number of interfaces, but no actual implementation. This means that just adding a dependency on org.reactivestreams:reactive-streams would not get you very far - you still need an actual implementation. There are numerous implementations of Reactive Streams, and in this tutorial we’ll use Akka Streams and its Java DSL. Other implementations include RxJava 2.x and Reactor.

The use case

Let’s say we have a directory that we want to watch for new CSV files, then process every file in a streaming fashion, perform some on-the-fly aggregations and send the aggregated results to a websocket (in real time). Additionally, we want to define some kind of threshold for the aggregated data to trigger email notifications.

In our case the CSV lines will contain some (id, value) pairs, with the id changing every second line, e.g.:

370582,0.17870700247256666
370582,0.5262255382633264
441876,0.30998025265909457
441876,0.3141591265785087
722246,0.7334219632071504
722246,0.5310146239777006

We want to compute an average value of every two lines sharing the same id and only send it to the websocket when it’s greater than 0.9. Moreover, we want to send a notification email on every 5th value sent to the websocket. Finally, we want to read and display the data received from the websocket with a trivial JavaScript frontend.
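Before bringing in any streaming machinery, the averaging rule itself can be pinned down with a few lines of plain Java. This is only a non-streaming reference sketch: PairAverages and averagesAbove are hypothetical names, and the code assumes well-formed id,value lines in which each id occupies two consecutive lines.

```java
import java.util.ArrayList;
import java.util.List;

final class PairAverages {

  /** Averages each consecutive pair of CSV lines and keeps the averages above the threshold. */
  static List<Double> averagesAbove(List<String> csvLines, double threshold) {
    List<Double> result = new ArrayList<>();
    // Step through the lines two at a time; a trailing unpaired line is ignored.
    for (int i = 0; i + 1 < csvLines.size(); i += 2) {
      double first = Double.parseDouble(csvLines.get(i).split(",")[1]);
      double second = Double.parseDouble(csvLines.get(i + 1).split(",")[1]);
      double average = (first + second) / 2;
      if (average > threshold) {
        result.add(average);
      }
    }
    return result;
  }
}
```

For the six sample lines above, the three averages are roughly 0.35, 0.31 and 0.63, so with a threshold of 0.9 nothing would be sent to the websocket.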

The architecture

We’re going to use several tools from the Akka ecosystem (see Fig. 1). The core will of course be Akka Streams, which lets us process the data in real time and in a streaming fashion. To read the CSV files, we’re going to use Alpakka, which is a set of Akka Streams connectors to various technologies, protocols or libraries. Interestingly, since Akka Streams are Reactive Streams, the entire Alpakka ecosystem is also accessible to any other RS implementation - this is the benefit of interoperability that the RS interfaces set out to achieve. Finally, we’re going to use Akka HTTP to expose a websocket endpoint. The nice thing here is that Akka HTTP seamlessly integrates with Akka Streams (which it actually uses under the hood), so exposing a stream as a websocket is trivial.

Figure 1. Architecture overview

If you compare the above to a classical Java EE architecture, you’ll probably notice that things are much simpler here. No containers, no beans, just a simple standalone application. Moreover, the Java EE stack does not support the streaming approach whatsoever.

Akka Streams basics

In Akka Streams, the processing pipeline (the graph) consists of three types of elements: a Source (the producer), a Sink (the consumer), and Flows (the processing stages).

Using those components, you define your graph, which is nothing more than a recipe for processing your data - it doesn’t perform any computations yet. To actually execute the pipeline, you need to materialize the graph, i.e. convert it to a runnable form. To do that, you need a so-called materializer, which optimizes the graph definition and actually runs it. The definition of the graph is therefore completely decoupled from the way of running it, which, in theory, lets you use any materializer to run the pipeline. However, the built-in ActorMaterializer is the de facto standard, so chances are you won’t be using any other implementation.

When you look carefully at the type parameters of the components, you will notice that each of them, apart from the respective input/output types, has a mysterious Mat type. It refers to the so-called materialized value, which is a value that is accessible from outside the graph (as opposed to the input/output types which are internal to the communication between the graph stages - see Fig. 2). If you want to ignore the materialized value, which is quite often the case when you just focus on passing data between the graph stages, there is a special type parameter to denote it: NotUsed. You can think of it as being similar to Java’s Void; however, it carries a little bit more semantic meaning - “we’re not using this value” means more than just Void. Note also that in some APIs a similar type Done is used, to signal that something has completed. Other Java libraries would have perhaps used Void for both these cases, but Akka Streams attempts to keep all the types as semantically useful as possible.

Figure 2. Flow type parameters explained

The application

Let’s now get to the actual implementation of the CSV processor. We’re going to start with defining the Akka Streams graph and later use Akka HTTP to connect the stream to a websocket.

Building blocks of the streaming pipeline

As an entry point for our streaming pipeline, we want to watch a given directory for new CSV files. We’d like to utilize the java.nio.file.WatchService, but since we’re in a streaming application, we’d like to get a Source of events to handle, rather than polling that service for events ourselves. Thankfully, such a Source is already available in Alpakka as one of the connectors - the DirectoryChangesSource, a part of alpakka-file, which uses the WatchService under the hood:

private final Source<Pair<Path, DirectoryChange>, NotUsed> newFiles =
    DirectoryChangesSource.create(DATA_DIR, DATA_DIR_POLL_INTERVAL, 128);

This gives us a source that emits elements of type Pair<Path, DirectoryChange>. We want to filter those to only include new CSV files and emit just the path downstream. For this and subsequent data transformations we’re going to define small building blocks - Flows - that we will later combine into a complete processing pipeline:

private final Flow<Pair<Path, DirectoryChange>, Path, NotUsed> csvPaths =
    Flow.<Pair<Path, DirectoryChange>>create()
        .filter(this::isCsvFileCreationEvent)
        .map(Pair::first);

private boolean isCsvFileCreationEvent(Pair<Path, DirectoryChange> p) {
  return p.first().toString().endsWith(".csv") && p.second().equals(DirectoryChange.Creation);
}

One of the ways to create a Flow is to use the generic create() method - it’s useful when the input type itself is generic. Here, the resulting flow is going to emit (as a Path) every new CSV file in DATA_DIR.
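For comparison, here is roughly what the same filtering looks like when done by hand against the JDK WatchService. It is a sketch only: CsvWatcher and its methods are hypothetical names, and the caller is assumed to have already registered the directory with the watcher for ENTRY_CREATE events.

```java
import java.nio.file.Path;
import java.nio.file.StandardWatchEventKinds;
import java.nio.file.WatchEvent;
import java.nio.file.WatchKey;
import java.nio.file.WatchService;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

final class CsvWatcher {

  /** The same check as isCsvFileCreationEvent, expressed against the JDK event types. */
  static boolean isCsvCreation(WatchEvent.Kind<?> kind, Path path) {
    return kind == StandardWatchEventKinds.ENTRY_CREATE && path.toString().endsWith(".csv");
  }

  /** Waits up to timeoutMillis for one batch of events and returns the newly created CSV files. */
  static List<Path> pollNewCsvFiles(WatchService watcher, Path dir, long timeoutMillis)
      throws InterruptedException {
    List<Path> created = new ArrayList<>();
    WatchKey key = watcher.poll(timeoutMillis, TimeUnit.MILLISECONDS);
    if (key == null) {
      return created; // nothing happened within the timeout
    }
    for (WatchEvent<?> event : key.pollEvents()) {
      if (event.kind() == StandardWatchEventKinds.OVERFLOW) {
        continue; // events were lost; there is no path to inspect
      }
      Path relative = (Path) event.context(); // path relative to the watched directory
      if (isCsvCreation(event.kind(), relative)) {
        created.add(dir.resolve(relative));
      }
    }
    key.reset(); // re-arm the key so that further events are reported
    return created;
  }
}
```

With the Alpakka source, this loop, the key handling and the hand-over to downstream stages are taken care of for us, and only the predicate remains in our code.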

Now, we’d like to convert the Paths to lines streamed from each file. To convert a source into another source we could use one of the flatMap* methods. Both variants create a Source from each incoming element and somehow combine the resulting sources into a single new one - either by concatenating or by merging the original sources. Our choice would be the flatMapConcat variant, since we want to preserve the order of lines, so that the lines with the same ids remain next to each other. To convert a Path into a stream of bytes, we’re going to use a built-in FileIO utility:

private final Flow<Path, ByteString, NotUsed> fileBytes = 
    Flow.of(Path.class).flatMapConcat(FileIO::fromPath);

This time we’re using the of() method to create a new flow - it comes in handy when the input type is not generic.

The ByteString you see above is Akka Streams’ representation of a sequence of bytes. What we want to do now is to parse the stream of bytes as a CSV file - for which we’re going to use Alpakka once again - this time the alpakka-csv module:

private final Flow<ByteString, Collection<ByteString>, NotUsed> csvFields =
    Flow.of(ByteString.class).via(CsvParsing.lineScanner());

Note the use of the via combinator, which lets us attach an arbitrary Flow to the output of another graph stage (a Source or another Flow). As a result, this gives us a stream of elements, each of which represents the fields in a single line in the CSV file. We can then convert those to our domain model:

class Reading {

  private final int id;

  private final double value;

  private Reading(int id, double value) {
    this.id = id;
    this.value = value;
  }

  double getValue() {
    return value;
  }

  @Override
  public String toString() {
    return String.format("Reading(%d, %f)", id, value);
  }

  static Reading create(Collection<ByteString> fields) {
    List<String> fieldList = fields.stream().map(ByteString::utf8String).collect(toList());
    int id = Integer.parseInt(fieldList.get(0));
    double value = Double.parseDouble(fieldList.get(1));
    return new Reading(id, value);
  }
}

To perform the actual conversion, we use the map method and pass a reference to Reading.create:

private final Flow<Collection<ByteString>, Reading, NotUsed> readings =
    Flow.<Collection<ByteString>>create().map(Reading::create);

The next step is to group the readings into pairs, compute the average value for each group and only emit downstream the values above a given threshold. Since we want the average computation to take place asynchronously, we’re going to use the mapAsyncUnordered method, which executes an asynchronous operation with a given parallelism level:

private final Flow<Reading, Double, NotUsed> averageReadings =
    Flow.of(Reading.class)
        .grouped(2)
        .mapAsyncUnordered(10, readings ->
            CompletableFuture.supplyAsync(() ->
                readings.stream()
                    .map(Reading::getValue)
                    .collect(averagingDouble(v -> v)))
        )
        .filter(v -> v > AVERAGE_THRESHOLD);
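One consequence of the “unordered” variant is worth spelling out: results are emitted as the futures complete, which need not be the order in which the groups arrived (mapAsync is the order-preserving counterpart). The plain-JDK sketch below mimics that behavior outside of Akka Streams. AsyncAverages and averagesInCompletionOrder are hypothetical names, and unlike the stage above the sketch does not cap the number of concurrently running computations.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.stream.Collectors;

final class AsyncAverages {

  /** Averages every group asynchronously and returns the results in completion order. */
  static List<Double> averagesInCompletionOrder(List<List<Double>> groups) {
    Queue<Double> completed = new ConcurrentLinkedQueue<>();
    List<CompletableFuture<Void>> futures = new ArrayList<>();
    for (List<Double> group : groups) {
      // The same computation as in the stage above, for a single group.
      CompletableFuture<Double> average = CompletableFuture.supplyAsync(
          () -> group.stream().collect(Collectors.averagingDouble(v -> v)));
      // Record each result as soon as it is ready, regardless of input order.
      futures.add(average.thenAccept(completed::add));
    }
    // Wait until every computation has finished.
    CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0])).join();
    return new ArrayList<>(completed);
  }
}
```

Since we only care about which averages exceed the threshold, and not about their relative order, giving up ordering here is an acceptable trade-off.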

With the above building blocks defined, we’re now ready to combine them (with the via combinator you’ve already seen) into the full pipeline. This is as simple as:

private final Source<Double, NotUsed> liveReadings =
    newFiles
        .via(csvPaths)
        .via(fileBytes)
        .via(csvFields)
        .via(readings)
        .via(averageReadings);

Note

When combining the building blocks like in the example above, the compiler guards us against accidentally connecting two blocks with incompatible data types together.

The stream as a websocket

We’re now going to use Akka HTTP to create a simple web server, whose roles will be:

  • to expose the source of readings as a websocket,
  • to serve a trivial web page that connects to the websocket and displays the received data.

Creating a web server with Akka HTTP is pretty straightforward - you just need to subclass HttpApp and provide request mappings using the route DSL:

class Server extends HttpApp {

  private final Source<Double, NotUsed> readings;

  Server(Source<Double, NotUsed> readings) {
    this.readings = readings;
  }

  @Override
  protected Route routes() {
    return route(
        path("data", () -> {
              Source<Message, NotUsed> messages = readings.map(String::valueOf).map(TextMessage::create);
              return handleWebSocketMessages(Flow.fromSinkAndSourceCoupled(Sink.ignore(), messages));
            }
        ),
        get(() ->
            pathSingleSlash(() ->
                getFromResource("index.html")
            )
        )
    );
  }
}

There are two routes defined here: /data, which is the websocket endpoint, and / to serve the trivial frontend. You can now see how simple it is to expose an Akka Streams Source as a websocket endpoint - you just use handleWebSocketMessages, which takes care of upgrading the HTTP connection to a websocket one, and provide it with a flow to handle the incoming and outgoing data.

A WebSocket is modeled as a flow, so there are the incoming messages and the outgoing messages that we send to the client. In our case, we want to ignore incoming data, hence we create the flow with the “incoming” side being bound to a Sink.ignore(). The “outgoing” side of the websocket handler flow is simply bound to our source of the average values. All we need to do with the doubles representing the averages is to convert each of them to a TextMessage, which is Akka HTTP’s wrapper for the websocket data - this is done trivially with the already familiar map method.

To start the server, all you have to do is to call the startServer method, providing a hostname and a port:

Server server = new Server(csvProcessor.liveReadings);
server.startServer(config.getString("server.host"), config.getInt("server.port"));

The frontend

To receive and display data from the websocket, we’re going to use some really simple JavaScript code, which just appends the received values to a <textarea>. The code uses the ES6 syntax, which should run smoothly in any modern browser.

let ws = new WebSocket("ws://localhost:8080/data");
ws.onopen = () => log("WS connection opened");
ws.onclose = event => log("WS connection closed with code: " + event.code);
ws.onmessage = event => log("WS received: " + event.data);

The log method appends the message to the <textarea>, together with a timestamp.

Running

To run and test the application, you need to:

  • run the server (sbt run),
  • go to http://localhost:8080 in your browser (or to a host/port of your choice if you changed the defaults),
  • copy one or more files from src/main/resources/sample-data to the data directory under the project root (unless you changed csv-processor.data-dir in the configuration),
  • see the data appear in the server logs and in the browser.

Adding an email trigger

A final touch to our application will be a side channel responsible for simulating email notifications after every 5th element sent over the websocket. We want it to stay on the side in order not to interfere with the elements being transmitted.

To achieve such a behavior, we’re going to use a more advanced feature of Akka Streams - the Graph DSL - to define a custom graph stage that splits the stream into two: one just emitting the values to the websocket, the other one detecting every 5th element and simulating an email notification - see Fig. 3.

Figure 3. Custom graph stage for email notifications

We’re going to use a built-in Broadcast stage, which sends its input to a declared number of outputs, and a custom sink - the Mailer:

private final Graph<FlowShape<Double, Double>, NotUsed> notifier = GraphDSL.create(builder -> {
  Sink<Double, NotUsed> mailerSink = Flow.of(Double.class)
      .grouped(EMAIL_THRESHOLD)
      .to(Sink.foreach(ds ->
          logger.info("Sending e-mail")
      ));

  UniformFanOutShape<Double, Double> broadcast = builder.add(Broadcast.create(2));
  SinkShape<Double> mailer = builder.add(mailerSink);

  builder.from(broadcast.out(1)).toInlet(mailer.in());

  return FlowShape.of(broadcast.in(), broadcast.out(0));
});

The starting point for creating a custom graph stage is the GraphDSL.create() method, which exposes an instance of a graph Builder - used for manipulating the structure of the graph.

Next, we define the custom sink, which uses grouped to collect the incoming elements into groups of a configurable size (EMAIL_THRESHOLD, 5 by default) and emit the groups downstream. For each such group we simulate a side effect - an email notification.

Having the custom sink defined, we can now use the builder instance to add it to the graph. We’re also adding the Broadcast stage with two outputs.

Now we need to specify the connections between the graph elements - we want to connect one of the outputs of the Broadcast stage to the mailer sink, and use the other as the output of our custom graph stage. The input of our custom stage will be connected directly to the input of the Broadcast stage.
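The behavior of the mailer branch can be approximated in plain Java as a small stateful object. This is a sketch of what grouped followed by a side-effecting sink does to the elements it sees, not how Akka Streams implements it. GroupedNotifier and its method names are invented for the illustration. It assumes, as grouped does, that an incomplete last group is still emitted when the stream completes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

final class GroupedNotifier<T> {

  private final int groupSize;
  private final Consumer<List<T>> onGroup;
  private List<T> buffer = new ArrayList<>();

  GroupedNotifier(int groupSize, Consumer<List<T>> onGroup) {
    if (groupSize <= 0) {
      throw new IllegalArgumentException("groupSize must be positive");
    }
    this.groupSize = groupSize;
    this.onGroup = onGroup;
  }

  /** Records one element and fires the callback once a full group has accumulated. */
  void onElement(T element) {
    buffer.add(element);
    if (buffer.size() == groupSize) {
      emit();
    }
  }

  /** To be called at the end of the stream: a trailing, smaller group is emitted as well. */
  void onComplete() {
    if (!buffer.isEmpty()) {
      emit();
    }
  }

  private void emit() {
    List<T> group = buffer;
    buffer = new ArrayList<>(); // start collecting the next group
    onGroup.accept(group);
  }
}
```

In our pipeline the directory-watching source keeps running, so in practice only full groups trigger a notification. In the actual graph, the Broadcast stage is what lets the same elements continue to the websocket while a copy of each goes through this grouping.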

Note 1

There’s no way for the compiler to determine whether all parts of the graph are correctly connected. This is, however, verified at runtime by the materializer, so that there are no dangling inputs or outputs.

Note 2

What you can notice here is that every custom stage is actually a Graph<S, M>, where S is the shape, which determines the number and types of inputs and outputs, whereas M is the materialized value, if any. In our case, the shape is a FlowShape, which means a single input and a single output.

The final step is to connect the notifier as an additional stage of the liveReadings pipeline, which is now going to look like:

private final Source<Double, NotUsed> liveReadings =
    newFiles
        .via(csvPaths)
        .via(fileBytes)
        .via(csvFields)
        .via(readings)
        .via(averageReadings)
        .via(notifier);

When you run the updated code, you should see log entries about email notifications being sent every time five new values have been sent over the websocket.

Summary

In this article you have learned the general concepts of processing data in a streaming fashion, and how Akka Streams can help you build a lightweight data processing pipeline - as opposed to a traditional Java EE approach.

You have seen how to use some of Akka Streams’ built-in processing stages, and how to build a custom one yourself using the Graph DSL. You have also used Alpakka to stream data from a file system, and Akka HTTP to build a simple web server with a websocket endpoint that seamlessly integrates with Akka Streams.

The full working example used in this article is available on GitHub. It contains some additional log stages in a number of places to let you better follow what’s going on inside the pipeline. Those have been purposely omitted in the article for the sake of brevity.