Actions

Actions are stateless functions that can be triggered in multiple ways, for example by:

  • a gRPC service call

  • an HTTP service call

  • a new item in an Event Sourced Entity’s journal

  • a forwarded call from another component

Use case: request conversion

You can use Actions to convert incoming data into a different format before forwarding a call to a different component.

A service might need to offer a request data format that does not correspond directly with the commands of an Event Sourced Entity. By exposing a service implemented by an Action, the Action implementation can adapt the data format to the command (or commands) expected by the Event Sourced Entity. The incoming requests get forwarded to the target component.

Use case: listening to a journal

To listen to an Event Sourced Entity’s journal, set up an Action for Eventing (see the Eventing documentation for details).

An Event Sourced Entity’s journal contains events that capture all state changes. By subscribing to the journal using the Event Sourced Entity’s name, another component can receive all events emitted by Event Sourced Entities of that type.

Together with Topic publishing, this may be used to inform other services asynchronously about certain events.

Implementing Actions

The following example shows a .proto file for an action:

// Copyright 2019 Lightbend Inc.

syntax = "proto3";

// Fully qualified names such as `akkaserverless.tck.model.action.ActionTckModel`
// are derived from this package, so it must match the names used to look up
// the services and message types in the implementation.
package akkaserverless.tck.model.action;

// Provides the `google.api.http` option used to expose `ProcessUnary` over HTTP.
import "google/api/annotations.proto";

//
// The `ActionTckModel` service should be implemented in the following ways:
//
// - The `Process` methods receive `Request` messages with steps to take:
//   - The `ProcessUnary` method receives a single Request and gives a single Response.
//     Multiple request steps should be combined to give just one response, with subsequent steps taking precedence.
//   - The `ProcessStreamedIn` method receives a stream of Requests and gives a single Response.
//     All request steps should be combined to produce a single response after the request stream completes.
//   - The `ProcessStreamedOut` method receives a single Request and gives a stream of Responses.
//     The single request may contain multiple grouped steps, each group corresponding to an expected response.
//   - The `ProcessStreamed` method receives a stream of Requests and gives a stream of Responses.
//     Each request may contain multiple grouped steps, each group corresponding to an expected response.
// - Request steps must be processed in order, and can require replying, forwarding, or failing, and side effects.
// - The Request steps are grouped, for streamed Responses, where each group correlates with a Response.
// - The `Process` methods must reply with the requested reply message, unless forwarding or failing.
// - Forwarding and side effects must always be made to the second service `ActionTwo`.
//
service ActionTckModel {
  // Unary call; also exposed over HTTP as POST /tck/model/action/unary,
  // with the whole `Request` message taken from the HTTP body.
  rpc ProcessUnary(Request) returns (Response) {
    option (google.api.http) = {
      post: "/tck/model/action/unary"
      body: "*"
    };
  }
  // Client-streaming call: many Requests in, one Response out.
  rpc ProcessStreamedIn(stream Request) returns (Response);
  // Server-streaming call: one Request in, many Responses out.
  rpc ProcessStreamedOut(Request) returns (stream Response);
  // Bidirectional streaming call: many Requests in, many Responses out.
  rpc ProcessStreamed(stream Request) returns (stream Response);
}

//
// The `ActionTwo` service is only for verifying forwards and side effects.
// The `Call` method is not required to do anything, and must return an empty `Response` message.
//
service ActionTwo {
  // Target of every forward and side effect; receives an `OtherRequest` carrying the step's `id`.
  rpc Call(OtherRequest) returns (Response);
}

//
// A `Request` message contains the steps that the action should process.
// Steps are grouped for streamed responses. Steps must be processed in order.
//
message Request {
  // One group of steps per expected response.
  repeated ProcessGroup groups = 1;
}

//
// A `ProcessGroup` contains the steps for one response.
//
message ProcessGroup {
  // Steps to process, in order, to produce this group's response.
  repeated ProcessStep steps = 1;
}

//
// Each `ProcessStep` is one of:
//
// - Reply: reply with the given message in a `Response`.
// - Forward: forward to another service, in place of replying with a `Response`.
// - Fail: fail the current `Process` command by sending a failure.
// - SideEffect: add a side effect to the current reply, forward, or failure.
//
message ProcessStep {
  // At most one of these fields is set on any given step.
  oneof step {
    Reply reply = 1;
    Forward forward = 2;
    Fail fail = 3;
    SideEffect effect = 4;
  }
}

//
// Reply with a message in the response.
//
message Reply {
  // Text to return in the `message` field of the `Response`.
  string message = 1;
}

//
// Replace the response with a forward to `akkaserverless.tck.model.action.ActionTwo/Call`.
// The payload must be an `OtherRequest` message with the given `id`.
//
message Forward {
  // Id to copy into the forwarded `OtherRequest`.
  string id = 1;
}

//
// Fail the current command with the given description `message`.
//
message Fail {
  // Description of the failure to report.
  string message = 1;
}

//
// Add a side effect to the reply, to `akkaserverless.tck.model.action.ActionTwo/Call`.
// The payload must be an `OtherRequest` message with the given `id`.
// The side effect should be marked synchronous based on the given `synchronous` value.
//
message SideEffect {
  // Id to copy into the `OtherRequest` sent as the side effect.
  string id = 1;
  // Whether the side effect should be marked as synchronous.
  bool synchronous = 2;
}

//
// The `Response` message must contain the message from the corresponding reply step.
//
message Response {
  // Text taken from the `Reply` step that produced this response.
  string message = 1;
}

//
// The `OtherRequest` message must contain the id for the forward or side effect.
//
message OtherRequest {
  // Id taken from the `Forward` or `SideEffect` step.
  string id = 1;
}

The following shows the implementation:

/*
 * Copyright 2019 Lightbend Inc.
 */

const Action = require("@lightbend/akkaserverless-javascript-sdk").Action

// Action component for the `ActionTckModel` service, created from proto/action.proto.
const tckModel = new Action(
  "proto/action.proto",
  "akkaserverless.tck.model.action.ActionTckModel"
);

// Message type used to build the replies written back to the caller.
const Response = tckModel.lookupType("akkaserverless.tck.model.action.Response");

// Maps each rpc of the service to the function below that handles it.
tckModel.commandHandlers = {
  ProcessUnary: processUnary,
  ProcessStreamedIn: processStreamedIn,
  ProcessStreamedOut: processStreamedOut,
  ProcessStreamed: processStreamed
};

/**
 * Handles `ProcessUnary`: turns every group of the request into a response
 * descriptor, merges them into one, and sends that single outcome.
 *
 * @param {object} request - the incoming request; passed to `createResponses`.
 * @param {object} context - the command context used to send the outcome.
 */
function processUnary(request, context) {
  const perGroup = createResponses(request);
  const combined = singleResponse(perGroup);
  respondWith(combined, context);
}

/**
 * Handles `ProcessStreamedIn`: collects the response descriptors of every
 * streamed request and, once the stream ends, merges them into a single
 * outcome and sends it.
 *
 * @param {object} context - the streamed command context; must provide
 *   `on("data", ...)` and `on("end", ...)` event registration.
 */
function processStreamedIn(context) {
  const collected = [];
  context.on("data", function onRequest(request) {
    for (const response of createResponses(request)) {
      collected.push(response);
    }
  });
  context.on("end", function onEnd() {
    respondWith(singleResponse(collected), context);
  });
}

/**
 * Handles `ProcessStreamedOut`: sends one outcome per group of the request,
 * in group order, then ends the response stream.
 *
 * @param {object} request - the incoming request; passed to `createResponses`.
 * @param {object} context - the streamed command context; `end()` is called
 *   after all outcomes have been sent.
 */
function processStreamedOut(request, context) {
  for (const response of createResponses(request)) {
    respondWith(response, context);
  }
  context.end();
}

/**
 * Handles `ProcessStreamed`: for every streamed request, sends one outcome per
 * group as soon as the request arrives; ends the response stream when the
 * request stream ends.
 *
 * @param {object} context - the streamed command context; must provide
 *   `on("data", ...)`, `on("end", ...)` and `end()`.
 */
function processStreamed(context) {
  context.on("data", function onRequest(request) {
    for (const response of createResponses(request)) {
      respondWith(response, context);
    }
  });
  context.on("end", function onEnd() {
    context.end();
  });
}

/**
 * Sends the outcome described by `response` through `context`.
 *
 * All side effects are registered first. Then exactly one of the following
 * happens, checked in this order: fail, forward, reply, or an empty reply when
 * none of them is present.
 *
 * Presence is tested with `!= null` rather than truthiness, so an empty-string
 * failure description or forward id is still honoured instead of silently
 * falling through to a successful empty reply.
 *
 * @param {{effects: Array<{id: *, synchronous: *}>, fail?: *, forward?: *, reply?: *}} response
 *   response descriptor as produced by `createResponse` / `singleResponse`.
 * @param {object} context - command context; `effect`, `fail`, `forward` and
 *   `write` are called on it.
 */
function respondWith(response, context) {
  // need to accumulate effects, before replying, forwarding, or failing
  response.effects.forEach(effect => context.effect(two.service.methods.Call, { id: effect.id }, effect.synchronous));
  if (response.fail != null) {
    context.fail(response.fail);
  } else if (response.forward != null) {
    context.forward(two.service.methods.Call, { id: response.forward });
  } else if (response.reply != null) {
    context.write(Response.create({ message: response.reply }));
  } else {
    context.write(); // empty message
  }
}

/**
 * Builds one response descriptor per group of the request, in group order.
 *
 * @param {{groups: Array<object>}} request - the incoming request.
 * @returns {Array<object>} the result of `createResponse` for each group.
 */
function createResponses(request) {
  const responses = [];
  for (const group of request.groups) {
    responses.push(createResponse(group));
  }
  return responses;
}

/**
 * Builds the response descriptor for one group of steps.
 *
 * Steps are applied in order. A reply, forward or fail step replaces the
 * outcome chosen by any earlier step, so the last one wins (subsequent steps
 * take precedence). Effect steps accumulate and never change the outcome.
 *
 * @param {{steps: Array<object>}} group - the group to process; each step is
 *   inspected for a `reply`, `forward`, `effect` or `fail` member.
 * @returns {{effects: Array<{id: *, synchronous: *}>, reply?: *, forward?: *, fail?: *}}
 *   the collected effects plus at most one of `reply`, `forward` or `fail`.
 */
function createResponse(group) {
  const effects = [];
  // Holds only the most recent reply/forward/fail so that an earlier outcome
  // of a different kind cannot leak into the result.
  let outcome = {};
  group.steps.forEach(step => {
    if (step.reply) {
      outcome = { reply: step.reply.message };
    } else if (step.forward) {
      outcome = { forward: step.forward.id };
    } else if (step.effect) {
      effects.push({ id: step.effect.id, synchronous: step.effect.synchronous });
    } else if (step.fail) {
      outcome = { fail: step.fail.message };
    }
  });
  return Object.assign({ effects }, outcome);
}

/**
 * Merges several response descriptors into one.
 *
 * Effects from all descriptors are concatenated in order. The outcome
 * (`reply` / `forward` / `fail`) is taken as a whole from the last descriptor
 * that has one, so a later group's outcome takes precedence over an earlier
 * group's outcome of a different kind. Presence is tested with `!= null`, so
 * empty-string values are kept rather than treated as missing.
 *
 * @param {Array<{effects: Array, reply?: *, forward?: *, fail?: *}>} responses
 *   descriptors to merge, in processing order.
 * @returns {{effects: Array, reply?: *, forward?: *, fail?: *}} the merged
 *   descriptor; `{ effects: [] }` when `responses` is empty.
 */
function singleResponse(responses) {
  return responses.reduce((combined, next) => {
    const nextHasOutcome = next.reply != null || next.forward != null || next.fail != null;
    // Take all three fields from the same descriptor so outcomes never mix.
    const source = nextHasOutcome ? next : combined;
    return {
      reply: source.reply,
      forward: source.forward,
      fail: source.fail,
      effects: combined.effects.concat(next.effects)
    };
  }, { effects: [] });
}

// Second Action, for the `ActionTwo` service; `respondWith` uses its `Call`
// method as the target of forwards and side effects.
const two = new Action(
  "proto/action.proto",
  "akkaserverless.tck.model.action.ActionTwo"
);

two.commandHandlers = {
  // Ignores the request and returns an empty `Response` message.
  Call: request => Response.create()
};