AWS Kinesis Connector

The AWS Kinesis connector provides flows for streaming data to and from Kinesis Data Streams, and for streaming data to Kinesis Firehose streams.

For more information about Kinesis please visit the Kinesis documentation.

Alternative connector 1

Another Kinesis connector which is based on the Kinesis Client Library is available.

The KCL Source can read from several shards and rebalance automatically when other Workers are started or stopped. It also handles record sequence checkpoints.

Please read more about it at GitHub aserrallerios/kcl-akka-stream.

Alternative connector 2

Another Kinesis connector which is based on the Kinesis Client Library is available.

This library combines the convenience of Akka Streams with KCL checkpoint management, failover, load-balancing, and re-sharding capabilities.

Please read more about it at GitHub StreetContxt/kcl-akka-stream.

Reported issues

Tagged issues at GitHub

Artifacts

sbt
libraryDependencies += "com.lightbend.akka" %% "akka-stream-alpakka-kinesis" % "0.19"
Maven
<dependency>
  <groupId>com.lightbend.akka</groupId>
  <artifactId>akka-stream-alpakka-kinesis_2.12</artifactId>
  <version>0.19</version>
</dependency>
Gradle
dependencies {
  compile group: 'com.lightbend.akka', name: 'akka-stream-alpakka-kinesis_2.12', version: '0.19'
}

Kinesis Data Streams

Create the Kinesis client

Sources and Flows provided by this connector need an AmazonKinesisAsync instance to consume messages from a shard.

Note

The AmazonKinesisAsync instance you supply is thread-safe and can be shared amongst multiple GraphStages. As a result, individual GraphStages will not automatically shut down the supplied client when they complete. It is recommended to shut the client instance down on Actor system termination.

Scala
implicit val system: ActorSystem = ActorSystem()
implicit val materializer: Materializer = ActorMaterializer()

implicit val amazonKinesisAsync: com.amazonaws.services.kinesis.AmazonKinesisAsync =
  AmazonKinesisAsyncClientBuilder.defaultClient()

system.registerOnTermination(amazonKinesisAsync.shutdown())
Java
final ActorSystem system = ActorSystem.create();
final ActorMaterializer materializer = ActorMaterializer.create(system);

final com.amazonaws.services.kinesis.AmazonKinesisAsync amazonKinesisAsync
        = AmazonKinesisAsyncClientBuilder.defaultClient();

system.registerOnTermination(amazonKinesisAsync::shutdown);

Kinesis as Source

The KinesisSource creates one GraphStage per shard. Reading from a shard requires an instance of ShardSettings.

Scala
val settings = ShardSettings(streamName = "myStreamName",
                             shardId = "shard-id",
                             shardIteratorType = ShardIteratorType.TRIM_HORIZON,
                             refreshInterval = 1.second,
                             limit = 500)
Java
final ShardSettings settings = ShardSettings.create("streamName", "shard-id")
        .withShardIteratorType(ShardIteratorType.LATEST)
        .withRefreshInterval(1L, TimeUnit.SECONDS)
        .withLimit(500);

You have the choice of reading from a single shard, or reading from multiple shards. In the case of multiple shards the results of running a separate GraphStage for each shard will be merged together.

Warning

The GraphStage associated with a shard will remain open until the graph is stopped, or a GetRecords result returns an empty shard iterator indicating that the shard has been closed. This means that if you wish to continue processing records after a merge or reshard, you will need to recreate the source with the results of a new DescribeStream request, which can be done by simply creating a new KinesisSource. You can read more about adapting to a reshard here.
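As a sketch of adapting to a reshard, the current shard list can be re-fetched with DescribeStream and a fresh merged source constructed. This assumes the AWS SDK v1 client from above is in scope; the stream name, iterator type, and tuning values are illustrative choices, not mandated by the connector:

```scala
import scala.collection.JavaConverters._
import scala.concurrent.duration._

// Re-fetch the current shard list after a reshard and build a new merged
// source covering all shards returned by DescribeStream.
val currentShards = amazonKinesisAsync
  .describeStream("myStreamName")
  .getStreamDescription
  .getShards
  .asScala

val perShardSettings = currentShards.map { shard =>
  ShardSettings("myStreamName",
                shard.getShardId,
                ShardIteratorType.TRIM_HORIZON,
                refreshInterval = 1.second,
                limit = 500)
}.toList

val refreshedSource: Source[Record, NotUsed] =
  KinesisSource.basicMerge(perShardSettings, amazonKinesisAsync)
```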

For a single shard, simply provide the settings for that shard.

Scala
val source: Source[com.amazonaws.services.kinesis.model.Record, NotUsed] =
  KinesisSource.basic(settings, amazonKinesisAsync)
Java
final Source<com.amazonaws.services.kinesis.model.Record, NotUsed> source
        = KinesisSource.basic(settings, amazonKinesisAsync);

You can merge multiple shards by providing a list of settings.

Scala
val mergeSettings = List(
  ShardSettings("myStreamName",
                "shard-id-1",
                ShardIteratorType.AT_SEQUENCE_NUMBER,
                startingSequenceNumber = Some("sequence"),
                refreshInterval = 1.second,
                limit = 500),
  ShardSettings("myStreamName",
                "shard-id-2",
                ShardIteratorType.AT_TIMESTAMP,
                atTimestamp = Some(new Date()),
                refreshInterval = 1.second,
                limit = 500)
)

val mergedSource: Source[Record, NotUsed] = KinesisSource.basicMerge(mergeSettings, amazonKinesisAsync)
Java
final Source<Record, NotUsed> two = KinesisSource.basicMerge(Arrays.asList(settings), amazonKinesisAsync);

The constructed Source will return Record objects by calling GetRecords at the specified interval and according to the downstream demand.
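As a minimal consumption sketch, each record's payload can be decoded as UTF-8 and printed; this assumes the settings, client, and materializer shown above are in scope:

```scala
import akka.stream.scaladsl.Sink
import java.nio.charset.StandardCharsets

// Decode each record's ByteBuffer payload and print it; the stream runs
// until the graph is stopped or the shard is closed.
val done = KinesisSource
  .basic(settings, amazonKinesisAsync)
  .map(record => StandardCharsets.UTF_8.decode(record.getData).toString)
  .runWith(Sink.foreach(println))
```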

Kinesis Put via Flow or as Sink

The KinesisFlow (or KinesisSink) publishes messages into a Kinesis stream using their partition key and message body. It uses dynamically sized batches, can perform several requests in parallel, and retries failed records. These features are necessary to achieve the best possible write throughput to the stream. The Flow outputs the result of publishing each record.

Warning

Batching has a drawback: message order cannot be guaranteed, as some records within a single batch may fail to be published. That also means that the order of the Flow's output may not match the order of its input.

More information can be found here and here.

Publishing to a Kinesis stream requires an instance of KinesisFlowSettings. A default instance with sane values, as well as a method that derives settings from the stream's number of shards, are also available:

Scala
val flowSettings = KinesisFlowSettings(
  parallelism = 1,
  maxBatchSize = 500,
  maxRecordsPerSecond = 1000,
  maxBytesPerSecond = 1000000,
  maxRetries = 5,
  backoffStrategy = KinesisFlowSettings.Exponential,
  retryInitialTimeout = 100.millis
)

val defaultFlowSettings = KinesisFlowSettings.defaultInstance

val fourShardFlowSettings = KinesisFlowSettings.byNumberOfShards(4)
Java
final KinesisFlowSettings flowSettings = KinesisFlowSettings.create()
        .withParallelism(1)
        .withMaxBatchSize(500)
        .withMaxRecordsPerSecond(1_000)
        .withMaxBytesPerSecond(1_000_000)
        .withMaxRetries(5)
        .withBackoffStrategyExponential()
        .withRetryInitialTimeout(100L, TimeUnit.MILLISECONDS);

final KinesisFlowSettings defaultFlowSettings = KinesisFlowSettings.create();

final KinesisFlowSettings fourShardFlowSettings = KinesisFlowSettings.byNumberOfShards(4);
Warning

Note that throughput settings maxRecordsPerSecond and maxBytesPerSecond are vital to minimize server errors (like ProvisionedThroughputExceededException) and retries, and thus achieve a higher publication rate.

The Flow/Sink can now be created.

Scala
val flow1: Flow[PutRecordsRequestEntry, PutRecordsResultEntry, NotUsed] = KinesisFlow("myStreamName")

val flow2: Flow[PutRecordsRequestEntry, PutRecordsResultEntry, NotUsed] = KinesisFlow("myStreamName", flowSettings)

val flow3: Flow[(String, ByteString), PutRecordsResultEntry, NotUsed] =
  KinesisFlow.byPartitionAndBytes("myStreamName")

val flow4: Flow[(String, ByteBuffer), PutRecordsResultEntry, NotUsed] =
  KinesisFlow.byPartitionAndData("myStreamName")

val sink1: Sink[PutRecordsRequestEntry, NotUsed] = KinesisSink("myStreamName")
val sink2: Sink[PutRecordsRequestEntry, NotUsed] = KinesisSink("myStreamName", flowSettings)
val sink3: Sink[(String, ByteString), NotUsed] = KinesisSink.byPartitionAndBytes("myStreamName")
val sink4: Sink[(String, ByteBuffer), NotUsed] = KinesisSink.byPartitionAndData("myStreamName")
Java
final Flow<PutRecordsRequestEntry, PutRecordsResultEntry, NotUsed> flow
        = KinesisFlow.apply("streamName", flowSettings, amazonKinesisAsync);

final Flow<PutRecordsRequestEntry, PutRecordsResultEntry, NotUsed> defaultSettingsFlow
        = KinesisFlow.apply("streamName", amazonKinesisAsync);

final Sink<PutRecordsRequestEntry, NotUsed> sink
        = KinesisSink.apply("streamName", flowSettings, amazonKinesisAsync);

final Sink<PutRecordsRequestEntry, NotUsed> defaultSettingsSink
        = KinesisSink.apply("streamName", amazonKinesisAsync);
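As a usage sketch, a stream of strings could be published through one of the sinks above. The partition-key choice (here, the first character of each line) is purely illustrative, and the implicit client from above is assumed to be in scope:

```scala
import akka.stream.scaladsl.Source
import akka.util.ByteString

// Publish a few lines, partitioned by their first character, using the
// byPartitionAndBytes variant shown above.
Source(List("first", "second", "third"))
  .map(line => (line.take(1), ByteString(line)))
  .runWith(KinesisSink.byPartitionAndBytes("myStreamName"))
```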

Kinesis Firehose Streams

Create the Kinesis Firehose client

Flows provided by this connector need an AmazonKinesisFirehoseAsync instance to publish messages.

Note

The AmazonKinesisFirehoseAsync instance you supply is thread-safe and can be shared amongst multiple GraphStages. As a result, individual GraphStages will not automatically shut down the supplied client when they complete. It is recommended to shut the client instance down on Actor system termination.

Scala
implicit val system: ActorSystem = ActorSystem()
implicit val materializer: Materializer = ActorMaterializer()

implicit val amazonKinesisFirehoseAsync: com.amazonaws.services.kinesisfirehose.AmazonKinesisFirehoseAsync =
  AmazonKinesisFirehoseAsyncClientBuilder.defaultClient()

system.registerOnTermination(amazonKinesisFirehoseAsync.shutdown())
Java
final ActorSystem system = ActorSystem.create();
final ActorMaterializer materializer = ActorMaterializer.create(system);

final com.amazonaws.services.kinesisfirehose.AmazonKinesisFirehoseAsync amazonKinesisFirehoseAsync
        = AmazonKinesisFirehoseAsyncClientBuilder.defaultClient();

system.registerOnTermination(amazonKinesisFirehoseAsync::shutdown);

Kinesis Put via Flow or as Sink

The KinesisFirehoseFlow (or KinesisFirehoseSink) publishes messages into a Kinesis Firehose stream using their message body. It uses dynamically sized batches, can perform several requests in parallel, and retries failed records. These features are necessary to achieve the best possible write throughput to the stream. The Flow outputs the result of publishing each record.

Warning

Batching has a drawback: message order cannot be guaranteed, as some records within a single batch may fail to be published. That also means that the order of the Flow's output may not match the order of its input.

More information can be found here.

Publishing to a Kinesis Firehose stream requires an instance of KinesisFirehoseFlowSettings, although a default instance with sane values is available:

Scala
val flowSettings = KinesisFirehoseFlowSettings(
  parallelism = 1,
  maxBatchSize = 500,
  maxRecordsPerSecond = 5000,
  maxBytesPerSecond = 4000000,
  maxRetries = 5,
  backoffStrategy = KinesisFirehoseFlowSettings.Exponential,
  retryInitialTimeout = 100.millis
)

val defaultFlowSettings = KinesisFirehoseFlowSettings.defaultInstance
Java
final KinesisFirehoseFlowSettings flowSettings = KinesisFirehoseFlowSettings.create()
        .withParallelism(1)
        .withMaxBatchSize(500)
        .withMaxRecordsPerSecond(5_000)
        .withMaxBytesPerSecond(4_000_000)
        .withMaxRetries(5)
        .withBackoffStrategyExponential()
        .withRetryInitialTimeout(100L, TimeUnit.MILLISECONDS);

final KinesisFirehoseFlowSettings defaultFlowSettings = KinesisFirehoseFlowSettings.create();
Warning

Note that throughput settings maxRecordsPerSecond and maxBytesPerSecond are vital to minimize server errors (like ProvisionedThroughputExceededException) and retries, and thus achieve a higher publication rate.

The Flow/Sink can now be created.

Scala
val flow1: Flow[Record, PutRecordBatchResponseEntry, NotUsed] = KinesisFirehoseFlow("myStreamName")

val flow2: Flow[Record, PutRecordBatchResponseEntry, NotUsed] = KinesisFirehoseFlow("myStreamName", flowSettings)

val sink1: Sink[Record, NotUsed] = KinesisFirehoseSink("myStreamName")
val sink2: Sink[Record, NotUsed] = KinesisFirehoseSink("myStreamName", flowSettings)
Java
final Flow<Record, PutRecordBatchResponseEntry, NotUsed> flow
        = KinesisFirehoseFlow.apply("streamName", flowSettings, amazonKinesisFirehoseAsync);

final Flow<Record, PutRecordBatchResponseEntry, NotUsed> defaultSettingsFlow
        = KinesisFirehoseFlow.apply("streamName", amazonKinesisFirehoseAsync);

final Sink<Record, NotUsed> sink
        = KinesisFirehoseSink.apply("streamName", flowSettings, amazonKinesisFirehoseAsync);

final Sink<Record, NotUsed> defaultSettingsSink
        = KinesisFirehoseSink.apply("streamName", amazonKinesisFirehoseAsync);
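As a usage sketch, a stream of strings could be wrapped in Firehose Record objects and published through the sink above; the implicit client from above is assumed to be in scope, and the sample data is illustrative:

```scala
import akka.stream.scaladsl.Source
import com.amazonaws.services.kinesisfirehose.model.Record
import java.nio.ByteBuffer

// Wrap each line in a Firehose Record and publish it via the sink above.
Source(List("first", "second", "third"))
  .map(line => new Record().withData(ByteBuffer.wrap(line.getBytes("UTF-8"))))
  .runWith(KinesisFirehoseSink("myStreamName"))
```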
The source code for this page can be found here.