Section 8: Projection calling gRPC service
To complete the example, we need a way to handle orders. We will add another Projection from the events of the ShoppingCart entity. We will also create a gRPC Order Service, ShoppingOrderService. The Projection calls the Order Service when the shopping carts are checked out.
This part of the full example focuses on the gRPC client in the SendOrderProjection. On this page you will learn how to:
- call another service with Akka gRPC
- implement another gRPC service by adding the ShoppingOrderService
Source downloads
If you prefer to simply view and run the example, download a zip file containing the completed code:
- Java
- Scala
1. Add the Order gRPC service
Let’s add the service that handles shopping cart orders. The template and source downloads include a directory named shopping-order-service for this purpose. To add the order gRPC service, follow these steps:
- Open the shopping-order-service in IntelliJ just as you did with the shopping-cart-service.
- Define the interface of the service in a protobuf service descriptor. Place it at src/main/protobuf/ShoppingOrderService.proto in the new shopping-order-service project.

    syntax = "proto3";

    option java_multiple_files = true;
    option java_package = "shopping.order.proto";

    package shoppingorder;

    // gRPC definition for ShoppingOrderService

    service ShoppingOrderService {
      rpc Order (OrderRequest) returns (OrderResponse) {}
    }

    message OrderRequest {
      string cartId = 1;
      repeated Item items = 2;
    }

    message Item {
      string itemId = 1;
      int32 quantity = 2;
    }

    message OrderResponse {
      bool ok = 1;
    }
- Generate code by compiling the project:

  Java (Maven):

    mvn compile

  Scala (sbt):

    sbt compile
- Implement the ShoppingOrderService in a new class ShoppingOrderServiceImpl:

  Java, src/main/java/shopping/order/ShoppingOrderServiceImpl.java:

    package shopping.order;

    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.CompletionStage;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import shopping.order.proto.Item;
    import shopping.order.proto.OrderRequest;
    import shopping.order.proto.OrderResponse;
    import shopping.order.proto.ShoppingOrderService;

    public final class ShoppingOrderServiceImpl implements ShoppingOrderService {

      private final Logger logger = LoggerFactory.getLogger(getClass());

      @Override
      public CompletionStage<OrderResponse> order(OrderRequest in) {
        // Sum the quantities of all items in the request
        int total = 0;
        for (Item item : in.getItemsList()) {
          total += item.getQuantity();
        }
        logger.info("Order {} items from cart {}.", total, in.getCartId());
        OrderResponse response = OrderResponse.newBuilder().setOk(true).build();
        return CompletableFuture.completedFuture(response);
      }
    }
  Scala, src/main/scala/shopping/order/ShoppingOrderServiceImpl.scala:

    package shopping.order

    import scala.concurrent.Future

    import org.slf4j.LoggerFactory
    import shopping.order.proto.OrderRequest
    import shopping.order.proto.OrderResponse

    class ShoppingOrderServiceImpl extends proto.ShoppingOrderService {

      private val logger = LoggerFactory.getLogger(getClass)

      override def order(in: OrderRequest): Future[OrderResponse] = {
        // Sum the quantities of all items in the request
        val totalNumberOfItems = in.items.iterator.map(_.quantity).sum
        logger.info("Order {} items from cart {}.", totalNumberOfItems, in.cartId)
        Future.successful(OrderResponse(ok = true))
      }
    }
- As with the ShoppingCartService, we need to initialize the gRPC server. Add a ShoppingOrderServer class (an object in Scala):

  Java, src/main/java/shopping/order/ShoppingOrderServer.java:

    package shopping.order;

    import akka.actor.typed.ActorSystem;
    import akka.grpc.javadsl.ServerReflection;
    import akka.grpc.javadsl.ServiceHandler;
    import akka.http.javadsl.Http;
    import akka.http.javadsl.ServerBinding;
    import akka.http.javadsl.model.HttpRequest;
    import akka.http.javadsl.model.HttpResponse;
    import akka.japi.function.Function;
    import java.net.InetSocketAddress;
    import java.time.Duration;
    import java.util.Collections;
    import java.util.concurrent.CompletionStage;
    import shopping.order.proto.ShoppingOrderService;
    import shopping.order.proto.ShoppingOrderServiceHandlerFactory;

    public final class ShoppingOrderServer {

      private ShoppingOrderServer() {}

      static void start(
          String host, int port, ActorSystem<?> system, ShoppingOrderService grpcService) {
        @SuppressWarnings("unchecked")
        Function<HttpRequest, CompletionStage<HttpResponse>> service =
            ServiceHandler.concatOrNotFound(
                ShoppingOrderServiceHandlerFactory.create(grpcService, system),
                // ServerReflection enabled to support grpcurl without import-path and proto parameters
                ServerReflection.create(
                    Collections.singletonList(ShoppingOrderService.description), system));

        CompletionStage<ServerBinding> bound =
            Http.get(system).newServerAt(host, port).bind(service::apply);

        bound.whenComplete(
            (binding, ex) -> {
              if (binding != null) {
                binding.addToCoordinatedShutdown(Duration.ofSeconds(3), system);
                InetSocketAddress address = binding.localAddress();
                system
                    .log()
                    .info(
                        "Shopping order at gRPC server {}:{}",
                        address.getHostString(),
                        address.getPort());
              } else {
                system.log().error("Failed to bind gRPC endpoint, terminating system", ex);
                system.terminate();
              }
            });
      }
    }
  Scala, src/main/scala/shopping/order/ShoppingOrderServer.scala:

    package shopping.order

    import scala.concurrent.ExecutionContext
    import scala.concurrent.Future
    import scala.concurrent.duration._
    import scala.util.Failure
    import scala.util.Success

    import akka.actor.typed.ActorSystem
    import akka.grpc.scaladsl.ServerReflection
    import akka.grpc.scaladsl.ServiceHandler
    import akka.http.scaladsl.Http
    import akka.http.scaladsl.model.HttpRequest
    import akka.http.scaladsl.model.HttpResponse

    object ShoppingOrderServer {

      def start(
          interface: String,
          port: Int,
          system: ActorSystem[_],
          grpcService: proto.ShoppingOrderService): Unit = {
        implicit val sys: ActorSystem[_] = system
        implicit val ec: ExecutionContext = system.executionContext

        val service: HttpRequest => Future[HttpResponse] =
          ServiceHandler.concatOrNotFound(
            proto.ShoppingOrderServiceHandler.partial(grpcService),
            // ServerReflection enabled to support grpcurl without import-path and proto parameters
            ServerReflection.partial(List(proto.ShoppingOrderService)))

        val bound =
          Http()
            .newServerAt(interface, port)
            .bind(service)
            .map(_.addToCoordinatedShutdown(3.seconds))

        bound.onComplete {
          case Success(binding) =>
            val address = binding.localAddress
            system.log.info(
              "Shopping order at gRPC server {}:{}",
              address.getHostString,
              address.getPort)
          case Failure(ex) =>
            system.log.error("Failed to bind gRPC endpoint, terminating system", ex)
            system.terminate()
        }
      }
    }
- Call ShoppingOrderServer.start from Main:

  Java:

    package shopping.order;

    import akka.actor.typed.ActorSystem;
    import akka.actor.typed.javadsl.Behaviors;
    import akka.management.cluster.bootstrap.ClusterBootstrap;
    import akka.management.javadsl.AkkaManagement;
    import com.typesafe.config.Config;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import shopping.order.proto.ShoppingOrderService;

    public class Main {

      private static final Logger logger = LoggerFactory.getLogger(Main.class);

      public static void main(String[] args) throws Exception {
        ActorSystem<Void> system = ActorSystem.create(Behaviors.empty(), "ShoppingOrderService");
        try {
          init(system);
        } catch (Exception e) {
          logger.error("Terminating due to initialization failure.", e);
          system.terminate();
        }
      }

      public static void init(ActorSystem<Void> system) {
        AkkaManagement.get(system).start();
        ClusterBootstrap.get(system).start();

        Config config = system.settings().config();
        String grpcInterface = config.getString("shopping-order-service.grpc.interface");
        int grpcPort = config.getInt("shopping-order-service.grpc.port");
        ShoppingOrderService grpcService = new ShoppingOrderServiceImpl();
        ShoppingOrderServer.start(grpcInterface, grpcPort, system, grpcService);
      }
    }
  Scala:

    package shopping.order

    import akka.actor.typed.scaladsl.Behaviors
    import akka.actor.typed.ActorSystem
    import akka.management.cluster.bootstrap.ClusterBootstrap
    import akka.management.scaladsl.AkkaManagement
    import org.slf4j.LoggerFactory
    import scala.util.control.NonFatal

    object Main {

      val logger = LoggerFactory.getLogger("shopping.order.Main")

      def main(args: Array[String]): Unit = {
        val system = ActorSystem[Nothing](Behaviors.empty, "ShoppingOrderService")
        try {
          init(system)
        } catch {
          case NonFatal(e) =>
            logger.error("Terminating due to initialization failure.", e)
            system.terminate()
        }
      }

      def init(system: ActorSystem[_]): Unit = {
        AkkaManagement(system).start()
        ClusterBootstrap(system).start()

        val grpcInterface =
          system.settings.config.getString("shopping-order-service.grpc.interface")
        val grpcPort =
          system.settings.config.getInt("shopping-order-service.grpc.port")
        val grpcService = new ShoppingOrderServiceImpl
        ShoppingOrderServer.start(grpcInterface, grpcPort, system, grpcService)
      }
    }

The grpc.port configuration is defined in local1.conf, which is included in the generated template project.
2. Create the Projection
The new Projection for shopping-cart-service events will be similar to the one we developed for Kafka on the previous page, but when it receives ShoppingCart.CheckedOut events, it will call the ShoppingOrderService.
Create the Projection as follows:
- Include the service definition by copying the ShoppingOrderService.proto file from the shopping-order-service to the shopping-cart-service/src/main/protobuf directory.
- Generate code by compiling the shopping-cart-service project:

  Java (Maven):

    mvn compile

  Scala (sbt):

    sbt compile
Add a
SendOrderProjectionHandler
class in theshopping-cart-service
project. This is the ProjectionHandler
for processing the events:- Java
-
src/main/java/shopping/cart/SendOrderProjectionHandler.java:
package shopping.cart; import static akka.Done.done; import akka.Done; import akka.actor.typed.ActorSystem; import akka.cluster.sharding.typed.javadsl.ClusterSharding; import akka.cluster.sharding.typed.javadsl.EntityRef; import akka.projection.eventsourced.EventEnvelope; import akka.projection.javadsl.Handler; import java.time.Duration; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import java.util.stream.Collectors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import shopping.order.proto.Item; import shopping.order.proto.OrderRequest; import shopping.order.proto.ShoppingOrderService; public final class SendOrderProjectionHandler extends Handler<EventEnvelope<ShoppingCart.Event>> { private final Logger log = LoggerFactory.getLogger(getClass()); private final ClusterSharding sharding; private final Duration timeout; private final ShoppingOrderService orderService; public SendOrderProjectionHandler( ActorSystem<?> system, ShoppingOrderService orderService) { (1) sharding = ClusterSharding.get(system); timeout = system.settings().config().getDuration("shopping-cart-service.ask-timeout"); this.orderService = orderService; } @Override public CompletionStage<Done> process(EventEnvelope<ShoppingCart.Event> envelope) throws Exception { if (envelope.event() instanceof ShoppingCart.CheckedOut) { ShoppingCart.CheckedOut checkedOut = (ShoppingCart.CheckedOut) envelope.event(); return sendOrder(checkedOut); } else { return CompletableFuture.completedFuture(done()); } } private CompletionStage<Done> sendOrder(ShoppingCart.CheckedOut checkout) { EntityRef<ShoppingCart.Command> entityRef = sharding.entityRefFor(ShoppingCart.ENTITY_KEY, checkout.cartId); CompletionStage<ShoppingCart.Summary> reply = entityRef.ask(replyTo -> new ShoppingCart.Get(replyTo), timeout); return reply.thenCompose( cart -> { (2) List<Item> protoItems = cart.items.entrySet().stream() .map( entry -> Item.newBuilder() 
.setItemId(entry.getKey()) .setQuantity(entry.getValue()) .build()) .collect(Collectors.toList()); log.info("Sending order of {} items for cart {}.", cart.items.size(), checkout.cartId); OrderRequest orderRequest = OrderRequest.newBuilder().setCartId(checkout.cartId).addAllItems(protoItems).build(); return orderService.order(orderRequest).thenApply(response -> done()); (3) }); } }
  Scala, src/main/scala/shopping/cart/SendOrderProjectionHandler.scala:

    package shopping.cart

    import scala.concurrent.ExecutionContext
    import scala.concurrent.Future

    import akka.Done
    import akka.actor.typed.ActorSystem
    import akka.cluster.sharding.typed.scaladsl.ClusterSharding
    import akka.projection.eventsourced.EventEnvelope
    import akka.projection.scaladsl.Handler
    import akka.util.Timeout
    import org.slf4j.LoggerFactory
    import shopping.order.proto.Item
    import shopping.order.proto.OrderRequest
    import shopping.order.proto.ShoppingOrderService

    class SendOrderProjectionHandler(
        system: ActorSystem[_],
        orderService: ShoppingOrderService) // (1)
        extends Handler[EventEnvelope[ShoppingCart.Event]] {

      private val log = LoggerFactory.getLogger(getClass)
      private implicit val ec: ExecutionContext = system.executionContext

      private val sharding = ClusterSharding(system)
      implicit private val timeout: Timeout =
        Timeout.create(
          system.settings.config.getDuration("shopping-cart-service.ask-timeout"))

      override def process(
          envelope: EventEnvelope[ShoppingCart.Event]): Future[Done] = {
        envelope.event match {
          case checkout: ShoppingCart.CheckedOut =>
            sendOrder(checkout)

          case _ =>
            // this projection is only interested in CheckedOut events
            Future.successful(Done)
        }
      }

      private def sendOrder(checkout: ShoppingCart.CheckedOut): Future[Done] = {
        val entityRef =
          sharding.entityRefFor(ShoppingCart.EntityKey, checkout.cartId)
        entityRef.ask(ShoppingCart.Get).flatMap { cart => // (2)
          val items =
            cart.items.iterator.map { case (itemId, quantity) =>
              Item(itemId, quantity)
            }.toList
          log.info(
            "Sending order of {} items for cart {}.",
            items.size,
            checkout.cartId)
          val orderReq = OrderRequest(checkout.cartId, items)
          orderService.order(orderReq).map(_ => Done) // (3)
        }
      }
    }
(1) ShoppingOrderService is the gRPC client.
(2) Retrieve the full shopping cart information from the entity. The order must include the list of items and their quantities. That information is not included in the ShoppingCart.CheckedOut event, but we can retrieve it by asking the ShoppingCart entity for it.
(3) Call the ShoppingOrderService. If the call fails, the returned CompletionStage<Done> (Java) or Future[Done] (Scala) fails and the Projection is automatically restarted from the previously saved offset. This results in a retry of the call to the ShoppingOrderService. Since the Projection has at-least-once semantics, the ShoppingOrderService must be idempotent, that is, it must gracefully handle duplicate order attempts for the same cartId.
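The idempotency requirement in callout (3) can be illustrated with a minimal, self-contained sketch. It is not part of the tutorial's ShoppingOrderServiceImpl: the class IdempotentOrderSketch, its Result enum, and the in-memory map are hypothetical names chosen for illustration, and a real service would probably keep this state in a durable store so that it survives restarts.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch: deduplicates order attempts by cartId.
final class IdempotentOrderSketch {

    // Hypothetical result type; not part of the generated gRPC code.
    enum Result { ACCEPTED, DUPLICATE }

    // cartId -> total quantity of the first order accepted for that cart.
    // In-memory only (an assumption for brevity); lost when the process stops.
    private final Map<String, Integer> acceptedOrders = new ConcurrentHashMap<>();

    // putIfAbsent is atomic, so concurrent retries for one cartId are accepted once.
    Result order(String cartId, int totalQuantity) {
        Integer previous = acceptedOrders.putIfAbsent(cartId, totalQuantity);
        return previous == null ? Result.ACCEPTED : Result.DUPLICATE;
    }

    int acceptedCount() {
        return acceptedOrders.size();
    }

    public static void main(String[] args) {
        IdempotentOrderSketch service = new IdempotentOrderSketch();
        System.out.println(service.order("cart1", 5)); // first delivery
        System.out.println(service.order("cart1", 5)); // redelivery after a Projection restart
        System.out.println(service.acceptedCount());   // only one order was recorded
    }
}
```

With this shape, a redelivered order for the same cartId is acknowledged without being recorded twice, so the Projection can safely retry.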
3. Initialize the Projection
The tagging of the events is already in place from when we created the query Projection. So, we just need to initialize it as follows:
- Place the initialization code of the Projection in a SendOrderProjection class (an object in Scala):

  Java, src/main/java/shopping/cart/SendOrderProjection.java:

    package shopping.cart;

    import akka.actor.typed.ActorSystem;
    import akka.cluster.sharding.typed.ShardedDaemonProcessSettings;
    import akka.cluster.sharding.typed.javadsl.ShardedDaemonProcess;
    import akka.persistence.jdbc.query.javadsl.JdbcReadJournal;
    import akka.persistence.query.Offset;
    import akka.projection.ProjectionBehavior;
    import akka.projection.ProjectionId;
    import akka.projection.eventsourced.EventEnvelope;
    import akka.projection.eventsourced.javadsl.EventSourcedProvider;
    import akka.projection.javadsl.AtLeastOnceProjection;
    import akka.projection.javadsl.SourceProvider;
    import akka.projection.jdbc.javadsl.JdbcProjection;
    import java.util.Optional;
    import org.springframework.orm.jpa.JpaTransactionManager;
    import shopping.cart.repository.HibernateJdbcSession;
    import shopping.order.proto.ShoppingOrderService;

    public class SendOrderProjection {

      private SendOrderProjection() {}

      public static void init(
          ActorSystem<?> system,
          JpaTransactionManager transactionManager,
          ShoppingOrderService orderService) {
        // One Projection instance per tag, distributed across the cluster
        ShardedDaemonProcess.get(system)
            .init(
                ProjectionBehavior.Command.class,
                "SendOrderProjection",
                ShoppingCart.TAGS.size(),
                index ->
                    ProjectionBehavior.create(
                        createProjectionsFor(system, transactionManager, orderService, index)),
                ShardedDaemonProcessSettings.create(system),
                Optional.of(ProjectionBehavior.stopMessage()));
      }

      private static AtLeastOnceProjection<Offset, EventEnvelope<ShoppingCart.Event>>
          createProjectionsFor(
              ActorSystem<?> system,
              JpaTransactionManager transactionManager,
              ShoppingOrderService orderService,
              int index) {
        String tag = ShoppingCart.TAGS.get(index);
        SourceProvider<Offset, EventEnvelope<ShoppingCart.Event>> sourceProvider =
            EventSourcedProvider.eventsByTag(system, JdbcReadJournal.Identifier(), tag);

        return JdbcProjection.atLeastOnceAsync(
            ProjectionId.of("SendOrderProjection", tag),
            sourceProvider,
            () -> new HibernateJdbcSession(transactionManager),
            () -> new SendOrderProjectionHandler(system, orderService),
            system);
      }
    }
  Scala, src/main/scala/shopping/cart/SendOrderProjection.scala:

    package shopping.cart

    import akka.actor.typed.ActorSystem
    import akka.cluster.sharding.typed.ShardedDaemonProcessSettings
    import akka.cluster.sharding.typed.scaladsl.ShardedDaemonProcess
    import akka.persistence.jdbc.query.scaladsl.JdbcReadJournal
    import akka.persistence.query.Offset
    import akka.projection.ProjectionBehavior
    import akka.projection.ProjectionId
    import akka.projection.eventsourced.EventEnvelope
    import akka.projection.eventsourced.scaladsl.EventSourcedProvider
    import akka.projection.jdbc.scaladsl.JdbcProjection
    import akka.projection.scaladsl.AtLeastOnceProjection
    import akka.projection.scaladsl.SourceProvider
    import shopping.cart.repository.ScalikeJdbcSession
    import shopping.order.proto.ShoppingOrderService

    object SendOrderProjection {

      def init(system: ActorSystem[_], orderService: ShoppingOrderService): Unit = {
        // One Projection instance per tag, distributed across the cluster
        ShardedDaemonProcess(system).init(
          name = "SendOrderProjection",
          ShoppingCart.tags.size,
          index =>
            ProjectionBehavior(createProjectionFor(system, orderService, index)),
          ShardedDaemonProcessSettings(system),
          Some(ProjectionBehavior.Stop))
      }

      private def createProjectionFor(
          system: ActorSystem[_],
          orderService: ShoppingOrderService,
          index: Int)
          : AtLeastOnceProjection[Offset, EventEnvelope[ShoppingCart.Event]] = {
        val tag = ShoppingCart.tags(index)
        val sourceProvider
            : SourceProvider[Offset, EventEnvelope[ShoppingCart.Event]] =
          EventSourcedProvider.eventsByTag[ShoppingCart.Event](
            system = system,
            readJournalPluginId = JdbcReadJournal.Identifier,
            tag = tag)

        JdbcProjection.atLeastOnceAsync(
          projectionId = ProjectionId("SendOrderProjection", tag),
          sourceProvider,
          handler = () => new SendOrderProjectionHandler(system, orderService),
          sessionFactory = () => new ScalikeJdbcSession())(system)
      }
    }
- In Main, invoke SendOrderProjection.init and create the gRPC client for the ShoppingOrderService like this:

  Java:

    import akka.grpc.GrpcClientSettings;
    import shopping.order.proto.ShoppingOrderService;
    import shopping.order.proto.ShoppingOrderServiceClient;

    public class Main {

      public static void main(String[] args) {
        ActorSystem<Void> system = ActorSystem.create(Behaviors.empty(), "ShoppingCartService");
        try {
          init(system, orderServiceClient(system));
        } catch (Exception e) {
          logger.error("Terminating due to initialization failure.", e);
          system.terminate();
        }
      }

      public static void init(ActorSystem<Void> system, ShoppingOrderService orderService) {
        SendOrderProjection.init(system, transactionManager, orderService); // (1)
      }

      static ShoppingOrderService orderServiceClient(ActorSystem<?> system) { // (2)
        GrpcClientSettings orderServiceClientSettings =
            GrpcClientSettings.connectToServiceAt(
                    system.settings().config().getString("shopping-order-service.host"),
                    system.settings().config().getInt("shopping-order-service.port"),
                    system)
                .withTls(false);

        return ShoppingOrderServiceClient.create(orderServiceClientSettings, system);
      }
    }

  (1) This example shows only the new SendOrderProjection.init call; keep the additional initialization from previous steps.
  (2) The initialization of orderServiceClient is placed in a method so that tests can replace it with a stub implementation.
  Scala:

    import shopping.order.proto.{ ShoppingOrderService, ShoppingOrderServiceClient }
    import akka.grpc.GrpcClientSettings

    object Main {

      val logger = LoggerFactory.getLogger("shopping.cart.Main")

      def main(args: Array[String]): Unit = {
        val system = ActorSystem[Nothing](Behaviors.empty, "ShoppingCartService")
        try {
          val orderService = orderServiceClient(system)
          init(system, orderService)
        } catch {
          case NonFatal(e) =>
            logger.error("Terminating due to initialization failure.", e)
            system.terminate()
        }
      }

      def init(system: ActorSystem[_], orderService: ShoppingOrderService): Unit = {
        SendOrderProjection.init(system, orderService) // (1)
      }

      protected def orderServiceClient( // (2)
          system: ActorSystem[_]): ShoppingOrderService = {
        val orderServiceClientSettings =
          GrpcClientSettings
            .connectToServiceAt(
              system.settings.config.getString("shopping-order-service.host"),
              system.settings.config.getInt("shopping-order-service.port"))(system)
            .withTls(false)
        ShoppingOrderServiceClient(orderServiceClientSettings)(system)
      }
    }

  (1) This example shows only the new SendOrderProjection.init call; keep the additional initialization from previous steps.
  (2) The initialization of orderServiceClient is placed in a method so that tests can replace it with a stub implementation.
- The gRPC client locates the ShoppingOrderService through the shopping-order-service.host and shopping-order-service.port settings. For local development add the following to src/main/resources/local-shared.conf, which is loaded when running locally:

    shopping-order-service.host = "localhost"
    shopping-order-service.port = 8301
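The note about replacing orderServiceClient with a stub in tests can be sketched as follows. To stay self-contained, this sketch does not use the generated shopping.order.proto.ShoppingOrderService; the OrderService interface, RecordingOrderService, and the checkout method are hypothetical stand-ins that only mirror the idea of depending on the service interface rather than on the concrete gRPC client.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.CopyOnWriteArrayList;

final class OrderServiceStubSketch {

    // Hypothetical stand-in for the generated gRPC service interface.
    interface OrderService {
        CompletionStage<Boolean> order(String cartId, int totalQuantity);
    }

    // Stub for tests: records the calls and always replies ok, with no network involved.
    static final class RecordingOrderService implements OrderService {
        final List<String> receivedCartIds = new CopyOnWriteArrayList<>();

        @Override
        public CompletionStage<Boolean> order(String cartId, int totalQuantity) {
            receivedCartIds.add(cartId);
            return CompletableFuture.completedFuture(true);
        }
    }

    // Hypothetical code under test: it only sees the interface,
    // so a real client or a stub can be passed in.
    static CompletionStage<String> checkout(OrderService orderService, String cartId, int quantity) {
        return orderService.order(cartId, quantity).thenApply(ok -> ok ? "done" : "rejected");
    }

    public static void main(String[] args) {
        RecordingOrderService stub = new RecordingOrderService();
        String outcome = checkout(stub, "cart1", 3).toCompletableFuture().join();
        System.out.println(outcome + " " + stub.receivedCartIds);
    }
}
```

In the actual project, a test would presumably pass an implementation of the generated ShoppingOrderService interface to Main.init in the same way.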
4. Run locally
Follow these steps to run locally and exercise the new Projection and service:
- Start PostgreSQL and Kafka, unless they are already running:

    docker-compose up -d

- Run the shopping-order-service with:

  Java (Maven):

    # make sure to compile before running exec:exec
    mvn compile exec:exec -DAPP_CONFIG=local1.conf

  Scala (sbt):

    sbt -Dconfig.resource=local1.conf run

- Keep the shopping-order-service running, and in another terminal run the shopping-cart-service with:

  Java (Maven):

    # make sure to compile before running exec:exec
    mvn compile exec:exec -DAPP_CONFIG=local1.conf

  Scala (sbt):

    sbt -Dconfig.resource=local1.conf run
4.1. Exercise the service
Use grpcurl to exercise the service:
- Try the new order service directly (on port 8301):

    grpcurl -d '{"cartId":"cart1", "items":[{"itemId":"socks", "quantity":3}, {"itemId":"t-shirt", "quantity":2}]}' -plaintext 127.0.0.1:8301 shoppingorder.ShoppingOrderService.Order

- Use the checkout in the shopping cart service with grpcurl (note the different port number):

    grpcurl -d '{"cartId":"cart1", "itemId":"scissors", "quantity":1}' -plaintext 127.0.0.1:8101 shoppingcart.ShoppingCartService.AddItem
    grpcurl -d '{"cartId":"cart1"}' -plaintext 127.0.0.1:8101 shoppingcart.ShoppingCartService.Checkout

- In the terminal of the shopping-order-service you should see the log of the order:

    Order 12 items from cart cart1