Section 3: Create the gRPC Cart service
We will create the gRPC ShoppingCartService
for the Cart Service shown in the architectural overview below. The motivation for using gRPC as the inter-service protocol is explained in Akka gRPC concepts. The Introduction to Akka gRPC video is also a good starting point for learning Akka gRPC.
First, we will create the ShoppingCartService
with a single operation that adds items to a cart. The implementation will only log the calls for now. Later, we’ll expand the service with more operations and real implementations. On this page you will learn how to:
-
define the interface of a gRPC service
-
implement the service interface
-
initialize and run an HTTP server and the service locally
-
interact with the service from the command line
Akka Workshop
The first video of the
Akka Workshop Series covers creating the gRPC series.
Source downloads
If you prefer to simply view and run the example, download a zip file containing the completed code:
- Java
- Scala
1. Create service definition
gRPC takes a schema-first approach, where the protocol is declared in a Protobuf service descriptor. From the service descriptor the source code for the messages, client and server stubs are generated.
First, create a protobuf service descriptor file: src/main/protobuf/ShoppingCartService.proto
. In it, define the service interface as shown below:
syntax = "proto3";
option java_multiple_files = true;
option java_package = "shopping.cart.proto";
package shoppingcart;
// gRPC definition for ShoppingCartService
service ShoppingCartService { (1)
rpc AddItem (AddItemRequest) returns (Cart) {}
}
message AddItemRequest { (2)
string cartId = 1;
string itemId = 2;
int32 quantity = 3;
}
message Cart {
repeated Item items = 1;
}
message Item {
string itemId = 1;
int32 quantity = 2;
}
1 | Defines the requests a client may send to the service in the service definition. |
2 | Describes the request to add an item to the shopping cart. |
2. Generate code
The Akka gRPC plugin will generate code from the service descriptor when you compile the project. Run sbt
mvn
to compile:
sbt compile
mvn compile
You might have to synchronize the build.sbt pom.xml in IntelliJ after this so that it adds the source directories for the generated code.
|
3. Implement the service
Let’s implement the ShoppingCartService
trait interface. Create a ShoppingCartServiceImpl
class that extends the generated ShoppingCartService
trait interface. Implement the addItem
method from the trait interface by logging the call and return a successful Future
CompletionStage
.
- Java
-
src/main/java/shopping/cart/ShoppingCartServiceImpl.java:
package shopping.cart; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import shopping.cart.proto.AddItemRequest; import shopping.cart.proto.Cart; import shopping.cart.proto.Item; import shopping.cart.proto.ShoppingCartService; public final class ShoppingCartServiceImpl implements ShoppingCartService { private final Logger logger = LoggerFactory.getLogger(getClass()); @Override public CompletionStage<Cart> addItem(AddItemRequest in) { (1) logger.info("addItem {} to cart {}", in.getItemId(), in.getCartId()); Item item = Item.newBuilder().setItemId(in.getItemId()).setQuantity(in.getQuantity()).build(); Cart cart = Cart.newBuilder().addItems(item).build(); return CompletableFuture.completedFuture(cart); } }
- Scala
-
src/main/scala/shopping/cart/ShoppingCartServiceImpl.scala:
package shopping.cart import scala.concurrent.Future import org.slf4j.LoggerFactory class ShoppingCartServiceImpl extends proto.ShoppingCartService { private val logger = LoggerFactory.getLogger(getClass) override def addItem(in: proto.AddItemRequest): Future[proto.Cart] = { (1) logger.info("addItem {} to cart {}", in.itemId, in.cartId) Future.successful( proto.Cart(items = List(proto.Item(in.itemId, in.quantity)))) } }
1 | The method corresponding to the rpc AddItem in the service definition. Defined in the generated proto.ShoppingCartService . |
4. Initialize HTTP Server
We will run the gRPC service implementation in an Akka HTTP server. Add the following server initialization code in a ShoppingCartServer
object
class
:
- Java
-
src/main/java/shopping/cart/ShoppingCartServer.java:
package shopping.cart; import akka.actor.typed.ActorSystem; import akka.grpc.javadsl.ServerReflection; import akka.grpc.javadsl.ServiceHandler; import akka.http.javadsl.Http; import akka.http.javadsl.ServerBinding; import akka.http.javadsl.model.HttpRequest; import akka.http.javadsl.model.HttpResponse; import akka.japi.function.Function; import java.net.InetSocketAddress; import java.time.Duration; import java.util.Collections; import java.util.concurrent.CompletionStage; import shopping.cart.proto.ShoppingCartService; import shopping.cart.proto.ShoppingCartServiceHandlerFactory; public final class ShoppingCartServer { private ShoppingCartServer() {} static void start(String host, int port, ActorSystem<?> system, ShoppingCartService grpcService) { @SuppressWarnings("unchecked") Function<HttpRequest, CompletionStage<HttpResponse>> service = ServiceHandler.concatOrNotFound( ShoppingCartServiceHandlerFactory.create(grpcService, system), // ServerReflection enabled to support grpcurl without import-path and proto parameters ServerReflection.create( (1) Collections.singletonList(ShoppingCartService.description), system)); CompletionStage<ServerBinding> bound = Http.get(system).newServerAt(host, port).bind(service::apply); (2) bound.whenComplete( (binding, ex) -> { (3) if (binding != null) { binding.addToCoordinatedShutdown(Duration.ofSeconds(3), system); InetSocketAddress address = binding.localAddress(); system .log() .info( "Shopping online at gRPC server {}:{}", address.getHostString(), address.getPort()); } else { system.log().error("Failed to bind gRPC endpoint, terminating system", ex); system.terminate(); } }); } }
- Scala
-
src/main/scala/shopping/cart/ShoppingCartServer.scala
package shopping.cart import scala.concurrent.ExecutionContext import scala.concurrent.Future import scala.concurrent.duration._ import scala.util.Failure import scala.util.Success import akka.actor.typed.ActorSystem import akka.grpc.scaladsl.ServerReflection import akka.grpc.scaladsl.ServiceHandler import akka.http.scaladsl.Http import akka.http.scaladsl.model.HttpRequest import akka.http.scaladsl.model.HttpResponse object ShoppingCartServer { def start( interface: String, port: Int, system: ActorSystem[_], grpcService: proto.ShoppingCartService): Unit = { implicit val sys: ActorSystem[_] = system implicit val ec: ExecutionContext = system.executionContext val service: HttpRequest => Future[HttpResponse] = ServiceHandler.concatOrNotFound( proto.ShoppingCartServiceHandler.partial(grpcService), // ServerReflection enabled to support grpcurl without import-path and proto parameters ServerReflection.partial(List(proto.ShoppingCartService)) ) (1) val bound = Http() .newServerAt(interface, port) .bind(service) .map(_.addToCoordinatedShutdown(3.seconds)) (2) bound.onComplete { (3) case Success(binding) => val address = binding.localAddress system.log.info( "Shopping online at gRPC server {}:{}", address.getHostString, address.getPort) case Failure(ex) => system.log.error("Failed to bind gRPC endpoint, terminating system", ex) system.terminate() } } }
1 | Concatenate the service implementation with ServerReflection for better grcurl support. |
2 | Start the HTTP server for the gRPC service. |
3 | Some logging at startup. |
5. Main method
To run the service we need a class with a main
method. Edit the Main
class that is included from the template project. It should initialize the ActorSystem
and the ShoppingCartServer
like this:
- Java
-
package shopping.cart; import akka.actor.typed.ActorSystem; import akka.actor.typed.javadsl.Behaviors; import akka.management.cluster.bootstrap.ClusterBootstrap; import akka.management.javadsl.AkkaManagement; import com.typesafe.config.Config; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import shopping.cart.proto.ShoppingCartService; public class Main { private static final Logger logger = LoggerFactory.getLogger(Main.class); public static void main(String[] args) { ActorSystem<Void> system = ActorSystem.create(Behaviors.empty(), "ShoppingCartService"); (1) try { init(system); } catch (Exception e) { logger.error("Terminating due to initialization failure.", e); system.terminate(); } } public static void init(ActorSystem<Void> system) { AkkaManagement.get(system).start(); (2) ClusterBootstrap.get(system).start(); Config config = system.settings().config(); String grpcInterface = config.getString("shopping-cart-service.grpc.interface"); int grpcPort = config.getInt("shopping-cart-service.grpc.port"); ShoppingCartService grpcService = new ShoppingCartServiceImpl(); ShoppingCartServer.start(grpcInterface, grpcPort, system, grpcService); (3) } }
- Scala
-
package shopping.cart import akka.actor.typed.scaladsl.Behaviors import akka.actor.typed.ActorSystem import akka.management.cluster.bootstrap.ClusterBootstrap import akka.management.scaladsl.AkkaManagement import org.slf4j.LoggerFactory import scala.util.control.NonFatal object Main { val logger = LoggerFactory.getLogger("shopping.cart.Main") def main(args: Array[String]): Unit = { val system = ActorSystem[Nothing](Behaviors.empty, "ShoppingCartService") (1) try { init(system) } catch { case NonFatal(e) => logger.error("Terminating due to initialization failure.", e) system.terminate() } } def init(system: ActorSystem[_]): Unit = { AkkaManagement(system).start() (2) ClusterBootstrap(system).start() val grpcInterface = system.settings.config.getString("shopping-cart-service.grpc.interface") val grpcPort = system.settings.config.getInt("shopping-cart-service.grpc.port") val grpcService = new ShoppingCartServiceImpl ShoppingCartServer.start( grpcInterface, grpcPort, system, grpcService ) (3) } }
1 | Start an ActorSystem with the Main actor Behavior . |
2 | Initialization of Akka Management that is used for forming the Akka Cluster. |
3 | Initialize the gRPC server. This is the code you should add to the existing Main . |
The grpc.port
configuration is defined in local1.conf
, which is included in the generated template project.
6. Run locally
You can run this service with:
sbt -Dconfig.resource=local1.conf run
# make sure to compile before running exec:exec
mvn compile exec:exec -DAPP_CONFIG=local1.conf
Note the log output from the ShoppingCartServer
Shopping online at gRPC server 127.0.0.1:8101
6.1. Exercise the service
Use grpcurl
to exercise the service:
-
Add 3 socks to a cart:
grpcurl -d '{"cartId":"cart1", "itemId":"socks", "quantity":3}' -plaintext 127.0.0.1:8101 shoppingcart.ShoppingCartService.AddItem
-
Add 2 t-shirts to the same cart:
grpcurl -d '{"cartId":"cart1", "itemId":"t-shirt", "quantity":2}' -plaintext 127.0.0.1:8101 shoppingcart.ShoppingCartService.AddItem
Note the logging from the ShoppingCartServiceImpl
in the console.