Actions

Actions are stateless functions that can be triggered in multiple ways, for example by:

  • a gRPC service call

  • an HTTP service call

  • a new item in an Event Sourced Entity’s journal

  • a forwarded call from another component

Use case: request conversion

You can use Actions to convert incoming data into a different format before forwarding a call to a different component.

A service might need to offer a request data format that does not correspond directly with the commands of an Event Sourced Entity. By exposing a service implemented by an Action, the Action implementation can adapt the data format to the command (or commands) expected by the Event Sourced Entity. The incoming requests get forwarded to the target component.

Use case: listening to a journal

To listen to an Event Sourced Entity’s journal, an Action can be set up for Eventing.

The Event Sourced Entity journal contains events that capture all state changes. By subscribing to the journal with the Event Sourced Entity name, another component can receive all events emitted by Event Sourced Entities of that type.

Together with Topic publishing, this may be used to inform other services asynchronously about certain events.

Implementing Actions

The following example shows a .proto file that defines several Action services:

// Copyright 2021-2023 Lightbend Inc.

syntax = "proto3";

package perf.action;

import "perf_domain.proto";
import "perf_api.proto";
import "google/protobuf/empty.proto";
import "google/api/annotations.proto";
import "kalix/annotations.proto";

// Request message for PingActionService.Ping; it declares no fields.
message PingRequest {}

// Response message for PingActionService.Ping; it declares no fields.
message PongResponse {}

// Service with a single unary rpc that is additionally mapped to an HTTP route.
service PingActionService {
  // Takes a PingRequest and returns a PongResponse.
  // The google.api.http option maps the call to HTTP GET /action/ping.
  rpc Ping(PingRequest) returns (PongResponse) {
    option (google.api.http) = {
      get: "/action/ping"
    };
  }
}

// Service whose Consume rpc declares a value entity as its eventing input.
service FromValueActionService {
  // Receives domain.ValueState messages and returns google.protobuf.Empty.
  // `eventing.in` names the value entity "values" as the source of the input.
  rpc Consume(domain.ValueState) returns (google.protobuf.Empty) {
    option (kalix.method).eventing.in = {
      value_entity: "values"
    };
  }
}

// Service whose Consume rpc declares a topic as its eventing input.
service FromTopicActionService {
  // Receives domain.ValueState messages and returns google.protobuf.Empty.
  // `eventing.in` names the topic "values" as the source of the input.
  rpc Consume(domain.ValueState) returns (google.protobuf.Empty) {
    option (kalix.method).eventing.in = {
      topic: "values"
    };
  }
}

// Request message for ToTopicActionService.ProduceToValues.
message ToTopicRequest {
  // String identifier carried by the request.
  string id = 1;
  // NOTE(review): presumably the size of the payload to produce; how it is used
  // is decided by the implementation, which is not part of this file -- confirm.
  int32 payload_size = 2;
}

// Service whose rpcs declare a topic as the eventing output for their return value.
service ToTopicActionService {
  // Takes a ToTopicRequest and returns a domain.ValueState;
  // `eventing.out` names the topic "values" as the destination.
  rpc ProduceToValues(ToTopicRequest) returns (domain.ValueState) {
    option (kalix.method).eventing.out = {
      topic: "values"
    };
  }

  // Takes an api.IncrementRequest and returns an api.IncrementRequest;
  // `eventing.out` names the topic "increments" as the destination.
  rpc ProduceToIncrements(api.IncrementRequest) returns (api.IncrementRequest) {
    option (kalix.method).eventing.out = {
      topic: "increments"
    };
  }
}

// Service whose Produce rpc declares both an eventing input and an eventing output.
service FromValueToTopicActionService {
  // Receives domain.ValueState with the value entity "values" as `eventing.in`
  // and returns domain.ValueState with the topic "values" as `eventing.out`.
  rpc Produce(domain.ValueState) returns (domain.ValueState) {
    option (kalix.method).eventing.in = {
      value_entity: "values"
    };
    option (kalix.method).eventing.out = {
      topic: "values"
    };
  }
}

// Service whose Forward rpc declares the topic "increments" as its eventing input.
// NOTE(review): the service name suggests the implementation forwards each request
// to an entity; that behavior is not expressed in this definition -- confirm in the
// implementation.
service FromTopicForwardToEntityActionService {
  // Receives api.IncrementRequest messages and returns google.protobuf.Empty.
  rpc Forward(api.IncrementRequest) returns (google.protobuf.Empty) {
    option (kalix.method).eventing.in = {
      topic: "increments"
    };
  }
}

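The following shows an Action implementation in JavaScript and in TypeScript. Note that these samples implement the `kalix.tck.model.action` services declared in `proto/action.proto`, not the `perf.action` services shown above: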

JavaScript
/*
 * Copyright 2019-2023 Lightbend Inc.
 */

// The Kalix JavaScript SDK package is `@kalix-io/kalix-javascript-sdk`.
const Action = require("@kalix-io/kalix-javascript-sdk").Action;

// Action component for the `ActionTckModel` service declared in proto/action.proto.
const tckModel = new Action(
  "proto/action.proto",
  "kalix.tck.model.action.ActionTckModel"
);

// Message type used by respondWith() to build reply messages.
const Response = tckModel.lookupType("kalix.tck.model.action.Response");

// Maps each command name of the service to the function that handles it.
tckModel.commandHandlers = {
  ProcessUnary: processUnary,
  ProcessStreamedIn: processStreamedIn,
  ProcessStreamedOut: processStreamedOut,
  ProcessStreamed: processStreamed
};

/**
 * Handler for `ProcessUnary`: turns every group of the request into a
 * response description, merges them into one, and applies it to the context.
 *
 * @param request incoming request; its `groups` are read by createResponses
 * @param context action context the merged response is applied to
 */
function processUnary(request, context) {
  const perGroup = createResponses(request);
  const merged = singleResponse(perGroup);
  respondWith(merged, context);
}

/**
 * Handler for `ProcessStreamedIn`: collects the response descriptions of every
 * incoming request and, once the input stream ends, merges them into a single
 * response that is applied to the context.
 *
 * @param context streamed-in context; "data" and "end" callbacks are registered on it
 */
function processStreamedIn(context) {
  const collected = [];
  // Each incoming request may contribute several response descriptions.
  const onData = request => collected.push(...createResponses(request));
  // Only when the input is exhausted is the merged response sent.
  const onEnd = () => respondWith(singleResponse(collected), context);
  context.on("data", onData);
  context.on("end", onEnd);
}

/**
 * Handler for `ProcessStreamedOut`: applies one response per group of the
 * request to the context, then ends the output stream.
 *
 * @param request incoming request; its `groups` are read by createResponses
 * @param context streamed-out context that receives each response and is ended afterwards
 */
function processStreamedOut(request, context) {
  const sendOne = response => respondWith(response, context);
  createResponses(request).forEach(sendOne);
  context.end();
}

/**
 * Handler for `ProcessStreamed`: for every incoming request, applies one
 * response per group to the context; ends the output when the input ends.
 *
 * @param context streamed context; "data" and "end" callbacks are registered on it
 */
function processStreamed(context) {
  const sendOne = response => respondWith(response, context);
  const onData = request => createResponses(request).forEach(sendOne);
  const onEnd = () => context.end();
  context.on("data", onData);
  context.on("end", onEnd);
}

/**
 * Applies a response description (as built by createResponse / singleResponse)
 * to the given context.
 *
 * All effects are registered first. After that exactly one of the following
 * happens, checked in this order: fail, forward, reply, or an empty write.
 *
 * @param response object with `effects` and optional `fail`, `forward`, `reply`
 * @param context action context to register effects on and to respond through
 */
function respondWith(response, context) {
  // need to accumulate effects, before replying, forwarding, or failing
  for (const effect of response.effects) {
    context.effect(two.service.methods.Call, { id: effect.id }, effect.synchronous);
  }

  if (response.fail) {
    // the optional second argument sets the gRPC status code to 9 - FAILED_PRECONDITION
    context.fail(response.fail, 9);
    return;
  }
  if (response.forward) {
    context.forward(two.service.methods.Call, { id: response.forward });
    return;
  }
  if (response.reply) {
    context.write(Response.create({ message: response.reply }));
    return;
  }
  context.write(); // empty message
}

/**
 * Builds one response description per group of the request.
 *
 * @param request incoming request with a `groups` array
 * @returns array of response descriptions, in group order
 */
function createResponses(request) {
  const { groups } = request;
  return groups.map(group => createResponse(group));
}

function createResponse(group) {
  const response = {
    effects: []
  };
  group.steps.forEach(step => {
    if (step.reply) {
      response.reply = step.reply.message;
    } else if (step.forward) {
      response.forward = step.forward.id;
    } else if (step.effect) {
      response.effects.push({ id: step.effect.id, synchronous: step.effect.synchronous });
    } else if (step.fail) {
      response.fail = step.fail.message;
    }
  });
  return response;
}

function singleResponse(responses) {
  return responses.reduce((response, next) => ({
    reply: next.reply || response.reply,
    forward: next.forward || response.forward,
    fail: next.fail || response.fail,
    effects: response.effects.concat(next.effects)
  }), { effects: [] });
}

// Second Action component, for the `ActionTwo` service of proto/action.proto.
// respondWith() uses its `Call` method as the target of effects and forwards.
const two = new Action("proto/action.proto", "kalix.tck.model.action.ActionTwo");

two.commandHandlers = {
  // Ignores the incoming request and answers with a default Response.
  Call(request) {
    return Response.create();
  }
};
TypeScript
/*
 * Copyright 2021-2023 Lightbend Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

import { Action, replies } from "@kalix-io/kalix-javascript-sdk";
import * as proto from "../lib/generated/proto";

// Type aliases for the generated protobuf message types used in the handler signatures below.
type Request = proto.kalix.tck.model.action.Request;
type ProcessGroup = proto.kalix.tck.model.action.ProcessGroup;

// Runtime values from the same generated namespace; their static `create` is used below
// to build messages. `ProcessGroup` therefore exists both as a type (above) and as a value.
const { Response, ProcessGroup } = proto.kalix.tck.model.action;

// Maps each command name of the `ActionTckModel` service to its handler function.
const tckModelHandlers = {
  ProcessUnary: processUnary,
  ProcessStreamedIn: processStreamedIn,
  ProcessStreamedOut: processStreamedOut,
  ProcessStreamed: processStreamed
};

/** Action component for the `ActionTckModel` service declared in proto/action.proto. */
export const tckModel: Action = new Action(
    "proto/action.proto",
    "kalix.tck.model.action.ActionTckModel"
).setCommandHandlers(tckModelHandlers);

/**
 * Handler for `ProcessUnary`: builds the reply from the first group of the request.
 *
 * @param request incoming request; only `groups[0]` is read
 * @returns the reply produced by createReplyForGroup for that group
 */
function processUnary(request: Request): replies.Reply {
  const firstGroup = ProcessGroup.create(request.groups[0]);
  return createReplyForGroup(firstGroup);
}

/**
 * Handler for `ProcessStreamedIn`: folds the replies of all incoming requests
 * into one reply that is returned when the input stream ends.
 *
 * For every request only `groups[0]` is read. The most recent non-empty reply
 * determines the kind of the final reply, while effects from all requests are
 * carried along.
 *
 * @param context streamed-in context; "data" and "end" callbacks are registered on it
 */
function processStreamedIn(context: Action.StreamedInContext) {
  let reply = replies.emptyReply();
  context.on("data", request => {
    // Wrap the first group with ProcessGroup.create, exactly as processUnary and
    // createReplies do, so that a request with an empty `groups` array does not
    // make createReplyForGroup dereference `undefined`.
    const replyForThisRequest = createReplyForGroup(
        ProcessGroup.create(request.groups[0])
    );
    if (!replyForThisRequest.isEmpty()) {
      // keep the last type of reply but pass along the effects
      if (reply.getEffects())
        replyForThisRequest.addEffects(reply.getEffects());
      reply = replyForThisRequest;
    } else if (replyForThisRequest.getEffects()) {
      // pass along the effects from empty reply, but keep the previous non-empty reply
      reply.addEffects(replyForThisRequest.getEffects());
    }
  });
  // last callback return value is sent back for stream in, if it is a Reply
  context.on("end", () => reply);
}

/**
 * Handler for `ProcessStreamedOut`: sends one reply per group of the request,
 * then ends the output stream.
 *
 * @param request incoming request; its groups are converted by createReplies
 * @param context streamed-out context that receives the replies and is ended afterwards
 */
function processStreamedOut(
    request: Request,
    context: Action.StreamedOutContext
) {
  const outgoing = createReplies(request);
  for (const reply of outgoing) {
    // imperative send of Reply (since we could have 1:* for the incoming, and they can happen async?)
    context.reply(reply);
  }
  context.end();
}

/**
 * Handler for `ProcessStreamed`: for every incoming request, sends one reply
 * per group; ends the output stream when the input stream ends.
 *
 * @param context streamed context; "data" and "end" callbacks are registered on it
 */
function processStreamed(context: Action.StreamedCommandContext) {
  context.on("data", request => {
    for (const reply of createReplies(request)) {
      // imperative send of Reply (since we could have 1:* for the incoming, and they can happen async?)
      context.reply(reply);
    }
  });
  context.on("end", () => context.end());
}

/**
 * Builds one reply per group of the request.
 *
 * @param request incoming request with a `groups` array
 * @returns the replies, in group order
 */
function createReplies(request: Request): replies.Reply[] {
  const result: replies.Reply[] = [];
  for (const group of request.groups) {
    const processGroup = ProcessGroup.create(group);
    result.push(createReplyForGroup(processGroup));
  }
  return result;
}

/**
 * Folds the steps of one group into a single reply.
 *
 * Starts from an empty reply. A `reply`, `forward` or `fail` step replaces the
 * current reply with a new one of that kind; an `effect` step adds an effect
 * to whatever the current reply is at that point.
 *
 * @param group the group whose `steps` are processed in order (may have no steps)
 * @returns the reply resulting from the last replacing step, or the empty reply
 */
function createReplyForGroup(group: ProcessGroup): replies.Reply {
  let reply = replies.emptyReply();
  for (const step of group.steps ?? []) {
    if (step.reply) {
      const message = Response.create({ message: step.reply.message });
      reply = replies.message(message);
      continue;
    }
    if (step.forward) {
      reply = replies.forward(two.service.methods.Call, {
        id: step.forward.id
      });
      continue;
    }
    if (step.effect) {
      const synchronous = step.effect.synchronous || false;
      reply.addEffect(
          two.service.methods.Call,
          { id: step.effect.id },
          synchronous
      );
      continue;
    }
    if (step.fail) {
      const description = step.fail.message || "";
      // The second argument is optional and sets the gRPC status code
      reply = replies.failure(
          description,
          replies.GrpcStatus.FailedPrecondition
      );
    }
  }
  return reply;
}

/**
 * Second Action component, for the `ActionTwo` service of proto/action.proto.
 * createReplyForGroup uses its `Call` method as the target of effects and forwards.
 */
export const two: Action = new Action(
    "proto/action.proto",
    "kalix.tck.model.action.ActionTwo"
).setCommandHandlers({
  // Answers every call with a default Response.
  Call() {
    return Response.create();
  }
});