Google Cloud Storage
Google Cloud Storage allows world-wide storage and retrieval of any amount of data at any time.
Further information at the official Google Cloud Storage documentation website. This connector communicates to Cloud Storage via HTTP requests.
Project Info: Alpakka Google Cloud Storage | |
---|---|
Artifact | com.lightbend.akka
akka-stream-alpakka-google-cloud-storage
2.0.2
|
JDK versions | Adopt OpenJDK 8 Adopt OpenJDK 11 |
Scala versions | 2.12.11, 2.13.3 |
JPMS module name | akka.stream.alpakka.google.cloud.storage |
License | |
Readiness level |
Since 1.1.0, 2019-07-03
|
Home page | https://doc.akka.io/docs/alpakka/current |
API documentation | |
Forums | |
Release notes | In the documentation |
Issues | Github issues |
Sources | https://github.com/akka/alpakka |
Artifacts
- sbt
val AkkaVersion = "2.5.31" val AkkaHttpVersion = "10.1.11" libraryDependencies ++= Seq( "com.lightbend.akka" %% "akka-stream-alpakka-google-cloud-storage" % "2.0.2", "com.typesafe.akka" %% "akka-stream" % AkkaVersion, "com.typesafe.akka" %% "akka-http" % AkkaHttpVersion, "com.typesafe.akka" %% "akka-http-spray-json" % AkkaHttpVersion )
- Maven
<properties> <akka.version>2.5.31</akka.version> <akka.http.version>10.1.11</akka.http.version> <scala.binary.version>2.12</scala.binary.version> </properties> <dependency> <groupId>com.lightbend.akka</groupId> <artifactId>akka-stream-alpakka-google-cloud-storage_${scala.binary.version}</artifactId> <version>2.0.2</version> </dependency> <dependency> <groupId>com.typesafe.akka</groupId> <artifactId>akka-stream_${scala.binary.version}</artifactId> <version>${akka.version}</version> </dependency> <dependency> <groupId>com.typesafe.akka</groupId> <artifactId>akka-http_${scala.binary.version}</artifactId> <version>${akka.http.version}</version> </dependency> <dependency> <groupId>com.typesafe.akka</groupId> <artifactId>akka-http-spray-json_${scala.binary.version}</artifactId> <version>${akka.http.version}</version> </dependency>
- Gradle
versions += [ AkkaVersion: "2.5.31", AkkaHttpVersion: "10.1.11", ScalaBinary: "2.12" ] dependencies { compile group: 'com.lightbend.akka', name: "akka-stream-alpakka-google-cloud-storage_${versions.ScalaBinary}", version: '2.0.2', compile group: 'com.typesafe.akka', name: "akka-stream_${versions.ScalaBinary}", version: versions.AkkaVersion, compile group: 'com.typesafe.akka', name: "akka-http_${versions.ScalaBinary}", version: versions.AkkaHttpVersion, compile group: 'com.typesafe.akka', name: "akka-http-spray-json_${versions.ScalaBinary}", version: versions.AkkaHttpVersion }
The table below shows direct dependencies of this module and the second tab shows all libraries it depends on transitively.
- Direct dependencies
Organization Artifact Version com.pauldijou jwt-core_2.12 3.0.1 com.typesafe.akka akka-http-spray-json_2.12 10.1.11 com.typesafe.akka akka-http_2.12 10.1.11 com.typesafe.akka akka-stream_2.12 2.5.31 org.scala-lang scala-library 2.12.11 - Dependency tree
com.pauldijou jwt-core_2.12 3.0.1 org.scala-lang scala-library 2.12.11 com.typesafe.akka akka-http-spray-json_2.12 10.1.11 com.typesafe.akka akka-http_2.12 10.1.11 com.typesafe.akka akka-http-core_2.12 10.1.11 com.typesafe.akka akka-parsing_2.12 10.1.11 org.scala-lang scala-library 2.12.11 org.scala-lang scala-library 2.12.11 org.scala-lang scala-library 2.12.11 io.spray spray-json_2.12 1.3.5 org.scala-lang scala-library 2.12.11 org.scala-lang scala-library 2.12.11 com.typesafe.akka akka-http_2.12 10.1.11 com.typesafe.akka akka-http-core_2.12 10.1.11 com.typesafe.akka akka-parsing_2.12 10.1.11 org.scala-lang scala-library 2.12.11 org.scala-lang scala-library 2.12.11 org.scala-lang scala-library 2.12.11 com.typesafe.akka akka-stream_2.12 2.5.31 com.typesafe.akka akka-actor_2.12 2.5.31 com.typesafe config 1.3.3 org.scala-lang.modules scala-java8-compat_2.12 0.8.0 org.scala-lang scala-library 2.12.11 org.scala-lang scala-library 2.12.11 com.typesafe.akka akka-protobuf_2.12 2.5.31 org.scala-lang scala-library 2.12.11 com.typesafe ssl-config-core_2.12 0.3.8 com.typesafe config 1.3.3 org.scala-lang.modules scala-parser-combinators_2.12 1.1.2 org.scala-lang scala-library 2.12.11 org.scala-lang scala-library 2.12.11 org.reactivestreams reactive-streams 1.0.2 org.scala-lang scala-library 2.12.11 org.scala-lang scala-library 2.12.11
Configuration
The settings for the Google Cloud Storage connector are read by default from alpakka.googlecloud.storage
configuration section. If you use a non-standard configuration path or need multiple different configurations, please refer to the attributes section below to see how to apply different configuration to different parts of the stream. You’ll first need to prepare your credentials for access to google cloud storage. All of the available configuration settings can be found in the application.conf.
- HOCON
-
privateKey ="""-----BEGIN PRIVATE KEY----- MIICeAIBADANBgkqhkiG9w0BAQEFAASCAmIwggJeAgEAAoGBAMwkmdwrWp+LLlsf bVE+neFjZtUNuaD4/tpQ2UIh2u+qU6sr4bG8PPuqSdrt5b0/0vfMZA11mQWmKpg5 PK98kEkhbSvC08fG0TtpR9+vflghOuuvcw6kCniwNbHlOXnE8DwtKQp1DbTUPzMD hhsIjJaUtv19Xk7gh4MqYgANTm6lAgMBAAECgYEAwBXIeHSKxwiNS8ycbg//Oq7v eZV6j077bq0YYLO+cDjSlYOq0DSRJTSsXcXvoE1H00aM9mUq4TfjaGyi/3SzxYsr rSzu/qpYC58MJsnprIjlLgFZmZGe5MOSoul/u6JsBTJGkYPV0xGrtXJY103aSYzC xthpY0BHy9eO9I/pNlkCQQD/64g4INAiBdM4R5iONQvh8LLvqbb8Bw4vVwVFFnAr YHcomxtT9TunMad6KPgbOCd/fTttDADrv54htBrFGXeXAkEAzDTtisPKXPByJnUd jKO2oOg0Fs9IjGeWbnkrsN9j0134ldARE+WbT5S8G5EFo+bQi4ffU3+Y/4ly6Amm OAAzIwJBANV2GAD5HaHDShK/ZTf4dxjWM+pDnSVKnUJPS039EUKdC8cK2RiGjGNA v3jdg1Tw2cE1K8QhJwN8qOFj4JBWVbECQQCwcntej9bnf4vi1wd1YnCHkJyRqQIS 7974DhNGfYAQPv5w1JwtCRSuKuJvH1w0R1ijd//scjCNfQKgpNXPRbzpAkAQ8MFA MLpOLGqezUQthJWmVtnXEXaAlb3yFSRTZQVEselObiIc6EvYzNXv780IDT4pyKjg 8DS9i5jJDIVWr7mA -----END PRIVATE KEY----- """ privateKey = ${?GC_STORAGE_PRIVATE_KEY} alpakka.google.cloud.storage { project-id = "projectId" client-email = "[email protected]" private-key = ${privateKey} base-url = "https://www.googleapis.com/" // default base-path = "/storage/v1" // default token-url = "https://www.googleapis.com/oauth2/v4/token" // default token-scope = "https://www.googleapis.com/auth/devstorage.read_write" // default }
Store a file in Google Cloud Storage
A file can be uploaded to Google Cloud Storage by creating a source of ByteString
ByteString
and running that with a sink created from GCStorage.resumableUpload
GCStorage.resumableUpload
.
- Scala
-
val sink = GCStorage.resumableUpload(bucketName, fileName, ContentTypes.`text/plain(UTF-8)`, chunkSize) val source = Source( List(ByteString(firstChunkContent), ByteString(secondChunkContent)) ) val result: Future[StorageObject] = source.runWith(sink)
- Java
-
final Sink<ByteString, CompletionStage<StorageObject>> sink = GCStorage.resumableUpload( bucketName(), fileName(), ContentTypes.TEXT_PLAIN_UTF8, chunkSize); final Source<ByteString, NotUsed> source = Source.from( Lists.newArrayList( ByteString.fromString(firstChunkContent), ByteString.fromString(secondChunkContent))); final CompletionStage<StorageObject> result = source.runWith(sink, materializer);
Download a file from Google Cloud Storage
A source for downloading a file can be created by calling GCStorage.download
GCStorage.download
. It will emit an Option
Optional
that will hold file’s data or will be empty if no such file can be found.
If you need to download the specific version of the object in a bucket where object versioning is enabled, you can specify the generation
.
- Scala
-
val downloadSource: Source[Option[Source[ByteString, NotUsed]], NotUsed] = GCStorage.download(bucketName, fileName) val downloadGenerationSource: Source[Option[Source[ByteString, NotUsed]], NotUsed] = GCStorage.download(bucketName, fileName, Some(generation)) - Java
-
final Source<Optional<Source<ByteString, NotUsed>>, NotUsed> downloadSource = GCStorage.download(bucketName(), fileName()); final Source<Optional<Source<ByteString, NotUsed>>, NotUsed> downloadGenerationSource = GCStorage.download(bucketName(), fileName(), generation());
Access object metadata without downloading object from Google Cloud Storage
If you do not need object itself, you can query for only object metadata using a source from GCStorage.getObject
GCStorage.getObject
.
If you need the specific version of the object metadata in a bucket where object versioning is enabled, you can specify the generation
.
- Scala
-
val getObjectSource: Source[Option[StorageObject], NotUsed] = GCStorage.getObject(bucketName, fileName) val getObjectGenerationSource: Source[Option[StorageObject], NotUsed] = GCStorage.getObject(bucketName, fileName, Some(generation)) - Java
-
final Source<Optional<StorageObject>, NotUsed> getObjectSource = GCStorage.getObject(bucketName(), fileName()); final Source<Optional<StorageObject>, NotUsed> getObjectGenerationSource = GCStorage.getObject(bucketName(), fileName(), generation());
List bucket contents
To get a list of all objects in a bucket, use GCStorage.listBucket
GCStorage.listBucket
. When run, this will give a stream of StorageObject
.
To get a list of both live and archived versions of all objects in a bucket where object versioning is enabled, the versions
has to be set to true
- Scala
-
val listSource: Source[StorageObject, NotUsed] = GCStorage.listBucket(bucketName, Some(folder)) val listVersionsSource: Source[StorageObject, NotUsed] = GCStorage.listBucket(bucketName, Some(folder), versions) - Java
-
final Source<StorageObject, NotUsed> listSource = GCStorage.listBucket(bucketName(), folder); final Source<StorageObject, NotUsed> listVersionsSource = GCStorage.listBucket(bucketName(), folder, versions);
Rewrite (multi part)
Copy an Google Clouds Storage object from source bucket to target bucket using GCStorage.rewrite
GCStorage.rewrite
. When run, this will emit a single StorageObject
with the information about the copied object.
- Scala
-
val result: Future[StorageObject] = GCStorage.rewrite(bucketName, fileName, rewriteBucketName, fileName).run - Java
-
final CompletionStage<StorageObject> result = GCStorage.rewrite(bucketName(), fileName(), rewriteBucketName, fileName()) .run(materializer);
Apply Google Cloud Storage settings to a part of the stream
It is possible to make one part of the stream use different GCStorageSettings
from the rest of the graph. This can be useful, when one stream is used to copy files across regions with different service accounts. You can attach a custom GCStorageSettings
instance or a custom config path to a graph using attributes from GCStorageAttributes
:
- Scala
-
val newBasePathSettings = GCStorageExt(this.system).settings.withBasePath("/storage/v1") val listSource: Source[StorageObject, NotUsed] = GCStorage.listBucket(bucketName, None).withAttributes(GCStorageAttributes.settings(newBasePathSettings)) - Java
-
final GCStorageSettings newBasePathSettings = GCStorageExt.get(this.system()).settings().withBasePath("/storage/v1"); final Source<StorageObject, NotUsed> listSource = GCStorage.listBucket(bucketName()) .withAttributes(GCStorageAttributes.settings(newBasePathSettings));
Bucket management
Bucket management API provides functionality for both Sources and Futures / CompletionStages. In case of the Future API user can specify attributes to the request in the method itself and as for Sources it can be done via method .withAttributes
. For more information about attributes see: GCStorageAttributes
and Attributes
Make bucket
In order to create a bucket in Google Cloud Storage you need to specify it’s unique name. This value has to be set accordingly to the requirements. The bucket will be created in the given location.
- Scala
-
implicit val sampleAttributes: Attributes = GCStorageAttributes.settings(sampleSettings) val createBucketResponse: Future[Bucket] = GCStorage.createBucket(bucketName, location) val createBucketSourceResponse: Source[Bucket, NotUsed] = GCStorage.createBucketSource(bucketName, location) - Java
-
final Attributes sampleAttributes = GCStorageAttributes.settings(sampleSettings); final CompletionStage<Bucket> createBucketResponse = GCStorage.createBucket(bucketName(), location, materializer, sampleAttributes); final Source<Bucket, NotUsed> createBucketSourceResponse = GCStorage.createBucketSource(bucketName(), location);
Delete bucket
To delete a bucket you need to specify its name and the bucket needs to be empty.
- Scala
-
implicit val sampleAttributes: Attributes = GCStorageAttributes.settings(sampleSettings) val deleteBucketResponse: Future[Done] = GCStorage.deleteBucket(bucketName) val deleteBucketSourceResponse: Source[Done, NotUsed] = GCStorage.deleteBucketSource(bucketName) - Java
-
final Attributes sampleAttributes = GCStorageAttributes.settings(sampleSettings); final CompletionStage<Done> deleteBucketResponse = GCStorage.deleteBucket(this.bucketName(), materializer, sampleAttributes); final Source<Done, NotUsed> deleteBucketSourceResponse = GCStorage.deleteBucketSource(this.bucketName());
Get bucket
To get a bucket you need to specify its name.
- Scala
-
implicit val sampleAttributes: Attributes = GCStorageAttributes.settings(sampleSettings) val getBucketResponse: Future[Option[Bucket]] = GCStorage.getBucket(bucketName) val getBucketSourceResponse: Source[Option[Bucket], NotUsed] = GCStorage.getBucketSource(bucketName) - Java
-
final Attributes sampleAttributes = GCStorageAttributes.settings(sampleSettings); final CompletionStage<Optional<Bucket>> getBucketResponse = GCStorage.getBucket(this.bucketName(), materializer, sampleAttributes); final Source<Optional<Bucket>, NotUsed> getBucketSourceResponse = GCStorage.getBucketSource(this.bucketName());
Running the example code
The code in this guide is part of runnable tests of this project. You are welcome to edit the code and run it in sbt.
- Scala
-
sbt > google-cloud-storage/test
- Java
-
sbt > google-cloud-storage/test
Some test code requires access to Google cloud storage, to run them you will need to configure a project and pub/sub in google cloud and provide your own credentials.