The Master Actor in Detail

The Master actor is without question the most involved component in this example. This is because it is designed to deal with failures. While the Akka cluster takes care of restarting the Master in case of a failure, we also want to make sure that the new Master can arrive at the same state as the failed Master. We use event sourcing and Akka Persistence to achieve this.

If the back-end node hosting the Master actor crashes, Akka Cluster Singleton makes sure the Master starts up on a different node, but we also want the new instance to reach the exact same state as the crashed Master. This is achieved through event sourcing and Akka Persistence.

Tracking current work items

The current set of work items is modelled in the WorkState class. It keeps track of which work is pending, which has been accepted by a worker, which has completed, and so on. Every change to the WorkState is modelled as a domain event:

case class WorkAccepted(work: Work) extends WorkDomainEvent
case class WorkStarted(workId: String) extends WorkDomainEvent
case class WorkCompleted(workId: String, result: Any) extends WorkDomainEvent
case class WorkerFailed(workId: String) extends WorkDomainEvent
case class WorkerTimedOut(workId: String) extends WorkDomainEvent

This allows us to capture and store each event as it happens. Later, we can replay all of the stored events on an empty model and arrive at the exact same state. This is how event sourcing and Akka Persistence allow the actor to start on any node and reach the same state as a previous instance.
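To make the replay idea concrete, here is a minimal sketch in plain Scala, without Akka. `MiniWorkState`, its fields, and `EventReplay` are simplified stand-ins invented for this illustration and are not the guide's actual `WorkState`; `Work` is assumed to carry just an id and a payload. The events mirror the ones listed above.

```scala
// Simplified stand-ins for illustration only; the real Work and WorkState carry more detail.
final case class Work(workId: String, job: Any)

sealed trait WorkDomainEvent
final case class WorkAccepted(work: Work) extends WorkDomainEvent
final case class WorkStarted(workId: String) extends WorkDomainEvent
final case class WorkCompleted(workId: String, result: Any) extends WorkDomainEvent
final case class WorkerFailed(workId: String) extends WorkDomainEvent
final case class WorkerTimedOut(workId: String) extends WorkDomainEvent

// Hypothetical, reduced model: pending work, work in progress, and finished ids.
final case class MiniWorkState(
    pending: Vector[Work] = Vector.empty,
    inProgress: Map[String, Work] = Map.empty,
    done: Set[String] = Set.empty) {

  // Pure state transition: the same event applied to the same state gives the same result.
  def updated(event: WorkDomainEvent): MiniWorkState = event match {
    case WorkAccepted(work) =>
      copy(pending = pending :+ work)
    case WorkStarted(id) =>
      pending.find(_.workId == id) match {
        case Some(work) =>
          copy(pending = pending.filterNot(_.workId == id), inProgress = inProgress + (id -> work))
        case None => this
      }
    case WorkCompleted(id, _) =>
      copy(inProgress = inProgress - id, done = done + id)
    case WorkerFailed(id)   => requeue(id)
    case WorkerTimedOut(id) => requeue(id)
  }

  // Work whose worker failed or timed out goes back to the pending queue.
  private def requeue(id: String): MiniWorkState =
    inProgress.get(id) match {
      case Some(work) => copy(pending = pending :+ work, inProgress = inProgress - id)
      case None       => this
    }
}

object EventReplay {
  // Replaying the journal is a left fold over the stored events, starting from an empty state.
  def replay(events: Seq[WorkDomainEvent]): MiniWorkState =
    events.foldLeft(MiniWorkState())((state, event) => state.updated(event))
}
```

Because `updated` is a pure function, replaying the same sequence of events always produces the same state, no matter which node performs the replay.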

If the Master fails and is restarted, the replacement Master replays events from the log to retrieve the current state. This means that when the WorkState is modified, the Master must persist the event before acting on it. When the event is successfully stored, we can modify the state. Otherwise, if a failure occurs before the event is persisted, the replacement Master will not be able to attain the same state as the failed Master.

Let’s look at how a command to process a work item from the front-end comes in:

case work: Work =>
  // idempotent
  if (workState.isAccepted(work.workId)) {
    sender() ! Master.Ack(work.workId)
  } else {
    log.info("Accepted work: {}", work.workId)
    persist(WorkAccepted(work)) { event =>
      // Ack back to original sender
      sender() ! Master.Ack(work.workId)
      workState = workState.updated(event)
      notifyWorkers()
    }
  }

The first thing you might notice is the comment saying idempotent. It means that the same work message may arrive multiple times, but regardless of how many times it arrives, the work should only be executed once. This is needed because the FrontEnd actor re-sends work if the Work or Ack message gets lost (Akka's default message delivery is at-most-once, so messages can be lost; see the docs for details).

To make the logic idempotent, we simply check whether the work id is already known. If it is, we Ack it without further logic. If the work is previously unknown, we start by transforming it into a WorkAccepted event, which we persist. Only in the handler function passed to persist do we actually update the workState, send an Ack back to the FrontEnd and trigger a search for available workers.
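The ordering described here (check for a duplicate, store the event, then update state and acknowledge) can be sketched without Akka. Everything in this block is a hypothetical stand-in for illustration: `MiniMaster` is not the guide's Master, the `journal` vector stands in for the persistent event log, and the `accepted` set stands in for `workState.isAccepted`.

```scala
// Hypothetical, Akka-free model of the Master's accept logic, for illustration only.
final case class Work(workId: String, job: Any)
final case class WorkAccepted(work: Work)
final case class Ack(workId: String)

final class MiniMaster {
  private var journal = Vector.empty[WorkAccepted] // stands in for the persistent event log
  private var accepted = Set.empty[String]         // stands in for workState.isAccepted

  def storedEvents: Vector[WorkAccepted] = journal

  // Returns the Ack that would be sent back to the front-end.
  def handle(work: Work): Ack = {
    if (!accepted.contains(work.workId)) {
      val event = WorkAccepted(work)
      journal = journal :+ event                // 1. store the event first
      accepted = accepted + event.work.workId   // 2. only then update the in-memory state
    }
    // Duplicates are acknowledged again, but produce no new event.
    Ack(work.workId)
  }
}
```

Sending the same `Work` twice yields two Acks but only one stored event, which is the property the front-end's re-send logic relies on.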

Implementation items required for Akka Persistence

In a “normal” Actor the only thing we have to do is implement receive, which is then invoked for each incoming message. In a PersistentActor there are three things that need to be implemented:

  1. persistenceId is a global identifier for the actor. We must make sure that there is never more than one Actor instance with the same persistenceId running at the same time, or else the instances could corrupt the journal.
  2. receiveCommand corresponds to the receive method of regular actors. Messages sent to the actor end up here.
  3. receiveRecover is invoked with the recorded events of the actor when it starts up.
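The three members can be seen together in a small skeleton. This is a sketch using the classic `akka.persistence.PersistentActor` API, assuming akka-persistence is on the classpath and a journal plugin is configured. The `Add`, `Added`, `GetTotal` and `TotalActor` names are hypothetical and chosen only for illustration; they are not part of the guide's code.

```scala
import akka.actor.{ActorLogging, Props}
import akka.persistence.{PersistentActor, RecoveryCompleted}

// Hypothetical protocol for illustration only; none of these names come from the guide.
final case class Add(amount: Int)   // command
final case class Added(amount: Int) // event written to the journal
case object GetTotal                // query

class TotalActor(id: String) extends PersistentActor with ActorLogging {

  // 1. Identifies this actor's journal; only one running instance may use a given id.
  override def persistenceId: String = id

  private var total = 0

  // 3. Invoked with each stored event during startup, before any new command is handled.
  override def receiveRecover: Receive = {
    case Added(amount)     => total += amount
    case RecoveryCompleted => log.info("Recovered total: {}", total)
  }

  // 2. The counterpart of receive in a regular actor: incoming messages end up here.
  override def receiveCommand: Receive = {
    case Add(amount) =>
      // Store the event first; the handler runs only after the write succeeded.
      persist(Added(amount)) { event =>
        total += event.amount
        sender() ! total
      }
    case GetTotal =>
      sender() ! total
  }
}

object TotalActor {
  def props(id: String): Props = Props(new TotalActor(id))
}
```

Note that state is changed in exactly two places, the `persist` handler and `receiveRecover`, and both apply the same event in the same way. That symmetry is what lets a restarted instance arrive at the state of its predecessor.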

Tracking workers

Unlike the single Master actor, workers are numerous and can be stopped and restarted frequently. We do not need to save their state, since the Master is tracking work and will simply send work to another worker if the original fails to respond. So, rather than persisting a list of available workers, the example uses the following strategy:

  • Running workers periodically register with the Master using a RegisterWorker message. If a back-end node fails and the Master is started on a new node, the registrations go automatically to the new node.
  • Any type of failure (network, worker actor, or node) that prevents a RegisterWorker message from arriving within the work-timeout period causes the Master actor to remove the worker from its list.

case CleanupTick =>
  workers.foreach {
    case (workerId, WorkerState(_, Busy(workId, timeout), _)) if timeout.isOverdue() =>
      log.info("Work timed out: {}", workId)
      workers -= workerId
      persist(WorkerTimedOut(workId)) { event =>
        workState = workState.updated(event)
        notifyWorkers()
      }

    case (workerId, WorkerState(_, Idle, lastHeardFrom)) if lastHeardFrom.isOverdue() =>
      log.info("Too long since heard from worker {}, pruning", workerId)
      workers -= workerId

    case _ => // this one is a keeper!
  }
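The register-and-prune strategy relies on deadlines rather than persisted state. The following is a reduced sketch in plain Scala using `scala.concurrent.duration.Deadline`; `Registration` and `WorkerRegistry` are hypothetical names for illustration, and the guide's WorkerState holds more than a single deadline.

```scala
import scala.concurrent.duration._

// Hypothetical, simplified worker registry for illustration only.
final case class Registration(lastHeardFrom: Deadline)

object WorkerRegistry {
  // Record or refresh a registration: the worker counts as alive until the deadline passes.
  def register(
      workers: Map[String, Registration],
      workerId: String,
      workTimeout: FiniteDuration): Map[String, Registration] =
    workers + (workerId -> Registration(Deadline.now + workTimeout))

  // Drop every worker whose deadline passed without a fresh registration.
  def prune(workers: Map[String, Registration]): Map[String, Registration] =
    workers.filterNot { case (_, registration) => registration.lastHeardFrom.isOverdue() }
}
```

Because each periodic registration pushes the deadline forward, a healthy worker never becomes overdue, while a crashed or unreachable one drops out after one timeout period without any explicit failure signal.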

When a Worker actor is stopped, it still tries to remove itself gracefully by sending a DeRegisterWorker message, but if it crashes it has no chance to communicate that to the Master.

case MasterWorkerProtocol.DeRegisterWorker(workerId) =>
  workers.get(workerId) match {
    case Some(WorkerState(_, Busy(workId, _), _)) =>
      // there was a workload assigned to the worker when it left
      log.info("Busy worker de-registered: {}", workerId)
      persist(WorkerFailed(workId)) { event =>
        workState = workState.updated(event)
        notifyWorkers()
      }
    case Some(_) =>
      log.info("Worker de-registered: {}", workerId)
    case _ =>
  }
  workers -= workerId

Now let’s move on to the last piece of the puzzle, the worker nodes.