Quickstart: Customer Registry with Kafka in Java

Create a customer registry that publishes customer changes to Kafka, package it into a container, and run it on Akka Serverless.

In this Quickstart you will learn:

  • How to extend the customer registry so that it publishes customer events to Kafka.

  • How to package the customer registry into a container.

  • How to deploy and run the customer registry on Akka Serverless.

Before you begin

  • If you’re new to Akka Serverless, create an account so you can try out Akka Serverless for free.

  • You’ll also need to install the Akka Serverless CLI to deploy from a terminal window.

  • For this quickstart, you’ll also need

If you want to bypass writing code and jump straight to the deployment:

  1. Download the source code using the Akka Serverless CLI: akkasls quickstart download customer-registry-kafka-java

  2. Skip to Package and deploy your service.

Start from the Customer Registry Entity

Start by downloading the Customer Registry Quickstart source code using the Akka Serverless CLI:

akkasls quickstart download customer-registry-java

This guide describes how to subscribe to events from the entity and forward them to a Kafka broker. You do this with an Action that publishes an event each time a customer is created or updated.

Define an Action

The customer_action.proto file will contain the definition of this Action.

  1. In your project, create a directory for your protobuf file, src/main/proto/customer/action.

    Linux or macOS
    mkdir -p ./src/main/proto/customer/action
    Windows 10+
    mkdir src\main\proto\customer\action
  2. Create a customer_action.proto file and save it in the src/main/proto/customer/action directory.

  3. Add declarations for:

    • The protobuf syntax version, proto3.

    • The package name, customer.action.

    • The required Java outer classname, CustomerAction. Messages defined in this file will be generated as inner classes.

    • Imports of customer/api/customer_api.proto, customer/domain/customer_domain.proto, and the Akka Serverless akkaserverless/annotations.proto.

      src/main/proto/customer/action/customer_action.proto
      syntax = "proto3";
      package customer.action;
      
      option java_outer_classname = "CustomerAction";
      
      import "customer/api/customer_api.proto";
      import "customer/domain/customer_domain.proto";
      import "akkaserverless/annotations.proto";
  4. Add the service definition. The service definition is annotated with akkaserverless.codegen indicating we want to generate an Action for this service.

  5. Add declarations for:

    • Listening to customer.domain.CustomerState events from the value entity customers by using the option eventing.in.

    • Publishing to the Kafka topic customer_changes by using the option eventing.out.

      src/main/proto/customer/action/customer_action.proto
      service CustomerStateSubscription {
        option (akkaserverless.codegen) = {
          action: {}
        };
      
        rpc OnStateChange (customer.domain.CustomerState) returns (customer.api.Customer) {
          option (akkaserverless.method).eventing.in = {
            value_entity: "customers"
          };
          option (akkaserverless.method).eventing.out = {
            topic: "customer_changes"
          };
        }
      }
  6. Run mvn compile from the project root directory to generate source classes in which you add business logic.

    mvn compile

Create an Action

Actions are stateless functions that can be triggered in multiple ways. In this case, the Action is triggered by each change to the customer.domain.CustomerState value held by the Value Entity customer.domain.Customer.
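To make the flow concrete, the following is a conceptual sketch only. It does not use the Akka Serverless SDK or a Kafka client. The names StateSubscriptionSketch, ObservedEntity, InMemoryTopic, and forward are hypothetical in-memory stand-ins, introduced here to show how a stateless function can sit between an entity's state changes and a topic.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

// Conceptual sketch only: in-memory stand-ins, not the Akka Serverless SDK or a Kafka client.
final class StateSubscriptionSketch {

  // Hypothetical stand-in for a topic such as customer_changes.
  static final class InMemoryTopic<M> {
    private final List<M> messages = new ArrayList<>();

    void publish(M message) {
      messages.add(message);
    }

    List<M> messages() {
      return Collections.unmodifiableList(messages);
    }
  }

  // Hypothetical stand-in for a value entity that notifies subscribers on every state change.
  static final class ObservedEntity<S> {
    private final List<Consumer<S>> subscribers = new ArrayList<>();

    void subscribe(Consumer<S> subscriber) {
      subscribers.add(subscriber);
    }

    void update(S newState) {
      for (Consumer<S> subscriber : subscribers) {
        subscriber.accept(newState);
      }
    }
  }

  // Each state change is passed through the stateless action,
  // and the action's reply is published to the topic.
  static <S, M> void forward(
      ObservedEntity<S> entity, Function<S, M> action, InMemoryTopic<M> topic) {
    entity.subscribe(state -> topic.publish(action.apply(state)));
  }

  public static void main(String[] args) {
    ObservedEntity<String> entity = new ObservedEntity<>();
    InMemoryTopic<String> topic = new InMemoryTopic<>();
    forward(entity, name -> "customer changed: " + name, topic);

    entity.update("Alice");
    entity.update("Alice Smith");

    // prints [customer changed: Alice, customer changed: Alice Smith]
    System.out.println(topic.messages());
  }
}
```

In the real service, the wiring done by forward is declared by the eventing.in and eventing.out options in the protobuf file, so the Action code only has to provide the function itself.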

  1. If it’s not open already, open src/main/java/customer/action/CustomerStateSubscriptionAction.java for editing.

  2. Modify the onStateChange method by adding the logic to handle the action. The complete method should include the following:

    src/main/java/customer/action/CustomerStateSubscriptionAction.java
      @Override
      public Effect<CustomerApi.Customer> onStateChange(CustomerDomain.CustomerState customerState) {
        // Copy the address fields from the domain state into the API address type.
        CustomerApi.Address address = CustomerApi.Address.newBuilder()
            .setStreet(customerState.getAddress().getStreet())
            .setCity(customerState.getAddress().getCity())
            .build();

        // Build the API customer from the remaining state fields.
        CustomerApi.Customer customer = CustomerApi.Customer.newBuilder()
            .setCustomerId(customerState.getCustomerId())
            .setEmail(customerState.getEmail())
            .setName(customerState.getName())
            .setAddress(address)
            .build();

        // The reply is the message that eventing.out publishes to the topic.
        return effects().reply(customer);
      }
    • The incoming message contains the updated state of the customer entity. The Action converts it to a CustomerApi.Customer and replies with it, which passes the message to the Pub/Sub mechanism of choice, Kafka in this example.
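Because the conversion is a pure field-by-field copy, it can be reasoned about and checked in isolation. The following is a minimal stand-alone sketch of that idea, assuming plain Java classes as stand-ins for the generated protobuf types so that it compiles without the SDK. Every name in it (CustomerMappingSketch, DomainAddress, DomainCustomerState, ApiAddress, ApiCustomer, toApi) is hypothetical and is not part of the generated code.

```java
// Stand-alone sketch: plain classes stand in for the generated protobuf types,
// so this compiles without the Akka Serverless SDK. All names are hypothetical.
final class CustomerMappingSketch {

  // Stand-in for the domain-side address message.
  static final class DomainAddress {
    final String street;
    final String city;

    DomainAddress(String street, String city) {
      this.street = street;
      this.city = city;
    }
  }

  // Stand-in for CustomerDomain.CustomerState.
  static final class DomainCustomerState {
    final String customerId;
    final String email;
    final String name;
    final DomainAddress address;

    DomainCustomerState(String customerId, String email, String name, DomainAddress address) {
      this.customerId = customerId;
      this.email = email;
      this.name = name;
      this.address = address;
    }
  }

  // Stand-in for CustomerApi.Address.
  static final class ApiAddress {
    final String street;
    final String city;

    ApiAddress(String street, String city) {
      this.street = street;
      this.city = city;
    }
  }

  // Stand-in for CustomerApi.Customer.
  static final class ApiCustomer {
    final String customerId;
    final String email;
    final String name;
    final ApiAddress address;

    ApiCustomer(String customerId, String email, String name, ApiAddress address) {
      this.customerId = customerId;
      this.email = email;
      this.name = name;
      this.address = address;
    }
  }

  // Pure function: copies each field of the domain state into the API type.
  static ApiCustomer toApi(DomainCustomerState state) {
    ApiAddress address = new ApiAddress(state.address.street, state.address.city);
    return new ApiCustomer(state.customerId, state.email, state.name, address);
  }

  public static void main(String[] args) {
    DomainCustomerState state = new DomainCustomerState(
        "customer-1", "alice@example.com", "Alice",
        new DomainAddress("1 Main St", "Springfield"));
    ApiCustomer customer = toApi(state);
    System.out.println(customer.customerId + " " + customer.name + " " + customer.address.city);
  }
}
```

Keeping the mapping free of side effects means the same input state always produces the same published message, which makes the Action straightforward to test.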

Package and deploy your service

To build and publish the container image and then deploy the service, follow these steps:

  1. If you haven’t done so yet, sign in to your Akka Serverless account. If this is your first time using Akka Serverless, this will let you register an account, create your first project, and set this project as the default.

    akkasls auth login
  2. Use the deploy target to build the container image, publish it to the container registry as configured in the pom.xml file, and then automatically deploy the service to Akka Serverless using akkasls:

    mvn deploy
  3. You can verify the status of the deployed service using:

    akkasls service list

Invoke your service

Now that you have deployed your service, the next step is to invoke it using gRPCurl.

  1. From the "Service Explorer", click on the method you want to invoke.

  2. Click on "gRPCurl".

  3. In the bottom section of the dialog, fill in the values you want to send to your service.

  4. In the top section of the dialog, click the "Copy to clipboard" button.

  5. Open a new command line and paste the content you just copied.

  6. Check that the change made to the customer appears in the customer_changes topic in your Kafka cluster on Confluent Cloud.

Next steps