[Original] sendBuffer and MailBox in Scala Actors
Purking
2011-02-07
In one of my Scala Actors I have the following code:

    while (avaMisstions > actor.getMailBoxSize) {
      actor ! queue.dequeue
      LOG.info("Avarage Distribute, Queue Size: {}, MailBox: {}", queue.size, actor.getMailBoxSize)
    }

After each message is sent to the actor, the loop calls getMailBoxSize to read the actor's mailbox size and uses it to decide whether to keep sending. At run time, however, the code does not behave as I expected: the loop never terminates. Only after reading the source did I find the reason. In Scala's implementation an Actor has, besides the exposed mailbox, a second queue called sendBuffer.

The Actor trait:

    trait Actor extends AbstractActor with ReplyReactor with ActorCanReply with InputChannel[Any]

The Actor singleton (I will ignore this inheritance here):

    object Actor extends Combinators

The important one, the ReplyReactor trait:

    trait ReplyReactor extends Reactor[Any] with ReactorCanReply

ReactorCanReply mainly defines the !? and !! methods. I have not used them, so I am not yet clear on the details.

The Reactor trait contains:

    private[actors] val mailbox = new MQueue[Msg]("Reactor")
    private[actors] val sendBuffer = new MQueue[Msg]("SendBuffer")
    // ... plus the most commonly used method, !

Now look at Reactor.! itself:

    def !(msg: Msg) {
      send(msg, null)
    }

    def send(msg: Msg, replyTo: OutputChannel[Any]) {
      val todo = synchronized {
        if (waitingFor ne Reactor.waitingForNone) {
          val savedWaitingFor = waitingFor
          waitingFor = Reactor.waitingForNone
          startSearch(msg, replyTo, savedWaitingFor)
        } else {
          sendBuffer.append(msg, replyTo)
          () => { /* do nothing */ }
        }
      }
      todo()
    }

When the actor is not currently waiting for a message (the else branch), this call only stores the message in sendBuffer. Nothing here touches the mailbox.

There are two ways to get the mailbox size from an Actor:

1. From object Actor: def mailboxSize: Int = rawSelf.mailboxSize (it is probably rare to ask for the mailbox through object Actor)
2. From trait Reactor: protected[actors] def mailboxSize: Int = mailbox.size (this is the mailbox size we can reach once we extend Actor)

I use the mailboxSize method from Reactor. Because it is protected and I sometimes need the value from outside, I wrote my own accessor:

    def getMailBoxSize = mailboxSize

At this point I was quite puzzled: where do the messages in sendBuffer get handed over to mailbox? Reading further in Reactor, I found the following code:

    protected def react(handler: PartialFunction[Msg, Unit]): Nothing = {
      synchronized { drainSendBuffer(mailbox) }
      searchMailbox(mailbox, handler, false)
      throw Actor.suspendException
    }

    private[actors] def searchMailbox(startMbox: MQueue[Msg],
                                      handler: PartialFunction[Msg, Any],
                                      resumeOnSameThread: Boolean) {
      var tmpMbox = startMbox
      var done = false
      while (!done) {
        val qel = tmpMbox.extractFirst(handler)
        if (tmpMbox ne mailbox)
          tmpMbox.foreachAppend(mailbox)
        if (null eq qel) {
          synchronized {
            // in mean time new stuff might have arrived
            if (!sendBuffer.isEmpty) {
              tmpMbox = new MQueue[Msg]("Temp")
              drainSendBuffer(tmpMbox)
              // keep going
            } else {
              waitingFor = handler
              /* Here, we throw a SuspendActorControl to avoid
                 terminating this actor when the current ReactorTask
                 is finished.

                 The SuspendActorControl skips the termination code
                 in ReactorTask. */
              throw Actor.suspendException
            }
          }
        } else {
          resumeReceiver((qel.msg, qel.session), handler, resumeOnSameThread)
          done = true
        }
      }
    }

    private[actors] def drainSendBuffer(mbox: MQueue[Msg]) {
      sendBuffer.foreachDequeue(mbox)
    }

The relevant method in MQueue:

    private[actors] class MQueue[Msg >: Null](protected val label: String) {
      // ...
      def foreachDequeue(target: MQueue[Msg]) {
        var curr = first
        while (curr != null) {
          target.append(curr)
          curr = curr.next
        }
        first = null
        last = null
        _size = 0
      }
    }

Now it is clear. Calling react performs two operations. First, drainSendBuffer moves every message from sendBuffer into mailbox. Then searchMailbox extracts from mailbox the first message the handler accepts. If nothing matches, it checks sendBuffer again, and if sendBuffer is not empty it drains those messages too (through a temporary queue that is then appended to mailbox).

So react triggers the synchronization between sendBuffer and mailbox. What about the receive method?
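Before moving on, the buffering behaviour described so far can be illustrated with a small standalone sketch. It is a toy model under stated assumptions, not the scala.actors code: ToyReactor, send, sendBufferSize and the parameterless drainSendBuffer are hypothetical names, and send models only the else branch of Reactor.send (the actor is not waiting for a message).

```scala
import scala.collection.mutable

// Toy model only: this is NOT scala.actors. All names are hypothetical.
class ToyReactor[Msg] {
  private val mailbox = mutable.Queue.empty[Msg]
  private val sendBuffer = mutable.Queue.empty[Msg]

  // Models the else branch of Reactor.send: the message is only buffered.
  def send(msg: Msg): Unit = synchronized { sendBuffer.enqueue(msg) }

  def mailboxSize: Int = synchronized { mailbox.size }
  def sendBufferSize: Int = synchronized { sendBuffer.size }

  // Models drainSendBuffer(mailbox): move every buffered message, in order.
  def drainSendBuffer(): Unit = synchronized {
    while (sendBuffer.nonEmpty) mailbox.enqueue(sendBuffer.dequeue())
  }
}

object ToyReactorDemo {
  def main(args: Array[String]): Unit = {
    val r = new ToyReactor[String]
    r.send("a")
    r.send("b")
    println(s"before drain: mailbox=${r.mailboxSize}, sendBuffer=${r.sendBufferSize}")
    r.drainSendBuffer()
    println(s"after drain: mailbox=${r.mailboxSize}, sendBuffer=${r.sendBufferSize}")
  }
}
```

In this model the mailbox size stays at 0 no matter how many messages are sent, until something drains the buffer. That is the same reason a loop conditioned on the mailbox size can keep sending indefinitely.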
The receive method in Actor (the listing refers to this, so it is the instance method on the trait):

    def receive[R](f: PartialFunction[Any, R]): R = {
      assert(Actor.self(scheduler) == this, "receive from channel belonging to other actor")

      synchronized {
        if (shouldExit) exit() // links
        drainSendBuffer(mailbox)
      }

      var done = false
      while (!done) {
        val qel = mailbox.extractFirst((m: Any, replyTo: OutputChannel[Any]) => {
          senders = replyTo :: senders
          val matches = f.isDefinedAt(m)
          senders = senders.tail
          matches
        })
        if (null eq qel) {
          synchronized {
            // in mean time new stuff might have arrived
            if (!sendBuffer.isEmpty) {
              drainSendBuffer(mailbox)
              // keep going
            } else {
              waitingFor = f
              isSuspended = true
              scheduler.managedBlock(blocker)
              drainSendBuffer(mailbox)
              // keep going
            }
          }
        } else {
          received = Some(qel.msg)
          senders = qel.session :: senders
          done = true
        }
      }

      val result = f(received.get)
      received = None
      senders = senders.tail
      result
    }

I then went on to read reactWithin and receiveWithin; their implementations contain the same kind of code.

Conclusion:

Calling ! (send) on a Scala Actor does not put the message straight into the mailbox queue. The message is first buffered in sendBuffer, and the two queues are synchronized when react or receive (or their ...Within variants) is invoked.

Open question

With the sendBuffer/mailbox synchronization question settled, another one came up: how do react and the related methods detect messages in mailbox, and how is a message taken out of mailbox to be executed?

With this question in mind I went back to Reactor and looked at the while loop in searchMailbox:

    private[actors] def searchMailbox(startMbox: MQueue[Msg], // the caller passes mailbox in here
                                      handler: PartialFunction[Msg, Any],
                                      resumeOnSameThread: Boolean) {
      var tmpMbox = startMbox
      var done = false
      while (!done) {
        val qel = tmpMbox.extractFirst(handler)
        if (tmpMbox ne mailbox)
          tmpMbox.foreachAppend(mailbox)
        if (null eq qel) {
          synchronized {
            // in mean time new stuff might have arrived
            if (!sendBuffer.isEmpty) {
              tmpMbox = new MQueue[Msg]("Temp")
              drainSendBuffer(tmpMbox)
              // keep going
            } else {
              waitingFor = handler
              /* Here, we throw a SuspendActorControl to avoid
                 terminating this actor when the current ReactorTask
                 is finished.

                 The SuspendActorControl skips the termination code
                 in ReactorTask. */
              throw Actor.suspendException
            }
          }
        } else {
          resumeReceiver((qel.msg, qel.session), handler, resumeOnSameThread)
          done = true
        }
      }
    }

I can roughly guess what is going on, but one stretch is still unclear to me: what happens between the moment a freshly initialized Actor calls react and the moment I send a message that puts something into that Actor's sendBuffer. Reading further, I found that Actor's start method ultimately uses the Executor framework and has its own ForkJoinScheduler. Does Scala ship its own ForkJoin implementation? And will that be adjusted once JDK 7 is released?
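On the earlier question of how react finds a message in mailbox: the listings suggest that extractFirst scans the queue for the first message the handler is defined at (the receive listing calls f.isDefinedAt explicitly). The sketch below illustrates that selection step in isolation. It is a toy model under that assumption: ToyMailbox and its ListBuffer-based extractFirst are hypothetical stand-ins, not the MQueue implementation.

```scala
import scala.collection.mutable

// Toy model only: hypothetical names, not the scala.actors MQueue.
object ToyMailbox {
  // Remove and return the first message the handler is defined at,
  // leaving all other messages in place and in their original order.
  def extractFirst[Msg](mailbox: mutable.ListBuffer[Msg],
                        handler: PartialFunction[Msg, Any]): Option[Msg] = {
    val idx = mailbox.indexWhere(m => handler.isDefinedAt(m))
    if (idx < 0) None else Some(mailbox.remove(idx))
  }

  def main(args: Array[String]): Unit = {
    val mailbox = mutable.ListBuffer[Any]("hello", 42, "world")
    // A handler that only accepts Int messages.
    val onlyInts: PartialFunction[Any, Any] = { case i: Int => i * 2 }
    println(extractFirst(mailbox, onlyInts))
    println(mailbox)
  }
}
```

Here the handler skips "hello", picks 42, and leaves the two strings queued. A second call with the same handler finds nothing, which in the real searchMailbox corresponds to the null eq qel branch where the actor either drains sendBuffer again or suspends.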