AWS Kinesis Connector

The AWS Kinesis connector provides an Akka Stream Source for consuming Kinesis Stream records.

For more information about Kinesis, see the official documentation.

Artifacts

sbt
libraryDependencies += "com.lightbend.akka" % "akka-stream-alpakka-kinesis_2.12" % "0.14"
Maven
<dependency>
  <groupId>com.lightbend.akka</groupId>
  <artifactId>akka-stream-alpakka-kinesis_2.12</artifactId>
  <version>0.14</version>
</dependency>
Gradle
dependencies {
  compile group: 'com.lightbend.akka', name: 'akka-stream-alpakka-kinesis_2.12', version: '0.14'
}

Usage

Sources provided by this connector need an AmazonKinesisAsync instance to consume messages from a shard.

Note

The AmazonKinesisAsync instance you supply is thread-safe and can be shared among multiple GraphStages. As a result, individual GraphStages will not automatically shut down the supplied client when they complete; shutting it down remains your responsibility.

Scala
val amazonKinesisAsync: AmazonKinesisAsync = AmazonKinesisAsyncClientBuilder.defaultClient()
Java
final AmazonKinesisAsync amazonKinesisAsync = AmazonKinesisAsyncClientBuilder.defaultClient();

We will also need an ActorSystem and an ActorMaterializer.

Scala
implicit val system: ActorSystem = ActorSystem()
implicit val materializer: Materializer = ActorMaterializer()
Java
final ActorSystem system = ActorSystem.create();
final ActorMaterializer materializer = ActorMaterializer.create(system);

Using the Source

The KinesisSource creates one GraphStage per shard. Reading from a shard requires an instance of ShardSettings.

Scala
val settings = ShardSettings(streamName = "myStreamName",
                             shardId = "shard-id",
                             shardIteratorType = ShardIteratorType.TRIM_HORIZON,
                             refreshInterval = 1.second,
                             limit = 500)
Java
final ShardSettings settings = ShardSettings.create("streamName", "shard-id", ShardIteratorType.LATEST, FiniteDuration.apply(1L, TimeUnit.SECONDS), 500);

You can read from a single shard or from multiple shards. With multiple shards, a separate GraphStage runs for each shard and the results are merged together.

Warning

The GraphStage associated with a shard will remain open until the graph is stopped, or a GetRecords result returns an empty shard iterator indicating that the shard has been closed. This means that if you wish to continue processing records after a merge or reshard, you will need to recreate the source with the results of a new DescribeStream request, which can be done by simply creating a new KinesisSource. You can read more about adapting to a reshard here.
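
The termination rule described above can be modelled in plain Java. This is only a simplified sketch, not the connector's implementation: the class `ShardPollingSketch`, the `Batch` type, and the `getRecords` function are hypothetical stand-ins for a GetRecords call, and the sketch ignores both the refresh interval and downstream demand.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Simplified model of polling one shard; all names here are hypothetical.
final class ShardPollingSketch {
    private ShardPollingSketch() {}

    // Stand-in for a GetRecords result: a batch of records plus the iterator for the next call.
    static final class Batch {
        final List<String> records;
        final String nextShardIterator; // null models a closed shard

        Batch(List<String> records, String nextShardIterator) {
            this.records = records;
            this.nextShardIterator = nextShardIterator;
        }
    }

    // Polls until no next iterator is returned, then hands back everything that was read.
    static List<String> drain(String firstIterator, Function<String, Batch> getRecords) {
        List<String> out = new ArrayList<>();
        String iterator = firstIterator;
        while (iterator != null) {
            Batch batch = getRecords.apply(iterator);
            out.addAll(batch.records);
            iterator = batch.nextShardIterator;
        }
        return out;
    }
}
```

Once the loop ends, nothing more is read from that shard. To keep consuming after a reshard, the child shards have to be looked up again and a new source built from them.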

To read from a single shard, provide the settings for that shard.

Scala
KinesisSource.basic(settings, amazonKinesisAsync)
Java
final Source<Record, NotUsed> single = KinesisSource.basic(settings, amazonKinesisAsync);

You can merge multiple shards by providing a list of settings.

Scala
val mergeSettings = List(
  ShardSettings("myStreamName",
                "shard-id-1",
                ShardIteratorType.AT_SEQUENCE_NUMBER,
                startingSequenceNumber = Some("sequence"),
                refreshInterval = 1.second,
                limit = 500),
  ShardSettings("myStreamName",
                "shard-id-2",
                ShardIteratorType.AT_TIMESTAMP,
                atTimestamp = Some(new Date()),
                refreshInterval = 1.second,
                limit = 500)
)
KinesisSource.basicMerge(mergeSettings, amazonKinesisAsync)
Java
final Source<Record, NotUsed> merged = KinesisSource.basicMerge(Arrays.asList(settings), amazonKinesisAsync);

The constructed Source emits Record objects, calling GetRecords at the specified interval and according to downstream demand.
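
Each Record carries its payload as a ByteBuffer, available through getData(). As a minimal sketch, assuming the producer wrote UTF-8 text, a small helper can turn that payload into a String. The class `PayloadDecoder` and its method are hypothetical and not part of the connector or the AWS SDK.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical helper, not part of the connector or the AWS SDK.
final class PayloadDecoder {
    private PayloadDecoder() {}

    // Decodes the remaining bytes of a record payload as UTF-8 (assumed encoding).
    // Works on a duplicate so the caller's buffer position is left untouched.
    static String decodeUtf8(ByteBuffer data) {
        return StandardCharsets.UTF_8.decode(data.duplicate()).toString();
    }
}
```

With the single-shard Java source from earlier, this could be applied as `single.map(record -> PayloadDecoder.decodeUtf8(record.getData()))`.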
