Integration Patterns

Many Enterprise Integration Patterns can be implemented with Akka Streams (see Java documentation or Scala documentation).

Splitter

You can implement a Splitter, as described in EIP, with the out-of-the-box Akka Streams DSL.

Simple Splitter

Let’s say that we have a stream of strings, each containing a few numbers separated by “-”. We want to turn it into a stream that contains only the individual numbers.

Scala
//Sample Source
val source: Source[String, NotUsed] = Source(List("1-2-3", "2-3", "3-4"))

val ret = source
  .map(s => s.split("-").toList)
  .mapConcat(identity)
  //Sub-streams logic
  .map(s => s.toInt)
  .runWith(Sink.seq)

//Verify results
ret.futureValue should be(Vector(1, 2, 3, 2, 3, 3, 4))
Java
//Sample Source
Source<String, NotUsed> source = Source.from(Arrays.asList("1-2-3", "2-3", "3-4"));


CompletionStage<List<Integer>> ret =
        source.map(s -> Arrays.asList(s.split("-")))
                .mapConcat(f -> f)
                //Sub-streams logic
                .map(s -> Integer.valueOf(s))
                .runWith(Sink.seq(), materializer);

//Verify results
List<Integer> list = ret.toCompletableFuture().get();
assert list.equals(Arrays.asList(1, 2, 3, 2, 3, 3, 4));

Splitter + Aggregator

Sometimes it is very useful to split a message and aggregate its “sub-messages” into a new message (a combination of Splitter and Aggregator).

Let’s say that now we want to create a new stream containing the sums of the numbers in each original string.

Scala
//Sample Source
val source: Source[String, NotUsed] = Source(List("1-2-3", "2-3", "3-4"))

val result = source
  .map(s => s.split("-").toList)
  //split all messages into sub-streams
  .splitWhen(a => true)
  //now split each collection
  .mapConcat(identity)
  //Sub-streams logic
  .map(s => s.toInt)
  //aggregate each sub-stream
  .reduce((a, b) => a + b)
  //and merge back the result into the original stream
  .mergeSubstreams
  .runWith(Sink.seq)

//Verify results
result.futureValue should be(Vector(6, 5, 7))
Java
//Sample Source
Source<String, NotUsed> source = Source.from(Arrays.asList("1-2-3", "2-3", "3-4"));


CompletionStage<List<Integer>> ret =
        source.map(s -> Arrays.asList(s.split("-")))
                //split all messages into sub-streams
                .splitWhen(a -> true)
                //now split each collection
                .mapConcat(f -> f)
                //Sub-streams logic
                .map(s -> Integer.valueOf(s))
                //aggregate each sub-stream
                .reduce((a, b) -> a + b)
                //and merge back the result into the original stream
                .mergeSubstreams()
                .runWith(Sink.seq(), materializer);

//Verify results
List<Integer> list = ret.toCompletableFuture().get();
assert list.equals(Arrays.asList(6, 5, 7));

In real life this solution is overkill for such a simple problem (you can do everything in a single map). More complex scenarios, in particular those involving I/O, benefit from the fact that you can parallelize sub-streams and get back-pressure for “free”.
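
For comparison, the "everything in a map" alternative can be sketched as one plain function that splits a message and sums its parts. This is a minimal sketch in plain Java with no Akka dependency; the class `SplitSum` and its methods `sumOfParts` and `sumAll` are hypothetical names introduced for illustration only, and `java.util.stream` stands in for the Akka Streams source.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper class; not part of Akka Streams or Alpakka.
final class SplitSum {

    // Splits a message such as "1-2-3" on "-" and sums the numeric parts.
    static int sumOfParts(String message) {
        return Arrays.stream(message.split("-"))
                .mapToInt(Integer::parseInt)
                .sum();
    }

    // Applies sumOfParts to every message, as a single map stage would.
    static List<Integer> sumAll(List<String> messages) {
        return messages.stream()
                .map(SplitSum::sumOfParts)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Prints [6, 5, 7]
        System.out.println(sumAll(Arrays.asList("1-2-3", "2-3", "3-4")));
    }
}
```

With the Akka Streams examples above, a function like `sumOfParts` could presumably be passed straight to `source.map(...)` in place of the `splitWhen` / `reduce` / `mergeSubstreams` chain. That keeps the simple case short, but every message is then processed sequentially inside one stage, so the sub-stream parallelism described above is lost.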

TODO: Create documentation pages for typical integration patterns; some of them might deserve a higher-level component implemented in Alpakka. Contributions are very welcome. Creating an issue for discussion is a good first step for such contributions.