libuv 源码分析1: loop和poll
0 背景
libuv是一个开源异步I/O库(Asynchronous I/O)。主页在这里libuv
应用案例:Nodejs . 比起libevent来说,比较年轻。
前提假设:本文假设你对unix上的套接字编程比较熟悉,熟悉阻塞/非阻塞套接字,了解select, poll, epoll。
先上一张libuv架构图:
在linux上,libuv是对epoll的封装;在windows上,是对完成端口的封装;在macOS/FreeBSD上,是对kqueue的封装。
本文不讨论libuv基本的用法,只是稍微分析一下源码。本文只分析loop, poll和tcp。tcp下一章再讲。
1. uv_run执行了些什么
uv_run是uv消息队列的入口函数,我们看一下里面执行了些什么:
int uv_run(uv_loop_t* loop, uv_run_mode mode) {
int timeout;
int r;
int ran_pending;
r = uv__loop_alive(loop);
if (!r)
uv__update_time(loop);
while (r != 0 && loop->stop_flag == 0) {
uv__update_time(loop);
uv__run_timers(loop);
ran_pending = uv__run_pending(loop);
uv__run_idle(loop);
uv__run_prepare(loop);
timeout = 0;
if ((mode == UV_RUN_ONCE && !ran_pending) || mode == UV_RUN_DEFAULT)
timeout = uv_backend_timeout(loop);
uv__io_poll(loop, timeout);
uv__run_check(loop);
uv__run_closing_handles(loop);
if (mode == UV_RUN_ONCE) {
uv__update_time(loop);
uv__run_timers(loop);
}
r = uv__loop_alive(loop);
if (mode == UV_RUN_ONCE || mode == UV_RUN_NOWAIT)
break;
}
/* The if statement lets gcc compile it to a conditional store. Avoids
* dirtying a cache line.
*/
if (loop->stop_flag != 0)
loop->stop_flag = 0;
return r;
}
我们可以看到主要执行了以下几个步骤:
-
uv__update_time
: 更新时间 -
uv__run_timers
: 调用timer。 -
uv__run_pending
: 调用回调。通常来说回调一般在poll fd结束后就立即执行,但是总有例外:有一些I/O回调会延迟到下一次迭代中执行。那些被延迟的回调正是在这里执行 -
uv__run_idle
: 执行idle。查看uv__run_idle
的实现,发现仅仅是对idle进行遍历、执行回调,并没有删除的操作,所以idle是每次while都执行的。idle名不副实。(除了idle, prepare, check,都是执行完一个删除一个。既然回调都执行完了,保存也肯定没有必要) -
uv__run_prepare
: 执行prepare -
uv__io_poll(loop, timeout)
: 把新增的需要被监听的fd放到poll中;poll我们所关心的fd,注意有一个timeout。 -
uv__run_check
: 执行check -
uv__run_closing_handles
: 执行close handle。
这和uv官网上贴的图是一致的:
下面我们简要来看一下几个主要函数:
1.0 uv_handle_t
uv_handle_t
: 一个handle, 是uv_tcp_t, uv_udp_t, uv_timer_t, uv_poll_t
等的公共父类(通过包含uv_handle_t
结构体的所有成员来达到继承,通过uv_handle_t.type
达到辨别是哪个子类)。
对与uv_handle_t
的任何子类的关闭,比如uv_tcp_t
,都需要调用uv_close(uv_handle_t*, uv_close_cb)
。其中uv_close_cb
一般用于释放资源,比如我们的handle是通过malloc得到的,此时就要free。如果是在栈上分配的,就不能free,可以进行其他清理工作。特别是timer,不仅仅要stop,还要close。
1.1 uv_run_timers
看一下实现:
void uv__run_timers(uv_loop_t* loop) {
struct heap_node* heap_node;
uv_timer_t* handle;
for (;;) {
heap_node = heap_min((struct heap*) &loop->timer_heap);
if (heap_node == NULL)
break;
handle = container_of(heap_node, uv_timer_t, heap_node);
if (handle->timeout > loop->time)
break;
uv_timer_stop(handle);
uv_timer_again(handle);
handle->timer_cb(handle);
}
}
从一个小堆中依次取出node,如果超时了,执行cb,再把timer放回小堆里面;如果没超时,则break,后续的也不再检查(最小的都没超时,后面更大的也不可能超时)。
1.2 uv_run_pending
简单的从队列从取出,再自行回调:
while (!QUEUE_EMPTY(&pq)) {
q = QUEUE_HEAD(&pq);
QUEUE_REMOVE(q);
QUEUE_INIT(q);
w = QUEUE_DATA(q, uv__io_t, pending_queue);
w->cb(loop, w, POLLOUT);
}
1.3 uv__run_idle
uv__run_prepare
uv__run_check
同上。不过idle从队列取出后,会再放回去。所以idle每次都要手动的stop。我就碰到过一次,cpu占用率100%,检查后发现是idle没有暂停。
QUEUE_MOVE(&loop->name##_handles, &queue); \
while (!QUEUE_EMPTY(&queue)) { \
q = QUEUE_HEAD(&queue); \
h = QUEUE_DATA(q, uv_##name##_t, queue); \
QUEUE_REMOVE(q); \
QUEUE_INSERT_TAIL(&loop->name##_handles, q); \
h->name##_cb(h); \
}
可以看到先把loop中的队列move到queue上,依次遍历queue每个handle后,再把遍历的handle放到loop中的队列。
uv__run_prepare
、uv__run_check
和uv__run_idle
都是一样的。因为它们的init, start, stop,run定义都是通过同一个宏实现的(所以prepare和check也要手动停止):
UV_LOOP_WATCHER_DEFINE(prepare, PREPARE)
UV_LOOP_WATCHER_DEFINE(check, CHECK)
UV_LOOP_WATCHER_DEFINE(idle, IDLE)
#define UV_LOOP_WATCHER_DEFINE(name, type) \
int uv_##name##_init(uv_loop_t* loop, uv_##name##_t* handle) { \
uv__handle_init(loop, (uv_handle_t*)handle, UV_##type); \
handle->name##_cb = NULL; \
return 0; \
} \
int uv_##name##_start(uv_##name##_t* handle, uv_##name##_cb cb) { \
... \
}
int uv_##name##_stop(uv_##name##_t* handle) { \
... \
}
void uv__run_##name(uv_loop_t* loop) { \
... \
} \
\
void uv__##name##_close(uv_##name##_t* handle) { \
... \
}
UV_LOOP_WATCHER_DEFINE
定义了init, start, top, run函数,有兴趣的可以看一下
1.4 uv_backend_timeout
和 uv__io_poll
这个是监听文件描述符的函数,也就是调用epoll/kqueue/IOCP监听套接字的函数。
首先看一下它的timeout是怎么计算的:
int uv_backend_timeout(const uv_loop_t* loop) {
if (loop->stop_flag != 0)
return 0;
if (!uv__has_active_handles(loop) && !uv__has_active_reqs(loop))
return 0;
if (!QUEUE_EMPTY(&loop->idle_handles))
return 0;
if (!QUEUE_EMPTY(&loop->pending_queue))
return 0;
if (loop->closing_handles)
return 0;
return uv__next_timeout(loop);
}
int uv__next_timeout(const uv_loop_t* loop) {
const struct heap_node* heap_node;
const uv_timer_t* handle;
uint64_t diff;
heap_node = heap_min((const struct heap*) &loop->timer_heap);
if (heap_node == NULL)
return -1; /* block indefinitely */
handle = container_of(heap_node, uv_timer_t, heap_node);
if (handle->timeout <= loop->time)
return 0;
diff = handle->timeout - loop->time;
if (diff > INT_MAX)
diff = INT_MAX;
return diff;
}
- 没有活跃的handle,返回0. epoll中,timeout为0的意思是立即返回。
- idle或pending_queue不为空,返回0
- loop被关闭返会0
- timer队列为空,返回-1. epoll中,timeout为-1的意思是阻塞直到有fd有event产生
- timer队列不为空,如果timer超时了,返回0。否则timeout为从现在到最早timer要超时的时间。比如现在是19:00:00, timer最早超时未19:00:10,那么timeout为10s。
这也就解释了,为什么不手动stop idle,loop会一直转。就是因为这里io poll不等待,使得while一直空转。
下面看看真正的poll:
void uv__io_poll(uv_loop_t* loop, int timeout) {
while (!QUEUE_EMPTY(&loop->watcher_queue)) {
q = QUEUE_HEAD(&loop->watcher_queue);
QUEUE_REMOVE(q);
QUEUE_INIT(q);
// ...
uv__epoll_ctl(loop->backend_fd, UV__EPOLL_CTL_ADD, w->fd, &e)
// ...
}
for (;;) {
// ...
nfds = uv__epoll_wait(loop->backend_fd, events, ARRAY_SIZE(events), timeout);
// ...
pe->events &= w->pevents | POLLERR | POLLHUP;
w->cb(loop, w, pe->events);
}
// ...
}
int uv__epoll_pwait(int epfd, struct uv__epoll_event* events, int nevents, int timeout, uint64_t sigmask) {
#if defined(__NR_epoll_pwait)
int result;
result = syscall(__NR_epoll_pwait, epfd, events, nevents, timeout, &sigmask, sizeof(sigmask));
#if MSAN_ACTIVE
if (result > 0)
__msan_unpoison(events, sizeof(events[0]) * result);
#endif
return result;
#else
return errno = ENOSYS, -1;
#endif
}
- 先把
watcher_queue
中剩余的fd移出来并放到poll队列中; - poll等待,如果在超时之前有fd返回,则调用对应fd的回调, 然后继续等待直到timeout超时。
uv__io_poll
中包含了非常精巧的一点:libuv中的timer是由poll等待fd的时间来达到的:如果还没达到timer超时,则继续等待;如果超时了,就不在等待,进入到loop下一个步骤当中。并不是调用了操作系统timer相关的API。
通过syscall调用epoll。我们调用系统函数,不一定要通过具体名字,可以通过syscall然后传入要调用的系统函数对应的id。
1.5 uv__run_closing_handles
遍历等待关闭的队列,关闭stream(包括tcp,pipe等)或者udp以及其他handle,调用handle对应的close_cb
总结:uv在linux上是对epoll的封装;idle、prepare、check要手动关闭,否则while会一直循环。
下面来整理一下loop的数据结构(挑选了几个比较重要的field,有删减):
struct uv_loop_s {
int backend_fd; // epoll_create返回的fd。最终针对这个fd进行epoll操作
void* data; // 用户可以用这个字段来保存数据
void* handle_queue[2]; // handle双链表。一切uv_handle_t子类(包括但不限于tcp, udp,pipe)实例的地址、地址、地址,都在这个双链表里面。为了简便,后面会忽略存储的是handle的地址。
void* watcher_queue[2]; // 用户在上一个迭代中新增加的需要监听的fd,这些fd还没有添加到epoll中。这些fd,将在这次迭代中被添加到epoll中
uv__io_t** watchers; // 保存我们监听的fd相关的数据结构(即uv__io_t,包括fd本身,callback回调,epoll关心的events等)。是通过watchers[fd]来索引得到uv__io_t的。
unsigned int nwatchers; // watchers的大小,不是个数
unsigned int nfds; // watchers的个数
struct {
void* min;
unsigned int nelts;
} timer_heap; // 小堆。用来保存timer
uv_handle_t* closing_handles; // 待关闭队列。执行了uv_close后的handle被放入到这个队列
};
2. uv_poll_t
要对一个fd poll的话,主要有以下步骤:
int uv_poll_init(uv_loop_t* loop, uv_poll_t* handle, int fd);
int uv_poll_start(uv_poll_t* handle, int events, uv_poll_cb cb);
int uv_poll_stop(uv_poll_t* handle);
-
uv_close
: 把handle放到loop->closing_handles队列中。
2.1 uv_poll_init
主要执行了一下几个步骤:
-
uv__io_check_fd
: 检测fd是否有效。先把fd放到epoll队列里(UV__EPOLL_CTL_ADD),再移出来(UV__EPOLL_CTL_DEL),出错了返回或者abort; -
uv__nonblock
: 设置套接字为非阻塞 -
uv__handle_init
: 初始化handle。把handle插入到loop->handle_queue的队列尾端,设置handle类型为UV_POLL
。 -
uv__io_init
: 初始化uv__io_t
2.2 uv_poll_start
主要执行了以下几个步骤:
-
uv__io_start
: 把fd对应的uv__io_t
添加到watcher_queue
队尾。更新loop->watchers, loop->nfds等等。如果loop->watchers太小,则先扩容再更新loop->watchers - 把我们传入进来的回调赋值给handle->poll_cb。当有数据来了后,会调用我们这个回调。
2.2 uv_poll_stop
主要做了以下几个步骤:
- 把
uv_poll_t
对应的uv__io_t
(因为是handle子类,所有有uv__io_t
)从loop->watcher_queue中移出。
2.3 uv_close
- 把fd对应的handle放到
loop->closing_handles
中。uv会在下个循环close掉closing_handles
中的fd,以及会释放相关资源。