AWS S3 Connector

The AWS S3 connector provides Akka Stream sources and sinks to connect to Amazon S3. S3 stands for Simple Storage Service and is an object storage service with a web service interface.


libraryDependencies += "com.lightbend.akka" %% "akka-stream-alpakka-s3" % "0.11"
dependencies {
  compile group: "com.lightbend.akka", name: "akka-stream-alpakka-s3_2.12", version: "0.11"


Set up your S3 clients

The S3 connector can be configured within your application.conf file.

Configuration {
  # whether the buffer request chunks (up to 5MB each) to "memory" or "disk"
  buffer = "memory"
  # location for temporary files, if buffer is set to "disk". If empty, uses the standard java temp path.
  disk-buffer-path = ""

  proxy {
    # hostname of the proxy. If undefined ("") proxy is not enabled.
    host = ""
    port = 8000

    # if "secure" is set to "true" then HTTPS will be used for all requests to S3, otherwise HTTP will be used
    secure = true

  # default values for AWS configuration. If credentials and/or region are not specified when creating S3Client,
  # these values will be used.
  aws {
    access-key-id = ""
    secret-access-key = ""
    default-region = "us-west-2"

  # Enable path style access to s3, i.e. ""
  # Default is virtual-hosted style.
  # When using virtual hosted–style buckets with SSL, the S3 wild card certificate only matches buckets that do not contain periods.
  # Buckets containing periods will lead to certificate errors. In those cases it's useful to enable path-style access.
  path-style-access = false

Create an S3 client

val awsCredentials = AWSCredentials(accessKeyId = "my-AWS-access-key-ID", secretAccessKey = "my-AWS-password")
val proxy = Option(Proxy("localhost", port, "http"))
val settings = new S3Settings(MemoryBufferType, "", proxy, awsCredentials, "us-east-1", false)
val s3Client = new S3Client(settings)(system, materializer)
final AWSCredentials credentials = new BasicCredentials("my-AWS-access-key-ID", "my-AWS-password");
final Proxy proxy = new Proxy("localhost",port(),"http");
final S3Settings settings = new S3Settings(MemoryBufferType.getInstance(),"", Some.apply(proxy),credentials,"us-east-1",false);
final S3Client client = new S3Client(settings, system(), materializer);

Storing a file in S3

val s3Sink: Sink[ByteString, Future[MultipartUploadResult]] = s3Client.multipartUpload(bucket, bucketKey)
val s3Sink: Sink[ByteString, Future[MultipartUploadResult]] =
  s3Client.multipartUploadWithHeaders(bucket, bucketKey, s3Headers = Some(S3Headers(ServerSideEncryption.AES256)))
final Sink<ByteString, CompletionStage<MultipartUploadResult>> sink = client.multipartUpload(bucket(), bucketKey());

Downloading a file from S3

val s3Source: Source[ByteString, NotUsed] =, bucketKey)
final Source<ByteString, NotUsed> source =, bucketKey());

In order to download a range of a file’s data you can use overloaded method which additionally takes ByteRange as argument.

val s3Source: Source[ByteString, NotUsed] =, bucketKey, ByteRange(bytesRangeStart, bytesRangeEnd))
final Source<ByteString, NotUsed> source =, bucketKey(),
        ByteRange.createSlice(bytesRangeStart(), bytesRangeEnd()));

List bucket contents

val keySource: Source[ListBucketResultContents, NotUsed] = s3Client.listBucket(bucket, Some(listPrefix))
final Source<ListBucketResultContents, NotUsed> keySource = client.listBucket(bucket(), Option.apply(listPrefix()));

Running the example code

The code in this guide is part of runnable tests of this project. You are welcome to edit the code and run it in sbt.

> s3/test
> s3/test
The source code for this page can be found here.