HTTP-Based Interactive Query Layer for Kafka Streams

One of the challenges of implementing streaming applications is gaining access to the application’s current in-memory state. Common approaches for obtaining this information include the following:

  • Logging from the application and querying those logs from other applications
  • Periodically writing useful information to external storage (for example, a database)

Although both of these approaches work and are often used, they typically suffer from the following limitations:

  • They introduce additional latency to the execution, since they are typically implemented using synchronous calls within the execution itself
  • They require additional software components, such as a database, log aggregators, and so on, which translates to additional maintenance

In order to overcome these drawbacks, version 0.10.1 of Kafka Streams introduced Interactive Queries.

Interactive Queries allow you to treat the stream processing layer as a lightweight embedded database and to directly query the latest state of your stream processing application, without needing to materialize that state to external storage first.

Kafka Streams’ Interactive Queries leverage local state information on the node where the application is running. If the application runs in distributed mode on multiple nodes, each node holds only its own share of the state. Kafka Streams does not publish a unifying API that lets you query the state across all nodes. However, it offers a set of infrastructure components that can be used to implement a query service based on the endpoints of your choice.

Although Kafka Streams provides all of the machinery for accessing state information locally inside a Kafka Streams process, exposing this information via HTTP is left as an exercise for the developer.

At Lightbend, we have been using Kafka Streams for quite some time now, and we have been developing utilities that make using the streams library idiomatic and expressive from Scala. As part of this initiative, we are open sourcing kafka-streams-query, which implements an HTTP-based query layer on top of Interactive Queries. This is the second Kafka Streams Scala library that we are open sourcing, after kafka-streams-scala, a Scala API for Kafka Streams.

The HTTP functionality is implemented on top of akka-http, an Akka module based on akka-streams and akka-actor that provides the right level of abstraction for providing and consuming HTTP-based services.

Quick Start

kafka-streams-query is published and cross-built for Scala 2.11 and 2.12, so you can just add the following to your sbt build:

val kafka_streams_query_version = "0.1.0"

libraryDependencies ++= Seq("com.lightbend" %%
  "kafka-streams-query" % kafka_streams_query_version)

For Maven, assuming Scala 2.12:

<dependency>
    <groupId>com.lightbend</groupId>
    <artifactId>kafka-streams-query_2.12</artifactId>
    <version>0.1.0</version>
</dependency>

For Gradle builds, assuming Scala 2.12:

compile 'com.lightbend:kafka-streams-query_2.12:0.1.0'

Note: kafka-streams-query requires Kafka Streams 1.0.0.

The API docs for kafka-streams-query are available here for Scala 2.12 and here for Scala 2.11.

The Library

The library is organized around 3 main packages containing the following:

  1. http: The main endpoint implementation. It includes the class InteractiveQueryHttpService, which provides methods for starting and stopping the HTTP service. It also provides HttpRequester, which handles the request, performs some validation, and forwards the request to KeyValueFetcher, which invokes the actual service for fetching the state information.
  2. services: This layer interacts with the underlying Kafka Streams API to fetch data from the local state. The 2 classes in this layer are (a) MetadataService, which uses the Kafka Streams API to fetch the metadata for the state, and (b) LocalStateStoreQuery, which performs the actual query against the state.
  3. serializers: A set of serializers, useful for application development, that help you serialize your model structures.

For sample usages of these abstractions, the library comes bundled with a couple of examples: one that uses the Kafka Streams DSL and another that uses the lower-level Processor API. The section below covers some basic usage patterns of these abstractions.

Basic Usage Patterns

Here are examples of how each layer is used.

HTTP Layer

This layer offers an abstract class, InteractiveQueryHttpService, that takes care of starting and stopping the HTTP service. The implementation is based on akka-http. For a concrete implementation, the routes need to be defined in a derived class. Here’s an example:

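import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import org.apache.kafka.streams.state.HostInfo
import scala.concurrent.ExecutionContext

class MyHTTPService(
  hostInfo: HostInfo,
  actorSystem: ActorSystem,
  actorMaterializer: ActorMaterializer,
  ec: ExecutionContext // passed through to the base class
) extends InteractiveQueryHttpService(hostInfo, actorSystem, actorMaterializer, ec) {

  // define the routes; myExceptionHandler is assumed to be an akka-http
  // ExceptionHandler defined elsewhere in the application
  val routes = handleExceptions(myExceptionHandler) {
    pathPrefix("...") {
      //.. impl
    }
  }
}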

Another key abstraction that this layer provides is the KeyValueFetcher, which allows querying the key/value store using HTTP-based requests. The package also includes an implementation of HttpRequester that provides a generic API over HTTP to query from a host and a store. The result is returned as a Future. This implementation is again based on akka-http and akka-streams.

Using the library’s query class, KeyValueFetcher, from within your application is easy: define your own class that wraps it and delegate calls to its fetch methods. Here’s a snippet from the bundled example application:

class SummaryInfoFetcher(kvf: KeyValueFetcher) {
  // fetch
  def fetchAccessCountSummary(hostKey: String): Future[Long] =
    kvf.fetch(hostKey, WeblogProcessing.ACCESS_COUNT_PER_HOST_STORE, "/weblog/access/" + hostKey)

  // windowed fetch
  def fetchWindowedAccessCountSummary(hostKey: String, fromTime: Long, toTime: Long): Future[List[(Long, Long)]] =
    kvf.fetchWindowed(hostKey, WeblogProcessing.WINDOWED_ACCESS_COUNT_PER_HOST_STORE, "/weblog/access/win/", fromTime, toTime)
}

Note: The library offers a fetcher implementation for key/value based stores only. However, you can define your own store as well and integrate it with the rest of the library components. In our example that uses the Kafka Streams Processor API, we define a custom state store based on bloom filters and implement the fetcher accordingly.
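
The windowed fetch above returns a Future[List[(Long, Long)]], so results can be post-processed with ordinary Future combinators. The following is a minimal sketch, not part of the library: WindowedSummarySketch and its methods are hypothetical names, and it assumes each pair is (window start timestamp, access count for that window), which the snippet above does not state explicitly.

```scala
import scala.concurrent.{ExecutionContext, Future}

// Hypothetical helpers for post-processing a windowed fetch result.
// Assumption: each pair is (window start timestamp, access count).
object WindowedSummarySketch {

  // Sum the access counts over all returned windows.
  def totalAccesses(windows: Future[List[(Long, Long)]])
                   (implicit ec: ExecutionContext): Future[Long] =
    windows.map(_.map { case (_, count) => count }.sum)

  // Find the window with the highest count, or None if no windows came back.
  def busiestWindow(windows: Future[List[(Long, Long)]])
                   (implicit ec: ExecutionContext): Future[Option[(Long, Long)]] =
    windows.map(ws => if (ws.isEmpty) None else Some(ws.maxBy(_._2)))
}
```

With the SummaryInfoFetcher shown earlier, a call might look like WindowedSummarySketch.totalAccesses(fetcher.fetchWindowedAccessCountSummary(hostKey, fromTime, toTime)), given an implicit ExecutionContext in scope.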

Services Layer

This layer uses the abstractions from the HTTP layer and builds higher-level services. The principal abstraction in this layer is LocalStateStoreQuery[K, V], which offers a generic query service from a local Kafka Streams state store.

Besides providing the basic query service, the implementation also handles some lower-level issues to shield users from the distribution semantics of Kafka Streams. For an application deployed in distributed mode, a query issued while Kafka Streams is rebalancing partitions may fail, because state may be migrating across nodes. The LocalStateStoreQuery API implements retry semantics to handle such situations. The section Handling Rebalancing of Partitions below explains this in more detail.

The API also handles the case where the state information cannot be located on the node where the query is issued. The section Distributed Query below explains this situation in detail.

Defining the Application Service

Here’s a sample of how to use the service LocalStateStoreQuery when defining a custom HTTP-based application service:

// Step 1: service for fetching metadata information
val metadataService = new MetadataService(streams)

// Step 2: service for fetching from local state store
val localStateStoreQuery = new LocalStateStoreQuery[String, Long]

// Step 3: http service for request handling
val httpRequester = new HttpRequester(system, materializer, executionContext)

// Step 4: create custom http application service
val restService = new MyAppHttpService(
  hostInfo,
  new MyKeyValueFetcher(new KeyValueFetcher(metadataService, ..)),
  system, materializer, executionContext
)

// enjoy!
restService.start()

Distributed Query

If the application runs in distributed mode across multiple physical nodes, local state information is spread across all the nodes. The HTTP services that the library offers can handle this and provide a unified view of the global application state.

Consider the following scenario:

  1. The application is deployed on 3 nodes with IPs ip1, ip2, and ip3. Assuming the application uses this library, the HTTP services run on port 7070 on each of the nodes.
  2. The user queries for some information from http://ip1:7070/<path>/<to>/<key>.

The <key> that the user is looking for may not reside on host ip1. The query service handles this situation by interacting with the MetadataService as follows:

  1. User queries from host ip1
  2. Check MetadataService to get information about the key that the user is looking for
  3. If the metadata for the key indicates that the data is part of the local state in ip1, then we are done. Return the query result
  4. Otherwise, get the host information from the metadata where this state resides
  5. Query the appropriate node by reissuing the HTTP request to get the state information
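
The routing steps above can be sketched without any Kafka or Akka dependencies. This is a minimal illustration under stated assumptions, not the library's implementation: Host, QueryRouter, and the three function parameters (hostForKey, queryLocal, queryRemote) are hypothetical stand-ins for HostInfo, the MetadataService lookup, the local state store query, and the reissued HTTP request, respectively.

```scala
import scala.concurrent.Future

// Hypothetical stand-in for Kafka Streams' HostInfo (host name plus port).
final case class Host(name: String, port: Int)

// Hypothetical router, not part of kafka-streams-query.
final class QueryRouter[K, V](
  self: Host,                          // the node that received the query
  hostForKey: K => Option[Host],       // stand-in for the MetadataService lookup
  queryLocal: K => Future[V],          // stand-in for the local state store query
  queryRemote: (Host, K) => Future[V]  // stand-in for reissuing the HTTP request
) {
  def fetch(key: K): Future[V] =
    hostForKey(key) match {
      // The key is part of the local state: answer directly.
      case Some(owner) if owner == self => queryLocal(key)
      // The key lives on another node: forward the query there.
      case Some(owner) => queryRemote(owner, key)
      // No metadata for the key: fail the returned Future.
      case None =>
        Future.failed(new NoSuchElementException(s"No metadata found for key: $key"))
    }
}
```

Passing the lookups in as functions keeps the routing decision testable on its own, independent of how metadata and HTTP calls are actually implemented.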

Handling Rebalancing of Partitions

When the user issues a query, Kafka Streams may be in the middle of a partition rebalance, during which state may migrate from one store (node) to another. In that situation, Kafka Streams throws InvalidStateStoreException.

Migration typically happens when new instances of the application come up or when Kafka Streams rebalances partitions. The library handles such situations through retry semantics: the query API keeps retrying until the rebalancing is complete or the retry count is exhausted.
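
The retry behavior described above can be illustrated with a small, dependency-free sketch. It is not the library's code: RetrySketch and StoreNotReadyException are hypothetical names, the latter standing in for Kafka Streams' InvalidStateStoreException, and the fixed delay between attempts is an assumption made for simplicity.

```scala
import scala.util.{Failure, Try}

// Hypothetical stand-in for org.apache.kafka.streams.errors.InvalidStateStoreException.
final class StoreNotReadyException(message: String) extends RuntimeException(message)

object RetrySketch {
  // Run `query` up to `maxAttempts` times, sleeping `delayMillis` between
  // attempts. Only StoreNotReadyException triggers a retry; any other
  // failure, or the failure of the last attempt, is returned as-is.
  def retry[A](maxAttempts: Int, delayMillis: Long)(query: => A): Try[A] =
    Try(query) match {
      case Failure(_: StoreNotReadyException) if maxAttempts > 1 =>
        Thread.sleep(delayMillis)
        retry(maxAttempts - 1, delayMillis)(query)
      case result => result
    }
}
```

A caller would wrap the state store lookup in the by-name block, for example RetrySketch.retry(5, 100L) { lookup(key) }, where lookup is whatever function performs the local query.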

Conclusions

We hope you will find this library and our Scala API for Kafka Streams useful. We look forward to receiving your feedback!