Lightbend

© 2016-2019, Lightbend, Inc. All Rights Reserved.
Release Date: May 31, 2019
Last Doc Update: 2019-05-22 19:04:06 UTC

1. Introduction

Welcome to Lightbend Fast Data Platform, 2.1.1 for OpenShift.

Tip
  • This documentation is a single HTML page or PDF file. Use your browser or PDF reader to search for content.

  • For the latest documentation on Fast Data Platform for DC/OS, see here.

Fast Data Platform is an integrated suite of tools and services for building and running fast data (streaming) systems, either on-premise or hosted in the cloud. Fast Data Platform delivers value three ways:

  • An accelerated on-ramp for building streaming data systems: Configuration and installation tools to get you up and running quickly. New in this release is the incubating release of Pipelines, our new system for building, deploying, and managing streaming applications.

  • Expert guidance, so you pick the streaming components best suited for your project: Bundled sample applications, documentation, and access to Lightbend’s Developer Assist provide quick, expert answers to all your questions.

  • Intelligent monitoring and management: Assure cluster reliability at scale for peace of mind.

1.1. Introducing Lightbend Pipelines

New in 2.1.1 is Lightbend Pipelines, a unique, powerful system for building scalable, streaming-data pipelines composed of multi-component streamlets written in one or more streaming engines. Most streaming data solutions comprise multiple components, implemented in different technologies, which you have to integrate yourself. What you use for streaming ETL is different than what you use for aggregations or model serving. Yet these various components must act as a single service to other services. Pipelines supports streamlets written in Akka Streams and Spark Structured Streaming, transparently integrated together. Developer productivity is accelerated by eliminating much of the boilerplate required when writing such applications, especially when composing domain-logic streamlets into complete applications. The Pipelines runtime environment automates many tasks, such as deployment, routing data between streamlets, and ensuring data safety. A powerful, intuitive GUI visualizes the running pipelines with easy access to important health and performance metrics. Management is simplified by automatic handling of many failure scenarios and restarts.

1.2. Overview of Fast Data Platform

The 2.1.1 release for OpenShift includes a set of components and services divided into the following levels of support:

  • Supported components are those for which Lightbend has committed to fixing problems encountered within the component as per Lightbend’s standard support and maintenance policy.

  • Certified components are those for which Lightbend or one of our partners has done testing for interoperability with Lightbend Fast Data Platform, but Lightbend disclaims all warranties and makes no promises about the component’s operation or production readiness. Support for the integration between a certified component and Lightbend Fast Data Platform will be provided on a "best effort" basis only. Support for the certified component itself may be available through a third-party.

  • Incubating components are those that Lightbend has included as candidates for full support or certified status. That is, we believe these components are sufficiently useful to customers that we are considering full support for them. In some cases, we may decide not to proceed with full support but still consider the component sufficiently robust for certified status. Hence, incubating is somewhat orthogonal to supported vs. certified.

  • Unsupported and Not Certified components are all others that aren’t covered in the previous categories. Customers may need to run such components, which is allowed on Fast Data Platform, because of its open nature. Lightbend will assist customers as is reasonable by standard Lightbend practices. However, Lightbend will not guarantee to fix platform bugs specific to unsupported or uncertified components and Lightbend will not commit to service level agreements (SLAs) for such components.

1.2.1. Platform Requirements

The Lightbend Fast Data Platform, 2.1.1 for OpenShift, requires RedHat OpenShift 3.11, which includes Kubernetes 1.11.

1.2.2. What’s Included in this Release

This release includes support for the Strimzi Kafka Operator (Kafka 2.0), the Kubernetes Operator for Apache Spark (Spark 2.4), Kafka Streams, and the Lightbend Reactive Platform version 2.0, which includes the open-source libraries Akka (especially Akka Streams), Play, and Lagom, as well as Lightbend Enterprise Suite for production monitoring and management of services and applications. Also included is our suite of sample applications running on OpenShift.

The 2.1.1 for OpenShift distribution is packaged in an archive called fdp-2.1.1-OpenShift.zip, which contains the complete set of Fast Data Platform documentation, in HTML and PDF formats. It is also available online at developer.lightbend.com.

The documentation discusses the platform and prerequisite tool requirements for Fast Data Platform and how to install them, followed by instructions for installing Fast Data Platform itself.

The following supported components are included in Fast Data Platform:

Table 1. Supported Components in Fast Data Platform 2.1.1 for OpenShift
Name | Component Version (1) | Release Version (2) | Notes
Lightbend Pipelines | 1.0.1 | 1.0.1 | Lightbend system for building, deploying, and managing streaming pipelines combining Akka Streams and Spark Structured Streaming.
Lightbend Reactive Platform | 2.1 | 2.1 | Lightbend open-source libraries for microservices (Akka, Play, and Lagom) and production tooling, including the Cinnamon library for monitoring and the console provided by Lightbend Enterprise Suite.
Apache Spark | 2.4.3 | 2.4.3 | Includes Spark Batch, Spark Streaming, Structured Streaming, SQL, and ML. SparkR and Spark GraphX are included, but not supported in Fast Data Platform. Built with Hadoop version 2.7.3. See Spark for details.
Kubernetes Operator for Apache Spark | 2.4.3 | v1beta1-0.8.2-2.4.3 | Kubernetes Apache Spark operator. See The Kubernetes Operator for Apache Spark for details.
Spark History Server | 2.4.3 | 2.4.3 |
Apache Kafka | 2.1.0 | 2.1.0 | Support for Apache Kafka, the Kafka Connect API, and the Kafka Streams API. Possible support for additional Kafka-ecosystem libraries and tools is being evaluated.
Strimzi | 2.1.0 | 0.11.3 | Kubernetes Operator for Apache Kafka. See Strimzi: The Kubernetes Operator for Apache Kafka for details.

  1. Baseline component version label.

  2. Corresponding operator or other version label.

Fast Data Platform and RedHat OpenShift are open platforms, allowing you to run additional components that aren’t supported by Lightbend. Some of them we use regularly, so we can certify that they work well in production scenarios with Fast Data Platform, although we do not offer production support for them, so use them with caution. Databases, in particular, can require nontrivial configuration tuning for optimal performance and reliability.

Table 2. Certified Components in Fast Data Platform 2.1.1 for OpenShift
Name | Component Version | Notes
Apache Flink | 1.8.0 | Large-scale, sophisticated streaming data system
Intel BigDL | 0.5.0 | A Spark-compatible deep learning library
InfluxDB | 1.4.3 | Used by our sample apps for storing time-series data
Grafana | 4.6.2 | Used by our sample apps for displaying time-series data
Cassandra | 3.0.16 | Used by our sample apps for storing aggregated data
Kubeflow | 0.4.1 | A popular system for implementing Machine Learning pipelines, including model training and model serving

More details about what’s new with the supported and certified components can be found in the Release Notes.

1.3. About This Documentation

This documentation is organized as follows:

All the documentation for version 2.1.1 for OpenShift is distributed with Fast Data Platform and is also available at https://developer.lightbend.com/docs/fast-data-platform/2.1.1-OpenShift//. The documentation for the latest release for OpenShift is available at https://developer.lightbend.com/docs/fast-data-platform/current/.

1.3.1. Conventions

We use a few conventions in these documents. For shell commands, we omit the prompt when only the commands are shown, for example:

ls -a $HOME

This makes it easier to copy and paste the command! However, when output is also shown, we use $ as the prompt for the actual commands, so there’s a distinction between commands and output:

$ ls -a $HOME
.bashrc .bash_profile ...

1.4. Release History

Here is the release history for Fast Data Platform. Some patch releases are omitted. More details can be found in the release notes for each release:

Table 3. Release History of Fast Data Platform
Release | Date | Highlights
V2.1.1-OpenShift and V2.1.1-ICP4D | May 30, 2019 | Upgrades to Strimzi and Spark Operator. Support for IBM Cloud Private for Data.
V2.1.0-OpenShift | April 5, 2019 | Lightbend Pipelines. Numerous improvements to Strimzi and Spark Operator.
V2.0.2-OpenShift | March 1, 2019 | Kubeflow added as a certified component. Improved monitoring of Apache Flink jobs.
V2.0.1-OpenShift | February 11, 2019 | Kafka lag indicator dashboard in Lightbend Console, refinements to Spark Operator.
V2.0.0-OpenShift | December 19, 2018 | Fast Data Platform on Kubernetes-based RedHat OpenShift.
V1.3.2-DC/OS | January 11, 2019 | DC/OS 1.12, Fast Data Platform Manager refinements, sample apps open sourced, miscellaneous bug fixes.
V1.2.0-DC/OS | June 29, 2018 | Production hardening, bug fixes and enhancements throughout the product.
V1.1.0-DC/OS | February 23, 2018 | Kafka 1.0, Hive on Spark (incubating), general improvements.
V1.0.3-DC/OS | January 18, 2018 | Bug fixes in sample applications and various robustness improvements.
V1.0.0-DC/OS | October 30, 2017 | General availability, production ready.

Interested in trying the current release? Sign up here or contact your Lightbend account manager.

2. Release Notes

These release notes discuss the recent enhancements and the known limitations of the supported and certified components included with Fast Data Platform. (For definitions of those terms, see Overview of Fast Data Platform.)

2.1. Tips for Installation of Fast Data Platform Components

2.2. Spark

This release contains a number of refinements to Spark. We also discuss known limitations.

2.2.1. Current Status of Spark on Kubernetes

This release of Fast Data Platform comes with a Lightbend custom build of the Google-sponsored Spark Operator, based on the Spark release 2.4.3.

Spark 2.3 introduced Kubernetes support, but it was significantly rewritten for Spark 2.4. Hence, the Kubernetes support is not as mature and full-featured as Spark on Mesos and YARN. See the Spark Applications section in Running Custom Services and Applications for details.

2.2.2. SparkR and PySpark

Fast Data Platform currently does not provide production support for the R interface to Spark, SparkR. Support for the Python interface, PySpark, is currently limited to best effort. We recommend the Scala, Java, and SQL interfaces.

2.2.3. Support for Spark GraphX

While GraphX has been a part of the Spark distribution for a long time, it is no longer well supported by the Spark community. For example, it is still based on the older RDD API at its core, while the rest of the Spark components are migrating to the newer Dataset core. Hence, we do not provide production support for GraphX, although it will remain a part of the Fast Data Platform Spark distribution.

2.2.4. Spark Streaming Can Crash When Used with Kafka

A concurrency bug in Spark Streaming before release 2.4 can cause your jobs to crash when using Kafka in certain contexts (see SPARK-19185). The bug is related to caching of Kafka consumers. In Spark 2.2.x and 2.3.x, you can disable this caching as a temporary workaround; the bug is fixed as of Spark 2.4.0. Nevertheless, keep this issue in mind even when using Spark 2.4, in case the bug is not completely resolved.

See this Apache Spark documentation for more details.
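As a hedged sketch for Spark 2.2.x/2.3.x jobs that hit this issue, the consumer cache can be disabled when submitting the job. The configuration key below belongs to the spark-streaming-kafka-0-10 integration, and the class and JAR names are placeholders for your own application:

# Class and JAR names are placeholders for your own streaming job.
spark-submit \
  --class com.example.MyStreamingJob \
  --conf spark.streaming.kafka.consumer.cache.enabled=false \
  my-streaming-job.jar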

2.2.5. Continuous Processing Mode for Spark Structured Streaming

As described in Spark Streaming Best Practices, there are two runtime modes for Structured Streaming, micro-batch and continuous. The latter is still considered experimental. Hence, it is not supported by Lightbend for production applications.

3. Before You Install: Cluster Recommendations and Prerequisites

Lightbend Fast Data Platform is installed into an OpenShift cluster. This chapter discusses where to get information about setting up OpenShift clusters, how to determine hardware sizing and software configuration requirements, and what additional prerequisites are required before installing Fast Data Platform.

We’ll assume physical hardware when discussing sizing. If you are deploying into a cloud environment, use the descriptions of your cloud provider’s image configurations to choose appropriate analogs.

If you are new to OpenShift and Kubernetes, see Appendix: Kubernetes Concepts for a quick overview.

3.1. Planning the Cluster

There are many variables that need to be considered when planning the cluster; we discuss the most important ones here. For more comprehensive information, OpenShift provides several resources to aid the planning process:

A production deployment requires numerous services and resources, some of which may be installed in the cluster or managed externally. Examples include Docker Registry and version control. You can install these services inside the cluster, in which case you’ll need to allocate hardware resources for them, but we recommend separate locations, so it’s easier to do cluster administration, such as upgrading OpenShift, without impacting these services.

The availability requirements of the deployed applications also significantly impact sizing. Highly-available clusters require redundancy, e.g., at least three Kafka brokers. Furthermore, resiliency can be achieved by replicating services across data centers and across cloud vendor availability zones.

A single organization may run multiple clusters instead of one large, all-encompassing cluster. Highly available systems may have multiple production clusters in multiple data centers. Separate clusters for staging, inspection, and final testing before production deployment need to closely resemble the production environment. In fact, this need is often handled by using the production cluster itself, with segmentation of this data from actual production workloads.

On the other end of the spectrum are development clusters, where data loss is acceptable and therefore single instances of critical services can be sufficient. These clusters should be used for multi-node integration and end-to-end testing before releasing to the pre-production testing environment. They don’t need to be replicas of the production environment, and they may even be short lived, such as a cluster with a new version of OpenShift to test for compatibility.

You will want to control access to your cluster; consider using private clusters, for example as described in this Google Cloud documentation. This setup will typically use a front-end load balancer, such as a node running HAProxy in front of the cluster master nodes. On AWS this might be an ELB instance.

This proxy load balancer will be the target for the oc commands you use to administer the cluster and submit applications. For single-master test clusters, a VPN into the cluster private network can be used to directly target the single master. You will also need public proxy nodes and/or load balancers for services you intend to make available.
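As a quick sketch, administrators then point oc at that load balancer endpoint; the hostname below is a placeholder for your load balancer’s DNS name:

# The hostname is a placeholder; use your load balancer's DNS name.
oc login https://openshift-lb.example.com:8443 -u admin
oc get nodes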

Building the cluster itself utilizes a bootstrap or builder node from which one runs the installation. This node can also serve as a bastion for private network access.

An OpenShift cluster will need storage for Docker and DNS services. Build services, such as Jenkins, can be installed in the cluster for CI/CD processes.

Making application services available to users requires additional ingress resources. Nodes with HAProxy or load balancers are used to proxy ingress. Infrastructure nodes, possibly with public IP addresses, can be utilized to segment user services from infrastructure services. X.509 certificates will be required for secure TLS connections. Clusters with public endpoints will also require public IP addresses and DNS records. Sub-domains can be utilized to off-load DNS record management, which is particularly useful when ingress routes are created and removed frequently.

Cluster monitoring and log aggregation are critical for successful cluster administration. They also require sufficient resources.

Note

Lightbend strongly recommends that tools for metrics and log aggregation be installed in the cluster or available as external services. They should provide appropriate query and presentation capabilities, essential for isolating and debugging problems.

Time-series data storage is required for logging and metric data, such as that produced by Lightbend Telemetry (the Cinnamon library). Lightbend Console uses Prometheus for data storage and Grafana for dashboards. Lightbend Console also ingests OpenShift cluster metrics.

An alternative is the EFK stack, which uses Fluentd to aggregate event logs into Elasticsearch.

If you need a log management solution, Lightbend recommends Humio. It can store events locally in the cluster or in the Humio cloud with live dashboards that provide very flexible, powerful, and fast query capabilities. Contact Humio for licensing information and for OpenShift-specific installation instructions.

3.1.1. OpenShift Versions Supported

Lightbend Fast Data Platform requires RedHat OpenShift 3.11, which includes Kubernetes 1.11.

3.1.2. Operating Systems Supported

Because Fast Data Platform runs on RedHat OpenShift, it requires using one of the operating systems currently supported by OpenShift:

  • Red Hat Enterprise Linux (RHEL) 7.4 or 7.5 with the latest packages from the Extras channel

See Operating System Requirements in the OpenShift documentation for further details.

3.1.3. Storage

Persistent Storage is a critical aspect of planning your cluster, such as choosing the most appropriate kinds of storage for stateful services like Kafka, and properly sizing them. Some of the kinds of data you might need to store include the following:

  • Application data requiring long-term storage

  • Transient data, such as messages in Kafka topics between processing services and Spark "checkpoints"

  • Docker images and other build artifacts

  • Logs and metrics

As a general rule, clusters require multiple types and classes of storage. These OpenShift examples are informative. OpenShift can also provide ephemeral local storage for pods. See also the OpenShift documentation on Persistent Storage and Configuring Docker Storage.

The cluster can utilize a SAN or other centralized storage solution; when such storage provides fast I/O, less storage must be provided by the cluster itself. The Hadoop Distributed File System (HDFS) is a commonly used off-cluster storage location.

See Managing Storage for more information about the commonly-used storage types.

How much storage a cluster requires is driven by the storage needs of its applications and services. This is discussed in Kafka Broker Disks. For Lightbend Console, see its documentation on default values, which lists the default Persistent Volume sizes it uses and how to modify those values.
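As a minimal sketch, a service that needs durable storage typically requests it with a PersistentVolumeClaim; the claim name, storage class, and size below are assumptions that must match what your cluster actually provides:

apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: app-data                         # placeholder name
spec:
  accessModes:
    - ReadWriteOnce
  storageClassName: glusterfs-storage    # assumption: use a class defined in your cluster
  resources:
    requests:
      storage: 100Gi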

3.1.4. Network

Network performance is critical to the health of the cluster. The fastest available network should be utilized for best results.

3.1.5. Security

There are many considerations for securing a cluster. Start at the OpenShift documentation, Container Security Guide, for a good introduction to the domain. The resource site Kubernetes Security provides further details.

The OpenShift reference architectures should also be followed, where applicable. These architectures are maintained by the OpenShift architecture team and document best practices.

We recommend putting all clusters inside a private network, for example as described here for Google Cloud. This provides better control over public access to the cluster. Administrator access to the master control plane can be provided using an external load balancer.

We also recommend encrypting the cluster secret storage mechanism. By default, secrets, such as passwords and API keys, are only base64 encoded in etcd. Third-party stores, such as Vault and Conjur, are designed for secure secret storage.
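To see why this matters, note that a stock secret is only base64 encoded, so anyone who can read the object can decode it. A small sketch, with a placeholder secret name and value:

# Placeholder secret, for illustration only.
oc create secret generic db-credentials --from-literal=password=changeme
oc get secret db-credentials -o jsonpath='{.data.password}' | base64 --decode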

3.2. Cluster Node Types and Recommended Configurations

Several types of nodes, serving different roles, are defined for a cluster. Here we discuss their roles and resource recommendations for production configurations. For non-production clusters, downsize as you see fit, based on anticipated loads, whether or not redundancy and failover capabilities are needed, etc.

The following resources provide valuable background information, which we summarize in this documentation:

3.2.1. Bootstrap Node

A single bootstrap node is often used as a staging location for building and maintaining the cluster. This is the host where you install and run Ansible. It can also be used as a jumpbox for accessing the nodes, such as via SSH.

The bootstrap node does not require a lot of resources:

Table 4. Bootstrap Resource Recommendations
Resource | Amount
Processor | 4 cores
Memory | 16 GB RAM
Hard disk | 60 GB

However, you may want to allocate additional resources, such as disk space for storing a larger library of playbooks and source images. See Preparing your hosts in the OpenShift documentation for further information.

3.2.2. OpenShift Master Nodes

Master nodes run services that manage cluster resources to form the control plane. These include the API server, controller manager server, and etcd.

For non-critical clusters, such as for development and testing, one master that also runs etcd can be sufficient. Production clusters should have at least three masters, for failover. Review the OpenShift environment scenarios for more information.

Here are our minimum recommendations:

Table 5. Master Node Resource Recommendations
Resource | Amount
Node Count | 1 (development/test), 3 or 5 (production)
Processor | 4 cores/node
Memory | 32 GB RAM
Disk Space | 120 GB solid-state drive (SSD)

Note that etcd frequently performs small amounts of storage input and output. Using SSDs with etcd storage is highly recommended to minimize I/O latency.

It is strongly recommended to configure durable storage using RAID. The RAID controllers should have a battery backup unit (BBU), with the cache configured in writeback mode.

3.2.3. General-Purpose Worker Nodes

General-purpose Nodes are where most of your storage and compute activities will occur. Worker nodes have more diverse requirements, depending on the kinds of services and workloads run on them.

The following configuration is suitable for general-purpose applications and infrastructure nodes.

Table 6. General-Purpose Node Resource Recommendations
Resource | Amount | Discussion
Nodes | 6 or more | Higher for busier clusters
Processor | 2 cores/node | Much higher for compute-intensive jobs (e.g., Spark)
Memory | 16 GB RAM | Much higher for memory-intensive jobs (e.g., SQL joins)
Hard disk | 60 GB+ | Much higher for local caching of data, such as state in Spark streaming jobs, or GlusterFS 'converged mode' nodes

Tip

Use enough nodes to have at least 35-40 cores if you install all Fast Data Platform components. For smaller non-production clusters, only install the services you really need.

3.2.4. Infrastructure Nodes

For production environments, dedicated infrastructure nodes are recommended to keep Registry and router pods on dedicated nodes.

3.2.5. Specialized Compute Nodes

Node ConfigMaps and host labels can be used to further control the placement of pods by the scheduler. For example, some high-storage nodes would be best for databases. However, segmenting the available nodes has some disadvantages. In particular, each segment requires some headroom to reschedule pods in the event of failure, and specialization of the nodes reduces the scheduler’s ability to make use of available resources across the cluster.
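As a sketch of this approach, you can label specialized nodes and then select them in a pod spec. The node name and the label key/value are placeholders:

oc label node node-07.example.com workload=database

A pod spec can then target those nodes by adding a nodeSelector:

spec:
  nodeSelector:
    workload: database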

3.2.6. Kafka Brokers

Kafka brokers are the master services in a Kafka cluster. The ideal number of Kafka broker nodes depends on the number of concurrent producers and consumers and the data rates at which they write and read data, respectively. Also, more brokers means more resiliency in the case of node failures. So, expect the number of Kafka brokers to grow as use of Kafka itself grows.

Here are some "rule of thumb" recommendations. How to calculate specific needs for your environment is discussed next.

Table 7. Kafka Broker Resource Recommendations
Resource | Amount
Nodes | 5 or more
Processor | 4-16+ cores
Memory | 32-64 GB
Hard Disk | 1+ TB (SSDs recommended)

Note

Brokers are not CPU-bound under most production workloads. When TLS is used within the cluster and for client connections, CPU usage will trend higher to support the requisite crypto operations.

Kafka Broker Disks

Because Kafka persists messages to log segment files, excellent disk I/O performance is a requirement. Here are some considerations.

Writing Kafka logs to multiple disks improves performance through parallelism. Since Kafka requires that an entire partition fit on a single disk, this is an upper bound on the amount of data that can be stored by Kafka for a given topic partition. Hence, the number of topics, the number of partitions per topic, the size of the records, etc. all affect the log retention limits. So, if you have a high volume partition, allocate it to a drive dedicated exclusively to it.
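For reference, a broker spreads partitions across whatever directories are listed in its log.dirs property, so dedicated drives show up there; a sketch with two placeholder mount points:

# Kafka broker server.properties fragment; mount points are placeholders.
log.dirs=/mnt/kafka-disk1,/mnt/kafka-disk2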

Also be sure to allocate enough disk space cluster-wide for partition replicas, for resilience.

Our own testing using Kafka with SSD drives demonstrates that write performance around 1.2GB/sec plus or minus 400MB/sec is easily achievable, staying above 1.0GB/sec most of the time. These results are similar to those reported by the Sangrenel project for Kafka performance testing.

Obviously, such rates can fill up any drive quickly, causing potential data loss and other problems. Based on the anticipated data rate, you can calculate how much disk space may be required and how to trade off the following variables to meet your needs and keep Kafka healthy:

  • Disk size - Multi-TB hard drives have more capacity compared to SSDs, but trade off read and write performance.

  • Number of partitions - Using more partitions increases throughput per topic through parallelism. It also allows more drives to be used, one per partition, which increases capacity. Note that Kafka only preserves message order per partition, not over a whole, multi-partition topic. Topic messages can be partitioned randomly or by hashing on a key.

  • Replication factor - The replication factor provides resiliency for a partition. Replicas are spread across available brokers, therefore your replication factor is always <= the number of brokers in your cluster. It must be accounted for when determining disk requirements because there is no difference between how much space a leader partition and a replica partition uses. The total number of partitions for a given topic is the number of partitions * the replication factor.

  • Topic granularity - Consider using fine-grained topics, rather than more partitions, when each client needs to see all messages in a topic, in order, to track the evolving state encapsulated in those messages. This is a balance between supporting high volume workloads where more partitions offer an advantage or application message bus workloads where topics may describe very specific entities within the domain and topic granularity would be ideal. However, keep in mind that many applications only require strict ordering by key (for example, the activity for customer accounts, where the account id is the key). If the topic is partitioned by hashing on the key, then this requirement will be met, even without total ordering over the entire topic.

  • Retention policy - The longer the retention policy, the more disk space required. For high-volume topics, consider moving the data to downstream, longer-term storage as quickly as possible and use a more aggressive retention policy. An alternative to fixed time or partition size retention is compaction. Compaction only works with keyed messages. Messages produced with the same key as a previous message in the partition will "update" the value associated with that key. Consumers will only see the latest version of the message when they consume the partition. The log cleaner process will "compact" the log on a regular basis and as a result reclaim disk space. Compaction has compelling use cases but is not suitable in situations where you want a clean immutable event log. See the Apache Kafka documentation on Compaction for more details.

  • Dedicated drives - Using dedicated drives for the Kafka brokers not only improves performance, by removing other activity from the drive, but should the drive become full, then other services that need drive space will not be adversely affected.

  • Do not use RAID for fault tolerance. Kafka provides fault tolerance guarantees at the application layer (with topic replication factor) making fault tolerance at the hard disk level redundant.

  • We’ve observed that broker disk quota settings appear to be ignored. Hence, this mechanism doesn’t prevent Kafka from filling a drive.
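As a hedged sketch of how these choices surface in configuration, a Strimzi-managed topic carries its partition count, replication factor, and retention or compaction settings in a KafkaTopic resource. The apiVersion, topic name, and cluster label below are assumptions; check your Strimzi release for the exact schema:

apiVersion: kafka.strimzi.io/v1alpha1    # may differ by Strimzi release
kind: KafkaTopic
metadata:
  name: sensor-readings                  # placeholder topic name
  labels:
    strimzi.io/cluster: my-cluster       # placeholder Kafka cluster name
spec:
  partitions: 3
  replicas: 3
  config:
    retention.ms: 604800000              # 1 week, per the retention discussion above
    cleanup.policy: delete               # use "compact" for keyed, latest-value topics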

Warning

Kafka does not degrade gracefully if it runs out of disk space. Make sure your sizing estimates are generous, monitor disk utilization, and take corrective action well before disk exhaustion occurs! In a default deployment Kafka can consume all disk space available on the disks it accesses, which not only causes Kafka to fail, but also the other applications and services on the node.

To estimate disk requirements per broker, begin by estimating the projected usage. Here is an example:

Table 8. Kafka Broker Disk Considerations
Quantity | Example | Discussion
Average topic message throughput (MT) | 1000 msg/sec | For each topic, how many messages on average per unit time?
Average message size (MS) | 10 KB | Note that Kafka works best with messages under 1 MB in size.
Broker message retention length (RL) | 1 week (168 hours or 604800 seconds) | This could also be a size.
Number of topics (NT) | 1 |
Average number of topic partitions (NP) | 3 | Or do a more specific calculation, summing each topic's number of partitions.
Average replication factor (NR) | 3 | Or do a more specific calculation.
Number of Kafka Brokers (KB) | 9 |

With these variables, the estimated disk size per broker is calculated as follows:

Disk per broker = ( (MT / NP) * MS * RL * NT * NP * NR ) / KB

Which can be simplified to this:

Disk per broker (simplified) = ( MT * MS * RL * NT * NR ) / KB

Using our example numbers:

( 1000 msg/s * 10KB * 1 week * 1 topic * 3 replicas ) / 9 brokers
( 10 MB/s * 604800 seconds * 3 replicas ) / 9 brokers
18.81 TB / 9 brokers = ~2.1 TB per broker.

This calculation is just an estimate. It assumes partitions are the same size and evenly distributed across all brokers. While Kafka will attempt to assign partitions evenly at topic creation time, it’s extremely likely that some partitions will end up being larger than others if a non-trivial partitioning strategy is used. In this case you will likely need to rebalance partitions at some point (an attempt to evenly distribute partitions across available brokers). It’s important to pad your estimate to account for this reality, as well as to accommodate future growth.
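If you prefer to script the simplified formula, here is a minimal sketch using the Table 8 values and decimal units; it yields roughly 2 TB per broker, in line with the figure above:

MT=1000      # average messages/sec per topic
MS=10        # average message size, KB
RL=604800    # retention, seconds (1 week)
NT=1         # number of topics
NR=3         # replication factor
KB=9         # number of brokers
echo "$MT $MS $RL $NT $NR $KB" | \
  awk '{ printf "~%.1f TB per broker\n", ($1 * $2 * $3 * $4 * $5) / $6 / 1e9 }'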

Let’s discuss estimating retention time. If you already have servers with fixed disks, you could modify the formula and choose to solve for a maximum time-based message retention length.

Starting with the previous approximate value for the disk per broker (DB): 2 TB (rounding down…), the maximum retention time is as follows:

Max retention time = ( DB * KB ) / ( (MT / NP) * MS * NT * NP * NR )

Which can be simplified to this:

Max retention time (simplified) = ( DB * KB ) / ( MT * MS * NT * NR )

Using our example numbers:

(2 TB * 9 brokers) / ( 1000 msg/s * 10 KB * 1 topic * 3 replicas)
18 TB / ( 10 MB/s * 1 topic * 3 replicas)
18 TB / 30 MB/s = 600000 seconds = ~167 hours = ~1 week

Kafka Broker Memory

Server memory is the most important resource for a broker to be able to support a high volume and low latency workload. The broker runs as a JVM process and does not require much system memory itself. In fact our default broker configuration allocates a maximum of 5GB of JVM heap space. However, the reason a lot of memory is ideal for a broker is because it will utilize the operating system’s Page Cache to store recently-used data.

Page Cache (aka Disk Cache) is memory allocated by the operating system to store pages of data that were recently read from or written to disk. Brokers perform an operation called a zero-copy transfer to efficiently store recently produced messages in the Page Cache so they can be read quickly by low-latency consumers. Zero-copy is what gives Kafka such low latency characteristics. Kafka messages are also stored in a standardized binary message format that is shared between the producer, broker, and consumer. This lets data be transferred to each participant without any modification. What all this means is that under normal workloads, produced messages are written once to Page Cache (real memory) and made available to consumers immediately, without any mutation, serialization, byte-copying, or other overhead costs.

The efficient use of Page Cache allows brokers to swamp downstream consumers with more data than can be sent over the network, making network throughput the limiting performance factor. Writing Page Cache to disk is performed by the operating system, not the broker. If Page Cache flushes do not occur frequently enough, there’s a risk that the Page Cache will fill up and, as a result, block new writes, which creates a big problem in terms of write latency. Make sure your operating system is configured to flush Page Cache to disk quickly and efficiently. See the Broker Operating System Configuration section below for guidance on configuring your operating system’s behavior when writing Page Cache to disk.

To support the efficient use of Page Cache, it’s important to allocate a respectable amount of memory to broker nodes over and above what is allocated to the broker JVM process heap space. For most production workloads, consider starting with the minimum memory requirement defined in the Kafka Broker section, 32 GB.

See the Persistence, Efficiency, and Understanding Linux OS Flush Behaviour sections of the Apache Kafka documentation for more information about how brokers utilize disk, zero-copy, Page Cache, and system memory.

Changing Broker JVM Heap Space

The Apache Kafka distribution uses a maximum of 1 GB of heap space out of the box, which should be more than enough for local development and testing purposes. For production, it’s recommended to start with 5 GB and to tune that number based on monitoring of its utilization. If there’s not enough heap space for the broker, you will observe JVM Out Of Memory exceptions in the broker logs and the broker will fail. The most likely reasons for running out of memory are an above-average number of partitions or a larger than normal message size. The more partitions you have, the more messages can be produced to or consumed from Kafka in parallel. A larger maximum message size (defined by the broker property message.max.bytes, default 1 MB) results in larger message representations in the broker JVM.

You can define the amount of heap space assigned to each broker at OpenShift package install time using the YAML configuration file discussed in Creating a Kafka Cluster.
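A hedged sketch of what that YAML fragment might look like for a Strimzi-managed cluster follows; the field layout is based on Strimzi’s jvmOptions setting, other required fields are omitted, and the schema shown in Creating a Kafka Cluster is authoritative:

# Fragment of a Strimzi Kafka resource; other required fields omitted.
spec:
  kafka:
    jvmOptions:
      "-Xms": "5g"
      "-Xmx": "5g"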

Broker Operating System Configuration

Kafka relies on the operating system configuration related to virtual memory and files to function efficiently. On Linux operating systems these settings are applied with sysctl and made permanent when persisted in /etc/sysctl.conf.

Virtual Memory Swapping

Virtual memory swapping should be tuned to occur rarely. Physical memory is cheap enough today that swapping memory to disk is usually unnecessary, and it incurs significant latencies, so it should be avoided. However, most operating systems still enable swapping by default with a setting that is not ideal for Kafka brokers. Virtual memory swapping on Linux is configured with vm.swappiness, which usually has a default setting of 60. The higher this number, the more aggressively the system will swap to disk to conserve space in memory. It’s recommended to set vm.swappiness to 1, which instructs your system to do the minimum amount of swapping, i.e., only as a last resort to avoid an Out Of Memory condition. If you observe your broker swapping to disk regularly, it is a sign that you should evaluate this setting and related virtual memory settings, and consider adding more brokers or more physical memory to existing broker nodes.

Page Flushing Configuration, i.e., Writing Page Cache to Disk

The importance of the Page Cache in Kafka was described in the Broker Memory section above. Due to the broker’s heavy reliance on the Page Cache, it’s important to optimize how frequently the system writes "dirty" pages (i.e., pages updated in memory but not yet written to disk). The vm.dirty_background_ratio setting is the percentage of memory that can contain dirty pages before the system will trigger asynchronous operations to persist them to disk. Most Linux systems generally set this to 10% (10), which is reasonable for most workloads, but it would be wise to use 5 or lower for Kafka broker nodes. The higher this value, the higher the risk of losing unpersisted data in the event of failure. Therefore your requirements about message durability will help guide you on how low to set this configuration.

The vm.dirty_ratio setting is crucial for low latency workloads. This setting determines the maximum percentage of memory occupied by dirty pages before the system is forced to synchronously write pages to disk. When the system reaches this limit, all subsequent writes to the Page Cache will be blocked, which has obvious implications for the latency of your messages. Most Linux systems generally set this to 20% by default, which is too low for most production workloads. Consider setting this to 50% (50) as a starting point.

File Descriptor Limits

By default, many Linux operating systems configure a conservative limit on open file descriptors. This setting controls how many file references the operating system can keep active at any given time. Brokers make efficient use of the file system but require a large number of open file descriptors to accommodate references to the log segment files, log index files, and log time index files for all topic partitions that the broker contains. There are few consequences to increasing this limit. It’s recommended to define a limit of 100000 as a starting point and monitor broker and system logs in case this limit is reached. The file descriptor limit is configured with fs.file-max and generally has a default setting of less than 10000.

Use the following to calculate roughly how many open file handles your broker may have. Be sure to round up this number to accommodate system processes, socket connections (they each require a descriptor too!), and any other tasks that may be running on the same node.

(Average Number of Partitions) * (Partition Size/Log Segment File Size)

Consult the Apache Kafka documentation’s Hardware and OS section for more details. To learn more about tuning virtual memory configuration in Linux consult this Wikipedia article on Swappiness and read these posts on The Lone Sysadmin blog: Better Linux Disk Caching & Performance with vm.dirty_ratio & vm.dirty_background_ratio, Adjust vm.swappiness to Avoid Unneeded Disk I/O.
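Pulling the settings from the preceding subsections together, here is a minimal sketch of applying and persisting them on a broker node; the values are the starting points recommended above, so tune them for your workload:

# Apply at runtime.
sudo sysctl -w vm.swappiness=1
sudo sysctl -w vm.dirty_background_ratio=5
sudo sysctl -w vm.dirty_ratio=50
sudo sysctl -w fs.file-max=100000

# Persist across reboots.
cat <<'EOF' | sudo tee -a /etc/sysctl.conf
vm.swappiness=1
vm.dirty_background_ratio=5
vm.dirty_ratio=50
fs.file-max=100000
EOF
sudo sysctl -p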

3.3. Installing OpenShift

For installing RedHat OpenShift, a good place to start is Install OpenShift Container Platform.

The GitHub OpenShift-Ansible repo contains Ansible roles and playbooks to install, upgrade, and manage OpenShift clusters.

The installation and management of a capable, multi-node cluster requires running Ansible from the Bootstrap Node. The installation of a production-like cluster is thus specific to the target environment, and the OpenShift reference architectures should be observed. On Amazon Web Services, the OpenShift Quick Start provides a nice reference build with minimal effort.

Clusters created using oc cluster up and Minishift are suitable for development tasks only.

3.4. Installing Other Software Prerequisites

The Fast Data Platform leverages several other tools that must be installed beforehand. They include oc (see OpenShift CLI Tools for Working with Clusters), Minishift (see Prototyping Your Applications with Minishift), and Helm (see About Helm), which we discuss next.

3.4.1. OpenShift CLI Tools for Working with Clusters

In OpenShift, the basic idea is to declare the state of anything related to the cluster (services, policies, etc.) and let OpenShift make sure that state is satisfied. In order for this to happen, OpenShift describes state with a set of objects. Manipulation of these objects is done via API Server calls.

OpenShift provides a convenient CLI tool called oc to deploy and manage objects, to talk to your cluster to acquire useful information, and to perform administration operations.

The detailed instructions for installing and using oc can be found at this OpenShift documentation. Here we provide a summary of the steps.

  1. Obtain a user name and password for your cluster and the URL for the cluster’s OpenShift GUI from your administrator.

  2. Log into the GUI.

  3. In the upper right is a question mark next to a downward-pointing arrow.

  4. Click the arrow to open a menu and select the About entry.

  5. On the About page, click the Command Line Tools link.

  6. Click the link Latest Release to download oc.

  7. Follow the rest of the instructions on this page to install and configure oc.

3.4.2. Prototyping Your Applications with Minishift

If you are developing applications, it is convenient to use Minishift, although you can also work directly with your OpenShift cluster.

Minishift is much more convenient for development, because it runs a "tiny" OpenShift cluster on your workstation, which can be much more efficient to use than working with a regular cluster. Minishift is the analog of Kubernetes' Minikube.

However, keep in mind that some things can’t be reproduced and tested with Minishift, since it is a single-node cluster.

Follow the installation instructions here to install Minishift.

Some issues you might encounter are mentioned in these instructions where they might occur. See also the Troubleshooting Minishift section below.

For an alternative to Minishift, see Appendix: "OC Cluster Up".

Minishift Setup Detailed Instructions

Let’s look at an example for installing Minishift on Ubuntu.

First, check if your machine has VM support:

egrep '(vmx|svm)' /proc/cpuinfo

You should see vmx or svm flags listed in the list of output flags.

Set Up the KVM Driver:

sudo curl -L https://github.com/dhiltgen/docker-machine-kvm/releases/download/v0.7.0/docker-machine-driver-kvm -o /usr/local/bin/docker-machine-driver-kvm
sudo chmod +x /usr/local/bin/docker-machine-driver-kvm

sudo apt install libvirt-bin qemu-kvm
sudo usermod -a -G libvirtd $USER
sudo newgrp libvirtd

Make sure you have your kvm module loaded:

$ lsmod | egrep 'kvm'

You should see something like the following:

kvm_intel             212992  4
kvm                   598016  1 kvm_intel
irqbypass              16384  1 kvm

Note

The group name varies: on Ubuntu 18.04 the group is named libvirt, while on 16.04 it is libvirtd. You can always check your groups with: compgen -g.

Download the latest Minishift version from https://github.com/minishift/minishift/releases.

Start Minishift, but note that you may need to adjust the settings to run applications like Spark that can take a lot of memory. See the documentation for details.

./minishift start

You’ll see a lot of console output, like this with a lot of it elided (for OpenShift 3.10):

-- Starting profile 'minishift'
-- Check if deprecated options are used ... OK
-- Checking if https://github.com is reachable ... OK
-- Checking if requested OpenShift version 'v3.10.0' is valid ... OK
...
-- Minishift VM will be configured with ...
   Memory:    4 GB
   vCPUs :    2
   Disk size: 20 GB

   Downloading ISO 'https://github.com/minishift/minishift-centos-iso/releases/download/v1.12.0/minishift-centos7.iso'
 344.00 MiB / 344.00 MiB [=== ... ===] 100.00% 0s
-- Starting Minishift VM ................ OK
...
-- OpenShift cluster will be configured with ...
   Version: v3.10.0
-- Pulling the OpenShift Container Image .......... ... ... OK
-- Copying oc binary from the OpenShift container image to VM ... OK
-- Starting OpenShift cluster .............. ... ...
Getting a Docker client ...
...
Login to server ...
Creating initial project "myproject" ...
Server Information ...
OpenShift server started.

The server is accessible via web console at:
    https://192.168.42.164:8443

You are logged in as:
    User:     developer
    Password: <any value>

To login as administrator:
    oc login -u system:admin

-- Exporting of OpenShift images is occuring in background process with pid 19075.

Now you can access the OpenShift Web console at https://192.168.42.164:8443 on your machine. The GUI is the same as the one shown by oc cluster up.

Set up your oc CLI command:

./minishift oc-env

It outputs:

export PATH=".../.minishift/cache/oc/v3.11/linux:$PATH"
# Run this command to configure your shell:
# eval $(minishift oc-env)

So, run the command shown in the comment:

$ eval $(minishift oc-env)

Now run these commands to log in and set your "project":

oc login -u system:admin
oc project default

On public clouds like AWS, it is not possible to use KVM. So, when starting Minishift, you will get this error:

-- Starting Minishift VM ....... FAIL E0921 12:01:44.205290    4802 start.go:437] Error starting the VM: Error creating the VM. Error creating machine: Error in driver during machine creation: [Code-8] [Domain-44] invalid argument: could not find capabilities for domaintype=kvm . Retrying.
Error starting the VM: Error creating the VM. Error creating machine: Error in driver during machine creation: [Code-8] [Domain-44] invalid argument: could not find capabilities for domaintype=kvm

If you encounter this issue, see this experimental feature, Install Minishift on AWS EC2. A related PR has been merged into the Minishift code base and will appear in a subsequent release: https://github.com/minishift/minishift/pull/2422.

Accessing the Docker Daemon

You can always test and build your images directly via the Docker Daemon that runs in Minishift. For more information see this RedHat documentation.

Starting from Scratch

In case you face issues with Minishift and the VM is not recoverable, you can always relaunch Minishift from scratch as follows.

First, delete the currently running Minishift instance:

./minishift delete

Check if the VM is still running:

sudo virsh list --all

If it is running, stop the Minishift VM as follows:

sudo virsh destroy minishift

Delete the VM:

sudo virsh undefine minishift

Remove everything under the Minishift directory:

rm -rf ~/.minishift

Troubleshooting Minishift

When you run Spark jobs in Minishift (as discussed later in this chapter), if your executors never transition out of the "pending" state, you probably don’t have enough resources. The examples suggest adjustments with parameters passed to Spark. Or, consider restarting Minishift with more resources by passing the following flags with larger values than before:

--cpus=4 --memory=8192
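For example, combining the commands shown earlier (an existing VM keeps its original sizing, so you may need to delete it first, as described in Starting from Scratch above):

./minishift delete
./minishift start --cpus=4 --memory=8192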

You may face issues accessing the Internet from within Minishift. If so, one option for Ubuntu is to make sure that /run/systemd/resolve/resolv.conf matches /etc/resolv.conf. In general, kubedns uses your host DNS to make things work properly.

If you run in a namespace other than the default, use the following command before launching your jobs:

oc adm policy add-scc-to-user anyuid -z spark-sa -n spark

Otherwise, you may hit this issue. For more information on security contexts for the containers on OpenShift check this article.

3.4.3. About Helm

Helm is a popular Kubernetes package manager. It allows you to specify how to install services and applications along with their dependencies. You write a Helm chart, a YAML file that specifies the details, and use the Tiller service to do the installation.

Warning

The same version of Helm and Tiller have to be installed on the cluster and on all workstations using the helm command to manage installations.

Note

The forthcoming v3.0 release of Helm will replace Tiller with an alternative approach that addresses known security vulnerabilities in Tiller.

To learn more about Helm and Helm charts, see the very comprehensive Helm documentation.

Fast Data Platform uses Helm for deploying Spark, Kafka, and other services and applications.

Install Helm

Installing Helm is fairly straightforward. Follow this documentation for installing Helm. However, there are some differences depending on whether you are installing the Helm client on your local development machine or the Tiller service in the OpenShift cluster.

Install the Helm Client

Install the helm client based on instructions for the local platform you run from the Helm documentation, Installing Helm.

Do not attempt to initialize the Tiller server with helm init. Depending on your platform you may need to perform additional steps as indicated in the next section.

Install the Helm Server, Tiller

The Helm Tiller server requires additional RBAC security role considerations when being installed in OpenShift. These instructions are based on those provided in an OpenShift blog post Getting started with Helm on OpenShift.

Warning

The referenced OpenShift template will create a ClusterRoleBinding of the cluster-admin role to the tiller service account used to run Tiller. Ideally this would not be required, but if you install any charts that also require the installation of a ClusterRoleBinding, then cluster-admin is required. In future versions of Helm, Tiller will no longer be required and installs will depend on the roles associated with the current user performing the install.

Create a new project, i.e., a Kubernetes namespace, for deploying Tiller. Here we’ll use the project name tiller. In the output below, $OPENSHIFT_HOST is the access host for your cluster:

$ oc new-project tiller
Now using project "tiller" on server "https://$OPENSHIFT_HOST/".
...

Set up parameters for the tiller OpenShift deployment. TILLER_NAMESPACE should be the project you created in the previous step (e.g., tiller). Assign the most recent version of Helm you installed to HELM_VERSION. At the time of this writing, 2.14.0 is the latest version.

export TILLER_NAMESPACE=tiller
export HELM_VERSION=v2.14.0

Create and apply the Lightbend Tiller template with the parameters specified in the previous step.

$ oc process -f https://developer.lightbend.com/docs/fast-data-platform/2.1.1-OpenShift/resources/helm-tiller-template.yaml \
  -p TILLER_NAMESPACE="${TILLER_NAMESPACE}" \
  -p HELM_VERSION=${HELM_VERSION} | oc create -f -
serviceaccount "tiller" created
role.authorization.openshift.io "tiller" created
rolebinding.authorization.openshift.io "tiller" created
clusterrolebinding.authorization.openshift.io "tiller-clusterrolebinding" created
deployment.extensions "tiller" created

Wait for tiller deployment to complete. This is a blocking operation.

$ oc rollout status deployment tiller
Waiting for rollout to finish: 0 of 1 updated replicas are available...
deployment "tiller" successfully rolled out
$ oc get po --namespace=tiller
NAME                      READY     STATUS    RESTARTS   AGE
tiller-<id>-4sxlt         1/1       Running   0          8m

Add the Lightbend Helm Chart Repo

Lightbend Helm charts are published to our own repo, which you will need to add. Here we use the name lightbend. This name will be used elsewhere in the documentation when installing applications and services. If you use a different name, change those commands as appropriate.

helm repo add lightbend https://repo.lightbend.com/helm-charts
helm repo update

Test Your Helm Installation

Verify that your helm client is able to connect to the tiller server:

$ helm version
Client: &version.Version{SemVer:"v2.14.0", GitCommit:"...", GitTreeState:"clean"}
Server: &version.Version{SemVer:"v2.14.0", GitCommit:"...", GitTreeState:"clean"}

Using a client installation, check the helm installation with a smoke test:

helm create mychart
rm -rf mychart/templates/*.*
touch mychart/templates/ConfigMap.yaml

Add the following to the ConfigMap.yaml file:


apiVersion: v1
kind: ConfigMap
metadata:
  name: mychart-ConfigMap
data:
  myvalue: "Hello World"

Now install it:

$ helm install ./mychart
NAME:   eponymous-chipmunk
LAST DEPLOYED: Fri Sep 21 11:18:52 2018
NAMESPACE: tiller
STATUS: DEPLOYED

RESOURCES:
==> v1/ConfigMap
NAME               DATA  AGE
mychart-ConfigMap  1     0s

Helm generated a name, eponymous-chipmunk. Normally, you’ll want to pass the --name argument to specify a name.

Is it installed?

$ helm list
NAME              	REVISION	UPDATED                 	STATUS  	CHART        	APP VERSION	NAMESPACE
eponymous-chipmunk	1       	Fri Sep 21 11:18:52 2018	DEPLOYED	mychart-0.1.0	1.0        	tiller

4. Installation Instructions

Contact Us or your account manager for information about a subscription license.

This chapter tells you how to install the Lightbend Fast Data Platform. You must first set up your cluster nodes, either on-premise or in a cloud environment, then install RedHat OpenShift and related tools. See the Before You Install: Cluster Recommendations and Prerequisites for those details.

Once your OpenShift cluster is up and running, you will follow a set of steps described here to install Fast Data Platform and its components. You’ll be able to choose the components you wish to install.

The installation of all services and sample applications is done using helm. The source code for the sample applications can be found at the Lightbend GitHub site.

4.1. Quick Command Summary

Here we summarize the commands to install all the Lightbend Fast Data Platform supported services using helm install. The links take you to details about each component, including discussions about which components you may or may not want to install. We assume you’ve completed the prerequisite steps in Before You Install: Cluster Recommendations and Prerequisites.

Note

This list of steps is not exhaustive. Click on the links to the detailed sections for more details.

Set Up Lightbend Credentials

These instructions are adapted from the Lightbend Console documentation for setting up your credentials. If you need credentials (user ID and password), Contact Lightbend.

Store the user ID and password in a file $HOME/.lightbend/commercial.credentials:

realm = Bintray
host = dl.bintray.com
user = <userid>
password = <token>

You might need to update your existing credentials if you didn’t obtain them recently for Cinnamon, Lightbend’s telemetry library. If you get the following error from OpenShift after installation, it was unable to find the commercial repository:

$ oc describe pod -n lightbend es-console
 Repo 'commercial-registry' was not found

Reset your credentials to update them and gain access to the Lightbend Console:

  • In a browser, open this URL.

  • To the right of your password, click Show.

  • Click Reset to get new credentials.

  • Copy the new values to update the user and password entries in $HOME/.lightbend/commercial.credentials. After this, you should be able to install Lightbend Console.

Run the following install script to install Lightbend Console:

curl -O https://raw.githubusercontent.com/lightbend/helm-charts/master/enterprise-suite/scripts/install-es.sh
chmod u+x install-es.sh
./install-es.sh --version=1.0.1

If you don’t install Console, at least run this command to set up the Lightbend Helm Chart repo:

helm repo add lightbend https://repo.lightbend.com/helm-charts

Then install the Strimzi Kafka operator:

helm install lightbend/strimzi-kafka-operator \
  --name my-strimzi-operator \
  --namespace lightbend \
  --version 0.11.3

Recall from Test Your Helm Installation that if we don’t specify the --name argument, helm will generate one for us. Normally, you’ll want to choose a suitable name.

More information on the helm install command is available here.

Next, install the Spark operator:

helm install lightbend/fdp-sparkoperator \
  --name my-spark-operator \
  --namespace lightbend \
  --version 0.3.0

Here, we’ll assume a PVC backed by NFS is used for storage. (This is suitable for development and test scenarios. Production deployments require GlusterFS.) We’ll install the history server chart with the default settings, which automatically provision an NFS-backed PVC.

oc adm policy add-scc-to-user privileged -z spark-sa --namespace lightbend
oc adm policy add-scc-to-group anyuid system:authenticated

helm install lightbend/spark-history-server \
  --name my-spark-history-server \
  --namespace lightbend \
  --version 0.4.0

4.1.1. Uninstalling Components

To uninstall any of these components, use the helm delete command:

helm delete [options] NAME

Two useful options are worth mentioning here (an example follows the list):

  • --dry-run can be used to see what will happen without actually doing it

  • --purge will remove the release from the store and make its name available for later use
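For example, assuming a release named my-strimzi-operator (an illustrative name), you could preview and then fully remove it as follows:

# Preview what would be deleted, without deleting anything
helm delete --dry-run my-strimzi-operator

# Delete the release and purge it from the release store so the name can be reused
helm delete --purge my-strimzi-operator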

If you changed policies before installing a component, consider undoing those changes after it is uninstalled, e.g.,

oc adm policy remove-scc-from-user ...
oc adm policy remove-scc-from-group ...

That’s the synopsis of the installation commands for the supported components. Now we’ll discuss the details, and also mention options for installing certified components.

4.2. Install Lightbend Reactive Platform Components

Lightbend Reactive Platform is our suite of open-source tools for writing reactive applications, along with commercial components for production management and monitoring of reactive applications, described at a high level here. Developer and administration documentation can be found at the Lightbend Tech Hub.

4.3. Install Lightbend Console

We recommend installing Lightbend Console for the integrated dashboards with in-depth monitoring of Fast Data Platform components, from reactive applications written in Akka, Lagom, and Play, to Spark applications and Kafka brokers. It is the most effective way to monitor these applications and services, because of its deep awareness of the internals of these systems and the common challenges of running them in production at scale.

See the Lightbend Console documentation for details on installing and using Lightbend Console.

When building reactive applications, you will want to use the build tools described in the Reactive Platform docs and the Lightbend Cinnamon telemetry tools in order to fully exploit Lightbend Console.

Lightbend Fast Data Platform adds several dashboards for Spark and Kafka to Lightbend Console.

4.4. Install Fast Data Platform Components

The following components in Fast Data Platform are installed using Helm. The links take you directly to the installation instructions in the Management and Monitoring Guide, which are part of larger discussions about how to use these components. Hence, this section functions as a quick navigation guide to the installation steps described elsewhere.

Table 9. Installable Supported Components in Fast Data Platform 2.1.1 for OpenShift

Name: Strimzi, the Kubernetes Operator for Apache Kafka
Discussion: Strimzi: The Kubernetes Operator for Apache Kafka
Instructions: Installing Strimzi

Name: Kubernetes Operator for Apache Spark
Discussion: The Kubernetes Operator for Apache Spark
Instructions: Installing the Spark Operator

Name: Lightbend Reactive Platform with Enterprise Suite
Discussion: Lightbend Tech Hub
Instructions: Install Lightbend Reactive Platform Components

Name: Sample Applications
Discussion: Sample Applications
Instructions: same

Table 10. Installable Certified Components in Fast Data Platform 2.1.1 for OpenShift

Name: Flink
Discussion: Apache Flink
Helm Chart: Flink Helm Chart

Installation of additional certified components used by the sample applications is discussed in Use of Grafana, InfluxDB, Zeppelin, and Cassandra in the Sample Applications chapter.

5. Management and Monitoring Guide

The primary management and monitoring capabilities in Fast Data Platform for general applications are provided by the Red Hat OpenShift web console and CLI tools, along with Lightbend Enterprise Suite, mentioned previously.

However, we recommend using Lightbend Pipelines, which provides in-depth monitoring and visibility automatically, as well as a rich development experience. See here for more information.

This chapter focuses on Fast Data Platform administration using those tools, along with specific management details for services like Spark and Kafka, such as the web consoles for Spark, Spark History Server, and Lightbend Enterprise Suite. We’ll assume you have some familiarity with Kubernetes concepts such as the operator pattern, which is used by the Spark Operator and the Kafka operator, Strimzi. If not, please see Appendix: Kubernetes Concepts.

5.1. Managing Fast Data Platform Storage

Fast Data Platform components fully leverage the Kubernetes PersistentVolume subsystem and are typically defined in terms of PVCs for purposes beyond development and initial testing. Fast Data Platform does not dictate which StorageClass to use, leaving this decision to administrators.

Fast Data Platform leverages two types of volumes:

  • Pod-specific volumes, which are used only by a specific pod, for example the storage used for Kafka by the Strimzi operator.

  • Shared volumes, which are used when multiple pods need to access the same PVC and the underlying PV at the same time. They can use the same volume even if they are running on different hosts. Spark checkpointing and the Spark History Server both use this approach; the history server, Spark driver, and Spark executor pods need to access the same PVC in order to read and write events.

This differentiation is important when deciding on the specific StorageClass to use for dynamic provisioning. In order to support shared volumes, the underlying file system needs to support multi-consumer access. Examples of such file systems are NFS and GlusterFS.

Lightbend Fast Data Platform requires GlusterFS for production deployments, because typical production deployments require the scalability of a distributed file system, not just cluster-wide access to a local file system. This Red Hat post discusses the capabilities of GlusterFS and provides Ansible scripts for installing and using GlusterFS in OpenShift on several environments. A set of reference architectures is provided here, which you should consult when planning production deployments. More details are discussed in Dynamic Provisioning Case Study - GlusterFS.

For pod-specific storage, any StorageClass can be used. The additional important consideration here is node affinity. If the StorageClass used has node affinity, for example AWS EBS, a pod must remain on a specific node in order to preserve its data. Although this type of persistence typically provides the best performance, it requires the deployment to implement node affinity. In contrast, file systems like NFS and GlusterFS allow access from any node in the cluster, but generally incur a minor performance overhead as well as increased resource utilization, due to writes to multiple destinations for data consistency and failover.

Another consideration is the access mode of the underlying storage. Here are the different AccessModes allowed by Kubernetes:

  • ReadWriteOnce – the volume can be mounted as read-write by a single node

  • ReadOnlyMany – the volume can be mounted read-only by many nodes

  • ReadWriteMany – the volume can be mounted as read-write by many nodes

We’ll need ReadWriteMany access for our purposes here.

The last consideration is PV/PVC sharing. A PV is a cluster-wide resource, but a PVC is scoped to a particular namespace. At any moment, a PV can only be bound to a particular PVC, but a PVC can be mounted by multiple pods, provided the underlying PV supports sharing (e.g., NFS). If you have multiple projects and namespaces that need to connect to the shared NFS volume, you need a separate PV and PVC for each project. However, they can all point to the same underlying NFS disk.
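For example, here is a minimal sketch of such a PV/PVC pair for one project; the names, sizes, and NFS service address are illustrative, and the pair would be repeated (with a distinct PV name) for each additional namespace:

apiVersion: v1
kind: PersistentVolume
metadata:
  name: shared-nfs-pv-project-a           # one PV per project/namespace (illustrative name)
spec:
  capacity:
    storage: 10Gi
  accessModes:
    - ReadWriteMany
  nfs:
    server: nfs-server.nfs.svc             # the same NFS service for every PV (illustrative address)
    path: /
---
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: shared-nfs-pvc
  namespace: project-a                     # the namespace this claim belongs to (illustrative)
spec:
  accessModes:
    - ReadWriteMany
  storageClassName: ""                     # empty string disables dynamic provisioning for this claim
  volumeName: shared-nfs-pv-project-a      # bind explicitly to the PV defined above
  resources:
    requests:
      storage: 10Gi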

Note

The default local storage won’t work, because it doesn’t support ReadWriteMany.

5.1.1. Dynamic Provisioning Case Study - NFS

Let’s look at a case study on how to set up dynamic provisioning of NFS PVs, which is a good choice for development and testing work on Spark History Server and Spark checkpointing, should you use either of them. We require GlusterFS for production scenarios, however, which we discuss below.

As explained earlier, a StorageClass needs to be configured to allow auto-provisioning of PVs with the desired characteristics, such as ReadWriteMany. When running an OpenShift environment, no NFS PV provisioner is built in. Shown below is a sample OpenShift cluster deployed on AWS, for which the only available StorageClass is gp2, which uses the AWS EBS provisioner.

$ oc get sc
NAME            PROVISIONER                                            AGE
gp2 (default)   kubernetes.io/aws-ebs                                  17d

To install an NFS provisioner, three prerequisites need to be met.

The first prerequisite is a running NFS server. For an easy-to-use NFS server setup, install the Fast Data Platform NFS chart. Suppose your NFS server service is up and running in a namespace called nfs; Kubernetes then generates a DNS name such as nfs-server.nfs.svc.cluster.local for your service.

The second prerequisite is setting up security context constraints (SCCs) to allow the provisioner pod to run as root. Root privilege is needed because the provisioner pod needs to give the /persistentvolumes directory in the pod 777 permissions in order to write information about provisioned PVs.

oc adm policy add-scc-to-user privileged -n <provisioner-namespace> -z default

Lastly, NFS is not among the volume types allowed by the restricted SCC, so we need to add it. Run oc edit scc restricted, add nfs to the volumes list, and save the changes.

Now you are ready to install the NFS client provisioner Helm chart:

helm install stable/nfs-client-provisioner \
  --name my-nfs-client-provisioner \
  --set nfs.server=nfs-server.nfs.svc \
  --set nfs.path=/

Note that you need to specify the DNS name of the NFS service as well as the base path of the mount point, which is the root directory by default. If you omit the --name argument, helm will create a name like pedantic-panda-nfs-client-provisioner.

See Uninstalling Components for a discussion of using helm delete to remove components.

After the chart is installed, a new StorageClass called nfs-client is available, as shown below.

$ oc get sc
NAME            PROVISIONER                               AGE
gp2 (default)   kubernetes.io/aws-ebs                     16d
nfs-client      cluster.local/my-nfs-client-provisioner   20h

Note that the provisioner is per NFS server, which means that if you have multiple NFS servers you can have multiple provisioners (one for each NFS server) and consequently one StorageClass for each provisioner.

Now the user can create PVCs that reference nfs-client. For example:

apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: claim1
spec:
  accessModes:
    - ReadWriteMany
  storageClassName: nfs-client
  resources:
    requests:
      storage: 1Mi
Note

The StorageClass in earlier versions of Kubernetes was referenced as an annotation to a PVC, but now it’s part of the spec.storageClassName.

Once the claim1 PVC shown above is created, a PV will be automatically provisioned by the NFS provisioner. You can check that the provisioning was successful by running oc get pv, which shows the list of available PVs and their statuses. You should see one that is bound to claim1.

In case you run into issues during PV provisioning, or if you would like to see more detailed information, you can check the logs from the NFS provisioner pod. For example, the following log line indicates that the provisioner is provisioning a volume for a claim:

I1012 14:26:40.115552       1 event.go:221] Event(v1.ObjectReference{Kind:"PersistentVolumeClaim", Namespace:"default", Name:"claim1", UID:"d5835003-ce2a-11e8-8439-0238590a06be", APIVersion:"v1", ResourceVersion:"4342527", FieldPath:""}): type: 'Normal' reason: 'Provisioning' External provisioner is provisioning volume for claim "default/claim1"

If a PV is created through dynamic provisioning, as opposed to manual provisioning, it will be deleted once the PVC bound to it is deleted.

5.1.2. Dynamic Provisioning Case Study - GlusterFS

GlusterFS can be installed while installing OpenShift, which we recommend especially for production deployments.

The OpenShift documentation provides complete examples of GlusterFS usage in static and dynamic provisioning scenarios.

You can also follow a similar procedure to the NFS case study just discussed. First make sure that the GlusterFS provisioner is installed by viewing the available StorageClasses:

$ oc get sc
NAME                PROVISIONER               AGE
glusterfs-storage   kubernetes.io/glusterfs   4d

Then follow the same basic setup steps that were used for NFS: define a PVC pointing to the GlusterFS StorageClass, and a GlusterFS PV will be dynamically provisioned when the PVC is created.
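As a minimal sketch, a PVC for GlusterFS looks just like the NFS example, with the StorageClass changed; the claim name and requested size are illustrative, while glusterfs-storage matches the StorageClass shown above:

apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: gluster-claim1
spec:
  accessModes:
    - ReadWriteMany
  storageClassName: glusterfs-storage
  resources:
    requests:
      storage: 1Gi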

5.1.3. Conclusion: Storage Requirements for Fast Data Platform

OpenShift supports both GlusterFS and NFS. When building a small cluster for development, testing, demos, etc., NFS is a viable option.

For production deployments, GlusterFS provides a more robust, full-featured option, which is why Lightbend Fast Data Platform requires GlusterFS for production deployments.

5.2. Strimzi: The Kubernetes Operator for Apache Kafka

Strimzi 0.11.3 is an open-source Kubernetes Operator for running Apache Kafka on Kubernetes and OpenShift. It helps facilitate many common operational tasks for Kafka such as initial deployment, rolling configuration updates, and managing a dedicated ZooKeeper cluster (also known as a ZooKeeper ensemble). Strimzi applies The Operator Pattern, which allows it to automatically react to user or environment changes. Strimzi is the supported mechanism for deploying and managing Kafka on Lightbend Fast Data Platform clusters.

Strimzi has high-quality documentation and blog posts that describe its features well. We recommend reading the Strimzi documentation for a thorough understanding of the project before running it in production. The purpose of the following sections is to provide an overview, a quick start for developers, and an easy reference to using Strimzi with Fast Data Platform.

The rest of this section focuses on Strimzi operations. For a discussion of application concerns, like client connections and topic management, see Kafka Applications in Running Custom Services and Applications.

5.2.1. Components of Strimzi

A Strimzi Kafka cluster consists of many Kubernetes resources, but the most important user-facing components are the following.

  • Cluster operator and Kafka CustomResourceDefinition

  • Topic operator and KafkaTopic CustomResourceDefinition

  • User operator and KafkaUser CustomResourceDefinition

  • Kafka StatefulSet of n brokers

  • ZooKeeper StatefulSet of n nodes

  • Service for client connections to Kafka

The operators are JVM applications that run in pods managed by Deployments adjacent to the Kafka cluster. One Cluster operator can manage as many Kafka clusters as your infrastructure can support. Topic and User operators are deployed on a per-cluster basis.

The Topic operator allows the user to provision and view Kafka Topics as a Kubernetes resource called KafkaTopic. This functionality is bidirectional, so topics created by the Topic operator are created in Kafka and topics created by other means are represented as KafkaTopic resources in Kubernetes. This is a convenient representation of Kafka topics. See the Kafka Topics section for more details on how to manage KafkaTopic resources as well as the Strimzi documentation, Using the Topic operator.
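For illustration, a KafkaTopic resource might look like the following minimal sketch; the topic name, counts, and config values are illustrative, and the strimzi.io/cluster label must match the metadata.name of your Kafka resource:

apiVersion: kafka.strimzi.io/v1alpha1
kind: KafkaTopic
metadata:
  name: my-topic                      # illustrative topic name
  labels:
    strimzi.io/cluster: my-cluster    # must match the Kafka resource name
spec:
  partitions: 3
  replicas: 3
  config:
    retention.ms: 604800000           # 7 days, illustrative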

The User operator is similar to the Topic operator. It allows you to provision a user in Kafka using the Kubernetes resource KafkaUser. Depending on the authentication type, a secret will be automatically generated which can be retrieved and provided to the user. The KafkaUser is also used to define authorization rules to topics within Kafka. Unlike the Topic operator, users not provisioned by the operator are not bidirectionally synchronized as KafkaUser resources. For more details on using the User operator consult the Strimzi documentation, Using the User operator.


5.2.2. Installing Strimzi

Installing and running a Strimzi Kafka cluster is a two-step process.

  1. Install the Strimzi Cluster operator with Helm.

  2. Create a Kafka Kubernetes resource which the Cluster operator will use to create a Kafka and ZooKeeper cluster.

Installation of the Strimzi Cluster Operator with Helm
Note

This release of Fast Data Platform supports Strimzi version 0.11.3. See What’s Included in this Release for version details.

You can install the Strimzi Helm Chart by referencing the artifact directly from the GitHub Releases page, or by using the Lightbend chart repository. To use the chart repository you must first add the repository to your local helm configuration. Supported Strimzi releases are published directly to the Lightbend Helm Chart Repository every release cycle.

To begin the installation, you must add the Lightbend chart repository (if you haven’t done this already) and install a supported version of the Strimzi Cluster operator.

helm repo add lightbend https://repo.lightbend.com/helm-charts

helm install lightbend/strimzi-kafka-operator \
  --name my-strimzi \
  --namespace lightbend \
  --version 0.11.3 \
  --debug

Use different --name and --namespace arguments, if you prefer. The --debug flag is useful, but optional.

See Uninstalling Components for a discussion of using helm delete to remove components.

Helm Chart Configuration

For a full list of configuration settings that can be passed to this chart, see the values.yaml and chart README.md in the Strimzi project repository. In most cases the defaults are good enough to run the chart. However, the following configuration settings are the ones you are most likely to be interested in changing:

  • watchNamespaces - By default the Cluster operator only watches Kafka resources created or updated within the namespace the Cluster operator is running in. However, if you wish to create Kafka resources in a different namespace from your Cluster operator, or if you wish to watch multiple namespaces, then you can add them as a Go map for this configuration (e.g., when using the Helm CLI, --set watchNamespaces="{lightbend,foo,bar}").

  • imageRepositoryOverride, imageTagOverride - Override the default repository and tag of all Docker Images used by Strimzi. This is useful when you want to host the Docker Image in a private registry or use customized images.

  • logLevel - When troubleshooting issues with the Cluster operator it can be useful to use a lower log level (e.g., --set logLevel=DEBUG).

  • resources - Like any other pod, you may define resource requests and limits for the Cluster operator. The default should be satisfactory, but if you observe that the pod is running out of memory or is slow to respond, then you may want to increase the resource limits for the pod (e.g., --set resources.limits.memory=1Gi).

To learn about other common use cases for the Strimzi Helm Chart, see the Strimzi blog post, Using Helm to deploy the Strimzi Kafka Operator. It discusses how to override Docker Image coordinates, install without Tiller, and configure and update the watched namespaces of the Cluster Operator.

Consult the documentation for helm install to learn all the different ways that you can provide configuration overrides when installing a Helm Chart.
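For example, the overrides above can be collected into a values file and passed with --values instead of repeated --set flags. The following is only a sketch; the file name and the chosen values are illustrative:

# strimzi-values.yaml (illustrative)
watchNamespaces:
  - lightbend
  - pipelines
logLevel: DEBUG
resources:
  limits:
    memory: 1Gi

helm install lightbend/strimzi-kafka-operator \
  --name my-strimzi \
  --namespace lightbend \
  --version 0.11.3 \
  --values strimzi-values.yaml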


5.2.3. Creating a Kafka Cluster

Once the Cluster operator has been installed and is in a healthy state you may create a Kafka cluster. Every Kafka cluster created will also create a ZooKeeper cluster that Kafka brokers are automatically configured to use. Communication with the ZooKeeper cluster is secured using TLS tunnels. A Kafka and ZooKeeper cluster is defined with a single instance of the Kafka CRD. Below you will find the Kafka definition for a simple cluster with one Kafka Broker and one ZooKeeper node.

Important
The Kafka CRD YAML shown next is to be used as a reference to describe the structure of this resource, but it lacks critical configuration items. For example, it doesn’t include the monitoring configuration required for Strimzi Kafka clusters to be detected and monitored by Lightbend Console. For complete YAML examples, refer to the Kafka Resource Templates table below.
apiVersion: kafka.strimzi.io/v1alpha1
kind: Kafka                                        # The type of resource.  Can also be referenced in lowercase as "kafka" for oc commands
metadata:
  name: my-cluster                                 # The name of this cluster.
spec:
  kafka:
    replicas: 1                                    # Number of brokers
    listeners:                                     # Broker listeners configuration used to expose Kafka brokers to clients
      plain: {}
      tls: {}
    readinessProbe:                                # Readiness probe config for pods in Kafka StatefulSet
      initialDelaySeconds: 15
      timeoutSeconds: 5
    livenessProbe:                                 # Liveness probe config for pods in Kafka StatefulSet
      initialDelaySeconds: 15
      timeoutSeconds: 5
    config:                                        # Kafka broker configuration
      offsets.topic.replication.factor: 1
      transaction.state.log.replication.factor: 1
      transaction.state.log.min.isr: 1
    storage:                                       # Broker disk storage type to use
      type: ephemeral
    metrics: {}                                    # Prometheus JMX Exporter rules used to expose JVM and Kafka metrics to Prometheus and Lightbend Console
  zookeeper:
    replicas: 1                                    # Number of ZooKeeper nodes
    readinessProbe:                                # Readiness probe config for pods in ZooKeeper StatefulSet
      initialDelaySeconds: 15
      timeoutSeconds: 5
    livenessProbe:                                 # Liveness probe config for pods in ZooKeeper StatefulSet
      initialDelaySeconds: 15
      timeoutSeconds: 5
    storage:
      type: ephemeral                              # Node disk storage type to use
    metrics: {}
  entityOperator:
    topicOperator: {}                              # Optional Topic operator configuration. Omit this property to not deploy this operator.
    userOperator: {}                               # Optional User operator configuration. Omit this property to not deploy this operator.

For a full explanation of all the configuration settings that can be made in a Kafka resource, see the Kafka cluster configuration section of the Strimzi documentation.

Kafka Resource Templates

The following templates can be used to create Kafka clusters for various scenarios. They provide a set of Kafka resources you can use as a starting point for what might best suit your needs.

The Persistence type entry states whether a template is configured to create ad-hoc volumes (Ephemeral) or whether it provisions a PersistentVolumeClaim to be matched up with an actual PersistentVolume, depending on the configuration of your default Kubernetes StorageClass. Refer to the Strimzi Persistence section for more details.

Table 11. Kafka Resource Templates

developer.yaml
Description: Provisions a cluster with one Kafka broker and one ZooKeeper node using limited resources. The User Operator configuration is not defined, to reduce the amount of resources required. This is ideal for testing Kafka applications on a single-node developer configuration with minishift.
Metrics: Yes
Brokers: 1
ZooKeeper nodes: 1
Persistence type: Ephemeral

testing.yaml
Description: Provisions a cluster ideal for testing Kafka applications in an actual Kubernetes cluster for integration testing. It provisions a three-broker Kafka cluster and one ZooKeeper node. It deploys the Topic and User operators. This may be useful to test on a single dev machine, but keep in mind that its resource requirements will be more than those of the Developer template. This configuration is useful when you want to test things like consumer group balancing or broker failure events with your application. It’s configured to use PersistentVolumeClaims, but you can change this if appropriate. All the necessary Prometheus metrics are exported so they may be consumed in Lightbend’s Enterprise Suite and its bundled Grafana dashboards (see the Monitoring section for more details).
Metrics: Yes
Brokers: 3
ZooKeeper nodes: 1
Persistence type: Persistent Volumes

production.yaml
Description: Provisions a small cluster ideal for running in production. It provisions a three-broker Kafka cluster and a three-node ZooKeeper ensemble by default, but you should scale this out to your needs. Resource limits are in line with recommendations made in the Kafka Broker Recommendations section. It deploys the Topic and User operators. No pod or node affinity configuration is provided, but it’s strongly recommended that Kafka Brokers run on standalone nodes to avoid the noisy neighbor problem. Read this Strimzi blog post for additional information about Strimzi pod affinity and anti-affinity.
Metrics: Yes
Brokers: 3
ZooKeeper nodes: 3
Persistence type: Persistent Volumes

Once you have downloaded one of these YAML files and edited it as required, you can apply it with the oc command into a namespace that the Cluster operator is configured to watch (the operator’s own namespace, by default). For example:

oc apply -f testing.yaml -n lightbend

After creating the resource you should see resources being created by the Cluster operator within Kubernetes. The creation is complete once you see:

  • A healthy ZooKeeper StatefulSet

  • A healthy Kafka Broker StatefulSet

  • A healthy Entity Operator Deployment. This Deployment manages the Topic and User operators. The Developer deployment will not deploy the User operator.

Watch the pods in your destination namespace.

Here is example output of a healthy Kafka cluster called my-cluster:

$ oc get pod -n myproject
NAME                                          READY     STATUS             RESTARTS   AGE
my-cluster-entity-operator-7cbdb56674-jfhdv   3/3       Running            0          10s
my-cluster-kafka-0                            2/2       Running            0          31s
my-cluster-kafka-1                            2/2       Running            0          31s
my-cluster-kafka-2                            2/2       Running            0          31s
my-cluster-zookeeper-0                        2/2       Running            0          58s
my-cluster-zookeeper-1                        2/2       Running            0          58s
my-cluster-zookeeper-2                        2/2       Running            0          58s

5.2.4. Strimzi Persistence

Strimzi supports two modes of persistence for Kafka and ZooKeeper: ephemeral and persistent storage.

Ephemeral mode is suitable only for development and testing purposes and not for production. This deployment uses emptyDir volumes for storing Broker information in ZooKeeper and topic partitions in Kafka. Using an emptyDir volume means that its content is tied to the pod life cycle and is deleted when the pod is terminated.

Persistent mode uses a PersistentVolume (PV) to store data. This mode should be used in most situations. The PersistentVolume is acquired using a PersistentVolumeClaim (PVC) to make it independent of the actual type of storage used. For example, a PVC may use HostPath volumes on minishift or Amazon EBS volumes in Amazon AWS deployments. You may also provide more criteria when defining persistence; for instance, you can request a certain StorageClass. PVs are sticky to a pod within a StatefulSet, which means that once a StatefulSet pod is deployed to a node, it will always start on that node and use the same PV, unless you explicitly delete the pod.

Depending on your Kubernetes configuration you may have Dynamic Volume Provisioning enabled. When enabled, it automatically provisions PersistentVolumes based on the criteria found in the PersistentVolumeClaims generated by the StatefulSets used for the Kafka broker and ZooKeeper clusters.

Please read the Managing Storage discussion earlier in this Management and Monitoring Guide for more details about using the Kubernetes PersistentVolume subsystem.

Storage is defined within the Kafka and ZooKeeper specs, in the spec.kafka.storage and spec.zookeeper.storage properties, respectively.
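For example, a persistent-claim configuration might look like the following sketch; the sizes are illustrative, and class refers to the glusterfs-storage StorageClass discussed earlier (omit class to use the cluster’s default StorageClass):

spec:
  kafka:
    storage:
      type: persistent-claim
      size: 100Gi                  # illustrative size
      class: glusterfs-storage     # optional; omit to use the default StorageClass
      deleteClaim: false           # keep the PVC (and its data) if the Kafka cluster is deleted
  zookeeper:
    storage:
      type: persistent-claim
      size: 10Gi
      class: glusterfs-storage
      deleteClaim: false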

5.2.5. Strimzi Secure Access

Broker to broker traffic is automatically configured to use TLS by Strimzi.

To use TLS certificates for client connections, you can use the Strimzi User operator to generate secrets to set up on the client for TLS. Authorization rules can also be set up using the User operator and the KafkaUser Strimzi custom resource.
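For illustration, a KafkaUser with TLS authentication and simple authorization might look like the following minimal sketch; the user and topic names are illustrative, and you should check the Using the User operator documentation for the exact schema supported by this Strimzi version:

apiVersion: kafka.strimzi.io/v1alpha1
kind: KafkaUser
metadata:
  name: my-user                       # illustrative user name
  labels:
    strimzi.io/cluster: my-cluster    # must match the Kafka resource name
spec:
  authentication:
    type: tls                         # the User operator generates a secret holding the client certificate
  authorization:
    type: simple
    acls:
      - resource:
          type: topic
          name: my-topic
        operation: Read
      - resource:
          type: topic
          name: my-topic
        operation: Describe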

5.2.6. Update a Running Kafka Cluster

To update a running Kafka and ZooKeeper cluster you need to apply an update to the Kafka resource you’ve originally defined. As long as the Kafka resource shares the same name and is in the same namespace, then the Cluster operator will know to update an existing cluster instead of creating a new one. The Cluster operator will calculate the delta between the existing and new Kafka resource and generate a reconciliation plan to apply internally.

# Update `testing.yaml` and apply it to perform an update
oc apply -f testing.yaml -n lightbend

5.2.7. Scale Kafka Brokers

To change the number of Kafka brokers in your cluster, update the spec.kafka.replicas field of the Kafka resource to a new integer. Once applied using the same oc apply command just discussed, the Cluster operator will scale the Kafka StatefulSet to the new replica count you provided.
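For example, the following sketch shows only the changed field, scaling from the testing template’s three brokers to five (an illustrative count):

spec:
  kafka:
    replicas: 5    # previously 3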

5.2.8. Update the Broker Configuration

Updating the Kafka broker configuration requires making the same update across all broker instances. As a user, you simply update the spec.kafka.config field of the Kafka resource. Once applied, the Cluster operator initiates a rolling update of the Kafka StatefulSet. You can follow this process by watching the status of your pods.
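For example, the following sketch shows only the changed fragment; the setting and its value are illustrative:

spec:
  kafka:
    config:
      log.retention.hours: 72    # applying this triggers a rolling update of all brokers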

5.2.9. Delete Kafka Cluster

A Kafka and ZooKeeper cluster can be deleted easily by deleting the Kafka resource using its metadata.name. This will delete all the entity operators (User and Topic operators), Kafka and ZooKeeper StatefulSets and their underlying pods, as well as various other Kubernetes resources that are associated with that cluster.

oc delete kafka my-cluster -n lightbend

You can also delete all clusters associated with a Cluster operator by deleting the Cluster operator Deployment, which you do by deleting its Helm chart release.

helm delete strimzi-operator

5.2.10. Monitoring

Monitoring is essential for ensuring a healthy Kafka cluster.

Exposing Kafka and ZooKeeper Metrics

Strimzi uses the Prometheus JMX Exporter to export Kafka and ZooKeeper metrics for the Prometheus service used by Lightbend Console. The Prometheus JMX Exporter rules defined in the Testing and Production resource files from the Kafka Resource Templates section above are configured to be compatible with the conventions defined in Lightbend Console and the Grafana Strimzi dashboards that are bundled with Lightbend Console.

Lightbend Console

Lightbend Console provides support for monitoring Strimzi Kafka and ZooKeeper clusters.

When the Cluster operator is installed, Console will automatically detect its Deployment. When a Kafka resource is applied, Console will automatically detect the Kafka and ZooKeeper StatefulSets and identify them as Kafka or ZooKeeper workloads by including the logo of each respective project in the Service Types column for the workloads.

Below is a screenshot of the homepage of Console that shows a Strimzi Cluster operator and a Kafka and ZooKeeper cluster with the name my-cluster. Notice the Apache Kafka and Apache ZooKeeper icons beside each workload (StatefulSet).

Lightbend Console Homepage
Figure 1. Lightbend Console - Kafka and ZooKeeper

When you click the Kafka workload you see a view with the default Monitors that are defined based on metrics returned from Kafka, Kubernetes, and other sources.

For more information about how to read these monitors consult the Lightbend Console documentation.

Lightbend Console Kafka View
Figure 2. Lightbend Console - Kafka Monitors

When you click the Grafana icon in the top left of the view you will see the automatically-generated Grafana dashboard with an assortment of metrics from Kafka, Kubernetes, and the JVM.

Lightbend Console Generated Grafana Dashboard for Kafka
Figure 3. Lightbend Console - Grafana Dashboard for Kafka
Bundled Dashboards with Grafana

Lightbend Console’s Grafana server also comes bundled with custom dashboards that provide a more service-centric view into the health of your Kafka and ZooKeeper clusters. These dashboards represent a good starting point for key metrics to monitor in Kafka and ZooKeeper clusters, but depending on your infrastructure you may need to update them and make them your own. You can find both dashboards by navigating to the Grafana homepage and clicking the Prometheus Lightbend item under Installed Apps. The dashboards are named Strimzi Kafka and Strimzi ZooKeeper.

Kafka Grafana Dashboard

The Kafka dashboard has several variables defined. Each variable depends on the one preceding it and will further restrict the number of metrics returned. You must at least select a Namespace and Cluster Name.

  • Namespace - The Kubernetes namespace the Strimzi cluster resides in.

  • Cluster Name - The metadata.name of your Kafka resource (i.e. my-cluster)

  • Broker - All, or the individual Kafka Broker to view.

  • Topic - All, or the individual topic to view for metrics that are per topic.

  • Partition - All, or the individual partition to view for metrics that are per partition.

Normally you will want to set only the Namespace and Cluster Name variables and leave the other variables set to All.

Strimzi Kafka Dashboard
Figure 4. Kafka Grafana Dashboard
ZooKeeper Grafana Dashboard

The ZooKeeper dashboard has fewer variables than the Kafka dashboard. Each variable depends on the one preceding it and will further restrict the number of metrics returned. You must at least select a Namespace and Cluster Name.

  • Namespace - The Kubernetes namespace the Strimzi cluster resides in.

  • Cluster Name - The metadata.name of your Kafka resource (i.e. my-cluster)

  • Node - All, or the individual ZooKeeper node to view.

Normally you will want to set only the Namespace and Cluster Name variables and leave the other variables set to All.

Strimzi ZooKeeper Dashboard
Figure 5. ZooKeeper Grafana Dashboard

5.3. The Kubernetes Operator for Apache Spark

The Kubernetes Operator for Apache Spark is an open-source project whose main goal is to make specifying, running and managing Spark jobs as idiomatic as other native Kubernetes workloads. Consider it as an alternative to using plain spark-submit.

Note

The installation instructions install Lightbend’s build of the Spark operator, which has patches and other modifications. This version of the operator must be used when production support is required.

As its name suggests, it’s an implementation of the operator pattern, which we discussed in The Operator Pattern.

The major advantages of using Spark Operator compared to spark-submit are the following:

  1. A Custom Resource Definition (CRD) called SparkApplication: It represents Spark jobs and enables a native Kubernetes workflow for managing them. For example, just like the way all pods can be listed with oc get pods, all deployed Spark jobs can be listed with oc get sparkapplications.

  2. A CLI tool called sparkctl: It simplifies managing Spark jobs, obviating the need to use the more generic oc for most such tasks. Tasks that sparkctl supports include creating, listing, and deleting SparkApplications, plus checking their status and getting their logs.

  3. Easily accessible Spark web UI: The sparkctl CLI allows the Spark web UI served in the driver pod to be accessed locally through port forwarding.

  4. Spark job specification using YAML: Spark jobs need to be defined in YAML files before being submitted using oc or sparkctl. This declarative approach of describing Spark jobs enables easy version control.

  5. Easy customization of driver and executor pods: This is done through the Mutating Admission Webhook capability. You can conveniently mount custom ConfigMaps, Volumes, as well as set pod affinity and anti-affinity constraints.

  6. Watching and restarting drivers: Similar to a Deployment, as described above.

Shown below is the architecture diagram of the Operator, taken from its GitHub project documentation: https://github.com/lightbend/spark-on-k8s-operator/blob/master/docs/design.md

Spark Operator Architecture
Figure 6. Spark Operator Architecture

This section only covers the practical operational aspects of the Spark Operator. For information about running and managing Spark jobs with the Spark Operator, see Managing Spark Jobs Using the Spark Operator in Running Custom Services and Applications.

For more details about the Operator’s architecture design, available APIs, and how to use it in general, see the official documentation for Spark Operator.

Note

The Operator runs Spark jobs using the Kubernetes scheduler in Apache Spark. Jobs are launched on demand using CLI tools (i.e. kubectl or sparkctl). Unlike in the standalone mode, no cluster resources are pre-provisioned.

5.3.1. Enabling the Admission Webhooks

Note

Skip this section if you are installing the operator with webhook disabled.

The default installation setting for the operator enables the mutating admission webhook. In order for the webhook to work, the Kubernetes API server needs to enable this feature. This requires administrative access to the cluster master nodes. The Admission Webhooks are not enabled by default on some versions of some Kubernetes-based systems. However, these webhooks are required by an increasing number of CRDs and may be enabled by default in future versions of these systems. For clusters being provided as a service, contact the service provider about enabling these webhooks.

The following configuration should be added to the admissionConfig:pluginConfig of the master configuration:

    ValidatingAdmissionWebhook:
      configuration: {kind: DefaultAdmissionConfig, apiVersion: v1, disable: false}
    MutatingAdmissionWebhook:
      configuration: {kind: DefaultAdmissionConfig, apiVersion: v1, disable: false}

If you enable the admission webhooks on an existing cluster, enable them on all master nodes by restarting the master API and controller services using master-restart:

# master-restart api
# master-restart controllers
Using OC Cluster

If you are using oc cluster up to run OpenShift locally, use --write-config to enable the admission webhooks, as in this example for installing OpenShift:

oc cluster up --write-config

# Enable admission webhooks
sed -i -e 's/"admissionConfig":{"pluginConfig":null}/"admissionConfig": {\
    "pluginConfig": {\
        "ValidatingAdmissionWebhook": {\
            "configuration": {\
                "apiVersion": "v1",\
                "kind": "DefaultAdmissionConfig",\
                "disable": false\
            }\
        },\
        "MutatingAdmissionWebhook": {\
            "configuration": {\
                "apiVersion": "v1",\
                "kind": "DefaultAdmissionConfig",\
                "disable": false\
            }\
        }\
    }\
}/' openshift.local.clusterup/kube-apiserver/master-config.yaml

5.3.2. Installing the Spark Operator

Unlike using plain spark-submit, which does not require any upfront installation, the Spark Operator needs to be installed in the cluster before it can be used to run Spark jobs. Use the Lightbend helm chart (which is adapted from this Helm chart) to install the operator. First, make sure you added the Lightbend Helm chart repo, as discussed here, then run helm install. Here we’ll assume you used the repo name lightbend when the repo was added:

Note

Add the flag --set enableWebhook=false to the command below to disable the webhook.

$ helm install lightbend/fdp-sparkoperator \
  --name spark-operator \
  --namespace lightbend \
  --version 0.3.0 \
  --debug

The Spark Operator chart installed above is built with Scala 2.11. We also provide an image lightbend/sparkoperator:2.1.1-OpenShift-v1beta1-0.8.2-2.4.3-rh-2.12 that is built with Scala 2.12.

Note

Scala 2.11 support is deprecated. A subsequent release of Fast Data Platform will drop support for Scala 2.11.

Use different --name and --namespace arguments, if you prefer. The --debug flag is useful, but optional.

See Uninstalling Components for a general discussion of using helm delete to remove components. But for the Spark Operator, one thing to note is that during helm install, CRD definitions are created by the operator’s Golang code. Because Helm is not directly responsible for creating and managing the CRDs, it won’t be able to delete them during helm delete. To perform a clean uninstall, the user needs to manually delete the existing definitions by running:

$ oc delete crd sparkapplications.sparkoperator.k8s.io
$ oc delete crd scheduledsparkapplications.sparkoperator.k8s.io

before running helm install again to install a new version of the operator chart. Since the Spark Operator is in beta status, backward compatibility is guaranteed. Therefore manually deleting the CRD definitions is only required if the user wants to take advantage of features introduced in the new CRD definition.

The chart supports exporting Prometheus metrics, documented here, and the Mutating Admission Webhook described above.

The following properties can be customized when installing (adapted from here):

Table 12. Spark Operator Configuration

  • operatorImageName - The name of the operator image. Default: lightbend/sparkoperator

  • operatorVersion - The version of the operator to install. Default: 2.1.1-OpenShift-v1beta1-0.8.2-2.4.3-rh

  • sparkJobNamespace - Kubernetes namespace where Spark jobs are to be deployed. Default: default

  • enableWebhook - Whether to enable the mutating admission webhook. Default: true

  • enableMetrics - Whether to expose metrics to be scraped by Prometheus. Default: true

  • controllerThreads - Number of worker threads used by the SparkApplication controller. Default: 10

  • installCrds - Whether to install CRD definitions. Default: true

  • metricsPort - Port for the metrics endpoint. Default: 10254

  • metricsEndpoint - Metrics endpoint. Default: "/metrics"

  • metricsPrefix - Prefix for the metrics. Default: "" (empty)

  • resyncInterval - Informer resync interval in seconds. Default: 30

  • webhookPort - Service port of the webhook server. Default: 8080

5.3.3. Upgrading the Spark Operator

Assuming the Spark Operator is already installed, you can upgrade it with the following steps.

First, find out the Helm release name of the Spark Operator using helm list. We’ll refer to it as $RELEASE_NAME below.

Next, store the updated parameters for the new release in a YAML file such as the one below:

operatorVersion: v1beta1-0.8.2-2.4.3
enableWebhook: false

Then, upgrade the Operator by running this command:

$ helm upgrade --values values.yaml $RELEASE_NAME lightbend/fdp-sparkoperator --recreate-pods

Alternatively, you can directly set the updated parameters with --set operatorVersion=v1beta1-0.8.2-2.4.3 --set enableWebhook=false. See the helm upgrade documentation for more options.

Once the upgrade is finished, you can submit and manage Spark jobs using the upgraded Operator. Note that upgrading the Operator does not affect pre-existing Spark jobs, including those that are running, have finished, or have failed. Those jobs can still be managed using sparkctl. For example, you can still get logs, check their status, or delete them.

5.3.4. Multitenancy Support

The Spark Operator supports running multiple instances in a Kubernetes cluster. Each team can have an instance of the Operator that monitors only the namespace they deploy jobs in and no other namespaces. Different instances of the Operator will not conflict with each other as long as the following two conditions are met (see the sketch after this list):

  1. Each namespace can have at most one Operator deployment.

  2. No two Operator deployments can monitor the same namespace.
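For example, here is a sketch of two independent installations for two teams; the release names, namespaces, and job namespaces are all illustrative:

helm install lightbend/fdp-sparkoperator \
  --name team-a-spark-operator \
  --namespace team-a \
  --version 0.3.0 \
  --set sparkJobNamespace=team-a-jobs

helm install lightbend/fdp-sparkoperator \
  --name team-b-spark-operator \
  --namespace team-b \
  --version 0.3.0 \
  --set sparkJobNamespace=team-b-jobs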

5.3.5. Example: Run a Spark Job Using the Operator

Let’s go through an example to demonstrate how you can run a Spark job and customize it with the operator.

First, create a namespace called spark-ns where the Spark pods are going to run:

$ oc create ns spark-ns

Then install the Operator:

$ helm install lightbend/fdp-sparkoperator \
  --name spark-operator-ns \
  --namespace lightbend \
  --version 0.3.0 \
  --set sparkJobNamespace=spark-ns \
  --debug

Running the command above would generate some output like the following:

NAME:   spark-operator-ns
LAST DEPLOYED: Mon Dec 17 14:29:12 2018
NAMESPACE: lightbend
STATUS: DEPLOYED

RESOURCES:
==> v1/RoleBinding
NAME                AGE
spark-role-binding  39s

==> v1/Service
NAME                                         TYPE       CLUSTER-IP      EXTERNAL-IP  PORT(S)  AGE
spark-operator-ns-fdp-sparkoperator-webhook  ClusterIP  172.30.250.153  <none>       443/TCP  39s

==> v1/Deployment
NAME                                 DESIRED  CURRENT  UP-TO-DATE  AVAILABLE  AGE
spark-operator-ns-fdp-sparkoperator  1        1        1           0          39s

==> v1/Pod(related)
NAME                                                  READY  STATUS             RESTARTS  AGE
spark-operator-ns-fdp-sparkoperator-7d5dd95698-96hhn  0/1    ContainerCreating  0         39s

==> v1/ServiceAccount
NAME                                 SECRETS  AGE
spark-operator-ns-fdp-sparkoperator  2        39s
spark-operator-ns-spark              2        39s

==> v1/ClusterRole
NAME                                    AGE
spark-operator-ns-fdp-sparkoperator-cr  39s

==> v1/ClusterRoleBinding
NAME                                     AGE
spark-operator-ns-fdp-sparkoperator-crb  39s

==> v1/Role
NAME        AGE
spark-role  39s

You can see that two service accounts got created as part of the Helm deployment. The one that ends with -spark (i.e. spark-operator-ns-spark in the example above) is what we are going to use to submit our Spark job.

To demonstrate a use case of the mutating admission webhook, let’s create a ConfigMap and mount it in the Spark pods to customize the configuration of the Spark job. To keep the example simple, assume you have a log4j.properties file with only one line in it:

log4j.rootCategory=DEBUG, console

The goal is to override the default logging level of INFO with DEBUG without modifying your job image. You need to create a ConfigMap using that file in the Spark job namespace:

$ oc -n spark-ns create configmap conf-cm --from-file=log4j.properties

This gives you a ConfigMap conf-cm in the namespace spark-ns. Now to prepare your Spark job for submission, get the job spec YAML ready:

apiVersion: "sparkoperator.k8s.io/v1beta1"
kind: SparkApplication
metadata:
  name: spark-pi
  namespace: spark-ns
spec:
  type: Scala
  mode: cluster
  image: "lightbend/spark:2.1.1-OpenShift-2.4.3-rh"
  imagePullPolicy: Always
  mainClass: org.apache.spark.examples.SparkPi
  mainApplicationFile: "local:///opt/spark/examples/jars/spark-examples_2.11-2.4.3.jar"
  restartPolicy:
    type: Never
  sparkConfigMap: conf-cm
  driver:
    cores: 0.1
    coreLimit: "200m"
    memory: "512m"
    labels:
      version: 2.4.3
    serviceAccount: spark-operator-ns-spark
  executor:
    cores: 1
    instances: 1
    memory: "512m"
    labels:
      version: 2.4.3

Of specific importance in the YAML are the following settings:

  1. metadata.namespace. It specifies the namespace where the job will be deployed.

  2. spec.sparkConfigMap. The operator mounts the ConfigMap conf-cm onto path /etc/spark/conf in both the driver and executor pods. Additionally, it also sets the environment variable SPARK_CONF_DIR to point to /etc/spark/conf in the driver and executors, achieving the goal of overriding the default logging configuration.

  3. spec.driver.serviceAccount. Make sure the service account used here is the one created when installing the chart. Other service accounts may not have sufficient permissions to create executor pods and the headless service used by the executors to connect to the driver pod.

With all the above steps done, save the YAML in a file called test-job.yaml and run the Spark job by submitting it:

$ oc apply -f test-job.yaml

In general, you are not restricted to mounting ConfigMaps or volumes at any specific path in the pods. You are free to mount them anywhere. For example, the following YAML segment mounts a ConfigMap my-cm at the path /etc/mydata in the driver and executor pods:

  volumes:
    - name: config-vol
      configMap:
        name: my-cm
  driver:
    cores: 0.1
    coreLimit: "200m"
    memory: "512m"
    labels:
      version: 2.4.3
    serviceAccount: original-markhor-spark
    volumeMounts:
      - name: config-vol
        mountPath: /etc/mydata
  executor:
    cores: 1
    instances: 1
    memory: "512m"
    labels:
      version: 2.4.3
    volumeMounts:
      - name: config-vol
        mountPath: /etc/mydata

Note that the ConfigMap my-cm is first defined as a volume called config-vol, which is then mounted in the pods. You can also directly mount the ConfigMap:

  driver:
    cores: 0.1
    coreLimit: "200m"
    memory: "512m"
    labels:
      version: 2.4.3
    serviceAccount: original-markhor-spark
    configMaps:
      - name: my-cm
        path: /etc/mydata
  executor:
    cores: 1
    instances: 1
    memory: "512m"
    labels:
      version: 2.4.3
    configMaps:
      - name: my-cm
        path: /etc/mydata
Note

The mount path will shadow whatever that path contains. So it’s advisable to create a new sub-directory for your mounted data. For example, when mounting a volume config-vol in /etc, mount it at /etc/configvol instead of /etc to avoid shadowing other files and directories under /etc.

5.3.6. Logging with the Operator

To obtain the logs of a Spark job deployed with the Operator, there are again two ways: using oc or sparkctl. To use oc, you must first determine the pod name. Note that driver pods remain available to be inspected even after the job finishes, but executor pods are gone once the job is finished. The pod name can be found either in the Kubernetes dashboard UI or via oc describe sparkapplication <your-job-name>. Once the pod name is known, logs can be retrieved by running:

$ oc logs <driver-pod-name>

sparkctl offers a more convenient way to get logs; simply run the following command:

$ sparkctl log <your-job-name>

For more details including additional flags to get executor logs, refer to the sparkctl documentation.

See also Log Aggregation with Humio for an alternative, using a log-aggregation system like Humio to ingest and analyze logs.

5.3.7. Monitoring with Metrics

The Spark Operator also supports exposing Prometheus metrics that indicate the current status of the jobs launched with it and the status of the Operator itself. The Helm chart automatically enables exporting metrics when installing the Operator.

At this time, the following metrics are exported (adapted from this documentation):

Table 13. Spark Operator Metrics

  • spark_app_submit_count - Total number of SparkApplications submitted by the Operator.

  • spark_app_success_count - Total number of SparkApplications which completed successfully.

  • spark_app_failure_count - Total number of SparkApplications which failed to complete.

  • spark_app_running_count - Total number of SparkApplications which are currently running.

  • spark_app_success_execution_time_microseconds - Execution time for applications which succeeded.

  • spark_app_failure_execution_time_microseconds - Execution time for applications which failed.

  • spark_app_executor_success_count - Total number of Spark Executors which completed successfully.

  • spark_app_executor_failure_count - Total number of Spark Executors which failed.

  • spark_app_executor_running_count - Total number of Spark Executors which are currently running.

Because the Operator uses the Prometheus JMX exporter to serve metrics and the Prometheus JMX exporter runs as an in-pod local Java agent that exposes an HTTP server, the list of exported metrics can be viewed by querying the /metrics endpoint of its HTTP server.

For example, to view the exported application-level metrics, run the following command to forward the port for local access:

$ oc --namespace lightbend port-forward <spark-operator-pod-name> 9999:10254

Port 10254 is the operator’s default metrics port and 9999 is the local port we chose, but you can use whatever local port you want. Now either use curl or, in a browser, go to http://localhost:9999/metrics to see all exported metrics.

Similarly, the same procedure can be followed to inspect metrics being collected in a driver or executor pod, but note that driver and executor pods only export metrics when configured to do so. Below is an example YAML that runs spark-pi and exposes metrics for both driver and executor pods. You may need to change the name of the Spark job service account to yours and change the namespace to where you want the job to run.

apiVersion: "sparkoperator.k8s.io/v1beta1"
kind: SparkApplication
metadata:
  name: spark-pi
  namespace: lightbend
spec:
  type: Scala
  mode: cluster
  image: "lightbend/spark:2.1.1-OpenShift-2.4.3-rh"
  imagePullPolicy: Always
  mainClass: org.apache.spark.examples.SparkPi
  mainApplicationFile: "local:///opt/spark/examples/jars/spark-examples_2.11-2.4.3.jar"
  arguments:
    - "100000"
  driver:
    cores: 0.1
    coreLimit: "200m"
    memory: "512m"
    labels:
      version: 2.4.3
    serviceAccount: spark
  executor:
    cores: 1
    instances: 1
    memory: "512m"
    labels:
      version: 2.4.3
  monitoring:
    exposeDriverMetrics: true
    exposeExecutorMetrics: true
    prometheus:
      jmxExporterJar: "/prometheus/jmx_prometheus_javaagent-0.3.1.jar"
      port: 8090
  restartPolicy:
    type: Never

The example Spark job runs spark-pi but with an argument of 100000. This large number gives the Spark job more work to do, making the job run longer. The purpose is to allow the user to see Prometheus metrics exported by the driver and executor pods, which can take some time to generate. Spark-pi with the default argument finishes too soon to allow the metrics to be observed.

The Operator also supports overriding the Prometheus configuration through the setting spec.monitoring.prometheus.configuration in the job YAML. Put the contents of your configuration file there as a string to override the default configuration.

Note

spec.monitoring.prometheus.configuration expects the CONTENTS of your custom Prometheus configuration, NOT the path to your configuration file.
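For example, here is a sketch of inlining a minimal JMX exporter configuration in the monitoring section of the job YAML shown above; the rules are illustrative, not the defaults shipped in the image:

  monitoring:
    exposeDriverMetrics: true
    exposeExecutorMetrics: true
    prometheus:
      jmxExporterJar: "/prometheus/jmx_prometheus_javaagent-0.3.1.jar"
      port: 8090
      configuration: |
        lowercaseOutputName: false
        rules:
          - pattern: ".*"          # export everything; replace with your own rules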

If you want to point to your own configuration file in the image then use the property spec.monitoring.prometheus.configFile as shown next.

...
prometheus:
  configFile: "/etc/metrics/conf/prometheus.yaml"
...

Note that the Lightbend Spark Operator image has slightly different Prometheus configurations than the Google Cloud Platform (GCP) image. Two important differences are the following:

  1. The GCP image transforms all Spark metrics by prepending the metric names with the app ID, but the Lightbend one doesn’t.

  2. The GCP image transforms all Spark metrics into lower case, but the Lightbend one preserves the camel case spellings.

By default, the operator assumes the use of the prometheus.io domain for the metrics. If you have used a different domain when you installed your Prometheus server, either manually or via the Lightbend Console, you need to pass the extra annotations shown next to the driver and executor specs.

my.domain.io/port: "9100"
my.domain.io/scrape: "true"
Note

At this time, we discourage using a unique domain, as many services assume the default value of prometheus.io.

5.4. Spark History Server

Spark History Server is a valuable service that makes Spark job consoles available after the jobs have finished running. By default, the Spark driver hosts a small website with this console, but when the job exits, the website goes away with it. Spark History Server is useful because, in practice, you often want this information precisely when a job crashed or otherwise misbehaved!

When Spark History Server is running and the Spark job is submitted with the appropriate flags to use it, then important information is captured and Spark History Server’s own web console will allow you to view this information after a job has finished, as well as while it’s still running.

For details on using Spark History Server with Spark jobs, either when invoked using spark-submit or using The Kubernetes Operator for Apache Spark, see Using Spark History Server in Running Custom Services and Applications.

5.4.1. Installing Spark History Server

Use the Lightbend Helm chart for Spark History Server (which is adapted from this Helm chart). First, make sure you have added the Lightbend Helm chart repo, as discussed here, then use helm install, as we’ll show below. We’ll assume you used the repo name lightbend when the repo was added.

But first, we have to discuss prerequisites, namely that file-based storage is required for Spark History Server. The Helm chart supports using one of two options as the backing storage for the history server: HDFS or a Kubernetes Persistent Volume Claim (PVC). One or the other must be configured before Spark jobs can use Spark History Server.

Using HDFS with Spark History Server

You’ll need the NameNode address for your HDFS cluster, e.g., hdfs://myhdfs/ and the directory under HDFS where you want the log files to be written, such as /history. We’ll use these examples below.

Spark History Server requires the same hdfs-site.xml and core-site.xml used by your HDFS cluster. They must be stored inside the container at runtime to be able to communicate with HDFS. This is accomplished via ConfigMaps as follows.

Assuming you have copied hdfs-site.xml and core-site.xml to the current directory, run the following commands to create the ConfigMaps:

$ oc --namespace lightbend create configmap hdfs-site --from-file=hdfs-site.xml
$ oc --namespace lightbend create configmap core-site --from-file=core-site.xml
Note

Because ConfigMaps are specific to a namespace, you need to create these ConfigMaps in the namespace where you plan to install the chart.

Now we’re ready to install the history server chart:

$ helm install lightbend/spark-history-server \
  --name my-spark-history-server \
  --namespace lightbend \
  --version 0.4.0 \
  --set hdfs.logDirectory=hdfs://myhdfs/history/ \
  --set pvc.enablePVC=false \
  --debug
Note

The Spark history server chart installed above is built with Scala 2.11. We also provide an image lightbend/spark-history-server:2.1.1-OpenShift-2.4.3-rh-2.12 that is built with Scala 2.12.

The --namespace argument must match what was used for the ConfigMaps. The --debug flag is useful, but optional. Pick a suitable value for the --name argument.

Note that we used myhdfs as the server name for the HDFS URI and passed the full URL for the history directory with --set hdfs.logDirectory. Give this URL to your application developers; they’ll need it when submitting Spark jobs.
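
For example, a job that writes its event logs to this location would include settings like the following in its YAML (a sketch; a complete PVC-based example appears in Using the Spark Operator with the History Server below):

sparkConf:
  "spark.eventLog.enabled": "true"
  "spark.eventLog.dir": "hdfs://myhdfs/history/"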

Using a Persistent Volume Claim with Spark History Server

You will need to set up a Persistent Volume Claim (PVC) as backing storage for Spark History Server. Managing Storage provides a brief introduction to the concept of PVCs and Managing Fast Data Platform Storage provides specific instructions for using PVCs with GlusterFS and NFS. Note that ReadWriteMany access is required to share the storage with the history server pod and the Spark job pods, which will mount the same PVC.

We’ll show an example using NFS, which is suitable for development and test purposes, but we require the use of GlusterFS for production, as discussed in Managing Fast Data Platform Storage. Either option is referenced with the setting pvc.existingPVC.

By default, an NFS server and PVC are set up by the Spark History Server Helm chart. It uses the static PVC approach where we are using a predefined Persistent Volume, as opposed to dynamic provisioning.

Here are the commands to install Spark History Server with the default settings using helm, followed by the required security policy setting.

helm install lightbend/spark-history-server \
  --name my-spark-history-server \
  --namespace lightbend \
  --version 0.4.0

oc adm policy add-scc-to-user nfs-provisioner -z my-spark-history-server-fdp-nfs --namespace lightbend

The output of the first command will be similar to the following (if you add the --debug flag):

NAME:   my-spark-history-server
LAST DEPLOYED: Fri Dec 14 22:54:07 2018
NAMESPACE: lightbend
STATUS: DEPLOYED

RESOURCES:
==> v1/Pod(related)
NAME                                                               READY  STATUS             RESTARTS  AGE
my-spark-history-server-fdp-spark-history-server-7dd6795787-5hw62  0/1    ContainerCreating  0         2s

==> v1/PersistentVolume
NAME           CAPACITY  ACCESS MODES  RECLAIM POLICY  STATUS  CLAIM                  STORAGECLASS  REASON  AGE
nfs-pv-lightbend  5Gi       RWX           Retain          Bound   lightbend/nfs-pvc-lightbend  2s

==> v1/ClusterRole
NAME                                                 AGE
my-spark-history-server-fdp-spark-history-server-cr  2s

==> v1/ClusterRoleBinding
NAME                                                  AGE
my-spark-history-server-fdp-spark-history-server-crb  2s

==> v1/Deployment
NAME                                              DESIRED  CURRENT  UP-TO-DATE  AVAILABLE  AGE
my-spark-history-server-fdp-nfs                   1        0        0           0          2s
my-spark-history-server-fdp-spark-history-server  1        1        1           0          2s

==> v1/ConfigMap
NAME                                              DATA  AGE
my-spark-history-server-fdp-spark-history-server  6     2s

==> v1/PersistentVolumeClaim
NAME                   STATUS  VOLUME                                    CAPACITY  ACCESS MODES  STORAGECLASS  AGE
nfs-pvc-lightbend         Bound   nfs-pv-lightbend                             5Gi       RWX           2s
my-spark-history-server-fdp-nfs  Bound   pvc-1b77a6c5-ffb0-11e8-852b-02c5d6573608  5Gi       RWO           gp2  2s

==> v1/ServiceAccount
NAME                                              SECRETS  AGE
my-spark-history-server-fdp-nfs                   2        2s
my-spark-history-server-fdp-spark-history-server  2        2s

==> v1/Service
NAME                                              TYPE          CLUSTER-IP     EXTERNAL-IP  PORT(S)                     AGE
my-spark-history-server-fdp-nfs                   ClusterIP     172.30.230.74         2049/TCP,20048/TCP,111/TCP  2s
my-spark-history-server-fdp-spark-history-server  LoadBalancer  172.30.67.127      18080:30503/TCP             2s

See Uninstalling Components for a discussion of using helm delete to remove components.

You can get the application URL by running the following commands. Note that the UI will take a minute or two to show up after the pods and services are ready and for the LoadBalancer IP to be available.

First, you can watch the status by running this command:

oc -n lightbend get svc -w my-spark-history-server-fdp-spark-history-server

You can get the application URL by running these commands:

$ service_ip=$(oc get svc --namespace lightbend my-spark-history-server-fdp-spark-history-server -o jsonpath='{.status.loadBalancer.ingress[0].hostname}')
$ echo http://$service_ip:18080

The echo command will print something like http://a12f5f067cd2e11e8b6b7061800f6795-854361343.us-west-2.elb.amazonaws.com:18080 (for an AWS installation).

Note the service account generated for the NFS server. We run the following command to enable NFS to run privileged containers, substituting the correct value for my_nfs_service_account, consistent with the previous adm policy command:

oc adm policy add-scc-to-user scc-name -z my_nfs_service_account --namespace lightbend

The default scc-name is nfs-provisioner.
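
If you are unsure of the generated service account name, list the service accounts in the namespace; the grep pattern here is only an example:

$ oc get serviceaccounts --namespace lightbend | grep nfs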

Installing the History server using an existing GlusterFS PVC, e.g., gluster-pvc, is also straightforward.

$ helm install lightbend/spark-history-server \
  --name my-spark-history-server \
  --namespace lightbend \
  --version 0.4.0 \
  --set fdp-nfs.enabled=false \
  --set pvc.existingPVC=gluster-pvc \
  --debug

Note that we disabled the default NFS server creation. You could easily create your PVC using the following yaml file:

gluster-pvc.yaml
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: gluster-pvc
  namespace: lightbend
spec:
  accessModes:
    - ReadWriteMany
  storageClassName: glusterfs-storage
  resources:
    requests:
      storage: 10Gi
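
Apply the file to create the claim before installing the chart, for example:

$ oc apply -f gluster-pvc.yaml
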
Using the Spark Operator with the History Server

The Spark Operator Helm chart needs to enable the mutating admission webhook (which is the default setting) in order for it to work with the history server.

The history server uses a PVC for event storage; that same PVC also needs to be mounted in the Spark driver and executor pods. As discussed in Managing Fast Data Platform Storage, this means the underlying storage has to support the ReadWriteMany access mode, as GlusterFS and NFS do.

Now, to use the Spark History Server with the Spark operator, we need to update the YAML configuration file of any Spark jobs we create with the appropriate settings.

Suppose you have a PVC called gluster-pvc for GlusterFS, then your Spark job YAML should look as follows:

apiVersion: "sparkoperator.k8s.io/v1beta1"
kind: SparkApplication
metadata:
  name: spark-pi
  namespace: my-ns
spec:
  type: Scala
  mode: cluster
  image: "lightbend/spark:2.1.1-OpenShift-2.4.3-rh"
  imagePullPolicy: Always
  mainClass: org.apache.spark.examples.SparkPi
  mainApplicationFile: "local:///opt/spark/examples/jars/spark-examples_2.11-2.4.3.jar"
  sparkConf:
    "spark.eventLog.enabled": "true"
    "spark.eventLog.dir": "file:/mnt"
  volumes:
    - name: spark-data
      persistentVolumeClaim:
        claimName: gluster-pvc
  driver:
    cores: 0.1
    coreLimit: "200m"
    memory: "512m"
    labels:
      version: 2.4.3
    serviceAccount: 
    volumeMounts:
      - name: spark-data
        mountPath: /mnt
  executor:
    cores: 1
    instances: 3
    memory: "512m"
    labels:
      version: 2.4.3
    volumeMounts:
      - name: spark-data
        mountPath: /mnt
  restartPolicy:
    type: Never

The relevant sections in this YAML are the following:

  1. spec.sparkConf. Here spark.eventLog.enabled needs to be true and spark.eventLog.dir needs to be the path to the event log directory. In this case, it is a path local to the driver and executor pods.

  2. spec.volumes. Specify the name and type (persistentVolumeClaim in this case) of the volume(s) to mount.

  3. driver.volumeMounts and executor.volumeMounts. They specify that the named volume in spec.volumes is mounted in the driver and executor pods, respectively.

We used the Scala 2.11 build of Spark. Change to 2.12 for the more recent version of Scala.

Note

Scala 2.11 support is deprecated. A subsequent release of Fast Data Platform will drop support for Scala 2.11.

Then you can submit the above YAML using the Spark Operator. The driver and executor pods would write events in the PVC, which can then be read by the history server and displayed in its UI.

If your Spark history server deployment is configured to use HDFS instead of a PVC, then the steps are similar, except that in the YAML you need to replace driver.volumeMounts and executor.volumeMounts with driver.configMaps and executor.configMaps, which should point to pre-created ConfigMaps using hdfs-site.xml and core-site.xml. Those XML files are needed for the driver and executor pods to communicate with HDFS.
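
A sketch of that variant follows. The mount paths are illustrative assumptions; they must match wherever your Spark image expects to find the Hadoop configuration (for example, the directory referenced by HADOOP_CONF_DIR).

sparkConf:
  "spark.eventLog.enabled": "true"
  "spark.eventLog.dir": "hdfs://myhdfs/history/"
driver:
  configMaps:
    - name: hdfs-site
      path: /etc/hadoop/conf/hdfs
    - name: core-site
      path: /etc/hadoop/conf/core
executor:
  configMaps:
    - name: hdfs-site
      path: /etc/hadoop/conf/hdfs
    - name: core-site
      path: /etc/hadoop/conf/core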

5.5. Apache Flink

Apache Flink is a distributed processing framework for stateful computations over unbounded and bounded data streams. Flink has been designed to run in all common cluster environments, and to perform computations at in-memory speed and at any scale.

Note

Flink is a certified component of Fast Data Platform, as defined in Overview of Fast Data Platform.

All of the Lightbend Fast Data Platform Flink Docker images are based on the standard Flink images. Unfortunately, we can’t use those images directly, because they do not include the Prometheus support that we need for integration with Lightbend Console monitoring. As a result, the Lightbend versions of these images add the flink-metrics-prometheus jar.

Additionally, Flink does not provide a single image that can support both Session and Job clusters (discussed below), because the two use different entry points.

We provide six images for Flink 1.8.0, two for Debian Linux (Scala 2.11 and 2.12), two for Alpine Linux (Scala 2.11 and 2.12), and two for Ubuntu (Scala 2.11 and 2.12):

  • lightbend/flink:1.8.0-scala_2.11 (Debian)

  • lightbend/flink:1.8.0-scala_2.12 (Debian)

  • lightbend/flink:1.8.0-scala_2.11-alpine

  • lightbend/flink:1.8.0-scala_2.12-alpine

  • lightbend/flink:1.8.0-scala_2.11-ubuntu

  • lightbend/flink:1.8.0-scala_2.12-ubuntu

Note

Scala 2.11 support is deprecated. A subsequent release of Fast Data Platform will drop support for Scala 2.11.

All of these images contain the Prometheus pod scraper, which allows the Prometheus annotations in the Helm chart to be used for ingesting metrics into Prometheus from both the Job and Task Managers.

Following the Apache Flink documentation, there are two ways of installing Flink on OpenShift:

  1. Session cluster - a shareable cluster that can run multiple jobs

  2. Job cluster - a dedicated cluster for a single job

Let’s discuss each of these options.

Session Cluster

This installation should be used if you want to create a shared Flink cluster, to which you can submit any number of Flink applications (as jar files) and then run them.

The Session cluster installation uses the Lightbend Apache Flink Helm chart.

First, make sure you have added the Lightbend Helm chart repo, as discussed in Add the Lightbend Helm Chart Repo. Here we’ll assume you used the repo name lightbend when the repo was added.

The chart is also available as a zip file in our documentation here: resources/flink/flink-session-cluster-helm-chart.zip. This chart is adapted from this experimental Apache Flink chart, described in the Flink deployment on Kubernetes documentation.

The Helm charts discussed in this section install the Scala 2.12 build for Flink. To use Scala 2.11, download the chart, edit the values.yaml file, and change the imageTag from 1.8.0-scala_2.12 to 1.8.0-scala_2.11. Then pass the argument -f /path/to/your/values.yaml in the helm install command shown next, which will override the default value.
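
For example, a minimal override file might contain nothing but the tag (the file name is arbitrary):

# my-values.yaml
imageTag: 1.8.0-scala_2.11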

Install the Flink chart using the following command:

helm install lightbend/flinkchart \
  --name my-flink \
  --namespace lightbend \
  --version 1.8.0 \
  --debug

Use different --name and --namespace arguments, if you prefer. The --debug flag is useful, but optional. Of course, don’t forget the -f …​/values.yaml, if necessary.

See Uninstalling Components for a discussion of using helm delete to remove components.

The chart’s values.yaml file lists the properties that can be customized when installing Flink.

This chart will create the Job Manager, the Job Manager service, and the requested number of Task Managers.

It is also possible to create a route exposing port 8081 of the service. Once the route is created, it exposes the Apache Flink dashboard, which can be used to run and view Flink jobs. See the OpenShift documentation for details on defining routes.
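
As a sketch, assuming the chart created a Job Manager service named my-flink-jobmanager (check the actual name with oc get svc), the route could be created like this:

$ oc --namespace lightbend expose service my-flink-jobmanager --port=8081
$ oc --namespace lightbend get route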

To run your own jobs, click the Submit new jobs button, which opens the submit job pane. Upload your assembly jar and start it.

Job Cluster

This installation should be used if you want to create a separate Flink cluster for a specific kind of Flink application.

In order to deploy a job cluster using Docker images, create an image which contains the Flink binaries as well as the user code jars for the job to execute. The user application jar has to be copied into the /opt/flink/lib directory, so it’s available to Flink’s classloader. For an example, see the build.sbt file in the Fast Data Platform sample application for Flink. This Flink documentation describes how to build such a cluster.

Once the image is created, it can be installed using the Helm Chart included in this documentation: resources/flink/flink-job-cluster-helm-chart.zip, which is adapted from the chart for the Flink sample application. You’ll want to download this chart, edit the values to match your application, then run it using the helm install command.

Apache Flink logs can be ingested into your preferred log management solution. Humio is a log management solution that Lightbend recommends.

Note

Humio is not included in the platform and has to be purchased separately.

If Humio is installed, then the Flink logs, like all pod logs, are already accessible to Humio. Using Humio’s instructions for dashboard creation, a Flink dashboard can be built easily, such as this one:

Humio Flink Dashboard

By clicking one of the server links, you can see messages for this server:

Humio Flink Messages

Notice the query shown. You can tweak this query to drill down to the information you want.

6. User Guide

The User Guide explains how to write, deploy, and manage Fast Data Platform applications, starting with a discussion of the sample applications provided with Fast Data Platform.

For guidance on cluster administration, see the Management and Monitoring Guide.

This chapter is composed of several major sections:

  • Sample Applications - A survey of the sample applications, which provide examples of how to solve particular problems with Fast Data Platform.

  • Running Applications - Guidance for running various kinds of applications with different criteria, such as Spark jobs.

  • Design Patterns - A catalog of architectural design patterns for fast data applications.

6.1. Sample Applications

Fast Data Platform provides several sample applications that illustrate how to use the components included in Fast Data Platform to solve particular application goals. Hence, a quick way to learn how to use Fast Data Platform for your own applications is to start with one of the sample applications.

The sample applications are available as open source in a GitHub repository, https://github.com/lightbend/fdp-sample-applications/, where you will find scripts for building and launching the applications in Fast Data Platform.

Pre-built Docker images for the applications are hosted on the Lightbend DockerHub repository, https://hub.docker.com/u/lightbend/. Helm charts are also provided in the Lightbend Helm repository for installing in OpenShift.

Warning

The sample applications are provided as-is, without warranty. They are intended to illustrate techniques for implementing various scenarios using Fast Data Platform, but they have not gone through a robust validation process, nor do they use all the techniques commonly employed for highly-resilient, production applications. Please use them with appropriate caution.

The design choices in the applications were driven by three considerations:

  • What tools to pick for a particular problem scenario.

  • How to write applications using those tools.

  • How to deploy and run those applications in Fast Data Platform.

The following sample applications are provided at the time of this writing, but more applications are added on a regular basis, outside the regular Fast Data Platform release process. The links go to the corresponding section in the fdp-sample-applications GitHub repository.

Table 15. Fast Data Platform Sample Applications
Name Description

Anomaly detection using BigDL and speculative model serving

Machine learning using Intel BigDL, exporting model as a Tensorflow graph and model serving using Akka Streams

Taxi Times with Flink

Predicting Taxi Travel Times with Flink

Model Serving in Streams

One approach to serving data science models in production streaming jobs using Kafka, Akka Streams, and Kafka Streams

KillrWeather

A port to Fast Data Platform of the well-known demonstration app, KillrWeather. It demonstrates a stream pipeline including Kafka, Spark Streaming, and Spark Structured Streaming (the newer replacement for Spark Streaming), Cassandra, Akka middleware, and other tools. This is the most complete sample application

In the following sections, we provide an overview of each sample application. Each of them has a README file in the GitHub repo with detailed instructions on how to build and run the application locally, how to build one or more Docker images for the application, and how to deploy either your built images or Lightbend’s prebuilt images in Fast Data Platform.

6.1.1. Preliminaries

Each sample application is provided in two forms: as source code in the GitHub repository and as prebuilt Docker images in the Lightbend DockerHub repository.

There are several ways documented in the READMEs for deploying and running the sample applications locally on a development machine: using SBT, using an IDE, and using Docker. The READMEs also document how to deploy to Fast Data Platform clusters on OpenShift. When using Docker images, you can either use Lightbend’s prebuilt images from Docker Hub or your own, custom-built images. Using the prebuilt images is the best way to start exploring the applications; then use the source code to study the details and create modifications.

Helm charts are also provided in the Lightbend Helm repository and the source code GitHub repository for running in OpenShift.

6.1.2. Prerequisites

Before you use the sample applications in your cluster, the following prerequisites must be met:

  1. Fast Data Platform is up and running

  2. Operators for Kafka (Strimzi) and Spark are installed

  3. Optional, certified services are installed for some applications, as discussed below

  4. Helm is installed on your workstation

  5. The oc CLI is useful to have installed on your workstation

Use of Grafana, InfluxDB, Zeppelin, and Cassandra

Several certified tools are used by the sample applications: Cassandra, InfluxDB, Grafana, and Zeppelin. Recall that certified tools are not supported components of Fast Data Platform, as discussed in Overview of Fast Data Platform.

Note

Cassandra and Zeppelin are used in "basic" ways, which means it’s feasible to replace them with your preferred alternatives. For example, the KillrWeather README has notes on replacing Cassandra with alternative persistence options. Replacing InfluxDB and Grafana would be more involved.

The following table discusses which tools are used by which sample applications.

Table 16. Certified Tools for the Sample Applications
Tool Description

Grafana

Used for real-time visualization of application data

InfluxDB

Used to store time-series data in several applications, which is then used for additional processing and visualization with Grafana

Cassandra

Used in KillrWeather to store computed statistics, etc.

Zeppelin

Used in KillrWeather to query data in its Cassandra tables. Also useful for general data science projects

Helm charts for installing these components are provided for your convenience in the supportingcharts directory of the Sample Applications GitHub repo.

For example, to install InfluxDB from the supportingcharts directory, run a Helm command like this:

helm install influxdbchart --name influx --namespace bar

See the README for the sample application of interest and the READMEs in each chart directory for more details.

See Uninstalling Components for a discussion of using helm delete to remove "components", including the sample applications.

Some further setup is required for most of these services, such as creation of dashboards in Grafana and tables in InfluxDB and Cassandra. However, in most cases, a sample application will do these steps automatically. When manual steps are required, they will be discussed when needed.

6.1.3. Overview of the Sample Applications

Now we’ll briefly discuss each sample application found in the GitHub repository.

Anomaly detection using BigDL and speculative model serving

This application demonstrates a full lifecycle of anomaly detection using supervised learning and model serving. It is developed using Intel’s BigDL deep learning framework and a speculative model serving implementation.

Intel BigDL is a library for deep learning model training and serving on Spark.

Note

BigDL is a third-party, certified library for machine learning. Lightbend does not provide support for BigDL.

Detailed instructions of how to build, deploy, and run this application can be found in its README.

Predicting Taxi Travel Times with Flink

This application is adapted to Fast Data Platform from the publicly-available Ververica Flink training. It uses a public dataset of taxi rides in New York City. The details of the dataset can be found here. The application trains a regression model on the taxi data to predict how long a ride will take.

Detailed instructions of how to build, deploy, and run this application can be found in its README.

Akka Streams and Kafka Streams Model Server

This application illustrates one solution to a common design problem, how to serve Data Science models in a streaming production system.

Detailed instructions of how to build, deploy, and run this application can be found in its README.

The application has the following main components, each of which has its own Docker image:

  1. An Akka Streams implementation of the model serving

  2. A Kafka Streams implementation of the model serving

  3. A data loader for running either one of the model serving components

In addition to images, data used for testing is packaged in a Zip file and made available via HTTP, http://s3-eu-west-1.amazonaws.com/fdp-killrweather-data/data/data.zip.

Tip

More complete and more recent implementations of these techniques can be found in the Lightbend open source tutorials, model-serving-tutorial and kafka-with-akka-streams-kafka-streams-tutorial.

Fast Data Platform KillrWeather

This is a port to Fast Data Platform of the well-known demonstration app, KillrWeather. It is the largest of the sample applications. It combines Kafka, Spark Streaming, Akka-based middleware for stream processing, Cassandra, and Zeppelin.

Detailed instructions of how to build, deploy, and run this application can be found in its README.

This application also requires the following certified services: Cassandra, InfluxDB, and Grafana.

Note

Deploy all the required services before deploying KillrWeather, as discussed above. At start up, KillrWeather will detect their presence and automatically define tables in Cassandra and InfluxDB, and define a dashboard in Grafana. Zeppelin does not need to be installed in advance.

The architecture of KillrWeather is shown in the following diagram:

KillrWeather
Figure 7. KillrWeather Architecture

Historical weather data is fed to a Kafka topic using one of three clients: one that uses the new gRPC protocol, one that uses the Kafka native API, and one that uses REST. From that topic, data is read by a Spark Streaming application (both ordinary and Structured Streaming implementations are available), where statistics over the data are computed. The raw data and the results are written to two places. Output to Cassandra is an example of a longer-term storage option suitable for downstream use. To see the data in Cassandra, instructions are provided for using a Zeppelin notebook configured with Cassandra Query Language support. Writing the output to InfluxDB, a time-series database, supports graphing the raw and processed data in real time using Grafana.

6.2. Running Custom Services and Applications

This section dives into what you need to know to write, deploy, manage, and debug applications in OpenShift.

OpenShift leverages Kubernetes, which is the leading container orchestrator technology, supporting both stateless and stateful services and applications. If you are new to Kubernetes concepts, see Appendix: Kubernetes Concepts for a quick introduction.

You should also be familiar with the CLI tools for OpenShift, which you will need. See OpenShift CLI Tools for Working with Clusters for more information, including installation instructions.

6.2.1. Kafka Applications

Kafka clusters in Lightbend Fast Data Platform are managed using the Strimzi: The Kubernetes Operator for Apache Kafka. This section discusses developer concerns, like client connections and topic management.

Client Connectivity

Creating a Strimzi Kafka cluster generates several Kubernetes services that follow the naming convention {metadata.name}-{service-name}. The metadata.name is the name assigned to your Kafka resource (a CRD), and the service-name is the Strimzi service name. This convention produces distinct Service names, which supports running multiple Strimzi Kafka clusters in the same Kubernetes cluster.

Not all the services are meant for users; some exist to facilitate internal operations required by StatefulSets. Only the services meant for users are documented here.

For example, if you gave your Kafka cluster the name my-cluster then the following services are created for users.

  • my-cluster-kafka-bootstrap

A load-balanced ClusterIP service used for initial client connections to the Kafka cluster. It should be used by all internal user client connections. For example, any Kafka client application can use this Service DNS name as the bootstrap.servers value in its client properties; see the sketch after this list. The service exposes ports for client, tls-client, and inter-broker connections, as well as Prometheus endpoints. (Prometheus is used to aggregate monitoring telemetry.)

  • my-cluster-zookeeper-client

A load balanced ClusterIP service used for initial client connections to the Zookeeper cluster. It should be used by all internal end user client connections. The service exposes ports for client and Prometheus endpoints.
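
As a minimal sketch, assuming a client running inside the cluster, a plain-text listener on the default port 9092, and an arbitrary consumer group, the client properties might include:

bootstrap.servers=my-cluster-kafka-bootstrap:9092
group.id=my-consumer-group
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer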

Managing Kafka Topics

Strimzi allows you to manage topics using Kubernetes resources. This is a convenient way to keep track of your topic settings. It also lets you store Kafka topic information in a way that’s compatible with the GitOps approach to operational tasks.

The KafkaTopic resource exposes all the standard Kafka topic information such as name, partition count, and replication factor. It also allows you to set topic-level configuration options.

The following example shows the KafkaTopic YAML resource definition for a topic called replicated-topic to be applied to the Strimzi Cluster with the name my-cluster. It has 10 partitions and a replication factor of 3.

apiVersion: kafka.strimzi.io/v1alpha1
kind: KafkaTopic
metadata:
  name: replicated-topic
  labels:
    strimzi.io/cluster: my-cluster
spec:
  partitions: 10
  replicas: 3

To apply this resource, use the kubectl apply command:

kubectl apply -f replicated-topic.yaml -n lightbend
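
You can then confirm that the topic resource exists:

kubectl get kafkatopics -n lightbend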

For more information, see the Strimzi documentation.

Performance Tuning: Kafka Message Sizes

Kafka places an upper limit on the size of messages you can send. The default 1 MB limit strikes a balance between the convenience of being able to send large messages and the known reduction in throughput as message sizes increase, described in this blog post.

For general guidance on performance tuning, the book Kafka: The Definitive Guide provides essential advice. Here is what it says about message sizes:

There are noticeable performance impacts from increasing the allowable message size. Larger messages will mean that the broker threads that deal with processing network connections and requests will be working longer on each request. Larger messages also increase the size of disk writes, which will impact I/O throughput.

— Kafka: The Definitive Guide
page 28

In addition, larger messages can sometimes result in longer garbage collection (GC) pauses.

Tip

Long GC pauses may cause a Kafka broker to drop its ZooKeeper session. If you detect these occurrences in the logs, try configuring a longer timeout value for zookeeper.session.timeout.ms.

However, that specific benchmark test is five years old and was performed on hardware that may not match your environment. Therefore, if you need to balance performance against the convenience of larger message sizes, you can adjust several configuration parameters:

  • message.max.bytes - the system-wide value. It applies to each batch of messages. Compression can be used to squeeze more messages under this limit. In fact, try compression first before adjusting the configuration parameters.

  • max.partition.fetch.bytes and fetch.max.bytes (see fetch.message.max.bytes for the older consumer API) - determines the maximum message size that a consumer will grab. Set it to be greater than or equal to message.max.bytes.

  • max.request.size - the corresponding setting for producers. Set it to be less than or equal to message.max.bytes.

  • replica.fetch.max.bytes - when partition replication is enabled, this value must be greater than or equal to message.max.bytes or messages won’t be replicated.

  • max.message.bytes - can be set per topic, but defaults to the broker-wide message.max.bytes (the names are confusingly similar). See the KafkaTopic sketch below for setting it per topic.
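
For example, with Strimzi the per-topic limit can be set declaratively in the KafkaTopic resource. A sketch, where the 2 MB value is arbitrary:

apiVersion: kafka.strimzi.io/v1alpha1
kind: KafkaTopic
metadata:
  name: large-message-topic
  labels:
    strimzi.io/cluster: my-cluster
spec:
  partitions: 10
  replicas: 3
  config:
    max.message.bytes: 2097152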

If your messages are too big for the configured maximums, then two options exist:

  1. Split the message into pieces that fall under the limit. Reconstruct the original message on the consumer side.

  2. Write large data "blobs" to a separate data store, e.g., a file system, and send the reference to it as a message.

Both options are described in more detail in this Workday blog post by Adam Kotwasinski.

6.2.2. Spark Applications

Now we’ll discuss running Spark applications in depth, starting with a discussion of the state of Spark support on Kubernetes, and by extension, on OpenShift.

Running Spark Applications in OpenShift Clusters

There are three methods to run Spark on OpenShift:

  • Use the new Lightbend Pipelines, which makes it easier to define reusable Spark Structured Streaming "streamlets" and compose them into applications with simplified deployment and management options. You can even mix and match with Akka Streams-based streamlets.

  • Use the Spark Operator.

  • Run Spark applications natively on OpenShift, i.e., with spark-submit.

Note that Pipelines uses the Spark Operator.

About Spark on OpenShift

Support for Spark on Kubernetes, and by extension OpenShift, was officially introduced with the Spark release 2.3.

The current release, Spark 2.4, has significantly changed and improved the Kubernetes support, compared to Spark 2.3, which is why Fast Data Platform on OpenShift only supports Spark 2.4.

We’ll review the current state of Spark on Kubernetes and then describe how to deploy Spark 2.4 applications. The most up-to-date documentation is in the Spark on Kubernetes GitHub project.

In this release, not all of the Spark features and services are supported on Kubernetes:

Table 17. Spark Features Currently Supported on Kubernetes
Feature Supported?

Scala 2.12

incubating 1

Cluster Mode

Client Mode

Python Support

R Support

Security: Kubernetes RBAC support

Security: Kerberos and HDFS Support

Security: Encryption at rest and on-the-wire

Dynamic Allocation and Shuffle Service

Spark Shell Support

Notebook Support

Supervision and Driver HA

  1. While Scala 2.12 support for Spark 2.4 is considered experimental by the Spark project, Lightbend supports Scala 2.12 for Spark 2.4. Scala 2.11 support is considered deprecated. A subsequent release of Fast Data Platform will drop support for Scala 2.11.

Table 18. External Spark Services Currently Supported on Kubernetes
Service Supported?

Spark History Server

Spark Operator

Spark Shuffle Service

Spark Thrift Server

Subsequent releases will remove these limitations.

Prerequisites for Spark on OpenShift

There are some basic prerequisites for running a Spark application on OpenShift:

  • A version of OpenShift officially supported for Fast Data Platform (see Platform Requirements for details) or a local Minishift environment. Make sure you have access to the cluster using the oc CLI tool.

  • We recommend a minimum of 3 CPUs and 4GB of memory to be able to start a simple Spark application with a single executor.

  • You must have appropriate permissions to list, create, edit, and delete pods in your cluster. You can verify these permissions by running oc auth can-i <list|create|edit|delete|watch> pods.

For example:

$ oc auth can-i delete pods
yes
  • The service account credentials used by the driver pods must be allowed to create pods, services and ConfigMaps.

  • You must have Kubernetes DNS configured in your cluster.

Spark on OpenShift Artifacts

In order to use Spark on Kubernetes you need the Lightbend distribution of the Spark Operator. Installing the Spark Operator describes how to install it.

The Lightbend operator distribution uses a custom build of Spark, which can be found at the following S3 locations:

Table 19. Lightbend-built Spark 2.4.3 Distributions on S3
URL Description

2.1.1-OpenShift-spark-2.4.3

Scala 2.11

2.1.1-OpenShift-spark-2.4.3-2.12

Scala 2.12

Lightbend also provides Docker images for Spark with the Scala and Java APIs, Spark History Server, and the Kubernetes Spark Operator:

Table 20. Kubernetes Docker Images for Spark 2.4.3 with Scala and Java APIs
Image Scala Version Operating System Supported?

lightbend/spark:2.1.1-OpenShift-2.4.3-rh

2.11

RHEL

yes

lightbend/spark:2.1.1-OpenShift-2.4.3-rh-2.12

2.12

RHEL

incubating 1

lightbend/spark:2.1.1-OpenShift-2.4.3

2.11

Alpine

certified

lightbend/spark:2.1.1-OpenShift-2.4.3-2.12

2.12

Alpine

certified

lightbend/spark:2.1.1-OpenShift-2.4.3-ubuntu

2.11

Ubuntu

certified

lightbend/spark:2.1.1-OpenShift-2.4.3-ubuntu-2.12

2.12

Ubuntu

certified

lightbend/sparkoperator:2.1.1-OpenShift-v1beta1-0.8.2-2.4.3-rh

2.11

RHEL

yes

lightbend/sparkoperator:2.1.1-OpenShift-v1beta1-0.8.2-2.4.3-rh-2.12

2.12

RHEL

incubating 1

lightbend/spark-history-server:2.1.1-OpenShift-2.4.3-rh

2.11

RHEL

yes

lightbend/spark-history-server:2.1.1-OpenShift-2.4.3-rh-2.12

2.12

RHEL

incubating 1

  1. Lightbend supports the RHEL image for Scala 2.11 and the Scala 2.12 build, even though the Spark project considers Scala 2.12 support experimental for Spark 2.4.X. Ubuntu and Alpine images are offered for those users who wish to use them, but they are certified, not supported. (The reason the Alpine images don’t have an OS suffix, like the Ubuntu and RedHat images, is because these are the conventional names for the images in the Spark on Kubernetes project.)

The RHEL image is considered the most secure, up-to-date image. It is derived from the latest Java base image provided by Red Hat and comes with the latest secured version of Java, which complies with these Red Hat policies.

All of our images follow the best practices for building images on OpenShift, as defined in the OpenShift managing images guide.

In contrast, the Alpine images are the default configuration used by the Spark on Kubernetes community project. However, many people prefer working with Ubuntu for some use cases, e.g., machine learning.

Note

Lightbend currently offers certified support for PySpark (Python API). Also, while features like JMX metrics are enabled for the default images with Java and Scala support, they are not enabled in the Python images, nor have they been tested with Python.

Table 21. Kubernetes Docker Images for Spark 2.4.3 with PySpark, Scala 2.11, and Java APIs
Image Scala Version Operating System Supported?

lightbend/spark-py:2.1.1-OpenShift-2.4.3

2.11

Alpine

certified

lightbend/spark-py:2.1.1-OpenShift-2.4.3-ubuntu

2.11

Ubuntu

certified

Getting the API Server Host and Port

For the commands that follow, you’ll need the host and port for the API Server. We’ll use $API_SERVER and $API_SERVER_PORT as placeholders for the address and port of the API Server.

To get the API Server host and port on OpenShift, run the following commands:

$ oc login 

$ oc status
In project default on server https://xxxxxxxx.us-west-2.elb.amazonaws.com:443
...

The same command works on Minishift or oc cluster up.

In order to log in to your real cluster, follow these instructions.

Client Mode

In client mode, the Spark job’s driver runs in the same process that submits the job, including jobs submitted from within a container. The user has to specify a headless service for the Spark driver and other low-level details: the appropriate selector for the service so that the driver is reachable from the executors, the ports exposed for the driver and the block manager, and the correct configuration options for spark-submit. These options include the service account with which the driver will create the executor pods, the driver pod name, and the driver’s host name.
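
As a rough sketch of the kinds of settings involved (the pod name, headless service name, and ports below are placeholder assumptions, not values from this guide):

$SPARK_HOME/bin/spark-submit \
  --master k8s://https://$API_SERVER:$API_SERVER_PORT \
  --deploy-mode client \
  --name spark-pi \
  --class org.apache.spark.examples.SparkPi \
  --conf spark.executor.instances=1 \
  --conf spark.kubernetes.namespace=lightbend \
  --conf spark.kubernetes.driver.pod.name=my-client-pod \
  --conf spark.driver.host=my-driver-headless-svc.lightbend.svc \
  --conf spark.driver.port=7078 \
  --conf spark.driver.blockManager.port=7079 \
  --conf spark.kubernetes.container.image=lightbend/spark:2.1.1-OpenShift-2.4.3-rh \
  local:///opt/spark/examples/jars/spark-examples_2.11-2.4.3.jar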

We’ll see complete examples later in this document showing how to submit jobs using client mode. See also the Spark on Kubernetes documentation.

Cluster Mode

In cluster mode, when the user runs spark-submit, it communicates with the Kubernetes API Server to submit the driver pod. Then the driver pod will start the requested executors to run the user’s job.

Executors discover the driver via a headless service, which is automatically set up for you. This is shown visually in the next diagram.

Spark on Kubernetes using cluster mode
Figure 8. The spark-submit steps in cluster mode when launching a Spark Job

After the Spark application is finished, the executors will be terminated and cleaned up (by default).

Hence, no remaining executor pods will be shown in the OpenShift UI. The driver pod, on the other hand, will remain in the "completed" state in the OpenShift API until it is garbage collected or manually removed, and its logs are retained for debugging purposes.
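
For example, the retained driver logs can be fetched with oc logs (this pod name matches one of the examples later in this section):

$ oc logs spark-pi-1539074975947-driver -n lightbend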

Note

In the completed state, the driver pod does not use any computational or memory resources.

Executor pods that fail due to an error are retained, allowing you to debug the error.

We’ll discuss monitoring the driver’s status below.

Cluster Mode Example

Let’s see an example. From a remote machine (client on the diagram), you can run spark-submit in cluster mode as follows, where $SPARK_HOME is used for the directory where you installed your local copy of Spark 2.4.3:

$SPARK_HOME/bin/spark-submit \
  --master k8s://https://$API_SERVER:$API_SERVER_PORT \
  --deploy-mode cluster \
  --name spark-pi \
  --class org.apache.spark.examples.SparkPi \
  --conf spark.executor.instances=1 \
  --conf spark.kubernetes.container.image=lightbend/spark:2.1.1-OpenShift-2.4.3-rh \
  local:///opt/spark/examples/jars/spark-examples_2.11-2.4.3.jar

The k8s://https://$API_SERVER:$API_SERVER_PORT URL is the API Server URL. We used the RHEL-based Docker image discussed in the table, Kubernetes Docker Images for Spark 2.4.3 with Scala and Java APIs, and the local:// path is relative to the container’s root.

We launched the Spark Job in the default namespace with the default account. It is more common to run in a specific namespace and account. See Running Spark Jobs in an Arbitrary Namespace below for information about security configurations that might be required for your cluster:

$SPARK_HOME/bin/spark-submit \
  --master k8s://https://$API_SERVER:$API_SERVER_PORT \
  --deploy-mode cluster \
  --name spark-pi \
  --class org.apache.spark.examples.SparkPi \
  --conf spark.executor.instances=1 \
  --conf spark.kubernetes.namespace=lightbend \
  --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark-sa \
  --conf spark.kubernetes.container.image=lightbend/spark:2.1.1-OpenShift-2.4.3-rh \
  local:///opt/spark/examples/jars/spark-examples_2.11-2.4.3.jar

Let’s investigate what happens when we launch this example in a cluster.

When the job is launched, the Spark driver will appear as a pod in the OpenShift UI, and after a while the executors will start, assuming there are enough resources in your cluster.

Spark driver and executor pods
Figure 9. Spark driver and executor pods.

Back at the CLI, the spark-submit watcher will monitor the driver and wait until it completes. The output at the end will be similar to the following:

...
2018-10-09 11:51:37 INFO  LoggingPodStatusWatcherImpl:54 - State changed, new state:
	 pod name: spark-pi-1539074975947-driver
	 namespace: lightbend
	 labels: spark-app-selector -> spark-2be44cfefb8c4c7e952a434da257cff4, spark-role -> driver
	 pod uid: 427e107c-cba0-11e8-9ed4-0a1e7d601ca0
	 creation time: 2018-10-09T08:49:40Z
	 service account name: spark-sa
	 volumes: spark-local-dir-1, spark-conf-volume, spark-sa-token-zvhvm
	 node name: ip-10-0-83-229.us-west-2.compute.internal
	 start time: 2018-10-09T08:49:40Z
	 phase: Succeeded
	 container status:
		 container name: spark-kubernetes-driver
		 container image: docker.io/lightbend/spark:<version>
		 container state: terminated
		 container started at: 2018-10-09T08:49:45Z
		 container finished at: 2018-10-09T08:51:36Z
		 exit code: 0
		 termination reason: Completed
2018-10-09 11:51:37 INFO  LoggingPodStatusWatcherImpl:54 - Container final statuses:


	 container name: spark-kubernetes-driver
	 container image: docker.io/lightbend/spark:<version>
	 container state: terminated
	 container started at: 2018-10-09T08:49:45Z
	 container finished at: 2018-10-09T08:51:36Z
	 exit code: 0
	 termination reason: Completed
2018-10-09 11:51:37 INFO  Client:54 - Application spark-pi finished.
2018-10-09 11:51:37 INFO  ShutdownHookManager:54 - Shutdown hook called
2018-10-09 11:51:37 INFO  ShutdownHookManager:54 - Deleting directory /tmp/spark-458e2a40-c1d4-49cb-8074-5c32a02c4b9c

Similar output will be written for both Minishift and real clusters. In the UI, the driver will be shown with status completed.

Terminated driver
Figure 10. Terminated driver

You can examine the pod details and get terminal access in the pod by clicking the pod link:

Driver details
Figure 11. Driver details

The same concepts apply for the executor pods.

The Spark driver is exposed as a service in Kubernetes so that executors can access it via a specific Kubernetes DNS name, e.g., something like spark-pi-<number>-driver-svc.

Service driver
Figure 12. Spark driver service created

Two ports are also exposed for executors trying to connect: the driver’s port and the block manager’s port, e.g., 7078/TCP and 7079/TCP, respectively. Their default values are shown in the service driver UI and also in the service details page.

Service details
Figure 13. Spark driver service details

Finally, spark-submit properties are passed via a ConfigMap that is loaded in the driver’s container.

Spark driver ConfigMap
Figure 14. Spark driver config map

We will describe the available properties in detail in All Configuration Options for Spark on Kubernetes.

Accessing the Driver’s UI

To see the Spark Job console served by the driver, first get the name with the following command:

$ oc get po -n spark
NAME                            READY     STATUS      RESTARTS   AGE
spark-pi-1539074861981-driver   0/1       Completed   0          1h
spark-pi-1539074975947-driver   0/1       Completed   0          1h
spark-pi-1539080412580-driver   1/1       Running     0          19s
spark-pi-1539080412580-exec-1   1/1       Running     0          7s

As long as the driver pod is running, the following command will port-forward the UI, assuming its default port, 4040:

$ oc port-forward spark-pi-1539080412580-driver 4040:4040 -n spark
Forwarding from 127.0.0.1:4040 -> 4040
Forwarding from [::1]:4040 -> 4040

The first 4040 is the port on your machine, which you can change to any free port; the second 4040 is the driver’s UI port inside the pod.

Now access the driver’s UI at http://localhost:4040, which should look like the following:

Spark driver UI
Figure 15. Spark driver UI
Configuring Spark Jobs

Configuration options for Spark on Kubernetes can be found in the Spark on Kubernetes documentation. Many of these options are unique to Kubernetes. See All Configuration Options for Spark on Kubernetes for details.

Not all spark-submit arguments are supported by Spark on Kubernetes. Here we discuss some of the important arguments, such as those involving extra packages, jar archives, and files. Other basic arguments are shown in the spark-submit examples:

Note

To avoid any issues with dependency or file resolution, it is recommended to add your artifacts to your image. Keep in mind, though, that local dependencies (files coming from the client’s file system) are not supported by Spark on Kubernetes. Also, depending on the security rules in your organization, this may cause issues, as it is common for Docker images not to be allowed to contain user code.

  • --packages

This is not yet supported in cluster mode. Packages will be resolved locally at the spark-submit side and the driver pod will fail. In client mode, it should work as expected for both the driver and the executors.

The way it works is as follows: the driver resolves the dependency locally, and when an executor needs it, the executor fetches it from the driver. The following log messages show this in action:

2018-09-06 14:02:17 INFO  Utils:54 - Copying /checkpointing-test-assembly-1.0.jar to /var/data/spark-cf13ca19-a768-4dcb-95bf-1f7936d7223a/spark-0a7e4fab-4518-4aa1-8568-5a238b44c64b/17813922191536242530632_cache
2018-09-06 14:02:17 INFO  Utils:54 - Copying /var/data/spark-cf13ca19-a768-4dcb-95bf-1f7936d7223a/spark-0a7e4fab-4518-4aa1-8568-5a238b44c64b/17813922191536242530632_cache to /opt/spark/work-dir/./checkpointing-test-assembly-1.0.jar
2018-09-06 14:02:17 INFO  Executor:54 - Adding file:/opt/spark/work-dir/./checkpointing-test-assembly-1.0.jar to class loader
2018-09-06 14:02:17 INFO  Executor:54 - Fetching spark://spark-test-app-1-svc.default.svc:7077/jars/com.github.scopt_scopt_2.11-3.5.0.jar with timestamp 1536242530630
2018-09-06 14:02:17 INFO  TransportClientFactory:267 - Successfully created connection to spark-test-app-1-svc.default.svc/10.28.2.109:7077 after 4 ms (0 ms spent in bootstraps)

This output was generated for a previous example, where we passed the arguments --packages com.github.scopt:scopt_2.11:3.5.0 in client mode.

  • --files

This is used to specify a comma-separated list of files to be placed in the working directory of the driver and each executor. File globs are allowed.

Executors will fetch the files from the driver. You can also use remote URLs.

Here is an example:

$SPARK_HOME/bin/spark-submit \
  --master k8s://https://$API_SERVER:$API_SERVER_PORT \
  --deploy-mode cluster \
  --files http://central.maven.org/maven2/com/github/scopt/scopt_2.11/3.5.0/scopt_2.11-3.5.0.jar \
  --jars scopt_2.11-3.5.0.jar \
  --name spark-test \
  --class ...  \
  --conf spark.executor.instances=2 \
  --conf spark.kubernetes.container.image.pullPolicy=Always \
  --conf spark.kubernetes.container.image=... \
  local:///my.jar

The driver log should include the following:

2018-09-06 14:17:12 INFO  SparkContext:54 - Added JAR file:///scopt_2.11-3.5.0.jar at spark://spark-pi-1536243425076-driver-svc.default.svc:7078/jars/scopt_2.11-3.5.0.jar with timestamp 1536243432785
2018-09-06 14:17:12 INFO  SparkContext:54 - Added JAR file:///checkpointing-test-assembly-1.0.jar at spark://spark-pi-1536243425076-driver-svc.default.svc:7078/jars/checkpointing-test-assembly-1.0.jar with timestamp 1536243432787
2018-09-06 14:17:12 INFO  SparkContext:54 - Added file http://central.maven.org/maven2/com/github/scopt/scopt_2.11/3.5.0/scopt_2.11-3.5.0.jar at http://central.maven.org/maven2/com/github/scopt/scopt_2.11/3.5.0/scopt_2.11-3.5.0.jar with timestamp 1536243432789
2018-09-06 14:17:12 INFO  Utils:54 - Fetching http://central.maven.org/maven2/com/github/scopt/scopt_2.11/3.5.0/scopt_2.11-3.5.0.jar to /var/data/spark-64fad406-8403-4dc8-8be0-560a4db9c7b9/spark-7a9a62f7-a8d3-4a38-ac1d-6cd12019d9a7/userFiles-b9fe3bff-94fd-49ea-9984-52a1739d04fe/fetchFileTemp1542353258489317882.tmp

Each executor log should include the following:

2018-09-06 14:17:19 INFO  Utils:54 - Fetching spark://spark-pi-1536243425076-driver-svc.default.svc:7078/jars/checkpointing-test-assembly-1.0.jar to /var/data/spark-64fad406-8403-4dc8-8be0-560a4db9c7b9/spark-320ae8a2-93de-4f94-b5bb-a311492f6496/fetchFileTemp4817001870916928764.tmp
2018-09-06 14:17:19 INFO  Utils:54 - Copying /var/data/spark-64fad406-8403-4dc8-8be0-560a4db9c7b9/spark-320ae8a2-93de-4f94-b5bb-a311492f6496/-7567323641536243432787_cache to /opt/spark/work-dir/./checkpointing-test-assembly-1.0.jar
2018-09-06 14:17:19 INFO  Executor:54 - Adding file:/opt/spark/work-dir/./checkpointing-test-assembly-1.0.jar to class loader
2018-09-06 14:17:19 INFO  Executor:54 - Fetching spark://spark-pi-1536243425076-driver-svc.default.svc:7078/jars/scopt_2.11-3.5.0.jar with timestamp 1536243432785
2018-09-06 14:17:19 INFO  Utils:54 - Fetching spark://spark-pi-1536243425076-driver-svc.default.svc:7078/jars/scopt_2.11-3.5.0.jar to /var/data/spark-64fad406-8403-4dc8-8be0-560a4db9c7b9/spark-320ae8a2-93de-4f94-b5bb-a311492f6496/fetchFileTemp7825647058637784794.tmp
2018-09-06 14:17:19 INFO  Utils:54 - /var/data/spark-64fad406-8403-4dc8-8be0-560a4db9c7b9/spark-320ae8a2-93de-4f94-b5bb-a311492f6496/753430701536243432785_cache has been previously copied to /opt/spark/work-dir/./scopt_2.11-3.5.0.jar
2018-09-06 14:17:19 INFO  Executor:54 - Adding file:/opt/spark/work-dir/./scopt_2.11-3.5.0.jar to class loader
2018-09-06 14:17:19 INFO  TorrentBroadcast:54 - Started reading broadcast variable 0
2018-09-06 14:17:19 INFO  TransportClientFactory:267 - Successfully created connection to spark-pi-1536243425076-driver-svc.default.svc/10.28.1.102:7079 after 1 ms (0 ms spent in bootstraps)
  • --jars

Similar to --files, it specifies a comma-separated list of jars to include on the classpaths of the driver and executors.

For example:

$SPARK_HOME/bin/spark-submit \
  --master k8s://https://$API_SERVER:$API_SERVER_PORT \
  --deploy-mode cluster \
  --jars file:///scopt_2.11-3.5.0.jar \
  --name spark-test \
  --class com.lightbend.fdp.spark.test.StructuredMetrics  \
  --conf spark.executor.instances=2 \
  --conf spark.kubernetes.container.image.pullPolicy=Always \
  --conf spark.kubernetes.container.image=... \
  local:///myjar.jar

You should see a message similar to this in the driver log:

2018-09-06 11:58:11 INFO  SparkContext:54 - Added JAR file:///scopt_2.11-3.5.0.jar at spark://spark-pi-1536235080845-driver-svc.default.svc:7078/jars/scopt_2.11-3.5.0.jar with timestamp 1536235091626

Similarly, if you use a remote URL:

2018-09-06 12:02:06 INFO  SparkContext:54 - Added JAR http://central.maven.org/maven2/com/github/scopt/scopt_2.11/3.5.0/scopt_2.11-3.5.0.jar at http://central.maven.org/maven2/com/github/scopt/scopt_2.11/3.5.0/scopt_2.11-3.5.0.jar with timestamp 1536235326157

This flag is also supported on the executor side, where logs should look similar to this:

2018-09-06 12:37:50 INFO  Executor:54 - Fetching spark://spark-pi-1536237447668-driver-svc.default.svc:7078/jars/checkpointing-test-assembly-1.0.jar with timestamp 1536237459269
2018-09-06 12:37:50 INFO  TransportClientFactory:267 - Successfully created connection to spark-pi-1536237447668-driver-svc.default.svc/10.28.2.61:7078 after 3 ms (0 ms spent in bootstraps)
2018-09-06 12:37:50 INFO  Utils:54 - Fetching spark://spark-pi-1536237447668-driver-svc.default.svc:7078/jars/checkpointing-test-assembly-1.0.jar to /var/data/spark-4b9c34a3-31d8-4fe3-b65e-462978f91aea/spark-0c43fb41-ccd3-439a-86b2-7172600099e6/fetchFileTemp978282503687687306.tmp
2018-09-06 12:37:50 INFO  Utils:54 - Copying /var/data/spark-4b9c34a3-31d8-4fe3-b65e-462978f91aea/spark-0c43fb41-ccd3-439a-86b2-7172600099e6/-19373393901536237459269_cache to /opt/spark/work-dir/./checkpointing-test-assembly-1.0.jar
2018-09-06 12:37:50 INFO  Executor:54 - Adding file:/opt/spark/work-dir/./checkpointing-test-assembly-1.0.jar to class loader
2018-09-06 12:37:50 INFO  Executor:54 - Fetching http://central.maven.org/maven2/com/github/scopt/scopt_2.11/3.5.0/scopt_2.11-3.5.0.jar with timestamp 1536237459266
2018-09-06 12:37:50 INFO  Utils:54 - Fetching http://central.maven.org/maven2/com/github/scopt/scopt_2.11/3.5.0/scopt_2.11-3.5.0.jar to /var/data/spark-4b9c34a3-31d8-4fe3-b65e-462978f91aea/spark-0c43fb41-ccd3-439a-86b2-7172600099e6/fetchFileTemp5060046497482776167.tmp
2018-09-06 12:37:50 INFO  Utils:54 - Copying /var/data/spark-4b9c34a3-31d8-4fe3-b65e-462978f91aea/spark-0c43fb41-ccd3-439a-86b2-7172600099e6/-12207727651536237459266_cache to /opt/spark/work-dir/./scopt_2.11-3.5.0.jar
2018-09-06 12:37:50 INFO  Executor:54 - Adding file:/opt/spark/work-dir/./scopt_2.11-3.5.0.jar to class loader
2018-09-06 12:37:50 INFO  TorrentBroadcast:54 - Started reading broadcast variable 1
2018-09-06 12:37:50 INFO  TransportClientFactory:267 - Successfully created connection to spark-pi-1536237447668-driver-svc.default.svc/10.28.2.61:7079 after 3 ms (0 ms spent in bootstraps)
  • --exclude-packages

It uses the same format as --packages above, but has the opposite effect: the listed groupId:artifactId coordinates are excluded when resolving the dependencies provided via --packages, which is useful for avoiding dependency conflicts.
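For example, here is a minimal sketch (the exclusion is only illustrative; scala-library is a transitive dependency of scopt that the Spark image already provides):

$SPARK_HOME/bin/spark-submit \
  --master k8s://https://$API_SERVER:$API_SERVER_PORT \
  --deploy-mode cluster \
  --packages com.github.scopt:scopt_2.11:3.5.0 \
  --exclude-packages org.scala-lang:scala-library \
  --name spark-test \
  --class com.lightbend.fdp.spark.test.StructuredMetrics \
  --conf spark.kubernetes.container.image=... \
  local:///myjar.jar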

Spark Scheduling Configuration

The Spark on Kubernetes scheduling design changed significantly in Spark 2.4 compared to 2.3, making it more robust and more Kubernetes-aware. All details are discussed in the project design doc.

Let’s discuss how several scheduling parameters affect the number of scheduled executors. After it starts, the Spark task scheduler waits for the resource manager back end to become ready. It will wait for 30 seconds by default (controlled by the configuration setting, spark.scheduler.maxRegisteredResourcesWaitingTime) or it will stop waiting as soon as the back end reports that it is ready. Here, "ready" means that it has allocated the resources needed, i.e., the number of executors required to run all the job’s tasks.

The condition that controls this is the following formula:

totalRegisteredExecutors >= initialExecutors * minRegisteredRatio

These variables have the following values:

  • minRegisteredRatio is set to 0.8 by default, if spark.scheduler.minRegisteredResourcesRatio is not set

  • initialExecutors is controlled by spark.executor.instances, if set, or its value defaults to 2

  • totalRegisteredExecutors is the current number of executors registered with the back end; executors register themselves with the back end as soon as they are created.

Consider the following example invocation flags:

--conf spark.executor.memory=800M
--conf spark.driver.memory=2g
--conf spark.executor.instances=100

We request 100 executors, along with values for the executor and driver memory. On a small test cluster, like Minishift, this cannot be satisfied given the default CPU resources assigned to the executors.

As we see in the next image, many executors are still in the pending state.

pending-executors-no-more-resources
Figure 16. Pending executors because we can’t schedule more resources

By checking the logs we observe two things. First, only 36 executors have been allocated so far. Second, the tasks started running only after the registration waiting time had passed. So the request for 100 executors is never satisfied.

log-waiting-to-scheduling
Figure 17. Waiting to schedule more resources

The back end will ask for executors in batches. At some point the batch cannot be satisfied.

log-with-batch-requests
Figure 18. The log shows requests were made in batches

The batch size is controlled by the property spark.kubernetes.allocation.batch.size (default: 5).

The Spark on Kubernetes scheduler watches the state of the executors and runs a control loop that tries to achieve what the user requested. The user requested 100 executors, so with a delay based on the property spark.kubernetes.allocation.batch.delay (default value: one second) it checks whether there are zero pending executors (along with some other conditions) before requesting the next batch. In the above example this will never be the case once a batch can no longer be fully scheduled, because some of its executors remain pending and never start.

By tuning the configuration properties, we can get more executors, for example:

--conf spark.executor.memory=500M
--conf spark.driver.memory=2g
--conf spark.kubernetes.executor.request.cores=0.1
--conf spark.executor.instances=100
--conf spark.scheduler.minRegisteredResourcesRatio=0.01
--conf spark.kubernetes.allocation.batch.size=50

more-executors-with-tuned-resources
Figure 19. More executors can run with tuned parameters

$ oc get pods -n spark
NAME                              READY     STATUS    RESTARTS   AGE
spark-pi-1541422989824-driver     1/1       Running   0          8m
spark-pi-1541422989824-exec-1     1/1       Running   0          8m
spark-pi-1541422989824-exec-10    1/1       Running   0          8m
spark-pi-1541422989824-exec-100   1/1       Running   0          7m
spark-pi-1541422989824-exec-11    1/1       Running   0          8m
spark-pi-1541422989824-exec-12    1/1       Running   0          8m
spark-pi-1541422989824-exec-13    1/1       Running   0          8m
...
spark-pi-1541422989824-exec-97    1/1       Running   0          7m
spark-pi-1541422989824-exec-98    1/1       Running   0          7m
spark-pi-1541422989824-exec-99    1/1       Running   0          7m

batch-of-50-requests
Figure 20. A batch of 50

waiting-to-schedule-more
Figure 21. Waiting to schedule more

We can observe in the logs that we have two requests for 50 executors, all satisfied. Note that task scheduling started once at least 100 * 0.01 = 1 executor was registered, which happens as soon as the first executor registers.

In case of failures, the back end will try to restore the requested number of executors. For example:

$ oc get pods -n spark
NAME                             READY     STATUS    RESTARTS   AGE
test-cpus-1539340474549-driver   1/1       Running   0          13s
test-cpus-1539340474549-exec-1   1/1       Running   0          5s
test-cpus-1539340474549-exec-2   1/1       Running   0          4s
test-cpus-1539340474549-exec-3   1/1       Running   0          4s
test-cpus-1539340474549-exec-4   1/1       Running   0          4s

$ oc delete pods test-cpus-1539340474549-exec-4 -n spark
pod "test-cpus-1539340474549-exec-4" deleted

$ oc get pods -n spark
NAME                             READY     STATUS    RESTARTS   AGE
test-cpus-1539340474549-driver   1/1       Running   0          32s
test-cpus-1539340474549-exec-1   1/1       Running   0          24s
test-cpus-1539340474549-exec-2   1/1       Running   0          23s
test-cpus-1539340474549-exec-3   1/1       Running   0          23s
test-cpus-1539340474549-exec-5   1/1       Running   0          8s

The job completes successfully in this scenario.

Spark CPU and Memory Management

Like other container orchestrators, OpenShift supports container isolation for memory and CPU. There are two ways to get isolation per container on a given node.

One way is the CPU shares method, where containers from different pods share the available CPUs according to the limit configuration option we used earlier; the limit constrains how much CPU time each container may use (see here for more details). OpenShift controls CPU limits by passing the CPU period and CPU quota options to Docker. These flags have the following meanings:

  • --cpu-period denotes the period in which container CPU utilization is tracked. It defaults to 100ms (100000).

  • --cpu-quota is the total amount of CPU time that a container can use in each CPU period.

As an example, a CPU limit of 1.5 means a quota of 150000 in a period of 100000. That period spans all CPUs on the node (more details here). If CPUs are over-utilized, application pauses may occur, because any container that exceeds its quota in a given period is not allowed to run again until the next period.
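As a minimal sketch (the value 1.5 is only illustrative), this is how an executor CPU limit maps to those Docker flags:

# limits.cpu=1.5 on the executor container translates to --cpu-quota=150000 with --cpu-period=100000,
# i.e., at most 1.5 CPU-seconds of CPU time per 100 ms period, spread across all of the node's CPUs.
--conf spark.kubernetes.executor.limit.cores=1.5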

The second way of controlling CPUs allocated to containers is via CPU sets. In this case CPUs are exclusively assigned to containers.

These options for CPU management depend on the CPU management policy on the node. Two policies are defined:

  • The none policy explicitly enables the existing default CPU affinity scheme, providing no affinity beyond what the OS scheduler does automatically. Limits on CPU usage for guaranteed pods are enforced using CFS quota.

  • The static policy allows containers in guaranteed pods with integer CPU requests exclusive access to CPUs on the node. This exclusivity is enforced using the CPU set cgroup controller. Guaranteed pods are the ones that have request and limit set to the same value.

Note

For OpenShift, CPU shares are available by default while CPU sets are considered a technology incubating feature and not recommended at this time. In the Kubernetes project itself, they are considered a beta feature.

Until recently, the JVM did not respect container quotas. Because Spark runs on the JVM, a Spark job running on a JVM with this limitation will see all available CPUs on the node. The Java version currently used for Spark on OpenShift is the following:

java -version
openjdk version "1.8.0_171"
OpenJDK Runtime Environment (IcedTea 3.8.0) (Alpine 8.171.11-r0)
OpenJDK 64-Bit Server VM (build 25.171-b11, mixed mode)

With Java 8 update 131 and later and Java 9 and later, two fixes were added which make the JVM aware of Docker CPU limits transparently: JDK-8170888 and JDK-6515172.

As described in Oracle’s blog:

  1. That means if -XX:ParallelGCThreads, or -XX:CICompilerCount are not specified as command line options, the JVM will apply the Docker CPU limit as the number of CPUs the JVM sees on the system. The JVM will then adjust the number of GC threads and JIT compiler threads just like it would as if it were running on a bare metal system with number of CPUs set as the Docker CPU limit. If -XX:ParallelGCThreads or -XX:CICompilerCount are specified as JVM command line options, and Docker CPU limit are specified, the JVM will use the -XX:ParallelGCThreads and -XX:CICompilerCount values. …​

But this applies to CPU sets, not CPU shares. The bottom line is that CPU shares are not taken into account by the Java version used by default in Spark on OpenShift, so users need to tune the above options on their own, as sketched below.
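For example, a minimal sketch (the thread counts are arbitrary and should be sized to the CPU limit you give each container):

--conf "spark.driver.extraJavaOptions=-XX:ParallelGCThreads=2 -XX:CICompilerCount=2"
--conf "spark.executor.extraJavaOptions=-XX:ParallelGCThreads=2 -XX:CICompilerCount=2"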

Note

There are cases where CFS and GC threads interact, causing pauses. Consider monitoring these quantities if you suspect this is happening.

For the sake of completeness, in Java 10 and newer JVMs, memory and CPU settings are derived from cgroups (JDK-8146115 and JDK-8179498), so by default CPU shares are also examined.

The driver pod belongs to the Burstable QoS class because its CPU limit is configurable while its CPU request cannot be configured; since the request has no value, the CPU limit and CPU request cannot match. Executors, on the other hand, let you specify both the CPU request and the CPU limit.

Regarding memory on the driver side, if you set spark.driver.memory, its value is assigned to the -Xmx flag; the default is -Xmx1g. The driver also uses memory beyond the JVM heap, so by default 1408Mi (the 1 GiB heap plus the minimum 384 MiB memory overhead) is assigned to both limits and requests.

For the executor, the minimum and maximum JVM heap sizes are set with the defaults -Xms1g -Xmx1g. Because the heap size is set explicitly, there is no need to pass -XX:+UnlockExperimentalVMOptions -XX:+UseCGroupMemoryLimitForHeap to make the JVM aware of the Docker limits. The executor assigns the same total-memory value to both limits and requests, and all of these options are configurable.
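As a rough sketch (the values are only illustrative), the executor container's memory request and limit are the JVM heap plus the memory overhead:

# 2048Mi heap + max(0.1 * 2048Mi, 384Mi) overhead = 2432Mi assigned to both requests and limits
--conf spark.executor.memory=2g
--conf spark.kubernetes.memoryOverheadFactor=0.1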

You can always check the resource restrictions by running oc describe pods <pod-name>. By default without any parameter passed, we get output like the following for both the driver and the executors:

Limits:
  memory:  1408Mi
Requests:
  CPU:     1
  memory:  1408Mi

Spark Dependency Management

If your application’s dependencies are all hosted in remote locations like HDFS or HTTP servers, they may be referred to by their appropriate remote URIs. Also, application dependencies can be pre-mounted into custom-built Docker images. Those dependencies can be added to the classpath by referencing them with local:// URIs or by setting the SPARK_EXTRA_CLASSPATH environment variable in your Dockerfiles. The local:// scheme is also required when referring to dependencies in custom-built Docker images in spark-submit. Note that using application dependencies from the submission client’s local file system is not currently supported.

Accessing resources from within the image reflects the Kubernetes best practice of packaging applications into images and running them via a container orchestrator; Spark applications can follow the same approach, as in the sketch below.
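For example, here is a minimal sketch (the jar paths and class name are placeholders) of putting a dependency that is baked into the image on the classpath:

$SPARK_HOME/bin/spark-submit \
  --master k8s://https://$API_SERVER:$API_SERVER_PORT \
  --deploy-mode cluster \
  --jars local:///opt/spark/extra-jars/my-dependency.jar \
  --class com.example.MyApp \
  --conf spark.kubernetes.container.image=... \
  local:///opt/spark/app/my-app.jar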

Logging Configurations for Spark Jobs

Logging in Spark can be configured in two ways. One option is to set the logging configuration in your job’s source code. For example, the SparkContext allows you to set the log level with its setLogLevel method. Or you can access the root logger directly, LogManager.getRootLogger().setLevel(Level…​). The latter approach can also be used on the executor side, for example:

...foreachPartition { p =>
  LogManager.getRootLogger().setLevel(Level.DEBUG)
  val log = LogFactory.getLog("execlog:")
  log.debug("...")
}

The other option is to set the log4j properties in the corresponding configuration file under $SPARK_HOME/conf. This method is usually preferred, as it requires no source-code changes.

There is a template file named conf/log4j.properties.template that comes with each Spark distribution, which is traditionally used for this. However, when running in OpenShift, the conf directory is overridden at run time: the driver’s internal ConfigMap is mounted as a volume at that same path and exposed as a file named spark.properties, which replaces the directory’s contents:

$ oc exec -it spark-pi-1536970567591-driver -n spark sh
sh-4.4# ls /opt/spark/conf/
spark.properties

So, a better approach is to create a log4j properties file under another directory within your image and point to that using the following spark-submit arguments:

--conf "spark.executor.extraJavaOptions=-Dlog4j.configuration=file:///log4j.properties ..."
--conf "spark.driver.extraJavaOptions=-Dlog4j.configuration=file:///log4j.properties  ..."

Here we set our log4j properties file at the root directory in our image. This of course requires you to build your own image.

Here is a log4j.properties.template file, which you can modify as desired:

log4j.rootCategory=INFO, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n

log4j.logger.org.apache.spark.repl.Main=WARN
log4j.logger.org.spark_project.jetty=WARN
log4j.logger.org.spark_project.jetty.util.component.AbstractLifeCycle=ERROR
log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO
log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO
log4j.logger.org.apache.parquet=ERROR
log4j.logger.parquet=ERROR

log4j.logger.org.apache.hadoop.hive.metastore.RetryingHMSHandler=FATAL
log4j.logger.org.apache.hadoop.hive.ql.exec.FunctionRegistry=ERROR

In order to get the logs you can always use the Kubernetes UI or the CLI. However, the executors' log output, along with other information, is not available via the Kubernetes UI due to a known issue described earlier.

Use the following CLI command to get a snapshot of the logs:

oc logs spark-pi-1536919646594-driver

If you want to stream the log contents for your Spark pod, pass the -f option (for "follow"):

oc logs -f spark-pi-1536919646594-driver

The oc logs command offers several more options for additional flexibility; use the help flag -h to see them:

oc logs -h

Log Aggregation with Humio

Spark on Kubernetes integrates with Humio, a commercial log aggregation and query system, which Lightbend recommends. With Humio you can easily select your Spark pod and check its logs:

Selecting logs for a Spark pod
Figure 22. Selecting logs for a Spark pod

Humio comes with a lot of features for manipulating the logs. The following screenshot shows how to query only the raw messages in the logs:

Humio "rawmessage" query
Figure 23. Humio "rawmessage" query for Spark

Contact Humio for OpenShift-specific installation instructions. See the Humio documentation for more general information.

Configuration for Debugging a Spark Process

Debugging a Spark application can be done by using the information provided in the Spark UI of the driver and through the driver and executor logs. Useful information can also be acquired by monitoring several metrics exposed by Spark, which are described below. As a last resort, you can attach a profiler or debugger to the Spark JVM directly, using JConsole or VisualVM.

This is a two-step process: first, port-forward the port exposed in the container, and then use the appropriate software on your local machine to connect to that port. For example, for JConsole, start the driver JVM with these flags:

-Dcom.sun.management.jmxremote.port=8090
-Dcom.sun.management.jmxremote.rmi.port=8090

Use this command to forward port 8090:

oc port-forward spark-1536608707660-driver 8090:8090

Now launch JConsole and create a connection to localhost:8090, which connects you to the remote process.

Attaching a profiler may require the extra step of loading a profiling agent in the Spark JVM process. That means you need to add any libraries your profiler requires to your image and pass any required options when launching your job, such as -agentpath:, via either spark.executor.extraJavaOptions or spark.driver.extraJavaOptions, as sketched below.
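For instance, here is a sketch of passing the JMX options shown above to the driver; the port matches the port-forwarding example, and for a quick test you may also need to disable JMX authentication and SSL (assumptions to adapt to your environment):

--conf "spark.driver.extraJavaOptions=-Dcom.sun.management.jmxremote.port=8090 -Dcom.sun.management.jmxremote.rmi.port=8090 -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false"

A profiler agent would be added the same way, for example by appending -agentpath:/path/to/profiler-agent.so (a placeholder) to the same option.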

Finally, because executors are often cleaned up when an error occurs, their logs cannot be examined easily. To prevent this, set spark.kubernetes.executor.deleteOnTermination to false so that the logs can be retrieved with the following command:

$ oc logs spark_executor_pod_name -n <namespace>

Accessing Kafka

To integrate your Spark jobs with Kafka, follow the Structured Streaming Kafka integration instructions on the Spark site. The strict requirement is that the Spark driver and executors must have direct network access to the Kafka brokers.
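For example, here is a minimal sketch (the package coordinates assume Spark 2.4.3 with Scala 2.11; the class and jar are placeholders) that pulls in the Structured Streaming Kafka connector at submission time:

$SPARK_HOME/bin/spark-submit \
  --master k8s://https://$API_SERVER:$API_SERVER_PORT \
  --deploy-mode cluster \
  --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.3 \
  --class com.example.MyKafkaApp \
  --conf spark.kubernetes.container.image=... \
  local:///opt/spark/app/my-kafka-app.jar

The broker addresses themselves are configured in the application code (the kafka.bootstrap.servers option of the Kafka source and sink), and both the driver and executor pods must be able to reach them.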

Running Spark Jobs with an Arbitrary User

By default, Spark pod containers are run with a random user id (uid) picked from the range specified for each project. You can inspect the range of available ids per project using the following command:

$ oc describe project spark-user
Name:			spark-user
Created:		2 days ago
Labels:			
Annotations:		openshift.io/sa.scc.mcs=s0:c38,c12
			openshift.io/sa.scc.supplemental-groups=1001430000/10000
			openshift.io/sa.scc.uid-range=1001430000/10000
Display Name:		
Description:		
Status:			Active
Node Selector:		
Quota:			
Resource limits:	

To learn more about user management on OpenShift, please see this blog post and the related documentation about security context constraints. See also this Appendix for more information on the Spark images delivered by Lightbend that are designed for OpenShift.

Although it is not recommended, you can run the Spark pods with their built-in users, for example root. To do so, you will need to modify the SCC (security context constraints) for your user so that it can run with runAsAny. Be aware that this could create issues with volume management; see here for details. For example, default supplemental groups are not produced if you use runAsAny, which might prevent access to storage by default.
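For example (a sketch; the service account and namespace names are placeholders, and anyuid is just one SCC whose run-as-user strategy is RunAsAny), a cluster administrator could grant the relaxed SCC to the Spark service account with:

$ oc adm policy add-scc-to-user anyuid -z spark-sa -n spark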

Running Spark Jobs in an Arbitrary Namespace

Let’s see how we can run a Spark job in an arbitrary namespace. We will submit the job as a cluster user who runs spark-submit directly from a local machine. We also assume you have logged in with oc login.

Start by defining the following resources:

spark-rbac.yaml
apiVersion: v1
kind: Namespace
metadata:
  name: spark
---
apiVersion: v1
kind: ServiceAccount
metadata:
  name: spark-sa
  namespace: spark
---
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: spark-role
  namespace: spark
rules:
- apiGroups:
  - ""
  resources:
  - "pods"
  verbs:
  - "*"
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: spark-role-binding
  namespace: spark
subjects:
- kind: ServiceAccount
  name: spark-sa
  namespace: spark
roleRef:
  kind: Role
  name: spark-role
  apiGroup: rbac.authorization.k8s.io
Note
  • If you are installing in the lightbend namespace, omit the part in Using a Pod that creates a new namespace named spark. Moreover, it is possible to define a cluster role instead, but that grants more privileges than strictly necessary. See here for more on Kubernetes roles.

  • We assume that you have the permissions to create namespaces in the cluster scope. A non-privileged user will have to create a project as follows:

$ oc new-project spark

Next, we’ll use the following RBAC file, spark-rbac-non-privileged.yaml:

spark-rbac-non-privileged.yaml
---
apiVersion: v1
kind: ServiceAccount
metadata:
  name: spark-sa
  namespace: spark
---
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: spark-role
  namespace: spark
rules:
- apiGroups:
  - ""
  resources: ["pods" ]
  verbs:
  - get
  - create
  - delete
  - watch
  - list
  - update
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: spark-role-binding
  namespace: spark
subjects:
- kind: ServiceAccount
  name: spark-sa
  namespace: spark
roleRef:
  kind: Role
  name: spark-role
  apiGroup: rbac.authorization.k8s.io

Now, we can create the RBAC configuration:

$ oc create -f spark-rbac-non-privileged.yaml
serviceaccount "spark-sa" created
role.rbac.authorization.k8s.io "spark-role" created
rolebinding.rbac.authorization.k8s.io "spark-role-binding" created

To launch your job, pass the following two configuration options to Spark:

--conf spark.kubernetes.namespace=spark
--conf spark.kubernetes.authenticate.driver.serviceAccountName=spark-sa
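Putting it together, a complete submission into the spark namespace might look like the following sketch (the container image is a placeholder):

$SPARK_HOME/bin/spark-submit \
  --master k8s://https://$API_SERVER:$API_SERVER_PORT \
  --deploy-mode cluster \
  --name spark-pi \
  --class org.apache.spark.examples.SparkPi \
  --conf spark.executor.instances=2 \
  --conf spark.kubernetes.namespace=spark \
  --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark-sa \
  --conf spark.kubernetes.container.image=... \
  local:///opt/spark/examples/jars/spark-examples_2.11-2.4.3.jar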

You can check your Spark jobs status in the new namespace using the following command:

$ oc get po -n spark
NAME                             READY     STATUS              RESTARTS   AGE
spark-pi-1539109476179-driver    1/1       Running             0          21s
spark-pi-1539109476179-exec-1    1/1       Running             0          10s
spark-pi-1539109476179-exec-10   0/1       ContainerCreating   0          3s
spark-pi-1539109476179-exec-2    1/1       Running             0          10s
spark-pi-1539109476179-exec-3    1/1       Running             0          10s
spark-pi-1539109476179-exec-4    1/1       Running             0          10s
spark-pi-1539109476179-exec-5    1/1       Running             0          10s
spark-pi-1539109476179-exec-6    0/1       ContainerCreating   0          3s
spark-pi-1539109476179-exec-7    0/1       ContainerCreating   0          3s
spark-pi-1539109476179-exec-8    0/1       ContainerCreating   0          3s
spark-pi-1539109476179-exec-9    0/1       ContainerCreating   0          3s

If you are a user with limited privileges and you would like to run your jobs in the namespace of another project, the project admin needs to grant you the appropriate rights to execute certain operations in that namespace. A sample of these rights is shown next:

spark-user-rbac.yaml
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: spark-role
  namespace: spark
rules:
- apiGroups:
  - ""
  resources: ["pods", "services", "configmaps" ]
  verbs:
  - get
  - create
  - delete
  - watch
  - list
  - update
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: spark-role-binding
  namespace: spark
subjects:
- kind: User
  name: john
  apiGroup: rbac.authorization.k8s.io
roleRef:
  kind: Role
  name: spark-role
  apiGroup: rbac.authorization.k8s.io

In this example user john is allowed to do certain operations on pods, services and configmaps in namespace spark.

The admin needs to run:

$ oc create -f spark-user-rbac.yaml

to set the proper role.

Note

The Spark back-end implementation uses this fabric8io/kubernetes-client library, which by default reads the configuration in $HOME/.kube/config.

If you are not logged in when you submit the Spark job, you can use a bearer token provided by the OpenShift UI.

Click Copy Login Command, and text like the following will be copied to your clipboard:

$ oc login https://xxxxxxxx.us-west-2.elb.amazonaws.com --token=xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx

Note the token value. Use it in place of $TOKEN in the following options, which you will need to pass to spark-submit:

--conf spark.kubernetes.authenticate.submission.oauthToken=$TOKEN
--conf spark.kubernetes.authenticate.submission.caCertFile=/path/to/ca.crt

The ca.crt of your cluster needs to be downloaded to your remote machine. That certificate is available inside any pod launched in the default namespace, on the pod's file system; you can copy it out as shown below. See this documentation for more details.
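For example (a sketch; the pod name is a placeholder), you can copy the certificate out of a running pod with oc exec, since it is mounted at the standard service-account path:

$ oc exec <some-pod> -n default -- cat /var/run/secrets/kubernetes.io/serviceaccount/ca.crt > ca.crt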

You can also generate X.509 certificates for an existing user to use them with your Spark jobs. To do this, ssh to a master node and execute the following command, using the appropriate value for the user. Here, we’ll assume user stavros:

sudo oc --user=stavros adm create-api-client-config \
  --client-dir=./"" \
  --signer-cert='/etc/origin/master/ca.crt' \
  --signer-key='/etc/origin/master/ca.key' \
  --signer-serial='/etc/origin/master/ca.serial.txt' \
  --certificate-authority='/etc/origin/master/ca.crt'

This will generate the following files:

ca.crt
stavros.crt
stavros.key
stavros.kubeconfig

Copy the files to your machine and add the following configuration options to your spark-submit command:

--conf spark.kubernetes.authenticate.submission.caCertFile=/path/to/ca.crt
--conf spark.kubernetes.authenticate.submission.clientKeyFile=/path/to/stavros.key
--conf spark.kubernetes.authenticate.submission.clientCertFile=/path/to/stavros.crt
Multi-tenancy and Spark Applications

Namespaces allow you to divide cluster resources among users, providing a scope where names must be unique and possibly map a company’s organizational structure to the cluster.

Currently on Kubernetes, a namespace is considered the tenant in the most basic form of multi-tenancy: the scenario where multiple departments or lines of business share the same cluster as tenants.

On OpenShift, users create projects, which are in 1:1 correspondence with namespaces but act as a wrapper around them in order to provide the desired level of security and isolation. Projects also allow you to create namespaces from a pre-configured template and to assign specific default policies to them. Projects can be further isolated with advanced configuration options using network policies.

Spark supports RBAC and is namespace-aware, as we have seen earlier. This means users can isolate Spark workloads by creating their own namespaces.

Spark uses a service account to create the required resources, so securing that account is critical in each project. Service accounts in general can access resources across namespaces when required, but Spark applications target only the specific namespace they are deployed into with the spark-submit tool.

Spark applications require access to storage. A Persistent Volume (PV), accessed through a namespaced Persistent Volume Claim (PVC), is used for this purpose. Users who access a PV via their running pods are also validated against the POSIX permissions of the volume they access. This is described in detail here. A sketch of the relevant volume settings follows.
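As a hedged sketch (the claim name and mount path are placeholders), a PVC can be mounted into the driver and executors with the volume options listed in the table below:

--conf spark.kubernetes.driver.volumes.persistentVolumeClaim.checkpointpvc.options.claimName=spark-pvc-claim
--conf spark.kubernetes.driver.volumes.persistentVolumeClaim.checkpointpvc.mount.path=/checkpoint
--conf spark.kubernetes.executor.volumes.persistentVolumeClaim.checkpointpvc.options.claimName=spark-pvc-claim
--conf spark.kubernetes.executor.volumes.persistentVolumeClaim.checkpointpvc.mount.path=/checkpoint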

All Configuration Options for Spark on Kubernetes

The following table lists the available configuration options.

Table 22. Spark on Kubernetes Configuration Options

Property Name

Default

Allowed Values

Explanation

spark.kubernetes.namespace

default

-

The namespace that will be used for running the driver and executor pods

spark.kubernetes.container.image

-

-

Container image to use for the Spark application. This is usually of the form example.com/repo/spark:v1.0.0. This configuration is required and must be provided by the user, unless explicit images are provided for each different container type.

spark.kubernetes.driver.container.image

Value of spark.kubernetes.container.image

-

Container image to use for the driver

spark.kubernetes.executor.container.image

Value of spark.kubernetes.container.image

-

Container image to use for the executors

spark.kubernetes.container.image.pullPolicy

IfNotPresent

Valid values are: Always, Never, and IfNotPresent.

Kubernetes image pull policy.

spark.kubernetes.container.image.pullSecrets

-

-

Comma separated list of Kubernetes secrets used to pull images from private image registries.

spark.kubernetes.allocation.batch.size

5

a positive integer

Number of pods to launch at once in each round of executor pod allocation.

spark.kubernetes.allocation.batch.delay

1s

a positive time value in seconds.

Time to wait between each round of executor pod allocation. Specifying values less than 1 second may lead to excessive CPU usage on the spark driver.

spark.kubernetes.authenticate.submission.caCertFile

-

-

Path to the CA cert file for connecting to the Kubernetes API server over TLS when starting the driver. This file must be located on the submitting machine’s disk. Specify this as a path as opposed to a URI (i.e. do not provide a scheme). In client mode, use spark.kubernetes.authenticate.caCertFile instead.

spark.kubernetes.authenticate.submission.clientKeyFile

-

-

Path to the client key file for authenticating against the Kubernetes API server when starting the driver. This file must be located on the submitting machine’s disk. Specify this as a path as opposed to a URI (i.e. do not provide a scheme). In client mode, use spark.kubernetes.authenticate.clientKeyFile instead.

spark.kubernetes.authenticate.submission.clientCertFile

-

-

Path to the client cert file for authenticating against the Kubernetes API server when starting the driver. This file must be located on the submitting machine’s disk. Specify this as a path as opposed to a URI (i.e. do not provide a scheme). In client mode, use spark.kubernetes.authenticate.clientCertFile instead.

spark.kubernetes.authenticate.submission.oauthToken

-

-

OAuth token to use when authenticating against the Kubernetes API server when starting the driver. Note that unlike the other authentication options, this is expected to be the exact string value of the token to use for the authentication. In client mode, use spark.kubernetes.authenticate.oauthToken instead.

spark.kubernetes.authenticate.submission.oauthTokenFile

-

-

Path to the OAuth token file containing the token to use when authenticating against the Kubernetes API server when starting the driver. This file must be located on the submitting machine’s disk. Specify this as a path as opposed to a URI (i.e. do not provide a scheme). In client mode, use spark.kubernetes.authenticate.oauthTokenFile instead.

spark.kubernetes.authenticate.driver.caCertFile

-

-

Path to the CA cert file for connecting to the Kubernetes API server over TLS from the driver pod when requesting executors. This file must be located on the submitting machine’s disk, and will be uploaded to the driver pod. Specify this as a path as opposed to a URI (i.e. do not provide a scheme). In client mode, use spark.kubernetes.authenticate.caCertFile instead.

spark.kubernetes.authenticate.driver.clientKeyFile

-

-

Path to the client key file for authenticating against the Kubernetes API server from the driver pod when requesting executors. This file must be located on the submitting machine’s disk, and will be uploaded to the driver pod as a Kubernetes secret. Specify this as a path as opposed to a URI (i.e. do not provide a scheme). In client mode, use spark.kubernetes.authenticate.clientKeyFile instead.

spark.kubernetes.authenticate.driver.clientCertFile

-

-

Path to the client cert file for authenticating against the Kubernetes API server from the driver pod when requesting executors. This file must be located on the submitting machine’s disk, and will be uploaded to the driver pod as a Kubernetes secret. Specify this as a path as opposed to a URI (i.e. do not provide a scheme). In client mode, use spark.kubernetes.authenticate.clientCertFile instead.

spark.kubernetes.authenticate.driver.oauthToken

-

-

OAuth token to use when authenticating against the Kubernetes API server from the driver pod when requesting executors. Note that unlike the other authentication options, this must be the exact string value of the token to use for the authentication. This token value is uploaded to the driver pod as a Kubernetes secret. In client mode, use spark.kubernetes.authenticate.oauthToken instead.

spark.kubernetes.authenticate.driver.oauthTokenFile

-

-

Path to the OAuth token file containing the token to use when authenticating against the Kubernetes API server from the driver pod when requesting executors. Note that unlike the other authentication options, this file must contain the exact string value of the token to use for the authentication. This token value is uploaded to the driver pod as a secret. In client mode, use spark.kubernetes.authenticate.oauthTokenFile instead.

spark.kubernetes.authenticate.driver.mounted.caCertFile

-

-

Path to the CA cert file for connecting to the Kubernetes API server over TLS from the driver pod when requesting executors. This path must be accessible from the driver pod. Specify this as a path as opposed to a URI (i.e. do not provide a scheme). In client mode, use spark.kubernetes.authenticate.caCertFile instead.

spark.kubernetes.authenticate.driver.mounted.clientKeyFile

-

-

Path to the client key file for authenticating against the Kubernetes API server from the driver pod when requesting executors. This path must be accessible from the driver pod. Specify this as a path as opposed to a URI (i.e. do not provide a scheme). In client mode, use spark.kubernetes.authenticate.clientKeyFile instead.

spark.kubernetes.authenticate.driver.mounted.clientCertFile

-

-

Path to the client cert file for authenticating against the Kubernetes API server from the driver pod when requesting executors. This path must be accessible from the driver pod. Specify this as a path as opposed to a URI (i.e. do not provide a scheme). In client mode, use spark.kubernetes.authenticate.clientCertFile instead.

spark.kubernetes.authenticate.driver.mounted.oauthTokenFile

-

-

Path to the file containing the OAuth token to use when authenticating against the Kubernetes API server from the driver pod when requesting executors. This path must be accessible from the driver pod. Note that unlike the other authentication options, this file must contain the exact string value of the token to use for the authentication. In client mode, use spark.kubernetes.authenticate.oauthTokenFile instead.

spark.kubernetes.authenticate.driver.serviceAccountName

-

Any valid existing Kubernetes service account name.

Service account that is used when running the driver pod. The driver pod uses this service account when requesting executor pods from the API server. Note that this cannot be specified alongside a CA cert file, client key file, client cert file, and/or OAuth token. In client mode, use spark.kubernetes.authenticate.serviceAccountName instead.

spark.kubernetes.authenticate.caCertFile

-

-

In client mode, path to the CA cert file for connecting to the Kubernetes API server over TLS when requesting executors. Specify this as a path as opposed to a URI (i.e. do not provide a scheme).

spark.kubernetes.authenticate.clientKeyFile

-

-

In client mode, path to the client key file for authenticating against the Kubernetes API server when requesting executors. Specify this as a path as opposed to a URI (i.e. do not provide a scheme).

spark.kubernetes.authenticate.clientCertFile

-

-

In client mode, path to the client cert file for authenticating against the Kubernetes API server when requesting executors. Specify this as a path as opposed to a URI (i.e. do not provide a scheme).

spark.kubernetes.authenticate.oauthToken

-

-

In client mode, the OAuth token to use when authenticating against the Kubernetes API server when requesting executors. Note that unlike the other authentication options, this must be the exact string value of the token to use for the authentication.

spark.kubernetes.authenticate.oauthTokenFile

-

-

In client mode, path to the file containing the OAuth token to use when authenticating against the Kubernetes API server when requesting executors.

spark.kubernetes.driver.label.[LabelName]

-

-

Add the label specified by LabelName to the driver pod. For example, spark.kubernetes.driver.label.something=true. Note that Spark also adds its own labels to the driver pod for bookkeeping purposes.

spark.kubernetes.driver.annotation.[AnnotationName]

-

-

Add the annotation specified by AnnotationName to the driver pod. For example, spark.kubernetes.driver.annotation.something=true.

spark.kubernetes.executor.label.[LabelName]

-

-

Add the label specified by LabelName to the executor pods. For example, spark.kubernetes.executor.label.something=true. Note that Spark also adds its own labels to the driver pod for bookkeeping purposes.

spark.kubernetes.executor.annotation.[AnnotationName]

-

-

Add the annotation specified by AnnotationName to the executor pods. For example, spark.kubernetes.executor.annotation.something=true.

spark.kubernetes.driver.pod.name

-

-

Name of the driver pod. In cluster mode, if this is not set, the driver pod name is set to "spark.app.name" suffixed by the current timestamp to avoid name conflicts. In client mode, if your application is running inside a pod, it is highly recommended to set this to the name of the pod your driver is running in. Setting this value in client mode allows the driver to become the owner of its executor pods, which in turn allows the executor pods to be garbage collected by the cluster.

spark.kubernetes.executor.lostCheck.maxAttempts

10

a positive integer

Number of times that the driver will try to ascertain the loss reason for a specific executor. The loss reason is used to ascertain whether the executor failure is due to a framework or an application error which in turn decides whether the executor is removed and replaced, or placed into a failed state for debugging.

spark.kubernetes.submission.waitAppCompletion

true

Boolean

In cluster mode, whether to wait for the application to finish before exiting the launcher process. When changed to false, the launcher has a "fire-and-forget" behavior when launching the Spark job.

spark.kubernetes.executor.deleteOnTermination

true

Boolean

Specify whether executor pods should be deleted in case of failure or normal termination.

spark.kubernetes.report.interval

1s

a positive time value

Interval between reports of the current app status in cluster mode.

spark.kubernetes.driver.limit.cores

-

-

Specify a hard CPU limit for the driver pod (see https://kubernetes.io/docs/concepts/configuration/manage-compute-resources-container/#resource-requests-and-limits-of-pod-and-container).

spark.kubernetes.executor.request.cores

-

-

Specify the CPU request for each executor pod. Values conform to the Kubernetes convention (https://kubernetes.io/docs/concepts/configuration/manage-compute-resources-container/#meaning-of-cpu). Example values include 0.1, 500m, 1.5, 5, etc., with the definition of CPU units documented at https://kubernetes.io/docs/tasks/configure-pod-container/assign-cpu-resource/#cpu-units. This is distinct from spark.executor.cores: it is only used for specifying the executor pod CPU request and, if set, takes precedence over spark.executor.cores for that purpose. Task parallelism (e.g., the number of tasks an executor can run concurrently) is not affected by it.

spark.kubernetes.executor.limit.cores

-

-

Specify a hard CPU limit for each executor pod launched for the Spark application (see https://kubernetes.io/docs/concepts/configuration/manage-compute-resources-container/#resource-requests-and-limits-of-pod-and-container).

spark.kubernetes.node.selector.[labelKey]

-

-

Adds to the node selector of the driver pod and executor pods, with key labelKey and the value as the configuration’s value. For example, setting spark.kubernetes.node.selector.identifier to myIdentifier will result in the driver pod and executors having a node selector with key identifier and value myIdentifier. Multiple node selector keys can be added by setting multiple configurations with this prefix.

spark.kubernetes.driverEnv.[EnvironmentVariableName]

-

-

Add the environment variable specified by EnvironmentVariableName to the Driver process. The user can specify multiple of these to set multiple environment variables.

spark.kubernetes.driver.secrets.[SecretName]

-

-

Add the Kubernetes Secret named SecretName to the driver pod on the path specified in the value. For example, spark.kubernetes.driver.secrets.spark-secret=/etc/secrets.

spark.kubernetes.executor.secrets.[SecretName]

-

-

Add the Kubernetes Secret named SecretName to the executor pod on the path specified in the value. For example, spark.kubernetes.executor.secrets.spark-secret=/etc/secrets.

spark.kubernetes.driver.secretKeyRef.[EnvName]

-

-

Add as an environment variable to the driver container with name EnvName (case sensitive), the value referenced by key key in the data of the referenced Kubernetes Secret. For example, spark.kubernetes.driver.secretKeyRef.ENV_VAR=spark-secret:key.

spark.kubernetes.executor.secretKeyRef.[EnvName]

-

-

Add as an environment variable to the executor container with name EnvName (case sensitive), the value referenced by key key in the data of the referenced Kubernetes Secret. For example, spark.kubernetes.executor.secretKeyRef.ENV_VAR=spark-secret:key.

spark.kubernetes.driver.volumes.[VolumeType].[VolumeName].mount.path

-

-

Add the Kubernetes Volume named VolumeName of the VolumeType type to the driver pod on the path specified in the value. For example, spark.kubernetes.driver.volumes.persistentVolumeClaim.checkpointpvc.mount.path=/checkpoint.

spark.kubernetes.driver.volumes.[VolumeType].[VolumeName].mount.readOnly

-

-

Specify if the mounted volume is read only or not. For example, spark.kubernetes.driver.volumes.persistentVolumeClaim.checkpointpvc.mount.readOnly=false.

spark.kubernetes.driver.volumes.[VolumeType].[VolumeName].options.[OptionName]

-

-

Configure Kubernetes Volume options passed to Kubernetes, with OptionName as the key and the specified value; these must conform to the Kubernetes option format. For example, spark.kubernetes.driver.volumes.persistentVolumeClaim.checkpointpvc.options.claimName=spark-pvc-claim.

spark.kubernetes.executor.volumes.[VolumeType].[VolumeName].mount.path

-

-

Add the Kubernetes Volume named VolumeName of the VolumeType type to the executor pod on the path specified in the value. For example, spark.kubernetes.executor.volumes.persistentVolumeClaim.checkpointpvc.mount.path=/checkpoint.

spark.kubernetes.executor.volumes.[VolumeType].[VolumeName].mount.readOnly

false

-

Specify if the mounted volume is read only or not. For example, spark.kubernetes.executor.volumes.persistentVolumeClaim.checkpointpvc.mount.readOnly=false.

spark.kubernetes.executor.volumes.[VolumeType].[VolumeName].options.[OptionName]

-

-

Configure Kubernetes Volume options passed to Kubernetes, with OptionName as the key and the specified value. For example, spark.kubernetes.executor.volumes.persistentVolumeClaim.checkpointpvc.options.claimName=spark-pvc-claim.

spark.kubernetes.local.dirs.tmpfs

false

-

Configure the emptyDir volumes used to back SPARK_LOCAL_DIRS within the Spark driver and executor pods to use tmpfs backing, i.e., RAM. See Local Storage earlier on this page for more discussion of this.

spark.kubernetes.memoryOverheadFactor

0.1

-

This sets the memory overhead factor, which allocates memory for non-JVM memory use, including off-heap allocations, non-JVM tasks, and various system processes. For JVM-based jobs this value defaults to 0.10, and to 0.40 for non-JVM jobs. This is done because non-JVM tasks need more non-JVM heap space and commonly fail with "Memory Overhead Exceeded" errors; the higher default preempts this error.

spark.kubernetes.pyspark.pythonVersion

2

-

This sets the major Python version of the docker image used to run the driver and executor containers. Can either be 2 or 3.

Alternative Ways to Run Spark Jobs on OpenShift

Let’s examine a few other ways to run Spark jobs.

Using a Pod

Besides spark-submit, it is also possible to run a Spark job in a fire-and-forget manner via a Pod definition, which specifies what to do declaratively. The idea is to run a pod in the cluster that itself executes the spark-submit command to run the Spark job, using either cluster or client mode.

For example, you can use the following sample YAML files as templates to run a job, where we assume that $API_SERVER:$API_PORT is the Kubernetes default, kubernetes.default.svc:

pod-cmd.yaml
apiVersion: v1
kind: Pod
metadata:
  name: test-deployment-pi-cmd
  namespace: spark
spec:
  serviceAccountName: spark-sa
  containers:
  - name: test-deployment-pi
    image: lightbend/spark:2.1.1-OpenShift-2.4.3-rh
    command:
      - 'sh'
      - '-c'
      - "myuid=$(id -u);
         mygid=$(id -g);
         uidentry=$(getent passwd $myuid);
         if [ -z \"$uidentry\" ] ; then
           if [ -w /etc/passwd ] ; then
             echo \"$myuid:x:$myuid:$mygid:anonymous uid:$SPARK_HOME:/bin/false\" >> /etc/passwd;
           else
             echo \"Container ENTRYPOINT failed to add passwd entry for anonymous UID\";
           fi;
         fi;
         /opt/spark/bin/spark-submit
         --master k8s://https://kubernetes.default.svc
         --deploy-mode cluster
         --name spark-pi
         --class org.apache.spark.examples.SparkPi
         --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark-sa
         --conf spark.kubernetes.namespace=spark
         --conf spark.executor.memory=2G
         --conf spark.driver.memory=2G
         --conf spark.executor.instances=2
         --conf spark.kubernetes.container.image.pullPolicy=Always
         --conf spark.kubernetes.container.image=lightbend/spark:2.1.1-OpenShift-2.4.3-rh
         local:///opt/spark/examples/jars/spark-examples_2.11-2.4.3.jar"
  restartPolicy: Never

Here we override the entrypoint of the image so we need to make sure we can run as an arbitrary user.

Another option utilizes the logic of the entrypoint to run arbitrary commands, which may be preferable:

pod.yaml
apiVersion: v1
kind: Pod
metadata:
  name: test-deployment-pi
  namespace: spark
spec:
  serviceAccountName: spark-sa
  containers:
  - name: test-deployment-pi
    image: lightbend/spark:2.1.1-OpenShift-2.4.3-rh
    args:
      -  "/opt/spark/bin/spark-submit"
      -  "--master"
      -  "k8s://https://kubernetes.default.svc
      -  "--deploy-mode"
      -  "cluster"
      -  "--name"
      -  "spark-pi"
      -  "--class"
      -  "org.apache.spark.examples.SparkPi"
      -  "--conf"
      -  "spark.kubernetes.authenticate.driver.serviceAccountName=spark-sa"
      -  "--conf"
      -  "spark.kubernetes.namespace=spark"
      -  "--conf"
      -  "spark.executor.memory=2G"
      -  "--conf"
      -  "spark.driver.memory=2G"
      -  "--conf"
      -  "spark.executor.instances=2"
      -  "--conf"
      -  "spark.kubernetes.container.image.pullPolicy=Always"
      -  "--conf"
      -  "spark.kubernetes.container.image=lightbend/spark:2.1.1-OpenShift-2.4.3-rh"
      -  "local:///opt/spark/examples/jars/spark-examples_2.11-2.4.3.jar"

We take advantage of the pass through mode of the Spark image, which can run arbitrary commands.

Make sure you have the right privileges to launch Spark jobs in the given namespace and with the given service account. By default, Spark uses default for both.

Here we target a specific namespace spark, which a normal user must create ahead of time with this command:

$ oc new-project spark

To bind a cluster role to a default account in a namespace or to implement privileges in a more fine-grained way, use a pod-rbac.yaml:

pod-rbac.yaml
---
apiVersion: v1
kind: ServiceAccount
metadata:
  name: spark-sa
  namespace: spark
---
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: spark-role
  namespace: spark
rules:
- apiGroups:
  - ""
  resources: ["pods" , "services" , "configmaps"]
  verbs:
  - get
  - create
  - delete
  - list
  - watch
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: spark-role-binding
  namespace: spark
subjects:
- kind: ServiceAccount
  name: spark-sa
  namespace: spark
roleRef:
  kind: Role
  name: spark-role
  apiGroup: rbac.authorization.k8s.io

The reason we need this is that we run the spark-submit command from within the cluster: the "submission" pod runs with a specific namespace and service account, and that service account needs the privileges to create the driver and executor pods, the driver's service (so that the executors can access the driver), and the driver's ConfigMap. It also needs to be able to watch the driver pod. (This can be deactivated; see All Configuration Options for Spark on Kubernetes for details.)

Compare this approach with the privileges we needed above when we used cluster mode with spark-submit executed outside the cluster: the user needs to be able to talk to the API server and have privileges to create and watch the driver pod, and then the service account used by the driver needs to be able to create executor pods, ConfigMaps, and services.

Now submit the pod spec with the following command:

$ oc create -f pod.yaml
Note

When you use create, you will need to run oc delete -f pod.yaml before you can re-create the same pod. This cleans up the previous instance.

Tip

Set spark.kubernetes.submission.waitAppCompletion to false so that the pod running spark-submit doesn’t wait around, but exits after submission.

One downside of this approach, and any approach that uses cluster mode from within the cluster, is that it requires extra rights for watching the driver pod. If you submit from outside the cluster, spark-submit will utilize your Kubernetes configuration to access the API server for watching the driver pod. However, in both cases, you are required to have extra privileges such as the ability to run get pods, get services, etc. Hence, the RBAC step above is always required.

This is the most isolated way you can run and watch a Spark job from within the cluster, including "fire and forget" scenarios.

Client mode can be more convenient, since the pod that runs spark-submit is itself the driver and no extra driver pod is spawned. On the other hand, some additional setup is required: client mode assumes bi-directional network connectivity between the machine running the driver and the executors, so the driver's ports must be reachable. If the driver itself runs inside the cluster, most of these issues go away, but a headless service must be created first so the executors can reach the driver.

Here is an example of how to accomplish this:

service.yaml
kind: Service
apiVersion: v1
metadata:
  name: spark-test-app-1-svc
  namespace: spark
spec:
  clusterIP: None
  selector:
    spark-app-selector: spark-test-app-1
  ports:
  - protocol: TCP
    name: driver-port
    port: 7077
    targetPort: 7077
  - protocol: TCP
    name: block-manager
    port: 10000
    targetPort: 10000
pod.yaml
apiVersion: v1
kind: Pod
metadata:
  name: spark-test-app-1
  labels:
    spark-app-selector: spark-test-app-1
spec:
  serviceAccountName: spark-sa
  containers:
  - name: test-deployment-pi
    image: lightbend/spark:2.1.1-OpenShift-2.4.3-rh
    args:
      -  "/opt/spark/bin/spark-submit"
      -  "--master"
      -  "k8s://https://kubernetes.default.svc
      -  "--deploy-mode"
      -  "client"
      -  "--name"
      -  "spark-pi"
      -  "--class"
      -  "org.apache.spark.examples.SparkPi"
      -  "--conf"
      -  "spark.kubernetes.authenticate.driver.serviceAccountName=spark-sa"
      -  "--conf"
      -  "spark.kubernetes.namespace=spark"
      -  "--conf"
      -  "spark.executor.memory=2G"
      -  "--conf"
      -  "spark.driver.memory=2G"
      -  "--conf"
      -  "spark.executor.instances=2"
      -  "--conf"
      -  "spark.kubernetes.container.image.pullPolicy=Always"
      -  "--conf"
      -  "spark.kubernetes.container.image=lightbend/spark:2.1.1-OpenShift-2.4.3-rh"
      -  "--conf"
      -  "spark.kubernetes.authenticate.oauthTokenFile=/var/run/secrets/kubernetes.io/serviceaccount/token"
      -  "--conf"
      -  "spark.kubernetes.authenticate.caCertFile=/var/run/secrets/kubernetes.io/serviceaccount/ca.crt"
      -  "--conf"
      -  "spark.kubernetes.driver.pod.name=spark-test-app-1"
      -  "--conf"
      -  "spark.driver.host=spark-test-app-1-svc.spark.svc"
      -  "--conf"
      -  "spark.driver.port=7077"
      -  "--conf"
      -  "spark.driver.blockManager.port=10000"
      -  "local:///opt/spark/examples/jars/spark-examples_2.11-2.4.3.jar"
  restartPolicy: Never

Now run these commands:

$ oc create -f service.yaml
$ oc create -f pod.yaml

The same approach can be used for a Job, as we’ll see in the next section.

But first, there are a few considerations for the executors' garbage collection in client mode.

If you run your Spark driver in a pod, it is highly recommended to set spark.kubernetes.driver.pod.name to the name of that pod. When this property is set, the Spark scheduler will deploy the executor pods with an OwnerReference, which in turn will ensure that once the driver pod is deleted from the cluster, all of the application’s executor pods will also be deleted. The driver will look for a pod with the given name in the namespace specified by spark.kubernetes.namespace, and an OwnerReference pointing to that pod will be added to each executor pod’s OwnerReferences list. Be careful to avoid setting the OwnerReference to a pod that is not actually the corresponding driver pod, or else the executors may be terminated prematurely when the wrong pod is deleted.

If your application is not running inside a pod, or if spark.driver.pod.name is not set when your application is running in a pod, keep in mind that the executor pods may not be properly deleted from the cluster when the application exits. The Spark scheduler attempts to delete these pods, but if the network request to the Kubernetes API server fails for any reason, these pods will remain in the cluster. The executor processes should exit when they cannot reach the driver, so the executor pods should not consume CPU and memory resources in the cluster after your application exits.

Using a Job

Jobs in Kubernetes are managed by the Job controller, which makes sure that the job runs to completion. The major difference compared to a plain pod is that the job's pod will be restarted even if the node it is running on fails.

The corresponding YAML file looks like the following:

job.yaml
apiVersion: batch/v1
kind: Job
metadata:
  name: test-deployment-pi-job
spec:
  template:
    metadata:
      name: test-deployment-pi-job
    spec:
      serviceAccountName: spark-sa
      containers:
      - name: test-deployment-pi
        image: lightbend/spark:2.1.1-OpenShift-2.4.3-rh
        args:
          -  "/opt/spark/bin/spark-submit"
          -  "--master"
          -  "k8s://https://kubernetes.default.svc"
          -  "--deploy-mode"
          -  "cluster"
          -  "--name"
          -  "spark-pi"
          -  "--class"
          -  "org.apache.spark.examples.SparkPi"
          -  "--conf"
          -  "spark.kubernetes.authenticate.driver.serviceAccountName=spark-sa"
          -  "--conf"
          -  "spark.kubernetes.namespace=spark"
          -  "--conf"
          -  "spark.executor.memory=2G"
          -  "--conf"
          -  "spark.driver.memory=2G"
          -  "--conf"
          -  "spark.executor.instances=2"
          -  "--conf"
          -  "spark.kubernetes.container.image.pullPolicy=Always"
          -  "--conf"
          -  "spark.kubernetes.container.image=lightbend/spark:2.1.1-OpenShift-2.4.3-rh"
          -  "local:///opt/spark/examples/jars/spark-examples_2.11-2.4.3.jar"
      restartPolicy: Never

The downside of using a job is that if you delete it with the command oc delete -f …​, the Spark driver pod it spawned will not be deleted along with it.

Warning

Using a job would provide fault tolerance if you run Spark in client mode with checkpointing enabled, but there are corner cases that make this risky. See this discussion for more details. The current option for driver high availability is the Spark Operator.

Using a Helm Chart

If you use Helm and you have installed the Tiller service, you can use it to run Spark jobs.

Begin by creating a sample Helm chart:

helm create sparkchart

Then remove all files under the templates folder, as we will not need them.

Next, add the following pod spec file under the templates folder:

templates/pod.yaml
apiVersion: v1
kind: Pod
metadata:
  name: test-deployment-pi
  namespace: spark
spec:
  serviceAccountName: spark-sa
  containers:
  - name: test-deployment-pi
    image: {{.Values.image}}
    args:
      -  "/opt/spark/bin/spark-submit"
      -  "--master"
      -  "k8s://https://kubernetes.default.svc"
      -  "--deploy-mode"
      -  "cluster"
      -  "--name"
      -  "{{.Values.jobName}}"
      -  "--class"
      -  "{{.Values.class}}"
      -  "--conf"
      -  "spark.kubernetes.authenticate.driver.serviceAccountName=spark-sa"
      -  "--conf"
      -  "spark.kubernetes.namespace=spark"
      -  "--conf"
      -  "spark.executor.memory=2G"
      -  "--conf"
      -  "spark.driver.memory=2G"
      -  "--conf"
      -  "spark.executor.instances=2"
      -  "--conf"
      -  "spark.kubernetes.container.image.pullPolicy=Always"
      -  "--conf"
      -  "spark.kubernetes.container.image={{.Values.image}}"
      -  "local:///opt/spark/examples/jars/spark-examples_2.11-2.4.3.jar"
  restartPolicy: Never

Now the folder structure for the spark chart should look as follows:

sparkchart
├── charts
├── Chart.yaml
├── templates
│   └── pod.yaml
└── values.yaml

Installing the chart launches a pod that submits a Spark job in cluster mode. Before you do that, make sure your user can list pods in the namespace where Tiller was installed and has the privileges to access it.

$ helm install sparkchart --name spark-example --namespace spark
NAME:   spark-example
LAST DEPLOYED: Thu Dec 13 13:21:29 2018
NAMESPACE: spark
STATUS: DEPLOYED

RESOURCES:
==> v1/Pod
NAME                READY  STATUS             RESTARTS  AGE
test-deployment-pi  0/1    ContainerCreating  0         1s

Let’s explain what is happening here. The template files are rendered with the defaults from the values file. Tiller, which runs in the cluster, evaluates them and installs the package.

The values file is as follows:

sparkchart/values.yaml
image: "lightbend/spark:2.1.1-OpenShift-2.4.3-rh"
class: "org.apache.spark.examples.SparkPi"
jobName: "spark-pi"
jarPath: "/opt/spark/examples/jars/spark-examples_2.11-2.4.3.jar"

The values file allows us to parameterize our installation and thus our Spark job.

Here we see that installing the Helm chart launched our job:

$ oc get po
NAME                            READY   STATUS      RESTARTS   AGE
spark-pi-1544700092577-driver   0/1     Completed   0          46s
test-deployment-pi              0/1     Completed   0          50s

Check your installed Helm releases with the following command:

$ helm list | grep sparkchart
NAME           	REVISION	UPDATED                 	STATUS  	CHART             APP VERSION     	NAMESPACE
spark-example   1         Thu Dec 13 13:21:29 2018	DEPLOYED	sparkchart-0.1.0  1.0             spark

If we had any dependencies defined, they would have been put under the charts directory. To check how your template files are rendered without actually installing the chart, run the following:

$ helm install --name spark-example2 --dry-run --debug ./sparkchart
[debug] Created tunnel using local port: '36051'

[debug] SERVER: "127.0.0.1:36051"

[debug] Original chart version: ""
[debug] CHART PATH: /home/stavros/tests/helm/sparkchart

NAME:   spark-example2
REVISION: 1
RELEASED: Thu Dec 13 13:26:33 2018
CHART: sparkchart-0.1.0
USER-SUPPLIED VALUES:
{}

COMPUTED VALUES:
class: org.apache.spark.examples.SparkPi
image: lightbend/spark:2.1.1-OpenShift-2.4.3-rh
jarPath: /opt/spark/examples/jars/spark-examples_2.11-2.4.3.jar
jobName: spark-pi

HOOKS:
MANIFEST:

---
# Source: sparkchart/templates/pod.yaml
apiVersion: v1
kind: Pod
metadata:
  name: test-deployment-pi
  namespace: spark
spec:
  serviceAccountName: spark-sa
  containers:
  - name: test-deployment-pi
    image: lightbend/spark:2.1.1-OpenShift-2.4.3-rh
    args:
      -  "/opt/spark/bin/spark-submit"
      -  "--master"
      -  "k8s://https://kubernetes.default.svc"
      -  "--deploy-mode"
      -  "cluster"
      -  "--name"
      -  "spark-pi"
      -  "--class"
      -  "org.apache.spark.examples.SparkPi"
      -  "--conf"
      -  "spark.kubernetes.authenticate.driver.serviceAccountName=spark-sa"
      -  "--conf"
      -  "spark.kubernetes.namespace=spark"
      -  "--conf"
      -  "spark.executor.memory=2G"
      -  "--conf"
      -  "spark.driver.memory=2G"
      -  "--conf"
      -  "spark.executor.instances=2"
      -  "--conf"
      -  "spark.kubernetes.container.image.pullPolicy=Always"
      -  "--conf"
      -  "spark.kubernetes.container.image=lightbend/spark:2.1.1-OpenShift-2.4.3-rh"
      -  "local:///opt/spark/examples/jars/spark-examples_2.11-2.4.3.jar"
  restartPolicy: Never

Let’s now run Spark in client mode. We create another chart and put two YAML files under the templates directory, one for the pod spec and one for the service. See the previous section on client mode for the contents of these files:

sparkchart-client
├── charts
├── Chart.yaml
├── templates
│   ├── pod.yaml
│   └── service.yaml
└── values.yaml
$ oc get po
NAME                            READY     STATUS      RESTARTS   AGE
spark-pi-1537039409921-exec-1   1/1       Running     0          11s
spark-test-app-1                1/1       Running     0          16s
Note

There is an order in which resources are installed, as described in this Helm documentation.

You might ask, what advantages do Helm charts bring compared to the OpenShift Deployment-based approach discussed in the previous section? The latter already lets you easily scale, update, and manage the Pods and ReplicaSets where you deploy your Spark application.

What Helm brings to the table is an additional abstraction that lets you keep track of the resources you’ve installed via the chart. A chart contains a set of files that describe OpenShift resources that work together. For example, a chart might include Deployment and Service definitions, as we saw above in the client-mode example. When you install that chart into your cluster via Helm, the result is a Helm release with a given name. Helm becomes even more of an advantage when you are dealing with multiple components, possibly in a pipeline: you package all the components in a Helm release so that you can manage the whole group together.

Moreover, Helm is convenient for CI/CD automation of your Spark jobs. Here is a typical workflow (a minimal command sketch follows the list):

  1. Code is built with a CI tool and unit tests are run.

  2. A Docker image and a Helm chart are created and pushed to their respective registries.

  3. Integration tests are run against the Helm chart and the Docker images in a staging environment.

  4. The chart is deployed.

  5. Profit!!
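
A minimal command sketch of steps 2 through 4, using Helm 2 syntax; the image name, registry, and release name are hypothetical placeholders:

# Build and push the application image (placeholder names).
docker build -t my-registry/my-spark-app:1.0.0 .
docker push my-registry/my-spark-app:1.0.0

# Package the chart; upload the resulting .tgz to your chart repository.
helm package sparkchart

# Deploy or upgrade the release in the target namespace, overriding the image value.
helm upgrade --install spark-example sparkchart --namespace spark \
  --set image=my-registry/my-spark-app:1.0.0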

See Uninstalling Components for a discussion of using helm delete to remove components.

Running Spark Jobs with High Availability (HA)

It is possible to run Spark jobs with high availability (HA) using the checkpointing mechanism. Checkpointing stores crucial state data in a distributed storage system, which must be shared between the driver and executor pods.

Support for checkpointing in OpenShift is relatively new. Lightbend verified that the core functionality works as expected, but there are areas for improvement. For example, when you restart a streaming job, even if you modify certain configuration settings at restart, such as spark.executorEnv or spark.kubernetes.executor.limit.cores, the checkpointing logic will cause the old values to be used, even if they were not explicitly defined initially. This limitation effectively prevents the user from changing configuration settings like these after the job has been submitted the first time.

Using Spark History Server

For details on installing and configuring Spark History Server, see Spark History Server in Management and Monitoring Guide. For example, it discusses the configuration required to use HDFS or Persistent Volume Claims (PVCs) as durable stores, which need to be set up before the following instructions can be used.

The different deployment methods, spark-submit and Spark Operator, require different configurations when submitting Spark jobs.

When using spark-submit, in addition to the regular flags required for Spark History Server integration, spark.eventLog.enabled and spark.eventLog.dir, you need flags that mount the PVC in the driver and executor pods so that events can be logged properly. Here is a complete example spark-submit command:

$SPARK_HOME/bin/spark-submit \
  --master k8s://https://$API_SERVER:$API_SERVER_PORT \
  --deploy-mode cluster \
  --name spark-pi \
  --class org.apache.spark.examples.SparkPi \
  --conf spark.eventLog.enabled=true \
  --conf spark.eventLog.dir=file:/mnt \
  --conf spark.executor.instances=2 \
  --conf spark.kubernetes.container.image=lightbend/spark:2.1.1-OpenShift-2.4.3 \
  --conf spark.kubernetes.container.image.pullPolicy=Always \
  --conf spark.kubernetes.driver.volumes.persistentVolumeClaim.checkpointpvc.options.claimName=spark-hs-pvc \
  --conf spark.kubernetes.driver.volumes.persistentVolumeClaim.checkpointpvc.mount.path=/mnt \
  --conf spark.kubernetes.driver.volumes.persistentVolumeClaim.checkpointpvc.mount.readOnly=false \
  --conf spark.kubernetes.executor.volumes.persistentVolumeClaim.checkpointpvc.options.claimName=spark-hs-pvc \
  --conf spark.kubernetes.executor.volumes.persistentVolumeClaim.checkpointpvc.mount.path=/mnt \
  --conf spark.kubernetes.executor.volumes.persistentVolumeClaim.checkpointpvc.mount.readOnly=false \
  local:///opt/spark/examples/jars/spark-examples_2.11-2.4.3.jar

When using the Spark Operator to deploy the Spark job with a YAML file, specify the PVC as a volume to be mounted in the pods. An example is shown below:

Note

Mounting PVs requires the Spark Operator’s webhook to be enabled. See Enabling the Admission Webhooks for details.

  apiVersion: sparkoperator.k8s.io/v1beta1
  kind: SparkApplication
  metadata:
    name: spark-pi-test
    namespace: lightbend
  spec:
    type: Scala
    mode: cluster
    image: "lightbend/spark:2.1.1-OpenShift-2.4.3"
    imagePullPolicy: Always
    mainClass: org.apache.spark.examples.SparkPi
    mainApplicationFile: "local:///opt/spark/examples/jars/spark-examples_2.11-2.4.3.jar"
    sparkConf:
      "spark.eventLog.enabled": "true"
      "spark.eventLog.dir": "file:/mnt"
    volumes:
      - name: spark-data
        persistentVolumeClaim:
          claimName: spark-hs-pvc
    driver:
      cores: 0.1
      coreLimit: "200m"
      memory: "512m"
      labels:
        version: 2.4.3
      serviceAccount: spark
      volumeMounts:
        - name: spark-data
          mountPath: /mnt
    executor:
      cores: 1
      instances: 3
      memory: "512m"
      labels:
        version: 2.4.3
      volumeMounts:
        - name: spark-data
          mountPath: /mnt
    restartPolicy: Never

If everything went well, you should see your Spark jobs in the history server:

Spark History Server on Kubernetes
Figure 24. Spark History Server on Kubernetes
When Using HDFS with Spark History Server

If you have configured HDFS as your backing store for Spark History Server, then deploying your Spark jobs proceeds as follows.

When using spark-submit, two flags are required:

--conf spark.eventLog.enabled=true
--conf spark.eventLog.dir=hdfs://myhdfs/history/

Note that the value passed to the spark.eventLog.dir flag must match the value in the --set hdfs.logDirectory=…​ argument used when installing Spark History Server.

When using the Spark Operator, edit the job’s YAML file and change the spec.sparkConf field to include the following settings:

  spec:
    sparkConf:
      "spark.eventLog.enabled": "true"
      "spark.eventLog.dir": "hdfs://myhdfs/history"

Again, note that the value for the history URI must match the value used when setting up Spark History Server with HDFS.

Finally, when you submit your spark job, you will also need to set the following properties:

--conf spark.eventLog.enabled=true
--conf spark.eventLog.dir=hdfs://myhdfs/history
Limitations and Known Issues for Spark on OpenShift

There are some known limitations with the current version:

  • Executor pods are removed automatically by default, which may not be what you want.

  • Invalid DNS names due to hostname truncation: there is a limit on the hostname length, and if the name is truncated at the wrong character it causes failures.

  • Killing Spark drivers through spark-submit is not yet supported on Kubernetes, unlike other environments such as standalone mode. The workaround is to use oc or kubectl instead (see the example after this list).

  • SPARK_MOUNTED_CLASSPATH contains an incorrect semicolon on Windows; this affects Windows users only.

  • Driver state tracking: there might be a performance issue with a large number of executors, as the driver stores state about their pods.
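
For example, to stop a job that was submitted in cluster mode, delete its driver pod directly. The pod name below comes from the earlier example output; substitute your own, which you can find with oc get pods:

oc delete pod spark-pi-1544700092577-driver -n spark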

Managing Spark Jobs Using the Spark Operator

Let’s discuss deploying and managing Spark jobs with the Spark Operator. For details on installing and managing the operator itself, see here in the Management and Monitoring Guide. That section also discusses how to retrieve job logs and metrics.

Note

As discussed in the Management Guide, the installation instructions install Lightbend’s build of the Spark operator, which has patches and other modifications. This version of the operator must be used when production support is required.

Deploying a Spark Job Using the Spark Operator

To use the Spark Operator to deploy a Spark job, first write your Spark job specification in a YAML file. The format of the YAML file is similar to what you would use to define other Kubernetes resources (e.g. a Deployment or a PersistentVolume), except that the kind field is now SparkApplication, the name of the CRD created by the Operator. The information you need to provide in the file includes the following:

  • The name of the job

  • The namespace in which the job should be deployed

  • The Docker image for the job

  • The application jar file in the image

  • The main class of the job

  • Driver and executor specifications (e.g., resource requests and limits)

Optionally, you can also mount volumes in the driver and/or executor pods. We will demonstrate one such use in the Spark History Server section. To find example YAML files for some Spark jobs, refer to the Spark on Kubernetes repository examples.

Once the job YAML file is ready, it can be deployed with

oc apply -f your-job.yaml

Or using the sparkctl CLI included in the Operator repo:

sparkctl create your-job.yaml

Note that the Helm chart does not install or set up sparkctl for you locally. You need to download the CLI from here and add it to your PATH.

If your OS or architecture is not supported, you will need to build the CLI yourself from the source code in the Operator project.

Deleting a Spark Job Using the Operator

To delete a Spark job and all its associated resources, simply run

oc delete sparkapplication your-job-name

Here, your-job-name is the name given in the YAML file’s metadata.name field. If you forget or would like to confirm the name of your job, run this command:

oc get sparkapplications

This works because SparkApplication objects, defined by the CRD, are just like other Kubernetes resources: they can be listed or deleted with oc.

You can also delete the job using the equivalent sparkctl command:

sparkctl delete your-job-name

The delete operation will clean up all associated resources for your Spark job including volumes and pods, as long as they are in the YAML file that was originally used to create the SparkApplication. If your job was created by other means (e.g. spark-submit), then there would not be a SparkApplication resource corresponding to your job, therefore this method would not work.

Upgrading a Spark Job Using the Operator

To modify the configuration of a job that has already been deployed using the Operator, make the changes in the job’s YAML file and then re-deploy the job using oc apply -f or sparkctl create. The existing pods will be replaced and the job re-run, whether the change is an updated Docker image or simply new labels on the pods.

Understanding the Life Cycle of Spark Applications Managed by the Spark Operator

We described above the core actions a user can take, via the Kubernetes CRD APIs, to manage applications. It is important, though, to understand the life cycle model of Spark applications as implemented by the Spark Operator, so that debugging an application’s state transitions is easier when problems occur.

The Spark Operator creates a controller for each of the two application types it supports: SparkApplications and ScheduledSparkApplications. We will focus here on SparkApplications, as a ScheduledSparkApplication is a wrapper around SparkApplication with some additional semantics.

First, as useful background the controller pattern is described in the official Kubernetes docs as follows:

In Kubernetes, a controller is a control loop that watches the shared state of the cluster through the apiserver and makes changes attempting to move the current state towards the desired state.

The controller uses two SharedInformers, one for each type of Kubernetes API resource object it monitors. A SharedInformer provides a single shared cache among controllers, with updates for the objects it watches.

One informer monitors for pod state changes and the other for CRD object changes. Both informers have a callback mechanism that reacts to the following modifications: Add, Update, Delete.

The controller shares a work queue with informers. Each time there is an update, the informer adds an event to the queue to be processed by the controller’s workers. Events are processed in parallel. All events are processed so that the SparkApplication structure held by the operator can be synchronized with the real state of the application running in the cluster. Also the informers' cache is re-synced every 30 seconds.

Hence, even if the Spark Operator goes down and events are missed during that time, once it is back up it will list the SparkApplications and make the updates needed to its internal data structures.

Due to the use of the shared informer, the updates will not be fully replayed but only the latest event related to the application will be delivered, as discussed here.

A SparkApplication has the states shown in the following diagram:

Spark Application states
Figure 25. Spark Application states

When a new custom resource object is created, the corresponding application starts in the New state; it has not run yet. The related informer creates an object for that app with that state and submits it to the work queue for further processing.

When that object is processed the controller logic will try to submit the application using the spark-submit utility. If the submission is successful then the application will reach state Submitted. Otherwise, it will move to state Submission Failed.

If submission was successful then the Spark back-end (invoked by spark-submit) will create a driver pod object submitted to the Kubernetes API server, in the usual way.

If Spark Operator’s mutating webhook is enabled, then that pod object will be mutated before it is stored in Kubernetes.

After that, the Pod will be scheduled for running and the informer again will notify the controller for the new pod addition.

The Spark Application will be updated again several times depending on the status of the driver:

PodPending -> SubmittedState
PodRunning -> RunningState
PodSucceeded -> SucceedingState
PodFailed -> FailingState
Any other state -> UnknownState

For more on the pod life cycle and for an explanation of the above pod states, check the official Kubernetes documentation. Note that the Completed state is not considered here, as it is actually derived from the logical OR of checks against the two states Succeeded and Failed.

When the user updates the custom resource object of the Spark application, it goes into the Invalidating state while the controller updates it. Then the same application is scheduled for further processing, causing it to enter the PendingRerun state. From there it can be submitted again.

There is another scenario where the application can reach the PendingRerun state. If the application is either in the Succeeding or Failing state, then if the re-try policy forces the application to be re-tried, it will be moved to the PendingRerun state.

Several properties are available for specifying the retry policy in detail; they are described here.

Note that the two final states are Failed and Completed. They will be reached if the operator cannot schedule the specific application any more.

You can check the state transition of the application with one of the following options:

  • Look at the Spark operator’s log using: kubectl logs <operator-pod-name> -n <namespace>.

  • Use the Kubernetes events API to list the events in a given namespace: kubectl get events -n <namespace>.

  • Find the relevant information in the describe output: kubectl describe sparkapplication <app-name> -n <namespace>.

Spark Monitoring

To monitor Spark jobs, start with this section of the official Spark documentation. We’ll add additional information here and focus on Kubernetes.

We’ll use the Prometheus JMX exporter to expose Spark metrics directly to Prometheus, following the Spark metrics system internals that we describe next. Prometheus is a popular time-series database for metrics aggregation, especially in Kubernetes-based systems.

Prometheus is also the database used by Lightbend Enterprise Suite Console, so our approach allows you to analyze Spark metrics in this Console. Note that using the JMX exporter requires that the job run long enough for metrics to be available, which also depends on the Prometheus scraping period. The default period is 10 seconds.

The Spark job driver and executors use the JMX exporter within their containers to get the JMX-exposed metrics and expose them to a container port from which Prometheus can scrape them.

Spark Metrics System and Configuration

Spark exposes metrics for many of its components. A metrics system consists of a source, several sinks, and a metric data structure. Metrics coming from registered sources are captured by the metrics system and pushed to the registered sinks. The metrics exposed for Kubernetes are listed in Metrics for Spark on Kubernetes.

Spark creates a metric system data structure per component such as the following:

  • master: The Spark standalone master process.

  • applications: A component within the master which reports on various applications.

  • worker: A Spark standalone worker process.

  • executor: A Spark executor.

  • driver: The Spark driver process (the process in which your SparkContext is created).

  • shuffleService: The Spark shuffle service.

For Spark on Kubernetes we care about the driver, executor, and shuffleService. However, the Spark shuffle service is not yet implemented for Kubernetes.

A metrics system source specifies from where to collect metrics data. There are two kinds of sources:

  1. Spark internal source, like MasterSource, WorkerSource, etc, which will collect Spark components’ internal states. These sources are related to an instance and will be added after a specific metrics system is created.

  2. Common source, like JvmSource, which collects low-level state. It is set up through configuration properties and loaded via reflection.

Each metrics system has a metric registry which is exposed in the SparkEnv of the given instance. This is where the metrics from each source are registered and kept in memory. The user can add additional custom metrics to the SparkEnv by adding a customized source.
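
As a rough sketch only (not a supported API): in Spark 2.x the Source trait and the metrics system registration are package-private, so a common workaround is to place the class in Spark’s package namespace. The class and metric names below are hypothetical:

package org.apache.spark.metrics.source

import com.codahale.metrics.MetricRegistry
import org.apache.spark.SparkEnv

// Hypothetical custom source exposing a single counter.
class MyAppSource extends Source {
  override val sourceName: String = "myApp"
  override val metricRegistry: MetricRegistry = new MetricRegistry()
  val processedRecords = metricRegistry.counter("processedRecords")
}

object MyAppMetrics {
  // Register the source with the metrics system of the current JVM (driver or executor)
  // and return it so application code can update the counter.
  def register(): MyAppSource = {
    val source = new MyAppSource
    SparkEnv.get.metricsSystem.registerSource(source)
    source
  }
}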

When a metric system is created, its configuration is loaded as well. First, the default values are loaded for the driver’s internal servlet that exposes the metrics on the driver REST API (/metrics/json). Then, the values coming from a file with the name metrics.properties are loaded by the class loader. Finally, the metric properties passed via SparkConf are loaded.

When properties are passed via a file, use $SPARK_HOME/conf/metrics.properties. At this time, Spark on Kubernetes does not use the conf directory within its Docker image. Instead, it reads its configuration from a Kubernetes ConfigMap and overrides anything stored there.

The workaround, which we’ll see later, is to pass configuration options when using spark-submit.

The metrics configuration format is as follows:

[instance].[sink|source].[name].[options] = xxxx

Users can pass configurations to the metric systems using Spark conf options, such as this example:

spark.metrics.conf.*.sink.jmx.class=org.apache.spark.metrics.sink.JmxSink

In this example, for every instance (*), we expose metrics to a JMX sink.

Metric names are by default prefixed with spark.app.id. If you want to group them differently, for example per application name, which is useful for monitoring them across application invocations, then override the spark.metrics.namespace property.
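
For example, to group metrics by application name rather than by application id, using the documented substitution of another Spark property, pass:

--conf spark.metrics.namespace=${spark.app.name}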

Some sources have a polling period. For details, see the comments in the metrics.properties.template file in the Spark source code repository.

JVM Metrics

For both driver and executor we can get low level JVM metrics by using the jvmSource, which registers the following JVM metrics sets:

metricRegistry.registerAll(new GarbageCollectorMetricSet)
metricRegistry.registerAll(new MemoryUsageGaugeSet)
metricRegistry.registerAll(
  new BufferPoolMetricSet(ManagementFactory.getPlatformMBeanServer))

The JMX exporter exposes its own JVM metrics by default, independently of Spark. These are shown here:

JMX Exporter JVM metrics
Figure 26. JMX Exporter JVM metrics

See Metrics for Spark on Kubernetes for the list of JVM metrics exposed for the drivers and executors.

Spark Streaming Metrics

When Spark Structured Streaming is used, the streaming query execution on the driver side registers a metric source specialized for streaming queries if spark.sql.streaming.metricsEnabled is set to true.

The name of the source is defined by this expression:

spark.streaming.${Option(name).getOrElse(id)}
override val id: UUID = UUID.fromString(streamMetadata.id)

The name is the query name the user passed. If that is missing, a UUID is created from streamMetadata.id, the unique id of the StreamingQuery that is persisted across restarts.
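
For illustration, a minimal sketch that enables the streaming metrics source and gives the query a stable name; the application name, query name, and console sink are hypothetical choices for the example:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("my-streaming-app")
  .config("spark.sql.streaming.metricsEnabled", "true") // register the streaming metrics source
  .getOrCreate()

val events = spark.readStream.format("rate").load() // placeholder source

val query = events.writeStream
  .queryName("myQuery") // the metric source name becomes spark.streaming.myQuery
  .format("console")
  .start()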

The old streaming implementation also defines a source whose name starts with the value returned by ssc.sparkContext.appName, with .StreamingMetrics appended. For example, if the appName is myBigApp then the source name is myBigApp.StreamingMetrics.

The task metrics listed for the executor are of type counter, because they are incremented each time the executor runs a task.

The executor also registers a shuffle service metrics set, based on whether or not the external shuffle service is available:

if (externalShuffleServiceEnabled) {
  new ShuffleMetricsSource("ExternalShuffle", shuffleClient.shuffleMetrics())
} else {
  new ShuffleMetricsSource("NettyBlockTransfer", shuffleClient.shuffleMetrics())
}

Several metric sinks are supported:

  • ConsoleSink: Logs metrics information to the console.

  • CSVSink: Exports metrics data to CSV files at regular intervals.

  • JmxSink: Registers metrics for viewing in a JMX console.

  • MetricsServlet: Adds a servlet within the existing Spark UI to serve metrics data as JSON data.

  • GraphiteSink: Sends metrics to a Graphite node.

  • Slf4jSink: Sends metrics to slf4j as log entries.

  • StatsdSink: Sends metrics to a StatsD node.

Spark Metrics not Exposed via a Source

Certain metrics are not exposed via any type of Source, such as the job completion time. These are available through the metrics REST API on the driver’s web UI, http://localhost:4040/api/v1, and through the history server, if it’s installed.

For example, you can get details about a job using the path /applications/[app-id]/jobs/[job-id]. All these metrics are gathered by the AppStatusListener, which listens for any event published to it and updates the application metrics accordingly.
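
For example, assuming the driver UI is reachable on localhost:4040 (e.g., via port forwarding), and using the application id from the examples below with a hypothetical job id of 0:

curl http://localhost:4040/api/v1/applications/app-20160809000059-0000/jobs/0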

Lightbend implemented a fix for this upstream issue, which exposes several of these metrics via a source. The fix is provided with Lightbend’s Spark on Kubernetes image. These metrics can be enabled or disabled using the Spark configuration option spark.app.status.metrics.enabled.
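
For example, to enable these metrics when submitting a job with Lightbend’s image, pass:

--conf spark.app.status.metrics.enabled=true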

Prometheus JMX Exporter Setup

The Prometheus JMX exporter requires a set of patterns in order to pick up JMX metrics and possibly rewrite them with a different name or format. A template is already available for Spark here. Consider the following example:

  - pattern: "metrics<>Count"
    name: spark_executor_$3
    type: COUNTER
    labels:
      app_id: "$1"
      executor_id: "$2"

This pattern adds the prefix spark_executor to executor counter metrics such as jvmGCTime and attaches two labels, the app id and the executor id. For example, a metric named app-<id>.<executor-id>.executor.jvmGCTime becomes spark_executor_jvmGCTime.

Similarly, histograms and timers are supported.

To complete the picture, we needed to add JMX dependencies to our Spark image, based on the existing Spark image. Something like this:

FROM lightbend/spark:<version>
ADD spark-prom-exporter.yaml /spark-prom-exporter.yaml
ADD jmx_prometheus_javaagent-0.3.1.jar /jmx_prometheus_javaagent-0.3.1.jar

Here are the contents of the spark-prom-exporter.yaml (also available here):

spark-prom-exporter.yaml
rules:

  # These come from the master
  # Example: master.aliveWorkers
  - pattern: "metrics<name=master\\.(.*)><>Value"
    name: spark_master_$1

  # These come from the worker
  # Example: worker.coresFree
  - pattern: "metrics<name=worker\\.(.*)><>Value"
    name: spark_worker_$1

  # These come from the application driver
  # Example: app-20160809000059-0000.driver.DAGScheduler.stage.failedStages
  - pattern: "metrics<name=(.*)\\.driver\\.(DAGScheduler|BlockManager|jvm|appStatus)\\.(.*)><>Value"
    name: spark_driver_$2_$3
    type: GAUGE
    labels:
      app_id: "$1"

  # These come from the application driver
  # Counters for appStatus
  - pattern: "metrics<name=(.*)\\.driver\\.appStatus\\.(.*)><>Count"
    name: spark_driver_appStatus_$2_count
    type: COUNTER
    labels:
      app_id: "$1"

  # Gauge for appStatus
  - pattern: "metrics<name=(.*)\\.driver\\.appStatus\\.(.*)><>Value"
    name: spark_driver_appStatus_$2
    type: GAUGE
    labels:
      app_id: "$1"

  # These come from the application driver
  # Emulate timers for DAGScheduler like messagePRocessingTime
  - pattern: "metrics<name=(.*)\\.driver\\.DAGScheduler\\.(.*)><>Count"
    name: spark_driver_DAGScheduler_$2_count
    type: COUNTER
    labels:
      app_id: "$1"

  # HiveExternalCatalog is of type counter
  - pattern: "metrics<name=(.*)\\.driver\\.HiveExternalCatalog\\.(.*)><>Count"
    name: spark_driver_HiveExternalCatalog_$2_total
    type: COUNTER
    labels:
      app_id: "$1"

  # These come from the application driver
  # Emulate histograms for CodeGenerator
  - pattern: "metrics<name=(.*)\\.driver\\.CodeGenerator\\.(.*)><>Count"
    name: spark_driver_CodeGenerator_$2_count
    type: COUNTER
    labels:
      app_id: "$1"

  # These come from the application driver
  # Emulate timer (keep only count attribute) plus counters for LiveListenerBus
  - pattern: "metrics<name=(.*)\\.driver\\.LiveListenerBus\\.(.*)><>Count"
    name: spark_driver_LiveListenerBus_$2_count
    type: COUNTER
    labels:
      app_id: "$1"

  # Get Gauge type metrics for LiveListenerBus
  - pattern: "metrics<name=(.*)\\.driver\\.LiveListenerBus\\.(.*)><>Value"
    name: spark_driver_LiveListenerBus_$2
    type: GAUGE
    labels:
      app_id: "$1"

  # These come from the application driver if it's a streaming application
  # Example: app-20160809000059-0000.driver.com.example.ClassName.StreamingMetrics.streaming.lastCompletedBatch_schedulingDelay
  - pattern: "metrics<name=(.*)\\.driver\\.(.*)\\.StreamingMetrics\\.streaming\\.(.*)><>Value"
    name: spark_driver_streaming_$3
    labels:
      app_id: "$1"
      app_name: "$2"

  # These come from the application driver if it's a structured streaming application
  # Example: app-20160809000059-0000.driver.spark.streaming.QueryName.inputRate-total
  - pattern: "metrics<name=(.*)\\.driver\\.spark\\.streaming\\.(.*)\\.(.*)><>Value"
    name: spark_driver_structured_streaming_$3
    labels:
      app_id: "$1"
      query_name: "$2"

  # These come from the application executors
  # Example: app-20160809000059-0000.0.executor.threadpool.activeTasks (value)
  #  app-20160809000059-0000.0.executor.JvmGCtime (counter)
  - pattern: "metrics<name=(.*)\\.(.*)\\.executor\\.(.*)><>Value"
    name: spark_executor_$3
    type: GAUGE
    labels:
      app_id: "$1"
      executor_id: "$2"

  # Executors counters
  - pattern: "metrics<name=(.*)\\.(.*)\\.executor\\.(.*)><>Count"
    name: spark_executor_$3_total
    type: COUNTER
    labels:
      app_id: "$1"
      executor_id: "$2"

  # These come from the application executors
  # Example: app-20160809000059-0000.0.jvm.threadpool.activeTasks
  - pattern: "metrics<name=(.*)\\.([0-9]+)\\.(jvm|NettyBlockTransfer)\\.(.*)><>Value"
    name: spark_executor_$3_$4
    type: GAUGE
    labels:
      app_id: "$1"
      executor_id: "$2"

  - pattern: "metrics<name=(.*)\\.([0-9]+)\\.HiveExternalCatalog\\.(.*)><>Count"
    name: spark_executor_HiveExternalCatalog_$3_count
    type: COUNTER
    labels:
      app_id: "$1"
      executor_id: "$2"

  # These come from the application driver
  # Emulate histograms for CodeGenerator
  - pattern: "metrics<name=(.*)\\.([0-9]+)\\.CodeGenerator\\.(.*)><>Count"
    name: spark_executor_CodeGenerator_$3_count
    type: COUNTER
    labels:
      app_id: "$1"
      executor_id: "$2"

scrape_configs:
  - job_name: "spark_app"
    scrape_interval: "5s"
    target_groups:
      - targets: ['localhost:9001']

With our pre-built image, you can run a Spark job with monitoring enabled as follows:

$SPARK_HOME/bin/spark-submit \
  --verbose \
  --master k8s://https://$API_SERVER:$API_SERVER_PORT \
  --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark-sa \
  --conf spark.kubernetes.namespace=lightbend \
  --deploy-mode cluster \
  --name spark-pi \
  --class org.apache.spark.examples.SparkPi \
  --conf spark.executor.memory=2G \
  --conf spark.driver.memory=2G \
  --conf spark.kubernetes.driver.annotation.prometheus.io/port=9100 \
  --conf spark.kubernetes.driver.annotation.prometheus.io/scrape=true \
  --conf spark.kubernetes.executor.annotation.prometheus.io/port=9100 \
  --conf spark.kubernetes.executor.annotation.prometheus.io/scrape=true \
  --conf spark.executor.instances=2 \
  --conf spark.sql.streaming.metricsEnabled=true \
  --conf spark.kubernetes.container.image.pullPolicy=Always \
  --conf spark.kubernetes.container.image=lightbend/spark:2.1.1-OpenShift-2.4.3 \
  --conf "spark.executor.extraJavaOptions=-Dlog4j.configuration=file:///log4j.properties -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=8090 -Dcom.sun.management.jmxremote.rmi.port=8090 -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=localhost -javaagent:/prometheus/jmx_prometheus_javaagent-0.3.1.jar=9100:/etc/metrics/conf/prometheus.yaml" \
  --conf "spark.driver.extraJavaOptions=-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=8090 -Dcom.sun.management.jmxremote.rmi.port=8090 -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=localhost -javaagent:/prometheus/jmx_prometheus_javaagent-0.3.1.jar=9100:/etc/metrics/conf/prometheus.yaml" \
  --conf spark.metrics.conf.*.sink.jmx.class=org.apache.spark.metrics.sink.JmxSink \
  --conf spark.metrics.conf.executor.source.jvm.class=org.apache.spark.metrics.source.JvmSource \
  --conf spark.metrics.conf.driver.source.jvm.class=org.apache.spark.metrics.source.JvmSource \
  local:///opt/spark/examples/jars/spark-examples_2.11-2.4.3.jar 1000000
Note

In all provided images, the exporter config is stored in the image at /etc/metrics/conf/prometheus.yaml and the exporter jar at /prometheus/jmx_prometheus_javaagent-0.3.1.jar. If you want to adjust the config options, you can always override the image or use the Spark Operator.

Here we assumed the default Prometheus domain. If you install Prometheus with Lightbend Console using a different domain, for example by passing prometheusDomain=my.domain.io to the installation script, you need to change the above annotations accordingly. We also assume the default metrics path, /metrics. To change it, modify the Prometheus configuration file provided above (it also exists in the Spark images) by adding the proper scrape_config options at the end, and add the annotation prometheus.io/path=/my_path.

Note

At this time, we discourage using a custom Prometheus domain, as many services assume the default value of prometheus.io.

Running JMX Exporter as a Sidecar Container
Warning

This is an unsupported scenario, as it requires a custom image built from this Spark pull request. The pod template feature described in the PR will be available with Spark 3.0.0. We provide the examples here for the sake of completeness and for interested users who might want to try this.

Another common way to run the JMX exporter is as a sidecar container, which we describe next. Note, however, that according to the official documentation for the JMX exporter project, the preferred way to run it is as a Java agent:

"This exporter is intended to be run as a Java Agent, exposing a HTTP server and serving metrics of the local JVM. It can be also run as an independent HTTP server and scrape remote JMX targets, but this has various disadvantages, such as being harder to configure and being unable to expose process metrics (e.g., memory and CPU usage). Running the exporter as a Java Agent is thus strongly encouraged."

There are several fully-configurable Docker images available, all supporting the JMX exporter with Spark standalone deployments and all running the JMX Prometheus server:

We used the first one, which is actually the basis for the others:

FROM sscaling/jmx-prometheus-exporter
ENV SERVICE_PORT=9100
COPY config.yaml /opt/jmx_exporter/config.yaml

The config.yaml file is essentially the same as the Spark config YAML file, spark-prom-exporter.yaml, with the following addition:

...
startDelaySeconds: 0
username:
password:
jmxUrl: service:jmx:rmi:///jndi/rmi://localhost:8090/jmxrmi
ssl: false
rules:
Note

All the config options can be passed externally. It’s not necessary to hard-code them. We’re showing that here for demonstration purposes.

Next, launch a Spark job. It should run for more than 10 secs for Prometheus to pick up metrics from it:

$SPARK_HOME/bin/spark-submit \
  --master k8s://https://$API_SERVER:$API_SERVER_PORT \
  --deploy-mode cluster \
  --name spark-pi \
  --class org.apache.spark.examples.SparkPi \
  --conf spark.kubernetes.driver.annotation.prometheus.io/port=9100 \
  --conf spark.kubernetes.driver.annotation.prometheus.io/scrape=true \
  --conf spark.kubernetes.executor.annotation.prometheus.io/port=9100 \
  --conf spark.kubernetes.executor.annotation.prometheus.io/scrape=true \
  --conf spark.executor.instances=2 \
  --conf spark.sql.streaming.metricsEnabled=true \
  --conf spark.kubernetes.container.image.pullPolicy=Always \
  --conf spark.kubernetes.container.image=lightbend/spark:2.1.1-OpenShift-2.4.3 \
  --conf "spark.executor.extraJavaOptions=-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=8090 -Dcom.sun.management.jmxremote.rmi.port=8090 -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=localhost" \
  --conf "spark.driver.extraJavaOptions=-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=8090 -Dcom.sun.management.jmxremote.rmi.port=8090 -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=localhost" \
  --conf spark.kubernetes.driver.podTemplateFile=/path/to/spark/spark-2.4.3-bin-lightbend/spark-driver.yaml \
  --conf spark.kubernetes.executor.podTemplateFile=/path/to/spark-2.4.3-bin-lightbend/spark-executor.yaml \
  --conf spark.metrics.conf.*.sink.jmx.class=org.apache.spark.metrics.sink.JmxSink \
  --conf spark.metrics.conf.executor.source.jvm.class=org.apache.spark.metrics.source.JvmSource \
  --conf spark.metrics.conf.driver.source.jvm.class=org.apache.spark.metrics.source.JvmSource \
  local:///opt/spark/examples/jars/spark-examples_2.11-2.4.3.jar 1000000

Here are the YAML files for the driver and executor:

spark-driver.yaml
apiVersion: v1
kind: Pod
metadata:
  labels:
    template-label-key: driver-template-label-value
spec:
  containers:

  - name: test-driver-container
    image: lightbend/spark:2.1.1-OpenShift-2.4.3
    ports:
    - containerPort: 8090

  - name: test-exporter-container
    image: some_repo/jmxexporter
    ports:
    - containerPort: 9100
spark-executor.yaml
apiVersion: v1
kind: Pod
metadata:
  labels:
    template-label-key: executor-template-label-value
spec:
  containers:

  - name: test-executor-container
    image: lightbend/spark:2.1.1-OpenShift-2.4.3
    ports:
    - containerPort: 8090

  - name: test-exporter-container
    image: some_repo/jmxexporter
    ports:
    - containerPort: 9100

The downside of this approach is that the sidecar containers do not exit together. This means that the Spark pod could keep running even after the Spark context stops.

Another way to run the JMX exporter is within the Spark container, in another process. Here, you run the JMX Prometheus server as a background process or use supervisord to control the Spark process and that server. At this time, Spark uses tini to run the spark-submit command within the container.

Note

If you use the Spark Operator, note that it has an open issue related to adding sidecar containers.

Lightbend Enterprise Suite Console - Spark Grafana Dashboard

Lightbend Enterprise Suite Console comes with a sophisticated Grafana dashboard to easily monitor your Spark jobs on a per-namespace basis in the Kubernetes cluster. A number of panels are defined to cover Spark driver and executor metrics for different scenarios, such as streaming job monitoring, JVM monitoring, etc.

Here is a screenshot of the dashboard:

Grafana Dashboard for Spark
Figure 27. Grafana Dashboard for Spark
Metrics for Spark on Kubernetes

The following relevant metrics exist for Spark on Kubernetes, grouped by source and listed as metric (JMX type): description. First, the driver metrics:

Table 23. Spark on Kubernetes Driver Metrics

CodeGenerator
  • compilationTime (HISTOGRAM): Histogram of the time it took to compile source code text (in milliseconds).
  • generatedClassSize (HISTOGRAM): Histogram of the bytecode size of each class generated by CodeGenerator.
  • generatedMethodSize (HISTOGRAM): Histogram of the bytecode size of each method in classes generated by CodeGenerator.
  • sourceCodeSize (HISTOGRAM): Histogram of the length of source code text compiled by CodeGenerator (in characters).

HiveExternalCatalog
  • partitionsFetched (COUNTER): Tracks the total number of partition metadata entries fetched via the client api.
  • filesDiscovered (COUNTER): Tracks the total number of files discovered off of the filesystem by InMemoryFileIndex.
  • fileCacheHits (COUNTER): Tracks the total number of files served from the file status cache instead of discovered.
  • hiveClientCalls (COUNTER): Tracks the total number of Hive client calls (e.g. to lookup a table).
  • parallelListingJobCount (COUNTER): Tracks the total number of Spark jobs launched for parallel file listing.

DAGScheduler
  • stage.failedStages (GAUGE)
  • stage.runningStages (GAUGE)
  • stage.waitingStages (GAUGE)
  • job.allJobs (GAUGE)
  • job.activeJobs (GAUGE)

BlockManager
  • memory.maxMem_MB (GAUGE)
  • maxOnHeapMem_MB (GAUGE)
  • memory.maxOffHeapMem_MB (GAUGE)
  • memory.remainingMem_MB (GAUGE)
  • memory.remainingOnHeapMem_MB (GAUGE)
  • memory.remainingOffHeapMem_MB (GAUGE)
  • memory.memUsed_MB (GAUGE)
  • memory.onHeapMemUsed_MB (GAUGE)
  • memory.offHeapMemUsed_MB (GAUGE)
  • memory.diskSpaceUsed_MB (GAUGE)

spark.streaming.${Option(name).getOrElse(id)}
  • inputRate-total (GAUGE): inputRowsPerSecond
  • processingRate-total (GAUGE): processedRowsPerSecond
  • latency (GAUGE): durationMs.get("triggerExecution")
  • eventTime-watermark (GAUGE): convertStringDateToMillis(progress.eventTime.get("watermark"))
  • states-rowsTotal (GAUGE): stateOperators.map(_.numRowsTotal).sum
  • states-usedBytes (GAUGE): stateOperators.map(_.memoryUsedBytes).sum

"%s.StreamingMetrics".format(ssc.sparkContext.appName)
  • receivers (GAUGE): number of receivers created, default value is 0
  • totalCompletedBatches (GAUGE): default value is 0
  • totalReceivedRecords (GAUGE): default value is 0
  • totalProcessedRecords (GAUGE): default value is 0
  • unprocessedBatches (GAUGE): default value is 0
  • waitingBatches (GAUGE): default value is 0
  • runningBatches (GAUGE): default value is 0
  • retainedCompletedBatches (GAUGE): default value is 0
  • lastCompletedBatch_submissionTime (GAUGE): a value of -1 indicates abnormal behavior
  • lastCompletedBatch_processingStartTime (GAUGE): a value of -1 indicates abnormal behavior
  • lastCompletedBatch_processingEndTime (GAUGE): a value of -1 indicates abnormal behavior
  • lastCompletedBatch_processingDelay (GAUGE): a value of -1 indicates abnormal behavior
  • lastCompletedBatch_schedulingDelay (GAUGE): a value of -1 indicates abnormal behavior
  • lastCompletedBatch_totalDelay (GAUGE): a value of -1 indicates abnormal behavior
  • lastReceivedBatch_submissionTime (GAUGE): a value of -1 indicates abnormal behavior
  • lastReceivedBatch_processingStartTime (GAUGE): a value of -1 indicates abnormal behavior
  • lastReceivedBatch_processingEndTime (GAUGE): a value of -1 indicates abnormal behavior
  • lastReceivedBatch_records (GAUGE): a value of -1 indicates abnormal behavior

LiveListenerBus
  • listenerProcessingTime.org.apache.spark.HeartBeatReceiver (TIMER)
  • listenerProcessingTime.org.apache.spark.status.AppStatusListener (TIMER)
  • numEventsPosted (COUNTER)
  • queue.appStatus.listenerProcessingTime (TIMER)
  • queue.appStatus.numDroppedEvents (COUNTER)
  • queue.appStatus.size (GAUGE)
  • queue.executorManagement.listenerProcessingTime (TIMER)
  • queue.executorManagement.numDroppedEvents (COUNTER)
  • queue.executorManagement.size (GAUGE)

Here are the executor metrics:

Table 24. Spark on Kubernetes Executor Metrics

executor
  • threadpool.activeTasks (GAUGE): The number of active tasks for this executor’s thread pool. Calls getActiveCount() on Java’s thread pool.
  • threadpool.completeTasks (GAUGE): The number of completed tasks for this executor’s thread pool. Calls getCompletedTaskCount() on Java’s thread pool.
  • threadpool.currentPool_size (GAUGE): Number of threads in Java’s thread pool executor.
  • threadpool.maxPool_size (GAUGE): Max pool size of Java’s thread pool executor.
  • filesystem.{hdfs,file}.read_bytes (GAUGE): Calls read_bytes() on a Hadoop filesystem object.
  • filesystem.{hdfs,file}.read_ops (GAUGE): Calls read_ops() on a Hadoop filesystem object.
  • filesystem.{hdfs,file}.largeRead_ops (GAUGE): Calls largeRead_ops() on a Hadoop filesystem object.
  • filesystem.{hdfs,file}.write_ops (GAUGE): Calls write_ops() on a Hadoop filesystem object.
  • cpuTime (COUNTER): CPU time the executor spends actually running tasks (including fetching shuffle data), in nanoseconds.
  • runTime (COUNTER): Time the executor spends actually running tasks (including fetching shuffle data).
  • JvmGCTime (COUNTER): Amount of time the JVM spent in garbage collection while executing tasks.
  • deserializeTime (COUNTER): Time taken on the executor to deserialize tasks.
  • deserializeCpuTime (COUNTER): CPU time taken on the executor to deserialize tasks, in nanoseconds.
  • resultSerializationTime (COUNTER): Amount of time spent serializing the task results.
  • shuffleFetchWaitTime (COUNTER): Time the tasks spent waiting for remote shuffle blocks. This only includes the time blocking on shuffle input data. For instance, if block B is being fetched while the task is still not finished processing block A, it is not considered to be blocking on block B.
  • shuffleWriteTime (COUNTER): Time the tasks spent blocking on writes to disk or buffer cache, in nanoseconds.
  • shuffleTotalBytesRead (COUNTER): Total bytes fetched in the shuffle by tasks (both remote and local).
  • shuffleRemoteBytesRead (COUNTER): Total number of remote bytes read from the shuffle by tasks.
  • shuffleRemoteBytesReadToDisk (COUNTER): Total number of remote bytes read to disk from the shuffle by tasks.
  • shuffleLocalBytesRead (COUNTER): Shuffle data that was read from the local disk (as opposed to from a remote executor).
  • shuffleRecordsRead (COUNTER): Total number of records read from the shuffle by tasks.
  • shuffleRemoteBlocksFetched (COUNTER): Number of remote blocks fetched in this shuffle by tasks.
  • shuffleLocalBlocksFetched (COUNTER): Number of local blocks fetched in this shuffle by tasks.
  • shuffleBytesWritten (COUNTER): Number of bytes written for the shuffle by tasks.
  • shuffleRecordsWritten (COUNTER): Total number of records written to the shuffle by tasks.
  • bytesRead (COUNTER): Total number of bytes read.
  • recordsRead (COUNTER): Total number of records read.
  • bytesWritten (COUNTER): Total number of bytes written.
  • recordsWritten (COUNTER): Total number of records written.
  • resultSize (COUNTER): The number of bytes tasks transmitted back to the driver as the TaskResult.
  • diskBytesSpilled (COUNTER): The number of on-disk bytes spilled by tasks.
  • memoryBytesSpilled (COUNTER): The number of in-memory bytes spilled by tasks.

CodeGenerator
  • compilationTime (HISTOGRAM): Histogram of the time it took to compile source code text (in milliseconds).
  • generatedClassSize (HISTOGRAM): Histogram of the bytecode size of each class generated by CodeGenerator.
  • generatedMethodSize (HISTOGRAM): Histogram of the bytecode size of each method in classes generated by CodeGenerator.
  • sourceCodeSize (HISTOGRAM): Histogram of the length of source code text compiled by CodeGenerator (in characters).

HiveExternalCatalog
  • partitionsFetched (COUNTER): Tracks the total number of partition metadata entries fetched via the client api.
  • filesDiscovered (COUNTER): Tracks the total number of files discovered off of the filesystem by InMemoryFileIndex.
  • fileCacheHits (COUNTER): Tracks the total number of files served from the file status cache instead of discovered.
  • hiveClientCalls (COUNTER): Tracks the total number of Hive client calls (e.g. to lookup a table).
  • parallelListingJobCount (COUNTER): Tracks the total number of Spark jobs launched for parallel file listing.

NettyBlockTransfer
  • shuffle-client.usedHeapMemory (GAUGE)
  • shuffle-client.usedDirectMemory (GAUGE)
  • shuffle-server.usedHeapMemory (GAUGE)
  • shuffle-server.usedDirectMemory (GAUGE)

Here are the JVM metrics related to the Spark driver and the executors:

Driver JVM metrics
Figure 28. Driver JVM metrics
Executor JVM metrics
Figure 29. Executor JVM metrics
Spark Streaming Best Practices

Lightbend Fast Data Platform is designed to support streaming architectures. Spark is a critical part of this strategy. Spark comes with two APIs for streaming: the old Streaming API (DStreams) and the new Structured Streaming API. We recommend using the Structured Streaming API in new projects and migrating existing applications that use the old one, which may be deprecated in a subsequent release of Spark.

The old API has several drawbacks:

  • It is difficult to reason about its behavior, because when a micro-batch job fails and is re-run, the semantics of data handling are not clearly defined, for example with respect to exactly-once behavior.

  • It lacks support for advanced constructs like event-time windowing and declarative handling of data that arrives late or out of order.

  • It lacks graceful shutdown support, as discussed here.

On the other hand, Structured Streaming has several advantages (summarized from this discussion):

  • It is based on the higher-level SQL APIs (DataFrames) and lambda functions (Datasets). In fact, all jobs are effectively streaming SQL queries.

  • It supports SQL data sources and related formats (JSON, parquet, etc).

  • It supports windowing and aggregations based on event time.

  • It has improved state management, via incremental checkpointing.

  • It allows you to start and stop individual queries, although you need to add the appropriate management code to your application.

  • You can restart from an existing checkpoint, even when you upgrade your query, in many cases. See this discussion for the details.

For an in-depth comparison between these two APIs, see this Lightbend webinar video.

Structured Streaming comes in two run-time modes: micro-batch and continuous. The former works similarly to the old streaming API, while the latter provides lower latency, but is still considered experimental and is not fully implemented. For example, only map-like operations are supported by continuous processing. Therefore, Lightbend does not recommend nor currently support continuous processing for production use at this time.

There are a few features in the old API that are not present in the Structured Streaming API: backpressure and Streaming Dynamic Allocation. If you still want to use the old API for these features, see the following discussions on backpressure in Spark Streaming and backpressure with dynamic scaling.

When you deploy applications that use Structured Streaming, we recommend the following best practices.

Plan for Resource Utilization

Plan for your job’s resource utilization and over-allocate if you expect load peaks. The key idea is that, for each trigger interval (micro-batch), the available data must be ingested by the executors and the tasks they manage, and processing must complete reliably without taking too long. During peaks, each executor receives more data than normal; it may crash if there is insufficient memory, and it may take a long time to finish if there aren’t enough CPU cores allocated to maximize parallelism.

Hence, based on an estimate of your data volume and peak loads, tune the following configuration parameters for the trigger interval you want (a minimal sketch of these settings as spark-submit flags follows the list):

  • spark.executor.memory - Memory per executor process, use the same format as JVM memory strings with a unit suffix ("k", "m", "g" or "t") (e.g. 512m, 2g)

  • spark.driver.memory - Memory for the driver process

  • spark.executor.cores - Number of cores for each executor

  • spark.default.parallelism - Default number of partitions in RDDs returned by join, reduceByKey, etc.

  • spark.sql.shuffle.partitions - Default number of partitions to use when shuffling data for joins or aggregations in the Dataset API
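
The values below are placeholders to be tuned for your own data volumes and trigger interval:

--conf spark.executor.memory=4g
--conf spark.driver.memory=2g
--conf spark.executor.cores=3
--conf spark.default.parallelism=24
--conf spark.sql.shuffle.partitions=24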

Enable Checkpointing

Inevitably, if you run a job long enough, something will go wrong and the job will either crash or need to be restarted for code upgrades, as mentioned above, or to re-tune the fixed configuration parameters. Checkpointing minimizes the risk of data loss, allowing restarted jobs to pick up where they left off.
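
In Structured Streaming, checkpointing is enabled per query by pointing checkpointLocation at durable storage shared between the driver and executors. A minimal sketch; the source, sink, and paths are placeholders:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("checkpointed-query").getOrCreate()
val events = spark.readStream.format("rate").load() // placeholder source

val query = events.writeStream
  .format("parquet")
  .option("path", "/mnt/output")                             // hypothetical output path on shared storage
  .option("checkpointLocation", "/mnt/checkpoints/my-query") // durable, shared checkpoint directory
  .start()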

Know When to Use foreachBatch

Spark 2.4.0 introduced foreachBatch, described here, for integrating with connectors that have no streaming support when writing to a sink, such as Cassandra and Vertica Connector. This feature is also useful for writing to multiple output sinks, as described in the blog post.
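
A minimal sketch of foreachBatch that reuses each micro-batch for two outputs; the sinks and paths are hypothetical:

import org.apache.spark.sql.{DataFrame, SparkSession}

val spark = SparkSession.builder().appName("foreach-batch-example").getOrCreate()
val events = spark.readStream.format("rate").load() // placeholder source

val query = events.writeStream
  .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    batchDF.persist()
    // Write the same micro-batch to two different (hypothetical) batch sinks.
    batchDF.write.format("parquet").mode("append").save("/mnt/output/parquet")
    batchDF.write.format("json").mode("append").save("/mnt/output/json")
    batchDF.unpersist()
  }
  .option("checkpointLocation", "/mnt/checkpoints/foreach-batch")
  .start()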

Monitor Spark

Use Lightbend Console to monitor the following Spark metrics:

  • inputRate-total - The aggregate rate (rows per second) across all sources of data arriving.

  • processingRate-total - The aggregate rate (rows per second) across all sources at which Spark is processing data.

  • latency - How long it took for the triggered micro-batches to complete.

Set up alerts for risky trends, like growing latency.

You can also ingest metrics into Kafka topics for downstream consumption by other systems. This blog post describes what to do, for example, registering the appropriate listener.
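
A sketch of registering a StreamingQueryListener whose progress events could be forwarded to Kafka or any other downstream system; the forwarding itself is left as a stub:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

val spark = SparkSession.builder().appName("metrics-listener-example").getOrCreate()

spark.streams.addListener(new StreamingQueryListener {
  override def onQueryStarted(event: QueryStartedEvent): Unit = ()
  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    // event.progress.json holds the full progress report (input and processing rates,
    // batch durations, etc.); forward it to Kafka or a log aggregator here.
    println(event.progress.json)
  }
  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = ()
})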

Manage Your Logs

Collect the logs for debugging purposes. First, consider configuring your executor pods on Kubernetes so they aren’t deleted on termination, which would also delete their logs. Of course, this requires an additional process to periodically remove them when they are no longer needed.

A better alternative is to use a log-aggregation system so the log entries are captured outside the pods for safe-keeping. The ELK Stack is a popular option, as are cloud-based solutions. At Lightbend, we find Humio to be an excellent tool, available as a cloud service or on-premise, with a rich and powerful query capability.

If you don’t have a log-management system, keep in mind that often the most interesting debug information is in the executor logs for the Spark tasks. Hence, when debugging a problem, find the logs for tasks that failed if the driver log doesn’t provide enough diagnostic information.

Tips for Using Structured Streaming with Kafka

Recall that in Kafka terminology, a consumer reads from a Kafka topic partition and a producer writes to a partition.

When integrating with Kafka, follow the Apache Spark website instructions carefully.

For example, make sure you set maxOffsetsPerTrigger to an appropriate value to limit the data you consume per trigger according to your application needs. Some experimentation may be required to determine this value.
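
For example, as an option on the Kafka source, assuming an existing SparkSession named spark; the broker address, topic, and limit are placeholders:

val kafkaStream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "my-kafka:9092") // placeholder broker
  .option("subscribe", "my-topic")                    // placeholder topic
  .option("maxOffsetsPerTrigger", "10000")            // cap the rows consumed per micro-batch
  .load()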

The current implementation of Kafka support in Spark assigns each Kafka partition to an executor and there is one Kafka consumer per partition created. Hence, a good rule of thumb is to plan your topic partitioning and Spark job configuration so that you have 2-3 tasks per executor per micro-batch.

Structured Streaming support for Kafka utilizes two caches, one for consumers and one for producers. These caches are per executor, so they are shared across the tasks managed by the executor.

The consumer cache has a default max capacity of 64, which is configurable through the spark.sql.kafkaConsumerCache.capacity property. If you see lots of log messages about consumers being evicted from this cache, your executor is probably using more than 64 consumers (i.e., it is reading from more than 64 partitions). Consider increasing the cache capacity to avoid inefficient reconstruction of the consumer objects.

The policy for this cache is to remove the least-recently-used entry, but only if it is not currently in use. For long-running tasks, this means the cache may grow beyond the configured capacity. So, make sure the server has sufficient resources for these consumers, and don’t forget there may be more than one executor per server.

The producer cache operates differently. It has no upper limit for its capacity. Instead, the setting spark.kafka.producer.cache.timeout controls when an entry expires. The default value is 10 minutes.

A producer will be created or used by the cache in the executor for every task, but due to bug SPARK-21869, tasks that run for longer than the timeout will fail with the following exception: java.lang.IllegalStateException: Cannot send after the producer is closed. Unlike the consumer cache, the code does not check if the producer is in use, so it can be evicted and closed even while still in use! Currently, the only workaround for this issue is to set the timeout to a value much longer than the observed task execution time. Note that 10 minutes is actually a long time, so if you see this exception in your task logs, also consider if you have properly sized your Spark job and number of topic partitions so that micro-batches complete in less time!

You may need to tune kafkaConsumer.pollTimeoutMs. This option is passed directly to the consumer’s poll loop at the executor side and defines a timeout interval during which polling will block until data is available in the consumer buffer. If you observe significant delay in brokers sending data, you should consider increasing this polling timeout and investigate possible causes of the delay.

The default value is now set to 120 seconds (if spark.network.timeout is not set) and not 512 milliseconds as stated in the documentation.

For all available configuration options you should review the Spark Structured Streaming Kafka Integration Guide carefully.

Finally, sometimes it’s necessary to turn on DEBUG level logging to get useful information about low-level behavior for Kafka consumers and producers.

Known bugs

Although production-ready, Structured Streaming has a number of known bugs. Here is the current bug list for Spark 2.4.0. When you hit an issue, it may be useful to search this list in case the issue has already been reported. Often a bug report includes known workarounds.

Appendix: Lightbend’s Contributions to Spark, Spark Operator and related projects

Our contributions to the Spark on Kubernetes project are shown next:

6.2.3. Kubeflow

The Kubeflow project is a multi-architecture, multi-cloud framework for running entire machine learning pipelines in Kubernetes. Its current mission is to make it easy for anyone to develop, deploy, and manage composable, portable, and scalable machine learning applications on Kubernetes everywhere.

Note

Kubeflow is a certified component of Fast Data Platform, as defined in Overview of Fast Data Platform. Kubeflow is a very active, rapidly evolving project. The currently-certified version is 0.4.1.

Kubeflow is composed of several components, including the following:

  • JupyterHub, which allows users to request an instance of a Jupyter Notebook server dedicated to them.

  • One or more training controllers. These are components that simplify and manage the deployment of training jobs, such as TensorFlow jobs. The Kubeflow community is working on controllers for PyTorch, Caffe2, and other systems.

  • A serving component that helps you serve predictions with your models.

  • Kubeflow pipelines, an engine for scheduling multi-step ML workflows along with a user interface for managing and tracking the execution of pipelines.

  • ModelDB, an end-to-end system that tracks models as they are built.

  • Argo, a workflow scheduling tool for Kubernetes.

Note

Yes, the correct, if inconsistent, spellings are Kubeflow and TensorFlow.

See the full list of components in the Kubeflow documentation. Refer to the Kubeflow documentation for more general information and this Lightbend blog post series for specific information about installing and using Kubeflow on OpenShift, which we summarize and generalize next.

Installation

Kubeflow’s installation is currently based on ksonnet, a configurable, typed, templating system for the Kubernetes application developer. The first step to install Kubeflow is to install ksonnet following these instructions. In the case of Mac OS, the installation can be done using Homebrew:

brew install ksonnet/tap/ks

The next step is to connect to your cluster, after which you can create a ksonnet Kubeflow project. This is done by running the following commands to download the kfctl.sh script:

KUBEFLOW_SRC=...
mkdir ${KUBEFLOW_SRC}
cd ${KUBEFLOW_SRC}
export KUBEFLOW_TAG=v0.4.1

curl https://raw.githubusercontent.com/kubeflow/kubeflow/${KUBEFLOW_TAG}/scripts/download.sh | bash

We’re using a variable KUBEFLOW_SRC to refer to the directory where you want to download the source. Define this variable if you need it, but we won’t need it for any other purpose.

Run the following commands to setup and deploy Kubeflow:

KFAPP=my_app_name
scripts/kfctl.sh init ${KFAPP} --platform none
cd ${KFAPP}
../scripts/kfctl.sh generate k8s

KFAPP is used for convenience. Use any name you want for the Kubeflow deployment.

Now run the following oc commands to set the required access permissions:

oc adm policy add-scc-to-user anyuid -z ambassador -n kubeflow
oc adm policy add-scc-to-user anyuid -z jupyter -n kubeflow
oc adm policy add-scc-to-user anyuid -z katib-ui -n kubeflow
oc adm policy add-scc-to-user anyuid -z default -n kubeflow

Next, update the tf_operator image to the latest version. The current release, 0.4.0, which is used by the default installation, is broken:

ks param set tf-job-operator tfJobImage gcr.io/kubeflow-images-public/tf_operator:latest

If you want to see what is going to be installed, take a look inside kfctl.sh. It will run the following commands:

ks apply default -c ambassador
ks apply default -c jupyter
ks apply default -c centraldashboard
ks apply default -c tf-job-operator
ks apply default -c pytorch-operator
ks apply default -c metacontroller
ks apply default -c spartakus
ks apply default -c argo
ks apply default -c pipeline

You can always comment out some of these lines if you do not want to install the corresponding components. You can also add additional components.

Now run the following command to install Kubeflow (assuming you are still in the $KFAPP directory):

../scripts/kfctl.sh apply k8s

Once installation is complete, do the following steps:

  • Make sure that all the pods are running correctly

  • Expose the ambassador service as a route and verify it by going to the created URL. This should bring up the main Kubeflow screen.

Kubeflow main screen
Figure 30. Kubeflow Main Screen
Additional Installation Steps

Kubeflow is now installed, but in order for its components to operate correctly, additional installation steps must be performed.

If you want to use the TFJob operator, additional RBAC permissions have to be given to the tf-job-operator service account under which tf-job-operator is running. To do this, create the following YAML file that defines both a role and role bindings. Save it to a file tfjobs-role.yaml (or use another name):

kind: Role
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  name: tfjobs-role
  labels:
    app: tfjobs
rules:
- apiGroups: ["kubeflow.org"]
  resources: ["tfjobs", "tfjobs/finalizers"]
  verbs: ["get", "list", "watch", "update", "patch"]
---
kind: RoleBinding
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  name: tfjobs-rolebinding
  labels:
    app: tfjobs
roleRef:
  kind: Role
  name: tfjobs-role
subjects:
  - kind: ServiceAccount
    name: tf-job-operator

Now run the following command to install it:

$ oc apply -f tfjobs-role.yaml -n kubeflow

If you want to use the study job operator, additional RBAC permissions have to be given to the studyjob-controller service account under which studyjob-controller is running. As before, create the following YAML file defining the role and role binding. Save it locally to studyjobs-role.yaml:

kind: Role
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  name: studyjobs-role
  labels:
    app: studyjobs
rules:
- apiGroups: ["kubeflow.org"]
  resources: ["studyjobs", "studyjobs/finalizers"]
  verbs: ["get", "list", "watch", "update", "patch"]
---
kind: RoleBinding
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  name: studyjobs-rolebinding
  labels:
    app: studyjobs
roleRef:
  kind: Role
  name: studyjobs-role
subjects:
  - kind: ServiceAccount
    name: studyjob-controller

Run the following command to install it:

$ oc apply -f studyjobs-role.yaml -n kubeflow

If you want to use Kubeflow pipelines, it is necessary to give additional privileged permissions to the argo and pipeline-runner service accounts using the following commands:

$ oc adm policy add-scc-to-user privileged -z argo -n kubeflow
$ oc adm policy add-scc-to-user privileged -n kubeflow -z pipeline-runner

It is also necessary to provide additional RBAC permissions to the argo service account under which workflow-controller runs. To do this, create the following YAML file defining both the role and role binding. Save it locally to argo-role.yaml:

kind: Role
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  name: argo-role
  labels:
    app: argo
rules:
- apiGroups: ["argoproj.io"]
  resources: ["workflows", "workflows/finalizers"]
  verbs: ["get", "list", "watch", "update", "patch"]
---
kind: RoleBinding
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  name: argo-rolebinding
  labels:
    app: argo
roleRef:
  kind: Role
  name: argo-role
subjects:
  - kind: ServiceAccount
    name: argo

Run the following command to install it:

$ oc apply -f argo-role.yaml -n kubeflow
Uninstalling Kubeflow

To remove the Kubeflow deployment, use the same kfctl.sh script used to install it:

../scripts/kfctl.sh delete k8s
Warning

The script will delete the namespace kubeflow along with everything you have deployed in it.

Using Kubeflow

For information on how to use Kubeflow components, see the Kubeflow documentation and the Lightbend blog post series referenced above.

6.3. Fast Data Architectural Patterns

Lightbend Platform is best utilized for creating applications if the following set of best practices and architectural design patterns is followed. The Platform development team used them during the process of selecting and testing components and while writing the sample applications. We discuss some of these patterns here.

6.3.1. Log-centric Execution Architecture

This pattern was originally introduced by Jay Kreps and further elaborated in several posts by Ben Stopford: Data Dichotomy, Services on backbone of events, and Kafka as a backbone for service integration.

Log-centric Architecture
Figure 31. Log-centric Architecture

The heart of this pattern is a distributed log providing well-defined APIs for adding and retrieving data. Currently, there are several implementations that can be used here, including Apache Kafka, Apache DistributedLog, Apache Pulsar, and Pravega. Lightbend Platform currently supports Kafka, but one or more of the other options are under evaluation for future support.

In the case of a pure log-centric architecture, there is no direct interconnectivity between services. Any participant in this architecture can produce data that is added to the log and is available to any participant that has access to the log as a consumer. The log just stores data sequentially, usually by time. The implementation may manage producers and consumers of the data or delegate this responsibility elsewhere.

A benefit of logs is the ability to replay messages in cases where some of the consumers are added or replaced, processing logic is updated, etc. This ability also supports Event Sourcing, where the log is considered the source. It can also be used to support CQRS (command-query responsibility segregation), where the log might be used to update read stores with data from write stores. See here for more on Event Sourcing and CQRS.
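
As a small illustration of replay with Kafka (the topic and group names are hypothetical), a consumer can rewind its position to the beginning of its assigned partitions and reprocess everything the log still retains:

import java.util.Properties
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.KafkaConsumer

val props = new Properties()
props.put("bootstrap.servers", "broker:9092")
props.put("group.id", "orders-reprocessing")
props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")
props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")

val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](props)
consumer.subscribe(List("orders").asJava)
consumer.poll(0)                                 // join the group and receive partition assignments
consumer.seekToBeginning(consumer.assignment())  // rewind; subsequent polls re-deliver retained messages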

Pattern Definition

An enterprise has multiple services that are being built independently, with different languages and platforms, that need to exchange data.

Q: How can I organize such data exchange minimizing coupling between services?

A: Have each service produce data (as it changes) to the centralized log, where all participants can consume it by registering for log changes and reacting appropriately.

  • Intent: This pattern aims to minimize direct interaction between applications/services (and consequently their coupling) by introducing a centralized log that can be sequentially accessed by all participants.

  • Motivation (Forces): Streamline communications between components/services by introducing a centralized stream of data rather than implementing spaghetti interactions between participants.

  • Applicability: This pattern is especially useful if:

    • Multiple applications/services required to work together have different internal data models.

    • Information from a particular application/service is used by multiple other components of the solution.

    • Strong decoupling is a key requirement for the overall solution.

    • The ability to add and remove producers and consumers at will is important.

  • Structure: Sequential data, organized into topics.

  • Participants: Log and applications/services leveraging this log for data exchange.

  • Consequences: Strong decoupling of participating components. The ability to replay streams, e.g., to reprocess data after fixes or updates.

  • Implementation: Have each service produce data (as it changes) to the centralized log, where all participants can consume it by registering for log changes and reacting appropriately.

  • Related Patterns: Other patterns below address specific implementation aspects of this pattern.

This pattern enforces communication between services through messaging. Compared to the older service messaging pattern, it goes even further: there are no direct messages to any specific application/service; all of the messaging is mediated by the log, allowing for further decoupling and greater flexibility.

This pattern is similar to the older Whiteboard Pattern, and provides similar advantages:

  • Adding new producers and consumers is easy and does not require any changes to other components in the system.

  • Any topic can be consumed by any number of consumers.

  • Every consumer can manage its own position in the topic that it wants to consume. This is a foundation for replaying the log.

  • Implementation, debugging, and maintenance is simplified because all producers and consumers standardize on the access APIs and messaging semantics for the log. Implementing direct service-to-service communications using ad hoc APIs is harder and less reusable.

In order for this pattern to be used successfully, several supporting patterns need to be implemented, including:

  • Standardized Semantic Messaging

  • Schema Validation

  • Schema Management

  • Dead-letter Queue

  • Versioning Support

We will discuss these patterns below. Not all of them are required in all cases. Additionally, this pattern supports several execution patterns, including Dynamically Controlled Streams, discussed below.

6.3.2. Standardized Semantic Messaging

In the typical request/response service architectures, messages are only defined between the service and requesters, which often leads to certain shortcuts in service definitions, i.e., less formality in defining the schemas for the exchanged messages.

Usage of log-centric execution encourages the creation of canonical data models, because schema enforcement avoids many problems, e.g., "typing errors". This is not a new idea; compare to the Canonical Data Model used in EAI and SOA, and the concept of standardized service contracts. The rationale is the same in both cases and this pattern is identical to the EAI Canonical Messages Pattern.

Pattern Definition

An enterprise is designing several services that need to work together using a centralized Log. Each service has its own internal data format.

Q: How can you minimize dependencies when integrating applications/services that use different data formats?

A: Design a Canonical Data Model that is independent of any specific application. Require each application to produce and consume messages in this common format.

  • Intent: This pattern aims to minimize the amount of data transformation inside the overall system, through the introduction of a semantic model that can be used by all participants.

  • Motivation (Forces): Ensures that the content of the log will be understood by all participants.

  • Applicability: Required to ensure that all participants can leverage the content of the log. Otherwise, "dynamic typing" logic is required, which is complex and error-prone to implement.

  • Participants: The log implementation, and all consumers and producers.

  • Consequences: The ability of every component of the system to read from and write to the log with greater safety.

  • Implementation: Have centrally managed canonical model available to all participating components.

  • Known Uses: Widely used by EAI implementations.

  • Related Patterns: Schema Negotiation, Schema Management.

The Canonical Data Model provides an additional level of decoupling between each service’s individual data format. If a new service is added to the implementation, only a transformation between the Canonical Data Model and the internal model is required, independent of the other services that already participate.

Schema-first design is recommended for the Canonical Data Model, using, for example, Apache Avro or Google Protocol Buffers.
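
As a sketch of the schema-first approach with Avro (the record and its fields are hypothetical), the canonical type is defined as a schema that every participant shares:

import org.apache.avro.Schema

// A canonical "Order" record; producers and consumers agree on this definition,
// independent of their internal models. Default values ease schema evolution.
val canonicalOrderSchema: Schema = new Schema.Parser().parse(
  """{
    |  "type": "record",
    |  "name": "Order",
    |  "namespace": "com.example.canonical",
    |  "fields": [
    |    {"name": "orderId",  "type": "string"},
    |    {"name": "amount",   "type": "double"},
    |    {"name": "currency", "type": "string", "default": "USD"}
    |  ]
    |}""".stripMargin)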

6.3.3. Schema Validation

As messages become the central part of service invocation, it is necessary to ensure that a service is able to unmarshall and process incoming messages. This can be done through the introduction of schema negotiation, where every incoming message contains information about the schema that it adheres to, and a service validates that it understands the schema. This pattern is analogous to the older SOA Content Negotiation Pattern.

Pattern Definition

An enterprise is designing several services that need to collaborate with shared messages. Each service can change independently based on its own development schedule.

Q: How can you ensure resiliency and fault tolerance in the face of changing messages?

A: Ensure that a service validates that it can correctly process all incoming messages.

  • Intent: This pattern aims to ensure that message consumption is possible.

  • Motivation (Forces): Eliminate errors where messages are exchanged using an unrecognized format.

  • Applicability: This pattern is required to validate the type of message before attempting to process it.

  • Participants: Service producers and consumers.

  • Consequences: Validation logic, with the associated overhead, is implemented by consumers, e.g., by comparing the schema ID from an incoming message to the IDs of one or more supported formats. Additionally, unmarshalling can leverage a set of default values for fields, which simplifies handling missing information.

  • Known Uses: Similar to SOA Content Negotiation Pattern.

  • Related Patterns: Standardized Semantic Messaging, Schema Management, Dead-letter Queue.

This requires that every message carries the schema definition, in some form, along with the payload, allowing the message processor to validate that it can process a message before attempting to process it. This will allow the processor to avoid “poison messages” and minimize the requirements for reprocessing messages.

In the case of Kafka, message headers can carry the schema information. See examples of the usage of headers in Kafka messages.
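
A minimal sketch of this idea with the Kafka client API (the header name, schema IDs, and topic are hypothetical conventions, not a Fast Data Platform API): the producer attaches a schema identifier as a header, and the consumer checks it against the schemas it supports before attempting to deserialize.

import java.nio.charset.StandardCharsets.UTF_8
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.clients.consumer.ConsumerRecord

val SchemaHeader = "schema-id"

// Producer side: tag the payload with the schema it was written with.
def tagged(topic: String, key: Array[Byte], value: Array[Byte], schemaId: String): ProducerRecord[Array[Byte], Array[Byte]] = {
  val record = new ProducerRecord[Array[Byte], Array[Byte]](topic, key, value)
  record.headers().add(SchemaHeader, schemaId.getBytes(UTF_8))
  record
}

// Consumer side: only process messages whose schema we understand.
def supportedSchema(record: ConsumerRecord[Array[Byte], Array[Byte]], supported: Set[String]): Boolean =
  Option(record.headers().lastHeader(SchemaHeader))
    .map(h => new String(h.value(), UTF_8))
    .exists(supported.contains)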

6.3.4. Schema Management

In the previous pattern, we discussed that messages should contain information about their schemas. Including the actual schema in every message might be too expensive; the size of the schema might be comparable to, or even larger than, the message itself. The only reasonable solution is to use a schema identifier that is understood by all participants.

Management of schemas requires a schema repository, which typically contains the full schema definitions, their IDs, known compatibility with similar schemas, etc.

Pattern Definition

An enterprise is designing several services that need to collaborate with shared messages. To ensure compatibility, a limited set of reusable schemas is used.

Q: How can you ensure consistent message schemas across the system?

A: Create a schema repository containing information about all message schemas used.

  • Intent: Centrally manage all of the message schemas used.

  • Motivation (Forces): Need a centralized location for canonical definitions of the message schemas in use.

  • Applicability: Required to ensure all messages use known schemas in an efficient way, i.e., by reference rather than by value.

  • Participants: All system components.

  • Consequences: Every message type is available for service developers and can be uniquely identified by ID.

  • Known Uses: Long used in database systems and more recently in most streaming systems.

  • Related Patterns: Standardized Semantic Messaging, Schema Negotiation.

6.3.5. Dead-letter Queue

Once schema enforcement is in place, the question is what to do with received messages that cannot be processed, because they match no supported schema or for other reasons.

While they could be ignored, this would mask the problem that some service is producing messages that can’t be processed, perhaps because a new service entered the system or an existing service changed a schema without notifying existing consumers and/or the schema management system.

The obvious solution is to create a dedicated topic, called a dead-letter queue, where all unprocessed messages are delivered (see here, for example). This topic can be monitored at runtime to discover any abnormal messages happening in the system.
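
The following sketch illustrates the idea with the Kafka client API; the ".dlq" topic-naming convention and the process function are hypothetical:

import java.util.Properties
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

class DeadLetterForwarder(bootstrapServers: String) {
  private val props = new Properties()
  props.put("bootstrap.servers", bootstrapServers)
  props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer")
  props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer")
  private val producer = new KafkaProducer[Array[Byte], Array[Byte]](props)

  // Try to process the record; on failure, preserve it on a dead-letter topic
  // for later inspection instead of silently dropping it.
  def handle(record: ConsumerRecord[Array[Byte], Array[Byte]])(process: Array[Byte] => Unit): Unit =
    try process(record.value())
    catch {
      case _: Exception =>
        producer.send(new ProducerRecord(record.topic() + ".dlq", record.key(), record.value()))
    }
}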

Pattern Definition

An enterprise is using log-centric execution to integrate services.

Q: What should a consumer do with a message that cannot be processed?

A: When a service determines that it cannot process a message, it should forward the message to a dead letter queue for special handling.

  • Intent: Ensure that messages that cannot be processed are not lost, but gathered for special handling.

  • Motivation (Forces): Preserve failed messages for special investigation and handling.

  • Applicability: This pattern is required to save and handle all messages that can’t be processed normally.

  • Participants: All message consumers.

  • Known Uses: Widely used in enterprise integration systems.

  • Related Patterns: Schema Negotiation.

6.3.6. Versioning Support

Ideally, you should strive for backward compatibility when making changes to message schemas for services that produce messages, but sometimes breaking changes are unavoidable. In this case, existing consumers of the given message schema are at risk of breaking. See also API versioning best practices.

Pattern Definition

Services evolve over time, sometimes requiring breaking changes in the schemas for the messages they produce.

Q: How can you deal with the breaking changes in messages?

A: When a producer service change requires a message schema change, create a new deployment of the service with a new topic. Consumers that support the new schema can consume messages from this topic.

  • Intent: Ensure that updates to message schemas do not break existing services.

  • Motivation (Forces): Allow message schema changes without breaking all of the deployed message consumers.

  • Applicability: This pattern is required to ensure seamless evolvability of the solution.

  • Participants: The producer service and all its consumers.

  • Consequences: Support for gradual upgrades of the system.

  • Known Uses: Similar to API Versioning Best Practices.

  • Related Patterns: Schema Negotiation, Schema Management.

This approach to versioning allows functionality to evolve independently without breaking the rest of the system. A downside of this pattern is increased size and complexity of the overall system due to multiple deployments of the same service, with different versions. In order to prevent this from happening, it is important to introduce a version deprecation policy: old versions should be deleted after a set time period.

6.3.7. Dynamically Controlled Streams

Some streaming services need to use other data that isn’t part of the primary stream. Let’s call it "side-band data" (an old radio term). Examples include reference data for enhancing streaming data and control "signals" that change the system behavior in some way, like updates to machine learning models that change how data is "scored".

This additional data can be either static, i.e., read once at startup and cached, or it can be dynamic, where it can be changed while the service is running. For this pattern, we will consider only the case of dynamic, read-only data.

A traditional approach is to use RPC-based service invocations from within the stream processing service. The advantages of this approach include the following:

  • Simple integration with existing technologies and organizational processes

  • Easier to understand if you come from a non-streaming world

Such an approach may also have the following drawbacks:

  • Worse latency, due to remote-call overhead vs. local function call invocation

  • Coupling the availability, scalability, latency, and throughput of the streaming service with corresponding traits of the RPC service

This pattern defines an alternative; use stateful stream processing where the state can be dynamically updated using messages from other services. Hence, the name: dynamically controlled streams. For a concrete example, see this Ververica blog post.

The following diagram illustrates the basic design:

Dynamically controlled streams
Figure 32. Dynamically Controlled Streams

A stateful stream processor ingests a data stream (or more) while maintaining state that can be updated by a second stream whose function is to pass information necessary to trigger state updates. For consistency, both streams are consumed from the centralized log, but different topics.

An example of this pattern is machine learning model scoring of streaming data, where the state is the in-memory representation of the model, which can be updated periodically after training a new version of the model.
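
A minimal Akka Streams sketch of the idea (the message types, scoring function, and sources are hypothetical; a real application would read both streams from Kafka topics, e.g., with Alpakka Kafka): the control stream updates in-memory state, and the data stream is scored against whatever state is current.

import akka.NotUsed
import akka.stream.scaladsl.Source

sealed trait In
final case class Data(payload: String) extends In
final case class ModelUpdate(model: String => Double) extends In  // stand-in for a real model

def controlledStream(data: Source[String, NotUsed],
                     control: Source[String => Double, NotUsed]): Source[Double, NotUsed] =
  data.map(p => Data(p): In)
    .merge(control.map(m => ModelUpdate(m): In))
    .statefulMapConcat { () =>
      var model: Option[String => Double] = None   // the dynamically updated state
      (in: In) => in match {
        case ModelUpdate(m) => model = Some(m); Nil                  // control message: update state, emit nothing
        case Data(p)        => model.map(score => score(p)).toList   // data message: score against current state
      }
    }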

Pattern Definition

The system has a long-running service that maintains state. The state needs to be updated without restarting the service. Often, the state represents the outcome of other, independent services using entirely different languages and platforms, and operating at different time scales.

Q: How can I update state in stateful streaming services without restarting them?

A: Have other services produce messages to the “control” topic for the stream processing to consume and use to trigger state updates.

  • Intent: Avoid the use of RPC service invocation during stream processing to access updated state information, thus providing better overall reliability and decreasing processing time variability and duration due to network communications, etc.

  • Motivation (Forces): Improve stream processing reliability (and performance) by minimizing coupling to external services.

  • Applicability: Applicable to stateful stream processing with slowly changing state.

  • Participants: Stateful stream processing services and producers of state-affecting messages.

  • Consequences: Improved performance, better predictability of processing time.

  • Known Uses: Compare to pattern defined by Ververica.

  • Related Patterns: Log-centric Execution Architecture.

There are several important decisions that you need to make applying the dynamically controlled stream pattern:

  • It usually works best when the control stream is much “slower” compared to the data stream, for example a data stream might process each message from sub-second to minute time frames, while the control stream receives updates in time frames of hours, days, or even months.

  • It is also implied that the full state fits in memory and is otherwise “manageable” by the streaming engine.

When these two criteria aren’t met, the behavior is more like a streaming, SQL-like join.

6.3.8. Queryable State

When using the Dynamically Controlled Streams pattern, it is often desirable to have visibility into current state of the service. There are many ways for accessing this state from outside the service, including logging state changes and writing the state to an external database. These approaches add some complexity to the system. An alternative approach, called queryable state, was introduced by Kafka Streams. Here, the service provides direct access to the stream’s internal state through HTTP(S) access or other methods. The benefits of this approach include:

  • Avoidance of data duplication

  • Minimizes I/O overhead, such as writes to a database, to communicate state externally to interested consumers

  • Leads to fewer moving parts in the end-to-end architecture by avoiding the addition of state stores

Although this implementation was initially introduced in Kafka Streams, it was then implemented in Flink and is straightforward to implement in other streaming frameworks as long as suitable libraries can be integrated into the services.
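
As an illustration only (this is not the Kafka Streams or Flink built-in API), a streaming service written with Akka HTTP might expose its in-memory state through a read-only endpoint; the port, path, and state representation are hypothetical:

import java.util.concurrent.atomic.AtomicReference
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.server.Directives._
import akka.stream.ActorMaterializer

object QueryableStateEndpoint {
  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("queryable-state")
    implicit val mat: ActorMaterializer = ActorMaterializer()

    // Updated by the stream-processing logic, e.g., whenever a new model is served.
    val currentState = new AtomicReference[String]("""{"model":"none","records":0}""")

    val route = path("state") {
      get {
        complete(currentState.get())  // read-only view of the internal state
      }
    }

    Http().bindAndHandle(route, "0.0.0.0", 8080)
  }
}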

Pattern Definition

A stateful streaming service has internal state that needs to be accessible for runtime monitoring, debugging, feeding dashboards, and for other external consumers of the state.

Q: How can I gain the visibility to the state of running streaming services?

A: Provide an API for remote access to the current state, e.g., over HTTP.

  • Intent: Gain visibility to the stream processing state using a direct query to the service.

  • Motivation (Forces): Provide access to the state without the added complexity of persisting the state to external storage.

  • Applicability: Applicable to stateful stream processing.

  • Participants: Stateful stream processing services and consumers of the state.

  • Consequences: Improved visibility into stream execution and state.

  • Known Uses: Standard implementation in Kafka Streams, Apache Flink, and Apache Beam.

  • Related Patterns: Dynamically Controlled Streams.

The details for an example implementation and usage of queryable state can be found in this Lightbend report on model serving.

7. Licenses

This chapter provides license information for Lightbend Fast Data Platform, for the components included in it, and for a few other components that are used in the sample applications.

Use of Lightbend Fast Data Platform and Lightbend Enterprise Suite requires agreeing to the Lightbend Subscription Agreement. For more details on this agreement and Lightbend licenses, see here.

Here are the details for the other components, in alphabetical order.

Apache Cassandra

Apache Cassandra is covered by the Apache 2.0 License.

Apache Flink

Apache Flink is covered by the Apache 2.0 License.

Apache Hadoop

Apache Hadoop is covered by the Apache 2.0 License.

Apache Kafka

Apache Kafka is covered by the Apache 2.0 License.

Apache Spark

Apache Spark is covered by the Apache 2.0 License. The bundled component licenses are available here.

Apache Zeppelin

Apache Zeppelin is covered by the Apache 2.0 License.

Intel Analytics - BigDL

BigDL is covered by the Apache 2.0 License.

Intel Analytics - Analytics Zoo

Analytics Zoo is covered by the Apache 2.0 License.

Elasticsearch

Elasticsearch is covered by the Apache 2.0 License.

Grafana

Grafana is covered by the Apache 2.0 License.

InfluxDB

InfluxDB is covered by the MIT License. The licenses of dependencies are available here.

Kubernetes

Kubernetes is covered by the Apache 2.0 License.

Mesosphere Marathon Load Balancer

Mesosphere Marathon Load Balancer (marathon-lb) is covered by the Apache 2.0 License.

RedHat OpenShift Origin (Open Source)

RedHat OpenShift Origin (Open Source) is covered by the Apache 2.0 License.

Scala Language

The Scala programming language is licensed under the BSD 3 Clause License.

8. Contact Us

To contact us for information about Fast Data Platform, to request support, or to provide feedback:

9. Appendices

This chapter provides additional information that isn’t required by all users.

9.1. Appendix: Kubernetes Concepts

If you’re new to Kubernetes, this appendix introduces the core concepts you’ll need to understand, especially for the Before You Install: Cluster Recommendations and Prerequisites and the Management and Monitoring Guide chapters. We can only scratch the surface. See the OpenShift documentation for more information.

9.1.1. Containers, Images, and Pods

Kubernetes can be seen as a deployment platform which facilitates the deployment of applications via the use of images and containers. An image contains the application, with all its data, configuration, dependencies, etc.

That image is used to instantiate a container at runtime; the application will run in this container. The container isolates one application from the others.

The containers for an application are grouped into a pod, with a shared network and storage, and a specification for how to run the containers. A pod’s contents are always co-located and co-scheduled, and run in a shared context.

The shared context of a pod is a set of Linux namespaces, cgroups, and potentially other facets of isolation - the same things that isolate a Docker container. Within a pod’s context, the individual applications may have further levels of isolation defined.

Containers within a pod share an IP address and port space, and can find each other via localhost. They can also communicate with each other using standard inter-process communications like SystemV semaphores or POSIX shared memory. Containers in different pods have distinct IP addresses and cannot communicate by IPC without special configuration. These containers usually communicate with each other via pod IP addresses.

Pods are a model of the pattern of multiple cooperating processes which form a cohesive unit of service. They simplify application deployment and management by providing a higher-level abstraction than the set of their constituent images. Pods are the unit of deployment, horizontal scaling, and replication in Kubernetes. Colocation (co-scheduling), shared fate (e.g. termination), coordinated replication, resource sharing, and dependency management are handled automatically for containers in a pod.

A key goal of Kubernetes is to provide an easy-to-use platform that promotes reproducible deployments at any scale. Thus, it’s very common for Kubernetes to be coupled with the concepts of CI/CD (continuous integration/continuous deployment). In the most common scenario, the user builds a number of containers, specifies their deployment characteristics using an application description, and CI/CD tooling deploys the application within the Kubernetes cluster. Kubernetes goes beyond simply offering deployment options for applications. It also provides a number of services to make application development and deployment easier, like load-balancing, service discovery, self-recovery, etc.

9.1.2. Other Concepts in Kubernetes

Several other core concepts are important. All can be associated, one way or another, with the most basic way to deploy objects in a cluster, using Pods, where you specify a set of containers to run co-located on the cluster node chosen by Kubernetes (possibly constrained to particular nodes). Controllers provide more advanced options for continuously controlling applications.

OpenShift provides the following objects with a default controller:

9.1.3. The Operator Pattern

The operator pattern describes a common Kubernetes service implementation pattern that’s modeled after the way Kubernetes built-in resource definitions are designed and implemented. An operator-based project will usually consist of a custom Kubernetes operator (or controller) application and an accompanying CustomResourceDefinition (CRD). The operator is an application written in any language which interacts with the Kubernetes API server to handle all requests associated with its CRD.

The operator uses an active reconciliation process. It will watch instances of a CRD and calculate what actions are required to get to the desired state. This includes the initial instantiation and deployment as well as updates. Once a plan is calculated on how to achieve the required state, the operator will interact with the API server to execute it. For example, to scale out Kafka you would update the Strimzi Kafka CRD by incrementing its replica count. The operator would detect this change and compute that the Kafka StatefulSet needs to increase by one pod, then it would scale the StatefulSet and provide the relevant Kafka configuration to the new pod (its broker ID, ZooKeeper cluster info, etc.).

The function of the operator can be summarized as the active reconciliation between current and desired state and is best illustrated with the following pseudo-code for an infinite loop:

for {
  desired := getDesiredState()
  current := getCurrentState()
  makeChanges(desired, current)
}

If you’re familiar with Apache Mesos, you can think of an operator as being analogous to a second level scheduler implementation that manages an Apache Mesos framework and its tasks.

The operator pattern was first announced in a CoreOS blog post by Brandon Philips and later in a talk at CloudNativeCon called Writing a custom controller: Extending the functionality of your cluster by Aaron Levy. CoreOS went on to introduce the Operator Framework.

9.1.4. Managing Storage

Managing storage in OpenShift is a distinct problem from managing compute. There are two main approaches to managing storage today:

Local Storage

This type of storage is allocated on-the-fly and will have the same lifecycle as the pod it’s allocated for. If the pod is lost then its local storage is lost. If the pod is rescheduled then it will start with empty local storage. This may not be a problem depending on your use case, such as storage for a local cache. It is also useful for local testing and debugging, due to its simplicity.

Kubernetes PersistentVolume Subsystem

The PersistentVolume subsystem provides an API for users and administrators to abstract out details of how storage is provided from how it is consumed. It is the way to define storage that is not a part of the image/container itself. It separates disk management from pod management, thus allowing the user to preserve data even if a pod dies or completes.

The Kubernetes PersistentVolume subsystem is based on the following API resources:

  • PersistentVolume (PV): A PV is an instance of a volume. Cluster administrators can offer a variety of PVs that differ in more ways than just size and access modes, without exposing users to the details of how those volumes are implemented.

  • PersistentVolumeClaim (PVC): A PVC is a request for storage by a user. It is analogous to a pod in the following ways. Pods consume node resources and a PVC is bound to a PV. Pods can request specific levels of resources (CPU and Memory) and PVCs can request specific storage sizes and access modes, e.g., they can be mounted once read/write or many times read-only, etc.

  • StorageClass: A StorageClass provides a way for administrators to describe the “classes” of storage they offer. Different classes might map to quality-of-service levels, or to backup policies, or to other, arbitrary policies determined by the cluster administrators. Kubernetes itself is not opinionated about what these classes represent. This concept is sometimes called “profiles” in other storage systems.

The most common usage of a StorageClass is for the purpose of dynamic provisioning. When a PVC that specifies a StorageClass is created, Kubernetes should at some moment "return" storage in the form of a PV that will satisfy requirements declared in the StorageClass. You don’t know when exactly it will happen, although it is usually almost immediately. It could also fail. In a way, it is analogous to an async programming promise.

There are two ways PVs may be provisioned: statically or dynamically.

  • Static: A cluster administrator creates a number of PVs. They carry the details of the real storage which is available for use by cluster users. They exist in the Kubernetes API and are available for consumption.

  • Dynamic: When none of the static PVs the administrator created matches a user’s PersistentVolumeClaim, the cluster may try to dynamically provision a volume specially for the PVC. This provisioning is based on StorageClasses; the PVC must request a StorageClass and the administrator must have created and configured that StorageClass in order for dynamic provisioning to succeed.

Here is the list of the currently-supported PV types for OpenShift.

We discuss PVCs and PVs in more detail where we cover their specific uses in Fast Data Platform.

9.2. VPN Example

This appendix provides a VPN configuration example used by Lightbend’s engineering team to protect access to our development clusters. Similar concepts apply for other cloud providers.

Configuring a VPN depends in part on your on-premise infrastructure and on your cloud provider, when applicable. As an example, here are high-level instructions for using a VPN with an AWS-based OpenShift cluster. (Contact Us for more detailed assistance, if needed.)

  • Create a VPC, e.g., 10.1.0.0/16

  • Use a public subnet, e.g., 10.1.0.0/24

  • Use a private subnet, e.g., 10.2.0.0/24

  • Attach an Internet gateway to the VPC

  • Create a NAT Gateway and place it in the public subnet

  • Define security group rules on the public subnet:

    • From 0.0.0.0/0 to port 22 and 1194: allow

    • Full, unrestricted traffic flow within the subnet

    • Bi-directional all traffic flow to and from the private subnet

  • Define security group rules on the private subnet:

    • Full, unrestricted traffic flow within the subnet.

    • Bi-directional all traffic flow to and from the public subnet

  • Create an OpenVPN access server. For example, select a community or AWS Marketplace AMI preconfigured with OpenVPN. You’ll need to edit /etc/openvpn/server.conf to reflect the particulars of your network setup and restart via systemctl restart openvpn@server. Also, comment out the DNS settings in the server.conf file.

  • Install a suitable VPN client on your workstation, such as Tunnelblick.

    • Configure a client connection for the VPN.

    • Verify that your VPN is working; can you access the cluster?

9.3. Appendix: "OC Cluster Up"

This is a way to run a local OpenShift cluster, an alternative to Minishift, which is discussed in OpenShift CLI Tools for Working with Clusters.

The full documentation for this approach can be found here.

9.3.1. "OC Cluster Up" Setup Instructions

Let’s walk through an example of installing on an Ubuntu instance running on AWS.

In a convenient directory, run these commands:

mkdir -p oc_cluster_up
cd oc_cluster_up

Install Docker, if necessary:

sudo apt-get update
sudo apt-get install \
    apt-transport-https \
    ca-certificates \
    curl \
    software-properties-common
curl -fsSL https://download.docker.com/linux/ubuntu/gpg | sudo apt-key add -
sudo apt-key fingerprint 0EBFCD88
sudo add-apt-repository \
   "deb [arch=amd64] https://download.docker.com/linux/ubuntu \
   $(lsb_release -cs) \
   stable"
sudo apt-get update
sudo apt-get install docker-ce
sudo tee /etc/docker/daemon.json << EOF
{
  "insecure-registries": [
    "172.30.0.0/16"
  ]
}
EOF
sudo systemctl daemon-reload
sudo systemctl restart docker

Use docker without requiring sudo:

sudo usermod -aG docker $USER

Exit from current terminal and ssh again to the node.

Now install Java:

sudo apt-get install openjdk-8-jre

Install kubectl:

curl -s https://packages.cloud.google.com/apt/doc/apt-key.gpg | sudo apt-key add -
sudo touch /etc/apt/sources.list.d/kubernetes.list
echo "deb http://apt.kubernetes.io/kubernetes-xenial main" | sudo tee -a /etc/apt/sources.list.d/kubernetes.list
sudo apt-get update
sudo apt-get install kubectl

Verify that IPv4 forwarding is set:

sysctl net.ipv4.ip_forward

It should return net.ipv4.ip_forward = 1.

Verify that Docker networking is set as expected:

docker network inspect -f "{{range .IPAM.Config }}{{ .Subnet }}{{end}}" bridge

It should return 172.17.0.0/16.

Download this OpenShift archive file (or a more recent version).

Extract the Linux oc binary from archive and place it somewhere in your PATH:

$ tar -zxvf openshift-origin-client-tools-v3.11.0-dd10d17-linux-64bit.tar.gz
tar: Ignoring unknown extended header keyword 'LIBARCHIVE.xattr.security.selinux'
openshift-origin-client-tools-v3.11.0-dd10d17-linux-64bit/oc
tar: Ignoring unknown extended header keyword 'LIBARCHIVE.xattr.security.selinux'
openshift-origin-client-tools-v3.11.0-dd10d17-linux-64bit/README.md
tar: Ignoring unknown extended header keyword 'LIBARCHIVE.xattr.security.selinux'
openshift-origin-client-tools-v3.11.0-dd10d17-linux-64bit/LICENSE

If you are running oc cluster up on a virtual machine in Amazon AWS EC2, you should pass the public hostname and IP address to ensure that the cluster is reachable from your local host (optional, see below).

metadata_endpoint="http://169.254.169.254/latest/meta-data"
echo $metadata_endpoint
public_hostname="$( curl "${metadata_endpoint}/public-hostname" )"
echo $public_hostname
public_ip="$( curl "${metadata_endpoint}/public-ipv4" )"
echo $public_ip

Start the cluster:

oc cluster up
Note

You can use --loglevel=10 to debug the oc command.

Here is some typical output, lots of which is elided (for OpenShift 3.10):

Getting a Docker client ...
Checking if image openshift/origin-control-plane:v3.10 is available ...
Creating shared mount directory on the remote host ...
Determining server IP ...
Checking if OpenShift is already running ...
Checking for supported Docker version (=>1.22) ...
Checking if insecured registry is configured properly in Docker ...
Checking if required ports are available ...
Checking if OpenShift client is configured properly ...
Checking if image openshift/origin-control-plane:v3.10 is available ...
Starting OpenShift using openshift/origin-control-plane:v3.10 ...
I0921 09:50:22.575484   20827 config.go:42] Running "create-master-config"
I0921 09:50:25.098846   20827 config.go:46] Running "create-node-config"
...
Login to server ...
Creating initial project "myproject" ...
Server Information ...
OpenShift server started.

The server is accessible via web console at:
    https://127.0.0.1:8443

You are logged in as:
    User:     developer
    Password: <any value>

To login as administrator:
    oc login -u system:admin

Login next:

oc login -u system:admin

You should see this:

Logged into "https://127.0.0.1:8443" as "system:admin" using existing credentials.

You have access to the following projects and can switch between them with oc project <projectname>:

  • default

  • kube-dns

  • kube-proxy

  • kube-public

  • kube-system

  • myproject

  • openshift

  • openshift-apiserver

  • openshift-controller-manager

  • openshift-core-operators

  • openshift-infra

  • openshift-node

  • openshift-web-console

The one you are using will have a star * in front of the name. Next, list the pods:

$ oc get po --namespace=default
NAME                            READY     STATUS      RESTARTS   AGE
docker-registry-1-9qw46         1/1       Running     0          2m
persistent-volume-setup-wkk7l   0/1       Completed   0          2m
router-1-6n2vs                  1/1       Running     0          2m

Verify that DNS from within your pods works as expected:

oc run -i --tty ubuntu --image=ubuntu:16.04 --restart=Never -- bash

You’ll be logged in and see a prompt root@ubuntu:/#. Now run these commands:

apt-get update
apt-get install iputils-ping
apt-get install dnsutils
ping 8.8.8.8

Ping should succeed, of course.

Try nslookup:

nslookup www.lightbend.com

It should return:

Server:		172.30.0.2
Address:	172.30.0.2#53

Non-authoritative answer:
www.lightbend.com  canonical name = lightbend-01-388990380.us-east-1.elb.amazonaws.com.
Name:  lightbend-01-388990380.us-east-1.elb.amazonaws.com
Address: 18.235.11.113
Name:  lightbend-01-388990380.us-east-1.elb.amazonaws.com
Address: 34.201.196.248
Name:  lightbend-01-388990380.us-east-1.elb.amazonaws.com
Address: 34.238.100.233

And this:

nslookup kubernetes.default.svc

Which should return:

Server:		172.30.0.2
Address:	172.30.0.2#53

Name:	kubernetes.default.svc.cluster.local
Address: 172.30.0.1
Note

There are some issues with DNS setup.

If you run on your host machine and not on AWS, make sure you don’t encounter this issue. Also, replace /etc/resolv.conf with the contents of cat /run/systemd/resolve/resolv.conf before you start your cluster.

For example, using Ubuntu 18.04 on a local machine:

cat /run/systemd/resolve/resolv.conf
Warning

This file is managed by systemd-resolved(8). Do not edit it directly. This is a dynamic resolv.conf file for connecting local clients directly to all known uplink DNS servers. This file lists all configured search domains. Third party programs must not access this file directly, but only through the symlink at /etc/resolv.conf. To manage resolv.conf(5) in a different way, replace this symlink with a static file or a different symlink.

9.3.2. Accessing the GUI

If you have setup the cluster on a public IP on AWS:

oc cluster up --public-hostname="${public_hostname}"

You should see the following:

I0921 10:02:23.215778 5835 interface.go:41] Finished installing "openshift-image-registry" "sample-templates" "centos-imagestreams" "openshift-router" "persistent-volumes" "openshift-web-console-operator"
Server Information …
OpenShift server started.

The server is accessible via web console at:
 https://ec2-54-246-222-160.eu-west-1.compute.amazonaws.com:8443

Change the instance security group inbound rules and add port 8443 there.

If you started the cluster simply with oc cluster up, forward 8443 to your machine:

ssh -4 -o "UserKnownHostsFile=/dev/null" -o "StrictHostKeyChecking=no" -o "ServerAliveInterval=120" -N -L 8443:localhost:8443 ubuntu@54.246.222.160

Now access the GUI at https://localhost:8443.

If you try this port-forwarding method when you have exposed your cluster on the public IP address, your browser will be redirected to that IP, and vice versa.

If everything runs fine then you should see the following login form:

Login Form
Figure 33. Login Form

Use the credentials admin for the user name and admin for the password.

You should see the following screen after logging in:

After Login
Figure 34. After Login

If you have any issues with your cluster, try again:

oc cluster down
sudo rm -rf openshift.local.clusterup

The folder openshift.local.clusterup contains your cluster’s configuration.

9.3.3. Running a Spark Application

Let’s run the Spark Pi app as a smoke test on the new cluster.

First, login and set your project, if necessary:

oc login -u system:admin
oc project default

This step is important. Otherwise, your users won’t have the privileges to watch the pods in the default namespace.

Create a file spark-rbac.yaml with the following contents:

apiVersion: rbac.authorization.k8s.io/v1beta1
kind: ClusterRoleBinding
metadata:
  name: spark-rbac
subjects:
- kind: ServiceAccount
  # Reference to upper's `metadata.name`
  name: default
  # Reference to upper's `metadata.namespace`
  namespace: default
roleRef:
  kind: ClusterRole
  name: cluster-admin
  apiGroup: rbac.authorization.k8s.io

Use it to implement these settings:

oc create -f spark-rbac.yaml

Create a pod spec pod.yaml with the following contents:

apiVersion: v1
kind: Pod
metadata:
  name: test-deployment-pi
spec:
  containers:
  - name: test-deployment-pi
    image: lightbend/spark:2.1.1-OpenShift-2.4.3
    command:
      - 'sh'
      - '-c'
      -  "/opt/spark/bin/spark-submit
              --master k8s://https://kubernetes.default.svc
              --deploy-mode cluster
              --name spark-pi
              --class org.apache.spark.examples.SparkPi
              --conf spark.executor.instances=1
              --conf spark.kubernetes.container.image=lightbend/spark:2.1.1-OpenShift-2.4.3
              local:///opt/spark/examples/jars/spark-examples_2.11-2.4.3.jar"
  restartPolicy: Never

Finally, run it!

oc create -f pod.yaml

Check that your spark driver and executors are launched by getting the list of running pods:

oc get po

You should see output like the following:

NAME                            READY     STATUS              RESTARTS   AGE
docker-registry-1-pjls9         1/1       Running             0          39m
persistent-volume-setup-btf77   0/1       Completed           0          39m
router-1-phbgf                  1/1       Running             0          39m
spark-pi-1537526738128-driver   1/1       Running             0          6s
spark-pi-1537526738128-exec-1   0/1       ContainerCreating   0          0s
test-deployment-pi              1/1       Running             0          10s

If your executors are in a pending state, you probably don’t have enough resources. Try changing any of these settings in the YAML above:

--conf spark.executor.instances=1
--conf spark.kubernetes.executor.request.cores=0.1
--conf spark.kubernetes.executor.limit.cores=0.1

To debug any issues run oc get events. For this case, you should see something similar to the following:

0/1 nodes are available: 1 Insufficient cpu

To check the numbers of cpus used run oc describe nodes | grep cpu.

If it appears that the Spark job is running or completed, check that the job ran successfully with oc logs -f spark-pi-1537526738128-driver:

2018-09-21 10:45:49 INFO  DAGScheduler:54 - Job 0 finished: reduce at SparkPi.scala:38, took 0.759390 s
Pi is roughly 3.143635718178591
2018-09-21 10:45:49 INFO  AbstractConnector:318 - Stopped Spark@14faa38c{HTTP/1.1,[http/1.1]}{0.0.0.0:4040}
2018-09-21 10:45:49 INFO  SparkUI:54 - Stopped Spark web UI at http://spark-pi-1537526738128-driver-svc.default.svc:4040
2018-09-21 10:45:49 INFO  KubernetesClusterSchedulerBackend:54 - Shutting down all executors
2018-09-21 10:45:49 INFO  KubernetesClusterSchedulerBackend$KubernetesDriverEndpoint:54 - Asking each executor to shut down
2018-09-21 10:45:49 WARN  ExecutorPodsWatchSnapshotSource:87 - Kubernetes client has been closed (this is expected if the application is shutting down.)
2018-09-21 10:45:49 INFO  MapOutputTrackerMasterEndpoint:54 - MapOutputTrackerMasterEndpoint stopped!
2018-09-21 10:45:49 INFO  MemoryStore:54 - MemoryStore cleared
2018-09-21 10:45:49 INFO  BlockManager:54 - BlockManager stopped
2018-09-21 10:45:49 INFO  BlockManagerMaster:54 - BlockManagerMaster stopped
2018-09-21 10:45:49 INFO  OutputCommitCoordinator$OutputCommitCoordinatorEndpoint:54 - OutputCommitCoordinator stopped!
2018-09-21 10:45:49 INFO  SparkContext:54 - Successfully stopped SparkContext
2018-09-21 10:45:49 INFO  ShutdownHookManager:54 - Shutdown hook called
2018-09-21 10:45:49 INFO  ShutdownHookManager:54 - Deleting directory /var/data/spark-f6a8bffb-cfa2-4816-8f1d-19cbe84daf96/spark-ba167a22-f4b7-4cca-834a-07beef0503a8

10. Colophon

Fast Data Platform is a team effort:

  • Craig Blitz

  • Jonas Bonér

  • Luc Bourlier

  • Ryan Braley

  • Mark Brewer

  • David Brinegar

  • Michael Burling

  • Trevor Burton-McCreadie

  • Ed Callahan

  • Kikia Carter

  • Paul Craddick

  • Michael Cramer

  • Andre Cravens

  • Paul Dale

  • Klaus Dambeck

  • Duncan DeVore

  • Hywel Evans

  • Debasish Ghosh

  • Sean Glover

  • Justin Godsey

  • Yury Gribkov

  • Kathleen Hayes

  • Derek Henninger

  • Dwight Keith

  • Mike Kelland

  • Viktor Klang

  • Srikanth Koneru

  • Stavros Kontopoulos

  • John Kroeger

  • Tracey Liot

  • Boris Lublinsky

  • Gerard Maas

  • Dave Martin

  • Eric Martin

  • Douglas McPherson

  • John Meyerhofer

  • Martynas Mickevičius

  • Age Mooij

  • Adriaan Moors

  • Brad Murdoch

  • Alan Ngai

  • Ashley Nuqui

  • David Ogren

  • Justin Pihony

  • Patrick Premont

  • James Ravn

  • Ray Roestenburg

  • Enno Runne

  • Lukas Rytz

  • Sushila Sahay

  • Seth Tisue

  • Craig Upson

  • Robert Walker

  • Dean Wampler

  • Karl Wehden

  • Danny Wei

  • Oliver White

  • Holly Wilson

  • Chaoran Yu

  • Jason Zaugg

  • Yunan Zhao

© 2016-2019, Lightbend, Inc.