Akka Stream configuration

Streams need to be selected for telemetry. To avoid any unwanted impact on application performance, you need to decide which streams in your system are instrumented. This can be done either by using configuration—based on the call site of stream materialization—or programmatically using stream attributes.

Note: Streams are not instrumented automatically, but must have instrumentation enabled explicitly.

Using configuration to instrument streams

Streams can be selected for instrumentation based on the code location where the stream is materialized. You can specify the enclosing method fully, or you can specify the enclosing class or package using wildcards. Cinnamon accesses the same configuration as used to configure an ActorSystem. The default configuration is in application.conf.

For example, the following are valid stream selections for fully qualified method names, where the stream is materialized:

"example.streams.a.A.method"
"example.streams.a.A$B.method"

A * wildcard can also be used at the end of a selection, after a delimiter, to select all methods within the given package or class. The following are valid stream selections for methods in a package, methods in a class, or for methods in inner classes, where streams are materialized:

"example.streams.a.*"
"example.streams.a.A.*"
"example.streams.a.A$*"
"example.streams.a.A$B.*"

Settings can be applied to each configured stream selection. The main setting is report-by, for enabling instrumentation and deciding how the selected streams will report metrics.

Report-by settings

Instrumentation is enabled for a stream selection using the report-by setting. There are two main ways to report metrics: either metrics are aggregated by stream name—using the Name attribute of the stream graph—or metrics are reported for individual stream instances—using a unique identifier created by the materializer, such as flow-0.

Here’s an example configuration that enables instrumentation for several streams:

cinnamon.akka {
  streams {
    "example.streams.a.A.method" {
      report-by = name
    }
    "example.streams.a.B.*" {
      report-by = instance
    }
    "example.streams.a.b.*" {
      report-by = name
    }
    "example.streams.a.A$B.*" {
      report-by = [name, instance]
    }
  }
}

Note: Streams that are reported by instance should be carefully selected, to avoid too many unique metric identifiers.

Note: Streams that are reported by name create aggregated metrics for all streams with the same name.

To use report-by-name the runnable graph needs to be named, such as with this stream:

Source.single(0).map(_ + 1).to(Sink.ignore).named("my-stream").run()

Note: Reporting by name requires a Name attribute to be attached to the graph.

Selection excludes

Wildcards can be used to select all streams materialized within a package or class. Streams can be excluded from a selection using excludes. For example:

cinnamon.akka {
  streams {
    "example.streams.a.*" {
      report-by = name
      excludes = ["example.streams.a.B.*", "example.streams.a.b.*"]
    }
  }
}

The above configuration enables telemetry for all streams materialized in the example.streams.a package, apart from the example.streams.a.B class and those streams materialized in the example.streams.a.b sub-package.

Using attributes to instrument streams

Streams can also be selected for instrumentation programmatically, rather than using configuration, by attaching a special stream Attribute to the runnable graph.

Cinnamon provides an Instrumented attribute for specifying telemetry settings directly on a stream. This supports the same report-by settings as described above for configuration.

First import the CinnamonAttributes class:

Scala
import com.lightbend.cinnamon.akka.stream.CinnamonAttributes
Java
import com.lightbend.cinnamon.akka.stream.CinnamonAttributes;

You can then attach the Instrumented attribute to a stream to configure the telemetry settings:

Scala
Source.single(0)
  .map(_ + 1)
  .to(Sink.ignore)
  .named("my-stream")
  .addAttributes(CinnamonAttributes.instrumented(reportByName = true))
  .run()
Java
Source.single(0)
    .map(x -> x + 1)
    .to(Sink.ignore())
    .named("my-stream")
    .addAttributes(CinnamonAttributes.instrumentedByName())
    .run(materializer);

There are arguments and methods for the various configuration options. See the API documentation for CinnamonAttributes.

Note: Streams that are reported by instance should be carefully selected, to avoid too many unique metric identifiers.

Note: Streams that are reported by name create aggregated metrics for all streams with the same name.

For Scala, there is also an implicit extension method available to provide a more convenient way to enable instrumentation, similar to the named method available on graphs. When configuring report-by-name, you can also enable this by providing the name directly.

Scala
import com.lightbend.cinnamon.akka.stream.CinnamonAttributes.GraphWithInstrumented

Source.single(0)
  .map(_ + 1)
  .to(Sink.ignore)
  .instrumented(name = "my-stream")
  .run()

Instrumented runWith

When using the Instrumented attribute to configure instrumentation settings for streams, the attribute needs to be applied to the whole runnable graph (including the sinks). This means that the Instrumented attribute can’t be used in conjunction with the runWith method on sources. Also note that source.runWith(sink) is equivalent to source.toMat(sink)(Keep.right).run() while source.to(sink).run() is equivalent to source.toMat(sink)(Keep.left).run(). That is, runWith keeps the materialized value from the sink, while to/run keeps the materialized value from the source.

To make it easier to instrument streams that currently use runWith to connect the sink, and make use of the returned materialized value, Cinnamon provides instrumentedRunWith helper methods. For Scala, this is provided as an implicit extension method. For Java, this is provided as a static method. Both the sink and the instrumentation settings are passed to instrumentedRunWith:

Scala
import com.lightbend.cinnamon.akka.stream.CinnamonAttributes.SourceWithInstrumented

implicit val materializer = ActorMaterializer()

Source.single(0)
  .map(_ + 1)
  .instrumentedRunWith(Sink.ignore)(name = "my-stream")
Java
CinnamonAttributes.instrumentedRunWith(
    Source.single(0).map(x -> x + 1),
    Sink.ignore(),
    materializer,
    CinnamonAttributes.isInstrumented().withReportByName("my-stream"));

Instrumenting partial streams

Partial streams can be instrumented using the Instrumented attribute. This can be useful when only one part of the overall stream is of interest, such as when a Flow is being passed to another library such as Akka HTTP. To enable instrumentation for partial streams, attach the Instrumented attribute to a Source or Flow in the same way as enabling instrumentation for entire streams.

Note: If the intention is to instrument the whole runnable graph, including the sinks, see the section about instrumented runWith on instrumenting all operators rather than only part of the stream.

For Scala, there is also an implicit extension method available to provide a more convenient way to enable instrumentation for partial streams. The method is similar to the instrumented extension method, but named instrumentedPartial.

For example, if the Flow handler approach in Akka HTTP is being used, then the Flow can be instrumented as a partial stream:

Scala
import com.lightbend.cinnamon.akka.stream.CinnamonAttributes.FlowWithInstrumented

val flow = Flow[HttpRequest]
  .map(request => HttpResponse(entity = "response"))
  .instrumentedPartial(name = "handler", traceable = true)

Http().bindAndHandle(flow, host, port)

Stream and operator names

Cinnamon can create meaningful and unique names for streams and operators, to use when reporting metrics, events, or traces. Attributes attached to a stream or operator will be used, as well as the unique ids that the materializer associates with each stream or operator.

Stream names

The naming for streams depends on the report-by settings and either the named or instrumented attributes. Here are some examples:

Stream: not named and report-by-instance

If a stream is not named and is configured to report by instance, then the unique internal name created by the materializer is used.

The default pattern is flow-{id}.

Scala
// example stream: not named and instrumented to report-by-instance

Source.single(0).map(_ + 1)
  .instrumentedRunWith(Sink.ignore)(reportByInstance = true)
Java
// example stream: not named and instrumented to report-by-instance

CinnamonAttributes.instrumentedRunWith(
    Source.single(0).map(x -> x + 1),
    Sink.ignore(),
    materializer,
    CinnamonAttributes.isInstrumented().withReportByInstance());
// expected naming for an unnamed stream with report-by-instance

name -> "flow-0"

Stream: named and report-by-instance

If a stream is named and configured to report by instance, then the name is combined with the unique internal name created by the materializer.

The pattern is {name}-flow-{id}.

Scala
// example stream: named and instrumented to report-by-instance

Source.single(0)
  .map(_ + 1)
  .to(Sink.ignore)
  .named("my-stream")
  .instrumented(reportByInstance = true)
  .run()
Java
// example stream: named and instrumented to report-by-instance

Source.single(0)
    .map(x -> x + 1)
    .to(Sink.ignore())
    .named("my-stream")
    .addAttributes(CinnamonAttributes.instrumentedByInstance())
    .run(materializer);
// expected naming for a named stream with report-by-instance

name -> "my-stream-flow-0"

Stream: report-by-name

If a stream is named and configured to report by name, then just the given name is used.

The name can be provided by using the named attribute:

Scala
// example stream: named and instrumented to report-by-name

Source.single(0)
  .map(_ + 1)
  .to(Sink.ignore)
  .named("my-stream")
  .instrumented(reportByName = true)
  .run()
Java
// example stream: named and instrumented to report-by-name

Source.single(0)
    .map(x -> x + 1)
    .to(Sink.ignore())
    .named("my-stream")
    .addAttributes(CinnamonAttributes.instrumentedByName())
    .run(materializer);
// expected naming for a named stream with report-by-name

name -> "my-stream"

The name can also be provided directly with the instrumented attribute, which will automatically enable report by name:

Scala
// example stream: named and instrumented to report-by-name

Source.single(0).map(_ + 1)
  .instrumentedRunWith(Sink.ignore)(name = "my-stream")
Java
// example stream: named and instrumented to report-by-name

CinnamonAttributes.instrumentedRunWith(
    Source.single(0).map(x -> x + 1),
    Sink.ignore(),
    materializer,
    CinnamonAttributes.isInstrumented().withReportByName("my-stream"));
// expected naming for a named stream with report-by-name

name -> "my-stream"

Stream: report-by-name and report-by-instance

If a stream has multiple report-by settings, then metrics will be reported under multiple names. See examples above for names used for reporting by instance or name.

Operator names

Stream operator names are based on any name attributes attached to the operator, with a fallback to the class name of the operator if there are no attributed names. To ensure that operator names are unique, the ids assigned by the materializer for async islands and operator stages are always included in the name.

The pattern for operator names is {async-island-id}-{operator-stage-id}-{names}, where {names} are either all name attributes (ordered from least specific to most specific) or otherwise the (simple) class name for the operator.

Here are some examples:

Operators: no extra name attributes

If an instrumented stream has no extra name attributes, then the names will default to the built-in names provided by Akka Streams (either provided name attributes, or fallback to the simple class name).

Scala
// example stream: with async island and without any extra name attributes

Source(1 to 5)
  .map(_ + 1)
  .async
  .filter(_ % 2 == 0)
  .map(_ * 2)
Java
// example stream: with async island and without any extra name attributes

Source.range(1, 5).map(x -> x + 1).async().filter(x -> x % 2 == 0).map(x -> x * 2);
// expected naming for operators without any extra name attributes

// source of iterable, highest id numbers
source -> "2-3-iterableSource-singleSource"

// source of iterable is made up of two stages
sourceMapConcat -> "2-2-iterableSource-statefulMapConcat"

// the first map is in a different async island from the second map
map1 -> "2-0-map"

// async boundary: output from first async island
outputBoundary -> "2-1-ActorOutputBoundary"

// async boundary: input to second async island
inputBoundary -> "0-3-BatchingActorInputBoundary"

// the filter is in the second async island (first to be materialized)
filter -> "0-2-filter"

// the second map has the first ids
map2 -> "0-0-map"

Note: The stream materializer will traverse back through the graph from the sink, so downstream operators are assigned ids first. The ids are generally in reverse order.

Operators: with name attributes

If an instrumented stream has name attributes, then the operator names will include all attached and inherited name attributes. Note that attributes apply to the entire sub-graph when using the fluid DSLs. To isolate a name, create a named flow and use the via operator. See the Akka documentation on attributes for more information.

Scala
// example stream: with extra name attributes

// separate named flow
val timesTwo = Flow[Int].map(_ * 2).named("times-two")

Source(1 to 5)
  .map(_ + 1)
  .async.named("first") // applies to sub-graph above
  .via(Flow[Int].filter(_ % 2 == 0).named("even")) // inline named operator
  .via(timesTwo)
Java
// example stream: with extra name attributes

// separate named flow
Flow<Integer, Integer, NotUsed> timesTwo =
    Flow.of(Integer.class).map(x -> x * 2).named("times-two");

Source.range(1, 5)
    .map(x -> x + 1)
    .async()
    .named("first") // applies to sub-graph above
    .via(Flow.of(Integer.class).filter(x -> x % 2 == 0).named("even")) // inline named operator
    .via(timesTwo);
// expected naming for operators with name attributes

// source of iterable, highest id numbers, within the "first" part
source -> "2-3-first-iterableSource-singleSource"

// source of iterable is made up of two stages, name includes "first"
sourceMapConcat -> "2-2-first-iterableSource-statefulMapConcat"

// the first map is in the "first" async island, outer name is prepended
map1 -> "2-0-first-map"

// async boundary: output from first async island
outputBoundary -> "2-1-ActorOutputBoundary"

// async boundary: input to second async island
inputBoundary -> "0-3-BatchingActorInputBoundary"

// the filter has an extra "even" name attached
filter -> "0-2-even-filter"

// the second map is from a separate named flow, with "times-two" prepended to default "map" name
timesTwo -> "0-0-times-two-map"

Note: The stream materializer will traverse back through the graph from the sink, so downstream operators are assigned ids first. The ids are generally in reverse order.

Asynchronous operators

Asynchronous stream operators, such as mapAsync or mapAsyncUnordered, process elements asynchronously—the initial element processing will register an asynchronous callback, which will re-enter the stream at a later time, when ready, to complete the processing and pass the result downstream. This means that the regular processing time for these operators will generally be very short, with the bulk of the time spent asynchronously elsewhere, such as in a Future or Actor. To handle this, Cinnamon will detect asynchronous operators and record the processing time for these specially, using a Stopwatch. The total time for asynchronous operators will be recorded, including any asynchronous processing or time in buffers, and until a result is pushed downstream.

The asynchronous processing time is disabled by default, and so is the Stopwatch that it is based on, and both of them need to be enabled:

cinnamon.stopwatch {
  enabled = true
}
cinnamon.akka.stream.metrics {
  async-processing-time = on
}

The asynchronous processing time for a generic asynchronous operator is illustrated in this diagram:

Extended telemetry

Extended telemetry can be enabled for recording end-to-end flow time, stream demand, latency, and per connection metrics. See Akka Stream extended telemetry.