Azure Storage Queue
The Azure Storage Queue connector provides an Akka Stream Source and Sinks for Azure Storage Queue integration.
Azure Storage Queue is a queuing service similar to Amazon’s SQS. It is designed mostly for long-running and non-time-critical tasks. For more information on Azure Storage Queue see the Azure docs.
Project Info: Alpakka Azure Storage Queue | |
---|---|
Artifact | com.lightbend.akka
akka-stream-alpakka-azure-storage-queue
3.0.4
|
JDK versions | Adopt OpenJDK 8 Adopt OpenJDK 11 |
Scala versions | 2.12.11, 2.13.3 |
JPMS module name | akka.stream.alpakka.azure.storagequeue |
License | |
Readiness level |
Since 0.9, 2017-05-24
|
Home page | https://doc.akka.io/docs/alpakka/current |
API documentation | |
Forums | |
Release notes | GitHub releases |
Issues | Github issues |
Sources | https://github.com/akka/alpakka |
Artifacts
- sbt
val AkkaVersion = "2.6.14" libraryDependencies ++= Seq( "com.lightbend.akka" %% "akka-stream-alpakka-azure-storage-queue" % "3.0.4", "com.typesafe.akka" %% "akka-stream" % AkkaVersion )
- Maven
<properties> <akka.version>2.6.14</akka.version> <scala.binary.version>2.12</scala.binary.version> </properties> <dependencies> <dependency> <groupId>com.lightbend.akka</groupId> <artifactId>akka-stream-alpakka-azure-storage-queue_${scala.binary.version}</artifactId> <version>3.0.4</version> </dependency> <dependency> <groupId>com.typesafe.akka</groupId> <artifactId>akka-stream_${scala.binary.version}</artifactId> <version>${akka.version}</version> </dependency> </dependencies>
- Gradle
def versions = [ AkkaVersion: "2.6.14", ScalaBinary: "2.12" ] dependencies { implementation "com.lightbend.akka:akka-stream-alpakka-azure-storage-queue_${versions.ScalaBinary}:3.0.4" implementation "com.typesafe.akka:akka-stream_${versions.ScalaBinary}:${versions.AkkaVersion}" }
The table below shows direct dependencies of this module and the second tab shows all libraries it depends on transitively.
- Direct dependencies
Organization Artifact Version com.microsoft.azure azure-storage 8.0.0 com.typesafe.akka akka-stream_2.12 2.6.14 org.scala-lang scala-library 2.12.11 - Dependency tree
com.microsoft.azure azure-storage 8.0.0 The Apache Software License, Version 2.0 com.fasterxml.jackson.core jackson-core 2.9.4 com.microsoft.azure azure-keyvault-core 1.0.0 The MIT License (MIT) com.google.guava guava 20.0 org.apache.commons commons-lang3 3.4 org.apache.commons commons-lang3 3.4 org.slf4j slf4j-api 1.7.12 com.typesafe.akka akka-stream_2.12 2.6.14 Apache-2.0 com.typesafe.akka akka-actor_2.12 2.6.14 Apache-2.0 com.typesafe config 1.4.0 Apache-2.0 org.scala-lang.modules scala-java8-compat_2.12 0.8.0 BSD 3-clause org.scala-lang scala-library 2.12.11 Apache-2.0 org.scala-lang scala-library 2.12.11 Apache-2.0 com.typesafe.akka akka-protobuf-v3_2.12 2.6.14 Apache-2.0 com.typesafe ssl-config-core_2.12 0.4.2 Apache-2.0 com.typesafe config 1.4.0 Apache-2.0 org.scala-lang.modules scala-parser-combinators_2.12 1.1.2 Apache-2.0 org.scala-lang scala-library 2.12.11 Apache-2.0 org.scala-lang scala-library 2.12.11 Apache-2.0 org.reactivestreams reactive-streams 1.0.3 CC0 org.scala-lang scala-library 2.12.11 Apache-2.0 org.scala-lang scala-library 2.12.11 Apache-2.0
Init Azure Storage API
import com.microsoft.azure.storage._
import com.microsoft.azure.storage.queue._
val storageConnectionString = "DefaultEndpointsProtocol=http;AccountName=<YourAccountName>;AccountKey=<YourKey>"
val queueFactory = () => { // Since azure storage JDK is not guaranteed to be thread-safe
val storageAccount = CloudStorageAccount.parse(storageConnectionString)
val queueClient = storageAccount.createCloudQueueClient
queueClient.getQueueReference("myQueue")
}
For more details, see Microsoft Azure Storage Docs.
Queuing a message
import one.aleph.akkzure.queue._
import one.aleph.akkzure.queue.scaladsl._
// Create an example message
val message = new CloudQueueMessage("Hello Azure")
Source.single(message).runWith(AzureQueueSink(queueFactory))
Processing and deleting messages
AzureQueueSource(queueFactory).take(10)
.map({ msg: CloudQueueMessage =>
println(msg.getMessageContentAsString) // Print the messages content
msg // Return message to the flow for deletion
}).runWith(AzureQueueDeleteSink(queueFactory))