Akka Persistence across multiple data centers

This chapter describes how Akka Persistence can be used across multiple data centers, availability zones or regions.

Akka Persistence basics

The reference documentation describes all details of Akka Persistence but here is a short summary in case you are not familiar with the concepts.

Akka persistence enables stateful actors to persist their internal state so that it can be recovered when an actor is started, restarted after a JVM crash or by a supervisor, or migrated in a cluster. The key concept behind Akka persistence is that only changes to an actor’s internal state are persisted but never its current state directly (except for optional snapshots). Such stateful actors are recovered by replaying stored changes to these actors from which they can rebuild internal state.

This design of capturing all changes as domain events, which are immutable facts of things that have happened, is known as event sourcing.
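
The idea can be sketched in a few lines of plain Scala (a hypothetical counter domain, not part of any Akka API): the current state is never written directly; it is recovered by folding the stored events.

```scala
// Hypothetical domain events for a simple counter (illustration only).
sealed trait CounterEvent
final case class Incremented(n: Int) extends CounterEvent
final case class Decremented(n: Int) extends CounterEvent

// The event handler: how a single event transforms the state.
def applyEvent(state: Int, event: CounterEvent): Int = event match {
  case Incremented(n) => state + n
  case Decremented(n) => state - n
}

// Recovery replays the stored events in order, starting from the initial state.
val storedEvents = List(Incremented(3), Incremented(2), Decremented(1))
val recoveredState = storedEvents.foldLeft(0)(applyEvent)
// recoveredState == 4
```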

Akka persistence supports event sourcing with the PersistentActor trait. An actor that extends this trait uses the persist method to persist and handle events. The behavior of a PersistentActor is defined by implementing receiveRecover and receiveCommand. More details and examples can be found in the Akka documentation.

Motivation

There can be many reasons for using more than one data center, such as:

  • Redundancy to tolerate failures in one location and still be operational.
  • Serve requests from a location near the user to provide better responsiveness.
  • Balance the load over many servers.

Akka Persistence uses event sourcing, which is based on the single writer principle: there can only be one active instance of a PersistentActor with a given persistenceId. Otherwise, multiple instances would store interleaving events based on different states, and when these events were later replayed it would not be possible to reconstruct the correct state.

This restriction means that the single persistent actor can only live in one data center and would not be available during network partitions between the data centers. It is difficult to safely fail over the persistent actor from one data center to the other because:

  • The underlying data store might not have replicated all data when the network partition occurred, meaning that some updates would be lost if the persistent actor were started in the other data center. It would be even more problematic if the data is later replicated when the network partition heals, resulting in similar problems as with multiple active persistent actors.
  • To avoid the above problem with lost or delayed data one could write all data with QUORUM consistency level across all data centers, but that would be very slow.
  • Detecting the problem and failing over to another data center takes a rather long time if it is to be done with high confidence. Using ordinary Cluster Sharding and Split Brain Resolver would mean downing all nodes in a data center, which is likely not desired. Instead, one would typically prefer to wait until the network partition heals and accept that communication between the data centers is not possible in the meantime.

Approach

What if we could relax the single writer principle and allow persistent actors to be used in an active-active mode? The consistency boundary that we get from the ordinary persistent actor is nice and we would like to keep that within a data center, but network partitions across different data centers should not reduce availability. In other words, we would like one persistent actor instance in each data center and the persisted events should be replicated across the data centers with eventual consistency. Eventually, all events will be consumed by replicas in other data centers.

This new type of persistent replicated actor is called ReplicatedEntity.

When there are no network partitions and no concurrent writes, the events stored by a ReplicatedEntity in one data center can be replicated and consumed by a corresponding instance in another data center without any concerns. Such replicated events can simply be applied to the local state.

replicated-events1.png

The interesting part begins when there are concurrent writes by ReplicatedEntity instances in different data centers. That is more likely to happen when there is a network partition, but it can also happen when there are no network issues. They simply write at the “same time” before the events from the other side have been replicated and consumed.

replicated-events2.png

The ReplicatedEntity supports three slightly different ways of resolving such conflicts.

  • Conflict Free Replicated Data Type (CRDT) - if the state of the actor can be modeled as a CRDT there are by definition no conflicts and the events can just be applied.
  • Last writer wins - if it’s acceptable to use a timestamp to decide which event to apply.
  • Custom merge function - the decision can be made by an application specific function.

These strategies are described in detail later in this documentation.

To detect conflicts the ReplicatedEntity automatically tracks causality between events using vector clocks for the last writer wins and custom merge function strategies. If there are no conflicts the replicated events are just applied to the local state. When using CRDTs there is no need for detecting conflicts with vector clocks.

To be able to support these things the ReplicatedEntity has a different API than the PersistentActor in Akka Persistence. The concepts should be familiar and migrating between the APIs should not be difficult. Events stored by a PersistentActor can be read by a ReplicatedEntity, meaning that it’s possible to migrate an existing application to use this feature. There are also migration paths back to PersistentActor if that would be needed. The API is similar to Lagom’s PersistentEntity, but it has the full power of an Actor if needed.

The solution is using existing infrastructure for persistent actors and Akka persistence plugins, meaning that much of it has been battle tested.

Cassandra is currently the only supported data store, but the solution is designed to allow for other future implementations.

The replication mechanism of the events takes advantage of the multi data center support that exists in Cassandra, i.e. the data is replicated by Cassandra.

Dependency

To use this feature a dependency on the akka-persistence-multi-dc artifact must be added.

sbt
"com.lightbend.akka" %% "akka-persistence-multi-dc" % "1.1-M2"
Gradle
dependencies {
  compile group: 'com.lightbend.akka', name: 'akka-persistence-multi-dc_2.11', version: '1.1-M2'
}
Maven
<dependency>
  <groupId>com.lightbend.akka</groupId>
  <artifactId>akka-persistence-multi-dc_2.11</artifactId>
  <version>1.1-M2</version>
</dependency>

You find your credentials at https://www.lightbend.com/product/lightbend-reactive-platform/credentials including links to instructions on how to add the credentials to your build.

Getting started

ReplicatedEntity stub

This is what a ReplicatedEntity class looks like before filling in the implementation details:

Scala
import akka.persistence.multidc.scaladsl.LwwReplicatedEntity

final class Post1 extends LwwReplicatedEntity[BlogCommand, BlogEvent, BlogState] {

  override def initialState: BlogState = BlogState.empty

  override def behavior: Behavior = Actions()

  override def applyEvent(currentState: BlogState, event: BlogEvent): BlogState = ???

}

In this example we are using LwwReplicatedEntity, which uses last writer wins to resolve conflicts. There are other strategies as described in Resolving conflicting updates.

The three type parameters that the concrete ReplicatedEntity subclass must define are:

  • Command - the super class/interface of the commands
  • Event - the super class/interface of the events
  • State - the class of the state

initialState is an abstract method that your concrete subclass must implement to define the State when the entity is first created.

behavior is an abstract method that your concrete subclass must implement. It returns the Behavior of the entity. Behavior is a function from current State to Actions, which defines command handlers.

Use Actions() to create an immutable builder for defining the behavior. The behavior functions process incoming commands as described in the following sections.

Command Handlers

The commands for this example:

Scala
final case class AddPost(postId: String, content: PostContent) extends BlogCommand

final case class AddPostDone(postId: String)

final case class GetPost(postId: String) extends BlogCommand

final case class ChangeBody(postId: String, newContent: PostContent) extends BlogCommand

The functions that process incoming commands are registered using onCommand of the Actions.

Scala
// Command handlers are invoked for incoming messages (commands).
// A command handler must "return" the events to be persisted (if any).
Actions()
  .onCommand[AddPost] { (cmd, ctx, state) =>
    ctx.thenPersist(PostAdded(entityId, cmd.content)) { evt =>
      // After persist is done additional side effects can be performed
      ctx.sender ! AddPostDone(entityId)
    }
  }

You should define one command handler for each command class that the entity can receive.

A command handler is a function with three parameters: the Command, the CommandContext and the current State.

A command handler returns a Persist directive that defines what event or events, if any, to persist. Use the thenPersist, thenPersistAll or done methods of the context that is passed to the command handler function to create the Persist directive.

  • thenPersist will persist one single event
  • thenPersistAll will persist several events atomically, i.e. all events are stored or none of them are stored if there is an error
  • done no events are to be persisted

External side effects can be performed after successful persist in the afterPersist function. In the above example a reply is sent to the sender.

The command can be validated before persisting state changes. Note that current state is passed as parameter to the command handler function.

Scala
.onCommand[AddPost] { (cmd, ctx, state) =>
  if (cmd.content.title == null || cmd.content.title.equals("")) {
    ctx.sender ! Status.Failure(new IllegalArgumentException("Title must be defined"))
    ctx.done
  } else {
    ctx.thenPersist(PostAdded(entityId, cmd.content)) { _ =>
      // After persist is done additional side effects can be performed
      ctx.sender ! AddPostDone(entityId)
    }
  }
}

A ReplicatedEntity may also process commands that do not change application state, such as query commands or commands that are not valid in the entity’s current state (such as a bid placed after the auction closed). Such command handlers are registered using onReadOnlyCommand of the Actions. Replies are sent as ordinary actor messages to the sender of the context that is passed to the command handler function, or to any other ActorRef in the commands or state.

The onReadOnlyCommand is simply a convenience function that saves you from having to return no events followed by a side effect.

Scala
.onReadOnlyCommand[GetPost] { (_, ctx, state) =>
  ctx.sender ! state.content.get
}

The commands must be immutable to avoid concurrency issues that may occur from changing a command instance that has been sent.

You need to create a serializer for the commands so that they can be sent as remote messages in the Akka cluster. We recommend against using Java serialization.

Event Handlers

The events for this example:

Scala
final case class PostAdded(postId: String, content: PostContent) extends BlogEvent

final case class BodyChanged(postId: String, newContent: PostContent) extends BlogEvent

When an event has been persisted successfully the current state is updated by applying the event to the current state. The method for updating the state is applyEvent and it must be implemented by the concrete ReplicatedEntity class.

Scala
// applyEvent is used both when persisting new events, replaying
// events, and consuming replicated events.
override def applyEvent(currentState: BlogState, event: BlogEvent): BlogState = {
  event match {
    case PostAdded(postId, content) =>
      BlogState(Some(content), published = false)

    case BodyChanged(_, newContent) =>
      currentState.withContent(newContent)
  }
}

The event handler returns the new state. The state must be immutable, so you return a new instance of the state. The current state is passed as a parameter to the event handler function. The same event handler is also used when the entity is started up to recover its state from the stored events, and for consuming replicated events and updating the state from those.

The events must be immutable to avoid concurrency issues that may occur from changing an event instance that is about to be persisted.

You need to create a serializer for the events, which are stored. We recommend against using Java serialization. When picking a serialization solution for the events you should also consider that it must be possible to read old events when the application has evolved. Strategies for that can be found in the Akka documentation.

State

The state for this example:

Scala
object BlogState {
  val empty = BlogState(None, published = false)
}

final case class BlogState(content: Option[PostContent], published: Boolean) {
  def withContent(newContent: PostContent): BlogState =
    copy(content = Some(newContent))

  def isEmpty: Boolean = content.isEmpty
}

final case class PostContent(title: String, body: String)

final case class PostSummary(postId: String, title: String)

The state must be immutable to avoid concurrency issues that may occur from changing a state instance that is about to be saved as snapshot.

You need to create a serializer for the state, because it is stored as snapshot. We recommend against using Java serialization. When picking a serialization solution for the snapshot you should also consider that it might be necessary to read old snapshots when the application has evolved. Strategies for that can be found in the Akka documentation. It is not mandatory to be able to read old snapshots. If reading fails the entity will instead replay more of the old events, which might have a performance cost.

Changing Behavior

For simple entities you can use the same set of command handlers independent of what state the entity is in. The actions can then be defined like this:

Scala
override def behavior: Behavior =
  Actions()
    .onCommand[AddPost] { (cmd, ctx, state) =>
      ctx.thenPersist(PostAdded(entityId, cmd.content)) { evt =>
        // After persist is done additional side effects can be performed
        ctx.sender ! AddPostDone(entityId)
      }
    }
    .onReadOnlyCommand[GetPost] { (_, ctx, state) =>
      ctx.sender ! state.content.get
    }

When the state changes it can also change the behavior of the entity in the sense that new functions for processing commands may be defined. This is useful when implementing finite state machine (FSM) like entities. The Actions, the set of command handlers, can be selected based on the current state. The return type of the behavior method is a function from the current State to Actions. The reason Actions can be used directly, as in the above example, is that Actions is itself such a function, returning itself for any State.

This is how to define different behavior for different State:

Scala
override def behavior: Behavior = {
  case state if state.isEmpty  => initial
  case state if !state.isEmpty => postAdded
}
Scala
private val initial: Actions = {
  // Command handlers are invoked for incoming messages (commands).
  // A command handler must "return" the events to be persisted (if any).
  Actions()
    .onCommand[AddPost] { (cmd, ctx, state) =>
      if (cmd.content.title == null || cmd.content.title.equals("")) {
        ctx.sender ! Status.Failure(new IllegalArgumentException("Title must be defined"))
        ctx.done
      } else {
        ctx.thenPersist(PostAdded(entityId, cmd.content)) { _ =>
          // After persist is done additional side effects can be performed
          ctx.sender ! AddPostDone(entityId)
        }
      }
    }
}
Scala
private val postAdded: Actions = {
  Actions()
    .onCommand[ChangeBody] { (cmd, ctx, state) =>
      ctx.thenPersist(BodyChanged(entityId, cmd.newContent)) { _ =>
        ctx.sender ! Done
      }
    }
    .onReadOnlyCommand[GetPost] { (_, ctx, state) =>
      ctx.sender ! state.content.get
    }
}

Actions is an immutable builder, which gives you great flexibility in how to structure the various command handlers and combine them into the final behavior. Note that Actions has an append method that is useful for composing actions.

The event handler is always the same, independent of state. The main reason for not making the event handler part of the Actions is that replicated events may be delayed and all events should be handled independently of what the current state is.

Minimum configuration

There are a few configuration properties that are needed to enable this feature. Here are required configuration properties for running with a single Akka node and a local Cassandra server:

akka.actor {
  provider = cluster
}
akka.remote {
  netty.tcp {
    hostname = "127.0.0.1"
    port = 2552
  }
}
akka.cluster {
  seed-nodes = ["akka.tcp://ClusterSystem@127.0.0.1:2552"]
  multi-data-center.self-data-center = DC-A
}
akka.persistence {
  snapshot-store.plugin = "cassandra-snapshot-store"

  multi-data-center {
    all-data-centers = ["DC-A"]
  }
}

Running the entity

The ReplicatedEntity is not an Actor, but it is run by an actor and has the same message processing semantics as an actor, i.e. each command/message is processed sequentially, one at a time, for a specific entity instance. It also has the same semantics when persisting events as PersistentActor, i.e. incoming commands/messages are stashed until the persist is completed.

To start the entity you need to create the Props of the actor:

Scala
import akka.persistence.multidc.scaladsl.LwwReplicatedEntity
import akka.persistence.multidc.PersistenceMultiDcSettings

val props = LwwReplicatedEntity.props(
  persistenceIdPrefix = "blog",
  entityId = Option("post-1"),
  entityFactory = () => new Post2,
  settings = PersistenceMultiDcSettings(system))

The parameters to the props are:

  • persistenceIdPrefix - Prefix for the persistenceId. Empty string is a valid prefix.
  • entityId - The identifier of the entity. When used with Cluster Sharding this should be None and then the entityId is picked from the name of the actor.
  • entityFactory - Factory for creating a new instance of the entity. It has to be a factory so that a new instance is created in case the actor is restarted.
  • settings - Configuration settings.

The persistenceId is the concatenation of persistenceIdPrefix, entityId and the data center identifier, separated with |.
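
As a hypothetical illustration of this convention (the exact formatting is an assumption based on the | separator described above):

```scala
// Assumed construction of the persistenceId from its three parts,
// following the concatenation convention described above.
def persistenceId(prefix: String, entityId: String, dataCenter: String): String =
  s"$prefix|$entityId|$dataCenter"

val id = persistenceId("blog", "post-1", "DC-A")
// id == "blog|post-1|DC-A"
```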

Then you can start the actor with actorOf:

Scala
val ref = system.actorOf(props)
ref ! AddPost("post-1", PostContent(title = "First post", body = "..."))

and send commands as messages via the ActorRef.

ReplicatedEntity is typically used together with Cluster Sharding and then the Props is registered with the ClusterSharding extension and commands sent via the ActorRef of the ShardRegion like this:

Scala
import akka.persistence.multidc.scaladsl.LwwReplicatedEntity
import akka.persistence.multidc.PersistenceMultiDcSettings
import akka.cluster.sharding.ClusterSharding
import akka.cluster.sharding.ClusterShardingSettings
import akka.cluster.sharding.ShardRegion

val maxNumberOfShards = 1000
val extractEntityId: ShardRegion.ExtractEntityId = {
  case cmd: BlogCommand => (cmd.postId, cmd)
}
val extractShardId: ShardRegion.ExtractShardId = {
  case cmd: BlogCommand =>
    (math.abs(cmd.postId.hashCode) % maxNumberOfShards).toString
}

val props = LwwReplicatedEntity.props(
  persistenceIdPrefix = "blog",
  entityId = None,
  entityFactory = () => new Post2,
  settings = PersistenceMultiDcSettings(system))

val region =
  ClusterSharding(system).start("blog", props, ClusterShardingSettings(system),
    extractEntityId, extractShardId)

region ! AddPost("post-1", PostContent(title = "First post", body = "..."))

Resolving conflicting updates

Conflict Free Replicated Data Types

Writing custom code to resolve conflicts can be complicated to get right. One well-understood technique to create eventually-consistent systems is to model your state as a Conflict Free Replicated Data Type, a CRDT.

You can have your state extend OpCrdt to make explicit that it is intended to implement an operation-based CRDT. The rule for operation-based CRDTs is that the operations must be commutative: applying the same events (which represent the operations) in any order should always produce the same final state. You may assume each event is applied only once, with causal delivery order.


A simple example would be a bank account that accepts deposits and withdrawals. Its state could be represented by a Counter, which is an OpCrdt shipped with this library. Note that this is distinct from the CRDTs implemented in Akka Distributed Data, which are state-based rather than operation-based.
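
The commutativity requirement is easy to see for such a counter. A minimal, self-contained sketch (plain Scala, not the library's Counter implementation):

```scala
// A counter update is an operation-based CRDT operation: additions
// commute, so applying the same operations in any order yields the
// same final state.
final case class Updated(delta: Int)

def applyOp(state: Int, op: Updated): Int = state + op.delta

val ops = List(Updated(5), Updated(-2), Updated(10))
val forward  = ops.foldLeft(0)(applyOp)
val backward = ops.reverse.foldLeft(0)(applyOp)
// forward == backward == 13
```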

The commands and events used in this actor are:

Scala
sealed trait Command
final case class Deposit(amount: Int) extends Command
final case class Withdraw(amount: Int) extends Command
final case object GetBalance extends Command
final case class Balance(amount: Int)
sealed trait Event
final case class Deposited(amount: Int) extends Event
final case class Withdrawn(amount: Int) extends Event

Based on these, we can easily create a CrdtReplicatedEntity by converting our events to operations on the Counter data type:

Scala
class BankAccount extends CrdtReplicatedEntity[BankAccount.Command, BankAccount.Event, Counter.Updated, Counter] {
  import BankAccount._

  override def initialState: Counter = Counter.empty

  override def convertEvent(event: Event): Counter.Updated = event match {
    case Deposited(amount) => Counter.Updated(amount)
    case Withdrawn(amount) => Counter.Updated(-amount)
  }

  override def behavior: Behavior = {
    Actions()
      .onCommand[Deposit] { (cmd, ctx, state) =>
        ctx.thenPersist(Deposited(cmd.amount)) { state2 =>
          ctx.sender() ! Balance(state2.value.intValue)
        }
      }
      .onCommand[Withdraw] { (cmd, ctx, state) =>
        cmd match {
          case Withdraw(amount) =>
            ctx.thenPersist(Withdrawn(amount)) { state2 =>
              ctx.sender() ! Balance(state2.value.intValue)
            }
        }
      }
      // it is also possible to use partial pattern match (with extra allocation)
      //.onCommand[Withdraw] {
      //  case (Withdraw(amount), ctx, state) =>
      // ...
      .onReadOnlyCommand[GetBalance.type] { (_, ctx, state) =>
        ctx.sender() ! Balance(state.value.intValue)
      }
  }

}
def props(settings: PersistenceMultiDcSettings): Props =
  CrdtReplicatedEntity.props("account", entityId = None, () => new BankAccount, settings)

Last writer wins

Last writer wins means that when there is a conflict the event is applied if its timestamp is later (higher) than the timestamp of the previous local update; otherwise the event is discarded. In case of equal timestamps the data center identifiers are compared and the one that sorts first in alphanumeric order is picked.

This relies on synchronized clocks and should only be used when the choice of value is not important for concurrent updates occurring within the clock skew.
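
A self-contained sketch of this decision rule (names are hypothetical; the real implementation also involves the vector clock check described below):

```scala
// Hypothetical metadata for the previous local update and for an
// incoming replicated event.
final case class Lww(timestamp: Long, dataCenter: String)

// The incoming event wins if its timestamp is later; on a tie the
// update from the data center that sorts first alphanumerically wins.
def incomingWins(local: Lww, incoming: Lww): Boolean =
  if (incoming.timestamp != local.timestamp) incoming.timestamp > local.timestamp
  else incoming.dataCenter < local.dataCenter

val later = incomingWins(Lww(100, "DC-A"), Lww(101, "DC-B")) // true: later timestamp
val tie   = incomingWins(Lww(100, "DC-A"), Lww(100, "DC-B")) // false: DC-A sorts first
```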

Causality between events is tracked with vector clocks, and the timestamp is only used if a conflict between the local vector clock and the vector clock in the event is detected. If there is no conflict the event is applied without checking the timestamp, because it is known that it happened after the previous local update.

lww.png

The nature of last writer wins means that the events must represent an update of the full state, otherwise there is a risk that the state in different data centers will be different and not eventually converge to the same state.

An example of that would be an entity representing a blog post where the fields author and title can be updated separately with events AuthorChanged(newAuthor: String) and TitleChanged(newTitle: String).

Let’s say the blog post is created and the initial state of title=Akka, author=unknown is in sync in both data centers DC-A and DC-B.

In DC-A author is changed to “Bob” at time 100. Before that event has been replicated over to DC-B the title is updated to “Akka News” at time 101 in DC-B. When the events have been replicated the result will be:

DC-A: The title update is later so the event is used and new state is title=Akka News, author=Bob

DC-B: The author update is earlier so the event is discarded and state is title=Akka News, author=unknown

The problem here is that the partial update of the state is not applied on both sides, so the states have diverged and will not become the same.

To solve this with last writer wins the events must carry the full state, such as AuthorChanged(newContent: PostContent) and TitleChanged(newContent: PostContent). Then the result would eventually be title=Akka News, author=unknown on both sides. The author update is lost but that is because the changes were performed concurrently. More important is that the state is eventually consistent.

The above Getting started example uses last writer wins.

Merge function

TODO: We are looking for use cases where an entity implemented with a custom merge function would be easier or better than an entity implemented as a CRDT. Let us know if you have an example or reason for why we should provide this alternative.

The idea is that the entity would implement this method for merging conflicting events:

def merge(currentState: State, replicatedEvent: ReplicatedEvent[Event]): State

One difference compared to the CRDT approach is that this merge would only be called when a conflict has been detected by the vector clocks; otherwise the event would be applied to the current state with applyEvent.

Side effects

The general recommendation for external side effects is to perform them after the event has been persisted, in the function that is passed to ctx.thenPersist or ctx.thenPersistAll.

Side effects from the event handler are generally discouraged, because the event handlers are also used during replay and when consuming replicated events, which would result in undesired re-execution of the side effects.

recoveryCompleted can be a good place to retry, based on the current state after recovery, the side effects that were intended to be performed but have not been confirmed yet. You would typically persist one event for the intention to perform the side effect before doing it, and then store an acknowledgment event when the destination has confirmed that it was performed.

One can make a best effort to only perform side effects once, but to achieve effectively once-and-only-once delivery the destination must be able to de-duplicate retried requests (or the side effect must be idempotent in other ways).
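
A minimal, library-independent sketch of such de-duplication (all names hypothetical): the destination remembers the delivery ids it has already processed, so a retried request is applied at most once.

```scala
final case class Request(deliveryId: Long, payload: String)

// The destination tracks which delivery ids it has seen; duplicates
// (retries of the same side effect) are ignored rather than re-applied.
final case class Destination(seen: Set[Long] = Set.empty,
                             applied: Vector[String] = Vector.empty) {
  def receive(r: Request): Destination =
    if (seen.contains(r.deliveryId)) this
    else Destination(seen + r.deliveryId, applied :+ r.payload)
}

val result = List(Request(1, "charge"), Request(1, "charge"), Request(2, "refund"))
  .foldLeft(Destination())((d, r) => d.receive(r))
// result.applied == Vector("charge", "refund")
```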

Coordination between different data centers for deciding where a side effect should be performed is not provided by the ReplicatedEntity. You have to use another tool for such consensus decisions, e.g. ZooKeeper. You could also simply decide that such side effects should only be performed by one data center, but you would still have to handle duplicate attempts.

For some use cases it can be useful to trigger side effects after consuming replicated events, e.g. when an auction has been closed in all data centers and all bids have been replicated. TODO: The API for that is still missing.

How it works

You don’t have to read this section to be able to use the feature, but to use the abstraction efficiently and for the right type of use cases it can be good to understand how it’s implemented. For example, it should give you the right expectations of the overhead that the solution introduces compared to using plain Akka Persistence in one data center.

Storage and replication

Each ReplicatedEntity instance has its own unique persistenceId and thereby its own event log. The persisted events are written with LOCAL_QUORUM write consistency and also read with LOCAL_QUORUM during recovery. LOCAL_QUORUM means that it requires successful writes/reads from a majority of Cassandra replicas in the local data center.

The persistenceId is constructed by concatenating the entityId with the identifier of the data center where the ReplicatedEntity is running. In another data center another ReplicatedEntity instance for the same entityId may be running, and it will have a different persistenceId and event log.

When a ReplicatedEntity is started it starts a stream that queries events from the event log of corresponding replicated entities in the other data centers. To do that it needs to know the persistenceId of the other instances. Those are known from the configuration property akka.persistence.multi-data-center.all-data-centers and the concatenation convention.

This stream is infinite and restarted with backoff in case of failures. The stream essentially polls Cassandra periodically for new data for each persistenceId. To be able to scale to many entities the polling frequency is adjusted dynamically based on which entities are active. More about that in a moment.

The events found by this stream are what we call replicated events. The actual replication is ordinary Cassandra replication across data centers.

When an event is persisted by a ReplicatedEntity some additional meta data is stored together with the event. The meta data is stored in the meta column in the journal table used by akka-persistence-cassandra. The reason for storing the meta data in a separate column instead of wrapping the original event is that it should be seamless to migrate away from this tool, if needed, and still be able to read the events without any additional dependencies.

The meta data for each event contains:

  • timestamp
  • data center identifier
  • vector clock (not stored for CRDT)

When a ReplicatedEntity consumes a replicated event from this stream it does slightly different things depending on conflict resolution strategy. For CRDT entities the event can be applied immediately. For last writer wins and custom merge function it checks if there is a conflict by comparing the local vector clock with the vector clock in the event.

After a replicated event has been applied it is also persisted in the event log of the consuming ReplicatedEntity. Additional meta data is stored with this event as well, marking it as a handled event to break the replication cycle. Those handled events are replayed when a ReplicatedEntity is recovering, and that is how it knows the sequence number to start from when starting the replicated events stream.

This means that each event is stored one additional time in each data center.

As mentioned above, the replicated events stream polls Cassandra for new data for each persistenceId. To reduce this polling to the entities that are active, a notification mechanism is used. When an event has been stored a notification is stored in a separate table in Cassandra. Those notifications are aggregated and written in the background with consistency level ONE. Delivery of the notifications doesn't have to be reliable. In the other data center those notifications are read periodically to find which entities are active and would have new events to read. For inactive entities the polling for new events is only done at a low frequency, in case the notifications are not delivered.

Causal delivery order

Causal delivery order means that events persisted in one data center are read in the same order in other data centers. The order of concurrent events is undefined, which is not a problem when using CRDTs, and will otherwise be detected as a conflict.

For example:

DC-1: write e1
DC-2: read e1, write e2
DC-1: read e2, write e3

In the above example the causality is e1 -> e2 -> e3. A third data center, DC-3, will also read these events in the same order: e1, e2, e3.

Another example with concurrent events:

DC-1: write e1
DC-2: read e1, write e2
DC-1: write e3 (e2 and e3 are concurrent)
DC-1: read e2
DC-2: read e3

e2 and e3 are concurrent, i.e. they don’t have a causal relation: DC-1 sees them in the order “e1, e3, e2”, while DC-2 sees them as “e1, e2, e3”.

A third data center may also see the events as either “e1, e3, e2” or as “e1, e2, e3”.

Detecting conflicts

To detect conflicts, the ReplicatedEntity automatically tracks causality between events from different data centers using vector clocks.

This is used for last writer wins and custom merge function strategies. If there are no conflicts the replicated events are just applied to the local state. When using CRDTs there is no need for tracking causality.

[Image: causality.png]

Each data center “owns” a slot in the vector clock and increases its counter when an event is persisted. The vector clock is stored with the event, and when a replicated event is consumed the vector clock of the event is merged with the local vector clock.

Comparing two vector clocks v1 and v2 gives one of the following results:

  • v1 is SAME as v2 iff for all i: v1(i) == v2(i)
  • v1 is BEFORE v2 iff for all i: v1(i) <= v2(i) and there exists a j such that v1(j) < v2(j)
  • v1 is AFTER v2 iff for all i: v1(i) >= v2(i) and there exists a j such that v1(j) > v2(j)
  • v1 is CONCURRENT with v2 otherwise
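The comparison can be sketched in plain Scala, representing a vector clock as a map from data center identifier to counter, with missing entries treated as 0:

```scala
sealed trait VClockResult
case object Same extends VClockResult
case object Before extends VClockResult
case object After extends VClockResult
case object Concurrent extends VClockResult

// Compare v1 and v2 entry-wise over the union of their keys.
def compare(v1: Map[String, Long], v2: Map[String, Long]): VClockResult = {
  val keys        = v1.keySet ++ v2.keySet
  val someLess    = keys.exists(k => v1.getOrElse(k, 0L) < v2.getOrElse(k, 0L))
  val someGreater = keys.exists(k => v1.getOrElse(k, 0L) > v2.getOrElse(k, 0L))
  (someLess, someGreater) match {
    case (false, false) => Same
    case (true, false)  => Before
    case (false, true)  => After
    case (true, true)   => Concurrent  // a conflict to be resolved
  }
}
```

For example, `compare(Map("DC-A" -> 1L), Map("DC-A" -> 1L, "DC-B" -> 1L))` yields Before, while two clocks that each lead in a different slot are Concurrent.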

Setting up Cassandra for multi data centers

See Cassandra documentation for production deployments.

For local development and testing, a single local Cassandra server can be used.

The CassandraLauncher can be useful in tests.

CCM is useful for running a local Cassandra cluster.

Failures

If persisting an event fails, the problem is logged and the actor is unconditionally stopped.

The reason that it cannot resume when persist fails is that it is unknown whether the event was actually persisted or not, so the actor would be in an inconsistent state. Restarting on persistence failures will most likely fail anyway, since the journal is probably unavailable. It is better to stop the actor and start it again after a back-off timeout. The akka.pattern.BackoffSupervisor actor is provided to support such restarts.

See the Akka documentation for how to use the BackoffSupervisor.
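The back-off schedule used by such a supervisor can be sketched as exponential growth capped at a maximum; the parameter values below are illustrative, not Akka defaults:

```scala
// Illustrative sketch of an exponential back-off schedule: the delay doubles
// on every restart attempt until it reaches maxBackoffMillis.
def backoffMillis(restartCount: Int,
                  minBackoffMillis: Long = 3000L,
                  maxBackoffMillis: Long = 30000L): Long =
  math.min(maxBackoffMillis, minBackoffMillis * (1L << math.min(restartCount, 30)))
```

In the real BackoffSupervisor a random factor is also added to the delay so that many stopped actors do not all restart at the same instant.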

Snapshots

TODO

Custom CRDT implementation

TODO

Testing

TODO

Refactoring Considerations

If you change the class name of a ReplicatedEntity you have to override entityTypeName and retain the original name, because this name is part of the key of the stored data (it is part of the persistenceId of the underlying PersistentActor). By default the entityTypeName is the short class name of the concrete ReplicatedEntity class. Therefore it can be good to define a stable entityTypeName up front.
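As a sketch (HasEntityTypeName is a stand-in here, not the actual library trait), retaining the original name after a rename could look like this:

```scala
// Stand-in for the relevant part of the ReplicatedEntity API (hypothetical
// trait; the real one is provided by the library).
trait HasEntityTypeName {
  // default: short class name of the concrete class
  def entityTypeName: String = getClass.getSimpleName
}

// The entity was originally called ShoppingCartEntity; after renaming the
// class, the stored data can still be found by keeping the old type name.
class CartEntity extends HasEntityTypeName {
  override val entityTypeName: String = "ShoppingCartEntity"
}
```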

Migration from/to PersistentActor

TODO data

TODO API

Configuration

Example of the most important settings, including settings from other related Akka libraries:

akka.actor {
  provider = cluster
}
akka.remote {
  netty.tcp {
    # Change this to real hostname for production
    hostname = "host1"
    port = 2552
  }
}
akka.cluster {
  # Change this to real hostname for production
  seed-nodes = ["akka.tcp://ClusterSystem@host1:2552", "akka.tcp://ClusterSystem@host2:2552"]

  # Change this to the Akka data center this node belongs to
  multi-data-center.self-data-center = DC-A
}
akka.persistence {
  snapshot-store.plugin = "cassandra-snapshot-store"

  multi-data-center {
    all-data-centers = ["DC-A", "DC-B"]
  }
}
cassandra-journal-multi-dc {
  # Change this to real hostname for production
  contact-points = ["host3", "host4"]
  # Port of contact points in the Cassandra cluster.
  port = 9042

  keyspace = "myapp"

  replication-strategy = "NetworkTopologyStrategy"

  # Replication factor list for data centers
  data-center-replication-factors = ["dc1:3", "dc2:3"]

  # Change this to the Cassandra data center this node belongs to,
  # note that Akka data center identifiers and Cassandra data center
  # identifiers are not the same.
  local-datacenter = "dc1"
}

A full reference of the configuration settings available can be found here:


akka.persistence.multi-data-center {
  all-data-centers = []

  # Configuration id of the journal plugin servicing replicated persistent actors.
  # When configured, uses this value as absolute path to the journal configuration entry.
  # Configuration entry must contain a few required fields, such as `class`.
  # See `src/main/resources/reference.conf` in akka-persistence.
  journal-plugin-id = "cassandra-journal-multi-dc"

  replicated-events-query-journal-plugin-id = "cassandra-query-journal-multi-dc"
}

cassandra-journal-multi-dc = ${cassandra-journal} # include default settings
cassandra-journal-multi-dc {
  class = "akka.persistence.multidc.internal.CassandraReplicatedEventJournal"

  # The query journal to use when recovering
  query-plugin = "cassandra-query-journal-multi-dc"

  # Write consistency level
  # The default read and write consistency levels ensure that persistent actors
  # can read their own writes. During normal operation, persistent actors only
  # write to the journal, reads occur only during recovery.
  write-consistency = "LOCAL_QUORUM"

  # Read consistency level
  read-consistency = "LOCAL_QUORUM"

  low-frequency-read-events-interval = 30 s

  notification {
    write-interval = 1 s
    read-interval = 500 ms
    look-for-old = 10 minutes
    publish-delay = 250 ms
    additional-random-publish-delay = 250 ms
    write-consistency = "ONE"
    write-retries = 2
    read-consistency = "ONE"
    read-retries = 2
  }
}

cassandra-query-journal-multi-dc = ${cassandra-query-journal} # include default settings
cassandra-query-journal-multi-dc {
  class = "akka.persistence.multidc.CassandraReadJournalProvider"

  # Absolute path to the write journal plugin configuration section
  write-plugin = "cassandra-journal-multi-dc"
}

Additional Examples

Auction Example

Having to use CRDTs for replication might seem like a severe constraint because the set of predefined CRDTs is quite limited. In this example we show that real-world applications can be implemented by designing events in a way that they don’t conflict. In the end you will arrive at a solution based on a custom CRDT.

We are building a small auction service. An auction is represented by one replicated actor. It should have the following operations:

  • Place a bid
  • Get the currently highest bid
  • Finish the auction

We model those operations as commands to be sent to the auction actor:

case class Bid(bidder: String, offer: MoneyAmount, timestamp: Instant, originDc: String)

// commands
sealed trait AuctionCommand
case class OfferBid(bidder: String, offer: MoneyAmount) extends AuctionCommand
case object Finish extends AuctionCommand // An auction coordinator needs to schedule this command for each replica
case object GetHighestBid extends AuctionCommand

The auction entity is an event-sourced persistent actor. These events are used to persist state changes:

// events
sealed trait AuctionEvent
case class BidRegistered(bid: Bid) extends AuctionEvent
case class AuctionFinished(atDc: String, winningBid: Bid, highestCounterOffer: MoneyAmount) extends AuctionEvent

You may have noticed that we include the highestCounterOffer in the AuctionFinished event. This is because we use a popular auction style where the winner does not pay the price of their own highest bid but only just enough to beat the second highest bid.
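As a tiny sketch of this second-price rule (the helper function is illustrative, not part of the example's API):

```scala
// Second-price ("ebay style") rule: the winner pays the second-highest amount.
// With a single bid, the winner simply pays that bid.
def pricePaid(bids: List[Int]): Int = bids.sorted.takeRight(2).head
```

For example, with bids of 42, 41 and 30 the winner pays 41.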

Let’s have a look at the auction entity that will handle incoming commands:

case class AuctionSetup(
  name:       String,
  initialBid: Bid // the initial bid is basically the minimum price bidden at start time by the owner
)

class AuctionEntity(auctionSetup: AuctionSetup)
  extends CrdtReplicatedEntity[AuctionCommand, AuctionEvent, AuctionEvent, AuctionState] {

  override def initialState: AuctionState =
    AuctionState(
      stillRunning = true,
      highestBid = auctionSetup.initialBid,
      highestCounterOffer = auctionSetup.initialBid.offer
    )

  override def behavior: Behavior = {
    Actions()
      .onCommand[OfferBid] {
        case (OfferBid(bidder, offer), ctx, _) =>
          ctx.thenPersist(BidRegistered(Bid(bidder, offer, Instant.now(), selfDc)))()
      }
      .onCommand[Finish.type] {
        case (Finish, ctx, state) =>
          ctx.thenPersist(AuctionFinished(selfDc, state.highestBid, state.highestCounterOffer))()
      }
      .onReadOnlyCommand[GetHighestBid.type] { (_, ctx, state) =>
        ctx.sender() ! state.highestBid.copy(offer = state.highestCounterOffer)
      }
  }

  def convertEvent(event: AuctionEvent): AuctionEvent = event
}

The auction entity is started with the initial parameters for the auction. As seen before, replicated entities need to be parameterized with the types for commands and events and also for the internal state.

Our entity derives from CrdtReplicatedEntity. This means that we have to provide a state implementation which is an OpCrdt, an operation-based CRDT. In our case that’s the custom AuctionState which we’ll come back to later.

In the initialState method, a replicated entity defines its starting state. In our case it is straightforward to derive it from the initialization parameters given in the AuctionSetup instance. The minimum bid is modelled as the initialBid.

The behavior defines how to react to external commands. For OfferBid and Finish we do nothing more than emit events corresponding to the command. For GetHighestBid we respond with details from the state. Note that we overwrite the actual offer of the highest bid with the amount of the highestCounterOffer. This is done to follow the popular auction style where the actual highest bid is never publicly revealed.

In a CrdtReplicatedEntity, events are handled by the CRDT implementation. The entity itself can define a conversion when you want to use a predefined CRDT with your own events. In our case, no conversion is necessary as we provide our own custom CRDT.

Let’s have a look at our state class, AuctionState which also represents the CRDT in our example.

case class AuctionState(
  stillRunning:        Boolean,
  highestBid:          Bid,
  highestCounterOffer: MoneyAmount // in ebay style auctions, we need to keep track of current highest counter offer
) extends OpCrdt[AuctionEvent] {
  type T = AuctionState

  def applyEvent(event: AuctionEvent): AuctionState =
    event match {
      case BidRegistered(b) =>
        if (isHigherBid(b, highestBid)) withNewHighestBid(b)
        else withTooLowBid(b)
      case _: AuctionFinished => copy(stillRunning = false)
    }

  def withNewHighestBid(bid: Bid): AuctionState = {
    require(stillRunning)
    require(isHigherBid(bid, highestBid))
    copy(
      highestBid = bid,
      highestCounterOffer = highestBid.offer // keep last highest bid around
    )
  }
  def withTooLowBid(bid: Bid): AuctionState = {
    require(stillRunning)
    require(isHigherBid(highestBid, bid))
    copy(highestCounterOffer = highestCounterOffer.max(bid.offer)) // update highest counter offer
  }

  def isHigherBid(first: Bid, second: Bid): Boolean =
    first.offer > second.offer ||
      (first.offer == second.offer && first.timestamp.isBefore(second.timestamp)) || // if equal, first one wins
      // If timestamps are equal, choose by dc where the offer was submitted
      // In real auctions, this last comparison should be deterministic but unpredictable, so that submitting to a
      // particular DC would not be an advantage.
      (first.offer == second.offer && first.timestamp.equals(second.timestamp) && first.originDc.compareTo(second.originDc) < 0)
}

The state consists of a flag that keeps track of whether the auction is still active, the currently highest bid, and the highest counter offer so far.

In applyEvent, we handle persisted events to drive the state change. When a new bid is registered,

  • it needs to be decided whether the new bid is the winning bid or not
  • the state needs to be updated accordingly

The point of CRDTs is that the state must end up the same regardless of the order in which the events have been processed. We can see how this works in the auction example: we are only interested in the highest bid, so if we can define an ordering on all bids, it suffices to compare the new bid with the currently highest to eventually end up with the globally highest, regardless of the order in which the events come in.

The ordering between bids is therefore crucial. We need to ensure that it is deterministic and does not depend on local state outside of our state class, so that all replicas come to the same result. We define the ordering as follows:

  • A higher bid wins.
  • If there’s a tie between the two highest bids, the bid that was registered earlier wins. For that we keep track of the (local) timestamp the bid was registered.
  • We need to make sure that no timestamp is used twice in the same DC (missing in this example).
  • If there’s a tie between the timestamp, we define an arbitrary but deterministic ordering on the DCs, in our case we just compare the name strings of the DCs. That’s why we need to keep the identifier of the DC where a bid was registered for every Bid.

If the new bid was higher, we keep this one as the new highest and keep the amount of the former highest as the highestCounterOffer. If the new bid was lower, we just update the highestCounterOffer if necessary.

Using those rules, the order of incoming events does not matter: replicas in all DCs should eventually converge to the same result.
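The three rules can also be restated as a single total order, equivalent to the isHigherBid comparison above (this sketch redefines a minimal Bid so it is self-contained):

```scala
import java.time.Instant

case class Bid(bidder: String, offer: Int, timestamp: Instant, originDc: String)

// Sort key: higher offer first, then earlier timestamp, then smaller DC name.
// The winning bid is the minimum under this ordering.
val winningOrder: Ordering[Bid] =
  Ordering.by((b: Bid) => (-b.offer, b.timestamp.toEpochMilli, b.originDc))

val t = Instant.parse("2017-01-01T00:00:00Z")
val bids = List(
  Bid("Mary", 42, t, "DC-A"),
  Bid("Paul", 42, t.plusSeconds(1), "DC-B"), // same offer, later timestamp
  Bid("Kat", 41, t, "DC-A"))

val winner = bids.min(winningOrder) // Mary wins: equal offer, earlier timestamp
```

Because the sort key depends only on the bid itself, every replica computes the same winner no matter in which order the BidRegistered events arrive.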

Open questions

The auction example shows the basic features of an auction. A real application would need to deal with a few additional concerns:

  • Replicas only eventually converge to the same result. That might lead to surprising outcomes because highest bids from replicas other than the local one might only turn up with a delay. Another surprising outcome might be that two bids with the same amount, issued in quick succession each to a different replica, are ordered differently due to clock differences between replicas. In a real bidding system it must be ensured that no replica has a competitive advantage over another one.
  • Finishing an auction requires some amount of coordination between replicas. Something needs to trigger completion of the auction, and all replicas need to confirm that no bids will be issued afterwards.

Complete example source code

For reference here’s the complete example, including imports and tests:

/**
 * Copyright (C) 2017 Lightbend Inc. <http://www.lightbend.com>
 */
package akka.persistence.multidc.scaladsl

import java.time.Instant

import akka.actor.{ ActorRef, Props }
import akka.persistence.multidc.PersistenceMultiDcSettings

object AuctionExampleSpec {
  type MoneyAmount = Int

  case class Bid(bidder: String, offer: MoneyAmount, timestamp: Instant, originDc: String)

  // commands
  sealed trait AuctionCommand
  case class OfferBid(bidder: String, offer: MoneyAmount) extends AuctionCommand
  case object Finish extends AuctionCommand // An auction coordinator needs to schedule this command for each replica
  case object GetHighestBid extends AuctionCommand

  // events
  sealed trait AuctionEvent
  case class BidRegistered(bid: Bid) extends AuctionEvent
  case class AuctionFinished(atDc: String, winningBid: Bid, highestCounterOffer: MoneyAmount) extends AuctionEvent

  def auctionProps(pid: String, auctionSetup: AuctionSetup, settings: PersistenceMultiDcSettings): Props =
    CrdtReplicatedEntity.props("auction", Some(pid), () => new AuctionEntity(auctionSetup), settings)

  case class AuctionState(
    stillRunning:        Boolean,
    highestBid:          Bid,
    highestCounterOffer: MoneyAmount // in ebay style auctions, we need to keep track of current highest counter offer
  ) extends OpCrdt[AuctionEvent] {
    type T = AuctionState

    def applyEvent(event: AuctionEvent): AuctionState =
      event match {
        case BidRegistered(b) =>
          if (isHigherBid(b, highestBid)) withNewHighestBid(b)
          else withTooLowBid(b)
        case _: AuctionFinished => copy(stillRunning = false)
      }

    def withNewHighestBid(bid: Bid): AuctionState = {
      require(stillRunning)
      require(isHigherBid(bid, highestBid))
      copy(
        highestBid = bid,
        highestCounterOffer = highestBid.offer // keep last highest bid around
      )
    }
    def withTooLowBid(bid: Bid): AuctionState = {
      require(stillRunning)
      require(isHigherBid(highestBid, bid))
      copy(highestCounterOffer = highestCounterOffer.max(bid.offer)) // update highest counter offer
    }

    def isHigherBid(first: Bid, second: Bid): Boolean =
      first.offer > second.offer ||
        (first.offer == second.offer && first.timestamp.isBefore(second.timestamp)) || // if equal, first one wins
        // If timestamps are equal, choose by dc where the offer was submitted
        // In real auctions, this last comparison should be deterministic but unpredictable, so that submitting to a
        // particular DC would not be an advantage.
        (first.offer == second.offer && first.timestamp.equals(second.timestamp) && first.originDc.compareTo(second.originDc) < 0)
  }

  case class AuctionSetup(
    name:       String,
    initialBid: Bid // the initial bid is basically the minimum price bidden at start time by the owner
  )

  class AuctionEntity(auctionSetup: AuctionSetup)
    extends CrdtReplicatedEntity[AuctionCommand, AuctionEvent, AuctionEvent, AuctionState] {

    override def initialState: AuctionState =
      AuctionState(
        stillRunning = true,
        highestBid = auctionSetup.initialBid,
        highestCounterOffer = auctionSetup.initialBid.offer
      )

    override def behavior: Behavior = {
      Actions()
        .onCommand[OfferBid] {
          case (OfferBid(bidder, offer), ctx, _) =>
            ctx.thenPersist(BidRegistered(Bid(bidder, offer, Instant.now(), selfDc)))()
        }
        .onCommand[Finish.type] {
          case (Finish, ctx, state) =>
            ctx.thenPersist(AuctionFinished(selfDc, state.highestBid, state.highestCounterOffer))()
        }
        .onReadOnlyCommand[GetHighestBid.type] { (_, ctx, state) =>
          ctx.sender() ! state.highestBid.copy(offer = state.highestCounterOffer)
        }
    }

    def convertEvent(event: AuctionEvent): AuctionEvent = event
  }
}

class AuctionExampleSpec extends BaseSpec("AuctionExampleSpec") {
  import AuctionExampleSpec._

  class TestSetup(testName: String) {
    val minimumBid = 12
    val auctionSetup = AuctionSetup("bicycle", Bid("me", minimumBid, Instant.now(), ""))

    val nodeA = system.actorOf(auctionProps(s"bikeAuction-$testName", auctionSetup, settings), s"auction-A-$testName")
    val nodeB = otherSystem.actorOf(auctionProps(s"bikeAuction-$testName", auctionSetup, otherSettings), s"auction-B-$testName")

    def expectHighestBid(node: ActorRef): Bid = {
      node ! GetHighestBid
      expectMsgType[Bid]
    }
    def expectHighestBid(node: ActorRef, bidder: String, expected: MoneyAmount): Unit =
      expectHighestBid(node).offer shouldEqual expected
  }

  "AuctionExample" should {
    "propagate highest bid to replicated actor" in new TestSetup("test1") {
      // simple bidding
      nodeA ! OfferBid("Mary", 42)
      expectHighestBid(nodeA, "Mary", minimumBid) // ebay style, still the minimum offer

      nodeA ! OfferBid("Paul", 41)
      expectHighestBid(nodeA, "Mary", 41) // ebay style, now updated to the highest counter offer

      awaitAssert {
        // check that results have propagated to b
        expectHighestBid(nodeB, "Mary", 41) // ebay style, now updated to the highest counter offer
      }

      // make sure that first bidder still keeps the crown
      nodeB ! OfferBid("c", 42)
      expectHighestBid(nodeB, "Mary", 42)
    }

    "eventually resolve conflicting bids during auction if bids are highest (but different) in each dc" in new TestSetup("test2") {
      // highest bid comes first
      nodeA ! OfferBid("Mary", 42)
      nodeB ! OfferBid("Paul", 41)

      awaitAssert {
        expectHighestBid(nodeA, "Mary", 41)
        expectHighestBid(nodeB, "Mary", 41)
      }

      // highest bid comes second
      nodeA ! OfferBid("Paul", 50)
      nodeB ! OfferBid("Kat", 60)

      awaitAssert {
        expectHighestBid(nodeA, "Kat", 50)
        expectHighestBid(nodeB, "Kat", 50)
      }
    }
    "eventually resolve conflicting bids during auction if bids are highest and equal (but different time) in each dc" in new TestSetup("test3") {
      // two equal bids: the earlier one wins
      nodeA ! OfferBid("Mary", 15)
      Thread.sleep(1)
      nodeB ! OfferBid("Paul", 15)

      awaitAssert {
        expectHighestBid(nodeA, "Mary", 15)
        expectHighestBid(nodeB, "Mary", 15)
      }
    }

    "eventually come to a consistent final result" in {
      pending
    }
  }
}