每个LocalActorRef内部都有一个ActorCell对象:
//创建ActorCell对象需要的参数是从LocalActorRef的构造函数中得到的
//而LocalActorRef构造函数中的参数是由ActorSystemImpl类提供的
private val actorCell: ActorCell = newActorCell(_system, this, _props, _dispatcher, _supervisor)
protected def newActorCell(system: ActorSystemImpl, ref: InternalActorRef, props: Props, dispatcher: MessageDispatcher, supervisor: InternalActorRef): ActorCell =
new ActorCell(system, ref, props, dispatcher, supervisor)
看LocalActorRef类内部的源码,会发现几乎所有的向外提供的API内部都是通过ActorCell类实现的。 也就是说,程序员在代码中调用ActorRef的API时,实际上调用的时ActorCell的方法。
ActorCell源码分析
ActorCell object
private[akka] object ActorCell {
//contextStack中存储的实际是ActorCell类型的List.(因为ActorCell是ActorContext的子类)
val contextStack = new ThreadLocal[List[ActorContext]] {
override def initialValue: List[ActorContext] = Nil
}
}
ActorCell class
private[akka] class ActorCell(
val system: ActorSystemImpl,
val self: InternalActorRef,
final val props: Props,
val dispatcher: MessageDispatcher,
val parent: InternalActorRef)
extends UntypedActorContext with AbstractActorContext with Cell
with dungeon.ReceiveTimeout
with dungeon.Children
with dungeon.Dispatch
with dungeon.DeathWatch
with dungeon.FaultHandling {
import ActorCell._
final def isLocal = true
final def systemImpl = system
protected final def guardian = self
protected final def lookupRoot = self
final def provider = system.provider
protected def uid: Int = self.path.uid
private[this] var _actor: Actor = _
def actor: Actor = _actor
protected def actor_=(a: Actor): Unit = _actor = a
var currentMessage: Envelope = _
private var behaviorStack: List[Actor.Receive] = emptyBehaviorStack
private[this] var sysmsgStash: LatestFirstSystemMessageList = SystemMessageList.LNil
protected def stash(msg: SystemMessage): Unit = {
assert(msg.unlinked)
sysmsgStash ::= msg
}
private def unstashAll(): LatestFirstSystemMessageList = {
val unstashed = sysmsgStash
sysmsgStash = SystemMessageList.LNil
unstashed
}
final def systemInvoke(message: SystemMessage): Unit = {
def calculateState: Int =
if (waitingForChildrenOrNull ne null) SuspendedWaitForChildrenState
else if (mailbox.isSuspended) SuspendedState
else DefaultState
@tailrec def sendAllToDeadLetters(messages: EarliestFirstSystemMessageList): Unit =
if (messages.nonEmpty) {
val tail = messages.tail
val msg = messages.head
msg.unlink()
provider.deadLetters ! msg
sendAllToDeadLetters(tail)
}
def shouldStash(m: SystemMessage, state: Int): Boolean =
(state: @switch) match {
case DefaultState ⇒ false
case SuspendedState ⇒ m.isInstanceOf[StashWhenFailed]
case SuspendedWaitForChildrenState ⇒ m.isInstanceOf[StashWhenWaitingForChildren]
}
@tailrec
def invokeAll(messages: EarliestFirstSystemMessageList, currentState: Int): Unit = {
val rest = messages.tail
val message = messages.head
message.unlink()
try {
message match {
case message: SystemMessage if shouldStash(message, currentState) ⇒ stash(message)
case f: Failed ⇒ handleFailure(f)
case DeathWatchNotification(a, ec, at) ⇒ watchedActorTerminated(a, ec, at)
case Create(failure) ⇒ create(failure)
case Watch(watchee, watcher) ⇒ addWatcher(watchee, watcher)
case Unwatch(watchee, watcher) ⇒ remWatcher(watchee, watcher)
case Recreate(cause) ⇒ faultRecreate(cause)
case Suspend() ⇒ faultSuspend()
case Resume(inRespToFailure) ⇒ faultResume(inRespToFailure)
case Terminate() ⇒ terminate()
case Supervise(child, async) ⇒ supervise(child, async)
case NoMessage ⇒ // only here to suppress warning
}
} catch handleNonFatalOrInterruptedException { e ⇒
handleInvokeFailure(Nil, e)
}
val newState = calculateState
// As each state accepts a strict subset of another state, it is enough to unstash if we "walk up" the state
// chain
val todo = if (newState < currentState) unstashAll() reverse_::: rest else rest
if (isTerminated) sendAllToDeadLetters(todo)
else if (todo.nonEmpty) invokeAll(todo, newState)
}
invokeAll(new EarliestFirstSystemMessageList(message), calculateState)
}
final def invoke(messageHandle: Envelope): Unit = {
val influenceReceiveTimeout = !messageHandle.message.isInstanceOf[NotInfluenceReceiveTimeout]
try {
currentMessage = messageHandle
if (influenceReceiveTimeout)
cancelReceiveTimeout()
messageHandle.message match {
case msg: AutoReceivedMessage ⇒ autoReceiveMessage(messageHandle)
case msg ⇒ receiveMessage(msg)
}
currentMessage = null // reset current message after successful invocation
} catch handleNonFatalOrInterruptedException { e ⇒
handleInvokeFailure(Nil, e)
} finally {
if (influenceReceiveTimeout)
checkReceiveTimeout // Reschedule receive timeout
}
}
def autoReceiveMessage(msg: Envelope): Unit = {
if (system.settings.DebugAutoReceive)
publish(Debug(self.path.toString, clazz(actor), "received AutoReceiveMessage " + msg))
msg.message match {
case t: Terminated ⇒ receivedTerminated(t)
case AddressTerminated(address) ⇒ addressTerminated(address)
case Kill ⇒ throw new ActorKilledException("Kill")
case PoisonPill ⇒ self.stop()
case sel: ActorSelectionMessage ⇒ receiveSelection(sel)
case Identify(messageId) ⇒ sender() ! ActorIdentity(messageId, Some(self))
}
}
private def receiveSelection(sel: ActorSelectionMessage): Unit =
if (sel.elements.isEmpty)
invoke(Envelope(sel.msg, sender(), system))
else
ActorSelection.deliverSelection(self, sender(), sel)
final def receiveMessage(msg: Any): Unit = actor.aroundReceive(behaviorStack.head, msg)
/*
* ACTOR CONTEXT IMPLEMENTATION
*/
final def sender(): ActorRef = currentMessage match {
case null ⇒ system.deadLetters
case msg if msg.sender ne null ⇒ msg.sender
case _ ⇒ system.deadLetters
}
def become(behavior: Actor.Receive, discardOld: Boolean = true): Unit =
behaviorStack = behavior :: (if (discardOld && behaviorStack.nonEmpty) behaviorStack.tail else behaviorStack)
def become(behavior: Procedure[Any]): Unit = become(behavior, discardOld = true)
def become(behavior: Procedure[Any], discardOld: Boolean): Unit =
become({ case msg ⇒ behavior.apply(msg) }: Actor.Receive, discardOld)
def unbecome(): Unit = {
val original = behaviorStack
behaviorStack =
if (original.isEmpty || original.tail.isEmpty) actor.receive :: emptyBehaviorStack
else original.tail
}
/*
* ACTOR INSTANCE HANDLING
*/
//This method is in charge of setting up the contextStack and create a new instance of the Actor
protected def newActor(): Actor = {
contextStack.set(this :: contextStack.get)
try {
behaviorStack = emptyBehaviorStack
val instance = props.newActor()
if (instance eq null)
throw ActorInitializationException(self, "Actor instance passed to actorOf can't be 'null'")
// If no becomes were issued, the actors behavior is its receive method
behaviorStack = if (behaviorStack.isEmpty) instance.receive :: behaviorStack else behaviorStack
instance
} finally {
val stackAfter = contextStack.get
if (stackAfter.nonEmpty)
contextStack.set(if (stackAfter.head eq null) stackAfter.tail.tail else stackAfter.tail) // pop null marker plus our context
}
}
protected def create(failure: Option[ActorInitializationException]): Unit = {
def clearOutActorIfNonNull(): Unit = {
if (actor != null) {
clearActorFields(actor, recreate = false)
actor = null // ensure that we know that we failed during creation
}
}
failure foreach { throw _ }
try {
val created = newActor()
actor = created
created.aroundPreStart()
checkReceiveTimeout
if (system.settings.DebugLifecycle) publish(Debug(self.path.toString, clazz(created), "started (" + created + ")"))
} catch {
case e: InterruptedException ⇒
clearOutActorIfNonNull()
Thread.currentThread().interrupt()
throw ActorInitializationException(self, "interruption during creation", e)
case NonFatal(e) ⇒
clearOutActorIfNonNull()
e match {
case i: InstantiationException ⇒ throw ActorInitializationException(self,
"""exception during creation, this problem is likely to occur because the class of the Actor you tried to create is either,
a non-static inner class (in which case make it a static inner class or use Props(new ...) or Props( new Creator ... )
or is missing an appropriate, reachable no-args constructor.
""", i.getCause)
case x ⇒ throw ActorInitializationException(self, "exception during creation", x)
}
}
}
private def supervise(child: ActorRef, async: Boolean): Unit =
if (!isTerminating) {
// Supervise is the first thing we get from a new child, so store away the UID for later use in handleFailure()
initChild(child) match {
case Some(crs) ⇒
handleSupervise(child, async)
if (system.settings.DebugLifecycle) publish(Debug(self.path.toString, clazz(actor), "now supervising " + child))
case None ⇒ publish(Error(self.path.toString, clazz(actor), "received Supervise from unregistered child " + child + ", this will not end well"))
}
}
// future extension point
protected def handleSupervise(child: ActorRef, async: Boolean): Unit = child match {
case r: RepointableActorRef if async ⇒ r.point(catchFailures = true)
case _ ⇒
}
final protected def clearActorCellFields(cell: ActorCell): Unit = {
cell.unstashAll()
if (!Reflect.lookupAndSetField(classOf[ActorCell], cell, "props", ActorCell.terminatedProps))
throw new IllegalArgumentException("ActorCell has no props field")
}
final protected def clearActorFields(actorInstance: Actor, recreate: Boolean): Unit = {
setActorFields(actorInstance, context = null, self = if (recreate) self else system.deadLetters)
currentMessage = null
behaviorStack = emptyBehaviorStack
}
final protected def setActorFields(actorInstance: Actor, context: ActorContext, self: ActorRef): Unit =
if (actorInstance ne null) {
if (!Reflect.lookupAndSetField(actorInstance.getClass, actorInstance, "context", context)
|| !Reflect.lookupAndSetField(actorInstance.getClass, actorInstance, "self", self))
throw new IllegalActorStateException(actorInstance.getClass + " is not an Actor since it have not mixed in the 'Actor' trait")
}
// logging is not the main purpose, and if it fails there’s nothing we can do
protected final def publish(e: LogEvent): Unit = try system.eventStream.publish(e) catch { case NonFatal(_) ⇒ }
protected final def clazz(o: AnyRef): Class[_] = if (o eq null) this.getClass else o.getClass
}
trait Actor {
//可以看到,context是从ActorCell的contextStack中取出的
//而contextStack中存储的实际是ActorCell, 所以context实际是ActorCell
implicit val context: ActorContext = {
val contextStack = ActorCell.contextStack.get
if ((contextStack.isEmpty) || (contextStack.head eq null))
throw ActorInitializationException(...)
val c = contextStack.head
ActorCell.contextStack.set(null :: contextStack)
c
}
//通过context.self得到ActorCell的参数化字段self
//根据前面分析到的,ActorCell的构造函数是在LocalActorRef中调用,self实际是LocalActorRef的一个实例
implicit final val self = context.self
//此处可以知道Actor的默认监管策略
def supervisorStrategy: SupervisorStrategy = SupervisorStrategy.defaultStrategy
}