The development process

To understand the value that Akka Data Pipelines and Cloudflow offer, let’s look at how they help at each stage of the typical design-develop-test-package-deploy lifecycle.

Streaming application design

When creating streaming applications, a large portion of the development effort goes into the mechanics of streaming. These include setting up and configuring connections to the messaging system, serializing and deserializing messages, setting up storage for fault recovery, and more. All of these mechanics must be addressed before you can finally add the most important and valuable part: the business logic.

Figure 2. Business Logic in a Streaming Application

Cloudflow introduces a new component model, called Streamlets. The Streamlet model offers an abstraction to:

  • Identify the connections of a component as inlets and outlets, relieving you of setting up connections.

  • Attach the schema of the data to each connection, providing compile-time type safety and automatic serialization/deserialization, and preventing potentially critical issues at deployment.

  • Provide an entry point for the business logic of the component, allowing you to focus on coding the most valuable business-specific logic.
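The idea behind typed inlets and outlets can be sketched in plain Scala. This is a minimal sketch and not the Cloudflow API. `TypedInlet`, `TypedOutlet`, `Connection`, and `Codec` are hypothetical names, and the text codec stands in for a real schema format such as Avro. Both ports carry a type parameter, so the compiler only accepts a connection whose outlet and inlet share the same data type. Serialization is derived from that type.

```scala
// Hypothetical names: this is NOT the Cloudflow API, only an illustration of the idea.

// Turns values of T into bytes and back (a stand-in for a real schema format such as Avro).
trait Codec[T] {
  def encode(value: T): Array[Byte]
  def decode(bytes: Array[Byte]): T
}

// Each port carries the type of data flowing through it, plus the codec for that type.
final case class TypedInlet[T](name: String)(implicit val codec: Codec[T])
final case class TypedOutlet[T](name: String)(implicit val codec: Codec[T])

// A connection only type-checks when the outlet and the inlet carry the same T.
final case class Connection[T](from: TypedOutlet[T], to: TypedInlet[T]) {
  // Serialization comes with the type: the outlet encodes, the inlet decodes.
  def transfer(value: T): T = to.codec.decode(from.codec.encode(value))
}

// An example schema and a simple text codec for it.
final case class Data(key: String, value: Double)

implicit val dataCodec: Codec[Data] = new Codec[Data] {
  def encode(d: Data): Array[Byte] = s"${d.key},${d.value}".getBytes("UTF-8")
  def decode(bytes: Array[Byte]): Data = {
    val text  = new String(bytes, "UTF-8")
    val comma = text.lastIndexOf(',') // the key may contain commas; a Double's text form cannot
    Data(text.substring(0, comma), text.substring(comma + 1).toDouble)
  }
}

val out  = TypedOutlet[Data]("out")
val in   = TypedInlet[Data]("in")
val conn = Connection(out, in) // compiles: both ports carry Data
// Connection(out, TypedInlet[String]("in")) would be rejected by the compiler.
```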

Focus on business logic with streamlets

In the current version, Cloudflow includes backend Streamlet implementations for Akka, Apache Spark Structured Streaming, and Apache Flink. These allow you to write business logic in the native API of each backend. Additionally, you can extend open source Cloudflow with new streaming backends.

The following table shows the current support for Java and Scala implementations for each runtime.

Table 1. Scala and Java support for Streamlet implementations

Streamlet API                Scala   Java
Akka                         X       X
Flink                        X       X
Spark Structured Streaming   X

Let’s take a look at an example Streamlet:

Spark Streamlet Example
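import cloudflow.spark.{ SparkStreamlet, SparkStreamletLogic }
import cloudflow.spark.sql.SQLImplicits._
import cloudflow.streamlets.StreamletShape
import cloudflow.streamlets.avro._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types.TimestampType

// Data is assumed to be an Avro-generated case class with key, value, and timestamp fields.
class MovingAverageSparklet extends SparkStreamlet { (1)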

  val in    = AvroInlet[Data]("in")
  val out   = AvroOutlet[Data]("out", _.key)
  val shape = StreamletShape(in, out) (2)

  override def createLogic() = new SparkStreamletLogic {
    override def buildStreamingQueries = { (3)

      val groupedData = readStream(in) (4)
        .withColumn("ts", $"timestamp".cast(TimestampType))
        .withWatermark("ts", "1 minutes")
        .groupBy(window($"ts", "1 minute", "30 seconds"), $"key")
        .agg(avg($"value").as("avg"))
      val query = groupedData.select($"key", $"avg".as("value")).as[Data]

      writeStream(query, out, OutputMode.Append).toQueryExecution
    }
  }
}
1 SparkStreamlet is the base class for streamlets that run on the Apache Spark backend.
2 The StreamletShape defines the inlet(s) and outlet(s) of the streamlet; each inlet and outlet is declared with its data type.
3 buildStreamingQueries is the entry point for the streamlet logic.
4 The logic is plain Spark Structured Streaming code, without the boilerplate of creating sessions, connections, checkpoints, and so on.

Akka Streams and Flink-based Streamlets follow the same pattern.
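The Spark query in the example groups readings into 1-minute windows that start every 30 seconds, then averages the values per window and key. The following plain-Scala sketch shows what that grouping computes, without Spark. Window starts are aligned to multiples of the slide, which matches Spark's default of no start-time offset. This is an illustration only. It ignores the watermark and the Append output mode, under which Spark emits a window's result once the watermark has passed the end of that window. `Reading` and `Window` are hypothetical types.

```scala
// Hypothetical types; field names mirror the columns used in the Spark query.
final case class Reading(key: String, timestampMs: Long, value: Double)
final case class Window(startMs: Long, endMs: Long)

val WindowMs = 60000L // window($"ts", "1 minute", ...)
val SlideMs  = 30000L // window(..., "30 seconds")

// All windows [start, start + WindowMs) that contain t, with starts on multiples of SlideMs.
def windowsFor(t: Long): List[Window] = {
  val lastStart = Math.floorDiv(t, SlideMs) * SlideMs
  Iterator
    .iterate(lastStart)(_ - SlideMs)
    .takeWhile(start => start > t - WindowMs)
    .map(start => Window(start, start + WindowMs))
    .toList
}

// Batch equivalent of groupBy(window, key).agg(avg(value)) over a finite set of readings.
def movingAverages(readings: Seq[Reading]): Map[(Window, String), Double] =
  readings
    .flatMap(r => windowsFor(r.timestampMs).map(w => ((w, r.key), r.value)))
    .groupBy(_._1)
    .map { case (group, entries) =>
      val values = entries.map(_._2)
      group -> values.sum / values.size
    }
```

With a 1-minute window and a 30-second slide, every reading falls into exactly two windows. For example, a reading at 0:40 contributes to both [0:00, 1:00) and [0:30, 1:30).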

Compose application flow with a blueprint

Once you have developed components as Streamlets, you compose the application’s end-to-end flow by creating a blueprint. The blueprint declares the streamlet instances that belong to an application and how each streamlet instance connects to the others.

The following example shows a simple blueprint definition:

blueprint example
blueprint {
  streamlets { (1)
    ingress = sensors.SparkRandomGenDataIngress
    process = sensors.MovingAverageSparklet
    egress = sensors.SparkConsoleEgress
  }
  topics { (2)
    data {
      producers = [ingress.out]
      consumers = [process.in]
    }
    moving-averages {
      producers = [process.out]
      consumers = [egress.in]
    }
  }
}
1 streamlets section: declares instances from the streamlets available in the application (or its dependencies).
2 topics section: declares the topics that connect streamlet outlets (producers) to streamlet inlets (consumers).

Note that the declaration of instances in the streamlets section supports component reuse by letting you define multiple instances of a streamlet, with potentially different configurations. With the blueprint file in place, you can verify whether all components in the application are properly connected.
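A blueprint is simple enough to model as data. The following hypothetical sketch does not show how sbt-cloudflow reads blueprint files. It mirrors the example above and checks a single structural rule: every `<instance>.<port>` reference in the topics section must name an instance declared in the streamlets section. `Topic`, `Blueprint`, and `undeclaredReferences` are invented names.

```scala
// Hypothetical in-memory model of the blueprint example; not how sbt-cloudflow reads blueprints.
final case class Topic(name: String, producers: Seq[String], consumers: Seq[String])
final case class Blueprint(streamlets: Map[String, String], topics: Seq[Topic])

val sensorsBlueprint = Blueprint(
  // Instance name -> streamlet class. Reusing a class means adding another instance name.
  streamlets = Map(
    "ingress" -> "sensors.SparkRandomGenDataIngress",
    "process" -> "sensors.MovingAverageSparklet",
    "egress"  -> "sensors.SparkConsoleEgress"
  ),
  topics = Seq(
    Topic("data", producers = Seq("ingress.out"), consumers = Seq("process.in")),
    Topic("moving-averages", producers = Seq("process.out"), consumers = Seq("egress.in"))
  )
)

// Port references have the form "<instance>.<port>". Returns every reference whose
// instance part is not declared in the streamlets section.
def undeclaredReferences(bp: Blueprint): Seq[String] =
  for {
    topic <- bp.topics
    ref   <- topic.producers ++ topic.consumers
    if !bp.streamlets.contains(ref.takeWhile(_ != '.'))
  } yield ref
```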

The verifyBlueprint task is part of the sbt-cloudflow plugin. The following shows example output from running sbt verifyBlueprint:

$ sbt verifyBlueprint
[info] Loading settings for project global-plugins from plugins.sbt ...
[info] Loading global plugins from /home/maasg/.sbt/1.0/plugins
[info] Loading settings for project spark-sensors-build from cloudflow-plugins.sbt,plugins.sbt ...
[info] Loading project definition from cloudflow/examples/spark-sensors/project
[info] Loading settings for project sparkSensors from build.sbt ...
[info] Set current project to spark-sensors (in build file:cloudflow/examples/spark-sensors/)
[info] Streamlet 'sensors.MovingAverageSparklet' found
[info] Streamlet 'sensors.SparkConsoleEgress' found
[info] Streamlet 'sensors.SparkRandomGenDataIngress' found
[success] /cloudflow/examples/spark-sensors/src/main/blueprint/blueprint.conf verified.

The verification uses the schema information provided by the blueprint to check that all connections between streamlets are compatible. Once the blueprint verification succeeds, you know that the components of your streaming application can talk to each other. After verification, you can run the complete application.
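Conceptually, the schema check works like the following sketch. Each port declares a schema, and every producer and consumer attached to a topic must declare the same one. This is a simplified, hypothetical model, not the plugin's implementation. The `portSchemas` map and the plain schema names (such as the invented `sensors.Data`) are assumptions made for illustration, in place of a real Avro or Protobuf schema comparison.

```scala
// Hypothetical, simplified model: a schema is identified by a name, not compared structurally.
final case class Topic(name: String, producers: Seq[String], consumers: Seq[String])

// Returns one message per problem found; an empty list means all connections are compatible.
def verifySchemas(topics: Seq[Topic], portSchemas: Map[String, String]): List[String] =
  topics.toList.flatMap { topic =>
    val ports = topic.producers ++ topic.consumers
    // A port with no known schema (for example, a misspelled port name).
    val unknown = ports.filterNot(portSchemas.contains).map(p => s"unknown port '$p' on topic '${topic.name}'")
    // All producers and consumers of one topic must agree on a single schema.
    val schemas = ports.flatMap(portSchemas.get).distinct
    val mismatch =
      if (schemas.size > 1) List(s"incompatible schemas on topic '${topic.name}': ${schemas.mkString(", ")}")
      else Nil
    unknown.toList ++ mismatch
  }

// The sensors blueprint, with every port assumed to use the same hypothetical schema name.
val sensorsTopics = Seq(
  Topic("data", producers = Seq("ingress.out"), consumers = Seq("process.in")),
  Topic("moving-averages", producers = Seq("process.out"), consumers = Seq("egress.in"))
)
val sensorsPorts = Map(
  "ingress.out" -> "sensors.Data",
  "process.in"  -> "sensors.Data",
  "process.out" -> "sensors.Data",
  "egress.in"   -> "sensors.Data"
)
```

Changing the schema of a single port, such as `egress.in`, produces an error for the `moving-averages` topic. That is the kind of mismatch the verification step surfaces before anything is deployed.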

The verification task is usually performed as a prerequisite of other tasks, such as runLocal and buildApp, which we look at next. While you can run it on its own as shown here, it is most often run indirectly as part of those larger build tasks.

You can also use the Lightbend Console in Akka Data Pipelines to visualize data flows from a blueprint:

Figure 1. Cloudflow application visualization in Lightbend Console

In the screenshot above, you can see the dataflow from left to right. Each streamlet icon shows its name and service type (in this case, Akka Streams or Spark Streaming). This simple example demonstrates the benefits of visualization over trying to build a mental model from configuration files and source code.

Test locally with the sandbox

Cloudflow comes with a local execution mode called sandbox. The sandbox instantiates all streamlets of an application’s blueprint with their connections in a single, local JVM.
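A toy runner can illustrate wiring every streamlet together inside one process. This is a hypothetical sketch, not the sandbox's implementation. `LocalStreamlet` and `LocalRunner` are invented names, topics are plain in-memory queues, and each streamlet is reduced to a function over a batch of string records. The example wiring mirrors the topics of the sensors blueprint, but the logic of each stand-in streamlet is made up.

```scala
import scala.collection.mutable

// A stand-in for a streamlet: reads from at most one topic and writes to at most one topic.
final case class LocalStreamlet(
    name: String,
    inTopic: Option[String],
    outTopic: Option[String],
    logic: Seq[String] => Seq[String]
)

final class LocalRunner(streamlets: Seq[LocalStreamlet]) {
  // One in-memory queue per topic name, all living in the same JVM.
  private val topics = mutable.Map.empty[String, mutable.Queue[String]]

  private def topic(name: String): mutable.Queue[String] =
    topics.getOrElseUpdate(name, mutable.Queue.empty[String])

  // Runs each streamlet once, in declaration order: drain its input topic,
  // apply its logic, and append the results to its output topic.
  def runOnce(): Unit =
    streamlets.foreach { s =>
      val input  = s.inTopic.map(t => topic(t).dequeueAll(_ => true).toSeq).getOrElse(Seq.empty)
      val output = s.logic(input)
      s.outTopic.foreach(t => topic(t) ++= output)
    }

  // Records still waiting in a topic.
  def pending(name: String): List[String] = topic(name).toList
}

// Example wiring with the sensors topics "data" and "moving-averages" (invented logic).
val consoleOutput = mutable.ListBuffer.empty[String]
val sensorsLocal = new LocalRunner(Seq(
  LocalStreamlet("ingress", None, Some("data"), _ => Seq("1", "2", "3")),
  LocalStreamlet("process", Some("data"), Some("moving-averages"), in => in.map(v => (v.toInt * 2).toString)),
  LocalStreamlet("egress", Some("moving-averages"), None, in => { consoleOutput ++= in; Seq.empty })
))
```

Calling `sensorsLocal.runOnce()` pushes three records from ingress through process to egress inside a single JVM. The toy runner is single-threaded and runs each streamlet once; the real sandbox keeps all streamlets running continuously.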

The following session shows the sandbox in action.

Running a Cloudflow App Locally
$>sbt
sbt>runLocal
...
[info] Streamlet 'carly.aggregator.CallAggregatorConsoleEgress' found
[info] Streamlet 'carly.aggregator.CallRecordGeneratorIngress' found
[info] Streamlet 'carly.aggregator.CallStatsAggregator' found
[success] /cloudflow/examples/call-record-aggregator/call-record-pipeline/src/main/blueprint/blueprint.conf verified.
 ┌──────────────┐ ┌──────────────┐ ┌───────────┐
 │cdr-generator1│ │cdr-generator2│ │cdr-ingress│
 └───────┬──────┘ └──────┬───────┘ └─────┬─────┘
         │               │               │
         └─────────┐     │     ┌─────────┘
                   │     │     │
                   v     v     v
            ┌────────────────────────┐
            │[generated-call-records]│
            └────────────┬───────────┘
                         │
                         v
                      ┌─────┐
                      │split│
                      └─┬─┬─┘
                        │ │
                        │ └───────────┐
                        │             └┐
                        v              │
         ┌────────────────────┐        │
         │[valid-call-records]│        │
         └─────────┬──────────┘        │
                   │                   │
                   v                   │
           ┌──────────────┐            │
           │cdr-aggregator│            │
           └─┬────────────┘            │
             │                         │
             v                         v
 ┌───────────────────────┐ ┌──────────────────────┐
 │[aggregated-call-stats]│ │[invalid-call-records]│
 └────────────┬──────────┘ └───────┬──────────────┘
              │                    │
              v                    v
      ┌──────────────┐      ┌────────────┐
      │console-egress│      │error-egress│
      └──────────────┘      └────────────┘
---------------------------- Streamlets per project ----------------------------
spark-aggregation - output file: file:/tmp/cloudflow-local-run7107015168807762297/spark-aggregation-local.log

    cdr-aggregator [carly.aggregator.CallStatsAggregator]
    cdr-generator1 [carly.aggregator.CallRecordGeneratorIngress]
    cdr-generator2 [carly.aggregator.CallRecordGeneratorIngress]

akka-java-aggregation-output - output file: file:/tmp/cloudflow-local-run7107015168807762297/akka-java-aggregation-output-local.log

	console-egress [carly.output.AggregateRecordEgress]
	error-egress [carly.output.InvalidRecordEgress]

akka-cdr-ingestor - output file: file:/tmp/cloudflow-local-run7107015168807762297/akka-cdr-ingestor-local.log

	cdr-ingress [carly.ingestor.CallRecordIngress]
	- HTTP port [3000]
	split [carly.ingestor.CallRecordSplit]

--------------------------------------------------------------------------------

------------------------------------ Topics ------------------------------------
[aggregated-call-stats]
[generated-call-records]
[invalid-call-records]
[valid-call-records]
--------------------------------------------------------------------------------

----------------------------- Local Configuration -----------------------------
No local configuration provided.
--------------------------------------------------------------------------------

------------------------------------ Output ------------------------------------
Pipeline log output available in folder: /tmp/cloudflow-local-run7107015168807762297
--------------------------------------------------------------------------------

Running call-record-aggregator
To terminate, press [ENTER]
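The topics alone fully determine the graph that runLocal prints. The hypothetical sketch below reads the producers and consumers off the diagram above by instance name only, since the diagram does not show port names. It derives streamlet-to-streamlet edges, then orders the streamlets with Kahn's algorithm so that every producer comes before its consumers. This is an illustration and not how Cloudflow renders the diagram.

```scala
import scala.collection.mutable

// Topics read off the runLocal diagram: topic -> (producing instances, consuming instances).
val callRecordTopics: Map[String, (Seq[String], Seq[String])] = Map(
  "generated-call-records" -> ((Seq("cdr-generator1", "cdr-generator2", "cdr-ingress"), Seq("split"))),
  "valid-call-records"     -> ((Seq("split"), Seq("cdr-aggregator"))),
  "invalid-call-records"   -> ((Seq("split"), Seq("error-egress"))),
  "aggregated-call-stats"  -> ((Seq("cdr-aggregator"), Seq("console-egress")))
)

// Every producer of a topic feeds every consumer of that topic.
def streamletEdges(topics: Map[String, (Seq[String], Seq[String])]): Set[(String, String)] =
  topics.values.flatMap { case (producers, consumers) =>
    for (p <- producers; c <- consumers) yield (p, c)
  }.toSet

// Kahn's algorithm; returns None if the edges contain a cycle.
// Ties are broken alphabetically so the result is deterministic.
def topologicalOrder(edges: Set[(String, String)]): Option[List[String]] = {
  val nodes    = edges.flatMap { case (from, to) => Set(from, to) }
  val inDegree = mutable.Map.empty[String, Int] ++= nodes.map(n => n -> edges.count(_._2 == n))
  var ready    = nodes.filter(inDegree(_) == 0).toList.sorted
  val order    = mutable.ListBuffer.empty[String]
  while (ready.nonEmpty) {
    val node = ready.head
    ready = ready.tail
    order += node
    // Removing this node may unblock the streamlets it feeds.
    edges.collect { case (from, to) if from == node => to }.foreach { next =>
      inDegree(next) -= 1
      if (inDegree(next) == 0) ready = (next :: ready).sorted
    }
  }
  if (order.size == nodes.size) Some(order.toList) else None
}
```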

The sandbox provides you with a minimal but fully operational version of the complete application. Use it to exercise the functionality of the application end-to-end and verify that it behaves as expected. You get a very fast feedback loop for the functionality you are developing, without going through the full package, upload, deploy, and launch process on a remote cluster.

Packaging and publishing artifacts

Once you are confident that the application functions as expected, you can build and publish its artifacts. Cloudflow applications are packaged as Docker images that contain the necessary dependencies to run the different streamlets on their respective backends. Depending on the project structure, this process will generate one or more Docker images, at least one for each runtime used in the application.

These images are published to the Docker registry of your choice, which must be configured in the project.
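The mapping from streamlets to images can be sketched as a grouping step. Everything below is hypothetical. The streamlet-to-project assignment is copied from the runLocal output above. Each project's backend is inferred from its name. The registry prefix and tag are copied from the buildApp output shown in this section. `Backend`, `StreamletInfo`, `imagesFor`, and `mixedBackendProjects` are invented names. The sketch also assumes that each project targets exactly one backend, which holds in this example.

```scala
// Hypothetical model: which project (and so which image) each streamlet instance belongs to.
sealed trait Backend
case object Akka  extends Backend
case object Spark extends Backend
case object Flink extends Backend

final case class StreamletInfo(instance: String, project: String, backend: Backend)

// Assignment copied from the runLocal output; backends inferred from project names.
val callRecordStreamlets = Seq(
  StreamletInfo("cdr-aggregator", "spark-aggregation", Spark),
  StreamletInfo("cdr-generator1", "spark-aggregation", Spark),
  StreamletInfo("cdr-generator2", "spark-aggregation", Spark),
  StreamletInfo("console-egress", "akka-java-aggregation-output", Akka),
  StreamletInfo("error-egress", "akka-java-aggregation-output", Akka),
  StreamletInfo("cdr-ingress", "akka-cdr-ingestor", Akka),
  StreamletInfo("split", "akka-cdr-ingestor", Akka)
)

// One image reference per project, in the "<registry>/<project>:<tag>" form seen in the buildApp output.
def imagesFor(streamlets: Seq[StreamletInfo], registry: String, tag: String): Map[String, String] =
  streamlets.map(_.project).distinct.map(project => project -> s"$registry/$project:$tag").toMap

// Assumption for this sketch: each project targets exactly one backend.
// Returns the projects that violate it.
def mixedBackendProjects(streamlets: Seq[StreamletInfo]): Set[String] =
  streamlets
    .groupBy(_.project)
    .collect { case (project, members) if members.map(_.backend).distinct.size > 1 => project }
    .toSet
```

In this example, two backends (Akka and Spark) yield three images, one per project. That is consistent with "at least one for each runtime".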

Building the application creates and publishes those Docker images. The final result is an application descriptor, encoded as a JSON file, that you can use with the kubectl cloudflow plugin to deploy the application to a cluster.

The buildApp task in the sbt-cloudflow plugin loads the project, verifies the blueprint, builds and publishes the Docker images, and finally generates the JSON descriptor that you use to deploy the application. The following abridged example shows the different stages that you can observe in the output of this task.

Stages of Building a Cloudflow Application
$ sbt buildApp
... (1)
[info] Set current project to root (in build file:/cloudflow/examples/call-record-aggregator/)
[info] Streamlet 'carly.output.AggregateRecordEgress' found
[info] Streamlet 'carly.output.InvalidRecordEgress' found
[info] Streamlet 'carly.ingestor.CallRecordIngress' found
[info] Streamlet 'carly.ingestor.CallRecordSplit' found
[info] Streamlet 'carly.ingestor.CallRecordStreamingIngress' found
... (2)
[success] /cloudflow/examples/call-record-aggregator/call-record-pipeline/src/main/blueprint/blueprint.conf verified.
... (3)
[info] Sending build context to Docker daemon  51.97MB
[info] Step 1/4 : FROM lightbend/akka-base:2.0.10-cloudflow-akka-2.6.6-scala-2.12
[info]  ---> bbcdde34a60a
[info] Step 2/4 : USER 185
[info] Successfully built dc0b6c01246e
[info] Tagging image dc0b6c01246e with name: eu.gcr.io/my-gcp-project/akka-cdr-ingestor:475-2cd83d8
...
[info] Successfully built and published the following image:
[info]   eu.gcr.io/my-gcp-project/akka-cdr-ingestor:475-2cd83d8
[info] 57ceaacf8169: Pushed
... (4)
[success] Cloudflow application CR generated in /cloudflow/examples/call-record-aggregator/target/call-record-aggregator.json
[success] Use the following command to deploy the Cloudflow application:
[success] kubectl cloudflow deploy /cloudflow/examples/call-record-aggregator/target/call-record-aggregator.json
[success] Total time: 129 s (02:09), completed Jun 24, 2020 12:58:46 PM
1 The project is loaded.
2 The application is validated by combining the information in the blueprint, the streamlets, and their schemas.
3 The Docker images are built and published.
4 The application descriptor is generated in the target folder of the project, along with a hint of the command to use to initiate the deployment.

Next, we see how to use the application descriptor to deploy the application to a cluster.

Deploy with kubectl extensions for a YAML-less experience

After testing and packaging, you are ready to deploy the application to a Kubernetes cluster: either one you manage or a hosted cluster if you purchase through GCP. In contrast with the usual YAML-full experience that typical K8s deployments require, Cloudflow uses the blueprint information and the streamlet definitions to auto-generate an application deployment.

Cloudflow also comes with a kubectl plugin that extends your local kubectl installation to work with Cloudflow applications. You use your usual kubectl commands to authenticate against your target cluster. Then, with the kubectl cloudflow plugin, you can deploy and manage a Cloudflow application as a single logical unit.

$ kubectl cloudflow deploy /cloudflow/examples/call-record-aggregator/target/call-record-aggregator.json

This approach is not only developer-friendly but also compatible with typical CI/CD deployments, which lets you take the application from development to production in a controlled way.

Benefits summary

Cloudflow and Akka Data Pipelines give you a set of powerful tools to accelerate application development, deployment, and management:

  • The Streamlet API lets you focus on business value and use your knowledge of popular streaming runtimes, such as Akka Streams, Apache Spark Structured Streaming, and Apache Flink, to create full-fledged streaming applications.

  • The blueprint lets you compose your application and verify it against the schema definitions of its streamlets, giving you peace of mind before deployment.

  • The sandbox lets you exercise a complete application in seconds, giving you a fast feedback loop that speeds up debugging and validation.

  • The kubectl cloudflow plugin lets you deploy your application to a Cloudflow-enabled K8s cluster and control its lifecycle.

  • Akka Data Pipelines' Lightbend Console gives you visibility into the health and performance of your application.

Next, see how to set up your development environment.