AWS S3 Connector

The AWS S3 connector provides Akka Stream sources and sinks to connect to Amazon S3. S3 stands for Simple Storage Service and is an object storage service with a web service interface.

Artifacts

sbt
libraryDependencies += "com.lightbend.akka" %% "akka-stream-alpakka-s3" % "0.14"
Maven
<dependency>
  <groupId>com.lightbend.akka</groupId>
  <artifactId>akka-stream-alpakka-s3_2.12</artifactId>
  <version>0.14</version>
</dependency>
Gradle
dependencies {
  compile group: 'com.lightbend.akka', name: 'akka-stream-alpakka-s3_2.12', version: '0.14'
}

Usage

Set up your S3 clients

The S3 connector can be configured within your application.conf file.

Configuration
akka.stream.alpakka.s3 {
  # whether the buffer request chunks (up to 5MB each) to "memory" or "disk"
  buffer = "memory"
  
  # location for temporary files, if buffer is set to "disk". If empty, uses the standard java temp path.
  disk-buffer-path = ""

  proxy {
    # hostname of the proxy. If undefined ("") proxy is not enabled.
    host = ""
    port = 8000

    # if "secure" is set to "true" then HTTPS will be used for all requests to S3, otherwise HTTP will be used
    secure = true
  }

  # default values for AWS configuration. If credentials and/or region are not specified when creating S3Client,
  # these values will be used.
  aws {
    # DEPRECATION WARNING
    # Support for settins directly on this level is left for compatibility.
    # It may be removed in future releases.
    # An attempt will be made to read these if no `aws.credentials.*` params
    # will be provided. If you used these parameters, switch to the new format.
    # access-key-id
    # secret-access-key
    # Equivalent config would be:
    # akka.stream.alpakka.s3.aws.credentials {
    #   type = static
    #   access-key-id = ${old-access-key-id}
    #   secret-access-key = ${old-secret-access-key}
    # }
    # If this section is absent, the fallback behavior is to use the
    # com.amazonaws.auth.DefaultAWSCredentialsProviderChain instance to resolve credentials
    credentials {
      # supported types:
      # anon - anonymous requests ("no auth")
      # static - static credentials,
      #   required params:
      #     access-key-id
      #     secret-access-key
      #   optional:
      #     token
      # default: as described in com.amazonaws.auth.DefaultAWSCredentialsProviderChain docs,
      # attempts to get the credentials from either:
      #   - environment variables
      #   - system properties
      #   - credentials file
      #   - EC2 credentials service
      #   - IAM / metadata
      provider = anon
    }
    default-region = "us-west-2"
  }

  # Enable path style access to s3, i.e. "https://s3-eu-west-1.amazonaws.com/my.bucket/myobject"
  # Default is virtual-hosted style.
  # When using virtual hosted–style buckets with SSL, the S3 wild card certificate only matches buckets that do not contain periods.
  # Buckets containing periods will lead to certificate errors. In those cases it's useful to enable path-style access.
  path-style-access = false
}

Create an S3 client

Scala
val awsCredentials = new AWSStaticCredentialsProvider(
  new BasicAWSCredentials("my-AWS-access-key-ID", "my-AWS-password")
)
val proxy = Option(Proxy("localhost", port, "http"))
val settings = new S3Settings(MemoryBufferType, proxy, awsCredentials, "us-east-1", false)
val s3Client = new S3Client(settings)(system, materializer)
Java
private final AWSCredentialsProvider credentials = new AWSStaticCredentialsProvider(
        new BasicAWSCredentials("my-AWS-access-key-ID", "my-AWS-password")
);

private final Proxy proxy = new Proxy("localhost",port(),"http");
private final S3Settings settings = new S3Settings(
        MemoryBufferType.getInstance(),
        Some.apply(proxy),
        credentials,
        "us-east-1",
        false
);
private final S3Client client = new S3Client(settings, system(), materializer);

Storing a file in S3

Scala
val s3Sink: Sink[ByteString, Future[MultipartUploadResult]] = s3Client.multipartUpload(bucket, bucketKey)
val s3Sink: Sink[ByteString, Future[MultipartUploadResult]] =
  s3Client.multipartUploadWithHeaders(bucket, bucketKey, s3Headers = Some(S3Headers(ServerSideEncryption.AES256)))
Java
final Sink<ByteString, CompletionStage<MultipartUploadResult>> sink = client.multipartUpload(bucket(), bucketKey());

Downloading a file from S3

Scala
val s3Source: Source[ByteString, NotUsed] = s3Client.download(bucket, bucketKey)
Java
final Source<ByteString, NotUsed> source = client.download(bucket(), bucketKey());

In order to download a range of a file’s data you can use overloaded method which additionally takes ByteRange as argument.

Scala
val s3Source: Source[ByteString, NotUsed] =
  s3Client.download(bucket, bucketKey, ByteRange(bytesRangeStart, bytesRangeEnd))
Java
final Source<ByteString, NotUsed> source = client.download(bucket(), bucketKey(),
        ByteRange.createSlice(bytesRangeStart(), bytesRangeEnd()));

List bucket contents

Scala
val keySource: Source[ListBucketResultContents, NotUsed] = s3Client.listBucket(bucket, Some(listPrefix))
Java
final Source<ListBucketResultContents, NotUsed> keySource = client.listBucket(bucket(), Option.apply(listPrefix()));

Running the example code

The code in this guide is part of runnable tests of this project. You are welcome to edit the code and run it in sbt.

Scala
sbt
> s3/test
Java
sbt
> s3/test

Bluemix Cloud Object Storage with S3 API

The Alpakka S3 connector can connect to a range of S3 compatible services. One of them is IBM Bluemix Cloud Object Storage, which supports a dialect of the AWS S3 API. Most functionality provided by the Alpakka S3 connector is compatible with Cloud Object Store, but there are a few limitations, which are listed below.

Connection limitations

  • This S3 connector does not support domain-style access for Cloud Object Store, so only path-style access is supported.
  • Regions in COS are always part of the host/endpoint, therefore leave the s3Region field in S3Settings empty
  • The object proxy, containing host/endpoint, port and scheme, must always be specified.

External references

IBM Cloud Object Storage Documentation

Example

Scala
val host = "s3.eu-geo.objectstorage.softlayer.net"
val port = 443

val credentials = new AWSStaticCredentialsProvider(
  new BasicAWSCredentials(
    "myAccessKeyId",
    "mySecretAccessKey"
  )
)
val proxy = Some(Proxy(host, port, "https"))

// Set pathStyleAccess to true and specify proxy, leave region blank
val settings = new S3Settings(MemoryBufferType, proxy, credentials, "", true)
val s3Client = new S3Client(settings)(system, materializer)
Java
final String host = "s3.eu-geo.objectstorage.softlayer.net";
final int port = 443;


final AWSStaticCredentialsProvider credentials = new AWSStaticCredentialsProvider(
        new BasicAWSCredentials(
                "myAccessKeyId",
                "mySecretAccessKey"
        )
);
final Proxy proxy = new Proxy(host, port, "https");

// Set pathStyleAccess to true and specify proxy, leave region blank
final S3Settings settings = new S3Settings(
        MemoryBufferType.getInstance(),
        Some.apply(proxy),
        credentials,
        "",
        true
);
final S3Client s3Client = new S3Client(settings,system(), mat);
The source code for this page can be found here.