Use Cassandra instead of PostgreSQL
This guide describes how to use Cassandra instead of a relational database (PostgreSQL). It assumes that you created the project using the Implementing Microservices with Akka tutorial, and it describes the changes relative to the JDBC setup used in that tutorial.
Source downloads
If you prefer to view the full example with Cassandra you can download a zip file containing the completed code:
- Java
- Scala
Use Cassandra for the write side
To use Cassandra for the Event Sourced Cart entity the following changes are needed.
Dependencies
Replace akka-persistence-jdbc with akka-persistence-cassandra:
- Java
-
pom.xml:
<properties>
  <akka-diagnostics.version>2.0.0</akka-diagnostics.version>
</properties>
<dependencies>
  <dependency>
    <groupId>com.typesafe.akka</groupId>
    <artifactId>akka-persistence-cassandra_${scala.binary.version}</artifactId>
  </dependency>
</dependencies>
- Scala
-
build.sbt:
val AkkaPersistenceCassandraVersion = "1.1.0"

libraryDependencies ++= Seq(
  "com.typesafe.akka" %% "akka-persistence-cassandra" % AkkaPersistenceCassandraVersion,
)
Configuration
Change the configuration in src/main/resources/persistence.conf to the following to enable akka-persistence-cassandra:
akka {
  # use Cassandra to store both snapshots and the events of the persistent actors
  persistence {
    journal.plugin = "akka.persistence.cassandra.journal"
    journal.auto-start-journals = ["akka.persistence.cassandra.journal"]
    snapshot-store.plugin = "akka.persistence.cassandra.snapshot"

    cassandra {
      events-by-tag {
        bucket-size = "Day"
        eventual-consistency-delay = 2s
        flush-interval = 50ms
        pubsub-notification = on
        first-time-bucket = "20200815T00:00"
      }

      query {
        refresh-interval = 2s
      }

      journal.keyspace = "shoppingcartservice"
      snapshot.keyspace = "shoppingcartservice"
    }
  }
}

datastax-java-driver {
  advanced.reconnect-on-init = on
}
To reduce Projection latency in development environments and tests, you can add the following configuration:
# for reduced Projection latency
akka.persistence.cassandra.events-by-tag.eventual-consistency-delay = 200 ms
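The effect of eventual-consistency-delay can be pictured with a small, library-free sketch. It assumes, as a simplification, that the events-by-tag query only delivers events that are at least as old as the configured delay, so that late writes from other nodes have time to become visible. The class and field names below (EventualConsistencyDelaySketch, TaggedEvent, writtenAt) are hypothetical and are not part of the Akka Persistence Cassandra API.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.stream.Collectors;

final class EventualConsistencyDelaySketch {

  /** Hypothetical stand-in for a tagged event and the time it was written. */
  static final class TaggedEvent {
    final String persistenceId;
    final long sequenceNr;
    final Instant writtenAt;

    TaggedEvent(String persistenceId, long sequenceNr, Instant writtenAt) {
      this.persistenceId = persistenceId;
      this.sequenceNr = sequenceNr;
      this.writtenAt = writtenAt;
    }
  }

  /** Returns the events that are old enough to be delivered at time `now`. */
  static List<TaggedEvent> deliverable(List<TaggedEvent> events, Instant now, Duration delay) {
    Instant cutoff = now.minus(delay);
    return events.stream()
        .filter(e -> !e.writtenAt.isAfter(cutoff)) // keep events written at or before the cutoff
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    Instant now = Instant.parse("2020-08-15T12:00:10Z");
    List<TaggedEvent> events =
        List.of(
            new TaggedEvent("cart1", 1L, now.minusSeconds(5)),
            new TaggedEvent("cart1", 2L, now.minusMillis(500)));

    // With a 2 s delay only the older event is visible; with 200 ms both are.
    System.out.println(deliverable(events, now, Duration.ofSeconds(2)).size());
    System.out.println(deliverable(events, now, Duration.ofMillis(200)).size());
  }
}
```

In this simplified model a shorter delay makes new events visible to the Projection sooner, which is why the lower value is convenient for local development and tests.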
Projection with JDBC
Note that it’s possible to keep JDBC for the projection when the events are stored in Cassandra. Then you would only change the following in the Projection initialization:
- JdbcReadJournal.Identifier to CassandraReadJournal.Identifier
- JdbcProjection.exactlyOnce to CassandraProjection.atLeastOnce
- ExactlyOnceProjection to AtLeastOnceProjection
Use Cassandra for the read side
To use Cassandra for the Projections the following changes are needed.
Dependencies
Replace akka-projection-jdbc with akka-projection-cassandra:
- Java
-
pom.xml:
<dependency>
  <groupId>com.lightbend.akka</groupId>
  <artifactId>akka-projection-cassandra_${scala.binary.version}</artifactId>
</dependency>
- Scala
-
build.sbt:
"com.lightbend.akka" %% "akka-projection-cassandra" % AkkaProjectionVersion,
Configuration
Change the configuration in src/main/resources/persistence.conf and add the following for akka-projection-cassandra:
akka.projection {
cassandra.offset-store.keyspace = "shoppingcartservice"
# use same Cassandra session config as for the journal
cassandra.session-config-path = "akka.persistence.cassandra"
}
Projection for queries
Now we will change the Projection corresponding to the Projection for queries in the tutorial.
Several things differ from the JDBC variant, so we start by removing the JDBC-specific files and then add the corresponding ones for Cassandra. Remove:
- Java
  - src/main/java/shopping/cart/repository/HibernateJdbcSession.java
  - src/main/java/shopping/cart/repository/ItemPopularityRepository.java
  - src/main/java/shopping/cart/repository/SpringConfig.java
  - src/main/java/shopping/cart/repository/SpringIntegration.java
  - src/main/java/shopping/cart/ItemPopularity.java
  - src/main/java/shopping/cart/ItemPopularityProjection.java
  - src/main/java/shopping/cart/ItemPopularityProjectionHandler.java
  - remove the springContext, ItemPopularityRepository and ItemPopularityProjection.init in Main.java.
- Scala
  - src/main/scala/shopping/cart/repository/ItemPopularityRepository.scala
  - src/main/scala/shopping/cart/repository/ScalikeJdbcSession.scala
  - src/main/scala/shopping/cart/repository/ScalikeJdbcSetup.scala
  - src/main/scala/shopping/cart/ItemPopularityProjection.scala
  - src/main/scala/shopping/cart/ItemPopularityProjectionHandler.scala
  - remove the ScalikeJdbcSetup.init, ItemPopularityRepository and ItemPopularityProjection.init in Main.scala.
Follow these steps to process events in a Projection that stores the offset in Cassandra and updates an item_popularity table in Cassandra.
- Add a class ItemPopularityProjectionHandler:
- Java
src/main/java/shopping/cart/ItemPopularityProjectionHandler.java:
package shopping.cart;

import akka.Done;
import akka.projection.eventsourced.EventEnvelope;
import akka.projection.javadsl.Handler;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class ItemPopularityProjectionHandler
    extends Handler<EventEnvelope<ShoppingCart.Event>> { // (1)
  private final Logger logger = LoggerFactory.getLogger(getClass());
  private final String tag;
  private final ItemPopularityRepository repo;

  public ItemPopularityProjectionHandler(String tag, ItemPopularityRepository repo) {
    this.tag = tag;
    this.repo = repo;
  }

  @Override
  public CompletionStage<Done> process(EventEnvelope<ShoppingCart.Event> envelope) // (2)
      throws Exception {
    ShoppingCart.Event event = envelope.event();

    if (event instanceof ShoppingCart.ItemAdded) { // (3)
      ShoppingCart.ItemAdded added = (ShoppingCart.ItemAdded) event;
      CompletionStage<Done> result = this.repo.update(added.itemId, added.quantity);
      result.thenAccept(done -> logItemCount(added.itemId));
      return result;
    } else {
      // skip all other events, such as `CheckedOut`
      return CompletableFuture.completedFuture(Done.getInstance());
    }
  }

  private void logItemCount(String itemId) {
    repo.getItem(itemId)
        .thenAccept(
            optCount ->
                logger.info(
                    "ItemPopularityProjectionHandler({}) item popularity for '{}': [{}]",
                    this.tag,
                    itemId,
                    optCount.orElse(0L)));
  }
}
1 extends akka.projection.javadsl.Handler
2 the process method to implement
3 match events and update the count via the ItemPopularityRepository, which encapsulates the database access (only ItemAdded is handled here; all other events are skipped)
- Scala
src/main/scala/shopping/cart/ItemPopularityProjectionHandler.scala:
package shopping.cart

import scala.concurrent.ExecutionContext
import scala.concurrent.Future

import akka.Done
import akka.actor.typed.ActorSystem
import akka.projection.eventsourced.EventEnvelope
import akka.projection.scaladsl.Handler
import org.slf4j.LoggerFactory

class ItemPopularityProjectionHandler(
    tag: String,
    system: ActorSystem[_],
    repo: ItemPopularityRepository)
    extends Handler[EventEnvelope[ShoppingCart.Event]]() { // (1)

  private val log = LoggerFactory.getLogger(getClass)
  private implicit val ec: ExecutionContext = system.executionContext

  override def process(
      envelope: EventEnvelope[ShoppingCart.Event]): Future[Done] = { // (2)
    envelope.event match { // (3)
      case ShoppingCart.ItemAdded(_, itemId, quantity) =>
        val result = repo.update(itemId, quantity)
        result.foreach(_ => logItemCount(itemId))
        result

      case _: ShoppingCart.CheckedOut =>
        Future.successful(Done)
    }
  }

  private def logItemCount(itemId: String): Unit = {
    repo.getItem(itemId).foreach { optCount =>
      log.info(
        "ItemPopularityProjectionHandler({}) item popularity for '{}': [{}]",
        tag,
        itemId,
        optCount.getOrElse(0))
    }
  }
}
1 extends akka.projection.scaladsl.Handler
2 the process method to implement
3 match events and update the count via the ItemPopularityRepository (only ItemAdded changes the count here)
- Add the ItemPopularityRepository:
- Java
src/main/java/shopping/cart/ItemPopularityRepository.java:
package shopping.cart;

import akka.Done;
import java.util.Optional;
import java.util.concurrent.CompletionStage;

public interface ItemPopularityRepository {
  CompletionStage<Done> update(String itemId, int delta);

  CompletionStage<Optional<Long>> getItem(String itemId);
}
- Scala
-
src/main/scala/shopping/cart/ItemPopularityRepository.scala:
package shopping.cart

import scala.concurrent.Future

import akka.Done

trait ItemPopularityRepository {
  def update(itemId: String, delta: Int): Future[Done]
  def getItem(itemId: String): Future[Option[Long]]
}
-
Add the implementation for Cassandra:
- Java
-
src/main/java/shopping/cart/ItemPopularityRepositoryImpl.java:
package shopping.cart;

import akka.Done;
import akka.stream.alpakka.cassandra.javadsl.CassandraSession;
import java.util.Optional;
import java.util.concurrent.CompletionStage;

public final class ItemPopularityRepositoryImpl implements ItemPopularityRepository {

  static final String POPULARITY_TABLE = "item_popularity";

  private final CassandraSession session;
  private final String table;

  public ItemPopularityRepositoryImpl(CassandraSession session, String keyspace) {
    this.session = session;
    this.table = keyspace + "." + POPULARITY_TABLE;
  }

  @Override
  public CompletionStage<Done> update(String itemId, int delta) {
    return session.executeWrite(
        "UPDATE " + table + " SET count = count + ? WHERE item_id = ?",
        Long.valueOf(delta),
        itemId);
  }

  @Override
  public CompletionStage<Optional<Long>> getItem(String itemId) {
    return session
        .selectOne("SELECT item_id, count FROM " + table + " WHERE item_id = ?", itemId)
        .thenApply(opt -> opt.map(row -> row.getLong("count")));
  }
}
The CassandraSession comes from the Cassandra connector in Alpakka and provides an asynchronous API for executing CQL statements to Cassandra. In the initialization code, introduced later, we will see how to get access to a CassandraSession. You can learn more about the CassandraSession in the Alpakka reference documentation.
- Scala
-
src/main/scala/shopping/cart/ItemPopularityRepository.scala:
package shopping.cart

import scala.concurrent.Future

import akka.Done
import scala.concurrent.ExecutionContext

import akka.stream.alpakka.cassandra.scaladsl.CassandraSession

object ItemPopularityRepositoryImpl {
  val popularityTable = "item_popularity"
}

class ItemPopularityRepositoryImpl(session: CassandraSession, keyspace: String)(
    implicit val ec: ExecutionContext)
    extends ItemPopularityRepository {
  import ItemPopularityRepositoryImpl.popularityTable

  override def update(itemId: String, delta: Int): Future[Done] = {
    session.executeWrite(
      s"UPDATE $keyspace.$popularityTable SET count = count + ? WHERE item_id = ?",
      java.lang.Long.valueOf(delta),
      itemId)
  }

  override def getItem(itemId: String): Future[Option[Long]] = {
    session
      .selectOne(
        s"SELECT item_id, count FROM $keyspace.$popularityTable WHERE item_id = ?",
        itemId)
      .map(opt => opt.map(row => row.getLong("count").longValue()))
  }
}
The example persists the item popularity count with a Cassandra counter data type. Because we are using at-least-once semantics, it is not possible to guarantee that item count updates are applied idempotently. However, since the count is only a rough metric of how popular an item is, it is not critical to have a totally accurate figure.
- Initialize the Projection
Place the initialization code of the Projection in an ItemPopularityProjection object class:
- Java
src/main/java/shopping/cart/ItemPopularityProjection.java:
package shopping.cart;

import akka.actor.typed.ActorSystem;
import akka.cluster.sharding.typed.ShardedDaemonProcessSettings;
import akka.cluster.sharding.typed.javadsl.ShardedDaemonProcess;
import akka.persistence.cassandra.query.javadsl.CassandraReadJournal;
import akka.persistence.query.Offset;
import akka.projection.ProjectionBehavior;
import akka.projection.ProjectionId;
import akka.projection.cassandra.javadsl.CassandraProjection;
import akka.projection.eventsourced.EventEnvelope;
import akka.projection.eventsourced.javadsl.EventSourcedProvider;
import akka.projection.javadsl.AtLeastOnceProjection;
import akka.projection.javadsl.SourceProvider;
import java.util.Optional;

public final class ItemPopularityProjection {

  private ItemPopularityProjection() {}

  public static void init(ActorSystem<?> system, ItemPopularityRepository repository) {
    ShardedDaemonProcess.get(system)
        .init( // (1)
            ProjectionBehavior.Command.class,
            "ItemPopularityProjection",
            ShoppingCart.TAGS.size(),
            index -> ProjectionBehavior.create(createProjectionFor(system, repository, index)),
            ShardedDaemonProcessSettings.create(system),
            Optional.of(ProjectionBehavior.stopMessage()));
  }

  private static AtLeastOnceProjection<Offset, EventEnvelope<ShoppingCart.Event>>
      createProjectionFor(ActorSystem<?> system, ItemPopularityRepository repository, int index) {
    String tag = ShoppingCart.TAGS.get(index); // (2)

    SourceProvider<Offset, EventEnvelope<ShoppingCart.Event>> sourceProvider = // (3)
        EventSourcedProvider.eventsByTag(
            system,
            CassandraReadJournal.Identifier(), // (4)
            tag);

    return CassandraProjection.atLeastOnce( // (5)
        ProjectionId.of("ItemPopularityProjection", tag),
        sourceProvider,
        () -> new ItemPopularityProjectionHandler(tag, repository)); // (6)
  }
}
1 ShardedDaemonProcess will manage the Projection instances. It ensures the Projection instances are always running and distributes them over the nodes in the Akka Cluster.
2 The tag is selected based on the Projection instance’s index, corresponding to carts-0 to carts-3 as explained in the tagging in the ShoppingCart.
3 The source of the Projection is EventSourcedProvider.eventsByTag with the selected tag.
4 Using the Cassandra event journal.
5 Using Cassandra for offset storage of the Projection.
6 Creating the Projection Handler that we wrote in the beginning of this part.
- Scala
src/main/scala/shopping/cart/ItemPopularityProjection.scala:
package shopping.cart

import akka.actor.typed.ActorSystem
import akka.cluster.sharding.typed.ShardedDaemonProcessSettings
import akka.cluster.sharding.typed.scaladsl.ShardedDaemonProcess
import akka.persistence.cassandra.query.scaladsl.CassandraReadJournal
import akka.persistence.query.Offset
import akka.projection.ProjectionBehavior
import akka.projection.ProjectionId
import akka.projection.cassandra.scaladsl.CassandraProjection
import akka.projection.eventsourced.EventEnvelope
import akka.projection.eventsourced.scaladsl.EventSourcedProvider
import akka.projection.scaladsl.AtLeastOnceProjection
import akka.projection.scaladsl.SourceProvider

object ItemPopularityProjection {
  def init(
      system: ActorSystem[_],
      repository: ItemPopularityRepository): Unit = {
    ShardedDaemonProcess(system).init( // (1)
      name = "ItemPopularityProjection",
      ShoppingCart.tags.size,
      index =>
        ProjectionBehavior(createProjectionFor(system, repository, index)),
      ShardedDaemonProcessSettings(system),
      Some(ProjectionBehavior.Stop))
  }

  private def createProjectionFor(
      system: ActorSystem[_],
      repository: ItemPopularityRepository,
      index: Int)
      : AtLeastOnceProjection[Offset, EventEnvelope[ShoppingCart.Event]] = {
    val tag = ShoppingCart.tags(index) // (2)

    val sourceProvider
        : SourceProvider[Offset, EventEnvelope[ShoppingCart.Event]] = // (3)
      EventSourcedProvider.eventsByTag[ShoppingCart.Event](
        system = system,
        readJournalPluginId = CassandraReadJournal.Identifier, // (4)
        tag = tag)

    CassandraProjection.atLeastOnce( // (5)
      projectionId = ProjectionId("ItemPopularityProjection", tag),
      sourceProvider,
      handler = () =>
        new ItemPopularityProjectionHandler(tag, system, repository) // (6)
    )
  }
}
1 ShardedDaemonProcess will manage the Projection instances. It ensures the Projection instances are always running and distributes them over the nodes in the Akka Cluster.
2 The tag is selected based on the Projection instance’s index, corresponding to carts-0 to carts-3 as explained in the tagging in the ShoppingCart.
3 The source of the Projection is EventSourcedProvider.eventsByTag with the selected tag.
4 Using the Cassandra event journal.
5 Using Cassandra for offset storage of the Projection.
6 Creating the Projection Handler that we wrote in the beginning of this part.
- Call the ItemPopularityProjection.init from Main:
- Java
CassandraSession session =
    CassandraSessionRegistry.get(system).sessionFor("akka.persistence.cassandra"); // (1)
// use same keyspace for the item_popularity table as the offset store
Config config = system.settings().config();
String itemPopularityKeyspace =
    config.getString("akka.projection.cassandra.offset-store.keyspace");
ItemPopularityRepository itemPopularityRepository =
    new ItemPopularityRepositoryImpl(session, itemPopularityKeyspace); // (2)

ItemPopularityProjection.init(system, itemPopularityRepository); // (3)
1 The CassandraSession is looked up from the CassandraSessionRegistry
2 Instantiate the repository for Cassandra
3 Call the initialization of the Projection
- Scala
import akka.stream.alpakka.cassandra.scaladsl.CassandraSessionRegistry

val session =
  CassandraSessionRegistry(system).sessionFor("akka.persistence.cassandra") // (1)
// use same keyspace for the item_popularity table as the offset store
val itemPopularityKeyspace =
  system.settings.config
    .getString("akka.projection.cassandra.offset-store.keyspace")
val itemPopularityRepository =
  new ItemPopularityRepositoryImpl(session, itemPopularityKeyspace)(
    system.executionContext) // (2)

ItemPopularityProjection.init(system, itemPopularityRepository) // (3)
1 The CassandraSession is looked up from the CassandraSessionRegistry
2 Instantiate the repository for Cassandra
3 Call the initialization of the Projection
The CassandraProjection uses at-least-once processing semantics. The offset is stored after the event has been processed, so if the projection is restarted from a previously stored offset, some events may be processed more than once. For a JDBC Projection it’s possible to have exactly-once semantics because the offset can be stored in the same atomic transaction as the database operation in the event handler.
- Query
To expose the item popularity to the outside of the service we have the GetItemPopularity operation in the gRPC ShoppingCartService.
Replace the getItemPopularity implementation in the ShoppingCartServiceImpl:
- Java
@Override
public CompletionStage<GetItemPopularityResponse> getItemPopularity(GetItemPopularityRequest in) {
  return itemPopularityRepository
      .getItem(in.getItemId())
      .thenApply(
          maybePopularity -> {
            long popularity = maybePopularity.orElse(0L);
            return GetItemPopularityResponse.newBuilder().setPopularityCount(popularity).build();
          });
}
- Scala
-
override def getItemPopularity(in: proto.GetItemPopularityRequest)
    : Future[proto.GetItemPopularityResponse] = {
  itemPopularityRepository.getItem(in.itemId).map {
    case Some(count) =>
      proto.GetItemPopularityResponse(in.itemId, count)
    case None =>
      proto.GetItemPopularityResponse(in.itemId, 0L)
  }
}
You can remove the unused blockingJdbcExecutor.
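The interaction between at-least-once processing and a non-idempotent counter update, as described for the CassandraProjection and the item_popularity counter in the steps above, can be illustrated with a small in-memory sketch. It does not use Akka or Cassandra: the map stands in for the counter table, an integer stands in for the stored offset, and the "crash" is simulated. All names (AtLeastOnceCounterSketch, ItemAdded, crashAtIndex) are hypothetical.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class AtLeastOnceCounterSketch {

  /** Hypothetical, simplified stand-in for ShoppingCart.ItemAdded. */
  static final class ItemAdded {
    final String itemId;
    final int quantity;

    ItemAdded(String itemId, int quantity) {
      this.itemId = itemId;
      this.quantity = quantity;
    }
  }

  // Stands in for the item_popularity counter table.
  final Map<String, Long> counts = new HashMap<>();
  // Stands in for the offset store: index of the next event to process.
  int storedOffset = 0;

  /**
   * Processes events starting at the stored offset. If crashAtIndex >= 0, the run stops right
   * after the counter update for that event, before its offset has been saved.
   */
  void run(List<ItemAdded> events, int crashAtIndex) {
    for (int i = storedOffset; i < events.size(); i++) {
      ItemAdded event = events.get(i);
      counts.merge(event.itemId, (long) event.quantity, Long::sum); // not idempotent
      if (i == crashAtIndex) {
        return; // simulated crash: the offset for event i is never stored
      }
      storedOffset = i + 1; // the offset is saved only after the event was handled
    }
  }

  public static void main(String[] args) {
    List<ItemAdded> events = List.of(new ItemAdded("socks", 3), new ItemAdded("t-shirt", 2));
    AtLeastOnceCounterSketch projection = new AtLeastOnceCounterSketch();

    projection.run(events, 1); // crashes after counting the t-shirts once
    projection.run(events, -1); // restart: resumes from the stored offset and counts them again

    System.out.println("socks=" + projection.counts.get("socks"));
    System.out.println("t-shirt=" + projection.counts.get("t-shirt"));
  }
}
```

In this run the t-shirt count ends at 4 although only 2 were added, because the second event is handled twice. That is the kind of inaccuracy the guide accepts for a rough popularity metric.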
Projection publishing to Kafka
Now we will change the Projection corresponding to the Projection publishing to Kafka.
The PublishEventsProjectionHandler can be the same as for JDBC.
Replace the initialization code of the Projection in the PublishEventsProjection object class:
- Java
-
src/main/java/shopping/cart/PublishEventsProjection.java:
package shopping.cart;

import akka.actor.CoordinatedShutdown;
import akka.actor.typed.ActorSystem;
import akka.cluster.sharding.typed.ShardedDaemonProcessSettings;
import akka.cluster.sharding.typed.javadsl.ShardedDaemonProcess;
import akka.kafka.ProducerSettings;
import akka.kafka.javadsl.SendProducer;
import akka.persistence.cassandra.query.javadsl.CassandraReadJournal;
import akka.persistence.query.Offset;
import akka.projection.ProjectionBehavior;
import akka.projection.ProjectionId;
import akka.projection.cassandra.javadsl.CassandraProjection;
import akka.projection.eventsourced.EventEnvelope;
import akka.projection.eventsourced.javadsl.EventSourcedProvider;
import akka.projection.javadsl.AtLeastOnceProjection;
import akka.projection.javadsl.SourceProvider;
import java.util.Optional;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringSerializer;

public final class PublishEventsProjection {

  private PublishEventsProjection() {}

  public static void init(ActorSystem<?> system) {
    SendProducer<String, byte[]> sendProducer = createProducer(system);
    String topic = system.settings().config().getString("shopping-cart-service.kafka.topic");

    ShardedDaemonProcess.get(system)
        .init(
            ProjectionBehavior.Command.class,
            "PublishEventsProjection",
            ShoppingCart.TAGS.size(),
            index ->
                ProjectionBehavior.create(
                    createProjectionFor(system, topic, sendProducer, index)),
            ShardedDaemonProcessSettings.create(system),
            Optional.of(ProjectionBehavior.stopMessage()));
  }

  private static SendProducer<String, byte[]> createProducer(ActorSystem<?> system) {
    ProducerSettings<String, byte[]> producerSettings =
        ProducerSettings.create(system, new StringSerializer(), new ByteArraySerializer());
    SendProducer<String, byte[]> sendProducer = new SendProducer<>(producerSettings, system);
    CoordinatedShutdown.get(system)
        .addTask(
            CoordinatedShutdown.PhaseActorSystemTerminate(),
            "close-sendProducer",
            () -> sendProducer.close());
    return sendProducer;
  }

  private static AtLeastOnceProjection<Offset, EventEnvelope<ShoppingCart.Event>> // (1)
      createProjectionFor(
          ActorSystem<?> system,
          String topic,
          SendProducer<String, byte[]> sendProducer,
          int index) {
    String tag = ShoppingCart.TAGS.get(index);
    SourceProvider<Offset, EventEnvelope<ShoppingCart.Event>> sourceProvider =
        EventSourcedProvider.eventsByTag(system, CassandraReadJournal.Identifier(), tag); // (2)

    return CassandraProjection.atLeastOnce( // (3)
        ProjectionId.of("PublishEventsProjection", tag),
        sourceProvider,
        () -> new PublishEventsProjectionHandler(topic, sendProducer));
  }
}
- Scala
-
src/main/scala/shopping/cart/PublishEventsProjection.scala:
package shopping.cart

import akka.actor.CoordinatedShutdown
import akka.actor.typed.ActorSystem
import akka.cluster.sharding.typed.ShardedDaemonProcessSettings
import akka.cluster.sharding.typed.scaladsl.ShardedDaemonProcess
import akka.kafka.ProducerSettings
import akka.kafka.scaladsl.SendProducer
import akka.persistence.cassandra.query.scaladsl.CassandraReadJournal
import akka.persistence.query.Offset
import akka.projection.cassandra.scaladsl.CassandraProjection
import akka.projection.eventsourced.EventEnvelope
import akka.projection.eventsourced.scaladsl.EventSourcedProvider
import akka.projection.scaladsl.{ AtLeastOnceProjection, SourceProvider }
import akka.projection.{ ProjectionBehavior, ProjectionId }
import org.apache.kafka.common.serialization.{
  ByteArraySerializer,
  StringSerializer
}

object PublishEventsProjection {

  def init(system: ActorSystem[_]): Unit = {
    val sendProducer = createProducer(system)
    val topic =
      system.settings.config.getString("shopping-cart-service.kafka.topic")

    ShardedDaemonProcess(system).init(
      name = "PublishEventsProjection",
      ShoppingCart.tags.size,
      index =>
        ProjectionBehavior(
          createProjectionFor(system, topic, sendProducer, index)),
      ShardedDaemonProcessSettings(system),
      Some(ProjectionBehavior.Stop))
  }

  private def createProducer(
      system: ActorSystem[_]): SendProducer[String, Array[Byte]] = {
    val producerSettings =
      ProducerSettings(system, new StringSerializer, new ByteArraySerializer)
    val sendProducer =
      SendProducer(producerSettings)(system)
    CoordinatedShutdown(system).addTask(
      CoordinatedShutdown.PhaseBeforeActorSystemTerminate,
      "close-sendProducer") { () =>
      sendProducer.close()
    }
    sendProducer
  }

  private def createProjectionFor(
      system: ActorSystem[_],
      topic: String,
      sendProducer: SendProducer[String, Array[Byte]],
      index: Int): AtLeastOnceProjection[
    Offset,
    EventEnvelope[ShoppingCart.Event]] = { // (1)
    val tag = ShoppingCart.tags(index)
    val sourceProvider
        : SourceProvider[Offset, EventEnvelope[ShoppingCart.Event]] =
      EventSourcedProvider.eventsByTag[ShoppingCart.Event](
        system = system,
        readJournalPluginId = CassandraReadJournal.Identifier, // (2)
        tag = tag)

    CassandraProjection.atLeastOnce( // (3)
      projectionId = ProjectionId("PublishEventsProjection", tag),
      sourceProvider,
      handler =
        () => new PublishEventsProjectionHandler(system, topic, sendProducer))
  }
}
1 AtLeastOnceProjection instead of ExactlyOnceProjection
2 CassandraReadJournal.Identifier instead of JdbcReadJournal.Identifier
3 CassandraProjection.atLeastOnce instead of JdbcProjection.exactlyOnce
The PublishEventsProjection.init call from the Main class can remain the same as for JDBC.
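Each Projection instance started by ShardedDaemonProcess reads exactly one tag, chosen by its index. The sketch below shows that mapping without any Akka dependency. The tag names carts-0 to carts-3 follow the callout text in this guide. The way a cart is assigned to a tag (hash of the cart id modulo the number of tags) is an assumption based on the tagging described in the tutorial, and TagAssignmentSketch and its methods are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

final class TagAssignmentSketch {

  /** Builds the tag list, e.g. carts-0 .. carts-3 for numberOfTags = 4. */
  static List<String> tags(int numberOfTags) {
    List<String> result = new ArrayList<>();
    for (int i = 0; i < numberOfTags; i++) {
      result.add("carts-" + i);
    }
    return Collections.unmodifiableList(result);
  }

  /** A Projection instance with the given index consumes exactly this tag. */
  static String tagForProjectionInstance(List<String> tags, int index) {
    return tags.get(index);
  }

  /** Assumption: an entity picks its tag from a hash of its id, so events spread over all tags. */
  static String tagForEntity(List<String> tags, String cartId) {
    int i = Math.abs(cartId.hashCode() % tags.size());
    return tags.get(i);
  }

  public static void main(String[] args) {
    List<String> tags = tags(4);
    for (int index = 0; index < tags.size(); index++) {
      System.out.println(
          "projection instance " + index + " reads " + tagForProjectionInstance(tags, index));
    }
    System.out.println("cart1 is tagged with " + tagForEntity(tags, "cart1"));
  }
}
```

Because the number of Projection instances equals the number of tags, every tagged event is read by exactly one instance of each Projection.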
Projection calling gRPC service
Now we will change the Projection corresponding to the Projection calling gRPC service.
The SendOrderProjectionHandler can be the same as for JDBC.
Replace the initialization code of the Projection in the SendOrderProjection object class:
- Java
-
src/main/java/shopping/cart/SendOrderProjection.java:
package shopping.cart;

import akka.actor.typed.ActorSystem;
import akka.cluster.sharding.typed.ShardedDaemonProcessSettings;
import akka.cluster.sharding.typed.javadsl.ShardedDaemonProcess;
import akka.persistence.cassandra.query.javadsl.CassandraReadJournal;
import akka.persistence.query.Offset;
import akka.projection.ProjectionBehavior;
import akka.projection.ProjectionId;
import akka.projection.cassandra.javadsl.CassandraProjection;
import akka.projection.eventsourced.EventEnvelope;
import akka.projection.eventsourced.javadsl.EventSourcedProvider;
import akka.projection.javadsl.AtLeastOnceProjection;
import akka.projection.javadsl.SourceProvider;
import java.util.Optional;
import shopping.order.proto.ShoppingOrderService;

public class SendOrderProjection {

  private SendOrderProjection() {}

  public static void init(ActorSystem<?> system, ShoppingOrderService orderService) {
    ShardedDaemonProcess.get(system)
        .init(
            ProjectionBehavior.Command.class,
            "SendOrderProjection",
            ShoppingCart.TAGS.size(),
            index -> ProjectionBehavior.create(createProjectionsFor(system, orderService, index)),
            ShardedDaemonProcessSettings.create(system),
            Optional.of(ProjectionBehavior.stopMessage()));
  }

  private static AtLeastOnceProjection<Offset, EventEnvelope<ShoppingCart.Event>> // (1)
      createProjectionsFor(ActorSystem<?> system, ShoppingOrderService orderService, int index) {
    String tag = ShoppingCart.TAGS.get(index);
    SourceProvider<Offset, EventEnvelope<ShoppingCart.Event>> sourceProvider =
        EventSourcedProvider.eventsByTag(system, CassandraReadJournal.Identifier(), tag); // (2)

    return CassandraProjection.atLeastOnce( // (3)
        ProjectionId.of("SendOrderProjection", tag),
        sourceProvider,
        () -> new SendOrderProjectionHandler(system, orderService));
  }
}
- Scala
-
src/main/scala/shopping/cart/SendOrderProjection.scala:
package shopping.cart

import akka.actor.typed.ActorSystem
import akka.cluster.sharding.typed.ShardedDaemonProcessSettings
import akka.cluster.sharding.typed.scaladsl.ShardedDaemonProcess
import akka.persistence.cassandra.query.scaladsl.CassandraReadJournal
import akka.persistence.query.Offset
import akka.projection.ProjectionBehavior
import akka.projection.ProjectionId
import akka.projection.cassandra.scaladsl.CassandraProjection
import akka.projection.eventsourced.EventEnvelope
import akka.projection.eventsourced.scaladsl.EventSourcedProvider
import akka.projection.scaladsl.AtLeastOnceProjection
import akka.projection.scaladsl.SourceProvider
import shopping.order.proto.ShoppingOrderService

object SendOrderProjection {

  def init(system: ActorSystem[_], orderService: ShoppingOrderService): Unit = {
    ShardedDaemonProcess(system).init(
      name = "SendOrderProjection",
      ShoppingCart.tags.size,
      index =>
        ProjectionBehavior(createProjectionFor(system, orderService, index)),
      ShardedDaemonProcessSettings(system),
      Some(ProjectionBehavior.Stop))
  }

  private def createProjectionFor(
      system: ActorSystem[_],
      orderService: ShoppingOrderService,
      index: Int): AtLeastOnceProjection[
    Offset,
    EventEnvelope[ShoppingCart.Event]] = { // (1)
    val tag = ShoppingCart.tags(index)
    val sourceProvider
        : SourceProvider[Offset, EventEnvelope[ShoppingCart.Event]] =
      EventSourcedProvider.eventsByTag[ShoppingCart.Event](
        system = system,
        readJournalPluginId = CassandraReadJournal.Identifier, // (2)
        tag = tag)

    CassandraProjection.atLeastOnce( // (3)
      projectionId = ProjectionId("SendOrderProjection", tag),
      sourceProvider,
      handler = () => new SendOrderProjectionHandler(system, orderService))
  }
}
1 AtLeastOnceProjection instead of ExactlyOnceProjection
2 CassandraReadJournal.Identifier instead of JdbcReadJournal.Identifier
3 CassandraProjection.atLeastOnce instead of JdbcProjection.exactlyOnce
The SendOrderProjection.init call from the Main class can remain the same as for JDBC.
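Since the Projection is now an AtLeastOnceProjection, the same checkout may occasionally be sent to the order service more than once after a restart. One common way to tolerate this is to make the receiving side idempotent. The following is only a minimal in-memory sketch of that idea and is not part of the tutorial code: OrderDeduplicationSketch and accept are hypothetical names, and a real service would keep the set of processed ids in durable storage.

```java
import java.util.HashSet;
import java.util.Set;

final class OrderDeduplicationSketch {

  // Ids of the carts whose orders have already been accepted (in memory only).
  private final Set<String> processedCartIds = new HashSet<>();

  /** Returns true if the order is new and was accepted, false if it is a duplicate. */
  boolean accept(String cartId) {
    return processedCartIds.add(cartId);
  }

  public static void main(String[] args) {
    OrderDeduplicationSketch orders = new OrderDeduplicationSketch();
    System.out.println(orders.accept("cart1")); // first delivery of the order
    System.out.println(orders.accept("cart1")); // redelivery after a projection restart
  }
}
```

The first call accepts the order and the second reports it as a duplicate, so a redelivered event does not create a second order.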
DDL scripts
Replace the sql scripts in the ddl-scripts folder with corresponding cql scripts for Cassandra.
create_tables.cql will create the keyspace and all tables needed for Akka Persistence as well as the offset store table for Akka Projection.
-- This CQL script will create the keyspace and all tables needed for this sample.
-- It includes the messages and snapshot tables (write-side) and the projection tables (read-side).
-- NOTE: the keyspace as created here is probably not what you need in a production environment.
-- This is good enough for local development though.
CREATE KEYSPACE IF NOT EXISTS shoppingcartservice
WITH REPLICATION = { 'class' : 'SimpleStrategy','replication_factor':1 };
USE shoppingcartservice;
CREATE TABLE IF NOT EXISTS messages (
persistence_id text,
partition_nr bigint,
sequence_nr bigint,
timestamp timeuuid,
timebucket text,
writer_uuid text,
ser_id int,
ser_manifest text,
event_manifest text,
event blob,
meta_ser_id int,
meta_ser_manifest text,
meta blob,
tags set<text>,
PRIMARY KEY ((persistence_id, partition_nr), sequence_nr, timestamp));
CREATE TABLE IF NOT EXISTS tag_views (
tag_name text,
persistence_id text,
sequence_nr bigint,
timebucket bigint,
timestamp timeuuid,
tag_pid_sequence_nr bigint,
writer_uuid text,
ser_id int,
ser_manifest text,
event_manifest text,
event blob,
meta_ser_id int,
meta_ser_manifest text,
meta blob,
PRIMARY KEY ((tag_name, timebucket), timestamp, persistence_id, tag_pid_sequence_nr));
CREATE TABLE IF NOT EXISTS tag_write_progress(
persistence_id text,
tag text,
sequence_nr bigint,
tag_pid_sequence_nr bigint,
offset timeuuid,
PRIMARY KEY (persistence_id, tag));
CREATE TABLE IF NOT EXISTS tag_scanning(
persistence_id text,
sequence_nr bigint,
PRIMARY KEY (persistence_id));
CREATE TABLE IF NOT EXISTS metadata(
persistence_id text PRIMARY KEY,
deleted_to bigint,
properties map<text,text>);
CREATE TABLE IF NOT EXISTS all_persistence_ids(
persistence_id text PRIMARY KEY);
CREATE TABLE IF NOT EXISTS snapshots (
persistence_id text,
sequence_nr bigint,
timestamp bigint,
ser_id int,
ser_manifest text,
snapshot_data blob,
snapshot blob,
meta_ser_id int,
meta_ser_manifest text,
meta blob,
PRIMARY KEY (persistence_id, sequence_nr))
WITH CLUSTERING ORDER BY (sequence_nr DESC);
CREATE TABLE IF NOT EXISTS offset_store (
projection_name text,
partition int,
projection_key text,
offset text,
manifest text,
last_updated timestamp,
PRIMARY KEY ((projection_name, partition), projection_key));
CREATE TABLE IF NOT EXISTS projection_management (
projection_name text,
partition int,
projection_key text,
paused boolean,
last_updated timestamp,
PRIMARY KEY ((projection_name, partition), projection_key));
The keyspace as created by the script works fine for local development but is probably not what you need in a production environment.
create_user_tables.cql will create the table needed for the item popularity Projection.
USE shoppingcartservice;
CREATE TABLE IF NOT EXISTS item_popularity (
item_id text,
count counter,
PRIMARY KEY (item_id));
Run locally
Docker compose
- Change the docker-compose.yml to start Cassandra instead of PostgreSQL in Docker:
docker-compose.yml:
version: '2.2'
services:
  cassandra-service:
    image: cassandra:latest
    ports:
      - "9042:9042"
    healthcheck:
      test: ["CMD", "cqlsh", "-e", "describe keyspaces"]
      interval: 5s
      timeout: 5s
      retries: 60

  # This exists to force the condition of having the Cassandra service up before starting the tests.
  # The healthcheck above is not enough because it does not provide a condition to wait for the service
  # to be up. And this is simpler than installing cqlsh and using it to check the service status on the
  # CI server.
  cassandra:
    image: alpine:latest
    depends_on:
      cassandra-service:
        condition: service_healthy

  # See dockerhub for different versions of kafka and zookeeper
  # https://hub.docker.com/r/wurstmeister/kafka/
  # https://hub.docker.com/r/wurstmeister/zookeeper/
  zookeeper:
    image: wurstmeister/zookeeper:3.4.6
    ports:
      - "2181:2181"
  kafka:
    image: wurstmeister/kafka:2.13-2.6.0
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: localhost
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
- Start Cassandra and Kafka from the shopping-cart-service:
docker-compose up -d
- Create the Cassandra keyspace and tables from the CQL scripts located inside the ddl-scripts folder at the root of the project:
docker exec -i shopping-cart-service_cassandra-service_1 cqlsh -t < ddl-scripts/create_tables.cql
docker exec -i shopping-cart-service_cassandra-service_1 cqlsh -t < ddl-scripts/create_user_tables.cql
When loading the CQL scripts, make sure to use the name of your running Cassandra container, that is, the container started for the cassandra-service entry in docker-compose.yml (the cassandra entry is only an alpine helper without cqlsh). The container name is not fixed and depends on the parent folder of the docker-compose file. The above example assumes the project was created using the seed template and named shopping-cart-service.
If you get a connection error with the message Unable to connect to any servers, it means the Cassandra container is still starting. Wait a few seconds and re-try the command.
It will create the keyspace and all tables needed for Akka Persistence as well as the offset store table for Akka Projection.
The keyspace as created by the script works fine for local development but is probably not what you need in a production environment.
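If you prefer not to retry the cqlsh command by hand while the container is starting, a small helper can wait until the Cassandra port accepts TCP connections. This is only a sketch under stated assumptions: the host 127.0.0.1 and port 9042 come from the docker-compose.yml above, while the class name, the number of attempts and the delay are hypothetical choices. An open port does not guarantee that Cassandra is fully ready to serve CQL, so one more retry of cqlsh may still be needed.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

final class WaitForCassandraPort {

  /** Returns true as soon as a TCP connection succeeds, false after all attempts have failed. */
  static boolean waitForPort(String host, int port, int attempts, long delayMillis) {
    for (int i = 0; i < attempts; i++) {
      try (Socket socket = new Socket()) {
        socket.connect(new InetSocketAddress(host, port), 1000); // 1 s connect timeout
        return true;
      } catch (IOException e) {
        // Not reachable yet: wait a little before the next attempt.
        try {
          Thread.sleep(delayMillis);
        } catch (InterruptedException interrupted) {
          Thread.currentThread().interrupt();
          return false;
        }
      }
    }
    return false;
  }

  public static void main(String[] args) {
    boolean up = waitForPort("127.0.0.1", 9042, 10, 1000);
    System.out.println(
        up ? "port 9042 is accepting connections" : "gave up waiting for port 9042");
  }
}
```

Run it before the docker exec commands above, and increase the number of attempts if your machine needs longer to start the container.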
Run the service
Run the service with:
sbt -Dconfig.resource=local1.conf run
# make sure to compile before running exec:exec
mvn compile exec:exec -DAPP_CONFIG=local1.conf
Exercise the service
Try the following to exercise the service:
- Use grpcurl to add 3 socks to a cart:
grpcurl -d '{"cartId":"cart1", "itemId":"socks", "quantity":3}' -plaintext 127.0.0.1:8101 shoppingcart.ShoppingCartService.AddItem
- Add 2 t-shirts to the same cart:
grpcurl -d '{"cartId":"cart1", "itemId":"t-shirt", "quantity":2}' -plaintext 127.0.0.1:8101 shoppingcart.ShoppingCartService.AddItem
The returned updated cart should still contain the 3 socks.
- Check the quantity of the cart:
grpcurl -d '{"cartId":"cart1"}' -plaintext 127.0.0.1:8101 shoppingcart.ShoppingCartService.GetCart
- Check the popularity of the item:
grpcurl -d '{"itemId":"t-shirt"}' -plaintext 127.0.0.1:8101 shoppingcart.ShoppingCartService.GetItemPopularity
- Check out cart:
grpcurl -d '{"cartId":"cart1"}' -plaintext 127.0.0.1:8101 shoppingcart.ShoppingCartService.Checkout