Implementing Event Sourced Entities in Java

Event Sourced Entities persist their state with ACID semantics new tab, scale horizontally, and isolate failures. They use the Event Sourcing Model—​rather than persisting the current state, they persist all of the events that led to the current state. Akka Serverless stores these events in a journal.

An Event Sourced Entity must not update its in-memory state directly as a result of a command. The handling of a command, if it results in changes being required to state, should emit events. These events will then be received, at which point the in-memory state can and should be changed in response.

When you need to read state in your service, ask yourself what events should I be listening to?. When you need to write state, ask yourself what events should I be emitting?

To load an Entity, Akka Serverless reads the journal and replays events to compute the Entity’s current state. As an optimization, by default, Event Sourced Entities persist state snapshots periodically. This allows Akka Serverless to recreate an Entity from the most recent snapshot plus any events saved after the snapshot.

In contrast with typical create, read, update (CRUD) systems, event sourcing allows the state of the Entity to be reliably replicated to other services. Event Sourced Entities use offset tracking in the journal to record which portions of the system have replicated which events.

Event Sourced Entities offer strong consistency guarantees. Akka Serverless distributes Entities across every node in a stateful service deployment—​at any given time, each Entity will live on exactly one node. If a command for an Entity arrives to a node not hosting that Entity, the command is forwarded by the proxy to the node that contains that particular Entity. This forwarding is done transparently, your code does not need to know. Because each Entity lives on exactly one node, that node can handle messages for each Entity sequentially. Hence, there are no concurrency concerns relating to Event Sourced Entities, each Entity handles one message at a time.
To learn more about event sourcing, check out the free Lightbend Academy course, Reactive Architecture: CQRS & Event Sourcing new tab.

Event Sourced Entities persist changes as events and snapshots. Akka Serverless needs to serialize that data to send it to the underlying data store, this is done with Protocol Buffers using protobuf types.

While Protocol Buffers are the recommended format for persisting state, we recommend that you do not persist your service’s public protobuf messages. This may introduce some overhead to convert from one type to the other but allows the service public interface logic to evolve independently of the data storage format, which should be private.

The steps necessary to implement an Event Sourced Entity include:

  1. Defining the API and domain objects in .proto files.

  2. Implementing behavior in command and event handlers.

  3. Creating and initializing the Entity.

The sections on this page walk through these steps using a shopping cart service as an example.

Defining the proto files

Our Event Sourced Entity example is a shopping cart service.

The following shoppingcart_domain.proto file defines our "Shopping" Event Sourced Entity. The entity manages line items of a cart and stores events ItemAdded and ItemRemoved to represent changes to the cart. Real-world entities store much more data — often structured data —  they represent an Entity in the domain-driven design sense of the term.

src/main/proto/com/example/shoppingcart/domain/shoppingcart_domain.proto
// These are the messages that get persisted - the events, plus the current
// state (Cart) for snapshots.

syntax = "proto3";

package com.example.shoppingcart.domain; (1)

import "akkaserverless/annotations.proto"; (2)

option java_outer_classname = "ShoppingCartDomain"; (3)

// Describes how this domain relates to an event sourced entity
option (akkaserverless.file).event_sourced_entity = { (4)
  name: "ShoppingCart" (5)
  entity_type: "shopping-cart" (6)
  state: "Cart" (7)
  events: ["ItemAdded", "ItemRemoved"] (8)
};

message LineItem {
  string productId = 1;
  string name = 2;
  int32 quantity = 3;
}

// The item added event.
message ItemAdded {
  LineItem item = 1;
}

// The item removed event.
message ItemRemoved {
  string productId = 1;
}

// The shopping cart state.
message Cart {
  repeated LineItem items = 1;
}
1 Any classes generated from this protobuf file will be in the Java package com.example.shoppingcart.domain.
2 Import the Akka Serverless protobuf annotations or options.
3 Let the messages declared in this protobuf file be inner classes to the Java class ShoppingCartDomain.
4 The protobuf option (akkaserverless.file).event_sourced_entity is specific to code-generation as provided by the Akka Serverless Maven plugin.
5 name denotes the base name for the Event Sourced Entity, the code-generation will create initial sources ShoppingCart, ShoppingCartTest and ShoppingCartIntegrationTest. Once these files exist, they are not overwritten, so you can freely add logic to them.
6 entity_type is a unique identifier of the "state storage". The entity name may be changed even after data has been created, the entity_type can’t.
7 state points to the protobuf message representing the entity’s state which is kept by Akka Serverless. It is stored as snapshots.
8 events points to the protobuf message representing the entity’s events, which are stored by Akka Serverless.

The shoppingcart_api.proto file defines the commands we can send to the shopping cart service to manipulate or access the cart’s state. They make up the service API:

src/main/proto/com/example/shoppingcart/shoppingcart_api.proto
// This is the public API offered by the shopping cart entity.

syntax = "proto3";

package com.example.shoppingcart; (1)

import "akkaserverless/annotations.proto"; (2)
import "google/api/annotations.proto";
import "google/protobuf/empty.proto";

option java_outer_classname = "ShoppingCartApi"; (3)

message AddLineItem { (4)
  string cart_id = 1 [(akkaserverless.field).entity_key = true]; (5)
  string product_id = 2;
  string name = 3;
  int32 quantity = 4;
}

message RemoveLineItem {
  string cart_id = 1 [(akkaserverless.field).entity_key = true];
  string product_id = 2;
}

message GetShoppingCart {
  string cart_id = 1 [(akkaserverless.field).entity_key = true];
}

message LineItem {
  string product_id = 1;
  string name = 2;
  int32 quantity = 3;
}

message Cart { (6)
  repeated LineItem items = 1;
}

service ShoppingCartService { (7)
  option (akkaserverless.service) = { (8)
    type: SERVICE_TYPE_ENTITY
    component: "com.example.shoppingcart.domain.ShoppingCart"
  };

  rpc AddItem (AddLineItem) returns (google.protobuf.Empty) {
    option (google.api.http) = {
      post: "/cart/{cart_id}/items/add"
      body: "*"
    };
  }

  rpc RemoveItem (RemoveLineItem) returns (google.protobuf.Empty) {
    option (google.api.http).post = "/cart/{cart_id}/items/{product_id}/remove";
  }

  rpc GetCart (GetShoppingCart) returns (Cart) {
    option (google.api.http) = {
      get: "/carts/{cart_id}"
      additional_bindings: {
          get: "/carts/{cart_id}/items"
          response_body: "items"
      } };
  }
}
1 Any classes generated from this protobuf file will be in the Java package com.example.shoppingcart.
2 Import the Akka Serverless protobuf annotations or options.
3 Let the messages declared in this protobuf file be inner classes to the Java class ShoppingCartApi.
4 We use protobuf messages to describe the Commands that our service handles. They may contain other messages to represent structured data.
5 Every Command must contain a string field that contains the entity ID and is marked with the (akkaserverless.field).entity_key option.
6 Messages describe the return value for our API. For methods that don’t have return values, we use google.protobuf.Empty.
7 The service descriptor shows the API of the entity. It lists the methods a client can use to issue Commands to the entity.
8 The protobuf option (akkaserverless.service) is specific to code-generation as provided by the Akka Serverless Maven plugin and points to the protobuf definition ShoppingCart we’ve seen above (in the com.example.shoppingcart.domain package).

Implementing behavior

An Event Sourced Entity implementation is a Java class where you define how each command is handled. The class ShoppingCart gets generated for us based on the shoppingcart_api.proto and shoppingcart_domain.proto definitions. Once the ShoppingCart.java file exists, it is not overwritten, so you can freely add logic to it. ShoppingCart extends the generated class AbstractShoppingCart which we’re not supposed to change as it gets regenerated in case we update the protobuf descriptors. AbstractShoppingCart contains all method signatures corresponding to the API of the service. If you change the API you will see compilation errors in the ShoppingCart class and you have to implement the methods required by AbstractShoppingCart.

src/main/java/com/example/shoppingcart/domain/ShoppingCart.java
public class ShoppingCart extends AbstractShoppingCart { (1)
  @SuppressWarnings("unused")
  private final String entityId;

  public ShoppingCart(EventSourcedEntityContext context) { this.entityId = context.entityId(); }

  @Override
  public ShoppingCartDomain.Cart emptyState() { (2)
    return ShoppingCartDomain.Cart.getDefaultInstance();
  }
1 Extends the generated AbstractShoppingCart, which extends EventSourcedEntity new tab.
2 Defines the initial, empty, state that is used before any updates.

We need to implement all methods our Event Sourced Entity offers as command handlers.

The code-generation will generate an implementation class with an initial empty implementation which we’ll discuss below.

Command handlers are implemented in the ShoppingCart class as methods that override abstract methods from AbstractShoppingCart. The methods take the current state as the first parameter and the request message as the second parameter. They return an Effect, which describes the next processing actions, such as emitting events and sending a reply.

When adding or changing the rpc definitions, including name, parameter, and return messages, in the .proto files the corresponding methods are regenerated in the abstract class (AbstractShoppingCart). This means that the Java compiler will assist you with such changes. The IDE can typically fill in missing method signatures and such.

Updating state

In the example below, the AddItem service call uses the request message AddLineItem. It returns an Effect to emit an event and then sends a reply once the event is stored successfully. The state is updated by the event handler.

The only way for a command handler to modify the Entity’s state is by emitting an event. Any modifications made directly to the state (or instance variables) from the command handler are not persisted. When the Entity is passivated and reloaded, those modifications will not be present.
src/main/java/com/example/shoppingcart/domain/ShoppingCart.java
@Override
public Effect<Empty> addItem(
    ShoppingCartDomain.Cart currentState,
    ShoppingCartApi.AddLineItem command) {
  if (command.getQuantity() <= 0) { (1)
    return effects().error("Quantity for item " + command.getProductId() + " must be greater than zero.");
  }

  ShoppingCartDomain.ItemAdded event = (2)
      ShoppingCartDomain.ItemAdded.newBuilder()
          .setItem(
              ShoppingCartDomain.LineItem.newBuilder()
                  .setProductId(command.getProductId())
                  .setName(command.getName())
                  .setQuantity(command.getQuantity())
                  .build())
          .build();

  return effects()
          .emitEvent(event) (3)
          .thenReply(newState -> Empty.getDefaultInstance()); (4)
}
1 The validation ensures the quantity of items added is greater than zero and it fails calls with illegal values by returning an Effect with effects().error.
2 From the current incoming AddLineItem we create a new ItemAdded event representing the change of the cart.
3 We store the event by returning an Effect with effects().emit.
4 The acknowledgment that the command was successfully processed is only sent if the event was successfully stored, otherwise there will be an error reply.

The new state is created from the event and the previous state in the event handler. Event handlers are implemented in the ShoppingCart class as methods that override abstract methods from AbstractShoppingCart.

src/main/java/com/example/shoppingcart/domain/ShoppingCart.java
@Override
public ShoppingCartDomain.Cart itemAdded(
    ShoppingCartDomain.Cart currentState,
    ShoppingCartDomain.ItemAdded itemAdded) {
  ShoppingCartDomain.LineItem item = itemAdded.getItem();
  ShoppingCartDomain.LineItem lineItem = updateItem(item, currentState);
  List<ShoppingCartDomain.LineItem> lineItems =
      removeItemByProductId(currentState, item.getProductId());
  lineItems.add(lineItem);
  lineItems.sort(Comparator.comparing(ShoppingCartDomain.LineItem::getProductId));
  return ShoppingCartDomain.Cart.newBuilder().addAllItems(lineItems).build();
}

private ShoppingCartDomain.LineItem updateItem(
    ShoppingCartDomain.LineItem item, ShoppingCartDomain.Cart cart) {
  return findItemByProductId(cart, item.getProductId())
      .map(li -> li.toBuilder().setQuantity(li.getQuantity() + item.getQuantity()).build())
      .orElse(item);
}

private Optional<ShoppingCartDomain.LineItem> findItemByProductId(
    ShoppingCartDomain.Cart cart, String productId) {
  Predicate<ShoppingCartDomain.LineItem> lineItemExists =
      lineItem -> lineItem.getProductId().equals(productId);
  return cart.getItemsList().stream().filter(lineItemExists).findFirst();
}

private List<ShoppingCartDomain.LineItem> removeItemByProductId(
    ShoppingCartDomain.Cart cart, String productId) {
  return cart.getItemsList().stream()
      .filter(lineItem -> !lineItem.getProductId().equals(productId))
      .collect(Collectors.toList());
}

Note that you have to define the events in the (akkaserverless.file).event_sourced_entity:

src/main/proto/com/example/shoppingcart/domain/shoppingcart_domain.proto
// Describes how this domain relates to an event sourced entity
option (akkaserverless.file).event_sourced_entity = { (4)
  name: "ShoppingCart" (5)
  entity_type: "shopping-cart" (6)
  state: "Cart" (7)
  events: ["ItemAdded", "ItemRemoved"] (8)
};

Retrieving state

The following example shows the implementation of the GetCart command handler. This command handler is a read-only command handler—​it doesn’t update the state, it just returns it:

src/main/java/com/example/shoppingcart/domain/ShoppingCart.java
@Override
public Effect<ShoppingCartApi.Cart> getCart(
    ShoppingCartDomain.Cart currentState, (1)
    ShoppingCartApi.GetShoppingCart command) {
  List<ShoppingCartApi.LineItem> apiItems =
      currentState.getItemsList().stream()
          .map(this::convert)
          .sorted(Comparator.comparing(ShoppingCartApi.LineItem::getProductId))
          .collect(Collectors.toList());
  ShoppingCartApi.Cart apiCart =
          ShoppingCartApi.Cart.newBuilder().addAllItems(apiItems).build(); (2)
  return effects().reply(apiCart);
}

private ShoppingCartApi.LineItem convert(ShoppingCartDomain.LineItem item) {
  return ShoppingCartApi.LineItem.newBuilder()
          .setProductId(item.getProductId())
          .setName(item.getName())
          .setQuantity(item.getQuantity())
          .build();
}
1 The current state is passed to the method.
2 We convert the domain representation to the API representation that is sent as a reply by returning an Effect with effects().reply.

Registering the Entity

To make Akka Serverless aware of the Event Sourced Entity, we need to register it with the service.

From the code-generation, the registration gets automatically inserted in the generated AkkaServerlessFactory.withComponents method from the Main class.

src/main/java/com/example/shoppingcart/Main.java
/* This code was generated by Akka Serverless tooling.
 * As long as this file exists it will not be re-generated.
 * You are free to make changes to this file.
 */
//tag::RegisterEventSourcedEntity[]
package com.example.shoppingcart;

import com.akkaserverless.javasdk.AkkaServerless;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.example.shoppingcart.domain.ShoppingCart;

public final class Main {

  private static final Logger LOG = LoggerFactory.getLogger(Main.class);

  public static AkkaServerless createAkkaServerless() {
    // The AkkaServerlessFactory automatically registers any generated Actions, Views or Entities,
    // and is kept up-to-date with any changes in your protobuf definitions.
    // If you prefer, you may remove this and manually register these components in a
    // `new AkkaServerless()` instance.
    return AkkaServerlessFactory.withComponents(
        ShoppingCart::new
    );
  }

  public static void main(String[] args) throws Exception {
    LOG.info("starting the Akka Serverless service");
    createAkkaServerless().start();
  }
}
//end::RegisterEventSourcedEntity[]

By default, the generated constructor has an EventSourcedEntityContext parameter, but you can change this to accept other parameters. If you change the constructor of the ShoppingCart class you will see a compilation error here, and you have to adjust the factory function that is passed to AkkaServerlessFactory.withComponents.

When more components are added the AkkaServerlessFactory is regenerated and you have to adjust the registration from the Main class.

Testing the Entity

The following snippet shows how the ShoppingCartTestKit is used to test the ShoppingCart implementation. AkkaServerless provides two main APIs for unit tests, the ShoppingCartTestKit and the EventSourcedResult. The former gives us the overall state of the entity. Its state and all the events produced by all the calls to the Entity. While the latter only holds the effects produced for each individual call to the Entity.

src/test/java/com/example/shoppingcart/domain/ShoppingCartTest.java
/* This code was initialised by Akka Serverless tooling.
 * As long as this file exists it will not be re-generated.
 * You are free to make changes to this file.
 */
package com.example.shoppingcart.domain;

import com.akkaserverless.javasdk.testkit.EventSourcedResult;
import com.example.shoppingcart.ShoppingCartApi;
import com.google.protobuf.Empty;
import org.junit.Test;

import java.util.NoSuchElementException;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThrows;

public class ShoppingCartTest {

    @Test
    public void addItemTest() {

        ShoppingCartTestKit testKit = ShoppingCartTestKit.of(ShoppingCart::new); (1)

        ShoppingCartApi.AddLineItem apples = ShoppingCartApi.AddLineItem.newBuilder().setProductId("idA")
                .setName("apples").setQuantity(1).build();
        EventSourcedResult<Empty> addingApplesResult = testKit.addItem(apples); (2)

        ShoppingCartApi.AddLineItem bananas = ShoppingCartApi.AddLineItem.newBuilder().setProductId("idB")
                .setName("bananas").setQuantity(2).build();
        testKit.addItem(bananas); (3)

        assertEquals(1, addingApplesResult.getAllEvents().size()); (4)
        assertEquals(2, testKit.getAllEvents().size()); (5)

        ShoppingCartDomain.ItemAdded addedApples = addingApplesResult.getNextEventOfType(ShoppingCartDomain.ItemAdded.class); (6)
        assertEquals("apples", addedApples.getItem().getName());
        assertThrows(NoSuchElementException.class, () ->  addingApplesResult.getNextEventOfType(ShoppingCartDomain.ItemAdded.class)); (7)
        assertEquals(Empty.getDefaultInstance(), addingApplesResult.getReply()); (8)

        ShoppingCartDomain.LineItem expectedApples = ShoppingCartDomain.LineItem.newBuilder().setProductId("idA")
                .setName("apples").setQuantity(1).build();
        ShoppingCartDomain.LineItem expectedBananas = ShoppingCartDomain.LineItem.newBuilder().setProductId("idB")
                .setName("bananas").setQuantity(2).build();
        ShoppingCartDomain.Cart expectedState = ShoppingCartDomain.Cart.newBuilder()
                .addItems(expectedApples)
                .addItems(expectedBananas)
                .build();
        assertEquals(expectedState, testKit.getState()); (9)
    }
}
1 creating the TestKit passing the constructor of the Entity.
2 calling the method addItem from the Entity in the ShoppingCartTestKit.
3 calling the method addItem from the Entity in the ShoppingCartTestKit.
4 checking the EventSourcedResult of the first call to addItem.
5 checking the EventSourcedResult of all the calls to addItem.
6 retrieving the first event generated from the first call to addItem.
7 retrieving the second event generated from the first call to addItem. There is no such event as our implementation only generates one event when addItem it’s called.
8 retrieving the response from the call to addItem.
9 retrieving the state of the entity after all the calls to addItem.

EventSourcedResult

Calling an entity method through the TestKit gives us back an EventSourcedResult new tab. This class has methods that we can use to assert our expectations, such as:

  • getReply is the response of the method called in the test

  • getAllEvents are the events generated upon the method called in the test. This list can be inspected multiple times. This method is idempotent.

  • getNextEventOfType is the first of the events generated upon the method called in the test. This event gets consumed once is inspected. This method is not idempotent.

ShoppingCartTestKit

This class is generated by AkkaServerless when the project is compiled and located in target/generated-test-sources/akkaserveless/java/com/example/shoppingcart/domain/. The snippet ShoppingCartTest.java above shows all the available methods.

Snapshots

Snapshots are an important optimization for Event Sourced Entities that emit many events. Rather than reading the entire journal upon loading or restart, Akka Serverless can initiate them from a snapshot.

Snapshots are stored and handled automatically by Akka Serverless without any specific code required. Snapshots are stored after a configured number of events:

src/main/resources/application.conf
akkaserverless.event-sourced-entity.snapshot-every = 100

When the Event Sourced Entity is loaded again, the snapshot will be loaded before any other events are received.