Publishing and Subscribing with Actions

A very common use case when building Microservices is to publish and subscribe to a stream of events. The source of events can be the journal of an event sourced entity, the value entity state changes or a Google Cloud Pub/Sub topic for asynchronous messaging between services.

With Actions you can:

  • subscribe to events emitted by an event sourced entity within the same service.

  • subscribe to state changes emitted by a value entity within the same service.

  • subscribe to external events via Google Cloud Pub/Sub.

  • publish events to a Google Cloud Pub/Sub topic.

Messages are guaranteed to be delivered at least once. This means that receivers must be able to handle duplicate messages.

Publishing Entity events to a Topic

To illustrate how to publish entity events, we will assume the existence of an Event Sourced Counter entity that emits events of types: ValueIncreased and ValueDecreased. We will get the events delivered to an Action, apply some transformation and let them be published to a Google Cloud Pub/Sub topic.

To use Google Cloud Pub/Sub in for your Akka Serverless project, please follow the instructing at how to configure message brokers.

It is your responsibility to create the topics in Google Cloud Pub/Sub before configuring publishers or subscribers.

src/main/proto/com/example/actions/counter_topic.proto
syntax = "proto3";
package com.example.actions;

import "akkaserverless/annotations.proto";
import "com/example/domain/counter_domain.proto"; (1)
import "google/protobuf/empty.proto";
import "google/protobuf/any.proto";

option java_outer_classname = "CounterTopicApi";

message Increased {
  int32 value = 1;
}

message Decreased {
  int32 value = 1;
}

service CounterJournalToTopic {
  option (akkaserverless.service) = {
    type : SERVICE_TYPE_ACTION  (2)
  };

  rpc Increase (com.example.domain.ValueIncreased) returns (Increased) { (3)
    option (akkaserverless.method).eventing.in = { (4)
      event_sourced_entity: "counter"
    };
    option (akkaserverless.method).eventing.out = { (5)
      topic:  "counter-events"
    };
  }

  rpc Decrease (com.example.domain.ValueDecreased) returns (Decreased) {
    option (akkaserverless.method).eventing.in = {
      event_sourced_entity: "counter"
    };
    option (akkaserverless.method).eventing.out = {
      topic:  "counter-events"
    };
  }

}
1 Import the Counter Domain file containing the definitions of the events. This is typically a proto definition within the same service.
2 The protobuf option (akkaserverless.service) is specific to code-generation as provided by the Akka Serverless Maven plugin. This annotation indicates to the code-generation that an Action must be generated.
3 The Increase method receives the event ValueIncreased and returns the transformed type Increased.
4 The (akkaserverless.method).eventing.in annotation indicates that events from the entity type counter should be delivered to this method (when the type is ValueIncreased).
5 The (akkaserverless.method).eventing.out annotation indicates that the returned value from this method should be published to the topic called counter-events.

The class CounterJournalToTopicAction gets generated for us based on the proto file defined above.

src/main/java/com/example/actions/CounterJournalToTopicAction.java
public class CounterJournalToTopicAction extends AbstractCounterJournalToTopicAction {
  public CounterJournalToTopicAction(ActionCreationContext creationContext) {}

  /** Handler for "Increase". */
  @Override
  public Effect<CounterTopicApi.Increased> increase(CounterDomain.ValueIncreased valueIncreased) {
    CounterTopicApi.Increased increased = (1)
      CounterTopicApi.Increased.newBuilder()
        .setValue(valueIncreased.getValue())
        .build();

    return effects().reply(increased); (2)
  }
}
1 We convert the incoming domain event CounterDomain.ValueIncreased to the outgoing topic API CounterTopicApi.Increased.
2 We use the converted object to build a reply. The CounterTopicApi.Increased message will be published to the topic.

Subscribing to state changes from a Value Entity

Similar to subscribing to events from an Event Sourced Entity, you can also subscribe to state changes from a Value Entity.

src/main/proto/com/example/actions/counter_states_sub.proto
syntax = "proto3";
package com.example.actions;

import "akkaserverless/annotations.proto";
import "com/example/domain/counter_domain.proto"; (1)
import "google/protobuf/empty.proto";

option java_outer_classname = "StateSubscriptionApi";

service CounterStateSubscription {
  option (akkaserverless.service) = {
    type : SERVICE_TYPE_ACTION (2)
  };

  rpc OnUpdateState (com.example.domain.CounterState) returns (google.protobuf.Empty) {
    option (akkaserverless.method).eventing.in = {
      value_entity: "counter" (3)
    };
  }

}
1 Import the Counter Domain from the Value Entity example (see Implementing Value Entities)
2 The protobuf option (akkaserverless.service) is specific to code-generation as provided by the Akka Serverless Maven plugin. This annotation indicates to the code-generation that an Action must be generated.
3 The (akkaserverless.method).eventing.in annotation indicates that state changes from the value entity type counter should be delivered to this method.

Subscribing to a Topic

It’s also possible to subscribe to a Pub/Sub topic. To receive messages from a Pub/Sub topic, annotate a service method in the Protobuf service definition with the option (akkaserverless.method).eventing.in annotation and specify the topic name in the topic section of the annotation.

To use Google Cloud Pub/Sub in for your Akka Serverless project, please follow the instructing at how to configure message brokers.

It is your responsibility to create the topics in Google Cloud Pub/Sub before configuring publishers or subscribers.

For illustration purpose, we can add a second Action that consumes from the Pub Sub topic counter-events from the previous example.

src/main/proto/com/example/actions/counter_topic_sub.proto
syntax = "proto3";
package com.example.actions;

import "akkaserverless/annotations.proto";
import "com/example/actions/counter_topic.proto"; (1)
import "google/protobuf/empty.proto";

option java_outer_classname = "CounterTopicSubApi";

service CounterTopicSubscription {
  option (akkaserverless.service) = {
    type : SERVICE_TYPE_ACTION  (2)
  };

  rpc Increase (com.example.actions.Increased) returns (google.protobuf.Empty) {
    option (akkaserverless.method).eventing.in = { (3)
      topic:  "counter-events"
    };
  }

  rpc Decrease (com.example.actions.Decreased) returns (google.protobuf.Empty) {
    option (akkaserverless.method).eventing.in = {
      topic:  "counter-events"
    };
  }
}
1 Import the Counter Topic types from previous example.
2 The protobuf option (akkaserverless.service) is specific to code-generation as provided by the Akka Serverless Maven plugin. This annotation indicates to the code-generation that an Action must be generated.
3 Define methods for each of the possible incoming messages and annotate them with (akkaserverless.method).eventing.in indicating that the source of events is the topic counter-events.

The class CounterTopicSubscriptionAction gets generated for us based on the proto file defined above.

src/main/java/com/example/actions/CounterTopicSubscriptionAction.java
public class CounterTopicSubscriptionAction extends AbstractCounterTopicSubscriptionAction {

  private Logger logger = LoggerFactory.getLogger(getClass());

  public CounterTopicSubscriptionAction(ActionCreationContext creationContext) {}

  /** Handler for "Increase". */
  @Override
  public Effect<Empty> increase(CounterTopicApi.Increased increased) { (1)
    logger.info("Received increase event: " + increased.toString());
    return effects().noReply();
  }

  /** Handler for "Decrease". */
  @Override
  public Effect<Empty> decrease(CounterTopicApi.Decreased decreased) {
    logger.info("Received increase event: " + decreased.toString());
    return effects().noReply();
  }
}

The events from the topic are delivered to the new Action. The implementation may vary, for this simplified example we are just logging it, but it could a forward to some other component or external service.

Receiving messages from an external Topic

In the example above, we consumed Protobuf messages from a topic that we control ourselves. When consuming an external topic, it’s very likely that the message format is not under your control and is not known by Akka Serverless. In such case, the Action definition should receive Protobuf’s type Any and you must take care of the serialization yourself. See Handling Serialization for more information on how to deal with other serialization formats.

Subscribing and acting upon

Another possible usage for Actions is to consume events and act upon.

For example, you may consume events from one entity or from a topic, transform to commands and send to an another entity or an external system. This is similar to the usage explained in Actions as Controller, except that the Action is driven by the flow of incoming events instead of external user requests.

For that purpose, it’s enough to add the (akkaserverless.method).eventing.in and omit the (akkaserverless.method).eventing.out.

Accessing the Entity ID

For many use cases, a subscriber to an event log will trigger other services and needs to pass the entity ID to the receiver. The events of an Event Sourced entity, by design, do not include the entity ID, but it is made available to the subscriber via the metadata field subject, accessible through eventSubject in the ActionContextnew tab.

You can access the ActionContextnew tab through method actionContext().

src/main/java/com/example/actions/CounterJournalToTopicAction.java
@Override
public Effect<CounterTopicApi.Increased> increase(CounterDomain.ValueIncreased valueIncreased) {
  Optional<String> counterId = actionContext().eventSubject(); (1)
  ...
}

Ignoring events

When consuming events, each event must be matched by a Protobuf service method. In case your component is only interested in certain events, you may declare a method to receive all events that are not received by the other methods. If an event type is not handled, the Action will fail. Actions are designed to restart, but since the handler is missing, it will fail again. Therefore, it’s important to define a such catch-all method in case you want to discard some events.

src/main/proto/com/example/actions/counter_topic.proto
rpc Ignore(google.protobuf.Any) returns (google.protobuf.Empty) {
  option (akkaserverless.method).eventing.in = { (1)
    event_sourced_entity: "counter"
  };
}
1 We must annotate it with a (akkaserverless.method).eventing.in

The corresponding implementation must exist in the component.

src/main/java/com/example/actions/CounterJournalToTopicAction.java
public class CounterJournalToTopicAction extends AbstractCounterJournalToTopicAction {
  @Override
  public Effect<Empty> ignore(Any any) {
    return effects().noReply(); (1)
  }
}
1 The ignore method is implemented as a catch-all. It matches Any input and replies with effects().noReply().