Implementing Views

You can access a single Entity with its Entity key. To retrieve multiple Entities, or to look them up by an attribute other than the key, use Akka Serverless Views. By creating multiple Views, you can optimize query performance for each one.

Views can be defined from all of the following:

  • Event Sourced Entity events

  • Value Entity state changes

  • Messages received from subscribing to topics on a broker

Be aware that Views are not updated immediately when Entity state changes. Akka Serverless does update Views as quickly as possible, but it is not instant and can take up to a few seconds for the changes to become visible in the query results. View updates might also take more time during failure scenarios than during normal operation.
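Because of this delay, a client that writes to an Entity and then immediately queries a View may still see the old data. One way to cope is to re-run the query a bounded number of times. The following is a minimal sketch in plain Java: ViewPolling, pollUntil, and the fake query in main are hypothetical names used for illustration and are not part of the Akka Serverless SDK. In a real client, the Supplier would wrap the call to the View query.

```java
import java.time.Duration;
import java.util.Optional;
import java.util.function.Predicate;
import java.util.function.Supplier;

// Hypothetical helper, not part of the Akka Serverless SDK.
final class ViewPolling {

  // Re-runs the query until the result satisfies isUpToDate or attempts run out.
  static <T> Optional<T> pollUntil(
      Supplier<T> query, Predicate<T> isUpToDate, int maxAttempts, Duration delay) {
    for (int attempt = 1; attempt <= maxAttempts; attempt++) {
      T result = query.get();
      if (result != null && isUpToDate.test(result)) {
        return Optional.of(result);
      }
      if (attempt < maxAttempts) {
        try {
          Thread.sleep(delay.toMillis());
        } catch (InterruptedException e) {
          // Restore the interrupt flag and give up early.
          Thread.currentThread().interrupt();
          return Optional.empty();
        }
      }
    }
    return Optional.empty();
  }

  public static void main(String[] args) {
    // Stand-in for a real View query: returns the updated value on the third call.
    int[] calls = {0};
    Supplier<String> fakeQuery = () -> ++calls[0] < 3 ? "old-name" : "new-name";
    Predicate<String> renamed = "new-name"::equals;
    Optional<String> seen = pollUntil(fakeQuery, renamed, 5, Duration.ofMillis(10));
    System.out.println(seen.orElse("View not updated yet"));
  }
}
```

Keeping the number of attempts bounded matters, since View updates can take longer than usual during failure scenarios.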

Creating a View from a Value Entity

Using an example of a customer registry, you can define a Customer Value Entity in Protobuf as:

message CustomerState {
  string customer_id = 1;
  string email = 2;
  string name = 3;
  Address address = 4;
}

message Address {
  string street = 1;
  string city = 2;
}

When Customer state changes, the entire state will be emitted as a value change, which will update any associated Views.

To get a View of multiple customers by their name, define the view service in Protobuf:

service CustomerByName {
  rpc UpdateCustomer(domain.CustomerState) returns (domain.CustomerState) { (1)
    option (akkaserverless.method).eventing.in = { (2)
      value_entity: "customers"
    };
    option (akkaserverless.method).view.update = { (3)
      table: "customers"
    };
  }

  rpc GetCustomers(ByNameRequest) returns (stream domain.CustomerState) { (4)
    option (akkaserverless.method).view.query = { (5)
      query: "SELECT * FROM customers WHERE name = :customer_name"
    };
  }
}

message ByNameRequest {
  string customer_name = 1;
}
1 The UpdateCustomer method defines how the View is updated.
2 The source of the View is the "customers" Value Entity. This identifier is defined in the @ValueEntity(entityType = "customers") annotation of the Value Entity.
3 The (akkaserverless.method).view.update annotation defines that this method is used for updating the View. The table property must be defined and corresponds to the table used in the query. You can use any name for the table.
4 The GetCustomers method defines the query to retrieve a stream of Customer.
5 The (akkaserverless.method).view.query annotation defines that this method is used as a query of the View.

If the query is supposed to return only one result, remove the stream from the return type:

rpc GetCustomer(ByEmailRequest) returns (domain.CustomerState) { (1)
  option (akkaserverless.method).view.query = {
    query: "SELECT * FROM customers WHERE email = :email"
  };
}
1 Without stream when expecting single result.

When a non-streamed query finds no result, the request fails with gRPC status code NOT_FOUND. A streamed call completes with an empty stream instead.

Querying a View

You define View queries in a language that is similar to SQL. The following examples illustrate the syntax.

To get all customers without any filtering conditions (no WHERE clause):

SELECT * FROM customers

To get customers with a name matching the customer_name property of the request message:

SELECT * FROM customers WHERE name = :customer_name

To get customers matching the customer_name AND city properties of the request message:

SELECT * FROM customers WHERE name = :customer_name AND address.city = :city

To get customers in a city matching a literal value:

SELECT * FROM customers WHERE address.city = 'New York'

You can use the following filter predicates to further refine results:

  • = equals

  • != not equals

  • > greater than

  • >= greater than or equals

  • < less than

  • <= less than or equals

You can combine filter conditions with the AND and OR operators.

SELECT * FROM customers WHERE
  name = :customer_name AND address.city = 'New York' OR
  name = :customer_name AND address.city = 'San Francisco'
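To make the grouping of the combined conditions explicit, the sketch below expresses the same filter as plain Java predicates over an in-memory list. It assumes that AND binds more tightly than OR, as in standard SQL. The CombinedFilterSketch class and its simplified Customer type are hypothetical and only mirror the name and address.city columns. The code does not call Akka Serverless.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;

// Hypothetical in-memory illustration of the combined WHERE clause.
final class CombinedFilterSketch {

  // Simplified stand-in for a row in the "customers" table.
  static final class Customer {
    final String name;
    final String city;

    Customer(String name, String city) {
      this.name = name;
      this.city = city;
    }
  }

  // (name = :customer_name AND city = 'New York')
  //   OR (name = :customer_name AND city = 'San Francisco')
  static Predicate<Customer> filter(String customerName) {
    Predicate<Customer> nameMatches = c -> c.name.equals(customerName);
    Predicate<Customer> inNewYork = c -> c.city.equals("New York");
    Predicate<Customer> inSanFrancisco = c -> c.city.equals("San Francisco");
    return nameMatches.and(inNewYork).or(nameMatches.and(inSanFrancisco));
  }

  // Returns the cities of all customers that pass the filter, in input order.
  static List<String> matchingCities(List<Customer> customers, String customerName) {
    return customers.stream()
        .filter(filter(customerName))
        .map(c -> c.city)
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    List<Customer> customers = Arrays.asList(
        new Customer("Alice", "New York"),
        new Customer("Alice", "Boston"),
        new Customer("Bob", "San Francisco"));
    System.out.println(matchingCities(customers, "Alice"));
  }
}
```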

Registering a View

Once you’ve defined a View, register it with AkkaServerless by invoking the registerView method. In addition to the service descriptor and a unique identifier of the View, you also need to pass any descriptors that you use for events, for example the domain.proto descriptor.

new AkkaServerless()
    .registerView(
        CustomerViewModel.getDescriptor().findServiceByName("CustomerByName"),
        "customerByName",
        CustomerDomain.getDescriptor())

Creating a View from an Event Sourced Entity

The previous example derived a View from a Value Entity and used state changes to update the View. In contrast, to create a View from an Event Sourced Entity, you use events that the Entity emits to build a state representation.

Protobuf definition

Like the Value Entity example above, this View will provide a way to query customers. The Protobuf file defines the following events that will update the View:

message CustomerCreated {
  CustomerState customer = 1;
}

message CustomerNameChanged {
  string new_name = 1;
}

The following lines in the .proto file define a View to consume the CustomerCreated and CustomerNameChanged events:

service CustomerByNameView {
  rpc ProcessCustomerCreated(domain.CustomerCreated) returns (domain.CustomerState) { (1)
    option (akkaserverless.method).eventing.in = {
      event_sourced_entity: "customers" (2)
    };
    option (akkaserverless.method).view.update = {
      table: "customers"
      transform_updates: true (3)
    };
  }

  rpc ProcessCustomerNameChanged(domain.CustomerNameChanged) returns (domain.CustomerState) { (4)
    option (akkaserverless.method).eventing.in = {
      event_sourced_entity: "customers" (5)
    };
    option (akkaserverless.method).view.update = {
      table: "customers"
      transform_updates: true (6)
    };
  }

  rpc GetCustomers(ByNameRequest) returns (stream domain.CustomerState) {
    option (akkaserverless.method).view.query = {
      query: "SELECT * FROM customers WHERE name = :customer_name"
    };
  }
}
1 Define an update method for each event.
2 The source of the View is the journal of the "customers" Event Sourced Entity. This identifier is defined in the @EventSourcedEntity(entityType = "customers") annotation of the Event Sourced Entity.
3 Enable transform_updates to be able to build the View state from the events.
4 One method for each event.
5 Same event_sourced_entity for all update methods.
6 Enable transform_updates for all update methods.

The query definition works in the same way as described in the Querying a View section.

Update transformation class

Next, you need to define a Java class that transforms events to state that can be used in the View:

import com.akkaserverless.javasdk.view.UpdateHandler;
import com.akkaserverless.javasdk.view.View;
import customer.domain.CustomerDomain;

import java.util.Optional;

@View (1)
public class CustomerView {

  @UpdateHandler (2)
  public CustomerDomain.CustomerState processCustomerCreated(
      CustomerDomain.CustomerCreated event, Optional<CustomerDomain.CustomerState> state) {
    if (state.isPresent()) {
      return state.get(); // already created
    } else {
      return event.getCustomer();
    }
  }

  @UpdateHandler (3)
  public CustomerDomain.CustomerState processCustomerNameChanged(
      CustomerDomain.CustomerNameChanged event, CustomerDomain.CustomerState state) {
    return state.toBuilder().setName(event.getNewName()).build();
  }
}
1 The class must have the @View annotation.
2 Each update method in the Protobuf definition should have a corresponding method in the Java class. The methods must have the @UpdateHandler annotation.
3 One method for each event.

The first method parameter should correspond to the parameter in the Protobuf service call, the event.

A second parameter can optionally be defined for the previous state. Its type corresponds to the return type of the Protobuf service call, and it can be declared as Optional. For the first event of an Event Sourced Entity, or the first change of a Value Entity, there is no previous state. In that case the state parameter is Optional.empty if it is declared as Optional, and null otherwise.

The method may also take an UpdateHandlerContext parameter.

Processing events from an Event Sourced Entity is the canonical use case for this kind of update transformation class, but it can also be used for Value Entities, for example when the View representation differs from the Entity state.
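To illustrate how successive update handler calls build the View state, the following sketch replays a list of events through two handlers with the same logic as CustomerView. It uses plain Java stand-ins instead of the generated Protobuf classes. ViewUpdateSketch, the simplified CustomerState and event classes, and the replay method are hypothetical. How Akka Serverless actually dispatches events to handlers is not shown here.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

// Hypothetical simulation of update handlers; no Akka Serverless classes are used.
final class ViewUpdateSketch {

  static final class CustomerState {
    final String customerId;
    final String name;

    CustomerState(String customerId, String name) {
      this.customerId = customerId;
      this.name = name;
    }
  }

  interface CustomerEvent {}

  static final class CustomerCreated implements CustomerEvent {
    final CustomerState customer;

    CustomerCreated(CustomerState customer) {
      this.customer = customer;
    }
  }

  static final class CustomerNameChanged implements CustomerEvent {
    final String newName;

    CustomerNameChanged(String newName) {
      this.newName = newName;
    }
  }

  // Same logic as processCustomerCreated: keep existing state, else use the event.
  static CustomerState processCustomerCreated(
      CustomerCreated event, Optional<CustomerState> state) {
    return state.orElse(event.customer);
  }

  // Same logic as processCustomerNameChanged: copy the state with the new name.
  static CustomerState processCustomerNameChanged(
      CustomerNameChanged event, CustomerState state) {
    return new CustomerState(state.customerId, event.newName);
  }

  // Applies the events in order, starting from "no previous state".
  static Optional<CustomerState> replay(List<CustomerEvent> events) {
    Optional<CustomerState> state = Optional.empty();
    for (CustomerEvent event : events) {
      if (event instanceof CustomerCreated) {
        state = Optional.of(processCustomerCreated((CustomerCreated) event, state));
      } else if (event instanceof CustomerNameChanged && state.isPresent()) {
        state = Optional.of(
            processCustomerNameChanged((CustomerNameChanged) event, state.get()));
      }
    }
    return state;
  }

  public static void main(String[] args) {
    List<CustomerEvent> events = Arrays.<CustomerEvent>asList(
        new CustomerCreated(new CustomerState("c1", "Alice")),
        new CustomerNameChanged("Alicia"));
    replay(events).ifPresent(s -> System.out.println(s.customerId + " " + s.name));
  }
}
```

In this sketch a name change that arrives before any creation event is skipped. That is a simplification chosen for the example, not documented Akka Serverless behavior.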

Registering

Register the View class with AkkaServerless:

new AkkaServerless()
    .registerView(
        CustomerView.class,
        CustomerViewModel.getDescriptor().findServiceByName("CustomerByNameView"),
        "customerByName",
        CustomerDomain.getDescriptor())

Creating a View from a topic

The source of a View can be an eventing topic. You define it in the same way as shown in Creating a View from an Event Sourced Entity or Creating a View from a Value Entity, but specify a topic as the source in the eventing.in annotation in the Protobuf file.

service CustomerByNameViewFromTopic {
  rpc ProcessCustomerCreated(domain.CustomerCreated) returns (domain.CustomerState) {
    option (akkaserverless.method).eventing.in = {
      topic: "customers" (1)
    };
    option (akkaserverless.method).view.update = {
      table: "customers"
      transform_updates: true
    };
  }

  rpc ProcessCustomerNameChanged(domain.CustomerNameChanged) returns (domain.CustomerState) {
    option (akkaserverless.method).eventing.in = {
      topic: "customers"
    };
    option (akkaserverless.method).view.update = {
      table: "customers"
      transform_updates: true
    };
  }

  rpc GetCustomers(ByNameRequest) returns (stream domain.CustomerState) {
    option (akkaserverless.method).view.query = {
      query: "SELECT * FROM customers WHERE name = :customer_name"
    };
  }
}
1 This is the only difference from Creating a View from an Event Sourced Entity.

Transform result

When creating a View, you can transform the results as a relational projection instead of using a SELECT * statement.

Relational projection

Instead of using SELECT *, you can define which columns are used in the response message:

message CustomerSummary {
  string id = 1;
  string name = 2;
}

service CustomerSummaryByName {
  rpc GetCustomers(ByNameRequest) returns (stream CustomerSummary) {
    option (akkaserverless.method).view.query = {
      query: "SELECT customer_id AS id, name FROM customers WHERE name = :customer_name"
    };
  }

  rpc UpdateCustomer(domain.CustomerState) returns (domain.CustomerState) {
    option (akkaserverless.method).eventing.in = {
      value_entity: "customers"
    };
    option (akkaserverless.method).view.update = {
      table: "customers"
    };
  }
}

In a similar way, you can include values from the request message in the response, for example :request_id:

SELECT :request_id, customer_id as id, name FROM customers WHERE name = :customer_name
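As an informal mental model, the CustomerSummary projection above behaves like a filter followed by a field mapping. The sketch below shows that idea on an in-memory list in plain Java. ProjectionSketch and its simplified CustomerState and CustomerSummary classes are hypothetical stand-ins for the generated Protobuf classes. How Akka Serverless evaluates the query internally is not covered here.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical in-memory illustration of
// SELECT customer_id AS id, name FROM customers WHERE name = :customer_name
final class ProjectionSketch {

  static final class CustomerState {
    final String customerId;
    final String email;
    final String name;

    CustomerState(String customerId, String email, String name) {
      this.customerId = customerId;
      this.email = email;
      this.name = name;
    }
  }

  static final class CustomerSummary {
    final String id;
    final String name;

    CustomerSummary(String id, String name) {
      this.id = id;
      this.name = name;
    }
  }

  static List<CustomerSummary> summariesByName(
      List<CustomerState> table, String customerName) {
    return table.stream()
        .filter(c -> c.name.equals(customerName))              // WHERE name = :customer_name
        .map(c -> new CustomerSummary(c.customerId, c.name))   // customer_id AS id, name
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    List<CustomerState> table = Arrays.asList(
        new CustomerState("c1", "alice@example.com", "Alice"),
        new CustomerState("c2", "bob@example.com", "Bob"));
    for (CustomerSummary s : summariesByName(table, "Alice")) {
      System.out.println(s.id + " " + s.name);
    }
  }
}
```

Fields that are not selected, such as email, are simply absent from the response type.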

Response message including the result

Instead of streamed results you can include the results in a repeated field in the response message:

message CustomersResponse {
  repeated domain.CustomerState results = 1; (1)
}

service CustomersResponseByName {
  rpc GetCustomers(ByNameRequest) returns (CustomersResponse) { (2)
    option (akkaserverless.method).view.query = {
      query: "SELECT * AS results FROM customers WHERE name = :customer_name" (3)
    };
  }

  rpc UpdateCustomer(domain.CustomerState) returns (domain.CustomerState) {
    option (akkaserverless.method).eventing.in = {
      value_entity: "customers"
    };
    option (akkaserverless.method).view.update = {
      table: "customers"
    };
  }
}
1 The response message contains a repeated field.
2 The return type is not streamed.
3 The repeated field is referenced in the query with * AS results.

Changing a View

Akka Serverless creates indexes for the View based on the query. For example, the following query will result in a View with an index on the name column:

SELECT * FROM customers WHERE name = :customer_name

If the query is changed, Akka Serverless might need to add other indexes. For example, changing the above query to filter on the city would mean that Akka Serverless needs to build a View with the index on the city column.

SELECT * FROM customers WHERE address.city = :city

Such changes require you to define a new View. Akka Serverless will then rebuild it from the source event log or value changes.

Views from topics cannot be rebuilt from the source messages, because it’s not possible to consume all events from the topic again. The new View will be built from new messages published to the topic.

Rebuilding a new View may take some time if there are many events to process. The recommended way to change a View is to use two deployments, following these steps:

  1. Define the new View, and keep the old View intact. A new View is defined by a new service in Protobuf and a different viewId in the registerView call (see Registering a View). Keep the old registerView call.

  2. Deploy the new View, and let it rebuild. Verify that the new query works as expected. The old View can still be used.

  3. Remove the old View definition and rename the service to the old name if the public API is compatible.

  4. Deploy the second change.

The View definitions are stored and validated when a new version is deployed. There will be an error message if the changes are not compatible.