Subscribing to a journal

The Event Sourced Entity journal contains events that capture all state changes. By subscribing to the journal, with the Event Sourced Entity type name, another component can receive all events emitted of that type.

Use case: send asynchronous messages

By combining the processing of a journal with publishing to brokers, a service can trigger other services asynchronously via messaging. To achieve this, create an Action that subscribes to a journal and publishes messages to a topic.

Use case: external calls

A service might need to trigger other systems when certain events happened to an Entity. An Action can be connected to the Entity’s journal and react on certain events to issue calls (eg. via HTTP or gRPC). To achieve this, create an Action that subscribes to a journal and let the implementation call other services.

Subscribing

To subscribe to an Event Sourced Entity log, define Protobuf rpc methods for each Journal event that you want to receive. Annotate these methods with the (akkaserverless.method).eventing annotation and specify the Entity type name of the Event Sourced Entity.

syntax = "proto3";

package shopping.product.actions;

import "akkaserverless/annotations.proto";
import "cart/shopping_cart_domain.proto";
import "google/protobuf/empty.proto";
import "google/protobuf/any.proto";

service ToProductPopularityService {

  (1)
  rpc ForwardAdded(shopping.cart.domain.ItemAdded) returns (google.protobuf.Empty) {
    option (akkaserverless.method).eventing.in = { (2)
      event_sourced_entity: "eventsourced-shopping-cart" (3)
    };
  }
}
1 create rpc methods for all Protobuf types in the journal
2 annotate the methods with (akkaserverless.method).eventing
3 specify the Event Sourced Entity’s type name as journal source

There is nothing specific required in the implementation of these methods. The implementation usually is an Action that forwards a converted message to a different component (e.g. an Event Sourced Entity).

Subscribing to events from a different service

The approach described above works within a service. To subscribe to events from a different service, you would use Publishing and subscribing.

To retrieve the entity ID from the event metadata, configure the action method to accept the ActionContextnew tab as a second parameter. The entity ID becomes available from the context metadata.

@Action
public class TriggerCheckoutAction {
    @Handler
    public ActionReply<Empty> checkout(ShoppingCart.CheckedOut in, ActionContext ctx) {
      Optional<String> entityId = ctx.eventSubject();
      ...
    }
}

Accessing the Entity ID

For many use cases, a subscriber to an event log will trigger other services and needs to pass the entity ID to the receiver. The events of an Event Sourced entity, by design, do not include the entity ID, but it is made available to the subscriber via the CloudEvent metadata field subject, via the ActionContext and the CloudEvent metadata:

context.metadata().asCloudEvent().subject()

Ignoring events

When listening to an event log, all events emitted by the Event Sourced entity must be matched by a Protobuf service method. In case your component is interested only in certain events, you may declare a method to receive all events that are not received by the other methods.

  // handle other events which are not managed above
  rpc CatchOthers(google.protobuf.Any) returns (google.protobuf.Empty) {
    option (akkaserverless.method).eventing.in = {
      event_sourced_entity:  "eventsourced-shopping-cart"
    };
  }

The corresponding implementation must exist in the component.