[原创] Scala Actor 中的 sendBuffer 与 MailBox

Purking 2011-02-07
在 Scala 的 Actor 中,我有这么一段代码在其中:
           
while(avaMisstions > actor.getMailBoxSize) {
              actor ! queue.dequeue
              LOG.info("Avarage Distribute, Queue Size: {}, MailBox: {}", queue.size, actor.getMailBoxSize)
             }

这段代码在每一次向 actor 发送了消息以后,然后再利用 getMailBoxSize 获取这个 actor 的 mailbox 大小进行判断,从而决定是否继续向这个 actor 发送消息,可是这段代码在程序中执行的时候则不是我想象的那样,这里会是一个死循环。在我看了源代码以后我才发现,原来这个是 scala 在实现的时候一个 Actor 除了拥有一个公开的 MailBox 还有一个 sendBuffer ;


Actor 的特质: trait Actor extends AbstractActor with ReplyReactor with ActorCanReply with InputChannel[Any];
Actor 的单例: object Actor extends Combinators  [这个继承不管了]
主要的 ReplyReactor 特质:  trait ReplyReactor extends Reactor[Any] with ReactorCanReply
     ReactorCanReply 里面主要定义了: !?, !! 方法,具体没有用到过我现在还不很清楚...
     Reactor 特质中存在 {
            private[actors] val mailbox = new MQueue[Msg](“Reactor”)
            private[actors] val sendBuffer = new MQueue[Msg]("SendBuffer”)
            最常使用的 ! 方法
     }

然后再看看 Reactor.! 方法:
def !(msg: Msg) {
    send(msg, null)
  }


def send(msg: Msg, replyTo: OutputChannel[Any]) {
    val todo = synchronized {
      if (waitingFor ne Reactor.waitingForNone) {
        val savedWaitingFor = waitingFor
        waitingFor = Reactor.waitingForNone
        startSearch(msg, replyTo, savedWaitingFor)
      } else {
        sendBuffer.append(msg, replyTo)
        () => { /* do nothing */ }
      }
    }
    todo()
  }


在调用了这段代码以后,仅仅是将需要发送的消息给存放进入了 sendBuffer 中并没有看到 MailBox 的相关代码. 我们从 Actor 中获取
MailBox 有如下代码:
1.  从 object Actor 中获取 => def mailboxSize: Int = rawSelf.mailboxSize   (很少通过 Actor 来获取 MailBox 吧- -||)
2.  trait Reactor 中获取 =>    protected[actors] def mailboxSize: Int = mailbox.size   (这个就是我们继承 Actor 后能够获取到的 MailBox 大小)

我所使用的是 Reactor 中的 def mailboxSize 方法,由于他是 protected 的我有时候需要在外面拿到,所以自己写了:
     def getMailBoxSize = mailboxSize

看到这里,我相当疑惑,那到底是什么地方将 sendBuffer 中的消息穿给了 mailbox 呢?在继续阅读 Reactor 后发现如下代码:
protected def react(handler: PartialFunction[Msg, Unit]): Nothing = {
    synchronized { drainSendBuffer(mailbox) }
    searchMailbox(mailbox, handler, false)
    throw Actor.suspendException
  }

  private[actors] def searchMailbox(startMbox: MQueue[Msg],
                                    handler: PartialFunction[Msg, Any],
                                    resumeOnSameThread: Boolean) {
    var tmpMbox = startMbox
    var done = false
    while (!done) {
      val qel = tmpMbox.extractFirst(handler)
      if (tmpMbox ne mailbox)
        tmpMbox.foreachAppend(mailbox)
      if (null eq qel) {
        synchronized {
          // in mean time new stuff might have arrived
          if (!sendBuffer.isEmpty) {
            tmpMbox = new MQueue[Msg]("Temp")
            drainSendBuffer(tmpMbox)
            // keep going
          } else {
            waitingFor = handler
            /* Here, we throw a SuspendActorControl to avoid
               terminating this actor when the current ReactorTask
               is finished.

               The SuspendActorControl skips the termination code
               in ReactorTask.
             */
            throw Actor.suspendException
          }
        }
      } else {
        resumeReceiver((qel.msg, qel.session), handler, resumeOnSameThread)
        done = true
      }
    }
  }


 private[actors] def drainSendBuffer(mbox: MQueue[Msg]) {
    sendBuffer.foreachDequeue(mbox)
  }

MQueue : 中的方法
     private[actors] class MQueue[Msg >: Null](protected val label: String)

def foreachDequeue(target: MQueue[Msg]) {
    var curr = first
    while (curr != null) {
      target.append(curr)
      curr = curr.next
    }
    first = null
    last = null
    _size = 0
  }


这下明了了,原来是在调用 react 的时候进行了两个操作,一个是 drainSendBuffer ,将 sendBuffer 中的消息全部转移到 mailbox 中,然后再进行了searchMailbox 方法,从 mailbox 中获取第一条消息,同时还到 sendBuffer 中搜索一次,如果 sendBuffer 不为空的话,再将其中的消息全部转移到 mailbox 中。 现在知道 react 会触发 sendBuffer 与 mailbox 之间的同步,那么 receive 方法呢?

object Actor 中的方法
  def receive[R](f: PartialFunction[Any, R]): R = {
    assert(Actor.self(scheduler) == this, "receive from channel belonging to other actor")

    synchronized {
      if (shouldExit) exit() // links
      drainSendBuffer(mailbox)
    }

    var done = false
    while (!done) {
      val qel = mailbox.extractFirst((m: Any, replyTo: OutputChannel[Any]) => {
        senders = replyTo :: senders
        val matches = f.isDefinedAt(m)
        senders = senders.tail
        matches
      })
      if (null eq qel) {
        synchronized {
          // in mean time new stuff might have arrived
          if (!sendBuffer.isEmpty) {
            drainSendBuffer(mailbox)
            // keep going
          } else {
            waitingFor = f
            isSuspended = true
            scheduler.managedBlock(blocker)
            drainSendBuffer(mailbox)
            // keep going
          }
        }
      } else {
        received = Some(qel.msg)
        senders = qel.session :: senders
        done = true
      }
    }

    val result = f(received.get)
    received = None
    senders = senders.tail
    result
  }



哈哈,接着又继续读了下 reactWith...  receiveWith... 方法,其中的实现都拥有这写相似部分.
结论
     Scala  Actor 中存放消息的 Mailbox 中的消息,并不是调用 ! sender 就向 mailbox 队列中放,而是经过了 sendBuffer 来缓存了一下,并且在 react/receive (..WithIn..) 方法触发的时候会进行一次同步。

疑问
在这个 sendBuffer 与 mailbox 中的消息同步的问题解决后,又有一个疑问,如果 react 等方法是如何检测到 mailbox 中的消息的?如何从 mailbox 中获取消息来执行的? 带着这个问题又回到了 Reactor 中,然后看到了searchMailbox 方法中的 while 循环:
  private[actors] def searchMailbox(startMbox: MQueue[Msg], // 调用这个方法的地方将 mailbox 传递了进来
                                    handler: PartialFunction[Msg, Any],
                                    resumeOnSameThread: Boolean) {
    var tmpMbox = startMbox
    var done = false
    while (!done) {
      val qel = tmpMbox.extractFirst(handler)
      if (tmpMbox ne mailbox)
        tmpMbox.foreachAppend(mailbox)
      if (null eq qel) {
        synchronized {
          // in mean time new stuff might have arrived
          if (!sendBuffer.isEmpty) {
            tmpMbox = new MQueue[Msg]("Temp")
            drainSendBuffer(tmpMbox)
            // keep going
          } else {
            waitingFor = handler
            /* Here, we throw a SuspendActorControl to avoid
               terminating this actor when the current ReactorTask
               is finished.

               The SuspendActorControl skips the termination code
               in ReactorTask.
             */
            throw Actor.suspendException
          }
        }
      } else {
        resumeReceiver((qel.msg, qel.session), handler, resumeOnSameThread)
        done = true
      }
    }
  }



大概能够猜到是怎么一回事了,不过还是不很明确从初始化一个 Actor 调用 react 方法之后到我发送一条消息使此 Actor 的 sendBuffer 中拥有消息之前这两个点之间的过程是如何处理的。看到后面发现 Actor 的 start 方法最后使用到了 Executor 框架,并且拥有自己的 ForkJoinScheduler,难道 Scala 自己拥有自己实现的 ForkJoin 实现? 那么 JDK7 发布后后调整吗?
Global site tag (gtag.js) - Google Analytics