Publishing and Subscribing with Actions

A very common use case when building microservices is to publish and subscribe to a stream of events. The source of events can be the journal of an Event Sourced Entity, the state changes of a Value Entity, or a Google Cloud Pub/Sub or Apache Kafka topic used for asynchronous messaging between services.

With Actions you can:

  • subscribe to events emitted by an event sourced entity within the same service.

  • subscribe to state changes emitted by a value entity within the same service.

  • subscribe to external events from Google Cloud Pub/Sub or Apache Kafka.

  • publish events to a Google Cloud Pub/Sub or Apache Kafka topic.

Messages are guaranteed to be delivered at least once. This means that receivers must be able to handle duplicate messages.

Publishing Entity events to a Topic

The Event Sourced Entity journal contains events that capture all state changes. By subscribing to the journal with the Event Sourced Entity type name, another component can receive all events emitted by entities of that type.

Use case: send asynchronous messages

By combining the processing of a journal with publishing to topics (see below), a service can trigger other services asynchronously via messaging. To achieve this, create an Action that subscribes to a journal and publishes messages to a topic.
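As a minimal sketch of this use case, the definition below combines an eventing.in subscription on the journal with an eventing.out publication to a topic. The service name ShoppingCartToTopicService and the method name PublishAdded are hypothetical; the entity type name and topic name are reused from the other examples on this page, and forwarding the event type unchanged is an assumption made to keep the sketch short.

```protobuf
syntax = "proto3";

package shopping.cart.actions;

import "kalix/annotations.proto";
import "cart/shopping_cart_domain.proto";

// Hypothetical service: subscribes to the journal and publishes to a topic.
service ShoppingCartToTopicService {
  // Receives ItemAdded from the journal; the returned message is what
  // gets published to the topic named in eventing.out.
  rpc PublishAdded(shopping.cart.domain.ItemAdded) returns (shopping.cart.domain.ItemAdded) {
    option (kalix.method).eventing.in = {
      event_sourced_entity: "eventsourced-shopping-cart"
    };
    option (kalix.method).eventing.out = {
      topic: "shopping-cart-events"
    };
  }
}
```

In practice the Action would often convert the journal event into a dedicated public message type before publishing, so that the internal event format can evolve independently of the topic contract.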

Use case: external calls

A service might need to trigger other systems when certain events happen to an Entity. An Action can be connected to the Entity’s journal and react to certain events by issuing calls (e.g. via HTTP or gRPC). To achieve this, create an Action that subscribes to a journal and let the implementation call other services.

Subscribing to state changes from an Entity

To subscribe to an Event Sourced Entity journal, define a Protobuf rpc method for each journal event that you want to receive. Annotate these methods with the (kalix.method).eventing annotation and specify the type name of the Event Sourced Entity.

syntax = "proto3";

package shopping.product.actions;

import "kalix/annotations.proto";
import "cart/shopping_cart_domain.proto";
import "google/protobuf/empty.proto";
import "google/protobuf/any.proto";

service ToProductPopularityService {

  // (1)
  rpc ForwardAdded(shopping.cart.domain.ItemAdded) returns (google.protobuf.Empty) {
    option (kalix.method).eventing.in = { // (2)
      event_sourced_entity: "eventsourced-shopping-cart" // (3)
    };
  }
}
(1) create rpc methods for all Protobuf types in the journal
(2) annotate the methods with (kalix.method).eventing
(3) specify the Event Sourced Entity’s type name as journal source

Nothing specific is required in the implementation of these methods. The implementation is usually an Action that forwards a converted message to a different component (e.g. an Event Sourced Entity).
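Subscribing to the state changes of a Value Entity, mentioned in the introduction, follows the same shape. The sketch below assumes the eventing.in annotation accepts a value_entity field holding the Value Entity type name; the service name, the entity type name "shopping-cart", and the state message shopping.cart.domain.Cart are hypothetical and used only for illustration.

```protobuf
syntax = "proto3";

package shopping.cart.actions;

import "kalix/annotations.proto";
import "cart/shopping_cart_domain.proto";
import "google/protobuf/empty.proto";

// Hypothetical service: reacts to Value Entity state changes.
service CartStateSubscriptionService {
  // Receives the updated state (hypothetical message type Cart)
  // each time the Value Entity changes.
  rpc OnStateChange(shopping.cart.domain.Cart) returns (google.protobuf.Empty) {
    option (kalix.method).eventing.in = {
      value_entity: "shopping-cart" // assumed Value Entity type name
    };
  }
}
```

Unlike a journal subscription, the input here is the entity state rather than an individual event, so a single method is sufficient for one state type.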

Subscribing to a Topic

It’s also possible to subscribe to a Pub/Sub topic. To receive messages from a topic, annotate a service method in the Protobuf service definition with the (kalix.method).eventing.in annotation and specify the topic name in the topic field of the annotation.

In the Protobuf descriptors, only topic names are referenced; no additional details about how to connect to the topics are needed. When deploying the application, there must be a broker configuration in the Kalix project, with credentials and details on how to connect to the broker. For details about configuring a broker, see Configure message brokers.

syntax = "proto3";

package shopping.cart.actions;

import "kalix/annotations.proto";
import "cart/shopping_cart_domain.proto";
import "google/protobuf/empty.proto";

service ShoppingCartAnalyticsService {
  // get ItemAdded from the topic
  rpc ProcessAdded(shopping.cart.domain.ItemAdded) returns (google.protobuf.Empty) {
    option (kalix.method).eventing.in = { // (1)
      topic: "shopping-cart-events" // (2)
    };
  }
}
(1) annotate the Protobuf rpc method with (kalix.method).eventing
(2) use in and topic to subscribe to a topic

Nothing specific is required in the implementation of ProcessAdded. In most cases the implementation is an Action that forwards a converted message to a different component (e.g. an Event Sourced Entity).

Receiving messages from an external Topic

The example above consumes Protobuf messages from a topic that the service controls itself. When consuming an external topic, it’s very likely that the message format is not under your control and is not known by Kalix.

In such a case, the Action definition should receive a type that depends on the type of the message payload. See Handling Serialization for more information on how to deal with data formats.
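The following sketch illustrates what payload-dependent input types could look like. It rests on two assumptions that should be checked against Handling Serialization: that JSON payloads are received as google.protobuf.Any, and that opaque binary payloads are received as google.protobuf.BytesValue. The service, method, and topic names are hypothetical.

```protobuf
syntax = "proto3";

package shopping.cart.actions;

import "kalix/annotations.proto";
import "google/protobuf/any.proto";
import "google/protobuf/empty.proto";
import "google/protobuf/wrappers.proto";

// Hypothetical service consuming topics whose format is owned by others.
service ExternalEventsService {
  // Assumption: JSON payloads arrive wrapped in google.protobuf.Any.
  rpc ProcessJson(google.protobuf.Any) returns (google.protobuf.Empty) {
    option (kalix.method).eventing.in = {
      topic: "external-json-events" // hypothetical topic name
    };
  }

  // Assumption: raw binary payloads arrive as google.protobuf.BytesValue.
  rpc ProcessBytes(google.protobuf.BytesValue) returns (google.protobuf.Empty) {
    option (kalix.method).eventing.in = {
      topic: "external-binary-events" // hypothetical topic name
    };
  }
}
```

The Action implementation is then responsible for decoding the payload into whatever domain type it needs.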

Subscribing and acting upon

Another possible usage for Actions is to consume events and act upon them.

For example, you may consume events from one entity or from a topic, transform them into commands, and send those to another entity or an external system. This is similar to the usage explained in Forwarding and effects, except that the Action is driven by the flow of incoming events instead of external user requests.

For that purpose, it’s enough to add the (kalix.method).eventing.in annotation and omit (kalix.method).eventing.out.

Accessing the Entity ID

For many use cases, a subscriber to an event log will trigger other services and needs to pass the entity ID to the receiver. The events of an Event Sourced Entity, by design, do not include the entity ID, but it is made available to the subscriber in the CloudEvent metadata field subject, which is accessible through the ActionContext:

context.metadata().asCloudEvent().subject()

Ignoring events

When consuming events, each event must be matched by a Protobuf service method. In case your component is only interested in certain events, you may declare a method to receive all events that are not received by the other methods. If an event type is not handled, the Action will fail. Actions are designed to restart, but since the handler is missing, it will fail again. Therefore, it’s important to define methods for all events or define a catch-all method in case you want to discard some events.

  // handle other events which are not managed above
  rpc Ignore(google.protobuf.Any) returns (google.protobuf.Empty) {
    option (kalix.method).eventing.in = { // (1)
      event_sourced_entity: "eventsourced-shopping-cart"
      ignore: true // (2)
    };
  }
(1) Annotate the method with (kalix.method).eventing.in.
(2) Set the ignore: true option.

The Ignore method here is defined as a catch-all because it has the input type Any. Instead of using a catch-all, it can be better to define concrete methods for all known event types that should be ignored. That way there is no risk of accidentally ignoring events that are added as the service evolves.

When the ignore: true annotation is added, no corresponding implementation is needed in the component. Using ignore: true is more efficient than implementing the method with an immediate reply.
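The alternative to the catch-all, ignoring specific event types by name, could look like the sketch below. The event type shopping.cart.domain.ItemRemoved, the method name IgnoreRemoved, and the service name are assumptions introduced for illustration; only ItemAdded and the entity type name appear in the examples above.

```protobuf
syntax = "proto3";

package shopping.product.actions;

import "kalix/annotations.proto";
import "cart/shopping_cart_domain.proto";
import "google/protobuf/empty.proto";

// Hypothetical service: handles one event type and explicitly ignores another.
service SelectiveSubscriptionService {
  // Handled event: needs an implementation in the Action.
  rpc ForwardAdded(shopping.cart.domain.ItemAdded) returns (google.protobuf.Empty) {
    option (kalix.method).eventing.in = {
      event_sourced_entity: "eventsourced-shopping-cart"
    };
  }

  // Explicitly ignored event (ItemRemoved is an assumed type):
  // no implementation is needed because of ignore: true.
  rpc IgnoreRemoved(shopping.cart.domain.ItemRemoved) returns (google.protobuf.Empty) {
    option (kalix.method).eventing.in = {
      event_sourced_entity: "eventsourced-shopping-cart"
      ignore: true
    };
  }
}
```

With this shape, an event type added to the journal later has no matching method, so the Action fails visibly instead of silently discarding it.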