Quickstart: Customer Registry with Kafka in Java
Create a customer registry that includes publishing to Kafka. Package it into a container, and run it on Akka Serverless.
In this Quickstart you will learn:
-
How to add additional functionality, allowing to publish customer’s events to Kafka.
-
How to package the customer registry into a container.
-
How to deploy and run the customer registry on Akka Serverless.
Before you begin
-
If you’re new to Akka Serverless, create an account so you can try out Akka Serverless for free.
-
You’ll also need to install the Akka Serverless CLI to deploy from a terminal window.
-
For this quickstart, you’ll also need
-
Java 11 or higher
-
Configure your project message broker to use Kafka and create the topic
customer_changes
by following the Configure message brokers how-to. When setting the broker the configuration file you pass should be likekafka/my-dev.kafka.properties
that you can find in this project.
If you want to bypass writing code and jump straight to the deployment:
|
Start from the Customer Registry Entity
Start by downloading the Customer Registry Quickstart source code using the Akka Serverless CLI:
akkasls quickstart download customer-registry-java
In this guide, we will describe how to subscribe to events from the entity and forward them to a Kafka Broker. How to do this with an Action. Publishing an event each time a customer is created or updated
Define an Action
The customer_action.proto
will contain the definition of this action.
-
In your project, create a directory for your protobuf file,
src/main/proto/customer/action
.- Linux or macOS
-
mkdir -p ./src/main/proto/customer/action
- Windows 10+
-
mkdir src/main/proto/customer/action
-
Create a
customer_action.proto
file and save it in thesrc/main/proto/customer/action
directory. -
Add declarations for:
-
The protobuf syntax version,
proto3
. -
The package name,
customer.action
. -
The required Java outer classname,
CustomerAction
. Messages defined in this file will be generated as inner classes. -
Import
customer/api/customer_api.proto
,customer/domain/customer_domain.proto
, and Akka Serverlessakkaserverless/annotations.proto
.src/main/proto/customer/action/customer_action.protosyntax = "proto3"; package customer.action; option java_outer_classname = "CustomerAction"; import "customer/api/customer_api.proto"; import "customer/domain/customer_domain.proto"; import "akkaserverless/annotations.proto";
-
-
Add the service definition. The service definition is annotated with
akkaserverless.codegen
indicating we want to generate an Action for this service. -
Add declarations for:
-
Listening to
customer.domain.CustomerState
events from the value entitycustomer
by using the optioneventing.in
. -
Publishing to the Kafka topic
customer_changes
by using the optioneventing.out
.src/main/proto/customer/action/customer_action.protoservice CustomerStateSubscription { option (akkaserverless.codegen) = { action: {} }; rpc OnStateChange (customer.domain.CustomerState) returns (customer.api.Customer) { option (akkaserverless.method).eventing.in = { value_entity: "customers" }; option (akkaserverless.method).eventing.out = { topic: "customer_changes" }; } }
-
-
Run
mvn compile
from the project root directory to generate source classes in which you add business logic.mvn compile
Create an Action
Actions are stateless functions that can be triggered in multiple ways. In this case, the action is triggered by each value change customer.domain.CustomerState
received by the Value Entity customer.domain.Customer
.
-
If it’s not open already, open
src/main/java/customer/action/CustomerStateSubscriptionAction.java
for editing. -
Modify the
onStateChange
method by adding the logic to handle the action. The complete method should include the following:src/main/java/customer/action/CustomerStateSubscriptionAction.java@Override public Effect<CustomerApi.Customer> onStateChange(CustomerDomain.CustomerState customerState) { CustomerApi.Address address = CustomerApi.Address.newBuilder() .setStreet(customerState.getAddress().getStreet()) .setCity(customerState.getAddress().getCity()) .build(); CustomerApi.Customer customer = CustomerApi.Customer.newBuilder() .setCustomerId(customerState.getCustomerId()) .setEmail(customerState.getEmail()) .setName(customerState.getName()) .setAddress(address) .build(); return effects().reply(customer); }
-
The incoming message contains the updated state of the customer entity and this action converts it to a
CustomerApi.Customer
and passes it to the Pub/Sub mechanism of choice. In this example Kafka.
-
Package and deploy your service
To build and publish the container image and then deploy the service, follow these steps:
-
If you haven’t done so yet, sign in to your Akka Serverless account. If this is your first time using Akka Serverless, this will let you register an account, create your first project, and set this project as the default.
akkasls auth login
-
Use the
deploy
target to build the container image, publish it to the container registry as configured in thepom.xml
file, and then automatically deploy the service to Akka Serverless usingakkasls
:mvn deploy
-
You can verify the status of the deployed service using:
akkasls service list
Invoke your service
Now that you have deployed your service, the next step is to invoke it using gRPCurl
-
From the "Service Explorer" click on the method you want to invoke
-
Click on "gRPCurl"
-
In the bottom section of the dialog, fill in the values you want to send to your service
-
In the top section of the dialog, click the "Copy to clipboard" button
-
Open a new command line and paste the content you just copied
-
Check the change made to the Customer appears in the topic
customer_changes
in the Kafka cluster on your Confluent Cloud.
Next steps
-
You can learn more about Value Entities in the reference documentation.
-
Continue this example by adding Views, which makes it possible to query the customer registry.