Lightbend

© 2018-2019, Lightbend, Inc. All Rights Reserved.
Release Date: May 20, 2019
Last Doc Update: 2019-05-17 20:13:03 UTC

1. An Introduction to Pipelines

Welcome to Lightbend Pipelines, v1.0.1 for OpenShift.

1.1. Why Pipelines?

Technologies like mobile, the Internet of Things (IoT), Big Data analytics, machine learning, and others are driving enterprises to modernize how they process large volumes of data. A rapidly growing percentage of that data is now arriving in the form of data streams and a growing percentage of those streams now require near-realtime processing. We use the term "Fast Data" to describe applications and systems that deal with such requirements.

The Fast Data landscape has been rapidly evolving, with tools like Spark, Flink, and Kafka Streams emerging from the world of large scale batch processing while projects like Reactive Streams and Akka Streams have emerged from the world of application development and high-performance networking.

The demand for availability, scalability, and resilience is forcing Fast Data architectures to become more like microservice architectures. Conversely, successful organizations building microservices find their data needs grow with their organization while their data sources are becoming more stream-like and more real-time. Hence, there is a unification happening between streaming data and microservice architectures.

It can be quite hard to develop, deploy, and operate large-scale microservices-based systems that can take advantage of streaming data and seamlessly integrate with systems for analytics processing and machine learning. The individual technologies are well documented, but combining them into fully integrated, unified systems is no easy task.

Pipelines aims to make this easier by integrating the most popular streaming frameworks into a single platform for creating and running distributed Fast Data applications.

1.2. What does Pipelines do?

Pipelines allows developers to quickly build and deploy large, distributed stream processing applications by breaking such applications into smaller stream processing units called streamlets. Each streamlet represents a discrete chunk of stream processing logic with data being safely persisted at the edges using pre-defined schemas. Streamlets can be scaled up and down to process partitioned data streams. Streamlets can be written using multiple streaming runtimes, such as Akka Streams and Spark. This exposes the full power of the underlying runtime and its libraries while providing a higher-level abstraction for composing streamlets and expressing data schemas.

Streamlets can be composed into larger systems using "application blueprints", which specify how streamlets can be connected together. They can then be deployed as a whole and Pipelines will take care of deploying the individual streamlets and making sure connections get translated into data flowing between the streamlets at runtime.

Pipelines provides tooling for developing streamlets, composing them into applications, deploying those applications to your clusters, as well as operating and monitoring deployed applications.

image

1.3. Terminology

1.3.1. Applications

Everything in Pipelines is done in the context of an Application, which represents a self-contained distributed system of data processing services connected together by data streams.

Pipelines users can create and manage multiple applications, each one forming a separate self-contained system.

apps 1

Within applications, there are three main types of entities to interact with:

  • streamlets

  • the application blueprint

  • the deployed application

apps 2

1.3.2. Streamlets

Streamlets form the basic building blocks of applications since they implement the actual stream processing logic. Streamlets are deployed by Pipelines as self-contained, isolated services, which can then be composed into streaming distributed applications.

Streamlets can be visualized as black boxes with at least one data inlet and/or data outlet. Each inlet and outlet is explicitly typed using a schema.

streamlet

Every streamlet has a unique name (e.g. its Scala/Java canonical class name) as well as a number of metadata properties such as a description and a set of labels.

Outlets represent durable data. This means that the Pipelines runtime guarantees that data written by a streamlet to one of its outlets will be persisted and will survive restarts of the streamlet or the entire application.

Depending on the number of inlets and outlets, streamlets can have different shapes.

In the next few sections we will discuss the main attributes of a streamlet.

Inlet and Outlet

A streamlet can have inlets and outlets. A streamlet should have at least one inlet or outlet. Each inlet is identified by the following:

  • a codec indicating the type of data it accepts. In Pipelines we currently support Avro as the codec

  • the class of data it accepts used as the type parameter in defining the inlet

  • a name specified as part of the inlet definition

Outlets have the same attributes as inlets. In addition, every outlet also needs to specify a partitioning function as part of its definition. This function determines how data is partitioned when it is written to Kafka downstream. Data partitioning in Kafka ensures scalability.

Streamlet Shape

The inlets and outlets define the shape of a streamlet. A streamlet that is supposed to accept streaming data of a specific type and split it into valid and invalid records can have one inlet and two outlets (one for valid and the other for invalid records). Pipelines offers APIs to build streamlets of various shapes.

Streamlet Logic

Logic defines the business process that the streamlet is supposed to execute. For a streamlet that validates incoming streaming data and classifies it into valid and invalid records, the exact validation logic is the business process and must be defined as part of the streamlet logic.

Depending on the shape we can think of various types of streamlets which can conceptually map to elements of a streaming data pipeline. In the following sections we discuss a few of the commonly used ones.

Ingress

An Ingress is a streamlet with zero inlets and one or more outlets. An ingress could be a server handling requests, like the HttpIngress that comes standard with Pipelines. It could also be polling some back-end for data or it could even be a simple test data generator.

streamlets ingress
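As a minimal sketch of this shape (reusing the Employee model and the KeyedSchema and HttpIngress constructs shown later in the Quick Start; the details are illustrative, not a complete streamlet definition):

implicit val employeeSchema: KeyedSchema[Employee] =
  AvroKeyedSchema[Employee](Employee.SCHEMA$)

// Zero inlets, one outlet: requests POSTed to the HTTP endpoint become
// Employee records written to the ingress outlet.
object EmployeeIngress extends HttpIngress[Employee]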

Processor

A Processor has one inlet and one outlet. Processors represent common data transformations like map and filter, or any combination of them.

streamlets processor

FanOut

FanOut-shaped streamlets have a single inlet and two or more outlets.

streamlets fanout
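The Splitter from the Quick Start is a typical FanOut shape: one inlet and one outlet each for valid and invalid records. Here is a condensed version of that example (the validation logic itself is illustrative):

object EmployeeValidation extends Splitter[Employee, InvalidEmployee, Employee] {
  override def createLogic = new SplitterLogic() {
    // Route each record to the invalid (Left) or valid (Right) outlet.
    def flow = flowWithPipelinesContext().map { employee =>
      if (!employee.isValid()) Left(InvalidEmployee(employee, "Invalid employee"))
      else Right(employee)
    }
  }
}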

FanIn

FanIn-shaped streamlets have a single outlet and two or more inlets.

streamlets fanin

Egress

An Egress represents data leaving the Pipelines application. For instance this could be data being persisted to some database, notifications being sent to Slack, files being written to HDFS, etc.

streamlets egress

1.3.3. Schemas

Inlets and outlets of streamlets are explicitly typed, i.e. they only handle data that conforms to a specific Avro schema. Pipelines takes a schema-first approach:

  • Avro schemas need to be defined for every inlet and outlet of the streamlet. The Pipelines sbt plugin generates Java or Scala classes from the Avro schemas.

  • The generated types are used when defining the inlets and outlets of a streamlet.

  • Inlets and outlets can only be connected when their schemas are compatible.

For more information on schema compatibility and resolution, see Avro Schema Resolution.

1.3.4. Blueprints

After you have created some streamlets, it’s time to compose them together into a streaming application. Composition and configuration of streamlets is done by building an "application blueprint", which represents the entire distributed application as a set of named instances of streamlets connected together, an example of which is shown in the figure below:

blueprint

The blueprint is just a source file which is part of the sbt project. For the above blueprint that file would look something like this:

blueprint {
  streamlets {
    metrics = com.foo.Ingress1
    map = com.foo.Processor1
    logger = com.foo.Egress1
    validate = com.bar.FanOut1
    store = com.foo.Egress2
    logger2 = com.foo.Egress3
  }
  connections {
    metrics.out-0 = [map.in]
    metrics.out-1 = [logger.in]
    map.out = [validate.in]
    validate.out-0 = [store.in]
    validate.out-1 = [logger2.in]
  }
}

Each streamlet in the application blueprint is added by giving it a unique "reference" name, which identifies it. Multiple instances of the same streamlet can be added to the blueprint as long as they have different reference names.

Two streamlets can be connected together by connecting the inlet of one streamlet to a compatible outlet of another streamlet. Compatibility of inlets with outlets is purely based on the compatibility of their associated schemas. An inlet with a schema A can only be connected to an outlet that has a schema that is compatible with schema A. This compatibility is verified by the Pipelines sbt plugin.

Each outlet can have multiple downstream inlets connected to it. Every inlet reads data from the outlet independently. This is why the above blueprint file shows that each outlet actually has a list of inlets connected to it instead of a single inlet.

An example of a blueprint with multiple inlets connected to the same outlet would look like this:

    ...
    validation.out-1 = [
      logger2.in,
      other-streamlet.in
    ]
    ...

For more details on how to develop, compose, configure, deploy, and operate streamlets in an application, see the Getting Started guide and the Development Guide.

1.3.5. Streamlet Runtimes

Streamlets can be developed using several supported technologies. In Pipelines such a supported technology is called a "runtime".

Pipelines currently supports two runtimes:

  • Akka Streams (2.5.19+, Scala 2.12 or Java 8)

  • Spark (2.4.3+, Scala 2.12)

The Development Guide has sections on how to write streamlets for the specific runtimes.

1.4. Deployment

The blueprint can be verified with an sbt command. An application can be deployed only when its blueprint does not contain problems. Pipelines then takes the blueprint and deploys all of its streamlets to the relevant runtime(s) in such a way that data can start streaming as described.

1.5. Interacting with Deployed Applications

A deployed application is in its running state, i.e. data is streaming from streamlet to streamlet.

deploy 2

Pipelines applications also generate lots of metrics that can be monitored using the Lightbend Console and the upcoming Pipelines Monitoring UI.

1.6. Data Safety and Overhead of Streamlets

As mentioned earlier, streamlets are deployed by the Pipelines operator as self-contained, isolated services. Data that flows between different streamlets will be serialized and persisted.

Data flowing between inlets and outlets is guaranteed, within some constraints, to be delivered at least once, with a minimum of potential duplicated records during live restarts or upgrades. More details about data safety can be found in the development guides for the different runtimes.

Of course data persistence between streamlets adds overhead and it is important to realize this when designing your applications. Several individual data processing operations can be combined into one single streamlet or spread out over multiple streamlets. Which design is best will depend on your use case and, for instance, on whether there is any value in the intermediate data produced. Intermediate data from an outlet can be consumed by multiple downstream streamlets instead of just one; each streamlet will receive all of the data.

For example, let’s look at the following blueprint:

blueprint

Data transformation and validation are done in two separate steps. If the output of the initial transformation is not useful, e.g. if no other streamlets will consume that stream, then the map and validate streamlets can be combined into a single streamlet to save the overhead of (de)serialization and persistence:

blueprint merged streamlet

2. Installing Pipelines

2.1. Overview

The Pipelines installer is a set of scripts and Helm charts that are packaged in a docker container together with all their dependencies.

The installer will perform the following sequence of steps:

  • Make sure that the correct version of Helm’s Tiller component is installed with the correct permissions.

  • Add the Lightbend and Kubernetes Stable Helm chart repositories.

  • Install the Kafka and Spark operators using Helm.

  • Install the Lightbend console, which includes the Pipelines monitoring UI, using Helm.

  • Configure and install Pipelines using Helm.

For each step there will be output on screen detailing the progress from the executables used.

2.2. Requirements

The following requirements must be met before running the installer:

2.2.1. Cluster requirements

2.2.2. Installer requirements

  • Kubernetes CLI tool, kubectl

  • OpenShift CLI tool, oc

  • Docker

  • Credentials for the Lightbend commercial repositories.

  • Credentials for the OpenShift cluster.

  • Credentials for the OpenShift container registry.

  • The user must be a cluster admin to install Pipelines. The following objects are created during the installation:

    • projects/namespaces

    • deployments

    • service accounts

    • clusterrole bindings

    • role bindings

    • roles

    • secrets

    • persistent volume claims

    • config maps

2.3. Installing

Login to the Lightbend commercial docker registry:

docker login -u <username> -p <password> lightbend-docker-commercial-registry.bintray.io

Pull the docker image with the Pipelines installer:

docker pull lightbend-docker-commercial-registry.bintray.io/pipelines/pipelines-installer:1.0.0

Start the container and execute bash.

docker run -it lightbend-docker-commercial-registry.bintray.io/pipelines/pipelines-installer:1.0.0 /bin/bash

In what follows, $CLUSTER_FQDN is the cluster’s fully-qualified domain name or IP address. Either define this environment variable for your cluster in your current command window, so you can copy and paste the commands below, or substitute the correct value when you enter the commands.

Login to the cluster where you want to install Pipelines. For OpenShift, use the following command:

oc login $CLUSTER_FQDN --token=<token>

Run the Pipelines installer:

./install.sh $CLUSTER_FQDN

The script will query for the credentials to the Lightbend commercial repository.

Wait until the script completes and check the log output for the following text:

+------------------------------------------------------------------------------------+
|                      Installation of Pipelines has completed                       |
+------------------------------------------------------------------------------------+

This signals that the installation completed successfully and Pipelines has been deployed to your cluster in the namespace lightbend.

2.4. Post installation

2.4.1. Validating the installation

After the installation has completed, Kubernetes will be creating pods and other resources. This may take some time. You can check that all pods are healthy with the following command:

oc get pods -n lightbend

NAME                                                         READY   STATUS    RESTARTS   AGE
es-console-6f59cc59fd-xbcrx                                  2/2     Running   0          1m
grafana-server-57c887dd5-8ssmv                               1/1     Running   0          1m
pipelines-operator-698cf99d74-4bzn8                          1/1     Running   0          1m
pipelines-sparkoperator-fdp-sparkoperator-649695854c-6h7gz   1/1     Running   0          1m
pipelines-strimzi-entity-operator-549b4c45c4-prrcj           2/2     Running   0          1m
pipelines-strimzi-kafka-0                                    2/2     Running   0          1m
pipelines-strimzi-kafka-1                                    2/2     Running   0          1m
pipelines-strimzi-kafka-2                                    2/2     Running   0          1m
pipelines-strimzi-zookeeper-0                                2/2     Running   0          1m
pipelines-strimzi-zookeeper-1                                2/2     Running   0          1m
pipelines-strimzi-zookeeper-2                                2/2     Running   0          1m
prometheus-alertmanager-84fd6ffc5f-dt4nj                     2/2     Running   0          1m
prometheus-kube-state-metrics-f66c8f9c-mldbm                 1/1     Running   0          1m
prometheus-server-8fc8948cf-qmc75                            3/3     Running   0          1m
strimzi-cluster-operator-56d699b5c5-vqnc2                    1/1     Running   0          1m

When all pods are running, open a browser and navigate to the Lightbend Console. Use the following command to find the exact address of the Console:

oc get routes -n lightbend --no-headers | awk -v i=1 -v j=2 'FNR == i {print $j}'

The following image shows the landing page of the Lightbend Console.

The Pipelines UI

There will be a "Pipelines" icon in the top-left corner that navigates to the main Pipelines Console page.

More details on the Pipelines Console can be found in the Monitoring Pipelines guide.

2.4.2. Configuring a container registry for Lightbend Pipelines

After installation you will need to enable anonymous access to the Lightbend container registry namespace. This is done using the container registry UI, which is located at https://registry-console-default.[FQDN]/registry. The figure below shows how to enable anonymous access: select Projects from the left panel, select the lightbend project, click the project access policy link, and change the access policy to Anonymous: Allow all unauthenticated users to pull images.

OpenShift Container Registry

Failure to enable anonymous access will prevent users from deploying Pipelines applications. If that happens, the oc plugin pipelines deploy command will print the following:

Error response from daemon: unauthorized: authentication required

2.4.3. Adjusting Kafka PVC size

By default Kafka will request 500 GB of persistent volume claims and the max retention bytes per partition will be set to a 5000th of that. This PVC size and the log retention byte ratio can be adjusted with the following script.

./common/adjust-kafka-sizing.sh 5000 5000

Setting PVC size to 5000Gi
Setting log retention bytes to 1073741824 bytes
kafka.kafka.strimzi.io/pipelines-strimzi patched
kafka.kafka.strimzi.io/pipelines-strimzi patched

This will update the PVC size to 5000 GB and the partition log retention bytes to ~1 GB.
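The relationship between the two arguments, as implied by the output above, is simply the PVC size divided by the ratio. A sketch of the arithmetic (not part of the installer):

// 5000 GiB of PVC divided by a ratio of 5000 gives ~1 GiB of retention per partition.
val pvcSizeGb      = 5000L
val retentionRatio = 5000L
val retentionBytes = pvcSizeGb * 1024L * 1024L * 1024L / retentionRatio
// retentionBytes == 1073741824, matching the script output above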

2.4.4. Adjusting resource limits of Akka and Spark pods

The default resource values assigned to new pods are the following:

Spark driver pod

Request Memory: 512 M
Request CPU:    0.5
Limit CPU:    Not set / Optional

Spark executor pod

Request Memory: 2048 M
Request CPU:    1
Limit CPU:    Not set / Optional

Akka pod

Request Memory: 1024 M
Request CPU:    1
Limit Memory: 1024 M
Limit CPU:    1

These values can be changed using the following scripts provided with the installer.

./common/adjust-pod-cpu-resource.sh ['akka' | 'spark-executor' | 'spark-driver'] ['limits' | 'requests'] <value>
./common/adjust-pod-memory-resource.sh ['akka' | 'spark-executor' | 'spark-driver'] ['limits' | 'requests'] <value>

These scripts, which adjust CPU and memory resources, take three arguments:

  • Pod type to configure. This can be either akka, spark-driver or spark-executor.

  • The type of setting to adjust, either limits or requests.

  • The new CPU value in "millicores" (for example 2000 for 2 cores) or the new memory value in MB (for example 1024 for 1 GB of memory).

Examples:

Adjusting CPU limits for Akka pods to 2 CPUs:

./common/adjust-pod-cpu-resource.sh akka limits 2000

Adjusting CPU requests for Akka pods to 1 CPU:

./common/adjust-pod-cpu-resource.sh akka requests 1000

Adjusting memory limits for Spark executor pods to 4 GB:

./common/adjust-pod-memory-resource.sh spark-executor limits 4096
Warning

If a limit is adjusted below the corresponding request, application deployment will fail. Also, if you adjust the requests to be higher than your cluster node size, the streamlets can’t be run and deployment will fail.

Spark pods also have a memory overhead parameter. Memory overhead is the amount of off-heap memory, in MB, that a Spark pod may allocate.

./common/adjust-pod-spark-memory-overhead.sh [spark-executor | spark-driver] <value>

For Spark executor pods the JAVA_OPTS can be adjusted using the following script.

./common/adjust-spark-executor-java-opts.sh <java-opts>

The default value for this setting is:

-XX:+UnlockExperimentalVMOptions -XX:+UseCGroupMemoryLimitForHeap

It is recommended that these default values be included in the string you set.

2.5. Calculating Pipelines Application resource usage

To estimate the size of a Pipelines application the following information can be used. For each streamlet in the application, add the number of CPUs and memory it uses to the total.

Spark streamlets
Memory: 1580 M
CPU:    3

Akka streamlets
Memory: 1024 M
CPU:    1
Note

The Spark values should be multiplied by the scaling factor (default 2) plus 1 to get the total resource usage per Spark streamlet.

We can use this information to calculate the limits for the sample application "call-record-aggregator" (using the default scaling factor of 2 for Spark streamlets).

Streamlet               CPU     Memory
cdr-generator1          1       1024
cdr-generator2          1       1024
merge                   1       1024
cdr-ingress             1       1024
cdr-validator           1       1024
cdr-aggregator          3       1580
    executor 1          3       1580
    executor 2          3       1580
console-egress          1       1024
error-egress            1       1024

Total                   16      11908

Each node in the cluster must be able to run each of these pods, which in this case means that the minimum node size of the cluster is 3 CPUs and 1580 MB of memory. We advise adding a healthy margin on top of that for per-node overhead.
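The totals in the table follow from simple arithmetic. The sketch below reproduces them from the per-streamlet defaults quoted above, assuming (as the table does) seven pods at the Akka figures and one Spark streamlet scaled to two executors plus a driver:

// Sizing arithmetic for call-record-aggregator, using the defaults quoted above.
case class Resources(cpu: Int, memoryMb: Int)

val akkaPod  = Resources(cpu = 1, memoryMb = 1024) // per Akka streamlet pod
val sparkPod = Resources(cpu = 3, memoryMb = 1580) // per Spark pod

val akkaPods  = 7     // generators, merge, ingress, validator and the two egresses
val sparkPods = 2 + 1 // default scaling factor (2 executors) plus the driver

val totalCpu      = akkaPods * akkaPod.cpu      + sparkPods * sparkPod.cpu      // 16
val totalMemoryMb = akkaPods * akkaPod.memoryMb + sparkPods * sparkPod.memoryMb // 11908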

2.6. FAQs

2.6.1. Q) What is the minimum node size needed to deploy a Pipelines application?

A) Calculate the resource limits for all streamlets in the Pipelines application. The nodes need to be large enough to accommodate the largest streamlet in the application plus the overhead of the pod.

2.6.2. Q) Where can I find more information about OpenShift and GlusterFS?

3. Pipelines Quick Start

This document gives an overview of the steps that a user needs to follow in order to develop a complete Pipelines application. It talks about the simplest use case using only a couple of streamlets. For more details you are encouraged to take a look at Build and Deploy a Pipelines Application.

3.1. Setting up a Pipelines Development Environment

3.1.1. Prerequisites

This guide assumes that you have the following tools installed:

3.1.2. Access to Lightbend Commercial Repositories

Pipelines currently depends on the Lightbend commercial repository for the various artifacts that you need to download and access. This page on the Lightbend development portal has all the relevant details on how to set up your local sbt to access the repository.

Docker

Pipelines builds Docker images that get pushed to a registry. The Pipelines operator fetches the images from that registry for deployment. This Docker registry can be configured at install time. This means that we need to do an upfront docker login (that we explain below) before accessing the registry.

The Pipelines sbt plugin will use Docker to build and push application images to a Docker registry:

  • The sbt-pipelines plugin pushes Pipelines application Docker images to a Docker registry.

  • The Pipelines operator pulls Docker images from the registry to deploy Pipelines applications.

  • For OpenShift, this Docker registry is configured on the server. It is always the configured registry that you should log into. Usually, it will be the built-in registry, but a different registry could be used, which will be configured during cluster installation. If you are not sure about how to access it, please contact your system administrator.

In order to ensure that the registry is accessible, a docker login needs to be done first:

$ sudo docker login -p <token> -u unused docker-registry-default.$CLUSTER_DOMAIN

where $CLUSTER_DOMAIN refers to the main DNS name of the cluster.

The password token can be obtained from the cluster console UI as follows:

  1. Login to the cluster

  2. Go to the menu option at the top right of the console (marked by ?)

  3. Select the Command Line Tools option and follow the instructions therein to get the token

  4. You need to get a fresh token every time it expires

Alternatively, if you are logged in to the cluster, you can copy the oc login command from the menu option Copy Login Command which is the top item in the rightmost menu under your login name.

3.1.3. Install the Pipelines CLI

The Pipelines oc plugin is used to deploy and configure Pipelines applications. It is actually a plugin for the Kubernetes kubectl CLI, but it also works in oc. A kubectl plugin is a standalone program whose name starts with kubectl-. It must be located in a directory in your PATH.

Download the Pipelines kubectl plugin from one of the following locations and move it to a directory in your PATH:

Even if you plan to use the OpenShift oc CLI, you’ll need to use kubectl for the next few steps.

To verify that the plugin has been successfully installed, list all plugins using the following command.

kubectl plugin list

The result should look something like this:

  The following kubectl-compatible plugins are available:

  /home/user/go/bin/kubectl-pipelines

The Pipelines kubectl plugin is now correctly installed.

Now let’s install the plugin into the OpenShift oc CLI (if you plan to use it):

kubectl pipelines install-oc-plugin

See The install-oc-plugin command for more details.

Most of the CLI examples in this documentation use oc plugin pipelines … to run Pipelines commands. If you prefer to use kubectl, then use kubectl pipelines … (no plugin argument) instead.

3.1.4. The First Pipelines Application

Let’s walk through creating a first Pipelines application.

Create the sbt build File

Let’s see a basic sbt build project that shows the essentials for creating a Pipelines project. Two important points to note are:

  • The plugin PipelinesAkkaStreamsApplicationPlugin selects Akka Streams as the streaming runtime. Pipelines also supports Spark as the runtime - we will discuss an example application with the Spark runtime in the Build and Deploy a Pipelines Application guide.

  • mainBlueprint defines the blueprint of the application.

lazy val firstPipelinesProject = (project in file("."))
  .enablePlugins(PipelinesAkkaStreamsApplicationPlugin)
  .settings(
    libraryDependencies ++= Seq(..),
    name := "first-pipelines-project",
    mainBlueprint := Some("blueprint.conf"),
    ...
  )
Create the Domain Model

Any application starts with a domain model, and in a Pipelines application the domain model is designed schema-first. The user needs to define an Avro schema for every element of the domain. Let’s take a look at an example schema for an Employee data element:

{
    "namespace": "pipelines.examples",
    "type": "record",
    "name": "Employee",
    "fields":[
         {
            "name": "id",
            "type": "string"
         },
         {
             "name": "name",
             "type": "string"
         },
         {
             "name": "address",
             "type": "string"
         },
         {
              "name": "age",
              "type": "long"
         }
    ]
}

As part of the build process, the schema will be processed and an appropriate Scala case class will be generated with the same name and package as above, e.g. for the above schema the build process will generate a case class named pipelines.examples.Employee.
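The generated class is, roughly, a plain case class with one field per schema field (an approximation; the actual Avro-generated code also carries the schema itself, e.g. the SCHEMA$ value used below):

package pipelines.examples

// Approximate shape of the class generated from the Employee schema above.
case class Employee(id: String, name: String, address: String, age: Long)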

Design the Streamlets

Streamlets form the basic building blocks of a Pipelines application and they need to be defined on domain element types, e.g. object EmployeeIngress extends HttpIngress[Employee] defines an ingress that receives Employee data on its HTTP endpoint. HttpIngress is one of the pre-built ingresses that come as part of the Pipelines distribution.

While defining streamlets you need to expose the key as well as the schema corresponding to the data element. The keying function will be used for partitioning when the data is published to an outlet, and the schema will be used to verify compatibility of data between inlets and outlets. The keying function and the Avro schema are combined into a single abstraction called KeyedSchema. In Scala this can be defined as an implicit value, while in Java it has to be passed through the streamlet constructor.

implicit val employeeCodec: KeyedSchema[Employee] = AvroKeyedSchema[Employee](Employee.SCHEMA$)

To make the topology more meaningful, let’s assume we have another streamlet that validates Employee records and splits them into valid and invalid ones. This streamlet is a Splitter, which also comes out of the box with Pipelines.

object EmployeeValidation extends Splitter[Employee, InvalidEmployee, Employee] {
  override def createLogic = new SplitterLogic() {
    def flow = flowWithPipelinesContext()
      .map { employee =>
        if (!employee.isValid()) Left(InvalidEmployee(employee, "Invalid employee"))
        else Right(employee)
      }
  }
}
Create the Blueprint

Streamlets are connected to each other via a blueprint. The blueprint is defined as a combination of streamlets and connections in a file usually called blueprint.conf.

blueprint {
  streamlets {
    employee-ingress = pipelines.examples.EmployeeIngress
    employee-validation = pipelines.examples.EmployeeValidation
  }
  connections {
    employee-ingress.out = employee-validation.in
    employee-validation.out-1 = ..
    employee-validation.out-2 = ..
  }
}

In the above blueprint, the ingress feeds into the validator. The validator has 2 outlets - one of them feeds valid Employees to some downstream inlet while the other publishes invalid Employee records to some other egress. The blueprint above is incomplete, but it gives an idea of how one can incrementally evolve the connection graph of streamlets declaratively. Also note that blueprints don’t need to be complete or closed to be deployed and new streamlets can be added over time.

An application can only be deployed if the blueprint is valid (though it may not be complete). The outlets and inlets must use compatible schemas and every inlet must be connected. You can verify if the blueprint is valid by using the verifyBlueprint command in sbt.

Publish the Application

This is the process that will package your streamlets and the blueprint into a Docker image, which is then pushed to a Docker registry. You will have to specify which Docker registry to use with the sbt pipelinesDockerRegistry setting.

An example of how to set the docker registry:

pipelinesDockerRegistry := Some("docker-registry-default.$CLUSTER_DOMAIN")

(If you defined and used a shell variable for $CLUSTER_DOMAIN in the previous shell commands, here you’ll need to insert the actual value.)

Once the blueprint is verified, publish the application using sbt buildAndPublish. If everything goes ok, a "Success" prompt will be displayed.

Deploy the Application

Use the CLI plugin to deploy the application.

$ oc plugin pipelines deploy docker-registry-default.$CLUSTER_DOMAIN/pipelines/first-pipelines-project:292-c183d80

The actual image name to be deployed can be found in the output of the sbt buildAndPublish command.

The result of a successful deployment will look like this:

Deployment of application first-pipelines-project has started.
Monitoring the Application

Use the status command to check the status of a deployed application.

$ oc plugin pipelines status first-pipelines-project

Pipelines also offers a monitoring UI for monitoring the details of the applications and the cluster. More details on this can be found in the Build and Deploy a Pipelines Application guide.

This document is an overview of the steps required to build and deploy a simple Pipelines application. There are quite a few details that we have skipped over here. Please have a look at Build and Deploy a Pipelines Application for more detailed examples, multi-project support, multi-runtime support, and more.

4. Build and Deploy a Pipelines Application

This guide describes how to build, configure and run a simple example application on Pipelines.

By following this guide, you will learn how to use the Pipelines CLI and sbt plugin to build and deploy a distributed streaming application composed of streamlets.

This guide assumes that you have read An Introduction to Pipelines. It will frequently refer to terms and concepts that are explained in that document.

Also, this document assumes that you have gone through the Pipelines Quick Start, i.e. that you have the development environment already installed and configured.

4.1. Anatomy of a Pipelines Application

Applications are the logical units of deployment. Within a cluster, we may have one or several applications. Streamlets that belong to an application have a common lifecycle.

You can check whether any applications already exist on the cluster you are connected to with the following command:

$ oc plugin pipelines list

The first step in creating an application is to create an sbt project. There are a couple of example projects that you can use as a starting point. The examples are set up to use the Pipelines sbt plugin and are available in the pipelines-examples GitHub repo.

Clone the pipelines-examples repo with the command shown below:

$ git clone git@github.com:lightbend/pipelines-examples.git

For the sake of this guide, we are going to work with the 'call-record-aggregator' example, which can be found in the call-record-aggregator directory on github:

$ cd pipelines-examples/call-record-aggregator

4.1.1. Multi Project Support

This is a project that has multiple sub-projects and a mix of Akka Streams and Spark streamlets. Hence it is also a good example to work through in order to get an idea of how multi-project builds are supported in a Pipelines application.

When working in a multi-project build, the sub-project that has the blueprint is the entry point of the application and has to be built with the PipelinesApplicationPlugin supplied by Pipelines. Here’s how we define it in build.sbt for call-record-aggregator:

lazy val callRecordPipeline = (project in file("./call-record-pipeline"))
  .enablePlugins(PipelinesApplicationPlugin)
  .settings(commonSettings)
  .settings(
    name := "call-record-pipeline",
    mainBlueprint := Some("blueprint.conf"),
  )
  .dependsOn(akkaCdrIngestor, akkaJavaAggregationOutput, sparkAggregation)

A few things to note from the above definition:

  • The name of the application is defined by the sbt name setting of this project - here it is call-record-pipeline, which is the name under which the application will be deployed.

  • mainBlueprint defines the blueprint file of the application

  • Note that we use PipelinesApplicationPlugin in the definition above, which means this sub-project is the entry point of the application

The other sub-projects of this application are:

  • datamodel which provides the schema definitions of the application

  • akkaCdrIngestor which has a bunch of Akka Streams streamlets written in Scala

  • akkaJavaAggregationOutput which has a few egresses implemented in Java using Akka Streams.

  • sparkAggregation, which has a few Spark streamlets that do the aggregation using Spark Structured Streaming

Note

This sample project demonstrates how to structure your application when you have Akka Streams and Spark based streamlets implemented with a mix of Scala and Java.

4.1.2. Streamlet Runtime Selection

Each sub-project must choose the runtime it requires, either Akka Streams or Spark. A runtime is selected by enabling the relevant sbt plugin in the build.sbt file.

For Akka Streams (Scala or Java):

lazy val akkaCdrIngestor = (project in file("./akka-cdr-ingestor"))
  .enablePlugins(PipelinesAkkaStreamsLibraryPlugin)
  ...

For Spark (currently only in Scala):

lazy val sparkAggregation = (project in file("./spark-aggregation"))
  .enablePlugins(PipelinesSparkLibraryPlugin)
  ...

Besides the above two plugins, Pipelines offers two more:

  • PipelinesLibraryPlugin, used to build generic libraries that don’t contain any streamlets, e.g. datamodel in the call-record-aggregator project

  • PipelinesApplicationPlugin, used with the sub-project that acts as the main entry point of the application, i.e. the one with the blueprint.

4.2. Creating and Publishing the Application

Our application consists of a set of Streamlets, which implement the streaming business logic. The connections between the streamlets are recorded in the blueprint and the entire topology forms the application graph.

4.2.1. Creating Streamlets

Since we created the application from an example sbt project, you should already have a bunch of streamlets and a simple domain model ready to go. The domain model represents incoming call record data which are transformed for rendering simple analytics.

The domain model is specified in the form of an Avro schema that defines the contracts of the model elements. All the schema files can be found under the src/main/avro folder of the datamodel sub-project. Here’s a sample schema from the example that describes the model for an abstraction named CallRecord:

{
    "namespace": "pipelines.examples.carly.data",
    "type": "record",
    "name": "CallRecord",
    "fields":[
         {
            "name": "user",
            "type": "string"
         },
         {
             "name": "other",
             "type": "string"
         },
         {
             "name": "direction",
             "type": "string"
         },
         {
              "name": "duration",
              "type": "long"
         },
         {
            "name": "timestamp",
            "type": "long"
         }
    ]
}

As part of the build process, the schema will be processed and an appropriate Scala case class will be generated with the same name and package as above, e.g. for the above schema the build process will generate a case class named pipelines.examples.carly.data.CallRecord.

And here are the various streamlets that form the application topology:

  • pipelines.examples.carly.aggregator.CallRecordGeneratorIngress, a Spark streamlet that generates data in the form of a Dataset[CallRecord] through execution of a Spark StreamingQuery

  • pipelines.examples.carly.ingestor.CallRecordMerge, an Akka Streams streamlet that merges call records from multiple streamlets

  • pipelines.examples.carly.ingestor.CallRecordIngress, an Akka Streams streamlet that allows you to send JSON data to an HTTP endpoint hosted by the Pipelines operator using HTTP POST

  • pipelines.examples.carly.ingestor.CallRecordValidation, an Akka Streams streamlet that splits call records into streams of valid and invalid ones

  • pipelines.examples.carly.aggregator.CallStatsAggregator, a Spark streamlet that aggregates call record data based on specific logic within a window

  • pipelines.examples.carly.output.AggregateRecordEgress, an Akka Streams Java streamlet that writes aggregated call record statistics to the console

  • pipelines.examples.carly.output.InvalidRecordEgress, an Akka Streams Java streamlet that writes invalid call records to the console

The code for each of the above streamlets can be found in the call-record-aggregator project under the various sub-projects as discussed earlier. To learn more about developing streamlets, please refer to the Development Guide.

4.2.2. Creating the application blueprint

The application blueprint is located in src/main/blueprint/blueprint.conf under sub-project call-record-pipeline. It connects the streamlets that have been shown in the previous section:

blueprint {
  streamlets {
    cdr-generator1 = pipelines.examples.carly.aggregator.CallRecordGeneratorIngress
    cdr-generator2 = pipelines.examples.carly.aggregator.CallRecordGeneratorIngress
    merge = pipelines.examples.carly.ingestor.CallRecordMerge
    cdr-ingress = pipelines.examples.carly.ingestor.CallRecordIngress
    cdr-validator = pipelines.examples.carly.ingestor.CallRecordValidation
    cdr-aggregator = pipelines.examples.carly.aggregator.CallStatsAggregator
    console-egress = pipelines.examples.carly.output.AggregateRecordEgress
    error-egress = pipelines.examples.carly.output.InvalidRecordEgress

  }
  connections {
    cdr-generator1.out = [merge.in-0]
    cdr-generator2.out = [merge.in-1]
    cdr-ingress.out = [merge.in-2]
    merge.out = [cdr-validator.in]
    cdr-validator.valid = [cdr-aggregator.in]
    cdr-aggregator.out = [console-egress.in]
    cdr-validator.invalid = [error-egress.in]
  }
}

The streamlets section contains named instances of streamlets that will be used. The connections section describes how these named streamlet instances are connected, from outlet to inlet(s). An inlet can only connect to one outlet. Many inlets can connect to the same outlet.

An application can only be deployed if the blueprint is valid. The outlets and inlets must use compatible schemas and every inlet must be connected. You can verify if the blueprint is valid by using the verifyBlueprint command in sbt.

4.2.3. Publishing the Application

Once we are done developing our streamlets and connecting the streamlets in a blueprint, we need to package and publish the application to the target cluster that we connected to earlier on.

The buildAndPublish command will create a Docker image with our artifacts and publish it to the Pipelines Docker registry.

cd into the call-record-aggregator directory and execute the following:

$ sbt buildAndPublish

If everything goes well, you will see a [success] message at the end.

4.2.4. Deploying the application

Deployment of the application is done using the Pipelines oc plugin, which adds a sub-command called pipelines to oc. The complete list of commands available from the Pipelines plugin can be viewed as follows:

$ oc plugin pipelines

This command line tool can be used to deploy and operate Pipelines applications.

Usage:
  pipelines [command]

Available Commands:
  configure     Configures a deployed Pipelines Application.
  deploy        Deploys a Pipelines Application to the cluster.
  help          Help about any command
  list          Lists deployed Pipelines Application in the current cluster.
  scale         Scales a streamlet of a deployed Pipelines Application to the specified number of replicas.
  undeploy      Undeploy one or more Pipelines applications.
  version       Print the plugin version.

Flags:
  -h, --help   help for pipelines

Use "pipelines [command] --help" for more information about a command.

To deploy the application, the full Docker image path of the application is needed; it is printed as the final step of the buildAndPublish task.

In this example we will use the following docker image path for our cluster.

registry-default.streampipe.lightbend.com/pipelines/call-record-pipeline:292-c183d80

The Docker image is needed because it contains the application descriptor, which will be the basis for the Kubernetes Custom Resource that the CLI creates on the cluster.

Enter the following command to deploy the application.

$ oc plugin pipelines deploy registry-default.streampipe.lightbend.com/pipelines/call-record-pipeline:292-c183d80

The result of a successful deployment will look like this:

Deployment of application call-record-pipeline has started.

4.2.5. Providing configuration parameters when deploying a Pipelines application

When deploying an application there may be streamlets that have certain configuration requirements that need to be fulfilled. Examples of this can be credentials, hostnames or ports that must be configured per deployment.

These requirements can be provided to the deploy command as a set of key/value pairs appended after the image path.

$ oc plugin pipelines deploy registry-default.streampipe.lightbend.com/pipelines/call-record-pipeline:292-c183d80 cdr-aggregator.group-by-window="7 minute" cdr-aggregator.watermark="1 minute"

The name of the streamlet is used as a prefix to the configuration parameter, to distinguish between configuration parameters of different streamlets that happen to have the same parameter name.

If you are unsure which configuration parameters are required to deploy the application, you can execute the deploy command with just the image path, and a list of missing configuration parameters will be printed.

$ oc plugin pipelines deploy registry-default.streampipe.lightbend.com/pipelines/call-record-pipeline:292-c183d80

Cannot find the following configuration key(s) defined in the command line arguments.
- group-by-window
- watermark

4.2.6. Reconfigure a deployed Pipelines Application

Sometimes there is a need to reconfigure an already deployed application; this can be done using the configure command.

The configure command takes a deployed Pipelines application name and one or more configuration parameters.

$ oc plugin pipelines list

NAME                 NAMESPACE            VERSION     CREATION-TIME
call-record-pipeline call-record-pipeline 292-c183d80 2019-03-05 12:50:33 +0100 CET

Re-configure the application by running the following command:

$ oc plugin pipelines configure call-record-pipeline cdr-aggregator.group-by-window="5 minute" cdr-aggregator.watermark="2 minute"

2019/03/05 12:59:20 Configuration of application call-record-pipeline have been updated.

4.2.7. Scaling a streamlet in a Pipelines Application

Each type of streamlet (Akka or Spark) has a default number of replicas; this can be adjusted after deployment using the scale command.

The scale command takes three parameters: the name of the application, the name of the streamlet to scale, and the number of replicas the streamlet should be scaled to.

$ oc plugin pipelines scale call-record-pipeline cdr-ingress 2

Streamlet cdr-ingress in application call-record-pipeline is being scaled to 2 replicas.

The number of replicas for a specific streamlet will remain in effect until the streamlet is removed by a new deployment of the application or the application is undeployed.

4.2.8. List deployed Pipelines applications

To list all deployed Pipelines applications, you can use the following command:

$ oc plugin pipelines list

NAME                 NAMESPACE            CREATION-TIME
call-record-pipeline call-record-pipeline 2019-03-04 23:44:36 +0100 CET

4.2.9. Undeploying a Pipelines Application

A deployed application can be shutdown and removed using the undeploy command.

$ oc plugin pipelines undeploy call-record-pipeline

Undeployment of application `call-record-pipeline` started.

4.3. Sending data to your application

If all previous steps succeeded, at this point you should have a deployed and running application.

The application call-record-pipeline generates its own data to feed into the pipeline. But for most applications you need to feed data through an ingress, which is often an HttpIngress. In that case you can use any HTTP client to send test data to it. Here’s an example using curl:

$ curl -i -X POST [application-name].[cluster-name]/[ingress-streamlet-reference] -H "Content-Type: application/json" --data [data file name]

That data will now be processed and each data element will flow through the streamlets that you deployed.

4.4. Monitoring

Pipelines is fully integrated into the Lightbend Console. Besides the standard cluster-level views the Lightbend console now also has a special Pipelines section that allows you to inspect and monitor running Pipelines applications. More details can be found in the Monitoring Pipelines guide.

5. Monitoring Pipelines

5.1. Introduction to Pipelines Monitoring

This web-application is used to validate and monitor a Lightbend Pipelines application. Here you can get visual verification that your application looks as you expect, with the correct streamlets and the proper connections each with the desired schema. In addition you can watch your application run, checking on key performance indicators, such as latency and throughput, as well as the health of each streamlet and the monitors computed on them.

First a tour of the user interface components:

5.1.1. Pipeline Applications Page

image

The entry point for Lightbend Pipelines is the applications page. Each running Pipelines application is represented with a 'tile'. Each tile contains basic information about the application (name, resource counts, how long it has been deployed, etc.) along with a (live) thumbnail view of the blueprint and the health history over the current time period. Click on a tile to go to that application’s monitoring page.

To the left of the application tiles are panels containing:

  • a link to the cluster view

  • cluster resources being used for pipelines applications

  • a summary of applications organized by current health

5.1.2. Monitoring Page

image

You can always reach the embedded version of this document by clicking on any of the '?' icons found in the upper right of any panel in this application. Hide the help overlay by clicking 'CLOSE' in the upper right.

5.1.3. Fundamental Concepts: Current Period, Time and Selection

In this monitoring application all views (panels) are tightly coupled. Making a selection in one frequently affects most others. There are two main selection types used here: (1) the application or (2) a streamlet. By default the application is selected - when the page first loads you’ll see the center panel full of rows of healthbars, one for each streamlet in the application. If you click a streamlet in the blueprint diagram or click a healthbar title, all panels will then focus on that streamlet, as it becomes the current selection.

Similarly, there is a current time. The current time is whatever time you’ve hovered over in a graph, healthbar or timeline - it’s a very transient concept and is constantly changing based on the position of the mouse in these panels. The current time is visualized by a vertical crosshair in each time-based panel (graphs, healthbars) and is reflected in the time callout in both the main timeline and the blueprint panel. As you hover within a graph or healthbar this current time changes to track the mouse. If you mouse out of these panels then the current time snaps to 'NOW' (the most recent time sample) - you’ll see this in the time callout in the blueprint graph with a '( LIVE )' label.

image

The duration of interest is governed by the time period selected in the upper right-hand corner of the screen. The options available include: 1 hour, 4 hours, 1 day and 1 week. Metrics are collected from streamlets at one rate (currently 10-second intervals), but healthbars and graphs are based on samplings of these underlying metrics. The duration of interest (i.e. the time period) determines the metric sampling rate for all collected metrics. This sampling rate governs the temporal resolution of the displays (healthbars and graphs). Ten-second sampling is used for a one-hour duration (360 samples/hour), 40 seconds for a four-hour duration, etc.
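The pattern behind those numbers appears to be a fixed number of samples per time period; the one-hour and four-hour intervals are stated above, while applying the same ratio to the longer periods is an assumption:

// Sampling interval implied by roughly 360 samples per selected time period.
val samplesPerPeriod = 360

def samplingIntervalSeconds(periodHours: Double): Double =
  periodHours * 3600 / samplesPerPeriod

samplingIntervalSeconds(1) // 10 seconds, as stated for the 1-hour period
samplingIntervalSeconds(4) // 40 seconds, as stated for the 4-hour period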

image

The health of a time-sample is determined as the 'worst' health over all collected metrics within its interval. However, metrics shown in graphs are instantaneous samples and only reflect the state of the system at the time of collection.

5.1.4. How to use help

This help panel can be displayed by clicking any '?' icon from any panel. In addition to this in-app help you can find online help here.

5.2. Blueprint Graph

The blueprint graph shows the current state of the application as a directed graph of streamlets with flow from left to right. Each streamlet is depicted with a 'service type' (Akka Streams or Spark Streaming) icon, a name, instance count and ports.

image

Streamlets are colored by their current health - as determined by the monitors instrumented for the streamlet. At any point in time a streamlet’s health is equal to the least healthy monitor it possesses. Similarly the health of the application as a whole is equal to its least healthy streamlet. Health is visualized as:

  • Healthy (green)

  • Warning (orange)

  • Critical (red)

  • Unknown (grey)

image

Schemas are defined for each inlet and outlet port, and both ports and wires are colored by schema type, allowing you to get a quick sense of which ports are compatible. These schema colors are also used in the port details panel (see below).

Streamlets can consist of one or more pods. The scale of each streamlet is visualized below the streamlet icon. This streamlet consists of three instances (pods).

Current Selection

Selecting a streamlet, by clicking on it, focuses the rest of the page on this selection. Clicking the background of the blueprint graph selects the application as a whole. The blueprint graph resizes with the window size, so the selection highlight effect depends on the resolution of the display and the complexity of the blueprint. This ranges from a very simple callout (for narrow window sizes) to a full-resolution icon with a solid white outline for larger window sizes.

image

5.3. Application Details

image

The leftmost panel contains application status, including the current (i.e. live) health rollups by streamlet, as well as a summary of health transitions per streamlet over the current time period.

The amount of red (critical) and orange (warning) indicates the relative amount of time over the duration that the streamlet spent in an unhealthy condition. Streamlets in this list are ordered by their amount (weighted by severity) of health issues.

5.4. Health And Timeline

image

The center panel visualizes the health of the current selection (either application or streamlet) over the time period. The health model used in Lightbend Console is based on a hierarchical rollup of health from monitors to streamlets (or workloads) to applications and finally to the cluster as a whole.

5.5. Selection Health and Ports

image

Use the tabs in this panel to show healthbars for the current selection (either the application or a streamlet) or, for streamlet selections only, the inlets and outlets with their schemas, throughput and consumer lag metrics (at the current time), and other metadata.

5.5.1. Health

This panel shows healthbars for the current selection. When the application is selected (by clicking the background of the blueprint diagram) then this panel contains streamlet health, one streamlet per row. When a streamlet is selected then the panel contains monitor health (one row per monitor).

image

  • Healthy (green)

  • Warning (orange)

  • Critical (red)

  • Unknown (grey)

The healthbars can be ordered by

  • name (streamlet or monitor)

  • first-unhealthy (default)

  • most-unhealthy

First-unhealthy simply orders by the first sample in each health metric to turn warning or critical. Most-unhealthy ordering is based on the number of samples over the duration that are either warning or critical, where critical has twice the weight of warning.
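In other words, most-unhealthy ordering can be thought of as sorting by a simple weighted count. An illustrative sketch, not the Console’s actual implementation:

// Warning samples count once, critical samples count twice; rows are sorted
// by descending score.
def unhealthyScore(warningSamples: Int, criticalSamples: Int): Int =
  warningSamples + 2 * criticalSamples

val score = unhealthyScore(warningSamples = 12, criticalSamples = 3) // 18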

5.5.2. Inlet and Outlet Details

Click the 'Ports' tab and this panel changes to show details on the inlets and outlets of the selected streamlet. In the upper portion of the panel a graphic shows the streamlet along with its named ports colored by their schema type.

image

At the bottom of the panel is an expandable list of each port’s details including the port name, schema type, current values of key performance metrics and upstream or downstream connections. Clicking on a connection in the list will select that up/downstream streamlet.

Consumer Lag metrics are defined on each inlet, and Throughput metrics are defined on both inlets and outlets (see Key Performance Metrics below). The values shown for these metrics are valid for the current time. If you mouse over the main timeline (or a graph) and thus change the current time, you’ll see these values change as well. In this manner you can correlate exact values for these key performance metrics with other metrics from this streamlet.

These key metrics are based upon reading from or writing data to Kafka, the message bus behind the scenes. Inlets connect to Kafka via a consumer-group while outlets are written to topics. Both of these are shown in the details.

5.6. Metric Graphs

Graphs of metrics for the current selection are shown in the right-most panel. There are two basic graph representations: paired-stack graphs and area graphs. Throughput is depicted as an upper/lower stacked graph where all the incoming data is shown on top and the outgoing throughput data on the bottom. Each upper and lower graph can contain multiple sources, which are stacked upon each other.

Area graphs show one or more curves overlaid upon each other.

Currently, applications produce only one family of high-level metrics: throughput (see Key Performance Metrics).

image

Shown in a paired-stack graph.

Whenever a streamlet is selected, the right-most panel displays metric graphs relating to that streamlet. These include the metrics that back key streamlet monitors as well as other important metrics, such as consumer lag and streamlet throughput.

image

Each streamlet monitor is based upon a metric exported by the streamlet; a monitor is defined by combining that metric with an expression (based on the monitor type) and other parameters. These monitor-backing metrics are graphed in this panel.

In addition there are other key metrics for each streamlet type. Metrics in this category include consumer-lag which is defined for each streamlet inlet. Streamlet throughput is defined on streamlet outlets and is also shown in this panel.

When a streamlet outputs several metrics with the same name but with different labels, a single graph is created for the collection of metrics. As you mouse over the graph you'll see, along with the crosshairs, each curve highlight. Curves occluded by others cannot be highlighted this way; instead, hover over an indicator chip in the upper right. This highlights the curve (increasing its opacity) and shows the labels unique to that curve below the graph.

image

image In this example there are two labels with values unique to the curve: container & pod.

Most graphs display a description tooltip on hover.

5.6.1. Key Performance Metrics

Consumer Lag is a key metric available for streamlet inlets. Each inlet is part of a Kafka Consumer Group across all the instances of that streamlet. If there is one instance of the streamlet, it is a group of one. The group members divide up the kafka partitions of the topic they are reading. Every partition has a lag or latency measurement, which is how far back the consumer group reader is from the most recent message. This metric is captured in terms of the number of records each reader (instance) is behind the most recent record produced upstream - and is presented as the maximum across all readers in the group.
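
As an illustration of this definition, the following Scala sketch computes the reported lag for a hypothetical consumer group with three partition readers (the instance names and numbers are made up):

// Records-behind-latest per (instance, partition) reader of the inlet's consumer group.
val recordsBehind = Map(
  ("instance-0", 0) -> 12L,
  ("instance-0", 1) -> 3L,
  ("instance-1", 2) -> 40L
)

// The consumer lag reported for the inlet is the maximum across all readers in the group.
val consumerLag = recordsBehind.values.max // 40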

Throughput, another key metric, is available on both inlets and outlets. On streamlet inlets it represents the rate of data records read by the streamlet (in records / second). On outlets it is the rate of data records produced by the streamlet. Note that there might not be a one-to-one relationship between inlet records and outlet records; the relationship depends on the functionality of the streamlet itself.

For application throughput we compare the application's incoming data (i.e. the data produced on all outlets of all ingresses) with the outgoing data (i.e. the data consumed on all inlets of all egress streamlets in the application). Incoming data is shown in the upper stack and outgoing data in the lower stack, as in the following image (note that the outgoing stack only has one curve, the blue one, as the other egress streamlet's inlet rate is zero in this particular case).

image

5.6.2. Crosshair Behavior

As you move the mouse over a graph you’ll see the crosshair track the mouse. A timeline callout appears below the graph and the numerical value, based on the vertical position, is shown at the left axis. In addition you’ll see a small vertical line drawn on all graphs, healthbars and timelines on the page - allowing you to correlate behavior across metrics, monitors and streamlets.

5.7. Controls

image The top-level controls panel is the jumping-off point for other views on the current selection: metric dashboards (via Grafana) or infrastructure monitoring.

There is a Grafana metric dashboard for each streamlet as well as for the application as a whole. In these dashboards you can see a variety of metrics for each service type of the current selection (Kafka, Akka Streams, Spark, JVM and Kubernetes), separated into groups. Here you also have finer-grained control over time periods and other graphing attributes.

image

5.7.1. Relationship with Lightbend Console Monitoring

There are two main views for monitoring Lightbend-enabled applications: the infrastructure view and the pipeline view. The pipeline view provides a blueprint- and streamlet-oriented frame of reference: all information is structured for a higher-level perspective on a running pipeline-enabled application, where the main actors are the blueprint, its constituent streamlets, and their monitors and metrics. Health at the application level is based on rolling up health at the streamlet level, which in turn is based on rolling up health at the monitor level.

image

image Another viewpoint is infrastructure-oriented: a cluster consists of nodes running pods (you can reach this view by clicking on the 'workload' icon within the controls panel: see right). Those pods are organized by the workloads in which they’re running. Workloads have monitors running on pods and the health of the workload is based on rolling up the health of its monitors. This infrastructure-based view does not have a high level concept of 'application' but it does allow you to define or customize monitors for workloads. While this customization is not available directly in the pipeline-based view, those monitors do carry over to the pipeline perspective.

image This is the corresponding infrastructure view for this pipeline application. Note that there are 11 workloads here (three with two pods each, all others with one), while the pipeline view contains eight streamlets. This discrepancy is due to how Spark jobs are handled: they have an executor and a driver, resulting in an additional workload for each Spark streamlet, or three additional workloads for this application (since there are three Spark-based streamlets).

6. Developing Pipelines Streamlets

See the Pipelines concepts document if you are not yet familiar with these concepts. The actual stream processing logic is implemented in streamlets, which are the basic building blocks of a streaming application. Akka streamlets can be developed in Scala or Java; Spark streamlets can only be developed in Scala.

6.1. Schema First Approach

Pipelines has adopted a schema-first approach for building streaming data pipelines. The user supplies the Avro schema as the starting point of the domain model for which the streaming data pipelines are to be built.

Pipelines adds the appropriate plugins to the build system of the application to generate Java or Scala classes corresponding to the Avro schema.

This approach has the advantage that the user only needs to take care of the core domain model and Pipelines does the heavy lifting of generating the classes and integrating them with the main application.

However, since Pipelines takes the schema as input, it needs to ensure that the corresponding data inlets and outlets honor the schema when allowing data to flow through them. This ensures data consistency across all inlets and outlets. We discuss this in the next section.

6.2. Schema Code Generation

The user can choose which programming language to generate their schema classes in by defining Settings in their SBT project. When no Settings are defined, the sbt-pipelines plugin will by default look for Avro schemas in src/main/avro and generate Scala classes for them.

If you wish to override the location of your Avro schemas in your project, or if you wish to generate Java classes instead, you can do so by defining a Setting in your SBT project.

  • schemaCodeGenerator (Default: SchemaCodeGenerator.Scala) - The programming language to generate Avro schemas classes into.

  • schemaPaths (Default: Map(SchemaFormat.Avro → "src/main/avro")) - The relative path to the Avro schemas.

  • schemaFormat (Default: Seq(SchemaFormat.Avro)) - The schema formats to generate. Avro is the only format currently supported.

For example, to generate Java classes from Avro schemas in a non-default location:

lazy val datamodel = (project in file("./my-pipelines-library"))
  .enablePlugins(PipelinesLibraryPlugin)
  .settings(
    schemaCodeGenerator := SchemaCodeGenerator.Java,
    schemaPaths := Map(SchemaFormat.Avro -> "src/main/resources/avroschemas")
  )

6.3. Schema Aware Inlets and Outlets

In Pipelines, all streamlet inlets and outlets are schema-aware. This means two things:

  • Every inlet and outlet only allows data that honors the schema it was defined with to flow through it.

  • Inlets and outlets can be connected together only if the schemas of an outlet and the corresponding inlet are compatible.

Let’s take a look at how Pipelines ensures both of the above guarantees.

6.3.1. Data Safety Guarantee through Types

As mentioned above, any pipeline definition starts with the user providing the Avro schema definition for the domain object. Let's assume the user provides the following Avro schema as input. It's the definition of a call record as used by a telephone company.

{
    "namespace": "pipelines.examples",
    "type": "record",
    "name": "CallRecord",
    "fields": [
        {
            "name": "user",
            "type": "string"
        },
        {
            "name": "other",
            "type": "string"
        },
        {
            "name": "direction",
            "type": "string"
        },
        {
            "name": "duration",
            "type": "long"
        },
        {
            "name": "timestamp",
            "type": "long"
        }
    ]
}

In the case of a Scala application, Pipelines will generate a Scala case class pipelines.examples.CallRecord corresponding to the above schema. This class is then made available for use within the application.

When we define a streamlet where objects of type CallRecord will be passing through its inlet, we define the inlet as val in = AvroInlet[CallRecord]("call-record-in"). Pipelines ensures the following compile time guarantees through this type definition:

  • The inlet only allows a codec of type Avro

  • Pipelines only allows the inlet to be used with an object of type CallRecord. For example, when implementing createLogic, if you call readStream(in) where in is an inlet parameterized by a type other than CallRecord, the compiler will complain

6.3.2. Outlet and the Partitioning Function

Similar to inlets, the user can define an outlet as val out = AvroOutlet[CallRecord]("call-record-out", _.user). Besides the name of the outlet, it also takes a partitioning function that defines how data will be partitioned when writing to Kafka topics. Data partitioning in Kafka ensures scalability.

All the data safety guarantees that we discussed for inlets in the last section apply to outlets as well.
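
Putting the two together, a minimal sketch of the schema-aware port definitions for CallRecord could look like the following (partitioning by the user field is just an illustrative choice):

import pipelines.streamlets.avro._
import pipelines.examples.CallRecord // generated by Pipelines from the Avro schema above

// Inlet: only Avro-encoded CallRecord data can flow through it.
val in = AvroInlet[CallRecord]("call-record-in")

// Outlet: the second argument is the partitioning function used when writing to Kafka.
val out = AvroOutlet[CallRecord]("call-record-out", _.user)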

6.4. Schema Aware StreamletLogic

When we implement the StreamletLogic for a streamlet, we use the inlets and outlets which, as we discussed above, are schema aware. We also use the classes generated from the schemas, which are schema aware as well. Here's an example:


class CallStatsAggregator extends SparkStreamlet {

  val in = AvroInlet[CallRecord]("in")
  val out = AvroOutlet[AggregatedCallStats]("out", _.startTime.toString)
  val shape = StreamletShape(in, out)

  override def createLogic = new SparkStreamletLogic {

    override def buildStreamingQueries = {
      val dataset = readStream(in)
      val outStream = process(dataset)
      val query = writeStream(outStream, out, OutputMode.Update)
      Seq(query)
    }

    private def process(inDataset: Dataset[CallRecord]): Dataset[AggregatedCallStats] = {
      val query =
        inDataset
          .withColumn("ts", $"timestamp".cast(TimestampType))
          .withWatermark("ts", "1 minute")
          .groupBy(window($"ts", "1 minute"))
          .agg(avg($"duration") as "avgCallDuration", sum($"duration") as "totalCallDuration")
          .withColumn("windowDuration", $"window.end".cast(LongType) - $"window.start".cast(LongType))

      query
        .select($"window.start".cast(LongType) as "startTime", $"windowDuration", $"avgCallDuration", $"totalCallDuration")
        .as[AggregatedCallStats]
    }
  }
}

In the above example, we have one inlet that allows data of type CallRecord and one outlet that allows data of type AggregatedCallStats. The user has supplied the schemas for both of these types, from which Pipelines has generated the Scala classes. The entire StreamletLogic is based on these two classes: we read CallRecord from the inlet, process the data, and generate AggregatedCallStats to be sent to the outlet.

Hence the entire streamlet is guaranteed only to process data that conforms to the schema which the user had supplied.

6.5. Composing Streamlets using Blueprints

In Pipelines, a streaming application is composed of Streamlets. Each Streamlet defines one or more inlets and outlets. To create a functional pipeline, we must define how the Streamlets connect together by declaring which outlet from one Streamlet should connect with an inlet of another.

A blueprint specifies which streamlets should be used in a pipeline and how they should be connected together. It must be defined in the file src/main/blueprint/blueprint.conf within the file structure of the Pipelines application sbt project.

An example of a blueprint is shown below:

blueprint {
  streamlets {
    sensor-data = pipelines.examples.sensordata.SensorDataIngress
    metrics = pipelines.examples.sensordata.SensorDataToMetrics
    validation = pipelines.examples.sensordata.MetricsValidation
    reporter = pipelines.examples.sensordata.MetricsEgress
  }

  connections {
    sensor-data.out = [metrics.in]
    metrics.out = [validation.in]
    validation.out-0 = [reporter.in]
  }
}

Note

Blueprint files are specified in the HOCON format, a superset of JSON.

The above example shows four streamlets, "sensor-data", "metrics", "validation" and "reporter". The streamlets section declares the streamlets used in that blueprint and gives them a short name that we use to refer to them in the next section. The connections section declares how the inlets and outlets of the different participating streamlets are connected.

A blueprint file always consists of one blueprint section. The streamlets section defines which streamlets must be used in the blueprint and assigns a reference to every streamlet. The streamlet references are later used for connecting streamlets together. Let's zoom in on the streamlets section in the example below:

  streamlets {
    sensor-data = pipelines.examples.sensordata.SensorDataIngress
    metrics = pipelines.examples.sensordata.SensorDataToMetrics
    validation = pipelines.examples.sensordata.MetricsValidation
    reporter = pipelines.examples.sensordata.MetricsEgress
  }

The above example shows four streamlet types defined in the pipelines.examples.sensordata package. Streamlet types (classes or objects) are specified by their fully qualified names.

Every assignment in the streamlets section assigns a streamlet reference to an instance of a streamlet type. Once deployed, Pipelines will run at least one streamlet instance (depending on the requested number of replicas) for each streamlet reference that has been defined in the blueprint.

Streamlets can be defined in Java or Scala, as Java or Scala classes with default constructors or as Scala objects. These classes must be available on the classpath; they can be defined directly in the Pipelines application sbt project or in any dependency that the project has specified, as you would expect of any sbt project.

Note

You can define more than one streamlet reference to the same Scala object. In that case, as many streamlets as assigned streamlet references will be run once everything is deployed (assuming a scaling factor of one). You should view a Streamlet, defined as a Scala object, as a template that can be used many times to instantiate streamlets, not as a singleton object that will only run once as a streamlet.

The streamlet references assigned in the streamlets section can be used in the connections section to connect streamlets together. Let's zoom in on the connections section in the example below:

  connections {
    sensor-data.out = [metrics.in]
    metrics.out = [validation.in]
    validation.out-0 = [reporter.in]
  }

Every expression in a connections section defines how an outlet connects to one or more inlets. Every assignment follows this pattern:

  <streamlet-reference-a>.<outlet> = [<streamlet-reference-b>.<inlet>, <streamlet-reference-c>.<inlet>, ...]

Streamlet outlets and inlets are always prefixed by a streamlet-reference, followed by a dot ('.'). As you can see from the above pattern, it is possible to connect many inlets to the same outlet.

Every streamlet type has a shape, defining how many inlets and outlets it has. All streamlets implement a shape() method which returns the shape of the streamlet. In this case, the streamlet referenced by sensor-data has a single outlet named "out". Similarly, the streamlet referenced by "metrics" has one inlet named "in" and one outlet named "out".

As discussed in the user guide, inlets and outlets of streamlets are explicitly typed, e.g. they only handle data that conform to specific Avro schemas. Inlets can only be connected to outlets if their schemas are compatible. You can verify if the blueprint connects all the streamlets correctly by using:

  sbt verifyBlueprint

The blueprint is automatically verified when sbt buildAndPublish is used.

6.6. Message Delivery Semantics

In the subsequent sections, we’ll sometimes mention different models for message delivery semantics provided by Spark-based and Akka-based streamlets.

Pipelines follows the 'let it crash' principle and can recover from most failure scenarios, except those deemed catastrophic, where the data used for recovery (snapshots) may have been lost. This approach also follows the general policy of Kubernetes, where processes are ephemeral and can be restarted, replaced, or scaled up/down at any time.

With message delivery semantics, we refer to the expected message delivery guarantees in the case of failure recovery. In a distributed application such as Pipelines, failure may happen at several different execution levels: from a failed task in an individual executor, to a pod that goes down, to a complete application crash.

After a failure recovery, we recognize these different message delivery guarantees:

At most once

Data may have been processed but will never be processed twice. In this case, data may be lost but processing will never result in duplicate records.

At-least-once

Data that has been processed may be replayed and processed again. In this case, each data record is guaranteed to be processed and may result in duplicate records.

Exactly once

Data is processed once and only once. All data is guaranteed to be processed and no duplicate records are generated. This is the most desirable guarantee for many enterprise applications, but it’s considered impossible to achieve in a distributed environment.

Effectively Exactly Once

A variant of exactly-once delivery semantics that tolerates duplicates during data processing and requires the producer side of the process to be idempotent: producing the same record more than once has the same effect as producing it only once. In practical terms, this translates to writing the data to a system that can preserve the uniqueness of keys, or using a deduplication process to prevent duplicate records from being produced to an external system.

7. Developing with Akka Streamlets

7.1. Akka Streamlet Fundamentals

An Akka-based streamlet has a couple of responsibilities:

  • It needs to capture your stream processing logic.

  • It needs to publish metadata which will be used by the sbt-pipelines plugin to verify that a blueprint is correct. This metadata consists of the shape of the streamlet (StreamletShape), defined by the inlets and outlets of the streamlet. Connecting streamlets as specified in the blueprint needs to match on inlets and outlets to make a valid Pipelines topology, and Pipelines automatically extracts the shapes from the streamlets to verify this match.

  • For the Pipelines runtime, it needs to provide metadata so that it can be configured, scaled, and run as part of an application.

  • The inlets and outlets of a Streamlet have two functions:

    • To specify to Pipelines that the streamlet needs certain data streams to exist at runtime, which it will read from and write to, and

    • To provide handles inside the stream processing logic to connect to the data streams that Pipelines provides. The StreamletLogic provides methods that take an inlet or outlet argument to read from or write to specific data streams that Pipelines has set up for you.

The next sections will go into the details of defining an Akka Streamlet:

  • defining inlets and outlets

  • creating a streamlet shape from the inlets and outlets

  • creating a streamlet logic that uses the inlets and outlets to read and write data

7.1.1. Inlets and Outlets

A streamlet can have one or more inlets and outlets. Pipelines offers classes for Inlets and Outlets based on the codec of the data they manipulate. Currently Pipelines supports Avro and hence the classes are named AvroInlet and AvroOutlet. Each outlet also allows the user to define a partitioning function that will be used to partition the data.

7.1.2. StreamletShape

The StreamletShape captures the connectivity and compatibility details of an Akka-based streamlet. It captures which—and how many—inlets and outlets the streamlet has.

The sbt-pipelines plugin extracts—amongst other things—the shape from the streamlet that it finds on the classpath. This metadata is used to verify that the blueprint connects the streamlets correctly.

Pipelines offers an API StreamletShape to define various shapes with inlets and outlets. Each inlet and outlet can be defined separately with specific names. The outlet also needs to have a partitioning function to ensure effective data partitioning logic.

When you build your streamlet, you need to define the shape. You will learn how to do this in Building an Akka Streamlet. The next section describes how the StreamletLogic captures stream processing logic.

7.1.3. StreamletLogic

The stream processing logic is captured in a StreamletLogic, which is an abstract class. It provides:

  • The ActorSystem to run Akka components in, a Materializer for akka-stream graphs, and an ExecutionContext for asynchronous calls (in Scala these are in implicit scope inside the StreamletLogic).

  • The Typesafe Config loaded from the classpath, available through a config method, which can be used to read configuration settings.

  • A subset of the Typesafe Config that is reserved for the streamlet, available through a streamletRefConfig method, which can be used for deployment-time, user-configurable settings.

  • Means to access data streams to read from inlets and write to outlets, for when you want to build your own custom streamlets.

A StreamletLogic will often set up an akka-stream graph and run it. The RunnableGraphStreamletLogic, which extends StreamletLogic, makes this very easy: you only need to provide the RunnableGraph that you want to run.

You can access the items described above, like actor system and config, directly through methods on the StreamletLogic, or through implicit scope.

The StreamletLogic is only constructed once the streamlet is run, so it is safe to put instance values and variables in it that you would like to use when the streamlet is running.

When you build your own Akka streamlet, you define both a StreamletShape and a subclass of StreamletLogic, more on that later.

7.1.4. Lifecycle

In general terms, when you publish and deploy a Pipelines application, each streamlet definition becomes a physical deployment as one or more Kubernetes artifacts. In the case of Akka-based streamlets, each streamlet is submitted as a Deployment resource. Upon deployment, it becomes a set of running pods, depending on the scale factor for that streamlet.

A so-called runner runs inside every pod. The runner connects the streamlet to data streams, at which point the streamlet starts consuming from inlets. The Streamlet advances through the data streams that are provided on inlets and writes data to outlets.

If you make any changes to the streamlets and deploy the application again, existing pods will be stopped and new pods will be started to reflect the changes. Pods may also be restarted in other ways, for instance by administrators.

This means that streamlets can get stopped, started or restarted at any moment in time. The next section about message delivery semantics explains the options that are available for how to continue processing data after a restart.

7.1.5. Message Delivery Semantics

If you only want to process data that is arriving in real-time—ignoring data from the past—an Akka streamlet can simply start from the most recently arrived record. In this case, an at-most-once semantics is used (see Message Delivery Semantics). The streamlet will process data as it arrives. Data that arrives while the streamlet is not running will not be processed.

At-most-once message delivery semantics are applied to Akka streamlets that use standard akka-stream graphs like Source, Sink, Flow.

Akka streamlets also support at-least-once semantics for cases where the above-mentioned options are not viable. In this case, offsets of incoming records are tracked, so that the streamlet can start from a specific offset, per inlet, on restart. Once records are written to outlets, the associated incoming record offsets are considered processed. Offsets are committed in batches, and only when the offset changes.

Akka streamlets that want to support at-least-once semantics, must use an akka-stream FlowWithContext that propagates a PipelinesContext through the flow. Pipelines automatically propagates offset details through the flow and commits offset changes in batches.

The FlowWithContext provides a constrained set of operators compared to the akka-stream Flow. The subset of operators process records in-order.

There are some side effects to this approach. For instance, if you filter records, only offsets associated to records that are not filtered will be committed. This means that on restart, more re-processing—of previously filtered records—can occur than you might expect.

Building an Akka Streamlet describes further how you can build akka streamlets.

7.2. Building an Akka Streamlet

The following sections describe how you can create an Akka streamlet. As mentioned in the Akka Streamlet Fundamentals, an Akka streamlet is defined by the following features:

  • It is a Streamlet. Pipelines offers a class for implementing Akka streamlets, AkkaStreamlet, which extends pipelines.streamlets.Streamlet. Any Akka streamlet needs to extend AkkaStreamlet.

  • It has a shape - we call it StreamletShape. Any Akka streamlet needs to define a concrete shape using the APIs available for the StreamletShape class, which defines the inlets and outlets of the streamlet.

  • It has a StreamletLogic that defines the business logic of the Akka streamlet.

In this tutorial we'll build a simple Akka streamlet that accepts data on an inlet and writes it to the console. It can be used to print reports from data that arrives at its inlet. Let's call the streamlet ReportPrinter.

7.2.1. Extending from AkkaStreamlet

Let's start building the ReportPrinter streamlet. The first thing to do is extend the pipelines.akkastream.AkkaStreamlet abstract class, as shown below:

Scala:

package com.example

import akka.stream.scaladsl.Sink

import pipelines.streamlets._
import pipelines.streamlets.avro._

import pipelines.akkastream._
import pipelines.akkastream.scaladsl._

object ReportPrinter extends AkkaStreamlet {
  // 1. TODO Create inlets and outlets
  // 2. TODO Define the shape of the streamlet
  // 3. TODO Override createLogic to provide StreamletLogic
}

Java:

package com.example;

import akka.NotUsed;
import akka.stream.*;
import akka.stream.javadsl.*;

import pipelines.streamlets.*;
import pipelines.streamlets.avro.*;
import pipelines.akkastream.*;
import pipelines.akkastream.javadsl.*;

public class ReportPrinter extends AkkaStreamlet {
  // 1. TODO Create inlets and outlets
  // 2. TODO Define the shape of the streamlet
  // 3. TODO Override createLogic to provide StreamletLogic
}

The code snippet above shows an object ReportPrinter that extends AkkaStreamlet. We have shown the steps needed to complete the implementation, which we will do in the next few sections.

The next step is to implement inlets and outlets of the streamlet.

7.2.2. Inlets and Outlets

The streamlet that we are building in this tutorial will have an inlet and no outlet.

Scala:

package com.example

import akka.stream.scaladsl.Sink

import pipelines.streamlets._
import pipelines.streamlets.avro._

import pipelines.akkastream._
import pipelines.akkastream.scaladsl._

object ReportPrinter extends AkkaStreamlet {
  // 1. TODO Create inlets and outlets
  val inlet = AvroInlet[Report]("report-in")
  // 2. TODO Define the shape of the streamlet
  // 3. TODO Override createLogic to provide StreamletLogic
}

Java:

package com.example;

import akka.NotUsed;
import akka.stream.*;
import akka.stream.javadsl.*;

import pipelines.streamlets.*;
import pipelines.streamlets.avro.*;
import pipelines.akkastream.*;
import pipelines.akkastream.javadsl.*;

public class ReportPrinter extends AkkaStreamlet {
  // 1. TODO Create inlets and outlets
  AvroInlet<Report> inlet = AvroInlet.<Report>create("report-in", Report.class);
  // 2. TODO Define the shape of the streamlet
  // 3. TODO Override createLogic to provide StreamletLogic
}

Pipelines supports Avro-encoded processing of data; we make this explicit by defining the inlet as an AvroInlet. Report is the class of objects that will be accepted by this inlet. This means that an inlet defined by AvroInlet[Report] will only accept Avro-encoded data for the class Report. The Report class will be generated by Pipelines at application build time from the Avro schema that the user supplies; this ensures that the data accepted by the inlet conforms to that schema. As an example, we can have the following Avro schema for the Report object, which contains a report of some of the attributes of products from an inventory:

{
  "namespace": "com.example",
  "type": "record",
  "name": "Report",
  "fields": [
    {
      "name": "id",
      "type": "string"
    },
    {
      "name": "name",
      "type": "string"
    },
    {
      "name": "description",
      "type": "string"
    },
    {
      "name": "keywords",
      "type": {
        "type": "array",
        "items": "string"
      }
    }
  ]
}

In the definition of the inlet, "report-in" is the name of the inlet. It’s recommended that you use a domain specific name for the inlet which indicates the nature of data that this inlet is supposed to accept. We will use this inlet later to read data from it.

This streamlet does not have any outlet. But in general outlets are defined similarly, val out = AvroOutlet[Report]("report-out", _.name) will define an outlet which will write Avro encoded data for the object of type Report. Here "report-out" is the name of the outlet and _.name is the partitioning function that partitions the data from the outlet.

7.2.3. Streamlet Shape

Let's now define the shape of ReportPrinter by using the APIs in pipelines.streamlets.StreamletShape:

Scala:

package com.example

import akka.stream.scaladsl.Sink

import pipelines.streamlets._
import pipelines.streamlets.avro._

import pipelines.akkastream._
import pipelines.akkastream.scaladsl._

object ReportPrinter extends AkkaStreamlet {
  // 1. TODO Create inlets and outlets
  val inlet = AvroInlet[Report]("report-in")
  // 2. TODO Define the shape of the streamlet
  val shape = StreamletShape.withInlets(inlet)
  // 3. TODO Override createLogic to provide StreamletLogic
}

Java:

package com.example;

import akka.NotUsed;
import akka.stream.*;
import akka.stream.javadsl.*;

import pipelines.streamlets.*;
import pipelines.streamlets.avro.*;
import pipelines.akkastream.*;
import pipelines.akkastream.javadsl.*;

public class ReportPrinter extends AkkaStreamlet {
  // 1. TODO Create inlets and outlets
  AvroInlet<Report> inlet = AvroInlet.<Report>create("report-in", Report.class);

  // 2. TODO Define the shape of the streamlet
  public StreamletShape shape() {
   return StreamletShape.createWithInlets(inlet);
  }
  // 3. TODO Override createLogic to provide StreamletLogic
}

The above code overrides the shape method with a value that defines the shape of the streamlet. StreamletShape offers methods to define shapes, e.g. to define a streamlet with 2 inlets and 2 outlets, we could write StreamletShape.withInlets(in0, in1).withOutlets(valid, invalid).

The next step is to define the StreamletLogic.

7.2.4. Defining the StreamletLogic

The StreamletLogic class makes it possible for a user to specify domain logic. It is defined as an abstract class in pipelines.akkastream.StreamletLogic and provides an abstract method run() where the user can define the specific logic for the Akka Streamlet.

In this step we need to override createLogic from AkkaStreamlet in our ReportPrinter object. createLogic needs to return an instance of StreamletLogic which will do the processing based on the requirements of the ReportPrinter object.

Scala:

package com.example

import akka.stream.scaladsl.Sink

import pipelines.streamlets._
import pipelines.streamlets.avro._

import pipelines.akkastream._
import pipelines.akkastream.scaladsl._

object ReportPrinter extends AkkaStreamlet {
  // 1. TODO Create inlets and outlets
  val inlet = AvroInlet[Report]("report-in")
  // 2. TODO Define the shape of the streamlet
  val shape = StreamletShape.withInlets(inlet)
  // 3. TODO Override createLogic to provide StreamletLogic
  def createLogic = new RunnableGraphStreamletLogic() {
    def format(report: Report) = s"${report.name}\n\n${report.description}"
    def runnableGraph =
      atMostOnceSource(inlet)
        .to(Sink.foreach(report => println(format(report))))
  }
}

Java:

package com.example;

import akka.NotUsed;
import akka.stream.*;
import akka.stream.javadsl.*;

import pipelines.streamlets.*;
import pipelines.streamlets.avro.*;
import pipelines.akkastream.*;
import pipelines.akkastream.javadsl.*;

public class ReportPrinter extends AkkaStreamlet {
  // 1. TODO Create inlets and outlets
  AvroInlet<Report> inlet = AvroInlet.<Report>create("report-in", Report.class);

  // 2. TODO Define the shape of the streamlet
  public StreamletShape shape() {
   return StreamletShape.createWithInlets(inlet);
  }

  // 3. TODO Override createLogic to provide StreamletLogic
  public RunnableGraphStreamletLogic createLogic() {
    return new RunnableGraphStreamletLogic(getStreamletContext()) {
      public String format(Report report) {
        return report.getName() + "\n\n" +report.getDescription();
      }
      public RunnableGraph<NotUsed> createRunnableGraph() {
        return getAtMostOnceSource(inlet).to(Sink.foreach(report -> System.out.println(format(report))));
      }
    };
  }
}

In the above code we override createLogic to supply the domain logic for the streamlet.

In this case, since we are implementing a printer streamlet for console, all we need to do is read from the inlet that we defined earlier, val inlet = AvroInlet[Report]("report-in"), and do some processing on it.

The RunnableGraphStreamletLogic, which extends StreamletLogic, is easy to use when you only want to define a RunnableGraph to be run. It only requires you to define a runnableGraph method that specifies the graph to run, which is what we have done in the above code. Here, the runnableGraph method creates a Source from the inlet to read the reports and connects it to a Sink that prints out the reports.

Here are the steps that we do as part of the processing logic:

  • Since it’s a console printer, we would like to write to console as specified by the format method.

  • Every report read from the Source is printed by using Sink.foreach which is part of the akka.stream.scaladsl package.

Note that the processing logic can be quite complex and we can maintain state as part of the implementation of StreamletLogic.

Note

If the streamlet needs to have local state (vals, vars) for processing logic, it has to be put inside the StreamletLogic class and not as part of the Streamlet class. The Streamlet class is used by Pipelines for extraction of streamlets using reflection and hence cannot have any state within it.
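
To make this concrete, here is a minimal sketch (the CountingReportPrinter name and the counter are hypothetical) showing where such state belongs:

package com.example

import akka.stream.scaladsl.Sink

import pipelines.streamlets._
import pipelines.streamlets.avro._

import pipelines.akkastream._
import pipelines.akkastream.scaladsl._

object CountingReportPrinter extends AkkaStreamlet {
  val inlet = AvroInlet[Report]("report-in")
  val shape = StreamletShape.withInlets(inlet)

  def createLogic = new RunnableGraphStreamletLogic() {
    // Mutable state lives inside the StreamletLogic, not in the Streamlet object itself.
    var count = 0L

    def runnableGraph =
      atMostOnceSource(inlet)
        .to(Sink.foreach { report =>
          count += 1
          println(s"[$count] ${report.name}")
        })
  }
}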

Note

In summary, here are the steps for defining an Akka streamlet:

  • Define the inlets and outlets

  • Define the concrete shape using the inlets and outlets. The shape of the streamlet is the metadata that will be used by Pipelines

  • Define the custom processing logic that will read data from inlets and write data to outlets

7.2.5. Using ReportPrinter in the blueprint

An example of a blueprint using the ReportPrinter could look like this:

blueprint {
  streamlets {
    ingress = com.example.ReportIngress
    report-printer = com.example.ReportPrinter
  }

  connections {
    ingress.out = [report-printer.report-in]
  }
}

The omitted ReportIngress could for instance be another AkkaStreamlet that writes Reports to its outlet.

7.2.6. At-least-once or At-most-once processing

You can access the inlet and outlet streams through methods on the StreamletLogic that return Akka Streams Sources and Sinks.

So far we’ve used the atMostOnceSource method to read from the inlet. When the streamlet is started, it will only receive elements that arrive after it is started. The same applies when the streamlet is restarted for any reason. In short, the atMostOnceSource provides at-most-once message delivery semantics, as described in Message Delivery Semantics. StreamletLogic also provides methods that support at-least-once message delivery semantics.

The at-least-once methods for getting sources and sinks automatically propagate the offset of every element, so that the offset read through the attached source can be automatically advanced, after processing, through the attached sink. This provides the at-least-once messaging semantics.

In the next sections, we'll describe the SourceWithContext and the FlowWithContext that are used for this purpose, as well as the Sinks that can be used to make sure that offsets are committed.

Sources for inlets

The atLeastOnceSource method returns an akka.stream.scaladsl.SourceWithContext for an inlet; the Java equivalent is getAtLeastOnceSource, which returns an akka.stream.javadsl.SourceWithContext. The SourceWithContext provides the elements that are read from an inlet, each element associated with a PipelinesContext. The PipelinesContext contains the offset in the stream. A FlowWithContext, which contains a subset of Flow, automatically propagates the context for every element, so that you don't have to worry about it when using the Akka Streams operators to process the stream. It can be easily attached to a SourceWithContext using the via method, similar to how a Source and a Flow are connected.

Note

In the Scala API, we provide a type alias pipelines.akkastream.SourceWithPipelinesContext that encapsulates the Pipelines-specific part of the SourceWithContext type definition.

The atMostOnceSource method returns an Akka Streams Source, which always starts reading elements as they arrive.

Sinks for outlets

The atLeastOnceSink method (Java API equivalent is getAtLeastOnceSink) returns a Sink[(T, PipelinesContext)] to write to an outlet. Offsets of elements written to the sink are committed when the offset changes, in batches if the downstream processing is slower than the rate of incoming elements. You can connect a SourceWithContext or a FlowWithContext to this type of sink with a to method, similar to how you would connect a Source or Flow to a Sink. There is also an atLeastOnceSink that takes no arguments; anything written to this sink will not end up in any outlet, but the associated offsets will be committed.

The noCommitSink method (Java API equivalent is getNoCommitSink) returns a Sink, for when you choose to always process elements as they arrive.

FlowWithContext

The FlowWithContext provides a constrained set of operators compared to the Akka Streams Flow. The subset of operators process records in-order. It is used for convenience, as a user you don’t have to worry about how the context per element is passed along, or how the offset of a stream is advanced after processing.

Operators that turn T into Seq[T]

The FlowWithContext propagates the PipelinesContext for every element that it operates on. For operators that create a Seq[T] of elements out of every element, like mapConcat and grouped, the sequencing operation is also applied to the context, which means that the context is no longer a PipelinesContext but a Seq[PipelinesContext]. If you transform the Seq[T] into some type that is written to an outlet, you will have to map the context with mapContext and select which context you want to use for every element, since a Seq[PipelinesContext] is now present where a PipelinesContext is required. In most cases it makes sense to use flowWithContext.mapContext(_.last) to select the last PipelinesContext associated with the grouped input elements.
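
As an illustration, the sketch below is a hypothetical variation of the ReportPrinter logic that writes one summary record per batch of ten reports. ReportSummary is assumed to be generated from an Avro schema of your own, and atLeastOnceSink is assumed to accept the outlet as an argument, analogous to atMostOnceSink:

val inlet = AvroInlet[Report]("report-in")
val out   = AvroOutlet[ReportSummary]("summary-out", _.count.toString)
val shape = StreamletShape.withInlets(inlet).withOutlets(out)

def createLogic = new RunnableGraphStreamletLogic() {
  def runnableGraph =
    atLeastOnceSource(inlet)
      .grouped(10)                                 // elements become Seq[Report], context becomes Seq[PipelinesContext]
      .map(reports => ReportSummary(reports.size)) // one summary record per batch (hypothetical ReportSummary class)
      .mapContext(_.last)                          // keep only the offset of the last report in each batch
      .to(atLeastOnceSink(out))                    // assumption: the sink takes the outlet, analogous to atMostOnceSink(out)
}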

Converting between FlowWithContext and Flow

The Akka Streams Flow supports more operations than the FlowWithContext and allows for integrating with any kind of Graph, including custom GraphStages. The FlowWithContext[In, PipelinesContext, Out, PipelinesContext, Mat] can be converted to a Flow[(In, PipelinesContext), (Out, PipelinesContext), Mat]. As you can see from the type signature, every element in the resulting Flow is a Tuple2 of an element and its PipelinesContext. (In Java an akka.japi.Pair type is used instead of the Scala Tuple2 type). The Flow[(In, PipelinesContext), (Out, PipelinesContext), Mat] can be converted (back) to a FlowWithContext[In, PipelinesContext, Out, PipelinesContext, Mat].

Being able to convert back and forth between FlowWithContext and Flow means that you can stop automatic context propagation, apply more advanced operations on the stream, and once you are finished, convert the Flow back into a FlowWithContext, as long as you pass along the elements with their contexts as tuples.

Note

If the order of elements is changed in custom operations on the Flow, it is likely that offsets will be advanced too early, which can result in data loss on Streamlet restart.

The same is true for the SourceWithContext[PipelinesContext, Out, Mat], which can be turned into a Source[(Out, PipelinesContext), Mat]. The endContextPropagation method on FlowWithContext returns a Flow, and FlowWithContext.from turns a Flow back into a FlowWithContext. The endContextPropagation method on SourceWithContext returns a Source, and SourceWithContext.from creates a SourceWithContext from a Source. In Java, SourceWithContext.fromPairs is the equivalent for turning a Source into a SourceWithContext.
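
The sketch below illustrates the round trip using the method names mentioned above, in the context of the ReportPrinter (imports such as akka.stream.OverflowStrategy and akka.stream.scaladsl.SourceWithContext are assumed, and the buffer step merely stands in for an operation that SourceWithContext does not offer):

def createLogic = new RunnableGraphStreamletLogic() {
  def runnableGraph = {
    // Stop context propagation: this yields a plain Source of (Report, PipelinesContext) tuples.
    val tupled = atLeastOnceSource(inlet).endContextPropagation

    // Apply operations not available on SourceWithContext, keeping each (element, context)
    // pair together and in order, otherwise offsets may be advanced too early.
    val printed = tupled
      .buffer(100, OverflowStrategy.backpressure)
      .map { case (report, ctx) => println(report.name); (report, ctx) }

    // Resume context propagation so that offsets are committed by the at-least-once sink.
    SourceWithContext.from(printed).to(atLeastOnceSink)
  }
}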

At-least-once ReportPrinter

The atMostOnceSource does not provide any tracking of where the streamlet left off, so if the streamlet is restarted it will print all elements from the earliest available data in the outlet it is connected to.

In this section we’re going to use the atLeastOnceSource to track offsets and commit them automatically with an atLeastOnceSink.

The atLeastOnceSource method provides a SourceWithPipelinesContext[In] for an inlet. Internally, a PipelinesContext is paired with every element to keep track of the associated offset. The SourceWithPipelinesContext has many of the operators of a Source, it just automatically propagates the Pipelines context. In the code below it is connected to an atLeastOnceSink, which automatically commits any offset changes found in the PipelinesContext for every element.

The code below shows how the ReportPrinter can be changed to use at-least-once semantics. All we need to do is change the streamlet logic:

Scala:

def createLogic = new RunnableGraphStreamletLogic() {
  def format(report: Report) = s"${report.name}\n\n${report.description}"
  def runnableGraph =
    atLeastOnceSource(inlet)
      .map { report =>
        println(format(report))
        report
      }
      .to(atLeastOnceSink)
}

Java:

public RunnableGraph<NotUsed> createRunnableGraph() {
  return getAtLeastOnceSource(inlet)
    .map(report -> {
        System.out.println(format(report));
        return report;
    })
    .asSource()
    .to(getAtLeastOnceSink());
}

This completes the code for the ReportPrinter. A next step would be to use it in a blueprint, as described in Composing Streamlets using Blueprints.

7.3. Utilities

The pipelines.akkastream.util library contains some predefined StreamletLogics:

  • HttpServerLogic

  • SplitterLogic

  • MergeLogic

The following sections describe how you implement stream processing logic with these utilities.

7.3.1. HttpServerLogic

Use case

An HttpServerLogic can be used to handle HTTP requests and write the received data to an outlet. You need to extend your streamlet from AkkaServerStreamlet so that Pipelines will expose an HTTP endpoint in Kubernetes. The HttpServerLogic typically unmarshalls incoming HTTP requests as they arrive and stores the data in the outlet. HTTP requests that arrive while the HttpServerLogic is not running will result in a 503 response.

Examples

The HttpServerLogic defines an abstract method for the user to supply the processing logic, an HTTP route:

def route(sinkRef: WritableSinkRef[Out]): Route

The HttpServerLogic object has default implementations of the HttpServerLogic, which support some pre-baked routes:

  • the default method creates an HttpServerLogic that handles PUT and POST requests, where the data is read from the entity body.

  • the defaultStreaming method creates an HttpServerLogic that handles streaming HTTP requests, where the data is read from the entity body as a framed stream.

Handle PUT / POST requests by default

The code snippet below shows an example of an HttpServerLogic using the default method:

import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._

import pipelines.akkastream.AkkaServerStreamlet
import pipelines.akkastream.util.scaladsl._
import pipelines.streamlets.StreamletShape
import pipelines.streamlets.avro._

import SensorDataJsonSupport._

object SensorDataIngress extends AkkaServerStreamlet {
  val out = AvroOutlet[SensorData]("out", s => s.deviceId.toString + s.timestamp.toString)
  def shape = StreamletShape.withOutlets(out)
  override def createLogic = HttpServerLogic.default(this, out)
}

In Scala, the HttpServerLogic requires an implicit akka.http.scaladsl.marshalling.FromByteStringUnmarshaller[Out] to unmarshal the entity body into the data that you want to write to the outlet. (FromByteStringUnmarshaller[T] is an alias for Unmarshaller[ByteString, T])

An HttpServerLogic can only be used in combination with an AkkaServerStreamlet. The HttpServerLogic.default method requires a Server argument (this also applies to the HttpServerLogic.defaultStreaming method and the HttpServerLogic constructor; you cannot construct it without one). The AkkaServerStreamlet implements Server, so you can just pass this from inside the streamlet, as you can see in the above snippet.

In the above example, the SensorDataJsonSupport object defines implicit spray-json JSON formats for the SensorData type.

In Java, the akka.http.javadsl.unmarshalling.Unmarshaller<ByteString, T> is an argument to the constructor.

import pipelines.akkastream.AkkaServerStreamlet;

import pipelines.akkastream.StreamletLogic;
import pipelines.akkastream.util.javadsl.HttpServerLogic;

import pipelines.streamlets.StreamletShape;
import pipelines.streamlets.avro.AvroOutlet;

import akka.http.javadsl.marshallers.jackson.Jackson;

public class SensorDataIngress extends AkkaServerStreamlet {
  AvroOutlet<SensorData> out =  AvroOutlet.<SensorData>create("out", s -> s.getDeviceId().toString() + s.getTimestamp().toString(), SensorData.class);

  public StreamletShape shape() {
   return StreamletShape.createWithOutlets(out);
  }

  public StreamletLogic createLogic() {
    return HttpServerLogic.createDefault(this, out, Jackson.byteStringUnmarshaller(SensorData.class), getStreamletContext());
  }
}

In the above Java example, a Jackson Unmarshaller<ByteString, Out> is created for the SensorData class, using the Jackson.byteStringUnmarshaller method.

Handle streamed HTTP entities

The Scala example below shows how you can use the defaultStreaming method to unmarshal a JSON stream and write the unmarshalled data to the outlet:

import akka.http.scaladsl.common.EntityStreamingSupport
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._

import SensorDataJsonSupport._
import pipelines.akkastream.AkkaServerStreamlet
import pipelines.akkastream.util.scaladsl._
import pipelines.streamlets.StreamletShape
import pipelines.streamlets.avro._

object SensorDataStreamingIngress extends AkkaServerStreamlet {
  val out = AvroOutlet[SensorData]("out", s => s.deviceId.toString + s.timestamp.toString)
  def shape = StreamletShape.withOutlets(out)

  implicit val entityStreamingSupport = EntityStreamingSupport.json()
  override def createLogic = HttpServerLogic.defaultStreaming(this, out)
}

The defaultStreaming method requires an implicit FromByteStringUnmarshaller[Out] and an EntityStreamingSupport. The above example provides an EntityStreamingSupport.json() to read the JSON stream. The defaultStreaming logic uses this to read JSON from the request entity. The SensorDataJsonSupport in the above example provides implicit spray-json JSON Formats to unmarshal the JSON elements in the stream. The akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport object implicitly provides a FromByteStringUnmarshaller based on the formats in the SensorDataJsonSupport object.

The Java example below shows how to use the defaultStreaming in a similar manner:

import akka.http.javadsl.common.EntityStreamingSupport;
import akka.http.javadsl.marshallers.jackson.Jackson;

import pipelines.akkastream.AkkaServerStreamlet;

import pipelines.akkastream.util.javadsl.HttpServerLogic;
import pipelines.akkastream.StreamletLogic;
import pipelines.streamlets.StreamletShape;
import pipelines.streamlets.avro.AvroOutlet;

public class SensorDataStreamingIngress extends AkkaServerStreamlet {

  AvroOutlet<SensorData> out =  AvroOutlet.<SensorData>create("out", s -> s.getDeviceId().toString() + s.getTimestamp().toString(), SensorData.class);

  public StreamletShape shape() {
   return StreamletShape.createWithOutlets(out);
  }

  public StreamletLogic createLogic() {
    EntityStreamingSupport ess = EntityStreamingSupport.json();
    return HttpServerLogic.createDefaultStreaming(this, out, Jackson.byteStringUnmarshaller(SensorData.class), ess, getStreamletContext());
  }
}

Define a custom Route

If you want to provide your own akka-http Route, override the route method. The route method provides a sinkRef argument which you can use to write to the outlet.

override def createLogic = new HttpServerLogic(this, outlet) {
  def route(sinkRef: WritableSinkRef[Out]): Route = {
    put {
      entity(as[Data]) { data =>
        onSuccess(sinkRef.write(data)) { _ =>
          complete(StatusCodes.OK)
        }
      }
    }
  }
}

The above Scala example creates a route that will handle put requests where the entity contains Data, which is written to the outlet using the WritableSinkRef.

7.3.2. SplitterLogic

Use case

A SplitterLogic can be used to split a stream in two, writing elements to one of two outlets. Every element read from the inlet will be processed through a FlowWithPipelinesContext, which provides at-least-once semantics.

Example

The SplitterLogic defines an abstract flow method for the user to supply a FlowWithPipelinesContext[I, Either[L, R]]. The Java version of Splitter uses an Either type that is bundled with pipelines as pipelines.akkastream.javadsl.util.Either.

The Scala example below shows a Splitter that validates metrics and splits the stream into valid and invalid metrics, which are written respectively to the valid and invalid outlets:

import pipelines.akkastream._
import pipelines.akkastream.util.scaladsl._
import pipelines.streamlets._
import pipelines.streamlets.avro._

object MetricsValidation extends AkkaStreamlet {
  val in = AvroInlet[Metric]("in")
  val invalid = AvroOutlet[InvalidMetric]("invalid", m => m.metric.deviceId.toString + m.metric.timestamp.toString)
  val valid = AvroOutlet[Metric]("valid", m => m.deviceId.toString + m.timestamp.toString)
  val shape = StreamletShape(in).withOutlets(invalid, valid)

  override def createLogic = new SplitterLogic(in, invalid, valid) {
    def flow = flowWithPipelinesContext()
      .map { metric =>
        if (!SensorDataUtils.isValidMetric(metric)) Left(InvalidMetric(metric, "All measurements must be positive numbers!"))
        else Right(metric)
      }
  }
}

The Java example below provides similar functionality:

import akka.stream.javadsl.*;
import pipelines.akkastream.javadsl.util.Either;

import akka.NotUsed;
import akka.actor.*;
import akka.stream.*;

import com.typesafe.config.Config;

import pipelines.streamlets.*;
import pipelines.streamlets.avro.*;
import pipelines.akkastream.*;
import pipelines.akkastream.util.javadsl.*;

public class MetricsValidation extends AkkaStreamlet {
  AvroInlet<Metric> inlet = AvroInlet.<Metric>create("in", Metric.class);
  AvroOutlet<InvalidMetric> invalidOutlet = AvroOutlet.<InvalidMetric>create("invalid",  m -> m.metric.toString(), InvalidMetric.class);
  AvroOutlet<Metric> validOutlet = AvroOutlet.<Metric>create("valid", m -> m.getDeviceId().toString() + m.getTimestamp().toString(), Metric.class);

  public StreamletShape shape() {
   return StreamletShape.createWithInlets(inlet).withOutlets(invalidOutlet, validOutlet);
  }

  public SplitterLogic createLogic() {
    return new SplitterLogic<Metric,InvalidMetric, Metric>(inlet, invalidOutlet, validOutlet, getStreamletContext()) {
      public FlowWithContext<Metric, PipelinesContext, Either<InvalidMetric, Metric>, PipelinesContext, NotUsed> createFlow() {
        return createFlowWithPipelinesContext()
          .map(metric -> {
            if (!SensorDataUtils.isValidMetric(metric)) return Either.left(new InvalidMetric(metric, "All measurements must be positive numbers!"));
            else return Either.right(metric);
          });
      }
    };
  }
}

7.3.3. MergeLogic

Use case

A MergeLogic can be used to merge two or more inlets into one outlet.

Elements from all inlets will be processed with at-least-once semantics. The elements will be processed in semi-random order and with equal priority for all inlets.

Example

The Scala example below shows a merge configured to combine elements from two inlets of the type Metric into one outlet of the same type.

object MetricMerge extends AkkaStreamlet {

  val in0 = AvroInlet[Metric]("in-0")
  val in1 = AvroInlet[Metric]("in-1")
  val out = AvroOutlet[Metric]("out", m => m.deviceId.toString + m.timestamp.toString)

  final override val shape = StreamletShape.withInlets(in0, in1).withOutlets(out)

  override final def createLogic = new MergeLogic(Vector(in0, in1), out)
}

object MetricsMerge extends Merge[Metric](5)

The Java example below provides similar functionality:

class TestMerger extends AkkaStreamlet {
    AvroInlet<Data> inlet1 = AvroInlet.<Data>create("in-0", Data.class);
    AvroInlet<Data> inlet2 = AvroInlet.<Data>create("in-1", Data.class);
    AvroOutlet<Data> outlet = AvroOutlet.<Data>create("out",  d -> d.name(), Data.class);

    public StreamletShape shape() {
      return StreamletShape.createWithInlets(inlet1, inlet2).withOutlets(outlet);
    }
    public MergeLogic createLogic() {
      List<CodecInlet<Data>> inlets = new ArrayList<CodecInlet<Data>>();
      inlets.add(inlet1);
      inlets.add(inlet2);
      return new MergeLogic(inlets, outlet, getStreamletContext());
    }
}

7.4. Testing an Akka Streamlet

The testkit enables users to write unit tests for Akka streamlets. The unit tests are meant to facilitate local testing of streamlets. The testkit supports writing tests in both Scala and Java.

7.4.1. Basic flow of testkit APIs

Here’s the basic flow that you need to follow when writing tests using the testkit:

  1. Instantiate the testkit

  2. Setup inlet taps that tap the inlet ports of the streamlet

  3. Setup outlet taps for outlet ports

  4. Push data into inlet ports

  5. Run the streamlet using the testkit and the setup inlet taps and outlet taps

  6. Use the probes of the outlet taps to verify the expected results

The testkit connects taps to inlets and outlets of a streamlet. The taps provide means to the tester to write data to the inlets and read data from outlets, making it possible to assert that the streamlet behaves as expected.

7.4.2. Using the testkit from Scala

Let’s consider an example where we would like to write unit tests for testing a FlowProcessor. We will follow the steps that we outlined in the last section. We will use ScalaTest as the testing framework.

Imports

Here are the imports that we need for writing the tests. These include some obligatory imports for ScalaTest and Akka, as well as the imports for the testkit and the code that is generated from the Avro schemas.

import akka.actor._
import akka.stream._
import akka.stream.scaladsl._
import akka.testkit._

import org.scalatest._
import org.scalatest.concurrent._

import pipelines.streamlets._
import pipelines.streamlets.avro._
import pipelines.akkastream._
import pipelines.akkastream.scaladsl._
import pipelines.akkastream.testdata._
import pipelines.akkastream.testkit._
Setting up a sample TestProcessor

Let’s set up a TestProcessor streamlet that we would like to test. It’s a simple one that filters events based on whether the event id is an even number.

class TestProcessor extends AkkaStreamlet {
  val in = AvroInlet[Data]("in")
  val out = AvroOutlet[Data]("out", _.id.toString)
  final override val shape = StreamletShape.withInlets(in).withOutlets(out)

  val flow = Flow[Data].filter(_.id % 2 == 0)
  override final def createLogic = new RunnableGraphStreamletLogic() {
    def runnableGraph = atMostOnceSource(in).via(flow).to(atMostOnceSink(out))
  }
}
The unit test

Here’s how we would write a unit test using AkkaStreamletTestkit. The various logical steps of the test are annotated with inline comments explaining the rationale behind the step.

class TestProcessorSpec extends WordSpec with MustMatchers with BeforeAndAfterAll {

  private implicit val system = ActorSystem("AkkaStreamletSpec")
  private implicit val mat = ActorMaterializer()

  override def afterAll: Unit = {
    TestKit.shutdownActorSystem(system)
  }

  "An TestProcessor" should {

    val testkit = AkkaStreamletTestKit(system, mat)

    "Allow for creating a 'flow processor'" in {
      val data = Vector(Data(1, "a"), Data(2, "b"), Data(3, "c"))
      val expectedData = Vector(Data(2, "b"))
      val source = Source(data)
      val proc = new TestProcessor
      val in = testkit.inletFromSource(proc.in, source)
      val out = testkit.outletAsTap(proc.out)

      testkit.run(proc, in, out, () => {
        out.probe.receiveN(1) mustBe expectedData.map(d => proc.out.partitioner(d) -> d)
      })

      out.probe.expectMsg(Completed)
    }
  }
}
Initialization and cleanups

As per ScalaTest guidelines, we can do custom cleanups in methods like afterAll() and after(), depending on the requirements. In the current implementation we shut down the actor system in afterAll():

override def afterAll: Unit = {
  TestKit.shutdownActorSystem(system)
}

Similarly you can have beforeAll() and before() for custom initializations.

If you have a number of tests that rely on similar initializations and cleanups, you can also define a common base trait from which the test classes extend, as sketched below.
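
A minimal sketch of such a base trait, using the same imports as above (the trait name is illustrative), could look like this:

trait StreamletSpecBase extends WordSpec with MustMatchers with BeforeAndAfterAll {
  // one actor system and materializer shared by all tests in the suite
  protected implicit val system: ActorSystem = ActorSystem("AkkaStreamletSpec")
  protected implicit val mat: ActorMaterializer = ActorMaterializer()

  override def afterAll(): Unit = TestKit.shutdownActorSystem(system)
}

Individual test classes then only contain the test cases, for example class TestProcessorSpec extends StreamletSpecBase { ... }.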

7.4.3. Using the testkit from Java

Using the testkit from Java is almost the same as using it from Scala; the only difference is that you use idiomatic Java abstractions and frameworks for writing the tests. In this example we will write a test for the same TestProcessor streamlet using the Java DSL of the testkit with JUnit 4.

Imports

Here are the imports that we need for writing the tests. These include some obligatory imports for JUnit and Akka, as well as the imports for the testkit and the code that is generated from the Avro schemas.

import org.junit.Test;
import org.junit.Assert;

import akka.NotUsed;

import akka.stream.javadsl.*;

import pipelines.streamlets.*;
import pipelines.akkastream.*;
import pipelines.akkastream.javadsl.*;

import pipelines.akkastream.testdata.*;
import pipelines.akkastream.testkit.*;

import scala.compat.java8.FutureConverters;
Setting up a sample TestProcessor

Let’s set up a TestProcessor that we would like to test.

class TestProcessor extends AkkaStreamlet {
  AvroInlet<Data> inlet = AvroInlet.<Data>create("in", Data.class);
  AvroOutlet<Data> outlet = AvroOutlet.<Data>create("out", d -> d.name(), Data.class);

  public StreamletShape shape() {
    return StreamletShape.createWithInlets(inlet).withOutlets(outlet);
  }

  public StreamletLogic createLogic() {
    return new RunnableGraphStreamletLogic(getStreamletContext()) {
      public RunnableGraph<NotUsed> createRunnableGraph() {
        return getAtMostOnceSource(inlet)
          .via(Flow.<Data>create().filter(d -> d.getId() % 2 == 0))
          .to(getAtMostOnceSink(outlet));
      }
    };
  }
}
The unit test

Here’s how we would write a unit test using JUnit. The various logical steps of the test are annotated with inline comments explaining the rationale behind the step.

@Test
public void testFlowProcessor() {
  TestProcessor sfp = new TestProcessor();

  // 1. instantiate the testkit
  AkkaStreamletTestKit testkit = AkkaStreamletTestKit.create(system, mat);

  // 2. Setup inlet taps that tap the inlet ports of the streamlet
  QueueInletTap<Data> in = testkit.makeInletAsTap(sfp.shape().inlet());

  // 3. Setup outlet probes for outlet ports
  ProbeOutletTap<Data> out = testkit.makeOutletAsTap(sfp.shape().outlet());

  // 4. Push data into inlet ports
  in.queue().offer(new Data(1, "a"));
  in.queue().offer(new Data(2, "b"));

  // 5. Run the streamlet using the testkit and the setup inlet taps and outlet probes
  testkit.<Data>run(sfp, in, out, () -> {
    // 6. Assert
    return out.probe().expectMsg(new scala.Tuple2<String, Data>("2", new Data(2, "b")));
  });

  // 6. Assert
  out.probe().expectMsg(Completed.completed());
}
Initialization and Cleanups

As per JUnit guidelines, we can do custom initializations and cleanups in methods like setup() and tearDown() respectively depending on your requirements. One common practice is to set up a base class that does all common initializations and clean ups for your tests.

import org.junit.BeforeClass;
import org.junit.AfterClass;

import org.scalatest.junit.JUnitSuite;

import scala.concurrent.duration.Duration;

import akka.actor.ActorSystem;
import akka.stream.ActorMaterializer;
import akka.testkit.TestKit;

public abstract class JavaDslTest extends JUnitSuite {

  static ActorMaterializer mat;
  static ActorSystem system;

  @BeforeClass
  public static void setUp() throws Exception {
    system = ActorSystem.create();
    mat = ActorMaterializer.create(system);
  }

  @AfterClass
  public static void tearDown() throws Exception {
    TestKit.shutdownActorSystem(system, Duration.create(10, "seconds"), false);
    system = null;
  }
}

8. Developing with Spark Streamlets

8.1. Spark Streamlet Fundamentals

A Spark-based streamlet has a couple of responsibilities:

  • It needs to capture your stream processing logic.

  • It needs to publish metadata which will be used by the sbt-pipelines plugin to verify that a blueprint is correct. This metadata consists of the shape of the streamlet (StreamletShape), defined by its inlets and outlets. Streamlets connected in the blueprint must match on inlets and outlets to form a valid Pipelines topology; Pipelines automatically extracts the shapes from the streamlets to verify this match.

  • For the Pipelines runtime, it needs to provide metadata so that it can be configured, scaled, and run as part of an application.

  • The inlets and outlets of a Streamlet have two functions:

    • To specify to Pipelines that the streamlet needs certain data streams to exist at runtime, which it will read from and write to, and

    • To provide handles inside the stream processing logic to connect to the data streams that Pipelines provides. The StreamletLogic provides methods that take an inlet or outlet argument to read from or write to specific data streams that Pipelines has set up for you.

The next sections will go into the details of defining a Spark Streamlet:

  • defining inlets and outlets

  • creating a streamlet shape from the inlets and outlets

  • creating a streamlet logic that uses the inlets and outlets to read and write data

8.1.1. Inlets and Outlets

A streamlet can have one or more inlets and outlets. Pipelines offers classes for Inlets and Outlets based on the codec of the data they manipulate. Currently Pipelines supports Avro and hence the classes are named AvroInlet and AvroOutlet. Each outlet also allows the user to define a partitioning function that will be used to partition the data.
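
For illustration, the sketch below defines an inlet and an outlet for the Report schema used later in this chapter; the names and the choice to partition by the id field are just examples:

import pipelines.streamlets.avro._

// an inlet that accepts Avro encoded Report records
val reportIn = AvroInlet[Report]("report-in")

// an outlet that emits Report records, partitioned by the id field
val reportOut = AvroOutlet[Report]("report-out", _.id)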

8.1.2. StreamletShape

The StreamletShape captures the connectivity and compatibility details of a Spark-based streamlet. It captures which—and how many—inlets and outlets the streamlet has.

The sbt-pipelines plugin extracts—amongst other things—the shape from the streamlet that it finds on the classpath. This metadata is used to verify that the blueprint connects the streamlets correctly.

Pipelines offers an API StreamletShape to define various shapes with inlets and outlets. Each inlet and outlet can be defined separately with specific names. The outlet also needs to have a partitioning function to ensure effective data partitioning logic.

If you build your own Spark streamlet, you need to define the shape. You will learn how to do this in Building a Spark Streamlet. The next section describes how the SparkStreamletLogic captures stream processing logic.

8.1.3. SparkStreamletLogic

The stream processing logic is captured in a SparkStreamletLogic, which is an abstract class.

The SparkStreamletLogic provides methods to read and write data streams and gives access to the configuration of the streamlet, amongst other things.

A SparkStreamletLogic must set up one or more Structured Streaming queries, represented by a collection of StreamingQuery instances, through the method buildStreamingQueries. These queries are run by the run method of SparkStreamlet to produce a StreamletExecution. A StreamletExecution is a simple class that manages a collection of StreamingQuery instances.
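
As a minimal sketch, assuming an inlet in and an outlet out that carry the same Avro type, a pass-through logic could build a single query like this:

override def createLogic = new SparkStreamletLogic {
  override def buildStreamingQueries = {
    // read the inlet as a streaming Dataset and write it unchanged to the outlet
    val query = writeStream(readStream(in), out, OutputMode.Append)
    Seq(query)
  }
}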

The SparkStreamletLogic is only constructed when the streamlet is run, so it is safe to put instance values and variables in it that you would like to use when the streamlet is running. Note that the Streamlet is created for extracting metadata and hence no instance values should be put inside a streamlet.

8.1.4. SparkStreamletContext

The SparkStreamletContext provides the necessary context under which a streamlet runs. It contains the following context data and contracts:

  • An active SparkSession to run Spark streaming jobs.

  • The typesafe Config loaded from the classpath through a config method, which can be used to read configuration settings.

  • The name used in the blueprint for the specific instance of this streamlet being run.

8.1.5. Lifecycle

In general terms, when you publish and deploy a Pipelines application, each streamlet definition becomes a physical deployment as one or more Kubernetes artifacts. In the case of Spark-based streamlets, each streamlet is submitted through the Spark Operator as a Spark Application resource. Upon deployment, it becomes a set of running pods, with one pod acting as the Spark driver and n pods as executors, where n is the scale factor for that streamlet.

Note

The Spark Operator is a Kubernetes operator dedicated to managing Spark applications.

The Spark Operator takes care of monitoring the streamlets and is responsible for their resilient execution, like restarting the Spark application in case of failure.

spark application deployment delegation
Figure 1. Spark Streamlet Deployment Model

In Spark Streamlet Deployment Model, we can visualize the chain of delegation involved in the deployment of a Spark streamlet:

  • The Pipelines Operator prepares a Custom Resource describing the Spark streamlet and submits it to the Spark Operator.

  • The Spark Operator processes the Custom Resource and issues the deployment of a Spark driver pod.

  • The Spark Driver then requests executor resources from Kubernetes to deploy the distributed processing.

  • Finally, if/when resources are available, the Spark-bound executors start as Kubernetes pods. The executors are the components tasked with the actual data processing, while the Spark driver serves as coordinator of the (stream) data process.

In this architecture, the Spark driver runs the Pipelines-specific logic that connects the streamlet to our managed data streams, at which point the streamlet starts consuming from inlets. The streamlet advances through the data streams that are provided on inlets and writes data to outlets.

If you make any changes to the streamlet and deploy the application again, the existing Spark applications will be stopped, and the new version will be started to reflect the changes. Spark streamlets may also be restarted in other ways, for instance by administrators.

This means that a streamlet can get stopped, started or restarted at any moment in time. The next section about message delivery semantics explains the options that are available for how to continue processing data after a restart.

8.1.6. Message Delivery Semantics

The message delivery semantics provided by Spark-based streamlets are determined by the guarantees provided by the underlying Spark sources and sinks used in the streamlet. Recall that we defined the different message delivery guarantees in Message Delivery Semantics.

Let’s consider the following types of streamlets as forming a topology as illustrated in Ingress-Processor-Egress Streamlets:

  • an ingress, a streamlet that reads from an external streaming source and makes data available to the pipeline

  • a processor, a streamlet that has an inlet and an outlet - it does domain logic processing on data that it reads and passes the processed data downstream

  • an egress, a streamlet that receives data on its inlet and writes to an external sink

The following sections will use these types of streamlets for describing message delivery semantics.

blueprint simple plain
Figure 2. Ingress-Processor-Egress Streamlets
Message Delivery Semantics of Spark-based Ingresses

Spark-based ingresses use a Structured Streaming source to obtain data from an external streaming source and provide it to the Pipelines application. In this case, message delivery semantics are dependent on the capabilities of the source. In general, streaming sources deemed resilient will provide at-least-once semantics in a Pipelines application.

Refer to the Structured Streaming Programming Guide or to Stream Processing with Apache Spark for more detailed documentation about your streaming source of choice.

Message Delivery Semantics of Spark-based Processors

A Spark-based processor is a streamlet that receives and produces data internal to the Pipelines application. In this scenario, processors consume data from inlets and produce data to outlets using Kafka topics as the underlying physical implementation.

At the level of a Spark-based processor, this translates to using Kafka sources and sinks. Given that Kafka is a reliable data source from the Structured Streaming perspective, the message delivery semantics are considered to be at-least-once.

Note
Testing At-Least-Once Message Delivery

Bundled with our examples, spark-resilience-test can be used to verify that messages are processed with at-least-once semantics even while a part of the pipeline is continuously failing.

Message Delivery Semantics of Spark-based Egresses

The message delivery guarantees of a Spark-based egress are determined by the combination of the Kafka-based inlet and the target system where the egress will produce its data.

The data provided to the egress is delivered with at-least-once semantics. The egress is responsible for reliably producing this data to the target system. The overall message delivery semantics of the pipeline will depend on the reliability characteristics of this egress.

In particular, it is possible to 'upgrade' the end-to-end message delivery guarantee to effectively-exactly-once by making the egress idempotent and ensuring that all other streamlets used provide at-least-once semantics.

For this purpose, we could use Structured Streaming’s deduplication feature or a target system able to preserve the uniqueness of primary keys, such as an RDBMS.
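
For instance, assuming the incoming records carry a unique eventId and a timestamp field (both names are hypothetical), the deduplication step in the egress could look like this before the data is produced to the target system:

import org.apache.spark.sql.types.TimestampType

// drop duplicate events within the watermark window before writing to the external system
val deduplicated = readStream(in)
  .withColumn("ts", $"timestamp".cast(TimestampType))
  .withWatermark("ts", "10 minutes")
  .dropDuplicates("eventId")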

Message Delivery Semantics and the Kafka Retention Period

The at-least-once delivery is guaranteed within the Kafka retention configuration. This retention is a Kafka broker configuration that dictates when old data can be evicted from the log. If a Spark-based processor is offline for longer than the configured retention, it will restart from the earliest offset available in the corresponding Kafka topic, which might silently result in data loss.
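
For reference, retention is typically controlled on the Kafka brokers or per topic with settings such as the following (the values shown are illustrative only):

# broker-level default retention
log.retention.hours=168

# per-topic override, in milliseconds
retention.ms=604800000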

8.1.7. Reliable Restart of Stateful Processes

In a Pipelines application, it’s possible to use the full capabilities of Structured Streaming to implement the business logic required in a streamlet. This includes stateful processing, from time-based aggregations to arbitrary stateful computations that use (flat)MapGroupsWithState.
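
As an illustration of arbitrary stateful processing, the hypothetical sketch below keeps a running maximum per device using mapGroupsWithState; the DeviceReading and RunningMax types and the inlet/outlet names are assumptions, not part of Pipelines:

import org.apache.spark.sql.streaming.{ GroupState, GroupStateTimeout, OutputMode }

case class DeviceReading(deviceId: String, value: Double)
case class RunningMax(deviceId: String, max: Double)

// merge the readings seen in this trigger with the previously stored maximum
def updateMax(deviceId: String, readings: Iterator[DeviceReading], state: GroupState[Double]): RunningMax = {
  val newMax = (readings.map(_.value) ++ state.getOption.iterator).max
  state.update(newMax)
  RunningMax(deviceId, newMax)
}

// inside buildStreamingQueries, assuming an inlet `in` of DeviceReading and an outlet `out` of RunningMax:
// val maxima = readStream(in).groupByKey(_.deviceId).mapGroupsWithState(GroupStateTimeout.NoTimeout)(updateMax)
// Seq(writeStream(maxima, out, OutputMode.Update))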

All stateful processing relies on snapshots and a state store for the bookkeeping of the offsets processed and the computed state at any time. In Pipelines, this state store is deployed on Kubernetes using Persistent Volume Claims (PVCs) backed by a storage class that allows for access mode ReadWriteMany.

PVCs are automatically provisioned for each Pipelines application. The volumes are claimed for as long as the Pipelines application is deployed, allowing for seamless re-deployment, upgrades, and recovery in case of failure.

Warning
Storage Size is Currently Fixed

Please note that the volume size allocated for storage is currently fixed per Pipelines platform deployment.

The storage of snapshots and state has an impact on the upgradability of the Spark-based components. Once a state representation is deployed, the state definition (schema) may not change.

Upgrading an application that uses stateful computation requires planning ahead of time to avoid painting yourself into a corner. Significant changes to the application logic, the addition of sources, or changes to the schema representing the state are not allowed. For details on the degrees of freedom and upgrade options, please refer to the Structured Streaming documentation.

8.2. Building a Spark Streamlet

The following sections describe how you can create a Spark streamlet. As mentioned in the Spark Streamlet Fundamentals, a Spark streamlet is defined by the following features:

  • It is a Streamlet. Pipelines offers a class for implementing Spark streamlets, SparkStreamlet, which extends pipelines.streamlets.Streamlet. Any Spark streamlet needs to extend SparkStreamlet.

  • It has a shape - we call it StreamletShape. Any Spark streamlet needs to define a concrete shape using the APIs available for the StreamletShape class, which defines the inlets and outlets of the streamlet.

  • It has a SparkStreamletLogic that defines how the streamlet generates StreamingQuery instances from the business logic.

In this tutorial we’ll build a simple Spark streamlet that accepts data on an inlet and writes it to the console. It can be used to print reports from data that arrives at its inlet. Let’s call this streamlet ReportPrinter.

8.2.1. Extending from SparkStreamlet

Let’s start with building the ReportPrinter streamlet. The first thing to do is extend the pipelines.spark.SparkStreamlet abstract class, as shown below:

package com.example

import pipelines.streamlets._
import pipelines.spark._

object ReportPrinter extends SparkStreamlet {
  // 1. TODO Create inlets and outlets
  // 2. TODO Define the shape of the streamlet
  // 3. TODO Override createLogic to provide StreamletLogic
}

The code snippet above shows the object ReportPrinter extending SparkStreamlet. We have outlined the steps needed to complete the implementation, which we will do in the next few sections.

The next step is to implement inlets and outlets of the streamlet.

8.2.2. Inlets and Outlets

The streamlet that we are building in this tutorial will have an inlet and no outlet.

package com.example

import pipelines.streamlets.avro._
import pipelines.spark._

object ReportPrinter extends SparkStreamlet {
  // 1. Create inlets and outlets
  val in = AvroInlet[Report]("report-in")

  // 2. TODO Define the shape of the streamlet using inlets and outlets
  // 3. TODO Override createLogic to provide StreamletLogic, where the inlets and outlets are used to read and write streams.
}

Pipelines supports Avro encoded processing of data - we make this explicit by defining the inlet as AvroInlet. Report is the class of objects that will be accepted by this inlet. This means that an inlet defined by AvroInlet[Report] will only accept Avro encoded data for the class Report. The class Report will be generated by Pipelines during application build time from the Avro schema that the user supplies - this ensures that the data which the inlet accepts conforms to the schema that the user supplied earlier. As an example, we can have the following Avro schema for the Report object, which contains a report of some of the attributes of products from an inventory:

{
  "namespace": "com.example",
  "type": "record",
  "name": "Report",
  "fields": [
    {
      "name": "id",
      "type": "string"
    },
    {
      "name": "name",
      "type": "string"
    },
    {
      "name": "description",
      "type": "string"
    },
    {
      "name": "keywords",
      "type": {
        "type": "array",
        "items": "string"
      }
    }
  ]
}

In the definition of the inlet, "report-in" is the name of the inlet. It’s recommended that you use a domain specific name for the inlet which indicates the nature of data that this inlet is supposed to accept. We will use this inlet later to read data from it.

This streamlet does not have any outlet. But in general outlets are defined similarly: val out = AvroOutlet[Report]("report-out", _.name) will define an outlet which will write Avro encoded data for objects of type Report. Here "report-out" is the name of the outlet and _.name is the partitioning function that partitions the data from the outlet.

8.2.3. Streamlet Shape

Let’s now define the shape of ReportPrinter by using the APIs in pipelines.streamlets.StreamletShape:

package com.example

import pipelines.streamlets._
import pipelines.streamlets.avro._
import pipelines.spark._

object ReportPrinter extends SparkStreamlet {
  // 1. Create inlets and outlets
  val in = AvroInlet[Report]("report-in")

  // 2. Define the shape of the streamlet using inlets and outlets
  override val shape = StreamletShape.withInlets(in)

  // 3. TODO Override createLogic to provide StreamletLogic, where the inlets and outlets are used to read and write streams.
}

The above code overrides the shape method with a value that defines the shape of the streamlet. StreamletShape offers methods to define various shapes; e.g. to define a streamlet with 2 inlets and 2 outlets, we could write StreamletShape.withInlets(in0, in1).withOutlets(valid, invalid).

The next step is to define the SparkStreamletLogic.

8.2.4. Defining the SparkStreamletLogic

The SparkStreamletLogic class makes it possible for a user to specify domain logic. It is defined as an abstract class in pipelines.spark.SparkStreamletLogic and provides an abstract method buildStreamingQueries where the user can define the specific logic for the Spark streamlet.

In this step we need to override createLogic from SparkStreamlet in our ReportPrinter object. createLogic needs to return an instance of SparkStreamletLogic which will do the processing based on the requirements of ReportPrinter object.

package com.example

import org.apache.spark.sql.Dataset
import org.apache.spark.sql.streaming.StreamingQuery
import org.apache.spark.sql.streaming.OutputMode

import pipelines.streamlets._
import pipelines.streamlets.avro._
import pipelines.spark._

object ReportPrinter extends SparkStreamlet {
  // 1. Create inlets and outlets
  val in = AvroInlet[Report]("report-in")

  // 2. Define the shape of the streamlet using inlets and outlets
  override val shape = StreamletShape.withInlets(in)

  // 3. Override createLogic to provide StreamletLogic, where the inlets and outlets are used to read and write streams.
  override def createLogic = new SparkStreamletLogic {
    // Define some formatting attributes
    val numRows = 50
    val truncate = false

    override def buildStreamingQueries = {
      val inDataset = readStream(in)
      val q = inDataset.writeStream
        .format("console")
        .option("numRows", numRows)
        .option("truncate", truncate)
        .outputMode(OutputMode.Append())
        .start()
      Seq(q)
    }
  }
}

In the above code we override createLogic from SparkStreamlet with an instance of SparkStreamletLogic that overrides buildStreamingQueries to supply the domain logic for the streamlet. In this case, since we are implementing a console printer streamlet, all we need to do is read from the inlet that we defined earlier, val in = AvroInlet[Report]("report-in"), and do some processing on it.

Here are the steps that we do as part of the processing logic:

  • Since it’s a console printer, we would like to write to console as specified by .format("console") in the implementation above

  • We use two parameters to control the display: (a) how many rows to display at once and (b) whether to truncate long lines. These are defined by the values numRows and truncate in the concrete implementation of SparkStreamletLogic.

Note that the processing logic can be quite complex and we can maintain state as part of the implementation of SparkStreamletLogic.

Note

If the streamlet needs to have local state (vals, vars) for processing logic, it has to be put inside the SparkStreamletLogic class and not as part of the Streamlet class. The Streamlet class is used by Pipelines for extraction of streamlets using reflection and hence cannot have any state within it.

Note

In summary, here are the steps for defining a Spark streamlet:

  • Define the inlets and outlets

  • Define the concrete shape using the inlets and outlets. The shape of the streamlet is the metadata that will be used by Pipelines

  • Define the custom processing logic that will read data from inlets and write data to outlets

8.2.5. Using ReportPrinter in the blueprint

An example of a blueprint using the ReportPrinter could look like this:

blueprint {
  streamlets {
    ingress = com.example.ReportIngress
    report-printer = com.example.ReportPrinter
  }

  connections {
    ingress.out = [report-printer.report-in]
  }
}

The omitted ReportIngress could for instance be another SparkStreamlet that writes Reports to its outlet.

8.3. Testing a Spark Streamlet

8.3.1. Using the testkit

The testkit enables users to write unit tests for Spark streamlets. The unit tests are meant to facilitate local testing of Spark streamlets.

8.3.2. Basic flow of testkit APIs

Here’s the basic flow that you need to follow when writing tests using the testkit:

  1. Mix in the trait SparkTestSupport into the test class. This trait provides the basic functionality for managing the SparkSession, basic initialization and cleanups, and the core APIs of the testkit

  2. Create the Spark streamlet that needs to be tested

  3. Setup inlet taps that tap the inlet ports of the streamlet

  4. Setup outlet taps for outlet ports

  5. Push data into inlet ports

  6. Run the streamlet using the testkit and the setup inlet taps and outlet taps

  7. Write assertions to ensure that the expected results match the actual ones

8.3.3. Details of the workflow

Let’s consider an example where we would like to write unit tests for testing a SparkStreamlet that reads data from an inlet, does some processing and writes processed data to an outlet. We will follow the steps that we outlined in the last section. We will use ScalaTest as the testing library.

Setting up a sample SparkStreamlet

Here is a list of imports needed for writing the test suite.

import scala.collection.immutable.Seq
import scala.concurrent.duration._

import org.apache.spark.sql.streaming.OutputMode

import pipelines.streamlets.StreamletShape
import pipelines.streamlets.avro._
import pipelines.spark.avro._
import pipelines.spark.testkit._
import pipelines.spark.sql.SQLImplicits._

SparkStreamlet is an abstract class. Let’s set up a concrete instance that we would like to test. For more details on how to implement a Spark streamlet, please refer to Building a Spark Streamlet.

// create Spark Streamlet
val processor = new SparkStreamlet {
  val in = AvroInlet[Data]("in")
  val out = AvroOutlet[Simple]("out", _.name)
  val shape = StreamletShape(in, out)

  override def createLogic() = new SparkStreamletLogic {
    override def buildStreamingQueries = {
      val dataset = readStream(in)
      val outStream = dataset.select($"name").as[Simple]
      val query = writeStream(outStream, out, OutputMode.Append)
      Seq(query)
    }
  }
}
The unit test

Here’s how we would write a unit test using ScalaTest. The various logical steps of the test are annotated with inline comments explaining the rationale behind the step.

class SparkProcessorSpec extends SparkTestSupport { // 1. Mixin SparkTestSupport

  "SparkProcessor" should {
    "process streaming data" in {

      // 2. create Spark streamlet
      val processor = new SparkStreamlet {
        val in = AvroInlet[Data]("in")
        val out = AvroOutlet[Simple]("out", _.name)
        val shape = StreamletShape(in, out)

        override def createLogic() = new SparkStreamletLogic {
          override def buildStreamingQueries = {
            val dataset = readStream(in)
            val outStream = dataset.select($"name").as[Simple]
            val query = writeStream(outStream, out, OutputMode.Append)
            Seq(query)
          }
        }
      }

      // 3. setup inlet tap on inlet port
      val in: SparkInletTap[Data] = inletAsTap[Data](processor.in)

      // 4. setup outlet tap on outlet port
      val out: SparkOutletTap[Simple] = outletAsTap[Simple](processor.out)

      // 5. build data and send to inlet tap
      val data = (1 to 10).map(i => Data(i, s"name$i"))
      in.addData(data)

      // 6. Run the streamlet using the testkit and the setup inlet taps and outlet probes
      run(processor, Seq(in), Seq(out), 2.seconds)

      // get data from outlet tap
      val results = out.asCollection(session)

      // 7. Assert that actual matches expectation
      results should contain(Simple("name1"))
    }
  }
}
The SparkTestSupport trait

This provides the core functionalities of the testkit and needs to be mixed in with the main test class. This trait provides the following functionalities:

  1. Manage SparkSession for all tests

  2. Basic initialization and cleanups through the beforeAll() and afterAll() methods of ScalaTest. If you want custom logic for initialization and cleanups, then override these methods in your test class, but don’t forget to invoke super.beforeAll() or super.afterAll() before adding your custom logic.

  3. Provide core APIs of the testkit - all testkit APIs like inletAsTap, outletAsTap, run are available on the SparkTestSupport trait.

Special Note on Aggregation Query

There may be situations where the Spark query that you are testing involves aggregation operators. The testkit requires some special considerations when writing tests for such queries. Here’s an example of a Spark streamlet that involves aggregation operators:

class CallStatsAggregator extends SparkStreamlet {

  val in = AvroInlet[CallRecord]("in")
  val out = AvroOutlet[AggregatedCallStats]("out", _.startTime.toString)
  val shape = StreamletShape(in, out)

  val GroupByWindow = "1 minute"
  val Watermark = "1 minute"

  override def createLogic = new SparkStreamletLogic {

    override def buildStreamingQueries = {
      val dataset = readStream(in)
      val outStream = process(dataset)
      val query = writeStream(outStream, out, OutputMode.Update)
      Seq(query)
    }

    private def process(inDataset: Dataset[CallRecord]): Dataset[AggregatedCallStats] = {
      println(s"Starting query with watermark $Watermark, group-by-window $GroupByWindow")
      val query =
        inDataset
          .withColumn("ts", $"timestamp".cast(TimestampType))
          .withWatermark("ts", Watermark)
          .groupBy(window($"ts", GroupByWindow))
          .agg(avg($"duration") as "avgCallDuration", sum($"duration") as "totalCallDuration")
          .withColumn("windowDuration", $"window.end".cast(LongType) - $"window.start".cast(LongType))

      query
        .select($"window.start".cast(LongType) as "startTime", $"windowDuration", $"avgCallDuration", $"totalCallDuration")
        .as[AggregatedCallStats]
    }
  }
}

The above query involves aggregation operations like groupBy over window and subsequent averaging of call durations tracked by the query. Such stateful streaming queries use a StateStore and need to be run with OutputMode.Update (recommended) or OutputMode.Complete when streaming data appears in the incoming Dataset. OutputMode.Update updates the aggregate with incoming data and hence is recommended for writing tests involving aggregation queries. On the other hand, OutputMode.Complete does not drop old aggregation state, since by definition this mode preserves all data in the Result Table.

For queries that do not involve aggregation, only new data needs to be written, and the query needs to be run with OutputMode.Append. This is the default behavior that the testkit sets for the user.

In order to have the query run with OutputMode.Update, the user needs to pass in this argument explicitly when setting up the outlet tap:

// setup outlet tap on outlet port
val out = outletAsTap[AggregatedCallStats](aggregator.shape.outlet, OutputMode.Update)

For more details on OutputMode, have a look at the relevant documentation on Spark.

Also since the stateful queries use StateStore, the latter needs to be explicitly stopped when the test suite ends:

override def afterAll(): Unit = {
  super.afterAll()
  StateStore.stop() // stop the state store maintenance thread and unload store providers
}

9. Using pipelines CLI

The Pipelines CLI allows you to create, manage, deploy, and operate Pipelines applications.

The CLI examples in this documentation assume you use oc plugin pipelines … to run Pipelines commands. If you prefer to use kubectl instead, then use kubectl pipelines … for all these commands.

If you use the oc CLI for OpenShift, you’ll need the Kubernetes kubectl CLI to run one command to install the Pipelines plugin into oc. See The install-oc-plugin command for more details.
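
For example, the following two invocations are equivalent ways to list deployed applications:

oc plugin pipelines list
kubectl pipelines list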

Synopsis

oc plugin pipelines [command] [flags]

Options

  -h, --help   help for pipelines

9.1. The configure command

Configures a deployed Pipelines Application.

Synopsis

Configures a deployed Pipelines Application.

oc plugin pipelines configure [parameters]

Examples

oc plugin pipelines configure my-app mystreamlet.hostname=localhost

or to list all required configuration parameters:

oc plugin pipelines configure my-app

Options

  -h, --help   help for configure

9.2. The deploy command

Deploys a Pipelines Application to the cluster.

Synopsis

Deploys a Pipelines Application to the cluster. The arguments to the command consist of a docker image path and, optionally, one or more key=value pairs separated by spaces.

oc plugin pipelines deploy [parameters]

Examples

oc plugin pipelines deploy registry.test-cluster.ingestion.io/pipelines/sensor-data-scala:292-c183d80 valid-logger.log-level=info valid-logger.msg-prefix=valid

Options

  -h, --help   help for deploy

9.3. The install-oc-plugin command

Installs the Pipelines OpenShift CLI oc plugin.

Synopsis

Installs the Pipelines OpenShift CLI oc plugin. After installation all commands can be accessed by executing oc plugin pipelines.

kubectl pipelines install-oc-plugin [parameters]

Options

  -h, --help   help for install-oc-plugin

9.4. The list command

Lists deployed Pipelines Applications in the current cluster.

Synopsis

Lists deployed Pipelines Applications in the current cluster.

oc plugin pipelines list [parameters]

Options

  -h, --help   help for list

9.5. The scale command

Scales a streamlet of a deployed Pipelines Application to the specified number of replicas.

Synopsis

Scales a streamlet of a deployed Pipelines Application to the specified number of replicas.

oc plugin pipelines scale [parameters]

Examples

oc plugin pipelines scale my-app my-streamlet 2

Options

  -h, --help   help for scale

9.6. The status command

Gets the status of a Pipelines Application.

Synopsis

Gets the status of a Pipelines Application.

oc plugin pipelines status [parameters]

Examples

oc plugin pipelines status my-app

Options

  -h, --help   help for status

9.7. The undeploy command

Undeploy one or more Pipelines applications.

Synopsis

Use this command to undeploy one or more Pipelines applications by providing a space separated list of application names.

oc plugin pipelines undeploy [parameters]
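
Examples

To undeploy two applications at once (the application names below are illustrative):

oc plugin pipelines undeploy my-app call-record-pipeline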

Options

  -h, --help   help for undeploy

9.8. The version command

Print the plugin version.

Synopsis

Print the plugin version.

oc plugin pipelines version [parameters]

Options

  -h, --help   help for version