AWS DynamoDB Connector

The AWS DynamoDB connector provides a flow for streaming DynamoDB requests. For more information about DynamoDB, see the official documentation.

Reported issues

Tagged issues at GitHub

Artifacts

sbt
libraryDependencies += "com.lightbend.akka" %% "akka-stream-alpakka-dynamodb" % "0.19"
Maven
<dependency>
  <groupId>com.lightbend.akka</groupId>
  <artifactId>akka-stream-alpakka-dynamodb_2.12</artifactId>
  <version>0.19</version>
</dependency>
Gradle
dependencies {
  compile group: 'com.lightbend.akka', name: 'akka-stream-alpakka-dynamodb_2.12', version: '0.19'
}

Usage

This connector uses the default credential provider chain of the AWS SDK for Java to retrieve credentials.

If you wish to use static credentials, you can define them in the configuration.

Define Static Credentials in Config
"""
  | akka.stream.alpakka.dynamodb {
  |   region = "eu-west-1"
  |   host = "localhost"
  |   port = 443
  |   parallelism = 32
  |   credentials {
  |     access-key-id = "dummy-access-key"
  |     secret-key-id = "dummy-secret-key"
  |   }
  | }""".stripMargin

Alternatively, any type of AWSCredentialsProvider can be supplied programmatically through the default DynamoSettings constructor.

Supply Credentials Provider to Constructor
case class DynamoSettings(region: String,
                          host: String,
                          port: Int,
                          parallelism: Int,
                          credentialsProvider: AWSCredentialsProvider)

Before you can construct the client, you need an ActorSystem, ActorMaterializer, and ExecutionContext.

Scala
implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()
Java
final ActorSystem system = ActorSystem.create();
final ActorMaterializer materializer = ActorMaterializer.create(system);

You can then create the client with a settings object.

Scala
val settings = DynamoSettings(system)
val client = DynamoClient(settings)
Java
final DynamoSettings settings = DynamoSettings.create(system);
final DynamoClient client = DynamoClient.create(settings, system, materializer);

You can now send requests to DynamoDB over the connection.

Scala
import DynamoImplicits._
val listTablesResult: Future[ListTablesResult] = client.single(new ListTablesRequest())
Java
final Future<ListTablesResult> listTablesResultFuture = client.listTables(new ListTablesRequest());

You can also use a Flow to execute your DynamoDB call:

Scala
Source
  .single(new CreateTableRequest().withTableName("testTable").toOp)
  .via(client.flow)
  .map(_.getTableDescription.getTableArn)
Java
Source<String, NotUsed> tableArnSource = Source
        .single(new CreateTable(new CreateTableRequest().withTableName("testTable")))
        .via(client.flow())
        .map(result -> (CreateTableResult) result)
        .map(result -> result.getTableDescription().getTableArn());

Some DynamoDB operations, such as Query and Scan, are paginated by nature. You can get a stream of all result pages:

Scala
val scanPages: Source[ScanResult, NotUsed] =
  client.source(new ScanRequest().withTableName("testTable"))
Java
Source<ScanResult, NotUsed> scanPages =
        client.scanAll(new ScanRequest().withTableName("testTable"));
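
To illustrate what a paginated operation involves: DynamoDB returns a LastEvaluatedKey with each partial page, and the next request resumes from that key until a page arrives without one. The stand-alone Java sketch below shows only that loop. It does not use the connector or the AWS SDK; `Page`, `fetchPage`, and `fetchAllPages` are hypothetical names, and an in-memory list stands in for the table.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Hypothetical stand-in for one page of a Scan result: the items on the page
// plus the key to resume from (null when this is the last page).
final class Page {
    final List<String> items;
    final Integer lastEvaluatedKey;

    Page(List<String> items, Integer lastEvaluatedKey) {
        this.items = items;
        this.lastEvaluatedKey = lastEvaluatedKey;
    }
}

final class PagedScanSketch {
    // Hypothetical in-memory "table", served two items per page.
    static final List<String> TABLE = Arrays.asList("a", "b", "c", "d", "e");
    static final int PAGE_SIZE = 2;

    // Stand-in for a single Scan call: startKey is the index to resume from
    // (null means "start from the beginning").
    static Page fetchPage(Integer startKey) {
        int from = startKey == null ? 0 : startKey;
        int to = Math.min(from + PAGE_SIZE, TABLE.size());
        Integer next = to < TABLE.size() ? to : null;
        return new Page(TABLE.subList(from, to), next);
    }

    // Keep requesting pages, feeding each page's last evaluated key into the
    // next request, until a page comes back without one.
    static List<Page> fetchAllPages(Function<Integer, Page> fetch) {
        List<Page> pages = new ArrayList<>();
        Integer startKey = null;
        do {
            Page page = fetch.apply(startKey);
            pages.add(page);
            startKey = page.lastEvaluatedKey;
        } while (startKey != null);
        return pages;
    }

    public static void main(String[] args) {
        for (Page page : fetchAllPages(PagedScanSketch::fetchPage)) {
            System.out.println(page.items);
        }
    }
}
```

With the connector, this bookkeeping is what the paged source shown above spares you from writing by hand: each emitted ScanResult corresponds to one such page.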

Running the example code

The code in this guide is part of this project's runnable tests. You are welcome to edit the code and run it in sbt.

The test code requires a DynamoDB instance running in the background. You can start one quickly using Docker:

docker-compose up dynamodb

Scala
sbt
> dynamodb/testOnly *Spec
Java
sbt
> dynamodb/testOnly *Test