Section 6: Projection for queries

Next, we will create an Akka Projection from the events emitted by the ShoppingCart entity. ShoppingCart entities can only be addressed by individual cart identifiers, so we can find a particular cart, but we can’t find all carts that contain a particular item. The Projection will update counts in the database to track item popularity, and we can then query the database to find how popular an item is.

Example query

This piece of the full example focuses on the ItemPopularityProjection and a query representation in the database. On this page you will learn how to:

  • implement a Projection

  • distribute the Projection instances over the nodes in the Akka Cluster

  • work with the Projection JDBC API

The CQRS section explains why it is a good practice to build a Projection from entity events that can be queried. The Introduction to Akka Projections video is also a good starting point for learning about Akka Projections.

This example is using PostgreSQL for storing the Projection result, and the Projection offset. An alternative is described in Use Cassandra instead of PostgreSQL.

Akka Workshop

The third video of the Akka Workshop Series covers both Projections for queries and CQRS. It provides some solid guidance to aid you in understanding this section of this guide.

Source downloads

If you prefer to simply view and run the example, download a zip file containing the completed code:

Java
  • Source that includes all previous tutorial steps and allows you to start with the steps on this page.

  • Source with the steps on this page completed.

Scala
  • Source that includes all previous tutorial steps and allows you to start with the steps on this page.

  • Source with the steps on this page completed.

1. Process events in a Projection

To process events in a projection, we will:

  • encapsulate database access with ItemPopularityRepository, which can have a stubbed implementation for tests

  • add Repository implementation for JDBC

  • implement the event processing of the Projection in a Handler

Java
This example uses Spring Data and Hibernate to access the database. You may use any other JDBC library to achieve this.

The template provides a few classes for integration with Spring Data: HibernateJdbcSession, SpringConfig, SpringIntegration and CreateTableTestUtils. How these classes work won’t be covered by this tutorial. Consult their respective source code in the template for more information on how they provide the necessary glue code.

Scala
This example uses the ScalikeJDBC library to access the database. You may use any other JDBC library to achieve this.

The template provides a few classes for integration with ScalikeJDBC, for instance: ScalikeJdbcSetup, ScalikeJdbcSession and CreateTableTestUtils. How these classes work won’t be covered by this tutorial. Consult their respective source code in the template for more information on how they provide the necessary glue code.

Follow these steps to process events in a Projection:

  1. Add the ItemPopularityRepository:

    Java
    src/main/java/shopping/cart/repository/ItemPopularityRepository.java:
    package shopping.cart.repository;
    
    import java.util.Optional;
    import org.springframework.data.repository.Repository;
    import shopping.cart.ItemPopularity;
    
    public interface ItemPopularityRepository extends Repository<ItemPopularity, String> {
    
      ItemPopularity save(ItemPopularity itemPopularity);
    
      Optional<ItemPopularity> findById(String id);
    }
    Scala
    src/main/scala/shopping/cart/repository/ItemPopularityRepository.scala:
    trait ItemPopularityRepository {
      def update(session: ScalikeJdbcSession, itemId: String, delta: Int): Unit
      def getItem(session: ScalikeJdbcSession, itemId: String): Option[Long]
    }
  2. Add the ItemPopularity:

    src/main/java/shopping/cart/ItemPopularity.java:
    package shopping.cart;
    
    import javax.persistence.*;
    
    @Entity
    @Table(name = "item_popularity")
    public class ItemPopularity {
    
      // primary key
      @Id private final String itemId;
    
      // optimistic locking
      @Version private final Long version;
    
      private final long count;
    
      public ItemPopularity() {
        // null version means the entity is not on the DB
        this.version = null;
        this.itemId = "";
        this.count = 0;
      }
    
      public ItemPopularity(String itemId, long version, long count) {
        this.itemId = itemId;
        this.version = version;
        this.count = count;
      }
    
      public String getItemId() {
        return itemId;
      }
    
      public long getCount() {
        return count;
      }
    
      // returns the boxed type because the version is null until the entity is stored
      public Long getVersion() {
        return version;
      }
    
      public ItemPopularity changeCount(long delta) {
        return new ItemPopularity(itemId, version, count + delta);
      }
    }
  3. Provide the repository implementation. For Java, use the Spring ApplicationContext class to retrieve the implementation for the repository; this code will be added to the Main class later. For Scala, add the implementation for PostgreSQL by using the ScalikeJdbcSession:

    Java
    src/main/java/shopping/cart/Main.java:
    ApplicationContext springContext = SpringIntegration.applicationContext(system); (1)
    
    ItemPopularityRepository itemPopularityRepository =
        springContext.getBean(ItemPopularityRepository.class); (2)
    1 Initialize a Spring application context using the ActorSystem.
    2 Get a Spring generated implementation for the ItemPopularityRepository.
    Scala
    src/main/scala/shopping/cart/repository/ItemPopularityRepository.scala:
    class ItemPopularityRepositoryImpl() extends ItemPopularityRepository {
    
      override def update(
          session: ScalikeJdbcSession,
          itemId: String,
          delta: Int): Unit = {
        session.db.withinTx { implicit dbSession =>
          // This uses the PostgreSQL `ON CONFLICT` feature
          // Alternatively, this can be implemented by first issuing the `UPDATE`
          // and checking for the updated rows count. If no rows got updated issue
          // the `INSERT` instead.
          sql"""
               INSERT INTO item_popularity (itemid, count) VALUES ($itemId, $delta)
               ON CONFLICT (itemid) DO UPDATE SET count = item_popularity.count + $delta
             """.executeUpdate().apply()
        }
      }
    
      override def getItem(
          session: ScalikeJdbcSession,
          itemId: String): Option[Long] = {
        if (session.db.isTxAlreadyStarted) {
          session.db.withinTx { implicit dbSession =>
            select(itemId)
          }
        } else {
          session.db.readOnly { implicit dbSession =>
            select(itemId)
          }
        }
      }
    
      private def select(itemId: String)(implicit dbSession: DBSession) = {
        sql"SELECT count FROM item_popularity WHERE itemid = $itemId"
          .map(_.long("count"))
          .toOption()
          .apply()
      }
    }
  4. Add a class ItemPopularityProjectionHandler:

    Java
    src/main/java/shopping/cart/ItemPopularityProjectionHandler.java:
    package shopping.cart;
    
    import akka.projection.eventsourced.EventEnvelope;
    import akka.projection.jdbc.javadsl.JdbcHandler;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import shopping.cart.repository.HibernateJdbcSession;
    import shopping.cart.repository.ItemPopularityRepository;
    
    public final class ItemPopularityProjectionHandler
        extends JdbcHandler<EventEnvelope<ShoppingCart.Event>, HibernateJdbcSession> { (1)
      private final Logger logger = LoggerFactory.getLogger(getClass());
      private final String tag;
      private final ItemPopularityRepository repo;
    
      public ItemPopularityProjectionHandler(String tag, ItemPopularityRepository repo) {
        this.tag = tag;
        this.repo = repo;
      }
    
      private ItemPopularity findOrNew(String itemId) {
        return repo.findById(itemId).orElseGet(() -> new ItemPopularity(itemId, 0, 0));
      }
    
      @Override
      public void process(
          HibernateJdbcSession session, EventEnvelope<ShoppingCart.Event> envelope) { (2)
        ShoppingCart.Event event = envelope.event();
    
        if (event instanceof ShoppingCart.ItemAdded) { (3)
          ShoppingCart.ItemAdded added = (ShoppingCart.ItemAdded) event;
          String itemId = added.itemId;
    
          ItemPopularity existingItemPop = findOrNew(itemId);
          ItemPopularity updatedItemPop = existingItemPop.changeCount(added.quantity);
          repo.save(updatedItemPop);
    
          logger.info(
              "ItemPopularityProjectionHandler({}) item popularity for '{}': [{}]",
              this.tag,
              itemId,
              updatedItemPop.getCount());
        } else {
          // skip all other events, such as `CheckedOut`
        }
      }
    }
    1 Extends akka.projection.jdbc.javadsl.JdbcHandler.
    2 The process method to implement.
    3 Match events and increment or decrement the count via the ItemPopularityRepository, which encapsulates the database access.
    Scala
    src/main/scala/shopping/cart/ItemPopularityProjectionHandler.scala:
    package shopping.cart
    
    import akka.actor.typed.ActorSystem
    import akka.projection.eventsourced.EventEnvelope
    import akka.projection.jdbc.scaladsl.JdbcHandler
    import org.slf4j.LoggerFactory
    import shopping.cart.repository.{ ItemPopularityRepository, ScalikeJdbcSession }
    
    class ItemPopularityProjectionHandler(
        tag: String,
        system: ActorSystem[_],
        repo: ItemPopularityRepository)
        extends JdbcHandler[
          EventEnvelope[ShoppingCart.Event],
          ScalikeJdbcSession]() { (1)
    
      private val log = LoggerFactory.getLogger(getClass)
    
      override def process(
          session: ScalikeJdbcSession,
          envelope: EventEnvelope[ShoppingCart.Event]): Unit = { (2)
        envelope.event match { (3)
          case ShoppingCart.ItemAdded(_, itemId, quantity) =>
            repo.update(session, itemId, quantity)
            logItemCount(session, itemId)
    
    
          case _: ShoppingCart.CheckedOut =>
        }
      }
    
      private def logItemCount(
          session: ScalikeJdbcSession,
          itemId: String): Unit = {
        log.info(
          "ItemPopularityProjectionHandler({}) item popularity for '{}': [{}]",
          tag,
          itemId,
          repo.getItem(session, itemId).getOrElse(0))
      }
    
    }
    1 Extends akka.projection.jdbc.scaladsl.JdbcHandler.
    2 The process method to implement.
    3 Match events and increment or decrement the count via the ItemPopularityRepository.
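
As noted at the start of this step, keeping database access behind ItemPopularityRepository means a stubbed implementation can be used in tests. The following is a minimal plain-Java sketch of such a stub, not part of the template: the class name InMemoryItemPopularity and its method names are hypothetical (they loosely follow the Scala trait), and it keeps counts in a map instead of a database.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical in-memory stand-in for the repository; for illustration and tests only.
class InMemoryItemPopularity {
  private final Map<String, Long> counts = new HashMap<>();

  // Adds delta to the stored count, starting from delta when the item is unknown.
  void update(String itemId, long delta) {
    counts.merge(itemId, delta, Long::sum);
  }

  // Returns the stored count, or empty when the item was never updated.
  Optional<Long> getItem(String itemId) {
    return Optional.ofNullable(counts.get(itemId));
  }

  public static void main(String[] args) {
    InMemoryItemPopularity repo = new InMemoryItemPopularity();
    repo.update("hoodie", 5); // as for an ItemAdded event with quantity 5
    repo.update("hoodie", 2); // as for an ItemAdded event with quantity 2
    System.out.println(repo.getItem("hoodie").orElse(0L)); // prints 7
  }
}
```

A stub like this lets the handler logic be exercised without a running PostgreSQL instance; it does not reproduce transactions or optimistic locking.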

2. Initialize the Projection

We want to connect the events from the ShoppingCart with the Projection. Several instances of the Projection may run on different nodes of the Akka Cluster. Each Projection instance will consume a slice of the events to distribute the load. All events from a specific entity (cart id) will always be processed by the same Projection instance so that it can build a stateful model from the events if needed.

2.1. Create tags

To connect the events from the entities with the Projection we need to tag the events. We should use several tags, each with a slice number, to distribute the events over several Projection instances. The tag is selected based on the modulo of the entity id’s hash code (stable identifier) and the total number of tags. Each entity instance will tag its events using one of those tags, and the entity instance will always use the same tag.

Create tags as follows:

  1. Edit ShoppingCart.java (Java) or ShoppingCart.scala (Scala) to include the following:

    Java
    src/main/java/shopping/cart/ShoppingCart.java
    static final List<String> TAGS =
        Collections.unmodifiableList(
            Arrays.asList("carts-0", "carts-1", "carts-2", "carts-3", "carts-4"));
    
    public static void init(ActorSystem<?> system) {
      ClusterSharding.get(system)
          .init(
              Entity.of(
                  ENTITY_KEY,
                  entityContext -> {
                    int i = Math.abs(entityContext.getEntityId().hashCode() % TAGS.size());
                    String selectedTag = TAGS.get(i);
                    return ShoppingCart.create(entityContext.getEntityId(), selectedTag);
                  }));
    }
    Scala
    src/main/scala/shopping/cart/ShoppingCart.scala
    import akka.cluster.sharding.typed.scaladsl.EntityContext
    
      val tags = Vector.tabulate(5)(i => s"carts-$i")
    
      def init(system: ActorSystem[_]): Unit = {
        val behaviorFactory: EntityContext[Command] => Behavior[Command] = {
          entityContext =>
            val i = math.abs(entityContext.entityId.hashCode % tags.size)
            val selectedTag = tags(i)
            ShoppingCart(entityContext.entityId, selectedTag)
        }
        ClusterSharding(system).init(Entity(EntityKey)(behaviorFactory))
      }

    One of the tags is selected based on the cartId, which is the entityContext.entityId. The tag is assigned to the EventSourcedBehavior.

  2. Pass the selected tag to the entity. For Java, add the projectionTag parameter to the ShoppingCart constructor and use it to override the tagsFor method. For Scala, add the projectionTag parameter to the ShoppingCart.apply method and pass it to .withTagger:

    Java
    src/main/java/shopping/cart/ShoppingCart.java
    public static Behavior<Command> create(String cartId, String projectionTag) {
      return Behaviors.setup(
          ctx -> EventSourcedBehavior.start(new ShoppingCart(cartId, projectionTag), ctx));
    }
    
    private final String projectionTag;
    
    private final String cartId;
    
    private ShoppingCart(String cartId, String projectionTag) {
      super(
          PersistenceId.of(ENTITY_KEY.name(), cartId),
          SupervisorStrategy.restartWithBackoff(Duration.ofMillis(200), Duration.ofSeconds(5), 0.1));
      this.cartId = cartId;
      this.projectionTag = projectionTag;
    }
    
    @Override
    public Set<String> tagsFor(Event event) { (1)
      return Collections.singleton(projectionTag);
    }
    1 Use tagsFor to assign the projectionTag.
    Scala
    src/main/scala/shopping/cart/ShoppingCart.scala
    def apply(cartId: String, projectionTag: String): Behavior[Command] = {
      EventSourcedBehavior
        .withEnforcedReplies[Command, Event, State](
          persistenceId = PersistenceId(EntityKey.name, cartId),
          emptyState = State.empty,
          commandHandler =
            (state, command) => handleCommand(cartId, state, command),
          eventHandler = (state, event) => handleEvent(state, event))
        .withTagger(_ => Set(projectionTag)) (1)
        .withRetention(RetentionCriteria
          .snapshotEvery(numberOfEvents = 100, keepNSnapshots = 3))
        .onPersistFailure(
          SupervisorStrategy.restartWithBackoff(200.millis, 5.seconds, 0.1))
    }
    1 Use withTagger to assign the projectionTag.
In this example, we use five different tags. Tagging is not easy to change later without system downtime, so before going live in production you should consider how many tags to use. See the Akka Projections reference documentation for more information.
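
The tag selection described above can be tried in isolation. The sketch below is plain Java with no Akka dependency; the class name TagSelectionSketch and the method tagFor are hypothetical, while the arithmetic is the same as in ShoppingCart.init. Because String.hashCode is specified by the Java language, a given cart id maps to the same tag on every node and after every restart.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical helper that isolates the tag selection arithmetic.
class TagSelectionSketch {
  static final List<String> TAGS =
      Collections.unmodifiableList(
          Arrays.asList("carts-0", "carts-1", "carts-2", "carts-3", "carts-4"));

  // The remainder lies strictly between -size and size,
  // so its absolute value is always a valid index into TAGS.
  static String tagFor(String entityId) {
    int i = Math.abs(entityId.hashCode() % TAGS.size());
    return TAGS.get(i);
  }

  public static void main(String[] args) {
    for (String cartId : Arrays.asList("cart1", "cart2", "cart3")) {
      System.out.println(cartId + " -> " + tagFor(cartId));
    }
  }
}
```

Changing the number of tags changes the result of the modulo for existing cart ids, which is one reason the tag count is hard to change once events have been written.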

2.2. Create Projection

To create the Projection:

  1. Place the initialization code of the Projection in an ItemPopularityProjection class (Java) or object (Scala):

    Java
    src/main/java/shopping/cart/ItemPopularityProjection.java:
    package shopping.cart;
    
    import akka.actor.typed.ActorSystem;
    import akka.cluster.sharding.typed.ShardedDaemonProcessSettings;
    import akka.cluster.sharding.typed.javadsl.ShardedDaemonProcess;
    import akka.persistence.jdbc.query.javadsl.JdbcReadJournal;
    import akka.persistence.query.Offset;
    import akka.projection.ProjectionBehavior;
    import akka.projection.ProjectionId;
    import akka.projection.eventsourced.EventEnvelope;
    import akka.projection.eventsourced.javadsl.EventSourcedProvider;
    import akka.projection.javadsl.ExactlyOnceProjection;
    import akka.projection.javadsl.SourceProvider;
    import akka.projection.jdbc.javadsl.JdbcProjection;
    import java.util.Optional;
    import org.springframework.orm.jpa.JpaTransactionManager;
    import shopping.cart.repository.HibernateJdbcSession;
    import shopping.cart.repository.ItemPopularityRepository;
    
    public final class ItemPopularityProjection {
    
      private ItemPopularityProjection() {}
    
      public static void init(
          ActorSystem<?> system,
          JpaTransactionManager transactionManager,
          ItemPopularityRepository repository) {
    
        ShardedDaemonProcess.get(system)
            .init( (1)
                ProjectionBehavior.Command.class,
                "ItemPopularityProjection",
                ShoppingCart.TAGS.size(),
                index ->
                    ProjectionBehavior.create(
                        createProjectionFor(system, transactionManager, repository, index)),
                ShardedDaemonProcessSettings.create(system),
                Optional.of(ProjectionBehavior.stopMessage()));
      }
    
      private static ExactlyOnceProjection<Offset, EventEnvelope<ShoppingCart.Event>>
          createProjectionFor(
              ActorSystem<?> system,
              JpaTransactionManager transactionManager,
              ItemPopularityRepository repository,
              int index) {
    
        String tag = ShoppingCart.TAGS.get(index); (2)
    
        SourceProvider<Offset, EventEnvelope<ShoppingCart.Event>> sourceProvider = (3)
            EventSourcedProvider.eventsByTag(
                system,
                JdbcReadJournal.Identifier(), (4)
                tag);
    
        return JdbcProjection.exactlyOnce( (5)
            ProjectionId.of("ItemPopularityProjection", tag),
            sourceProvider,
            () -> new HibernateJdbcSession(transactionManager), (6)
            () -> new ItemPopularityProjectionHandler(tag, repository), (7)
            system);
      }
    }
    1 ShardedDaemonProcess will manage the Projection instances. It ensures the Projection instances are always running and distributes them over the nodes in the Akka Cluster.
    2 The tag is selected based on the Projection instance’s index, corresponding to carts-0 to carts-4 as explained in the tagging in the ShoppingCart.
    3 The source of the Projection is EventSourcedProvider.eventsByTag with the selected tag.
    4 Using the JDBC event journal.
    5 Using JDBC for offset storage of the Projection using exactly-once strategy. Offset and projected model will be persisted transactionally.
    6 Define a HibernateJdbcSession factory. The JDBC connections are created by the projection and used to save the offset and the projected model.
    7 Define a Projection Handler factory for the handler we wrote in the beginning of this part.
    Scala
    src/main/scala/shopping/cart/ItemPopularityProjection.scala:
    package shopping.cart
    
    import akka.actor.typed.ActorSystem
    import akka.cluster.sharding.typed.ShardedDaemonProcessSettings
    import akka.cluster.sharding.typed.scaladsl.ShardedDaemonProcess
    import akka.persistence.jdbc.query.scaladsl.JdbcReadJournal
    import akka.persistence.query.Offset
    import akka.projection.eventsourced.EventEnvelope
    import akka.projection.eventsourced.scaladsl.EventSourcedProvider
    import akka.projection.jdbc.scaladsl.JdbcProjection
    import akka.projection.scaladsl.{ ExactlyOnceProjection, SourceProvider }
    import akka.projection.{ ProjectionBehavior, ProjectionId }
    import shopping.cart.repository.{ ItemPopularityRepository, ScalikeJdbcSession }
    
    object ItemPopularityProjection {
      def init(
          system: ActorSystem[_],
          repository: ItemPopularityRepository): Unit = {
        ShardedDaemonProcess(system).init( (1)
          name = "ItemPopularityProjection",
          ShoppingCart.tags.size,
          index =>
            ProjectionBehavior(createProjectionFor(system, repository, index)),
          ShardedDaemonProcessSettings(system),
          Some(ProjectionBehavior.Stop))
      }
    
      private def createProjectionFor(
          system: ActorSystem[_],
          repository: ItemPopularityRepository,
          index: Int)
          : ExactlyOnceProjection[Offset, EventEnvelope[ShoppingCart.Event]] = {
        val tag = ShoppingCart.tags(index) (2)
    
        val sourceProvider
            : SourceProvider[Offset, EventEnvelope[ShoppingCart.Event]] = (3)
          EventSourcedProvider.eventsByTag[ShoppingCart.Event](
            system = system,
            readJournalPluginId = JdbcReadJournal.Identifier, (4)
            tag = tag)
    
        JdbcProjection.exactlyOnce( (5)
          projectionId = ProjectionId("ItemPopularityProjection", tag),
          sourceProvider,
          handler = () =>
            new ItemPopularityProjectionHandler(tag, system, repository), (6)
          sessionFactory = () => new ScalikeJdbcSession())(system)
      }
    
    }
    1 ShardedDaemonProcess will manage the Projection instances. It ensures the Projection instances are always running and distributes them over the nodes in the Akka Cluster.
    2 The tag is selected based on the Projection instance’s index, corresponding to carts-0 to carts-4 as explained in the tagging in the ShoppingCart.
    3 The source of the Projection is EventSourcedProvider.eventsByTag with the selected tag.
    4 Using the JDBC event journal.
    5 Using JDBC for offset storage of the Projection using exactly-once strategy. Offset and projected model will be persisted transactionally.
    6 Creating the Projection Handler that we wrote in the beginning of this part.
  2. Call the ItemPopularityProjection.init from Main:

    Java
    src/main/java/shopping/cart/Main.java
    ApplicationContext springContext = SpringIntegration.applicationContext(system); (1)
    
    ItemPopularityRepository itemPopularityRepository =
        springContext.getBean(ItemPopularityRepository.class); (2)
    
    JpaTransactionManager transactionManager =
        springContext.getBean(JpaTransactionManager.class); (3)
    
    ItemPopularityProjection.init(system, transactionManager, itemPopularityRepository); (4)
    1 Initialize a Spring application context using the ActorSystem.
    2 Get a Spring generated implementation for the ItemPopularityRepository.
    3 Get a Spring JpaTransactionManager to pass to the Projection init method.
    4 Initialize the Projection passing all necessary dependencies.
    Scala
    src/main/scala/shopping/cart/Main.scala
    ScalikeJdbcSetup.init(system) (1)
    val itemPopularityRepository = new ItemPopularityRepositoryImpl() (2)
    ItemPopularityProjection.init(system, itemPopularityRepository) (3)
    1 Call ScalikeJdbcSetup.init method to initiate the connection pool for the read-side. The connection pool will be closed when the actor system terminates.
    2 Instantiate the repository.
    3 Call the initialization of the Projection.
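
The exactly-once strategy used above stores the offset and the projected model in the same transaction. The effect can be illustrated with a plain-Java sketch. This is only an illustration under simplifying assumptions, not how Akka Projections is implemented: the class ExactlyOnceSketch and its methods are hypothetical, offsets are modelled as increasing numbers, and a single method call stands in for the transaction.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of a projection whose offset and counts change together.
class ExactlyOnceSketch {
  private long committedOffset = 0;
  private final Map<String, Long> committedCounts = new HashMap<>();

  // Applies one event. The count and the offset are updated in the same step,
  // standing in for one database transaction.
  void process(long offset, String itemId, long quantity) {
    if (offset <= committedOffset) {
      return; // already reflected in the model, for example a replay after a restart
    }
    committedCounts.merge(itemId, quantity, Long::sum);
    committedOffset = offset;
  }

  long count(String itemId) {
    return committedCounts.getOrDefault(itemId, 0L);
  }

  long committedOffset() {
    return committedOffset;
  }

  public static void main(String[] args) {
    ExactlyOnceSketch projection = new ExactlyOnceSketch();
    projection.process(1, "hoodie", 5);
    projection.process(2, "hoodie", 2);
    projection.process(2, "hoodie", 2); // replayed event is ignored
    System.out.println(projection.count("hoodie")); // prints 7
  }
}
```

If the offset were stored separately from the counts, a crash between the two writes could cause an event to be counted twice or not at all.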

3. Query

To expose the item popularity to the outside of the service we can add an operation in the gRPC ShoppingCartService. Follow these steps:

  1. Add a new GetItemPopularity operation to the ShoppingCartService.proto:

    src/main/protobuf/ShoppingCartService.proto
    service ShoppingCartService {
        rpc GetItemPopularity(GetItemPopularityRequest) returns (GetItemPopularityResponse) {}
    }
    
    message GetItemPopularityRequest {
        string itemId = 1;
    }
    
    message GetItemPopularityResponse {
        string itemId = 1;
        int64 popularityCount = 2;
    }
  2. Generate code from the new Protobuf specification by compiling the project:

    Java
    mvn compile
    Scala
    sbt compile
  3. Add the getItemPopularity method to the ShoppingCartServiceImpl:

    For this you have to add the ItemPopularityRepository as a constructor parameter to the ShoppingCartServiceImpl. The ItemPopularityRepository instance is created in Main.java (Java) or Main.scala (Scala), so pass that instance as a parameter to ShoppingCartServiceImpl.

Java
src/main/java/shopping/cart/ShoppingCartServiceImpl.java
private final ItemPopularityRepository repository;
private final Executor blockingJdbcExecutor;

public ShoppingCartServiceImpl(
    ActorSystem<?> system, ItemPopularityRepository repository) { (1)

  DispatcherSelector dispatcherSelector =
      DispatcherSelector.fromConfig("akka.projection.jdbc.blocking-jdbc-dispatcher");
  this.blockingJdbcExecutor = system.dispatchers().lookup(dispatcherSelector); (2)

  this.repository = repository;
  timeout = system.settings().config().getDuration("shopping-cart-service.ask-timeout");
  sharding = ClusterSharding.get(system);
}

@Override
public CompletionStage<GetItemPopularityResponse> getItemPopularity(GetItemPopularityRequest in) {

  CompletionStage<Optional<ItemPopularity>> itemPopularity =
      CompletableFuture.supplyAsync(
          () -> repository.findById(in.getItemId()), blockingJdbcExecutor); (3)

  return itemPopularity.thenApply(
      popularity -> {
        long count = popularity.map(ItemPopularity::getCount).orElse(0L);
        // echo the requested item id in the response
        return GetItemPopularityResponse.newBuilder()
            .setItemId(in.getItemId())
            .setPopularityCount(count)
            .build();
      });
}
1 Add the ItemPopularityRepository to the service implementation constructor.
2 Use the ActorSystem to get an instance of an Executor tailored for running JDBC blocking operations.
3 Implement getItemPopularity by calling the repository to find the projected model by id and wrap it with CompletableFuture.supplyAsync.
Scala
src/main/scala/shopping/cart/ShoppingCartServiceImpl.scala
class ShoppingCartServiceImpl(
    system: ActorSystem[_],
    itemPopularityRepository: ItemPopularityRepository) (1)
    extends proto.ShoppingCartService {

  private val blockingJdbcExecutor: ExecutionContext =
    system.dispatchers.lookup(
      DispatcherSelector
        .fromConfig("akka.projection.jdbc.blocking-jdbc-dispatcher")
    ) (2)

  override def getItemPopularity(in: proto.GetItemPopularityRequest)
      : Future[proto.GetItemPopularityResponse] = {
    Future { (3)
      ScalikeJdbcSession.withSession { session =>
        itemPopularityRepository.getItem(session, in.itemId)
      }
    }(blockingJdbcExecutor).map {
      case Some(count) =>
        proto.GetItemPopularityResponse(in.itemId, count)
      case None =>
        proto.GetItemPopularityResponse(in.itemId, 0L)
    }
  }
}
1 Add the ItemPopularityRepository to the service implementation constructor.
2 Use the ActorSystem to get an instance of an ExecutionContext tailored for running JDBC blocking operations.
3 Implement getItemPopularity by calling the repository to find the projected model by id and wrap it in a Future running on the executor for JDBC operations.

Calls to the repository are blocking, so we run them on a dedicated executor. In Java, we wrap the call with CompletableFuture.supplyAsync and pass the JDBC blocking executor. In Scala, we run the call in a Future with the JDBC blocking execution context. This is a pre-configured dispatcher provided by Akka Projections. Its pool size can be configured with the setting akka.projection.jdbc.blocking-jdbc-dispatcher.thread-pool-executor.fixed-pool-size (see src/main/resources/persistence.conf).

Make sure to follow that pattern whenever you interact with a DB repository.
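
The pattern can be tried outside the service with plain Java. In the sketch below, the class BlockingCallSketch, the method blockingFindCount and the pool size of 5 are assumptions for illustration; a fixed thread pool stands in for the dispatcher that the service looks up from the ActorSystem.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical example of running a blocking lookup on a dedicated executor.
class BlockingCallSketch {

  // Stand-in for a blocking repository call such as findById.
  static Optional<Long> blockingFindCount(String itemId) {
    return "hoodie".equals(itemId) ? Optional.of(7L) : Optional.empty();
  }

  // The lookup runs on blockingExecutor, not on the caller's thread.
  static CompletionStage<Long> popularity(String itemId, Executor blockingExecutor) {
    return CompletableFuture.supplyAsync(() -> blockingFindCount(itemId), blockingExecutor)
        .thenApply(found -> found.orElse(0L));
  }

  public static void main(String[] args) throws Exception {
    ExecutorService pool = Executors.newFixedThreadPool(5); // assumed pool size
    try {
      System.out.println(popularity("hoodie", pool).toCompletableFuture().get()); // prints 7
    } finally {
      pool.shutdown();
    }
  }
}
```

A bounded pool limits how many blocking calls run at once, so slow database access cannot occupy the threads that serve other requests.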

4. Run locally

Try your solution by running locally:

  1. Start the docker containers, unless they are already running:

    docker-compose up -d
  2. Create the item popularity table by creating a ddl-scripts/create_user_tables.sql file and adding the SQL statement below.

    Java
    CREATE TABLE IF NOT EXISTS public.item_popularity (
        itemid VARCHAR(255) NOT NULL,
        version BIGINT NOT NULL,
        count BIGINT NOT NULL,
        PRIMARY KEY (itemid));
    Scala
    CREATE TABLE IF NOT EXISTS public.item_popularity (
        itemid VARCHAR(255) NOT NULL,
        count BIGINT NOT NULL,
        PRIMARY KEY (itemid));
  3. Load the file into Postgres:

    docker exec -i shopping-cart-service_postgres-db_1 psql -U shopping-cart -t < ddl-scripts/create_user_tables.sql

    When loading the SQL script, make sure to use the same name as your running PostgreSQL container name. The container name is not fixed and depends on the parent folder of the docker-compose file. The above example assumes the project was created using the seed template and named shopping-cart-service.

    If you get a connection error, it means the PostgreSQL container is still starting. Wait a few seconds and re-try the command.

  4. Run the service with:

    Java
    # make sure to compile before running exec:exec
    mvn compile exec:exec -DAPP_CONFIG=local1.conf
    Scala
    sbt -Dconfig.resource=local1.conf run

4.1. Exercise the service

Use grpcurl to exercise the service:

  1. Add 5 hoodies to a cart:

    grpcurl -d '{"cartId":"cart3", "itemId":"hoodie", "quantity":5}' -plaintext 127.0.0.1:8101 shoppingcart.ShoppingCartService.AddItem
  2. Check the popularity of the item:

    grpcurl -d '{"itemId":"hoodie"}' -plaintext 127.0.0.1:8101 shoppingcart.ShoppingCartService.GetItemPopularity
  3. Add 2 hoodies to another cart:

    grpcurl -d '{"cartId":"cart5", "itemId":"hoodie", "quantity":2}' -plaintext 127.0.0.1:8101 shoppingcart.ShoppingCartService.AddItem
  4. Check that the popularity count increased to 7:

    grpcurl -d '{"itemId":"hoodie"}' -plaintext 127.0.0.1:8101 shoppingcart.ShoppingCartService.GetItemPopularity

4.2. Stop the service

When finished, stop the service with ctrl-c. Leave PostgreSQL running for the next set of steps, or stop it with:

docker-compose stop
The following steps for cloud deployment are optional. If you are only running locally, you can skip to the next section of the tutorial.

5. Run in Kubernetes

Create a Kubernetes cluster and install the Akka Operator if you haven’t already.

5.1. Create the item popularity table

Create the item popularity table using the ddl-scripts/create_user_tables.sql SQL script. Follow the instructions in JDBC integration to connect to your PostgreSQL instance and load the script.

kubectl run -i rds-mgmt --image=postgres \
  --restart=Never --rm --env "PGPASSWORD=<password>" -- \
  psql -h <rds endpoint> -U postgres -t < ddl-scripts/create_user_tables.sql

Make sure that ddl-scripts/create_tables.sql is also loaded as previously described.

If you created your database with the Installation on Amazon Elastic Kubernetes Service (EKS) Quick Start then you can reference the postgres username, password, and hostname using the pulumi output command (you must be in the Pulumi working directory for this to work), and reference the ddl script with an absolute path. See the Connect to the Aurora RDS database section of the quick start for an example.


5.2. Build Docker image

Create a Docker repository and authenticate Docker.

GCP

Follow the instructions in Using Container Registry with Google Cloud to deploy Docker images on GCP’s container registry.

AWS

Follow the instructions in Amazon Elastic Container Registry to deploy Docker images on AWS’s container registry.

5.3. Additional steps for Docker and AWS

If you are using AWS, you will also need to complete the following procedures.

Rebuild the Docker image.

Java
mvn -DskipTests -Ddocker.registry=803424716218.dkr.ecr.eu-central-1.amazonaws.com clean package docker:push
Scala
sbt -Ddocker.registry=803424716218.dkr.ecr.eu-central-1.amazonaws.com docker:publish

Take note of the image tag as displayed by the docker:push (Java) or docker:publish (Scala) command.

Java
DOCKER> Tagging image shopping-cart-service:20201209-135004-363ae2b successful!
Scala
[info] Successfully tagged shopping-cart-service:20201209-135004-363ae2b

5.4. Update the deployment descriptor

Update the kubernetes/shopping-cart-service-cr.yml deployment descriptor with the new image tag as previously described.

5.5. Apply to Kubernetes

Re-apply the shopping-cart-service-cr.yml to Kubernetes:

kubectl apply -f kubernetes/shopping-cart-service-cr.yml

You can see progress by viewing the status:

kubectl get akkamicroservices/shopping-cart-service

See troubleshooting deployment status for more details.

5.6. Exercise the service in Kubernetes

  1. You can list the pods with:

    kubectl get pods
  2. Inspect logs from a separate terminal window:

    kubectl logs -f <shopping-cart-service pod name from above>
  3. Add port forwarding for the gRPC endpoint from a separate terminal:

    kubectl port-forward svc/shopping-cart-service-grpc 8101:8101

Use grpcurl to exercise the service:

  1. Add 5 hoodies to a cart:

    grpcurl -d '{"cartId":"cart3", "itemId":"hoodie", "quantity":5}' -plaintext 127.0.0.1:8101 shoppingcart.ShoppingCartService.AddItem
  2. Check the popularity of the item:

    grpcurl -d '{"itemId":"hoodie"}' -plaintext 127.0.0.1:8101 shoppingcart.ShoppingCartService.GetItemPopularity
  3. Add 2 hoodies to another cart:

    grpcurl -d '{"cartId":"cart5", "itemId":"hoodie", "quantity":2}' -plaintext 127.0.0.1:8101 shoppingcart.ShoppingCartService.AddItem
  4. Check that the popularity count increased to 7:

    grpcurl -d '{"itemId":"hoodie"}' -plaintext 127.0.0.1:8101 shoppingcart.ShoppingCartService.GetItemPopularity

If you decide to compile and run your tests, you may encounter an error such as reason: actual and formal argument lists differ in length. If this occurs, you can bypass the error by adding an arbitrary tag to the shopping cart creation within the test.