Implementing Event Sourced Entities in Java

Event Sourced Entities persist their state with ACID semantics, scale horizontally, and isolate failures. They use the Event Sourcing Model: rather than persisting the current state, they persist all of the events that led to the current state. Akka Serverless stores these events in a journal.

An Event Sourced Entity must not update its in-memory state directly as a result of a command. The handling of a command, if it results in changes being required to state, should emit events. These events will then be received, at which point the in-memory state can and should be changed in response.
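The flow described above can be sketched in plain Java, without the Akka Serverless SDK. The names CounterSketch, Increased, handleIncrease, and applyEvent are hypothetical and exist only for this illustration; in a real service the SDK persists the emitted events and invokes the handlers for you.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, SDK-free sketch: a command never changes state directly.
// It only emits events; state changes when an event is applied.
final class CounterSketch {
  // A domain event (hypothetical).
  static final class Increased {
    final int amount;

    Increased(int amount) {
      this.amount = amount;
    }
  }

  private final List<Increased> journal = new ArrayList<>(); // stands in for the journal
  private int value = 0; // in-memory state

  // Command handler: validates, then emits an event instead of touching `value`.
  void handleIncrease(int amount) {
    if (amount <= 0) {
      throw new IllegalArgumentException("amount must be positive");
    }
    emit(new Increased(amount));
  }

  // Emitting records the event and then hands it to the event handler.
  private void emit(Increased event) {
    journal.add(event);
    applyEvent(event);
  }

  // Event handler: the only place where state is mutated.
  private void applyEvent(Increased event) {
    value += event.amount;
  }

  int value() {
    return value;
  }

  int journalSize() {
    return journal.size();
  }
}
```

A rejected command emits nothing, so neither the journal nor the state changes.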

When you need to read state in your service, ask yourself, "What events should I be listening to?" When you need to write state, ask yourself, "What events should I be emitting?"

To load an Entity, Akka Serverless reads the journal and replays events to compute the Entity’s current state. As an optimization, by default, Event Sourced Entities persist state snapshots periodically. This allows Akka Serverless to recreate an Entity from the most recent snapshot plus any events saved after the snapshot.
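As a rough illustration of recovery from a snapshot plus later events, the following plain-Java sketch rebuilds a running total. ReplaySketch, Snapshot, and recover are hypothetical names, and the journal is simplified to a list of integer deltas; the actual storage and snapshot scheduling are handled by Akka Serverless.

```java
import java.util.List;

// Hypothetical, SDK-free sketch of recovery: start from the latest snapshot
// (if any) and replay only the events recorded after it.
final class ReplaySketch {
  // Snapshot of a running total, taken after `sequence` events (hypothetical type).
  static final class Snapshot {
    final int total;
    final int sequence;

    Snapshot(int total, int sequence) {
      this.total = total;
      this.sequence = sequence;
    }
  }

  // `journal` holds one integer delta per event; `snapshot` may be null.
  static int recover(List<Integer> journal, Snapshot snapshot) {
    int total = snapshot == null ? 0 : snapshot.total;
    int start = snapshot == null ? 0 : snapshot.sequence;
    for (int i = start; i < journal.size(); i++) {
      total += journal.get(i); // apply each remaining event in order
    }
    return total;
  }
}
```

Recovering with or without the snapshot yields the same state; the snapshot only shortens the replay.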

In contrast with typical create, read, update, delete (CRUD) systems, event sourcing allows the state of the Entity to be reliably replicated to other services. Event Sourced Entities use offset tracking in the journal to record which portions of the system have replicated which events.
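The idea of offset tracking can be sketched as follows. This is not how Akka Serverless implements it; OffsetTrackingSketch, append, and poll are hypothetical names, and the sketch only shows the principle that each consumer remembers how far into the journal it has read.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: the journal is an append-only list, and each consumer
// has an offset marking the first event it has not replicated yet.
final class OffsetTrackingSketch {
  private final List<String> journal = new ArrayList<>();
  private final Map<String, Integer> offsets = new HashMap<>();

  void append(String event) {
    journal.add(event);
  }

  // Returns the events this consumer has not seen yet and advances its offset.
  List<String> poll(String consumerId) {
    int from = offsets.getOrDefault(consumerId, 0);
    List<String> unseen = new ArrayList<>(journal.subList(from, journal.size()));
    offsets.put(consumerId, journal.size());
    return unseen;
  }
}
```

Because offsets are kept per consumer, a consumer that joins late still receives every event from the start of the journal.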

Event Sourced Entities offer strong consistency guarantees. Akka Serverless distributes Entities across every node in a stateful service deployment. At any given time, each Entity lives on exactly one node. If a command for an Entity arrives at a node not hosting that Entity, the proxy forwards the command to the node that hosts it. This forwarding is transparent; your code does not need to be aware of it. Because each Entity lives on exactly one node, that node can handle messages for each Entity sequentially. Hence, there are no concurrency concerns relating to Event Sourced Entities: each Entity handles one message at a time.

To learn more about event sourcing, check out the free Lightbend Academy course, Reactive Architecture: CQRS & Event Sourcing.

Creating an Event Sourced Entity

Create an Event Sourced Entity by annotating it with the @EventSourcedEntity annotation.

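package shopping.cart;

import com.akkaserverless.javasdk.EntityId;
import com.akkaserverless.javasdk.eventsourcedentity.*;
import java.util.LinkedHashMap;
import java.util.Map;

@EventSourcedEntity(entityType = "eventsourced-shopping-cart")
public class ShoppingCartEntity {
  private final String entityId;
  private final Map<String, ShoppingCartApi.LineItem> cart = new LinkedHashMap<>();
  private long checkedOutTimestamp = 0L;

  public ShoppingCartEntity(@EntityId String entityId) {
    this.entityId = entityId;
  }

  // Command handlers, event handlers, and snapshot handlers follow.
}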

The entityType provides a namespace for journal events. You can use the simple name of the Entity class. To make the reference more distinctive, the example above uses eventsourced-shopping-cart.

Serializing

Event Sourced Entities persist events and snapshots, and these need to be serialized. Akka Serverless automatically detects if an emitted event is in protobuf format, and serializes it as such. For other serialization options, including JSON, see Serialization options for Java services.

While protobuf is the format for persisting events, we recommend that you do not persist the protobuf messages from your service interface. Rather, create new messages, even if they are identical. While this may introduce some type conversion overhead, it allows the public interface of the service to evolve independently from the private data storage format.

For the shopping cart example, shopping_cart_domain.proto defines the Entity’s domain objects:

syntax = "proto3";

package shopping.cart.domain;

option java_outer_classname = "ShoppingCartDomain";

import "akkaserverless/annotations.proto";

message LineItem {
  string productId = 1;
  string name = 2;
  int32 quantity = 3;
}

// The item added event.
message ItemAdded {
  LineItem item = 1;
}

// The item removed event.
message ItemRemoved {
  string productId = 1;
  int32 quantity = 2;
}

// The checked out event.
message CheckedOut {
  int64 checked_out_timestamp = 1;
}

// The shopping cart state.
message CartState {
  repeated LineItem items = 1;
  int64 checked_out_timestamp = 2;
}

Storing state

Each Entity should store its state locally in a mutable variable, either a mutable field or a mutable structure such as a collection. In this example, the state is a map from product id to line item, together with the checkout timestamp.

    private final String entityId;
    private final Map<String, ShoppingCartApi.LineItem> cart = new LinkedHashMap<>();
    private long checkedOutTimestamp = 0L;

Constructing

Akka Serverless constructs instances of the Event Sourced Entity class on demand. The constructor below shows how the unique ID of the entity is injected and stored in entityId.

    public ShoppingCartEntity(@EntityId String entityId) {
      this.entityId = entityId;
    }

Handling commands

Add your business logic by implementing command handlers that are invoked by incoming messages. Declare a command handler by annotating a method with @CommandHandler. By default, the name of the command that the method handles is the name of the method with the first letter capitalized. So, a method called getCart handles a gRPC service call command named GetCart. You can override this by setting the name parameter on the @CommandHandler annotation.
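The default naming rule amounts to capitalizing the first letter of the method name. The helper below is a hypothetical illustration of that rule only; it is not part of the SDK.

```java
// Hypothetical helper illustrating the default command-name rule:
// the method name with its first letter capitalized.
final class CommandNameSketch {
  static String defaultCommandName(String methodName) {
    if (methodName.isEmpty()) {
      return methodName;
    }
    return Character.toUpperCase(methodName.charAt(0)) + methodName.substring(1);
  }
}
```

For example, defaultCommandName("getCart") returns "GetCart".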

The command handler can also take the gRPC service call input type as a parameter to receive the command message. This is optional. For example, our GetCart service call doesn’t need any information from the message, since it just returns the current state. The AddItem service call, by contrast, does need information from the message: the product id, name, and quantity to add to the cart. The return type of the command handler must be the output type of the gRPC service call; it is sent as the reply.

The following shows the implementation of the GetCart command handler:

@CommandHandler
public ShoppingCartApi.Cart getCart() {
  return createApiCart();
}

Emitting events

Commands can modify state by emitting events. A command handler can emit an event by taking in a CommandContext parameter, and invoking the emit method on it.

The only way for a command handler to modify Entity state is by emitting an event. Any modifications made directly to the state from the command handler are not persisted. When the Entity is passivated and reloaded, those modifications will not be present.

import com.akkaserverless.javasdk.eventsourcedentity.*;
import com.google.protobuf.Empty;

  @CommandHandler
  public Empty addItem(ShoppingCartApi.AddLineItem item, CommandContext context) {
    if (checkedOutTimestamp > 0) {
      throw context.fail("Cannot add item to checked out cart.");
    }
    if (item.getQuantity() <= 0) {
      throw context.fail("Cannot add a non-positive quantity of item " + item.getProductId());
    }
    ShoppingCartDomain.ItemAdded itemAddedEvent =
        ShoppingCartDomain.ItemAdded.newBuilder()
            .setItem(
                ShoppingCartDomain.LineItem.newBuilder()
                    .setProductId(item.getProductId())
                    .setName(item.getName())
                    .setQuantity(item.getQuantity())
                    .build())
            .build();
    context.emit(itemAddedEvent);
    return Empty.getDefaultInstance();
  }

This command handler also validates the command, ensuring that the quantity of items added is greater than zero. Invoking context.fail fails the command. The method itself throws, so an explicit throw is not required; the throw keyword in the example just makes the exit from the handler explicit.
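To see why state changed directly in a command handler disappears after a reload, consider this SDK-free sketch. LostUpdateSketch, addViaEvent, and addDirectly are hypothetical names; the constructor stands in for Akka Serverless replaying the journal when it loads an Entity.

```java
import java.util.List;

// Hypothetical sketch: only changes that went through the journal survive a reload.
final class LostUpdateSketch {
  private final List<Integer> journal; // persisted events (quantity deltas)
  private int quantity = 0; // in-memory state

  // Loading an entity replays every persisted event.
  LostUpdateSketch(List<Integer> journal) {
    this.journal = journal;
    for (int delta : journal) {
      quantity += delta;
    }
  }

  // Correct: record an event, then apply it.
  void addViaEvent(int delta) {
    journal.add(delta);
    quantity += delta;
  }

  // Incorrect: changes memory only; nothing reaches the journal.
  void addDirectly(int delta) {
    quantity += delta;
  }

  int quantity() {
    return quantity;
  }
}
```

If one instance calls addViaEvent(2) and addDirectly(5), it reports a quantity of 7, but a second instance built from the same journal reports 2.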

Handling events

Event handlers update the state of the Event Sourced Entity, based on an event. Event handlers are invoked:

  1. When restoring Event Sourced Entities from the journal, before any commands are handled.

  2. Each time a new event is emitted.

Event handlers are the only safe place to mutate Entity state.

Declare an event handler by annotating a method with @EventHandler. Event handlers can take a context parameter of type EventContext.

A single Entity can have multiple event handlers. Akka Serverless chooses one based on the type of event they handle. By default, Akka Serverless looks for the event handler’s single non-context parameter. Some event handler methods do not take a non-context parameter, because the event type is all that is necessary to handle the event. In this case, specify the type using the eventClass parameter on the @EventHandler annotation.

Event handlers may be declared for a superclass or interface of the event type. For example, an event handler that declares an Object parameter handles all events. When multiple event handlers match, Akka Serverless chooses the most specific one among the matching superclasses and interfaces.
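One way to picture "most specific handler wins" is the plain-Java sketch below. It is an illustration of the idea, not the SDK's actual resolution code; HandlerSelectionSketch, CartEvent, and select are hypothetical, and handlers are represented simply as names keyed by the parameter type they declare.

```java
import java.util.Map;

// Hypothetical sketch of choosing the most specific handler for an event.
final class HandlerSelectionSketch {
  interface CartEvent {}

  static final class ItemAdded implements CartEvent {}

  static final class CheckedOut implements CartEvent {}

  // Returns the handler name registered for the most specific type that
  // matches the event, or null when no handler matches.
  static String select(Map<Class<?>, String> handlers, Object event) {
    Class<?> best = null;
    for (Class<?> candidate : handlers.keySet()) {
      if (!candidate.isInstance(event)) {
        continue; // this handler cannot accept the event
      }
      // Prefer the candidate when it is the same as or a subtype of the current best.
      if (best == null || best.isAssignableFrom(candidate)) {
        best = candidate;
      }
    }
    return best == null ? null : handlers.get(best);
  }
}
```

With handlers registered for Object, CartEvent, and ItemAdded, an ItemAdded event resolves to the ItemAdded handler, a CheckedOut event falls back to the CartEvent handler, and any other object falls back to the Object handler. The sketch does not try to resolve ties between unrelated interfaces.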

The following example shows an event handler for the ItemAdded event with a utility method, convert:

private final String entityId;
private final Map<String, ShoppingCartApi.LineItem> cart = new LinkedHashMap<>();
private long checkedOutTimestamp = 0L;

@EventHandler
public void itemAdded(ShoppingCartDomain.ItemAdded itemAdded) {
  ShoppingCartApi.LineItem item = cart.get(itemAdded.getItem().getProductId());
  if (item == null) {
    item = convert(itemAdded.getItem());
  } else {
    item =
        item.toBuilder()
            .setQuantity(item.getQuantity() + itemAdded.getItem().getQuantity())
            .build();
  }
  cart.put(item.getProductId(), item);
}

private ShoppingCartApi.LineItem convert(ShoppingCartDomain.LineItem item) {
  return ShoppingCartApi.LineItem.newBuilder()
      .setProductId(item.getProductId())
      .setName(item.getName())
      .setQuantity(item.getQuantity())
      .build();
}

Producing and handling snapshots

Snapshots are an important optimization for Event Sourced Entities that emit many events. Rather than reading the entire journal upon loading or restart, Akka Serverless can restore the Entity from a snapshot.

To produce a snapshot, declare a method annotated with @Snapshot. It can take a context parameter of type SnapshotContext, and must return a snapshot of the current state in serializable form.

@Snapshot
public ShoppingCartDomain.CartState snapshot() {
  return ShoppingCartDomain.CartState.newBuilder()
      .addAllItems(cart.values().stream().map(this::convert).collect(Collectors.toList()))
      .setCheckedOutTimestamp(checkedOutTimestamp)
      .build();
}

private ShoppingCartDomain.LineItem convert(ShoppingCartApi.LineItem item) {
  return ShoppingCartDomain.LineItem.newBuilder()
      .setProductId(item.getProductId())
      .setName(item.getName())
      .setQuantity(item.getQuantity())
      .build();
}

When the Event Sourced Entity is loaded again, the snapshot is loaded before any other events are received, and passed to a snapshot handler. Declare a snapshot handler by annotating a method with @SnapshotHandler. It can take a context parameter of type SnapshotContext.

Multiple snapshot handlers may be defined to handle different types of snapshots. The type matching is done in the same way as for events.

@SnapshotHandler
public void handleSnapshot(ShoppingCartDomain.CartState cart) {
  this.cart.clear();
  for (ShoppingCartDomain.LineItem item : cart.getItemsList()) {
    this.cart.put(item.getProductId(), convert(item));
  }
  this.checkedOutTimestamp = cart.getCheckedOutTimestamp();
}

Registering the Entity

Once you’ve created your Entity, register it with the AkkaServerless server by invoking the registerEventSourcedEntity method. In addition to passing your Event Sourced Entity class and protobuf service descriptor, you also need to pass any descriptors that you use for persisting events, for example, ShoppingCartDomain, which is generated from shopping_cart_domain.proto.

package shopping;

import com.akkaserverless.javasdk.AkkaServerless;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import shopping.cart.ShoppingCartEntity;
import shopping.cart.domain.ShoppingCartDomain;
// Also import ShoppingCartApi from the package generated for the service's API proto.

public final class Main {
  private static final Logger LOG = LoggerFactory.getLogger(Main.class);

  public static final AkkaServerless SERVICE =
      new AkkaServerless()
          // event sourced shopping cart entity
          // receives commands from outside the service and persists events to its journal/event log
          .registerEventSourcedEntity(
              ShoppingCartEntity.class,
              ShoppingCartApi.getDescriptor().findServiceByName("ShoppingCartService"),
              ShoppingCartDomain.getDescriptor());

  public static void main(String[] args) throws Exception {
    LOG.info("started");
    // Block the main thread until the service terminates.
    SERVICE.start().toCompletableFuture().get();
  }
}