Leveraging Speculative Execution for Model Serving

By Boris Lublinsky May 24, 2018

In my recent models serving book, I introduced the idea of treating models as data and showed how this approach can significantly simplify model serving, including real-time model updates. The main limitation of the solution presented in the book is that it supports a single model per data type, which is rarely the case in real-life deployments. As Ted Dunning describes in detail in his Machine Learning Logistics book, real-life deployments typically use an ensemble of models that score the same data item in parallel, followed by a decision block that decides which result to use. In this blog post, I describe how to extend the solution from the models serving book to support speculative model serving, and I provide an implementation based on Akka Streams. The complete code of the implementation described in this post is available from GitHub.

Why speculative model serving?

According to Wikipedia, speculative execution is:

“an optimization technique where a computer system performs some task that may not be needed. Work is done before it is known whether it is actually needed, so as to prevent a delay that would have to be incurred by doing the work after it is known that it is needed. If it turns out the work was not needed after all, most changes made by the work are reverted and the results are ignored.

The objective is to provide more concurrency if extra resources are available. This approach is employed in a variety of areas, including branch prediction in pipelined processors, value prediction for exploiting value locality, prefetching memory and files” etc.

In the case of model serving, speculative execution means scoring data in parallel with a set of models and then selecting the best score based on some metric. This becomes important when there are several models whose performance or result quality differs from one case to another.
Speculative execution matters in model serving because it enables the following features in model serving applications:

  • Guaranteed execution time. Assuming that we have several models, with the fastest providing a fixed execution time, it is possible to provide a model serving implementation with a fixed upper limit on execution time, as long as that limit is larger than the execution time of the simplest (fastest) model.
  • Consensus-based model serving. Assuming that we have several models, we can implement model serving where the prediction is the one returned by the majority of the models.
  • Quality-based model serving. Assuming that we have a metric allowing us to evaluate the quality of model serving results, this approach allows us to pick the result with the best quality.

It is, of course, possible to combine multiple features, for example, consensus-based model serving with guaranteed execution time, where the consensus result is used when it completes within a given time interval.
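
The consensus and quality policies above can be sketched as plain functions over a list of replies. This is a minimal illustration only: the `Reply` type and the `Policies` object are hypothetical names introduced here, and the sketch assumes that predictions are integer labels and that every reply carries a confidence value.

```scala
// Hypothetical reply type for illustration; not the message type used in this post.
final case class Reply(model: String, prediction: Int, confidence: Double)

object Policies {
  // Consensus: the prediction returned by a strict majority of the replies, if any.
  // At most one prediction can have a strict majority, so the result is deterministic.
  def consensus(replies: Seq[Reply]): Option[Int] =
    replies.groupBy(_.prediction)
      .collectFirst { case (prediction, rs) if rs.size * 2 > replies.size => prediction }

  // Quality: the reply with the highest confidence, if there are any replies.
  def bestQuality(replies: Seq[Reply]): Option[Reply] =
    if (replies.isEmpty) None else Some(replies.maxBy(_.confidence))
}
```

For three replies predicting 5, 6, and 5, `Policies.consensus` yields `Some(5)`, while `Policies.bestQuality` returns whichever reply has the highest confidence, regardless of what the other models predicted.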

Overall architecture

The overall architecture for speculative model serving is presented below:

[Figure: speculative model serving]

Here, incoming data is fed into both the Model Serving Controller and the Model Learning (training) component. The Model Learning component is used for periodic recalculation of models, after which updates are pushed into Kafka for ingestion by the Model Servers. At the heart of our implementation is the Model Serving Controller, which orchestrates the execution of the individual Model Servers and decides on the final model serving result.
Once replies from all of the individual Model Servers are received or the wait time expires, the Model Serving Controller chooses the reply, based on some defined criteria. The reply is propagated downstream.
Individual Model Servers process the input data based on their current model and return results back to the Model Serving Controller. Additionally, the Model Servers are listening to the Kafka topics with model updates.
The model serving result can be accompanied by a confidence level. This information is optional and, if present, can be used by the Model Serving Controller to evaluate the result. Additional optional fields can be added to the Model Server replies for use when evaluating the results.
An additional aspect of the overall system is the configuration of the speculative server. A separate Kafka topic makes it possible to change the model ensemble for a given data type, along with some of the decision-making configuration.
So that the Model Serving Controller knows when a reply has been received for a particular datum, a “handshake” in our implementation is provided by a unique id (GUID), which is passed to the Model Servers and is returned back to the controller along with the serving result.
With this in place, messages for invoking a Model Server and its reply can be represented as follows (we are using Protocol Buffers for encoding messages):

syntax = "proto3";    
option java_package = "com.lightbend.speculative";    
  
// Description of the model serving request.    
message ServingRequest {  
   string uuid = 1;    
   bytes data = 2;    
}    
  
// Description of the model serving response.    
message ServingResponse {    
   string uuid = 1;
   bytes data = 2;    
   double confidence = 3;    
   repeated ServingQualifier qualifiers = 4;    
}    
  
// Description of the model serving qualifier.    
message ServingQualifier{    
   string key = 1;    
   string value = 2;    
}

Implementing speculative model serving with Akka

Although there are many different options for implementing this architecture, we will show how to implement it leveraging Akka Streams integrated with Actors. We will implement individual model servers and model service controller as Actors and use Akka Streams for overall orchestration of execution and Akka HTTP for providing HTTP access to our implementation.
In addition to the speculative model serving functionality, our implementation includes two more components:
- Custom persistence implementation (file-based persistence) allowing us to load or restore the state of participating actors, including currently running models and the overall configuration of speculative execution (model ensembles for a given data type, some of the decision-making configuration, etc.)
- Queryable state implementation (compare to queryable state in Flink and Kafka Streams) allowing us to observe execution: statistics for individual and speculative model processing.

When creating an Akka Actors application, we first need to design the actor hierarchy. Following this design documentation, we decided on the following actor hierarchy:
[Figure: Speculative Model Serving Actors]

The actors in this hierarchy are described below.

Model service manager Actor

The Model service manager Actor is a singleton that provides the entry point into the whole actor system. It is responsible for overall management of our application actors and for routing messages to the rest of the actors. It supports all the methods provided by the rest of the actors by forwarding client requests to the appropriate executors (ModelManager or DataManager). Implementation of this actor is fairly straightforward:

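import java.util.concurrent.TimeUnit

import akka.actor.Actor
import akka.pattern.{ask, pipe} // ask(...) and pipeTo(...) used below
import akka.util.Timeout

class ModelServingManager extends Actor {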

  implicit val askTimeout = Timeout(100, TimeUnit.MILLISECONDS)
  implicit val ec = context.dispatcher

  println(s"Creating Model Serving manager")

  // Create support actors
  val modelManager = context.actorOf(ModelManager.props, "modelManager")
  val dataManager = context.actorOf(DataManager.props, "dataManager")


  override def receive = {
    // Model methods
    // Update model
    case model: ModelWithDescriptor => modelManager forward model
    // Get list of model servers
    case getModels : GetModels => modelManager forward getModels
    // Get state of the model
    case getState: GetModelServerState => modelManager forward getState

    // Data methods
    // Configure Data actor
    case configuration : SpeculativeDescriptor =>
      ask(modelManager, GetModelActors(configuration.models)).mapTo[GetModelActorsResult]
        .map(actors => SetSpeculativeServer(configuration.datatype, configuration.tmout, actors.models.toList))
        .pipeTo(dataManager)(sender())

    // process data
    case record: WineRecord => dataManager forward record
    // Get state of speculative executor
    case getState: GetSpeculativeServerState => dataManager forward getState
    // Get List of data processors
    case getProcessors : GetDataProcessors => dataManager forward getProcessors
  }
}

Models manager

Models manager is also a singleton responsible for managing all model servers. It is also responsible for routing model updates to them. The code for this class is presented below:

class ModelManager extends Actor {    
 // This is just for testing    
 def gen = ThreadLocalRandom.current()    
  
 private def getModelServer(modelID: String): ActorRef =
   context.child(modelID).getOrElse(context.actorOf(ModelServingActor.props(modelID), modelID))
 private def getInstances : GetModelsResult =
   GetModelsResult(context.children.map(_.path.name).toSeq)
 override def receive = {    
   // Redirect to model update.        
   case model: ModelWithDescriptor =>    
     // This is just for testing    
     val models = getInstances.models    
     val modelServer = getModelServer(models(gen.nextInt(models.size)))    
     modelServer forward model    
   // Get State of model server    
   case getState: GetModelServerState => {    
     context.child(getState.ModelID) match {    
       case Some(actorRef) => actorRef forward getState    
       case _ => sender() ! ModelToServeStats.empty    
     }    
   }    
   // Get current list of existing models    
   case getModels : GetModels => sender() ! getInstances    
   // Create actors from names. Support method for data processor configuration    
   case createList : GetModelActors => sender() ! GetModelActorsResult(createList.models.map(getModelServer(_)))    
 }    
}  

Model server

This actor is an implementation of the model server responsible for serving an individual model. Implementation of this actor is based on the models serving book with the addition of a local persistence implemented to make model updates durable in the event of restarts. It looks as follows:

class ModelServingActor(modelID : String) extends Actor {

  println(s"Creating model serving actor $modelID")
  private var currentModel: Option[Model] = None
  private var newModel: Option[Model] = None
  private var currentState: Option[ModelToServeStats] = None
  private var newState: Option[ModelToServeStats] = None
  // For testing
  def gen = ThreadLocalRandom.current()

  override def preStart : Unit = {
    val state = FilePersistence.restoreModelState(modelID)
    newState = state._2
    newModel = state._1
  }

  override def receive = {

    // Update Model. This only works for the local (in memory) invocation, because ModelWithDescriptor is not serializable
    case model : ModelWithDescriptor =>
      // Update model
      println(s"Model Server $modelID has a new model $model")
      newState = Some(ModelToServeStats(model.descriptor))
      newModel = Some(model.model)
      FilePersistence.saveModelState(modelID, newModel.get, newState.get)
      sender() ! "Done"
    // Process data
    case record : ServingRequest =>
      // Process data
      newModel.foreach { model =>
        // Update model
        // close current model first
        currentModel.foreach(_.cleanup())
        // Update model
        currentModel = newModel
        currentState = newState
        newModel = None
      }

      currentModel match {
        case Some(model) =>
          val start = System.nanoTime()
          val quality = model.score(record.data.asInstanceOf[WineRecord]).asInstanceOf[Double]
           // Just for testing
          Thread.sleep(gen.nextInt(20)*10l)
          val duration = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start)
          currentState = currentState.map(_.incrementUsage(duration))
          sender() ! ServingResponse(record.GUID, ServingResult(modelID, quality, duration))

        case _ =>
          sender() ! ServingResponse(record.GUID, ServingResult.noModel)
      }
    // Get current state
    case request : GetModelServerState => {
      // State query
      sender() ! currentState.getOrElse(ModelToServeStats.empty)
    }
  }
}

Persistence here uses a custom implementation, which can be found at com.lightbend.akka.speculative.persistence.FilePersistence.
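
The `FilePersistence` class itself is not shown in this post. As a rough sketch of what file-based save and restore can look like, the example below stores a timeout and a list of model names in a small text file. The `DataState` type, the `SimpleFilePersistence` object, the file naming, and the line-based format are all assumptions made for illustration and are not taken from the project.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Path}
import scala.util.Try

// Hypothetical state for one data type: timeout (ms) and the names of the models in use.
final case class DataState(tmout: Long, models: List[String])

object SimpleFilePersistence {
  // One file per data type inside the given directory (naming is an assumption).
  private def file(dir: Path, dataType: String): Path = dir.resolve(s"data_$dataType.state")

  // First line holds the timeout, each following line holds one model name.
  def save(dir: Path, dataType: String, state: DataState): Unit = {
    Files.createDirectories(dir)
    val content = (state.tmout.toString :: state.models).mkString("\n")
    Files.write(file(dir, dataType), content.getBytes(StandardCharsets.UTF_8))
    ()
  }

  // Returns None when nothing was saved yet or the file cannot be parsed.
  def restore(dir: Path, dataType: String): Option[DataState] =
    Try {
      val bytes = Files.readAllBytes(file(dir, dataType))
      val lines = new String(bytes, StandardCharsets.UTF_8).split("\n").toList
      DataState(lines.head.toLong, lines.tail)
    }.toOption
}
```

An actor could call `restore` from `preStart` and `save` whenever its configuration changes, which mirrors how the actors in this post use persistence.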

Data manager

Data manager is also a singleton, responsible for managing all speculative model serving controllers, including their creation, their configuration, and the routing of model serving data requests to them:

class DataManager extends Actor {

  println(s"Creating Data manager")

  private def getDataServer(dataType: String): Option[ActorRef] = context.child(dataType)

  private def createDataServer(dataType: String, tmout : Long, models : List[ActorRef]) : ActorRef=
    context.actorOf(SpeculativeModelServingActor.props(dataType, tmout, models), dataType)

  private def getInstances : GetDataProcessorsResult =
    GetDataProcessorsResult(context.children.map(_.path.name).toSeq)


  override def receive = {

    // Configuration update
    case configuration : SetSpeculativeServer =>
      getDataServer(configuration.datatype) match {
        case Some(actor) => actor forward configuration                                           // Update existing one
        case _ =>
          createDataServer(configuration.datatype, configuration.tmout, configuration.models)     // Create the new one
          sender() ! "Done"
      }
    // process data record
    case record: WineRecord => getDataServer(record.dataType) match {
      case Some(actor) => actor forward record
      case _ =>  sender() ! ServingResult.noModel
    }
    // Get current state
    case getState: GetSpeculativeServerState => {
      getDataServer(getState.dataType) match {
        case Some(actorRef) => actorRef forward getState
        case _ => sender() ! SpeculativeExecutionStats.empty
      }
    }
    // Get List of data processors
    case getProcessors : GetDataProcessors => sender() ! getInstances
  }
}

Speculative model server

This actor is an implementation of the speculative model serving controller responsible for coordination of individual model servers and deciding on the result. It uses a set of model servers based on the following parameters:
- List of model servers: the list of ActorRefs used for actual model serving
- Timeout: the time to wait for a response from an actual model server (by increasing this you can ensure that all model servers have a chance to deliver a result)
- Decider: a custom class implementing Decider. This class needs to implement a single method, decideResult, which chooses one of the results collected by the speculative model server in the given time interval. Different implementations of this class can provide different speculative model processing policies.

This is the most complex actor. It is implemented as follows:

class SpeculativeModelServingActor(dataType : String, tmout : Long, models : List[ActorRef]) extends Actor {    
 val ACTORTIMEOUT = new FiniteDuration(100, TimeUnit.MILLISECONDS)    
 val SERVERTIMEOUT = 100l    
 private val modelProcessors = models.to[ListBuffer]    
 implicit var askTimeout = Timeout(if(tmout > 0) tmout else SERVERTIMEOUT, TimeUnit.MILLISECONDS)
 val decider = SimpleDesider    
 implicit val ec = context.dispatcher
  
 var state = SpeculativeExecutionStats(dataType, decider.getClass.getName, askTimeout.duration.length, getModelsNames())    
 override def preStart : Unit = {   
   // Restore state from persistence    
   val state = FilePersistence.restoreDataState(dataType)    
   state._1.foreach(tmout => askTimeout = Timeout(if(tmout > 0) tmout else SERVERTIMEOUT, TimeUnit.MILLISECONDS))    
   state._2.foreach(models => {    
     modelProcessors.clear()    
     models.foreach(path => context.system.actorSelection(path).resolveOne(ACTORTIMEOUT).onComplete {    
       case Success(ref) => modelProcessors += ref    
       case _ =>    
     }    
   )})    
 }    
 override def receive = {    
   // Model serving request    
   case record : WineRecord =>    
     val request = ServingRequest(UUID.randomUUID().toString, record)    
     val start = System.nanoTime()    
     Future.sequence(    
       // For every available model    
        modelProcessors.toList.map(    
          // Invoke model serving, map result and lift it to try    
          ask(_,request)(askTimeout).mapTo[ServingResponse]).map(f => f.map(Success(_)).recover({case e => Failure(e)})))    
         // collect all successful serving    
        .map(_.collect{ case Success(x) => x})    
         // Invoke decider    
        .map(decider.decideResult(_)).mapTo[ServingResult]    
         // Update stats    
        .map(servingResult => {    
          if(servingResult.processed)    
            state = state.incrementUsage(TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start), servingResult.actor)
          servingResult    
        })    
        // respond    
        .pipeTo(sender())    
   // Current State request    
   case request : GetSpeculativeServerState => sender() ! state    
   // Configuration update    
   case configuration : SetSpeculativeServer =>    
     askTimeout = Timeout(if(configuration.tmout > 0) configuration.tmout else SERVERTIMEOUT, TimeUnit.MILLISECONDS)    
     modelProcessors.clear()    
     modelProcessors ++= configuration.models    
     state.updateConfig(askTimeout.duration.length, getModelsNames())    
     FilePersistence.saveDataState(dataType, configuration.tmout, configuration.models)    
     sender() ! "Done"    
 }    
 private def getModelsNames() : List[String] = modelProcessors.toList.map(_.path.name)    
}    
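
The `Future.sequence` step in the actor above depends on lifting each reply into a `Try`, so that one model server failing or timing out does not fail the whole batch. The sketch below isolates that pattern using only the Scala standard library. `LiftToTry`, `lift`, and `collectSuccessful` are hypothetical names introduced here, and plain `Future` values stand in for the Akka `ask` calls.

```scala
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success, Try}

object LiftToTry {
  // Turn a Future that may fail into one that always succeeds with a Try.
  def lift[T](f: Future[T])(implicit ec: ExecutionContext): Future[Try[T]] =
    f.map(Success(_)).recover { case e => Failure(e) }

  // Wait for all calls to finish and keep only the successful results, in input order.
  def collectSuccessful[T](calls: List[Future[T]])(implicit ec: ExecutionContext): Future[List[T]] =
    Future.sequence(calls.map(lift(_))).map(_.collect { case Success(x) => x })
}
```

Without the lifting step, `Future.sequence` would fail as soon as any single call failed, and the decider would never see the replies that did arrive in time.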
 

Overall execution

Execution of the individual actors is orchestrated by the Akka Streams implementation. The overall code for wine quality evaluation (the example from the models serving book) is presented below:

object AkkaModelServer {    
 // Set up Akka execution environment    
 implicit val system = ActorSystem("ModelServing")    
 implicit val materializer = ActorMaterializer()    
 implicit val executionContext = system.dispatcher    
 implicit val askTimeout = Timeout(30.seconds)    
  
 // Define Kafka reader settings     
 val dataConsumerSettings = ConsumerSettings(system, new ByteArrayDeserializer, new ByteArrayDeserializer)    
   .withBootstrapServers(KAFKA_BROKER)    
   .withGroupId(DATA_GROUP)    
   .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")    
  
 val modelConsumerSettings = ….    
  
 val speculativeConsumerSettings = ….    
  
 def main(args: Array[String]): Unit = {    
  
   // Create necessary actors    
   val models = List("model1", "model2", "model3")    
   val modelserver = system.actorOf(ModelServingManager.props)    
   ask(modelserver, SpeculativeDescriptor("wine", 100, models)).onComplete{    
     case Success(r) => println("Data Server initialized")    
     case _ =>    
   }    
  
   // Speculative stream processing    
   Consumer.atMostOnceSource(speculativeConsumerSettings, Subscriptions.topics(SPECULATIVE_TOPIC))    
     .map(record => SpeculativeConverter.fromByteArray(record.value)).collect { case Success(a) => a }    
     .mapAsync(1)(elem => modelserver ? elem)    
     .runWith(Sink.ignore) // run the stream, we do not read the results directly    
  
   // Model stream processing    
   Consumer.atMostOnceSource(modelConsumerSettings, Subscriptions.topics(MODELS_TOPIC))    
     .map(record => ModelToServe.fromByteArray(record.value)).collect { case Success(a) => a }    
     .map(record => ModelWithDescriptor.fromModelToServe(record)).collect { case Success(a) => a }    
     .mapAsync(1)(elem => modelserver ? elem)    
     .runWith(Sink.ignore) // run the stream, we do not read the results directly    
  
   // Data stream processing    
   Consumer.atMostOnceSource(dataConsumerSettings, Subscriptions.topics(DATA_TOPIC))    
     .map(record => DataRecord.fromByteArray(record.value)).collect { case Success(a) => a }    
     .mapAsync(1)(elem => (modelserver ? elem).mapTo[ServingResult])    
     .runForeach(result => {    
       result.processed match {    
         case true => println(s"Calculated quality - ${result.result} by actor ${result.actor}. Calculated in ${result.duration} ms")
         case _ => println ("No model available - skipping")    
       }    
     })    
  
   // Rest Server    
   startRest(modelserver)    
 }
}

This implementation relies on a REST server that implements queryable state, similar to the implementation described in the models serving book.

Distributed speculative model serving implementation

Although the implementation described above works fine, the actor logic for the speculative model serving might be hard to read and debug. An alternative implementation, which splits this actor into a speculative execution starter and a collector, is provided as well. The overall architecture for this implementation is presented below:

[Figure: Speculative Model Serving Distributed]

This architecture is implemented with the following actors:
[Figure: Speculative Model Serving Actors Distributed]
Here we introduce two additional actors: a starter/collector pair.

Speculative execution starter

Speculative Execution Starter is responsible only for starting speculative model serving for a given datum. It notifies the Speculative Execution Collector about the start of a specific speculative execution (identified by GUID) and forwards the model serving request to all model servers defined in the speculative model server configuration, specifying the Speculative Execution Collector as the destination for the serving results. The implementation of the actor is presented below:

class SpeculativeModelServingStarterActor(dataType : String, models : List[ActorRef], collector : ActorRef) extends Actor {
 implicit val askTimeout = Timeout(100, TimeUnit.MILLISECONDS)    
 private val modelProcessors = models.to[ListBuffer]    
 implicit val ec = context.dispatcher
 override def preStart : Unit = {   
   val state = FilePersistence.restoreDataState(dataType)    
   state._2 match {    
     case Some(models) =>    
       modelProcessors.clear()    
       ask(AkkaModelServer.modelserver, GetModelActors(models)).mapTo[GetModelActorsResult].onComplete {    
         case Success(servers) => modelProcessors ++= servers.asInstanceOf[GetModelActorsResult].models    
         case _ =>    
       }    
     case _ =>   // Do nothing    
   }    
 }    
  
 override def receive = {    
   // Model serving request    
   case record : WineRecord =>    
     val request = ServingRequest(UUID.randomUUID().toString, record)    
     collector ! StartSpeculative(request.GUID, System.nanoTime(), sender(), modelProcessors.size)    
     modelProcessors.foreach( _ tell(request, collector))    
   // Configuration update    
   case configuration : SetSpeculativeServerStarter =>    
     modelProcessors.clear()    
     modelProcessors ++= configuration.models    
 }    
  
 private def getModelsNames() : List[String] = modelProcessors.toList.map(_.path.name)    
}  

Speculative execution collector

Speculative Execution Collector is responsible for collecting and processing results for model serving requests. It collects all serving results for a given GUID and calculates the final response based on the following parameters:
- Timeout: the time to wait for a response from an actual model server (by increasing this you can ensure that all model servers have a chance to deliver a result)
- Decider: a custom class implementing Decider. This class needs to implement a single method, decideResult, which chooses one of the results collected by the speculative model server in the given time interval. Different implementations of this class can provide different speculative model processing policies.

Implementation of the actor looks as follows:

class SpeculativeModelServingCollectorActor(dataType : String, tmout : Long, models : List[String]) extends Actor {    
 val SERVERTIMEOUT = 100l    
 val decider = SimpleDesider    
 var timeout = new FiniteDuration(if(tmout > 0) tmout else  SERVERTIMEOUT, TimeUnit.MILLISECONDS)    
 private val modelProcessors = models.to[ListBuffer]    
 var state = SpeculativeExecutionStats(dataType, decider.getClass.getName, timeout.length, models)    
 val currentProcessing = collection.mutable.Map[String, CurrentProcessing]()    
 implicit val ec = context.dispatcher
 override def preStart : Unit = {    
   val state = FilePersistence.restoreDataState(dataType)    
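   // Assign the restored timeout (the original expression built the duration and discarded it)
   state._1.foreach(tmout => timeout = new FiniteDuration(if(tmout > 0) tmout else SERVERTIMEOUT, TimeUnit.MILLISECONDS))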
   state._2.foreach(models => {    
     modelProcessors.clear()    
     modelProcessors ++= models    
   })    
 }    
 override def receive = {    
   // Start speculative request
   case start : StartSpeculative =>    
     // Set up the state    
     currentProcessing += (start.GUID -> CurrentProcessing(start.models, start.start, start.reply, new ListBuffer[ServingResponse]())) // Add to watch list    
     // Schedule timeout    
     context.system.scheduler.scheduleOnce(timeout, self, start.GUID)    
   // Result of individual model serving
    case servingResponse : ServingResponse =>
      currentProcessing.get(servingResponse.GUID) match {
      case Some(processingResults) =>
        // We are still waiting for this GUID
        val current = CurrentProcessing(processingResults.models, processingResults.start, processingResults.reply, processingResults.results += servingResponse)
        current.results.size match {
          case size if (size >= current.models) => processResult(servingResponse.GUID, current)  // We are done
          case _ => currentProcessing += (servingResponse.GUID -> current)                       // Keep going
        }
      case _ => // Timed out
    }
   // Speculative execution completion    
   case stop : String =>    
     currentProcessing.contains(stop) match {    
     case true => processResult(stop, currentProcessing(stop))    
     case _ => // It's already done
   }    
   // Current State request    
   case request : GetSpeculativeServerState => sender() ! state    
   // Configuration update    
   case configuration : SpeculativeDescriptor =>    
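     // Use the values from the incoming configuration, not the constructor arguments
     timeout = new FiniteDuration(if(configuration.tmout > 0) configuration.tmout else SERVERTIMEOUT, TimeUnit.MILLISECONDS)
     modelProcessors.clear()
     modelProcessors ++= configuration.models
     state.updateConfig(timeout.length, configuration.models.toList)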
     FilePersistence.saveDataState(dataType, configuration.tmout, configuration.models.toList)    
     sender() ! "Done"    
 }    
  
 // Complete speculative execution    
 private def processResult(GUID : String, results: CurrentProcessing) : Unit = {    
   val servingResult = decider.decideResult(results.results.toList).asInstanceOf[ServingResult]    
   results.reply ! servingResult    
   if(servingResult.processed)    
     state = state.incrementUsage(TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - results.start),servingResult.actor)    
   currentProcessing -= GUID    
 }    
}    
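
The collector's bookkeeping can also be exercised without Akka. The sketch below is a simplified, hypothetical model of it: executions are tracked by GUID, a decision is made either when all expected replies have arrived or when the timeout fires, and late replies are dropped. `CollectedReply`, `PendingExecution`, `Collector`, and the `decide` function are names assumed for illustration. In the actor, the timeout arrives as a scheduled message rather than a direct method call.

```scala
import scala.collection.mutable

// Hypothetical, simplified types for illustration only.
final case class CollectedReply(guid: String, model: String, quality: Double)
final case class PendingExecution(expected: Int, results: List[CollectedReply])

final class Collector(decide: List[CollectedReply] => Option[CollectedReply]) {
  private val pending = mutable.Map.empty[String, PendingExecution]

  // Register a new speculative execution that expects `expected` replies.
  def start(guid: String, expected: Int): Unit =
    pending += (guid -> PendingExecution(expected, Nil))

  // Record a reply; returns the decision once all expected replies are in.
  def onReply(reply: CollectedReply): Option[CollectedReply] =
    pending.get(reply.guid) match {
      case Some(p) =>
        val results = reply :: p.results
        if (results.size >= p.expected) complete(reply.guid, results)
        else { pending += (reply.guid -> p.copy(results = results)); None }
      case None => None // already completed or timed out: the late reply is dropped
    }

  // Timeout fired: decide on whatever has arrived, if the execution is still pending.
  def onTimeout(guid: String): Option[CollectedReply] =
    pending.get(guid).flatMap(p => complete(guid, p.results))

  private def complete(guid: String, results: List[CollectedReply]): Option[CollectedReply] = {
    pending -= guid
    decide(results)
  }
}
```

Keeping this logic free of actor concerns makes it straightforward to unit test the completion and timeout paths separately from message delivery.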

Splitting the speculative model serving actor into two increases the number of actors, but it simplifies their implementation and can make them easier to debug and test.
Execution of the individual actors is orchestrated by Akka stream implementation similar to the one presented above.

Conclusion

In this post, I have described how to extend a model serving approach based on representing models as data, as described in the models serving book, by incorporating speculative model serving as defined in the Machine Learning Logistics book. I have outlined the advantages of speculative model serving and presented an Akka-based implementation of it.