Section 4: Create the Event Sourced Cart entity

Next, we will create the ShoppingCart entity that manages the state for each shopping cart. The architectural overview shows how the entity is related to the Cart service. The Cart entity will use Event Sourcing to persist events that capture changes to the Cart’s state. The entity writes events to the event journal, which we will use later to create projections:

Example entity

For now, we’ll implement the command to add items to the Cart. In the next part of the tutorial, we will expand it to handle more commands and events. On this page you will learn how to:

  • implement an Event Sourced entity

  • unit test the entity

  • distribute the entities over the nodes in the Akka Cluster

  • send requests from the gRPC service implementation to the entities

If you are unfamiliar with Event Sourcing, refer to the Event Sourcing section for an explanation. The Event Sourcing with Akka 2.6 video is also a good starting point for learning Event Sourcing.

This example is using PostgreSQL for storing the events. An alternative is described in Use Cassandra instead of PostgreSQL.

Akka Workshop

The second video of the Akka Workshop Series covers both the cart entity and Event Sourcing. It provides solid guidance to help you digest this section and the next section of this guide.

Source downloads

If you prefer to simply view and run the example, download a zip file containing the completed code:

Java
  • Source that includes all previous tutorial steps and allows you to start with the steps on this page.

  • Source with the steps on this page completed.

Scala
  • Source that includes all previous tutorial steps and allows you to start with the steps on this page.

  • Source with the steps on this page completed.

1. Add commands and events

Commands are the "external" API of an entity. Entity state can only be changed by commands. The results of commands are emitted as events. A command can request state changes, but different events might be generated depending on the current state of the entity. A command can also be validated and rejected if it has invalid input or cannot be handled in the current state of the entity.

To add a command and an event, follow these steps:

  1. Define the ShoppingCart entity and the AddItem command. In Java, ShoppingCart is a class extending EventSourcedBehaviorWithEnforcedReplies; in Scala, it is an object:

    Java
    src/main/java/shopping/cart/ShoppingCart.java:
    package shopping.cart;
    
    import akka.actor.typed.ActorRef;
    import akka.actor.typed.ActorSystem;
    import akka.actor.typed.Behavior;
    import akka.actor.typed.SupervisorStrategy;
    import akka.actor.typed.javadsl.Behaviors;
    import akka.cluster.sharding.typed.javadsl.ClusterSharding;
    import akka.cluster.sharding.typed.javadsl.Entity;
    import akka.cluster.sharding.typed.javadsl.EntityTypeKey;
    import akka.pattern.StatusReply;
    import akka.persistence.typed.PersistenceId;
    import akka.persistence.typed.javadsl.*;
    import com.fasterxml.jackson.annotation.JsonCreator;
    import java.time.Duration;
    import java.util.HashMap;
    import java.util.Map;
    
    public final class ShoppingCart
        extends EventSourcedBehaviorWithEnforcedReplies<
            ShoppingCart.Command, ShoppingCart.Event, ShoppingCart.State> {
      // ...
      /** This interface defines all the commands (messages) that the ShoppingCart actor supports. */
      interface Command extends CborSerializable {}
    
      /**
       * A command to add an item to the cart.
       *
       * <p>It replies with `StatusReply&lt;Summary&gt;`, which is sent back to the caller when all the
       * events emitted by this command are successfully persisted.
       */
      public static final class AddItem implements Command {
        final String itemId;
        final int quantity;
        final ActorRef<StatusReply<Summary>> replyTo;
    
        public AddItem(String itemId, int quantity, ActorRef<StatusReply<Summary>> replyTo) {
          this.itemId = itemId;
          this.quantity = quantity;
          this.replyTo = replyTo;
        }
      }
    
      /** Summary of the shopping cart state, used in reply messages. */
      public static final class Summary implements CborSerializable {
        final Map<String, Integer> items;
    
        @JsonCreator
        public Summary(Map<String, Integer> items) {
          // defensive copy since items is a mutable object
          this.items = new HashMap<>(items);
        }
      }
    }
    Scala
    src/main/scala/shopping/cart/ShoppingCart.scala:
    package shopping.cart
    
    import akka.actor.typed.ActorRef
    import akka.pattern.StatusReply
    
    object ShoppingCart {
    
      /**
       * This interface defines all the commands (messages) that the ShoppingCart actor supports.
       */
      sealed trait Command extends CborSerializable
    
      /**
       * A command to add an item to the cart.
       *
       * It replies with `StatusReply[Summary]`, which is sent back to the caller when
       * all the events emitted by this command are successfully persisted.
       */
      final case class AddItem(
          itemId: String,
          quantity: Int,
          replyTo: ActorRef[StatusReply[Summary]])
          extends Command
    
      /**
       * Summary of the shopping cart state, used in reply messages.
       */
      final case class Summary(items: Map[String, Int]) extends CborSerializable
    }
  2. Add a corresponding ItemAdded event:

    Java
    src/main/java/shopping/cart/ShoppingCart.java
    public final class ShoppingCart
        extends EventSourcedBehaviorWithEnforcedReplies<
            ShoppingCart.Command, ShoppingCart.Event, ShoppingCart.State> {
      // ...
      abstract static class Event implements CborSerializable {
        public final String cartId;
    
        public Event(String cartId) {
          this.cartId = cartId;
        }
      }
    
      static final class ItemAdded extends Event {
        public final String itemId;
        public final int quantity;
    
        public ItemAdded(String cartId, String itemId, int quantity) {
          super(cartId);
          this.itemId = itemId;
          this.quantity = quantity;
        }
    
        @Override
        public boolean equals(Object o) { // (1)
          if (this == o) return true;
          if (o == null || getClass() != o.getClass()) return false;
    
          ItemAdded other = (ItemAdded) o;
    
          if (quantity != other.quantity) return false;
          if (!cartId.equals(other.cartId)) return false;
          return itemId.equals(other.itemId);
        }
    
        @Override
        public int hashCode() {
          int result = cartId.hashCode();
          result = 31 * result + itemId.hashCode();
          result = 31 * result + quantity;
          return result;
        }
      }
    }
    1 equals and hashCode are not strictly needed, but they are useful when asserting results in tests.
    Scala
    src/main/scala/shopping/cart/ShoppingCart.scala
    package shopping.cart
    
    object ShoppingCart {
    
      /**
       * This interface defines all the events that the ShoppingCart supports.
       */
      sealed trait Event extends CborSerializable {
        def cartId: String
      }
    
      final case class ItemAdded(cartId: String, itemId: String, quantity: Int)
          extends Event
    }

2. Add state map

Items added to the Cart are stored in a Map. The contents of the Map comprise the Cart’s state. Add the Map to the ShoppingCart class (Java) or object (Scala) as shown:

Java
src/main/java/shopping/cart/ShoppingCart.java
public final class ShoppingCart
    extends EventSourcedBehaviorWithEnforcedReplies<
        ShoppingCart.Command, ShoppingCart.Event, ShoppingCart.State> {
  // ...
  static final class State implements CborSerializable {
    final Map<String, Integer> items;

    public State() {
      this(new HashMap<>());
    }

    public State(Map<String, Integer> items) {
      this.items = items;
    }

    public boolean hasItem(String itemId) {
      return items.containsKey(itemId);
    }

    public State updateItem(String itemId, int quantity) {
      if (quantity == 0) {
        items.remove(itemId);
      } else {
        items.put(itemId, quantity);
      }
      return this;
    }

    public Summary toSummary() {
      return new Summary(items);
    }

    public int itemCount(String itemId) {
      return items.get(itemId);
    }

    public boolean isEmpty() {
      return items.isEmpty();
    }
  }
}
Scala
src/main/scala/shopping/cart/ShoppingCart.scala
package shopping.cart

object ShoppingCart {

  final case class State(items: Map[String, Int]) extends CborSerializable {

    def hasItem(itemId: String): Boolean =
      items.contains(itemId)

    def isEmpty: Boolean =
      items.isEmpty

    def updateItem(itemId: String, quantity: Int): State = {
      quantity match {
        case 0 => copy(items = items - itemId)
        case _ => copy(items = items + (itemId -> quantity))
      }
    }
  }
  object State {
    val empty = State(items = Map.empty)
  }
}
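
The Java State above updates its map in place and returns itself, while the Scala version returns a new copy on every update. For comparison, here is a minimal plain-Java sketch of the copy-on-update style. The class name ImmutableCartState is hypothetical and not part of the tutorial code, and returning 0 for an unknown item is an assumption made for this sketch.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration only: an immutable variant of the cart state.
final class ImmutableCartState {
  private final Map<String, Integer> items;

  ImmutableCartState(Map<String, Integer> items) {
    // Copy so that later changes to the caller's map do not leak into the state.
    this.items = Collections.unmodifiableMap(new HashMap<>(items));
  }

  static ImmutableCartState empty() {
    return new ImmutableCartState(new HashMap<>());
  }

  boolean hasItem(String itemId) {
    return items.containsKey(itemId);
  }

  // Assumption of this sketch: an unknown item counts as quantity 0.
  int itemCount(String itemId) {
    return items.getOrDefault(itemId, 0);
  }

  // A quantity of 0 removes the item; any other quantity sets it.
  ImmutableCartState updateItem(String itemId, int quantity) {
    Map<String, Integer> copy = new HashMap<>(items);
    if (quantity == 0) {
      copy.remove(itemId);
    } else {
      copy.put(itemId, quantity);
    }
    return new ImmutableCartState(copy);
  }

  public static void main(String[] args) {
    ImmutableCartState before = ImmutableCartState.empty();
    ImmutableCartState after = before.updateItem("socks", 3);
    System.out.println("before has socks: " + before.hasItem("socks"));
    System.out.println("after socks count: " + after.itemCount("socks"));
  }
}
```

With this style, the state passed into a handler is never modified, which can make tests and reasoning about replay simpler at the cost of one map copy per update.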

3. Implement a command handler

The Cart entity will receive commands that request changes to Cart state. We will implement a command handler to process these commands and emit a reply. Our business logic only allows adding items that are not yet in the cart, and the quantity must be positive.

Implement the Event Sourced entity with EventSourcedBehavior and define the command handler. In Java, implement commandHandler as required by EventSourcedBehaviorWithEnforcedReplies; in Scala, define a handleCommand function:

Java
src/main/java/shopping/cart/ShoppingCart.java
public final class ShoppingCart
    extends EventSourcedBehaviorWithEnforcedReplies<
        ShoppingCart.Command, ShoppingCart.Event, ShoppingCart.State> {
  // ...
  @Override
  public CommandHandlerWithReply<Command, Event, State> commandHandler() {
    CommandHandlerWithReplyBuilder<Command, Event, State> builder =
        newCommandHandlerWithReplyBuilder();
    builder.forAnyState().onCommand(AddItem.class, this::onAddItem); // (1)
    return builder.build();
  }

  private ReplyEffect<Event, State> onAddItem(State state, AddItem cmd) {
    if (state.hasItem(cmd.itemId)) {
      return Effect()
          .reply(
              cmd.replyTo,
              StatusReply.error(
                  "Item '" + cmd.itemId + "' was already added to this shopping cart"));
    } else if (cmd.quantity <= 0) {
      return Effect().reply(cmd.replyTo, StatusReply.error("Quantity must be greater than zero"));
    } else {
      return Effect() // (2)
          .persist(new ItemAdded(cartId, cmd.itemId, cmd.quantity))
          .thenReply(cmd.replyTo, updatedCart -> StatusReply.success(updatedCart.toSummary()));
    }
  }
}
1 Matching the AddItem command.
2 Persisting the ItemAdded event and replying to the sender.
Scala
src/main/scala/shopping/cart/ShoppingCart.scala
package shopping.cart

import akka.persistence.typed.scaladsl.Effect
import akka.persistence.typed.scaladsl.ReplyEffect

object ShoppingCart {

  private def handleCommand(
      cartId: String,
      state: State,
      command: Command): ReplyEffect[Event, State] = {
    command match {
      case AddItem(itemId, quantity, replyTo) => // (1)
        if (state.hasItem(itemId))
          Effect.reply(replyTo)(
            StatusReply.Error(
              s"Item '$itemId' was already added to this shopping cart"))
        else if (quantity <= 0)
          Effect.reply(replyTo)(
            StatusReply.Error("Quantity must be greater than zero"))
        else
          Effect
            .persist(ItemAdded(cartId, itemId, quantity)) // (2)
            .thenReply(replyTo) { updatedCart =>
              StatusReply.Success(Summary(updatedCart.items))
            }
    }
  }

}
1 Matching the AddItem command.
2 Persisting the ItemAdded event and replying to the sender.

If an AddItem command is accepted, Effect.persist applies an event to the cart’s state and makes sure this event is stored before replying to the command. The command handler returns a ReplyEffect that describes which effect the command should have on the entity. If the validation fails, we send back an error message instead. The reply can be a success or an error, which is the reason for using StatusReply.

See all available effects in the Akka reference documentation.
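
To make the decision logic concrete without any Akka types, the validation above can be sketched as a plain function from the current cart contents and an incoming request to either a rejection or an event to persist. This is only a simplified model: AddItemDecision and its members are hypothetical names, and the real handler returns a ReplyEffect rather than a value like this.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration only; these names are not part of Akka or the tutorial code.
final class AddItemDecision {
  final boolean accepted;
  final String errorMessage; // set only when the command is rejected
  final String itemId; // set only when the command is accepted
  final int quantity; // set only when the command is accepted

  private AddItemDecision(boolean accepted, String errorMessage, String itemId, int quantity) {
    this.accepted = accepted;
    this.errorMessage = errorMessage;
    this.itemId = itemId;
    this.quantity = quantity;
  }

  static AddItemDecision rejected(String errorMessage) {
    return new AddItemDecision(false, errorMessage, null, 0);
  }

  static AddItemDecision itemAdded(String itemId, int quantity) {
    return new AddItemDecision(true, null, itemId, quantity);
  }

  // Validates the request against the current state and decides the outcome.
  // The state itself is not changed here; that is the event handler's job.
  static AddItemDecision decide(Map<String, Integer> cartItems, String itemId, int quantity) {
    if (cartItems.containsKey(itemId)) {
      return rejected("Item '" + itemId + "' was already added to this shopping cart");
    } else if (quantity <= 0) {
      return rejected("Quantity must be greater than zero");
    } else {
      return itemAdded(itemId, quantity);
    }
  }

  public static void main(String[] args) {
    Map<String, Integer> cart = new HashMap<>();
    cart.put("socks", 3);
    System.out.println(decide(cart, "socks", 5).errorMessage);
    System.out.println(decide(cart, "t-shirt", 0).errorMessage);
    System.out.println(decide(cart, "t-shirt", 2).accepted);
  }
}
```

Note that the same request can lead to different outcomes depending on the state: adding socks is accepted for an empty cart but rejected once socks are present.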

4. Add the event handler

From commands, the entity creates events that represent state changes. Aligning with the command handler above, the entity’s event handler reacts to events and updates the state. The events are continuously persisted to the Event Journal datastore, while the entity state is kept in memory. Other parts of the application may listen to the events. In case of a restart, the entity recovers its latest state by replaying the events from the Event Journal.

Notice that the event handler makes no decisions: events are applied without any checking.

Add the event handler as follows:

Java
src/main/java/shopping/cart/ShoppingCart.java
public final class ShoppingCart
    extends EventSourcedBehaviorWithEnforcedReplies<
        ShoppingCart.Command, ShoppingCart.Event, ShoppingCart.State> {
  // ...
  @Override
  public EventHandler<State, Event> eventHandler() {
    return newEventHandlerBuilder()
        .forAnyState()
        .onEvent(ItemAdded.class, (state, evt) -> state.updateItem(evt.itemId, evt.quantity))
        .build();
  }
}
Scala
src/main/scala/shopping/cart/ShoppingCart.scala
package shopping.cart

object ShoppingCart {

  private def handleEvent(state: State, event: Event) = {
    event match {
      case ItemAdded(_, itemId, quantity) =>
        state.updateItem(itemId, quantity)
    }
  }
}
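
Recovery by replay can be pictured as a fold over the journal: start from the empty state and apply each stored event in order with the event handler. The following plain-Java sketch illustrates the idea under simplifying assumptions. ReplaySketch, its nested ItemAdded class, and the use of a bare Map as state are hypothetical stand-ins, and in the real entity Akka Persistence performs the replay for you.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only; Akka Persistence does this replay internally.
final class ReplaySketch {

  // Simplified stand-in for the ItemAdded event (the cart id is omitted).
  static final class ItemAdded {
    final String itemId;
    final int quantity;

    ItemAdded(String itemId, int quantity) {
      this.itemId = itemId;
      this.quantity = quantity;
    }
  }

  // Event handler: applies one event to the state without any validation.
  static Map<String, Integer> applyEvent(Map<String, Integer> state, ItemAdded event) {
    Map<String, Integer> next = new HashMap<>(state);
    next.put(event.itemId, event.quantity);
    return next;
  }

  // Recovery: start from the empty state and apply every journaled event in order.
  static Map<String, Integer> recover(List<ItemAdded> journal) {
    Map<String, Integer> state = new HashMap<>();
    for (ItemAdded event : journal) {
      state = applyEvent(state, event);
    }
    return state;
  }

  public static void main(String[] args) {
    List<ItemAdded> journal = new ArrayList<>();
    journal.add(new ItemAdded("socks", 3));
    journal.add(new ItemAdded("t-shirt", 2));
    System.out.println(recover(journal));
  }
}
```

Because validation already happened in the command handler before the event was stored, replay can apply every event unconditionally and still arrive at the same state as before the restart.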

5. Add initialization

To glue the command handler, event handler, and state together, we need some initialization code. Our code will distribute the Cart entities over nodes in the Akka Cluster with Cluster Sharding, enable snapshots to reduce recovery time when the entity is started, and restart with backoff in the case of failure.

Java
src/main/java/shopping/cart/ShoppingCart.java
public final class ShoppingCart
    extends EventSourcedBehaviorWithEnforcedReplies<
        ShoppingCart.Command, ShoppingCart.Event, ShoppingCart.State> {
  static final EntityTypeKey<Command> ENTITY_KEY =
      EntityTypeKey.create(Command.class, "ShoppingCart");

  private final String cartId;

  public static void init(ActorSystem<?> system) {
    ClusterSharding.get(system)
        .init(
            Entity.of(
                ENTITY_KEY,
                entityContext -> { // (1)
                  return ShoppingCart.create(entityContext.getEntityId());
                }));
  }

  public static Behavior<Command> create(String cartId) {
    return Behaviors.setup(
        ctx ->
            EventSourcedBehavior // (2)
                .start(new ShoppingCart(cartId), ctx));
  }

  @Override
  public RetentionCriteria retentionCriteria() { // (3)
    return RetentionCriteria.snapshotEvery(100, 3);
  }

  private ShoppingCart(String cartId) {
    super(
        PersistenceId.of(ENTITY_KEY.name(), cartId),
        SupervisorStrategy // (4)
            .restartWithBackoff(Duration.ofMillis(200), Duration.ofSeconds(5), 0.1));
    this.cartId = cartId;
  }

  @Override
  public State emptyState() {
    return new State();
  }
}
1 The entities are distributed over the nodes in the Akka Cluster with Cluster Sharding.
2 An EventSourcedBehavior is created for the ShoppingCart.
3 Snapshotting is an optimization to reduce recovery time when the entity is started.
4 Restarting with backoff in case of failures.
Scala
src/main/scala/shopping/cart/ShoppingCart.scala
package shopping.cart

import scala.concurrent.duration._
import akka.actor.typed.ActorSystem
import akka.actor.typed.Behavior
import akka.actor.typed.SupervisorStrategy
import akka.cluster.sharding.typed.scaladsl.ClusterSharding
import akka.cluster.sharding.typed.scaladsl.Entity
import akka.cluster.sharding.typed.scaladsl.EntityTypeKey
import akka.persistence.typed.PersistenceId
import akka.persistence.typed.scaladsl.EventSourcedBehavior
import akka.persistence.typed.scaladsl.RetentionCriteria

object ShoppingCart {

  val EntityKey: EntityTypeKey[Command] =
    EntityTypeKey[Command]("ShoppingCart")

  def init(system: ActorSystem[_]): Unit = {
    ClusterSharding(system).init(Entity(EntityKey) { entityContext => // (1)
      ShoppingCart(entityContext.entityId)
    })
  }

  def apply(cartId: String): Behavior[Command] = {
    EventSourcedBehavior // (2)
      .withEnforcedReplies[Command, Event, State](
        persistenceId = PersistenceId(EntityKey.name, cartId),
        emptyState = State.empty,
        commandHandler =
          (state, command) => handleCommand(cartId, state, command),
        eventHandler = (state, event) => handleEvent(state, event))
      .withRetention(RetentionCriteria
        .snapshotEvery(numberOfEvents = 100, keepNSnapshots = 3)) // (3)
      .onPersistFailure(
        SupervisorStrategy.restartWithBackoff(200.millis, 5.seconds, 0.1) // (4)
      )
  }
}
1 The entities are distributed over the nodes in the Akka Cluster with Cluster Sharding.
2 Command and event handler are defined with the EventSourcedBehavior.
3 Snapshotting is an optimization to reduce recovery time when the entity is started.
4 Restarting with backoff in case of failures.
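
To see why snapshots shorten recovery, consider the arithmetic behind snapshotEvery(100, 3). The sketch below is a simplified model, not Akka's implementation: it assumes snapshots land exactly on multiples of the interval and that only the newest ones are kept. SnapshotArithmetic and its methods are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration only; a simplified model of snapshot retention.
final class SnapshotArithmetic {

  // Sequence number of the newest snapshot, or 0 if none has been taken yet.
  static long latestSnapshotSeqNr(long lastSeqNr, int every) {
    return (lastSeqNr / every) * every;
  }

  // Events that still have to be replayed on top of the newest snapshot.
  static long eventsToReplay(long lastSeqNr, int every) {
    return lastSeqNr - latestSnapshotSeqNr(lastSeqNr, every);
  }

  // Sequence numbers of the snapshots that are kept, newest first.
  static List<Long> retainedSnapshotSeqNrs(long lastSeqNr, int every, int keep) {
    List<Long> retained = new ArrayList<>();
    long seqNr = latestSnapshotSeqNr(lastSeqNr, every);
    while (seqNr > 0 && retained.size() < keep) {
      retained.add(seqNr);
      seqNr -= every;
    }
    return retained;
  }

  public static void main(String[] args) {
    long lastSeqNr = 1234; // assumed number of events persisted by one cart
    System.out.println("replay without snapshots: " + lastSeqNr);
    System.out.println("replay with snapshots: " + eventsToReplay(lastSeqNr, 100));
    System.out.println("retained snapshots: " + retainedSnapshotSeqNrs(lastSeqNr, 100, 3));
  }
}
```

Under these assumptions, an entity with 1234 events restores the snapshot taken at sequence number 1200 and replays only the 34 events after it, instead of all 1234.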

Then we need to call ShoppingCart.init from Main. Add the following before the gRPC ShoppingCartServer initialization:

Java
src/main/java/shopping/cart/Main.java:
ShoppingCart.init(system);
Scala
src/main/scala/shopping/cart/Main.scala:
ShoppingCart.init(system)

Verify that everything compiles with:

Java
mvn compile
Scala
sbt compile

6. How serialization is included

The state, commands and events of the entity must be serializable because they are written to the datastore or sent between nodes within the Akka cluster. The template project includes built-in CBOR serialization. You do not need to do anything specific to take advantage of it; this section only explains how it is included.

The state, commands and events are marked as CborSerializable, which is configured to use the built-in CBOR serialization. The template project includes this marker interface (a trait in Scala):

Java
src/main/java/shopping/cart/CborSerializable.java:
package shopping.cart;

/**
 * Marker interface for serialization with Jackson CBOR. Enabled in serialization.conf
 * `akka.actor.serialization-bindings` (via application.conf).
 */
public interface CborSerializable {}
Scala
src/main/scala/shopping/cart/CborSerializable.scala:
package shopping.cart

/**
 * Marker trait for serialization with Jackson CBOR.
 * Enabled in serialization.conf `akka.actor.serialization-bindings` (via application.conf).
 */
trait CborSerializable

The interface is configured in the serialization.conf file to enable CBOR serialization. serialization.conf is included in application.conf.

src/main/resources/serialization.conf
akka.actor.serialization-bindings {
  "shopping.cart.CborSerializable" = jackson-cbor
}
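
Conceptually, a serialization binding maps a type to a serializer name, and any class that implements the bound marker type picks up that serializer. The plain-Java sketch below models only this lookup idea. It is not how Akka resolves serializers internally, and BindingSketch, serializerFor, and the nested example classes are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical illustration only; a conceptual model of marker-based bindings.
final class BindingSketch {

  // Stand-ins for the marker interface and for two message classes.
  interface CborSerializable {}

  static final class AddItem implements CborSerializable {}

  static final class Unmarked {}

  // Binding table: marker type -> serializer name, mirroring serialization.conf.
  static final Map<Class<?>, String> BINDINGS = new LinkedHashMap<>();

  static {
    BINDINGS.put(CborSerializable.class, "jackson-cbor");
  }

  // Returns the serializer bound to the first marker type the class implements.
  static Optional<String> serializerFor(Class<?> messageClass) {
    for (Map.Entry<Class<?>, String> binding : BINDINGS.entrySet()) {
      if (binding.getKey().isAssignableFrom(messageClass)) {
        return Optional.of(binding.getValue());
      }
    }
    return Optional.empty();
  }

  public static void main(String[] args) {
    System.out.println(serializerFor(AddItem.class));
    System.out.println(serializerFor(Unmarked.class));
  }
}
```

This is why new commands, events, and state classes only need to implement CborSerializable: no further configuration entry per class is required.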

7. Unit testing

To test the ShoppingCart entity we can write a unit test using the EventSourcedBehaviorTestKit. In Java, it is used together with the TestKitJunitResource JUnit rule.

A test for the AddItem command looks like this in src/test/java/shopping/cart/ShoppingCartTest.java (Java) or src/test/scala/shopping/cart/ShoppingCartSpec.scala (Scala):

Java
src/test/java/shopping/cart/ShoppingCartTest.java:
package shopping.cart;

import static akka.persistence.testkit.javadsl.EventSourcedBehaviorTestKit.CommandResultWithReply;
import static org.junit.Assert.*;

import akka.actor.testkit.typed.javadsl.TestKitJunitResource;
import akka.pattern.StatusReply;
import akka.persistence.testkit.javadsl.EventSourcedBehaviorTestKit;
import com.typesafe.config.ConfigFactory;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;

public class ShoppingCartTest {

  private static final String CART_ID = "testCart";

  @ClassRule
  public static final TestKitJunitResource testKit =
      new TestKitJunitResource(
          ConfigFactory.parseString(
                  "akka.actor.serialization-bindings {\n"
                      + "  \"shopping.cart.CborSerializable\" = jackson-cbor\n"
                      + "}")
              .withFallback(EventSourcedBehaviorTestKit.config()));

  private EventSourcedBehaviorTestKit<ShoppingCart.Command, ShoppingCart.Event, ShoppingCart.State>
      eventSourcedTestKit =
          EventSourcedBehaviorTestKit.create(testKit.system(), ShoppingCart.create(CART_ID));

  @Before
  public void beforeEach() {
    eventSourcedTestKit.clear();
  }

  @Test
  public void addAnItemToCart() {
    CommandResultWithReply<
            ShoppingCart.Command,
            ShoppingCart.Event,
            ShoppingCart.State,
            StatusReply<ShoppingCart.Summary>>
        result =
            eventSourcedTestKit.runCommand(replyTo -> new ShoppingCart.AddItem("foo", 42, replyTo));
    assertTrue(result.reply().isSuccess());
    ShoppingCart.Summary summary = result.reply().getValue();
    assertEquals(1, summary.items.size());
    assertEquals(42, summary.items.get("foo").intValue());
    assertEquals(new ShoppingCart.ItemAdded(CART_ID, "foo", 42), result.event());
  }

  @Test
  public void rejectAlreadyAddedItem() {
    CommandResultWithReply<
            ShoppingCart.Command,
            ShoppingCart.Event,
            ShoppingCart.State,
            StatusReply<ShoppingCart.Summary>>
        result1 =
            eventSourcedTestKit.runCommand(replyTo -> new ShoppingCart.AddItem("foo", 42, replyTo));
    assertTrue(result1.reply().isSuccess());
    CommandResultWithReply<
            ShoppingCart.Command,
            ShoppingCart.Event,
            ShoppingCart.State,
            StatusReply<ShoppingCart.Summary>>
        result2 =
            eventSourcedTestKit.runCommand(replyTo -> new ShoppingCart.AddItem("foo", 42, replyTo));
    assertTrue(result2.reply().isError());
    assertTrue(result2.hasNoEvents());
  }
}
Scala
src/test/scala/shopping/cart/ShoppingCartSpec.scala
package shopping.cart

import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
import akka.pattern.StatusReply
import akka.persistence.testkit.scaladsl.EventSourcedBehaviorTestKit
import com.typesafe.config.ConfigFactory
import org.scalatest.BeforeAndAfterEach
import org.scalatest.wordspec.AnyWordSpecLike

object ShoppingCartSpec {
  val config = ConfigFactory
    .parseString("""
      akka.actor.serialization-bindings {
        "shopping.cart.CborSerializable" = jackson-cbor
      }
      """)
    .withFallback(EventSourcedBehaviorTestKit.config)
}

class ShoppingCartSpec
    extends ScalaTestWithActorTestKit(ShoppingCartSpec.config)
    with AnyWordSpecLike
    with BeforeAndAfterEach {

  private val cartId = "testCart"
  private val eventSourcedTestKit =
    EventSourcedBehaviorTestKit[
      ShoppingCart.Command,
      ShoppingCart.Event,
      ShoppingCart.State](system, ShoppingCart(cartId))

  override protected def beforeEach(): Unit = {
    super.beforeEach()
    eventSourcedTestKit.clear()
  }

  "The Shopping Cart" should {

    "add item" in {
      val result1 =
        eventSourcedTestKit.runCommand[StatusReply[ShoppingCart.Summary]](
          replyTo => ShoppingCart.AddItem("foo", 42, replyTo))
      result1.reply should ===(
        StatusReply.Success(ShoppingCart.Summary(Map("foo" -> 42))))
      result1.event should ===(ShoppingCart.ItemAdded(cartId, "foo", 42))
    }

    "reject already added item" in {
      val result1 =
        eventSourcedTestKit.runCommand[StatusReply[ShoppingCart.Summary]](
          ShoppingCart.AddItem("foo", 42, _))
      result1.reply.isSuccess should ===(true)
      val result2 =
        eventSourcedTestKit.runCommand[StatusReply[ShoppingCart.Summary]](
          ShoppingCart.AddItem("foo", 13, _))
      result2.reply.isError should ===(true)
    }

  }

}

Run the test with:

Java
mvn test
Scala
sbt test

You can learn more about the EventSourcedBehaviorTestKit and TestKitJunitResource in the Akka reference documentation.

8. Send commands to the entities

We want to send commands to the entities from the gRPC service implementation. In the previous step, we wrote a dummy implementation of addItem in the ShoppingCartServiceImpl. We can now replace that and send ShoppingCart.AddItem commands from ShoppingCartServiceImpl:

Java
src/main/java/shopping/cart/ShoppingCartServiceImpl.java:
package shopping.cart;

import akka.actor.typed.ActorSystem;
import akka.cluster.sharding.typed.javadsl.ClusterSharding;
import akka.cluster.sharding.typed.javadsl.EntityRef;
import akka.grpc.GrpcServiceException;
import io.grpc.Status;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import shopping.cart.proto.*;

public final class ShoppingCartServiceImpl implements ShoppingCartService {

  private final Logger logger = LoggerFactory.getLogger(getClass());

  private final Duration timeout;
  private final ClusterSharding sharding;

  public ShoppingCartServiceImpl(ActorSystem<?> system) {
    timeout = system.settings().config().getDuration("shopping-cart-service.ask-timeout");
    sharding = ClusterSharding.get(system);
  }

  @Override
  public CompletionStage<Cart> addItem(AddItemRequest in) {
    logger.info("addItem {} to cart {}", in.getItemId(), in.getCartId());
    EntityRef<ShoppingCart.Command> entityRef =
        sharding.entityRefFor(ShoppingCart.ENTITY_KEY, in.getCartId());
    CompletionStage<ShoppingCart.Summary> reply =
        entityRef.askWithStatus(
            replyTo -> new ShoppingCart.AddItem(in.getItemId(), in.getQuantity(), replyTo),
            timeout);
    CompletionStage<Cart> cart = reply.thenApply(ShoppingCartServiceImpl::toProtoCart);
    return convertError(cart);
  }

  private static Cart toProtoCart(ShoppingCart.Summary cart) {
    List<Item> protoItems =
        cart.items.entrySet().stream()
            .map(
                entry ->
                    Item.newBuilder()
                        .setItemId(entry.getKey())
                        .setQuantity(entry.getValue())
                        .build())
            .collect(Collectors.toList());

    return Cart.newBuilder().addAllItems(protoItems).build();
  }

  private static <T> CompletionStage<T> convertError(CompletionStage<T> response) {
    return response.exceptionally(
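        ex -> {
          // A failure passing through thenApply arrives wrapped in a CompletionException.
          Throwable cause =
              ex instanceof java.util.concurrent.CompletionException && ex.getCause() != null
                  ? ex.getCause()
                  : ex;
          if (cause instanceof TimeoutException) {
            throw new GrpcServiceException(
                Status.UNAVAILABLE.withDescription("Operation timed out"));
          } else {
            throw new GrpcServiceException(
                Status.INVALID_ARGUMENT.withDescription(cause.getMessage()));
          }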
        });
  }
}
Scala
src/main/scala/shopping/cart/ShoppingCartServiceImpl.scala:
package shopping.cart

import java.util.concurrent.TimeoutException

import scala.concurrent.Future

import akka.actor.typed.ActorSystem
import akka.cluster.sharding.typed.scaladsl.ClusterSharding
import akka.grpc.GrpcServiceException
import akka.util.Timeout
import io.grpc.Status
import org.slf4j.LoggerFactory

class ShoppingCartServiceImpl(system: ActorSystem[_])
    extends proto.ShoppingCartService {
  import system.executionContext

  private val logger = LoggerFactory.getLogger(getClass)

  implicit private val timeout: Timeout =
    Timeout.create(
      system.settings.config.getDuration("shopping-cart-service.ask-timeout"))

  private val sharding = ClusterSharding(system)

  override def addItem(in: proto.AddItemRequest): Future[proto.Cart] = {
    logger.info("addItem {} to cart {}", in.itemId, in.cartId)
    val entityRef = sharding.entityRefFor(ShoppingCart.EntityKey, in.cartId)
    val reply: Future[ShoppingCart.Summary] =
      entityRef.askWithStatus(ShoppingCart.AddItem(in.itemId, in.quantity, _))
    val response = reply.map(cart => toProtoCart(cart))
    convertError(response)
  }

  private def toProtoCart(cart: ShoppingCart.Summary): proto.Cart = {
    proto.Cart(cart.items.iterator.map { case (itemId, quantity) =>
      proto.Item(itemId, quantity)
    }.toSeq)
  }

  private def convertError[T](response: Future[T]): Future[T] = {
    response.recoverWith {
      case _: TimeoutException =>
        Future.failed(
          new GrpcServiceException(
            Status.UNAVAILABLE.withDescription("Operation timed out")))
      case exc =>
        Future.failed(
          new GrpcServiceException(
            Status.INVALID_ARGUMENT.withDescription(exc.getMessage)))
    }
  }

}

If the command is successful, the entity replies with StatusReply.Success containing the updated ShoppingCart.Summary. If the validation in the entity fails, it replies with StatusReply.Error, which fails the CompletionStage (Java) or Future (Scala) returned from askWithStatus.

We also added an ActorSystem parameter to the constructor of ShoppingCartServiceImpl. Edit Main to pass the system as an argument when creating the ShoppingCartServiceImpl instance.
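
The way a failed reply travels through the returned CompletionStage can be tried out with the JDK alone. The sketch below is a simplified model: it uses plain strings instead of gRPC status objects and an IllegalStateException as a stand-in for the entity's error reply. ErrorMappingSketch and its methods are hypothetical names. It also shows a JDK detail worth knowing: after thenApply, the original failure reaches exceptionally wrapped in a CompletionException, so the cause has to be unwrapped before checking its type.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeoutException;

// Hypothetical illustration only; plain strings stand in for gRPC status objects.
final class ErrorMappingSketch {

  // Builds an already-failed stage, standing in for a failed ask.
  static <T> CompletionStage<T> failed(Throwable error) {
    CompletableFuture<T> future = new CompletableFuture<>();
    future.completeExceptionally(error);
    return future;
  }

  // Maps a failure to a status name, unwrapping CompletionException first.
  static String statusFor(Throwable ex) {
    Throwable cause =
        (ex instanceof CompletionException && ex.getCause() != null) ? ex.getCause() : ex;
    return (cause instanceof TimeoutException) ? "UNAVAILABLE" : "INVALID_ARGUMENT";
  }

  // Converts a successful reply and recovers from a failed one.
  static CompletionStage<String> respond(CompletionStage<String> reply) {
    return reply
        .thenApply(summary -> "OK " + summary)
        .exceptionally(ErrorMappingSketch::statusFor);
  }

  public static void main(String[] args) {
    CompletionStage<String> success = CompletableFuture.completedFuture("socks=3");
    CompletionStage<String> rejected = failed(new IllegalStateException("already added"));
    CompletionStage<String> timedOut = failed(new TimeoutException("ask timed out"));

    System.out.println(respond(success).toCompletableFuture().join());
    System.out.println(respond(rejected).toCompletableFuture().join());
    System.out.println(respond(timedOut).toCompletableFuture().join());
  }
}
```

A successful reply is converted, a validation error maps to INVALID_ARGUMENT, and a timeout maps to UNAVAILABLE, matching the intent of convertError in the service implementation.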

9. Configure Postgres

The events are stored in a PostgreSQL database, and the template project includes configuration for that in the src/main/resources/persistence.conf file. We have to enable this configuration by including persistence.conf in application.conf:

include "persistence"

10. Run locally

To run the service, we first need to start PostgreSQL to persist the events. Then we can run the service:

  1. From the root project directory, run the following command:

    docker-compose up -d

    This command will build and run the containers. The -d flag starts the containers in detached mode. Containers started in detached mode exit when the root process used to run the container exits.

  2. Create the PostgreSQL tables from the SQL script located inside the ddl-scripts directory at the root of the project:

    docker exec -i shopping-cart-service_postgres-db_1 psql -U shopping-cart -t < ddl-scripts/create_tables.sql

    It creates all tables needed for Akka Persistence as well as the offset store table for Akka Projection.

    When loading the SQL script, make sure to use the name of your running PostgreSQL container. The container name is not fixed and depends on the parent folder of the docker-compose file. The above example assumes the project was created using the seed template and named shopping-cart-service.

    If you get a connection error, the PostgreSQL container is probably still starting. Wait a few seconds and retry the command.

  3. Run the service with:

    Java
    # make sure to compile before running exec:exec
    mvn compile exec:exec -DAPP_CONFIG=local1.conf
    Scala
    sbt -Dconfig.resource=local1.conf run

10.1. Exercise the service

Use grpcurl to exercise the service:

  1. Use grpcurl to add 3 socks to a cart:

    grpcurl -d '{"cartId":"cart1", "itemId":"socks", "quantity":3}' -plaintext 127.0.0.1:8101 shoppingcart.ShoppingCartService.AddItem
  2. Test the validation logic by trying to add the same item again, which should result in an error:

    grpcurl -d '{"cartId":"cart1", "itemId":"socks", "quantity":5}' -plaintext 127.0.0.1:8101 shoppingcart.ShoppingCartService.AddItem
  3. To verify that the events are actually saved and that the state can be recovered from them, stop the service with ctrl-c and then start it again.

  4. Add 2 t-shirts to the same cart:

    grpcurl -d '{"cartId":"cart1", "itemId":"t-shirt", "quantity":2}' -plaintext 127.0.0.1:8101 shoppingcart.ShoppingCartService.AddItem

    The returned updated cart should still contain the 3 socks.

10.2. Exercise with multiple service instances

Another fun experiment is to start several instances of the service on different ports (2552, 2553) and then interact with different carts via the different gRPC servers (gRPC ports 8101, 8102, 8103). To do this, you can use the other provided configuration files:

  1. In a new terminal, start a second instance with local configuration #2:

    Java
    # make sure to compile before running exec:exec
    mvn compile exec:exec -DAPP_CONFIG=local2.conf
    Scala
    sbt -Dconfig.resource=local2.conf run

  2. Within another terminal, start a third instance with local configuration #3:

    Java
    # make sure to compile before running exec:exec
    mvn compile exec:exec -DAPP_CONFIG=local3.conf
    Scala
    sbt -Dconfig.resource=local3.conf run
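
Once all three instances are running, one way to interact with different carts via the different gRPC servers is to generate one AddItem call per port. The sketch below only prints the commands so that you can review them first. The function name print_add_item_commands and the cart IDs cart2 to cart4 are arbitrary choices for illustration. Running the commands twice would trigger the validation error, because the items would already be in the carts.

```shell
# Prints one grpcurl command per gRPC port, each targeting a different cart.
print_add_item_commands() {
  i=2
  for port in 8101 8102 8103; do
    printf "grpcurl -d '{\"cartId\":\"cart%d\", \"itemId\":\"socks\", \"quantity\":1}' -plaintext 127.0.0.1:%d shoppingcart.ShoppingCartService.AddItem\n" "$i" "$port"
    i=$((i + 1))
  done
}

# Review the commands, then run them:
# print_add_item_commands
# print_add_item_commands | sh
```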

10.3. Stop the service

When finished, stop the service with ctrl-c. Leave PostgreSQL running for the next set of steps, or stop it with:

docker-compose down

The following steps for cloud deployment are optional. If you are only running locally, you can skip to the next section of the tutorial.

11. Run in Kubernetes

Before following the steps below, create a Kubernetes cluster and install the Akka Operator. Use the instructions below for:

11.1. Create PostgreSQL database

Follow the JDBC instructions to set up a PostgreSQL database in GCP or AWS and create a secret to access it.

Create the PostgreSQL tables using the ddl-scripts/create_tables.sql SQL script located at the root of the project. Follow the instructions in JDBC integration to connect to your database instance and load the script.

kubectl run -i rds-mgmt --image=postgres \
  --restart=Never --rm --env "PGPASSWORD=<password>" -- \
  psql -h <db hostname> -U postgres -t < ddl-scripts/create_tables.sql

If you created your database with the Installation on Amazon Elastic Kubernetes Service (EKS) Quick Start, you can look up the postgres username, password, and hostname with the pulumi output command (you must be in the Pulumi working directory for this to work) and reference the DDL script with an absolute path. See the Connect to the Aurora RDS database section of the Quick Start for an example.

11.2. Build Docker image

Create a Docker repository and authenticate Docker.

GCP

Follow the instructions in Using Container Registry with Google Cloud to deploy Docker images on GCP’s container registry.

AWS

Follow the instructions in Amazon Elastic Container Registry to deploy Docker images on AWS’s container registry.

11.3. Additional steps for Docker and AWS

If you are using AWS, you will also need to complete the following procedures.

Rebuild the Docker image.

Java
mvn -DskipTests -Ddocker.registry=803424716218.dkr.ecr.eu-central-1.amazonaws.com clean package docker:push
Scala
sbt -Ddocker.registry=803424716218.dkr.ecr.eu-central-1.amazonaws.com docker:publish

Take note of the image tag as displayed by the docker:push (Java) or docker:publish (Scala) command.

Java
DOCKER> Tagging image shopping-cart-service:20201209-135004-363ae2b successful!
Scala
[info] Successfully tagged shopping-cart-service:20201209-135004-363ae2b
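
If you capture the build output in a file, the tag can be pulled out with sed instead of copying it by hand. This is a minimal sketch that assumes output lines shaped like the two samples above. The function name extract_image_tag and the build.log file name are hypothetical.

```shell
# Reads build output on stdin and prints the tag that follows
# "shopping-cart-service:" on the last matching line.
extract_image_tag() {
  sed -n 's/.*shopping-cart-service:\([A-Za-z0-9._-]*\).*/\1/p' | tail -n 1
}

# Example (build.log is a hypothetical file holding the build output):
# sbt -Ddocker.registry=<docker-registry> docker:publish | tee build.log
# TAG=$(extract_image_tag < build.log)
```

If no line matches, the function prints nothing, so check that the result is non-empty before using it.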

11.4. Update the deployment descriptor

Update the kubernetes/shopping-cart-service-cr.yml deployment descriptor with the new image tag and the JDBC credential secret.

kubernetes/shopping-cart-service-cr.yml:
apiVersion: "v1"
kind: "Namespace"
metadata:
  name: "shopping"
---
apiVersion: akka.lightbend.com/v1
kind: AkkaMicroservice
metadata:
  name: shopping-cart-service
  namespace: "shopping"
spec:
  replicas: 1
  image: <docker-registry>/shopping-cart-service:<tag> (1)
  javaOptions: "-Xlog:gc -XX:InitialRAMPercentage=75 -XX:MaxRAMPercentage=75"
  resources:
    limits:
      memory: "2Gi"
    requests:
      memory: "2Gi"
      cpu: "1"
  jdbc:
    credentialsSecret: shopping-cart-service-jdbc-secret  (2)
1 Replace <docker-registry> with your docker registry address and update the image reference with the image tag from the output of the Docker build above, for example: 803424716218.dkr.ecr.eu-central-1.amazonaws.com/shopping-cart-service:20201209-135004-363ae2b.
2 Add a jdbc.credentialsSecret section pointing to the secret created in the JDBC instructions.
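
The image reference can also be substituted with sed rather than edited by hand. The sketch below assumes the descriptor has a single indented image: line, as in the listing above (the (1) and (2) markers are callouts in this guide, not part of the file). The function name set_image is hypothetical, and the output is written to stdout so that the original file stays untouched until you have reviewed the result.

```shell
# set_image DESCRIPTOR IMAGE
# Prints DESCRIPTOR with the value of its indented "image:" line replaced.
set_image() {
  sed "s|^\([[:space:]]*image:\).*|\1 $2|" "$1"
}

# Example (REGISTRY and TAG are hypothetical variables you set yourself):
# set_image kubernetes/shopping-cart-service-cr.yml \
#   "$REGISTRY/shopping-cart-service:$TAG" > /tmp/shopping-cart-service-cr.yml
```

The image reference must not contain the | or & characters, because sed treats them specially in this expression.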

11.5. Apply to Kubernetes

Re-apply the shopping-cart-service-cr.yml to Kubernetes:

kubectl apply -f kubernetes/shopping-cart-service-cr.yml

You can see progress by viewing the status:

kubectl get akkamicroservices/shopping-cart-service

See troubleshooting deployment status for more details.

11.6. Exercise the service in Kubernetes

  1. You can list the pods with:

    kubectl get pods

  2. Inspect logs from a separate terminal window:

    kubectl logs -f <shopping-cart-service pod name from above>

  3. Add port forwarding for the gRPC endpoint from a separate terminal:

    kubectl port-forward svc/shopping-cart-service-grpc 8101:8101

Use grpcurl to exercise the service:

  1. Use grpcurl to add 3 socks to a cart:

    grpcurl -d '{"cartId":"cart1", "itemId":"socks", "quantity":3}' -plaintext 127.0.0.1:8101 shoppingcart.ShoppingCartService.AddItem

  2. Test the validation logic by trying to add the same item again, which should result in an error:

    grpcurl -d '{"cartId":"cart1", "itemId":"socks", "quantity":5}' -plaintext 127.0.0.1:8101 shoppingcart.ShoppingCartService.AddItem

  3. Add 2 t-shirts to the same cart:

    grpcurl -d '{"cartId":"cart1", "itemId":"t-shirt", "quantity":2}' -plaintext 127.0.0.1:8101 shoppingcart.ShoppingCartService.AddItem

    The returned updated cart should still contain the 3 socks.

Note the logging from the ShoppingCartServiceImpl in the console that is running kubectl logs -f.