Section 7: Projection publishing to Kafka
To decouple communication between different Microservices, we can publish messages to a broker, such as Apache Kafka. See Internal and External Communication concepts for more information.
To accomplish this, we will add another Projection from the events of the ShoppingCart entity. The new Projection will be similar to what we developed in the previous step, but it will send the events to a Kafka topic instead of updating a database. We will also add an analytics service, the ShoppingAnalyticsService, that consumes the events from the Kafka topic.
This part of the full example will focus on the Kafka producer in the PublishEventsProjection and the Kafka consumer in ShoppingAnalyticsService. On this page you will learn how to:
- send messages to a Kafka topic from a Projection
- consume messages from a Kafka topic
Akka Workshop
The fourth video of the Akka Workshop Series covers CQRS and Projections for Kafka and gRPC. It provides some solid guidance to aid you in understanding this section, and the next section of this guide.
Source downloads
If you prefer to simply view and run the example, download a zip file containing the completed code:
- Java
- Scala
1. External representation of the events
For external APIs of a service, such as a Kafka topic that is consumed by other services, it is good to have a well defined data format. Therefore we define event data formats in Protobuf rather than using the internal event representation. This also makes it easier to evolve the representation of events over time without breaking downstream consumers.
To define the external representation:
- Add a new ShoppingCartEvents.proto with the specification of the events:

syntax = "proto3";

option java_multiple_files = true;
option java_package = "shopping.cart.proto";

package shoppingcart;

// Events published to Kafka

message ItemAdded {
  string cartId = 1;
  string itemId = 2;
  int32 quantity = 3;
}

message ItemQuantityAdjusted {
  string cartId = 1;
  string itemId = 2;
  int32 quantity = 3;
}

message ItemRemoved {
  string cartId = 1;
  string itemId = 2;
}

message CheckedOut {
  string cartId = 1;
}

- Generate code by compiling the project:

sbt compile
mvn compile
2. Send to Kafka from a Projection
Add a PublishEventsProjectionHandler class that is the Projection Handler for processing the events:
- Java
src/main/java/shopping/cart/PublishEventsProjectionHandler.java:

package shopping.cart;

import static akka.Done.done;

import akka.Done;
import akka.kafka.javadsl.SendProducer;
import akka.projection.eventsourced.EventEnvelope;
import akka.projection.javadsl.Handler;
import com.google.protobuf.Any;
import com.google.protobuf.ByteString;
import java.util.concurrent.CompletionStage;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class PublishEventsProjectionHandler
    extends Handler<EventEnvelope<ShoppingCart.Event>> {
  private final Logger logger = LoggerFactory.getLogger(getClass());
  private final String topic;
  private final SendProducer<String, byte[]> sendProducer; // (1)

  public PublishEventsProjectionHandler(String topic, SendProducer<String, byte[]> sendProducer) {
    this.topic = topic;
    this.sendProducer = sendProducer;
  }

  @Override
  public CompletionStage<Done> process(EventEnvelope<ShoppingCart.Event> envelope)
      throws Exception {
    ShoppingCart.Event event = envelope.event();

    // using the cartId as the key and `DefaultPartitioner` will select partition based on the key
    // so that events for same cart always ends up in same partition
    String key = event.cartId;
    ProducerRecord<String, byte[]> producerRecord =
        new ProducerRecord<>(topic, key, serialize(event)); // (2)
    return sendProducer
        .send(producerRecord)
        .thenApply(
            recordMetadata -> {
              logger.info(
                  "Published event [{}] to topic/partition {}/{}",
                  event,
                  topic,
                  recordMetadata.partition());
              return done();
            });
  }

  private static byte[] serialize(ShoppingCart.Event event) {
    final ByteString protoMessage;
    final String fullName;
    if (event instanceof ShoppingCart.ItemAdded) {
      ShoppingCart.ItemAdded itemAdded = (ShoppingCart.ItemAdded) event;
      protoMessage =
          shopping.cart.proto.ItemAdded.newBuilder()
              .setCartId(itemAdded.cartId)
              .setItemId(itemAdded.itemId)
              .setQuantity(itemAdded.quantity)
              .build()
              .toByteString();
      fullName = shopping.cart.proto.ItemAdded.getDescriptor().getFullName();
    } else if (event instanceof ShoppingCart.CheckedOut) {
      ShoppingCart.CheckedOut checkedOut = (ShoppingCart.CheckedOut) event;
      protoMessage =
          shopping.cart.proto.CheckedOut.newBuilder()
              .setCartId(checkedOut.cartId)
              .build()
              .toByteString();
      fullName = shopping.cart.proto.CheckedOut.getDescriptor().getFullName();
    } else {
      throw new IllegalArgumentException("Unknown event type: " + event.getClass());
    }
    // pack in Any so that type information is included for deserialization
    return Any.newBuilder()
        .setValue(protoMessage)
        .setTypeUrl("shopping-cart-service/" + fullName)
        .build()
        .toByteArray(); // (3)
  }
}
- Scala
src/main/scala/shopping/cart/PublishEventsProjectionHandler.scala:

package shopping.cart

import scala.concurrent.ExecutionContext
import scala.concurrent.Future

import akka.Done
import akka.actor.typed.ActorSystem
import akka.kafka.scaladsl.SendProducer
import akka.projection.eventsourced.EventEnvelope
import akka.projection.scaladsl.Handler
import com.google.protobuf.any.{ Any => ScalaPBAny }
import org.apache.kafka.clients.producer.ProducerRecord
import org.slf4j.LoggerFactory

class PublishEventsProjectionHandler(
    system: ActorSystem[_],
    topic: String,
    sendProducer: SendProducer[String, Array[Byte]]) // (1)
    extends Handler[EventEnvelope[ShoppingCart.Event]] {
  private val log = LoggerFactory.getLogger(getClass)
  private implicit val ec: ExecutionContext = system.executionContext

  override def process(
      envelope: EventEnvelope[ShoppingCart.Event]): Future[Done] = {
    val event = envelope.event

    // using the cartId as the key and `DefaultPartitioner` will select partition based on the key
    // so that events for same cart always ends up in same partition
    val key = event.cartId
    val producerRecord = new ProducerRecord(topic, key, serialize(event)) // (2)
    val result = sendProducer.send(producerRecord).map { recordMetadata =>
      log.info(
        "Published event [{}] to topic/partition {}/{}",
        event,
        topic,
        recordMetadata.partition)
      Done
    }
    result
  }

  private def serialize(event: ShoppingCart.Event): Array[Byte] = {
    val protoMessage = event match {
      case ShoppingCart.ItemAdded(cartId, itemId, quantity) =>
        proto.ItemAdded(cartId, itemId, quantity)
      case ShoppingCart.CheckedOut(cartId, _) =>
        proto.CheckedOut(cartId)
    }
    // pack in Any so that type information is included for deserialization
    ScalaPBAny.pack(protoMessage, "shopping-cart-service").toByteArray // (3)
  }
}
(1) SendProducer comes from the Kafka connector in Alpakka.
(2) The events are serialized to Protobuf and sent to the given topic.
(3) Wrap in Protobuf Any to include type information.
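The handler uses the cartId as the record key so that all events for one cart land in the same partition and keep their order. The sketch below illustrates only that property, in plain Java. `KeyPartitionSketch`, `partitionFor`, and the partition count of 3 are hypothetical names and values introduced here, and the hash is a dependency-free stand-in rather than the hash function Kafka's own partitioner uses.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

/** Illustrative stand-in for key-based partition selection; not Kafka's actual partitioner. */
final class KeyPartitionSketch {

  // Hypothetical helper: maps a record key to one of numPartitions partitions.
  // A plain array hash is an assumption made to keep the sketch self-contained.
  static int partitionFor(String key, int numPartitions) {
    byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
    int hash = Arrays.hashCode(keyBytes);
    // floorMod keeps the result in [0, numPartitions) even for negative hashes
    return Math.floorMod(hash, numPartitions);
  }

  public static void main(String[] args) {
    int partitions = 3; // assumed partition count for the topic
    for (String cartId : new String[] {"cart1", "cart2", "cart4", "cart4"}) {
      System.out.println(cartId + " -> partition " + partitionFor(cartId, partitions));
    }
  }
}
```

Because the mapping depends only on the key and the partition count, the two "cart4" lines always print the same partition. The assignment would change if the number of partitions changed, which is one reason to choose the partition count of a topic with care.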
The serialization converts the ShoppingCart.Event classes to the Protobuf representation. Since several types of messages are sent to the same topic, we must include some type information that the consumers of the topic can use when deserializing the messages. Protobuf provides a built-in type called Any for this purpose. That is why the message is wrapped with ScalaPBAny.pack in the Scala handler and with Any.newBuilder in the Java handler.
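To make the role of the type information concrete, here is a minimal plain-Java sketch of the same idea: a payload travels together with a type URL, and the receiver picks a parser based on that URL. The `Envelope` class and the `pack` and `describe` helpers are hypothetical stand-ins invented for this illustration. They are not the Protobuf Any API, and the payload here is plain UTF-8 text rather than an encoded Protobuf message.

```java
import java.nio.charset.StandardCharsets;

/** Simplified stand-in for the Protobuf Any idea: payload bytes plus a type URL. */
final class TypeUrlEnvelopeSketch {

  // Hypothetical envelope type; real code would use com.google.protobuf.Any.
  static final class Envelope {
    final String typeUrl;
    final byte[] value;

    Envelope(String typeUrl, byte[] value) {
      this.typeUrl = typeUrl;
      this.value = value;
    }
  }

  // Producer side: attach "<prefix>/<full message name>" to the payload.
  static Envelope pack(String prefix, String fullName, String payloadText) {
    return new Envelope(prefix + "/" + fullName, payloadText.getBytes(StandardCharsets.UTF_8));
  }

  // Consumer side: choose how to interpret the bytes from the type URL alone.
  static String describe(Envelope envelope) {
    String payload = new String(envelope.value, StandardCharsets.UTF_8);
    switch (envelope.typeUrl) {
      case "shopping-cart-service/shoppingcart.ItemAdded":
        return "ItemAdded: " + payload;
      case "shopping-cart-service/shoppingcart.CheckedOut":
        return "CheckedOut: " + payload;
      default:
        throw new IllegalArgumentException("unknown record type " + envelope.typeUrl);
    }
  }

  public static void main(String[] args) {
    Envelope added = pack("shopping-cart-service", "shoppingcart.ItemAdded", "cart4");
    Envelope checkedOut = pack("shopping-cart-service", "shoppingcart.CheckedOut", "cart4");
    System.out.println(describe(added));
    System.out.println(describe(checkedOut));
  }
}
```

The type URLs follow the pattern used by the handler, a service prefix followed by the fully qualified message name, so a consumer can tell the event types apart without knowing anything else about the producer.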
3. Initialize the Projection
If this were our first Projection, we would need to add tags for the events. However, the tagging of the events is already in place from the previous step. So, we can simply add the appropriate initialization as follows:
- Place the initialization code of the Projection in a PublishEventsProjection object (Scala) or class (Java):

- Java
src/main/java/shopping/cart/PublishEventsProjection.java:

package shopping.cart;

import akka.actor.CoordinatedShutdown;
import akka.actor.typed.ActorSystem;
import akka.cluster.sharding.typed.ShardedDaemonProcessSettings;
import akka.cluster.sharding.typed.javadsl.ShardedDaemonProcess;
import akka.kafka.ProducerSettings;
import akka.kafka.javadsl.SendProducer;
import akka.persistence.jdbc.query.javadsl.JdbcReadJournal;
import akka.persistence.query.Offset;
import akka.projection.ProjectionBehavior;
import akka.projection.ProjectionId;
import akka.projection.eventsourced.EventEnvelope;
import akka.projection.eventsourced.javadsl.EventSourcedProvider;
import akka.projection.javadsl.AtLeastOnceProjection;
import akka.projection.javadsl.SourceProvider;
import akka.projection.jdbc.javadsl.JdbcProjection;
import java.util.Optional;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.orm.jpa.JpaTransactionManager;
import shopping.cart.repository.HibernateJdbcSession;

public final class PublishEventsProjection {

  private PublishEventsProjection() {}

  public static void init(ActorSystem<?> system, JpaTransactionManager transactionManager) {
    SendProducer<String, byte[]> sendProducer = createProducer(system);
    String topic = system.settings().config().getString("shopping-cart-service.kafka.topic");

    ShardedDaemonProcess.get(system)
        .init(
            ProjectionBehavior.Command.class,
            "PublishEventsProjection",
            ShoppingCart.TAGS.size(),
            index ->
                ProjectionBehavior.create(
                    createProjectionFor(system, transactionManager, topic, sendProducer, index)),
            ShardedDaemonProcessSettings.create(system),
            Optional.of(ProjectionBehavior.stopMessage()));
  }

  private static SendProducer<String, byte[]> createProducer(ActorSystem<?> system) {
    ProducerSettings<String, byte[]> producerSettings =
        ProducerSettings.create(system, new StringSerializer(), new ByteArraySerializer());
    SendProducer<String, byte[]> sendProducer = new SendProducer<>(producerSettings, system);
    CoordinatedShutdown.get(system)
        .addTask(
            CoordinatedShutdown.PhaseActorSystemTerminate(),
            "close-sendProducer",
            () -> sendProducer.close());
    return sendProducer;
  }

  private static AtLeastOnceProjection<Offset, EventEnvelope<ShoppingCart.Event>>
      createProjectionFor(
          ActorSystem<?> system,
          JpaTransactionManager transactionManager,
          String topic,
          SendProducer<String, byte[]> sendProducer,
          int index) {
    String tag = ShoppingCart.TAGS.get(index);
    SourceProvider<Offset, EventEnvelope<ShoppingCart.Event>> sourceProvider =
        EventSourcedProvider.eventsByTag(system, JdbcReadJournal.Identifier(), tag);

    return JdbcProjection.atLeastOnceAsync(
        ProjectionId.of("PublishEventsProjection", tag),
        sourceProvider,
        () -> new HibernateJdbcSession(transactionManager),
        () -> new PublishEventsProjectionHandler(topic, sendProducer),
        system);
  }
}
- Scala
src/main/scala/shopping/cart/PublishEventsProjection.scala:

package shopping.cart

import akka.actor.CoordinatedShutdown
import akka.actor.typed.ActorSystem
import akka.cluster.sharding.typed.ShardedDaemonProcessSettings
import akka.cluster.sharding.typed.scaladsl.ShardedDaemonProcess
import akka.kafka.ProducerSettings
import akka.kafka.scaladsl.SendProducer
import akka.persistence.jdbc.query.scaladsl.JdbcReadJournal
import akka.persistence.query.Offset
import akka.projection.eventsourced.EventEnvelope
import akka.projection.eventsourced.scaladsl.EventSourcedProvider
import akka.projection.jdbc.scaladsl.JdbcProjection
import akka.projection.scaladsl.{ AtLeastOnceProjection, SourceProvider }
import akka.projection.{ ProjectionBehavior, ProjectionId }
import org.apache.kafka.common.serialization.{
  ByteArraySerializer,
  StringSerializer
}
import shopping.cart.repository.ScalikeJdbcSession

object PublishEventsProjection {

  def init(system: ActorSystem[_]): Unit = {
    val sendProducer = createProducer(system)
    val topic =
      system.settings.config.getString("shopping-cart-service.kafka.topic")

    ShardedDaemonProcess(system).init(
      name = "PublishEventsProjection",
      ShoppingCart.tags.size,
      index =>
        ProjectionBehavior(
          createProjectionFor(system, topic, sendProducer, index)),
      ShardedDaemonProcessSettings(system),
      Some(ProjectionBehavior.Stop))
  }

  private def createProducer(
      system: ActorSystem[_]): SendProducer[String, Array[Byte]] = {
    val producerSettings =
      ProducerSettings(system, new StringSerializer, new ByteArraySerializer)
    val sendProducer =
      SendProducer(producerSettings)(system)
    CoordinatedShutdown(system).addTask(
      CoordinatedShutdown.PhaseBeforeActorSystemTerminate,
      "close-sendProducer") { () =>
      sendProducer.close()
    }
    sendProducer
  }

  private def createProjectionFor(
      system: ActorSystem[_],
      topic: String,
      sendProducer: SendProducer[String, Array[Byte]],
      index: Int)
      : AtLeastOnceProjection[Offset, EventEnvelope[ShoppingCart.Event]] = {
    val tag = ShoppingCart.tags(index)
    val sourceProvider
        : SourceProvider[Offset, EventEnvelope[ShoppingCart.Event]] =
      EventSourcedProvider.eventsByTag[ShoppingCart.Event](
        system = system,
        readJournalPluginId = JdbcReadJournal.Identifier,
        tag = tag)

    JdbcProjection.atLeastOnceAsync(
      projectionId = ProjectionId("PublishEventsProjection", tag),
      sourceProvider,
      handler =
        () => new PublishEventsProjectionHandler(system, topic, sendProducer),
      sessionFactory = () => new ScalikeJdbcSession())(system)
  }
}
The SendProducer is initialized using some configuration that we need to add. It defines how to connect to the Kafka broker.
- Add the following to a new src/main/resources/kafka.conf file:

shopping-cart-service {
  kafka.topic = "shopping-cart-events"
}

# common config for akka.kafka.producer.kafka-clients and akka.kafka.consumer.kafka-clients
kafka-connection-settings {
  # This and other connection settings may have to be changed depending on environment.
  bootstrap.servers = "localhost:9092"
}
akka.kafka.producer {
  kafka-clients = ${kafka-connection-settings}
}
akka.kafka.consumer {
  kafka-clients = ${kafka-connection-settings}
}
- Include kafka.conf in application.conf.
- For local development add the following to src/main/resources/local-shared.conf, which is loaded when running locally:

# common config for akka.kafka.producer.kafka-clients and akka.kafka.consumer.kafka-clients
kafka-connection-settings {
  bootstrap.servers = "localhost:9092"
}
akka.kafka.producer {
  kafka-clients = ${kafka-connection-settings}
}
akka.kafka.consumer {
  kafka-clients = ${kafka-connection-settings}
}
- Call PublishEventsProjection.init from Main:

- Java
PublishEventsProjection.init(system, transactionManager);

- Scala
PublishEventsProjection.init(system)
4. Consume the events
Let’s add another service that consumes the events from the Kafka topic. The template download (or other source downloads) includes a directory named shopping-analytics-service. This service will receive the events in the Protobuf format defined in the ShoppingCartEvents.proto from the shopping-cart-service, so we can copy the .proto file we created earlier.
Note: Different services should not share code, but we can copy the Protobuf specification since that is the published interface of the service.
To add the service, follow these steps:
- Open the shopping-analytics-service in IntelliJ just as you did with the shopping-cart-service.
- Copy the ShoppingCartEvents.proto from the shopping-cart-service to shopping-analytics-service/src/main/protobuf and generate code by compiling the project:

sbt compile
mvn compile
- Create a ShoppingCartEventConsumer object (Scala) or class (Java) in shopping-analytics-service. It runs an Akka Stream with a Kafka Consumer.committableSource from Alpakka Kafka.

- Java
src/main/java/shopping/analytics/ShoppingCartEventConsumer.java:

package shopping.analytics;

import akka.Done;
import akka.actor.typed.ActorSystem;
import akka.kafka.CommitterSettings;
import akka.kafka.ConsumerSettings;
import akka.kafka.Subscriptions;
import akka.kafka.javadsl.Committer;
import akka.kafka.javadsl.Consumer;
import akka.stream.RestartSettings;
import akka.stream.javadsl.RestartSource;
import com.google.protobuf.Any;
import com.google.protobuf.CodedInputStream;
import com.google.protobuf.InvalidProtocolBufferException;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import shopping.cart.proto.CheckedOut;
import shopping.cart.proto.ItemAdded;
import shopping.cart.proto.ItemQuantityAdjusted;
import shopping.cart.proto.ItemRemoved;

class ShoppingCartEventConsumer {
  private static final Logger log = LoggerFactory.getLogger(ShoppingCartEventConsumer.class);

  static void init(ActorSystem<?> system) {
    String topic =
        system
            .settings()
            .config()
            .getString("shopping-analytics-service.shopping-cart-kafka-topic");
    ConsumerSettings<String, byte[]> consumerSettings =
        ConsumerSettings.create(system, new StringDeserializer(), new ByteArrayDeserializer())
            .withGroupId("shopping-cart-analytics");
    CommitterSettings committerSettings = CommitterSettings.create(system);

    Duration minBackoff = Duration.ofSeconds(1);
    Duration maxBackoff = Duration.ofSeconds(30);
    double randomFactor = 0.1;

    RestartSource // (1)
        .onFailuresWithBackoff(
            RestartSettings.create(minBackoff, maxBackoff, randomFactor),
            () -> {
              return Consumer.committableSource(
                      consumerSettings, Subscriptions.topics(topic)) // (2)
                  .mapAsync(
                      1,
                      msg -> handleRecord(msg.record()).thenApply(done -> msg.committableOffset()))
                  .via(Committer.flow(committerSettings)); // (3)
            })
        .run(system);
  }

  private static CompletionStage<Done> handleRecord(ConsumerRecord<String, byte[]> record)
      throws InvalidProtocolBufferException {
    byte[] bytes = record.value();
    Any x = Any.parseFrom(bytes); // (4)
    String typeUrl = x.getTypeUrl();
    CodedInputStream inputBytes = x.getValue().newCodedInput();
    try {
      switch (typeUrl) {
        case "shopping-cart-service/shoppingcart.ItemAdded":
          {
            ItemAdded event = ItemAdded.parseFrom(inputBytes);
            log.info(
                "ItemAdded: {} {} to cart {}",
                event.getQuantity(),
                event.getItemId(),
                event.getCartId());
            break;
          }
        case "shopping-cart-service/shoppingcart.CheckedOut":
          {
            CheckedOut event = CheckedOut.parseFrom(inputBytes);
            log.info("CheckedOut: cart {} checked out", event.getCartId());
            break;
          }
        default:
          throw new IllegalArgumentException("unknown record type " + typeUrl);
      }
    } catch (Exception e) {
      log.error("Could not process event of type [{}]", typeUrl, e);
      // continue with next
    }
    return CompletableFuture.completedFuture(Done.getInstance());
  }
}
- Scala
src/main/scala/shopping/analytics/ShoppingCartEventConsumer.scala:

package shopping.analytics

import scala.concurrent.ExecutionContext
import scala.concurrent.Future
import scala.concurrent.duration._
import scala.util.control.NonFatal

import akka.Done
import akka.actor.typed.ActorSystem
import akka.kafka.CommitterSettings
import akka.kafka.ConsumerSettings
import akka.kafka.Subscriptions
import akka.kafka.scaladsl.{ Committer, Consumer }
import akka.stream.RestartSettings
import akka.stream.scaladsl.RestartSource
import com.google.protobuf.any.{ Any => ScalaPBAny }
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.ByteArrayDeserializer
import org.apache.kafka.common.serialization.StringDeserializer
import org.slf4j.LoggerFactory
import shopping.cart.proto

object ShoppingCartEventConsumer {

  private val log =
    LoggerFactory.getLogger("shopping.analytics.ShoppingCartEventConsumer")

  def init(system: ActorSystem[_]): Unit = {
    implicit val sys: ActorSystem[_] = system
    implicit val ec: ExecutionContext =
      system.executionContext

    val topic = system.settings.config
      .getString("shopping-analytics-service.shopping-cart-kafka-topic")
    val consumerSettings =
      ConsumerSettings(
        system,
        new StringDeserializer,
        new ByteArrayDeserializer).withGroupId("shopping-cart-analytics")
    val committerSettings = CommitterSettings(system)

    RestartSource // (1)
      .onFailuresWithBackoff(
        RestartSettings(
          minBackoff = 1.second,
          maxBackoff = 30.seconds,
          randomFactor = 0.1)) { () =>
        Consumer
          .committableSource(
            consumerSettings,
            Subscriptions.topics(topic)
          ) // (2)
          .mapAsync(1) { msg =>
            handleRecord(msg.record).map(_ => msg.committableOffset)
          }
          .via(Committer.flow(committerSettings)) // (3)
      }
      .run()
  }

  private def handleRecord(
      record: ConsumerRecord[String, Array[Byte]]): Future[Done] = {
    val bytes = record.value()
    val x = ScalaPBAny.parseFrom(bytes) // (4)
    val typeUrl = x.typeUrl
    try {
      val inputBytes = x.value.newCodedInput()
      val event =
        typeUrl match {
          case "shopping-cart-service/shoppingcart.ItemAdded" =>
            proto.ItemAdded.parseFrom(inputBytes)
          case "shopping-cart-service/shoppingcart.CheckedOut" =>
            proto.CheckedOut.parseFrom(inputBytes)
          case _ =>
            throw new IllegalArgumentException(
              s"unknown record type [$typeUrl]")
        }

      event match {
        case proto.ItemAdded(cartId, itemId, quantity, _) =>
          log.info("ItemAdded: {} {} to cart {}", quantity, itemId, cartId)
        case proto.CheckedOut(cartId, _) =>
          log.info("CheckedOut: cart {} checked out", cartId)
      }

      Future.successful(Done)
    } catch {
      case NonFatal(e) =>
        log.error("Could not process event of type [{}]", typeUrl, e)
        // continue with next
        Future.successful(Done)
    }
  }

}
(1) RestartSource will restart the stream in case of failures.
(2) Kafka Consumer stream.
(3) Offset is committed to Kafka when records have been processed.
(4) Protobuf Any for type information. Note how the deserialization uses the type information (the typeUrl) from the Protobuf Any to decide which event type to parse.
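The consumer configures RestartSource with a minimum backoff of 1 second, a maximum of 30 seconds, and a random factor of 0.1. The following plain-Java sketch shows how such settings typically translate into a growing restart delay. It assumes the usual exponential scheme (double the delay on each restart, cap it at the maximum, then add up to randomFactor of jitter) and is only an approximation of what the library does internally. `BackoffSketch` and `delay` are hypothetical names, and the random value is passed in explicitly so the result is predictable.

```java
import java.time.Duration;
import java.util.concurrent.ThreadLocalRandom;

/** Illustrative exponential backoff with jitter; an assumption-based sketch, not Akka's code. */
final class BackoffSketch {

  // delay = min(maxBackoff, minBackoff * 2^restartCount) * (1 + rnd * randomFactor)
  // where rnd is a random value in [0, 1).
  static Duration delay(
      int restartCount,
      Duration minBackoff,
      Duration maxBackoff,
      double randomFactor,
      double rnd) {
    double exponential = minBackoff.toMillis() * Math.pow(2, restartCount);
    double capped = Math.min(maxBackoff.toMillis(), exponential);
    long millis = Math.round(capped * (1.0 + rnd * randomFactor));
    return Duration.ofMillis(millis);
  }

  public static void main(String[] args) {
    Duration min = Duration.ofSeconds(1);
    Duration max = Duration.ofSeconds(30);
    for (int restart = 0; restart <= 6; restart++) {
      double rnd = ThreadLocalRandom.current().nextDouble();
      System.out.println("restart " + restart + " -> " + delay(restart, min, max, 0.1, rnd));
    }
  }
}
```

With the jitter set to zero, this gives 1 second for the first restart, 8 seconds after three restarts, and the 30 second cap from the fifth restart onward. The jitter spreads out restarts so that several consumers that failed at the same moment do not all reconnect to the broker at once.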
4.1. Configuration
We need to add configuration to initialize the Consumer and define how to connect to the Kafka broker.
Add the following to a new src/main/resources/kafka.conf file in shopping-analytics-service:
shopping-analytics-service {
  shopping-cart-kafka-topic = "shopping-cart-events"
}

# common config for akka.kafka.producer.kafka-clients and akka.kafka.consumer.kafka-clients
kafka-connection-settings {
  # This and other connection settings may have to be changed depending on environment.
  bootstrap.servers = "localhost:9092"
}
akka.kafka.producer {
  kafka-clients = ${kafka-connection-settings}
}
akka.kafka.consumer {
  kafka-clients = ${kafka-connection-settings}
  kafka-clients {
    auto.offset.reset = "earliest"
  }
}
Include kafka.conf from application.conf.
For local development, add the following to src/main/resources/local-shared.conf, which is loaded when running locally:

shopping-analytics-service.kafka.bootstrap-servers = "localhost:9092"
4.2. Main
Edit the Main class that is included from the template project. It should initialize the ActorSystem and the ShoppingCartEventConsumer like this:
- Java

package shopping.analytics;

import akka.actor.typed.ActorSystem;
import akka.actor.typed.javadsl.Behaviors;
import akka.management.cluster.bootstrap.ClusterBootstrap;
import akka.management.javadsl.AkkaManagement;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Main {

  private static final Logger logger = LoggerFactory.getLogger(Main.class);

  public static void main(String[] args) {
    ActorSystem<Void> system = ActorSystem.create(Behaviors.empty(), "ShoppingAnalyticsService");
    try {
      init(system);
    } catch (Exception e) {
      logger.error("Terminating due to initialization failure.", e);
      system.terminate();
    }
  }

  public static void init(ActorSystem<Void> system) {
    AkkaManagement.get(system).start();
    ClusterBootstrap.get(system).start();

    ShoppingCartEventConsumer.init(system);
  }
}
- Scala

package shopping.analytics

import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.ActorSystem
import akka.management.cluster.bootstrap.ClusterBootstrap
import akka.management.scaladsl.AkkaManagement
import org.slf4j.LoggerFactory
import scala.util.control.NonFatal

object Main {

  val logger = LoggerFactory.getLogger("shopping.analytics.Main")

  def main(args: Array[String]): Unit = {
    val system =
      ActorSystem[Nothing](Behaviors.empty, "ShoppingAnalyticsService")
    try {
      init(system)
    } catch {
      case NonFatal(e) =>
        logger.error("Terminating due to initialization failure.", e)
        system.terminate()
    }
  }

  def init(system: ActorSystem[_]): Unit = {
    AkkaManagement(system).start()
    ClusterBootstrap(system).start()

    ShoppingCartEventConsumer.init(system)
  }

}
5. Run locally
In addition to PostgreSQL we now also need Kafka. The docker-compose script starts PostgreSQL and Kafka:
- Start PostgreSQL and Kafka, unless they are already running, from the shopping-cart-service:

docker-compose up -d

- Run the shopping-cart-service with:

# make sure to compile before running exec:exec
mvn compile exec:exec -DAPP_CONFIG=local1.conf

sbt -Dconfig.resource=local1.conf run

- In another terminal, run the new shopping-analytics-service with:

# make sure to compile before running exec:exec
mvn compile exec:exec -DAPP_CONFIG=local1.conf

sbt -Dconfig.resource=local1.conf run
5.1. Exercise the service
Use grpcurl to exercise the service:
- Start another terminal, and use grpcurl to add 1 pencil to a cart:

grpcurl -d '{"cartId":"cart4", "itemId":"pencil", "quantity":1}' -plaintext 127.0.0.1:8101 shoppingcart.ShoppingCartService.AddItem

- Look at the log output in the terminal of the shopping-analytics-service. You should see the logging from the AddItem, showing that the new service consumed the event from Kafka:

ItemAdded: 1 pencil to cart cart4