AWS SQS

The AWS SQS connector provides Akka Stream sources and sinks for AWS SQS queues.

For more information about AWS SQS please visit the official documentation.

Reported issues

Tagged issues at Github

Artifacts

sbt
libraryDependencies += "com.lightbend.akka" %% "akka-stream-alpakka-sqs" % "0.20"
Maven
<dependency>
  <groupId>com.lightbend.akka</groupId>
  <artifactId>akka-stream-alpakka-sqs_2.12</artifactId>
  <version>0.20</version>
</dependency>
Gradle
dependencies {
  compile group: 'com.lightbend.akka', name: 'akka-stream-alpakka-sqs_2.12', version: '0.20'
}

Setup

Prepare an ActorSystem and a Materializer.

Scala
implicit val system: ActorSystem = ActorSystem()
implicit val mat: Materializer = ActorMaterializer()
Full source at GitHub
Java
system = ActorSystem.create();
materializer = ActorMaterializer.create(system);
Full source at GitHub

This connector requires an implicit AmazonSQSAsync instance to communicate with AWS SQS.

It is your code’s responsibility to call shutdown to free any resources held by the client. In this example it will be called when the actor system is terminated.

Scala
import com.amazonaws.auth.{AWSStaticCredentialsProvider, BasicAWSCredentials}
import com.amazonaws.client.builder.AwsClientBuilder.EndpointConfiguration
import com.amazonaws.services.sqs.{AmazonSQSAsync, AmazonSQSAsyncClientBuilder}

val credentialsProvider = new AWSStaticCredentialsProvider(new BasicAWSCredentials("x", "x"))
implicit val awsSqsClient: AmazonSQSAsync = AmazonSQSAsyncClientBuilder
  .standard()
  .withCredentials(credentialsProvider)
  .withEndpointConfiguration(new EndpointConfiguration(sqsEndpoint, "eu-central-1"))
  .build()
system.registerOnTermination(awsSqsClient.shutdown())
Full source at GitHub
Java
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.services.sqs.AmazonSQSAsync;
import com.amazonaws.services.sqs.AmazonSQSAsyncClientBuilder;

AWSCredentialsProvider credentialsProvider =
    new AWSStaticCredentialsProvider(new BasicAWSCredentials("x", "x"));
AmazonSQSAsync awsSqsClient =
    AmazonSQSAsyncClientBuilder.standard()
        .withCredentials(credentialsProvider)
        .withEndpointConfiguration(
            new AwsClientBuilder.EndpointConfiguration(sqsEndpoint, "eu-central-1"))
        .build();
system.registerOnTermination(() -> awsSqsClient.shutdown());
Full source at GitHub

Read from an SQS queue

The source created by SqsSource reads AWS Java SDK SQS Message objects from any SQS queue given by its queue URL.

Scala
SqsSource(queue, sqsSourceSettings)
  .take(100)
  .runWith(Sink.seq)
Full source at GitHub
Java
final CompletionStage<String> cs =
    SqsSource.create(queueUrl, sqsSourceSettings, sqsClient)
        .map(Message::getBody)
        .runWith(Sink.head(), materializer);

final CompletionStage<String> cs2 =
    SqsSource.create(queueUrl, sqsSourceSettings, customSqsClient)
        .map(Message::getBody)
        .take(1)
        .runWith(Sink.head(), materializer);
Full source at GitHub

We take only the first 100 elements from the stream, because reading messages from an SQS queue never finishes on its own: there is no direct way to determine the end of a queue.

Source configuration

Scala
val settings = SqsSourceSettings.Defaults
  .withWaitTimeSeconds(20)
  .withMaxBufferSize(100)
  .withMaxBatchSize(10)
  .withAttributes(SenderId, SentTimestamp)
  .withMessageAttributes(MessageAttributeName.create("bar.*"))
  .withCloseOnEmptyReceive
Full source at GitHub
Java
SqsSourceSettings settings =
    SqsSourceSettings.Defaults()
        .withWaitTimeSeconds(20)
        .withMaxBufferSize(100)
        .withMaxBatchSize(10)
        .withAttributes(Attribute.senderId(), Attribute.sentTimestamp())
        .withMessageAttributes(MessageAttributeName.create("bar.*"))
        .withCloseOnEmptyReceive();
Full source at GitHub

Options:

  • maxBatchSize - the maximum number of messages to return (see MaxNumberOfMessages in AWS docs). Default: 10
  • maxBufferSize - internal buffer size used by the Source. Default: 100 messages
  • waitTimeSeconds - the duration for which the call waits for a message to arrive in the queue before returning (see WaitTimeSeconds in AWS docs). Default: 20 seconds
  • closeOnEmptyReceive - the shutdown behavior of the Source. Default: false

More details are available in the AWS SQS Receive Message documentation.

An SqsSource can either provide an infinite stream of messages (the default), or drain its source queue until no further messages are available. The latter behavior is enabled by setting the closeOnEmptyReceive flag on creation. If set, the Source will receive messages until it encounters an empty reply from the server. It then continues to emit any remaining messages in its local buffer. The stage completes once the last message has been sent downstream.

Note that for short-polling (waitTimeSeconds of 0), SQS may respond with an empty reply even if there are still messages in the queue. This behavior can be prevented by switching to long-polling (by setting waitTimeSeconds to a nonzero value).

Be aware that SqsSource runs multiple requests to Amazon SQS in parallel. The maximum number of concurrent requests is limited by parallelism = maxBufferSize / maxBatchSize. For example, with the defaults maxBatchSize = 10 and maxBufferSize = 100, SqsSource will run at most 10 concurrent requests to Amazon SQS. AmazonSQSAsyncClient uses a fixed thread pool with 50 threads by default. To tune the thread pool used by AmazonSQSAsyncClient, you can supply a custom ExecutorService on client creation.

Scala
val credentialsProvider = new AWSStaticCredentialsProvider(new BasicAWSCredentials("x", "x"))
implicit val customSqsClient: AmazonSQSAsync =
  AmazonSQSAsyncClientBuilder
    .standard()
    .withCredentials(credentialsProvider)
    .withExecutorFactory(new ExecutorFactory {
      override def newExecutor() = Executors.newFixedThreadPool(10)
    })
    .withEndpointConfiguration(new EndpointConfiguration(sqsEndpoint, "eu-central-1"))
    .build()
Full source at GitHub
Java
AWSCredentialsProvider credentialsProvider =
    new AWSStaticCredentialsProvider(new BasicAWSCredentials("x", "x"));

AmazonSQSAsync customSqsClient =
    AmazonSQSAsyncClientBuilder.standard()
        .withCredentials(credentialsProvider)
        .withExecutorFactory(() -> Executors.newFixedThreadPool(10))
        .withEndpointConfiguration(
            new AwsClientBuilder.EndpointConfiguration(sqsEndpoint, "eu-central-1"))
        .build();
Full source at GitHub

Please make sure to configure a sufficiently large thread pool to avoid resource starvation. This is especially important if you share the client between multiple Sources, Sinks and Flows. For the SQS Sinks and Sources, the sum of all parallelism (Source) and maxInFlight (Sink) values must be less than or equal to the thread pool size.
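The sizing arithmetic above can be sketched as a quick back-of-the-envelope check. The numeric values below are the defaults quoted on this page, hard-coded here for illustration rather than read from the library:

```java
// Assumed values, taken from the defaults quoted above
int maxBatchSize = 10;   // messages fetched per ReceiveMessage request
int maxBufferSize = 100; // internal buffer of the Source
int maxInFlight = 10;    // SqsSink default

// Maximum number of concurrent receive requests a single SqsSource will run
int parallelism = maxBufferSize / maxBatchSize;

// With one Source and one Sink sharing the client, the thread pool must cover both
int requiredThreads = parallelism + maxInFlight;
```

With the defaults this yields 10 concurrent receive requests and a minimum pool size of 20 for one Source plus one Sink, comfortably below the client's default of 50 threads.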

Write to an SQS queue

Create a sink that forwards String messages to the SQS queue.

Scala
Source
  .single("alpakka")
  .runWith(SqsSink(queue))
Full source at GitHub
Java
CompletionStage<Done> done =
    Source.single("alpakka").runWith(SqsSink.create(queueUrl, sqsClient), materializer);
Full source at GitHub

Create a sink that forwards SendMessageRequest to the SQS queue.

Scala
Source
  .single(new SendMessageRequest().withMessageBody("alpakka"))
  .runWith(SqsSink.messageSink(queue))
Full source at GitHub
Java
CompletionStage<Done> done =
    Source.single(new SendMessageRequest().withMessageBody("alpakka"))
        .runWith(SqsSink.messageSink(queueUrl, sqsClient), materializer);
Full source at GitHub

Write batches to an SQS queue

Create a sink that forwards String messages to the SQS queue. In contrast to the previous use case, it batches items and sends them as one request.

Note: There is also another option for sending batches of messages to SQS: AmazonSQSBufferedAsyncClient. This client buffers SendMessageRequests under the hood and sends them as batches instead of sending them one by one. However, beware that AmazonSQSBufferedAsyncClient does not support FIFO queues. See the documentation for client-side buffering.

Scala
val messages = for (i <- 0 until 20) yield s"Message - $i"

val future = Source(messages)
  .runWith(SqsSink.grouped(queue))
Full source at GitHub
Java
ArrayList<String> messagesToSend = new ArrayList<>();
for (int i = 0; i < 20; i++) {
  messagesToSend.add("message - " + i);
}

CompletionStage<Done> done =
    Source.from(messagesToSend).runWith(SqsSink.grouped(queueUrl, sqsClient), materializer);
Full source at GitHub

Batch configuration

Scala
val batchSettings =
  SqsBatchFlowSettings.Defaults
    .withMaxBatchSize(10)
    .withMaxBatchWait(500.millis)
    .withConcurrentRequests(1)
Full source at GitHub
Java
SqsBatchFlowSettings batchSettings =
    SqsBatchFlowSettings.Defaults()
        .withMaxBatchSize(10)
        .withMaxBatchWait(500, TimeUnit.MILLISECONDS)
        .withConcurrentRequests(1);
Full source at GitHub

Options:

  • maxBatchSize - the maximum number of messages in a batch sent to SQS. Default: 10
  • maxBatchWait - the maximum duration the stage waits for maxBatchSize messages to arrive. At the end of this period it sends whatever it has collected, even if maxBatchSize is not reached. Default: 500 milliseconds
  • concurrentRequests - the number of batches sent to SQS concurrently.

Write sequences as batches to an SQS queue

Create a sink that forwards Seq[String] to the SQS queue.

Be aware that the size of the batch must be less than or equal to 10, because Amazon SQS limits the number of entries in a batch request. If the batch has more than 10 entries, the request will fail.

Scala
val messages = for (i <- 0 until 10) yield s"Message - $i"

val future = Source
  .single(messages)
  .runWith(SqsSink.batch(queue))
Full source at GitHub
Java
ArrayList<String> messagesToSend = new ArrayList<>();
for (int i = 0; i < 10; i++) {
  messagesToSend.add("Message - " + i);
}
Iterable<String> it = messagesToSend;

CompletionStage<Done> done =
    Source.single(it).runWith(SqsSink.batch(queueUrl, sqsClient), materializer);
Full source at GitHub
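If your input can exceed that limit, one simple approach (a sketch, not part of the connector API) is to pre-chunk the message list into batches of at most 10 entries before emitting each chunk into the batch sink:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical pre-chunking: split an arbitrary list of message bodies into
// SQS-legal batches of at most 10 entries each
List<String> allMessages = new ArrayList<>();
for (int i = 0; i < 25; i++) {
  allMessages.add("Message - " + i);
}

List<List<String>> batches = new ArrayList<>();
for (int i = 0; i < allMessages.size(); i += 10) {
  batches.add(allMessages.subList(i, Math.min(i + 10, allMessages.size())));
}
// 25 messages become three batches of sizes 10, 10 and 5;
// each element of `batches` can then be emitted into the batch sink
```

In a stream, the same effect can be achieved with the standard grouped(10) operator in front of the sink.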

Create a sink that forwards Seq[SendMessageRequest] to the SQS queue.

Again, the size of the batch must be less than or equal to 10, because Amazon SQS limits the number of entries in a batch request. If the batch has more than 10 entries, the request will fail.

Scala
val messages = for (i <- 0 until 10) yield new SendMessageRequest().withMessageBody(s"Message - $i")

val future = Source
  .single(messages)
  .runWith(SqsSink.batchedMessageSink(queue))
Full source at GitHub
Java
ArrayList<SendMessageRequest> messagesToSend = new ArrayList<>();
for (int i = 0; i < 10; i++) {
  messagesToSend.add(new SendMessageRequest().withMessageBody("Message - " + i));
}
Iterable<SendMessageRequest> it = messagesToSend;

CompletionStage<Done> done =
    Source.single(it).runWith(SqsSink.batchedMessageSink(queueUrl, sqsClient), materializer);
Full source at GitHub

Sink configuration

Scala
val sinkSettings =
  SqsSinkSettings.Defaults
    .withMaxInFlight(10)
Full source at GitHub
Java
SqsSinkSettings sinkSettings = SqsSinkSettings.Defaults().withMaxInFlight(10);
Full source at GitHub

Options:

  • maxInFlight - maximum number of messages being processed by AmazonSQSAsync at the same time. Default: 10

Message processing with acknowledgement

SqsAckSink provides the possibility to acknowledge (delete), ignore, or postpone a message.

Your flow must decide which action to take and push it together with the message:

  • Delete - delete the message from the queue
  • Ignore - ignore the message and let it reappear in the queue after the visibility timeout
  • ChangeMessageVisibility(visibilityTimeout: Int) - can be used to postpone a message, or to make the message immediately visible to other consumers. See the official documentation for more details.
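The decision logic typically lives in a map stage between the source and the ack sink. A minimal self-contained sketch of such logic follows; the enum below only mirrors the connector's MessageAction (it is not the real type), and the body prefixes are made up for illustration:

```java
// Hedged sketch: deciding an acknowledgement action per message body.
// Action is a hypothetical stand-in for the connector's MessageAction.
class AckDecision {
  enum Action { DELETE, IGNORE, CHANGE_VISIBILITY }

  static Action decide(String body) {
    if (body.startsWith("done")) return Action.DELETE;             // processed, remove from queue
    if (body.startsWith("later")) return Action.CHANGE_VISIBILITY; // postpone via a new timeout
    return Action.IGNORE;                                          // reappears after the visibility timeout
  }
}
```

In the real flow, the chosen MessageAction is paired with the Message and pushed to SqsAckSink or SqsAckFlow, as the examples below show.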

Acknowledge (delete) messages:

Scala
SqsSource(queue)
  .take(1)
  .map { m: Message =>
    (m, MessageAction.Delete)
  }
  .runWith(SqsAckSink(queue))
Full source at GitHub
Java
Tuple2<Message, MessageAction> pair =
    new Tuple2<>(new Message().withBody("test"), MessageAction.delete());
CompletionStage<Done> done =
    Source.single(pair).runWith(SqsAckSink.create(queueUrl, awsClient), materializer);
Full source at GitHub

Ignore messages:

Scala
SqsSource(queue)
  .take(1)
  .map { m: Message =>
    (m, MessageAction.Ignore)
  }
  .via(SqsAckFlow(queue))
Full source at GitHub
Java
Tuple2<Message, MessageAction> pair =
    new Tuple2<>(new Message().withBody("test"), MessageAction.ignore());
CompletionStage<AckResult> stage =
    Source.single(pair)
        .via(SqsAckFlow.create(queueUrl, awsClient))
        .runWith(Sink.head(), materializer);
Full source at GitHub

Change Visibility Timeout of messages:

Scala
val future = SqsSource(queue)
  .take(1)
  .map { m: Message =>
    (m, MessageAction.ChangeMessageVisibility(5))
  }
  .runWith(SqsAckSink(queue))
Full source at GitHub
Java
Tuple2<Message, MessageAction> pair =
    new Tuple2<>(new Message().withBody("test"), MessageAction.changeMessageVisibility(12));
CompletionStage<Done> done =
    Source.single(pair).runWith(SqsAckSink.create(queueUrl, awsClient), materializer);
Full source at GitHub

SqsAckSink configuration

Same as the normal SqsSink:

Scala
val sinkSettings =
  SqsAckSinkSettings.Defaults
    .withMaxInFlight(10)
Full source at GitHub
Java
SqsAckSinkSettings sinkSettings = SqsAckSinkSettings.Defaults().withMaxInFlight(10);
Full source at GitHub

Options:

  • maxInFlight - maximum number of messages being processed by AmazonSQSAsync at the same time. Default: 10

Message processing with acknowledgement with batching

SqsAckFlow.grouped is a flow that can acknowledge (delete), ignore, or postpone messages, but it batches items and sends them as one request per action.
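The per-action grouping can be pictured with plain collections. This is a hedged sketch of the idea only, with actions modeled as strings rather than the connector's MessageAction type:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Message bodies paired with the action chosen for them (strings stand in
// for the connector's MessageAction type)
List<Map.Entry<String, String>> pairs = new ArrayList<>();
pairs.add(Map.entry("m1", "Delete"));
pairs.add(Map.entry("m2", "Ignore"));
pairs.add(Map.entry("m3", "Delete"));

// Group by action, preserving insertion order; one batched request
// would then be issued per action group
Map<String, List<String>> byAction = new LinkedHashMap<>();
for (Map.Entry<String, String> p : pairs) {
  byAction.computeIfAbsent(p.getValue(), k -> new ArrayList<>()).add(p.getKey());
}
```

The stage additionally applies the maxBatchSize and maxBatchWait limits described in the batch configuration below.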

Acknowledge (delete) messages:

Scala
SqsSource(queue)
  .take(10)
  .map { m: Message =>
    (m, MessageAction.Delete)
  }
  .via(SqsAckFlow.grouped(queue, SqsBatchAckFlowSettings.Defaults))
  .runWith(Sink.ignore)
Full source at GitHub
Java
List<Tuple2<Message, MessageAction>> messages = new ArrayList<>();
for (int i = 0; i < 10; i++) {
  messages.add(new Tuple2<>(new Message().withBody("test"), MessageAction.delete()));
}
CompletionStage<Done> done =
    Source.fromIterator(() -> messages.iterator())
        .via(SqsAckFlow.grouped(queueUrl, awsClient))
        .runWith(Sink.ignore(), materializer);
Full source at GitHub

Ignore messages:

Scala
Source(messages)
  .take(10)
  .map { m: Message =>
    (m, MessageAction.Ignore)
  }
  .via(SqsAckFlow.grouped("queue", SqsBatchAckFlowSettings.Defaults))
Full source at GitHub
Java
List<Tuple2<Message, MessageAction>> messages = new ArrayList<>();
for (int i = 0; i < 10; i++) {
  messages.add(new Tuple2<>(new Message().withBody("test"), MessageAction.ignore()));
}
CompletionStage<List<AckResult>> stage =
    Source.fromIterator(() -> messages.iterator())
        .via(SqsAckFlow.grouped(queueUrl, awsClient))
        .runWith(Sink.seq(), materializer);
Full source at GitHub

Change Visibility Timeout of messages:

Scala
SqsSource(queue)
  .take(10)
  .map { m: Message =>
    (m, MessageAction.ChangeMessageVisibility(5))
  }
  .via(SqsAckFlow.grouped(queue, SqsBatchAckFlowSettings.Defaults))
  .runWith(Sink.ignore)
Full source at GitHub
Java
List<Tuple2<Message, MessageAction>> messages = new ArrayList<>();
for (int i = 0; i < 10; i++) {
  messages.add(
      new Tuple2<>(new Message().withBody("test"), MessageAction.changeMessageVisibility(5)));
}
CompletionStage<Done> done =
    Source.fromIterator(() -> messages.iterator())
        .via(SqsAckFlow.grouped(queueUrl, awsClient))
        .runWith(Sink.ignore(), materializer);
Full source at GitHub

Batch configuration

Scala
val batchSettings =
  SqsBatchAckFlowSettings.Defaults
    .withMaxBatchSize(10)
    .withMaxBatchWait(500.millis)
    .withConcurrentRequests(1)
Full source at GitHub
Java
SqsBatchAckFlowSettings flowSettings =
    SqsBatchAckFlowSettings.Defaults()
        .withMaxBatchSize(10)
        .withMaxBatchWait(500, TimeUnit.MILLISECONDS)
        .withConcurrentRequests(1);
Full source at GitHub

Options:

  • maxBatchSize - the maximum number of messages in a batch sent to SQS. Default: 10
  • maxBatchWait - the maximum duration the stage waits for maxBatchSize messages to arrive. At the end of this period it sends whatever it has collected, even if maxBatchSize is not reached. Default: 500 milliseconds
  • concurrentRequests - the number of batches sent to SQS concurrently.

Using SQS as a Flow

You can also build flow stages that put or acknowledge messages in SQS, backpressure on the queue response, and then forward the responses further down the stream. The API is similar to that for creating Sinks.

Scala
Source
  .single(new SendMessageRequest().withMessageBody("alpakka"))
  .via(SqsFlow(queue))
  .runWith(Sink.foreach(result => println(result.message)))
Full source at GitHub
Java
CompletionStage<Done> done =
    Source.single(new SendMessageRequest(queueUrl, "alpakka-flow"))
        .via(SqsFlow.create(queueUrl, sqsClient))
        .runWith(Sink.ignore(), materializer);
Full source at GitHub

With Ack:

Scala
SqsSource(queue)
  .take(1)
  .map { m: Message =>
    (m, MessageAction.Delete)
  }
  .via(SqsAckFlow(queue))
  .runWith(Sink.ignore)
Full source at GitHub
Java
Tuple2<Message, MessageAction> pair =
    new Tuple2<>(new Message().withBody("test-ack-flow"), MessageAction.delete());
CompletionStage<Done> done =
    Source.single(pair)
        .via(SqsAckFlow.create(queueUrl, awsClient))
        .runWith(Sink.ignore(), materializer);
Full source at GitHub

Running the example code

The code in this guide is part of runnable tests of this project. You are welcome to edit the code and run it in sbt.

The test code uses an embedded ElasticMQ instance as the queueing service, which serves an AWS SQS-compatible API.

Scala
sbt 'project sqs' test
Java
sbt 'project sqs' test
The source code for this page can be found here.