Publishing and subscribing to topics on a broker

Akka Serverless integrates with Google Cloud Pub/Sub to enable asynchronous messaging.

This requires configuring Google Cloud Pub/Sub access for your Akka Serverless project; see How to configure a broker.

It is your responsibility to create the topics Akka Serverless listens to in Pub/Sub.

Subscribing to a topic’s messages

To receive messages from a Pub/Sub topic, annotate a service method in the Protobuf service definition with the (akkaserverless.method).eventing annotation. Specify the topic name in the in section of the annotation:

syntax = "proto3";

import "google/protobuf/empty.proto";
import "google/protobuf/any.proto";
import "akkaserverless/annotations.proto";

package shopping.cart.actions;

service ShoppingCartAnalyticsService {

    rpc ProcessAdded(ItemAdded) returns (google.protobuf.Empty) {
        option (akkaserverless.method).eventing = { // (1)
            in: { // (2)
              topic:  "shopping-cart-events"
            }
        };
    }
}
(1) Annotate the Protobuf rpc method with (akkaserverless.method).eventing.
(2) Use in and topic to subscribe to a topic.

The implementation of ProcessAdded needs nothing specific to topics. In most cases it will be an Action that converts the message and forwards it to a different component (e.g. an Event Sourced Entity).

Receiving JSON messages

Your Akka Serverless service may subscribe to topics that carry messages in JSON format. Each message must have a Content-Type attribute set to application/json.

The Protobuf rpc method receiving these JSON messages must be set up to receive Any.

syntax = "proto3";

import "google/protobuf/empty.proto";
import "google/protobuf/any.proto";
import "akkaserverless/annotations.proto";

package shopping.cart.actions;

service ShoppingCartTopicService {
    rpc JsonFromTopic(google.protobuf.Any) returns (google.protobuf.Empty) {
        option (akkaserverless.method).eventing = {
            in: {
                topic:  "shopping-cart-json"
            }
        };
    }
}

The Java implementation of the method can declare a regular Java class as its parameter.

@Action
public class ShoppingCartTopicAction {

    @Handler
    public ActionReply<Empty> jsonFromTopic(TopicMessage message, ActionContext ctx) {
        ...
    }
}

The parameter class must be annotated with @Jsonable and prepared for Jackson JSON parsing with @BeanProperty annotations.

import com.akkaserverless.javasdk.Jsonable;
import java.beans.BeanProperty;

@Jsonable
public class TopicMessage {
  String operation;

  public String getOperation() {
    return operation;
  }

  @BeanProperty
  public void setOperation(String operation) {
    this.operation = operation;
  }
}
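
For illustration, the sending side of such a topic could assemble a matching message roughly as follows. This is a minimal sketch using only the JDK: JsonTopicMessageSketch and its methods are hypothetical names that are not part of the Akka Serverless SDK, and the property name operation assumes Jackson's default bean naming for setOperation.

```java
import java.util.Collections;
import java.util.Map;

// Hypothetical sender-side helper; not part of the Akka Serverless SDK.
final class JsonTopicMessageSketch {

    private JsonTopicMessageSketch() {}

    // The attribute every JSON message on the topic must carry.
    static Map<String, String> attributes() {
        return Collections.singletonMap("Content-Type", "application/json");
    }

    // JSON body with the single property that TopicMessage exposes via setOperation.
    static String body(String operation) {
        return "{\"operation\":\"" + escape(operation) + "\"}";
    }

    // Minimal JSON string escaping: quotes, backslashes, and control characters.
    private static String escape(String value) {
        StringBuilder out = new StringBuilder();
        for (int i = 0; i < value.length(); i++) {
            char c = value.charAt(i);
            if (c == '"' || c == '\\') {
                out.append('\\').append(c);
            } else if (c < 0x20) {
                out.append(String.format("\\u%04x", (int) c));
            } else {
                out.append(c);
            }
        }
        return out.toString();
    }
}
```

Calling JsonTopicMessageSketch.body("add") yields {"operation":"add"}, which is the shape the jsonFromTopic handler would receive as a TopicMessage. A real sender would normally use a JSON library rather than hand-built strings.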

Receiving CloudEvents

Akka Serverless uses the CloudEvents standard when receiving from and publishing to topics. The CloudEvents specification standardizes message metadata so that systems can integrate more easily.

Describing the structure of the message payload is the CloudEvents feature most important to Akka Serverless.

One example is the ability to send serialized Protobuf messages and have Akka Serverless deserialize them accordingly.

For Akka Serverless to read Protobuf messages from a topic correctly, each message must specify the following attributes:

  • Content-Type = application/protobuf

  • ce-specversion = 1.0

  • ce-type = fully qualified Protobuf message name (e.g. shopping.cart.api.TopicOperation)

(The ce- prefixed attributes are part of the CloudEvents specification.)
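
The attributes listed above can be collected in a small helper on the sending side. The following is a sketch under stated assumptions, not an SDK API: ProtobufTopicAttributes and forMessageType are hypothetical names, and the resulting map would still have to be attached to the outgoing message by whatever Pub/Sub client the sender uses.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper; not part of the Akka Serverless SDK.
final class ProtobufTopicAttributes {

    private ProtobufTopicAttributes() {}

    // Builds the three message attributes for a serialized Protobuf payload.
    // fullyQualifiedName is the Protobuf message name including its package,
    // for example "shopping.cart.api.TopicOperation".
    static Map<String, String> forMessageType(String fullyQualifiedName) {
        if (fullyQualifiedName == null || fullyQualifiedName.trim().isEmpty()) {
            throw new IllegalArgumentException("ce-type must name a Protobuf message");
        }
        Map<String, String> attributes = new LinkedHashMap<>();
        attributes.put("Content-Type", "application/protobuf");
        attributes.put("ce-specversion", "1.0");
        attributes.put("ce-type", fullyQualifiedName);
        return attributes;
    }
}
```

Keeping the attribute names in one place makes it harder for a sender to omit ce-type, without which the receiving side cannot tell which Protobuf message the payload contains.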

The Protobuf rpc declaration uses the expected Protobuf message type and specifies the topic to subscribe to. You’ll normally want to share the exact Protobuf message declaration with the sending system.

syntax = "proto3";

import "google/protobuf/empty.proto";
import "google/protobuf/any.proto";
import "akkaserverless/annotations.proto";

package shopping.cart.api;

message TopicOperation {
    string operation = 1;
}

service ShoppingCartTopicService {

    rpc ProtobufFromTopic(TopicOperation) returns (google.protobuf.Empty) {
        option (akkaserverless.method).eventing = {
            in: {
                topic:  "shopping-cart-protobuf-cloudevents"
            }
        };
    }
}