How the IoT example was created

The IoT example service uses an Event Sourced Entity. Event Sourced Entities capture each change to data, as opposed to overwriting existing values. They emit changes as events, which are persisted in a log that can be replayed. Event Sourcing provides complete traceability of device behavior for security, analytics, and simulation.

The following sections highlight the main steps to create the IoT stateful service:

Download the example project

Download the example source from GitHub for Java or for JavaScript:

Define service API and message types

You define the service API in gRPC .proto file format. As a best practice, the example uses two .proto files, one that defines the service and a second containing the domain-specific definitions for the entity. From the .proto files, the gRPC compiler creates client and server side code that saves work for you and that enables Akka Serverless to serialize message data at runtime.

  1. View the IoT message definitions and service API:

    Javascript

    From the wirelessmesh directory, open the wirelessmeshservice.proto file.

    Java

    From the source/main/proto directory, open the wirelessmeshservice.proto file.

  2. Find the AddCustomerLocationCommand message and note that it includes a (akkaserverless.field).entity_key, which allows us store and locate customers by a unique ID.

  3. Locate the WirelessMeshService definition and note the operations it supports for managing customers and their devices.

  4. From the same directory, open the wirelessmeshdomain.proto file and view the events that will be stored for the service.

Now, let’s look at the implementation.

Using Event Sourced Entities

The snippet below demonstrates how to use Event Sourced Entities:

JavaScript

From the file wirelessmesh.js:

const EventSourced = require("akkaserverless").EventSourced;
const eventPublisher = require("./eventPublisher.js");
const deviceClient = require("./deviceClient.js");

const entity = new EventSourced(
  ["wirelessmeshservice.proto", "wirelessmeshdomain.proto"],
  "wirelessmeshservice.WirelessmeshService",
  {
    persistenceId: "customer-location",
    snapshotEvery: 50,
    includeDirs: ["./"],
    serializeFallbackToJson: true // Enables JSON support for persistence
  }
);
Java

From the file WirelessMeshMain.java, the entity must be registered as shown in the following snippet:

public class WirelessMeshMain {

    public static void main(String... args) {
        new Akkaserverless()
                .registerEventSourcedEntity(
                        CustomerLocation.class,
                        Wirelessmeshservice.getDescriptor().findServiceByName("WirelessMeshService"),
                        Wirelessmeshdomain.getDescriptor())
                .start();
    }
}

The Customer Location entity will be seeded with the current state upon loading and thereafter will completely serve the backend needs for a particular device. If you want to compare the entities to a traditional relational database, you can look at each instance of these entities as being roughly equivalent to a row in a database, only each one is completely addressable and in memory.

How to connect to external systems

Your Akka Serverless services don’t exist in a vacuum, so you’ll need to connect from services to external systems. This IoT example shows how you can connect to Google Cloud Pub/Sub.

Javascript

The connectivity to Google Cloud Pub/Sub is handled in the file wirelessmesh/eventPublisher.js. When the following code is executed, a new topic connection will be created, and a message will be sent with the payload of your message.

const {PubSub} = require('@google-cloud/pubsub');

function eventPublisher() {

  // Check if publishing is on or off.
  let publishOn = process.env.PUBLISH_EVENTS && process.env.PUBLISH_EVENTS === "ON"

  return {
    /**
     * Publish event to google pubsub.
     * @param event
     * @returns {Promise<void>}
     */
    async publish(event) {
      if (publishOn === true) {
        const pubSubClient = new PubSub();
        const dataBuffer = Buffer.from(JSON.stringify(event));
        const topicName = "wirelessmesh";

        try {
          const messageId = await pubSubClient.topic(topicName).publish(dataBuffer);
          console.log(`Message ${messageId} published.`);
        } catch (error) {
          console.error(`Received error while publishing: ${error.message}`);
          process.exitCode = 1;
        }
      }
    },
  };
}

In wirelessmesh.js, each command handler method calls the GooglePubsubClient to send an enriched event. As an example, the snippet below handles the event when new customer locations are added.

entity.addCustomerLocation = function(addCustomerLocationCommand, entityState, ctx) {
  // Validate that the the command has not already been handled, i.e. not yet added.
  if (entityState.added) {
    ctx.fail("Customer location already added");
  }
  else {
    // Create the event.
    const customerLocationAdded = {
      type: "CustomerLocationAdded",
      customerLocationId: addCustomerLocationCommand.customerLocationId,
      accessToken: addCustomerLocationCommand.accessToken
    };
    // Emit the event.
    ctx.emit(customerLocationAdded);
    eventPublisher.publish(customerLocationAdded);
    return {};
  }
}
Java

The connectivity to Google Cloud Pub/Sub is handled in the file GooglePubsubClient.java. Every time the snippet below gets executed, a new topic connection will be created and a message will be sent with the payload of your message (encoded as a protobuf bytestring).

import com.google.protobuf.ByteString;

public class GooglePubsubClient {
    public void publish(String topic, ByteString eventByteString) {
        TopicName topicName = TopicName.of("akkaserverless-wirelessmesh-java", topic);
        ...
    }
}

Within CustomerLocation.java, each command handler method calls the GooglePubsubClient to send an enriched event. As an example, the snippet below handles the event when new customer locations are added (the code below only shows the call to Google Cloud Pub/Sub).

@CommandHandler
public Empty addCustomerLocation(AddCustomerLocationCommand addCustomerLocationCommand, CommandContext ctx) {
    ...
    new GooglePubsubClient().publish("customer-location", event.toByteString());
    return Empty.getDefaultInstance();
}

We’ve only highlighted a bit of the IoT example code. To try a tutorial that shows you how to create a shopping cart, see Implement the shopping cart.