Introducing Two Scala Libraries for Kafka Streams

At Lightbend we have been using Kafka Streams for quite some time now, mostly from Scala. In the course of this usage we have developed a few libraries that aim to improve the developer experience through better Scala tooling and enhanced abstraction support for Kafka Streams interactive queries. After several rounds of internal developer feedback and improvement, we decided to release these libraries as open source in the hope that they will benefit the Kafka and Scala communities.

The two libraries are:

  1. kafka-streams-scala, a Scala library for Kafka Streams implemented as a thin wrapper around the Java API
  2. kafka-streams-query, a Scala library offering an HTTP-based query layer on top of Kafka Streams Interactive Queries, based on Akka HTTP

This blog post introduces the Kafka Streams Scala library. In a future blog post, we will introduce the abstractions for the query layer.

Scala API for Kafka Streams

Kafka Streams is a JVM library that offers two types of APIs:

  1. the Processor API, which lets you define execution as a topology of processors, where each processor is responsible for one part of the overall computation
  2. the Streams DSL, which is built on top of the Processor API and provides built-in abstractions for streams and tables in the form of KStream, KTable, and GlobalKTable, along with a declarative, functional programming style supporting stateless and stateful transformations
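To make the first style concrete, here is a rough sketch of a hand-built topology against the Kafka Streams 1.0 Java Processor API. The topic names, processor names, and the filtering logic are all hypothetical, chosen only to illustrate the shape of the API:

```scala
import org.apache.kafka.streams.Topology
import org.apache.kafka.streams.processor.{AbstractProcessor, ProcessorSupplier}

// A hypothetical processor that forwards only sufficiently long values downstream.
class FilterLongValues extends AbstractProcessor[String, String] {
  override def process(key: String, value: String): Unit =
    if (value.length > 10) context().forward(key, value)
}

// Wire source -> processor -> sink by name.
val topology = new Topology()
  .addSource("Source", "input-topic")
  .addProcessor("Filter", new ProcessorSupplier[String, String] {
    override def get() = new FilterLongValues
  }, "Source")
  .addSink("Sink", "output-topic", "Filter")
```

Every node is named explicitly and wired to its parents by name, which is exactly the fine-grained control (and the verbosity) the Processor API trades for.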

Each API style has its pros and cons. While the Streams DSL is a great entry point for developers, it currently hides a lot of internals, which limits opportunities to optimize the produced code. The Processor API is more complex, but it allows the creation of very efficient stream topologies.

Both APIs can be used from Scala programs as well. While the Processor API translates into Scala quite nicely, the developer experience with the Streams DSL is not nearly as good as it is with Java. This primarily stems from the fact that Kafka Streams uses many features of Java generics that make type inference difficult for the Scala compiler. Hence the developer has to supply lots of type annotations, which are sometimes even more verbose than the ones required in the corresponding Java program. At the same time Java, being a more verbose language, lacks much of the expressiveness that Scala offers, resulting in quite a bit of boilerplate in the developer's code base. We have heard lots of stories about the pain of using the Java API for Kafka Streams from Scala.

kafka-streams-scala significantly improves Stream DSL usage for Scala developers. It does not attempt to provide idiomatic Scala APIs that one would implement in a Scala library developed from scratch. The intention is to make the Java APIs more usable in Scala through better type inferencing, enhanced expressiveness, and less boilerplate.

The library wraps Java Stream DSL APIs in Scala thereby providing:

  1. much better type inference in Scala
  2. less boilerplate in application code
  3. the usual builder-style composition that developers get with the original Java API

These three points add up to an overall improvement in developer productivity.

The design of the library was inspired by the work started by Alexis Seigneurin in this repository.

Quick Start

kafka-streams-scala is published and cross-built for Scala 2.11 and 2.12, so you can just add the following to your SBT build:

val kafka_streams_scala_version = "0.1.0"

libraryDependencies += "com.lightbend" %%
  "kafka-streams-scala" % kafka_streams_scala_version

For Maven builds, assuming Scala 2.12,

<dependency>
  <groupId>com.lightbend</groupId>
  <artifactId>kafka-streams-scala_2.12</artifactId>
  <version>0.1.0</version>
</dependency>

For Gradle builds,

compile 'com.lightbend:kafka-streams-scala_2.12:0.1.0'

Note: kafka-streams-scala requires Kafka Streams 1.0.0.

The API docs for kafka-streams-scala are available here for Scala 2.12 and here for Scala 2.11.

Sample Usage

The library works by wrapping the original Java abstractions of Kafka Streams within a Scala wrapper object and then using implicit conversions between them.
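The wrapping technique itself can be illustrated with a simplified, self-contained sketch (this is not the library's actual code; JStream and JMapper stand in for a Java-style API that takes SAM interfaces):

```scala
import scala.language.implicitConversions

// Stand-ins for a Java-style API that accepts SAM interfaces:
trait JMapper[V, VR] { def apply(value: V): VR }

class JStream[K, V](val records: List[(K, V)]) {
  def mapValues[VR](mapper: JMapper[V, VR]): JStream[K, VR] =
    new JStream(records.map { case (k, v) => (k, mapper.apply(v)) })
}

// The Scala wrapper accepts a plain Scala function literal and adapts it
// to the Java-style interface internally:
class StreamS[K, V](val inner: JStream[K, V]) {
  def mapValues[VR](f: V => VR): StreamS[K, VR] =
    new StreamS(inner.mapValues(new JMapper[V, VR] { def apply(v: V) = f(v) }))
}

object StreamS {
  // Implicit conversions move between the Java and Scala abstractions.
  implicit def wrap[K, V](s: JStream[K, V]): StreamS[K, V] = new StreamS(s)
  implicit def unwrap[K, V](s: StreamS[K, V]): JStream[K, V] = s.inner
}
```

The implicit conversions let client code chain wrapped operations on objects returned by the underlying Java API without any explicit wrapping or unwrapping at the call site.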

Here’s an example of the classic word count program that uses the Scala builder StreamsBuilderS (a wrapper around StreamsBuilder) to build an instance of KStreamS (a wrapper around KStream) via the wrapped API builder.stream. We then reify the stream into a table, getting a KTableS, which, again, is a wrapper around KTable.

The net result is that the following code is structured just like using the Java API, but from Scala and with far fewer type annotations compared to using the Java API directly from Scala. The difference in type annotation usage will be more obvious when we use a more complicated example. The library comes with a test suite of a few examples that demonstrate these capabilities.

val builder = new StreamsBuilderS

val textLines = builder.stream[String, String](inputTopic)

val pattern = Pattern.compile("\\W+", Pattern.UNICODE_CHARACTER_CLASS)

val wordCounts: KTableS[String, Long] =
  textLines.flatMapValues(v => pattern.split(v.toLowerCase))
    .groupBy((k, v) => v)
    .count()

wordCounts.toStream.to(outputTopic, Produced.`with`(stringSerde, longSerde))

val streams = new KafkaStreams(builder.build, streamsConfiguration)
streams.start()
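For completeness, the streamsConfiguration passed to KafkaStreams above is an ordinary java.util.Properties. A minimal sketch looks like the following, where the application id and broker address are placeholders you would replace with your own values:

```scala
import java.util.Properties
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.StreamsConfig

val streamsConfiguration = new Properties
// Hypothetical application id and broker address:
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-app")
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
// Default serdes used when a topic is read without explicit Consumed/Produced:
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String.getClass)
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String.getClass)
```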

Type Inference and Composition

Here’s a sample code fragment using the Scala wrapper library. Compare this example to the Scala code for the same example using the Java API directly in Confluent’s repository.

// Compute the total per region by summing the individual click counts per region.
val clicksPerRegion: KTableS[String, Long] = userClicksStream

  // Join the stream against the table.
  .leftJoin(userRegionsTable, (clicks: Long, region: String) => (if (region == null) "UNKNOWN" else region, clicks))

  // Change the stream from <user> -> <region, clicks> to <region> -> <clicks>
  .map((_, regionWithClicks) => regionWithClicks)

  // Compute the total per region by summing the individual click counts per region.
  .groupByKey(Serialized.`with`(stringSerde, longSerde))
  .reduce(_ + _)

Better Abstraction

The wrapped Scala APIs also incur less boilerplate by taking advantage of Scala function literals that get converted to Java objects in the implementation of the API. Hence the surface syntax of the client API looks simpler and less noisy.

Here’s an example of a snippet built using the Java API from Scala:

val approximateWordCounts: KStream[String, Long] = textLines
  .flatMapValues(value => value.toLowerCase.split("\\W+").toIterable.asJava)
  .transform(
    new TransformerSupplier[Array[Byte], String, KeyValue[String, Long]] {
      override def get() = new ProbabilisticCounter
    },
    cmsStoreName)
approximateWordCounts.to(outputTopic, Produced.`with`(Serdes.String(), longSerde))

And here’s the corresponding snippet using the Scala library. Note how the noise of TransformerSupplier has been abstracted out by the function literal syntax of Scala.

textLines
  .flatMapValues(value => value.toLowerCase.split("\\W+").toIterable)
  .transform(() => new ProbabilisticCounter, cmsStoreName)
  .to(outputTopic, Produced.`with`(Serdes.String(), longSerde))

Also, the explicit conversion asJava from a Scala Iterable to a Java Iterable is done for you by the Scala library.
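For reference, the explicit conversion in question is the standard asJava enrichment from scala.collection.JavaConverters, which is what you would have to write yourself when calling the Java API directly:

```scala
import scala.collection.JavaConverters._

// Splitting a line yields a Scala Iterable ...
val words: Iterable[String] = "two words".split("\\W+").toIterable

// ... which must be converted explicitly before handing it to a Java API:
val javaWords: java.lang.Iterable[String] = words.asJava
```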

Conclusions

We hope you will find these libraries useful and we look forward to receiving your feedback!