Implementing Event Sourced Entities

Event Sourced Entities persist their state with ACID semanticsnew tab, scale horizontally, and isolate failures. They use the Event Sourcing Model—​instead of persisting the current state, they persist all the events that led to the current state. Kalix stores these events in a journal.

An Event Sourced Entity must not update its in-memory state directly as a result of a command. The handling of a command, if it results in changes being required to state, should emit events. These events will then be received, at which point the in-memory state can and should be changed in response.

When you need to read state in your service, ask yourself what events should I be listening to?. When you need to write state, ask yourself what events should I be emitting?

To load an Entity, Kalix reads the journal and replays events to compute the Entity’s current state. As an optimization, by default, Event Sourced Entities persist state snapshots periodically. This allows Kalix to recreate an Entity from the most recent snapshot plus any events saved after the snapshot.

In contrast with typical create, read, update (CRUD) systems, event sourcing allows the state of the Entity to be reliably replicated to other services. Event Sourced Entities use offset tracking in the journal to record which portions of the system have replicated which events.

Event Sourced Entities offer strong consistency guarantees. Kalix distributes Entities across every instance of a stateful service deployment—​at any given time, each Entity lives on exactly one instance. If a command for an Entity arrives to an instance not hosting that Entity, the command is forwarded by the Kalix Runtime to the one that contains that particular Entity. This forwarding is done transparently, your code does not need to know. Because each Entity lives on exactly one instance, messages can be handled sequentially. Hence, there are no concurrency concerns relating to Event Sourced Entities, each Entity handles one message at a time.
To learn more about event sourcing, check out the free Lightbend Academy course, Reactive Architecture: CQRS & Event Sourcing new tab.

Event Sourced Entities persist changes as events and snapshots. Kalix needs to serialize that data to send it to the underlying data store. However, we recommend that you do not persist your service’s public API messages. Persisting private API messages may introduce some overhead when converting from a public message to an internal one but it allows the logic of the service public interface to evolve independently of the data storage format, which should be private.

The steps necessary to implement an Event Sourced Entity include:

  1. Model the entity’s state and its domain events.

  2. Implementing behavior in command and event handlers.

  3. Creating and initializing the Entity.

The following sections walk through these steps using a shopping cart service as an example (working sample available here new tab).

Modeling the Entity

Through our "Shopping Cart" Event Sourced Entity we expect to manage our cart, adding and removing items as we please. Being event-sourced means it will represent changes to state as a series of domain events. So let’s have a look at what kind of model we expect to store and the events our entity might generate.

src/main/java/com/example/shoppingcart/domain/ShoppingCart.java
public record ShoppingCart(String cartId, List<LineItem> items, boolean checkedOut) { (1)

  public record LineItem(String productId, String name, int quantity) { (2)
  }
}
1 Our ShoppingCart is fairly simple, being composed only by a cartId and a list of line items.
2 A LineItem represents a single product and the quantity we intend to buy.
Above we are taking advantage of the Java record to reduce the amount of boilerplate code, but you can use regular classes so long as they can be serialized to JSON (e.g. using Jackson annotations).

Another fundamental aspect of our entity will be its domain events. For now, we will have 3 different events ItemAdded, ItemRemoved and CheckedOut, defined as below:

src/main/java/com/example/shoppingcart/domain/ShoppingCartEvent.java
import kalix.javasdk.annotations.TypeName;
public sealed interface ShoppingCartEvent { (1)

  @TypeName("item-added") (2)
  record ItemAdded(ShoppingCart.LineItem item) implements ShoppingCartEvent {}

  @TypeName("item-removed")
  record ItemRemoved(String productId) implements ShoppingCartEvent {}

  @TypeName("checked-out")
  record CheckedOut() implements ShoppingCartEvent {}
}
1 The 3 types of event all derive from the same type ShoppingCartEvent.
2 Includes the logical type name using @TypeName annotation.
The use of logical names for subtypes is essential for maintainability purposes. Our recommendation is to use logical names (i.e. @TypeName) that are unique per Kalix service. Check type name documentation for more details.

Identifying the Entity

In order to interact with an Entity in Kalix, we need to assign an type id and one or more instance ids:

  • type id is a unique identifier for all entities of a given type. To define the entity type id, the entity class must be annotated with @TypeId and have a unique and stable identifier assigned.

  • id, on the other hand, is unique per instance. In most cases, the entity id is passed as a path parameter of a REST request. The exception to the rule is when we request Kalix to auto-generate a id for us. In such a case, Kalix won’t try to extract the id from the endpoint path.

The entity id can be defined in different ways, as detailed below.

Single identifier

The most common use is to annotate the class with @Id and assign one path variable name to it. For instance, @Id("id") will instruct Kalix to look up a matching path variable. For an endpoint defined with @RequestMapping("/users/{id}"), Kalix will extract whatever path segment is used to replace {id} and treat it as the Entity unique identifier.

Composite identifier

It’s also possible to have a composite identifier. For example, @Id({"groupId", "id"}) defines a composite identifier made of groupId and id. In such a case, the endpoints for this entity will need to have both path variables, e.g.: @RequestMapping("/users/{groupId}/{id}").

Generated identifier

Finally, you can ask Kalix to generate an unique identifier, this is typically useful when creating an Entity, and the id is a surrogate id. To indicate to Kalix that an Entity id should be generated rather than extracted from the path, be sure to annotate the corresponding command method with @GenerateId. Typically, an Entity has only one method annotated with @GenerateId. The one that creates the Entity. All other methods will have @Id annotation in order to extract the surrogate id from the endpoint path.

It will often be necessary to access the generated entity id from inside the entities code. This can be done using the EntityContext.entityIdnew tab method.

Kalix generates a UUID version 4 (random) keys. Only version 4 UUIDs are currently supported for generated Entity identifiers.

Event Sourced Entity’s Effect API

The Event Sourced Entity’s Effect defines the operations that Kalix should perform when an incoming command is handled by an Event Sourced Entity.

An Event Sourced Entity Effect can either:

  • emit events and send a reply to the caller

  • directly reply to the caller if the command is not requesting any state change

  • rejected the command by returning an error

  • instruct Kalix to delete the entity

Implementing behavior

Now that we have our Entity state defined along with its events, the remaining steps can be summarized as follows:

  • declare your entity and pick an entity id (it needs to be unique as it will be used for sharding purposes);

  • define an access point (i.e. a route path) to your entity;

  • implement how each command is handled and which event(s) it generates;

  • provide a handler for each event and how it affects the entity’s state.

Let’s have a look at what our shopping cart entity will look like for the first 2 steps from the above list:

src/main/java/com/example/shoppingcart/ShoppingCartEntity.java
@Id("cartId") (2)
@TypeId("shopping-cart") (3)
@RequestMapping("/cart/{cartId}") (4)
public class ShoppingCartEntity extends EventSourcedEntity<ShoppingCart, ShoppingCartEvent> { (1)

}
1 Create a class that extends EventSourcedEntity<S, E>, where S is the state type this entity will store (i.e. ShoppingCart) and E is the top type for the events it emits (i.e. ShoppingCartEvent).
2 Annotate such class with @Id and pass the name of the id that will be used as the entity instance unique identifier.
3 Make sure to annotate such class with @TypeId and pass a unique identifier for this entity type.
4 Use Spring’s RequestMapping annotation to define the route to your entity.
The @Id value cartId must match a path parameter (i.e. cartId) and such value needs to be unique per entity. On the other hand, the @TypeId value shopping-cart is common for all instances of this entity but must be stable - cannot be changed after a production deploy - and unique across the different entity types.

Updating state

Having created the basis of our entity, we will now define how each command is handled. In the example below, we define a new endpoint that will add a new line item to a given shopping cart. It returns an Effect to emit an event and then sends a reply once the event is stored successfully. The state is updated by the event handler.

The only way for a command handler to modify the Entity’s state is by emitting an event. Any modifications made directly to the state (or instance variables) from the command handler are not persisted. When the Entity is passivated and reloaded, those modifications will not be present.
src/main/java/com/example/shoppingcart/ShoppingCartEntity.java
@PostMapping("/add")
public Effect<String> addItem(@RequestBody LineItem item) {
  if (currentState().checkedOut())
    return effects().error("Cart is already checked out.");
  if (item.quantity() <= 0) { (1)
    return effects().error("Quantity for item " + item.productId() + " must be greater than zero.");
  }

  var event = new ShoppingCartEvent.ItemAdded(item); (2)

  return effects()
      .emitEvent(event) (3)
      .thenReply(newState -> "OK"); (4)
}

@EventHandler (5)
public ShoppingCart itemAdded(ShoppingCartEvent.ItemAdded itemAdded) {
  return currentState().onItemAdded(itemAdded); (6)
}
1 The validation ensures the quantity of items added is greater than zero and it fails for calls with illegal values by returning an Effect with effects().error.
2 From the current incoming LineItem we create a new ItemAdded event representing the change of the cart.
3 We store the event by returning an Effect with effects().emitEvent.
4 The acknowledgment that the command was successfully processed is only sent if the event was successfully stored and applied, otherwise there will be an error reply. The lambda parameter newState gives us access to the new state returned by applying such event.
5 Event handler needs to be marked with @EventHandler and receive a single parameter type matching the event type produced (i.e. ItemAdded).
6 Return the new state to be stored - the logic for state transition is defined inside the ShoppingCart domain model.
There needs to be one event handler declared per each type of event the ES entity emits (e.g. itemAdded receives a parameter of type ItemAdded, the same type emitted in addItem command handler).

As mentioned above, the business logic that allows us to transition between states was placed on the domain model as seen below:

src/main/java/com/example/shoppingcart/domain/ShoppingCart.java
public ShoppingCart onItemAdded(ShoppingCartEvent.ItemAdded itemAdded) {
  var item = itemAdded.item();
  var lineItem = updateItem(item, this); (1)
  List<LineItem> lineItems =
      removeItemByProductId(this, item.productId()); (2)
  lineItems.add(lineItem); (3)
  lineItems.sort(Comparator.comparing(LineItem::productId));
  return new ShoppingCart(cartId, lineItems, checkedOut); (4)
}

private static List<LineItem> removeItemByProductId(
    ShoppingCart cart, String productId) {
  return cart.items().stream()
      .filter(lineItem -> !lineItem.productId().equals(productId))
      .collect(Collectors.toList());
}

private static LineItem updateItem(LineItem item, ShoppingCart cart) {
  return cart.findItemByProductId(item.productId())
      .map(li -> li.withQuantity(li.quantity() + item.quantity()))
      .orElse(item);
}

public Optional<LineItem> findItemByProductId(String productId) {
  Predicate<LineItem> lineItemExists =
      lineItem -> lineItem.productId().equals(productId);
  return items.stream().filter(lineItemExists).findFirst();
}
1 For an existing item, we will make sure to sum the existing quantity with the incoming one.
2 Returns an update list of items without the existing item.
3 Adds the update item to the shopping cart.
4 Returns a new instance of the shopping cart with the updated line items.

Retrieving state

To have access to the current state of the entity we can use currentState() as you have probably noticed from the examples above. However, what if this is the first command we are receiving for this entity? The following example shows the implementation of the read-only command handler (accessed through GET /cart/myCarId):

src/main/java/com/example/shoppingcart/ShoppingCartEntity.java
private final String entityId;

public ShoppingCartEntity(EventSourcedEntityContext context) {
  this.entityId = context.entityId(); (1)
}

@Override
public ShoppingCart emptyState() { (2)
  return new ShoppingCart(entityId, Collections.emptyList(), false);
}

@GetMapping() (3)
public Effect<ShoppingCart> getCart() {
  return effects().reply(currentState()); (4)
}
1 Stores the entityId on an internal attribute so we can use it later.
2 Provides initial state - overriding emptyState() is optional but if not doing it, be careful to deal with a currentState() with a null value when receiving the first command or event.
3 Marks this method as a command handler for GET requests.
4 Returns the current state as reply for the request.
For simplicity purposes, we are returning the internal state directly back to the requester. In a real-world scenario, it’s usually better to instead convert this internal domain model into a public model so the internal representation is free to evolve without breaking clients code.

Snapshots

Snapshots are an important optimization for Event Sourced Entities that emit many events. Rather than reading the entire journal upon loading or restart, Kalix can initiate them from a snapshot.

Snapshots are stored and handled automatically by Kalix without any specific code required. Snapshots are stored after a configured number of events:

src/main/resources/application.conf
kalix.event-sourced-entity.snapshot-every = 100

When the Event Sourced Entity is loaded again, the snapshot will be loaded before any other events are received.

Deleting an Entity

Normally, Event Sourced Entities are not deleted because the history of the events typically provide business value. For certain use cases or for regulatory reasons the entity can be deleted.

src/main/java/com/example/shoppingcart/ShoppingCartEntity.java
@PostMapping("/checkout")
public Effect<String> checkout() {
  if (currentState().checkedOut())
    return effects().error("Cart is already checked out.");

  return effects()
      .emitEvent(new ShoppingCartEvent.CheckedOut()) (1)
      .deleteEntity() (2)
      .thenReply(newState -> "OK"); (4)
}
1 Emit final event before deletion, which is handled as any other event.
2 Instruction to delete the entity.

When you give the instruction to delete the entity it will still exist from some time, including its events and snapshots. The actual removal of events and snapshots will be deleted later to give downstream consumers time to process all prior events, including the final event that was emitted together with the deleteEntity effect. By default, the existence of the entity is completely cleaned up after a week.

It is not allowed to emit more events after the entity has been "marked" as deleted. You can still handle read requests of the entity until it has been completely removed.

It is best to not reuse the same entity id after deletion, but if that happens after the entity has been completely removed it will be instantiated as a completely new entity without any knowledge of previous state.

Note that deleting View state must be handled explicitly.

Running Side Effects

An Entity may also emit one or more side effects. A side effect is something whose result has no impact on the result of the current command—​if it fails, the current command still succeeds. The result of the side effect is therefore ignored. When used from inside an Entity, side effects are only performed after the successful completion of any state actions requested by the command handler.

See this dedicated section regarding Actions, for more details.

Testing the Entity

There are two ways to test an Entity:

  • Unit tests, which run the Entity class in the same JVM as the test code itself with the help of a test kit.

  • Integration tests, with the service deployed in a docker container running the entire service and the test interacting with it over HTTP requests.

Each way has its benefits, unit tests are faster and provide more immediate feedback about success or failure but can only test a single entity at a time and in isolation. Integration tests, on the other hand, are more realistic and allow many entities to interact with other components inside and outside the service.

Unit tests

The following snippet shows how the EventSourcedTestKit is used to test the ShoppingCartEntity implementation. Kalix provides two main APIs for unit tests, the EventSourcedTestKit and the EventSourcedResult. The former gives us the overall state of the entity and all the events produced by all the calls to the Entity. While the latter only holds the effects produced for each individual call to the Entity.

src/test/java/com/example/shoppingcart/domain/ShoppingCartTest.java
package com.example.shoppingcart.domain;

import com.example.shoppingcart.ShoppingCartEntity;
import kalix.javasdk.testkit.EventSourcedResult;
import kalix.javasdk.testkit.EventSourcedTestKit;
import org.junit.jupiter.api.Test;

import java.util.List;

import static com.example.shoppingcart.domain.ShoppingCartEvent.*;
import static org.junit.jupiter.api.Assertions.assertEquals;

public class ShoppingCartTest {

  private final ShoppingCart.LineItem akkaTshirt = new ShoppingCart.LineItem("kalix-tshirt", "Akka Tshirt", 10);


  @Test
  public void testAddLineItem() {

    var testKit = EventSourcedTestKit.of(ShoppingCartEntity::new); (1)

    {
      var result = testKit.call(e -> e.addItem(akkaTshirt)); (2)
      assertEquals("OK", result.getReply()); (3)

      var itemAdded = result.getNextEventOfType(ItemAdded.class);
      assertEquals(10, itemAdded.item().quantity()); (4)
    }

    // actually we want more akka tshirts
    {
      var result = testKit.call(e -> e.addItem( akkaTshirt.withQuantity(5))); (5)
      assertEquals("OK", result.getReply());

      var itemAdded = result.getNextEventOfType(ItemAdded.class);
      assertEquals(5, itemAdded.item().quantity());
    }

    {
      assertEquals(testKit.getAllEvents().size(), 2); (6)
      var result = testKit.call(ShoppingCartEntity::getCart); (7)
      assertEquals(
        new ShoppingCart("testkit-entity-id", List.of(akkaTshirt.withQuantity(15)), false),
        result.getReply());
    }

  }

}
1 Creates the TestKit passing the constructor of the Entity.
2 Calls the method addItem from the Entity in the EventSourcedTestKit with quantity 10.
3 Asserts the return value is "OK".
4 Returns the next event of type IdemAdded and asserts on the quantity.
5 Add a new item with quantity 5.
6 Asserts that the total number of events should be 2.
7 Calls the getCart method and asserts that quantity should be 15.
The EventSourcedTestKit is stateful, and it holds the state of a single entity instance in memory. If you want to test more than one entity in a test, you need to create multiple instance of EventSourcedTestKit.

EventSourcedResult

Calling a command handler through the TestKit gives us back an EventSourcedResult new tab. This class has methods that we can use to assert the handling of the command, such as:

  • getReply() - the response from the command handler if there was one, if not an, exception is thrown, failing the test.

  • getAllEvents() - all the events emitted by handling the command.

  • getState() - the state of the entity after applying any events the command handler emitted.

  • getNextEventOfType(ExpectedEvent.class) - check the next of the emitted events against an event type, return it for inspection if it matches, or fail the test if it does not. The event gets consumed once is inspected and the next call will look for a subsequent event.

EventSourcedTestKit

For the above example, this class provides access to all the command handlers of the ShoppingCart entity for unit testing. In addition to that also has the following methods:

  • getState() - the current state of the entity, it is updated on each method call emitting events.

  • getAllEvents() - all events emitted since the creation of the testkit instance.

Integration tests

The skeleton of an Integration Test is generated for you if you use the archetype to start your Kalix app. Let’s see what it could look like to test our ShoppingCartEntity:

/src/it/java/com/example/IntegrationTest.java
import kalix.spring.testkit.KalixIntegrationTestKitSupport;
// ...

@SpringBootTest(classes = Main.class)
public class IntegrationTest extends KalixIntegrationTestKitSupport { (1)

  @Autowired
  private WebClient webClient; (2)

  private Duration timeout = Duration.of(5, SECONDS);

  @Test
  public void createAndManageCart() {

    String cartId = "card-abc";
    ResponseEntity<String> created =
      webClient.post() (3)
        .uri("/cart/" + cartId + "/create")
        .retrieve()
        .toEntity(String.class)
        .block(timeout);
    assertEquals(HttpStatus.OK, created.getStatusCode());

    var item1 = new LineItem("tv", "Super TV 55'", 1);
    ResponseEntity<String> itemOne =
      webClient.post() (4)
        .uri("/cart/" + cartId + "/add")
        .bodyValue(item1)
        .retrieve()
        .toEntity(String.class)
        .block(timeout);
    assertEquals(HttpStatus.OK, itemOne.getStatusCode());

    ShoppingCart cartUpdated =
      webClient.get() (5)
        .uri("/cart/" + cartId)
        .retrieve()
        .bodyToMono(ShoppingCart.class)
        .block(timeout);
    assertEquals(1, cartUpdated.items().size());
    assertEquals(item2, cartUpdated.items().get(0));
  }
}
1 Note the test class must extend KalixIntegrationTestKitSupport.
2 A built-in web-client is provided to interact with the components.
3 Request to create a new shopping cart with id cart-abc.
4 Request to add an item to the cart.
5 GET request to retrieve current status of the shopping cart and assert there should only be one item.
The integration tests in samples are under in a specific project profile it and can be run using mvn verify -Pit.