Front-End Nodes

Typically in systems built with Akka, clients submit requests using a RESTful API. Either Akka HTTP or Play Framework is a great choice for implementing an HTTP API for the front-end. To limit the scope of this example, we have chosen to emulate client activity with two ordinary actors:

  • The FrontEnd actor generates payloads at random intervals and sends them to the ‘Master’ actor.
  • The WorkResultConsumer actor consumes results and logs them.

The FrontEnd actor only concerns itself with posting workloads and does not care when the work has been completed. When the Master actor has processed a workload successfully, it publishes the result to all interested cluster nodes through Distributed Pub-Sub.
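
The publishing side is not shown in this section. Inside the Master actor it amounts to a single Publish message to the local pub-sub mediator; a minimal sketch, assuming the Master.ResultsTopic name that the consumer subscribes to further down:

import akka.cluster.pubsub.{ DistributedPubSub, DistributedPubSubMediator }

// Inside the Master actor: once a workload has been processed successfully,
// publish the result so that every subscriber of the results topic sees it.
// (workId and result come from the completed work item.)
val mediator = DistributedPubSub(context.system).mediator
mediator ! DistributedPubSubMediator.Publish(Master.ResultsTopic, WorkResult(workId, result))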

The WorkResultConsumer actor subscribes to the completion events and logs when a workload has completed.

Now, let’s take a look at the code that accomplishes this front-end behavior.

The Front-end Actor

import java.util.UUID
import java.util.concurrent.ThreadLocalRandom

import scala.concurrent.duration._

import akka.actor.{ Actor, ActorLogging, Props, Timers }
import akka.pattern.{ ask, pipe }
import akka.util.Timeout

class FrontEnd extends Actor with ActorLogging with Timers {
  import FrontEnd._
  import context.dispatcher

  val masterProxy = context.actorOf(
    MasterSingleton.proxyProps(context.system),
    name = "masterProxy")

  var workCounter = 0

  def nextWorkId(): String = UUID.randomUUID().toString

  override def preStart(): Unit = {
    timers.startSingleTimer("tick", Tick, 5.seconds)
  }

  def receive = idle

  def idle: Receive = {
    case Tick =>
      workCounter += 1
      log.info("Produced work: {}", workCounter)
      val work = Work(nextWorkId(), workCounter)
      context.become(busy(work))
  }

  def busy(workInProgress: Work): Receive = {
    // Send the work immediately when entering the busy behavior, then
    // return the Receive that handles the Master's reply (or lack of one).
    sendWork(workInProgress)

    {
      case Master.Ack(workId) =>
        log.info("Got ack for workId {}", workId)
        val nextTick = ThreadLocalRandom.current.nextInt(3, 10).seconds
        timers.startSingleTimer(s"tick", Tick, nextTick)
        context.become(idle)

      case NotOk =>
        log.info("Work {} not accepted, retry after a while", workInProgress.workId)
        timers.startSingleTimer("retry", Retry, 3.seconds)

      case Retry =>
        log.info("Retrying work {}", workInProgress.workId)
        sendWork(workInProgress)
    }
  }

  def sendWork(work: Work): Unit = {
    implicit val timeout = Timeout(5.seconds)
    // Ask the Master for an Ack; any failure (including an
    // AskTimeoutException) is turned into NotOk.
    (masterProxy ? work).recover {
      case _ => NotOk
    } pipeTo self
  }

}
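
The code above imports FrontEnd._, so the internal messages and the Props factory live in a companion object. A minimal sketch of that companion, matching the names used in the actor:

object FrontEnd {

  def props: Props = Props(new FrontEnd)

  // Internal protocol of the FrontEnd actor.
  case object Tick   // time to produce a new workload
  case object NotOk  // the Master denied the work or did not answer in time
  case object Retry  // back-off elapsed, send the work again
}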

Note in the source code that as the ‘FrontEnd’ actor starts up, it:

  1. Schedules ‘Tick’ messages to itself.
  2. On each ‘Tick’ message:
       • Triggers creation of a new ‘Work’ message.
       • Sends the ‘Work’ message to the ‘Master’ actor of a ‘back-end’ node.
       • Switches to a new ‘busy’ behavior.

The cluster contains one Master actor. The FrontEnd actor does not need to know the exact location because it sends work to the master using the ClusterSingletonProxy.
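
MasterSingleton.proxyProps is defined elsewhere in the sample; a sketch of what it might look like, assuming the singleton manager runs under the name master on nodes with the back-end role:

import akka.actor.{ ActorSystem, Props }
import akka.cluster.singleton.{ ClusterSingletonProxy, ClusterSingletonProxySettings }

object MasterSingleton {
  private val singletonName = "master"
  private val singletonRole = "back-end"

  // The proxy tracks where the singleton currently runs and buffers
  // messages while the singleton is moving between nodes.
  def proxyProps(system: ActorSystem): Props =
    ClusterSingletonProxy.props(
      settings = ClusterSingletonProxySettings(system).withRole(singletonRole),
      singletonManagerPath = s"/user/$singletonName")
}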

The ‘Master’ actor can accept or deny a work request, and we also need to deal with unexpected errors:

  • If the ‘Master’ accepts the request, the actor schedules a new tick to itself and toggles back to the idle behavior.
  • To deal with failures, the FrontEnd actor uses the ask pattern, which attaches a timeout to the wait for a reply. If the timeout expires before the master responds, the returned ‘Future’ fails with an akka.pattern.AskTimeoutException.
  • Timeouts and denials from the ‘Master’ are both transformed into a ‘NotOk’ value, and the ‘Future’ is piped back to the ‘FrontEnd’ actor itself, so whatever value it completes with, the successful ack or ‘NotOk’, arrives as an ordinary message. If the work is not accepted or there is no response, for example because the message or the response got lost, the FrontEnd actor backs off a bit and then sends the work again. The protocol messages assumed by this exchange are sketched below.

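For reference, a rough sketch of the shared messages this exchange assumes; the exact definitions live elsewhere in the sample, and the field names here simply follow their use in the FrontEnd code:

object Master {
  val ResultsTopic = "results"     // pub-sub topic for completed work (assumed name)
  case class Ack(workId: String)   // reply sent when the Master accepts a work item
}

case class Work(workId: String, job: Any)
case class WorkResult(workId: String, result: Any)
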
You can see how the actors on a front-end node are started in the method Main.startFrontEnd:

def startFrontEnd(port: Int): Unit = {
  val system = ActorSystem("ClusterSystem", config(port, "front-end"))
  system.actorOf(FrontEnd.props, "front-end")
  system.actorOf(WorkResultConsumer.props, "consumer")
}
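
The config helper is not shown here; a minimal sketch, assuming classic remoting and that the given role is added to the node's cluster roles on top of the default configuration:

import com.typesafe.config.{ Config, ConfigFactory }

def config(port: Int, role: String): Config =
  ConfigFactory.parseString(s"""
      akka.remote.netty.tcp.port=$port
      akka.cluster.roles=[$role]
    """).withFallback(ConfigFactory.load())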

The Work Result Consumer Actor

As mentioned in the introduction, results are published using Distributed Pub-Sub. The ‘WorkResultConsumer’ actor subscribes to the completion events and logs when a workload has completed:

import akka.actor.{ Actor, ActorLogging }
import akka.cluster.pubsub.{ DistributedPubSub, DistributedPubSubMediator }

class WorkResultConsumer extends Actor with ActorLogging {

  // Subscribe to the topic on which the Master publishes completed work.
  val mediator = DistributedPubSub(context.system).mediator
  mediator ! DistributedPubSubMediator.Subscribe(Master.ResultsTopic, self)

  def receive = {
    case _: DistributedPubSubMediator.SubscribeAck => // subscription confirmed
    case WorkResult(workId, result) =>
      log.info("Consumed result: {}", result)
  }

}

In an actual application you would probably want a way for clients to poll or stream the status changes of the submitted work.