Section 4: Create the Event Sourced Cart entity
Next, we will create the ShoppingCart
Cart entity that manages the state for each shopping cart. The architectural overview shows how the entity is related to the Cart service. The Cart entity will use Event Sourcing to persist events that capture changes to the Cart’s state. The entity writes events to the event journal, which we will use later to create projections:
For now, we’ll implement the command to add items to the Cart. In the next part of the tutorial, we will expand it to handle more commands and events. On this page you will learn how to:
-
implement an Event Sourced entity
-
unit test the entity
-
distribute the entities over the nodes in the Akka Cluster
-
send requests from the gRPC service implementation to the entities
If you are unfamiliar with Event Sourcing, refer to the Event Sourcing section for an explanation.
The Event Sourcing with Akka 2.6 video is also a good starting point for learning Event Sourcing.
This example is using PostgreSQL for storing the events. An alternative is described in Use Cassandra instead of PostgreSQL.
Akka Workshop
The second video of the Akka Workshop Series covers both the cart entity and Event sourcing. It provides some solid guidance to aid you in digesting this section, and the next section of this guide.
Source downloads
If you prefer to simply view and run the example, download a zip file containing the completed code:
- Java
- Scala
1. Add commands and events
Commands are the "external" API of an entity. Entity state can only be changed by commands. The results of commands are emitted as events. A command can request state changes, but different events might be generated depending on the current state of the entity. A command can also be validated and be rejected if it has invalid input or can’t be handled by current state of the entity.
To add a command and an event, follow these steps:
-
Define a
ShoppingCart
object and theAddItem
command: Define aShoppingCart
class extendingEventSourcedBehaviorWithEnforcedReplies
and theAddItem
command:- Java
-
src/main/java/shopping/cart/ShoppingCart.java:
package shopping.cart; import akka.actor.typed.ActorRef; import akka.actor.typed.ActorSystem; import akka.actor.typed.Behavior; import akka.actor.typed.SupervisorStrategy; import akka.actor.typed.javadsl.Behaviors; import akka.cluster.sharding.typed.javadsl.ClusterSharding; import akka.cluster.sharding.typed.javadsl.Entity; import akka.cluster.sharding.typed.javadsl.EntityTypeKey; import akka.pattern.StatusReply; import akka.persistence.typed.PersistenceId; import akka.persistence.typed.javadsl.*; import com.fasterxml.jackson.annotation.JsonCreator; import java.time.Duration; import java.util.HashMap; import java.util.Map; public final class ShoppingCart extends EventSourcedBehaviorWithEnforcedReplies< ShoppingCart.Command, ShoppingCart.Event, ShoppingCart.State> { // ... /** This interface defines all the commands (messages) that the ShoppingCart actor supports. */ interface Command extends CborSerializable {} /** * A command to add an item to the cart. * * <p>It replies with `StatusReply<Summary>`, which is sent back to the caller when all the * events emitted by this command are successfully persisted. */ public static final class AddItem implements Command { final String itemId; final int quantity; final ActorRef<StatusReply<Summary>> replyTo; public AddItem(String itemId, int quantity, ActorRef<StatusReply<Summary>> replyTo) { this.itemId = itemId; this.quantity = quantity; this.replyTo = replyTo; } } /** Summary of the shopping cart state, used in reply messages. */ public static final class Summary implements CborSerializable { final Map<String, Integer> items; @JsonCreator public Summary(Map<String, Integer> items) { // defensive copy since items is a mutable object this.items = new HashMap<>(items); } } }
- Scala
-
src/main/scala/shopping/cart/ShoppingCart.scala:
package shopping.cart import akka.actor.typed.ActorRef import akka.pattern.StatusReply object ShoppingCart { /** * This interface defines all the commands (messages) that the ShoppingCart actor supports. */ sealed trait Command extends CborSerializable /** * A command to add an item to the cart. * * It replies with `StatusReply[Summary]`, which is sent back to the caller when * all the events emitted by this command are successfully persisted. */ final case class AddItem( itemId: String, quantity: Int, replyTo: ActorRef[StatusReply[Summary]]) extends Command /** * Summary of the shopping cart state, used in reply messages. */ final case class Summary(items: Map[String, Int]) extends CborSerializable }
-
Add a corresponding
ItemAdded
event:- Java
-
src/main/java/shopping/cart/ShoppingCart.java
public final class ShoppingCart extends EventSourcedBehaviorWithEnforcedReplies< ShoppingCart.Command, ShoppingCart.Event, ShoppingCart.State> { // ... abstract static class Event implements CborSerializable { public final String cartId; public Event(String cartId) { this.cartId = cartId; } } static final class ItemAdded extends Event { public final String itemId; public final int quantity; public ItemAdded(String cartId, String itemId, int quantity) { super(cartId); this.itemId = itemId; this.quantity = quantity; } @Override public boolean equals(Object o) { (1) if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; ItemAdded other = (ItemAdded) o; if (quantity != other.quantity) return false; if (!cartId.equals(other.cartId)) return false; return itemId.equals(other.itemId); } @Override public int hashCode() { int result = cartId.hashCode(); result = 31 * result + itemId.hashCode(); result = 31 * result + quantity; return result; } } }
1 equals
andhashCode
are not strictly needed, aside from that it can be useful when asserting the result in tests - Scala
-
src/main/scala/shopping/cart/ShoppingCart.scala
package shopping.cart object ShoppingCart { /** * This interface defines all the events that the ShoppingCart supports. */ sealed trait Event extends CborSerializable { def cartId: String } final case class ItemAdded(cartId: String, itemId: String, quantity: Int) extends Event }
2. Add state map
Items added to the Cart are added to a Map
. The contents of the Map
comprise the Cart’s state. Add the Map
to the ShoppingCart
object class as shown:
- Java
-
src/main/java/shopping/cart/ShoppingCart.java
public final class ShoppingCart extends EventSourcedBehaviorWithEnforcedReplies< ShoppingCart.Command, ShoppingCart.Event, ShoppingCart.State> { // ... static final class State implements CborSerializable { final Map<String, Integer> items; public State() { this(new HashMap<>()); } public State(Map<String, Integer> items) { this.items = items; } public boolean hasItem(String itemId) { return items.containsKey(itemId); } public State updateItem(String itemId, int quantity) { if (quantity == 0) { items.remove(itemId); } else { items.put(itemId, quantity); } return this; } public Summary toSummary() { return new Summary(items); } public int itemCount(String itemId) { return items.get(itemId); } public boolean isEmpty() { return items.isEmpty(); } } }
- Scala
-
src/main/scala/shopping/cart/ShoppingCart.scala
package shopping.cart object ShoppingCart { final case class State(items: Map[String, Int]) extends CborSerializable { def hasItem(itemId: String): Boolean = items.contains(itemId) def isEmpty: Boolean = items.isEmpty def updateItem(itemId: String, quantity: Int): State = { quantity match { case 0 => copy(items = items - itemId) case _ => copy(items = items + (itemId -> quantity)) } } } object State { val empty = State(items = Map.empty) } }
3. Implement a command handler
The Cart entity will receive commands that request changes to Cart state. We will implement a command handler to process these commands and emit a reply. Our business logic allows only items to be added which are not in the cart yet and require a positive quantity.
Implement the Event Sourced entity with the EventSourcedBehavior
. Define the command handlers:
Implement the commandHandler
as required by EventSourcedBehaviorWithEnforcedReplies
:
- Java
-
src/main/java/shopping/cart/ShoppingCart.java
public final class ShoppingCart extends EventSourcedBehaviorWithEnforcedReplies< ShoppingCart.Command, ShoppingCart.Event, ShoppingCart.State> { // ... @Override public CommandHandlerWithReply<Command, Event, State> commandHandler() { CommandHandlerWithReplyBuilder<Command, Event, State> builder = newCommandHandlerWithReplyBuilder(); builder.forAnyState().onCommand(AddItem.class, this::onAddItem); (1) return builder.build(); } private ReplyEffect<Event, State> onAddItem(State state, AddItem cmd) { if (state.hasItem(cmd.itemId)) { return Effect() .reply( cmd.replyTo, StatusReply.error( "Item '" + cmd.itemId + "' was already added to this shopping cart")); } else if (cmd.quantity <= 0) { return Effect().reply(cmd.replyTo, StatusReply.error("Quantity must be greater than zero")); } else { return Effect() (2) .persist(new ItemAdded(cartId, cmd.itemId, cmd.quantity)) .thenReply(cmd.replyTo, updatedCart -> StatusReply.success(updatedCart.toSummary())); } } }
1 Matching the AddItem
command.2 Persisting the ItemAdded
event and replying to the sender. - Scala
-
src/main/scala/shopping/cart/ShoppingCart.scala
package shopping.cart import akka.persistence.typed.scaladsl.Effect import akka.persistence.typed.scaladsl.ReplyEffect object ShoppingCart { private def handleCommand( cartId: String, state: State, command: Command): ReplyEffect[Event, State] = { command match { case AddItem(itemId, quantity, replyTo) => (1) if (state.hasItem(itemId)) Effect.reply(replyTo)( StatusReply.Error( s"Item '$itemId' was already added to this shopping cart")) else if (quantity <= 0) Effect.reply(replyTo)( StatusReply.Error("Quantity must be greater than zero")) else Effect .persist(ItemAdded(cartId, itemId, quantity)) (2) .thenReply(replyTo) { updatedCart => StatusReply.Success(Summary(updatedCart.items)) } } } }
1 Matching the AddItem
command.2 Persisting the ItemAdded
event and replying to the sender.
If an AddItem
command is accepted, the Effect.persist
applies an event to the cart’s state and makes sure this event is stored before replying to the command. The returned ReplyEffect
reacts on the commands by deciding which effect they should have on the entity. If the validation fails we want to send back an error message. The reply can be a success or an error and that is the reason for using the StatusReply
.
See all available effects in the Akka reference documentation .
4. Add the event handler
From commands, the entity creates events that represent state changes. Aligning with the command handler above, the entity’s event handler reacts to events and updates the state. The events are continuously persisted to the Event Journal datastore, while the entity state is kept in memory. Other parts of the application may listen to the events. In case of a restart, the entity recovers its latest state by replaying the events from the Event Journal.
Notice that there are no decisions on events, they are applied without any checking.
Add the event handler as follows:
- Java
-
src/main/java/shopping/cart/ShoppingCart.java
public final class ShoppingCart extends EventSourcedBehaviorWithEnforcedReplies< ShoppingCart.Command, ShoppingCart.Event, ShoppingCart.State> { // ... @Override public EventHandler<State, Event> eventHandler() { return newEventHandlerBuilder() .forAnyState() .onEvent(ItemAdded.class, (state, evt) -> state.updateItem(evt.itemId, evt.quantity)) .build(); } }
- Scala
-
src/main/scala/shopping/cart/ShoppingCart.scala
package shopping.cart object ShoppingCart { private def handleEvent(state: State, event: Event) = { event match { case ItemAdded(_, itemId, quantity) => state.updateItem(itemId, quantity) } } }
5. Add initialization
To glue the command handler, event handler, and state together, we need some initialization code. Our code will distribute the Cart entities over nodes in the Akka Cluster with Cluster Sharding , enable snapshots to reduce recovery time when the entity is started, and restart with backoff in the case of failure.
- Java
-
src/main/java/shopping/cart/ShoppingCart.java
public final class ShoppingCart extends EventSourcedBehaviorWithEnforcedReplies< ShoppingCart.Command, ShoppingCart.Event, ShoppingCart.State> { static final EntityTypeKey<Command> ENTITY_KEY = EntityTypeKey.create(Command.class, "ShoppingCart"); private final String cartId; public static void init(ActorSystem<?> system) { ClusterSharding.get(system) .init( Entity.of( ENTITY_KEY, entityContext -> { (1) return ShoppingCart.create(entityContext.getEntityId()); })); } public static Behavior<Command> create(String cartId) { return Behaviors.setup( ctx -> EventSourcedBehavior (2) .start(new ShoppingCart(cartId), ctx)); } @Override public RetentionCriteria retentionCriteria() { (3) return RetentionCriteria.snapshotEvery(100, 3); } private ShoppingCart(String cartId) { super( PersistenceId.of(ENTITY_KEY.name(), cartId), SupervisorStrategy (4) .restartWithBackoff(Duration.ofMillis(200), Duration.ofSeconds(5), 0.1)); this.cartId = cartId; } @Override public State emptyState() { return new State(); } }
1 The entities are distributed over the nodes in the Akka Cluster with Cluster Sharding. 2 An EventSourcedBehavior
is created for theShoppingCart
.3 Snapshotting is an optimization to reduce recovery when the entity is started. 4 Restarting with backoff in case of failures. - Scala
-
src/main/scala/shopping/cart/ShoppingCart.scala
package shopping.cart import scala.concurrent.duration._ import akka.actor.typed.ActorSystem import akka.actor.typed.Behavior import akka.actor.typed.SupervisorStrategy import akka.cluster.sharding.typed.scaladsl.ClusterSharding import akka.cluster.sharding.typed.scaladsl.Entity import akka.cluster.sharding.typed.scaladsl.EntityTypeKey import akka.persistence.typed.PersistenceId import akka.persistence.typed.scaladsl.EventSourcedBehavior import akka.persistence.typed.scaladsl.RetentionCriteria object ShoppingCart { val EntityKey: EntityTypeKey[Command] = EntityTypeKey[Command]("ShoppingCart") def init(system: ActorSystem[_]): Unit = { ClusterSharding(system).init(Entity(EntityKey) { entityContext => (1) ShoppingCart(entityContext.entityId) }) } def apply(cartId: String): Behavior[Command] = { EventSourcedBehavior (2) .withEnforcedReplies[Command, Event, State]( persistenceId = PersistenceId(EntityKey.name, cartId), emptyState = State.empty, commandHandler = (state, command) => handleCommand(cartId, state, command), eventHandler = (state, event) => handleEvent(state, event)) .withRetention(RetentionCriteria .snapshotEvery(numberOfEvents = 100, keepNSnapshots = 3)) (3) .onPersistFailure( SupervisorStrategy.restartWithBackoff(200.millis, 5.seconds, 0.1) (4) ) } }
1 The entities are distributed over the nodes in the Akka Cluster with Cluster Sharding. 2 Command and event handler are defined with the EventSourcedBehavior
.3 Snapshotting is an optimization to reduce recovery when the entity is started. 4 Restarting with backoff in case of failures.
Then we need to call ShoppingCart.init
from Main
. Add the following before the gRPC ShoppingCartServer
initialization:
- Java
-
src/main/java/shopping/cart/Main.java:
ShoppingCart.init(system);
- Scala
-
src/main/scala/shopping/cart/Main.scala:
ShoppingCart.init(system)
Verify that everything compiles with:
sbt compile
mvn compile
6. How serialization is included
The state, commands and events of the entity must be serializable because they are written to the datastore or sent between nodes within the Akka cluster. The template project includes built-in CBOR serialization. This section describes how serialization is implemented. You do not need to do anything specific to take advantage of CBOR, but this section explains how it is included.
The state, commands and events are marked as CborSerializable
which is configured to use the built-in CBOR serialization. The template project includes this marker interface CborSerializable
:
- Java
-
src/main/java/shopping/cart/CborSerializable.java:
package shopping.cart; /** * Marker trait for serialization with Jackson CBOR. Enabled in serialization.conf * `akka.actor.serialization-bindings` (via application.conf). */ public interface CborSerializable {}
- Scala
-
src/main/scala/shopping/cart/CborSerializable.scala:
package shopping.cart /** * Marker trait for serialization with Jackson CBOR. * Enabled in serialization.conf `akka.actor.serialization-bindings` (via application.conf). */ trait CborSerializable
The interface is configured in the serialization.conf
file to enable CBOR serialization. serialization.conf
is included in application.conf
.
akka.actor.serialization-bindings {
"shopping.cart.CborSerializable" = jackson-cbor
}
7. Unit testing
To test the ShoppingCart
entity we can write a unit test using the EventSourcedBehaviorTestKit
TestKitJunitResource
.
A test for the AddItem
command looks like this in src/test/scala/shopping/cart/ShoppingCartSpec.scala
src/test/java/shopping/cart/ShoppingCartTest.java
:
- Java
-
src/test/java/shopping/cart/ShoppingCartTest.java:
package shopping.cart; import static akka.persistence.testkit.javadsl.EventSourcedBehaviorTestKit.CommandResultWithReply; import static org.junit.Assert.*; import akka.actor.testkit.typed.javadsl.TestKitJunitResource; import akka.pattern.StatusReply; import akka.persistence.testkit.javadsl.EventSourcedBehaviorTestKit; import com.typesafe.config.ConfigFactory; import org.junit.Before; import org.junit.ClassRule; import org.junit.Test; public class ShoppingCartTest { private static final String CART_ID = "testCart"; @ClassRule public static final TestKitJunitResource testKit = new TestKitJunitResource( ConfigFactory.parseString( "akka.actor.serialization-bindings {\n" + " \"shopping.cart.CborSerializable\" = jackson-cbor\n" + "}") .withFallback(EventSourcedBehaviorTestKit.config())); private EventSourcedBehaviorTestKit<ShoppingCart.Command, ShoppingCart.Event, ShoppingCart.State> eventSourcedTestKit = EventSourcedBehaviorTestKit.create(testKit.system(), ShoppingCart.create(CART_ID)); @Before public void beforeEach() { eventSourcedTestKit.clear(); } @Test public void addAnItemToCart() { CommandResultWithReply< ShoppingCart.Command, ShoppingCart.Event, ShoppingCart.State, StatusReply<ShoppingCart.Summary>> result = eventSourcedTestKit.runCommand(replyTo -> new ShoppingCart.AddItem("foo", 42, replyTo)); assertTrue(result.reply().isSuccess()); ShoppingCart.Summary summary = result.reply().getValue(); assertEquals(1, summary.items.size()); assertEquals(42, summary.items.get("foo").intValue()); assertEquals(new ShoppingCart.ItemAdded(CART_ID, "foo", 42), result.event()); } @Test public void rejectAlreadyAddedItem() { CommandResultWithReply< ShoppingCart.Command, ShoppingCart.Event, ShoppingCart.State, StatusReply<ShoppingCart.Summary>> result1 = eventSourcedTestKit.runCommand(replyTo -> new ShoppingCart.AddItem("foo", 42, replyTo)); assertTrue(result1.reply().isSuccess()); CommandResultWithReply< ShoppingCart.Command, ShoppingCart.Event, ShoppingCart.State, StatusReply<ShoppingCart.Summary>> result2 = eventSourcedTestKit.runCommand(replyTo -> new ShoppingCart.AddItem("foo", 42, replyTo)); assertTrue(result2.reply().isError()); assertTrue(result2.hasNoEvents()); } }
- Scala
-
src/test/scala/shopping/cart/ShoppingCartSpec.scala
package shopping.cart import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit import akka.pattern.StatusReply import akka.persistence.testkit.scaladsl.EventSourcedBehaviorTestKit import com.typesafe.config.ConfigFactory import org.scalatest.BeforeAndAfterEach import org.scalatest.wordspec.AnyWordSpecLike object ShoppingCartSpec { val config = ConfigFactory .parseString(""" akka.actor.serialization-bindings { "shopping.cart.CborSerializable" = jackson-cbor } """) .withFallback(EventSourcedBehaviorTestKit.config) } class ShoppingCartSpec extends ScalaTestWithActorTestKit(ShoppingCartSpec.config) with AnyWordSpecLike with BeforeAndAfterEach { private val cartId = "testCart" private val eventSourcedTestKit = EventSourcedBehaviorTestKit[ ShoppingCart.Command, ShoppingCart.Event, ShoppingCart.State](system, ShoppingCart(cartId)) override protected def beforeEach(): Unit = { super.beforeEach() eventSourcedTestKit.clear() } "The Shopping Cart" should { "add item" in { val result1 = eventSourcedTestKit.runCommand[StatusReply[ShoppingCart.Summary]]( replyTo => ShoppingCart.AddItem("foo", 42, replyTo)) result1.reply should ===( StatusReply.Success(ShoppingCart.Summary(Map("foo" -> 42)))) result1.event should ===(ShoppingCart.ItemAdded(cartId, "foo", 42)) } "reject already added item" in { val result1 = eventSourcedTestKit.runCommand[StatusReply[ShoppingCart.Summary]]( ShoppingCart.AddItem("foo", 42, _)) result1.reply.isSuccess should ===(true) val result2 = eventSourcedTestKit.runCommand[StatusReply[ShoppingCart.Summary]]( ShoppingCart.AddItem("foo", 13, _)) result2.reply.isError should ===(true) } } }
Run the test with:
sbt test
mvn test
You can learn more about the EventSourcedBehaviorTestKit
TestKitJunitResource
in the Akka reference documentation
8. Send commands to the entities
We want to send commands to the entities from the gRPC service implementation. In the previous step, we wrote a dummy implementation of addItem
in the ShoppingCartServiceImpl
. We can now replace that and send ShoppingCart.AddItem
commands from ShoppingCartServiceImpl
:
- Java
-
src/main/java/shopping/cart/ShoppingCartServiceImpl.java:
package shopping.cart; import akka.actor.typed.ActorSystem; import akka.cluster.sharding.typed.javadsl.ClusterSharding; import akka.cluster.sharding.typed.javadsl.EntityRef; import akka.grpc.GrpcServiceException; import io.grpc.Status; import java.time.Duration; import java.util.List; import java.util.concurrent.CompletionStage; import java.util.concurrent.TimeoutException; import java.util.stream.Collectors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import shopping.cart.proto.*; public final class ShoppingCartServiceImpl implements ShoppingCartService { private final Logger logger = LoggerFactory.getLogger(getClass()); private final Duration timeout; private final ClusterSharding sharding; public ShoppingCartServiceImpl(ActorSystem<?> system) { timeout = system.settings().config().getDuration("shopping-cart-service.ask-timeout"); sharding = ClusterSharding.get(system); } @Override public CompletionStage<Cart> addItem(AddItemRequest in) { logger.info("addItem {} to cart {}", in.getItemId(), in.getCartId()); EntityRef<ShoppingCart.Command> entityRef = sharding.entityRefFor(ShoppingCart.ENTITY_KEY, in.getCartId()); CompletionStage<ShoppingCart.Summary> reply = entityRef.askWithStatus( replyTo -> new ShoppingCart.AddItem(in.getItemId(), in.getQuantity(), replyTo), timeout); CompletionStage<Cart> cart = reply.thenApply(ShoppingCartServiceImpl::toProtoCart); return convertError(cart); } private static Cart toProtoCart(ShoppingCart.Summary cart) { List<Item> protoItems = cart.items.entrySet().stream() .map( entry -> Item.newBuilder() .setItemId(entry.getKey()) .setQuantity(entry.getValue()) .build()) .collect(Collectors.toList()); return Cart.newBuilder().addAllItems(protoItems).build(); } private static <T> CompletionStage<T> convertError(CompletionStage<T> response) { return response.exceptionally( ex -> { if (ex instanceof TimeoutException) { throw new GrpcServiceException( Status.UNAVAILABLE.withDescription("Operation timed out")); } else { throw new GrpcServiceException( Status.INVALID_ARGUMENT.withDescription(ex.getMessage())); } }); } }
- Scala
-
src/main/scala/shopping/cart/ShoppingCartServiceImpl.scala:
package shopping.cart import java.util.concurrent.TimeoutException import scala.concurrent.Future import akka.actor.typed.ActorSystem import akka.cluster.sharding.typed.scaladsl.ClusterSharding import akka.grpc.GrpcServiceException import akka.util.Timeout import io.grpc.Status import org.slf4j.LoggerFactory class ShoppingCartServiceImpl(system: ActorSystem[_]) extends proto.ShoppingCartService { import system.executionContext private val logger = LoggerFactory.getLogger(getClass) implicit private val timeout: Timeout = Timeout.create( system.settings.config.getDuration("shopping-cart-service.ask-timeout")) private val sharding = ClusterSharding(system) override def addItem(in: proto.AddItemRequest): Future[proto.Cart] = { logger.info("addItem {} to cart {}", in.itemId, in.cartId) val entityRef = sharding.entityRefFor(ShoppingCart.EntityKey, in.cartId) val reply: Future[ShoppingCart.Summary] = entityRef.askWithStatus(ShoppingCart.AddItem(in.itemId, in.quantity, _)) val response = reply.map(cart => toProtoCart(cart)) convertError(response) } private def toProtoCart(cart: ShoppingCart.Summary): proto.Cart = { proto.Cart(cart.items.iterator.map { case (itemId, quantity) => proto.Item(itemId, quantity) }.toSeq) } private def convertError[T](response: Future[T]): Future[T] = { response.recoverWith { case _: TimeoutException => Future.failed( new GrpcServiceException( Status.UNAVAILABLE.withDescription("Operation timed out"))) case exc => Future.failed( new GrpcServiceException( Status.INVALID_ARGUMENT.withDescription(exc.getMessage))) } } }
If the command is successful, the entity will reply with StatusReply.Success
with the updated ShoppingCart.Summary
. If the validation in the entity fails it will reply with StatusReply.Error
, which will fail the Future
CompletionStage
that is returned from askWithStatus
.
Also, we added an ActorSystem
parameter to the constructor of ShoppingCartServiceImpl
. Edit Main
to include the system
as the parameter when creating a new instance of the ShoppingCartServiceImpl
.
9. Configure Postgres
The events are stored in a PostgresSQL database and the template project includes configuration for that in the src/main/resources/persistence.conf
file. We have to enable this configuration by including persistence.conf
in application.conf
:
include "persistence"
10. Run locally
To run the service, we first need to start the PostgresSQL to persist the events. Then we can run the service:
-
From the root project directory, run the following command:
docker-compose up -d
This command will build and run the containers. The
-d
flag starts the containers in detached mode. Containers started in detached mode exit when the root process used to run the container exits -
Create the PostgresSQL tables from the SQL script located inside the
ddl-scripts
at the root of the project:docker exec -i shopping-cart-service_postgres-db_1 psql -U shopping-cart -t < ddl-scripts/create_tables.sql
It creates all tables needed for Akka Persistence as well as the offset store table for Akka Projection.
When loading the SQL script, make sure to use the same name as your running PostgresSQL container name. The container name is not fixed and depends on the parent folder of the docker-compose file. The above example assumes the project was created using the seed template and named
shopping-cart-service
.If you get a connection error, it means the PostgresSQL container is still starting. Wait a few seconds and re-try the command.
-
Run the service with:
sbt -Dconfig.resource=local1.conf run
# make sure to compile before running exec:exec mvn compile exec:exec -DAPP_CONFIG=local1.conf
10.1. Exercise the service
Use grpcurl
to exercise the service:
-
Use
grpcurl
to add 3 socks to a cart:grpcurl -d '{"cartId":"cart1", "itemId":"socks", "quantity":3}' -plaintext 127.0.0.1:8101 shoppingcart.ShoppingCartService.AddItem
-
Test the validation logic by trying to add the same item again, which should result in an error:
grpcurl -d '{"cartId":"cart1", "itemId":"socks", "quantity":5}' -plaintext 127.0.0.1:8101 shoppingcart.ShoppingCartService.AddItem
-
To verify that the events are actually saved, and the state can be recovered from the events you can stop the service with
ctrl-c
and then start it again. -
Add 2 t-shirts to the same cart:
grpcurl -d '{"cartId":"cart1", "itemId":"t-shirt", "quantity":2}' -plaintext 127.0.0.1:8101 shoppingcart.ShoppingCartService.AddItem
The returned updated cart should still contain the 3 socks.
10.2. Exercise with multiple service instances
Another fun experiment is to start several instances of the service on different ports (2552, 2553) and then interact with different carts via the different gRPC servers (gRPC ports 8101, 8102, 8103). To do this, you can use the other provided configuration files:
-
In a new terminal, start a second instance with local configuration #2:
sbt -Dconfig.resource=local2.conf run
# make sure to compile before running exec:exec mvn compile exec:exec -DAPP_CONFIG=local2.conf
-
Within another terminal, start a third instance with local configuration #3:
sbt -Dconfig.resource=local3.conf run
# make sure to compile before running exec:exec mvn compile exec:exec -DAPP_CONFIG=local3.conf