Friday, July 5, 2013

Akka Source Code Notes


A Mailbox typically holds an in-memory concurrent queue called MessageQueue. An actor processes messages from its mailbox.

deadLetterMailbox is a special mailbox that receives messages that couldn’t be delivered to an actor. This can happen, for example, when an actor dies while there are still pending messages in its mailbox; those messages are sent to deadLetterMailbox, or dlm for short.

The dead letters mailbox just forwards the message to the DeadLetter actor.

The invoke function on the actor seems to be the one that actually processes the message.

case class Envelope private (val message: Any, val sender: ActorRef)

So an Envelope is the message plus the sender’s ActorRef.

def receiveMessage(msg: Any): Unit = behaviorStack.head.applyOrElse(msg, actor.unhandled)
private var behaviorStack: List[Actor.Receive] = emptyBehaviorStack


behaviorStack is what supplies the actor’s current receive function.

type Receive = PartialFunction[Any, Unit]

The actor’s receive function returns this partial function. When an actor becomes something, it pushes a new receive function onto behaviorStack (or replaces the top of the stack, depending on the discardOld flag passed to become).

ActorCell has most of the meat of the actor logic.

When a new actor is created, its receive function is called and the result is pushed onto behaviorStack.

Whatever is at the top of behaviorStack is the current behavior of the actor.
unbecome pops the top behavior. If that would leave the stack empty, unbecome falls back to the actor’s default receive block, so the stack never goes down to Nil.

applyOrElse is a method on the partial function which calls the underlying function if it is defined for the given argument, otherwise calls the supplied default.
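A minimal sketch of how a behavior stack and applyOrElse fit together, assuming a much simplified model. The names Cell and demo are hypothetical, become always pushes here, and none of this is Akka’s actual ActorCell code.

```scala
// Simplified model of the behavior stack described above.
// Cell and demo are hypothetical names, not Akka classes.
object BehaviorStackSketch {
  import scala.collection.mutable.ListBuffer

  type Receive = PartialFunction[Any, Unit]

  final class Cell(initial: Receive) {
    // The head of the list is the current behavior.
    private var behaviorStack: List[Receive] = initial :: Nil
    // Messages that no behavior was defined for.
    val unhandled = ListBuffer.empty[Any]

    // Assumption: always push; the discardOld variant is left out.
    def become(behavior: Receive): Unit =
      behaviorStack = behavior :: behaviorStack

    def unbecome(): Unit =
      behaviorStack =
        if (behaviorStack.tail.isEmpty) initial :: Nil // never drop to Nil
        else behaviorStack.tail

    // applyOrElse runs the behavior if it is defined for msg,
    // otherwise it calls the supplied default function.
    def receiveMessage(msg: Any): Unit =
      behaviorStack.head.applyOrElse(msg, (m: Any) => { unhandled += m; () })
  }

  def demo(): (List[String], List[Any]) = {
    val seen = ListBuffer.empty[String]
    val default: Receive = { case s: String => seen += s"default:$s"; () }
    val angry: Receive = { case s: String => seen += s"angry:$s"; () }
    val cell = new Cell(default)
    cell.receiveMessage("a") // handled by the default behavior
    cell.become(angry)
    cell.receiveMessage("b") // handled by the pushed behavior
    cell.unbecome()
    cell.unbecome()          // extra pop still leaves the default in place
    cell.receiveMessage("c")
    cell.receiveMessage(42)  // no case matches, so the default function runs
    (seen.toList, cell.unhandled.toList)
  }
}
```

Calling BehaviorStackSketch.demo() returns (List("default:a", "angry:b", "default:c"), List(42)): the extra unbecome does not empty the stack, and the Int falls through to the default function passed to applyOrElse.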

unhandled is the default method to handle unhandled messages:

def unhandled(message: Any) {
  message match {
    case Terminated(dead) => throw new DeathPactException(dead)
    case _                => context.system.eventStream.publish(UnhandledMessage(message, sender, self))
  }
}


processMailbox in Mailbox.scala is the one that handles a message in the mailbox and in turn calls invoke on the actor.

An important detail of processMailbox is that system messages are checked after each user message is processed, which means that actor kill, resume, etc. take effect only after the current message has been processed:
message match {
  case message: SystemMessage if shouldStash(message, currentState) => stash(message)
  case f: Failed                         => handleFailure(f)
  case DeathWatchNotification(a, ec, at) => watchedActorTerminated(a, ec, at)
  case Create(failure)                   => create(failure)
  case Watch(watchee, watcher)           => addWatcher(watchee, watcher)
  case Unwatch(watchee, watcher)         => remWatcher(watchee, watcher)
  case Recreate(cause)                   => faultRecreate(cause)
  case Suspend()                         => faultSuspend()
  case Resume(inRespToFailure)           => faultResume(inRespToFailure)
  case Terminate()                       => terminate()
  case Supervise(child, async)           => supervise(child, async)
}


Similarly, adding/removing watchers also takes effect after the current message is processed.
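A rough sketch of that ordering, assuming a toy mailbox with separate user and system queues. MiniMailbox, the string-based messages and the trace buffer are hypothetical; real Akka uses mailbox status bits and typed SystemMessage objects.

```scala
// Toy model: system messages are drained after every user message,
// so a Suspend that arrives mid-message only takes effect afterwards.
object SystemMessageSketch {
  import scala.collection.mutable

  final class MiniMailbox {
    val userQueue = mutable.Queue.empty[String]
    val systemQueue = mutable.Queue.empty[String]
    val trace = mutable.ListBuffer.empty[String]
    private var suspended = false

    private def processAllSystemMessages(): Unit =
      while (systemQueue.nonEmpty) {
        val signal = systemQueue.dequeue()
        if (signal == "Suspend") suspended = true
        if (signal == "Resume") suspended = false
        trace += s"system:$signal"
      }

    // onUserMessage stands in for invoking the actor.
    def run(onUserMessage: String => Unit): Unit = {
      processAllSystemMessages()
      while (!suspended && userQueue.nonEmpty) {
        val msg = userQueue.dequeue()
        trace += s"user:$msg"
        onUserMessage(msg)         // the current message always finishes
        processAllSystemMessages() // only now do system messages apply
      }
    }
  }

  def demo(): List[String] = {
    val mbox = new MiniMailbox
    mbox.userQueue ++= List("m1", "m2")
    // A Suspend arrives while m1 is being handled.
    mbox.run { msg => if (msg == "m1") { mbox.systemQueue.enqueue("Suspend"); () } }
    mbox.trace.toList
  }
}
```

SystemMessageSketch.demo() returns List("user:m1", "system:Suspend"): m1 runs to completion, the Suspend is applied afterwards, and m2 stays in the queue.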

This code is not clear to me:
protected final def systemQueueGet: LatestFirstSystemMessageList =
  // Note: contrary how it looks, there is no allocation here, as SystemMessageList is a value class and as such
  // it just exists as a typed view during compile-time. The actual return type is still SystemMessage.
  new LatestFirstSystemMessageList(Unsafe.instance.getObjectVolatile(this, AbstractMailbox.systemMessageOffset).asInstanceOf[SystemMessage])



Mailbox implements Runnable, so it has a method called run. run calls processMailbox, which in turn runs the message on the actor.

OK, so the big picture is becoming clear now. The dispatcher uses the executorService to run the mailbox as a Runnable, so the mailbox’s run method is called on the thread pool. The run method calls invoke on the actor, which passes the message to the actor. Thus an invocation of run on the mailbox results in messages being processed by the actor.

registerForExecution in the dispatcher is the critical function here:
protected[akka] override def registerForExecution(mbox: Mailbox, hasMessageHint: Boolean, hasSystemMessageHint: Boolean): Boolean = {
  if (mbox.canBeScheduledForExecution(hasMessageHint, hasSystemMessageHint)) { //This needs to be here to ensure thread safety and no races
    if (mbox.setAsScheduled()) {
      try {
        executorService execute mbox
        true
      } catch {
        case e: RejectedExecutionException =>
          try {
            executorService execute mbox
            true
          } catch { //Retry once
            case e: RejectedExecutionException =>
              mbox.setAsIdle()
              eventStream.publish(Error(e, getClass.getName, getClass, "registerForExecution was rejected twice!"))
              throw e
          }
      }
    } else false
  } else false
}

Calling dispatch on dispatcher puts the message on the mailbox and registers for execution:

protected[akka] def dispatch(receiver: ActorCell, invocation: Envelope): Unit = {
  val mbox = receiver.mailbox
  mbox.enqueue(receiver.self, invocation)
  registerForExecution(mbox, true, false)
}
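A sketch of the enqueue-then-register pattern, assuming a toy dispatcher. MiniMailbox, MiniDispatcher and RecordingExecutor are hypothetical names, the scheduled flag is a plain AtomicBoolean rather than Akka’s status field, and the race where a message arrives just as run finishes is ignored.

```scala
// Toy model of dispatch + registerForExecution: only the send that flips
// the mailbox from idle to scheduled hands it to the executor.
object DispatchSketch {
  import java.util.concurrent.{ConcurrentLinkedQueue, Executor}
  import java.util.concurrent.atomic.AtomicBoolean
  import scala.collection.mutable.ListBuffer

  // Executor that only records tasks, so the demo is deterministic.
  final class RecordingExecutor extends Executor {
    val submitted = ListBuffer.empty[Runnable]
    override def execute(task: Runnable): Unit = { submitted += task; () }
    def runAll(): Unit = {
      val tasks = submitted.toList
      submitted.clear()
      tasks.foreach(_.run())
    }
  }

  final class MiniMailbox extends Runnable {
    val queue = new ConcurrentLinkedQueue[String]()
    val processed = ListBuffer.empty[String]
    private val scheduled = new AtomicBoolean(false)

    // True only for the caller that moves the mailbox from idle to scheduled.
    def setAsScheduled(): Boolean = scheduled.compareAndSet(false, true)
    def setAsIdle(): Unit = scheduled.set(false)

    override def run(): Unit =
      try {
        var msg = queue.poll()
        while (msg != null) { processed += msg; msg = queue.poll() }
      } finally setAsIdle()
  }

  final class MiniDispatcher(executor: Executor) {
    def registerForExecution(mbox: MiniMailbox): Boolean =
      if (mbox.setAsScheduled()) { executor.execute(mbox); true } else false

    def dispatch(mbox: MiniMailbox, msg: String): Unit = {
      mbox.queue.add(msg)
      registerForExecution(mbox)
      ()
    }
  }

  def demo(): (Int, List[String]) = {
    val executor = new RecordingExecutor
    val dispatcher = new MiniDispatcher(executor)
    val mbox = new MiniMailbox
    List("a", "b", "c").foreach(dispatcher.dispatch(mbox, _))
    val executeCalls = executor.submitted.size // counted before anything runs
    executor.runAll()
    (executeCalls, mbox.processed.toList)
  }
}
```

DispatchSketch.demo() returns (1, List("a", "b", "c")): three sends produce a single execute call in this model, and that one run drains all three messages.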

There are other kinds of dispatchers, such as the balancing dispatcher, that handle dispatch a little differently.

override def !(message: Any)(implicit sender: ActorRef = Actor.noSender): Unit = actorCell.sendMessage(message, sender)


The picture becomes clearer. Sending a message to an actor means calling ! on the corresponding ActorRef (that I knew), which calls sendMessage.

def sendMessage(msg: Envelope): Unit =
  try {
    val m = msg.message.asInstanceOf[AnyRef]
    if (system.settings.SerializeAllMessages && !m.isInstanceOf[NoSerializationVerificationNeeded]) {
      val s = SerializationExtension(system)
      s.deserialize(s.serialize(m).get, m.getClass).get
    }
    dispatcher.dispatch(this, msg)
  } catch handleException

sendMessage in turn calls dispatch on the dispatcher, which puts the message on the actor’s mailbox and registers the mailbox for execution. Registration uses the executor service to call execute on the mailbox. Mailbox is a Runnable whose run method calls processMailbox, and processMailbox processes up to "throughput" messages on the actor. It’s important to note that these messages are all processed within a single run call, so they execute as a single task on the thread.
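A small sketch of the throughput idea, assuming a toy mailbox where each run call handles at most throughput messages. MiniMailbox and batches are hypothetical names, and the loop in demo is only a stand-in for the mailbox being scheduled again while messages remain.

```scala
// Toy model: one run() call is one task on the pool and handles
// at most `throughput` messages before giving the thread back.
object ThroughputSketch {
  import scala.collection.mutable

  final class MiniMailbox(throughput: Int) {
    val queue = mutable.Queue.empty[Int]
    // One entry per run() call, listing the messages that call handled.
    val batches = mutable.ListBuffer.empty[List[Int]]

    def hasMessages: Boolean = queue.nonEmpty

    def run(): Unit = {
      val batch = mutable.ListBuffer.empty[Int]
      var left = throughput
      while (left > 0 && queue.nonEmpty) {
        batch += queue.dequeue()
        left -= 1
      }
      batches += batch.toList
      ()
    }
  }

  def demo(): List[List[Int]] = {
    val mbox = new MiniMailbox(throughput = 3)
    mbox.queue ++= (1 to 7)
    // Stand-in for rescheduling a mailbox that still has messages.
    while (mbox.hasMessages) mbox.run()
    mbox.batches.toList
  }
}
```

ThroughputSketch.demo() returns List(List(1, 2, 3), List(4, 5, 6), List(7)): seven messages with a throughput of 3 take three run calls in this model.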

One thing wasn’t clear to me: registerForExecution calls execute on the executor service immediately, but messages sent to actors are processed asynchronously, so how does that work? What if there is no thread available to do the processing? This is explained by the documentation of the execute method of the Executor interface: http://docs.oracle.com/javase/6/docs/api/java/util/concurrent/Executor.html

execute doesn’t necessarily run the Runnable immediately; with a thread pool it is queued, and execution happens when a thread becomes available. Thus no polling of the mailbox is required. Each time a message is sent, registerForExecution is called for the mailbox, so a run invocation can be scheduled on it. Each run invocation will consume one or more messages. It seems to me that if throughput is more than 1, then there will be more run calls than needed, although the setAsScheduled() check in registerForExecution suggests that a mailbox that is already scheduled is not submitted again. Need to investigate this more.
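A quick way to see the queueing behavior of execute with nothing but java.util.concurrent, assuming a single-thread pool as a stand-in for a busy dispatcher. The object and variable names are hypothetical.

```scala
// Shows that execute() returns at once even when no worker thread is free:
// the second task is only queued until the first one finishes.
object ExecutorQueueSketch {
  import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}
  import java.util.concurrent.atomic.AtomicBoolean

  def demo(): (Boolean, Boolean) = {
    val pool = Executors.newSingleThreadExecutor()
    val release = new CountDownLatch(1)
    val ran = new AtomicBoolean(false)
    try {
      // Occupy the only worker thread until the latch is released.
      pool.execute(new Runnable { def run(): Unit = release.await() })
      // Stand-in for a mailbox: submitted while the worker is busy.
      pool.execute(new Runnable { def run(): Unit = ran.set(true) })
      val ranWhileBusy = ran.get() // false: the task is sitting in the queue
      release.countDown()          // free the worker thread
      pool.shutdown()
      pool.awaitTermination(5, TimeUnit.SECONDS)
      (ranWhileBusy, ran.get())
    } finally pool.shutdownNow()
  }
}
```

ExecutorQueueSketch.demo() returns (false, true): the second task has not run while the only thread is blocked, and it runs once that thread is free, without anyone polling for it.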

