Skip to content

Latest commit

 

History

History
334 lines (271 loc) · 12.4 KB

3.类源码分析-LocalActorRef,ActorCell,Actor.MD

File metadata and controls

334 lines (271 loc) · 12.4 KB

LocalActorRef

每个LocalActorRef内部都有一个ActorCell对象:

// The arguments needed to build the ActorCell are taken from LocalActorRef's own
// constructor parameters, which (per this analysis) are supplied by ActorSystemImpl.
private val actorCell: ActorCell = newActorCell(_system, this, _props, _dispatcher, _supervisor)

// Factory hook: instantiates an ActorCell directly from the given arguments.
protected def newActorCell(
  system: ActorSystemImpl,
  ref: InternalActorRef,
  props: Props,
  dispatcher: MessageDispatcher,
  supervisor: InternalActorRef): ActorCell =
  new ActorCell(system, ref, props, dispatcher, supervisor)

看LocalActorRef类内部的源码,会发现几乎所有的向外提供的API内部都是通过ActorCell类实现的。也就是说,程序员在代码中调用ActorRef的API时,实际上调用的是ActorCell的方法。

ActorCell

ActorCell源码分析

ActorCell object

private[akka] object ActorCell {
  // Per-thread stack of actor contexts. The entries pushed onto it are actually
  // ActorCell instances (ActorCell is a subtype of ActorContext).
  val contextStack: ThreadLocal[List[ActorContext]] =
    new ThreadLocal[List[ActorContext]] {
      // Every thread starts out with an empty stack.
      override def initialValue: List[ActorContext] = Nil
    }
}

ActorCell class

/**
 * The cell that sits behind a local actor reference.
 *
 * It holds the actor instance, the envelope currently being processed, the stack of
 * behaviors installed via `become`, and a stash of system messages. It processes system
 * messages (`systemInvoke`) and user messages (`invoke`), implements the context
 * operations `sender`, `become` and `unbecome`, and creates the actor instance.
 *
 * @param system     the actor system this cell belongs to
 * @param self       the reference that owns this cell
 * @param props      used to instantiate the actor (see `newActor`)
 * @param dispatcher the message dispatcher passed in by the owning reference
 * @param parent     the reference passed in as supervisor by the owning reference
 */
private[akka] class ActorCell(
  val system: ActorSystemImpl,
  val self: InternalActorRef,
  final val props: Props,
  val dispatcher: MessageDispatcher,
  val parent: InternalActorRef)
  extends UntypedActorContext with AbstractActorContext with Cell
  with dungeon.ReceiveTimeout
  with dungeon.Children
  with dungeon.Dispatch
  with dungeon.DeathWatch
  with dungeon.FaultHandling {

  import ActorCell._

  final def isLocal = true

  final def systemImpl = system
  protected final def guardian = self
  protected final def lookupRoot = self
  final def provider = system.provider

  protected def uid: Int = self.path.uid

  // The actor instance managed by this cell; null until `create` succeeds and reset to
  // null again when creation fails.
  private[this] var _actor: Actor = _
  def actor: Actor = _actor
  protected def actor_=(a: Actor): Unit = _actor = a

  // Envelope being processed by `invoke`; null when no user message is in flight.
  var currentMessage: Envelope = _

  // Stack of receive behaviors; the head is the one applied to incoming messages.
  private var behaviorStack: List[Actor.Receive] = emptyBehaviorStack

  // System messages put aside by `stash`, most recently stashed first.
  private[this] var sysmsgStash: LatestFirstSystemMessageList = SystemMessageList.LNil

  /** Puts an (unlinked) system message on top of the stash. */
  protected def stash(msg: SystemMessage): Unit = {
    assert(msg.unlinked)
    sysmsgStash ::= msg
  }

  /** Empties the stash and returns its previous content. */
  private def unstashAll(): LatestFirstSystemMessageList = {
    val unstashed = sysmsgStash
    sysmsgStash = SystemMessageList.LNil
    unstashed
  }

  /**
   * Processes a system message and every message that becomes processable as a result
   * (previously stashed messages are replayed when the cell's state "walks up").
   * If the cell is terminated, the remaining messages are sent to dead letters.
   */
  final def systemInvoke(message: SystemMessage): Unit = {
    // Derives the current state from the children container and the mailbox.
    def calculateState: Int =
      if (waitingForChildrenOrNull ne null) SuspendedWaitForChildrenState
      else if (mailbox.isSuspended) SuspendedState
      else DefaultState

    // Unlinks each message in turn and forwards it to the provider's dead letters.
    @tailrec def sendAllToDeadLetters(messages: EarliestFirstSystemMessageList): Unit =
      if (messages.nonEmpty) {
        val tail = messages.tail
        val msg = messages.head
        msg.unlink()
        provider.deadLetters ! msg
        sendAllToDeadLetters(tail)
      }

    // Whether `m` has to be put aside in the given state instead of being processed now.
    def shouldStash(m: SystemMessage, state: Int): Boolean =
      (state: @switch) match {
        case DefaultState                  => false
        case SuspendedState                => m.isInstanceOf[StashWhenFailed]
        case SuspendedWaitForChildrenState => m.isInstanceOf[StashWhenWaitingForChildren]
      }

    @tailrec
    def invokeAll(messages: EarliestFirstSystemMessageList, currentState: Int): Unit = {
      val rest = messages.tail
      val message = messages.head
      // Detach the head from the list before handling it (stash asserts `unlinked`).
      message.unlink()
      try {
        message match {
          case message: SystemMessage if shouldStash(message, currentState) => stash(message)
          case f: Failed => handleFailure(f)
          case DeathWatchNotification(a, ec, at) => watchedActorTerminated(a, ec, at)
          case Create(failure) => create(failure)
          case Watch(watchee, watcher) => addWatcher(watchee, watcher)
          case Unwatch(watchee, watcher) => remWatcher(watchee, watcher)
          case Recreate(cause) => faultRecreate(cause)
          case Suspend() => faultSuspend()
          case Resume(inRespToFailure) => faultResume(inRespToFailure)
          case Terminate() => terminate()
          case Supervise(child, async) => supervise(child, async)
          case NoMessage => // only here to suppress warning
        }
      } catch handleNonFatalOrInterruptedException { e =>
        handleInvokeFailure(Nil, e)
      }
      val newState = calculateState
      // As each state accepts a strict subset of another state, it is enough to unstash if we "walk up" the state
      // chain
      val todo = if (newState < currentState) unstashAll() reverse_::: rest else rest

      if (isTerminated) sendAllToDeadLetters(todo)
      else if (todo.nonEmpty) invokeAll(todo, newState)
    }

    invokeAll(new EarliestFirstSystemMessageList(message), calculateState)
  }

  /**
   * Processes one user-level envelope: auto-received messages go to
   * `autoReceiveMessage`, everything else to the actor's current behavior.
   * Unless the message is a `NotInfluenceReceiveTimeout`, the receive timeout is
   * cancelled before processing and checked again afterwards.
   */
  final def invoke(messageHandle: Envelope): Unit = {
    val influenceReceiveTimeout = !messageHandle.message.isInstanceOf[NotInfluenceReceiveTimeout]
    try {
      currentMessage = messageHandle
      if (influenceReceiveTimeout)
        cancelReceiveTimeout()
      messageHandle.message match {
        case msg: AutoReceivedMessage => autoReceiveMessage(messageHandle)
        case msg                      => receiveMessage(msg)
      }
      currentMessage = null // reset current message after successful invocation
    } catch handleNonFatalOrInterruptedException { e =>
      handleInvokeFailure(Nil, e)
    } finally {
      if (influenceReceiveTimeout)
        checkReceiveTimeout // Reschedule receive timeout
    }
  }

  /**
   * Handles the messages the cell reacts to by itself (Terminated, AddressTerminated,
   * Kill, PoisonPill, actor selections and Identify).
   */
  def autoReceiveMessage(msg: Envelope): Unit = {
    if (system.settings.DebugAutoReceive)
      publish(Debug(self.path.toString, clazz(actor), "received AutoReceiveMessage " + msg))

    msg.message match {
      case t: Terminated              => receivedTerminated(t)
      case AddressTerminated(address) => addressTerminated(address)
      case Kill                       => throw new ActorKilledException("Kill")
      case PoisonPill                 => self.stop()
      case sel: ActorSelectionMessage => receiveSelection(sel)
      case Identify(messageId)        => sender() ! ActorIdentity(messageId, Some(self))
    }
  }

  // A selection without remaining path elements is addressed to this actor: process its
  // payload here; otherwise let ActorSelection deliver it further.
  private def receiveSelection(sel: ActorSelectionMessage): Unit =
    if (sel.elements.isEmpty)
      invoke(Envelope(sel.msg, sender(), system))
    else
      ActorSelection.deliverSelection(self, sender(), sel)

  /** Applies the behavior on top of the behavior stack to `msg`. */
  final def receiveMessage(msg: Any): Unit = actor.aroundReceive(behaviorStack.head, msg)

  /*
   * ACTOR CONTEXT IMPLEMENTATION
   */

  /**
   * Sender of the message currently being processed; dead letters when there is no
   * current message or the message carries no sender.
   */
  final def sender(): ActorRef = currentMessage match {
    case null                      => system.deadLetters
    case msg if msg.sender ne null => msg.sender
    case _                         => system.deadLetters
  }

  /**
   * Installs `behavior` as the current behavior. With `discardOld` (and a non-empty
   * stack) it replaces the top of the stack, otherwise it is pushed on top of it.
   */
  def become(behavior: Actor.Receive, discardOld: Boolean = true): Unit =
    behaviorStack = behavior :: (if (discardOld && behaviorStack.nonEmpty) behaviorStack.tail else behaviorStack)

  /** `Procedure`-based variant of `become` that always discards the old behavior. */
  def become(behavior: Procedure[Any]): Unit = become(behavior, discardOld = true)

  /** `Procedure`-based variant of `become`; the procedure is wrapped into an `Actor.Receive`. */
  def become(behavior: Procedure[Any], discardOld: Boolean): Unit =
    become({ case msg => behavior.apply(msg) }: Actor.Receive, discardOld)

  /**
   * Drops the top behavior. When the stack is empty or holds a single behavior, it is
   * reset to the actor's own `receive`.
   */
  def unbecome(): Unit = {
    val original = behaviorStack
    behaviorStack =
      if (original.isEmpty || original.tail.isEmpty) actor.receive :: emptyBehaviorStack
      else original.tail
  }

  /*
   * ACTOR INSTANCE HANDLING
   */

  //This method is in charge of setting up the contextStack and create a new instance of the Actor
  protected def newActor(): Actor = {
    // Make this cell visible to the Actor constructor through the thread-local stack.
    contextStack.set(this :: contextStack.get)
    try {
      behaviorStack = emptyBehaviorStack
      val instance = props.newActor()

      if (instance eq null)
        throw ActorInitializationException(self, "Actor instance passed to actorOf can't be 'null'")

      // If no becomes were issued, the actors behavior is its receive method
      behaviorStack = if (behaviorStack.isEmpty) instance.receive :: behaviorStack else behaviorStack
      instance
    } finally {
      val stackAfter = contextStack.get
      if (stackAfter.nonEmpty)
        contextStack.set(if (stackAfter.head eq null) stackAfter.tail.tail else stackAfter.tail) // pop null marker plus our context
    }
  }

  /**
   * Creates and starts the actor instance. A `failure` handed in is rethrown right away.
   * Any problem during creation clears the half-initialised actor and is rethrown as an
   * `ActorInitializationException`.
   */
  protected def create(failure: Option[ActorInitializationException]): Unit = {
    def clearOutActorIfNonNull(): Unit = {
      if (actor != null) {
        clearActorFields(actor, recreate = false)
        actor = null // ensure that we know that we failed during creation
      }
    }

    failure foreach { throw _ }

    try {
      val created = newActor()
      actor = created
      created.aroundPreStart()
      checkReceiveTimeout
      if (system.settings.DebugLifecycle) publish(Debug(self.path.toString, clazz(created), "started (" + created + ")"))
    } catch {
      case e: InterruptedException =>
        clearOutActorIfNonNull()
        // Restore the interrupt flag before reporting the failure.
        Thread.currentThread().interrupt()
        throw ActorInitializationException(self, "interruption during creation", e)
      case NonFatal(e) =>
        clearOutActorIfNonNull()
        e match {
          case i: InstantiationException => throw ActorInitializationException(self,
            """exception during creation, this problem is likely to occur because the class of the Actor you tried to create is either,
               a non-static inner class (in which case make it a static inner class or use Props(new ...) or Props( new Creator ... )
               or is missing an appropriate, reachable no-args constructor.
              """, i.getCause)
          case x => throw ActorInitializationException(self, "exception during creation", x)
        }
    }
  }

  // Handles a Supervise system message from `child`; ignored while this cell is terminating.
  private def supervise(child: ActorRef, async: Boolean): Unit =
    if (!isTerminating) {
      // Supervise is the first thing we get from a new child, so store away the UID for later use in handleFailure()
      initChild(child) match {
        case Some(crs) =>
          handleSupervise(child, async)
          if (system.settings.DebugLifecycle) publish(Debug(self.path.toString, clazz(actor), "now supervising " + child))
        case None => publish(Error(self.path.toString, clazz(actor), "received Supervise from unregistered child " + child + ", this will not end well"))
      }
    }

  // future extension point
  protected def handleSupervise(child: ActorRef, async: Boolean): Unit = child match {
    case r: RepointableActorRef if async => r.point(catchFailures = true)
    case _                               =>
  }

  /**
   * Empties the system message stash of `cell` and reflectively replaces its `props`
   * field with `ActorCell.terminatedProps`.
   */
  final protected def clearActorCellFields(cell: ActorCell): Unit = {
    cell.unstashAll()
    if (!Reflect.lookupAndSetField(classOf[ActorCell], cell, "props", ActorCell.terminatedProps))
      throw new IllegalArgumentException("ActorCell has no props field")
  }

  /**
   * Detaches `actorInstance` from this cell: its `context` field is nulled, its `self`
   * field is kept when recreating and pointed at dead letters otherwise, and the cell's
   * current message and behavior stack are reset.
   */
  final protected def clearActorFields(actorInstance: Actor, recreate: Boolean): Unit = {
    setActorFields(actorInstance, context = null, self = if (recreate) self else system.deadLetters)
    currentMessage = null
    behaviorStack = emptyBehaviorStack
  }

  /** Reflectively sets the `context` and `self` fields of a non-null actor instance. */
  final protected def setActorFields(actorInstance: Actor, context: ActorContext, self: ActorRef): Unit =
    if (actorInstance ne null) {
      if (!Reflect.lookupAndSetField(actorInstance.getClass, actorInstance, "context", context)
        || !Reflect.lookupAndSetField(actorInstance.getClass, actorInstance, "self", self))
        throw new IllegalActorStateException(actorInstance.getClass + " is not an Actor since it have not mixed in the 'Actor' trait")
    }

  // logging is not the main purpose, and if it fails there’s nothing we can do
  protected final def publish(e: LogEvent): Unit = try system.eventStream.publish(e) catch { case NonFatal(_) => }

  // Class used for log events: the cell's own class when no actor instance is available.
  protected final def clazz(o: AnyRef): Class[_] = if (o eq null) this.getClass else o.getClass
}

Actor trait

trait Actor {

  // As shown here, context is taken from ActorCell's contextStack.
  // Since the entries stored in contextStack are actually ActorCell instances,
  // context is in fact an ActorCell.
  implicit val context: ActorContext = {
    val contextStack = ActorCell.contextStack.get
    // An empty stack or a null marker on top means there is no context available
    // for this instance, so construction fails.
    if ((contextStack.isEmpty) || (contextStack.head eq null))
      throw ActorInitializationException(...)
    val c = contextStack.head
    // Push a null marker on top of the context that was just taken, so that another
    // Actor constructed against the same stack hits the check above.
    ActorCell.contextStack.set(null :: contextStack)
    c
  }
  
  // self is obtained through context.self, i.e. the `self` constructor parameter of ActorCell.
  // As analysed earlier, ActorCell is constructed inside LocalActorRef, so self is in fact
  // a LocalActorRef instance.
  implicit final val self = context.self
  
  // This shows the default supervisor strategy of an Actor.
  def supervisorStrategy: SupervisorStrategy = SupervisorStrategy.defaultStrategy
}