Lightbend

© 2018-2019, Lightbend, Inc. All Rights Reserved.
Release Date: June 28, 2019
Last Doc Update: 2019-09-10 12:24:56 UTC

1. An Introduction to Pipelines

Welcome to Lightbend Pipelines, v1.1.0 for OpenShift.

1.1. Why Pipelines?

Technologies like mobile, the Internet of Things (IoT), Big Data analytics, machine learning, and others are driving enterprises to modernize how they process large volumes of data. A rapidly growing percentage of that data is now arriving in the form of data streams and a growing percentage of those streams now require near-realtime processing. We use the term "Fast Data" to describe applications and systems that deal with such requirements.

The Fast Data landscape has been rapidly evolving, with tools like Spark, Flink, and Kafka Streams emerging from the world of large scale batch processing while projects like Reactive Streams and Akka Streams have emerged from the world of application development and high-performance networking.

The demand for availability, scalability, and resilience is forcing Fast Data architectures to become more like microservice architectures. Conversely, successful organizations building microservices find their data needs grow with their organization while their data sources are becoming more stream-like and more real-time. Hence, there is a unification happening between streaming data and microservice architectures.

It can be quite hard to develop, deploy, and operate large-scale microservices-based systems that can take advantage of streaming data and seamlessly integrate with systems for analytics processing and machine learning. The individual technologies are well documented, but combining them into fully integrated, unified systems is no easy task.

Pipelines aims to make this easier by integrating the most popular streaming frameworks into a single platform for creating and running distributed Fast Data applications.

1.2. What does Pipelines do?

Pipelines allows developers to quickly build and deploy large, distributed stream processing applications by breaking such applications into smaller stream processing units called streamlets. Each streamlet represents a discrete chunk of stream processing logic with data being safely persisted at the edges using pre-defined schemas. Streamlets can be scaled up and down to process partitioned data streams. Streamlets can be written using multiple streaming runtimes, such as Akka Streams and Spark. This exposes the full power of the underlying runtime and its libraries while providing a higher-level abstraction for composing streamlets and expressing data schemas.

Streamlets can be composed into larger systems using "application blueprints", which specify how streamlets can be connected together. They can then be deployed as a whole and Pipelines will take care of deploying the individual streamlets and making sure connections get translated into data flowing between the streamlets at runtime.

Pipelines provides tooling for developing streamlets, composing them into applications, deploying those applications to your clusters, as well as operating and monitoring deployed applications.

Pipelines application
Figure 1. Pipelines application

1.3. Terminology

1.3.1. Applications

Everything in Pipelines is done in the context of an Application, which represents a self-contained distributed system of data processing services connected together by data streams.

Pipelines users can create and manage multiple applications, each one forming a separate self-contained system.

Multiple applications
Figure 2. Multiple applications

Within applications, there are three main types of entities to interact with:

  • streamlets

  • the application blueprint

  • the deployed application

Application structure
Figure 3. Application structure

1.3.2. Streamlets

Streamlets form the basic building blocks of applications since they implement the actual stream processing logic. Streamlets are deployed by Pipelines as self-contained, isolated services, which can then be composed into streaming distributed applications.

Streamlets can be visualized as black boxes with at least one data inlet and/or data outlet. Each inlet and outlet is explicitly typed using a schema.

Streamlet external view
Figure 4. Streamlet external view

Every streamlet has a unique name (e.g. its Scala/Java canonical class name) as well as a number of metadata properties such as a description and a set of labels.

Outlets represent durable data. This means that the Pipelines runtime guarantees that data written by a streamlet to one of its outlets will be persisted and will survive restarts of the streamlet or the entire application.

Depending on the number of inlets and outlets, streamlets can have different shapes.

In the next few sections we will discuss the main attributes of a streamlet.

Inlet and Outlet

A streamlet can have inlets and outlets. A streamlet should have at least one inlet or outlet. Each inlet is identified by the following:

  • a codec indicating the type of data it accepts. In Pipelines we currently support Avro as the codec

  • the class of data it accepts used as the type parameter in defining the inlet

  • a name specified as part of the inlet definition

All outlets have the same attributes as inlets. In addition, every outlet also needs to specify a partitioning function as part of its definition. This function determines how data is partitioned when it is written to Kafka downstream. Data partitioning in Kafka is what enables scalability.
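
For illustration, here is a minimal sketch of how an inlet and an outlet are defined with the Scala API, using the SensorData class from the Quick Start chapter later in this guide (the value names are arbitrary):

import pipelines.streamlets.avro._

// Inlet: Avro codec, the SensorData class as the type parameter, and the name "in".
val in = AvroInlet[SensorData]("in")

// Outlet: the same attributes, plus a partitioning function that determines
// the key used to partition the data when it is written to Kafka.
val out = AvroOutlet[SensorData]("out", s ⇒ s.deviceId.toString)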

Streamlet Shape

The inlets and outlets define the shape of a streamlet. For example, a streamlet that accepts streaming data of a specific type and splits it into valid and invalid records has one inlet and two outlets (one for valid and the other for invalid records). Pipelines offers APIs to build streamlets of various shapes.
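
As a sketch, the shape of such a validating streamlet could be declared as follows; this is based on the MetricsValidation example from the Quick Start chapter later in this guide, which uses the Metric and InvalidMetric classes defined there:

import pipelines.streamlets.StreamletShape
import pipelines.streamlets.avro._

// One inlet for incoming metrics, two outlets for valid and invalid records.
val in      = AvroInlet[Metric]("in")
val valid   = AvroOutlet[Metric]("valid", m ⇒ m.deviceId.toString)
val invalid = AvroOutlet[InvalidMetric]("invalid", m ⇒ m.metric.deviceId.toString)

val shape = StreamletShape(in).withOutlets(invalid, valid)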

Streamlet Logic

Logic defines the business process that the streamlet executes. For a streamlet that validates incoming streaming data and classifies it into valid and invalid records, the validation rules are the business process and must be defined as part of the streamlet logic.

Depending on their shape, streamlets conceptually map to common elements of a streaming data pipeline. In the following sections we discuss a few of the most commonly used shapes.

Ingress

An Ingress is a streamlet with zero inlets and one or more outlets. An ingress could be a server handling requests, like the HttpIngress that comes standard with Pipelines. It could also be polling some back-end for data or it could even be a simple test data generator.

Ingress
Figure 5. Ingress
Processor

A Processor has one inlet and one outlet. Processors represent common data transformations like map and filter, or any combination of them.
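
As a rough sketch, a complete Processor-shaped streamlet written with the Akka Streams Scala API from the Quick Start chapter could look like the following; the streamlet itself is hypothetical and simply doubles the value of each Metric record it receives:

package pipelines.examples.sensordata

import pipelines.akkastream._
import pipelines.akkastream.scaladsl._
import pipelines.streamlets.StreamletShape
import pipelines.streamlets.avro._

object MetricDoubler extends AkkaStreamlet {
  val in    = AvroInlet[Metric]("in")
  val out   = AvroOutlet[Metric]("out", m ⇒ m.deviceId.toString)
  val shape = StreamletShape(in, out)

  override def createLogic = new RunnableGraphStreamletLogic() {
    // A map is the simplest Processor-style transformation.
    def runnableGraph =
      atLeastOnceSource(in)
        .via(FlowWithPipelinesContext[Metric].map(m ⇒ m.copy(value = m.value * 2)))
        .to(atLeastOnceSink(out))
  }
}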

Processor
Figure 6. Processor
FanOut

FanOut-shaped streamlets have a single inlet and two or more outlets.

FanOut
Figure 7. FanOut
FanIn

FanIn-shaped streamlets have a single outlet and two or more inlets.

FanIn
Figure 8. FanIn
Egress

An Egress represents data leaving the Pipelines application. For instance this could be data being persisted to some database, notifications being sent to Slack, files being written to HDFS, etc.

Egress
Figure 9. Egress

1.3.3. Schemas

Inlets and outlets of streamlets are explicitly typed, i.e. they only handle data that conforms to specific Avro schemas. Pipelines takes a schema-first approach:

  • Avro schemas need to be defined for every inlet and outlet of the streamlet. The Pipelines sbt plugin generates Java or Scala classes from the Avro schemas.

  • The generated types are used when defining the inlets and outlets of a streamlet.

  • Inlets and outlets can only be connected when their schemas are compatible.

For more information on schema compatibility and resolution, see Avro Schema Resolution.

1.3.4. Blueprints

After you have created some streamlets, it’s time to compose them together into a streaming application. Composition and configuration of streamlets is done by building an "application blueprint", which represents the entire distributed application as a set of named instances of streamlets connected together, an example of which is shown in the figure below:

Blueprint
Figure 10. Blueprint

The blueprint is just a source file which is part of the sbt project. A blueprint is required when publishing a Pipelines application. There can only be one blueprint per Pipelines application project. For the above blueprint that file would look something like this:

blueprint {
  streamlets {
    metrics = com.foo.Ingress1
    map = com.foo.Processor1
    logger = com.foo.Egress1
    validate = com.bar.FanOut1
    store = com.foo.Egress2
    logger2 = com.foo.Egress3
  }
  connections {
    metrics.out-0 = [map.in]
    metrics.out-1 = [logger.in]
    map.out = [validate.in]
    validate.out-0 = [store.in]
    validate.out-1 = [logger2.in]
  }
}

Each streamlet in the application blueprint is added by giving it a unique "reference" name, which identifies it. Multiple instances of the same streamlet can be added to the blueprint as long as they have different reference names.

Two streamlets can be connected together by connecting the inlet of one streamlet to a compatible outlet of another streamlet. Compatibility of inlets with outlets is purely based on the compatibility of their associated schemas. An inlet with a schema A can only be connected to an outlet that has a schema that is compatible with schema A. This compatibility is verified by the Pipelines sbt plugin.

Each outlet can have multiple downstream inlets connected to it. Every inlet reads data from the outlet independently. This is why the above blueprint file shows that each outlet actually has a list of inlets connected to it instead of a single inlet.

An example of a blueprint with multiple inlets connected to the same outlet would look like this:

    ...
    validation.out-1 = [
      logger2.in,
      other-streamlet.in
    ]
    ...

For more details on how to develop, compose, configure, deploy, and operate streamlets in an application, see the Getting Started guide and the Development Guide.

1.3.5. Streamlet Runtimes

Streamlets can be developed using several supported technologies. In Pipelines such a supported technology is called a "runtime".

Pipelines currently supports two runtimes:

  • Akka Streams (2.5.19+, Scala 2.12 or Java 8)

  • Spark (2.4.3+, Scala 2.12)

The Development Guide has sections on how to write streamlets for the specific runtimes.

1.4. Deployment

The blueprint can be verified through an sbt command. An application can only be deployed once its blueprint contains no problems. Pipelines will take the blueprint and deploy all its streamlets to the relevant runtime(s) in such a way that data can start streaming as described.

1.5. Interacting with Deployed Applications

A deployed application is in its running state, i.e. data is streaming from streamlet to streamlet.

Deployed Application
Figure 11. Deployed Application

Pipelines applications also generate lots of metrics that can be monitored using the Lightbend Console and the upcoming Pipelines Monitoring UI.

1.6. Data Safety and Overhead of Streamlets

As mentioned earlier, streamlets are deployed by the Pipelines operator as self-contained, isolated services. Data that flows between different streamlets will be serialized and persisted.

Data flowing between inlets and outlets is guaranteed, within some constraints, to be delivered at least once, with a minimum of potential duplicated records during live restarts or upgrades. More details about data safety can be found in the development guides for the different runtimes.

Of course data persistence between streamlets adds overhead and it is important to realize this when designing your applications. Several individual data processing operations can be combined into one single streamlet or spread out over multiple streamlets. Which design is best will depend on your use case and, for instance, on whether there is any value in the intermediate data produced. Intermediate data from an outlet can be consumed by multiple downstream streamlets instead of just one; each streamlet will receive all of the data.

For example, let’s look at the following blueprint:

Original Blueprint
Figure 12. Original Blueprint

Data transformation and validation are done in two separate steps. If the output of the initial transformation is not useful, e.g. if no other streamlets will consume that stream, then the map and validate streamlet can be combined into a single streamlet to save the overhead of (de)serialization and persistence:

Updated Blueprint
Figure 13. Updated Blueprint

2. Installing Pipelines

2.1. Overview

The Pipelines installer is a set of scripts and Helm charts that are packaged in a docker container together with all their dependencies.

The installer will perform the following sequence of steps:

  • Make sure that the correct version of Helm’s Tiller component is installed with the correct permissions.

  • Add the Lightbend and Kubernetes Stable Helm chart repositories.

  • Install the Kafka and Spark operators using Helm.

  • Install the Lightbend console, which includes the Pipelines monitoring UI, using Helm.

  • Configure and install Pipelines using Helm.

For each step, progress output from the executables used will be shown on screen.

2.2. Requirements

The following requirements must be met before running the installer:

2.2.1. Cluster requirements

Mutating Admission Webhook

The Kubernetes Mutating Admission Webhook admission controller plugin must be enabled. It is used by the Spark operator to customize driver and executor pods; without it, Spark-based streamlets will not work properly. Most Kubernetes distributions have this plugin enabled, and the Kubernetes community recommends enabling it.

For more information about how and why the Spark operator uses this admission controller plugin, please refer to the following document:

2.2.2. Installer requirements

  • Kubernetes CLI tool, kubectl

  • OpenShift CLI tool, oc

  • Docker

  • Credentials for the Lightbend commercial repositories.

  • Credentials for the OpenShift cluster.

  • Credentials for the OpenShift container registry.

  • The user must be a cluster admin to install Pipelines. The following objects are created during the installation:

    • projects/namespaces

    • deployments

    • service accounts

    • clusterrole bindings

    • role bindings

    • roles

    • secrets

    • persistent volume claims

    • config maps

2.3. Installing

Log in to the Lightbend commercial docker registry using the Lightbend-provided user name and API key:

docker login -u user_name -p api_key lightbend-docker-commercial-registry.bintray.io

Pull the docker image with the Pipelines installer:

docker pull lightbend-docker-commercial-registry.bintray.io/pipelines/pipelines-installer:1.1.0

Start the container and execute bash:

docker run -it lightbend-docker-commercial-registry.bintray.io/pipelines/pipelines-installer:1.1.0 /bin/bash

In what follows, $CLUSTER_FQDN is the cluster’s fully-qualified domain name or IP address. Either define this environment variable for your cluster in your current command window, so you can copy and paste the commands below, or substitute the correct value when you enter the commands.

Log in to the cluster where you want to install Pipelines. For OpenShift, use the following command:

oc login $CLUSTER_FQDN --token=TOKEN

The TOKEN is unique to your OpenShift cluster. You can copy the whole login command, including the token argument, from the pull-down menu labeled with your user name in the upper-right corner of the OpenShift GUI.

Run the Pipelines installer:

./install-openshift.sh $CLUSTER_FQDN

The script will query for the credentials to the Lightbend commercial repository.

Wait until the script completes and check the log output for the following text:

+------------------------------------------------------------------------------------+
|                      Installation of Pipelines has completed                       |
+------------------------------------------------------------------------------------+

This signals that the installation completed successfully and Pipelines has been deployed to your cluster in the namespace lightbend.

2.4. Post installation

2.4.1. Validating the installation

After the installation has completed, Kubernetes will be creating pods and other resources. This may take some time. You can check that all pods are healthy with the following command:

oc get pods -n lightbend

NAME                                                         READY   STATUS    RESTARTS   AGE
es-console-6f59cc59fd-xbcrx                                  2/2     Running   0          1m
grafana-server-57c887dd5-8ssmv                               1/1     Running   0          1m
pipelines-operator-698cf99d74-4bzn8                          1/1     Running   0          1m
pipelines-sparkoperator-fdp-sparkoperator-649695854c-6h7gz   1/1     Running   0          1m
pipelines-strimzi-entity-operator-549b4c45c4-prrcj           2/2     Running   0          1m
pipelines-strimzi-kafka-0                                    2/2     Running   0          1m
pipelines-strimzi-kafka-1                                    2/2     Running   0          1m
pipelines-strimzi-kafka-2                                    2/2     Running   0          1m
pipelines-strimzi-zookeeper-0                                2/2     Running   0          1m
pipelines-strimzi-zookeeper-1                                2/2     Running   0          1m
pipelines-strimzi-zookeeper-2                                2/2     Running   0          1m
prometheus-alertmanager-84fd6ffc5f-dt4nj                     2/2     Running   0          1m
prometheus-kube-state-metrics-f66c8f9c-mldbm                 1/1     Running   0          1m
prometheus-server-8fc8948cf-qmc75                            3/3     Running   0          1m
strimzi-cluster-operator-56d699b5c5-vqnc2                    1/1     Running   0          1m

When all pods are running, open a browser and navigate to the Lightbend Console.

The Pipelines installer automatically creates a Route that exposes the Console interface to external clients. The route will have the following format:

console-server-lightbend.YOUR-FQDN

The following script can be used to find the address of the route created by the installer:

oc get routes -n lightbend | awk '/console-server-lightbend/ {address=$2} END {print "The address for Lightbend Console is: " ((address)? address : "Not found")}'

The output of the command looks like this:

The address for Lightbend Console is: console-server-lightbend.some-cluster.lightbend.com

The following image shows the landing page of the Lightbend Console.

Lightbend Console
Figure 14. Lightbend Console

A "Pipelines" icon in the top-left corner navigates to the main Pipelines Console page.

More details on the Pipelines Console can be found in the Monitoring Pipelines guide.

2.4.2. Configuring a container registry for Lightbend Pipelines

After installation you will need to enable anonymous access to the Lightbend container registry namespace. This is done using the container registry UI, which is located at https://registry-console-default.[FQDN]/registry. The figure below shows how to enable anonymous access: select Projects from the left panel, select the lightbend project, click on the project access policy link, and change the access policy to Anonymous: Allow all unauthenticated users to pull images.

OpenShift Container Registry
Figure 15. OpenShift Container Registry

Failure to enable anonymous access will prevent users from deploying Pipelines applications. If that happens, the oc plugin pipelines deploy command will print the following:

Error response from daemon: unauthorized: authentication required

2.4.3. Adjusting Kafka PVC size

By default Kafka will request 500 GB of persistent volume claims, and the max retention bytes per partition will be set to a 5000th of that. The PVC size and the log retention byte ratio can be adjusted with the following script:

./common/adjust-kafka-sizing.sh 5000 5000

Setting PVC size to 5000Gi
Setting log retention bytes to 1073741824 bytes
kafka.kafka.strimzi.io/pipelines-strimzi patched
kafka.kafka.strimzi.io/pipelines-strimzi patched

This will update the PVC size to 5000 GB and the partition log retention bytes to ~1 GB.

2.4.4. Adjusting resource limits of Akka and Spark pods

The default resource values assigned to new pods are the following:

Spark driver pod

Request Memory: 512 M
Request CPU:    0.5
Limit CPU:    Not set / Optional

Spark executor pod

Request Memory: 2048 M
Request CPU:    1
Limit CPU:    Not set / Optional

Akka pod

Request Memory: 1024 M
Request CPU:    1
Limit Memory: 1024 M
Limit CPU:    1

These values can be changed using the following scripts provided with the installer.

./common/adjust-pod-cpu-resource.sh ['akka' | 'spark-executor' | 'spark-driver'] ['limits' | 'requests'] CPU-IN-MILLICORES
./common/adjust-pod-memory-resource.sh ['akka' | 'spark-executor' | 'spark-driver'] ['limits' | 'requests'] MEMORY-IN-MB

These scripts, which change memory and CPU resources, take three arguments:

  • Pod type to configure. This can be either akka, spark-driver or spark-executor.

  • The type to set; it can be limits or requests.

  • New CPU limit in "millicores" (for example 2000 for 2 cores) or the new memory limit in MB (for example 1024 for 1 GB of memory).

Examples:

Adjusting CPU limits for Akka pods to 2 CPUs:

./common/adjust-pod-cpu-resource.sh akka limits 2000

Adjusting CPU requests for Akka pods to 1 CPU:

./common/adjust-pod-cpu-resource.sh akka requests 1000

Adjusting memory limits for Spark-executor pods to 4GB:

./common/adjust-pod-memory-resource.sh spark-executor limits 4096
Warning

If a limit is adjusted below the corresponding request, application deployment will fail. Also, if you adjust the requests to be higher than your cluster node size, the streamlets can’t be run and deployment will fail.

Spark pods also have a memory overhead parameter. Memory overhead is the amount of off-heap memory, in MB, that a Spark driver or executor pod may allocate.

./common/adjust-pod-spark-memory-overhead.sh [spark-executor | spark-driver] MEMORY-IN-MB

For Spark executor pods the JAVA_OPTS can be adjusted using the following script.

./common/adjust-spark-executor-java-opts.sh NEW-JAVA-OPTS-STRING

Where NEW-JAVA-OPTS-STRING is a list of valid JVM options. The default value for this string is:

-XX:+UnlockExperimentalVMOptions -XX:+UseCGroupMemoryLimitForHeap

It is recommended that these default options are included in any new string you set.

2.5. Calculating Pipelines Application resource usage

To estimate the size of a Pipelines application, the following information can be used. For each streamlet in the application, add the number of CPUs and the amount of memory it uses to the total.

Spark streamlets
Memory: 1580 M
CPU:    3

Akka streamlets
Memory: 1024 M
CPU:    1
Note

The Spark values should be multiplied by the scaling factor (default 2) plus 1 to get the total resource usage per streamlet.
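For example, with the default scaling factor of 2, a single Spark streamlet accounts for (2 + 1) × 3 = 9 CPUs and (2 + 1) × 1580 M = 4740 M of memory, which corresponds to the cdr-aggregator and executor rows in the table below.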

We can use this information to calculate the limits for the sample application "call-record-aggregator" (using the default scaling factor of 2 for Spark streamlets).

Streamlet               CPU     Memory
cdr-generator1          1       1024
cdr-generator2          1       1024
merge                   1       1024
cdr-ingress             1       1024
cdr-validator           1       1024
cdr-aggregator          3       1580
    executor 1          3       1580
    executor 2          3       1580
console-egress          1       1024
error-egress            1       1024

Total                   16      11908

Each node in the cluster must be able to run each of these pods, which in this case means that the minimum node size of the cluster is 3 CPUs and 1580 MB of memory. We advise adding a healthy percentage on top of that per node for overhead.

2.6. FAQs

2.6.1. Q) What is the minimum node size needed to deploy a Pipelines application?

A) Calculate the resource limits for all streamlets in the Pipelines application. The nodes need to be able to accommodate the largest streamlet in the application plus the overhead of the pod.

2.6.2. Q) Where can I find more information about OpenShift and GlusterFS?

3. Pipelines Quick Start

This document gives an overview of the steps that a user needs to follow in order to develop a complete Pipelines application. It talks about how to implement a simple application using the Scala API and Akka Streams as the runtime. For details on other runtime support, Java language support and more advanced features that Pipelines offers, you are encouraged to take a look at Build and Deploy a Pipelines Application.

3.1. Setting up a Pipelines Development Environment

3.1.1. Prerequisites

This guide assumes that you have the following tools installed:

3.1.2. Access to Lightbend Commercial Repositories

Pipelines currently depends on the Lightbend commercial repository for the various artifacts that you need to download and access. This page on the Lightbend development portal has all the relevant details on how to set up your local sbt to access the repository.

Docker

Pipelines builds Docker images that get pushed to a registry. The Pipelines operator fetches the images from that registry for deployment. This Docker registry can be configured at install time. This means that we need to do an upfront docker login (explained below) before accessing the registry.

The Pipelines sbt plugin will use Docker to build and push application images to a Docker registry:

  • The sbt-pipelines plugin pushes Pipelines application Docker images to a Docker registry.

  • The Pipelines operator pulls Docker images from the registry to deploy Pipeline applications.

  • For OpenShift, this docker registry is configured on the server, and it is always the configured registry that you should log in to. Usually it is the built-in registry, but a different registry could have been configured during cluster installation. If you are not sure how to access it, please contact your system administrator.

In order to ensure that the registry is accessible, a docker login needs to be done first:

$ docker login -p password -u unused docker-registry-default.$CLUSTER_DOMAIN

Here, $CLUSTER_DOMAIN refers to the main DNS name of the cluster. (You can define this environment variable or just substitute the correct string.) The password token can be obtained from the cluster console UI as follows:

  1. Log in to the cluster

  2. Go to the menu option at the top right of the console (marked by ?)

  3. Select the Command Line Tools option and follow the instructions there to get the token

  4. Note that you need to get a fresh token every time it expires

Alternatively, if you are logged in to the cluster, you can copy the oc login command from the menu option Copy Login Command which is the top item in the rightmost menu under your login name.

3.1.3. Install the Pipelines CLI

The Pipelines oc plugin is used to deploy and configure Pipelines applications. It is actually a plugin for the Kubernetes kubectl CLI, but it also works in oc. A kubectl plugin is a standalone program whose name starts with kubectl-. It must be located in a directory in your PATH.

Download the Pipelines kubectl plugin from one of the following locations and move it to a directory in your PATH:

Even if you plan to use the OpenShift oc CLI, you’ll need to use kubectl for the next few steps.

To verify that the plugin has been successfully installed, list all plugins using the following command.

kubectl plugin list

The result should look something like this:

  The following kubectl-compatible plugins are available:

  /home/user/go/bin/kubectl-pipelines

The Pipelines kubectl plugin is now correctly installed.

Now let’s install the plugin into the OpenShift oc CLI (if you plan to use it):

kubectl pipelines install-oc-plugin

See The install-oc-plugin command for more details.

Most of the CLI examples in this documentation use oc plugin pipelines … to run Pipelines commands. If you prefer to use kubectl, then use kubectl pipelines … (no plugin argument) instead.

3.1.4. The First Pipelines Application

Let’s walk through creating a first Pipelines application.

Create the sbt build File

Let’s look at a basic sbt build file that shows the essentials of a Pipelines project.

It’s important to note that the plugin PipelinesAkkaStreamsApplicationPlugin selects Akka Streams as the streaming runtime. Pipelines also supports Spark as a runtime; we will discuss an example application using the Spark runtime in the Build and Deploy a Pipelines Application guide.

lazy val firstPipelinesProject = (project in file("."))
  .enablePlugins(PipelinesAkkaStreamsApplicationPlugin)
  .settings(
    libraryDependencies ++= Seq(..),
    name := "first-pipelines-project",
    ...
  )
Create the Avro Schema for the Domain

Any application starts with a set of domain object definitions, and in a Pipelines application the domain objects are designed schema-first. The user needs to define an Avro schema (.avsc file) for every element of the domain (typically in the folder named src/main/avro). Let’s take a look at an example domain for an application that processes streaming data from various sensors. Here are the schema elements with a one-liner explanation for each of them:

SensorData - reports timestamped measurements for every device

{
    "namespace": "pipelines.examples.sensordata",
    "type": "record",
    "name": "SensorData",
    "fields":[
         {
            "name": "deviceId",
            "type": {
                "type": "string",
                "logicalType": "uuid"
            }
         },
         {
            "name": "timestamp",
            "type": {
                "type": "long",
                "logicalType": "timestamp-millis"
            }
         },
         {
            "name": "measurements", "type": "pipelines.examples.sensordata.Measurements"
         }
    ]
}

Measurements - models what needs to be measured

{
    "namespace": "pipelines.examples.sensordata",
    "type": "record",
    "name": "Measurements",
    "fields":[
         {
            "name": "power", "type": "double"
         },
         {
            "name": "rotorSpeed", "type": "double"
         },
         {
            "name": "windSpeed", "type": "double"
         }
    ]
}

Metric - a single named metric value reported by a device

{
    "namespace": "pipelines.examples.sensordata",
    "type": "record",
    "name": "Metric",
    "fields":[
         {
            "name": "deviceId",
            "type": {
                "type": "string",
                "logicalType": "uuid"
            }
         },
         {
            "name": "timestamp",
            "type": {
                "type": "long",
                "logicalType": "timestamp-millis"
            }
         },
         {
            "name": "name", "type": "string"
         },
         {
            "name": "value", "type": "double"
         }
    ]
}

InvalidMetric - models the error in a metric

{
    "namespace": "pipelines.examples.sensordata",
    "type": "record",
    "name": "InvalidMetric",
    "fields":[
         {
            "name": "metric", "type": "pipelines.examples.sensordata.Metric"
         },
         {
            "name": "error", "type": "string"
         }
    ]
}

As part of the build process, the schemas will be processed and appropriate Scala case classes will be generated with the same names and packages as above. For the above schemas, the build process will generate case classes named:

  • pipelines.examples.sensordata.SensorData

  • pipelines.examples.sensordata.Measurements

  • pipelines.examples.sensordata.Metric

  • pipelines.examples.sensordata.InvalidMetric

These classes will be used when we design streamlets for the application.
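
Conceptually, the generated classes are plain Scala case classes carrying the schema fields. A rough sketch (not the exact generated source, which also includes Avro serialization support) would be the following, assuming the Avro logical types uuid and timestamp-millis are mapped to java.util.UUID and java.time.Instant, as the JSON support classes shown below expect:

package pipelines.examples.sensordata

import java.time.Instant
import java.util.UUID

// Sketch of the generated domain classes.
case class Measurements(power: Double, rotorSpeed: Double, windSpeed: Double)
case class SensorData(deviceId: UUID, timestamp: Instant, measurements: Measurements)
case class Metric(deviceId: UUID, timestamp: Instant, name: String, value: Double)
case class InvalidMetric(metric: Metric, error: String)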

Design the Streamlets

Streamlets form the basic building blocks of a Pipelines application, and they are defined in terms of the domain element types. For example, we can implement an Akka Streams streamlet that writes SensorData to an outlet,

  • using HTTP as the protocol

  • using JSON as the received format (see object SensorDataJsonSupport below)

  • partitioning data based on a user defined key

Here are the steps to define such a streamlet:

  1. The streamlet extends AkkaServerStreamlet, which is an AkkaStreamlet that can listen on a port

  2. The streamlet needs to define a set of inlets and outlets - here we need only an outlet since we are defining an ingress

  3. The streamlet needs to define a shape

  4. The streamlet needs to define the logic for processing of streaming data

Here’s how we do the above steps and define SensorDataIngress:

package pipelines.examples.sensordata

import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._

import pipelines.akkastream.AkkaServerStreamlet
import pipelines.akkastream.util.scaladsl._
import pipelines.streamlets.StreamletShape
import pipelines.streamlets.avro._

import SensorDataJsonSupport._

object SensorDataIngress extends AkkaServerStreamlet { // Step 1
  val out = AvroOutlet[SensorData]("out", s ⇒ s.deviceId.toString + s.timestamp.toString) // Step 2
  def shape = StreamletShape.withOutlets(out) // Step 3
  override def createLogic = HttpServerLogic.default(this, out) // Step 4
}

Now that we can accept JSON data via HTTP and write it to an outlet, we can define streamlets that read from the outlet and do the following:

  • do some transformation - convert SensorData to another format, Metric, which will be processed downstream

  • validate Metrics data and split into valid and invalid records

Here are the 2 streamlets that do the above steps in streaming data processing:

SensorDataToMetrics - This streamlet defines a Flow that plugs in to the computation graph as a successor to the Source that receives the data from the previous streamlet of the pipeline. Have a look at the createLogic method below.

package pipelines.examples.sensordata

import pipelines.akkastream._
import pipelines.akkastream.scaladsl._
import pipelines.streamlets.StreamletShape
import pipelines.streamlets.avro._

object SensorDataToMetrics extends AkkaStreamlet {
  val in = AvroInlet[SensorData]("in")
  val out = AvroOutlet[Metric]("out", m ⇒ m.deviceId.toString + m.timestamp.toString)
  val shape = StreamletShape(in, out)
  def flow = {
    FlowWithPipelinesContext[SensorData]
      .mapConcat { data ⇒
        List(
          Metric(data.deviceId, data.timestamp, "power", data.measurements.power),
          Metric(data.deviceId, data.timestamp, "rotorSpeed", data.measurements.rotorSpeed),
          Metric(data.deviceId, data.timestamp, "windSpeed", data.measurements.windSpeed)
        )
      }
  }
  override def createLogic = new RunnableGraphStreamletLogic() {
    def runnableGraph = atLeastOnceSource(in).via(flow).to(atLeastOnceSink(out))
  }
}

MetricsValidation - Validates Metric records and separates them into valid and invalid ones.

package pipelines.examples.sensordata

import pipelines.akkastream._
import pipelines.akkastream.util.scaladsl._
import pipelines.streamlets._
import pipelines.streamlets.avro._

object MetricsValidation extends AkkaStreamlet {
  val in = AvroInlet[Metric]("in")
  val invalid = AvroOutlet[InvalidMetric]("invalid", m ⇒ m.metric.deviceId.toString + m.metric.timestamp.toString)
  val valid = AvroOutlet[Metric]("valid", m ⇒ m.deviceId.toString + m.timestamp.toString)
  val shape = StreamletShape(in).withOutlets(invalid, valid)

  override def createLogic = new SplitterLogic(in, invalid, valid) {
    def flow = flowWithPipelinesContext()
      .map { metric ⇒
        if (!SensorDataUtils.isValidMetric(metric)) Left(InvalidMetric(metric, "All measurements must be positive numbers!"))
        else Right(metric)
      }
  }
}

Also we need a couple of helper classes that support the above implementation of the streamlets.

SensorDataUtils - Has a method for validating a Metric

package pipelines.examples.sensordata

object SensorDataUtils {
  def isValidMetric(m: Metric) = m.value >= 0.0
}

Miscellaneous classes for JSON support for the domain element types.

package pipelines.examples.sensordata

import java.time.Instant
import java.util.UUID

import scala.util.Try

import spray.json._

trait UUIDJsonSupport extends DefaultJsonProtocol {
  implicit object UUIDFormat extends JsonFormat[UUID] {
    def write(uuid: UUID) = JsString(uuid.toString)

    def read(json: JsValue): UUID = json match {
      case JsString(uuid) ⇒ Try(UUID.fromString(uuid)).getOrElse(deserializationError(s"Expected valid UUID but got '$uuid'."))
      case other          ⇒ deserializationError(s"Expected UUID as JsString, but got: $other")
    }
  }
}

trait InstantJsonSupport extends DefaultJsonProtocol {
  implicit object InstantFormat extends JsonFormat[Instant] {
    def write(instant: Instant) = JsNumber(instant.toEpochMilli)

    def read(json: JsValue): Instant = json match {
      case JsNumber(value) ⇒ Instant.ofEpochMilli(value.toLong)
      case other           ⇒ deserializationError(s"Expected Instant as JsNumber, but got: $other")
    }
  }
}

object MeasurementsJsonSupport extends DefaultJsonProtocol {
  implicit val measurementFormat = jsonFormat3(Measurements.apply)
}

object SensorDataJsonSupport extends DefaultJsonProtocol with UUIDJsonSupport with InstantJsonSupport {
  import MeasurementsJsonSupport._
  implicit val sensorDataFormat = jsonFormat3(SensorData.apply)
}

Now that we have the streamlets defined, the next step is to wire them together, through the creation of a blueprint.

Create the Blueprint

Streamlets are connected to each other via a blueprint. The blueprint is defined as a combination of streamlets and connections. By default it is located in a file in the Pipelines application project at the path src/main/blueprint/blueprint.conf.

blueprint {
  streamlets {
    sensor-data = pipelines.examples.sensordata.SensorDataIngress
    metrics = pipelines.examples.sensordata.SensorDataToMetrics
    validation = pipelines.examples.sensordata.MetricsValidation
  }

  connections {
    sensor-data.out = [metrics.in]
    metrics.out = [validation.in]
  }
}

In the above blueprint, the ingress feeds into the processor that transforms SensorData into Metric records. This, in turn, feeds into the validator that separates valid and invalid records. The validator has 2 outlets: one feeds valid Metric records downstream, while the other publishes invalid Metric records, for example to some other egress. The blueprint above may be enhanced with more streamlets and connections, but it gives an idea of how one can incrementally evolve the connection graph of streamlets declaratively. Also note that blueprints don’t need to be complete or closed to be deployed, and new streamlets can be added over time.

An application can only be deployed if the blueprint is valid (though it may not be complete). The outlets and inlets must use compatible schemas and every inlet must be connected. You can verify if the blueprint is valid by using the verifyBlueprint command in sbt.

Publish the Application

This is the process that packages your streamlets and the blueprint into a Docker image, which is then pushed to a Docker registry. You have to specify which Docker registry this is by using the sbt 'pipelinesDockerRegistry' setting. This can be done by adding the following to your build.sbt:

ThisBuild / pipelinesDockerRegistry := Some("docker-registry-default.$CLUSTER_DOMAIN")

Once the blueprint is verified, publish the application using sbt buildAndPublish. If everything goes ok, a "Success" prompt will be displayed.

Deploy the Application

Use the CLI plugin to deploy the application.

$ oc plugin pipelines deploy docker-registry-default.$CLUSTER_DOMAIN/pipelines/first-pipelines-project:292-c183d80

The actual image name to be deployed can be found in the output of the sbt buildAndPublish command.

The deploy command will prompt for credentials to the docker registry, when it is used for the first time, which will be used to pull your Pipelines application image. The credentials are stored in an image pull secret and used for subsequent usages of kubectl pipelines deploy. If the service account already has image pull secrets that contain credentials for the docker registry, the deploy command will not prompt for credentials.

You can also specify credentials with --username and --password-stdin. More details on this can be found in The deploy command. If you notice that your credentials have expired, you can update these with the update-docker-credentials command, read more about it in The update-docker-credentials command. It is advisable to use credentials that do not expire or that stay valid for a longer period of time, so that images can continue to get pulled over time, for instance when the Kubernetes pods for the application restart.

The result of a successful deployment will look like this:

Deployment of application first-pipelines-project has started.
Sending Data to the Application

You can feed data through the ingress using any HTTP client to send test data to it. Here’s an example using curl:

$ curl -i -X POST [application-name].[cluster-name]/[ingress-streamlet-reference] -H "Content-Type: application/json" --data @[data file name]

The data file name points to a file containing JSON records of the following format:

{
    "deviceId": "c75cb448-df0e-4692-8e06-0321b7703992",
    "timestamp": 1495545346279,
    "measurements": {
        "power": 1.7,
        "rotorSpeed": 23.4,
        "windSpeed": 100.1
    }
}

If you want to test with invalid data, here is an example of an invalid sensor data record:

{
    "deviceId": "c75cb448-df0e-4692-8e06-0321b7703992",
    "timestamp": 1495545346279,
    "measurements": {
        "power": -1.7,
        "rotorSpeed": 3.9,
        "windSpeed": 25.3
    }
}
Monitoring the Application

Use the status command to check the status of a deployed application.

$ oc plugin pipelines status first-pipelines-project

Pipelines also offers a monitoring UI for monitoring the details of the applications and the cluster. More details on this can be found in the Build and Deploy a Pipelines Application guide.

This document is an overview of the steps required to build and deploy a simple Pipelines application. There are quite a few details that we have skipped over here. Please have a look at Build and Deploy a Pipelines Application for more detailed examples, multi-project support, multi-runtime support, and more.

4. Build and Deploy a Pipelines Application

This guide describes how to build, configure and run a simple example application on Pipelines.

By following this guide, you will learn how to use the Pipelines CLI and sbt plugin to build and deploy a distributed streaming application composed of streamlets.

This guide assumes that you have read An Introduction to Pipelines. It will frequently refer to terms and concepts that are explained in that document.

This document also assumes that you have gone through the Pipelines Quick Start, i.e. that you already have the development environment installed and configured.

4.1. Anatomy of a Pipelines Application

Applications are the logical units of deployment. Within a cluster, we may have one or several applications. Streamlets that belong to an application have a common lifecycle.

You can check whether any applications already exist on the cluster that you are connected to with the following command:

$ oc plugin pipelines list

The first step in creating an application is to create an sbt project. There are a couple of example projects that you can use as a starting point. The examples are set up to use the Pipelines sbt plugin and are available in the pipelines-examples GitHub repo.

Clone the pipelines-examples repo with the command shown below:

$ git clone git@github.com:lightbend/pipelines-examples.git

For the sake of this guide, we are going to work with the 'call-record-aggregator' example, which can be found in the call-record-aggregator directory on github:

$ cd pipelines-examples/call-record-aggregator

4.1.1. Multi Project Support

This project has multiple sub-projects and a mix of Akka Streams and Spark streamlets. Hence it is also a good example to work through in order to get an idea of how multi-project builds are supported in a Pipelines application.

When working with a multi-project build, the sub-project that has the blueprint is the entry point of the application and has to be built with the PipelinesApplicationPlugin supplied by Pipelines. There can be only one sub-project per multi-project build that enables the PipelinesApplicationPlugin. Here is how we define it in build.sbt for call-record-aggregator:

lazy val callRecordPipeline = (project in file("./call-record-pipeline"))
  .enablePlugins(PipelinesApplicationPlugin)
  .settings(commonSettings)
  .settings(
    name := "call-record-pipeline"
  )
  .dependsOn(akkaCdrIngestor, akkaJavaAggregationOutput, sparkAggregation)

A few things to note from the above definition:

  • The name of the application is defined by the name of this sbt project - here it is call-record-pipeline, which is the name under which the application will be deployed.

  • Note that we enable PipelinesApplicationPlugin in the definition above, which means this sub-project is the entry point of the application.

The other sub-projects of this application are:

  • datamodel which provides the schema definitions of the application

  • akkaCdrIngestor which has several Akka Streams streamlets written in Scala

  • akkaJavaAggregationOutput which has a few egresses implemented in Java using Akka Streams

  • sparkAggregation which has a few Spark streamlets that do aggregation of data using Spark Structured Streaming

Note

This sample project demonstrates how to structure your application when you have Akka Streams and Spark based streamlets implemented with a mix of Scala and Java.

4.1.2. Streamlet Runtime Selection

Each sub-project must choose the runtime it requires, either Akka Streams or Spark. A runtime is selected by enabling the relevant sbt plugin in the build.sbt file.

For Akka Streams (either Scala or Java):

lazy val akkaCdrIngestor= (project in file("./akka-cdr-ingestor"))
  .enablePlugins(PipelinesAkkaStreamsLibraryPlugin)
  ...

For Spark (currently only in Scala):

lazy val sparkAggregation = (project in file("./spark-aggregation"))
  .enablePlugins(PipelinesSparkLibraryPlugin)
  ...

Besides the above 2 plugins, Pipelines offers 2 more plugins:

  • PipelinesLibraryPlugin, used to build generic libraries that don’t contain any streamlets, e.g. datamodel in the call-record-aggregator project

  • PipelinesApplicationPlugin, used with the sub-project that acts as the main entry point to the application, the one with the blueprint.

4.2. Creating and Publishing the Application

Our application consists of a set of Streamlets, which implement the streaming business logic. The connections between the streamlets are recorded in the blueprint and the entire topology forms the application graph. The blueprint file must exist and pass validation before the application can be published and deployed.

4.2.1. Creating Streamlets

Since we created the application from an example sbt project, you should already have a set of streamlets and a simple domain model ready to go. The domain model represents incoming call record data, which is transformed to render simple analytics.

The domain model is specified in the form of Avro schemas that define the contracts of the model elements. All the schema files can be found under the src/main/avro folder of the datamodel sub-project. Here’s a sample schema from the example project that describes the model for an abstraction named CallRecord:

{
    "namespace": "pipelines.examples.carly.data",
    "type": "record",
    "name": "CallRecord",
    "fields":[
         {
            "name": "user",
            "type": "string"
         },
         {
             "name": "other",
             "type": "string"
         },
         {
             "name": "direction",
             "type": "string"
         },
         {
              "name": "duration",
              "type": "long"
         },
         {
            "name": "timestamp",
            "type": "long"
         }
    ]
}

As part of the build process, the schema will be processed and an appropriate Scala case class will be generated with the same name and package as above. For the above schema, the build process will generate a case class named pipelines.examples.carly.data.CallRecord.
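
Conceptually, the generated class is a plain case class carrying the schema fields, roughly like this sketch (the actual generated source also includes Avro serialization support):

package pipelines.examples.carly.data

// Sketch of the generated class for the CallRecord schema above.
case class CallRecord(user: String, other: String, direction: String, duration: Long, timestamp: Long)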

And here are the various streamlets that form the application topology:

  • pipelines.examples.carly.aggregator.CallRecordGeneratorIngress, a Spark streamlet that generates data in the form of a Dataset[CallRecord] through execution of a Spark StreamingQuery

  • pipelines.examples.carly.ingestor.CallRecordMerge, an Akka Stream streamlet that merges call records from multiple streamlets

  • pipelines.examples.carly.ingestor.CallRecordIngress, an Akka Stream streamlet that allows you to send JSON data to an HTTP endpoint hosted by the Pipelines operator using HTTP POST

  • pipelines.examples.carly.ingestor.CallRecordValidation, an Akka Stream streamlet that splits call records into streams of valid and invalid ones

  • pipelines.examples.carly.aggregator.CallStatsAggregator, a Spark streamlet that aggregates call record data based on specific logic within a window

  • pipelines.examples.carly.output.AggregateRecordEgress, an Akka Stream Java streamlet that writes aggregated call record statistics to the console

  • pipelines.examples.carly.output.InvalidRecordEgress, an Akka Stream Java streamlet that writes invalid call records to the console

The code for each of the above streamlets can be found in the call-record-aggregator project under the various sub-projects as discussed earlier. To learn more about developing streamlets, please refer to the Development Guide.

4.2.2. Creating the application blueprint

The application blueprint is located in src/main/blueprint/blueprint.conf under sub-project call-record-pipeline. A blueprint is required when publishing a Pipelines application. There can only be one blueprint per Pipelines application project. It connects the streamlets that have been shown in the previous section:

blueprint {
  streamlets {
    cdr-generator1 = pipelines.examples.carly.aggregator.CallRecordGeneratorIngress
    cdr-generator2 = pipelines.examples.carly.aggregator.CallRecordGeneratorIngress
    merge = pipelines.examples.carly.ingestor.CallRecordMerge
    cdr-ingress = pipelines.examples.carly.ingestor.CallRecordIngress
    cdr-validator = pipelines.examples.carly.ingestor.CallRecordValidation
    cdr-aggregator = pipelines.examples.carly.aggregator.CallStatsAggregator
    console-egress = pipelines.examples.carly.output.AggregateRecordEgress
    error-egress = pipelines.examples.carly.output.InvalidRecordEgress

  }
  connections {
    cdr-generator1.out = [merge.in-0]
    cdr-generator2.out = [merge.in-1]
    cdr-ingress.out = [merge.in-2]
    merge.out = [cdr-validator.in]
    cdr-validator.valid = [cdr-aggregator.in]
    cdr-aggregator.out = [console-egress.in]
    cdr-validator.invalid = [error-egress.in]
  }
}

The streamlets section contains named instances of streamlets that will be used. The connections section describes how these named streamlet instances are connected, from outlet to inlet(s). An inlet can only connect to one outlet. Many inlets can connect to the same outlet.

An application can only be deployed if the blueprint exists and is valid. The outlets and inlets must use compatible schemas and every inlet must be connected. You can verify whether the blueprint is valid by using the verifyBlueprint command in sbt. That command will always be run automatically when publishing the application.

You can use a different filename for the blueprint by setting the blueprint sbt setting in your Pipelines application project, but the file must exist in src/main/blueprint. To override the default blueprint filename, see the following example:

lazy val callRecordPipeline = (project in file("./call-record-pipeline"))
  .enablePlugins(PipelinesApplicationPlugin)
  .settings(commonSettings)
  .settings(
    name := "call-record-pipeline",
    blueprint := Some("different-blueprint.conf")
  )
  .dependsOn(akkaCdrIngestor, akkaJavaAggregationOutput, sparkAggregation)

4.2.3. Publishing the Application

Once we are done developing our streamlets and connecting the streamlets in a blueprint, we need to package and publish the application to the target cluster that we connected to earlier on.

The buildAndPublish command will create a docker image with our artifacts and publish it to the Pipelines docker registry.

cd into the call-record-aggregator directory and execute the following:

$ sbt buildAndPublish

If everything goes well, you will see a [success] message at the end.

4.2.4. Deploying the application

Deployment of the application is done using the Pipelines oc plugin, which adds a sub-command called pipelines to oc. The complete list of commands available from the Pipelines plugin can be viewed as follows:

$ oc plugin pipelines

This command line tool can be used to deploy and operate Pipelines applications.

Usage:
  pipelines [command]

Available Commands:
  configure     Configures a deployed Pipelines Application.
  deploy        Deploys a Pipelines Application to the cluster.
  help          Help about any command
  list          Lists deployed Pipelines Application in the current cluster.
  scale         Scales a streamlet of a deployed Pipelines Application to the specified number of replicas.
  undeploy      Undeploy one or more Pipelines applications.
  version       Print the plugin version.

Flags:
  -h, --help   help for pipelines

Use "pipelines [command] --help" for more information about a command.

To deploy the application, the full docker image path of the application is needed; it is printed as the final step of the buildAndPublish task.

In this example we will use the following docker image path for our cluster.

registry.streampipe.lightbend.com/pipelines/call-record-pipeline:292-c183d80

The docker image is needed because it contains the application descriptor, which will be the basis for the Kubernetes Custom Resource that the CLI creates on the cluster.

The deploy command will prompt for credentials to the docker registry, when it is used for the first time. The credentials are stored in an image pull secret and used for subsequent usages of kubectl pipelines deploy. If the service account already has image pull secrets that contain credentials for the docker registry, the deploy command will not prompt for credentials.

You can also specify credentials with --username and --password-stdin. More details on this can be found in The deploy command. If you notice that your credentials have expired, you can update these with the update-docker-credentials command, read more about it in The update-docker-credentials command. It is advisable to use credentials that do not expire or that stay valid for a longer period of time, so that images can continue to get pulled over time, for instance when the Kubernetes pods for the application restart.

Enter the following command to deploy the application.

$ oc plugin pipelines deploy registry-default.streampipe.lightbend.com/pipelines/call-record-pipeline:292-c183d80

The result of a successful deployment will look like this:

Deployment of application call-record-pipeline has started.

4.2.5. Providing configuration parameters when deploying a Pipelines application

When deploying an application there may be streamlets that have certain configuration requirements that need to be fulfilled. Examples of this can be credentials, hostnames or ports that must be configured per deployment.

These requirements can be provided to the deploy command as a set of key/value pairs appended after the image path.

$ oc plugin pipelines deploy registry-default.streampipe.lightbend.com/pipelines/call-record-pipeline:292-c183d80 cdr-aggregator.group-by-window="7 minute" cdr-aggregator.watermark="1 minute"

The name of the streamlet is used as a prefix to the configuration parameter, to distinguish between configuration parameters of different streamlets that happen to have the same name.

If you are unsure which configuration parameters are required to deploy the application, you can execute the deploy command with just the image path; a list of missing configuration parameters will then be printed.

$ oc plugin pipelines deploy registry-default.streampipe.lightbend.com/pipelines/call-record-pipeline:292-c183d80

Cannot find the following configuration key(s) defined in the command line arguments.
- group-by-window
- watermark

4.2.6. Reconfigure a deployed Pipelines Application

Sometimes there is a need to reconfigure an already deployed application; this can be done using the configure command.

The configure command takes a deployed Pipelines application name and one or more configuration parameters.

$ oc plugin pipelines list

NAME                 NAMESPACE            VERSION     CREATION-TIME
call-record-pipeline call-record-pipeline 292-c183d80 2019-03-05 12:50:33 +0100 CET

Re-configure the application by running the following command:

$ oc plugin pipelines configure call-record-pipeline cdr-aggregator.group-by-window="5 minute" cdr-aggregator.watermark="2 minute"

2019/03/05 12:59:20 Configuration of application call-record-pipeline have been updated.

4.2.7. Scaling a streamlet in a Pipelines Application

Each type of streamlet (Akka or Spark) has a default number of replicas; this can be adjusted after deployment using the scale command.

The scale command takes three parameters: the name of the application, the name of the streamlet to scale, and the number of replicas that the streamlet should be scaled to.

$ oc plugin pipelines scale call-record-pipeline cdr-ingress 2

Streamlet cdr-ingress in application call-record-pipeline is being scaled to 2 replicas.

The number of replicas for a specific streamlet will remain in effect until the streamlet is removed by a new deployment of the application or the application is undeployed.

4.2.8. List deployed Pipelines applications

To list all deployed Pipelines applications, use the following command:

$ oc plugin pipelines list

NAME                 NAMESPACE            CREATION-TIME
call-record-pipeline call-record-pipeline 2019-03-04 23:44:36 +0100 CET

4.2.9. Undeploying a Pipelines Application

A deployed application can be shut down and removed using the undeploy command.

$ oc plugin pipelines undeploy call-record-pipeline

Undeployment of application `call-record-pipeline` started.

4.3. Sending data to your application

If all previous steps succeeded, at this point you should have a deployed and running application.

The application call-record-pipeline generates its own data to feed into the pipeline. Most applications, however, need data to be fed in through an ingress, typically an HttpIngress. In that case you can use any HTTP client to send test data to it. Here’s an example using curl:

$ curl -i -X POST [application-name].[cluster-name]/[ingress-streamlet-reference] -H "Content-Type: application/json" --data [data file name]

That data will now be processed and each data element will flow through the streamlets that you deployed.

4.4. Monitoring

Pipelines is fully integrated into the Lightbend Console. Besides the standard cluster-level views the Lightbend console now also has a special Pipelines section that allows you to inspect and monitor running Pipelines applications. More details can be found in the Monitoring Pipelines guide.

5. Monitoring Pipelines

5.1. Introduction to Pipelines Monitoring

This web-application is used to validate and monitor a Lightbend Pipelines application. Here you can get visual verification that your application looks as you expect, with the correct streamlets and the proper connections each with the desired schema. In addition you can watch your application run, checking on key performance indicators, such as latency and throughput, as well as the health of each streamlet and the monitors computed on them.

First a tour of the user interface components:

5.1.1. Pipeline Applications Page

Pipeline Applications Page
Figure 16. Pipeline Applications Page

The entry point for Lightbend Pipelines is the applications page. Each running pipelines application is represented with a 'tile'. Each tile contains basic information about the application (name, resource counts, how long it has been deployed, etc) along with a (live) thumbnail view of the blueprint and the health history over the current time period. Click on a tile to go to that application’s monitoring page.

To the left of the application tiles are panels containing:

  • a link to the cluster view

  • cluster resources being used for pipelines applications

  • a summary of applications organized by current health

5.1.2. Monitoring Page

Pipelines Monitoring Page Structure
Figure 17. Pipelines Monitoring Page Structure

You can always reach the embedded version of this document by clicking on any of the '?' icons found in the upper right of any panel in this application. Hide the help overlay by clicking 'CLOSE' in the upper right.

5.1.3. How to use help

This help panel can be displayed by clicking any '?' icon from any panel.

5.1.4. Fundamental Concepts: Current Period, Time and Selection

In this monitoring application all views (panels) are tightly coupled. Making a selection in one frequently affects most others. There are two main selection types used here: (1) the application or (2) a streamlet. By default the application is selected, and when the page first loads you’ll see the center panel full of rows of healthbars, one for each streamlet in the application. If you click a streamlet in the blueprint diagram, or click a healthbar title, all panels will focus on that streamlet, as it becomes the current selection.

Similarly there is a current time. The current-time is whatever time you’ve hovered over in a graph, health-bar or timeline - it is a very transient concept which is constantly changing based on the position of the mouse in these panels. The current-time is visualized by a vertical crosshair in each time-based panel (graphs, health-bars) and is reflected in the time callout in both the main timeline as well as the blueprint panel. As you hover within a graph or health-bar this current time changes to track the mouse. If you mouse out of these panels then the current time snaps to 'NOW' (the most recent time sample) - you’ll see this in the time callout in the blueprint graph with a '( LIVE )' label. Note: due to metric sampling (see observation period below) this most-recent sample could be within seconds of 'now' or minutes in the past (latency is a function of the observation period).

Time period The duration of interest is governed by the time period as selected in the upper right hand corner of the screen. The options available include: 30 minutes, 1 hour, 4 hours, 1 day and 1 week. Metrics are collected from streamlets at one rate (currently 10 second intervals) but health-bars and graphs are calculated by sampling these underlying metrics. The observation period (i.e. the time period) determines the sampling rate for all collected metrics. This sampling rate governs the temporal resolution of displays (health-bars & graphs). Ten second sampling is used for one hour duration (360 samples / hour), 40 seconds for a four hour duration, etc. Select a short observation period for low latency.

Update cycle Metrics are streamed (based on this sampling rate) and the current status of the update cycle is visualized in the top navigation bar. The health of a time-sample is determined as the 'worst' health over all collected metrics within its interval. However metrics shown in graphs are instantaneous samples and only reflect the state of the system at the time of collection.

5.2. Blueprint Graph

The blueprint graph shows the current state of the application as a directed graph of streamlets with dataflow from left to right. Each streamlet is depicted with a 'service type' (Akka Streams or Spark Streaming) icon, a name, instance count and ports.

Blueprint Graph
Figure 18. Blueprint Graph

5.2.1. Blueprint Themes

Three blueprint-based ‘themes’ are available for visualizing state in the blueprint. Each theme maps a streamlet metric into the streamlet icon color within the blueprint and another metric into the connections (color and line thickness) between streamlets. The legend below the blueprint is where you select the theme as well as understand the mappings and metric ranges (based on the current time period).

Blueprint controls As you ‘scrub’ over a health-bar or metric graph on the page you’ll see streamlets and connections change color to reflect their color mapping at the current time.

Table 1. Blueprint themes
THEME       STREAMLET COLOR                       CONNECTION COLOR AND THICKNESS
Activity    Sum of Throughput Across all Inlets   Consumer Lag (of target streamlet)
Health      Streamlet Monitor Health              Connection Schema
Latency     Streamlet Processing Time             Consumer Lag (of target streamlet)

‘Health’ is the default view. In this theme the streamlet icon’s color is based on its health at the current time and connections are colored based on their schema.

Health colors
Figure 19. Health colors

Health is defined as the least healthy of all the streamlet’s monitors at that time. This mapping is categorical in that neither health nor schema are continuous variables (for a given time) and thus the legend, below the blueprint, shows distinct states for both streamlet and connection mapping (of course health is a time-varying metric). Hovering over the color chips within the legend shows the details of the chip.

Health theme
Figure 20. Health theme

The ‘Activity’ theme maps throughput into streamlet icon color: specifically the sum of the outlet throughputs. Connections are visualized based on consumer lag (both for color and line thickness). In this manner, for a given streamlet, we’re visualizing its inlet pressure (consumer lag) and the streamlet’s effort at servicing that pressure (the rate of data production: the outlet throughput). The goal of this theme is to give the user a sense of the level of activity of both the pipeline as a whole and of individual streamlets. These two mappings are continuous and the colormaps in the legend reflect this. In both cases (throughput and consumer lag) the colormap is scaled by the min-to-max range of all streamlets and connections for the current time period.

Activity theme
Figure 21. Activity theme

‘Latency’, the third theme, gives a measure of the inlet pressure (like ‘activity’ it uses consumer lag on connections) and processing time (for Akka Streams-based streamlets) for coloring the streamlet icons. In this manner it gives a feeling for the amount of work the streamlet is performing at the current time.

Latency theme
Figure 22. Latency theme

Note that not all streamlets produce processing time metrics (currently Spark-based streamlets do not provide processing time metric data) and these streamlets are always gray (as shown in the above image).

In general the color scheme for continuous variables (processing time, throughput and consumer lag) uses green to indicate ‘good’ and orange or pink to indicate ‘problematic’. Blue is used for more ambiguous states (like low throughput or low processing time) where value judgment isn’t relevant.

One side benefit of this mapping is that you can get a feeling for how the streamlet is performing relative to the entire observation period as well as among its peers.

5.2.2. Streamlet Iconography

Instance count

Each streamlet icon within the blueprint contains several visual elements:

  • Service type icon (Akka Streams or Spark)

  • Ports for incoming or outgoing connections to other streamlets

  • Streamlet scale (shown when more than one instance is running)

  • Endpoint (the blue cloud-like icon on the streamlet right side)

Schemas are defined for each inlet and outlet port and, in the health theme, both ports and wires are colored by schema type, allowing you to get a quick sense of which ports are compatible. These schema colors are also used in the 'shape' details panel (see below).

Streamlets can be scaled. The scale of each streamlet is visualized below the streamlet icon. This streamlet consists of three instances (pods).

Selecting a streamlet, by clicking on it, focuses the rest of the page on it as the Current Selection. Clicking the background of the blueprint graph selects the application as a whole. The blueprint graph resizes with the window size, so the selection highlight effect depends on the resolution of the display and the complexity of the blueprint. It ranges from a very simple callout (for narrow window sizes) to a full resolution icon with a solid white outline for larger window sizes.

UI scaling
Figure 23. UI scaling

5.3. Application Details

Application status

The leftmost panel contains application status including the current (i.e. live) health rollups by streamlet as well as a summary of health transitions per streamlet over the current time period.

The amount of red (critical) and orange (warning) indicates the relative amount of time over the duration that the streamlet spent in an unhealthy condition. Streamlets in this list are ordered by their amount (weighted by severity) of health issues.

5.4. Health And Timeline

Health timeline

The center panel visualizes the health of the current selection (either application or streamlet) over the time period. The health model used in Lightbend Console is based on a hierarchical rollup of health from monitors to streamlets (or workloads) to applications as a whole.

5.5. Selection Health, Shape and Introspection

Tabs Use the tabs in this panel to show health-bars for the current selection (either application or streamlet). When the current selection is a single streamlet then additional content is available.

5.5.1. Health

This panel shows healthbars for the current selection. When the application is selected (by clicking the background of the blueprint diagram) then this panel contains streamlet health, one streamlet per row. When a streamlet is selected then the panel contains monitor health (one row per monitor).

Health bars
Figure 24. Health bars

As in the blueprint view, health information is available for four states (see Figure 19, Health colors).

The health-bars can be ordered by

  • name (streamlet or monitor)

  • first-unhealthy (default)

  • most-unhealthy

First-unhealthy simply orders by the first sample in each health metric to turn warning or critical. Most-unhealthy ordering is based on the number of samples over the duration that are either warning or critical, where critical has twice the weight of warning.
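
As an illustration of this weighting (a toy calculation only, not the Console's actual implementation), the most-unhealthy score can be thought of as follows:

// Illustrative only: critical samples weigh twice as much as warning samples.
sealed trait HealthSample
case object Healthy extends HealthSample
case object Warning extends HealthSample
case object Critical extends HealthSample

def mostUnhealthyScore(samples: Seq[HealthSample]): Int =
  samples.map {
    case Critical => 2
    case Warning  => 1
    case _        => 0
  }.sum

// Rows with a higher score sort towards the top of the health-bar list,
// e.g. Seq(Healthy, Warning, Critical) yields a score of 3.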

5.5.2. Shape: Inlet, Outlet and Endpoint Details

Click the 'Shape' tab and this panel changes to show details on the inlets and outlets of the selected streamlet. In the upper portion of the panel a graphic shows the streamlet along with its named ports colored by their schema type.

Shape view
Figure 25. Shape view

Below this is the URL of exposed endpoints (if any) followed by an expandable list of port details including the port name, schema type, current values of key performance metrics and upstream or downstream connections. Clicking on a connection in the list will select that up/downstream streamlet.

Consumer Lag metrics are defined on each inlet and Throughput metrics are defined on both inlets and outlets (see Key Performance Metrics). The values shown for these metrics are valid for the current-time. If you mouse over the main timeline (or a graph) and thus change the current-time you’ll see these values change as well. In this manner you can correlate exact values for these key performance metrics with other metrics from this streamlet.

These key metrics are based upon reading from or writing data to Kafka, the message bus behind the scenes. Inlets connect to Kafka via a consumer-group while outlets are written to topics. Both of these are shown in the details.

5.5.3. Stream Map: Akka Streams Introspection (Experimental Feature)

Akka Streams-based streamlets are defined by their own directed-graph of operators. This internal structure is visualized in the 'Stream Map' tab. Operators are shown as circles, connections between operators are shown as curved lines.

Stream Map
Figure 26. Stream Map

There is only one 'theme' for this visualization: connections show processing-time on the target (i.e. downstream) operator and operators show the sum of processing-time for all their incoming connections.

The left-most operators are associated with the streamlet’s inlets, while the right-most are tied to outlets. Operators in the middle are a combination of the pipelines-provided code for handling data on ports as well as the key operator(s) that you, as the developer/user, have programmed.

Stream Map tooltip This view, when combined with the blueprint 'latency' theme, can give you a good idea of where time is being spent within the streamlet. As you scrub a health-bar (or timeline or metric graph) you’ll see this directed graph update to show the exact distribution of computational resources for each current time. Hovering over an operator displays a tooltip with the Akka Streams operator definition along with its class. Note: This is available at this time for most, but not all, Akka Streams-based streamlets.

5.6. Metric Graphs

Graphs of metrics for the current selection are shown in the right-most panel. There are two basic graph representations: paired-stack graphs and area graphs. Throughput is depicted as an upper/lower stacked graph where all the incoming data are shown on top and the outgoing throughput data on the bottom. Each upper and lower graph can contain multiple sources, stacked upon each other.

Area graphs show one or more curves overlaid upon each other.

Currently applications only produce one family of high level metrics: Throughput (see Key Performance Metrics).

Application throughput graph
Figure 27. Application throughput graph

Shown in a paired-stack graph.

Whenever a streamlet is selected the right-most panel displays metric graphs relating to that streamlet.

Streamlet graphs
Figure 28. Streamlet graphs

Each streamlet monitor is based upon a metric exported by the streamlet. A monitor is defined by combining such a metric with an expression (based on the monitor type) and other parameters. The metrics backing most monitors are graphed in this panel.

In addition there are other key metrics for each streamlet type. Metrics in this category include consumer lag (for each streamlet inlet), throughput (on streamlet inlets and outlets), the number of instances running (scale), and restart history; these are also shown in this panel.

When a streamlet outputs several metrics with the same name but with different labels then a single graph is created for the collection of metrics. As you mouse over the graph you’ll see, along with the crosshairs, each curve highlight. Curves occluded by others cannot be highlighted, so hover over an indicator chip in the upper right. This will highlight the curve (increasing its opacity) as well as show the labels unique to this curve.

Streamlet graph - annotated
Figure 29. Streamlet graph - annotated

In this example there are two labels with values unique to the curve: container and pod.

Most graphs display a description tooltip on graph-title hover.

Graph tooltip
Figure 30. Graph tooltip

5.6.1. Key Performance Metrics

Consumer Lag is a key metric available for streamlet inlets. Each inlet is part of a Kafka Consumer Group across all the instances of that streamlet. If there is one instance of the streamlet, it is a group of one. The group members divide up the Kafka partitions of the topic they are reading. Every partition has a lag or latency measurement, which is how far back the consumer group reader is from the most recent message. This metric is captured in terms of the number of records each reader (instance) is behind the most recent record produced upstream - and is presented as the maximum across all readers in the group.

Throughput, another key metric, is available on both inlets and outlets. On streamlet inlets it represents the rate of data records read by the streamlet (in records / second). On outlets it is the rate of data records produced by the streamlet. Note that there might not be a one-to-one relationship between inlet records and outlet records; the relationship depends on the functionality of the streamlet itself.

For application throughput we compare the application’s incoming data (i.e. the data produced on all outlets of all ingresses) with the outgoing data (i.e. the data consumed on all inlets of all egress streamlets in the application). Incoming data is shown in the upper stack, outgoing in the bottom stack, as in the following image (note that the outgoing stack only has one curve, the blue one, as the other egress streamlet inlet rate is zero in this particular case).

Application throughput graph - annotated
Figure 31. Application throughput graph - annotated

5.6.2. Crosshair Behavior

As you move the mouse over a graph you’ll see the crosshair track the mouse. A timeline callout appears below the graph. In addition you’ll see a small vertical line drawn on all graphs, health-bars and timelines on the page - allowing you to correlate behavior across metrics, monitors and streamlets. When hovering over a metric graph, the vertical crosshair shows the (up to six) metric values (one per curve) at the current time. Callout values are only shown when a data sample exists within one time sample of the mouse location - meaning that unknown (missing) data is not shown.

Graph crosshair
Figure 32. Graph crosshair

5.7. Controls

Controls The top level controls panel is the jumping off point for other views on the current selection: metric dashboards (via Grafana) or infrastructure monitoring.

There is a Grafana metric dashboard for each streamlet as well as the application as a whole. In these dashboards you can see a variety of metrics for each service type of the current selection (Kafka, Akka Streams, Spark, JVM and Kubernetes) separated into groups. Here you also have finer-grain control over time periods and other graphing attributes.

Grafana dashboard
Figure 33. Grafana dashboard

5.7.1. Relationship with Lightbend Console Monitoring

There are two main views for monitoring Lightbend-enabled applications: the infrastructure and Pipelines Applications views. In the Pipelines Application view you see a blueprint and streamlet-oriented frame of reference: all information is structured for a higher level perspective on a running pipeline-enabled application: where the main actors are the blueprint, its constituent streamlets, their monitors and metrics. Health at the application level is based on rolling up health at the streamlet level which is based on rolling up health at the monitor level.

Application blueprint
Figure 34. Application blueprint

Controls Another viewpoint is infrastructure-oriented: a cluster consists of nodes running pods (you can reach this view by clicking on the 'workload' icon within the controls panel in the window upper left). Those pods are organized by the workloads in which they are running. Workloads have monitors running on pods and the health of the workload is based on rolling up the health of its monitors. This infrastructure-based view does not have a high level concept of 'application' but it does allow you to define or customize monitors for workloads. While this customization is not available directly in the pipeline-based view, those monitors do carry over to the pipeline perspective.

Cluster map view This is the corresponding infrastructure view for this pipeline application. Note that there are 11 workloads here (three with two pods each, all others with one). But the pipeline view contains eight streamlets. This discrepancy is due to how Spark jobs are handled: they each have an executor and a driver - resulting in an additional workload for each Spark streamlet, or three additional workloads for this application (since there are three Spark-based streamlets).

6. Developing Pipelines Streamlets

See the Pipelines concepts document if you are not yet familiar with it. The actual stream processing logic is implemented in streamlets, which are the basic building blocks of a streaming application. Akka streamlets can be developed in Scala or Java; Spark streamlets can only be developed in Scala.

6.1. Schema First Approach

Pipelines has adopted the schema first approach for building streaming data pipelines. The user needs to supply the Avro schema as the starting point of the domain model for which streaming data pipelines need to be built.

Pipelines adds the appropriate plug-ins to the build system of the application to generate Java or Scala classes corresponding to the Avro schema.

This approach has the advantage that the user only needs to take care of the core domain model and Pipelines does the heavy lifting of generating the classes and integrating them with the main application.

However since Pipelines takes the schema as the input, it needs to ensure that the corresponding data inlets and outlets honor the schema when allowing data to flow through them. This needs to be done to ensure data consistency across all the inlets and outlets. We discuss this in the next section.

6.2. Schema Code Generation

The user can choose which programming language to generate their schemas into by defining Settings in their sbt project. When no Settings are defined, by default the sbt-pipelines plugin will look for Avro schemas in src/main/avro and will generate Scala classes for them.

If you wish to override the location of your Avro schemas in your project, or if you wish to generate Java classes instead, you can do so by defining a Setting in your SBT project.

  • schemaCodeGenerator (Default: SchemaCodeGenerator.Scala) - The programming language to generate Avro schemas classes into.

  • schemaPaths (Default: Map(SchemaFormat.Avro → "src/main/avro")) - The relative path to the Avro schemas.

  • schemaFormat (Default: Seq(SchemaFormat.Avro)) - The schema formats to generate. Avro is the only format currently supported.

For example, to generate Java classes from Avro schemas in a non-default location:

lazy val datamodel = (project in file("./my-pipelines-library"))
  .enablePlugins(PipelinesLibraryPlugin)
  .settings(
    schemaCodeGenerator := SchemaCodeGenerator.Java,
    schemaPaths := Map(SchemaFormat.Avro -> "src/main/resources/avroschemas")
  )

6.3. Schema Aware Inlets and Outlets

In Pipelines, all streamlet inlets and outlets are schema-aware. This means two things:

  • Every inlet and outlet must only allow data to flow through it that honors the schema for which it is programmed.

  • Inlets and outlets can be connected together only if the schemas of an outlet and the corresponding inlet are compatible.

Let’s take a look at how Pipelines ensures both of the above guarantees.

6.3.1. Data Safety Guarantee through Types

As mentioned above, any pipeline definition starts with the user providing the Avro schema definition for the domain object. Let’s assume the user provides the following Avro schema as an input. It’s a definition of a call record as used by a telephone company.

{
    "namespace": "pipelines.examples",
    "type": "record",
    "name": "CallRecord",
    "fields":[
         {
            "name": "user",
            "type": "string"
         },
         {
             "name": "other",
             "type": "string"
         },
         {
             "name": "direction",
             "type": "string"
         },
         {
              "name": "duration",
              "type": "long"
         },
         {
            "name": "timestamp",
            "type": "long"
         }
    ]
}

In case of a Scala application, Pipelines will generate a Scala case class pipelines.examples.CallRecord corresponding to the above schema. This class will now be made available for use within the application.

When we define a streamlet where objects of type CallRecord will be passing through its inlet, we define the inlet as val in = AvroInlet[CallRecord]("call-record-in"). Pipelines ensures the following compile time guarantees through this type definition:

  • The inlet only allows a codec of type Avro

  • Pipelines only allows the inlet to be used with an object of type CallRecord. For example when implementing createLogic if you do readStream(in) where in is an inlet parameterized by a type other than CallRecord, the compiler will complain

6.3.2. Outlet and the Partitioning Function

Similar to inlets, the user can define an outlet as val out = AvroOutlet[CallRecord]("call-record-out", _.user). Besides the name of the outlet, it must also have a partitioning function that defines how data will be partitioned when writing to Kafka topics. Data partitioning in Kafka ensures scalability.

All logic regarding data safety that we discussed for inlets in the last section applies for outlets as well.
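
Putting the two declarations together, here is a minimal sketch, excerpted from a streamlet definition and assuming the CallRecord case class generated from the schema above:

import pipelines.streamlets.avro._
import pipelines.examples.CallRecord

// Schema-aware inlet: only CallRecord data can flow through it.
val in = AvroInlet[CallRecord]("call-record-in")

// Schema-aware outlet: the second argument is the partitioning function.
// Here records are partitioned by the calling user, so all records for the
// same user end up in the same Kafka partition.
val out = AvroOutlet[CallRecord]("call-record-out", _.user)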

6.4. Schema Aware StreamletLogic

When we implement StreamletLogic for a streamlet, we use the inlets and outlets which, as we discussed above, are schema aware. We also use the class generated from the schema, which is also schema aware. Here’s an example:


class CallStatsAggregator extends SparkStreamlet {

  val in = AvroInlet[CallRecord]("in")
  val out = AvroOutlet[AggregatedCallStats]("out", _.startTime.toString)
  val shape = StreamletShape(in, out)

  override def createLogic = new SparkStreamletLogic {

    override def buildStreamingQueries = {
      val dataset = readStream(in)
      val outStream = process(dataset)
      val query = writeStream(outStream, out, OutputMode.Update)
      Seq(query)
    }

    private def process(inDataset: Dataset[CallRecord]): Dataset[AggregatedCallStats] = {
      val query =
        inDataset
          .withColumn("ts", $"timestamp".cast(TimestampType))
          .withWatermark("ts", "1 minute")
          .groupBy(window($"ts", "1 minute"))
          .agg(avg($"duration") as "avgCallDuration", sum($"duration") as "totalCallDuration")
          .withColumn("windowDuration", $"window.end".cast(LongType) - $"window.start".cast(LongType))

      query
        .select($"window.start".cast(LongType) as "startTime", $"windowDuration", $"avgCallDuration", $"totalCallDuration")
        .as[AggregatedCallStats]
    }
  }
}

In the above example, we have one inlet that allows data of type CallRecord and one outlet that allows data of type AggregatedCallStats. Here the user supplied the schemas for both of the above types, from which Scala classes have been generated by Pipelines. The entire StreamletLogic code is based on these two classes - we read CallRecord from the inlet, do the processing, and generate AggregatedCallStats to be sent to the outlet.

Hence the entire streamlet is guaranteed only to process data that conforms to the schema which the user had supplied.

6.5. Composing Streamlets using Blueprints

In Pipelines, a streaming application is composed of Streamlets. Each Streamlet defines one or more inlets and outlets. To create a functional pipeline, we must define how the Streamlets connect together by declaring which outlet from one Streamlet should connect with an inlet of another.

A blueprint specifies which streamlets should be used in a pipeline and how they should be connected together. It must be defined in the file src/main/blueprint/blueprint.conf within the file structure of the Pipelines application sbt project.

An example of a blueprint is shown below:

blueprint {
  streamlets {
    sensor-data = pipelines.examples.sensordata.SensorDataIngress
    metrics = pipelines.examples.sensordata.SensorDataToMetrics
    validation = pipelines.examples.sensordata.MetricsValidation
    reporter = pipelines.examples.sensordata.MetricsEgress
  }

  connections {
    sensor-data.out = [metrics.in]
    metrics.out = [validation.in]
    validation.out-0 = [reporter.in]
  }
}
Note

Blueprint files are specified in the HOCON format, a superset of JSON.

The above example shows four streamlets, sensor-data, metrics, validation, and reporter. The streamlets section declares the streamlets used in that blueprint and gives them a short name that we use to refer to them in the next section. The connections section declares how the inlets and outlets of the different participating streamlets are connected.

A blueprint file always consists of one blueprint section. The streamlets section defines which streamlets must be used in the blueprint and assigns a reference to every streamlet. The streamlet references are later on used for connecting streamlets together. Let’s zoom in on the streamlets section in the example below:

  streamlets {
    sensor-data = pipelines.examples.sensordata.SensorDataIngress
    metrics = pipelines.examples.sensordata.SensorDataToMetrics
    validation = pipelines.examples.sensordata.MetricsValidation
    reporter = pipelines.examples.sensordata.MetricsEgress
  }

The above example shows four streamlet types defined in the pipelines.examples.sensordata package. Streamlet types (classes or objects) are specified by their fully qualified names.

Every assignment in the streamlets section assigns a streamlet reference to an instance of a type of streamlet. Once deployed, Pipelines will run at least one streamlet–depending on the requested number of replicas–for each streamlet reference that has been defined in the blueprint.

The Streamlets can be defined in Java or Scala, as Java or Scala classes with default constructors or as Scala objects. These classes must be available on the classpath, which can be defined directly in the pipelines application sbt project or in any dependency that the project has specified, as you would expect of any sbt project.

Note

You can define more than one streamlet reference to the same Scala object. In that case, as many streamlets as assigned streamlet references will be run once everything is deployed (assuming a scaling factor of one). You should view a Streamlet, defined as a Scala object as a template that can be used many times to instantiate streamlets, not as a singleton object that will only run once as a streamlet.

The streamlet references assigned in the streamlets section can be used in the connections section to connect streamlets together. Let’s zoom in on the connections section in the example below:

  connections {
    sensor-data.out = [metrics.in]
    metrics.out = [validation.in]
    validation.out-0 = [reporter.in]
  }

Every expression in a connections section defines how an outlet connects to one or more inlets. Every assignment follows this pattern:

  <streamlet-reference-a>.<outlet> = [<streamlet-reference-b>.<inlet>, <streamlet-reference-c>.<inlet>, ...]

Streamlet outlets and inlets are always prefixed by a streamlet-reference, followed by a dot ('.'). As you can see from the above pattern, it is possible to connect many inlets to the same outlet.

Every streamlet type has a shape, defining how many inlets and outlets it has. All streamlets implement a shape() method which returns the shape of the streamlet. In this case, the streamlet referenced by sensor-data has a single outlet named "out". Similarly, the streamlet referenced by "metrics" has one inlet named "in" and one outlet named "out".

As discussed in the user guide, inlets and outlets of streamlets are explicitly typed, e.g. they only handle data that conform to specific Avro schemas. Inlets can only be connected to outlets if their schemas are compatible. You can verify if the blueprint connects all the streamlets correctly by using:

  sbt verifyBlueprint

The blueprint is automatically verified when sbt buildAndPublish is used.

6.6. Message Delivery Semantics

In the subsequent sections, we’ll sometimes mention different models for message delivery semantics provided by Spark-based and Akka-based streamlets.

Pipelines follows the 'let it crash' principle and can recover from most failure scenarios, except those deemed catastrophic, where the data used for recovery (snapshots) may have been lost. This approach also follows the general policy of Kubernetes, where processes are ephemeral and can be restarted, replaced, or scaled up/down at any time.

With message delivery semantics, we refer to the expected message delivery guarantees in the case of failure recovery. In a distributed application such as Pipelines, failure may happen at several different execution levels: from a failed task in an individual executor, to a pod that goes down, to a complete application crash.

After a failure recovery, we recognize these different message delivery guarantees:

At most once

Data may have been processed but will never be processed twice. In this case, data may be lost but processing will never result in duplicate records.

At least once

Data that has been processed may be replayed and processed again. In this case, each data record is guaranteed to be processed and may result in duplicate records.

Exactly once

Data is processed once and only once. All data is guaranteed to be processed and no duplicate records are generated. This is the most desirable guarantee for many enterprise applications, but it’s considered impossible to achieve in a distributed environment.

Effectively Exactly Once

A variant of exactly-once delivery semantics that tolerates duplicates during data processing and requires the producer side of the process to be idempotent. That is, producing the same record more than once is the same as producing it only once. In practical terms, this translates to writing the data to a system that can preserve the uniqueness of keys, or to using a deduplication process to prevent duplicate records from being produced to an external system.
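
As a toy illustration of idempotence (not Pipelines API): a sketch of a store keyed by a record id, where replaying the same record after a failure recovery has no additional effect.

// Toy illustration only: an idempotent "store" keyed by record id.
// Writing the same record twice has the same effect as writing it once,
// which is what makes at-least-once delivery effectively exactly-once.
final case class Record(id: String, value: Long)

class IdempotentStore {
  private val byId = scala.collection.mutable.Map.empty[String, Record]

  def write(record: Record): Unit =
    byId.update(record.id, record) // upsert: a duplicate overwrites, it does not append

  def size: Int = byId.size
}

object IdempotentStoreExample extends App {
  val store = new IdempotentStore
  val r = Record("call-42", 180L)
  store.write(r)
  store.write(r) // replayed after a simulated failure recovery
  println(store.size) // 1: the duplicate write had no additional effect
}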

6.7. Streamlet Configuration Parameters

A streamlet can require dynamic configuration parameters at deployment time. Configuration parameters can be used to change the way the streamlet functions when it is run.

Examples of configuration parameters are database connection strings, URLs, credentials, or anything else that you want to specify at deployment time.

A streamlet specifies that it requires particular config parameters by expressing them in code. The values for these parameters will be requested, validated, and set when oc plugin pipelines deploy is used to deploy the Pipelines application.

There are a number of predefined configuration parameter types:

IntegerConfigParameter

A signed 32 bit integer value.

StringConfigParameter

A string with the max length of 1k characters.

DoubleConfigParameter

A 64 bit floating point value.

BooleanConfigParameter

A boolean value.

RegExpConfigParameter

A string validated using a regular expression.

DurationConfigParameter

A duration string, for example "2 minutes".

MemorySizeConfigParameter

A memory size string, for example "32M".

In addition to the predefined types, you can also define your own types.

6.7.1. Using a configuration parameter in a streamlet

The following section will break down how we can use an Integer configuration parameter type in a streamlet to request the value for a maximum number of records within a time window.

import pipelines.akkastream._
import pipelines.akkastream.scaladsl._
import pipelines.streamlets._
import pipelines.streamlets.avro._

object RecordSumFlow extends AkkaStreamlet {

  val recordsInWindowParameter = IntegerConfigParameter("records-in-window","This value describes how many records of data should be processed together, default 64 records", Some(64))

  override def configParameters = Set(recordsInWindowParameter)

  val inlet = AvroInlet[Metric]("metric")
  val outlet = AvroOutlet[SummedMetric]("summed-metric")
  val shape = StreamletShape.withInlets(inlet).withOutlets(outlet)

  def createLogic = new RunnableGraphStreamletLogic() {
    def runnableGraph() = {
      val recordsInWindow = streamletConfig.getInt(recordsInWindowParameter.key)
      val flow = FlowWithPipelinesContext[Metric].grouped(recordsInWindow).map(sumRecords)

      atLeastOnceSource(inlet)
        .via(flow)
        .to(atLeastOnceSink(outlet))
    }
  }

  private def sumRecords(records: Seq[Metric]): SummedMetric = { .... }
}

As seen in the example above, we first need to create an instance of IntegerConfigParameter.

val recordsInWindowParameter = IntegerConfigParameter("records-in-window","This value describes how many records of data should be processed together, default 64 records", Some(64))

The arguments provided to IntegerConfigParameter() are the following:

  • A key, which has to be unique within the streamlet.

  • [Optional] A description, which will be shown by the CLI.

  • [Optional] A default value, which will be used by the CLI when no value is passed during deploy.

After the configuration parameter is defined, we can use it to extract its value from the runtime configuration in the createLogic function:

val recordsInWindow = streamletConfig.getInt(recordsInWindowParameter.key)

Note that it’s up to the developer to use the correct config method to extract the value of the parameter. Since the type being used here is IntegerConfigParameter, the config method used is getInt.

6.7.2. Custom validation

It is easy to create your own custom validation for a configuration parameter using the RegExpConfigParameter type. This type allows you to validate the entered value using a regular expression.

For example, if we want to validate a 24 hour timestamp, this is how it could be defined and used in a streamlet.


import java.time._

import pipelines.akkastream._
import pipelines.akkastream.scaladsl._
import pipelines.streamlets._
import pipelines.streamlets.avro._

object RecordFilterFlow extends AkkaStreamlet {

  private def militaryTimeConfigParameter(key: String, defaultValue: Option[String] = None) = {
    RegExpConfigParameter(
      key,
      "This parameter type validates that the users enter the time in 24h format.",
      "^(0[0-9]|1[0-9]|2[0-3]|[0-9]):[0-5][0-9]$",
      defaultValue
    )
  }

  val purgeRecordsBeforeTime = militaryTimeConfigParameter("purge-records-before-this-time", Some("20:00"))

  override def configParameters = Set(purgeRecordsBeforeTime)

  val inlet = AvroInlet[Metric]("metric")
  val outlet = AvroOutlet[Metric]("filtered-metric")
  val shape = StreamletShape.withInlets(inlet).withOutlets(outlet)

  def createLogic = new RunnableGraphStreamletLogic() {
    def runnableGraph() = {
      val filterTime = LocalTime.parse(streamletConfig.getString(purgeRecordsBeforeTime.key))
      val flow = FlowWithPipelinesContext[Metric].filter(record => record.time.isAfter(filterTime))

      atLeastOnceSource(inlet)
        .via(flow)
        .to(atLeastOnceSink(outlet))
    }
  }
}

6.7.3. Providing values for configuration parameters when testing streamlets

When writing tests for streamlets, you can provide values for configuration parameters when you initialize the runner-specific testkit.

If we want to write a test for the example streamlet RecordSumFlow, we could add values for the recordsInWindowParameter configuration parameter like this:

val testkit = AkkaStreamletTestKit(system, mat).withConfigParameterValues(ConfigParameterValue(RecordSumFlow.recordsInWindowParameter, "20"))

The Spark testkit has a similar function for adding values to configuration parameters when testing a streamlet.

val configTestKit = SparkStreamletTestkit(session).withConfigParameterValues(ConfigParameterValue(MySparkProcessor.NameFilter, "some-name"))

The Java API is slightly different as you can see in the example below:

AkkaStreamletTestKit testkit = AkkaStreamletTestKit.create(system, mat).withConfigParameterValues(ConfigParameterValue.create(RecordSumFlow.recordsInWindowParameter, "20"));

6.7.4. Using configuration parameters in Java

Using configuration parameters in Java is similar to the Scala version. The main difference is how classes are instantiated and how the config parameter key is retrieved.

Creating an instance of a StringConfigParameter in Java:

StringConfigParameter nameFilter = StringConfigParameter.create("name-filter-value","Filters out the data in the stream that matches this name.");

Example of accessing the value of a configuration parameter in Java:

class FilterStreamlet extends AkkaStreamlet {
  StringConfigParameter nameFilter = StringConfigParameter.create("name-filter-value",
      "Filters out the data in the stream that matches this name.");

  AvroInlet<Data> inlet = AvroInlet.<Data>create("in", Data.class);
  AvroOutlet<Data> outlet = AvroOutlet.<Data>create("out", d -> d.name(), Data.class);

  public StreamletShape shape() {
    return StreamletShape.createWithInlets(inlet).withOutlets(outlet);
  }

  public RunnableGraphStreamletLogic createLogic() {
    return new RunnableGraphStreamletLogic(getStreamletContext()) {
      public RunnableGraph createRunnableGraph() {
        String filterName = streamletConfig().getString(nameFilter.getKey());
        return getAtMostOnceSource(inlet).via(Flow.<Data>create()).filter(data -> data.name().equals(filterName))
            .to(getAtMostOnceSink(outlet));
      }
    };
  }
}

6.8. Streamlet Volume Mounts

Sometimes a streamlet needs to read and/or write files from/to some shared file system. Since streamlets run as processes on Kubernetes, they do not automatically have such a file system available. Pipelines makes it possible for a streamlet to declare the need for a shared file system (e.g. a "volume" in Kubernetes terms) that should be mounted at a specific path. At deployment time the user can then indicate where that file system is actually located using a Kubernetes Persistent Volume Claim (PVC). Pipelines will then make sure that the PVC will be mounted at the specified path at runtime and the streamlet can then treat it like a local file system.

The following example streamlet shows how to declare and use a volume mount:

import java.nio.charset.StandardCharsets
import java.nio.file.{ Files, Paths, StandardOpenOption }

import pipelines.akkastream._
import pipelines.akkastream.scaladsl._
import pipelines.streamlets._
import pipelines.streamlets.avro._

object ValidMetricLogger extends AkkaStreamlet {
  val inlet = AvroInlet[Metric]("in")
  val shape = StreamletShape.withInlets(inlet)

  // Declare the volume mount:
  private val filterDataMount = VolumeMount("filter-data-mount", "/mnt/data", ReadWriteMany)
  override def volumeMounts = Vector(filterDataMount)

  override def createLogic = new RunnableGraphStreamletLogic() {
    private def log(metric: Metric) = {
      val logString = s"$metric\n"

      // Use the volume mount in the streamlet logic:
      Files.write(
        Paths.get(filterDataMount.path, "log.txt"),
        logString.getBytes(StandardCharsets.UTF_8),
        StandardOpenOption.APPEND
      )
    }

    private def flow = {
      FlowWithPipelinesContext[Metric]
        .map { validMetric =>
          log(validMetric)
          validMetric
        }
    }

    def runnableGraph = {
      atLeastOnceSource(inlet)
        .via(flow)
        .to(atLeastOnceSink)
    }
  }
}

6.8.1. Java API

The Java API is slightly different from the Scala API. The example below shows a streamlet that uses a read-only volume mount.

  class FilterStreamlet extends AkkaStreamlet {
    // Declare the volume mount:
    private VolumeMount referenceFiles = VolumeMount.createReadOnlyMany("reference-mount", "/mnt/data");

    private AvroInlet<Data> inlet = AvroInlet.<Data>create("in", Data.class);
    private AvroOutlet<Data> outlet = AvroOutlet.<Data>create("out", d -> d.name(), Data.class);

    public StreamletShape shape() {
      return StreamletShape.createWithInlets(inlet).withOutlets(outlet);
    }

    private Boolean findNameInFile(String name) {
      // Use the volume mount in the streamlet logic:
      Path filePath = Paths.get(referenceFiles.getPath(), "names.txt");
      // find name in file......
      return true;
    }

    public RunnableGraphStreamletLogic createLogic() {
      return new RunnableGraphStreamletLogic(getStreamletContext()) {
        public RunnableGraph createRunnableGraph() {
          return getAtMostOnceSource(inlet).via(Flow.<Data>create()).filter(data -> findNameInFile(data.name()))
              .to(getAtMostOnceSink(outlet));
        }
      };
    }
  }

If you want to use a writable volume mount you can replace createReadOnlyMany with createReadWriteMany above.

6.8.2. Access Modes and PVC Mounting

The PVC associated with the streamlet volume mount is required to have the same access mode as the volume mount declared in the streamlet. When deploying the application, the access mode is checked; if it differs from the access mode declared in the streamlet, the deployment of the application will fail.

The following access modes are available:

  • ReadOnlyMany: all streamlet instances get read-only access to the same volume.

  • ReadWriteMany: all streamlet instances get read and write access to the same volume.
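
For reference, a hedged sketch of declaring one volume mount per access mode inside a Scala streamlet, assuming ReadOnlyMany is available in the same way as the ReadWriteMany value used in the Scala example earlier in this chapter:

// Declared inside a streamlet, as in the ValidMetricLogger example above.
private val referenceData = VolumeMount("reference-data-mount", "/mnt/reference", ReadOnlyMany)  // read-only for all instances
private val sharedOutput  = VolumeMount("shared-output-mount", "/mnt/output", ReadWriteMany)     // read and write for all instances

override def volumeMounts = Vector(referenceData, sharedOutput)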

6.8.3. Cluster Security Considerations

When deploying a Pipelines application that contains streamlets with a volume mount, you may have to apply additional Kubernetes security configuration resources to the Kubernetes cluster for the application to deploy successfully.

The pod in which the streamlet is running may need to be associated with a Pod Security Policy (PSP) or (on OpenShift) a Security Context Constraint (SCC).

This can be done by associating the Pipelines application service account, called pipelines-app-serviceaccount and located in the namespace of the application, with a PSP/SCC.

The PSP and SCC must allow the application pods to mount a writable volume as group id 185. This is the group id of the user running in the streamlet container.

Security Context Constraints Example

This is an example of an SCC that would allow a Pipelines application with a writable volume mount to deploy correctly to an OpenShift cluster with an activated SCC controller.

kind: SecurityContextConstraints
apiVersion: v1
metadata:
  name: pipelines-application-scc
allowPrivilegedContainer: true
runAsUser:
  type: MustRunAsNonRoot
seLinuxContext:
  type: RunAsAny
fsGroup:
  type: MustRunAs
  ranges:
  - min: 185
    max: 186
supplementalGroups:
  type: RunAsAny
volumes:
- '*'
Pod Security Policy Example

This is an example of a PSP that would allow a Pipelines application with a writable volume mount to deploy correctly.

apiVersion: extensions/v1beta1
kind: PodSecurityPolicy
metadata:
  name: pipelines-volume-mount-psp
spec:
  runAsUser:
    rule: 'MustRunAsNonRoot'
  seLinux:
    rule: 'RunAsAny'
  supplementalGroups:
    rule: 'RunAsAny'
  fsGroup:
    rule: 'MustRunAs'
    ranges:
    - min: 185
      max: 186
  volumes:
  - '*'

7. Developing with Akka Streamlets

7.1. Akka Streamlet Fundamentals

An Akka-based streamlet has a number of responsibilities:

  • It needs to capture your stream processing logic.

  • It needs to publish metadata which will be used by the sbt-pipelines plugin to verify that a blueprint is correct. This metadata consists of the shape of the streamlet (StreamletShape) defined by the inlets and outlets of the streamlet. Connecting streamlets need to match on inlets and outlets to make a valid pipelines topology, as mentioned previously in Composing Streamlets using Blueprints . Pipelines automatically extracts the shapes from the streamlets to verify this match.

  • For the Pipelines runtime, it needs to provide metadata so that it can be configured, scaled, and run as part of an application.

  • The inlets and outlets of a Streamlet have two functions:

    • To specify to Pipelines that the streamlet needs certain data streams to exist at runtime, which it will read from and write to, and

    • To provide handles inside the stream processing logic to connect to the data streams that Pipelines provides. The StreamletLogic provides methods that take an inlet or outlet argument to read from or write to. These will be the specific data streams that Pipelines has set up for you.

The next sections will go into the details of defining an Akka Streamlet:

  • defining inlets and outlets

  • creating a streamlet shape from the inlets and outlets

  • creating a streamlet logic that uses the inlets and outlets to read and write data

7.1.1. Inlets and Outlets

A streamlet can have one or more inlets and outlets. Pipelines offers classes for Inlets and Outlets based on the codec of the data they manipulate. Currently Pipelines supports Avro and hence the classes are named AvroInlet and AvroOutlet. Each outlet also allows the user to define a partitioning function that will be used to partition the data.

7.1.2. StreamletShape

The StreamletShape captures the connectivity and compatibility details of an Akka-based streamlet. It captures which—and how many—inlets and outlets the streamlet has.

The sbt-pipelines plugin extracts—amongst other things—the shape from the streamlet that it finds on the classpath. This metadata is used to verify that the blueprint connects the streamlets correctly.

Pipelines offers an API StreamletShape to define various shapes with inlets and outlets. Each inlet and outlet can be defined separately with specific names. The outlet also needs to have a partitioning function to ensure effective data partitioning logic.
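
For illustration, a minimal sketch of declaring a shape, assuming Metric and SummedMetric are Avro-generated classes as in the configuration-parameter examples elsewhere in this document; both forms below appear in examples in this document:

import pipelines.streamlets._
import pipelines.streamlets.avro._

val in  = AvroInlet[Metric]("metric-in")
val out = AvroOutlet[SummedMetric]("summed-metric-out", _.toString)

// Either form can be used to declare the shape:
val shape     = StreamletShape(in, out)
val sameShape = StreamletShape.withInlets(in).withOutlets(out)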

When you build your own Akka streamlet, you need to define the shape. You will learn how to do this in Building an Akka Streamlet. The next section describes how the StreamletLogic captures stream processing logic.

7.1.3. StreamletLogic

The stream processing logic is captured in a StreamletLogic, which is an abstract class. It provides:

  • The ActorSystem to run Akka components in, a Materializer for Akka Stream graphs and an ExecutionContext for asynchronous calls (in Scala these are in implicit scope inside the StreamletLogic).

  • The Typesafe Config loaded from the classpath, from which configuration settings can be read. It is accessed using the config method.

  • A subset of the Typesafe Config that is reserved for the streamlet, which can be used for deployment-time, user-configurable settings. It is accessed through the streamletConfig method.

  • Hooks to access data streams to read from inlets and write to outlets, for when you want to build your own custom streamlets.

A StreamletLogic will often set up an Akka Stream graph and run it. The RunnableGraphStreamletLogic, which extends StreamletLogic, makes this very easy: you only need to provide the RunnableGraph that you want to run.
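
For example, a hedged minimal sketch of a logic that simply passes records from an inlet to an outlet, modeled on the RunnableGraphStreamletLogic examples elsewhere in this document (Metric is assumed to be an Avro-generated class):

import pipelines.akkastream._
import pipelines.akkastream.scaladsl._
import pipelines.streamlets._
import pipelines.streamlets.avro._

object MetricPassThrough extends AkkaStreamlet {
  val in    = AvroInlet[Metric]("in")
  val out   = AvroOutlet[Metric]("out", _.toString)
  val shape = StreamletShape(in, out)

  override def createLogic = new RunnableGraphStreamletLogic() {
    // Only the RunnableGraph needs to be provided; the actor system,
    // materializer and configuration are supplied by the logic itself.
    def runnableGraph =
      atLeastOnceSource(in)
        .via(FlowWithPipelinesContext[Metric].map(identity))
        .to(atLeastOnceSink(out))
  }
}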

You can access the items described above, like actor system and config, directly through methods on the StreamletLogic, or through implicit scope.

The StreamletLogic is only constructed once the streamlet is run, so it is safe to put instance values and variables in it that you would like to use when the streamlet is running.

When you build your own Akka streamlet, you define both a StreamletShape and a subclass of StreamletLogic. More on that later.

7.1.4. Lifecycle

In general terms, when you publish and deploy a Pipelines application, each streamlet definition becomes a physical deployment as one or more Kubernetes artifacts. In the case of Akka-based streamlets, each streamlet is submitted as a Deployment resource. Upon deployment, it becomes a set of running pods, depending on the scale factor for that streamlet.

A so-called runner runs inside every pod. The runner connects the streamlet to data streams, at which point the streamlet starts consuming from inlets. The Streamlet advances through the data streams that are provided on inlets and writes data to outlets.

If you make any changes to the streamlets and deploy the application again, existing pods will be stopped and new pods will be started to reflect the changes. Pods may also be restarted in other ways, for instance by administrators.

This means that streamlets can get stopped, started or restarted at any moment in time. The next section about message delivery semantics explains the options that are available for how to continue processing data after a restart.

7.1.5. Message Delivery Semantics

If you only want to process data that is arriving in real-time—ignoring data from the past—an Akka streamlet can simply start from the most recently arrived record. In this case, an at-most-once semantics is used (see Message Delivery Semantics). The streamlet will process data as it arrives. Data that arrives while the streamlet is not running will not be processed.

At-most-once message delivery semantics are applied to Akka streamlets that use standard Akka Stream graphs like Source, Sink, Flow.

Akka streamlets also support at-least-once semantics for cases where the above-mentioned options are not viable. In this case, offsets of incoming records are tracked, so that the streamlet can start from a specific offset, per inlet, on restart. Once records are written to outlets, the associated incoming record offsets are considered processed. Offsets are committed in batches, and only when the offset changes.

Akka streamlets that want to support at-least-once semantics must use an Akka Streams FlowWithContext that propagates a PipelinesContext through the flow. Pipelines automatically propagates offset details through the flow and commits offset changes in batches.

The FlowWithContext provides a constrained set of operators compared to the Akka Streams Flow. This subset of operators processes records in order.

There are some side effects to this approach. For instance, if you filter records, only the offsets associated with records that are not filtered out will be committed. This means that on restart, more re-processing of previously filtered records can occur than you might expect.

We’ll show you how to program these options below, but first, Building an Akka Streamlet describes in more detail how you can build Akka streamlets.

7.2. Building an Akka Streamlet

The following sections describe how you can create an Akka streamlet. As mentioned in the Akka Streamlet Fundamentals, an Akka streamlet is defined by the following features:

  • It is a Streamlet. Pipelines offers a class for implementing Akka streamlets, AkkaStreamlet, which extends pipelines.streamlets.Streamlet. Any Akka streamlet needs to extend AkkaStreamlet.

  • It has a shape - we call it StreamletShape. Any Akka streamlet needs to define a concrete shape using the APIs available for the StreamletShape class, which defines the inlets and outlets of the streamlet.

  • It has a StreamletLogic that defines the business logic of the Akka streamlet.

In this tutorial we’ll build a simple Akka streamlet that accepts data on an inlet and writes it to the console. It can be used to print reports from data that arrives at its inlet. Let’s call the streamlet ReportPrinter.

7.2.1. Extending from AkkaStreamlet

Let’s start with building the ReportPrinter streamlet. The first thing to do is extend the pipelines.akkastream.AkkaStreamlet abstract class, as shown below:

Scala:

package com.example

import akka.stream.scaladsl.Sink

import pipelines.streamlets._
import pipelines.streamlets.avro._

import pipelines.akkastream._
import pipelines.akkastream.scaladsl._

object ReportPrinter extends AkkaStreamlet {
  // 1. TODO Create inlets and outlets
  // 2. TODO Define the shape of the streamlet
  // 3. TODO Override createLogic to provide StreamletLogic
}

Java:

package com.example;

import akka.NotUsed;
import akka.stream.*;
import akka.stream.javadsl.*;

import pipelines.streamlets.*;
import pipelines.streamlets.avro.*;
import pipelines.akkastream.*;
import pipelines.akkastream.javadsl.*;

public class ReportPrinter extends AkkaStreamlet {
  // 1. TODO Create inlets and outlets
  // 2. TODO Define the shape of the streamlet
  // 3. TODO Override createLogic to provide StreamletLogic
}

The code snippet above shows an object ReportPrinter that extends AkkaStreamlet. We have shown the steps needed to complete the implementation, which we will do in the next few sections.

The next step is to implement inlets and outlets of the streamlet.

7.2.2. Inlets and Outlets

The streamlet that we are building in this tutorial will have an inlet and no outlet.

Scala:

package com.example

import akka.stream.scaladsl.Sink

import pipelines.streamlets._
import pipelines.streamlets.avro._

import pipelines.akkastream._
import pipelines.akkastream.scaladsl._

object ReportPrinter extends AkkaStreamlet {
  // 1. TODO Create inlets and outlets
  val inlet = AvroInlet[Report]("report-in")
  // 2. TODO Define the shape of the streamlet
  // 3. TODO Override createLogic to provide StreamletLogic
}

Java:

package com.example;

import akka.NotUsed;
import akka.stream.*;
import akka.stream.javadsl.*;

import pipelines.streamlets.*;
import pipelines.streamlets.avro.*;
import pipelines.akkastream.*;
import pipelines.akkastream.javadsl.*;

public class ReportPrinter extends AkkaStreamlet {
  // 1. TODO Create inlets and outlets
  AvroInlet<Report> inlet = AvroInlet.<Report>create("report-in", Report.class);
  // 2. TODO Define the shape of the streamlet
  // 3. TODO Override createLogic to provide StreamletLogic
}

Pipelines supports Avro-encoded processing of data - we make this explicit by defining the inlet as AvroInlet. Report is the class of objects that will be accepted by this inlet. This means that an inlet defined by AvroInlet[Report] will only accept Avro-encoded data for the class Report. The class Report will be generated by Pipelines during application build time from the Avro schema that the user supplies - this ensures that the data which the inlet accepts conforms to the schema that the user supplied earlier. As an example, we can have the following Avro schema for the Report object, which contains a report of some of the attributes of products from an inventory:

{
  "namespace": "com.example",
  "type": "record",
  "name": "Report",
  "fields": [
    {
      "name": "id",
      "type": "string"
    },
    {
      "name": "name",
      "type": "string"
    },
    {
      "name": "description",
      "type": "string"
    },
    {
      "name": "keywords",
      "type": {
        "type": "array",
        "items": "string"
      }
    }
  ]
}

In the definition of the inlet, "report-in" is the name of the inlet. It’s recommended that you use a domain-specific name for the inlet that indicates the nature of data this inlet is supposed to accept. We will use this inlet later to read data from it.

This streamlet does not have any outlets. In general, though, outlets are defined similarly: val out = AvroOutlet[Report]("report-out", _.name) defines an outlet that writes Avro-encoded data for objects of type Report. Here "report-out" is the name of the outlet and _.name is the partitioning function that partitions the data written to the outlet.

7.2.3. Streamlet Shape

Let’s now define the shape of ReportPrinter by using the APIs in pipelines.streamlets.StreamletShape:

Scala:

package com.example

import akka.stream.scaladsl.Sink

import pipelines.streamlets._
import pipelines.streamlets.avro._

import pipelines.akkastream._
import pipelines.akkastream.scaladsl._

object ReportPrinter extends AkkaStreamlet {
  // 1. TODO Create inlets and outlets
  val inlet = AvroInlet[Report]("report-in")
  // 2. TODO Define the shape of the streamlet
  val shape = StreamletShape.withInlets(inlet)
  // 3. TODO Override createLogic to provide StreamletLogic
}

Java:

package com.example;

import akka.NotUsed;
import akka.stream.*;
import akka.stream.javadsl.*;

import pipelines.streamlets.*;
import pipelines.streamlets.avro.*;
import pipelines.akkastream.*;
import pipelines.akkastream.javadsl.*;

public class ReportPrinter extends AkkaStreamlet {
  // 1. TODO Create inlets and outlets
  AvroInlet<Report> inlet = AvroInlet.<Report>create("report-in", Report.class);

  // 2. TODO Define the shape of the streamlet
  public StreamletShape shape() {
   return StreamletShape.createWithInlets(inlet);
  }
  // 3. TODO Override createLogic to provide StreamletLogic
}

The above code overrides the shape method with a value that defines the shape of the streamlet. StreamletShape offers methods to define shapes; for example, to define a streamlet with two inlets and two outlets, we could write StreamletShape.withInlets(in0, in1).withOutlets(valid, invalid).
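For instance, a hypothetical streamlet with two inlets and two outlets could declare its ports and shape as follows (the names and types are illustrative only):

// Illustrative sketch of a two-inlet, two-outlet shape.
val in0     = AvroInlet[Report]("report-in-0")
val in1     = AvroInlet[Report]("report-in-1")
val valid   = AvroOutlet[Report]("valid-reports", _.name)
val invalid = AvroOutlet[Report]("invalid-reports", _.name)

val shape = StreamletShape.withInlets(in0, in1).withOutlets(valid, invalid)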

The next step is to define the StreamletLogic.

7.2.4. Defining the StreamletLogic

The StreamletLogic class makes it possible for a user to specify domain logic. It is defined as an abstract class in pipelines.akkastream.StreamletLogic and provides an abstract method run() where the user can define the specific logic for the Akka Streamlet.

In this step we need to override createLogic from AkkaStreamlet in our ReportPrinter object. createLogic needs to return an instance of StreamletLogic which will do the processing based on the requirements of ReportPrinter object.

Scala:

package com.example

import akka.stream.scaladsl.Sink

import pipelines.streamlets._
import pipelines.streamlets.avro._

import pipelines.akkastream._
import pipelines.akkastream.scaladsl._

object ReportPrinter extends AkkaStreamlet {
  // 1. TODO Create inlets and outlets
  val inlet = AvroInlet[Report]("report-in")
  // 2. TODO Define the shape of the streamlet
  val shape = StreamletShape.withInlets(inlet)
  // 3. TODO Override createLogic to provide StreamletLogic
  def createLogic = new RunnableGraphStreamletLogic() {
    def format(report: Report) = s"${report.name}\n\n${report.description}"
    def runnableGraph =
      atMostOnceSource(inlet)
        .to(Sink.foreach(report ⇒ println(format(report))))
  }
}

Java:

package com.example;

import akka.NotUsed;
import akka.stream.*;
import akka.stream.javadsl.*;

import pipelines.streamlets.*;
import pipelines.streamlets.avro.*;
import pipelines.akkastream.*;
import pipelines.akkastream.javadsl.*;

public class ReportPrinter extends AkkaStreamlet {
  // 1. TODO Create inlets and outlets
  AvroInlet<Report> inlet = AvroInlet.<Report>create("report-in", Report.class);

  // 2. TODO Define the shape of the streamlet
  public StreamletShape shape() {
   return StreamletShape.createWithInlets(inlet);
  }

  // 3. TODO Override createLogic to provide StreamletLogic
  public RunnableGraphStreamletLogic createLogic() {
    return new RunnableGraphStreamletLogic(getStreamletContext()) {
      public String format(Report report) {
        return report.getName() + "\n\n" +report.getDescription();
      }
      public RunnableGraph<NotUsed> createRunnableGraph() {
        return getAtMostOnceSource(inlet).to(Sink.foreach(report -> System.out.println(format(report))));
      }
    };
  }
}

In the above code we override createLogic to supply the domain logic for the streamlet.

In this case, since we are implementing a printer streamlet for console, all we need to do is read from the inlet that we defined earlier, val inlet = AvroInlet[Report]("report-in"), and do some processing on it.

The RunnableGraphStreamletLogic, which extends StreamletLogic, is easy to use when you only want to define a RunnableGraph to be run. It only requires you to define a runnableGraph method that describes the graph to run, which is what we have done in the code above: we create a Source from the inlet to read the reports and connect it to a Sink that prints them.

Here are the steps that we do as part of the processing logic:

  • Since it’s a console printer, we format each report into a printable string using the format method.

  • Every report read from the Source is printed using Sink.foreach, which is part of the akka.stream.scaladsl package.

Note that the processing logic can be quite complex and we can maintain state as part of the implementation of StreamletLogic.
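For example, here is a sketch of the ReportPrinter logic keeping a simple counter as local state (see the note that follows about where such state must live):

// Sketch only: a counter kept as local state inside the StreamletLogic.
def createLogic = new RunnableGraphStreamletLogic() {
  var reportsPrinted = 0L
  def format(report: Report) = s"${report.name}\n\n${report.description}"
  def runnableGraph =
    atMostOnceSource(inlet)
      .to(Sink.foreach { report ⇒
        reportsPrinted += 1
        println(s"[$reportsPrinted] ${format(report)}")
      })
}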

Note

If the streamlet needs to have local state (vals, vars) for processing logic, it has to be put inside the StreamletLogic class and not as part of the Streamlet class. The Streamlet class is used by Pipelines for extraction of streamlets using reflection and hence cannot have any state within it.

Note

In summary, here are the steps for defining an Akka streamlet:

  • Define the inlets and outlets

  • Define the concrete shape using the inlets and outlets. The shape of the streamlet is the metadata that will be used by Pipelines

  • Define the custom processing logic that will read data from inlets and write data to outlets

7.2.5. Using ReportPrinter in the blueprint

An example of a blueprint using the ReportPrinter could look like this:

blueprint {
  streamlets {
    ingress = com.example.ReportIngress
    report-printer = com.example.ReportPrinter
  }

  connections {
    ingress.out = [report-printer.report-in]
  }
}

The omitted ReportIngress could for instance be another AkkaStreamlet that writes Reports to its outlet.
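For reference, here is a minimal sketch of what such an ingress could look like. It is entirely illustrative and simply emits a fixed Report once per second; the Report constructor is assumed to match the Avro-generated class with fields (id, name, description, keywords):

package com.example

import scala.concurrent.duration._

import akka.stream.scaladsl.Source

import pipelines.streamlets._
import pipelines.streamlets.avro._
import pipelines.akkastream._
import pipelines.akkastream.scaladsl._

object ReportIngress extends AkkaStreamlet {
  val out   = AvroOutlet[Report]("out", _.id)
  val shape = StreamletShape.withOutlets(out)

  def createLogic = new RunnableGraphStreamletLogic() {
    def runnableGraph =
      Source
        .tick(1.second, 1.second, Report("1", "sample", "A sample report", Vector("example")))
        .to(atMostOnceSink(out))
  }
}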

7.2.6. At-least-once or At-most-once processing

You can access the inlet and outlet streams through methods on the StreamletLogic that return Akka Streams Sources and Sinks.

So far we’ve used the atMostOnceSource method to read from the inlet. When the streamlet is started, it will only receive elements that arrive after it is started. The same applies when the streamlet is restarted for any reason. In short, the atMostOnceSource provides at-most-once message delivery semantics, as described in Message Delivery Semantics. StreamletLogic also provides methods that support at-least-once message delivery semantics.

The methods that support at-least-once return sources and sinks that automatically propagate the offset of every element, so that the offset read through the attached source can be advanced after the element has been processed through the attached sink. This is what provides the at-least-once message delivery semantics.

In the next sections, we’ll describe the SourceWithContext and FlowWithContext that are used for this purpose, as well as the Sinks that can be used to make sure that offsets are committed.

Sources for inlets

The atLeastOnceSource method returns an akka.stream.scaladsl.SourceWithContext for an inlet; the Java equivalent is getAtLeastOnceSource, which returns an akka.stream.javadsl.SourceWithContext. The SourceWithContext provides the elements that are read from an inlet, each element associated with a PipelinesContext. The PipelinesContext contains the offset in the stream.

A FlowWithContext, which contains a subset of Flow, automatically propagates the context for every element, so that you don’t have to worry about it when using the Akka Streams operators to process the stream. It can be easily attached to a SourceWithContext using the via method, similar to how a Source and Flow are connected.

Note

In the Scala API, we provide a type alias pipelines.akkastream.SourceWithPipelinesContext that encapsulates the Pipelines-specific part of the SourceWithContext type definition.

The atMostOnceSource method returns an Akka Streams Source, which always starts reading elements as they arrive.

Sinks for outlets

The atLeastOnceSink method (Java API equivalent is getAtLeastOnceSink) returns a Sink[(T, PipelinesContext)] to write to an outlet. Elements written to the sink will be committed when offsets change. This will occur in batches, rather than individually, if the downstream processing is slower than the rate of elements arriving upstream.

You can connect a SourceWithContext or a FlowWithContext to this type of sink with a to method, similar to how you would connect a Source or Flow to a Sink. There is also an atLeastOnceSink that takes no arguments; anything written to this sink will not end up in any outlet, but the associated offsets will be committed.

The noCommitSink method (Java API equivalent is getNoCommitSink) returns a Sink, for when you choose to always process elements as they arrive.

FlowWithContext

The FlowWithContext provides a constrained set of operators compared to the Akka Streams Flow. This subset of operators processes records in order. It is provided for convenience: as a user, you don’t have to worry about how the context is passed along per element, or about how the offset of the stream is advanced after processing.

Operators that turn T into Seq[T]

The FlowWithContext propagates the PipelinesContext per element that it operates on. For operators that create a Seq[T] of elements for every element, like mapConcat and grouped, the sequencing operation is also applied to the context, which means that the context is no longer a PipelinesContext but a Seq[PipelinesContext]. If you transform the Seq[T] into some type that is written to an outlet, you have to map the context with mapContext and select which context you want to use for every element, since there is now a Seq[PipelinesContext] where a PipelinesContext is required. In most cases it makes sense to use flowWithContext.mapContext(_.last) to select the last PipelinesContext associated with the grouped input elements.
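A sketch of this pattern, reusing the Report type from the earlier example (the inlet, outlet, grouping size, and reduction step are illustrative, and the grouped and mapContext operators are assumed to be available on the source as described above for the flow):

// Sketch only: after grouped(2) the context is a Seq[PipelinesContext],
// so mapContext(_.last) selects the last offset of each group for committing.
def createLogic = new RunnableGraphStreamletLogic() {
  def runnableGraph =
    atLeastOnceSource(inlet)
      .grouped(2)                    // elements: Seq[Report], context: Seq[PipelinesContext]
      .mapContext(_.last)            // keep the last offset of each group
      .map(reports ⇒ reports.last)   // reduce each group to a single element (illustrative)
      .to(atLeastOnceSink(outlet))
}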

Converting between FlowWithContext and Flow

The Akka Streams Flow supports more operations than the FlowWithContext and allows for integrating with any kind of Graph, including custom GraphStages. The FlowWithContext[In, PipelinesContext, Out, PipelinesContext, Mat] can be converted to a Flow[(In, PipelinesContext), (Out, PipelinesContext), Mat]. As you can see from the type signature, every element in the resulting Flow is a Tuple2 of an element and its PipelinesContext. (In Java an akka.japi.Pair type is used instead of the Scala Tuple2 type). The Flow[(In, PipelinesContext), (Out, PipelinesContext), Mat] can be converted back to a FlowWithContext[In, PipelinesContext, Out, PipelinesContext, Mat].

Being able to convert back and forth between FlowWithContext and Flow means that you can stop automatic context propagation, apply more advanced operations on the stream, and once you are finished, convert the Flow back into a FlowWithContext, as long as you pass along the elements with their contexts as tuples.

Note

If the order of elements is changed in custom operations on the Flow, it is likely that offsets will be advanced too early, which can result in data loss on Streamlet restart.

The same is true for the SourceWithContext[PipelinesContext, Out, Mat], which can be turned into a Source[(Out, PipelinesContext), Mat]. The endContextPropagation method on FlowWithContext returns a Flow, and FlowWithContext.from turns a Flow back into a FlowWithContext. Similarly, the endContextPropagation method on SourceWithContext returns a Source, and SourceWithContext.from creates a SourceWithContext from a Source. In Java, SourceWithContext.fromPairs is the equivalent for turning a Source into a SourceWithContext.
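A sketch of the round trip described above, using the method names from this section (the buffer step stands in for any order-preserving operation that is only available on a plain Flow):

import akka.NotUsed
import akka.stream.OverflowStrategy
import akka.stream.scaladsl.FlowWithContext

// Sketch only: stop context propagation, apply a plain Flow operation, resume propagation.
def withPlainFlowStep(
    fwc: FlowWithContext[Report, PipelinesContext, Report, PipelinesContext, NotUsed]
): FlowWithContext[Report, PipelinesContext, Report, PipelinesContext, NotUsed] =
  FlowWithContext.from(
    fwc.endContextPropagation                       // Flow of (Report, PipelinesContext) tuples
      .buffer(10, OverflowStrategy.backpressure)    // an order-preserving plain-Flow operation
  )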

At-least-once ReportPrinter

The atMostOnceSource does not provide any tracking of where the streamlet left off, so if the streamlet is restarted it will only print elements that arrive after the restart; elements that arrived while it was not running are not processed.

In this section we’re going to use the atLeastOnceSource to track offsets and commit them automatically with an atLeastOnceSink.

The atLeastOnceSource method provides a SourceWithPipelinesContext[In] for an inlet. Internally, a PipelinesContext is paired with every element to keep track of the associated offset. The SourceWithPipelinesContext has many of the operators of a Source; it just automatically propagates the Pipelines context. In the code below it is connected to an atLeastOnceSink, which automatically commits any offset changes found in the PipelinesContext for every element.

The code below shows how the ReportPrinter can be changed to use at-least-once semantics. All we need to do is change the streamlet logic:

Scala:

def createLogic = new RunnableGraphStreamletLogic() {
  def format(report: Report) = s"${report.name}\n\n${report.description}"
  def runnableGraph =
    atLeastOnceSource(inlet)
      .map { report ⇒
        println(format(report))
        report
      }
      .to(atLeastOnceSink)
}

Java:

public RunnableGraph<NotUsed> createRunnableGraph() {
  return getAtLeastOnceSource(inlet)
    .map(report -> {
        System.out.println(format(report));
        return report;
    })
    .asSource()
    .to(getAtLeastOnceSink());
}

This completes the code for the ReportPrinter. A next step would be to use it in a blueprint, as described in Composing Streamlets using Blueprints.

7.3. Utilities

The pipelines.akkastream.util library contains some predefined StreamletLogics:

  • HttpServerLogic

  • SplitterLogic

  • MergeLogic

The following sections describe how you implement stream processing logic with these utilities.

7.3.1. HttpServerLogic

Use case

An HttpServerLogic can be used to handle HTTP requests and write the received data to an outlet. You need to extend your streamlet from AkkaServerStreamlet so that Pipelines will expose an HTTP endpoint in Kubernetes. The HttpServerLogic typically unmarshalls incoming HTTP requests as they arrive and writes the data to the outlet. HTTP requests that are attempted while the HttpServerLogic is not running will result in a 503 response.

Examples

The HttpServerLogic defines an abstract method for the user to supply the processing logic, an HTTP route:

def route(sinkRef: WritableSinkRef[Out]): Route

The HttpServerLogic object provides default implementations of the HttpServerLogic that support some pre-baked routes:

  • the default method creates an HttpServerLogic that handles PUT and POST requests, where the data is read from the entity body.

  • the defaultStreaming method creates an HttpServerLogic that handles streaming HTTP requests, where the data is read from the entity body as a framed stream.

Handle PUT / POST requests by default

The code snippet below shows an example of an HttpServerLogic created with the default method:

import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._

import pipelines.akkastream.AkkaServerStreamlet
import pipelines.akkastream.util.scaladsl._
import pipelines.streamlets.StreamletShape
import pipelines.streamlets.avro._

import SensorDataJsonSupport._

object SensorDataIngress extends AkkaServerStreamlet {
  val out = AvroOutlet[SensorData]("out", s ⇒ s.deviceId.toString + s.timestamp.toString)
  def shape = StreamletShape.withOutlets(out)
  override def createLogic = HttpServerLogic.default(this, out)
}

In Scala, the HttpServerLogic requires an implicit akka.http.scaladsl.marshalling.FromByteStringUnmarshaller[Out] to unmarshal the entity body into the data that you want to write to the outlet. (FromByteStringUnmarshaller[T] is an alias for Unmarshaller[ByteString, T])

An HttpServerLogic can only be used in combination with an AkkaServerStreamlet. The HttpServerLogic.default method requires a Server argument (this also applies to the HttpServerLogic.defaultStreaming method and the HttpServerLogic constructor; you cannot construct it without one). The AkkaServerStreamlet implements Server, so you can just pass this to it from inside the streamlet, as you can see in the above snippet.

In the above example, the SensorDataJsonSupport object defines implicit spray-json JSON formats for the SensorData type.

In Java, the akka.http.javadsl.unmarshalling.Unmarshaller<ByteString, T> is an argument to the constructor.

import pipelines.akkastream.AkkaServerStreamlet;

import pipelines.akkastream.StreamletLogic;
import pipelines.akkastream.util.javadsl.HttpServerLogic;

import pipelines.streamlets.StreamletShape;
import pipelines.streamlets.avro.AvroOutlet;

import akka.http.javadsl.marshallers.jackson.Jackson;

public class SensorDataIngress extends AkkaServerStreamlet {
  AvroOutlet<SensorData> out =  AvroOutlet.<SensorData>create("out", s -> s.getDeviceId().toString() + s.getTimestamp().toString(), SensorData.class);

  public StreamletShape shape() {
   return StreamletShape.createWithOutlets(out);
  }

  public StreamletLogic createLogic() {
    return HttpServerLogic.createDefault(this, out, Jackson.byteStringUnmarshaller(SensorData.class), getStreamletContext());
  }
}

In the above Java example, a Jackson Unmarshaller<ByteString, Out> is created for the SensorData class, using the Jackson.byteStringUnmarshaller method.

Handle streamed HTTP entities

The Scala example below shows how you can use the defaultStreaming method to unmarshal a JSON stream and write the unmarshalled data to the outlet:

import akka.http.scaladsl.common.EntityStreamingSupport
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._

import SensorDataJsonSupport._
import pipelines.akkastream.AkkaServerStreamlet
import pipelines.akkastream.util.scaladsl._
import pipelines.streamlets.StreamletShape
import pipelines.streamlets.avro._

object SensorDataStreamingIngress extends AkkaServerStreamlet {
  val out = AvroOutlet[SensorData]("out", s ⇒ s.deviceId.toString + s.timestamp.toString)
  def shape = StreamletShape.withOutlets(out)

  implicit val entityStreamingSupport = EntityStreamingSupport.json()
  override def createLogic = HttpServerLogic.defaultStreaming(this, out)
}

The defaultStreaming method requires an implicit FromByteStringUnmarshaller[Out] and an EntityStreamingSupport. The above example provides an EntityStreamingSupport.json() to read the JSON stream. The defaultStreaming logic uses this to read JSON from the request entity. The SensorDataJsonSupport in the above example provides implicit spray-json JSON formats to unmarshal the JSON elements in the stream. The akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport object implicitly provides a FromByteStringUnmarshaller based on the formats in the SensorDataJsonSupport object.

The Java example below shows how to use the defaultStreaming in a similar manner:

import akka.http.javadsl.common.EntityStreamingSupport;
import akka.http.javadsl.marshallers.jackson.Jackson;

import pipelines.akkastream.AkkaServerStreamlet;

import pipelines.akkastream.util.javadsl.HttpServerLogic;
import pipelines.akkastream.StreamletLogic;
import pipelines.streamlets.StreamletShape;
import pipelines.streamlets.avro.AvroOutlet;

public class SensorDataStreamingIngress extends AkkaServerStreamlet {

  AvroOutlet<SensorData> out =  AvroOutlet.<SensorData>create("out", s -> s.getDeviceId().toString() + s.getTimestamp().toString(), SensorData.class);

  public StreamletShape shape() {
   return StreamletShape.createWithOutlets(out);
  }

  public StreamletLogic createLogic() {
    EntityStreamingSupport ess = EntityStreamingSupport.json();
    return HttpServerLogic.createDefaultStreaming(this, out, Jackson.byteStringUnmarshaller(SensorData.class), ess, getStreamletContext());
  }
}
Define a custom Route

If you want to provide your own akka-http Route, override the route method. The route method provides a sinkRef argument which you can use to write to the outlet.

override def createLogic = new HttpServerLogic(this, outlet) {
  def route(sinkRef: WritableSinkRef[Out]): Route = {
    put {
      entity(as[Data]) { data ⇒
        onSuccess(sinkRef.write(data)) { _ ⇒
          complete(StatusCodes.OK)
        }
      }
    }
  }
}

The above Scala example creates a route that will handle put requests where the entity contains Data, which is written to the outlet using the WritableSinkRef.

7.3.2. SplitterLogic

Use case

A SplitterLogic can be used to split a stream in two, writing elements to one of two outlets. Every element read from the inlet will be processed through a FlowWithPipelinesContext, which provides at-least-once semantics.

Example

The SplitterLogic defines an abstract flow method for the user to supply a FlowWithPipelinesContext[I, Either[L, R]]. The Java version of Splitter uses an Either type that is bundled with pipelines as pipelines.akkastream.javadsl.util.Either.

The Scala example below shows a Splitter that validates metrics and splits the stream into valid and invalid metrics, which are written respectively to the valid and invalid outlets:

import pipelines.akkastream._
import pipelines.akkastream.util.scaladsl._
import pipelines.streamlets._
import pipelines.streamlets.avro._

object MetricsValidation extends AkkaStreamlet {
  val in = AvroInlet[Metric]("in")
  val invalid = AvroOutlet[InvalidMetric]("invalid", m ⇒ m.metric.deviceId.toString + m.metric.timestamp.toString)
  val valid = AvroOutlet[Metric]("valid", m ⇒ m.deviceId.toString + m.timestamp.toString)
  val shape = StreamletShape(in).withOutlets(invalid, valid)

  override def createLogic = new SplitterLogic(in, invalid, valid) {
    def flow = flowWithPipelinesContext()
      .map { metric ⇒
        if (!SensorDataUtils.isValidMetric(metric)) Left(InvalidMetric(metric, "All measurements must be positive numbers!"))
        else Right(metric)
      }
  }
}

The Java example below provides similar functionality:

import akka.stream.javadsl.*;
import pipelines.akkastream.javadsl.util.Either;

import akka.NotUsed;
import akka.actor.*;
import akka.stream.*;

import com.typesafe.config.Config;

import pipelines.streamlets.*;
import pipelines.streamlets.avro.*;
import pipelines.akkastream.*;
import pipelines.akkastream.util.javadsl.*;

public class MetricsValidation extends AkkaStreamlet {
  AvroInlet<Metric> inlet = AvroInlet.<Metric>create("in", Metric.class);
  AvroOutlet<InvalidMetric> invalidOutlet = AvroOutlet.<InvalidMetric>create("invalid",  m -> m.metric.toString(), InvalidMetric.class);
  AvroOutlet<Metric> validOutlet = AvroOutlet.<Metric>create("valid", m -> m.getDeviceId().toString() + m.getTimestamp().toString(), Metric.class);

  public StreamletShape shape() {
   return StreamletShape.createWithInlets(inlet).withOutlets(invalidOutlet, validOutlet);
  }

  public SplitterLogic createLogic() {
    return new SplitterLogic<Metric,InvalidMetric, Metric>(inlet, invalidOutlet, validOutlet, getStreamletContext()) {
      public FlowWithContext<Metric, PipelinesContext, Either<InvalidMetric, Metric>, PipelinesContext, NotUsed> createFlow() {
        return createFlowWithPipelinesContext()
          .map(metric -> {
            if (!SensorDataUtils.isValidMetric(metric)) return Either.left(new InvalidMetric(metric, "All measurements must be positive numbers!"));
            else return Either.right(metric);
          });
      }
    };
  }
}

7.3.3. MergeLogic

Use case

A MergeLogic can be used to merge two or more inlets into one outlet.

Elements from all inlets will be processed with at-least-once semantics. The elements will be processed in semi-random order and with equal priority for all inlets.

Example

The Scala example below shows a Merge configured to combine elements from two inlets of the type Metric into one outlet of the same type.

object MetricMerge extends AkkaStreamlet {

  val in0 = AvroInlet[Metric]("in-0")
  val in1 = AvroInlet[Metric]("in-1")
  val out = AvroOutlet[Metric]("out", m ⇒ m.deviceId.toString + m.timestamp.toString)

  final override val shape = StreamletShape.withInlets(in0, in1).withOutlets(out)

  override final def createLogic = new MergeLogic(Vector(in0, in1), out)
}

object MetricsMerge extends Merge[Metric](5)

The Java example below provides similar functionality:

class TestMerger extends AkkaStreamlet {
    AvroInlet<Data> inlet1 = AvroInlet.<Data>create("in-0", Data.class);
    AvroInlet<Data> inlet2 = AvroInlet.<Data>create("in-1", Data.class);
    AvroOutlet<Data> outlet = AvroOutlet.<Data>create("out",  d -> d.name(), Data.class);

    public StreamletShape shape() {
      return StreamletShape.createWithInlets(inlet1, inlet2).withOutlets(outlet);
    }
    public MergeLogic createLogic() {
      List<CodecInlet<Data>> inlets = new ArrayList<CodecInlet<Data>>();
      inlets.add(inlet1);
      inlets.add(inlet2);
      return new MergeLogic(inlets, outlet, getStreamletContext());
    }
}

7.4. Testing an Akka Streamlet

A testkit is provided to make it easier to write unit tests for Akka Stream streamlets. The unit tests are meant to facilitate local testing of streamlets. The testkit allows writing of tests in Scala, as well as Java.

7.4.1. Basic flow of testkit APIs

Here’s the basic flow that you need to follow when writing tests using the testkit:

  1. Instantiate the testkit

  2. Setup inlet taps that tap the inlet ports of the streamlet

  3. Setup outlet taps for outlet ports

  4. Push data into inlet ports

  5. Run the streamlet using the testkit and the setup inlet taps and outlet taps

  6. Use the probes of the outlet taps to verify expected results

The testkit connects taps to the inlets and outlets of a streamlet. The taps provide the means for the tester to write data to the inlets and read data from the outlets, making it possible to assert that the streamlet behaves as expected.

7.4.2. Using the testkit from Scala

Let’s consider an example where we would like to write unit tests for testing a FlowProcessor. We will follow the steps that we outlined in the last section. We will use ScalaTest as the testing framework.

Imports

Here are the imports that we need for writing the tests. These include the obligatory imports for ScalaTest, the testkits for Pipelines and Akka, and the code that is generated from the Avro schemas.

import akka.actor._
import akka.stream._
import akka.stream.scaladsl._
import akka.testkit._

import org.scalatest._
import org.scalatest.concurrent._

import pipelines.streamlets._
import pipelines.streamlets.avro._
import pipelines.akkastream._
import pipelines.akkastream.scaladsl._
import pipelines.akkastream.testdata._
import pipelines.akkastream.testkit._
Setting up a sample TestProcessor

Let’s set up a TestProcessor streamlet that we would like to test. It’s a simple one that filters events based on whether the event id is an even number.

class TestProcessor extends AkkaStreamlet {
  val in = AvroInlet[Data]("in")
  val out = AvroOutlet[Data]("out", _.id.toString)
  final override val shape = StreamletShape.withInlets(in).withOutlets(out)

  val flow = Flow[Data].filter(_.id % 2 == 0)
  override final def createLogic = new RunnableGraphStreamletLogic() {
    def runnableGraph = atMostOnceSource(in).via(flow).to(atMostOnceSink(out))
  }
}
The unit test

Here’s how we would write a unit test using the AkkaStreamletTestKit. The various logical steps of the test are annotated with inline comments explaining the rationale behind the step.

class TestProcessorSpec extends WordSpec with MustMatchers with BeforeAndAfterAll {

  private implicit val system = ActorSystem("AkkaStreamletSpec")
  private implicit val mat = ActorMaterializer()

  override def afterAll: Unit = {
    TestKit.shutdownActorSystem(system)
  }

  "An TestProcessor" should {

    val testkit = AkkaStreamletTestKit(system, mat)

    "Allow for creating a 'flow processor'" in {
      val data = Vector(Data(1, "a"), Data(2, "b"), Data(3, "c"))
      val expectedData = Vector(Data(2, "b"))
      val source = Source(data)
      val proc = new TestProcessor
      val in = testkit.inletFromSource(proc.in, source)
      val out = testkit.outletAsTap(proc.out)

      testkit.run(proc, in, out, () ⇒ {
        out.probe.receiveN(1) mustBe expectedData.map(d ⇒ proc.out.partitioner(d) -> d)
      })

      out.probe.expectMsg(Completed)
    }
  }
}
Initialization and cleanups

As per ScalaTest guidelines, you can do custom cleanups in methods like afterAll() and after(), depending on your requirements. In the current implementation we shut down the actor system in afterAll():

override def afterAll: Unit = {
  TestKit.shutdownActorSystem(system)
}

Similarly you can have beforeAll() and before() for custom initializations.

If you have a number of tests that rely on similar initializations and cleanups, you can also create a common base trait for your test classes to extend.
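For example, a minimal base trait (the trait name is illustrative) that reuses the imports shown earlier:

// Sketch of a common base for Akka streamlet specs.
trait StreamletSpecBase extends WordSpec with MustMatchers with BeforeAndAfterAll {
  implicit val system = ActorSystem("AkkaStreamletSpec")
  implicit val mat = ActorMaterializer()

  override def afterAll: Unit =
    TestKit.shutdownActorSystem(system)
}

A test class such as TestProcessorSpec could then extend StreamletSpecBase instead of repeating the setup.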

7.4.3. Using the testkit from Java

Using the testkit from Java is almost the same as from Scala; the only difference is that you use idiomatic Java abstractions and frameworks for writing the tests. In this example we will write the test for the same TestProcessor streamlet using the Java DSL of the testkit with JUnit 4.

Imports

Here are the imports that we need for writing the tests. These include the obligatory imports for JUnit, the testkits for Pipelines and Akka, and the code that is generated from the Avro schemas.

import org.junit.Test;
import org.junit.Assert;

import akka.NotUsed;

import akka.stream.javadsl.*;

import pipelines.streamlets.*;
import pipelines.akkastream.*;
import pipelines.akkastream.javadsl.*;

import pipelines.akkastream.testdata.*;
import pipelines.akkastream.testkit.*;

import scala.compat.java8.FutureConverters;
Setting up a sample TestProcessor

Let’s set up a TestProcessor that we would like to test.

class TestProcessor extends AkkaStreamlet {
  AvroInlet<Data> inlet = AvroInlet.<Data>create("in", Data.class);
  AvroOutlet<Data> outlet = AvroOutlet.<Data>create("out", d -> d.name(), Data.class);

  public StreamletShape shape() {
    return StreamletShape.createWithInlets(inlet).withOutlets(outlet);
  }

  public StreamletLogic createLogic() {
    return new RunnableGraphStreamletLogic(getStreamletContext()) {
      public RunnableGraph<NotUsed> createRunnableGraph() {
        return getAtMostOnceSource(inlet)
          .via(Flow.<Data>create().filter(d -> d.getId() % 2 == 0))
          .to(getAtMostOnceSink(outlet));
      }
    };
  }
}
The unit test

Here’s how we would write a unit test using JUnit. The various logical steps of the test are annotated with inline comments explaining the rationale behind the step.

@Test
public void testFlowProcessor() {
  TestProcessor sfp = new TestProcessor();

  // 1. instantiate the testkit
  AkkaStreamletTestKit testkit = AkkaStreamletTestKit.create(system, mat);

  // 2. Setup inlet taps that tap the inlet ports of the streamlet
  QueueInletTap<Data> in = testkit.makeInletAsTap(sfp.shape().inlet());

  // 3. Setup outlet probes for outlet ports
  ProbeOutletTap<Data> out = testkit.makeOutletAsTap(sfp.shape().outlet());

  // 4. Push data into inlet ports
  in.queue().offer(new Data(1, "a"));
  in.queue().offer(new Data(2, "b"));

  // 5. Run the streamlet using the testkit and the setup inlet taps and outlet probes
  testkit.<Data>run(sfp, in, out, () -> {
    // 6. Assert
    return out.probe().expectMsg(new scala.Tuple2<String, Data>("2", new Data(2, "b")));
  });

  // 6. Assert
  out.probe().expectMsg(Completed.completed());
}
Initialization and Cleanups

As per JUnit guidelines, we can do custom initializations and cleanups in methods like setup() and tearDown() respectively depending on your requirements. One common practice is to set up a base class that does all common initializations and clean ups for your tests.

import org.junit.BeforeClass;
import org.junit.AfterClass;

import org.scalatest.junit.JUnitSuite;

import scala.concurrent.duration.Duration;

import akka.actor.ActorSystem;
import akka.stream.ActorMaterializer;
import akka.testkit.TestKit;

public abstract class JavaDslTest extends JUnitSuite {

  static ActorMaterializer mat;
  static ActorSystem system;

  @BeforeClass
  public static void setUp() throws Exception {
    system = ActorSystem.create();
    mat = ActorMaterializer.create(system);
  }

  @AfterClass
  public static void tearDown() throws Exception {
    TestKit.shutdownActorSystem(system, Duration.create(10, "seconds"), false);
    system = null;
  }
}

8. Developing with Spark Streamlets

8.1. Spark Streamlet Fundamentals

A Spark streamlet has the following responsibilities:

  • It needs to capture your stream processing logic.

  • It needs to publish metadata which will be used by the sbt-pipelines plugin to verify that a blueprint is correct. This metadata consists of the shape of the streamlet (StreamletShape) defined by the inlets and outlets of the streamlet. Connecting streamlets need to match on inlets and outlets to make a valid pipelines topology, as mentioned previously in Composing Streamlets using Blueprints. Pipelines automatically extracts the shapes from the streamlets to verify this match.

  • For the Pipelines runtime, it needs to provide metadata so that it can be configured, scaled, and run as part of an application.

  • The inlets and outlets of a Streamlet have two functions:

    • To specify to Pipelines that the streamlet needs certain data streams to exist at runtime, which it will read from and write to, and

    • To provide handles inside the stream processing logic to connect to the data streams that Pipelines provides. The StreamletLogic provides methods that take an inlet or outlet argument to read from or write to. These will be the specific data streams that Pipelines has set up for you.

The next sections will go into the details of defining a Spark streamlet:

  • defining inlets and outlets

  • creating a streamlet shape from the inlets and outlets

  • creating a streamlet logic that uses the inlets and outlets to read and write data

8.1.1. Inlets and Outlets

A streamlet can have one or more inlets and outlets. Pipelines offers classes for Inlets and Outlets based on the codec of the data they manipulate. Currently, Pipelines supports Avro and hence the classes are named AvroInlet and AvroOutlet. Each outlet also allows the user to define a partitioning function that will be used to partition the data.

8.1.2. StreamletShape

The StreamletShape captures the connectivity and compatibility details of a Spark-based streamlet. It captures which—and how many—inlets and outlets the streamlet has.

The sbt-pipelines plugin extracts—amongst other things—the shape from the streamlet that it finds on the classpath. This metadata is used to verify that the blueprint connects the streamlets correctly.

Pipelines offers an API StreamletShape to define various shapes with inlets and outlets. Each inlet and outlet can be defined separately with specific names. The outlet also needs to have a partitioning function to ensure effective data partitioning logic.

When you build your own Spark streamlet, you need to define the shape. You will learn how to do this in Building a Spark Streamlet. The next section describes how the SparkStreamletLogic captures stream processing logic.

8.1.3. SparkStreamletLogic

The stream processing logic is captured in a SparkStreamletLogic, which is an abstract class.

The SparkStreamletLogic provides methods to read and write data streams, and provides access to the configuration of the streamlet, among other things.

A SparkStreamletLogic must set up one or more Structured Streaming queries, represented by a collection of StreamingQuerys, through the method buildStreamingQueries. These queries will be run by the run method of SparkStreamlet to produce a StreamletExecution. A StreamletExecution is a simple class that manages a collection of StreamingQuerys.

The SparkStreamletLogic is only constructed when the streamlet is run, so it is safe to put instance values and variables in it that you would like to use when the streamlet is running. Note that the Streamlet is created for extracting metadata and hence no instance values should be put inside a streamlet.

8.1.4. SparkStreamletContext

The SparkStreamletContext provides the necessary context under which a streamlet runs. It contains the following context data and contracts:

  • An active SparkSession to run Spark streaming jobs.

  • The Typesafe Config loaded from the classpath through a config method, which can be used to read configuration settings.

  • The name used in the blueprint for the specific instance of this streamlet being run.

  • A function checkpointDir, which returns a directory on persistent storage where checkpoints can be safely kept, making them available across restarts. A short usage sketch follows this list.
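Here is a minimal sketch of logic that uses the context; the inlet in, the configuration key, and the checkpoint directory name are assumptions made for this example:

// Sketch only: reads a hypothetical setting through the context and
// obtains a checkpoint directory for the query it starts.
override def createLogic = new SparkStreamletLogic {
  val numRows = context.config.getInt("report-printer.num-rows")

  override def buildStreamingQueries =
    readStream(in).writeStream
      .format("console")
      .option("numRows", numRows)
      .option("checkpointLocation", context.checkpointDir("console-egress"))
      .outputMode(OutputMode.Append())
      .start()
      .toQueryExecution
}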

8.1.5. Lifecycle

In general terms, when you publish and deploy a Pipelines application, each streamlet definition becomes a physical deployment as one or more Kubernetes artifacts. In the case of Spark streamlets, each streamlet is submitted through the Spark Operator as a Spark Application resource. Upon deployment, it becomes a set of running pods, with one pod as the Spark driver and n pods as executors, where n is the scale factor for that streamlet.

Note

The Spark Operator is a Kubernetes operator dedicated to managing Spark applications.

The Spark Operator takes care of monitoring the streamlets and is responsible for their resilient execution, like restarting the Spark application in case of failure.

Spark Streamlet Deployment Model
Figure 35. Spark Streamlet Deployment Model

In Spark Streamlet Deployment Model, we can visualize the chain of delegation involved in the deployment of a Spark streamlet:

  • The Pipelines Operator prepares a Custom Resource describing the Spark streamlet and submits it to the Spark Operator

  • The Spark Operator processes the Custom Resource and issues the deployment of a Spark driver pod.

  • The Spark Driver then requests executor resources from Kubernetes to deploy the distributed processing.

  • Finally, if and when resources are available, the Spark-bound executors start as Kubernetes pods. The executors are the components tasked with the actual data processing, while the Spark driver serves as coordinator of the (stream) data process.

In this architecture, the Spark driver runs the Pipelines-specific logic that connects the streamlet to our managed data streams, at which point the streamlet starts consuming from inlets. The streamlet advances through the data streams that are provided on inlets and writes data to outlets.

If you make any changes to the streamlet and deploy the application again, the existing Spark applications will be stopped, and the new version will be started to reflect the changes. It could be that Spark streamlets are restarted in other ways, for instance by administrators.

This means that a streamlet can get stopped, started or restarted at any moment in time. The next section about message delivery semantics explains the options that are available for how to continue processing data after a restart.

8.1.6. Message Delivery Semantics

The message delivery semantics provided by Spark streamlets are determined by the guarantees provided by the underlying Spark sources and sinks used in the streamlet. Recall that we defined the different message delivery guarantees in Message Delivery Semantics.

Let’s consider the following types of streamlets as forming a topology as illustrated in Ingress-Processor-Egress Streamlets:

  • an ingress, a streamlet that reads from an external streaming source and makes data available to the pipeline

  • a processor, a streamlet that has an inlet and an outlet - it does domain logic processing on data that it reads and passes the processed data downstream

  • an egress, a streamlet that receives data on its inlet and writes to an external sink

The following sections will use these types of streamlets for describing message delivery semantics.

Ingress-Processor-Egress Streamlets
Figure 36. Ingress-Processor-Egress Streamlets
Message Delivery Semantics of Spark-based Ingresses

Spark-based ingresses use a Structured Streaming source to obtain data from an external streaming source and provide it to the Pipelines application. In this case, message delivery semantics are dependent on the capabilities of the source. In general, streaming sources deemed resilient will provide at-least-once semantics in a Pipeline application.

Refer to the Structured Streaming Programming Guide or to Stream Processing with Apache Spark for more detailed documentation about your streaming source of choice.

Message Delivery Semantics of Spark-based Processors

A Spark-based processor is a streamlet that receives and produces data internal to the Pipelines application. In this scenario, processors consume data from inlets and produce data to outlets using Kafka topics as the underlying physical implementation.

At the level of a Spark-based processor, this translates to using Kafka sources and sinks. Given that Kafka is a reliable data source from the Structured Streaming perspective, the message delivery semantics are considered to be at-least-once.

Note
Testing At-Least-Once Message Delivery

Bundled with our examples, spark-resilience-test can be used to verify that messages are processed with at-least-once semantics even while a part of the pipeline is continuously failing.

Message Delivery Semantics of Spark-based Egresses

The message delivery guarantees of a Spark-based egress are determined by the combination of the Kafka-based inlet and the target system where the egress will produce its data.

The data provided to the egress is delivered with at-least-once semantics. The egress is responsible to reliably produce this data to the target system. The overall message delivery semantics of the Pipeline will depend on the reliability characteristics of this egress.

In particular, it is possible to 'upgrade' the end-to-end message delivery guarantee to effectively-exactly-once by making the egress idempotent and ensuring that all other streamlets used provide at-least-once semantics.

For this purpose, we could use Structured Streaming’s deduplication feature, or use a target system able to preserve uniqueness of primary keys, such as an RDBMS.
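For example, here is a sketch of an egress that applies Structured Streaming deduplication on a hypothetical unique id column before writing to the target system; the inlet in is assumed, and the console format stands in for the real target:

// Sketch only: assumes the incoming data has an "id" column that uniquely identifies a record.
override def buildStreamingQueries =
  readStream(in)
    .dropDuplicates("id")
    .writeStream
    .format("console")
    .option("checkpointLocation", context.checkpointDir("dedup-egress"))
    .outputMode(OutputMode.Append())
    .start()
    .toQueryExecution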

Message Delivery Semantics and the Kafka Retention Period

The at-least-once delivery is guaranteed within the Kafka retention configuration. This retention is a configuration proper to the Kafka brokers that dictates when old data can be evicted from the log. If a Spark-based processor is offline for a longer time than the configured retention, it will restart from the earliest offset available in the corresponding Kafka topic, which might silently result in data loss.

8.1.7. Reliable Restart of Stateful Processes

In a Pipeline application, it’s possible to use the full capabilities of Structured Streaming to implement the business logic required in a streamlet. This includes stateful processing, from time-based aggregations to arbitrary stateful computations that use mapGroupsWithState and flatMapGroupsWithState.

All stateful processing relies on snapshots and a state store for the bookkeeping of the offsets processed and the computed state at any time. In Pipelines, this state store is deployed on Kubernetes using Persistent Volume Claims (PVCs) backed by a storage class that allows for access mode ReadWriteMany.

PVCs are automatically provisioned for each Pipelines application. The volumes are claimed for as long as the Pipeline application is deployed, allowing for seamless re-deployment, upgrades, and recovery in case of failure.

We can use the managed storage to safely store checkpoint data. Checkpoint information is managed by Pipelines for all streamlets that use the context.writeStream method.

In case we need a directory to store state data that must persist across restarts of the streamlet, we can obtain a directory mounted on the managed PVC using the context.checkpointDir(name) method. This method takes as parameter the name of the directory we want for our particular use and returns a path to a persistent storage location.

When implementing egresses that use Spark’s Structured Streaming sinks, ensure that each query uses a unique name for its checkpointDir.

In Example of the use of checkpointDir, we can see the use of the checkpointDir method to provide the checkpoint directory to a console-based egress.

Example 1. Example of the use of checkpointDir
override def buildStreamingQueries = {
  readStream(in).writeStream
    .format("console")
    .option("checkpointLocation", context.checkpointDir("console-egress"))
    .outputMode(OutputMode.Append())
    .start()
    .toQueryExecution
}
Warning
Storage Size is Currently Fixed

Please note that — currently — the volume size allocated for storage is fixed per Pipelines platform deployment.

8.1.8. Checkpoints and Application Upgrades

The storage of snapshots and state has certain impact on the upgradability of the Spark-based components. Once a state representation is deployed, the state definition (schema) may not change.

Upgrading an application that uses stateful computation requires planning ahead of time to avoid making incompatible changes that prevent recovery using the saved state. Significant changes to the application logic, addition of sources, or changes to the schema representing the state are not allowed.

For details on the degrees of freedom and upgrade options, please refer to the Structured Streaming documentation.

8.2. Building a Spark Streamlet

The following sections describe how you can create a Spark streamlet. As mentioned in the Spark Streamlet Fundamentals, a Spark streamlet is defined by the following features:

  • It is a Streamlet. Pipelines offers a class for implementing Spark streamlets, SparkStreamlet which extends pipelines.streamlets.Streamlet. Any Spark streamlet needs to extend SparkStreamlet.

  • It has a shape - we call it StreamletShape. Any Spark streamlet needs to define a concrete shape using the APIs available for the StreamletShape class, which defines the inlets and outlets of the streamlet.

  • It has a StreamletLogic that defines how the streamlet generates StreamingQuerys from the business logic.

In this tutorial we’ll build a simple Spark streamlet that accepts data on an inlet and writes it to the console. It can be used to print reports from data that arrives at its inlet. Let’s call the streamlet ReportPrinter.

8.2.1. Extending from SparkStreamlet

Let’s start with building the ReportPrinter streamlet. The first thing to do is extend the pipelines.spark.SparkStreamlet abstract class, as shown below:

package com.example

import pipelines.streamlets._
import pipelines.spark._

object ReportPrinter extends SparkStreamlet {
  // 1. TODO Create inlets and outlets
  // 2. TODO Define the shape of the streamlet
  // 3. TODO Override createLogic to provide StreamletLogic
}

The code snippet above shows an object ReportPrinter that extends SparkStreamlet. We have shown the steps needed to complete the implementation, which we will do in the next few sections.

The next step is to implement inlets and outlets of the streamlet.

8.2.2. Inlets and Outlets

The streamlet that we are building in this tutorial will have an inlet and no outlet.

package com.example

import pipelines.streamlets.avro._
import pipelines.spark._

object ReportPrinter extends SparkStreamlet {
  // 1. Create inlets and outlets
  val in = AvroInlet[Report]("report-in")

  // 2. TODO Define the shape of the streamlet using inlets and outlets
  // 3. TODO Override createLogic to provide StreamletLogic, where the inlets and outlets are used to read and write streams.
}

Pipelines supports Avro-encoded processing of data - we make this explicit by defining the inlet as AvroInlet. Report is the class of objects that will be accepted by this inlet. This means that an inlet defined by AvroInlet[Report] will only accept Avro-encoded data for the class Report. The class Report will be generated by Pipelines during application build time from the Avro schema that the user supplies - this ensures that the data which the inlet accepts conforms to the schema that the user supplied earlier. As an example, we can have the following Avro schema for the Report object, which contains a report of some of the attributes of products from an inventory:

{
  "namespace": "com.example",
  "type": "record",
  "name": "Report",
  "fields": [
    {
      "name": "id",
      "type": "string"
    },
    {
      "name": "name",
      "type": "string"
    },
    {
      "name": "description",
      "type": "string"
    },
    {
      "name": "keywords",
      "type": {
        "type": "array",
        "items": "string"
      }
    }
  ]
}

In the definition of the inlet, "report-in" is the name of the inlet. It’s recommended that you use a domain-specific name for the inlet that indicates the nature of data this inlet is supposed to accept. We will use this inlet later to read data from it.

This streamlet does not have any outlets. In general, though, outlets are defined similarly: val out = AvroOutlet[Report]("report-out", _.name) defines an outlet that writes Avro-encoded data for objects of type Report. Here "report-out" is the name of the outlet and _.name is the partitioning function that partitions the data written to the outlet.

8.2.3. Streamlet Shape

Let’s now define the shape of ReportPrinter by using the APIs in pipelines.streamlets.StreamletShape:

package com.example

import pipelines.streamlets._
import pipelines.streamlets.avro._
import pipelines.spark._

object ReportPrinter extends SparkStreamlet {
  // 1. Create inlets and outlets
  val in = AvroInlet[Report]("report-in")

  // 2. Define the shape of the streamlet using inlets and outlets
  override val shape = StreamletShape.withInlets(in)

  // 3. TODO Override createLogic to provide StreamletLogic, where the inlets and outlets are used to read and write streams.
}

The above code overrides the shape method with a value that defines the shape of the streamlet. StreamletShape offers methods to define shapes, e.g. to define a streamlet with two inlets and two outlets, we could write StreamletShape.withInlets(in0, in1).withOutlets(valid, invalid).

The next step is to define the SparkStreamletLogic.

8.2.4. Defining the SparkStreamletLogic

The SparkStreamletLogic class makes it possible for a user to specify domain logic. It is defined as an abstract class in pipelines.spark.SparkStreamletLogic and provides an abstract method buildStreamingQueries where the user can define the specific logic for the Spark streamlet.

In this step we need to override createLogic from SparkStreamlet in our ReportPrinter object. createLogic needs to return an instance of SparkStreamletLogic which will do the processing based on the requirements of ReportPrinter object.

package com.example

import org.apache.spark.sql.Dataset
import org.apache.spark.sql.streaming.StreamingQuery
import org.apache.spark.sql.streaming.OutputMode

import pipelines.streamlets._
import pipelines.streamlets.avro._
import pipelines.spark._

object ReportPrinter extends SparkStreamlet {
  // 1. Create inlets and outlets
  val in = AvroInlet[Report]("report-in")

  // 2. Define the shape of the streamlet using inlets and outlets
  override val shape = StreamletShape.withInlets(in)

  // 3. Override createLogic to provide StreamletLogic, where the inlets and outlets are used to read and write streams.
  override def createLogic = new SparkStreamletLogic {
    // Define some formatting attributes
    val numRows = 50
    val truncate = false

    override def buildStreamingQueries = {
      val inDataset = readStream(in)
      val q = inDataset.writeStream
        .format("console")
        .option("numRows", numRows)
        .option("truncate", truncate)
        .outputMode(OutputMode.Append())
        .start()
      Seq(q)
    }
  }
}

In the above code we override createLogic from SparkStreamlet with an instance of SparkStreamletLogic that overrides buildStreamingQueries to supply the domain logic for the streamlet. In this case, since we are implementing a console printer streamlet, all we need to do is read from the inlet we defined earlier, val in = AvroInlet[Report]("report-in"), and do some processing on it.

Here are the steps that we do as part of the processing logic:

  • Since it’s a console printer, we write to the console, as specified by .format("console") in the implementation above

  • We use two formatting parameters: (a) how many rows to display at once and (b) whether to truncate long lines. These are defined by the values numRows and truncate in the concrete implementation of SparkStreamletLogic.

Note that the processing logic can be quite complex and we can maintain state as part of the implementation of SparkStreamletLogic.

Note

If the streamlet needs local state (vals, vars) for its processing logic, that state has to be put inside the SparkStreamletLogic class, not in the Streamlet class. The Streamlet class is used by Pipelines to extract streamlets via reflection and hence cannot hold any state.
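
A minimal sketch of where such state belongs, reusing the ReportPrinter pattern and imports from above (the ReportCounter name and the counter variable are purely illustrative):

object ReportCounter extends SparkStreamlet {
  val in = AvroInlet[Report]("report-in")
  override val shape = StreamletShape.withInlets(in)

  // Do not keep processing state here: this object is processed by Pipelines via reflection.

  override def createLogic = new SparkStreamletLogic {
    // Local state lives inside the logic instance.
    var batchesSeen = 0L // illustrative only, not used by the query below

    override def buildStreamingQueries = {
      val q = readStream(in).writeStream
        .format("console")
        .outputMode(OutputMode.Append())
        .start()
      Seq(q)
    }
  }
}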

Note

In summary, here are the steps for defining a Spark streamlet:

  • Define the inlets and outlets

  • Define the concrete shape using the inlets and outlets. The shape of the streamlet is the metadata that will be used by Pipelines

  • Define the custom processing logic that will read data from inlets and write data to outlets

8.2.5. Using ReportPrinter in the blueprint

An example of a blueprint using the ReportPrinter could look like this:

blueprint {
  streamlets {
    ingress = com.example.ReportIngress
    report-printer = com.example.ReportPrinter
  }

  connections {
    ingress.out = [report-printer.report-in]
  }
}

The omitted ReportIngress could for instance be another SparkStreamlet that writes Reports to its outlet.
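
As a hedged sketch of such an ingress: the code below assumes that SparkStreamletLogic exposes the underlying SparkSession as session and that StreamletShape.withOutlets is available for outlet-only shapes (analogous to withInlets above); it uses Spark’s built-in rate source purely to fabricate Report records.

package com.example

import org.apache.spark.sql.streaming.OutputMode

import pipelines.streamlets._
import pipelines.streamlets.avro._
import pipelines.spark._
import pipelines.spark.sql.SQLImplicits._

object ReportIngress extends SparkStreamlet {
  // The outlet name "out" matches the blueprint connection ingress.out above.
  val out = AvroOutlet[Report]("out", _.id)

  override val shape = StreamletShape.withOutlets(out) // assumption: outlet-only shape builder

  override def createLogic = new SparkStreamletLogic {
    override def buildStreamingQueries = {
      // Assumption: `session` is the SparkSession provided by SparkStreamletLogic.
      // The rate source emits an incrementing `value` column at a fixed rate; we map
      // each value to a Report purely for illustration.
      val reports = session.readStream
        .format("rate")
        .option("rowsPerSecond", 1)
        .load()
        .select($"value".as[Long])
        .map(i => Report(s"id-$i", s"report-$i", s"generated report $i", Seq("sample")))
      Seq(writeStream(reports, out, OutputMode.Append))
    }
  }
}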

8.3. Testing a Spark Streamlet

A testkit is provided to make it easier to write unit tests for Spark streamlets. The unit tests are meant to facilitate local testing of streamlets.

8.3.1. Basic flow of testkit APIs

Here’s the basic flow that you need to follow when writing tests using the testkit:

  1. Extend the test class with the SparkScalaTestSupport trait. This trait manages the SparkSession, performs basic initialization and cleanup, and provides the core APIs of the testkit

  2. Create a Spark streamlet testkit instance

  3. Create the Spark streamlet that needs to be tested

  4. Setup inlet taps that tap the inlet ports of the streamlet

  5. Setup outlet taps for outlet ports

  6. Push data into inlet ports

  7. Run the streamlet using the testkit and the setup inlet taps and outlet taps

  8. Write assertions to ensure that the expected results match the actual ones

8.3.2. Details of the workflow

Let’s consider an example where we would like to write unit tests for a SparkStreamlet that reads data from an inlet, does some processing, and writes processed data to an outlet. We will follow the steps outlined in the previous section and use ScalaTest as the testing library.

Setting up a sample SparkStreamlet

Here is a list of imports needed for writing the test suite.

import scala.collection.immutable.Seq
import scala.concurrent.duration._

import org.apache.spark.sql.streaming.OutputMode

import pipelines.streamlets.StreamletShape
import pipelines.streamlets.avro._
import pipelines.spark._
import pipelines.spark.avro._
import pipelines.spark.testkit._
import pipelines.spark.sql.SQLImplicits._

SparkStreamlet is an abstract class. Let’s set up a concrete instance that we would like to test. For more details on how to implement a Spark streamlet, please refer to Building a Spark Streamlet.

// create Spark Streamlet
val processor = new SparkStreamlet {
  val in = AvroInlet[Data]("in")
  val out = AvroOutlet[Simple]("out", _.name)
  val shape = StreamletShape(in, out)

  override def createLogic() = new SparkStreamletLogic {
    override def buildStreamingQueries = {
      val dataset = readStream(in)
      val outStream = dataset.select($"name").as[Simple]
      val query = writeStream(outStream, out, OutputMode.Append)
      Seq(query)
    }
  }
}

The unit test

Here’s how we would write a unit test using ScalaTest. The various logical steps of the test are annotated with inline comments explaining the rationale behind the step.

class SparkProcessorSpec extends SparkScalaTestSupport { // 1. Extend SparkScalaTestSupport

  "SparkProcessor" should {

    // 2. Initialize the testkit
    val testkit = SparkStreamletTestkit(session)

    "process streaming data" in {

      // 3. create Spark streamlet
      val processor = new SparkStreamlet {
        val in = AvroInlet[Data]("in")
        val out = AvroOutlet[Simple]("out", _.name)
        val shape = StreamletShape(in, out)

        override def createLogic() = new SparkStreamletLogic {
          override def buildStreamingQueries = {
            val dataset = readStream(in)
            val outStream = dataset.select($"name").as[Simple]
            val query = writeStream(outStream, out, OutputMode.Append)
            Seq(query)
          }
        }
      }

      // 4. setup inlet tap on inlet port
      val in: SparkInletTap[Data] = testkit.inletAsTap[Data](processor.in)

      // 5. setup outlet tap on outlet port
      val out: SparkOutletTap[Simple] = testkit.outletAsTap[Simple](processor.out)

      // 6. build data and send to inlet tap
      val data = (1 to 10).map(i => Data(i, s"name$i"))
      in.addData(data)

      // 7. Run the streamlet using the testkit and the setup inlet taps and outlet probes
      testkit.run(processor, Seq(in), Seq(out), 2.seconds)

      // get data from outlet tap
      val results = out.asCollection(session)

      // 8. Assert that actual matches expectation
      results should contain(Simple("name1"))
    }
  }
}

The SparkScalaTestSupport trait

This trait provides session management and needs to be mixed into the main test class. It provides the following functionality:

  1. Manages a SparkSession for all tests, initialized when the test class is initialized.

  2. Cleans up the session in afterAll. If you want custom cleanup logic, override the afterAll method and call super.afterAll() before adding your custom logic, as sketched below.
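
A minimal sketch of such an override (the spec name and the extra cleanup step are purely illustrative):

class MySparkStreamletSpec extends SparkScalaTestSupport {

  override def afterAll(): Unit = {
    super.afterAll() // let the testkit clean up the shared SparkSession first
    // custom cleanup goes here, e.g. deleting temporary output directories
  }

  // ... tests ...
}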

The SparkStreamletTestkit class

  1. Provides core APIs such as inletAsTap, outletAsTap, and run.

  2. Supports adding values for configuration parameters.

Special Note on Aggregation Query

There may be situations where the Spark query that you are testing involves aggregation operators. The testkit requires some special considerations when writing tests for such queries. Here’s an example of a Spark streamlet that involves aggregation operators:

class CallStatsAggregator extends SparkStreamlet {

  val in = AvroInlet[CallRecord]("in")
  val out = AvroOutlet[AggregatedCallStats]("out", _.startTime.toString)
  val shape = StreamletShape(in, out)

  val GroupByWindow = DurationConfigParameter(
    "group-by-window",
    "Window duration for the moving average computation",
    Some("1 minute"))

  val Watermark = DurationConfigParameter(
    "watermark",
    "Late events watermark duration: how long to wait for late events",
    Some("1 minute"))

  override def configParameters = Vector(GroupByWindow, Watermark)

  override def createLogic = new SparkStreamletLogic {

    override def buildStreamingQueries = {
      val dataset = readStream(in)
      val outStream = process(dataset)
      val query = writeStream(outStream, out, OutputMode.Update)
      Seq(query)
    }

    val watermark = context.streamletConfig.getDuration(Watermark.key)
    val groupByWindow = context.streamletConfig.getDuration(GroupByWindow.key)

    private def process(inDataset: Dataset[CallRecord]): Dataset[AggregatedCallStats] = {
      println(s"Starting query with watermark $watermark, group-by-window $groupByWindow")
      val query =
        inDataset
          .withColumn("ts", $"timestamp".cast(TimestampType))
          .withWatermark("ts", "1 minute")
          .groupBy(window($"ts", "1 minute"))
          .agg(avg($"duration") as "avgCallDuration", sum($"duration") as "totalCallDuration")
          .withColumn("windowDuration", $"window.end".cast(LongType) - $"window.start".cast(LongType))

      query
        .select($"window.start".cast(LongType) as "startTime", $"windowDuration", $"avgCallDuration", $"totalCallDuration")
        .as[AggregatedCallStats]
    }
  }
}

The above query involves aggregation operations, such as groupBy over a window followed by averaging of the call durations tracked by the query. Such stateful streaming queries use a StateStore and need to be run with OutputMode.Update (recommended) or OutputMode.Complete as streaming data arrives in the incoming Dataset. OutputMode.Update updates the aggregate with incoming data and is therefore recommended for tests involving aggregation queries. OutputMode.Complete, on the other hand, never drops old aggregation state, since by definition this mode preserves all data in the Result Table.

For queries that do not involve aggregation, only new data needs to be written, and the query should be run with OutputMode.Append. This is the default behavior that the testkit sets up for the user.

In order to have the query run with OutputMode.Update, the user needs to pass in this argument explicitly when setting up the outlet tap:

// setup outlet tap on outlet port
val out = testkit.outletAsTap[AggregatedCallStats](aggregator.shape.outlet, OutputMode.Update)

For more details on OutputMode, have a look at the relevant Spark documentation.

Also, since stateful queries use the StateStore, it needs to be stopped explicitly when the test suite ends:

override def afterAll(): Unit = {
  super.afterAll()
  StateStore.stop() // stop the state store maintenance thread and unload store providers
}

9. Using pipelines CLI

The Pipelines CLI allows you to create, manage, deploy, and operate Pipelines applications.

The CLI examples in this documentation assume that you use oc plugin pipelines … to run Pipelines commands. If you prefer to use kubectl instead, use kubectl pipelines … for all of these commands.

If you use the oc CLI for OpenShift, you’ll need the Kubernetes kubectl CLI to run one command to install the Pipelines plugin into oc. See The install-oc-plugin command for more details.

Synopsis

oc plugin pipelines [command] [flags]

Options

  -h, --help   help for pipelines

9.1. The configure command

Configures a deployed Pipelines Application.

Synopsis

Configures a deployed Pipelines Application.

oc plugin pipelines configure [parameters]

Examples

oc plugin pipelines configure my-app mystreamlet.hostname=localhost

or to list all required configuration parameters:

oc plugin pipelines configure my-app

Options

  -h, --help   help for configure

9.2. The deploy command

Deploys a Pipelines Application to the cluster.

Synopsis

Deploys a Pipelines Application to the cluster. The arguments to the command consist of a Docker image path and, optionally, one or more '<streamlet-name>.<property>=<value>' pairs, separated by spaces.

You can optionally provide credentials for the Docker registry in the image path. The --username flag must be used in conjunction with --password-stdin or --password. --password-stdin is preferred because the password is read from stdin, which means it does not end up in your shell history. One way to provide the password via stdin is to pipe it from a file:

cat key.json | oc plugin pipelines deploy eu.gcr.io/floating-mammoth-123414/foo/sensor-data-scala:292-c183d80 --username _json_key --password-stdin

You can also use --password, which is less secure:

oc plugin pipelines deploy eu.gcr.io/floating-mammoth-123414/foo/sensor-data-scala:292-c183d80 --username _json_key --password "$(cat key.json)"

If you do not provide a username and password, you will be prompted for them at first deployment.

The credentials are stored in an image pull secret; subsequent invocations of the deploy command will use the stored credentials. You can update the credentials with the update-docker-credentials command.

oc plugin pipelines deploy [parameters]

Examples

oc plugin pipelines deploy registry.test-cluster.ingestion.io/pipelines/sensor-data-scala:292-c183d80 valid-logger.log-level=info valid-logger.msg-prefix=valid

Options

  -h, --help              help for deploy
  -p, --password string   docker registry password.
      --password-stdin    Take the password from stdin
  -u, --username string   docker registry username.

9.3. The install-oc-plugin command

Installs the Pipelines OpenShift CLI oc plugin.

Synopsis

Installs the Pipelines OpenShift CLI oc plugin. After installation all commands can be accessed by executing oc plugin pipelines.

oc plugin pipelines install-oc-plugin [parameters]
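
Examples

Because the plugin cannot be used to install itself into oc, this command is typically run through kubectl, as noted at the start of this chapter:

kubectl pipelines install-oc-plugin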

Options

  -h, --help   help for install-oc-plugin

9.4. The list command

Lists deployed Pipelines Applications in the current cluster.

Synopsis

Lists deployed Pipelines Applications in the current cluster.

oc plugin pipelines list [parameters]
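
Examples

For example, to list all deployed Pipelines Applications in the current cluster:

oc plugin pipelines list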

Options

  -h, --help   help for list

9.5. The scale command

Scales a streamlet of a deployed Pipelines Application to the specified number of replicas.

Synopsis

Scales a streamlet of a deployed Pipelines Application to the specified number of replicas.

oc plugin pipelines scale [parameters]

Examples

oc plugin pipelines scale my-app my-streamlet 2

Options

  -h, --help   help for scale

9.6. The status command

Gets the status of a Pipelines Application.

Synopsis

Gets the status of a Pipelines Application.

oc plugin pipelines status [parameters]

Examples

oc plugin pipelines status my-app

Options

  -h, --help   help for status

9.7. The undeploy command

Undeploys one or more Pipelines applications.

Synopsis

Use this command to undeploy one or more Pipelines applications by providing a space-separated list of application names.

oc plugin pipelines undeploy [parameters]
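
Examples

For example, to undeploy two applications in one call (the application names are illustrative):

oc plugin pipelines undeploy my-app my-other-app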

Options

  -h, --help   help for undeploy

9.8. The update-docker-credentials command

Updates docker registry credentials that are used to pull Pipelines application images.

Synopsis

This command configures the pipelines service account to use the specified Docker registry credentials, so that Pipelines application images can be pulled from that registry. The arguments to the command consist of the namespace that the application is deployed to and the Docker registry from which you want Pipelines to pull application images. You will be prompted for a username and password.

oc plugin pipelines update-docker-credentials [parameters]

Examples

oc plugin pipelines update-docker-credentials my-app docker-registry-default.server.example.com

Options

  -h, --help   help for update-docker-credentials

9.9. The version command

Prints the plugin version.

Synopsis

Prints the plugin version.

oc plugin pipelines version [parameters]

Options

  -h, --help   help for version