AWS SQS Connector

The AWS SQS connector allows to stream SQS Message from a AWS SQS queue. For more information about AWS SQS please visit the official docmentation.

Artifacts

sbt
libraryDependencies += "com.lightbend.akka" %% "akka-stream-alpakka-sqs" % "0.3"
Maven
<dependency>
  <groupId>com.lightbend.akka</groupId>
  <artifactId>akka-stream-alpakka-sqs_2.11</artifactId>
  <version>0.3</version>
</dependency>
Gradle
dependencies {
  compile group: "com.lightbend.akka", name: "akka-stream-alpakka-sqs_2.11", version: "0.3"
}

Usage

Sources provided by this connector need a prepared AmazonSQSAsyncClient to load messages from a queue.

Scala
val credentials = new BasicAWSCredentials("x", "x")
implicit val sqsClient: AmazonSQSAsyncClient =
  new AmazonSQSAsyncClient(credentials).withEndpoint("http://localhost:9324")
Java
credentials = new BasicAWSCredentials("x", "x");
sqsClient = new AmazonSQSAsyncClient(credentials)
    .withEndpoint("http://localhost:9324");

We will also need an ActorSystem and an ActorMaterializer.

Scala
implicit val system = ActorSystem()
implicit val mat = ActorMaterializer()
Java
system = ActorSystem.create();
materializer = ActorMaterializer.create(system);

This is all preparation that we are going to need.

Now we can stream AWS Java SDK SQS Message objects from any SQS queue where we have access to by providing the queue URL to the SqsSource factory.

Scala
SqsSource(queue).take(100).runWith(Sink.seq).map(_ should have size 100)
Java
final CompletionStage<List<String>> cs = SqsSource.create(queueUrl, sqsClient)
    .map(m -> m.getBody())
    .take(100)
    .runWith(Sink.seq(), materializer);

As you have seen we take the first 100 elements from the stream. The reason for this is, that reading messages from SQS queues never finishes because there is no direct way to determine the end of a queue.

Running the example code

The code in this guide is part of runnable tests of this project. You are welcome to edit the code and run it in sbt.

The test code uses ElasticMQ as queuing service which serves an AWS SQS compatible API. You can start one quickly using docker:

docker run -p 9324:9324 -d expert360/elasticmq

Scala
sbt
> sqs/testOnly *.SqsSourceSpec
Java
sbt
> sqs/testOnly *.SqsSourceTest