Section 7: Projection publishing to Kafka

To decouple communication between different Microservices, we can publish messages to a broker, such as Apache Kafka. See Internal and External Communication concepts for more information.

To accomplish this, we will add another Projection from the events of the ShoppingCart entity. The new Projection will be similar to what we developed in the previous step, but it will send the events to a Kafka topic instead of updating a database. As shown below, we will also add an Analytics service, the ShoppingAnalyticsService, that consumes the events from the Kafka topic.

Example Kafka

This part of the full example will focus on the Kafka producer in the PublishEventsProjection and the Kafka consumer in ShoppingAnalyticsService. On this page you will learn how to:

  • send messages to a Kafka topic from a Projection

  • consume messages from a Kafka topic

Akka Workshop

The fourth video of the Akka Workshop Series covers CQRS and Projections for Kafka and gRPC. It offers guidance that helps with both this section and the next section of this guide.

Source downloads

If you prefer to simply view and run the example, download a zip file containing the completed code:

Java
  • Source that includes all previous tutorial steps and allows you to start with the steps on this page.

  • Source with the steps on this page completed.

Scala
  • Source that includes all previous tutorial steps and allows you to start with the steps on this page.

  • Source with the steps on this page completed.

1. External representation of the events

For external APIs of a service, such as a Kafka topic that is consumed by other services, it is good to have a well-defined data format. Therefore, we define event data formats in Protobuf rather than using the internal event representation. This also makes it easier to evolve the representation of events over time without breaking downstream consumers.

To define the external representation:

  1. Add a new ShoppingCartEvents.proto with the specification of the events:

    syntax = "proto3";
    
    option java_multiple_files = true;
    option java_package = "shopping.cart.proto";
    
    package shoppingcart;
    
    // Events published to Kafka
    
    message ItemAdded {
        string cartId = 1;
        string itemId = 2;
        int32 quantity = 3;
    }
    
    message ItemQuantityAdjusted {
        string cartId = 1;
        string itemId = 2;
        int32 quantity = 3;
    }
    
    message ItemRemoved {
        string cartId = 1;
        string itemId = 2;
    }
    
    message CheckedOut {
        string cartId = 1;
    }
  2. Generate code by compiling the project:

    Java
    mvn compile
    Scala
    sbt compile

2. Send to Kafka from a Projection

Add a PublishEventsProjectionHandler class that is the Projection Handler for processing the events:

Java
src/main/java/shopping/cart/PublishEventsProjectionHandler.java:
package shopping.cart;

import static akka.Done.done;

import akka.Done;
import akka.kafka.javadsl.SendProducer;
import akka.projection.eventsourced.EventEnvelope;
import akka.projection.javadsl.Handler;
import com.google.protobuf.Any;
import com.google.protobuf.ByteString;
import java.util.concurrent.CompletionStage;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class PublishEventsProjectionHandler
    extends Handler<EventEnvelope<ShoppingCart.Event>> {
  private final Logger logger = LoggerFactory.getLogger(getClass());
  private final String topic;
  private final SendProducer<String, byte[]> sendProducer; // (1)

  public PublishEventsProjectionHandler(String topic, SendProducer<String, byte[]> sendProducer) {
    this.topic = topic;
    this.sendProducer = sendProducer;
  }

  @Override
  public CompletionStage<Done> process(EventEnvelope<ShoppingCart.Event> envelope)
      throws Exception {
    ShoppingCart.Event event = envelope.event();

    // Use the cartId as the key: the `DefaultPartitioner` selects the partition based on the key,
    // so events for the same cart always end up in the same partition
    String key = event.cartId;
    ProducerRecord<String, byte[]> producerRecord =
        new ProducerRecord<>(topic, key, serialize(event)); // (2)
    return sendProducer
        .send(producerRecord)
        .thenApply(
            recordMetadata -> {
              logger.info(
                  "Published event [{}] to topic/partition {}/{}",
                  event,
                  topic,
                  recordMetadata.partition());
              return done();
            });
  }

  private static byte[] serialize(ShoppingCart.Event event) {
    final ByteString protoMessage;
    final String fullName;
    if (event instanceof ShoppingCart.ItemAdded) {
      ShoppingCart.ItemAdded itemAdded = (ShoppingCart.ItemAdded) event;
      protoMessage =
          shopping.cart.proto.ItemAdded.newBuilder()
              .setCartId(itemAdded.cartId)
              .setItemId(itemAdded.itemId)
              .setQuantity(itemAdded.quantity)
              .build()
              .toByteString();
      fullName = shopping.cart.proto.ItemAdded.getDescriptor().getFullName();
    } else if (event instanceof ShoppingCart.CheckedOut) {
      ShoppingCart.CheckedOut checkedOut = (ShoppingCart.CheckedOut) event;
      protoMessage =
          shopping.cart.proto.CheckedOut.newBuilder()
              .setCartId(checkedOut.cartId)
              .build()
              .toByteString();
      fullName = shopping.cart.proto.CheckedOut.getDescriptor().getFullName();
    } else {
      throw new IllegalArgumentException("Unknown event type: " + event.getClass());
    }
    // pack in Any so that type information is included for deserialization
    return Any.newBuilder()
        .setValue(protoMessage)
        .setTypeUrl("shopping-cart-service/" + fullName)
        .build()
        .toByteArray(); // (3)
  }
}
Scala
src/main/scala/shopping/cart/PublishEventsProjectionHandler.scala:
package shopping.cart

import scala.concurrent.ExecutionContext
import scala.concurrent.Future

import akka.Done
import akka.actor.typed.ActorSystem
import akka.kafka.scaladsl.SendProducer
import akka.projection.eventsourced.EventEnvelope
import akka.projection.scaladsl.Handler
import com.google.protobuf.any.{ Any => ScalaPBAny }
import org.apache.kafka.clients.producer.ProducerRecord
import org.slf4j.LoggerFactory

class PublishEventsProjectionHandler(
    system: ActorSystem[_],
    topic: String,
    sendProducer: SendProducer[String, Array[Byte]]) // (1)
    extends Handler[EventEnvelope[ShoppingCart.Event]] {
  private val log = LoggerFactory.getLogger(getClass)
  private implicit val ec: ExecutionContext =
    system.executionContext

  override def process(
      envelope: EventEnvelope[ShoppingCart.Event]): Future[Done] = {
    val event = envelope.event

    // Use the cartId as the key: the `DefaultPartitioner` selects the partition based on the key,
    // so events for the same cart always end up in the same partition
    val key = event.cartId
    val producerRecord = new ProducerRecord(topic, key, serialize(event)) // (2)
    val result = sendProducer.send(producerRecord).map { recordMetadata =>
      log.info(
        "Published event [{}] to topic/partition {}/{}",
        event,
        topic,
        recordMetadata.partition)
      Done
    }
    result
  }

  private def serialize(event: ShoppingCart.Event): Array[Byte] = {
    val protoMessage = event match {
      case ShoppingCart.ItemAdded(cartId, itemId, quantity) =>
        proto.ItemAdded(cartId, itemId, quantity)
      case ShoppingCart.CheckedOut(cartId, _) =>
        proto.CheckedOut(cartId)
    }
    // pack in Any so that type information is included for deserialization
    ScalaPBAny.pack(protoMessage, "shopping-cart-service").toByteArray // (3)
  }
}
1 SendProducer comes from the Kafka connector in Alpakka.
2 The events are serialized to Protobuf and sent to the given topic.
3 Wrap in Protobuf Any to include type information.
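
The handler relies on the record key to keep all events of one cart in one partition. The following is a minimal sketch of that idea, not Kafka's actual implementation: Kafka hashes the serialized key with murmur2, while this dependency-free stand-in uses `Arrays.hashCode`. The class name `KeyPartitionSketch` and the partition count are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Illustrative stand-in for key-based partition selection. Kafka's real
// partitioner hashes the serialized key with murmur2; Arrays.hashCode is
// used here only to keep the sketch dependency-free.
final class KeyPartitionSketch {

  // Maps a record key to a partition index in [0, numPartitions).
  static int partitionFor(String key, int numPartitions) {
    byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
    // floorMod keeps the result non-negative even for negative hash values
    return Math.floorMod(Arrays.hashCode(keyBytes), numPartitions);
  }

  public static void main(String[] args) {
    int partitions = 3; // hypothetical partition count for the topic
    // Two events for the same cart map to the same partition,
    // which is what preserves their relative order for consumers.
    int first = partitionFor("cart4", partitions);
    int second = partitionFor("cart4", partitions);
    System.out.println("same partition: " + (first == second));
  }
}
```

Because the mapping depends only on the key and the partition count, ordering per cart holds as long as the number of partitions of the topic does not change.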

The serialization converts the ShoppingCart.Event classes to the Protobuf representation. Since several types of messages are sent to the same topic, we must include some type information that the consumers of the topic can use when deserializing the messages. Protobuf provides a built-in type called Any for this purpose. That is why the message is wrapped with Any.newBuilder (Java) or ScalaPBAny.pack (Scala).
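
To make the role of the type information concrete, here is a small dependency-free sketch of the envelope idea. It is not the Protobuf Any class itself: `AnyEnvelopeSketch` is a hypothetical stand-in that mirrors only the two fields used in this tutorial (a type URL and the serialized payload), and the payload bytes are placeholders rather than real Protobuf.

```java
import java.nio.charset.StandardCharsets;

// Dependency-free stand-in for the Protobuf Any wrapper: a type URL plus the
// serialized payload. The real Any is a Protobuf message; this class only
// mirrors its two fields for illustration.
final class AnyEnvelopeSketch {
  final String typeUrl;
  final byte[] value;

  private AnyEnvelopeSketch(String typeUrl, byte[] value) {
    this.typeUrl = typeUrl;
    this.value = value;
  }

  // Builds "<prefix>/<full message name>", the same shape the handler produces.
  static AnyEnvelopeSketch pack(String prefix, String fullName, byte[] payload) {
    return new AnyEnvelopeSketch(prefix + "/" + fullName, payload);
  }

  // The full message name is everything after the last '/'.
  String messageName() {
    return typeUrl.substring(typeUrl.lastIndexOf('/') + 1);
  }

  public static void main(String[] args) {
    byte[] payload = "placeholder-bytes".getBytes(StandardCharsets.UTF_8); // not real Protobuf
    AnyEnvelopeSketch packed = pack("shopping-cart-service", "shoppingcart.ItemAdded", payload);
    System.out.println(packed.typeUrl); // shopping-cart-service/shoppingcart.ItemAdded
    System.out.println(packed.messageName()); // shoppingcart.ItemAdded
  }
}
```

The resulting type URL is the string a consumer of the topic has to match on, so the prefix and the Protobuf package name are part of the published contract.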

3. Initialize the Projection

If this were our first Projection, we would need to add tags for the events. However, the tagging of the events is already in place from the previous step. So, we can simply add the appropriate initialization as follows:

  1. Place the initialization code of the Projection in a PublishEventsProjection class (Java) or object (Scala):

    Java
    src/main/java/shopping/cart/PublishEventsProjection.java:
    package shopping.cart;
    
    import akka.actor.CoordinatedShutdown;
    import akka.actor.typed.ActorSystem;
    import akka.cluster.sharding.typed.ShardedDaemonProcessSettings;
    import akka.cluster.sharding.typed.javadsl.ShardedDaemonProcess;
    import akka.kafka.ProducerSettings;
    import akka.kafka.javadsl.SendProducer;
    import akka.persistence.jdbc.query.javadsl.JdbcReadJournal;
    import akka.persistence.query.Offset;
    import akka.projection.ProjectionBehavior;
    import akka.projection.ProjectionId;
    import akka.projection.eventsourced.EventEnvelope;
    import akka.projection.eventsourced.javadsl.EventSourcedProvider;
    import akka.projection.javadsl.AtLeastOnceProjection;
    import akka.projection.javadsl.SourceProvider;
    import akka.projection.jdbc.javadsl.JdbcProjection;
    import java.util.Optional;
    import org.apache.kafka.common.serialization.ByteArraySerializer;
    import org.apache.kafka.common.serialization.StringSerializer;
    import org.springframework.orm.jpa.JpaTransactionManager;
    import shopping.cart.repository.HibernateJdbcSession;
    
    public final class PublishEventsProjection {
    
      private PublishEventsProjection() {}
    
      public static void init(ActorSystem<?> system, JpaTransactionManager transactionManager) {
        SendProducer<String, byte[]> sendProducer = createProducer(system);
        String topic = system.settings().config().getString("shopping-cart-service.kafka.topic");
    
        ShardedDaemonProcess.get(system)
            .init(
                ProjectionBehavior.Command.class,
                "PublishEventsProjection",
                ShoppingCart.TAGS.size(),
                index ->
                    ProjectionBehavior.create(
                        createProjectionFor(system, transactionManager, topic, sendProducer, index)),
                ShardedDaemonProcessSettings.create(system),
                Optional.of(ProjectionBehavior.stopMessage()));
      }
    
      private static SendProducer<String, byte[]> createProducer(ActorSystem<?> system) {
        ProducerSettings<String, byte[]> producerSettings =
            ProducerSettings.create(system, new StringSerializer(), new ByteArraySerializer());
        SendProducer<String, byte[]> sendProducer = new SendProducer<>(producerSettings, system);
        CoordinatedShutdown.get(system)
            .addTask(
                CoordinatedShutdown.PhaseActorSystemTerminate(),
                "close-sendProducer",
                () -> sendProducer.close());
        return sendProducer;
      }
    
      private static AtLeastOnceProjection<Offset, EventEnvelope<ShoppingCart.Event>>
          createProjectionFor(
              ActorSystem<?> system,
              JpaTransactionManager transactionManager,
              String topic,
              SendProducer<String, byte[]> sendProducer,
              int index) {
        String tag = ShoppingCart.TAGS.get(index);
        SourceProvider<Offset, EventEnvelope<ShoppingCart.Event>> sourceProvider =
            EventSourcedProvider.eventsByTag(system, JdbcReadJournal.Identifier(), tag);
    
        return JdbcProjection.atLeastOnceAsync(
            ProjectionId.of("PublishEventsProjection", tag),
            sourceProvider,
            () -> new HibernateJdbcSession(transactionManager),
            () -> new PublishEventsProjectionHandler(topic, sendProducer),
            system);
      }
    }
    Scala
    src/main/scala/shopping/cart/PublishEventsProjection.scala:
    package shopping.cart
    
    import akka.actor.CoordinatedShutdown
    import akka.actor.typed.ActorSystem
    import akka.cluster.sharding.typed.ShardedDaemonProcessSettings
    import akka.cluster.sharding.typed.scaladsl.ShardedDaemonProcess
    import akka.kafka.ProducerSettings
    import akka.kafka.scaladsl.SendProducer
    import akka.persistence.jdbc.query.scaladsl.JdbcReadJournal
    import akka.persistence.query.Offset
    import akka.projection.eventsourced.EventEnvelope
    import akka.projection.eventsourced.scaladsl.EventSourcedProvider
    import akka.projection.jdbc.scaladsl.JdbcProjection
    import akka.projection.scaladsl.{ AtLeastOnceProjection, SourceProvider }
    import akka.projection.{ ProjectionBehavior, ProjectionId }
    import org.apache.kafka.common.serialization.{
      ByteArraySerializer,
      StringSerializer
    }
    import shopping.cart.repository.ScalikeJdbcSession
    
    object PublishEventsProjection {
    
      def init(system: ActorSystem[_]): Unit = {
        val sendProducer = createProducer(system)
        val topic =
          system.settings.config.getString("shopping-cart-service.kafka.topic")
    
        ShardedDaemonProcess(system).init(
          name = "PublishEventsProjection",
          ShoppingCart.tags.size,
          index =>
            ProjectionBehavior(
              createProjectionFor(system, topic, sendProducer, index)),
          ShardedDaemonProcessSettings(system),
          Some(ProjectionBehavior.Stop))
      }
    
      private def createProducer(
          system: ActorSystem[_]): SendProducer[String, Array[Byte]] = {
        val producerSettings =
          ProducerSettings(system, new StringSerializer, new ByteArraySerializer)
        val sendProducer =
          SendProducer(producerSettings)(system)
        CoordinatedShutdown(system).addTask(
          CoordinatedShutdown.PhaseBeforeActorSystemTerminate,
          "close-sendProducer") { () =>
          sendProducer.close()
        }
        sendProducer
      }
    
      private def createProjectionFor(
          system: ActorSystem[_],
          topic: String,
          sendProducer: SendProducer[String, Array[Byte]],
          index: Int)
          : AtLeastOnceProjection[Offset, EventEnvelope[ShoppingCart.Event]] = {
        val tag = ShoppingCart.tags(index)
        val sourceProvider
            : SourceProvider[Offset, EventEnvelope[ShoppingCart.Event]] =
          EventSourcedProvider.eventsByTag[ShoppingCart.Event](
            system = system,
            readJournalPluginId = JdbcReadJournal.Identifier,
            tag = tag)
    
        JdbcProjection.atLeastOnceAsync(
          projectionId = ProjectionId("PublishEventsProjection", tag),
          sourceProvider,
          handler =
            () => new PublishEventsProjectionHandler(system, topic, sendProducer),
          sessionFactory = () => new ScalikeJdbcSession())(system)
      }
    
    }

    The SendProducer is initialized using some configuration that we need to add. It defines how to connect to the Kafka broker.

  2. Add the following to a new src/main/resources/kafka.conf file:

    shopping-cart-service {
    
      kafka.topic = "shopping-cart-events"
    
    }
    
    # common config for akka.kafka.producer.kafka-clients and akka.kafka.consumer.kafka-clients
    kafka-connection-settings {
      # This and other connection settings may have to be changed depending on environment.
      bootstrap.servers = "localhost:9092"
    }
    akka.kafka.producer {
      kafka-clients = ${kafka-connection-settings}
    }
    akka.kafka.consumer {
      kafka-clients = ${kafka-connection-settings}
    }
  3. Include kafka.conf in application.conf.

  4. For local development add the following to src/main/resources/local-shared.conf, which is loaded when running locally:

    # common config for akka.kafka.producer.kafka-clients and akka.kafka.consumer.kafka-clients
    kafka-connection-settings {
      bootstrap.servers = "localhost:9092"
    }
    akka.kafka.producer {
      kafka-clients = ${kafka-connection-settings}
    }
    akka.kafka.consumer {
      kafka-clients = ${kafka-connection-settings}
    }
  5. Call PublishEventsProjection.init from Main:

    Java
    PublishEventsProjection.init(system, transactionManager);
    Scala
    PublishEventsProjection.init(system)
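
The initialization above starts one Projection instance per tag, with the index passed by ShardedDaemonProcess selecting the tag. The sketch below illustrates that slicing without any Akka dependency. The tag names (`carts-0` and so on) and the hash-based assignment of a cart to a tag are assumptions for illustration; the actual scheme is whatever the ShoppingCart entity from the previous step defines.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of how tags split the event stream across projection instances.
// The tag names and the hash-based assignment are assumptions; use whatever
// the ShoppingCart entity actually defines.
final class TagSlicingSketch {

  // Hypothetical tag list: "carts-0" .. "carts-(n-1)".
  static List<String> tags(int n) {
    List<String> result = new ArrayList<>();
    for (int i = 0; i < n; i++) {
      result.add("carts-" + i);
    }
    return result;
  }

  // Assumed assignment of an entity to one tag, stable per entity id.
  static String tagFor(String entityId, List<String> tags) {
    return tags.get(Math.floorMod(entityId.hashCode(), tags.size()));
  }

  public static void main(String[] args) {
    List<String> tags = tags(5);
    // One projection instance per index, each reading exactly one tag.
    for (int index = 0; index < tags.size(); index++) {
      System.out.println("instance " + index + " reads tag " + tags.get(index));
    }
    System.out.println("cart4 events carry tag " + tagFor("cart4", tags));
  }
}
```

Since every event of a cart carries the same tag, exactly one Projection instance publishes the events of that cart, and it does so in order.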

4. Consume the events

Let’s add another service that consumes the events from the Kafka topic. The template download (or other source downloads) includes a directory named shopping-analytics-service. This service will receive the events in the Protobuf format defined in the ShoppingCartEvents.proto from the shopping-cart-service so we can copy the .proto file we created earlier.

Different services should not share code, but we can copy the Protobuf specification since that is the published interface of the service.

To add the service, follow these steps:

  1. Open the shopping-analytics-service in IntelliJ just as you did with the shopping-cart-service.

  2. Copy the ShoppingCartEvents.proto from the shopping-cart-service to the shopping-analytics-service/src/main/protobuf and generate code by compiling the project:

    Java
    mvn compile
    Scala
    sbt compile
  3. Create a ShoppingCartEventConsumer class (Java) or object (Scala) in shopping-analytics-service. It runs an Akka Stream with a Kafka Consumer.committableSource from Alpakka Kafka.

    Java
    src/main/java/shopping/analytics/ShoppingCartEventConsumer.java:
    package shopping.analytics;
    
    import akka.Done;
    import akka.actor.typed.ActorSystem;
    import akka.kafka.CommitterSettings;
    import akka.kafka.ConsumerSettings;
    import akka.kafka.Subscriptions;
    import akka.kafka.javadsl.Committer;
    import akka.kafka.javadsl.Consumer;
    import akka.stream.RestartSettings;
    import akka.stream.javadsl.RestartSource;
    import com.google.protobuf.Any;
    import com.google.protobuf.CodedInputStream;
    import com.google.protobuf.InvalidProtocolBufferException;
    import java.time.Duration;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.CompletionStage;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.common.serialization.ByteArrayDeserializer;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import shopping.cart.proto.CheckedOut;
    import shopping.cart.proto.ItemAdded;
    import shopping.cart.proto.ItemQuantityAdjusted;
    import shopping.cart.proto.ItemRemoved;
    
    class ShoppingCartEventConsumer {
      private static final Logger log = LoggerFactory.getLogger(ShoppingCartEventConsumer.class);
    
      static void init(ActorSystem<?> system) {
        String topic =
            system
                .settings()
                .config()
                .getString("shopping-analytics-service.shopping-cart-kafka-topic");
        ConsumerSettings<String, byte[]> consumerSettings =
            ConsumerSettings.create(system, new StringDeserializer(), new ByteArrayDeserializer())
                .withGroupId("shopping-cart-analytics");
        CommitterSettings committerSettings = CommitterSettings.create(system);
    
        Duration minBackoff = Duration.ofSeconds(1);
        Duration maxBackoff = Duration.ofSeconds(30);
        double randomFactor = 0.1;
    
        RestartSource // (1)
            .onFailuresWithBackoff(
                RestartSettings.create(minBackoff, maxBackoff, randomFactor),
                () -> {
                  return Consumer.committableSource(
                          consumerSettings, Subscriptions.topics(topic)) // (2)
                      .mapAsync(
                          1,
                          msg -> handleRecord(msg.record()).thenApply(done -> msg.committableOffset()))
                      .via(Committer.flow(committerSettings)); // (3)
                })
            .run(system);
      }
    
      private static CompletionStage<Done> handleRecord(ConsumerRecord<String, byte[]> record)
          throws InvalidProtocolBufferException {
        byte[] bytes = record.value();
        Any x = Any.parseFrom(bytes); // (4)
        String typeUrl = x.getTypeUrl();
        CodedInputStream inputBytes = x.getValue().newCodedInput();
        try {
          switch (typeUrl) {
            case "shopping-cart-service/shoppingcart.ItemAdded":
              {
                ItemAdded event = ItemAdded.parseFrom(inputBytes);
                log.info(
                    "ItemAdded: {} {} to cart {}",
                    event.getQuantity(),
                    event.getItemId(),
                    event.getCartId());
                break;
              }
            case "shopping-cart-service/shoppingcart.CheckedOut":
              {
                CheckedOut event = CheckedOut.parseFrom(inputBytes);
                log.info("CheckedOut: cart {} checked out", event.getCartId());
                break;
              }
            default:
              throw new IllegalArgumentException("unknown record type " + typeUrl);
          }
        } catch (Exception e) {
          log.error("Could not process event of type [{}]", typeUrl, e);
          // continue with next
        }
        return CompletableFuture.completedFuture(Done.getInstance());
      }
    }
    Scala
    src/main/scala/shopping/analytics/ShoppingCartEventConsumer.scala:
    package shopping.analytics
    
    import scala.concurrent.ExecutionContext
    import scala.concurrent.Future
    import scala.concurrent.duration._
    import scala.util.control.NonFatal
    import akka.Done
    import akka.actor.typed.ActorSystem
    import akka.kafka.CommitterSettings
    import akka.kafka.ConsumerSettings
    import akka.kafka.Subscriptions
    import akka.kafka.scaladsl.{ Committer, Consumer }
    import akka.stream.RestartSettings
    import akka.stream.scaladsl.RestartSource
    import com.google.protobuf.any.{ Any => ScalaPBAny }
    import org.apache.kafka.clients.consumer.ConsumerRecord
    import org.apache.kafka.common.serialization.ByteArrayDeserializer
    import org.apache.kafka.common.serialization.StringDeserializer
    import org.slf4j.LoggerFactory
    import shopping.cart.proto
    
    object ShoppingCartEventConsumer {
    
      private val log =
        LoggerFactory.getLogger("shopping.analytics.ShoppingCartEventConsumer")
    
      def init(system: ActorSystem[_]): Unit = {
        implicit val sys: ActorSystem[_] = system
        implicit val ec: ExecutionContext =
          system.executionContext
    
        val topic = system.settings.config
          .getString("shopping-analytics-service.shopping-cart-kafka-topic")
        val consumerSettings =
          ConsumerSettings(
            system,
            new StringDeserializer,
            new ByteArrayDeserializer).withGroupId("shopping-cart-analytics")
        val committerSettings = CommitterSettings(system)
    
        RestartSource // (1)
          .onFailuresWithBackoff(
            RestartSettings(
              minBackoff = 1.second,
              maxBackoff = 30.seconds,
              randomFactor = 0.1)) { () =>
            Consumer
              .committableSource(
                consumerSettings,
                Subscriptions.topics(topic)
              ) // (2)
              .mapAsync(1) { msg =>
                handleRecord(msg.record).map(_ => msg.committableOffset)
              }
              .via(Committer.flow(committerSettings)) // (3)
          }
          .run()
      }
    
      private def handleRecord(
          record: ConsumerRecord[String, Array[Byte]]): Future[Done] = {
        val bytes = record.value()
        val x = ScalaPBAny.parseFrom(bytes) // (4)
        val typeUrl = x.typeUrl
        try {
          val inputBytes = x.value.newCodedInput()
          val event =
            typeUrl match {
              case "shopping-cart-service/shoppingcart.ItemAdded" =>
                proto.ItemAdded.parseFrom(inputBytes)
              case "shopping-cart-service/shoppingcart.CheckedOut" =>
                proto.CheckedOut.parseFrom(inputBytes)
              case _ =>
                throw new IllegalArgumentException(
                  s"unknown record type [$typeUrl]")
            }
    
          event match {
            case proto.ItemAdded(cartId, itemId, quantity, _) =>
              log.info("ItemAdded: {} {} to cart {}", quantity, itemId, cartId)
            case proto.CheckedOut(cartId, _) =>
              log.info("CheckedOut: cart {} checked out", cartId)
          }
    
          Future.successful(Done)
        } catch {
          case NonFatal(e) =>
            log.error("Could not process event of type [{}]", typeUrl, e)
            // continue with next
            Future.successful(Done)
        }
      }
    
    }
1 RestartSource will restart the stream in case of failures.
2 Kafka Consumer stream.
3 Offset is committed to Kafka when records have been processed.
4 Protobuf Any for type information.
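
The consumer passes a minimum backoff, a maximum backoff and a random factor to RestartSettings. As a rough illustration of what such a schedule looks like, here is a sketch of exponential backoff with jitter. The doubling-and-cap formula is an assumption about the general technique, not a copy of Akka's internals, and `BackoffSketch` is a hypothetical name.

```java
import java.time.Duration;

// Sketch of an exponential backoff schedule with jitter, using the same three
// parameters passed to RestartSettings. The formula is an assumption about
// the general technique, not Akka's internal implementation.
final class BackoffSketch {

  // jitter is a value in [0, 1); pass 0.0 for the deterministic lower bound.
  static Duration delay(
      int restartCount,
      Duration minBackoff,
      Duration maxBackoff,
      double randomFactor,
      double jitter) {
    // Double the delay for each restart, then cap it at maxBackoff.
    double exponential = minBackoff.toMillis() * Math.pow(2, restartCount);
    double capped = Math.min(exponential, maxBackoff.toMillis());
    // Add up to randomFactor (for example 10%) of extra delay.
    return Duration.ofMillis((long) (capped * (1.0 + jitter * randomFactor)));
  }

  public static void main(String[] args) {
    Duration min = Duration.ofSeconds(1);
    Duration max = Duration.ofSeconds(30);
    for (int restart = 0; restart <= 6; restart++) {
      System.out.println(
          "restart " + restart + ": " + delay(restart, min, max, 0.1, Math.random()));
    }
  }
}
```

The jitter spreads out restarts so that several consumers failing at the same time do not all reconnect to the broker at the same instant.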

Note how the deserialization uses the type information from the Protobuf Any to decide which type of event to deserialize.
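
The same dispatch can be expressed as a lookup table from type URL to parser. The sketch below shows the pattern in isolation, under the assumption that the payload is a plain string: the "parsers" only decorate that string, whereas in the consumer they would be the generated parseFrom methods. `TypeUrlDispatchSketch` is a hypothetical name.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Sketch of dispatching on a type URL. The "parsers" here just decorate a
// string; in the consumer they would be the generated parseFrom methods.
final class TypeUrlDispatchSketch {
  private static final Map<String, Function<String, String>> PARSERS = new HashMap<>();

  static {
    PARSERS.put(
        "shopping-cart-service/shoppingcart.ItemAdded", payload -> "ItemAdded(" + payload + ")");
    PARSERS.put(
        "shopping-cart-service/shoppingcart.CheckedOut", payload -> "CheckedOut(" + payload + ")");
  }

  // Looks up the parser registered for the type URL and applies it.
  static String decode(String typeUrl, String payload) {
    Function<String, String> parser = PARSERS.get(typeUrl);
    if (parser == null) {
      throw new IllegalArgumentException("unknown record type " + typeUrl);
    }
    return parser.apply(payload);
  }

  public static void main(String[] args) {
    System.out.println(decode("shopping-cart-service/shoppingcart.ItemAdded", "cart4"));
  }
}
```

An unregistered type URL, such as one for ItemRemoved, ends up in the unknown-type branch. Publishing a new event type therefore requires a matching entry on the consumer side as well.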

4.1. Configuration

We need to add configuration to initialize the Consumer and define how to connect to the Kafka broker.

Add the following to a new src/main/resources/kafka.conf file in shopping-analytics-service:

shopping-analytics-service {
  shopping-cart-kafka-topic = "shopping-cart-events"
}

# common config for akka.kafka.producer.kafka-clients and akka.kafka.consumer.kafka-clients
kafka-connection-settings {
  # This and other connection settings may have to be changed depending on environment.
  bootstrap.servers = "localhost:9092"
}
akka.kafka.producer {
  kafka-clients = ${kafka-connection-settings}
}
akka.kafka.consumer {
  kafka-clients = ${kafka-connection-settings}
  kafka-clients {
    auto.offset.reset = "earliest"
  }
}

Include kafka.conf from application.conf.

And for local development add the following to src/main/resources/local-shared.conf, which is loaded when running locally:

shopping-analytics-service.kafka.bootstrap-servers = "localhost:9092"
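
The auto.offset.reset = "earliest" setting in the kafka.conf above only matters when the consumer group has no committed offset yet, for example the first time the shopping-analytics-service starts. The following sketch models that decision rule in plain Java. The names are hypothetical, and the sketch is simplified: it does not model the "none" policy or the case of a committed offset that is out of range.

```java
import java.util.OptionalLong;

// Sketch of how a consumer picks its starting position in a partition.
// Names are hypothetical; only the decision rule mirrors Kafka's behaviour.
final class StartOffsetSketch {

  static long startOffset(
      OptionalLong committedOffset,
      String autoOffsetReset,
      long logStartOffset,
      long logEndOffset) {
    if (committedOffset.isPresent()) {
      // A committed offset for the group always wins over the reset policy.
      return committedOffset.getAsLong();
    }
    switch (autoOffsetReset) {
      case "earliest":
        return logStartOffset; // read everything still retained in the partition
      case "latest":
        return logEndOffset; // read only records produced from now on
      default:
        throw new IllegalArgumentException("unsupported policy: " + autoOffsetReset);
    }
  }

  public static void main(String[] args) {
    // New group, no committed offset: "earliest" starts at the beginning.
    System.out.println(startOffset(OptionalLong.empty(), "earliest", 0L, 42L));
    // Existing group: resumes from the committed offset regardless of policy.
    System.out.println(startOffset(OptionalLong.of(17L), "earliest", 0L, 42L));
  }
}
```

With "earliest", events published before the analytics service was first started are still consumed, which is usually what you want when trying out the tutorial.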

4.2. Main

Edit the Main class that is included in the template project. It should initialize the ActorSystem and the ShoppingCartEventConsumer like this:

Java
package shopping.analytics;

import akka.actor.typed.ActorSystem;
import akka.actor.typed.javadsl.Behaviors;
import akka.management.cluster.bootstrap.ClusterBootstrap;
import akka.management.javadsl.AkkaManagement;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Main {

  private static final Logger logger = LoggerFactory.getLogger(Main.class);

  public static void main(String[] args) {
    ActorSystem<Void> system = ActorSystem.create(Behaviors.empty(), "ShoppingAnalyticsService");
    try {
      init(system);
    } catch (Exception e) {
      logger.error("Terminating due to initialization failure.", e);
      system.terminate();
    }
  }

  public static void init(ActorSystem<Void> system) {
    AkkaManagement.get(system).start();
    ClusterBootstrap.get(system).start();

    ShoppingCartEventConsumer.init(system);
  }
}
Scala
package shopping.analytics

import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.ActorSystem
import akka.management.cluster.bootstrap.ClusterBootstrap
import akka.management.scaladsl.AkkaManagement
import org.slf4j.LoggerFactory
import scala.util.control.NonFatal

object Main {

  val logger = LoggerFactory.getLogger("shopping.analytics.Main")

  def main(args: Array[String]): Unit = {
    val system =
      ActorSystem[Nothing](Behaviors.empty, "ShoppingAnalyticsService")
    try {
      init(system)
    } catch {
      case NonFatal(e) =>
        logger.error("Terminating due to initialization failure.", e)
        system.terminate()
    }
  }

  def init(system: ActorSystem[_]): Unit = {
    AkkaManagement(system).start()
    ClusterBootstrap(system).start()

    ShoppingCartEventConsumer.init(system)
  }

}

5. Run locally

In addition to PostgreSQL we now also need Kafka. The docker-compose script starts PostgreSQL and Kafka:

  1. Start PostgreSQL and Kafka, unless they are already running, from the shopping-cart-service:

    docker-compose up -d
  2. Run the shopping-cart-service with:

    Java
    # make sure to compile before running exec:exec
    mvn compile exec:exec -DAPP_CONFIG=local1.conf
    Scala
    sbt -Dconfig.resource=local1.conf run
  3. In another terminal, run the new shopping-analytics-service with:

    Java
    # make sure to compile before running exec:exec
    mvn compile exec:exec -DAPP_CONFIG=local1.conf
    Scala
    sbt -Dconfig.resource=local1.conf run

5.1. Exercise the service

Use grpcurl to exercise the service:

  1. Start another terminal, and use grpcurl to add 1 pencil to a cart:

    grpcurl -d '{"cartId":"cart4", "itemId":"pencil", "quantity":1}' -plaintext 127.0.0.1:8101 shoppingcart.ShoppingCartService.AddItem
  2. Look at the log output in the terminal of the shopping-analytics-service. You should see the logging from the AddItem, showing that the new service consumed the event from Kafka:

    ItemAdded: 1 pencil to cart cart4

5.2. Stop the service

When finished, stop the shopping-cart-service and shopping-analytics-service with ctrl-c. Leave PostgreSQL and Kafka running for the next set of steps, or stop them with:

docker-compose stop
The following steps for cloud deployment are optional. If you are only running locally, you can skip to the next section of the tutorial.

6. Run in Kubernetes

Create a Kubernetes cluster and install the Akka Operator if you haven’t already.

6.1. Create a Kafka cluster

Follow the Amazon MSK instructions to set up a Kafka cluster in EKS.

Create two secrets for it, one for the shopping-cart-service:

kubectl create secret generic \
    shopping-cart-service-kafka-secret \
    --from-literal=bootstrapServers=<copied bootstrap servers connect string>

and another secret (with same content) for the shopping-analytics-service:

kubectl create secret generic \
    shopping-analytics-service-kafka-secret \
    --from-literal=bootstrapServers=<copied bootstrap servers connect string>

The Amazon MSK page also covers how to create the shopping-cart-events topic.

6.2. Build Docker image

Create a Docker repository and authenticate Docker as described in Amazon Elastic Container Registry if you haven’t already.

Build and publish the Docker images for both shopping-cart-service and shopping-analytics-service.

Java
mvn -DskipTests -Ddocker.registry=803424716218.dkr.ecr.eu-central-1.amazonaws.com clean package docker:push
Scala
sbt -Ddocker.registry=803424716218.dkr.ecr.eu-central-1.amazonaws.com docker:publish

Take note of the image tag as displayed by the docker:push (Java) or docker:publish (Scala) command.

Java
DOCKER> Tagging image shopping-cart-service:20201209-135004-363ae2b successful!
Scala
[info] Successfully tagged shopping-cart-service:20201209-135004-363ae2b

6.3. Update the deployment descriptor

Update the deployment descriptors with the respective image tag and the Kafka credential secret.

For shopping-cart-service:

kubernetes/shopping-cart-service-cr.yml:
apiVersion: "v1"
kind: "Namespace"
metadata:
  name: "shopping"
---
apiVersion: akka.lightbend.com/v1
kind: AkkaMicroservice
metadata:
  name: shopping-cart-service
  namespace: "shopping"
spec:
  replicas: 1
  image: <docker-registry>/shopping-cart-service:<tag> # (1)
  javaOptions: "-Xlog:gc -XX:InitialRAMPercentage=75 -XX:MaxRAMPercentage=75"
  resources:
    limits:
      memory: "2Gi"
    requests:
      memory: "2Gi"
      cpu: "1"
  jdbc:
    credentialsSecret: shopping-cart-service-jdbc-secret
  kafka:
    credentialsSecret: shopping-cart-service-kafka-secret  # (2)
1 Replace <docker-registry> with your docker registry address and update the image reference with the image tag from the output of the Docker build above, for example: 803424716218.dkr.ecr.eu-central-1.amazonaws.com/shopping-cart-service:20201209-135004-363ae2b.
2 Add a kafka.credentialsSecret section pointing to the secret created in the Amazon MSK instructions.

Similarly, for the shopping-analytics-service:

kubernetes/shopping-analytics-service-cr.yml:
apiVersion: "v1"
kind: "Namespace"
metadata:
  name: "shopping"
---
apiVersion: akka.lightbend.com/v1
kind: AkkaMicroservice
metadata:
  name: shopping-analytics-service
  namespace: "shopping"
spec:
  replicas: 1
  image: <docker-registry>/shopping-analytics-service:<tag>
  javaOptions: "-Xlog:gc -XX:InitialRAMPercentage=75 -XX:MaxRAMPercentage=75"
  resources:
    limits:
      memory: "2Gi"
    requests:
      memory: "2Gi"
      cpu: "1"
  kafka:
    credentialsSecret: shopping-analytics-service-kafka-secret

6.4. Apply to Kubernetes

Apply both shopping-cart-service/kubernetes/shopping-cart-service-cr.yml and shopping-analytics-service/kubernetes/shopping-analytics-service-cr.yml to Kubernetes:

kubectl apply -f shopping-cart-service/kubernetes/shopping-cart-service-cr.yml
kubectl apply -f shopping-analytics-service/kubernetes/shopping-analytics-service-cr.yml

You can see progress by viewing the status:

kubectl get akkamicroservices

See troubleshooting deployment status for more details.

6.5. Exercise the service in Kubernetes

  1. You can list the pods with:

    kubectl get pods
  2. Inspect logs from both services from a separate terminal window:

    kubectl logs -f <shopping-cart-service pod name from above>
    kubectl logs -f <shopping-analytics-service pod name from above>
  3. Add port forwarding for the gRPC endpoint from a separate terminal:

    kubectl port-forward svc/shopping-cart-service-grpc 8101:8101

Use grpcurl to exercise the service:

  1. Start another terminal, and use grpcurl to add 1 pencil to a cart:

    grpcurl -d '{"cartId":"cart4", "itemId":"pencil", "quantity":1}' -plaintext 127.0.0.1:8101 shoppingcart.ShoppingCartService.AddItem
  2. Look at the log output in the terminal of the shopping-analytics-service. You should see the logging from the AddItem, showing that the new service consumed the event from Kafka:

    ItemAdded: 1 pencil to cart cart4