Section 5: Complete Event Sourced entity
On this page, we will complete the cart entity with more commands and events. This ShoppingCart
entity will use Event Sourcing to persist events that represent changes to the state of the cart.
On this page you will learn how to:
- implement an Event Sourced entity by expanding work from the previous steps and adding to the ShoppingCart:
  - Checkout - a command to check out the shopping cart
  - CheckedOut - an event to capture checkouts
  - Get - a way to get the current state of the shopping cart
At the end, we also provide a list of Optional commands and events that you can add on your own to test your knowledge.
Source downloads
If you prefer to simply view and run the example, download a zip file containing the completed code:
- Java
- Scala
1. Add the command and event for checkout
When the cart has been checked out it should not accept any commands that change its state. However, it should still be possible to Get the current state of a checked out cart. We suggest you try implementing Checkout on your own and then compare it with the solution shown below. Add the Checkout command alongside the existing AddItem command.
The following sections show our solution.
1.1. Checkout command
- Java
-
src/main/java/shopping/cart/ShoppingCart.java
public static final class Checkout implements Command {
  final ActorRef<StatusReply<Summary>> replyTo;

  @JsonCreator
  public Checkout(ActorRef<StatusReply<Summary>> replyTo) {
    this.replyTo = replyTo;
  }
}
- Scala
-
src/main/scala/shopping/cart/ShoppingCart.scala
final case class Checkout(replyTo: ActorRef[StatusReply[Summary]]) extends Command
1.2. CheckedOut event
The corresponding CheckedOut
event:
- Java
-
src/main/java/shopping/cart/ShoppingCart.java
static final class CheckedOut extends Event {
  final Instant eventTime;

  public CheckedOut(String cartId, Instant eventTime) {
    super(cartId);
    this.eventTime = eventTime;
  }

  @Override
  public boolean equals(Object o) {
    if (this == o) return true;
    if (o == null || getClass() != o.getClass()) return false;
    CheckedOut that = (CheckedOut) o;
    return Objects.equals(eventTime, that.eventTime);
  }

  @Override
  public int hashCode() {
    return Objects.hash(eventTime);
  }
}
- Scala
-
src/main/scala/shopping/cart/ShoppingCart.scala
final case class CheckedOut(cartId: String, eventTime: Instant) extends Event
1.3. State changes
The state should include a value for if and when the cart was checked out:
- Java
-
src/main/java/shopping/cart/ShoppingCart.java
static final class State implements CborSerializable {
  final Map<String, Integer> items;
  private Optional<Instant> checkoutDate;

  public State() {
    this(new HashMap<>(), Optional.empty());
  }

  public State(Map<String, Integer> items, Optional<Instant> checkoutDate) {
    this.items = items;
    this.checkoutDate = checkoutDate;
  }

  public boolean isCheckedOut() {
    return checkoutDate.isPresent();
  }

  public State checkout(Instant now) {
    checkoutDate = Optional.of(now);
    return this;
  }

  public Summary toSummary() {
    return new Summary(items, isCheckedOut());
  }

  public boolean hasItem(String itemId) {
    return items.containsKey(itemId);
  }

  public State updateItem(String itemId, int quantity) {
    if (quantity == 0) {
      items.remove(itemId);
    } else {
      items.put(itemId, quantity);
    }
    return this;
  }

  public boolean isEmpty() {
    return items.isEmpty();
  }
}

public static final class Summary implements CborSerializable {
  final Map<String, Integer> items;
  final boolean checkedOut;

  public Summary(Map<String, Integer> items, boolean checkedOut) {
    // defensive copy since items is a mutable object
    this.items = new HashMap<>(items);
    this.checkedOut = checkedOut;
  }
}
- Scala
-
src/main/scala/shopping/cart/ShoppingCart.scala
final case class State(items: Map[String, Int], checkoutDate: Option[Instant])
    extends CborSerializable {

  def isCheckedOut: Boolean =
    checkoutDate.isDefined

  def checkout(now: Instant): State =
    copy(checkoutDate = Some(now))

  def toSummary: Summary =
    Summary(items, isCheckedOut)

  def hasItem(itemId: String): Boolean =
    items.contains(itemId)

  def isEmpty: Boolean =
    items.isEmpty

  def updateItem(itemId: String, quantity: Int): State = {
    quantity match {
      case 0 => copy(items = items - itemId)
      case _ => copy(items = items + (itemId -> quantity))
    }
  }
}

object State {
  val empty = State(items = Map.empty, checkoutDate = None)
}

final case class Summary(items: Map[String, Int], checkedOut: Boolean)
    extends CborSerializable
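The Java State above mutates its items map in place, which is why Summary takes a defensive copy. The following stand-alone sketch illustrates the effect in plain Java. DefensiveCopySketch and its nested State and Summary are simplified stand-ins introduced here for illustration, not the tutorial classes.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative only: simplified stand-ins for the tutorial's State and Summary.
final class DefensiveCopySketch {

  static final class Summary {
    final Map<String, Integer> items;

    Summary(Map<String, Integer> items) {
      // Copy so that later changes to the cart do not leak into this reply.
      this.items = new HashMap<>(items);
    }
  }

  static final class State {
    final Map<String, Integer> items = new HashMap<>();

    State updateItem(String itemId, int quantity) {
      if (quantity == 0) {
        items.remove(itemId);
      } else {
        items.put(itemId, quantity);
      }
      return this; // mutated in place, like the Java State above
    }

    Summary toSummary() {
      return new Summary(items);
    }
  }

  public static void main(String[] args) {
    State state = new State().updateItem("foo", 42);
    Summary before = state.toSummary();
    state.updateItem("bar", 13); // a later change to the cart
    System.out.println(before.items); // {foo=42}
    System.out.println(state.toSummary().items.size()); // 2
  }
}
```

Without the copy, the earlier Summary would share the map with the entity and would change after it had been sent as a reply. The Scala State avoids the issue by using an immutable Map.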
1.4. Unit test
Add a unit test for the new Checkout command in ShoppingCartSpec (Scala) or ShoppingCartTest (Java):
- Java
-
src/test/java/shopping/cart/ShoppingCartTest.java
@Test
public void checkout() {
  CommandResultWithReply<
          ShoppingCart.Command,
          ShoppingCart.Event,
          ShoppingCart.State,
          StatusReply<ShoppingCart.Summary>>
      result1 =
          eventSourcedTestKit.runCommand(replyTo -> new ShoppingCart.AddItem("foo", 42, replyTo));
  assertTrue(result1.reply().isSuccess());

  CommandResultWithReply<
          ShoppingCart.Command,
          ShoppingCart.Event,
          ShoppingCart.State,
          StatusReply<ShoppingCart.Summary>>
      result2 = eventSourcedTestKit.runCommand(replyTo -> new ShoppingCart.Checkout(replyTo));
  assertTrue(result2.reply().isSuccess());
  assertTrue(result2.event() instanceof ShoppingCart.CheckedOut);
  assertEquals(CART_ID, result2.event().cartId);

  CommandResultWithReply<
          ShoppingCart.Command,
          ShoppingCart.Event,
          ShoppingCart.State,
          StatusReply<ShoppingCart.Summary>>
      result3 =
          eventSourcedTestKit.runCommand(replyTo -> new ShoppingCart.AddItem("foo", 42, replyTo));
  assertTrue(result3.reply().isError());
}
- Scala
-
src/test/scala/shopping/cart/ShoppingCartSpec.scala
"checkout" in {
  val result1 =
    eventSourcedTestKit.runCommand[StatusReply[ShoppingCart.Summary]](
      ShoppingCart.AddItem("foo", 42, _))
  result1.reply.isSuccess should ===(true)

  val result2 = eventSourcedTestKit
    .runCommand[StatusReply[ShoppingCart.Summary]](ShoppingCart.Checkout(_))
  result2.reply should ===(
    StatusReply.Success(
      ShoppingCart.Summary(Map("foo" -> 42), checkedOut = true)))
  result2.event.asInstanceOf[ShoppingCart.CheckedOut].cartId should ===(
    cartId)

  val result3 =
    eventSourcedTestKit.runCommand[StatusReply[ShoppingCart.Summary]](
      ShoppingCart.AddItem("bar", 13, _))
  result3.reply.isError should ===(true)
}
You will also have to update the "add item" test to use the new Summary signature.
Commands should be handled differently when the cart has been checked out. AddItem is no longer allowed after checkout. Therefore, we refactor the handleCommand (Scala) or commandHandler (Java) method into two separate methods, openShoppingCart and checkedOutShoppingCart, that are used depending on the checkedOut state. The previous code for AddItem goes into the openShoppingCart method, together with the handling of the new Checkout command.
- Java
-
src/main/java/shopping/cart/ShoppingCart.java
@Override
public CommandHandlerWithReply<Command, Event, State> commandHandler() {
  return openShoppingCart().orElse(checkedOutShoppingCart()).build();
}

private CommandHandlerWithReplyBuilderByState<Command, Event, State, State> openShoppingCart() {
  return newCommandHandlerWithReplyBuilder()
      .forState(state -> !state.isCheckedOut())
      .onCommand(AddItem.class, this::onAddItem)
      .onCommand(Checkout.class, this::onCheckout);
}

private ReplyEffect<Event, State> onAddItem(State state, AddItem cmd) {
  if (state.hasItem(cmd.itemId)) {
    return Effect()
        .reply(
            cmd.replyTo,
            StatusReply.error(
                "Item '" + cmd.itemId + "' was already added to this shopping cart"));
  } else if (cmd.quantity <= 0) {
    return Effect().reply(cmd.replyTo, StatusReply.error("Quantity must be greater than zero"));
  } else {
    return Effect()
        .persist(new ItemAdded(cartId, cmd.itemId, cmd.quantity))
        .thenReply(cmd.replyTo, updatedCart -> StatusReply.success(updatedCart.toSummary()));
  }
}

private ReplyEffect<Event, State> onCheckout(State state, Checkout cmd) {
  if (state.isEmpty()) {
    return Effect()
        .reply(cmd.replyTo, StatusReply.error("Cannot checkout an empty shopping cart"));
  } else {
    return Effect()
        .persist(new CheckedOut(cartId, Instant.now()))
        .thenReply(cmd.replyTo, updatedCart -> StatusReply.success(updatedCart.toSummary()));
  }
}
- Scala
-
src/main/scala/shopping/cart/ShoppingCart.scala
private def handleCommand(
    cartId: String,
    state: State,
    command: Command): ReplyEffect[Event, State] = {
  // The shopping cart behavior changes if it's checked out or not.
  // The commands are handled differently for each case.
  if (state.isCheckedOut)
    checkedOutShoppingCart(cartId, state, command)
  else
    openShoppingCart(cartId, state, command)
}

private def openShoppingCart(
    cartId: String,
    state: State,
    command: Command): ReplyEffect[Event, State] = {
  command match {
    case AddItem(itemId, quantity, replyTo) =>
      if (state.hasItem(itemId))
        Effect.reply(replyTo)(
          StatusReply.Error(
            s"Item '$itemId' was already added to this shopping cart"))
      else if (quantity <= 0)
        Effect.reply(replyTo)(
          StatusReply.Error("Quantity must be greater than zero"))
      else
        Effect
          .persist(ItemAdded(cartId, itemId, quantity))
          .thenReply(replyTo) { updatedCart =>
            StatusReply.Success(updatedCart.toSummary)
          }
    case Checkout(replyTo) =>
      if (state.isEmpty)
        Effect.reply(replyTo)(
          StatusReply.Error("Cannot checkout an empty shopping cart"))
      else
        Effect
          .persist(CheckedOut(cartId, Instant.now()))
          .thenReply(replyTo)(updatedCart =>
            StatusReply.Success(updatedCart.toSummary))
  }
}
In checkedOutShoppingCart
the AddItem
and Checkout
commands should be rejected:
- Java
-
src/main/java/shopping/cart/ShoppingCart.java
private CommandHandlerWithReplyBuilderByState<Command, Event, State, State>
    checkedOutShoppingCart() {
  return newCommandHandlerWithReplyBuilder()
      .forState(State::isCheckedOut)
      .onCommand(
          AddItem.class,
          cmd ->
              Effect()
                  .reply(
                      cmd.replyTo,
                      StatusReply.error(
                          "Can't add an item to an already checked out shopping cart")))
      .onCommand(
          Checkout.class,
          cmd ->
              Effect()
                  .reply(
                      cmd.replyTo,
                      StatusReply.error("Can't checkout already checked out shopping cart")));
}
- Scala
-
src/main/scala/shopping/cart/ShoppingCart.scala
private def checkedOutShoppingCart(
    cartId: String,
    state: State,
    command: Command): ReplyEffect[Event, State] = {
  command match {
    case cmd: AddItem =>
      Effect.reply(cmd.replyTo)(
        StatusReply.Error(
          "Can't add an item to an already checked out shopping cart"))
    case cmd: Checkout =>
      Effect.reply(cmd.replyTo)(
        StatusReply.Error("Can't checkout already checked out shopping cart"))
  }
}
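The decision logic of the two handlers can be looked at in isolation from Akka. The following is a minimal plain-Java sketch of that logic, not the tutorial's implementation: CartDecisionSketch, its Command enum, and the Decision type are hypothetical names introduced here, and events are represented only by their names.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative only: models the open/checked-out split without Akka.
final class CartDecisionSketch {

  enum Command { ADD_ITEM, CHECKOUT }

  static final class State {
    final Map<String, Integer> items = new HashMap<>();
    boolean checkedOut = false;
  }

  // Outcome of a command: an event name to persist, or an error reply.
  static final class Decision {
    final String event; // null when the command is rejected
    final String error; // null when the command is accepted

    private Decision(String event, String error) {
      this.event = event;
      this.error = error;
    }

    static Decision persist(String event) { return new Decision(event, null); }
    static Decision reject(String error) { return new Decision(null, error); }
    boolean isAccepted() { return event != null; }
  }

  // Picks the handler based on the checkedOut state, then validates the command.
  static Decision handle(State state, Command command, String itemId, int quantity) {
    return state.checkedOut
        ? checkedOutShoppingCart(command)
        : openShoppingCart(state, command, itemId, quantity);
  }

  private static Decision openShoppingCart(
      State state, Command command, String itemId, int quantity) {
    switch (command) {
      case ADD_ITEM:
        if (state.items.containsKey(itemId)) {
          return Decision.reject("Item '" + itemId + "' was already added to this shopping cart");
        }
        if (quantity <= 0) {
          return Decision.reject("Quantity must be greater than zero");
        }
        return Decision.persist("ItemAdded");
      case CHECKOUT:
        if (state.items.isEmpty()) {
          return Decision.reject("Cannot checkout an empty shopping cart");
        }
        return Decision.persist("CheckedOut");
      default:
        throw new IllegalArgumentException("Unknown command: " + command);
    }
  }

  private static Decision checkedOutShoppingCart(Command command) {
    switch (command) {
      case ADD_ITEM:
        return Decision.reject("Can't add an item to an already checked out shopping cart");
      case CHECKOUT:
        return Decision.reject("Can't checkout already checked out shopping cart");
      default:
        throw new IllegalArgumentException("Unknown command: " + command);
    }
  }

  public static void main(String[] args) {
    State state = new State();
    System.out.println(handle(state, Command.CHECKOUT, null, 0).error);
    state.items.put("foo", 42);
    System.out.println(handle(state, Command.CHECKOUT, null, 0).event);
    state.checkedOut = true;
    System.out.println(handle(state, Command.ADD_ITEM, "bar", 13).error);
  }
}
```

Note that in this sketch, as in the entity, a rejected command produces only a reply. State changes happen solely through persisted events.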
1.5. Event handler
We still need to add the event handler for the CheckedOut event in the handleEvent (Scala) or eventHandler (Java) method:
- Java
-
src/main/java/shopping/cart/ShoppingCart.java
@Override
public EventHandler<State, Event> eventHandler() {
  return newEventHandlerBuilder()
      .forAnyState()
      .onEvent(ItemAdded.class, (state, evt) -> state.updateItem(evt.itemId, evt.quantity))
      .onEvent(CheckedOut.class, (state, evt) -> state.checkout(evt.eventTime))
      .build();
}
- Scala
-
src/main/scala/shopping/cart/ShoppingCart.scala
private def handleEvent(state: State, event: Event): State = {
  event match {
    case ItemAdded(_, itemId, quantity) =>
      state.updateItem(itemId, quantity)
    case CheckedOut(_, eventTime) =>
      state.checkout(eventTime)
  }
}
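With Event Sourcing, the event handler is applied both when a new event is persisted and when the entity is recovered from its stored events. The following stand-alone sketch shows the recovery idea in plain Java by applying a list of events, in order, to an empty state. It does not use Akka. ReplaySketch and its nested classes are simplified stand-ins introduced for illustration (for example, the events here carry no cartId).

```java
import java.time.Instant;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Illustrative only: a stand-alone model of event replay, not the Akka API.
final class ReplaySketch {

  interface Event {}

  static final class ItemAdded implements Event {
    final String itemId;
    final int quantity;

    ItemAdded(String itemId, int quantity) {
      this.itemId = itemId;
      this.quantity = quantity;
    }
  }

  static final class CheckedOut implements Event {
    final Instant eventTime;

    CheckedOut(Instant eventTime) {
      this.eventTime = eventTime;
    }
  }

  static final class State {
    final Map<String, Integer> items = new HashMap<>();
    Optional<Instant> checkoutDate = Optional.empty();

    boolean isCheckedOut() {
      return checkoutDate.isPresent();
    }
  }

  // Same role as the event handler: apply one event to the state.
  static State applyEvent(State state, Event event) {
    if (event instanceof ItemAdded) {
      ItemAdded added = (ItemAdded) event;
      state.items.put(added.itemId, added.quantity);
    } else if (event instanceof CheckedOut) {
      state.checkoutDate = Optional.of(((CheckedOut) event).eventTime);
    } else {
      throw new IllegalArgumentException("Unknown event: " + event);
    }
    return state;
  }

  // Recovery: start from the empty state and apply the events in order.
  static State replay(List<Event> journal) {
    State state = new State();
    for (Event event : journal) {
      state = applyEvent(state, event);
    }
    return state;
  }

  public static void main(String[] args) {
    State recovered = replay(Arrays.asList(
        new ItemAdded("foo", 42),
        new CheckedOut(Instant.parse("2024-01-01T00:00:00Z"))));
    // prints: {foo=42} checkedOut=true
    System.out.println(recovered.items + " checkedOut=" + recovered.isCheckedOut());
  }
}
```

Because the checkout time is stored in the event rather than read from the clock in the event handler, replaying the same events always yields the same state.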
2. Add Get command
Add the Get
command alongside the existing AddItem
and Checkout
commands:
- Java
-
src/main/java/shopping/cart/ShoppingCart.java
public static final class Get implements Command {
  final ActorRef<Summary> replyTo;

  @JsonCreator
  public Get(ActorRef<Summary> replyTo) {
    this.replyTo = replyTo;
  }
}
- Scala
-
src/main/scala/shopping/cart/ShoppingCart.scala
final case class Get(replyTo: ActorRef[Summary]) extends Command
Add a unit test for the new Get command in ShoppingCartSpec (Scala) or ShoppingCartTest (Java):
- Java
-
src/test/java/shopping/cart/ShoppingCartTest.java
@Test
public void get() {
  CommandResultWithReply<
          ShoppingCart.Command,
          ShoppingCart.Event,
          ShoppingCart.State,
          StatusReply<ShoppingCart.Summary>>
      result1 =
          eventSourcedTestKit.runCommand(replyTo -> new ShoppingCart.AddItem("foo", 42, replyTo));
  assertTrue(result1.reply().isSuccess());

  CommandResultWithReply<
          ShoppingCart.Command, ShoppingCart.Event, ShoppingCart.State, ShoppingCart.Summary>
      result2 = eventSourcedTestKit.runCommand(replyTo -> new ShoppingCart.Get(replyTo));
  assertFalse(result2.reply().checkedOut);
  assertEquals(1, result2.reply().items.size());
  assertEquals(42, result2.reply().items.get("foo").intValue());
}
- Scala
-
src/test/scala/shopping/cart/ShoppingCartSpec.scala
"get" in {
  val result1 =
    eventSourcedTestKit.runCommand[StatusReply[ShoppingCart.Summary]](
      ShoppingCart.AddItem("foo", 42, _))
  result1.reply.isSuccess should ===(true)

  val result2 = eventSourcedTestKit.runCommand[ShoppingCart.Summary](
    ShoppingCart.Get(_))
  result2.reply should ===(
    ShoppingCart.Summary(Map("foo" -> 42), checkedOut = false))
}
The command handler for Get is independent of the checkedOut state. In Scala, add it to both openShoppingCart and checkedOutShoppingCart. In Java, add it to the command handler builder using forAnyState():
- Java
-
src/main/java/shopping/cart/ShoppingCart.java
@Override
public CommandHandlerWithReply<Command, Event, State> commandHandler() {
  return openShoppingCart()
      .orElse(checkedOutShoppingCart())
      .orElse(getCommandHandler())
      .build();
}

private CommandHandlerWithReplyBuilderByState<Command, Event, State, State> getCommandHandler() {
  return newCommandHandlerWithReplyBuilder()
      .forAnyState()
      .onCommand(Get.class, (state, cmd) -> Effect().reply(cmd.replyTo, state.toSummary()));
}
- Scala
-
src/main/scala/shopping/cart/ShoppingCart.scala
case Get(replyTo) => Effect.reply(replyTo)(state.toSummary)
Try the new Get command by running the unit tests with sbt (Scala) or Maven (Java):
sbt test
mvn test
3. Add new operations to the service descriptor
In the existing ShoppingCartService.proto, add corresponding operation definitions in the form of the two rpc calls listed below:
service ShoppingCartService {
  rpc Checkout (CheckoutRequest) returns (Cart) {} // (1)
  rpc GetCart (GetCartRequest) returns (Cart) {} // (2)
}

message CheckoutRequest {
  string cartId = 1;
}

message GetCartRequest {
  string cartId = 1;
}

message Cart { // (3)
  repeated Item items = 1;
  bool checkedOut = 2; // (4)
}

(1) Defines the Checkout operation.
(2) Defines the GetCart operation.
(3) For simplicity, most requests share a common response. For easier evolution of an interface, separate responses are often a better choice.
(4) Note the new checkedOut flag.
Generate code by compiling the project with sbt (Scala) or Maven (Java):
sbt compile
mvn compile
You will see a compilation error in ShoppingCartServiceImpl.scala (Scala) or ShoppingCartServiceImpl.java (Java), but that is expected with the changed Protobuf definition. We will fix that now.
Add implementations of the new operations in ShoppingCartServiceImpl
in the same way as addItem
:
- Java
-
src/main/java/shopping/cart/ShoppingCartServiceImpl.java
@Override
public CompletionStage<Cart> checkout(CheckoutRequest in) {
  logger.info("checkout {}", in.getCartId());
  EntityRef<ShoppingCart.Command> entityRef =
      sharding.entityRefFor(ShoppingCart.ENTITY_KEY, in.getCartId());
  CompletionStage<ShoppingCart.Summary> reply =
      entityRef.askWithStatus(replyTo -> new ShoppingCart.Checkout(replyTo), timeout);
  CompletionStage<Cart> cart = reply.thenApply(ShoppingCartServiceImpl::toProtoCart);
  return convertError(cart);
}

@Override
public CompletionStage<Cart> getCart(GetCartRequest in) {
  logger.info("getCart {}", in.getCartId());
  EntityRef<ShoppingCart.Command> entityRef =
      sharding.entityRefFor(ShoppingCart.ENTITY_KEY, in.getCartId());
  CompletionStage<ShoppingCart.Summary> reply =
      entityRef.ask(replyTo -> new ShoppingCart.Get(replyTo), timeout);
  CompletionStage<Cart> protoCart =
      reply.thenApply(
          cart -> {
            if (cart.items.isEmpty())
              throw new GrpcServiceException(
                  Status.NOT_FOUND.withDescription("Cart " + in.getCartId() + " not found"));
            else return toProtoCart(cart);
          });
  return convertError(protoCart);
}
- Scala
-
src/main/scala/shopping/cart/ShoppingCartServiceImpl.scala
override def checkout(in: proto.CheckoutRequest): Future[proto.Cart] = {
  logger.info("checkout {}", in.cartId)
  val entityRef = sharding.entityRefFor(ShoppingCart.EntityKey, in.cartId)
  val reply: Future[ShoppingCart.Summary] =
    entityRef.askWithStatus(ShoppingCart.Checkout(_))
  val response = reply.map(cart => toProtoCart(cart))
  convertError(response)
}

override def getCart(in: proto.GetCartRequest): Future[proto.Cart] = {
  logger.info("getCart {}", in.cartId)
  val entityRef = sharding.entityRefFor(ShoppingCart.EntityKey, in.cartId)
  val response =
    entityRef.ask(ShoppingCart.Get).map { cart =>
      if (cart.items.isEmpty)
        throw new GrpcServiceException(
          Status.NOT_FOUND.withDescription(s"Cart ${in.cartId} not found"))
      else
        toProtoCart(cart)
    }
  convertError(response)
}
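In getCart, the exception for an empty cart is thrown inside the function that maps the reply, so it fails the returned future rather than being thrown to the caller directly. The following stand-alone Java sketch shows that mechanism with only java.util.concurrent. EmptyCartSketch, itemCount, and CartNotFoundException are hypothetical names used for illustration, with CartNotFoundException standing in for GrpcServiceException.

```java
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;

// Illustrative only: CartNotFoundException stands in for GrpcServiceException.
final class EmptyCartSketch {

  static final class CartNotFoundException extends RuntimeException {
    CartNotFoundException(String message) {
      super(message);
    }
  }

  // Fails the stage for an empty cart, otherwise passes on the number of items.
  static CompletionStage<Integer> itemCount(
      String cartId, CompletionStage<Map<String, Integer>> reply) {
    return reply.thenApply(
        items -> {
          if (items.isEmpty()) {
            throw new CartNotFoundException("Cart " + cartId + " not found");
          }
          return items.size();
        });
  }

  public static void main(String[] args) {
    CompletionStage<Map<String, Integer>> empty =
        CompletableFuture.completedFuture(Collections.<String, Integer>emptyMap());
    try {
      itemCount("cart2", empty).toCompletableFuture().join();
    } catch (CompletionException e) {
      // The exception thrown in thenApply is available as the cause.
      System.out.println(e.getCause().getMessage());
    }
  }
}
```

A cart that was never used and a cart with no items are indistinguishable here, which is why the service reports an empty cart as not found.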
We also have to include the new checkedOut
flag when converting from ShoppingCart.Summary
to proto.Cart
.
- Java
-
src/main/java/shopping/cart/ShoppingCartServiceImpl.java
private static Cart toProtoCart(ShoppingCart.Summary cart) {
  List<Item> protoItems =
      cart.items.entrySet().stream()
          .map(
              entry ->
                  Item.newBuilder()
                      .setItemId(entry.getKey())
                      .setQuantity(entry.getValue())
                      .build())
          .collect(Collectors.toList());

  return Cart.newBuilder().setCheckedOut(cart.checkedOut).addAllItems(protoItems).build();
}
- Scala
-
src/main/scala/shopping/cart/ShoppingCartServiceImpl.scala
private def toProtoCart(cart: ShoppingCart.Summary): proto.Cart = {
  proto.Cart(
    cart.items.iterator.map { case (itemId, quantity) =>
      proto.Item(itemId, quantity)
    }.toSeq,
    cart.checkedOut)
}
4. Run locally
Start the PostgreSQL database, unless it’s already running:
docker-compose up -d
Run the service with:
sbt -Dconfig.resource=local1.conf run
# make sure to compile before running exec:exec
mvn compile exec:exec -DAPP_CONFIG=local1.conf
4.1. Exercise the service
Use grpcurl to exercise the service:
- Add an item to the cart:
  grpcurl -d '{"cartId":"cart2", "itemId":"socks", "quantity":3}' -plaintext 127.0.0.1:8101 shoppingcart.ShoppingCartService.AddItem
- Check the contents of the cart:
  grpcurl -d '{"cartId":"cart2"}' -plaintext 127.0.0.1:8101 shoppingcart.ShoppingCartService.GetCart
- Check out the cart:
  grpcurl -d '{"cartId":"cart2"}' -plaintext 127.0.0.1:8101 shoppingcart.ShoppingCartService.Checkout
Optional commands and events
The commands and events listed in this section are not mandatory for subsequent steps of the tutorial and their details won’t be covered on this page. You can implement the commands, events, and State management following the pattern we used for the AddItem command and ItemAdded event in the previous step. This is a good exercise to help solidify your knowledge of how to implement EventSourcedBehavior.
Optional commands and corresponding events that you can add on your own:
- RemoveItem - remove an item from the cart
- AdjustItemQuantity - adjust the quantity of an item in the cart
- ItemRemoved
- ItemQuantityAdjusted
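As a starting point for the exercise, the following plain-Java sketch shows one possible set of validation rules and state changes for these commands and events. The rules are an assumption made for illustration, not the tutorial's reference solution, and OptionalCommandsSketch and its method names are hypothetical. Wiring the logic into the entity's command and event handlers is left to the exercise.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Illustrative only: one possible set of rules, not the tutorial's reference solution.
final class OptionalCommandsSketch {

  // Returns an error message if RemoveItem should be rejected, otherwise empty.
  static Optional<String> validateRemoveItem(Map<String, Integer> items, String itemId) {
    if (!items.containsKey(itemId)) {
      return Optional.of("Item '" + itemId + "' is not in this shopping cart");
    }
    return Optional.empty();
  }

  // Returns an error message if AdjustItemQuantity should be rejected, otherwise empty.
  static Optional<String> validateAdjustItemQuantity(
      Map<String, Integer> items, String itemId, int quantity) {
    if (quantity <= 0) {
      return Optional.of("Quantity must be greater than zero");
    }
    if (!items.containsKey(itemId)) {
      return Optional.of("Item '" + itemId + "' is not in this shopping cart");
    }
    return Optional.empty();
  }

  // State change that an ItemRemoved event would apply.
  static void applyItemRemoved(Map<String, Integer> items, String itemId) {
    items.remove(itemId);
  }

  // State change that an ItemQuantityAdjusted event would apply.
  static void applyItemQuantityAdjusted(
      Map<String, Integer> items, String itemId, int newQuantity) {
    items.put(itemId, newQuantity);
  }

  public static void main(String[] args) {
    Map<String, Integer> items = new HashMap<>();
    items.put("socks", 3);

    // Accepted: adjust an existing item, then apply the resulting event.
    if (!validateAdjustItemQuantity(items, "socks", 6).isPresent()) {
      applyItemQuantityAdjusted(items, "socks", 6);
    }
    System.out.println(items); // {socks=6}

    // Rejected: the item is not in the cart.
    System.out.println(validateRemoveItem(items, "hat").isPresent()); // true
  }
}
```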
After adding the optional commands, you can build and run again and try them out:
Update the quantity of an item:
grpcurl -d '{"cartId":"cart1", "itemId":"socks", "quantity":6}' -plaintext 127.0.0.1:8101 shoppingcart.ShoppingCartService.UpdateItem
Get the cart state again:
grpcurl -d '{"cartId":"cart1"}' -plaintext 127.0.0.1:8101 shoppingcart.ShoppingCartService.GetCart