Section 3: Create the gRPC Cart service

We will create the gRPC ShoppingCartService for the Cart Service shown in the architectural overview below. The motivation for using gRPC as the inter-service protocol is explained in Akka gRPC concepts. The Introduction to Akka gRPC video is also a good starting point for learning Akka gRPC.

Example gRPC service

First, we will create the ShoppingCartService with a single operation that adds items to a cart. The implementation will only log the calls for now. Later, we’ll expand the service with more operations and real implementations. On this page you will learn how to:

  • define the interface of a gRPC service

  • implement the service interface

  • initialize and run an HTTP server and the service locally

  • interact with the service from the command line

Akka Workshop

The first video of the Akka Workshop Series covers creating the gRPC service.

Source downloads

If you prefer to simply view and run the example, download a zip file containing the completed code:

Java
  • Source with the initial project template.

  • Source with the steps on this page completed.

Scala
  • Source with the initial project template.

  • Source with the steps on this page completed.

1. Create service definition

gRPC takes a schema-first approach, where the protocol is declared in a Protobuf service descriptor. The source code for the messages and for the client and server stubs is generated from the service descriptor.

First, create a protobuf service descriptor file: src/main/protobuf/ShoppingCartService.proto. In it, define the service interface as shown below:

syntax = "proto3";

option java_multiple_files = true;
option java_package = "shopping.cart.proto";

package shoppingcart;

// gRPC definition for ShoppingCartService

service ShoppingCartService { // (1)
    rpc AddItem (AddItemRequest) returns (Cart) {}
}

message AddItemRequest { // (2)
    string cartId = 1;
    string itemId = 2;
    int32 quantity = 3;
}

message Cart {
    repeated Item items = 1;
}

message Item {
    string itemId = 1;
    int32 quantity = 2;
}
1 Defines the requests a client may send to the service in the service definition.
2 Describes the request to add an item to the shopping cart.

2. Generate code

The Akka gRPC plugin will generate code from the service descriptor when you compile the project. Run sbt or mvn, depending on your build tool, to compile:

sbt compile
mvn compile
You might have to synchronize the build.sbt or pom.xml in IntelliJ after this so that it adds the source directories for the generated code.

3. Implement the service

Let’s implement the ShoppingCartService interface (a trait in Scala). Create a ShoppingCartServiceImpl class that implements the generated ShoppingCartService interface. Implement the addItem method by logging the call and returning a successful CompletionStage (Java) or Future (Scala).

Java
src/main/java/shopping/cart/ShoppingCartServiceImpl.java:
package shopping.cart;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import shopping.cart.proto.AddItemRequest;
import shopping.cart.proto.Cart;
import shopping.cart.proto.Item;
import shopping.cart.proto.ShoppingCartService;

public final class ShoppingCartServiceImpl implements ShoppingCartService {

  private final Logger logger = LoggerFactory.getLogger(getClass());

  @Override
  public CompletionStage<Cart> addItem(AddItemRequest in) { // (1)
    logger.info("addItem {} to cart {}", in.getItemId(), in.getCartId());
    Item item = Item.newBuilder().setItemId(in.getItemId()).setQuantity(in.getQuantity()).build();
    Cart cart = Cart.newBuilder().addItems(item).build();
    return CompletableFuture.completedFuture(cart);
  }
}
Scala
src/main/scala/shopping/cart/ShoppingCartServiceImpl.scala:
package shopping.cart

import scala.concurrent.Future

import org.slf4j.LoggerFactory

class ShoppingCartServiceImpl extends proto.ShoppingCartService {

  private val logger = LoggerFactory.getLogger(getClass)

  override def addItem(in: proto.AddItemRequest): Future[proto.Cart] = { // (1)
    logger.info("addItem {} to cart {}", in.itemId, in.cartId)
    Future.successful(
      proto.Cart(items = List(proto.Item(in.itemId, in.quantity))))
  }

}
1 The method corresponding to the rpc AddItem in the service definition. Defined in the generated proto.ShoppingCartService.
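
The idea of answering immediately with an already-completed CompletionStage can be tried in isolation. The following is a minimal sketch that uses only the JDK. EchoService and EchoServiceImpl are hypothetical stand-ins for the generated interface and its implementation; they are not part of Akka gRPC.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

final class CompletedStageSketch {

  // Hypothetical stand-in for a generated service interface (assumption, not Akka gRPC code).
  interface EchoService {
    CompletionStage<String> echo(String request);
  }

  // Computes the reply synchronously and wraps it in an already-completed stage.
  static final class EchoServiceImpl implements EchoService {
    @Override
    public CompletionStage<String> echo(String request) {
      return CompletableFuture.completedFuture("echo: " + request);
    }
  }

  public static void main(String[] args) {
    CompletionStage<String> reply = new EchoServiceImpl().echo("socks");
    // The stage was created in the completed state, so join() returns without waiting.
    System.out.println(reply.toCompletableFuture().isDone());
    System.out.println(reply.toCompletableFuture().join());
  }
}
```

Returning a completed stage keeps the asynchronous method signature while the work itself is still trivial, so the signature does not need to change when the implementation later becomes truly asynchronous.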

4. Initialize HTTP Server

We will run the gRPC service implementation in an Akka HTTP server. Add the following server initialization code in a ShoppingCartServer class (Java) or object (Scala):

Java
src/main/java/shopping/cart/ShoppingCartServer.java:
package shopping.cart;

import akka.actor.typed.ActorSystem;
import akka.grpc.javadsl.ServerReflection;
import akka.grpc.javadsl.ServiceHandler;
import akka.http.javadsl.Http;
import akka.http.javadsl.ServerBinding;
import akka.http.javadsl.model.HttpRequest;
import akka.http.javadsl.model.HttpResponse;
import akka.japi.function.Function;
import java.net.InetSocketAddress;
import java.time.Duration;
import java.util.Collections;
import java.util.concurrent.CompletionStage;
import shopping.cart.proto.ShoppingCartService;
import shopping.cart.proto.ShoppingCartServiceHandlerFactory;

public final class ShoppingCartServer {

  private ShoppingCartServer() {}

  static void start(String host, int port, ActorSystem<?> system, ShoppingCartService grpcService) {
    @SuppressWarnings("unchecked")
    Function<HttpRequest, CompletionStage<HttpResponse>> service =
        ServiceHandler.concatOrNotFound(
            ShoppingCartServiceHandlerFactory.create(grpcService, system),
            // ServerReflection enabled to support grpcurl without import-path and proto parameters
            ServerReflection.create( // (1)
                Collections.singletonList(ShoppingCartService.description), system));

    CompletionStage<ServerBinding> bound =
        Http.get(system).newServerAt(host, port).bind(service::apply); // (2)

    bound.whenComplete(
        (binding, ex) -> { // (3)
          if (binding != null) {
            binding.addToCoordinatedShutdown(Duration.ofSeconds(3), system);
            InetSocketAddress address = binding.localAddress();
            system
                .log()
                .info(
                    "Shopping online at gRPC server {}:{}",
                    address.getHostString(),
                    address.getPort());
          } else {
            system.log().error("Failed to bind gRPC endpoint, terminating system", ex);
            system.terminate();
          }
        });
  }
}
Scala
src/main/scala/shopping/cart/ShoppingCartServer.scala:
package shopping.cart

import scala.concurrent.ExecutionContext
import scala.concurrent.Future
import scala.concurrent.duration._
import scala.util.Failure
import scala.util.Success

import akka.actor.typed.ActorSystem
import akka.grpc.scaladsl.ServerReflection
import akka.grpc.scaladsl.ServiceHandler
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.HttpRequest
import akka.http.scaladsl.model.HttpResponse

object ShoppingCartServer {

  def start(
      interface: String,
      port: Int,
      system: ActorSystem[_],
      grpcService: proto.ShoppingCartService): Unit = {
    implicit val sys: ActorSystem[_] = system
    implicit val ec: ExecutionContext =
      system.executionContext

    val service: HttpRequest => Future[HttpResponse] =
      ServiceHandler.concatOrNotFound(
        proto.ShoppingCartServiceHandler.partial(grpcService),
        // ServerReflection enabled to support grpcurl without import-path and proto parameters
        ServerReflection.partial(List(proto.ShoppingCartService))
      ) // (1)

    val bound =
      Http()
        .newServerAt(interface, port)
        .bind(service)
        .map(_.addToCoordinatedShutdown(3.seconds)) // (2)

    bound.onComplete { // (3)
      case Success(binding) =>
        val address = binding.localAddress
        system.log.info(
          "Shopping online at gRPC server {}:{}",
          address.getHostString,
          address.getPort)
      case Failure(ex) =>
        system.log.error("Failed to bind gRPC endpoint, terminating system", ex)
        system.terminate()
    }
  }

}
1 Concatenate the service implementation with ServerReflection for better grpcurl support.
2 Start the HTTP server for the gRPC service.
3 Some logging at startup.
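
The Java startup logging relies on CompletionStage.whenComplete, which passes the callback either a result or an exception. The following JDK-only sketch shows both branches in isolation. WhenCompleteSketch and its describe helper are hypothetical names, and a plain String stands in for the ServerBinding.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

final class WhenCompleteSketch {

  // Hypothetical helper: reports how a stage ended, using the same
  // "value != null" check as the binding callback in the server code.
  // It assumes the stage is already complete, so the callback runs immediately.
  static String describe(CompletionStage<String> stage) {
    StringBuilder out = new StringBuilder();
    stage.whenComplete(
        (value, ex) -> {
          if (value != null) {
            out.append("bound to ").append(value);
          } else {
            out.append("failed: ").append(ex.getMessage());
          }
        });
    return out.toString();
  }

  public static void main(String[] args) {
    // Stand-in for a bind attempt that failed, for example because the port is taken.
    CompletableFuture<String> failed = new CompletableFuture<>();
    failed.completeExceptionally(new IllegalStateException("port in use"));

    System.out.println(describe(CompletableFuture.completedFuture("127.0.0.1:8101")));
    System.out.println(describe(failed));
  }
}
```

In the real server the stage completes asynchronously, so the callback runs later, once the bind attempt has finished; the branching logic is the same.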

5. Main method

To run the service we need a class with a main method. Edit the Main class that is included in the template project. It should initialize the ActorSystem and the ShoppingCartServer like this:

Java
package shopping.cart;

import akka.actor.typed.ActorSystem;
import akka.actor.typed.javadsl.Behaviors;
import akka.management.cluster.bootstrap.ClusterBootstrap;
import akka.management.javadsl.AkkaManagement;
import com.typesafe.config.Config;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import shopping.cart.proto.ShoppingCartService;

public class Main {

  private static final Logger logger = LoggerFactory.getLogger(Main.class);

  public static void main(String[] args) {
    ActorSystem<Void> system = ActorSystem.create(Behaviors.empty(), "ShoppingCartService"); // (1)
    try {
      init(system);
    } catch (Exception e) {
      logger.error("Terminating due to initialization failure.", e);
      system.terminate();
    }
  }

  public static void init(ActorSystem<Void> system) {
    AkkaManagement.get(system).start(); // (2)
    ClusterBootstrap.get(system).start();

    Config config = system.settings().config();
    String grpcInterface = config.getString("shopping-cart-service.grpc.interface");
    int grpcPort = config.getInt("shopping-cart-service.grpc.port");
    ShoppingCartService grpcService = new ShoppingCartServiceImpl();
    ShoppingCartServer.start(grpcInterface, grpcPort, system, grpcService); // (3)
  }
}
Scala
package shopping.cart

import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.ActorSystem
import akka.management.cluster.bootstrap.ClusterBootstrap
import akka.management.scaladsl.AkkaManagement
import org.slf4j.LoggerFactory
import scala.util.control.NonFatal

object Main {

  val logger = LoggerFactory.getLogger("shopping.cart.Main")

  def main(args: Array[String]): Unit = {
    val system =
      ActorSystem[Nothing](Behaviors.empty, "ShoppingCartService") // (1)
    try {
      init(system)
    } catch {
      case NonFatal(e) =>
        logger.error("Terminating due to initialization failure.", e)
        system.terminate()
    }
  }

  def init(system: ActorSystem[_]): Unit = {
    AkkaManagement(system).start() // (2)
    ClusterBootstrap(system).start()

    val grpcInterface =
      system.settings.config.getString("shopping-cart-service.grpc.interface")
    val grpcPort =
      system.settings.config.getInt("shopping-cart-service.grpc.port")
    val grpcService = new ShoppingCartServiceImpl
    ShoppingCartServer.start(
      grpcInterface,
      grpcPort,
      system,
      grpcService
    ) // (3)
  }

}
1 Start an ActorSystem with an empty guardian Behavior.
2 Initialization of Akka Management that is used for forming the Akka Cluster.
3 Initialize the gRPC server. This is the code you should add to the existing Main.

The shopping-cart-service.grpc.port configuration is defined in local1.conf, which is included in the generated template project.

6. Run locally

You can run this service with either sbt or Maven:

sbt -Dconfig.resource=local1.conf run
# make sure to compile before running exec:exec
mvn compile exec:exec -DAPP_CONFIG=local1.conf

Note the log output from the ShoppingCartServer:

Shopping online at gRPC server 127.0.0.1:8101

6.1. Exercise the service

Use grpcurl to exercise the service:

  1. Add 3 socks to a cart:

    grpcurl -d '{"cartId":"cart1", "itemId":"socks", "quantity":3}' -plaintext 127.0.0.1:8101 shoppingcart.ShoppingCartService.AddItem
  2. Add 2 t-shirts to the same cart:

    grpcurl -d '{"cartId":"cart1", "itemId":"t-shirt", "quantity":2}' -plaintext 127.0.0.1:8101 shoppingcart.ShoppingCartService.AddItem

Note the logging from the ShoppingCartServiceImpl in the console.

6.2. Stop the service

You can stop the service with Ctrl-C.