前几篇讲解了服务之间互相发送消息,消息的挂起,恢复,fork等等,感觉还是有点搞不清他们之间是怎么协作的,例如fork产生的协程什么时候被调用,如果有多个fork又怎么被调用.这篇试着讲解一下.
首先要明白的是,一般所有的lua层函数都是以协程的方式被执行的,包括fork产生的函数.除非你在skynet.start()之外调用函数.这点通过前面的分析可以知道. start()函数调用timeout产生协程. fork产生协程列表.
我们知道,lua层设置的回调函数为skynet.dispatch_message. 他主要调用raw_dispatch_message. 在那里才是驱动协程函数执行的地方.一个协程结束或挂起之后将由suspend函数来接管,相信通过前面的讲解,大家对这个函数不陌生了.
如果入口函数start没有调用fork,sleep,wait之类的函数,那么驱动start()执行的消息将结束.看看我们的协程池函数:
local function co_create(f)
local co = table.remove(coroutine_pool)
if co == nil then
co = coroutine.create(function(...)
f(...) --① 函数执行完毕
while true do
f = nil
coroutine_pool[#coroutine_pool+1] = co
f = coroutine_yield "EXIT" --②挂起协程
f(coroutine_yield())
end
end)
else
coroutine_resume(co, f) --③
end
return co
end
在①处函数执行完毕,然后进入②,协程挂起. 将由suspend函数接管.执行cmd == 'EXIT'分支.
当再次收到消息时(注意,一般再次收到消息是其他服务的call调用,类型不为response),调用skynet.dispatch_message时,参看前面raw_dispatch_message的代码,如果消息类型不为response,那么将再次调用co_create().此时将执行3️⃣,回到前次消息挂起的地方,返回的f就是想要执行的、在dispatch中设置的回调函数。之后协程再次挂起,然后又恢复,直至suspend接管。
如果有skynet.sleep(),那么会产生一个sessionID并被挂起,suspend函数接管,用session_id_coroutine关联session和当前协程。等到时间到会产生resond类型的消息,根据sesssionID找到协程,恢复协程,此时start(func)中的func函数才算结束,即上面的①。
我们注意到suspend函数的'SLEEP'分支还有一个sleep_session表,他是用了干嘛的呢?
查找skynet.lua源码,发现skynet.wakeup引用到他了。在介绍skynet.wakeup()之前,先说说skynet.fork()。
skynet.fork()也会调用协程池函数co_create()来产生协程。那么调用这个函数时,会走③的流程,然后coroutine_resume(co)吗,那样岂不是乱套了。不会的!因为这个时候当前函数还没有结束呢,coroutine_pool仍然为空,走的还是if分支。所以他只是创建一个协程,插入到fork_queue表中,并没有启用协程。
我们追踪fork_queue,发现是在raw_dispatch_message结束后调用的,而且是顺序取出协程,并执行。raw_dispatch_message调用结束,可能是入口函数执行完毕,调用了yield('EXIT'),也有可能执行了sleep,调用了yield('SLEEP'),这都会导致suspend返回。
这里我们可以回到开头提出的问题了。skynet.fork()产生的的协程函数,将在主协程(start()启用的)结束或挂起时被调用。有多个fork产生的协程函数时,将按顺序挨个取出执行,全部执行完毕时,skynet.dispatch_message才算结束,下个消息才得以到达。在c接口层,同一个服务的消息队列是严格按照顺序来执行的,所以这样没有问题。
再次强调,lua协程只是模拟了多线程的执行,并不会真正有多个协程在执行,所以确保skynet.fork()函数中不要出现死循环或者很费时的操作,因为他不会像linux一样真正多线程的执行。
填一下上面挖的坑。skynet.wakeup是用来干嘛的呢?
我们想提前唤醒执行sleep函数的协程可以吗。当然可以,skynet.wakeup就是来干这个的。sleep后,何时去执行skynet.wakeup,毕竟sleep挂起协程之后要等到超时才会重新恢复,既然恢复之后,再调用wakeup还有什么用呢。你可能想到了,在skynet.fork()产生的协程中调用。前面说到了,sleep之后就可以执行fork里的协程了。skynet.wakeup是怎么做到的呢?
看看skynet.wakeup的实现:
function skynet.wakeup(co)
if sleep_session[co] then
table.insert(wakeup_queue, co)
return true
end
end
他只是根据sleep_session表来产生一个新的表wakeup_queue。
如果要唤醒的协程没有在sleep_session表中,则不会有任何结果。前面说到sleep()函数会让suspend产生sleep_session表来关联要挂起的协程。所以waitup一个sleep的协程会插入到wakeup_queue表中。而wakeup_queue是在disptch_wakeup中取出并恢复的。disptch_wakeup仍然是在suspend中最后调用的,也就是说一个协程被挂起或结束后会去检查wakeup_queue列表。
disptch_wakeup的实现为:
local function dispatch_wakeup()
local co = table.remove(wakeup_queue,1)
if co then
local session = sleep_session[co]
if session then
session_id_coroutine[session] = "BREAK"
return suspend(co, coroutine_resume(co, false, "BREAK"))
end
end
end
注意,即使调用了wakeup,也有可能不会立马唤醒那个协程,因为wakeup只是插入了一个挂起的协程,真正要到disptch_wakeup中去恢复,如果wakeup后面有个很费时的操作,那么将等到他执行完毕,才会在suspend最后去调用disptch_wakeup。
这下我们明白了sleep_session的作用。然而我们发现在skynet.wait()函数里面也有这个表,部分代码也与skynet.sleep()类似。
不难猜到skynet.wait()的作用:挂起指定的或者正在运行的协程。由于他不会超时,所以必须由skynet.wakeup()来唤醒。我们可以用这组函数来实现资源的同步。
有个有趣的问题是,sleep之后,被wakeup,sleep超时之后仍然会收到一条response类型的消息,只不过sessionID对应的协程在dispatch_wakeup中已经置为'BREAK'了,他不会再做任何操作了。
最后附一张图,来说明上面的过程:
至此,skynet lua层中基本所有的消息流程就分析到了。周日花了差不多一下午写的,坐着很累了,如有遗漏的地方下次再补充。