Implementing Views

You can access a single Entity with its Entity key. You might want to retrieve multiple Entities, or retrieve them using an attribute other than the key. Kalix Views allow you to achieve this. By creating multiple Views, you can optimize query performance for each one.

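Views can be defined from any of the following:

  • Value Entity state changes (see View from a Value Entity)

  • Event Sourced Entity events (see View from Event Sourced Entity)

  • Messages received from a topic (see View from a topic)

In addition, this page describes:

  • How to transform results

  • How to test a View

  • How to modify a View

  • Query syntax reference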

Be aware that Views are not updated immediately when Entity state changes. Kalix does update Views as quickly as possible, but it is not instant and can take up to a few seconds for the changes to become visible in the query results. View updates might also take more time during failure scenarios than during normal operation.

The kalix-javascript-sdk GitHub repository includes an example of all views described on this page.

View from a Value Entity

Consider an example of a Customer Registry service with a customer Value Entity. When customer state changes, the entire state is emitted as a value change. Value changes update any associated Views. To create a View that lists customers by their name:

  1. Define the View service descriptor for a service that selects customers by name and associates a table name with the View. Kalix creates the table and uses it to store the View; you can use any name for the table.

  2. Register the View.

This example assumes the following customer state is defined in a customer_domain.proto file:

syntax = "proto3";

package customer.domain;

message CustomerState {
  string customer_id = 1;
  string email = 2;
  string name = 3;
  Address address = 4;
}

message Address {
  string street = 1;
  string city = 2;
}

Define the View service descriptor

To get a view of multiple customers by their name, define the View as a service in Protobuf. The descriptor defines:

  • How to update the View

  • The source of View data

  • A table attribute that can be any name. Use this name in the query SELECT statement for the View.

  • The query that returns customers by name

syntax = "proto3";

package customer.view;

import "customer_domain.proto";
import "kalix/annotations.proto";
import "google/protobuf/any.proto";

service CustomerByName {

  rpc UpdateCustomer(domain.CustomerState) returns (domain.CustomerState) { (1)
    option (kalix.method).eventing.in = { (2)
      value_entity: "customers"
    };
    option (kalix.method).view.update = { (3)
      table: "customers"
    };
  }

  rpc GetCustomers(ByNameRequest) returns (stream domain.CustomerState) { (4)
    option (kalix.method).view.query = { (5)
      query: "SELECT * FROM customers WHERE name = :customer_name"
    };
  }
}

message ByNameRequest {
  string customer_name = 1;
}
1 The UpdateCustomer method defines how Kalix will update the view.
2 The source of the View is the "customers" Value Entity. This identifier is the entity type that the Value Entity is defined with.
3 The (kalix.method).view.update annotation defines that this method is used for updating the View. You must define the table attribute for the table to be used in the query. Pick any name and use it in the query SELECT statement.
4 The GetCustomers method defines the query to retrieve a stream of customers.
5 The (kalix.method).view.query annotation defines that this method is used as a query of the View.
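
To make the update and query halves of the descriptor concrete, here is a minimal in-memory sketch of the behavior they describe. It is only a mental model, not how Kalix stores or indexes Views; the names customersTable, updateCustomer, and getCustomers are hypothetical and exist only in this sketch.

```javascript
// Hypothetical in-memory model of the "customers" view table (not Kalix code).
// Each value change replaces the row for that entity; the query filters rows.
const customersTable = new Map(); // entity key -> latest CustomerState

// Models UpdateCustomer: the emitted state becomes the new row for the entity.
function updateCustomer(state) {
  customersTable.set(state.customerId, state);
}

// Models: SELECT * FROM customers WHERE name = :customer_name
function getCustomers(request) {
  return [...customersTable.values()].filter(
    (row) => row.name === request.customerName
  );
}

updateCustomer({ customerId: "alice", email: "alice@example.com", name: "Alice" });
updateCustomer({ customerId: "bob", email: "bob@somewhere.com", name: "Bob" });
// A later value change for the same entity overwrites the earlier row.
updateCustomer({ customerId: "bob", email: "bob@somewhere.com", name: "Robert" });

console.log(getCustomers({ customerName: "Robert" })); // one row, for "bob"
console.log(getCustomers({ customerName: "Bob" })); // empty: the row was replaced
```

Keying the table by the entity key is what makes a second update for "bob" replace the first instead of adding a duplicate row.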

If the query should return only one result, stream can be removed from the return type:

rpc GetCustomer(ByEmailRequest) returns (domain.CustomerState) { (1)
  option (kalix.method).view.query = {
    query: "SELECT * FROM customers WHERE email = :email"
  };
}
1 Omit stream when you expect a single result.

When no result is found, the request fails with gRPC status code NOT_FOUND. A streamed call completes with an empty stream when no result is found.
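
A caller of a single-result query usually wants to tell "no match" apart from a real failure. The sketch below wraps a Node-style callback so that NOT_FOUND becomes a null result. It assumes the client reports errors with a numeric code property, as gRPC clients for Node.js commonly do; orNullWhenNotFound is a hypothetical helper, not part of the Kalix SDK. The value 5 is the standard gRPC status code for NOT_FOUND.

```javascript
// gRPC status code for NOT_FOUND (standard gRPC status code numbering).
const NOT_FOUND = 5;

// Hypothetical helper: wraps a Node-style callback so that a NOT_FOUND
// error is reported as a null result instead of an error.
function orNullWhenNotFound(callback) {
  return (err, result) => {
    if (err && err.code === NOT_FOUND) {
      callback(null, null); // no matching row in the View
    } else {
      callback(err, result); // success, or a failure worth surfacing
    }
  };
}

// Usage with simulated responses (no real gRPC call is made here).
const log = (err, customer) =>
  console.log(err ? `failed: ${err.message}` : customer);

orNullWhenNotFound(log)({ code: NOT_FOUND, message: "not found" }); // logs null
orNullWhenNotFound(log)(null, { name: "Alice" }); // logs the customer
orNullWhenNotFound(log)({ code: 14, message: "unavailable" }); // logs "failed: unavailable"
```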

See Query syntax reference for examples of valid query syntax.

Register the View

In the View implementation, register the View with Kalix. In addition to passing the service descriptor and a unique identifier, pass any descriptors that define state. In this example, the customer_domain.proto descriptor defines the Value Entity state:

customer-value-entity-view.js
JavaScript
const View = require("@kalix-io/kalix-javascript-sdk").View;

const view = new View(
  ["customer_view.proto", "customer_domain.proto"],
  "customer.view.CustomersResponseByName",
  {
    viewId: "response-by-name"
  }
);

module.exports = view;
TypeScript
import { View } from "@kalix-io/kalix-javascript-sdk";

const view: View = new View(
  ["customer_view.proto", "customer_domain.proto"],
  "customer.view.CustomersResponseByName",
  {
    viewId: "response-by-name"
  }
);

export default view;

Invoke the addComponent function to register the view with the service. For example:

index.js
JavaScript
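const Kalix = require("@kalix-io/kalix-javascript-sdk").Kalix;

const server = new Kalix();
server.addComponent(require("./customer-value-entity-view"));
server.addComponent(require("./customer-value-entity"));
server.start();
TypeScript
import { Kalix } from "@kalix-io/kalix-javascript-sdk";
import customerValueEntity from "./customer-value-entity";
import customerValueEntityView from "./customer-value-entity-view";

new Kalix()
  .addComponent(customerValueEntity)
  .addComponent(customerValueEntityView)
  .start();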

View from Event Sourced Entity

Construct Event Sourced Entity Views from the events that the Entity emits. Build a state representation from the events and query it. Using the Customer Registry service example, create a View for querying customers by name as follows:

  1. Define the View descriptor.

  2. Implement transformation functions.

  3. Register the View.

The example assumes a customer_domain.proto file that defines the events that will update the View when a name changes:

syntax = "proto3";

package customer.domain;

message CustomerCreated {
  CustomerState customer = 1;
}

message CustomerNameChanged {
  string new_name = 1;
}

message CustomerAddressChanged {
  Address new_address = 1;
}

Define the View descriptor

A view descriptor:

  • Defines update methods for events.

  • Provides the source of the View.

  • Enables transformation updates.

  • Specifies a table attribute used by Kalix to store the View. Pick any name and use it in the Query SELECT statement for the View.

The following example customer_view.proto file defines a View to consume the CustomerCreated and CustomerNameChanged events. It must ignore all other events.

syntax = "proto3";

package customer.view;

import "customer_domain.proto";
import "kalix/annotations.proto";
import "google/protobuf/any.proto";

service CustomerByNameView {
  rpc ProcessCustomerCreated(domain.CustomerCreated) returns (domain.CustomerState) { (1)
    option (kalix.method).eventing.in = {
      event_sourced_entity: "customers" (2)
    };
    option (kalix.method).view.update = {
      table: "customers"
      transform_updates: true (3)
    };
  }

  rpc ProcessCustomerNameChanged(domain.CustomerNameChanged) returns (domain.CustomerState) { (4)
    option (kalix.method).eventing.in = {
      event_sourced_entity: "customers" (5)
    };
    option (kalix.method).view.update = {
      table: "customers"
      transform_updates: true (6)
    };
  }

  rpc IgnoreOtherEvents(google.protobuf.Any) returns (domain.CustomerState) { (7)
    option (kalix.method).eventing.in = {
      event_sourced_entity: "customers"
    };
    option (kalix.method).view.update = {
      table: "customers"
      transform_updates: true
    };
  }

  rpc GetCustomers(ByNameRequest) returns (stream domain.CustomerState) {
    option (kalix.method).view.query = {
      query: "SELECT * FROM customers WHERE name = :customer_name"
    };
  }
}
1 Define an update method for each event.
2 The source of the View is the journal of the "customers" Event Sourced Entity. This identifier is the entity type that the Event Sourced Entity is defined with.
3 Enable transform_updates to be able to build the View state from the events.
4 One method for each event.
5 The same event_sourced_entity for all update methods. Note the required table attribute. Use any name, which you will reference in the query SELECT statement.
6 Enable transform_updates for all update methods.
7 Ignore events not relevant to this view.

See Query syntax reference for more examples of valid query syntax.

Implement transformation functions

Implement the View by defining the functions that transform events to View state and ignore other events:

customer-event-sourced-view.js
JavaScript
const CustomerState = view.lookupType("customer.domain.CustomerState")

view.setUpdateHandlers({ (1)
  ProcessCustomerCreated: customerCreated,
  ProcessCustomerNameChanged: customerNameChanged,
  IgnoreOtherEvents: ignoreOtherEvents
});

function customerCreated(event, state) {
  if (state)
    return state // already created
  else
    return CustomerState.create(event.customer)
}

function customerNameChanged(event, state) {
  state.name = event.newName
  return state
}

function ignoreOtherEvents(_event, state) {
  return state
}
TypeScript
const CustomerState = view.lookupType("customer.domain.CustomerState");

view.setUpdateHandlers({
  ProcessCustomerCreated: customerCreated,
  ProcessCustomerNameChanged: customerNameChanged,
  IgnoreOtherEvents: ignoreOtherEvents
});

function customerCreated(event: CustomerCreated, state: State) {
  if (state) {
    return state; // already created
  } else {
    return CustomerState.create(event.customer || {});
  }
}

function customerNameChanged(event: CustomerNameChanged, state: State) {
  state.name = event.newName;
  return state;
}

function ignoreOtherEvents(event: Any, state: State) {
  return state;
}
1 Each update method in the Protobuf definition should have a corresponding JavaScript function in view.setUpdateHandlers.

The first function parameter should correspond to the parameter in the Protobuf service call, that is, the event. You can optionally define a second parameter for the previous state. For the first event of an Event Sourced Entity or for the first change of a Value Entity there is no previous state, and null is used for the state parameter. The function may also take an UpdateHandlerContext parameter.

This type of update transformation is a natural fit for Events emitted by an Event Sourced Entity, but it can also be used for Value Entities. For example, if the View representation is different from the Entity state you might want to transform it before presenting the View to the client.
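
To see how the update handlers build View state, the sketch below replays a short list of events through plain functions, starting from null as described above. It is a stand-alone simulation: replay and the type tag on each event are hypothetical conveniences for this sketch, and Kalix itself decides which handler to call from the Protobuf method. The null guard in customerNameChanged is an extra precaution added here.

```javascript
// Stand-alone simulation of update handlers (does not use the Kalix SDK).
function customerCreated(event, state) {
  return state ? state : { ...event.customer }; // keep existing state if already created
}

function customerNameChanged(event, state) {
  if (!state) return state; // nothing to rename yet (guard added for this sketch)
  return { ...state, name: event.newName };
}

function ignoreOtherEvents(_event, state) {
  return state; // leave the View row unchanged
}

// Hypothetical dispatcher: the `type` tag stands in for the Protobuf event type.
const handlers = {
  CustomerCreated: customerCreated,
  CustomerNameChanged: customerNameChanged,
};

function replay(events) {
  // The state is null before the first event.
  return events.reduce(
    (state, { type, payload }) =>
      (handlers[type] || ignoreOtherEvents)(payload, state),
    null
  );
}

const finalState = replay([
  { type: "CustomerCreated", payload: { customer: { customerId: "alice", name: "Alice" } } },
  { type: "CustomerAddressChanged", payload: { newAddress: { city: "The Big City" } } },
  { type: "CustomerNameChanged", payload: { newName: "Alice Smith" } },
]);

console.log(finalState); // { customerId: 'alice', name: 'Alice Smith' }
```

The address event passes through ignoreOtherEvents, so it leaves the row untouched, which mirrors the role of the IgnoreOtherEvents method in the descriptor.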

Register the View

In the implementation, register the View with Kalix:

customer-event-sourced-view.js
JavaScript
const View = require("@kalix-io/kalix-javascript-sdk").View;

const view = new View(
  ["customer_view.proto", "customer_domain.proto"],
  "customer.view.CustomerByNameView", // or CustomerByNameViewFromTopic
  {
    viewId: "by-name"
  }
);

module.exports = view;
TypeScript
import { View } from "@kalix-io/kalix-javascript-sdk";

const view: View = new View(
  ["customer_view.proto", "customer_domain.proto"],
  "customer.view.CustomerByNameView", // or CustomerByNameViewFromTopic
  {
    viewId: "by-name"
  }
);
export default view;

Invoke the addComponent function to register the view with the service. For example:

index.js
JavaScript
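const Kalix = require("@kalix-io/kalix-javascript-sdk").Kalix;

const server = new Kalix();
server.addComponent(require("./customer-event-sourced-entity"));
server.addComponent(require("./customer-event-sourced-view"));
server.start();
TypeScript
import { Kalix } from "@kalix-io/kalix-javascript-sdk";
import customerEventSourcedEntity from "./customer-event-sourced-entity";
import customerEventSourcedEntityView from "./customer-event-sourced-view";

new Kalix()
  .addComponent(customerEventSourcedEntity)
  .addComponent(customerEventSourcedEntityView)
  .start();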

View from a topic

The source of a View can be an eventing topic. You define it in the same way as described in View from Event Sourced Entity or View from a Value Entity, but set topic in the eventing.in annotation in the Protobuf instead of event_sourced_entity or value_entity.

syntax = "proto3";

package customer.view;

import "customer_domain.proto";
import "kalix/annotations.proto";
import "google/protobuf/any.proto";

service CustomerByNameViewFromTopic {
  rpc ProcessCustomerCreated(domain.CustomerCreated) returns (domain.CustomerState) {
    option (kalix.method).eventing.in = {
      topic: "customers" (1)
    };
    option (kalix.method).view.update = {
      table: "customers"
      transform_updates: true
    };
  }

  rpc ProcessCustomerNameChanged(domain.CustomerNameChanged) returns (domain.CustomerState) {
    option (kalix.method).eventing.in = {
      topic: "customers"
    };
    option (kalix.method).view.update = {
      table: "customers"
      transform_updates: true
    };
  }

  rpc IgnoreOtherEvents(google.protobuf.Any) returns (domain.CustomerState) {
    option (kalix.method).eventing.in = {
      topic: "customers"
    };
    option (kalix.method).view.update = {
      table: "customers"
      transform_updates: true
    };
  }

  rpc GetCustomers(ByNameRequest) returns (stream domain.CustomerState) {
    option (kalix.method).view.query = {
      query: "SELECT * FROM customers WHERE name = :customer_name"
    };
  }
}
1 This is the only difference from View from Event Sourced Entity.

How to transform results

To obtain results that differ from those shown in the examples above, you can transform them in either of these ways:

  • Relational projection

  • Response message including the result

Relational projection

Instead of using SELECT * you can define the columns to use in the response message:

syntax = "proto3";

package customer.view;

import "customer_domain.proto";
import "kalix/annotations.proto";
import "google/protobuf/any.proto";

message CustomerSummary {
  string id = 1;
  string name = 2;
}

service CustomerSummaryByName {
  rpc GetCustomers(ByNameRequest) returns (stream CustomerSummary) {
    option (kalix.method).view.query = {
      query: "SELECT customer_id AS id, name FROM customers WHERE name = :customer_name"
    };
  }

  rpc UpdateCustomer(domain.CustomerState) returns (domain.CustomerState) {
    option (kalix.method).eventing.in = {
      value_entity: "customers"
    };
    option (kalix.method).view.update = {
      table: "customers"
    };
  }
}

Similarly, you can include values from the request message in the response, such as :request_id:

SELECT :request_id, customer_id AS id, name FROM customers WHERE name = :customer_name
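
As an illustration of what a projection such as SELECT customer_id AS id, name does to each row, the sketch below applies the same column selection to plain objects. It only models the shape of the result; toCustomerSummary is a hypothetical name, and the camelCase field names assume the JavaScript representation of the Protobuf messages used elsewhere on this page.

```javascript
// Models: SELECT customer_id AS id, name FROM customers WHERE name = :customer_name
// (an in-memory illustration, not how Kalix evaluates queries).
function toCustomerSummary(row) {
  return { id: row.customerId, name: row.name }; // other columns are dropped
}

const rows = [
  { customerId: "alice", email: "alice@example.com", name: "Alice" },
  { customerId: "alice2", email: "alice@somewhere.com", name: "Alice" },
  { customerId: "bob", email: "bob@somewhere.com", name: "Bob" },
];

const summaries = rows
  .filter((row) => row.name === "Alice") // WHERE name = :customer_name
  .map(toCustomerSummary); // SELECT customer_id AS id, name

console.log(summaries); // two summaries, with ids "alice" and "alice2"
```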

Response message including the result

Instead of streamed results, you can include the results in a repeated field in the response message:

message CustomersResponse {
  repeated domain.CustomerState results = 1; (1)
}

service CustomersResponseByName {
  rpc GetCustomers(ByNameRequest) returns (CustomersResponse) { (2)
    option (kalix.method).view.query = {
      query: "SELECT * AS results FROM customers WHERE name = :customer_name" (3)
    };
  }

  rpc UpdateCustomer(domain.CustomerState) returns (domain.CustomerState) {
    option (kalix.method).eventing.in = {
      value_entity: "customers"
    };
    option (kalix.method).view.update = {
      table: "customers"
    };
  }
}
1 The response message contains a repeated field.
2 The return type is not streamed.
3 The repeated field is referenced in the query with * AS results.

How to test a View

View tests need to create gRPC clients for both the Entity and the View. For example:

JavaScript
const { IntegrationTestkit } = require("@kalix-io/testkit");
const should = require("chai").should();

const testkit = new IntegrationTestkit();
testkit.addComponent(require("../customer-value-entity"))
testkit.addComponent(require("../customer-value-entity-view"))

function client() {
  return testkit.clients.CustomerService;
}
TypeScript
import { IntegrationTestkit } from "@kalix-io/testkit";
import { expect } from "chai";
import * as protobuf from "protobufjs"; // provides the protobuf.rpc.Service type used below
import customerValueEntity from "../src/customer-value-entity";
import customerValueEntityView from "../src/customer-value-entity-view";
import { customer as customerApi } from "../lib/generated/customer_api";
import { customer as customerView } from "../lib/generated/customer_view";

type CustomerService = customerApi.api.CustomerService;
type CustomersResponseByName = customerView.view.CustomersResponseByName;

type ServiceMethods<S extends protobuf.rpc.Service> = {
  [M in keyof S]: S[M] extends (x: any) => Promise<any> ? M : never;
}[keyof S];

type CustomerServiceAsync = {
  [M in ServiceMethods<CustomerService> as `${M}Async`]: (
    ...args: Parameters<CustomerService[M]>
  ) => ReturnType<CustomerService[M]>;
};

type CustomersResponseByNameAsync = {
  [M in ServiceMethods<CustomersResponseByName> as `${M}Async`]: (
    ...args: Parameters<CustomersResponseByName[M]>
  ) => ReturnType<CustomersResponseByName[M]>;
};

const testkit = new IntegrationTestkit()
  .addComponent(customerValueEntity)
  .addComponent(customerValueEntityView);

function client(): CustomerServiceAsync {
  return testkit.clients.CustomerService;
}

Since Views do not immediately update on changes, add a retry to make sure the test doesn’t fail unnecessarily. For example:

JavaScript
function view() {
  return testkit.clients.CustomersResponseByName;
}

describe("Customer registry service", function() {

  this.timeout(60000);

  before(done => testkit.start(done));
  after(done => testkit.shutdown(done));

  this.retries(10); // in case view has not updated yet
  beforeEach(function(done) { // add a delay between retries
    if (this.currentTest.currentRetry() > 0) {
      setTimeout(done, this.currentTest.currentRetry() * 1000);
    } else {
      done();
    }
  });
TypeScript
function view(): CustomersResponseByNameAsync {
  return testkit.clients.CustomersResponseByName;
}

describe("Customer registry service", function () {
  this.timeout(60000);

  before(done => testkit.start(done));
  after(done => testkit.shutdown(done));

  this.retries(10); // in case view has not updated yet
  beforeEach(function (done) {
    // add a delay between retries
    // @ts-ignore
    if (this.currentTest.currentRetry() > 0) {
      // @ts-ignore
      setTimeout(done, this.currentTest.currentRetry() * 1000);
    } else {
      done();
    }
  });
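
Each retry in the hook above waits one second longer than the previous one, so the waits add up. The sketch below works out the longest single wait and the total extra wait for the values used in the example. totalRetryDelayMs is a hypothetical helper written for this illustration.

```javascript
// Delay schedule used by the beforeEach hook above: retry n waits n * 1000 ms.
function retryDelayMs(retry) {
  return retry * 1000;
}

// Hypothetical helper: total extra wait if every one of `retries` retries is needed.
function totalRetryDelayMs(retries) {
  let total = 0;
  for (let retry = 1; retry <= retries; retry++) {
    total += retryDelayMs(retry);
  }
  return total;
}

console.log(retryDelayMs(10)); // 10000 (longest single wait, in ms)
console.log(totalRetryDelayMs(10)); // 55000 (1000 + 2000 + ... + 10000)
```

In the worst case a test that needs all ten retries therefore spends 55 seconds waiting, which is worth keeping in mind when choosing the number of retries.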

Provide some data:

JavaScript
  const alice = {customerId: "alice", email: "alice@example.com", name: "Alice", address: {street: "The Street", city: "The Big City"}};
  const bob = {customerId: "bob", email: "bob@somewhere.com", name: "Bob", address: {street: "The Road", city: "The Small City"}};
  const alice2 = {customerId: "alice2", email: "alice@somewhere.com", name: "Alice", address: {street: "The Avenue", city: "The Big City"}};
  const otherAlice = {customerId: "alice2", email: "alice@somewhere.com", name: "Other Alice", address: {street: "The Avenue", city: "The Big City"}};
TypeScript
  const alice = {
    customerId: "alice",
    email: "alice@example.com",
    name: "Alice",
    address: { street: "The Street", city: "The Big City" }
  };
  const bob = {
    customerId: "bob",
    email: "bob@somewhere.com",
    name: "Bob",
    address: { street: "The Road", city: "The Small City" }
  };
  const alice2 = {
    customerId: "alice2",
    email: "alice@somewhere.com",
    name: "Alice",
    address: { street: "The Avenue", city: "The Big City" }
  };
  const otherAlice = {
    customerId: "alice2",
    email: "alice@somewhere.com",
    name: "Other Alice",
    address: { street: "The Avenue", city: "The Big City" }
  };

Exercise the View:

JavaScript
  it("should get existing customers", async () => {
    (await client().getCustomerAsync({customerId: "alice"})).should.deep.equal(alice);
    (await client().getCustomerAsync({customerId: "bob"})).should.deep.equal(bob);
    (await client().getCustomerAsync({customerId: "alice2"})).should.deep.equal(alice2);
  });

  it("should lookup customers by name", async () => {
    (await view().getCustomersAsync({customerName: "Alice"})).results.should.have.deep.members([alice, alice2]);
    (await view().getCustomersAsync({customerName: "Bob"})).results.should.have.deep.members([bob]);
  });
TypeScript
  it("should get existing customers", async () => {
    expect(
      await client().getCustomerAsync({ customerId: "alice" })
    ).to.deep.equal(alice);
    expect(
      await client().getCustomerAsync({ customerId: "bob" })
    ).to.deep.equal(bob);
    expect(
      await client().getCustomerAsync({ customerId: "alice2" })
    ).to.deep.equal(alice2);
  });

  it("should lookup customers by name", async () => {
    expect(
      await view().getCustomersAsync({ customerName: "Alice" })
    ).to.deep.equal({ results: [alice, alice2] });
    expect(
      await view().getCustomersAsync({ customerName: "Bob" })
    ).to.deep.equal({ results: [bob] });
  });

Find the complete test example on GitHub.

How to modify a View

Kalix creates indexes for the View based on the query. For example, the following query will result in a View with an index on the name column:

SELECT * FROM customers WHERE name = :customer_name

If the query is changed, Kalix might need to add other indexes. For example, changing the above query to filter on the city would mean that Kalix needs to build a View with the index on the city column.

SELECT * FROM customers WHERE address.city = :city

Such changes require you to define a new View. Kalix will then rebuild it from the source event log or value changes.

Views from topics cannot be rebuilt from the source messages, because it’s not possible to consume all events from the topic again. The new View will be built from new messages published to the topic.

Rebuilding a new View may take some time if there are many events that have to be processed. The recommended way when changing a View is multi-step, with two deployments:

  1. Define the new View, and keep the old View intact. A new View is defined by a new service in Protobuf and a different viewId when you register it (see Register the View). Keep the registration of the old View.

  2. Deploy the new View, and let it rebuild. Verify that the new query works as expected. The old View can still be used.

  3. Remove the old View definition and rename the service to the old name if the public API is compatible.

  4. Deploy the second change.

The View definitions are stored and validated when a new version is deployed. There will be an error message if the changes are not compatible.

Drop obsolete view data

The data for old Views that are no longer actively used can be dropped using the kalix service views CLI commands.

A summary of all views for a running service can be listed using the views list command:

> kalix service views list customer-registry
NAME               ACTIVE   LAST UPDATED
CustomerByName     false    1d
CustomerByNameV2   true     5m

Any views that are inactive and no longer needed can be dropped using the views drop command:

> kalix service views drop customer-registry CustomerByName
The data for view 'CustomerByName' of service 'customer-registry' has successfully been dropped.

> kalix service views list customer-registry
NAME               ACTIVE   LAST UPDATED
CustomerByNameV2   true     10m

Query syntax reference

Define View queries in a language that is similar to SQL. The following examples illustrate the syntax for a customers entity, where the .proto file defines the table attribute as customers. To retrieve:

  • All customers without any filtering conditions (no WHERE clause):

    SELECT * FROM customers
  • Customers with a name matching the customer_name property of the request:

    SELECT * FROM customers WHERE name = :customer_name
  • Customers with matching customer_name AND city properties of the request:

    SELECT * FROM customers WHERE name = :customer_name AND address.city = :city
  • Customers in a city matching a literal value:

    SELECT * FROM customers WHERE address.city = 'New York'

Filter predicates

Use filter predicates in WHERE conditions to further refine results.

Comparison operators

The following comparison operators are supported:

  • = equals

  • != not equals

  • > greater than

  • >= greater than or equals

  • < less than

  • <= less than or equals

Logical operators

Combine filter conditions with the AND operator, and negate using the NOT operator. Group conditions using parentheses.

Note: OR support is currently disabled until it can be indexed more efficiently.

SELECT * FROM customers WHERE
  name = :customer_name AND NOT (address.city = 'New York' AND age > 65)
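
While OR is unavailable, one possible workaround is to run one query per alternative and combine the results in the caller. The sketch below shows only the combining step on plain arrays; mergeById is a hypothetical helper, and it assumes each row carries a unique customerId. Whether the extra round trips are acceptable depends on the service.

```javascript
// Hypothetical helper: union of several query results, de-duplicated by customerId.
function mergeById(...resultSets) {
  const byId = new Map();
  for (const rows of resultSets) {
    for (const row of rows) {
      if (!byId.has(row.customerId)) {
        byId.set(row.customerId, row); // keep the first occurrence
      }
    }
  }
  return [...byId.values()];
}

// Simulated results of two separate View queries (no real calls are made):
// one filtering on name, the other on address.city.
const byName = [
  { customerId: "alice", name: "Alice", address: { city: "The Big City" } },
];
const byCity = [
  { customerId: "alice", name: "Alice", address: { city: "The Big City" } },
  { customerId: "alice2", name: "Other Alice", address: { city: "The Big City" } },
];

console.log(mergeById(byName, byCity).map((row) => row.customerId));
// [ 'alice', 'alice2' ]
```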

Array operators

Use IN or = ANY to check whether a value is contained in a group of values or in an array column or parameter (a repeated field in the Protobuf message).

Use IN with a list of values or parameters:

SELECT * FROM customers WHERE email IN ('bob@example.com', :some_email)

Use = ANY to check against an array column (a repeated field in the Protobuf message):

SELECT * FROM customers WHERE :some_email = ANY(emails)

Or use = ANY with a repeated field in the request parameters:

SELECT * FROM customers WHERE email = ANY(:some_emails)
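
The three forms differ only in where the list of candidates comes from. As a rough analogy in plain JavaScript (an illustration of the semantics, not of how Kalix evaluates queries), each one corresponds to an includes check. The emails column and the sample rows are hypothetical.

```javascript
// Sample rows; `emails` models a repeated field (array column).
const customers = [
  { name: "Bob", email: "bob@example.com", emails: ["bob@example.com", "bob@work.example"] },
  { name: "Alice", email: "alice@example.com", emails: ["alice@example.com"] },
];

// email IN ('bob@example.com', :some_email)
const inList = (someEmail) =>
  customers.filter((c) => ["bob@example.com", someEmail].includes(c.email));

// :some_email = ANY(emails), checked against the array column
const anyOfColumn = (someEmail) =>
  customers.filter((c) => c.emails.includes(someEmail));

// email = ANY(:some_emails), checked against a repeated request field
const anyOfParameter = (someEmails) =>
  customers.filter((c) => someEmails.includes(c.email));

console.log(inList("alice@example.com").map((c) => c.name)); // [ 'Bob', 'Alice' ]
console.log(anyOfColumn("bob@work.example").map((c) => c.name)); // [ 'Bob' ]
console.log(anyOfParameter(["alice@example.com"]).map((c) => c.name)); // [ 'Alice' ]
```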

Pattern matching

Use LIKE to pattern match on strings. The standard SQL LIKE patterns are supported, with _ (underscore) matching a single character, and % (percent sign) matching any sequence of zero or more characters.

SELECT * FROM customers WHERE name LIKE 'Bob%'

Note: For index efficiency, the pattern must have a non-wildcard prefix or suffix. A pattern like '%foo%' is not supported. Given this limitation, only constant patterns with literal strings are supported; patterns in request parameters are not allowed.
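
Because unsupported patterns are rejected, it can help to check a pattern before putting it in a query definition. The sketch below encodes the rule as stated above: a pattern needs a non-wildcard prefix or suffix. hasLiteralPrefixOrSuffix is a hypothetical helper, and treating both % and _ as wildcards at the ends of the pattern is an assumption of this sketch; the authoritative check is the one Kalix performs.

```javascript
const WILDCARDS = new Set(["%", "_"]);

// Hypothetical pre-check: true when the pattern starts or ends with a literal
// character, i.e. it has a non-wildcard prefix or suffix.
function hasLiteralPrefixOrSuffix(pattern) {
  if (pattern.length === 0) return false;
  const first = pattern[0];
  const last = pattern[pattern.length - 1];
  return !WILDCARDS.has(first) || !WILDCARDS.has(last);
}

console.log(hasLiteralPrefixOrSuffix("Bob%")); // true  (literal prefix)
console.log(hasLiteralPrefixOrSuffix("%son")); // true  (literal suffix)
console.log(hasLiteralPrefixOrSuffix("%foo%")); // false (wildcards at both ends)
```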

Use the text_search function to search text values for words, with automatic tokenization and normalization based on language-specific configuration. The text_search function takes the text column to search, the query (as a parameter or literal string), and an optional language configuration.

text_search(<column>, <query parameter or string>, [<configuration>])

If the query contains multiple words, the text search will find values that contain all of these words (logically combined with AND), with tokenization and normalization automatically applied.

The following text search language configurations are supported: 'danish', 'dutch', 'english', 'finnish', 'french', 'german', 'hungarian', 'italian', 'norwegian', 'portuguese', 'romanian', 'russian', 'simple', 'spanish', 'swedish', 'turkish'. By default, a 'simple' configuration will be used, without language-specific features.

SELECT * FROM customers WHERE text_search(profile, :search_words, 'english')

Note: Text search is currently only available for deployed services, and can’t be used in local testing.

Data types

The following data types are supported, for their corresponding Protobuf types. Arrays are created for a repeated field in a Protobuf message. Timestamps can be stored and compared using the google.protobuf.Timestamp message type.

Data type            Protobuf type
Text                 string
Integer              int32
Long (Big Integer)   int64
Float (Real)         float
Double               double
Boolean              bool
Byte String          bytes
Array                repeated fields
Timestamp            google.protobuf.Timestamp

Sorting

Results for a view query can be sorted. Use ORDER BY with view columns to sort results in ascending (ASC, by default) or descending (DESC) order.

If no explicit ordering is specified in a view query, results will be returned in the natural index order, which is based on the filter predicates in the query.

SELECT * FROM customers WHERE name = :name AND age > :min_age ORDER BY age DESC

Note: Some orderings may be rejected if the view index cannot be efficiently ordered. Generally, to order by a column, it should also appear in the WHERE conditions.

Paging

Reading through a query result one "page" at a time, rather than returning the entire result at once, is possible with a count-based offset, using OFFSET and LIMIT:

  • OFFSET specifies at which offset in the result to start.

  • LIMIT specifies a maximum number of results to return.

Count based offset

The values can either be static, defined up front in the query:

SELECT * FROM customers LIMIT 10

Or come from fields in the request message:

SELECT * FROM customers OFFSET :start_from LIMIT :max_customers

Note: Using numeric offsets can lead to missing or duplicated entries in the result if entries are added to or removed from the view between requests for the pages.
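
A caller that thinks in page numbers has to translate them into the two request fields. The following is a minimal sketch, assuming zero-based page numbers and request fields named start_from and max_customers as in the query above (camelCase in JavaScript); pageRequest is a hypothetical helper.

```javascript
// Hypothetical helper: builds the request for
//   SELECT * FROM customers OFFSET :start_from LIMIT :max_customers
// from a zero-based page number and a page size.
function pageRequest(pageNumber, pageSize) {
  if (!Number.isInteger(pageNumber) || pageNumber < 0) {
    throw new RangeError("pageNumber must be a non-negative integer");
  }
  if (!Number.isInteger(pageSize) || pageSize < 1) {
    throw new RangeError("pageSize must be a positive integer");
  }
  return { startFrom: pageNumber * pageSize, maxCustomers: pageSize };
}

console.log(pageRequest(0, 10)); // { startFrom: 0, maxCustomers: 10 }
console.log(pageRequest(3, 10)); // { startFrom: 30, maxCustomers: 10 }
```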

Check if there are more pages

To check if there are more pages left, you can use the function has_more() providing a boolean value for the result. This works both for the count offset paging and also when only using LIMIT without any OFFSET:

SELECT * AS customers, has_more() AS more_customers FROM customers LIMIT 10

This query will return more_customers = true when the view contains more than 10 customers.
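
A paging loop can use the has_more() flag as its stop condition. The sketch below runs against a fake, in-memory queryPage function so that it is self-contained; a real client call would be asynchronous, and the field names (customers, moreCustomers, startFrom, maxCustomers) are assumptions that combine the queries shown above.

```javascript
// Fake View query over in-memory data, standing in for something like:
//   SELECT * AS customers, has_more() AS more_customers
//   FROM customers OFFSET :start_from LIMIT :max_customers
const allCustomers = Array.from({ length: 25 }, (_, i) => ({ customerId: `c${i}` }));

function queryPage({ startFrom, maxCustomers }) {
  const end = startFrom + maxCustomers;
  return {
    customers: allCustomers.slice(startFrom, end),
    moreCustomers: end < allCustomers.length,
  };
}

// Reads pages until the flag reports that nothing is left.
function readAll(pageSize) {
  const collected = [];
  let pagesRead = 0;
  let startFrom = 0;
  let more = true;
  while (more) {
    const page = queryPage({ startFrom, maxCustomers: pageSize });
    collected.push(...page.customers);
    pagesRead += 1;
    startFrom += pageSize;
    more = page.moreCustomers;
  }
  return { collected, pagesRead };
}

const { collected, pagesRead } = readAll(10);
console.log(collected.length, pagesRead); // 25 3
```

Using the flag avoids the extra empty request that a loop would otherwise need to discover the end of the result.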

Total count of results

To get the total number of results that will be returned over all pages, use COUNT(*) in a query that projects its results into a field. The total count will be returned in the aliased field (using AS) or otherwise into a field named count.

SELECT * AS customers, COUNT(*) AS total, has_more() AS more FROM customers LIMIT 10
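
With the total available, the number of pages follows directly. In this small sketch, totalPages is a hypothetical helper and total stands for the aliased COUNT(*) field in the response.

```javascript
// Hypothetical helper: number of pages needed to show `total` results.
function totalPages(total, pageSize) {
  return Math.ceil(total / pageSize);
}

console.log(totalPages(25, 10)); // 3
console.log(totalPages(30, 10)); // 3
console.log(totalPages(0, 10)); // 0
```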