Google Cloud Pub/Sub

The google cloud pub/sub connector provides a way to connect to google clouds managed pub/sub https://cloud.google.com/pubsub/.

Artifacts

sbt
libraryDependencies += "com.lightbend.akka" %% "akka-stream-alpakka-google-cloud-pub-sub" % "0.14"
Maven
<dependency>
  <groupId>com.lightbend.akka</groupId>
  <artifactId>akka-stream-alpakka-google-cloud-pub-sub_2.12</artifactId>
  <version>0.14</version>
</dependency>
Gradle
dependencies {
  compile group: 'com.lightbend.akka', name: 'akka-stream-alpakka-google-cloud-pub-sub_2.12', version: '0.14'
}

Usage

Prepare your credentials for access to google cloud pub/sub.

Scala
val privateKey: PrivateKey = {
  val pk = "MIIEvgIBADANBgkqhkiG9w0BAQEFAASCBKgwggSkAgEAAoIBAQCxwdLoCIviW0BsREeKzi" +
  "qiSgzl17Q6nD4RhqbB71oPGG8h82EJPeIlLQsMGEtuig0MVsUa9MudewFuQ/XHWtxnueQ3I900EJm" +
  "rDTA4ysgHcVvyDBPuYdVVV7LE/9nysuHb2x3bh057Sy60qZqDS2hV9ybOBp2RIEK04k/hQDDqp+Lx" +
  "cnNQBi5C0f6aohTN6Ced2vvTY6hWbgFDk4Hdw9JDJpf8TSx/ZxJxPd3EA58SgXRBuamVZWy1IVpFO" +
  "SKUCr4wwMOrELu9mRGzmNJiLSqn1jqJlG97ogth3dEldSOtwlfVI1M4sDe3k1SnF1+IagfK7Wda5h" +
  "PbMdbh2my3EMGY159ktbtTAUzJejPQfhVzk84XNxVPdjN01xN2iceXSKcJHzy8iy9JHb+t9qIIcYk" +
  "ZPJrBCyphUGlMWE+MFwtjbHMBxhqJNyG0TYByWudF+/QRFaz0FsMr4TmksNmoLPBZTo8zAoGBAKZI" +
  "vf5XBlTqd/tR4cnTBQOeeegTHT5x7e+W0mfpCo/gDDmKnOsF2lAwj/F/hM5WqorHoM0ibno+0zUb5" +
  "q6rhccAm511h0LmV1taVkbWk4UReuPuN+UyVUP+IjmXjagDle9IkOE7+fDlNb+Q7BHl2R8zm1jZjE" +
  "DwM2NQnSxQ22+/"
  val kf = KeyFactory.getInstance("RSA")
  val encodedPv = Base64.getDecoder.decode(pk)
  val keySpecPv = new PKCS8EncodedKeySpec(encodedPv)
  kf.generatePrivate(keySpecPv)
}
val clientEmail = "test-XXX@test-XXXXX.iam.gserviceaccount.com"
val projectId = "test-XXXXX"
val apiKey = "AIzaSyCVvqrlz057gCssc70n5JERyTW4TpB4ebE"
val topic = "topic1"
val subscription = "subscription1"
Java
String keyString = "MIIEvgIBADANBgkqhkiG9w0BAQEFAASCBKgwggSkAgEAAoIBAQCxwdLoCIviW0BsREeKzi" +
        "qiSgzl17Q6nD4RhqbB71oPGG8h82EJPeIlLQsMGEtuig0MVsUa9MudewFuQ/XHWtxnueQ3I900EJm" +
        "rDTA4ysgHcVvyDBPuYdVVV7LE/9nysuHb2x3bh057Sy60qZqDS2hV9ybOBp2RIEK04k/hQDDqp+Lx" +
        "cnNQBi5C0f6aohTN6Ced2vvTY6hWbgFDk4Hdw9JDJpf8TSx/ZxJxPd3EA58SgXRBuamVZWy1IVpFO" +
        "SKUCr4wwMOrELu9mRGzmNJiLSqn1jqJlG97ogth3dEldSOtwlfVI1M4sDe3k1SnF1+IagfK7Wda5h" +
        "PbMdbh2my3EMGY159ktbtTAUzJejPQfhVzk84XNxVPdjN01xN2iceXSKcJHzy8iy9JHb+t9qIIcYk" +
        "ZPJrBCyphUGlMWE+MFwtjbHMBxhqJNyG0TYByWudF+/QRFaz0FsMr4TmksNmoLPBZTo8zAoGBAKZI" +
        "vf5XBlTqd/tR4cnTBQOeeegTHT5x7e+W0mfpCo/gDDmKnOsF2lAwj/F/hM5WqorHoM0ibno+0zUb5" +
        "q6rhccAm511h0LmV1taVkbWk4UReuPuN+UyVUP+IjmXjagDle9IkOE7+fDlNb+Q7BHl2R8zm1jZjE" +
        "DwM2NQnSxQ22+/";
KeyFactory kf = KeyFactory.getInstance("RSA");
byte[] encodedPv = Base64.getDecoder().decode(keyString);
PKCS8EncodedKeySpec keySpecPv = new PKCS8EncodedKeySpec(encodedPv);
PrivateKey privateKey = kf.generatePrivate(keySpecPv);
String clientEmail = "test-XXX@test-XXXXX.iam.gserviceaccount.com";
String projectId = "test-XXXXX";
String apiKey = "AIzaSyCVvqrlz057gCssc70n5JERyTW4TpB4ebE";
String topic = "topic1";
String subscription = "subscription1";

And prepare the actor system and materializer.

Scala
implicit val system = ActorSystem()
implicit val mat = ActorMaterializer()
Java
ActorSystem system = ActorSystem.create();
ActorMaterializer materializer = ActorMaterializer.create(system);

To publish a single request, build the message with a base64 data payload and put it in a PublishRequest. Publishing creates a flow taking the messages and returning the accepted message ids.

Scala
val publishMessage =
  PubSubMessage(messageId = "1", data = new String(Base64.getEncoder.encode("Hello Google!".getBytes)))
val publishRequest = PublishRequest(Seq(publishMessage))

val source: Source[PublishRequest, NotUsed] = Source.single(publishRequest)

val publishFlow: Flow[PublishRequest, Seq[String], NotUsed] =
  GooglePubSub.publish(projectId, apiKey, clientEmail, privateKey, topic)

val publishedMessageIds: Future[Seq[Seq[String]]] = source.via(publishFlow).runWith(Sink.seq)
Java
PubSubMessage publishMessage =
        new PubSubMessage("1", new String(Base64.getEncoder().encode("Hello Google!".getBytes())));
PublishRequest publishRequest = PublishRequest.of(Lists.newArrayList(publishMessage));

Source<PublishRequest, NotUsed> source = Source.single(publishRequest);

Flow<PublishRequest, List<String>, NotUsed> publishFlow = GooglePubSub.publish(projectId, apiKey, clientEmail, privateKey, topic, 1, system, materializer);

CompletionStage<List<List<String>>> publishedMessageIds = source.via(publishFlow).runWith(Sink.seq(), materializer);

To get greater performance you can batch messages together, here we send batches with a maximum size of 1000 or at a maximum of 1 minute apart depending on the source.

Scala
val messageSource: Source[PubSubMessage, NotUsed] = Source(List(publishMessage, publishMessage))
messageSource.groupedWithin(1000, 1.minute).map(PublishRequest.apply).via(publishFlow).to(Sink.seq)
Java
Source<PubSubMessage, NotUsed> messageSource = Source.single(publishMessage);
messageSource.groupedWithin(1000, FiniteDuration.apply(1, "min"))
        .map(messages -> PublishRequest.of(messages))
        .via(publishFlow)
        .runWith(Sink.ignore(), materializer);

To consume the messages from a subscription you must subscribe then acknowledge the received messages. PublishRequest

Scala
val subscriptionSource: Source[ReceivedMessage, NotUsed] =
  GooglePubSub.subscribe(projectId, apiKey, clientEmail, privateKey, subscription)

val ackSink: Sink[AcknowledgeRequest, Future[Done]] =
  GooglePubSub.acknowledge(projectId, apiKey, clientEmail, privateKey, subscription)

subscriptionSource
  .map { message =>
    // do something fun

    message.ackId
  }
  .groupedWithin(1000, 1.minute)
  .map(AcknowledgeRequest.apply)
  .to(ackSink)
Java
Source<ReceivedMessage, NotUsed> subscriptionSource = GooglePubSub.subscribe(projectId, apiKey, clientEmail, privateKey, subscription, system, materializer);

Sink<AcknowledgeRequest, CompletionStage<Done>> ackSink = GooglePubSub.acknowledge(projectId, apiKey, clientEmail, privateKey, subscription, 1, system, materializer);

subscriptionSource.map( message -> {
    // do something fun
    return message.ackId();
 }).groupedWithin(1000, FiniteDuration.apply(1, "min")).map(acks -> AcknowledgeRequest.of(acks)).to(ackSink);

If you want to automatically acknowledge the messages and send the ReceivedMessages to your own sink you can create a graph.

Scala
val subscribeMessageSoruce: Source[ReceivedMessage, NotUsed] = ???
val processMessage: Sink[ReceivedMessage, NotUsed] = ???

val batchAckSink =
  Flow[ReceivedMessage].map(_.ackId).groupedWithin(1000, 1.minute).map(AcknowledgeRequest.apply).to(ackSink)

val q = subscribeMessageSoruce.alsoTo(batchAckSink).to(processMessage)
Java
Sink<ReceivedMessage, CompletionStage<Done>> processSink = yourProcessingSink;

Sink<ReceivedMessage, NotUsed> batchAckSink = Flow.of(ReceivedMessage.class)
        .map(t -> t.ackId())
        .groupedWithin(1000, FiniteDuration.apply(1, "minute"))
        .map(ids -> AcknowledgeRequest.of(ids))
        .to(ackSink);

subscriptionSource.alsoTo(batchAckSink).to(processSink);

Running the examples

To run the example code you will need to configure a project and pub/sub in google cloud and provide your own credentials.

The source code for this page can be found here.