Section 8: Projection calling gRPC service

To complete the example, we need a way to handle orders. We will add another Projection from the events of the ShoppingCart entity. We will also create a gRPC Order Service, ShoppingOrderService. The Projection calls the Order Service when the shopping carts are checked out.

Example gRPC client

This part of the full example focuses on the gRPC client in the SendOrderProjection. On this page you will learn how to:

  • call another service with Akka gRPC

  • implement another gRPC service by adding the ShoppingOrderService

Source downloads

If you prefer to simply view and run the example, download a zip file containing the completed code:

Java
  • Source that includes all previous tutorial steps and allows you to start with the steps on this page.

  • Source with the steps on this page completed.

Scala
  • Source that includes all previous tutorial steps and allows you to start with the steps on this page.

  • Source with the steps on this page completed.

1. Add the Order gRPC service

Let’s add the service that handles shopping cart orders. The template and source downloads include a directory named shopping-order-service for this purpose. To add the order gRPC service, follow these steps:

  1. Open the shopping-order-service in IntelliJ just as you did with the shopping-cart-service.

  2. Define the interface of the service in a protobuf service descriptor. Place it at src/main/protobuf/ShoppingOrderService.proto in the new shopping-order-service project.

    syntax = "proto3";
    
    option java_multiple_files = true;
    option java_package = "shopping.order.proto";
    
    package shoppingorder;
    
    // gRPC definition for ShoppingOrderService
    
    service ShoppingOrderService {
        rpc Order (OrderRequest) returns (OrderResponse) {}
    }
    
    message OrderRequest {
        string cartId = 1;
        repeated Item items = 2;
    }
    
    message Item {
        string itemId = 1;
        int32 quantity = 2;
    }
    
    message OrderResponse {
        bool ok = 1;
    }
  3. Generate code by compiling the project:

    Java
    mvn compile
    Scala
    sbt compile
  4. Implement the ShoppingOrderService in a new class ShoppingOrderServiceImpl:

    Java
    src/main/java/shopping/order/ShoppingOrderServiceImpl.java:
    package shopping.order;
    
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.CompletionStage;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import shopping.order.proto.Item;
    import shopping.order.proto.OrderRequest;
    import shopping.order.proto.OrderResponse;
    import shopping.order.proto.ShoppingOrderService;
    
    public final class ShoppingOrderServiceImpl implements ShoppingOrderService {
    
      private final Logger logger = LoggerFactory.getLogger(getClass());
    
      @Override
      public CompletionStage<OrderResponse> order(OrderRequest in) {
        int total = 0;
        for (Item item : in.getItemsList()) {
          total += item.getQuantity();
        }
        logger.info("Order {} items from cart {}.", total, in.getCartId());
        OrderResponse response = OrderResponse.newBuilder().setOk(true).build();
        return CompletableFuture.completedFuture(response);
      }
    }
    Scala
    src/main/scala/shopping/order/ShoppingOrderServiceImpl.scala:
    package shopping.order
    
    import scala.concurrent.Future
    
    import org.slf4j.LoggerFactory
    import shopping.order.proto.OrderRequest
    import shopping.order.proto.OrderResponse
    
    class ShoppingOrderServiceImpl extends proto.ShoppingOrderService {
    
      private val logger = LoggerFactory.getLogger(getClass)
    
      override def order(in: OrderRequest): Future[OrderResponse] = {
        val totalNumberOfItems =
          in.items.iterator.map(_.quantity).sum
        logger.info("Order {} items from cart {}.", totalNumberOfItems, in.cartId)
        Future.successful(OrderResponse(ok = true))
      }
    }
  5. As with the ShoppingCartService, we need to initialize the gRPC server. Add a ShoppingOrderServer class (Java) or object (Scala):

    Java
    src/main/java/shopping/order/ShoppingOrderServer.java:
    package shopping.order;
    
    import akka.actor.typed.ActorSystem;
    import akka.grpc.javadsl.ServerReflection;
    import akka.grpc.javadsl.ServiceHandler;
    import akka.http.javadsl.Http;
    import akka.http.javadsl.ServerBinding;
    import akka.http.javadsl.model.HttpRequest;
    import akka.http.javadsl.model.HttpResponse;
    import akka.japi.function.Function;
    import java.net.InetSocketAddress;
    import java.time.Duration;
    import java.util.Collections;
    import java.util.concurrent.CompletionStage;
    import shopping.order.proto.ShoppingOrderService;
    import shopping.order.proto.ShoppingOrderServiceHandlerFactory;
    
    public final class ShoppingOrderServer {
    
      private ShoppingOrderServer() {}
    
      static void start(
          String host, int port, ActorSystem<?> system, ShoppingOrderService grpcService) {
        @SuppressWarnings("unchecked")
        Function<HttpRequest, CompletionStage<HttpResponse>> service =
            ServiceHandler.concatOrNotFound(
                ShoppingOrderServiceHandlerFactory.create(grpcService, system),
                // ServerReflection enabled to support grpcurl without import-path and proto parameters
                ServerReflection.create(
                    Collections.singletonList(ShoppingOrderService.description), system));
    
        CompletionStage<ServerBinding> bound =
            Http.get(system).newServerAt(host, port).bind(service::apply);
    
        bound.whenComplete(
            (binding, ex) -> {
              if (binding != null) {
                binding.addToCoordinatedShutdown(Duration.ofSeconds(3), system);
                InetSocketAddress address = binding.localAddress();
                system
                    .log()
                    .info(
                        "Shopping order at gRPC server {}:{}",
                        address.getHostString(),
                        address.getPort());
              } else {
                system.log().error("Failed to bind gRPC endpoint, terminating system", ex);
                system.terminate();
              }
            });
      }
    }
    Scala
    src/main/scala/shopping/order/ShoppingOrderServer.scala:
    package shopping.order
    
    import scala.concurrent.ExecutionContext
    import scala.concurrent.Future
    import scala.concurrent.duration._
    import scala.util.Failure
    import scala.util.Success
    
    import akka.actor.typed.ActorSystem
    import akka.grpc.scaladsl.ServerReflection
    import akka.grpc.scaladsl.ServiceHandler
    import akka.http.scaladsl.Http
    import akka.http.scaladsl.model.HttpRequest
    import akka.http.scaladsl.model.HttpResponse
    
    object ShoppingOrderServer {
    
      def start(
          interface: String,
          port: Int,
          system: ActorSystem[_],
          grpcService: proto.ShoppingOrderService): Unit = {
        implicit val sys: ActorSystem[_] = system
        implicit val ec: ExecutionContext =
          system.executionContext
    
        val service: HttpRequest => Future[HttpResponse] =
          ServiceHandler.concatOrNotFound(
            proto.ShoppingOrderServiceHandler.partial(grpcService),
            // ServerReflection enabled to support grpcurl without import-path and proto parameters
            ServerReflection.partial(List(proto.ShoppingOrderService)))
    
        val bound =
          Http()
            .newServerAt(interface, port)
            .bind(service)
            .map(_.addToCoordinatedShutdown(3.seconds))
    
        bound.onComplete {
          case Success(binding) =>
            val address = binding.localAddress
            system.log.info(
              "Shopping order at gRPC server {}:{}",
              address.getHostString,
              address.getPort)
          case Failure(ex) =>
            system.log.error("Failed to bind gRPC endpoint, terminating system", ex)
            system.terminate()
        }
      }
    
    }
  6. Call the ShoppingOrderServer.start from Main:

    Java
    package shopping.order;
    
    import akka.actor.typed.ActorSystem;
    import akka.actor.typed.javadsl.Behaviors;
    import akka.management.cluster.bootstrap.ClusterBootstrap;
    import akka.management.javadsl.AkkaManagement;
    import com.typesafe.config.Config;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import shopping.order.proto.ShoppingOrderService;
    
    public class Main {
    
      private static final Logger logger = LoggerFactory.getLogger(Main.class);
    
      public static void main(String[] args) throws Exception {
        ActorSystem<Void> system = ActorSystem.create(Behaviors.empty(), "ShoppingOrderService");
        try {
          init(system);
        } catch (Exception e) {
          logger.error("Terminating due to initialization failure.", e);
          system.terminate();
        }
      }
    
      public static void init(ActorSystem<Void> system) {
        AkkaManagement.get(system).start();
        ClusterBootstrap.get(system).start();
    
        Config config = system.settings().config();
        String grpcInterface = config.getString("shopping-order-service.grpc.interface");
        int grpcPort = config.getInt("shopping-order-service.grpc.port");
        ShoppingOrderService grpcService = new ShoppingOrderServiceImpl();
        ShoppingOrderServer.start(grpcInterface, grpcPort, system, grpcService);
      }
    }
    Scala
    package shopping.order
    
    import akka.actor.typed.scaladsl.Behaviors
    import akka.actor.typed.ActorSystem
    import akka.management.cluster.bootstrap.ClusterBootstrap
    import akka.management.scaladsl.AkkaManagement
    import org.slf4j.LoggerFactory
    import scala.util.control.NonFatal
    
    object Main {
    
      val logger = LoggerFactory.getLogger("shopping.order.Main")
    
      def main(args: Array[String]): Unit = {
        val system = ActorSystem[Nothing](Behaviors.empty, "ShoppingOrderService")
        try {
          init(system)
        } catch {
          case NonFatal(e) =>
            logger.error("Terminating due to initialization failure.", e)
            system.terminate()
        }
      }
    
      def init(system: ActorSystem[_]): Unit = {
        AkkaManagement(system).start()
        ClusterBootstrap(system).start()
    
        val grpcInterface =
          system.settings.config.getString("shopping-order-service.grpc.interface")
        val grpcPort =
          system.settings.config.getInt("shopping-order-service.grpc.port")
        val grpcService = new ShoppingOrderServiceImpl
        ShoppingOrderServer.start(grpcInterface, grpcPort, system, grpcService)
      }
    
    }
The grpc.port configuration is defined in local1.conf, which is included in the generated template project.
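Step 5 combines the generated service handler and the reflection handler with ServiceHandler.concatOrNotFound. As a rough conceptual model only, and not Akka's implementation, the idea can be sketched with plain JDK types: each handler is tried in order and the first response that is not a 404 wins. The class ConcatOrNotFoundSketch, the helper handlerFor, the path prefixes, and the use of integer status codes in place of HTTP responses are all assumptions made for this illustration.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Function;

// Conceptual model only: requests are path strings, responses are status codes.
final class ConcatOrNotFoundSketch {
  static final int NOT_FOUND = 404;

  // Hypothetical handler for one service: 200 for paths under its prefix, 404 otherwise.
  static Function<String, CompletionStage<Integer>> handlerFor(String servicePrefix) {
    return path ->
        CompletableFuture.completedFuture(path.startsWith(servicePrefix) ? 200 : NOT_FOUND);
  }

  // Tries each handler in order and keeps the first response that is not 404.
  static Function<String, CompletionStage<Integer>> concatOrNotFound(
      List<Function<String, CompletionStage<Integer>>> handlers) {
    return path -> {
      CompletionStage<Integer> result = CompletableFuture.completedFuture(NOT_FOUND);
      for (Function<String, CompletionStage<Integer>> handler : handlers) {
        result =
            result.thenCompose(
                status -> {
                  if (status == NOT_FOUND) {
                    // nothing matched so far, give the next handler a chance
                    return handler.apply(path);
                  }
                  // an earlier handler already produced a response, keep it
                  return CompletableFuture.completedFuture(status);
                });
      }
      return result;
    };
  }
}
```

In this model a request for an unknown service falls through every handler and ends as a 404, which is the behavior the method name suggests.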

2. Create the Projection

The new Projection for shopping-cart-service events will be similar to the one we developed for Kafka on the previous page, but when it receives ShoppingCart.CheckedOut events, it will call the ShoppingOrderService.

Create the Projection as follows:

  1. Include the service definition by copying the ShoppingOrderService.proto file from the shopping-order-service to the shopping-cart-service/src/main/protobuf directory.

  2. Generate code by compiling the shopping-cart-service project:

    Java
    mvn compile
    Scala
    sbt compile
  3. Add a SendOrderProjectionHandler class in the shopping-cart-service project. This is the Projection Handler for processing the events:

    Java
    src/main/java/shopping/cart/SendOrderProjectionHandler.java:
    package shopping.cart;
    
    import static akka.Done.done;
    
    import akka.Done;
    import akka.actor.typed.ActorSystem;
    import akka.cluster.sharding.typed.javadsl.ClusterSharding;
    import akka.cluster.sharding.typed.javadsl.EntityRef;
    import akka.projection.eventsourced.EventEnvelope;
    import akka.projection.javadsl.Handler;
    import java.time.Duration;
    import java.util.List;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.CompletionStage;
    import java.util.stream.Collectors;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import shopping.order.proto.Item;
    import shopping.order.proto.OrderRequest;
    import shopping.order.proto.ShoppingOrderService;
    
    public final class SendOrderProjectionHandler extends Handler<EventEnvelope<ShoppingCart.Event>> {
    
      private final Logger log = LoggerFactory.getLogger(getClass());
      private final ClusterSharding sharding;
      private final Duration timeout;
      private final ShoppingOrderService orderService;
    
      public SendOrderProjectionHandler(
          ActorSystem<?> system, ShoppingOrderService orderService) { // (1)
        sharding = ClusterSharding.get(system);
        timeout = system.settings().config().getDuration("shopping-cart-service.ask-timeout");
        this.orderService = orderService;
      }
    
      @Override
      public CompletionStage<Done> process(EventEnvelope<ShoppingCart.Event> envelope)
          throws Exception {
        if (envelope.event() instanceof ShoppingCart.CheckedOut) {
          ShoppingCart.CheckedOut checkedOut = (ShoppingCart.CheckedOut) envelope.event();
          return sendOrder(checkedOut);
        } else {
          return CompletableFuture.completedFuture(done());
        }
      }
    
      private CompletionStage<Done> sendOrder(ShoppingCart.CheckedOut checkout) {
        EntityRef<ShoppingCart.Command> entityRef =
            sharding.entityRefFor(ShoppingCart.ENTITY_KEY, checkout.cartId);
        CompletionStage<ShoppingCart.Summary> reply =
            entityRef.ask(replyTo -> new ShoppingCart.Get(replyTo), timeout);
        return reply.thenCompose(
            cart -> { // (2)
              List<Item> protoItems =
                  cart.items.entrySet().stream()
                      .map(
                          entry ->
                              Item.newBuilder()
                                  .setItemId(entry.getKey())
                                  .setQuantity(entry.getValue())
                                  .build())
                      .collect(Collectors.toList());
              log.info("Sending order of {} items for cart {}.", cart.items.size(), checkout.cartId);
              OrderRequest orderRequest =
                  OrderRequest.newBuilder().setCartId(checkout.cartId).addAllItems(protoItems).build();
              return orderService.order(orderRequest).thenApply(response -> done()); // (3)
            });
      }
    }
    Scala
    src/main/scala/shopping/cart/SendOrderProjectionHandler.scala:
    package shopping.cart
    
    import scala.concurrent.ExecutionContext
    import scala.concurrent.Future
    
    import akka.Done
    import akka.actor.typed.ActorSystem
    import akka.cluster.sharding.typed.scaladsl.ClusterSharding
    import akka.projection.eventsourced.EventEnvelope
    import akka.projection.scaladsl.Handler
    import akka.util.Timeout
    import org.slf4j.LoggerFactory
    import shopping.order.proto.Item
    import shopping.order.proto.OrderRequest
    import shopping.order.proto.ShoppingOrderService
    
    class SendOrderProjectionHandler(
        system: ActorSystem[_],
        orderService: ShoppingOrderService) // (1)
        extends Handler[EventEnvelope[ShoppingCart.Event]] {
      private val log = LoggerFactory.getLogger(getClass)
      private implicit val ec: ExecutionContext =
        system.executionContext
    
      private val sharding = ClusterSharding(system)
      implicit private val timeout: Timeout =
        Timeout.create(
          system.settings.config.getDuration("shopping-cart-service.ask-timeout"))
    
      override def process(
          envelope: EventEnvelope[ShoppingCart.Event]): Future[Done] = {
        envelope.event match {
          case checkout: ShoppingCart.CheckedOut =>
            sendOrder(checkout)
    
          case _ =>
            // this projection is only interested in CheckedOut events
            Future.successful(Done)
        }
    
      }
    
      private def sendOrder(checkout: ShoppingCart.CheckedOut): Future[Done] = {
        val entityRef =
          sharding.entityRefFor(ShoppingCart.EntityKey, checkout.cartId)
        entityRef.ask(ShoppingCart.Get).flatMap { cart => // (2)
          val items =
            cart.items.iterator.map { case (itemId, quantity) =>
              Item(itemId, quantity)
            }.toList
          log.info(
            "Sending order of {} items for cart {}.",
            items.size,
            checkout.cartId)
          val orderReq = OrderRequest(checkout.cartId, items)
          orderService.order(orderReq).map(_ => Done) // (3)
        }
      }
    
    }
1 ShoppingOrderService is the gRPC client
2 Retrieve the full shopping cart information from the entity. In the order we need to include the list of items and their quantities. That information is not included in the ShoppingCart.CheckedOut event, but we can retrieve it by asking the ShoppingCart entity for it.
3 Call the ShoppingOrderService. If the call fails, the returned CompletionStage<Done> (Java) or Future[Done] (Scala) fails and the Projection is automatically restarted from the previously saved offset, which retries the call to the ShoppingOrderService. Since the Projection has at-least-once semantics, the ShoppingOrderService must be idempotent, that is, it must gracefully handle duplicate order attempts for the same cartId.
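The idempotency requirement in callout 3 can be met in several ways. The following is a minimal sketch, not part of the tutorial sources, that assumes the order service remembers which cartId values it has already accepted. The class name IdempotentOrderRegistry, its methods, and the in-memory map are hypothetical; a real service would keep this state in durable storage.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper: remembers accepted orders so that retries become no-ops.
final class IdempotentOrderRegistry {
  // cartId -> total quantity of the order that was accepted first
  private final Map<String, Integer> acceptedOrders = new ConcurrentHashMap<>();

  /**
   * Records the order unless one already exists for the given cartId.
   *
   * @return true if this call created the order, false if it was a duplicate
   */
  boolean placeOrder(String cartId, int totalQuantity) {
    // putIfAbsent is atomic: only the first caller for a cartId sees null
    return acceptedOrders.putIfAbsent(cartId, totalQuantity) == null;
  }

  // Number of distinct carts for which an order has been accepted.
  int orderCount() {
    return acceptedOrders.size();
  }
}
```

A service built along these lines could still reply with ok = true for a duplicate request, so that the retried call from the Projection succeeds and the offset can advance.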

3. Initialize the Projection

The tagging of the events is already in place from when we created the query Projection. So, we just need to initialize it as follows:

  1. Place the initialization code of the Projection in a SendOrderProjection class (Java) or object (Scala):

    Java
    src/main/java/shopping/cart/SendOrderProjection.java:
    package shopping.cart;
    
    import akka.actor.typed.ActorSystem;
    import akka.cluster.sharding.typed.ShardedDaemonProcessSettings;
    import akka.cluster.sharding.typed.javadsl.ShardedDaemonProcess;
    import akka.persistence.jdbc.query.javadsl.JdbcReadJournal;
    import akka.persistence.query.Offset;
    import akka.projection.ProjectionBehavior;
    import akka.projection.ProjectionId;
    import akka.projection.eventsourced.EventEnvelope;
    import akka.projection.eventsourced.javadsl.EventSourcedProvider;
    import akka.projection.javadsl.AtLeastOnceProjection;
    import akka.projection.javadsl.SourceProvider;
    import akka.projection.jdbc.javadsl.JdbcProjection;
    import java.util.Optional;
    import org.springframework.orm.jpa.JpaTransactionManager;
    import shopping.cart.repository.HibernateJdbcSession;
    import shopping.order.proto.ShoppingOrderService;
    
    public class SendOrderProjection {
    
      private SendOrderProjection() {}
    
      public static void init(
          ActorSystem<?> system,
          JpaTransactionManager transactionManager,
          ShoppingOrderService orderService) {
        ShardedDaemonProcess.get(system)
            .init(
                ProjectionBehavior.Command.class,
                "SendOrderProjection",
                ShoppingCart.TAGS.size(),
                index ->
                    ProjectionBehavior.create(
                        createProjectionsFor(system, transactionManager, orderService, index)),
                ShardedDaemonProcessSettings.create(system),
                Optional.of(ProjectionBehavior.stopMessage()));
      }
    
      private static AtLeastOnceProjection<Offset, EventEnvelope<ShoppingCart.Event>>
          createProjectionsFor(
              ActorSystem<?> system,
              JpaTransactionManager transactionManager,
              ShoppingOrderService orderService,
              int index) {
        String tag = ShoppingCart.TAGS.get(index);
        SourceProvider<Offset, EventEnvelope<ShoppingCart.Event>> sourceProvider =
            EventSourcedProvider.eventsByTag(system, JdbcReadJournal.Identifier(), tag);
    
        return JdbcProjection.atLeastOnceAsync(
            ProjectionId.of("SendOrderProjection", tag),
            sourceProvider,
            () -> new HibernateJdbcSession(transactionManager),
            () -> new SendOrderProjectionHandler(system, orderService),
            system);
      }
    }
    Scala
    src/main/scala/shopping/cart/SendOrderProjection.scala:
    package shopping.cart
    
    import akka.actor.typed.ActorSystem
    import akka.cluster.sharding.typed.ShardedDaemonProcessSettings
    import akka.cluster.sharding.typed.scaladsl.ShardedDaemonProcess
    import akka.persistence.jdbc.query.scaladsl.JdbcReadJournal
    import akka.persistence.query.Offset
    import akka.projection.ProjectionBehavior
    import akka.projection.ProjectionId
    import akka.projection.eventsourced.EventEnvelope
    import akka.projection.eventsourced.scaladsl.EventSourcedProvider
    import akka.projection.jdbc.scaladsl.JdbcProjection
    import akka.projection.scaladsl.AtLeastOnceProjection
    import akka.projection.scaladsl.SourceProvider
    import shopping.cart.repository.ScalikeJdbcSession
    import shopping.order.proto.ShoppingOrderService
    
    object SendOrderProjection {
    
      def init(system: ActorSystem[_], orderService: ShoppingOrderService): Unit = {
        ShardedDaemonProcess(system).init(
          name = "SendOrderProjection",
          ShoppingCart.tags.size,
          index =>
            ProjectionBehavior(createProjectionFor(system, orderService, index)),
          ShardedDaemonProcessSettings(system),
          Some(ProjectionBehavior.Stop))
      }
    
      private def createProjectionFor(
          system: ActorSystem[_],
          orderService: ShoppingOrderService,
          index: Int)
          : AtLeastOnceProjection[Offset, EventEnvelope[ShoppingCart.Event]] = {
        val tag = ShoppingCart.tags(index)
        val sourceProvider
            : SourceProvider[Offset, EventEnvelope[ShoppingCart.Event]] =
          EventSourcedProvider.eventsByTag[ShoppingCart.Event](
            system = system,
            readJournalPluginId = JdbcReadJournal.Identifier,
            tag = tag)
    
        JdbcProjection.atLeastOnceAsync(
          projectionId = ProjectionId("SendOrderProjection", tag),
          sourceProvider,
          handler = () => new SendOrderProjectionHandler(system, orderService),
          sessionFactory = () => new ScalikeJdbcSession())(system)
      }
    
    }
  2. In Main, invoke the SendOrderProjection.init and create the gRPC client for the ShoppingOrderService like this:

    Java
    import akka.grpc.GrpcClientSettings;
    import shopping.order.proto.ShoppingOrderService;
    import shopping.order.proto.ShoppingOrderServiceClient;
    
    public class Main {
      public static void main(String[] args) {
        ActorSystem<Void> system = ActorSystem.create(Behaviors.empty(), "ShoppingCartService");
        try {
          init(system, orderServiceClient(system));
        } catch (Exception e) {
          logger.error("Terminating due to initialization failure.", e);
          system.terminate();
        }
      }
    
      public static void init(ActorSystem<Void> system, ShoppingOrderService orderService) {
        SendOrderProjection.init(system, transactionManager, orderService); // (1)
      }
    
      static ShoppingOrderService orderServiceClient(ActorSystem<?> system) { // (2)
        GrpcClientSettings orderServiceClientSettings =
            GrpcClientSettings.connectToServiceAt(
                    system.settings().config().getString("shopping-order-service.host"),
                    system.settings().config().getInt("shopping-order-service.port"),
                    system)
                .withTls(false);
    
        return ShoppingOrderServiceClient.create(orderServiceClientSettings, system);
      }
    }
    1 This example shows only the new SendOrderProjection.init call; keep the additional initialization from previous steps.
    2 The orderServiceClient is initialized in its own method so that tests can replace it with a stub implementation.
    Scala
    import shopping.order.proto.{ ShoppingOrderService, ShoppingOrderServiceClient }
    import akka.grpc.GrpcClientSettings
    
    object Main {
    
      val logger = LoggerFactory.getLogger("shopping.cart.Main")
    
      def main(args: Array[String]): Unit = {
        val system =
          ActorSystem[Nothing](Behaviors.empty, "ShoppingCartService")
        try {
          val orderService = orderServiceClient(system)
          init(system, orderService)
        } catch {
          case NonFatal(e) =>
            logger.error("Terminating due to initialization failure.", e)
            system.terminate()
        }
      }
    
      def init(system: ActorSystem[_], orderService: ShoppingOrderService): Unit = {
        SendOrderProjection.init(system, orderService) // (1)
      }
    
      protected def orderServiceClient( // (2)
          system: ActorSystem[_]): ShoppingOrderService = {
        val orderServiceClientSettings =
          GrpcClientSettings
            .connectToServiceAt(
              system.settings.config.getString("shopping-order-service.host"),
              system.settings.config.getInt("shopping-order-service.port"))(system)
            .withTls(false)
        ShoppingOrderServiceClient(orderServiceClientSettings)(system)
      }
    }
    1 This example shows only the new SendOrderProjection.init call; keep the additional initialization from previous steps.
    2 The orderServiceClient is initialized in its own method so that tests can replace it with a stub implementation.
  3. The gRPC client is using service discovery to locate the ShoppingOrderService. For local development add the following to src/main/resources/local-shared.conf, which is loaded when running locally:

    shopping-order-service.host = "localhost"
    shopping-order-service.port = 8301
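Callout 2 in step 2 mentions that tests can replace orderServiceClient with a stub. The following is a minimal sketch of what such a stub could look like. To keep it compilable with the JDK alone, it uses a hypothetical stand-in interface, OrderServiceLike, instead of the generated ShoppingOrderService, and a plain cartId string instead of OrderRequest. In a real test the stub would implement the generated interface.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

// Hypothetical stand-in for the generated ShoppingOrderService interface,
// reduced to plain JDK types so that the sketch compiles on its own.
interface OrderServiceLike {
  CompletionStage<Boolean> order(String cartId);
}

// Stub that records the requests instead of calling a remote service.
final class RecordingOrderServiceStub implements OrderServiceLike {
  private final List<String> receivedCartIds =
      Collections.synchronizedList(new ArrayList<>());

  @Override
  public CompletionStage<Boolean> order(String cartId) {
    receivedCartIds.add(cartId);
    // always succeed immediately, no network involved
    return CompletableFuture.completedFuture(true);
  }

  // Returns a snapshot of the cart ids seen so far, for assertions in tests.
  List<String> receivedCartIds() {
    synchronized (receivedCartIds) {
      return new ArrayList<>(receivedCartIds);
    }
  }
}
```

A test could pass such a stub to init in place of the real client and then assert on the recorded cart ids after a checkout.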

4. Run locally

Follow these steps to run locally and exercise the new Projection and service:

  1. Start PostgreSQL and Kafka, unless they are already running:

    docker-compose up -d
  2. Run the shopping-order-service with:

    Java
    # make sure to compile before running exec:exec
    mvn compile exec:exec -DAPP_CONFIG=local1.conf
    Scala
    sbt -Dconfig.resource=local1.conf run
  3. Keep the shopping-order-service running, and in another terminal run the shopping-cart-service with:

    Java
    # make sure to compile before running exec:exec
    mvn compile exec:exec -DAPP_CONFIG=local1.conf
    Scala
    sbt -Dconfig.resource=local1.conf run

4.1. Exercise the service

Use grpcurl to exercise the service:

  1. Try the new order service directly (on port 8301):

    grpcurl -d '{"cartId":"cart1", "items":[{"itemId":"socks", "quantity":3}, {"itemId":"t-shirt", "quantity":2}]}' -plaintext 127.0.0.1:8301 shoppingorder.ShoppingOrderService.Order
  2. Use the checkout in the shopping cart service with grpcurl (note the different port number):

    grpcurl -d '{"cartId":"cart1", "itemId":"scissors", "quantity":1}' -plaintext 127.0.0.1:8101 shoppingcart.ShoppingCartService.AddItem
    grpcurl -d '{"cartId":"cart1"}' -plaintext 127.0.0.1:8101 shoppingcart.ShoppingCartService.Checkout
  3. In the terminal of the shopping-order-service you should see the log of the order:

    Order 12 items from cart cart1
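The number in the log line is the sum of the item quantities, not the number of distinct items, so the value logged after the checkout depends on what cart1 already contains from earlier steps. As a sanity check for the direct call in step 1, the sketch below mirrors the summation and log format of ShoppingOrderServiceImpl using plain JDK types. The class OrderLogSketch and its method are hypothetical and exist only for this illustration.

```java
import java.util.Map;

// Hypothetical helper that mirrors the summation in ShoppingOrderServiceImpl.order.
final class OrderLogSketch {
  // Builds the log text for an order, given the quantity of each item id.
  static String orderLogLine(String cartId, Map<String, Integer> quantitiesByItemId) {
    int total = 0;
    for (int quantity : quantitiesByItemId.values()) {
      total += quantity;
    }
    return "Order " + total + " items from cart " + cartId + ".";
  }
}
```

For the request in step 1 (3 socks and 2 t-shirts) this yields a total of 5.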

4.2. Stop the service

When finished:

  1. Stop the shopping-cart-service and shopping-order-service with ctrl-c.

  2. Stop PostgreSQL and Kafka with:

    docker-compose stop

The following steps for cloud deployment are optional. If you are only running locally, you have completed the tutorial. Congratulations!

5. Run in Kubernetes

Create a Kubernetes cluster and install the Akka Operator if you haven’t already.

5.1. Build Docker image

Create a Docker repository and authenticate Docker.

GCP

Follow the instructions in Using Container Registry with Google Cloud to deploy Docker images on GCP’s container registry.

AWS

Follow the instructions in Amazon Elastic Container Registry to deploy Docker images on AWS’s container registry.

5.2. Additional steps for Docker and AWS

If you are using AWS, you will also need to complete the following procedures.

Build and publish the Docker images for both shopping-cart-service and shopping-order-service.

Java
mvn -DskipTests -Ddocker.registry=803424716218.dkr.ecr.eu-central-1.amazonaws.com clean package docker:push
Scala
sbt -Ddocker.registry=803424716218.dkr.ecr.eu-central-1.amazonaws.com docker:publish

Take note of the image tag as displayed by the docker:push (Java) or docker:publish (Scala) command.

Java
DOCKER> Tagging image shopping-cart-service:20201209-135004-363ae2b successful!
Scala
[info] Successfully tagged shopping-cart-service:20201209-135004-363ae2b

5.3. Update the deployment descriptor

Update the shopping-cart-service/kubernetes/shopping-cart-service-cr.yml and shopping-order-service/kubernetes/shopping-order-service-cr.yml deployment descriptors with the respective image tag produced in the previous step.

5.4. Apply to Kubernetes

Apply both shopping-cart-service/kubernetes/shopping-cart-service-cr.yml and shopping-order-service/kubernetes/shopping-order-service-cr.yml to Kubernetes:

kubectl apply -f shopping-cart-service/kubernetes/shopping-cart-service-cr.yml
kubectl apply -f shopping-order-service/kubernetes/shopping-order-service-cr.yml

You can see progress by viewing the status:

kubectl get akkamicroservices

See troubleshooting deployment status for more details.

5.5. Exercise the service in Kubernetes

  1. You can list the pods with:

    kubectl get pods
  2. Inspect logs from both services from a separate terminal window:

    kubectl logs -f <shopping-cart-service pod name from above>
    kubectl logs -f <shopping-order-service pod name from above>
  3. Add port forwarding for the shopping-cart-service gRPC endpoint from a separate terminal:

    kubectl port-forward svc/shopping-cart-service-grpc 8101:8101

Use grpcurl to exercise the service:

  1. Use the checkout in the shopping cart service with grpcurl:

    grpcurl -d '{"cartId":"cart1", "itemId":"scissors", "quantity":1}' -plaintext 127.0.0.1:8101 shoppingcart.ShoppingCartService.AddItem
    grpcurl -d '{"cartId":"cart1"}' -plaintext 127.0.0.1:8101 shoppingcart.ShoppingCartService.Checkout
  2. In the terminal of the shopping-order-service you should see the log of the order:

    Order 15 items from cart cart1

Learn more

Congratulations, you finished the tutorial! The examples of gRPC, event sourcing, and Projections should be helpful when you create your own Reactive Microservices. The following sections go into more detail: