Publishing and Subscribing

A very common use case when building microservices is to publish and subscribe to a stream of events. The source of events can be the journal of an event sourced entity, the state changes of a value entity, or a Google Cloud Pub/Sub or Apache Kafka topic used for asynchronous messaging between services.

In this section, we will explore how you can use an Action to:

  • Subscribe to events emitted by an event sourced entity within the same service.

  • Subscribe to state changes emitted by a value entity within the same service.

  • Subscribe to events from Event Sourced Entities published as service to service eventing.

  • Subscribe to external events from topics of Google Cloud Pub/Sub or Apache Kafka.

  • Publish events to a Google Cloud Pub/Sub or Apache Kafka topic.

Messages are guaranteed to be delivered at least once. This means that receivers must be able to handle duplicate messages.
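
Because delivery is at least once, a handler may see the same message more than once. One common way to cope is to track an identifier per message and skip those already processed. The sketch below is plain Java with no Kalix dependency. The DedupingHandler class, the messageId parameter and the in-memory set are illustrative assumptions, not SDK features, and a real service would likely need durable storage for the ids it has seen.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical helper (not part of the Kalix SDK): applies each message at most once
// by remembering the ids of the messages it has already processed.
class DedupingHandler {
  private final Set<String> seenIds = new HashSet<>();
  private int total = 0;

  // Returns true if the message was applied, false if it was a duplicate.
  boolean handle(String messageId, int amount) {
    if (!seenIds.add(messageId)) {
      return false; // already processed: ignore the redelivery
    }
    total += amount;
    return true;
  }

  int total() {
    return total;
  }
}
```

With this approach, delivering the same messageId twice changes the total only once, so a redelivery is harmless.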

Publishing Entity events to a Topic

To illustrate how to publish entity events, let’s assume the existence of an Event Sourced Counter entity that emits events of types ValueIncreased and ValueMultiplied. The events are delivered to an Action, which can apply some transformation before they are published to a topic.

You can subscribe an Action to events from an Event Sourced Entity by annotating a method with @Subscribe.EventSourcedEntity and specifying the class of the entity. The input type of the method must match the type of the events generated by the entity.

To publish the events you need to add the annotation @Publish.Topic to the method subscribed to the events and add the name of the topic.

In the Topic annotation, only topic names are referenced and no additional details about how to connect to the topics are needed. When deploying the application there must be a broker configuration in the Kalix project, with credentials and details on how to connect to the broker. For details about configuring a broker, see Configure message brokers.

src/main/java/com/example/actions/CounterJournalToTopicAction.java
import kalix.javasdk.action.Action;
import kalix.javasdk.annotations.Publish;
import kalix.javasdk.annotations.Subscribe;

public class CounterJournalToTopicAction extends Action {

    @Subscribe.EventSourcedEntity(value = Counter.class) (1)
    @Publish.Topic("counter-events") (2)
    public Action.Effect<CounterEvent> onValueIncreased(ValueIncreased event){ (3)
        return effects().reply(event); (4)
    }
}
1 Subscribing to the events from the Counter.
2 Publishing to a topic named 'counter-events'.
3 Setting the method input type to the events produced by the counter.
4 Any return Action.Effect response is valid.

Subscribing to a Value Entity

You can subscribe an Action to state changes from a Value Entity. It works the same as for an Event Sourced Entity except for the annotation. To receive messages from the entity, annotate a service method with @Subscribe.ValueEntity and specify the class of the entity.

Type level annotations for subscribing

You can subscribe to a topic or an Event Sourced Entity by adding @Subscribe as a type level annotation, at the top of the class. This provides additional functionality for subscribing: all methods returning Action.Effect are selected to process incoming events. The Action will fail if it receives an event for which there is no method handler, unless the subscription is set with ignoreUnknown = true.

The following example shows an Action configured to ignore unknown messages because it only has a method handler for ValueIncreased, while it could also receive a ValueMultiplied.

src/main/java/com/example/actions/SubscribeTypeLevelAction.java
import kalix.javasdk.action.Action;
import kalix.javasdk.annotations.Subscribe;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Subscribe.EventSourcedEntity(value = Counter.class, ignoreUnknown = true) (1)
public class SubscribeTypeLevelAction extends Action {

  private Logger logger = LoggerFactory.getLogger(SubscribeTypeLevelAction.class);

  public Action.Effect<Confirmed> onIncrease(ValueIncreased event) { (2)
    logger.info("Received increased event: " + event.toString());
    return effects().reply(Confirmed.instance); (3)
  }
}
1 Set to ignore unknown events.
2 Only processing ValueIncreased events.
3 Any return is valid.

If you don’t add ignoreUnknown=true, the action would fail when processing a ValueMultiplied. The default is false.
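
The effect of ignoreUnknown can be pictured with a small stand-alone model. The following sketch is not how Kalix is implemented. EventDispatcher, its on and dispatch methods, and the two event classes are hypothetical stand-ins that only mirror the behaviour described above: an event without a handler is either skipped or causes a failure, depending on the flag.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical model of type-based event dispatch; not Kalix code.
class EventDispatcher {
  private final Map<Class<?>, Consumer<Object>> handlers = new HashMap<>();
  private final boolean ignoreUnknown;

  EventDispatcher(boolean ignoreUnknown) {
    this.ignoreUnknown = ignoreUnknown;
  }

  // Registers a handler for one concrete event type.
  <T> void on(Class<T> type, Consumer<T> handler) {
    handlers.put(type, event -> handler.accept(type.cast(event)));
  }

  // Returns true if a handler processed the event, false if the event was skipped.
  boolean dispatch(Object event) {
    Consumer<Object> handler = handlers.get(event.getClass());
    if (handler == null) {
      if (ignoreUnknown) {
        return false; // skip the event and move on with the stream
      }
      throw new IllegalStateException(
          "No handler for " + event.getClass().getSimpleName());
    }
    handler.accept(event);
    return true;
  }
}

// Stand-in event types named after the counter events used in this section.
final class ValueIncreased {
  final int value;
  ValueIncreased(int value) { this.value = value; }
}

final class ValueMultiplied {
  final int value;
  ValueMultiplied(int value) { this.value = value; }
}
```

A dispatcher created with ignoreUnknown set to true and a single ValueIncreased handler skips a ValueMultiplied, while one created with false throws for the same event.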

Subscribing and acting upon

Another possible usage for Actions is to consume events and act upon them.

For example, you may consume events from one entity or from a topic, transform them into commands, and send those to another entity or an external system. This is similar to the usage explained in Actions as Controller, except that the Action is driven by the flow of incoming events instead of external user requests.

Service to Service Eventing

A Kalix application can be comprised of multiple services working to support specific business requirements. Although each service is an independently deployable unit, information often needs to flow between those services.

Kalix provides brokerless at-least-once event delivery across Kalix services through Service to Service eventing.

The source of the events is an Event Sourced Entity. Its events can be published as a stream and consumed by another Kalix service without the need to set up a message broker.

Note

For eventing from an entity inside the same Kalix service as the consuming component, use regular Subscription to the entity instead of Service to Service eventing.

Event Producer

The event producer controls which entity to publish events for. Each entity published is identified by a stream id so that one Kalix service can publish more than one of the entity types it contains.

src/main/java/customer/api/CustomerEventsService.java
@Subscribe.EventSourcedEntity(
    value = CustomerEntity.class, (1)
    ignoreUnknown = true) (2)
@Publish.Stream(id = "customer_events") (3)
@Acl(allow = @Acl.Matcher(service = "*")) (4)
public class CustomerEventsService extends Action {

  public Effect<CustomerPublicEvent.Created> onEvent( (5)
      CustomerEvent.CustomerCreated created) {
    return effects().reply(
        new CustomerPublicEvent.Created(created.email(), created.name()));
  }

  public Effect<CustomerPublicEvent.NameChanged> onEvent( (5)
      CustomerEvent.NameChanged nameChanged) {
    return effects().reply(new CustomerPublicEvent.NameChanged(nameChanged.newName()));
  }
}
1 Identifying which event sourced entity to publish events for.
2 Ignore any event types not handled by a method and move on with the event stream, rather than failing, which is the default.
3 Public identifier for consumers of this stream.
4 Allowing access from other Kalix services, but not the public internet.
5 All methods on the service are transformer methods for turning internal event message types into public API message types for other services to consume.

Event Consumer

The consumer can be an Action or a View, annotated with @Subscribe.Stream with a service identifying the publishing service, and the id of the stream to subscribe to.

We then define a component subscribing to the service to service publisher. In this example we do that with a View:

src/main/java/customer/views/CustomersByNameView.java
@Table("customers_by_name")
@Subscribe.Stream( (1)
    service = "customer-registry", (2)
    id = "customer_events", (3)
    consumerGroup = "customer-by-name-view"
)
public class CustomersByNameView extends View<Customer> {

  public UpdateEffect<Customer> onEvent( (4)
      CustomerPublicEvent.Created created) {
    var id = updateContext().eventSubject().get();
    return effects().updateState(
        new Customer(id, created.email(), created.name()));
  }

  public UpdateEffect<Customer> onEvent(
      CustomerPublicEvent.NameChanged nameChanged) {
    var updated = viewState().withName(nameChanged.newName());
    return effects().updateState(updated);
  }

  @GetMapping("/customers/by_name/{name}")
  @Query("SELECT * FROM customers_by_name WHERE name = :name")
  @Acl(allow = @Acl.Matcher(principal = Acl.Principal.INTERNET))
  public Flux<Customer> findByName(@PathVariable String name) {
    return null;
  }

}
1 Annotate the component with @Subscribe.Stream to subscribe to an event stream from another Kalix service.
2 The name of the Kalix service publishing the event stream.
3 The public identifier of the specific stream from the publisher.
4 One update method per message type that the stream may contain.

If you’re looking to test this locally, you will likely need to run the two services on different ports. For more details, consult Running multiple services.

Deployment-dependent sources

It is possible to use environment variables to control the name of the service that a consumer consumes from. This is useful, for example, for using the same image in staging and production deployments while having them consume from different source services.

Referencing environment variables is done with the syntax ${VAR_NAME} in the service parameter of the @Subscribe.Stream annotation.

Changing the service name after the consumer has been deployed means the consumer will start over from the beginning of the event stream.

See kalix service deploy for details on how to set environment variables when deploying a service.
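
To make the ${VAR_NAME} syntax concrete, the stand-alone sketch below resolves such placeholders against a map of variables. It only illustrates the substitution idea and is not Kalix code. PlaceholderResolver is a hypothetical helper, and failing on a missing variable is an assumption of this example rather than documented Kalix behaviour.

```java
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper (not part of the Kalix SDK): replaces ${VAR_NAME}
// placeholders in a string with values looked up in the given map.
class PlaceholderResolver {
  private static final Pattern PLACEHOLDER =
      Pattern.compile("\\$\\{([A-Za-z_][A-Za-z0-9_]*)\\}");

  static String resolve(String template, Map<String, String> env) {
    Matcher matcher = PLACEHOLDER.matcher(template);
    StringBuilder out = new StringBuilder();
    while (matcher.find()) {
      String name = matcher.group(1);
      String value = env.get(name);
      if (value == null) {
        // Assumption of this sketch: a missing variable is treated as an error.
        throw new IllegalArgumentException("Missing environment variable: " + name);
      }
      // quoteReplacement keeps '$' and '\' in the value from being interpreted.
      matcher.appendReplacement(out, Matcher.quoteReplacement(value));
    }
    matcher.appendTail(out);
    return out.toString();
  }
}
```

Calling PlaceholderResolver.resolve("${CUSTOMER_SERVICE}", System.getenv()) would yield whatever the (made-up) CUSTOMER_SERVICE variable holds in the current deployment, while a string without placeholders is returned unchanged.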

Handling Serialization

Check the serialization documentation for more details.

Subscribing to a Topic

To receive messages from a Google Cloud Pub/Sub or Apache Kafka topic, annotate the service method with @Subscribe.Topic and specify the topic name.

In the Topic annotation, only topic names are referenced and no additional details about how to connect to the topics are needed. When deploying the application there must be a broker configuration in the Kalix project, with credentials and details on how to connect to the broker. For details about configuring a broker, see Configure message brokers.

In the following example the events from the topic are delivered to the Action and logged.

src/main/java/com/example/actions/CounterTopicSubscriptionAction.java
import kalix.javasdk.action.Action;
import kalix.javasdk.annotations.Subscribe;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CounterTopicSubscriptionAction extends Action {

  private Logger logger = LoggerFactory.getLogger(CounterTopicSubscriptionAction.class);

  @Subscribe.Topic(value = "counter-events") (1)
  public Action.Effect<Confirmed> onValueIncreased(ValueIncreased event) { (2)
    logger.info("Received increased event: " + event.toString());
    return effects().reply(Confirmed.instance); (3)
  }

  @Subscribe.Topic(value = "counter-events") (4)
  public Action.Effect<Confirmed> onValueMultiplied(ValueMultiplied event) { (5)
    logger.info("Received multiplied event: " + event.toString());
    return effects().reply(Confirmed.instance); (6)
  }
}
1 Subscribing to ValueIncreased events from topic 'counter-events'.
2 Setting the method input type to the events produced by the counter.
3 Any return is valid.
4 Subscribing to ValueMultiplied events from topic 'counter-events'.
5 Setting the method input type to the events produced by the counter.
6 Any return is valid.

The events from the topic are delivered to the Action. The implementation may vary: this simplified example just logs them, but it could forward them to some other component or external service.

The return value of the method is an Action.Effect with message Confirmed, but it can be an Action.Effect of any other type, as long as the return type of the method declares it. The Kalix framework needs the Effect to determine whether the event was successfully processed. If no exception is thrown and the method returns an effects().reply(), the framework assumes that the event was successfully processed and marks it as such. This allows the next event to be sent to the subscribing method.

However, if an exception is raised and not handled, or the method returns effects().error(), this action will not process any more events until handling for the current event is added so that the method returns a reply. Until then, the same error is raised over and over again, and the application has to be fixed and restarted.

By default, Kalix assumes the messages in the topic were serialized as JSON and, as such, deserializes them into the input type of your method by taking advantage of the CloudEvents standard.

Receiving CloudEvents

Kalix uses the CloudEvents standard when receiving from and publishing to topics. The CloudEvents specification standardizes message metadata so that systems can integrate more easily.

Describing the structure of the message payload is the CloudEvents feature most important to Kalix.

An example of that is the capability to send serialized JSON messages and have Kalix deserialize them accordingly.

To allow proper reading of JSON messages from external topics, the messages need to specify the message attributes:

  • Content-Type = application/json

  • ce-specversion = 1.0

  • ce-type = fully qualified name (e.g. com.example.ValueIncreased)

(The ce- prefixed attributes are part of the CloudEvents specification.)
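
As a quick illustration of the attributes listed above, this stand-alone sketch builds them for a given type name and checks whether a set of message attributes satisfies all three rules. JsonCloudEventAttributes is a hypothetical helper written for this example. It is not part of Kalix or of any CloudEvents library, and the exact matching Kalix performs (for instance regarding the case of attribute names) is not covered here.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper (not part of the Kalix SDK): models the three message
// attributes needed for a JSON message to be read from an external topic.
class JsonCloudEventAttributes {

  // Builds the attributes for a JSON message of the given fully qualified type.
  static Map<String, String> forType(String fullyQualifiedTypeName) {
    Map<String, String> attributes = new LinkedHashMap<>();
    attributes.put("Content-Type", "application/json");
    attributes.put("ce-specversion", "1.0");
    attributes.put("ce-type", fullyQualifiedTypeName);
    return attributes;
  }

  // Returns true only if all three attributes are present with usable values.
  static boolean isComplete(Map<String, String> attributes) {
    String type = attributes.get("ce-type");
    return "application/json".equals(attributes.get("Content-Type"))
        && "1.0".equals(attributes.get("ce-specversion"))
        && type != null
        && !type.isEmpty();
  }
}
```

For example, forType("com.example.ValueIncreased") produces a complete set, whereas removing ce-specversion from it makes isComplete return false.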

Receiving Bytes

If the content type is application/octet-stream, no content type is present, or the type is unknown to Kalix, the message is treated as a binary message. The topic subscriber method must accept the byte[] message.

src/main/java/com/example/actions/SubscribeToRawBytesAction.java
public class SubscribeToRawBytesAction extends Action {

  public Effect<Done> onMessage(byte[] bytes) { (1)
    // deserialization logic here
    return effects().reply(Done.done());
  }
}
1 When consuming raw bytes messages from a topic the input type must be byte[].

If an action has a return type of byte[] and publishes to a topic, the events published to the topic will have content-type application/octet-stream.
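
What goes in place of the "deserialization logic here" comment depends entirely on how the producer encoded the payload. As one possibility, assuming the bytes carry a UTF-8 string of the form counterId:value (a wire format made up for this example), the decoding step could look like the sketch below. RawCounterMessage is hypothetical and uses only the JDK.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical message type (not part of the Kalix SDK) for a made-up
// "counterId:value" UTF-8 wire format.
class RawCounterMessage {
  final String counterId;
  final int value;

  RawCounterMessage(String counterId, int value) {
    this.counterId = counterId;
    this.value = value;
  }

  // Decodes the raw bytes received from the topic.
  static RawCounterMessage fromBytes(byte[] bytes) {
    String text = new String(bytes, StandardCharsets.UTF_8);
    int separator = text.lastIndexOf(':');
    if (separator < 0) {
      throw new IllegalArgumentException("Expected 'counterId:value' but got: " + text);
    }
    String counterId = text.substring(0, separator);
    // parseInt throws NumberFormatException (an IllegalArgumentException) on bad input.
    int value = Integer.parseInt(text.substring(separator + 1).trim());
    return new RawCounterMessage(counterId, value);
  }

  // Encodes the message back into the same format, e.g. for publishing as byte[].
  byte[] toBytes() {
    return (counterId + ":" + value).getBytes(StandardCharsets.UTF_8);
  }
}
```

Inside a byte[] handler, a call such as RawCounterMessage.fromBytes(bytes) would then replace the placeholder comment. Throwing on malformed input is a design choice of this sketch, and whether to fail or skip such messages depends on your use case.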

Deployment-dependent names

It is possible to use environment variables to control the name of the topic that is used for consuming or producing events. This is useful, for example, for using the same image in staging and production deployments while having them interact with separate topics.

Referencing environment variables is done with the syntax ${VAR_NAME} in the value parameter of the @Subscribe.Topic annotation. See kalix service deploy for details on how to set environment variables when deploying a service.

Changing the topic name after an event consumer has been deployed means the consumer will start over from the beginning of the topic.

Accessing the Entity ID

For many use cases, a subscriber to an event log will trigger other services and needs to pass the entity ID to the receiver. The events of an Event Sourced Entity, by design, do not include the entity ID, but it is made available to the subscriber via the metadata field subject, accessible through eventSubject in the ActionContext.

You can access the ActionContext through the method actionContext().

Testing the Integration

When a Kalix service relies on a broker, it might be useful to use integration tests to assert that those boundaries work as intended. For such scenarios, you can either:

  • Use TestKit’s mocked topic:

    • this offers a general API to inject messages into topics or read the messages written to another topic, regardless of the specific broker integration you have configured.

  • Run an external broker instance:

    • if you’re interested in running your integration tests against a real instance, you need to provide the broker instance yourself by running it in a separate process in your local setup and make sure to disable the use of TestKit’s test broker. Currently, the only external broker supported in integration tests is Google PubSub Emulator.

TestKit Mocked Incoming Messages

Following up on the counter entity example used above, let’s consider an example (composed of two Actions and one Event Sourced Entity) as pictured below:

[Figure: eventing TestKit sample]

In this example:

  • commands are consumed from an external topic counter-commands and forwarded to a Counter entity;

  • the Counter entity is an Event Sourced Entity and has its events published to another topic counter-events.

To test this flow, we will use the TestKit to push commands into the counter-commands topic and check what messages are produced to the counter-events topic.

src/it/java/com/example/CounterIntegrationTest.java
import kalix.javasdk.testkit.EventingTestKit;
import kalix.javasdk.testkit.KalixTestKit;
import kalix.spring.testkit.KalixIntegrationTestKitSupport;
// ...

@SpringBootTest(classes = Main.class)
@Import(TestKitConfiguration.class)
public class CounterIntegrationTest extends KalixIntegrationTestKitSupport { (1)

  @Autowired
  private KalixTestKit kalixTestKit; (2)
  private EventingTestKit.IncomingMessages commandsTopic;
  private EventingTestKit.OutgoingMessages eventsTopic;

  @BeforeAll
  public void beforeAll() {
    commandsTopic = kalixTestKit.getTopicIncomingMessages("counter-commands"); (3)
    eventsTopic = kalixTestKit.getTopicOutgoingMessages("counter-events");
  }

  @Test
  public void verifyCounterEventSourcedPublishToTopic() throws InterruptedException {
    var counterId = "test-topic";
    var increaseCmd = new CounterCommandFromTopicAction.IncreaseCounter(counterId, 3);
    var multipleCmd = new CounterCommandFromTopicAction.MultiplyCounter(counterId, 4);

    commandsTopic.publish(increaseCmd, counterId); (4)
    commandsTopic.publish(multipleCmd, counterId);

    var eventIncreased = eventsTopic.expectOneTyped(CounterEvent.ValueIncreased.class); (5)
    var eventMultiplied = eventsTopic.expectOneTyped(CounterEvent.ValueMultiplied.class);

    assertEquals(increaseCmd.value(), eventIncreased.getPayload().value()); (6)
    assertEquals(multipleCmd.value(), eventMultiplied.getPayload().value());
  }
}
1 Start the TestKit, booting up both the service and the Kalix Runtime.
2 Wire the Kalix TestKit automatically.
3 Get an IncomingMessages for the topic named counter-commands and an OutgoingMessages for counter-events from the TestKit.
4 Build 2 commands and publish both to the topic. Note the counterId is passed as the subject id of the message.
5 Read 2 messages, one at a time. We pass in the expected class type for the next message.
6 Assert the received messages have the same value as the commands sent.

In the example above we take advantage of the TestKit to serialize and deserialize the messages and to pass all the required metadata automatically. However, the API also lets you read and write raw bytes, construct your own metadata, or read multiple messages at once.

Configuration

Before running your test, make sure to configure the TestKit correctly.

src/it/java/com/example/TestKitConfiguration.java
@Configuration
public class TestKitConfiguration {
  @Bean
  public KalixTestKit.Settings settings() {
    return KalixTestKit.Settings.DEFAULT
        .withTopicIncomingMessages("counter-commands") (1)
        .withTopicOutgoingMessages("counter-events"); (2)
  }
}
1 Mock incoming messages from the counter-commands topic.
2 Mock outgoing messages to the counter-events topic.

Metadata

Typically, messages are published with associated metadata. If you want to construct your own Metadata to be consumed by a service or make sure the messages published out of your service have specific metadata attached, you can do so using the TestKit, as shown below.

src/it/java/com/example/CounterTopicIntegrationTest.java
@Test
public void verifyCounterCommandsAndPublishWithMetadata() {
  var counterId = "test-topic-metadata";
  var increaseCmd = new CounterCommandFromTopicAction.IncreaseCounter(counterId, 10);

  var metadata = CloudEvent.of( (1)
          "cmd1",
          URI.create("CounterTopicIntegrationTest"),
          increaseCmd.getClass().getName())
      .withSubject(counterId) (2)
      .asMetadata()
      .add("Content-Type", "application/json"); (3)

  commandsTopic.publish(kalixTestKit.getMessageBuilder().of(increaseCmd, metadata)); (4)

  var increasedEvent = eventsTopicWithMeta.expectOneTyped(CounterCommandFromTopicAction.IncreaseCounter.class);
  var actualMd = increasedEvent.getMetadata(); (5)
  assertEquals(counterId, actualMd.asCloudEvent().subject().get()); (6)
  assertEquals("application/json", actualMd.get("Content-Type").get());
}
1 Build a CloudEvent object with the 3 required attributes, respectively: id, source and type.
2 Add the subject to which the message is related, that is the counterId.
3 Set the mandatory header "Content-Type" accordingly.
4 Publish the message along with its metadata to topic commandsTopic.
5 Upon receiving the message, access the metadata.
6 Assert the headers Content-Type and ce-subject (every CloudEvent header is prefixed with "ce-") have the expected values.

One Suite, Multiple Tests

When running multiple test cases under the same test suite, and thus using a common TestKit instance, you might face issues if unconsumed messages from previous tests interfere with the current one. To avoid this, be sure to:

  • have the tests run in sequence, not in parallel;

  • clear the contents of the topics in use before the test.

As an alternative, you can consider using different test suites which will use independent TestKit instances.

src/it/java/com/example/CounterTopicIntegrationTest.java
@BeforeEach (1)
public void clearTopics() {
  eventsTopic.clear(); (2)
  eventsTopicWithMeta.clear();
}
1 Run this before each test.
2 Clear the topic ignoring any unread messages.

Despite the example, you are not forced to clear all topics, nor to do it before each test. You can do it selectively, or you might not need it at all, depending on your tests and the flows they test.

External Broker

To run an integration test against a real instance of Google PubSub (or its Emulator) or Kafka, use the TestKit settings to override the default eventing support, as shown below:

src/it/java/com/example/TestKitConfiguration.java
@Configuration
public class TestKitConfiguration {
  @Bean
  public KalixTestKit.Settings settingsWithPubSub() {
    return KalixTestKit.Settings.DEFAULT.withAclEnabled()
        .withEventingSupport(EventingSupport.GOOGLE_PUBSUB);
  }
}