Section 3: Create the gRPC Cart service

We will create the gRPC ShoppingCartService for the Cart Service shown in the architectural overview below. The motivation for using gRPC as the inter-service protocol is explained in Akka gRPC concepts. The Introduction to Akka gRPC video new tab is also a good starting point for learning Akka gRPC.

Example gRPC service

First, we will create the ShoppingCartService with a single operation that adds items to a cart. The implementation will only log the calls for now. Later, we’ll expand the service with more operations and real implementations. On this page you will learn how to:

  • define the interface of a gRPC service

  • implement the service interface

  • initialize and run an HTTP server and the service locally

  • interact with the service from the command line

  • deploy to the cloud

Akka Workshop

The first video of the Akka Workshop Series new tab covers creating the gRPC series.

Source downloads

If you prefer to simply view and run the example, download a zip file containing the completed code:

Java
  • Source with the initial project template.

  • Source with the steps on this page completed.

Scala
  • Source with the initial project template.

  • Source with the steps on this page completed.

1. Create service definition

gRPC takes a schema-first approach, where the protocol is declared in a Protobuf service descriptor. From the service descriptor the source code for the messages, client and server stubs are generated.

First, create a protobuf service descriptor file: src/main/protobuf/ShoppingCartService.proto. In it, define the service interface as shown below:

syntax = "proto3";

option java_multiple_files = true;
option java_package = "shopping.cart.proto";

package shoppingcart;

// gRPC definition for ShoppingCartService

service ShoppingCartService { (1)
    rpc AddItem (AddItemRequest) returns (Cart) {}
}

message AddItemRequest { (2)
    string cartId = 1;
    string itemId = 2;
    int32 quantity = 3;
}

message Cart {
    repeated Item items = 1;
}

message Item {
    string itemId = 1;
    int32 quantity = 2;
}
1 Defines the requests a client may send to the service in the service definition.
2 Describes the request to add an item to the shopping cart.

2. Generate code

The Akka gRPC plugin will generate code from the service descriptor when you compile the project. Run sbt mvn to compile:

sbt compile
mvn compile
You might have to synchronize the build.sbt pom.xml in IntelliJ after this so that it adds the source directories for the generated code.

3. Implement the service

Let’s implement the ShoppingCartService trait interface. Create a ShoppingCartServiceImpl class that extends the generated ShoppingCartService trait interface. Implement the addItem method from the trait interface by logging the call and return a successful Future CompletionStage.

Java
src/main/java/shopping/cart/ShoppingCartServiceImpl.java:
package shopping.cart;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import shopping.cart.proto.AddItemRequest;
import shopping.cart.proto.Cart;
import shopping.cart.proto.Item;
import shopping.cart.proto.ShoppingCartService;

public final class ShoppingCartServiceImpl implements ShoppingCartService {

  private final Logger logger = LoggerFactory.getLogger(getClass());

  @Override
  public CompletionStage<Cart> addItem(AddItemRequest in) { (1)
    logger.info("addItem {} to cart {}", in.getItemId(), in.getCartId());
    Item item = Item.newBuilder().setItemId(in.getItemId()).setQuantity(in.getQuantity()).build();
    Cart cart = Cart.newBuilder().addItems(item).build();
    return CompletableFuture.completedFuture(cart);
  }
}
Scala
src/main/scala/shopping/cart/ShoppingCartServiceImpl.scala:
package shopping.cart

import scala.concurrent.Future

import org.slf4j.LoggerFactory

class ShoppingCartServiceImpl extends proto.ShoppingCartService {

  private val logger = LoggerFactory.getLogger(getClass)

  override def addItem(in: proto.AddItemRequest): Future[proto.Cart] = { (1)
    logger.info("addItem {} to cart {}", in.itemId, in.cartId)
    Future.successful(
      proto.Cart(items = List(proto.Item(in.itemId, in.quantity))))
  }

}
1 The method corresponding to the rpc AddItem in the service definition. Defined in the generated proto.ShoppingCartService.

4. Initialize HTTP Server

We will run the gRPC service implementation in an Akka HTTP server. Add the following server initialization code in a ShoppingCartServer object class:

Java
src/main/java/shopping/cart/ShoppingCartServer.java:
package shopping.cart;

import akka.actor.typed.ActorSystem;
import akka.grpc.javadsl.ServerReflection;
import akka.grpc.javadsl.ServiceHandler;
import akka.http.javadsl.Http;
import akka.http.javadsl.ServerBinding;
import akka.http.javadsl.model.HttpRequest;
import akka.http.javadsl.model.HttpResponse;
import akka.japi.function.Function;
import java.net.InetSocketAddress;
import java.time.Duration;
import java.util.Collections;
import java.util.concurrent.CompletionStage;
import shopping.cart.proto.ShoppingCartService;
import shopping.cart.proto.ShoppingCartServiceHandlerFactory;

public final class ShoppingCartServer {

  private ShoppingCartServer() {}

  static void start(String host, int port, ActorSystem<?> system, ShoppingCartService grpcService) {
    @SuppressWarnings("unchecked")
    Function<HttpRequest, CompletionStage<HttpResponse>> service =
        ServiceHandler.concatOrNotFound(
            ShoppingCartServiceHandlerFactory.create(grpcService, system),
            // ServerReflection enabled to support grpcurl without import-path and proto parameters
            ServerReflection.create( (1)
                Collections.singletonList(ShoppingCartService.description), system));

    CompletionStage<ServerBinding> bound =
        Http.get(system).newServerAt(host, port).bind(service::apply); (2)

    bound.whenComplete(
        (binding, ex) -> { (3)
          if (binding != null) {
            binding.addToCoordinatedShutdown(Duration.ofSeconds(3), system);
            InetSocketAddress address = binding.localAddress();
            system
                .log()
                .info(
                    "Shopping online at gRPC server {}:{}",
                    address.getHostString(),
                    address.getPort());
          } else {
            system.log().error("Failed to bind gRPC endpoint, terminating system", ex);
            system.terminate();
          }
        });
  }
}
Scala
src/main/scala/shopping/cart/ShoppingCartServer.scala
package shopping.cart

import scala.concurrent.ExecutionContext
import scala.concurrent.Future
import scala.concurrent.duration._
import scala.util.Failure
import scala.util.Success

import akka.actor.typed.ActorSystem
import akka.grpc.scaladsl.ServerReflection
import akka.grpc.scaladsl.ServiceHandler
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.HttpRequest
import akka.http.scaladsl.model.HttpResponse

object ShoppingCartServer {

  def start(
      interface: String,
      port: Int,
      system: ActorSystem[_],
      grpcService: proto.ShoppingCartService): Unit = {
    implicit val sys: ActorSystem[_] = system
    implicit val ec: ExecutionContext =
      system.executionContext

    val service: HttpRequest => Future[HttpResponse] =
      ServiceHandler.concatOrNotFound(
        proto.ShoppingCartServiceHandler.partial(grpcService),
        // ServerReflection enabled to support grpcurl without import-path and proto parameters
        ServerReflection.partial(List(proto.ShoppingCartService))
      ) (1)

    val bound =
      Http()
        .newServerAt(interface, port)
        .bind(service)
        .map(_.addToCoordinatedShutdown(3.seconds)) (2)

    bound.onComplete { (3)
      case Success(binding) =>
        val address = binding.localAddress
        system.log.info(
          "Shopping online at gRPC server {}:{}",
          address.getHostString,
          address.getPort)
      case Failure(ex) =>
        system.log.error("Failed to bind gRPC endpoint, terminating system", ex)
        system.terminate()
    }
  }

}
1 Concatenate the service implementation with ServerReflection for better grcurl support.
2 Start the HTTP server for the gRPC service.
3 Some logging at startup.

5. Main method

To run the service we need a class with a main method. Edit the Main class that is included from the template project. It should initialize the ActorSystem and the ShoppingCartServer like this:

Java
package shopping.cart;

import akka.actor.typed.ActorSystem;
import akka.actor.typed.javadsl.Behaviors;
import akka.management.cluster.bootstrap.ClusterBootstrap;
import akka.management.javadsl.AkkaManagement;
import com.typesafe.config.Config;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import shopping.cart.proto.ShoppingCartService;

public class Main {

  private static final Logger logger = LoggerFactory.getLogger(Main.class);

  public static void main(String[] args) {
    ActorSystem<Void> system = ActorSystem.create(Behaviors.empty(), "ShoppingCartService"); (1)
    try {
      init(system);
    } catch (Exception e) {
      logger.error("Terminating due to initialization failure.", e);
      system.terminate();
    }
  }

  public static void init(ActorSystem<Void> system) {
    AkkaManagement.get(system).start(); (2)
    ClusterBootstrap.get(system).start();

    Config config = system.settings().config();
    String grpcInterface = config.getString("shopping-cart-service.grpc.interface");
    int grpcPort = config.getInt("shopping-cart-service.grpc.port");
    ShoppingCartService grpcService = new ShoppingCartServiceImpl();
    ShoppingCartServer.start(grpcInterface, grpcPort, system, grpcService); (3)
  }
}
Scala
package shopping.cart

import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.ActorSystem
import akka.management.cluster.bootstrap.ClusterBootstrap
import akka.management.scaladsl.AkkaManagement
import org.slf4j.LoggerFactory
import scala.util.control.NonFatal

object Main {

  val logger = LoggerFactory.getLogger("shopping.cart.Main")

  def main(args: Array[String]): Unit = {
    val system =
      ActorSystem[Nothing](Behaviors.empty, "ShoppingCartService") (1)
    try {
      init(system)
    } catch {
      case NonFatal(e) =>
        logger.error("Terminating due to initialization failure.", e)
        system.terminate()
    }
  }

  def init(system: ActorSystem[_]): Unit = {
    AkkaManagement(system).start() (2)
    ClusterBootstrap(system).start()

    val grpcInterface =
      system.settings.config.getString("shopping-cart-service.grpc.interface")
    val grpcPort =
      system.settings.config.getInt("shopping-cart-service.grpc.port")
    val grpcService = new ShoppingCartServiceImpl
    ShoppingCartServer.start(
      grpcInterface,
      grpcPort,
      system,
      grpcService
    ) (3)
  }

}
1 Start an ActorSystem with the Main actor Behavior.
2 Initialization of Akka Management that is used for forming the Akka Cluster.
3 Initialize the gRPC server. This is the code you should add to the existing Main.

The grpc.port configuration is defined in local1.conf, which is included in the generated template project.

6. Run locally

You can run this service with:

sbt -Dconfig.resource=local1.conf run
# make sure to compile before running exec:exec
mvn compile exec:exec -DAPP_CONFIG=local1.conf

Note the log output from the ShoppingCartServer

Shopping online at gRPC server 127.0.0.1:8101

6.1. Exercise the service

Use grpcurl to exercise the service:

  1. Add 3 socks to a cart:

    grpcurl -d '{"cartId":"cart1", "itemId":"socks", "quantity":3}' -plaintext 127.0.0.1:8101 shoppingcart.ShoppingCartService.AddItem
  2. Add 2 t-shirts to the same cart:

    grpcurl -d '{"cartId":"cart1", "itemId":"t-shirt", "quantity":2}' -plaintext 127.0.0.1:8101 shoppingcart.ShoppingCartService.AddItem

Note the logging from the ShoppingCartServiceImpl in the console.

6.2. Stop the service

You can stop the service with ctrl-c.

The following steps for cloud deployment are optional. If you are only running locally, you can skip to the next section of the tutorial.

7. Run in Kubernetes

We suggest that you read the Akka Cloud Platform section to get an understanding of how deployment with the Akka Cloud Platform works.

7.1. Akka Workshop

The fifth video of the Akka Workshop Series new tab walks you through the process of running in Kubernetes on AWS or GCP. It will take you step-by-step to aid you in understanding the process.

Before following the steps below, create a Kubernetes cluster and install the Akka Operator. Used the instructions below for:

GCP

Follow the instructions in Using Container Registry with Google Cloud new tab to deploy Docker images on GCP’s container registry. We suggest that using the Container Registry Quickstart new tab if you aren’t already familiar with the registry. You may also use the Artifact Registry new tab. There is guidance on transitioning new tab as well as a Docker new tab quickstart for the Artifact Registry.

AWS

Follow the instructions in Amazon Elastic Container Registry to deploy Docker images on AWS’s container registry.

7.2. Additional steps for Docker and AWS

If you are using AWS, you will also need to complete the following procedures.

The build.sbt and project/plugins.sbt pom.xml contains build plugin settings for building a Docker image.

Java
mvn -DskipTests -Ddocker.registry=803424716218.dkr.ecr.eu-central-1.amazonaws.com clean package docker:push
Scala
sbt -Ddocker.registry=803424716218.dkr.ecr.eu-central-1.amazonaws.com docker:publish

Take note of the image tag as displayed by the docker:publish docker:push command.

Java
DOCKER> Tagging image shopping-cart-service:20201209-135004-363ae2b successful!
Scala
[info] Successfully tagged shopping-cart-service:20201209-135004-363ae2b

The version number of the image is derived from the git commit. If you see error "Given Docker name 'shopping-cart-service:${git.commit.time}-${git.commit.id.abbrev}' is invalid" it is because the directory is not a git repository. You can enable git for the directory with the following commands.

git init
git add .
git commit -m "some descriptive commit message"

Alternatively the version can be specified with -Dversion.number=0.1-SNAPSHOT.

mvn -Dversion.number=0.1-SNAPSHOT -DskipTests -Ddocker.registry=803424716218.dkr.ecr.eu-central-1.amazonaws.com clean package docker:push

The version number of the image, as well as from the project itself, is derived from the git commit using sbt-dynver new tab. If your project is not being versioned with git, you will see a fixed version number (eg: HEAD-20210119-1515). You can enable git for the directory with the following commands.

git init
git add .
git commit -m "some descriptive commit message"

Alternatively you can add the version to your build.sbt file and maintain it manually.

version := "1.0.0"

7.3. Update the deployment descriptor

In kubernetes/shopping-cart-service-cr.yml you will find the deployment descriptor of the AkkaMicroservice that the Akka Operator will use when deploying the application in Kubernetes.

kubernetes/shopping-cart-service-cr.yml:
apiVersion: "v1"
kind: "Namespace"
metadata:
  name: "shopping"
---
apiVersion: akka.lightbend.com/v1
kind: AkkaMicroservice
metadata:
  name: shopping-cart-service
  namespace: "shopping"
spec:
  replicas: 1
  image: <docker-registry>/shopping-cart-service:<tag> (1)
  javaOptions: "-Xlog:gc -XX:InitialRAMPercentage=75 -XX:MaxRAMPercentage=75"
  resources:
    limits:
      memory: "2Gi"
    requests:
      memory: "2Gi"
      cpu: "1"
1 Replace <docker-registry> with your docker registry address and update the image reference with the image tag from the output of the Docker build above, for example: 803424716218.dkr.ecr.eu-central-1.amazonaws.com/shopping-cart-service:20201209-135004-363ae2b.

7.4. Apply to Kubernetes

Deploy the shopping-cart-service-cr.yml to Kubernetes:

kubectl apply -f kubernetes/shopping-cart-service-cr.yml

It will also create the namespace shopping

It can be convenient to set that namespace as current instead of specifying the --namespace parameter in all kubectl commands.

kubectl config set-context --current --namespace=shopping

The Akka Operator will notice the deployment descriptor and deploy the shopping-cart-service. You can see progress by viewing the status:

kubectl get akkamicroservices/shopping-cart-service

See troubleshooting deployment status for more details.

7.5. Exercise the service in Kubernetes

  1. You can list the pods with:

    kubectl get pods
  2. Inspect logs from a separate terminal window:

    kubectl logs -f <shopping-cart-service pod name from above>
  3. Add port forwarding for the gRPC endpoint from a separate terminal:

    kubectl port-forward svc/shopping-cart-service-grpc 8101:8101

Use grpcurl to exercise the service:

  1. Add 3 socks to a cart:

    grpcurl -d '{"cartId":"cart1", "itemId":"socks", "quantity":3}' -plaintext 127.0.0.1:8101 shoppingcart.ShoppingCartService.AddItem
  2. Add 2 t-shirts to the same cart:

    grpcurl -d '{"cartId":"cart1", "itemId":"t-shirt", "quantity":2}' -plaintext 127.0.0.1:8101 shoppingcart.ShoppingCartService.AddItem

Note the logging from the ShoppingCartServiceImpl in the console that is running kubectl logs -f.