Section 5: Complete Event Sourced entity

On this page, we will complete the cart entity with more commands and events. This ShoppingCart entity will use Event Sourcing to persist events that represent changes to the state of the cart.

Example entity
This part of the full example will focus on the shopping cart entity.

On this page you will learn how to:

  • implement an Event Sourced entity by expanding work from the previous steps and adding to the ShoppingCart:

    • Checkout - a command to checkout the shopping cart

    • CheckedOut - an event to capture checkouts

    • Get - a way to get the current state of the shopping cart

At the end, we also provide a list of Optional commands and events that you can add on your own to test your knowledge.

Source downloads

If you prefer to simply view and run the example, download a zip file containing the completed code:

Java
  • Source that includes all previous tutorial steps and allows you to start with the steps on this page.

  • Source with the steps on this page completed.

Scala
  • Source that includes all previous tutorial steps and allows you to start with the steps on this page.

  • Source with the steps on this page completed.

1. Add the command and event for checkout

When the cart has been checked out it should not accept any commands that change its state. However, it should still be possible to Get the current state of a checked out cart. We suggest you try implementing Checkout on your own and then compare it with the solution shown below. Add the Checkout command alongside the existing AddItem command.

The following sections show our solution.

1.1. Checkout command

Java
src/main/java/shopping/cart/ShoppingCart.java
public static final class Checkout implements Command {
  final ActorRef<StatusReply<Summary>> replyTo;

  @JsonCreator
  public Checkout(ActorRef<StatusReply<Summary>> replyTo) {
    this.replyTo = replyTo;
  }
}
Scala
src/main/scala/shopping/cart/ShoppingCart.scala
final case class Checkout(replyTo: ActorRef[StatusReply[Summary]])
    extends Command

1.2. CheckedOut event

The corresponding CheckedOut event:

Java
src/main/java/shopping/cart/ShoppingCart.java
static final class CheckedOut extends Event {
  final Instant eventTime;

  public CheckedOut(String cartId, Instant eventTime) {
    super(cartId);
    this.eventTime = eventTime;
  }

  @Override
  public boolean equals(Object o) {
    if (this == o) return true;
    if (o == null || getClass() != o.getClass()) return false;
    CheckedOut that = (CheckedOut) o;
    return Objects.equals(eventTime, that.eventTime);
  }

  @Override
  public int hashCode() {
    return Objects.hash(eventTime);
  }
}
Scala
src/main/scala/shopping/cart/ShoppingCart.scala
final case class CheckedOut(cartId: String, eventTime: Instant) extends Event

1.3. State changes

The state should include a value for if and when the cart was checked out:

Java
src/main/java/shopping/cart/ShoppingCart.java
static final class State implements CborSerializable {
  final Map<String, Integer> items;
  private Optional<Instant> checkoutDate;

  public State() {
    this(new HashMap<>(), Optional.empty());
  }

  public State(Map<String, Integer> items, Optional<Instant> checkoutDate) {
    this.items = items;
    this.checkoutDate = checkoutDate;
  }

  public boolean isCheckedOut() {
    return checkoutDate.isPresent();
  }

  public State checkout(Instant now) {
    checkoutDate = Optional.of(now);
    return this;
  }

  public Summary toSummary() {
    return new Summary(items, isCheckedOut());
  }

  public boolean hasItem(String itemId) {
    return items.containsKey(itemId);
  }

  public State updateItem(String itemId, int quantity) {
    if (quantity == 0) {
      items.remove(itemId);
    } else {
      items.put(itemId, quantity);
    }
    return this;
  }

  public boolean isEmpty() {
    return items.isEmpty();
  }
}

public static final class Summary implements CborSerializable {
  final Map<String, Integer> items;
  final boolean checkedOut;

  public Summary(Map<String, Integer> items, boolean checkedOut) {
    // defensive copy since items is a mutable object
    this.items = new HashMap<>(items);
    this.checkedOut = checkedOut;
  }
}
Scala
src/main/scala/shopping/cart/ShoppingCart.scala
final case class State(items: Map[String, Int], checkoutDate: Option[Instant])
    extends CborSerializable {

  def isCheckedOut: Boolean =
    checkoutDate.isDefined

  def checkout(now: Instant): State =
    copy(checkoutDate = Some(now))

  def toSummary: Summary =
    Summary(items, isCheckedOut)

  def hasItem(itemId: String): Boolean =
    items.contains(itemId)

  def isEmpty: Boolean =
    items.isEmpty

  def updateItem(itemId: String, quantity: Int): State = {
    quantity match {
      case 0 => copy(items = items - itemId)
      case _ => copy(items = items + (itemId -> quantity))
    }
  }
}
object State {
  val empty =
    State(items = Map.empty, checkoutDate = None)
}

final case class Summary(items: Map[String, Int], checkedOut: Boolean)
    extends CborSerializable
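
The defensive copy in the Java Summary constructor matters because the Java State updates its items map in place. The following is a minimal stand-alone sketch of what the copy protects against. It uses only the JDK, and the class name SummaryCopySketch and the method summaryTakenBeforeLaterChange are hypothetical names that are not part of the tutorial sources.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-alone sketch; it does not use the Akka classes from the tutorial.
final class SummaryCopySketch {

  // Mirrors the shape of the tutorial's Summary: it copies the map handed to it.
  static final class Summary {
    final Map<String, Integer> items;
    final boolean checkedOut;

    Summary(Map<String, Integer> items, boolean checkedOut) {
      this.items = new HashMap<>(items); // defensive copy
      this.checkedOut = checkedOut;
    }
  }

  // Creates a summary, then changes the original map, and returns the summary.
  static Summary summaryTakenBeforeLaterChange() {
    Map<String, Integer> cartItems = new HashMap<>();
    cartItems.put("socks", 3);
    Summary summary = new Summary(cartItems, false);
    cartItems.put("shirt", 1); // a later change to the entity's own map
    return summary;
  }
}
```

The returned summary still contains only the socks entry. Without the copy, a reply that was already sent could appear to change when the entity handles a later event.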

1.4. Unit test

Add a unit test for the new Checkout command in ShoppingCartTest (Java) or ShoppingCartSpec (Scala):

Java
src/test/java/shopping/cart/ShoppingCartTest.java
@Test
public void checkout() {
  CommandResultWithReply<
          ShoppingCart.Command,
          ShoppingCart.Event,
          ShoppingCart.State,
          StatusReply<ShoppingCart.Summary>>
      result1 =
          eventSourcedTestKit.runCommand(replyTo -> new ShoppingCart.AddItem("foo", 42, replyTo));
  assertTrue(result1.reply().isSuccess());
  CommandResultWithReply<
          ShoppingCart.Command,
          ShoppingCart.Event,
          ShoppingCart.State,
          StatusReply<ShoppingCart.Summary>>
      result2 = eventSourcedTestKit.runCommand(replyTo -> new ShoppingCart.Checkout(replyTo));
  assertTrue(result2.reply().isSuccess());
  assertTrue(result2.event() instanceof ShoppingCart.CheckedOut);
  assertEquals(CART_ID, result2.event().cartId);

  CommandResultWithReply<
          ShoppingCart.Command,
          ShoppingCart.Event,
          ShoppingCart.State,
          StatusReply<ShoppingCart.Summary>>
      result3 =
          eventSourcedTestKit.runCommand(replyTo -> new ShoppingCart.AddItem("foo", 42, replyTo));
  assertTrue(result3.reply().isError());
}
Scala
src/test/scala/shopping/cart/ShoppingCartSpec.scala
"checkout" in {
  val result1 =
    eventSourcedTestKit.runCommand[StatusReply[ShoppingCart.Summary]](
      ShoppingCart.AddItem("foo", 42, _))
  result1.reply.isSuccess should ===(true)
  val result2 = eventSourcedTestKit
    .runCommand[StatusReply[ShoppingCart.Summary]](ShoppingCart.Checkout(_))
  result2.reply should ===(
    StatusReply.Success(
      ShoppingCart.Summary(Map("foo" -> 42), checkedOut = true)))
  result2.event.asInstanceOf[ShoppingCart.CheckedOut].cartId should ===(
    cartId)

  val result3 =
    eventSourcedTestKit.runCommand[StatusReply[ShoppingCart.Summary]](
      ShoppingCart.AddItem("bar", 13, _))
  result3.reply.isError should ===(true)
}

You will also have to update the existing "add item" test to use the new Summary signature, which now includes the checkedOut flag.

Commands should be handled differently when the cart has been checked out. AddItem is no longer allowed after checkout. Therefore, we refactor the commandHandler method (Java) or the handleCommand method (Scala) into two separate methods, openShoppingCart and checkedOutShoppingCart, that are used depending on the checkedOut state. The previous code for AddItem goes into the openShoppingCart method, together with the new Checkout command.

Java
src/main/java/shopping/cart/ShoppingCart.java
@Override
public CommandHandlerWithReply<Command, Event, State> commandHandler() {
  return openShoppingCart()
      .orElse(checkedOutShoppingCart())
      .build();
}
private CommandHandlerWithReplyBuilderByState<Command, Event, State, State> openShoppingCart() {
  return newCommandHandlerWithReplyBuilder()
      .forState(state -> !state.isCheckedOut())
      .onCommand(AddItem.class, this::onAddItem)
      .onCommand(Checkout.class, this::onCheckout);
}

private ReplyEffect<Event, State> onAddItem(State state, AddItem cmd) {
  if (state.hasItem(cmd.itemId)) {
    return Effect()
        .reply(
            cmd.replyTo,
            StatusReply.error(
                "Item '" + cmd.itemId + "' was already added to this shopping cart"));
  } else if (cmd.quantity <= 0) {
    return Effect().reply(cmd.replyTo, StatusReply.error("Quantity must be greater than zero"));
  } else {
    return Effect()
        .persist(new ItemAdded(cartId, cmd.itemId, cmd.quantity))
        .thenReply(cmd.replyTo, updatedCart -> StatusReply.success(updatedCart.toSummary()));
  }
}

private ReplyEffect<Event, State> onCheckout(State state, Checkout cmd) {
  if (state.isEmpty()) {
    return Effect()
        .reply(cmd.replyTo, StatusReply.error("Cannot checkout an empty shopping cart"));
  } else {
    return Effect()
        .persist(new CheckedOut(cartId, Instant.now()))
        .thenReply(cmd.replyTo, updatedCart -> StatusReply.success(updatedCart.toSummary()));
  }
}
Scala
src/main/scala/shopping/cart/ShoppingCart.scala
private def handleCommand(
    cartId: String,
    state: State,
    command: Command): ReplyEffect[Event, State] = {
  // The shopping cart behavior changes if it's checked out or not.
  // The commands are handled differently for each case.
  if (state.isCheckedOut)
    checkedOutShoppingCart(cartId, state, command)
  else
    openShoppingCart(cartId, state, command)
}

private def openShoppingCart(
    cartId: String,
    state: State,
    command: Command): ReplyEffect[Event, State] = {
  command match {
    case AddItem(itemId, quantity, replyTo) =>
      if (state.hasItem(itemId))
        Effect.reply(replyTo)(
          StatusReply.Error(
            s"Item '$itemId' was already added to this shopping cart"))
      else if (quantity <= 0)
        Effect.reply(replyTo)(
          StatusReply.Error("Quantity must be greater than zero"))
      else
        Effect
          .persist(ItemAdded(cartId, itemId, quantity))
          .thenReply(replyTo) { updatedCart =>
            StatusReply.Success(updatedCart.toSummary)
          }

    case Checkout(replyTo) =>
      if (state.isEmpty)
        Effect.reply(replyTo)(
          StatusReply.Error("Cannot checkout an empty shopping cart"))
      else
        Effect
          .persist(CheckedOut(cartId, Instant.now()))
          .thenReply(replyTo)(updatedCart =>
            StatusReply.Success(updatedCart.toSummary))
  }
}

In checkedOutShoppingCart the AddItem and Checkout commands should be rejected:

Java
src/main/java/shopping/cart/ShoppingCart.java
private CommandHandlerWithReplyBuilderByState<Command, Event, State, State>
    checkedOutShoppingCart() {
  return newCommandHandlerWithReplyBuilder()
      .forState(State::isCheckedOut)
      .onCommand(
          AddItem.class,
          cmd ->
              Effect()
                  .reply(
                      cmd.replyTo,
                      StatusReply.error(
                          "Can't add an item to an already checked out shopping cart")))
      .onCommand(
          Checkout.class,
          cmd ->
              Effect()
                  .reply(
                      cmd.replyTo,
                      StatusReply.error("Can't checkout already checked out shopping cart")));
}
Scala
src/main/scala/shopping/cart/ShoppingCart.scala
private def checkedOutShoppingCart(
    cartId: String,
    state: State,
    command: Command): ReplyEffect[Event, State] = {
  command match {
    case cmd: AddItem =>
      Effect.reply(cmd.replyTo)(
        StatusReply.Error(
          "Can't add an item to an already checked out shopping cart"))
    case cmd: Checkout =>
      Effect.reply(cmd.replyTo)(
        StatusReply.Error("Can't checkout already checked out shopping cart"))
  }
}
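
To see the decision logic of the two handlers without any Akka types, here is a minimal plain-Java sketch. It is an illustration under assumptions, not the tutorial's implementation: the class CartDecisionSketch, the Decision type, and the use of event names as strings are all hypothetical. Only the rejection messages are taken from the code above.

```java
import java.util.Map;

// Hypothetical stand-alone sketch of state-dependent command handling (no Akka types).
final class CartDecisionSketch {

  // Outcome of handling a command: an event name to persist, or a rejection message.
  static final class Decision {
    final String event; // null when the command was rejected
    final String rejection; // null when the command was accepted

    private Decision(String event, String rejection) {
      this.event = event;
      this.rejection = rejection;
    }

    static Decision persist(String event) {
      return new Decision(event, null);
    }

    static Decision reject(String reason) {
      return new Decision(null, reason);
    }

    boolean isAccepted() {
      return event != null;
    }
  }

  // AddItem: the checked-out check comes first, then the open-cart validation.
  static Decision addItem(
      Map<String, Integer> items, boolean checkedOut, String itemId, int quantity) {
    if (checkedOut) {
      return Decision.reject("Can't add an item to an already checked out shopping cart");
    } else if (items.containsKey(itemId)) {
      return Decision.reject("Item '" + itemId + "' was already added to this shopping cart");
    } else if (quantity <= 0) {
      return Decision.reject("Quantity must be greater than zero");
    } else {
      return Decision.persist("ItemAdded");
    }
  }

  // Checkout: rejected for a checked-out cart and for an empty cart.
  static Decision checkout(Map<String, Integer> items, boolean checkedOut) {
    if (checkedOut) {
      return Decision.reject("Can't checkout already checked out shopping cart");
    } else if (items.isEmpty()) {
      return Decision.reject("Cannot checkout an empty shopping cart");
    } else {
      return Decision.persist("CheckedOut");
    }
  }
}
```

Note that a rejected command produces no event, so the state of the cart stays unchanged. Only accepted commands lead to a persisted event.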

1.5. Event handler

We still need to add the event handler for the CheckedOut event in the eventHandler method (Java) or the handleEvent method (Scala):

Java
src/main/java/shopping/cart/ShoppingCart.java
@Override
public EventHandler<State, Event> eventHandler() {
  return newEventHandlerBuilder()
      .forAnyState()
      .onEvent(ItemAdded.class, (state, evt) -> state.updateItem(evt.itemId, evt.quantity))
      .onEvent(CheckedOut.class, (state, evt) -> state.checkout(evt.eventTime))
      .build();
}
Scala
src/main/scala/shopping/cart/ShoppingCart.scala
private def handleEvent(state: State, event: Event): State = {
  event match {
    case ItemAdded(_, itemId, quantity) =>
      state.updateItem(itemId, quantity)
    case CheckedOut(_, eventTime) =>
      state.checkout(eventTime)
  }
}
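
The event handler is also what rebuilds the state when the entity is started again: the stored events are applied one by one, starting from the empty state. The following plain-Java sketch illustrates that idea without Akka. The class CartReplaySketch, its simplified event and state classes, and the replay method are hypothetical stand-ins, so treat it as an illustration rather than how Akka Persistence is implemented.

```java
import java.time.Instant;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical stand-alone sketch of rebuilding state from stored events (no Akka types).
final class CartReplaySketch {

  abstract static class Event {}

  static final class ItemAdded extends Event {
    final String itemId;
    final int quantity;

    ItemAdded(String itemId, int quantity) {
      this.itemId = itemId;
      this.quantity = quantity;
    }
  }

  static final class CheckedOut extends Event {
    final Instant eventTime;

    CheckedOut(Instant eventTime) {
      this.eventTime = eventTime;
    }
  }

  static final class State {
    final Map<String, Integer> items = new HashMap<>();
    Optional<Instant> checkoutDate = Optional.empty();
  }

  // Same role as the tutorial's event handler: (state, event) -> new state.
  static State applyEvent(State state, Event event) {
    if (event instanceof ItemAdded) {
      ItemAdded added = (ItemAdded) event;
      state.items.put(added.itemId, added.quantity);
    } else if (event instanceof CheckedOut) {
      state.checkoutDate = Optional.of(((CheckedOut) event).eventTime);
    } else {
      throw new IllegalArgumentException("Unknown event: " + event);
    }
    return state;
  }

  // Applies all stored events in order, starting from the empty state.
  static State replay(List<Event> journal) {
    State state = new State();
    for (Event event : journal) {
      state = applyEvent(state, event);
    }
    return state;
  }
}
```

Because the checkout time is stored in the event, replaying the same events gives the same state every time. This is why the command handler creates the timestamp, and the event handler only reads it from the event.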

1.6. Run unit tests

That should cover everything for the Checkout command. If you shut down the service after the previous exercise you will need to restart it using docker-compose up -d in the command line. Let’s confirm everything by running the unit tests with:

Java
mvn test
Scala
sbt test

2. Add Get command

Add the Get command alongside the existing AddItem and Checkout commands:

Java
src/main/java/shopping/cart/ShoppingCart.java
public static final class Get implements Command {
  final ActorRef<Summary> replyTo;

  @JsonCreator
  public Get(ActorRef<Summary> replyTo) {
    this.replyTo = replyTo;
  }
}
Scala
src/main/scala/shopping/cart/ShoppingCart.scala
final case class Get(replyTo: ActorRef[Summary]) extends Command

Add a unit test for the new Get command in ShoppingCartTest (Java) or ShoppingCartSpec (Scala):

Java
src/test/java/shopping/cart/ShoppingCartTest.java
@Test
public void get() {
  CommandResultWithReply<
          ShoppingCart.Command,
          ShoppingCart.Event,
          ShoppingCart.State,
          StatusReply<ShoppingCart.Summary>>
      result1 =
          eventSourcedTestKit.runCommand(replyTo -> new ShoppingCart.AddItem("foo", 42, replyTo));
  assertTrue(result1.reply().isSuccess());

  CommandResultWithReply<
          ShoppingCart.Command, ShoppingCart.Event, ShoppingCart.State, ShoppingCart.Summary>
      result2 = eventSourcedTestKit.runCommand(replyTo -> new ShoppingCart.Get(replyTo));
  assertFalse(result2.reply().checkedOut);
  assertEquals(1, result2.reply().items.size());
  assertEquals(42, result2.reply().items.get("foo").intValue());
}
Scala
src/test/scala/shopping/cart/ShoppingCartSpec.scala
"get" in {
  val result1 =
    eventSourcedTestKit.runCommand[StatusReply[ShoppingCart.Summary]](
      ShoppingCart.AddItem("foo", 42, _))
  result1.reply.isSuccess should ===(true)

  val result2 = eventSourcedTestKit.runCommand[ShoppingCart.Summary](
    ShoppingCart.Get(_))
  result2.reply should ===(
    ShoppingCart.Summary(Map("foo" -> 42), checkedOut = false))
}

The command handler for Get is independent of the checkedOut state. In Java, add it to the command handler builder using forAnyState(). In Scala, add the same case to both openShoppingCart and checkedOutShoppingCart:

Java
src/main/java/shopping/cart/ShoppingCart.java
@Override
public CommandHandlerWithReply<Command, Event, State> commandHandler() {
  return openShoppingCart()
      .orElse(checkedOutShoppingCart())
      .orElse(getCommandHandler())
      .build();
}
private CommandHandlerWithReplyBuilderByState<Command, Event, State, State> getCommandHandler() {
  return newCommandHandlerWithReplyBuilder()
      .forAnyState()
      .onCommand(Get.class, (state, cmd) -> Effect().reply(cmd.replyTo, state.toSummary()));
}
Scala
src/main/scala/shopping/cart/ShoppingCart.scala
case Get(replyTo) =>
  Effect.reply(replyTo)(state.toSummary)

Try the new Get command by running the unit tests with:

Java
mvn test
Scala
sbt test

3. Add new operations to the service descriptor

In the existing ShoppingCartService.proto add corresponding operation definitions in the form of the two rpc calls listed below:

src/main/protobuf/ShoppingCartService.proto
service ShoppingCartService {
    rpc Checkout (CheckoutRequest) returns (Cart) {} (1)
    rpc GetCart (GetCartRequest) returns (Cart) {} (2)
}

message CheckoutRequest {
    string cartId = 1;
}

message GetCartRequest {
    string cartId = 1;
}

message Cart { (3)
    repeated Item items = 1;
    bool checkedOut = 2; (4)
}
1 Defines the Checkout operation.
2 Defines the GetCart operation.
3 For simplicity, most requests share a common response. For easier evolution of an interface, separate responses are often a better choice.
4 Note the new checkedOut flag.

Generate code by compiling the project:

Java
mvn compile
Scala
sbt compile

You will see a compilation error in ShoppingCartServiceImpl.java (Java) or ShoppingCartServiceImpl.scala (Scala), but that is expected with the changed Protobuf definition. We will fix that now.

Add implementations of the new operations in ShoppingCartServiceImpl in the same way as addItem:

Java
src/main/java/shopping/cart/ShoppingCartServiceImpl.java
@Override
public CompletionStage<Cart> checkout(CheckoutRequest in) {
  logger.info("checkout {}", in.getCartId());
  EntityRef<ShoppingCart.Command> entityRef =
      sharding.entityRefFor(ShoppingCart.ENTITY_KEY, in.getCartId());
  CompletionStage<ShoppingCart.Summary> reply =
      entityRef.askWithStatus(replyTo -> new ShoppingCart.Checkout(replyTo), timeout);
  CompletionStage<Cart> cart = reply.thenApply(ShoppingCartServiceImpl::toProtoCart);
  return convertError(cart);
}

@Override
public CompletionStage<Cart> getCart(GetCartRequest in) {
  logger.info("getCart {}", in.getCartId());
  EntityRef<ShoppingCart.Command> entityRef =
      sharding.entityRefFor(ShoppingCart.ENTITY_KEY, in.getCartId());
  CompletionStage<ShoppingCart.Summary> reply =
      entityRef.ask(replyTo -> new ShoppingCart.Get(replyTo), timeout);
  CompletionStage<Cart> protoCart =
      reply.thenApply(
          cart -> {
            if (cart.items.isEmpty())
              throw new GrpcServiceException(
                  Status.NOT_FOUND.withDescription("Cart " + in.getCartId() + " not found"));
            else return toProtoCart(cart);
          });
  return convertError(protoCart);
}
Scala
src/main/scala/shopping/cart/ShoppingCartServiceImpl.scala
override def checkout(in: proto.CheckoutRequest): Future[proto.Cart] = {
  logger.info("checkout {}", in.cartId)
  val entityRef = sharding.entityRefFor(ShoppingCart.EntityKey, in.cartId)
  val reply: Future[ShoppingCart.Summary] =
    entityRef.askWithStatus(ShoppingCart.Checkout(_))
  val response = reply.map(cart => toProtoCart(cart))
  convertError(response)
}

override def getCart(in: proto.GetCartRequest): Future[proto.Cart] = {
  logger.info("getCart {}", in.cartId)
  val entityRef = sharding.entityRefFor(ShoppingCart.EntityKey, in.cartId)
  val response =
    entityRef.ask(ShoppingCart.Get).map { cart =>
      if (cart.items.isEmpty)
        throw new GrpcServiceException(
          Status.NOT_FOUND.withDescription(s"Cart ${in.cartId} not found"))
      else
        toProtoCart(cart)
    }
  convertError(response)
}
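
In getCart, the exception is thrown inside the function that maps the reply. For the Java version, the sketch below shows how such an exception travels through a CompletionStage, using only the JDK. It is a simplified stand-in: EmptyCartSketch, itemCountOrNotFound, and failureMessage are hypothetical names, and NoSuchElementException takes the place of GrpcServiceException so that the example needs no gRPC dependency.

```java
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;

// Hypothetical stand-alone sketch; NoSuchElementException stands in for GrpcServiceException.
final class EmptyCartSketch {

  // Maps a successful reply, but fails the resulting stage when the cart has no items.
  static CompletionStage<Integer> itemCountOrNotFound(
      CompletionStage<Map<String, Integer>> reply, String cartId) {
    return reply.thenApply(
        items -> {
          if (items.isEmpty()) {
            throw new NoSuchElementException("Cart " + cartId + " not found");
          }
          return items.size();
        });
  }

  // Returns the failure message of a stage, or null if it completed successfully.
  static String failureMessage(CompletionStage<?> stage) {
    try {
      stage.toCompletableFuture().join();
      return null;
    } catch (CompletionException e) {
      // An exception thrown inside thenApply is wrapped in a CompletionException.
      return e.getCause().getMessage();
    }
  }
}
```

An entity that has never received an AddItem command starts with the empty state, so from the reply alone the service cannot tell a cart that was never created from a cart without items. That is a likely reason why the empty case is reported as not found.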

We also have to include the new checkedOut flag when converting from ShoppingCart.Summary to proto.Cart.

Java
src/main/java/shopping/cart/ShoppingCartServiceImpl.java
private static Cart toProtoCart(ShoppingCart.Summary cart) {
  List<Item> protoItems =
      cart.items.entrySet().stream()
          .map(
              entry ->
                  Item.newBuilder()
                      .setItemId(entry.getKey())
                      .setQuantity(entry.getValue())
                      .build())
          .collect(Collectors.toList());

  return Cart.newBuilder().setCheckedOut(cart.checkedOut).addAllItems(protoItems).build();
}
Scala
src/main/scala/shopping/cart/ShoppingCartServiceImpl.scala
private def toProtoCart(cart: ShoppingCart.Summary): proto.Cart = {
  proto.Cart(
    cart.items.iterator.map { case (itemId, quantity) =>
      proto.Item(itemId, quantity)
    }.toSeq,
    cart.checkedOut)
}

4. Run locally

Start the PostgreSQL database, unless it’s already running:

docker-compose up -d

Run the service with:

Java
# make sure to compile before running exec:exec
mvn compile exec:exec -DAPP_CONFIG=local1.conf
Scala
sbt -Dconfig.resource=local1.conf run

4.1. Exercise the service

Use grpcurl to exercise the service:

  1. Add an item to the cart:

    grpcurl -d '{"cartId":"cart2", "itemId":"socks", "quantity":3}' -plaintext 127.0.0.1:8101 shoppingcart.ShoppingCartService.AddItem
  2. Check the quantity of the cart:

    grpcurl -d '{"cartId":"cart2"}' -plaintext 127.0.0.1:8101 shoppingcart.ShoppingCartService.GetCart
  3. Check out cart:

    grpcurl -d '{"cartId":"cart2"}' -plaintext 127.0.0.1:8101 shoppingcart.ShoppingCartService.Checkout

4.2. Stop the service

When finished, stop the service with ctrl-c. Leave PostgreSQL running for the next set of steps, or stop it with:

docker-compose stop
The following steps for cloud deployment are optional. If you are only running locally, you can skip to the next section of the tutorial.

5. Run in Kubernetes

Create a Kubernetes cluster and install the Akka Operator if you haven’t already.

5.1. Create a PostgreSQL database

Make sure you have created your database and loaded the schema as described in the previous tutorial section.

5.2. Build Docker image

Create a Docker repository and authenticate Docker.

GCP

Follow the instructions in Using Container Registry with Google Cloud to deploy Docker images on GCP’s container registry.

AWS

Follow the instructions in Amazon Elastic Container Registry to deploy Docker images on AWS’s container registry.

5.3. Additional steps for Docker and AWS

If you are using AWS, you will also need to complete the following procedures.

Rebuild the Docker image.

Java
mvn -DskipTests -Ddocker.registry=803424716218.dkr.ecr.eu-central-1.amazonaws.com clean package docker:push
Scala
sbt -Ddocker.registry=803424716218.dkr.ecr.eu-central-1.amazonaws.com docker:publish

Take note of the image tag as displayed by the docker:push (Java) or docker:publish (Scala) command.

Java
DOCKER> Tagging image shopping-cart-service:20201209-135004-363ae2b successful!
Scala
[info] Successfully tagged shopping-cart-service:20201209-135004-363ae2b

5.4. Update the deployment descriptor

Update the kubernetes/shopping-cart-service-cr.yml deployment descriptor with the new image tag as previously described.

5.5. Apply to Kubernetes

Re-apply the shopping-cart-service-cr.yml to Kubernetes:

kubectl apply -f kubernetes/shopping-cart-service-cr.yml

You can see progress by viewing the status:

kubectl get akkamicroservices/shopping-cart-service

See troubleshooting deployment status for more details.

5.6. Exercise the service in Kubernetes

  1. You can list the pods with:

    kubectl get pods
  2. Inspect logs from a separate terminal window:

    kubectl logs -f <shopping-cart-service pod name from above>
  3. Add port forwarding for the gRPC endpoint from a separate terminal:

    kubectl port-forward svc/shopping-cart-service-grpc 8101:8101

Use grpcurl to exercise the service:

  1. Add an item to the cart:

    grpcurl -d '{"cartId":"cart2", "itemId":"socks", "quantity":3}' -plaintext 127.0.0.1:8101 shoppingcart.ShoppingCartService.AddItem
  2. Check the quantity of the cart:

    grpcurl -d '{"cartId":"cart2"}' -plaintext 127.0.0.1:8101 shoppingcart.ShoppingCartService.GetCart
  3. Check out cart:

    grpcurl -d '{"cartId":"cart2"}' -plaintext 127.0.0.1:8101 shoppingcart.ShoppingCartService.Checkout

Note the logging from the ShoppingCartServiceImpl in the console that is running kubectl logs -f.

Optional commands and events

The commands and events listed in this section are not mandatory for subsequent steps of the tutorial and their details won’t be covered on this page. You can implement the commands, events, and State management following the pattern we used for the AddItem command and ItemAdded event in the previous step. This is a good exercise to help solidify your knowledge of how to implement EventSourcedBehavior. Optional commands and corresponding events that you can add on your own:

  • RemoveItem - remove an item from the cart

  • AdjustItemQuantity - adjust the quantity of an item in the cart

  • ItemRemoved

  • ItemQuantityAdjusted
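
If you want a starting point for the exercise, the plain-Java sketch below shows one possible set of rules for these commands and events. Everything in it is an assumption made for illustration: the class OptionalCommandsSketch, the method names, the rejection messages, and the choice to reject adjustments for items that are not in the cart. The actual solution may make different choices.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-alone sketch of rules for the optional commands (no Akka types).
final class OptionalCommandsSketch {

  // Validation an AdjustItemQuantity handler might do.
  // Returns a rejection message, or null when the command is acceptable.
  static String validateAdjustItemQuantity(
      Map<String, Integer> items, boolean checkedOut, String itemId, int quantity) {
    if (checkedOut) {
      return "Can't adjust an item in an already checked out shopping cart";
    } else if (!items.containsKey(itemId)) {
      return "Item '" + itemId + "' is not in this shopping cart";
    } else if (quantity <= 0) {
      return "Quantity must be greater than zero";
    } else {
      return null;
    }
  }

  // State change for an ItemRemoved event; returns a new map and leaves the input untouched.
  static Map<String, Integer> onItemRemoved(Map<String, Integer> items, String itemId) {
    Map<String, Integer> updated = new HashMap<>(items);
    updated.remove(itemId);
    return updated;
  }

  // State change for an ItemQuantityAdjusted event; returns a new map as well.
  static Map<String, Integer> onItemQuantityAdjusted(
      Map<String, Integer> items, String itemId, int newQuantity) {
    Map<String, Integer> updated = new HashMap<>(items);
    updated.put(itemId, newQuantity);
    return updated;
  }
}
```

As with AddItem, the validation belongs in the command handler, while the event handlers only apply changes that were already accepted.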

After adding the optional commands, you can build and run again and try them out:

Update the quantity of an item:

grpcurl -d '{"cartId":"cart1", "itemId":"socks", "quantity":6}' -plaintext 127.0.0.1:8101 shoppingcart.ShoppingCartService.UpdateItem

Get the cart state again:

grpcurl -d '{"cartId":"cart1"}' -plaintext 127.0.0.1:8101 shoppingcart.ShoppingCartService.GetCart