Implementing Views

Entities can be accessed by their individual Entity key, but in many cases you need to query the information for many entities by other attributes than the key. For that purpose you will use Views.

The approach of separating operations that write data from those that read the data is an architectural pattern called Command Query Responsibility Segregation (CQRS). Views can be defined from events of Event Sourced Entities, state changes of Value Entities or messages received from subscribing to topics on a broker. From the events or state changes you can maintain one or more representations that are optimized for queries.

Be aware that Views are not updated immediately by the entity changes. The goal is to update the Views as quickly as possible but it is not instant and may take up to a few seconds for the changes to become visible in the query results. For some failure scenarios it may take longer, but eventually the Views are updated.

View from Value Entity

Given a domain of a customer registry we can define a Customer in Protobuf as:

message Customer {
  string customer_id = 1 [(akkaserverless.field).entity_key = true];
  string email = 2;
  string name = 3;
  Address address = 4;
}

message Address {
  string street = 1;
  string city = 2;
}

The Customer can be a Value Entity and the changed state would be emitted as value changes, which are updating the View. We will look at View from Event Sourced Entity later.

Define the View as a service in Protobuf:

service CustomerByName {
  rpc UpdateCustomer(Customer) returns (Customer) { (1)
    option (akkaserverless.method).eventing = { (2)
      in: {
        value_changes: "customers"
      }
    };
    option (akkaserverless.method).view.update = { (3)
      table: "customers"
    };
  }

  rpc GetCustomers(ByNameRequest) returns (stream Customer) { (4)
    option (akkaserverless.method).view.query = { (5)
      query: "SELECT * FROM customers WHERE name = :customer_name"
    };
  }
}

message ByNameRequest {
  string customer_name = 1;
}
1 The UpdateCustomer method defines how the View is updated.
2 The source of the View is from the value changes of the "customers" Value Entity. This identifier is defined in the @ValueEntity(entityType = "customers") annotation of the Value Entity.
3 The (akkaserverless.method).view.update annotation defines that this method is used for updating the View. The table property must be defined and corresponds to the table used in the query.
4 The GetCustomers method defines the query to retrieve a stream of Customer.
5 The (akkaserverless.method).view.query annotation defines that this method is used as a query of the View.

If the query is supposed to return one result the stream can be removed from the return type:

rpc GetCustomer(ByEmailRequest) returns (Customer) { (1)
  option (akkaserverless.method).view.query = {
    query: "SELECT * FROM customers WHERE email = :email"
  };
}
1 Without stream when expecting single result.

When no result is found, the request will fail with gRPC status code NOT_FOUND. A streamed call would complete with an empty stream when no result is found.

Query

The query is defined in a query language that is similar to a subset of SQL. Let’s look at and explain a few example queries.

All customers without any filtering conditions (no WHERE clause):

SELECT * FROM customers

Customers with a name matching the customer_name property of the request message:

SELECT * FROM customers WHERE name = :customer_name

Customers with matching customer_name AND city properties of the request message:

SELECT * FROM customers WHERE name = :customer_name AND address.city = :city

Customers in city matching a literal value:

SELECT * FROM customers WHERE address.city = 'New York'

Possible filter predicates are:

  • = equals

  • != not equals

  • > greater than

  • >= greater than or equals

  • < less than

  • <= less than or equals

The filter conditions can be combined with AND/OR.

SELECT * FROM customers WHERE
  name = :customer_name AND address.city = 'New York' OR
  name = :customer_name AND address.city = 'San Francisco'

Registering the View

Once you’ve defined your View, you can register it with the AkkaServerless server, by invoking the registerView method. In addition to passing the service descriptor of the View, and a unique identifier of the View, you also need to pass any descriptors that you use for events, for example, the domain.proto descriptor.

new AkkaServerless()
    .registerView(
        CustomerViewModel.getDescriptor().findServiceByName("CustomerByName"),
        "customerByName",
        CustomerViewModel.getDescriptor())

View from Event Sourced Entity

In the above example, the source of the View was a Value Entity and then the state changes can be used automatically to update the View.

When using an Event Sourced Entity the changes are emitted as events. From these events we have to build a representation (state) for the View.

Protobuf definition

The events can be defined in Protobuf as:

message CustomerCreated {
  Customer customer = 1;
}

message CustomerNameChanged {
  string new_name = 1;
}

The View that is consuming these events is defined in Protobuf:

service CustomerByNameView {
  rpc ProcessCustomerCreated(CustomerCreated) returns (Customer) { (1)
    option (akkaserverless.method).eventing = {
      in: {
        event_log: "customers" (2)
      }
    };
    option (akkaserverless.method).view.update = {
      table: "customers"
      transform_updates: true (3)
    };
  }

  rpc ProcessCustomerNameChanged(CustomerNameChanged) returns (Customer) { (4)
    option (akkaserverless.method).eventing = {
      in: {
        event_log: "customers" (5)
      }
    };
    option (akkaserverless.method).view.update = {
      table: "customers"
      transform_updates: true (6)
    };
  }

  rpc GetCustomers(ByNameRequest) returns (stream Customer) {
    option (akkaserverless.method).view.query = {
      query: "SELECT * FROM customers WHERE name = :customer_name"
    };
  }
}
1 Define an update method for each event.
2 The source of the View is from the event log of the "customers" Event Sourced Entity. This identifier is defined in the @EventSourcedEntity(entityType = "customers") annotation of the Event Sourced Entity.
3 Enable transform_updates to be able to build the View state from the events.
4 One method for each event.
5 Same event_log for all update methods.
6 Enable transform_updates for all update methods.

The query definition works in the same way as described in the Query section.

Update transformation class

Define a Java class for implementing the transformation of events to View state.

import com.akkaserverless.javasdk.view.UpdateHandler;
import com.akkaserverless.javasdk.view.View;

import java.util.Optional;

@View (1)
public class CustomerView {

  @UpdateHandler (2)
  public CustomerViewModel.Customer processCustomerCreated(
      CustomerViewModel.CustomerCreated event, Optional<CustomerViewModel.Customer> state) {
    if (state.isPresent()) {
      return state.get(); // already created
    } else {
      return event.getCustomer();
    }
  }

  @UpdateHandler (3)
  public CustomerViewModel.Customer processCustomerNameChanged(
      CustomerViewModel.CustomerNameChanged event, CustomerViewModel.Customer state) {
    return state.toBuilder().setName(event.getNewName()).build();
  }
}
1 The class must have the @View annotation.
2 Each update method in the Protobuf definition should have a corresponding method in the Java class. The methods must have the @UpdateHandler annotation.
3 One method for each event.

The first method parameter should correspond to the parameter in the Protobuf service call, i.e. the event.

A second parameter can optionally be defined for the previous state. Its type corresponds to the return type of the Protobuf service call. It can be defined as Optional. For the first event of an Event Sourced Entity or for the first change of a Value Entity there is no previous state and then Optional.empty or null is used for the state parameter.

The method may also take a UpdateHandlerContext parameter.

NOTE

Events from an Event Sourced Entity is the canonical use case for this kind of update transformation class, but it can also be used for Value Entities. For example, if the View representation is different from the Entity state.

Registering

Register the View class in AkkaServerless:

new AkkaServerless()
    .registerView(
        CustomerView.class,
        CustomerViewModel.getDescriptor().findServiceByName("CustomerByNameView"),
        "customerByName",
        CustomerViewModel.getDescriptor())

View from topic

The source of a View can be an eventing topic. You define it in the same way as View from Event Sourced Entity or View from Value Entity aside from the eventing.in annotation in the Protobuf.

service CustomerByNameViewFromTopic {
  rpc ProcessCustomerCreated(CustomerCreated) returns (Customer) {
    option (akkaserverless.method).eventing = {
      in: {
        topic: "customers" (1)
      }
    };
    option (akkaserverless.method).view.update = {
      table: "customers"
      transform_updates: true
    };
  }

  rpc ProcessCustomerNameChanged(CustomerNameChanged) returns (Customer) {
    option (akkaserverless.method).eventing = {
      in: {
        topic: "customers"
      }
    };
    option (akkaserverless.method).view.update = {
      table: "customers"
      transform_updates: true
    };
  }

  rpc GetCustomers(ByNameRequest) returns (stream Customer) {
    option (akkaserverless.method).view.query = {
      query: "SELECT * FROM customers WHERE name = :customer_name"
    };
  }
}
1 This is the only difference from View from Event Sourced Entity.

Transform result

Relational projection

Instead of using SELECT * you can define what columns that will be used in the response message:

message CustomerSummary {
  string id = 1;
  string name = 2;
}

service CustomerSummaryByName {
  rpc GetCustomers(ByNameRequest) returns (stream CustomerSummary) {
    option (akkaserverless.method).view.query = {
      query: "SELECT customer_id AS id, name FROM customers WHERE name = :customer_name"
    };
  }

  rpc UpdateCustomer(Customer) returns (Customer) {
    option (akkaserverless.method).eventing = {
      in: {
        value_changes: "customers"
      }
    };
    option (akkaserverless.method).view.update = {
      table: "customers"
    };
  }
}

In a similar way, you can include values from the request message in the response, for example :request_id:

SELECT :request_id, customer_id as id, name FROM customers WHERE name = :customer_name

Response message including the result

Instead of streamed results you can include the results in a repeated field in the response message:

message CustomersResponse {
  repeated Customer results = 1; (1)
}

service CustomersResponseByName {
  rpc GetCustomers(ByNameRequest) returns (CustomersResponse) { (2)
    option (akkaserverless.method).view.query = {
      query: "SELECT * AS results FROM customers WHERE name = :customer_name" (3)
    };
  }

  rpc UpdateCustomer(Customer) returns (Customer) {
    option (akkaserverless.method).eventing = {
      in: {
        value_changes: "customers"
      }
    };
    option (akkaserverless.method).view.update = {
      table: "customers"
    };
  }
}
1 The response message contains a repeated field.
2 The return type is not streamed.
3 The repeated field is referenced in the query with * AS results.

Changing the View

How the indexes for the View are stored is derived from the query.

For example, the following query will result in a View with an index on the name column.

SELECT * FROM customers WHERE name = :customer_name

If the query is changed, other indexes might be needed. For example, changing the above query to filter on the city would mean that a View with index on the city column would have to be built.

SELECT * FROM customers WHERE address.city = :city

Such changes require that a new View must be defined, and it will be rebuilt from the source event log or value changes.

WARNING

Views from topics cannot be rebuilt from the source messages, because it’s not possible to consume all events from the topic again. The new View will be built from new messages published to the topic.

Rebuilding a new View may take some time if there are many events that have to be processed. The recommended way when changing a View is a two-step deployment.

  1. Define the new View, and keep the old View intact. A new View is defined by a new service in Protobuf and different viewId when Registering the View. Keep the old registerView.

  2. Deploy the new View, and let it rebuild. Verify that the new query works as expected. The old View can still be used.

  3. Remove the old View definition and rename the service to the old name if the public API is compatible.

  4. Deploy the second change.

The View definitions are stored and validated when a new version is deployed. There will be an error message if the changes are not compatible.

Not supported yet

The following query capabilities are not supported yet, but will soon be added:

  • transformation function for filtering and transformation of the result in code

  • parentheses for AND / OR clauses

  • IS NULL and IS NOT NULL

  • limits, offset and paging

  • string comparators LIKE, STARTS_WITH

  • set comparators IN

  • comparisons with other column references