AWS S3 Connector

The AWS S3 connector provides Akka Stream sources and sinks to connect to Amazon S3. S3 stands for Simple Storage Service, an object storage service with a web service interface.

Reported issues

Tagged issues at GitHub

Artifacts

sbt
libraryDependencies += "com.lightbend.akka" %% "akka-stream-alpakka-s3" % "0.19"
Maven
<dependency>
  <groupId>com.lightbend.akka</groupId>
  <artifactId>akka-stream-alpakka-s3_2.12</artifactId>
  <version>0.19</version>
</dependency>
Gradle
dependencies {
  compile group: 'com.lightbend.akka', name: 'akka-stream-alpakka-s3_2.12', version: '0.19'
}

Usage

Set up your S3 clients

The S3 connector can be configured within your application.conf file.

Configuration
akka.stream.alpakka.s3 {
  # whether to buffer request chunks (up to 5MB each) to "memory" or "disk"
  buffer = "memory"
  
  # location for temporary files, if buffer is set to "disk". If empty, uses the standard java temp path.
  disk-buffer-path = ""

  proxy {
    # hostname of the proxy. If undefined ("") proxy is not enabled.
    host = ""
    port = 8000

    # if "secure" is set to "true" then HTTPS will be used for all requests to S3, otherwise HTTP will be used
    secure = true
  }

  # default values for AWS configuration. If credentials and/or region are not specified when creating S3Client,
  # these values will be used.
  aws {
    # DEPRECATION WARNING
    # Support for settings directly at this level is kept for compatibility.
    # It may be removed in future releases.
    # An attempt will be made to read these if no `aws.credentials.*` params
    # are provided. If you used these parameters, switch to the new format.
    # access-key-id
    # secret-access-key
    # Equivalent config would be:
    # akka.stream.alpakka.s3.aws.credentials {
    #   provider = static
    #   access-key-id = ${old-access-key-id}
    #   secret-access-key = ${old-secret-access-key}
    # }
    # If this section is absent, the fallback behavior is to use the
    # com.amazonaws.auth.DefaultAWSCredentialsProviderChain instance to resolve credentials
    credentials {
      # supported providers:
      # anon - anonymous requests ("no auth")
      # static - static credentials,
      #   required params:
      #     access-key-id
      #     secret-access-key
      #   optional:
      #     token
      # default: as described in com.amazonaws.auth.DefaultAWSCredentialsProviderChain docs,
      # attempts to get the credentials from either:
      #   - environment variables
      #   - system properties
      #   - credentials file
      #   - EC2 credentials service
      #   - IAM / metadata
      provider = default
    }

    # If this section is absent, the fallback behavior is to use the
    # com.amazonaws.regions.DefaultAwsRegionProviderChain instance to resolve region
    region {
      # supported providers:
      # static - static region,
      #   required params:
      #     default-region
      # default: as described in com.amazonaws.regions.DefaultAwsRegionProviderChain docs,
      # attempts to get the region from either:
      #   - environment variables
      #   - system properties
      #   - profile file
      #   - EC2 metadata
      provider = default
    }
  }

  # Enable path style access to s3, i.e. "https://s3-eu-west-1.amazonaws.com/my.bucket/myobject"
  # Default is virtual-hosted style.
  # When using virtual-hosted style buckets with SSL, the S3 wildcard certificate only matches buckets that do not contain periods.
  # Buckets containing periods will lead to certificate errors. In those cases it's useful to enable path-style access.
  path-style-access = false

  # Custom endpoint URL, used for alternate S3 implementations
  # endpoint-url = null

  # Which version of the list bucket api to use. Set to 1 to use the old style version 1 API.
  # By default the newer version 2 API is used.
  list-bucket-api-version = 2
}
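
For example, to provide only the credentials via application.conf using the static provider documented above (the key values below are placeholders):

Configuration
akka.stream.alpakka.s3.aws.credentials {
  provider = static
  access-key-id = "my-AWS-access-key-ID"
  secret-access-key = "my-AWS-secret-access-key"
}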

Create an S3 client

Scala
val awsCredentialsProvider = new AWSStaticCredentialsProvider(
  new BasicAWSCredentials("my-AWS-access-key-ID", "my-AWS-secret-access-key")
)
val regionProvider =
  new AwsRegionProvider {
    def getRegion: String = "us-east-1"
  }
// the proxy is optional; use None to connect to S3 directly
val proxy = Option(Proxy("localhost", 8000, "http"))
val settings =
  new S3Settings(MemoryBufferType, proxy, awsCredentialsProvider, regionProvider, false, None, ListBucketVersion2)
val s3Client = new S3Client(settings)(system, materializer)
Java
private final AWSCredentialsProvider credentials = new AWSStaticCredentialsProvider(
        new BasicAWSCredentials("my-AWS-access-key-ID", "my-AWS-secret-access-key")
);

private AwsRegionProvider regionProvider(String region) {
    return new AwsRegionProvider() {
        @Override
        public String getRegion() throws SdkClientException {
            return region;
        }
    };
}

// the proxy is optional; pass scala.Option.empty() below to connect to S3 directly
private final Proxy proxy = new Proxy("localhost", 8000, "http");
private final S3Settings settings = new S3Settings(
        MemoryBufferType.getInstance(),
        Some.apply(proxy),
        credentials,
        regionProvider("us-east-1"),
        false,
        scala.Option.empty(),
        ListBucketVersion2.getInstance()
);
private final S3Client client = new S3Client(settings, system(), materializer);
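
If credentials, region, and the other options are all provided via application.conf, the client can also be created from those defaults. A minimal sketch, assuming this version's no-argument S3Client() factory and an implicit ActorSystem and Materializer in scope:

Scala
val s3Client = S3Client()(system, materializer)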

Storing a file in S3

Scala
val s3Sink: Sink[ByteString, Future[MultipartUploadResult]] = s3Client.multipartUpload(bucket, bucketKey)

// variant with server-side encryption headers
val s3Sink: Sink[ByteString, Future[MultipartUploadResult]] =
  s3Client.multipartUploadWithHeaders(bucket, bucketKey, s3Headers = Some(S3Headers(ServerSideEncryption.AES256)))
Java
final Sink<ByteString, CompletionStage<MultipartUploadResult>> sink = client.multipartUpload(bucket(), bucketKey());

// variant with SSE customer keys
final Sink<ByteString, CompletionStage<MultipartUploadResult>> sink = client.multipartUpload(bucket(), bucketKey(), sseCustomerKeys());
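
To run an upload, materialize a stream of ByteString into the sink. A minimal sketch in Scala, assuming an implicit Materializer in scope; the contents are placeholders:

Scala
val uploadResult: Future[MultipartUploadResult] =
  Source.single(ByteString("file contents"))
    .runWith(s3Sink)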

Downloading a file from S3

Scala
val (s3Source: Source[ByteString, _], _) = s3Client.download(bucket, bucketKey)

// variant with SSE customer keys
val (s3Source, _) = s3Client.download(bucket, bucketKey, sse = Some(sseCustomerKeys))
Java
final Pair<Source<ByteString, NotUsed>, CompletionStage<ObjectMetadata>> sourceAndMeta = client.download(bucket(), bucketKey());
final Source<ByteString, NotUsed> source = sourceAndMeta.first();

// variant with SSE customer keys
final Pair<Source<ByteString, NotUsed>, CompletionStage<ObjectMetadata>> sourceAndMeta = client.download(bucket(), bucketKey(), sseCustomerKeys());
final Source<ByteString, NotUsed> source = sourceAndMeta.first();
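
The source starts fetching data only when it is materialized. For example, to read a (small) object entirely into memory as a String, a sketch assuming an implicit Materializer and ExecutionContext:

Scala
val content: Future[String] =
  s3Source.runFold(ByteString.empty)(_ ++ _).map(_.utf8String)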

In order to download a range of a file’s data, use the overloaded method that additionally takes a ByteRange as an argument.

Scala
val (s3Source: Source[ByteString, _], _) =
  s3Client.download(bucket, bucketKey, Some(ByteRange(bytesRangeStart, bytesRangeEnd)))
Java
final Pair<Source<ByteString, NotUsed>, CompletionStage<ObjectMetadata>> sourceAndMeta = client.download(bucket(), bucketKey(),
        ByteRange.createSlice(bytesRangeStart(), bytesRangeEnd()));
final Source<ByteString, NotUsed> source = sourceAndMeta.first();

// variant with SSE customer keys
final Pair<Source<ByteString, NotUsed>, CompletionStage<ObjectMetadata>> sourceAndMeta = client.download(bucket(), bucketKey(),
        ByteRange.createSlice(bytesRangeStart(), bytesRangeEnd()), sseCustomerKeys());
final Source<ByteString, NotUsed> source = sourceAndMeta.first();
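
A ranged download can also be streamed to disk instead of being kept in memory; a sketch using Akka Streams' FileIO, where the target path is a placeholder:

Scala
import java.nio.file.Paths
import akka.stream.IOResult
import akka.stream.scaladsl.FileIO

val ioResult: Future[IOResult] =
  s3Source.runWith(FileIO.toPath(Paths.get("/tmp/object-part.bin")))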

Accessing object metadata

When downloading an object you also get the object’s metadata with it. Here is an example of using this metadata to stream an object back to a client in Akka HTTP:

Scala
val (data, eventualMeta) = s3Client.download(bucket, objectKey)
complete(eventualMeta.map { meta =>
  HttpResponse(
    entity = HttpEntity(
      meta.contentType.flatMap(ContentType.parse(_).toOption).getOrElse(`application/octet-stream`),
      meta.contentLength,
      data
    )
  )
})

List bucket contents

Scala
val keySource: Source[ListBucketResultContents, NotUsed] = s3Client.listBucket(bucket, Some(listPrefix))
Java
final Source<ListBucketResultContents, NotUsed> keySource = client.listBucket(bucket(), Option.apply(listPrefix()));
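
Each ListBucketResultContents element carries the object's key and basic metadata. For example, to collect all keys under the prefix (assuming an implicit Materializer):

Scala
val keys: Future[Seq[String]] =
  keySource.map(_.key).runWith(Sink.seq)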

Copy upload (multi part)

Copy an S3 object from a source bucket to a target bucket using multipart copy upload.

Scala
val result: Future[MultipartUploadResult] = s3Client.multipartCopy(bucket, bucketKey, targetBucket, targetBucketKey)

// pass the source version
val result: Future[MultipartUploadResult] =
  s3Client.multipartCopy(bucket,
                         bucketKey,
                         targetBucket,
                         targetBucketKey,
                         sourceVersionId = Some("3/L4kqtJlcpXroDTDmJ+rmSpXd3dIbrHY+MTRCxf3vjVBH40Nr8X8gdRQBpUMLUo"))
Java
final CompletionStage<MultipartUploadResult> resultCompletionStage = client.multipartCopy(
        bucket, sourceKey,
        targetBucket, targetKey
    );

// pass the source version
String sourceVersionId = "3/L4kqtJlcpXroDTDmJ+rmSpXd3dIbrHY+MTRCxf3vjVBH40Nr8X8gdRQBpUMLUo";
final CompletionStage<MultipartUploadResult> resultCompletionStage = client.multipartCopy(
        bucket, sourceKey,
        targetBucket, targetKey,
        sourceVersionId,
        S3Headers.empty(),
        null // encryption
    );
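
The materialized MultipartUploadResult describes the newly written target object. A sketch in Scala of inspecting the result from the example above, assuming an implicit ExecutionContext and this version's bucket, key, and etag fields:

Scala
result.foreach { r =>
  println(s"Copied to ${r.bucket}/${r.key} with etag ${r.etag}")
}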

Java examples with custom headers

Java
// setting the encryption to AES256
CompletionStage<MultipartUploadResult> result = client.multipartCopy(
        sourceBucket, sourceKey,
        targetBucket, targetKey,
        S3Headers.empty(),
        ServerSideEncryption.AES256$.MODULE$
    );

// using canned ACL
CompletionStage<MultipartUploadResult> result = client.multipartCopy(
        sourceBucket, sourceKey,
        targetBucket, targetKey,
        S3Headers.apply(CannedAcl.Private$.MODULE$),
        null // encryption
    );

Running the example code

The code in this guide is part of runnable tests of this project. You are welcome to edit the code and run it in sbt.

Scala
sbt
> s3/test
Java
sbt
> s3/test

Bluemix Cloud Object Storage with S3 API

The Alpakka S3 connector can connect to a range of S3-compatible services. One of them is IBM Bluemix Cloud Object Storage, which supports a dialect of the AWS S3 API. Most functionality provided by the Alpakka S3 connector is compatible with Cloud Object Storage, but there are a few limitations, which are listed below.

Connection limitations

  • This S3 connector does not support domain-style access for Cloud Object Storage, so only path-style access is supported.
  • Regions in COS are always part of the host/endpoint, therefore leave the s3Region field in S3Settings empty.
  • The proxy object, containing host/endpoint, port and scheme, must always be specified.

External references

IBM Cloud Object Storage Documentation

Example

Scala
val host = "s3.eu-geo.objectstorage.softlayer.net"
val port = 443

val credentialsProvider = new AWSStaticCredentialsProvider(
  new BasicAWSCredentials(
    "myAccessKeyId",
    "mySecretAccessKey"
  )
)
val regionProvider = new AwsRegionProvider {
  def getRegion = ""
}
val proxy = Some(Proxy(host, port, "https"))

// Set pathStyleAccess to true and specify proxy, leave region blank
val settings =
  new S3Settings(MemoryBufferType, proxy, credentialsProvider, regionProvider, true, None, ListBucketVersion2)
val s3Client = new S3Client(settings)(system, materializer)
Java
final String host = "s3.eu-geo.objectstorage.softlayer.net";
final int port = 443;

final AWSStaticCredentialsProvider credentials = new AWSStaticCredentialsProvider(
        new BasicAWSCredentials(
                "myAccessKeyId",
                "mySecretAccessKey"
        )
);
final AwsRegionProvider regionProvider = new AwsRegionProvider() {
    public String getRegion() {
        return "";
    }
};
final Proxy proxy = new Proxy(host, port, "https");

// Set pathStyleAccess to true and specify proxy, leave region blank
final S3Settings settings = new S3Settings(
        MemoryBufferType.getInstance(),
        Some.apply(proxy),
        credentials,
        regionProvider,
        true,
        scala.Option.empty(),
        ListBucketVersion2.getInstance()
);
final S3Client s3Client = new S3Client(settings, system(), mat);
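
Once configured this way, the client is used exactly like the AWS variant. For example, listing the contents of a Cloud Object Storage bucket (the bucket name is a placeholder):

Scala
val cosObjects: Source[ListBucketResultContents, NotUsed] =
  s3Client.listBucket("my-cos-bucket", None)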