Section 7: Projection publishing to Kafka
To decouple communication between different Microservices, we can publish messages to a broker, such as Apache Kafka. See Internal and External Communication concepts for more information.
To accomplish this, we will add another Projection from the events of the ShoppingCart
entity. The new Projection will be similar to what we developed in the previous step, but it will send the events to a Kafka topic instead of updating a database. As shown below, we will also add an Analytics service, the ShoppingAnalyticsService
, that consumes the events from the Kafka topic.
This part of the full example will focus on the Kafka producer in the PublishEventsProjection
and the Kafka consumer in ShoppingAnalyticsService
. On this page you will learn how to:
- send messages to a Kafka topic from a Projection
- consume messages from a Kafka topic
Akka Workshop
The fourth video of the Akka Workshop Series covers CQRS and Projections for Kafka and gRPC. It provides solid guidance for understanding this section and the next section of this guide.
Source downloads
If you prefer to simply view and run the example, download a zip file containing the completed code:
- Java
- Scala
1. External representation of the events
For external APIs of a service, such as a Kafka topic that is consumed by other services, it is good to have a well defined data format. Therefore we define event data formats in Protobuf rather than using the internal event representation. This also makes it easier to evolve the representation of events over time without breaking downstream consumers.
To define the external representation:
-
Add a new
ShoppingCartEvents.proto
with the specification of the events:

syntax = "proto3";

option java_multiple_files = true;
option java_package = "shopping.cart.proto";

package shoppingcart;

// Events published to Kafka

message ItemAdded {
  string cartId = 1;
  string itemId = 2;
  int32 quantity = 3;
}

message ItemQuantityAdjusted {
  string cartId = 1;
  string itemId = 2;
  int32 quantity = 3;
}

message ItemRemoved {
  string cartId = 1;
  string itemId = 2;
}

message CheckedOut {
  string cartId = 1;
}
-
Generate code by compiling the project:
sbt compile
mvn compile
2. Send to Kafka from a Projection
Add a PublishEventsProjectionHandler
class that is the Projection Handler
for processing the events:
- Java
-
src/main/java/shopping/cart/PublishEventsProjectionHandler.java:
package shopping.cart;

import static akka.Done.done;

import akka.Done;
import akka.kafka.javadsl.SendProducer;
import akka.projection.eventsourced.EventEnvelope;
import akka.projection.javadsl.Handler;
import com.google.protobuf.Any;
import com.google.protobuf.ByteString;
import java.util.concurrent.CompletionStage;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class PublishEventsProjectionHandler
    extends Handler<EventEnvelope<ShoppingCart.Event>> {
  private final Logger logger = LoggerFactory.getLogger(getClass());
  private final String topic;
  private final SendProducer<String, byte[]> sendProducer; (1)

  public PublishEventsProjectionHandler(
      String topic, SendProducer<String, byte[]> sendProducer) {
    this.topic = topic;
    this.sendProducer = sendProducer;
  }

  @Override
  public CompletionStage<Done> process(EventEnvelope<ShoppingCart.Event> envelope)
      throws Exception {
    ShoppingCart.Event event = envelope.event();

    // using the cartId as the key and `DefaultPartitioner` will select partition based on the key
    // so that events for same cart always ends up in same partition
    String key = event.cartId;
    ProducerRecord<String, byte[]> producerRecord =
        new ProducerRecord<>(topic, key, serialize(event)); (2)
    return sendProducer
        .send(producerRecord)
        .thenApply(
            recordMetadata -> {
              logger.info(
                  "Published event [{}] to topic/partition {}/{}",
                  event,
                  topic,
                  recordMetadata.partition());
              return done();
            });
  }

  private static byte[] serialize(ShoppingCart.Event event) {
    final ByteString protoMessage;
    final String fullName;
    if (event instanceof ShoppingCart.ItemAdded) {
      ShoppingCart.ItemAdded itemAdded = (ShoppingCart.ItemAdded) event;
      protoMessage =
          shopping.cart.proto.ItemAdded.newBuilder()
              .setCartId(itemAdded.cartId)
              .setItemId(itemAdded.itemId)
              .setQuantity(itemAdded.quantity)
              .build()
              .toByteString();
      fullName = shopping.cart.proto.ItemAdded.getDescriptor().getFullName();
    } else if (event instanceof ShoppingCart.CheckedOut) {
      ShoppingCart.CheckedOut checkedOut = (ShoppingCart.CheckedOut) event;
      protoMessage =
          shopping.cart.proto.CheckedOut.newBuilder()
              .setCartId(checkedOut.cartId)
              .build()
              .toByteString();
      fullName = shopping.cart.proto.CheckedOut.getDescriptor().getFullName();
    } else {
      throw new IllegalArgumentException("Unknown event type: " + event.getClass());
    }
    // pack in Any so that type information is included for deserialization
    return Any.newBuilder()
        .setValue(protoMessage)
        .setTypeUrl("shopping-cart-service/" + fullName)
        .build()
        .toByteArray(); (3)
  }
}
- Scala
-
src/main/scala/shopping/cart/PublishEventsProjectionHandler.scala:
package shopping.cart

import scala.concurrent.ExecutionContext
import scala.concurrent.Future

import akka.Done
import akka.actor.typed.ActorSystem
import akka.kafka.scaladsl.SendProducer
import akka.projection.eventsourced.EventEnvelope
import akka.projection.scaladsl.Handler
import com.google.protobuf.any.{ Any => ScalaPBAny }
import org.apache.kafka.clients.producer.ProducerRecord
import org.slf4j.LoggerFactory

class PublishEventsProjectionHandler(
    system: ActorSystem[_],
    topic: String,
    sendProducer: SendProducer[String, Array[Byte]]) (1)
    extends Handler[EventEnvelope[ShoppingCart.Event]] {
  private val log = LoggerFactory.getLogger(getClass)
  private implicit val ec: ExecutionContext = system.executionContext

  override def process(
      envelope: EventEnvelope[ShoppingCart.Event]): Future[Done] = {
    val event = envelope.event

    // using the cartId as the key and `DefaultPartitioner` will select partition based on the key
    // so that events for same cart always ends up in same partition
    val key = event.cartId
    val producerRecord = new ProducerRecord(topic, key, serialize(event)) (2)
    val result = sendProducer.send(producerRecord).map { recordMetadata =>
      log.info(
        "Published event [{}] to topic/partition {}/{}",
        event,
        topic,
        recordMetadata.partition)
      Done
    }
    result
  }

  private def serialize(event: ShoppingCart.Event): Array[Byte] = {
    val protoMessage = event match {
      case ShoppingCart.ItemAdded(cartId, itemId, quantity) =>
        proto.ItemAdded(cartId, itemId, quantity)
      case ShoppingCart.CheckedOut(cartId, _) =>
        proto.CheckedOut(cartId)
    }
    // pack in Any so that type information is included for deserialization
    ScalaPBAny.pack(protoMessage, "shopping-cart-service").toByteArray (3)
  }
}
1 | SendProducer comes from the Kafka connector in Alpakka. |
2 | The events are serialized to Protobuf and sent to the given topic. |
3 | Wrap in Protobuf Any to include type information. |
The serialization converts the ShoppingCart.Event
classes to the Protobuf representation. Since several types of messages are sent to the same topic we must include some type information that the consumers of the topic can use when deserializing the messages. Protobuf provides a built-in type called Any
for this purpose. That is why it is wrapped with ScalaPBAny.pack
.
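To make the packing concrete, here is a small REPL-style sketch in the Scala flavor. It is not part of the project sources; the literal values are made up, and it assumes the classes generated from ShoppingCartEvents.proto are available as shopping.cart.proto:

import com.google.protobuf.any.{ Any => ScalaPBAny }
import shopping.cart.proto

// pack prefixes the message's full Protobuf name, so the type URL becomes
// "shopping-cart-service/shoppingcart.ItemAdded"
val packed = ScalaPBAny.pack(proto.ItemAdded("cart1", "pencil", 1), "shopping-cart-service")
val bytes = packed.toByteArray

// a consumer parses the Any again, inspects its typeUrl to decide which
// event type the bytes contain, and can then unpack the expected message
val restored = ScalaPBAny.parseFrom(bytes).unpack[proto.ItemAdded]

The ShoppingCartEventConsumer in the analytics service, shown later on this page, performs the consumer side of this round trip.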
3. Initialize the Projection
If this were our first Projection, we would need to add tags for the events. However, the tagging of the events is already in place from the previous step. So, we can simply add the appropriate initialization as follows:
-
Place the initialization code of the Projection in a PublishEventsProjection object (Scala) or class (Java):
- Java
-
src/main/java/shopping/cart/PublishEventsProjection.java:
package shopping.cart;

import akka.actor.CoordinatedShutdown;
import akka.actor.typed.ActorSystem;
import akka.cluster.sharding.typed.ShardedDaemonProcessSettings;
import akka.cluster.sharding.typed.javadsl.ShardedDaemonProcess;
import akka.kafka.ProducerSettings;
import akka.kafka.javadsl.SendProducer;
import akka.persistence.jdbc.query.javadsl.JdbcReadJournal;
import akka.persistence.query.Offset;
import akka.projection.ProjectionBehavior;
import akka.projection.ProjectionId;
import akka.projection.eventsourced.EventEnvelope;
import akka.projection.eventsourced.javadsl.EventSourcedProvider;
import akka.projection.javadsl.AtLeastOnceProjection;
import akka.projection.javadsl.SourceProvider;
import akka.projection.jdbc.javadsl.JdbcProjection;
import java.util.Optional;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.orm.jpa.JpaTransactionManager;
import shopping.cart.repository.HibernateJdbcSession;

public final class PublishEventsProjection {

  private PublishEventsProjection() {}

  public static void init(ActorSystem<?> system, JpaTransactionManager transactionManager) {
    SendProducer<String, byte[]> sendProducer = createProducer(system);
    String topic = system.settings().config().getString("shopping-cart-service.kafka.topic");

    ShardedDaemonProcess.get(system)
        .init(
            ProjectionBehavior.Command.class,
            "PublishEventsProjection",
            ShoppingCart.TAGS.size(),
            index ->
                ProjectionBehavior.create(
                    createProjectionFor(system, transactionManager, topic, sendProducer, index)),
            ShardedDaemonProcessSettings.create(system),
            Optional.of(ProjectionBehavior.stopMessage()));
  }

  private static SendProducer<String, byte[]> createProducer(ActorSystem<?> system) {
    ProducerSettings<String, byte[]> producerSettings =
        ProducerSettings.create(system, new StringSerializer(), new ByteArraySerializer());
    SendProducer<String, byte[]> sendProducer = new SendProducer<>(producerSettings, system);
    CoordinatedShutdown.get(system)
        .addTask(
            CoordinatedShutdown.PhaseActorSystemTerminate(),
            "close-sendProducer",
            () -> sendProducer.close());
    return sendProducer;
  }

  private static AtLeastOnceProjection<Offset, EventEnvelope<ShoppingCart.Event>>
      createProjectionFor(
          ActorSystem<?> system,
          JpaTransactionManager transactionManager,
          String topic,
          SendProducer<String, byte[]> sendProducer,
          int index) {
    String tag = ShoppingCart.TAGS.get(index);
    SourceProvider<Offset, EventEnvelope<ShoppingCart.Event>> sourceProvider =
        EventSourcedProvider.eventsByTag(system, JdbcReadJournal.Identifier(), tag);

    return JdbcProjection.atLeastOnceAsync(
        ProjectionId.of("PublishEventsProjection", tag),
        sourceProvider,
        () -> new HibernateJdbcSession(transactionManager),
        () -> new PublishEventsProjectionHandler(topic, sendProducer),
        system);
  }
}
- Scala
-
src/main/scala/shopping/cart/PublishEventsProjection.scala:
package shopping.cart

import akka.actor.CoordinatedShutdown
import akka.actor.typed.ActorSystem
import akka.cluster.sharding.typed.ShardedDaemonProcessSettings
import akka.cluster.sharding.typed.scaladsl.ShardedDaemonProcess
import akka.kafka.ProducerSettings
import akka.kafka.scaladsl.SendProducer
import akka.persistence.jdbc.query.scaladsl.JdbcReadJournal
import akka.persistence.query.Offset
import akka.projection.eventsourced.EventEnvelope
import akka.projection.eventsourced.scaladsl.EventSourcedProvider
import akka.projection.jdbc.scaladsl.JdbcProjection
import akka.projection.scaladsl.{ AtLeastOnceProjection, SourceProvider }
import akka.projection.{ ProjectionBehavior, ProjectionId }
import org.apache.kafka.common.serialization.{ ByteArraySerializer, StringSerializer }
import shopping.cart.repository.ScalikeJdbcSession

object PublishEventsProjection {

  def init(system: ActorSystem[_]): Unit = {
    val sendProducer = createProducer(system)
    val topic =
      system.settings.config.getString("shopping-cart-service.kafka.topic")

    ShardedDaemonProcess(system).init(
      name = "PublishEventsProjection",
      ShoppingCart.tags.size,
      index =>
        ProjectionBehavior(
          createProjectionFor(system, topic, sendProducer, index)),
      ShardedDaemonProcessSettings(system),
      Some(ProjectionBehavior.Stop))
  }

  private def createProducer(
      system: ActorSystem[_]): SendProducer[String, Array[Byte]] = {
    val producerSettings =
      ProducerSettings(system, new StringSerializer, new ByteArraySerializer)
    val sendProducer = SendProducer(producerSettings)(system)
    CoordinatedShutdown(system).addTask(
      CoordinatedShutdown.PhaseBeforeActorSystemTerminate,
      "close-sendProducer") { () =>
      sendProducer.close()
    }
    sendProducer
  }

  private def createProjectionFor(
      system: ActorSystem[_],
      topic: String,
      sendProducer: SendProducer[String, Array[Byte]],
      index: Int)
      : AtLeastOnceProjection[Offset, EventEnvelope[ShoppingCart.Event]] = {
    val tag = ShoppingCart.tags(index)
    val sourceProvider
        : SourceProvider[Offset, EventEnvelope[ShoppingCart.Event]] =
      EventSourcedProvider.eventsByTag[ShoppingCart.Event](
        system = system,
        readJournalPluginId = JdbcReadJournal.Identifier,
        tag = tag)

    JdbcProjection.atLeastOnceAsync(
      projectionId = ProjectionId("PublishEventsProjection", tag),
      sourceProvider,
      handler =
        () => new PublishEventsProjectionHandler(system, topic, sendProducer),
      sessionFactory = () => new ScalikeJdbcSession())(system)
  }
}
The
SendProducer
is initialized using some configuration that we need to add. It defines how to connect to the Kafka broker. -
Add the following to a new
src/main/resources/kafka.conf
file:

shopping-cart-service {
  kafka.topic = "shopping-cart-events"
}

# common config for akka.kafka.producer.kafka-clients and akka.kafka.consumer.kafka-clients
kafka-connection-settings {
  # This and other connection settings may have to be changed depending on environment.
  bootstrap.servers = "localhost:9092"
}
akka.kafka.producer {
  kafka-clients = ${kafka-connection-settings}
}
akka.kafka.consumer {
  kafka-clients = ${kafka-connection-settings}
}
-
Include kafka.conf in application.conf. -
For local development add the following to
src/main/resources/local-shared.conf
, which is loaded when running locally:

# common config for akka.kafka.producer.kafka-clients and akka.kafka.consumer.kafka-clients
kafka-connection-settings {
  bootstrap.servers = "localhost:9092"
}
akka.kafka.producer {
  kafka-clients = ${kafka-connection-settings}
}
akka.kafka.consumer {
  kafka-clients = ${kafka-connection-settings}
}
-
Call
PublishEventsProjection.init
fromMain
:- Java
-
PublishEventsProjection.init(system, transactionManager);
- Scala
-
PublishEventsProjection.init(system)
4. Consume the events
Let’s add another service that consumes the events from the Kafka topic. The template download (or other source downloads) includes a directory named shopping-analytics-service
. This service will receive the events in the Protobuf format defined in the ShoppingCartEvents.proto
from the shopping-cart-service
so we can copy the .proto
file we created earlier.
Different services should not share code, but we can copy the Protobuf specification since that is the published interface of the service.
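The copy in the steps below can be done with a plain file copy, for example (a sketch that assumes the two project directories sit next to each other and that the cart service also keeps its Protobuf files under src/main/protobuf):

cp shopping-cart-service/src/main/protobuf/ShoppingCartEvents.proto \
  shopping-analytics-service/src/main/protobuf/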
To add the service, follow these steps:
-
Open the
shopping-analytics-service
in IntelliJ just as you did with the shopping-cart-service. -
Copy the ShoppingCartEvents.proto from the shopping-cart-service to shopping-analytics-service/src/main/protobuf and generate code by compiling the project:

sbt compile

mvn compile
-
Create a ShoppingCartEventConsumer object (Scala) or class (Java) in shopping-analytics-service. It runs an Akka Stream with a Kafka Consumer.committableSource from Alpakka Kafka.
- Java
-
src/main/java/shopping/analytics/ShoppingCartEventConsumer.java:
package shopping.analytics;

import akka.Done;
import akka.actor.typed.ActorSystem;
import akka.kafka.CommitterSettings;
import akka.kafka.ConsumerSettings;
import akka.kafka.Subscriptions;
import akka.kafka.javadsl.Committer;
import akka.kafka.javadsl.Consumer;
import akka.stream.RestartSettings;
import akka.stream.javadsl.RestartSource;
import com.google.protobuf.Any;
import com.google.protobuf.CodedInputStream;
import com.google.protobuf.InvalidProtocolBufferException;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import shopping.cart.proto.CheckedOut;
import shopping.cart.proto.ItemAdded;
import shopping.cart.proto.ItemQuantityAdjusted;
import shopping.cart.proto.ItemRemoved;

class ShoppingCartEventConsumer {
  private static final Logger log = LoggerFactory.getLogger(ShoppingCartEventConsumer.class);

  static void init(ActorSystem<?> system) {
    String topic =
        system
            .settings()
            .config()
            .getString("shopping-analytics-service.shopping-cart-kafka-topic");
    ConsumerSettings<String, byte[]> consumerSettings =
        ConsumerSettings.create(system, new StringDeserializer(), new ByteArrayDeserializer())
            .withGroupId("shopping-cart-analytics");
    CommitterSettings committerSettings = CommitterSettings.create(system);

    Duration minBackoff = Duration.ofSeconds(1);
    Duration maxBackoff = Duration.ofSeconds(30);
    double randomFactor = 0.1;

    RestartSource (1)
        .onFailuresWithBackoff(
            RestartSettings.create(minBackoff, maxBackoff, randomFactor),
            () -> {
              return Consumer.committableSource(
                      consumerSettings, Subscriptions.topics(topic)) (2)
                  .mapAsync(
                      1,
                      msg -> handleRecord(msg.record()).thenApply(done -> msg.committableOffset()))
                  .via(Committer.flow(committerSettings)); (3)
            })
        .run(system);
  }

  private static CompletionStage<Done> handleRecord(ConsumerRecord<String, byte[]> record)
      throws InvalidProtocolBufferException {
    byte[] bytes = record.value();
    Any x = Any.parseFrom(bytes); (4)
    String typeUrl = x.getTypeUrl();
    CodedInputStream inputBytes = x.getValue().newCodedInput();
    try {
      switch (typeUrl) {
        case "shopping-cart-service/shoppingcart.ItemAdded":
          {
            ItemAdded event = ItemAdded.parseFrom(inputBytes);
            log.info(
                "ItemAdded: {} {} to cart {}",
                event.getQuantity(),
                event.getItemId(),
                event.getCartId());
            break;
          }
        case "shopping-cart-service/shoppingcart.CheckedOut":
          {
            CheckedOut event = CheckedOut.parseFrom(inputBytes);
            log.info("CheckedOut: cart {} checked out", event.getCartId());
            break;
          }
        default:
          throw new IllegalArgumentException("unknown record type " + typeUrl);
      }
    } catch (Exception e) {
      log.error("Could not process event of type [{}]", typeUrl, e);
      // continue with next
    }
    return CompletableFuture.completedFuture(Done.getInstance());
  }
}
- Scala
-
src/main/scala/shopping/analytics/ShoppingCartEventConsumer.scala:
package shopping.analytics

import scala.concurrent.ExecutionContext
import scala.concurrent.Future
import scala.concurrent.duration._
import scala.util.control.NonFatal

import akka.Done
import akka.actor.typed.ActorSystem
import akka.kafka.CommitterSettings
import akka.kafka.ConsumerSettings
import akka.kafka.Subscriptions
import akka.kafka.scaladsl.{ Committer, Consumer }
import akka.stream.RestartSettings
import akka.stream.scaladsl.RestartSource
import com.google.protobuf.any.{ Any => ScalaPBAny }
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.ByteArrayDeserializer
import org.apache.kafka.common.serialization.StringDeserializer
import org.slf4j.LoggerFactory
import shopping.cart.proto

object ShoppingCartEventConsumer {

  private val log =
    LoggerFactory.getLogger("shopping.analytics.ShoppingCartEventConsumer")

  def init(system: ActorSystem[_]): Unit = {
    implicit val sys: ActorSystem[_] = system
    implicit val ec: ExecutionContext = system.executionContext

    val topic = system.settings.config
      .getString("shopping-analytics-service.shopping-cart-kafka-topic")
    val consumerSettings =
      ConsumerSettings(
        system,
        new StringDeserializer,
        new ByteArrayDeserializer).withGroupId("shopping-cart-analytics")
    val committerSettings = CommitterSettings(system)

    RestartSource (1)
      .onFailuresWithBackoff(
        RestartSettings(
          minBackoff = 1.second,
          maxBackoff = 30.seconds,
          randomFactor = 0.1)) { () =>
        Consumer
          .committableSource(
            consumerSettings,
            Subscriptions.topics(topic)
          ) (2)
          .mapAsync(1) { msg =>
            handleRecord(msg.record).map(_ => msg.committableOffset)
          }
          .via(Committer.flow(committerSettings)) (3)
      }
      .run()
  }

  private def handleRecord(
      record: ConsumerRecord[String, Array[Byte]]): Future[Done] = {
    val bytes = record.value()
    val x = ScalaPBAny.parseFrom(bytes) (4)
    val typeUrl = x.typeUrl
    try {
      val inputBytes = x.value.newCodedInput()
      val event =
        typeUrl match {
          case "shopping-cart-service/shoppingcart.ItemAdded" =>
            proto.ItemAdded.parseFrom(inputBytes)
          case "shopping-cart-service/shoppingcart.CheckedOut" =>
            proto.CheckedOut.parseFrom(inputBytes)
          case _ =>
            throw new IllegalArgumentException(s"unknown record type [$typeUrl]")
        }

      event match {
        case proto.ItemAdded(cartId, itemId, quantity, _) =>
          log.info("ItemAdded: {} {} to cart {}", quantity, itemId, cartId)
        case proto.CheckedOut(cartId, _) =>
          log.info("CheckedOut: cart {} checked out", cartId)
      }

      Future.successful(Done)
    } catch {
      case NonFatal(e) =>
        log.error("Could not process event of type [{}]", typeUrl, e)
        // continue with next
        Future.successful(Done)
    }
  }
}
1 | RestartSource will restart the stream in case of failures. |
2 | Kafka Consumer stream. |
3 | Offset is committed to Kafka when records have been processed. |
4 | Protobuf Any for type information. |
Note how the deserialization uses the type information from the Protobuf Any to decide which type of event to deserialize.
4.1. Configuration
We need to add configuration to initialize the Consumer
and define how to connect to the Kafka broker.
Add the following to a new src/main/resources/kafka.conf
file in shopping-analytics-service
:
shopping-analytics-service {
shopping-cart-kafka-topic = "shopping-cart-events"
}
# common config for akka.kafka.producer.kafka-clients and akka.kafka.consumer.kafka-clients
kafka-connection-settings {
# This and other connection settings may have to be changed depending on environment.
bootstrap.servers = "localhost:9092"
}
akka.kafka.producer {
kafka-clients = ${kafka-connection-settings}
}
akka.kafka.consumer {
kafka-clients = ${kafka-connection-settings}
kafka-clients {
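# a consumer group that has no committed offset yet starts from the beginning of the topic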
auto.offset.reset = "earliest"
}
}
Include kafka.conf
from application.conf
.
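A minimal sketch of that include line, assuming the standard HOCON include syntax (the exact placement within the template's application.conf may differ):

include "kafka.conf"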
And for local development add the following to src/main/resources/local-shared.conf
, which is loaded when running locally:
shopping-analytics-service.kafka.bootstrap-servers = "localhost:9092"
4.2. Main
Edit the Main
class that is included from the template project. It should initialize the ActorSystem
and the ShoppingCartEventConsumer
like this:
- Java
-
package shopping.analytics;

import akka.actor.typed.ActorSystem;
import akka.actor.typed.javadsl.Behaviors;
import akka.management.cluster.bootstrap.ClusterBootstrap;
import akka.management.javadsl.AkkaManagement;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Main {

  private static final Logger logger = LoggerFactory.getLogger(Main.class);

  public static void main(String[] args) {
    ActorSystem<Void> system =
        ActorSystem.create(Behaviors.empty(), "ShoppingAnalyticsService");
    try {
      init(system);
    } catch (Exception e) {
      logger.error("Terminating due to initialization failure.", e);
      system.terminate();
    }
  }

  public static void init(ActorSystem<Void> system) {
    AkkaManagement.get(system).start();
    ClusterBootstrap.get(system).start();

    ShoppingCartEventConsumer.init(system);
  }
}
- Scala
-
package shopping.analytics

import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.ActorSystem
import akka.management.cluster.bootstrap.ClusterBootstrap
import akka.management.scaladsl.AkkaManagement
import org.slf4j.LoggerFactory

import scala.util.control.NonFatal

object Main {

  val logger = LoggerFactory.getLogger("shopping.analytics.Main")

  def main(args: Array[String]): Unit = {
    val system =
      ActorSystem[Nothing](Behaviors.empty, "ShoppingAnalyticsService")
    try {
      init(system)
    } catch {
      case NonFatal(e) =>
        logger.error("Terminating due to initialization failure.", e)
        system.terminate()
    }
  }

  def init(system: ActorSystem[_]): Unit = {
    AkkaManagement(system).start()
    ClusterBootstrap(system).start()

    ShoppingCartEventConsumer.init(system)
  }
}
5. Run locally
In addition to PostgreSQL we now also need Kafka. The docker-compose script starts both PostgreSQL and Kafka:
-
Start PostgreSQL and Kafka, unless they are already running, from the shopping-cart-service:

docker-compose up -d
-
Run the shopping-cart-service with:

# make sure to compile before running exec:exec
mvn compile exec:exec -DAPP_CONFIG=local1.conf
sbt -Dconfig.resource=local1.conf run
-
In another terminal, run the new shopping-analytics-service with:

# make sure to compile before running exec:exec
mvn compile exec:exec -DAPP_CONFIG=local1.conf
sbt -Dconfig.resource=local1.conf run
5.1. Exercise the service
Use grpcurl
to exercise the service:
-
Start another terminal, and use grpcurl to add 1 pencil to a cart:

grpcurl -d '{"cartId":"cart4", "itemId":"pencil", "quantity":1}' -plaintext 127.0.0.1:8101 shoppingcart.ShoppingCartService.AddItem
-
Look at the log output in the terminal of the shopping-analytics-service. You should see the logging from the AddItem, showing that the new service consumed the event from Kafka:

ItemAdded: 1 pencil to cart cart4
5.2. Stop the service
When finished, stop the shopping-cart-service
and shopping-analytics-service
with ctrl-c
. Leave PostgreSQL and Kafka running for the next set of steps, or stop them with:
docker-compose stop
The following steps for cloud deployment are optional. If you are only running locally, you can skip to the next section of the tutorial.
6. Run in Kubernetes
Create a Kubernetes cluster and install the Akka Operator if you haven’t already.
6.1. Create a Kafka cluster
Follow the Amazon MSK instructions to set up a Kafka cluster in EKS.
Create two secrets for it, one for the shopping-cart-service
:
kubectl create secret generic \
shopping-cart-service-kafka-secret \
--from-literal=bootstrapServers=<copied bootstrap servers connect string>
and another secret (with the same content) for the shopping-analytics-service
:
kubectl create secret generic \
shopping-analytics-service-kafka-secret \
--from-literal=bootstrapServers=<copied bootstrap servers connect string>
The Amazon MSK page also covers how to create the shopping-cart-events
topic.
6.2. Build Docker image
Create a Docker repository and authenticate Docker as described in Amazon Elastic Container Registry if you haven’t already.
Build and publish the Docker images for both shopping-cart-service
and shopping-analytics-service
.
- Java
-
mvn -DskipTests -Ddocker.registry=803424716218.dkr.ecr.eu-central-1.amazonaws.com clean package docker:push
- Scala
-
sbt -Ddocker.registry=803424716218.dkr.ecr.eu-central-1.amazonaws.com docker:publish
Take note of the image tag as displayed by the docker:push (Maven) or docker:publish (sbt) command.
- Java
-
DOCKER> Tagging image shopping-cart-service:20201209-135004-363ae2b successful!
- Scala
-
[info] Successfully tagged shopping-cart-service:20201209-135004-363ae2b
6.3. Update the deployment descriptor
Update the deployment descriptors with the respective image tag and the Kafka credential secret.
For shopping-cart-service
:
apiVersion: "v1"
kind: "Namespace"
metadata:
name: "shopping"
---
apiVersion: akka.lightbend.com/v1
kind: AkkaMicroservice
metadata:
name: shopping-cart-service
namespace: "shopping"
spec:
replicas: 1
image: <docker-registry>/shopping-cart-service:<tag> (1)
javaOptions: "-Xlog:gc -XX:InitialRAMPercentage=75 -XX:MaxRAMPercentage=75"
resources:
limits:
memory: "2Gi"
requests:
memory: "2Gi"
cpu: "1"
jdbc:
credentialsSecret: shopping-cart-service-jdbc-secret
kafka:
credentialsSecret: shopping-cart-service-kafka-secret (2)
1 | Replace <docker-registry> with your docker registry address and update the image reference with the image tag from the output of the Docker build above, for example: 803424716218.dkr.ecr.eu-central-1.amazonaws.com/shopping-cart-service:20201209-135004-363ae2b . |
2 | Add a kafka.credentialsSecret section pointing to the secret created in the Amazon MSK instructions. |
Similarly, for the shopping-analytics-service:
apiVersion: "v1"
kind: "Namespace"
metadata:
name: "shopping"
---
apiVersion: akka.lightbend.com/v1
kind: AkkaMicroservice
metadata:
name: shopping-analytics-service
namespace: "shopping"
spec:
replicas: 1
image: <repository>/shopping-analytics-service:<tag>
javaOptions: "-Xlog:gc -XX:InitialRAMPercentage=75 -XX:MaxRAMPercentage=75"
resources:
limits:
memory: "2Gi"
requests:
memory: "2Gi"
cpu: "1"
kafka:
credentialsSecret: shopping-analytics-service-kafka-secret
6.4. Apply to Kubernetes
Apply both shopping-cart-service/kubernetes/shopping-cart-service-cr.yml
and shopping-analytics-service/kubernetes/shopping-analytics-service-cr.yml
to Kubernetes:
kubectl apply -f shopping-cart-service/kubernetes/shopping-cart-service-cr.yml
kubectl apply -f shopping-analytics-service/kubernetes/shopping-analytics-service-cr.yml
You can see progress by viewing the status:
kubectl get akkamicroservices
See troubleshooting deployment status for more details.
6.5. Exercise the service in Kubernetes
-
You can list the pods with:
kubectl get pods
-
Inspect logs from both services from a separate terminal window:
kubectl logs -f <shopping-cart-service pod name from above>
kubectl logs -f <shopping-analytics-service pod name from above>
-
Add port forwarding for the gRPC endpoint from a separate terminal:
kubectl port-forward svc/shopping-cart-service-grpc 8101:8101
Use grpcurl
to exercise the service:
-
Start another terminal, and use grpcurl to add 1 pencil to a cart:

grpcurl -d '{"cartId":"cart4", "itemId":"pencil", "quantity":1}' -plaintext 127.0.0.1:8101 shoppingcart.ShoppingCartService.AddItem
-
Look at the log output in the terminal of the shopping-analytics-service. You should see the logging from the AddItem, showing that the new service consumed the event from Kafka:

ItemAdded: 1 pencil to cart cart4