Section 6: Projection for queries
Next, we will create an Akka Projection from the events emitted by the ShoppingCart entity. The Projection will update counts in the database to track item popularity. Then, we can query the database to find how popular an item is. Since ShoppingCart entities can only be addressed by individual cart identifiers, we can find a particular cart, but we can’t find all carts that contain a particular item.
This piece of the full example focuses on the ItemPopularityProjection and a query representation in the database. On this page you will learn how to:
- implement a Projection
- distribute the Projection instances over the nodes in the Akka Cluster
- work with the Projection JDBC API
The CQRS section explains why it is a good practice to build a Projection from entity events that can be queried. The Introduction to Akka Projections video is also a good starting point for learning about Akka Projections.
This example uses PostgreSQL to store both the Projection result and the Projection offset. An alternative is described in Use Cassandra instead of PostgreSQL.
Akka Workshop
The third video of the Akka Workshop Series covers both Projections for queries and CQRS. It provides some solid guidance to aid you in understanding this section of this guide.
Source downloads
If you prefer to simply view and run the example, download a zip file containing the completed code:
- Java
- Scala
1. Process events in a Projection
To process events in a projection, we will:
- encapsulate database access with ItemPopularityRepository, which can have a stubbed implementation for tests
- add a Repository implementation for JDBC
- implement the event processing of the Projection in a Handler
Follow these steps to process events in a Projection:
- Add the ItemPopularityRepository:
- Java
src/main/java/shopping/cart/repository/ItemPopularityRepository.java:
package shopping.cart.repository;

import java.util.Optional;
import org.springframework.data.repository.Repository;
import shopping.cart.ItemPopularity;

public interface ItemPopularityRepository extends Repository<ItemPopularity, String> {

  ItemPopularity save(ItemPopularity itemPopularity);

  Optional<ItemPopularity> findById(String id);
}
- Scala
src/main/scala/shopping/cart/repository/ItemPopularityRepository.scala:
trait ItemPopularityRepository {
  def update(session: ScalikeJdbcSession, itemId: String, delta: Int): Unit
  def getItem(session: ScalikeJdbcSession, itemId: String): Option[Long]
}
- Add the ItemPopularity (Java):
src/main/java/shopping/cart/ItemPopularity.java:
package shopping.cart;

import javax.persistence.*;

@Entity
@Table(name = "item_popularity")
public class ItemPopularity {

  // primary key
  @Id private final String itemId;

  // optimistic locking
  @Version private final Long version;

  private final long count;

  public ItemPopularity() {
    // null version means the entity is not on the DB
    this.version = null;
    this.itemId = "";
    this.count = 0;
  }

  public ItemPopularity(String itemId, long version, long count) {
    this.itemId = itemId;
    this.version = version;
    this.count = count;
  }

  public String getItemId() {
    return itemId;
  }

  public long getCount() {
    return count;
  }

  public long getVersion() {
    return version;
  }

  public ItemPopularity changeCount(long delta) {
    return new ItemPopularity(itemId, version, count + delta);
  }
}
- Provide the repository implementation. In Java, use the Spring ApplicationContext class to retrieve the implementation for the repository; this will be added later to the Main class. In Scala, add the implementation for PostgreSQL by using the ScalikeJdbcSession:
- Java
src/main/java/shopping/cart/Main.java:
ApplicationContext springContext = SpringIntegration.applicationContext(system); // (1)

ItemPopularityRepository itemPopularityRepository =
    springContext.getBean(ItemPopularityRepository.class); // (2)
(1) Initialize a Spring application context using the ActorSystem.
(2) Get a Spring generated implementation for the ItemPopularityRepository.
- Scala
src/main/scala/shopping/cart/repository/ItemPopularityRepository.scala:
class ItemPopularityRepositoryImpl() extends ItemPopularityRepository {

  override def update(
      session: ScalikeJdbcSession,
      itemId: String,
      delta: Int): Unit = {
    session.db.withinTx { implicit dbSession =>
      // This uses the PostgreSQL `ON CONFLICT` feature
      // Alternatively, this can be implemented by first issuing the `UPDATE`
      // and checking for the updated rows count. If no rows got updated issue
      // the `INSERT` instead.
      sql"""
           INSERT INTO item_popularity (itemid, count) VALUES ($itemId, $delta)
           ON CONFLICT (itemid) DO UPDATE SET count = item_popularity.count + $delta
         """.executeUpdate().apply()
    }
  }

  override def getItem(
      session: ScalikeJdbcSession,
      itemId: String): Option[Long] = {
    if (session.db.isTxAlreadyStarted) {
      session.db.withinTx { implicit dbSession =>
        select(itemId)
      }
    } else {
      session.db.readOnly { implicit dbSession =>
        select(itemId)
      }
    }
  }

  private def select(itemId: String)(implicit dbSession: DBSession) = {
    sql"SELECT count FROM item_popularity WHERE itemid = $itemId"
      .map(_.long("count"))
      .toOption()
      .apply()
  }
}
- Add a class ItemPopularityProjectionHandler:
- Java
src/main/java/shopping/cart/ItemPopularityProjectionHandler.java:
package shopping.cart;

import akka.projection.eventsourced.EventEnvelope;
import akka.projection.jdbc.javadsl.JdbcHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import shopping.cart.repository.HibernateJdbcSession;
import shopping.cart.repository.ItemPopularityRepository;

public final class ItemPopularityProjectionHandler
    extends JdbcHandler<EventEnvelope<ShoppingCart.Event>, HibernateJdbcSession> { // (1)
  private final Logger logger = LoggerFactory.getLogger(getClass());
  private final String tag;
  private final ItemPopularityRepository repo;

  public ItemPopularityProjectionHandler(String tag, ItemPopularityRepository repo) {
    this.tag = tag;
    this.repo = repo;
  }

  private ItemPopularity findOrNew(String itemId) {
    return repo.findById(itemId).orElseGet(() -> new ItemPopularity(itemId, 0, 0));
  }

  @Override
  public void process(
      HibernateJdbcSession session, EventEnvelope<ShoppingCart.Event> envelope) { // (2)
    ShoppingCart.Event event = envelope.event();

    if (event instanceof ShoppingCart.ItemAdded) { // (3)
      ShoppingCart.ItemAdded added = (ShoppingCart.ItemAdded) event;
      String itemId = added.itemId;

      ItemPopularity existingItemPop = findOrNew(itemId);
      ItemPopularity updatedItemPop = existingItemPop.changeCount(added.quantity);
      repo.save(updatedItemPop);

      logger.info(
          "ItemPopularityProjectionHandler({}) item popularity for '{}': [{}]",
          this.tag,
          itemId,
          updatedItemPop.getCount());
    } else {
      // skip all other events, such as `CheckedOut`
    }
  }
}
(1) Extends akka.projection.jdbc.javadsl.JdbcHandler.
(2) The process method to implement.
(3) Match events and update the count via the ItemPopularityRepository, which encapsulates the database access. Here only ItemAdded is handled, incrementing the count by the added quantity; all other events are skipped.
- Scala
src/main/scala/shopping/cart/ItemPopularityProjectionHandler.scala:
package shopping.cart

import akka.actor.typed.ActorSystem
import akka.projection.eventsourced.EventEnvelope
import akka.projection.jdbc.scaladsl.JdbcHandler
import org.slf4j.LoggerFactory
import shopping.cart.repository.{ ItemPopularityRepository, ScalikeJdbcSession }

class ItemPopularityProjectionHandler(
    tag: String,
    system: ActorSystem[_],
    repo: ItemPopularityRepository)
    extends JdbcHandler[
      EventEnvelope[ShoppingCart.Event],
      ScalikeJdbcSession]() { // (1)

  private val log = LoggerFactory.getLogger(getClass)

  override def process(
      session: ScalikeJdbcSession,
      envelope: EventEnvelope[ShoppingCart.Event]): Unit = { // (2)
    envelope.event match { // (3)
      case ShoppingCart.ItemAdded(_, itemId, quantity) =>
        repo.update(session, itemId, quantity)
        logItemCount(session, itemId)

      case _: ShoppingCart.CheckedOut =>
    }
  }

  private def logItemCount(
      session: ScalikeJdbcSession,
      itemId: String): Unit = {
    log.info(
      "ItemPopularityProjectionHandler({}) item popularity for '{}': [{}]",
      tag,
      itemId,
      repo.getItem(session, itemId).getOrElse(0))
  }
}
(1) Extends akka.projection.jdbc.scaladsl.JdbcHandler.
(2) The process method to implement.
(3) Match events and update the count via the ItemPopularityRepository. Here only ItemAdded changes the count, incrementing it by the added quantity; CheckedOut is ignored.
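Because the handler reaches the database only through ItemPopularityRepository, its counting logic can be checked against an in-memory stub. The following is a minimal sketch of that idea, not code from the example project: PopularityStore, InMemoryPopularityStore, and applyItemAdded are hypothetical, simplified stand-ins that avoid Spring, JDBC, and Akka types so the snippet runs on its own.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

public class InMemoryPopularityExample {

  // Hypothetical, simplified stand-in for ItemPopularityRepository.
  interface PopularityStore {
    Optional<Long> findCount(String itemId);

    void saveCount(String itemId, long count);
  }

  // Stub backed by a map instead of a database table.
  static final class InMemoryPopularityStore implements PopularityStore {
    private final Map<String, Long> counts = new HashMap<>();

    @Override
    public Optional<Long> findCount(String itemId) {
      return Optional.ofNullable(counts.get(itemId));
    }

    @Override
    public void saveCount(String itemId, long count) {
      counts.put(itemId, count);
    }
  }

  // Mirrors the handler's sequence: find or start at zero, change count, save.
  static long applyItemAdded(PopularityStore store, String itemId, int quantity) {
    long updated = store.findCount(itemId).orElse(0L) + quantity;
    store.saveCount(itemId, updated);
    return updated;
  }

  public static void main(String[] args) {
    PopularityStore store = new InMemoryPopularityStore();
    applyItemAdded(store, "hoodie", 5);
    applyItemAdded(store, "hoodie", 2);
    System.out.println(store.findCount("hoodie").orElse(0L)); // prints 7
  }
}
```

In the real project the stub would implement the actual repository interface, so that the same handler class can be exercised in a unit test without a running database.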
2. Initialize the Projection
We want to connect the events from the ShoppingCart with the Projection. Several instances of the Projection may run on different nodes of the Akka Cluster. Each Projection instance will consume a slice of the events to distribute the load. All events from a specific entity (cart id) will always be processed by the same Projection instance so that it can build a stateful model from the events if needed.
2.1. Create tags
To connect the events from the entities with the Projection we need to tag the events. We should use several tags, each with a slice number, to distribute the events over several Projection instances. The tag is selected based on the modulo of the entity id’s hash code (stable identifier) and the total number of tags. Each entity instance will tag its events using one of those tags, and the entity instance will always use the same tag.
Create tags as follows:
- Edit ShoppingCart.java (Java) or ShoppingCart.scala (Scala) to include the following:
- Java
src/main/java/shopping/cart/ShoppingCart.java
static final List<String> TAGS =
    Collections.unmodifiableList(
        Arrays.asList("carts-0", "carts-1", "carts-2", "carts-3", "carts-4"));

public static void init(ActorSystem<?> system) {
  ClusterSharding.get(system)
      .init(
          Entity.of(
              ENTITY_KEY,
              entityContext -> {
                int i = Math.abs(entityContext.getEntityId().hashCode() % TAGS.size());
                String selectedTag = TAGS.get(i);
                return ShoppingCart.create(entityContext.getEntityId(), selectedTag);
              }));
}
- Scala
src/main/scala/shopping/cart/ShoppingCart.scala
import akka.cluster.sharding.typed.scaladsl.EntityContext

val tags = Vector.tabulate(5)(i => s"carts-$i")

def init(system: ActorSystem[_]): Unit = {
  val behaviorFactory: EntityContext[Command] => Behavior[Command] = {
    entityContext =>
      val i = math.abs(entityContext.entityId.hashCode % tags.size)
      val selectedTag = tags(i)
      ShoppingCart(entityContext.entityId, selectedTag)
  }
  ClusterSharding(system).init(Entity(EntityKey)(behaviorFactory))
}
One of the tags is selected based on the cartId, which is the entityContext.entityId. The tag is assigned to the EventSourcedBehavior.
- In the ShoppingCart constructor (Java), add the projectionTag parameter and use it to override the tagsFor method. In the ShoppingCart.apply method (Scala), add the projectionTag parameter and pass it to .withTagger:
- Java
src/main/java/shopping/cart/ShoppingCart.java
public static Behavior<Command> create(String cartId, String projectionTag) {
  return Behaviors.setup(
      ctx -> EventSourcedBehavior.start(new ShoppingCart(cartId, projectionTag), ctx));
}

private final String projectionTag;

private final String cartId;

private ShoppingCart(String cartId, String projectionTag) {
  super(
      PersistenceId.of(ENTITY_KEY.name(), cartId),
      SupervisorStrategy.restartWithBackoff(Duration.ofMillis(200), Duration.ofSeconds(5), 0.1));
  this.cartId = cartId;
  this.projectionTag = projectionTag;
}

@Override
public Set<String> tagsFor(Event event) { // (1)
  return Collections.singleton(projectionTag);
}
(1) Use tagsFor to assign the projectionTag.
- Scala
src/main/scala/shopping/cart/ShoppingCart.scala
def apply(cartId: String, projectionTag: String): Behavior[Command] = {
  EventSourcedBehavior
    .withEnforcedReplies[Command, Event, State](
      persistenceId = PersistenceId(EntityKey.name, cartId),
      emptyState = State.empty,
      commandHandler =
        (state, command) => handleCommand(cartId, state, command),
      eventHandler = (state, event) => handleEvent(state, event))
    .withTagger(_ => Set(projectionTag)) // (1)
    .withRetention(RetentionCriteria
      .snapshotEvery(numberOfEvents = 100, keepNSnapshots = 3))
    .onPersistFailure(
      SupervisorStrategy.restartWithBackoff(200.millis, 5.seconds, 0.1))
}
(1) Use withTagger to assign the projectionTag.
In this example, we use five different tags. Tagging is not easy to change later without system downtime. Before going live in production, consider how many tags to use; see the Akka Projections reference documentation.
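The tag selection described in this section can be tried in isolation. The sketch below repeats the same arithmetic outside of Cluster Sharding; selectTag and the TagSelectionExample class are hypothetical names introduced only for illustration, and the cart ids are arbitrary sample values.

```java
import java.util.Arrays;
import java.util.List;

public class TagSelectionExample {

  // Same five tags as in the ShoppingCart example.
  static final List<String> TAGS =
      Arrays.asList("carts-0", "carts-1", "carts-2", "carts-3", "carts-4");

  // Hypothetical helper: picks the tag for an entity id.
  static String selectTag(String entityId) {
    // Take the remainder first, then the absolute value. The remainder lies
    // strictly between -TAGS.size() and TAGS.size(), so Math.abs cannot overflow.
    int i = Math.abs(entityId.hashCode() % TAGS.size());
    return TAGS.get(i);
  }

  public static void main(String[] args) {
    for (String cartId : Arrays.asList("cart1", "cart2", "cart3", "cart4", "cart5")) {
      System.out.println(cartId + " -> " + selectTag(cartId));
    }
  }
}
```

Because String.hashCode is specified by the Java platform, a given cart id maps to the same tag on every node and after every restart. The order of operations matters: Math.abs(Integer.MIN_VALUE) is still negative, so taking the absolute value before the remainder could produce a negative index. Changing the number of tags changes the mapping for existing carts, which is one reason the tag count is hard to change later.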
2.2. Create Projection
To create the Projection:
- Place the initialization code of the Projection in an ItemPopularityProjection class (Java) or object (Scala):
- Java
src/main/java/shopping/cart/ItemPopularityProjection.java:
package shopping.cart;

import akka.actor.typed.ActorSystem;
import akka.cluster.sharding.typed.ShardedDaemonProcessSettings;
import akka.cluster.sharding.typed.javadsl.ShardedDaemonProcess;
import akka.persistence.jdbc.query.javadsl.JdbcReadJournal;
import akka.persistence.query.Offset;
import akka.projection.ProjectionBehavior;
import akka.projection.ProjectionId;
import akka.projection.eventsourced.EventEnvelope;
import akka.projection.eventsourced.javadsl.EventSourcedProvider;
import akka.projection.javadsl.ExactlyOnceProjection;
import akka.projection.javadsl.SourceProvider;
import akka.projection.jdbc.javadsl.JdbcProjection;
import java.util.Optional;
import org.springframework.orm.jpa.JpaTransactionManager;
import shopping.cart.repository.HibernateJdbcSession;
import shopping.cart.repository.ItemPopularityRepository;

public final class ItemPopularityProjection {

  private ItemPopularityProjection() {}

  public static void init(
      ActorSystem<?> system,
      JpaTransactionManager transactionManager,
      ItemPopularityRepository repository) {

    ShardedDaemonProcess.get(system)
        .init( // (1)
            ProjectionBehavior.Command.class,
            "ItemPopularityProjection",
            ShoppingCart.TAGS.size(),
            index ->
                ProjectionBehavior.create(
                    createProjectionFor(system, transactionManager, repository, index)),
            ShardedDaemonProcessSettings.create(system),
            Optional.of(ProjectionBehavior.stopMessage()));
  }

  private static ExactlyOnceProjection<Offset, EventEnvelope<ShoppingCart.Event>>
      createProjectionFor(
          ActorSystem<?> system,
          JpaTransactionManager transactionManager,
          ItemPopularityRepository repository,
          int index) {

    String tag = ShoppingCart.TAGS.get(index); // (2)

    SourceProvider<Offset, EventEnvelope<ShoppingCart.Event>> sourceProvider = // (3)
        EventSourcedProvider.eventsByTag(
            system,
            JdbcReadJournal.Identifier(), // (4)
            tag);

    return JdbcProjection.exactlyOnce( // (5)
        ProjectionId.of("ItemPopularityProjection", tag),
        sourceProvider,
        () -> new HibernateJdbcSession(transactionManager), // (6)
        () -> new ItemPopularityProjectionHandler(tag, repository), // (7)
        system);
  }
}
(1) ShardedDaemonProcess will manage the Projection instances. It ensures the Projection instances are always running and distributes them over the nodes in the Akka Cluster.
(2) The tag is selected based on the Projection instance’s index, corresponding to carts-0 to carts-4 as explained in the tagging in the ShoppingCart.
(3) The source of the Projection is EventSourcedProvider.eventsByTag with the selected tag.
(4) Using the JDBC event journal.
(5) Using JDBC for offset storage of the Projection using exactly-once strategy. Offset and projected model will be persisted transactionally.
(6) Define a HibernateJdbcSession factory. The JDBC connections are created by the projection and used to save the offset and the projected model.
(7) Define a Projection Handler factory for the handler we wrote in the beginning of this part.
- Scala
src/main/scala/shopping/cart/ItemPopularityProjection.scala:
package shopping.cart

import akka.actor.typed.ActorSystem
import akka.cluster.sharding.typed.ShardedDaemonProcessSettings
import akka.cluster.sharding.typed.scaladsl.ShardedDaemonProcess
import akka.persistence.jdbc.query.scaladsl.JdbcReadJournal
import akka.persistence.query.Offset
import akka.projection.eventsourced.EventEnvelope
import akka.projection.eventsourced.scaladsl.EventSourcedProvider
import akka.projection.jdbc.scaladsl.JdbcProjection
import akka.projection.scaladsl.{ ExactlyOnceProjection, SourceProvider }
import akka.projection.{ ProjectionBehavior, ProjectionId }
import shopping.cart.repository.{ ItemPopularityRepository, ScalikeJdbcSession }

object ItemPopularityProjection {

  def init(
      system: ActorSystem[_],
      repository: ItemPopularityRepository): Unit = {
    ShardedDaemonProcess(system).init( // (1)
      name = "ItemPopularityProjection",
      ShoppingCart.tags.size,
      index =>
        ProjectionBehavior(createProjectionFor(system, repository, index)),
      ShardedDaemonProcessSettings(system),
      Some(ProjectionBehavior.Stop))
  }

  private def createProjectionFor(
      system: ActorSystem[_],
      repository: ItemPopularityRepository,
      index: Int)
      : ExactlyOnceProjection[Offset, EventEnvelope[ShoppingCart.Event]] = {
    val tag = ShoppingCart.tags(index) // (2)

    val sourceProvider
        : SourceProvider[Offset, EventEnvelope[ShoppingCart.Event]] = // (3)
      EventSourcedProvider.eventsByTag[ShoppingCart.Event](
        system = system,
        readJournalPluginId = JdbcReadJournal.Identifier, // (4)
        tag = tag)

    JdbcProjection.exactlyOnce( // (5)
      projectionId = ProjectionId("ItemPopularityProjection", tag),
      sourceProvider,
      handler = () =>
        new ItemPopularityProjectionHandler(tag, system, repository), // (6)
      sessionFactory = () => new ScalikeJdbcSession())(system)
  }
}
(1) ShardedDaemonProcess will manage the Projection instances. It ensures the Projection instances are always running and distributes them over the nodes in the Akka Cluster.
(2) The tag is selected based on the Projection instance’s index, corresponding to carts-0 to carts-4 as explained in the tagging in the ShoppingCart.
(3) The source of the Projection is EventSourcedProvider.eventsByTag with the selected tag.
(4) Using the JDBC event journal.
(5) Using JDBC for offset storage of the Projection using exactly-once strategy. Offset and projected model will be persisted transactionally.
(6) Creating the Projection Handler that we wrote in the beginning of this part.
- Call the ItemPopularityProjection.init from Main:
- Java
src/main/java/shopping/cart/Main.java
ApplicationContext springContext = SpringIntegration.applicationContext(system); // (1)

ItemPopularityRepository itemPopularityRepository =
    springContext.getBean(ItemPopularityRepository.class); // (2)

JpaTransactionManager transactionManager =
    springContext.getBean(JpaTransactionManager.class); // (3)

ItemPopularityProjection.init(system, transactionManager, itemPopularityRepository); // (4)
(1) Initialize a Spring application context using the ActorSystem.
(2) Get a Spring generated implementation for the ItemPopularityRepository.
(3) Get a Spring JpaTransactionManager to pass to the Projection init method.
(4) Initialize the Projection passing all necessary dependencies.
- Scala
src/main/scala/shopping/cart/Main.scala
ScalikeJdbcSetup.init(system) // (1)
val itemPopularityRepository = new ItemPopularityRepositoryImpl() // (2)

ItemPopularityProjection.init(system, itemPopularityRepository) // (3)
(1) Call the ScalikeJdbcSetup.init method to initiate the connection pool for the read-side. The connection pool will be closed when the actor system terminates.
(2) Instantiate the repository.
(3) Call the initialization of the Projection.
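The exactly-once strategy used above relies on the offset and the projected model being stored in the same transaction. The following is a toy, in-memory simulation of why that matters, under loudly stated assumptions: Store, Added, and run are hypothetical names, the "transaction" is just a method that updates both values together, and nothing here reflects how Akka Projection is implemented internally.

```java
import java.util.Arrays;
import java.util.List;

public class ExactlyOnceSketch {

  // Hypothetical event: a sequence offset plus the quantity that was added.
  static final class Added {
    final long offset;
    final int quantity;

    Added(long offset, int quantity) {
      this.offset = offset;
      this.quantity = quantity;
    }
  }

  // Stand-in for a database in which count and offset are stored side by side.
  static final class Store {
    long count = 0;
    long offset = 0;

    // Simulates one transaction: both values change, or neither does.
    void commit(long newCount, long newOffset) {
      this.count = newCount;
      this.offset = newOffset;
    }
  }

  // Processes events after the stored offset; optionally fails before commit.
  static void run(Store store, List<Added> events, long failAtOffset) {
    for (Added e : events) {
      if (e.offset <= store.offset) continue; // already projected
      long newCount = store.count + e.quantity;
      if (e.offset == failAtOffset) {
        // Crash before commit: the transaction rolls back, nothing is stored.
        throw new IllegalStateException("simulated crash at offset " + e.offset);
      }
      store.commit(newCount, e.offset);
    }
  }

  public static void main(String[] args) {
    Store store = new Store();
    List<Added> events = Arrays.asList(new Added(1, 5), new Added(2, 2), new Added(3, 1));
    try {
      run(store, events, 2); // first attempt crashes while handling offset 2
    } catch (IllegalStateException expected) {
      // the projection would be restarted here
    }
    run(store, events, -1); // restart: resumes after the stored offset
    System.out.println(store.count + " at offset " + store.offset); // prints 8 at offset 3
  }
}
```

If the count were stored without the offset, or the offset without the count, a crash between the two writes would either count an event twice or skip it after the restart.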
3. Query
To expose the item popularity to the outside of the service we can add an operation in the gRPC ShoppingCartService. Follow these steps:
- Add a new GetItemPopularity operation to the ShoppingCartService.proto:
src/main/protobuf/ShoppingCartService.proto
service ShoppingCartService {
  rpc GetItemPopularity(GetItemPopularityRequest) returns (GetItemPopularityResponse) {}
}

message GetItemPopularityRequest {
  string itemId = 1;
}

message GetItemPopularityResponse {
  string itemId = 1;
  int64 popularityCount = 2;
}
- Generate code from the new Protobuf specification by compiling the project:
- Java
mvn compile
- Scala
sbt compile
- Add the getItemPopularity method to the ShoppingCartServiceImpl:
For this you have to add the ItemPopularityRepository as a constructor parameter to the ShoppingCartServiceImpl. The ItemPopularityRepository instance is created in Main.java (Java) or Main.scala (Scala), so pass that instance as a parameter to ShoppingCartServiceImpl.
- Java
src/main/java/shopping/cart/ShoppingCartServiceImpl.java
private final ItemPopularityRepository repository;
private final Executor blockingJdbcExecutor;

public ShoppingCartServiceImpl(
    ActorSystem<?> system, ItemPopularityRepository repository) { // (1)

  DispatcherSelector dispatcherSelector =
      DispatcherSelector.fromConfig("akka.projection.jdbc.blocking-jdbc-dispatcher");
  this.blockingJdbcExecutor = system.dispatchers().lookup(dispatcherSelector); // (2)

  this.repository = repository;
  timeout = system.settings().config().getDuration("shopping-cart-service.ask-timeout");
  sharding = ClusterSharding.get(system);
}

@Override
public CompletionStage<GetItemPopularityResponse> getItemPopularity(GetItemPopularityRequest in) {

  CompletionStage<Optional<ItemPopularity>> itemPopularity =
      CompletableFuture.supplyAsync(
          () -> repository.findById(in.getItemId()), blockingJdbcExecutor); // (3)

  return itemPopularity.thenApply(
      popularity -> {
        long count = popularity.map(ItemPopularity::getCount).orElse(0L);
        return GetItemPopularityResponse.newBuilder().setPopularityCount(count).build();
      });
}
(1) Add the ItemPopularityRepository to the service implementation constructor.
(2) Use the ActorSystem to get an instance of an Executor tailored for running JDBC blocking operations.
(3) Implement getItemPopularity by calling the repository to find the projected model by id and wrap it with CompletableFuture.supplyAsync.
- Scala
src/main/scala/shopping/cart/ShoppingCartServiceImpl.scala
class ShoppingCartServiceImpl(
    system: ActorSystem[_],
    itemPopularityRepository: ItemPopularityRepository) // (1)
    extends proto.ShoppingCartService {

  private val blockingJdbcExecutor: ExecutionContext =
    system.dispatchers.lookup(
      DispatcherSelector
        .fromConfig("akka.projection.jdbc.blocking-jdbc-dispatcher")
    ) // (2)

  override def getItemPopularity(in: proto.GetItemPopularityRequest)
      : Future[proto.GetItemPopularityResponse] = {
    Future { // (3)
      ScalikeJdbcSession.withSession { session =>
        itemPopularityRepository.getItem(session, in.itemId)
      }
    }(blockingJdbcExecutor).map {
      case Some(count) =>
        proto.GetItemPopularityResponse(in.itemId, count)
      case None =>
        proto.GetItemPopularityResponse(in.itemId, 0L)
    }
  }
}
(1) Add the ItemPopularityRepository to the service implementation constructor.
(2) Use the ActorSystem to get an instance of an ExecutionContext tailored for running JDBC blocking operations.
(3) Implement getItemPopularity by calling the repository to find the projected model by id and wrap it in a Future running on the executor for JDBC operations.
Calls to the repository are blocking. In Java we therefore wrap them with CompletableFuture.supplyAsync, and in Scala we run them in a Future, in both cases on the executor for blocking JDBC operations. Make sure to follow that pattern whenever you interact with a DB repository.
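The pattern of moving a blocking call onto a dedicated executor can be tried without Akka or a database. In this minimal sketch, blockingFindCount is a hypothetical stand-in for a repository call, the returned values are made up for illustration, and a fixed thread pool is assumed in place of the dispatcher that the service looks up from the ActorSystem.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class BlockingCallSketch {

  // Hypothetical blocking lookup standing in for a JDBC repository call.
  static Optional<Long> blockingFindCount(String itemId) {
    try {
      Thread.sleep(50); // simulate time spent waiting on the database
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    return "hoodie".equals(itemId) ? Optional.of(7L) : Optional.empty();
  }

  // Runs the blocking call on the given executor and maps a missing row to 0.
  static CompletionStage<Long> popularity(String itemId, ExecutorService blockingExecutor) {
    return CompletableFuture.supplyAsync(() -> blockingFindCount(itemId), blockingExecutor)
        .thenApply(maybeCount -> maybeCount.orElse(0L));
  }

  public static void main(String[] args) {
    // Assumption: a plain thread pool replaces the Akka blocking JDBC dispatcher.
    ExecutorService blockingExecutor = Executors.newFixedThreadPool(4);
    try {
      System.out.println(popularity("hoodie", blockingExecutor).toCompletableFuture().join()); // prints 7
      System.out.println(popularity("socks", blockingExecutor).toCompletableFuture().join()); // prints 0
    } finally {
      blockingExecutor.shutdown();
    }
  }
}
```

The caller gets a CompletionStage back immediately, and only threads of the dedicated pool are tied up while the lookup waits.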
4. Run locally
Try your solution by running locally:
- Start the docker containers, unless they are already running:
docker-compose up -d
- Create the item popularity table by creating a ddl-scripts/create_user_tables.sql file and adding the SQL statement below.
- Java
CREATE TABLE IF NOT EXISTS public.item_popularity (
    itemid VARCHAR(255) NOT NULL,
    version BIGINT NOT NULL,
    count BIGINT NOT NULL,
    PRIMARY KEY (itemid));
- Scala
CREATE TABLE IF NOT EXISTS public.item_popularity (
    itemid VARCHAR(255) NOT NULL,
    count BIGINT NOT NULL,
    PRIMARY KEY (itemid));
- Load the file into Postgres:
docker exec -i shopping-cart-service_postgres-db_1 psql -U shopping-cart -t < ddl-scripts/create_user_tables.sql
When loading the SQL script, make sure to use the name of your running PostgreSQL container. The container name is not fixed and depends on the parent folder of the docker-compose file. The above example assumes the project was created using the seed template and named shopping-cart-service.
If you get a connection error, the PostgreSQL container is probably still starting. Wait a few seconds and retry the command.
- Run the service with:
- Java
# make sure to compile before running exec:exec
mvn compile exec:exec -DAPP_CONFIG=local1.conf
- Scala
sbt -Dconfig.resource=local1.conf run
4.1. Exercise the service
Use grpcurl to exercise the service:
- Add 5 hoodies to a cart:
grpcurl -d '{"cartId":"cart3", "itemId":"hoodie", "quantity":5}' -plaintext 127.0.0.1:8101 shoppingcart.ShoppingCartService.AddItem
- Check the popularity of the item:
grpcurl -d '{"itemId":"hoodie"}' -plaintext 127.0.0.1:8101 shoppingcart.ShoppingCartService.GetItemPopularity
- Add 2 hoodies to another cart:
grpcurl -d '{"cartId":"cart5", "itemId":"hoodie", "quantity":2}' -plaintext 127.0.0.1:8101 shoppingcart.ShoppingCartService.AddItem
- Check that the popularity count increased to 7:
grpcurl -d '{"itemId":"hoodie"}' -plaintext 127.0.0.1:8101 shoppingcart.ShoppingCartService.GetItemPopularity