Implementing Actions

Actions are stateless functions that can be used to implement different uses cases, such as:

  • a pure function.

  • request conversion - you can use Actions to convert incoming data into a different format before forwarding a call to a different component.

  • as a face or controller to fan out to multiple calls to different components.

  • publish messages to a Topic.

  • subscribe to events from an Event Sourced Entity.

  • subscribe to state changes from a Value Entity.

  • schedule and cancel Timers.

Actions can be triggered in multiple ways. For example, by:

  • a gRPC service call.

  • an HTTP service call.

  • a forwarded call from another component.

  • a scheduled call from a Timer.

  • an incoming message from a Topic.

  • an incoming event from an Event Sourced Entity, from within the same service or from a different service.

  • state changes notification from a Value Entity on the same service.

  • a service life-cycle event (e.g. on startup).

Action’s Effect API

The Action’s Effect defines the operations that Kalix should perform when an incoming message is handled by an Action.

An Action Effect can either:

  • reply with a message to the caller

  • reply with a message to be published to a topic (in case the method is a publisher)

  • forward the message to another component

  • return an error

  • ignore the call

Actions as Pure Functions

In this first example, you will learn how to implement an Action as a pure stateless function. You will create a FibonacciAction that takes a number and returns the next number in the Fibonacci series.

To implement this action you need the following:

  • Extend our class from kalix.javasdk.action.Action. This is generic. No matter what action you want to create you always need to extend from Action new tab.

  • Add the Spring annotation @RequestMapping to provide a REST endpoint for the function. Here the stateless function should be reachable via HTTP.

  • Add the Spring annotations @GetMapping and @PostMapping to provide paths for GET and POST to calculate the Fibonacci of a number. Both functions do the same thing and implementation-wise the function exposed with GET calls the function exposed with POST.

src/main/java/com/example/fibonacci/FibonacciAction.java
import kalix.javasdk.action.Action;

import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.PathVariable;
@RequestMapping("/fibonacci")
public class FibonacciAction extends Action {

  private boolean isFibonacci(long num) {  (1)
    Predicate<Long> isPerfectSquare = (n) -> {
      long square = (long) Math.sqrt(n);
      return square * square == n;
    };
    return isPerfectSquare.test(5*num*num + 4) || isPerfectSquare.test(5*num*num - 4);
  }
  private long nextFib(long num) {   (2)
    double result = num * (1 + Math.sqrt(5)) / 2.0;
    return Math.round(result);
  }

  @GetMapping("/{number}/next")
  public Effect<Number> getNumber(@PathVariable Long number) { (3)
    return nextNumber(new Number(number));
  }

  @PostMapping("/next")
  public Effect<Number> nextNumber(@RequestBody Number number) {
    long num =  number.value();
    if (isFibonacci(num)) {                                     (4)
      return effects().reply(new Number(nextFib(num)));
    } else {
      return effects()                                          (5)
        .error("Input number is not a Fibonacci number, received '" + num + "'", INVALID_ARGUMENT);
    }
  }
}
1 isFibonacci checks if a number is a Fibonacci number.
2 nextFib calculates the next number.
3 This nextNumber implementation calls the nextNumber implementation below.
4 The nextNumber implementation first checks if the input number belongs to the Fibonacci series. If so, it calculates the next number and builds a reply using effects().reply().
5 Otherwise, if the input number doesn’t belong to the Fibonacci series, it builds an Effect reply error.

Actions return effects (i.e. Action.Effect) and there are different types of effects: a reply, an error, a forward call to another component, and to all of those you can add side effects. Here you want only the result of the calculation or an error. Therefore, you are using .reply and .error.

Multiple replies / reply streaming

An Action may return a stream of integers. To do this you need to define the return type as reactor.core.publisher.Flux<Effect<Integer>>.

The stream may publish an arbitrary number of replies.

Actions as Controllers

Actions can be used to implement MVC Controllers by acting as the external interface of a service, receiving requests, operating over the requests values and forwarding the call to other components in the same service.

To illustrate how you can use an Action as a Controller, we will build on top of a Value Entity used to implement a Shopping Cart example, adding a new Action to the existing shopping cart service.

If you are hearing about ValueEntity for the first time, be sure to visit Implementing Value Entities before continuing.

Below you can find a summary of the shopping cart value entity we will use in this chapter: it contains only the signatures of the available endpoints for brevity:

src/main/java/com/example/api/ShoppingCartEntity.java
@Id("cartId")
@TypeId("shopping-cart")
@RequestMapping("/cart/{cartId}") (1)
public class ShoppingCartEntity extends ValueEntity<ShoppingCart> {

  @PostMapping("/create") (2)
  public ValueEntity.Effect<ShoppingCartDTO> create() {
    //...

  @PostMapping("/items/add") (3)
  public ValueEntity.Effect<ShoppingCartDTO> addItem(@RequestBody LineItemDTO addLineItem) {
    //...

  @GetMapping (4)
  public ValueEntity.Effect<ShoppingCartDTO> getCart() {
    //...
}
1 Common path being used: /cart/ suffixed with a cartId.
2 POST endpoint exposed at (…​)/create used to create a new cart with cartId.
3 POST endpoint exposed at (…​)/items/add allowing to add an item to an cart.
4 GET endpoint for retrieving the state of a cart.

Forwarding Commands

The forward effect allows you to transform or further validate an incoming request before passing it on to another component and have the response message directly passed back to the client making the request. The response from the forwarded operation must have the same response type as the original request.

In this example it accepts requests with the same structure as the create endpoint listed above, by receiving a LineItemDTO, but add some additional verification of the request and only conditionally forward the request to the entity if the verification is successful:

src/main/java/com/example/api/ShoppingCartController.java
import kalix.javasdk.Metadata;
import kalix.javasdk.action.Action;
import kalix.javasdk.action.Action.Effect;
import kalix.javasdk.annotations.ForwardHeaders;
import kalix.javasdk.client.ComponentClient;

@RequestMapping("/carts")
public class ShoppingCartController extends Action {

  private final ComponentClient componentClient;

  public ShoppingCartController(ComponentClient componentClient) {
    this.componentClient = componentClient; (1)
  }

  @PostMapping("/{cartId}/items/add") (2)
  public Action.Effect<ShoppingCartDTO> verifiedAddItem(@PathVariable String cartId,
                                                        @RequestBody LineItemDTO addLineItem) {
    if (addLineItem.name().equalsIgnoreCase("carrot")) { (3)
      return effects().error("Carrots no longer for sale"); (4)
    } else {
      var deferredCall = componentClient.forValueEntity(cartId)
        .call(ShoppingCartEntity::addItem)
        .params(addLineItem); (5)
      return effects().forward(deferredCall); (6)
    }
  }
1 ComponentClient is injected on the constructor. It will be used to build calls to the underlining Entity.
2 Expose the command handler as a POST endpoint at specified path.
3 Check if the added item is carrots.
4 If it is "carrots" immediately return an error, disallowing adding the item.
5 For allowed requests, use componentClient to get a deferred call to the entity.
6 The deferredCall is then used with effects().forward() to forward the request to the entity.
You might be wondering what the componentClient is about. For now, think of it as a lightweight, type safe, HTTP client allowing you to reach out to other Kalix services. All details can be found at Component and Service Calls chapter.

Forwarding Headers

By default, Kalix does not forward gRPC/HTTP headers to Kalix components. This can be overridden with the @ForwardHeaders annotation.

src/main/java/com/example/api/ShoppingCartController.java
@RequestMapping("/carts")
@ForwardHeaders("UserRole") (1)
public class ShoppingCartController extends Action {

  @DeleteMapping("/{cartId}")
  public Effect<String> removeCart(@PathVariable String cartId,
                                   @RequestHeader("UserRole") String userRole) { (2)

    var userRoleFromMeta = actionContext().metadata().get("UserRole").get(); (3)

    Metadata metadata = Metadata.EMPTY.add("Role", userRole);
    return effects().forward(
        componentClient.forValueEntity(cartId)
            .call(ShoppingCartEntity::removeCart)
            .withMetadata(metadata)); (4)
  }
}
1 Specify headers names that should be forwarded to this component.
2 Access the header value with the @RequestHeader annotation.
3 Alternatively, the header value can be retrieved from the Metadata associated with this call.
4 Forward different headers to the next call, by using withMetadata method of from DeferredCall class.

Transform Request and Response to Another Component

The asyncReply and asyncEffect effects allow you to process and transform a request before calling another component and then also transform the response.

As an example, let us look at the problem of creating a new entity with an id generated by another component.

This example implements an initializeCart command for the controller Action which returns the generated id that can subsequently be used to interact with the cart.

src/main/java/com/example/api/ShoppingCartController.java
  @PostMapping("/create")
  public Effect<String> initializeCart() {
    final String cartId = UUID.randomUUID().toString(); (1)
    CompletionStage<ShoppingCartDTO> shoppingCartCreated =
        componentClient.forValueEntity(cartId)
            .call(ShoppingCartEntity::create) (2)
            .execute(); (3)


    // transform response
    CompletionStage<Effect<String>> effect =
        shoppingCartCreated.handle((empty, error) -> { (4)
          if (error == null) {
            return effects().reply(cartId); (5)
          } else {
            return effects().error("Failed to create cart, please retry"); (6)
          }
        });

    return effects().asyncEffect(effect); (7)
  }
1 Generate a new UUID.
2 Use the componentClient to create a call to endpoint create on the shopping cart.
3 execute() on the deferred call immediately triggers a call and returns a CompletionStage for the response.
4 Once the call succeeds or fails the CompletionStage is completed or failed, we can transform the result from CompletionStage<Empty>. to CompletionStage<Effect<String>> using handle.
5 On a successful response, create a reply effect passing back the cartId.
6 If the call leads to an error, create an error effect asking the client to retry.
7 effects().asyncEffect() allows us to reply with a CompletionStage<Effect<String>>.

The action generates a UUID to use as entity id for the shopping cart. UUIDs are extremely unlikely to lead to the same id being generated, but to completely guarantee two calls can never be assigned the same shopping cart we make use of the "boundary of consistency" provided by the entity - the entity will only process a single command at a time and can safely make decisions based on its state - for example to only allow creation once by storing something in its state signifying that it has been created.

In this case you mark that the entity has been created using a creation timestamp in the shopping cart state stored on first create call - when the timestamp has the default value of 0. If the cart has already been stored with a timestamp it returns an error effect:

src/main/java/com/example/api/ShoppingCartEntity.java
  @PostMapping("/create") (2)
  public ValueEntity.Effect<ShoppingCartDTO> create() {
    //...
    if (currentState().creationTimestamp() > 0L) {
      return effects().error("Cart was already created");
    } else {
      var newState = currentState().withCreationTimestamp(Instant.now().toEpochMilli());
      return effects()
        .updateState(newState)
        .thenReply(ShoppingCartDTO.of(newState));
    }
  }

Composing calls

The async call shown in the previous section, can also be used to chain or compose multiple calls to a single action response.

This example builds on the previous cart creation by adding an initial item in the cart once it has been created, but before it returns the new id to the client:

src/main/java/com/example/api/ShoppingCartController.java
  @PostMapping("/prepopulated")
  public Action.Effect<String> createPrePopulated() {
    final String cartId = UUID.randomUUID().toString();
    CompletionStage<ShoppingCartDTO> shoppingCartCreated =
      componentClient.forValueEntity(cartId).call(ShoppingCartEntity::create).execute();

    CompletionStage<ShoppingCartDTO> cartPopulated =
      shoppingCartCreated.thenCompose(empty -> { (1)
        var initialItem = new LineItemDTO("e", "eggplant", 1);

        return componentClient.forValueEntity(cartId)
          .call(ShoppingCartEntity::addItem)
          .params(initialItem) (2)
          .execute(); (3)
      });

    CompletionStage<String> reply = cartPopulated.thenApply(ShoppingCartDTO::cartId); (4)

    return effects()
        .asyncReply(reply); (5)
  }
1 CompletionStage#thenCompose allow you to perform an additional async operation, returning a CompletionStage once the current one completes successfully.
2 Create a request to add an initial item to the cart.
3 Execute the addItem call returns a CompletionStage<ShoppingCartDTO> once it succeeds.
4 Transform the successful completion of addItem with ShoppingCartDTO to the response type of this method - String.
5 effects().asyncReply() lets us reply once the CompletionStage<String> completes.

In this sample it is safe to base a subsequent call to the entity on the reply of the previous one, no client will know of the cart id until createPrePopulated replies.

There is no transaction or consistency boundary outside of the entity, so for a sequence of calls from an action to an entity, the state of the entity could be updated by other calls it receives in-between.

For example, imagine an action that for a cart id retrieves the state using getState to verify if too many items are already in the cart, and once that has been verified, it adds the item to the cart.

src/main/java/com/example/api/ShoppingCartController.java
  @PostMapping("/{cartId}/unsafeAddItem")
  public Action.Effect<String> unsafeValidation(@PathVariable String cartId,
                                                @RequestBody LineItemDTO addLineItem) {
    // NOTE: This is an example of an anti-pattern, do not copy this
    CompletionStage<ShoppingCartDTO> cartReply =
      componentClient.forValueEntity(cartId).call(ShoppingCartEntity::getCart).execute(); (1)

    CompletionStage<Action.Effect<String>> effect = cartReply.thenApply(cart -> {
      int totalCount = cart.items().stream()
          .mapToInt(LineItemDTO::quantity)
          .sum();

      if (totalCount < 10) {
        return effects().error("Max 10 items in a cart");
      } else {
        CompletionStage<String> addItemReply = componentClient.forValueEntity(cartId).call(ShoppingCartEntity::addItem).params(addLineItem)
          .execute().thenApply(ShoppingCartDTO::cartId);
        return effects()
          .asyncReply(addItemReply); (2)
      }
    });

    return effects().asyncEffect(effect);
  }
1 Between this call returning.
2 And this next call to the same entity, the entity could accept other commands that change the total count of items in the cart.

The problem with this is that a POST /cart/my-cart/items/add call directly to the entity happening between the GET /cart/my-cart action returning and the subsequent "addItem" call from the action would lead to more items in the cart than the allowed limit.

Such validation depending on state can only safely be done handling the command inside of the entity.

Actions as Life-cycle Hooks

An Action method can be triggered automatically when some predefined service life-cycle event happens (currently, only on startup is available), serving as a custom hook. For such use, the method needs to be public, annotated with @Trigger.OnStartup and cannot receive any parameters, as shown below.

The on startup hook is called every time a service instance boots up. This can happen for very different reasons: restarting / redeploying the service, scaling up to more instances or even without any user-driven action (e.g. Kalix Runtime versions being rolled out, infrastructure-related incidents, etc.). Therefore, you should carefully consider how you use this hook and its implementation.
public class OnStartupAction extends Action { (1)

  @PostMapping("/init")
  @Trigger.OnStartup( (2)
      maxRetries = 3) (3)
  public Action.Effect<String> init() { (4)
    // Do some initial operations here
    return effects().reply("Done");
  }

}
1 Only methods belonging to an Action can be configured as a hook.
2 This hook will be triggered once the instance startup is concluded (i.e. will be called 3 times if 3 instances are running).
3 Optionally, set the max amount of retries to 3.
4 The method must be public and receive no parameters.
If the call to the hook returns a failure and the maxRetries is set to a value greater than the default value (0), a number of retry calls will be executed with a fixed delay up until the configure amount is reached.

Running Side Effects

Emitting effects on another component

An Entity or an Action may also emit one or more side effects. A side effect is something whose result has no impact on the result of the current command—​if it fails, the current command still succeeds. The result of the side effect is therefore ignored. When used from inside an Entity, side effects are only performed after the successful completion of any state actions requested by the command handler.

There is no guarantee that a side effect will be executed successfully. If a failure occurs after the command is fully handled, effects might not be executed. Side effects are not retried in case of failures.

Side effects may be declared as synchronous or asynchronous. Asynchronous commands run in a "fire and forget" fashion. The code flow of the caller (the command handler of the entity which emitted the asynchronous command) continues while the command is being asynchronously processed. Meanwhile, synchronous commands run sequentially, that is, the commands are processed in order, one at a time. The final result of the command handler, either a reply or a forward, is not sent until all synchronous commands are completed.

Use case: mobile notification

You might want to run side effects to notify interested parties of a change in state. For example, after a withdrawal is made from a bank account, an account entity could send a notification to the account owner’s mobile phone.

Emitting a side effect

To illustrate how you can emit a side effect, you can build on top of the Action as a Controller example. In that previous example, you build a controller around the Value Entity Counter and forwarded the incoming request after modifying it.

This time, instead of using a forward, you will call the entity using a side effect.

Implementing the Action

The class DoubleCounterAction listens to the counter state changes. When the counter value changes this action doubles it.

src/main/java/com/example/actions/DoubleCounterAction.java
public Action.Effect<Confirmed> increaseWithSideEffect(Integer increase) {
  var counterId = actionContext().eventSubject().get(); (1)
  var doubleIncrease = increase * 2; (2)
  var deferredCall = componentClient.forValueEntity(counterId)
    .call(CounterEntity::increaseBy)
    .params(new Number(doubleIncrease));
  return effects().reply(Confirmed.instance).addSideEffect(SideEffect.of(deferredCall));  (3)
}
1 Retrieving the id of the counter.
2 On incoming request, doubling the value of increase.
3 Building a reply using Confirmed.getDefaultInstance(). And attaching a side effect, i.e. calling to the Counter to increase double the previous amount.
the response of the side effect is ignored by the command meaning that even if the deferred call to the Counter entity fails, the Action reply will succeed.

Testing the Action

Unit tests

The following snippet shows how the ActionTestkit is used to test the FibonacciAction implementation.

With the ActionTestkit you can call the methods of FibonacciAction. Each call you pass over to the test kit returns an ActionResult that contains the effect produced by the underlying action method.

Actions are unique units of computation where no local state is shared with previous or subsequent calls. The framework does not reuse an Action instance but creates a new one for each command handled and therefore this is also how the test kit behaves.
Java
src/test/java/com/example/actions/FibonacciActionTest.java
import kalix.javasdk.testkit.ActionResult;
import kalix.javasdk.testkit.ActionTestkit;
import org.junit.jupiter.api.Test;

public class FibonacciActionTest {

  @Test
  public void testNextFib() {
    ActionTestkit<FibonacciAction> testkit = ActionTestkit.of(FibonacciAction::new); (1)
    ActionResult<Number> result = testkit.call(a -> a.getNumber(3L));  (2)
    assertTrue(result.isReply());
    assertEquals(5L, result.getReply().value());
  }

  @Test
  public void testNextFibError() {
    ActionTestkit<FibonacciAction> testkit = ActionTestkit.of(FibonacciAction::new);  (1)
    ActionResult<Number> result = testkit.call(a -> a.getNumber(4L));     (2)
    assertTrue(result.isError());
    assertTrue(result.getError().startsWith("Input number is not a Fibonacci number"));
  }
}
1 The test kit is created to allow us to test the Action’s method.
2 Calling nextNumber method with some value.

ActionResult

Calling an action method through the test kit gives us back an ActionResult new tab. This class has methods that you can use to assert your expectations, such as:

  • getReply() returns the reply message passed to effects().reply() or throws an exception failing the test, if the effect returned was not a reply.

  • getError() returns the error description when effects().error() was returned to signal an error.

  • getForward() returns details about what message was forwarded and where the call was forwarded (since it is a unit test the forward is not actually executed).

The side effects of an Action can NOT be tested in isolation at the moment.

Integration tests

Actions (like any other Kalix component) can be verified with integration tests. The Spring WebClient utility can be used to run any HTTP call to test Kalix components.

Java
src/it/java/com/example/fibonacci/FibonacciActionIntegrationTest.java
@SpringBootTest(classes = Main.class) (1)
public class FibonacciActionIntegrationTest extends KalixIntegrationTestKitSupport { (2)

  @Autowired
  private WebClient webClient;

  private Duration timeout = Duration.of(5, SECONDS);

  @Test
  public void calculateNextNumber() {

    ResponseEntity<Number> response = webClient.get()
      .uri("/fibonacci/5/next")
      .retrieve()
      .toEntity(Number.class)
      .block(timeout); (3)

    Assertions.assertEquals(HttpStatus.OK, response.getStatusCode());
    Assertions.assertEquals(8, response.getBody().value());
  }

}
1 Mark the test as a Spring integration tests.
2 Set up the Kalix infrastructure by extending KalixIntegrationTestKitSupport.
3 Use WebClient to call the Action component endpoint.

In cases where detailed assertions on the HTTP response are not required, the Kalix ComponentClient can be used in integration tests.

Java
src/it/java/com/example/fibonacci/FibonacciActionComponentClientIntegrationTest.java
@SpringBootTest(classes = Main.class)
public class FibonacciActionComponentClientIntegrationTest extends KalixIntegrationTestKitSupport {

  private Duration timeout = Duration.of(5, SECONDS);

  @Test
  public void calculateNextNumber() throws ExecutionException, InterruptedException, TimeoutException {

    Number response = componentClient.forAction() (1)
        .call(FibonacciAction::nextNumber)
        .params(new Number(5))
        .execute() (2)
        .toCompletableFuture()
        .get(timeout.toMillis(), MILLISECONDS);

    Assertions.assertEquals(8, response.value());
  }
}
1 Use the ComponentClient to call the Action component endpoint.
2 Transform the DeferredCall to a CompletionStage and wait for the response.
The integration tests in samples are under in a specific project profile it and can be run using mvn verify -Pit.