Libevent确实方便了开发人员,对于定时器、信号处理、关心的文件或者套接字,只需要挂载到event_base上面,设置好对应的回调函数和参数就可以了,当对应的事件发生时,Libevent会自动调度相应的回调函数进行处理。
本文就按照之前在sshinner(https://github.com/taozhijiang/sshinner)中使用Libevent的过程,以这些接口函数作为突破点,沿着代码走了一朝,尝试探究一下Libevent的内部工作流程是怎样的。由于本人能力有限,有些东西可能不够详尽或者准确,还望不吝指出。
一、创建event_base
struct event_base * main_base = event_base_new(void);
主要是根据参数,创建event_base结构体,然后初始化一些数据,如果对默认的参数不满意需要个性化配置的话,可以先创建event_config,然后调用event_base_new_with_config来创建。其中在eventops这个变量中,按照优先级顺序排序罗列了常见的IO复用模型,比如kqueue、epoll、poll、select等,由于Libevent是跨平台的,这些IO复用在有些平台可能是不可用的,同时你还可以在event_config中选择过滤某些不想要的模型。
当选定了某个IO复用模型之后,其操作结构eventop就被添加到base->evsel中,然后调用其特定的init初始化函数。这些操作跟文件系统file_operations结构极为的类似。
那我们接着跟下去,看看大名鼎鼎的epoll类提供了哪些操作吧:
const struct eventop epollops = {
"epoll",
epoll_init,
epoll_nochangelist_add,
epoll_nochangelist_del,
epoll_dispatch,
epoll_dealloc,
1, /* need reinit */
EV_FEATURE_ET|EV_FEATURE_O1|EV_FEATURE_EARLY_CLOSE,
0
};
在初始化函数epoll_init当中,基本就类似epoll使用时候标准化的准备工作:首先调用epoll_create创建epfd,然后预先创建INITIAL_NEVENT(32)个空间用于存放epoll_event,如果使用了timerfd,则再调用timerfd_create创建对应的timerfd。最后这些fd以及epoll_event都存放在struct epollop当中,然后作为epoll_init函数的返回保存在base->evbase上。
struct epollop {
struct epoll_event *events; //数组
int nevents;
int epfd;
#ifdef USING_TIMERFD
int timerfd;
#endif
};
在创建event_base的最后,还调用了event_base_priority_init进行了一个初始化操作,如果有多个优先级,就有对应的多个等待队列挂靠在base->activequeues上面,而base->nactivequeues记录了优先级的数目。
二、创建listen套接字,并建立connect事件侦听
2.1 基本过程
listener = evconnlistener_new_bind(srvopt.main_base, accept_conn_cb, NULL,
LEV_OPT_CLOSE_ON_FREE|LEV_OPT_REUSEABLE, -1/*backlog*/,
(struct sockaddr*)&sin, sizeof(sin));
这个算是个简化版的函数,你可以自己先手动建立和绑定socket,然后再调用evconnlistener_new建立connect事件侦听。这个函数给socket设置了一个高大上的符号SO_KEEPALIVE(SO_KEEPALIVE 保持连接检测对方主机是否崩溃,避免服务器永远阻塞于TCP连接的输入),SOCKET开发还是有很多参数的,比如之前sshinner网络一直出问题,消息不能及时的被发送接收,最后跟踪shadowsockets-libev发现,是要给socket添加TCP_NODELAY参数,问题才得以解决。
在evconnlistener_new函数中,首先调用listen,然后分配evconnlistener_event这个数据结构,base作为struct evconnlistener类型传递给用户空间,而listener主要作为内部隐藏的数据结构,为通用的struct event数据类型。
struct evconnlistener_event {
struct evconnlistener base;
struct event listener;
};
struct evconnlistener {
const struct evconnlistener_ops *ops;
void *lock;
evconnlistener_cb cb;
evconnlistener_errorcb errorcb;
void *user_data;
unsigned flags;
short refcnt;
int accept4_flags;
unsigned enabled : 1;
};
struct event {
struct event_callback ev_evcallback;
/* for managing timeouts */
union {
TAILQ_ENTRY(event) ev_next_with_common_timeout;
int min_heap_idx;
} ev_timeout_pos;
evutil_socket_t ev_fd;
struct event_base *ev_base;
union {
/* used for io events */
struct {
LIST_ENTRY (event) ev_io_next;
struct timeval ev_timeout;
} ev_io;
/* used by signal events */
struct {
LIST_ENTRY (event) ev_signal_next;
short ev_ncalls;
/* Allows deletes in callback */
short *ev_pncalls;
} ev_signal;
} ev_;
short ev_events;
short ev_res; /* result passed to event callback */
struct timeval ev_timeout;
};
通过上面的数据结构可以清晰的发现,调用evconnlistener_new_bind函数作为参数提供的回调函数和参数,都被赋值给了evconnlistener_event->base.cb和base.user_data上面。接下来调用了两个比较重要的函数:
event_assign(&lev->listener, base, fd, EV_READ|EV_PERSIST,
listener_read_cb, lev);
evconnlistener_enable(&lev->base);
evconnlistener_enable(&lev->base)通过追根述源是调用了event_listener_enable,最后调用了event_add(&lev_e->listener, NULL)。而event_assign和event_add都是比较重要的函数,event_assign类似于event_new的作用,只不过参数是一个已经初始化了的struct event,而event_add则是把event由initialized状态变成pending状态,以便开始接收事件,其实后面可以发现,bufferevent_enable等接口,底层也是调用的event_add实现的。
接下来把上面这两个函数慢慢品读。
2.2 event_assign调用
event_assign(&lev->listener, base, fd, EV_READ|EV_PERSIST,
listener_read_cb, lev);
初看上面比较奇怪,在evconnlistener_new这个函数的上半部分已经设置了一个base.cb和base.user_data了,怎么下面又调用一个event_assign来设置一个listener_read_cb回调呢?其实上面是用户提供的callback和args,但是这并没有直接跟某个事件相关联,而下面的event_assign却是设置了&lev->listener(标准的struct event类型)为固定的listener_read_cb回调函数,当发生了EV_READ就会被自动调用。然后在listener_read_cb中,我们发现:
cb = lev->cb; //
user_data = lev->user_data; //
cb(lev, new_fd, (struct sockaddr*)&ss, (int)socklen, user_data);
errorcb(lev, user_data);
所以说,其实Libevent内部根本没有什么诸如LISTEN的事件,还是用的标准EV_READ(因为最最底层的epoll异步只能监听read/write/except事件),只是做了个封装,当连接之后激活EV_READ(为什么呢?为什么呢?)的回调函数,而出错了就调用err_callback函数,所以accept_conn_cb实际是被手动调用的。
2.3 event_add调用
evconnlistener_enable的调用被翻译到event_add函数,其实不光光是这里,后面最常用的bufferevent_enable这类函数,其实也是翻译到底层的event_add函数(event_add_nolock_)上面。
struct evconnlistener_event *lev_e =
EVUTIL_UPCAST(lev, struct evconnlistener_event, base); //这个宏比较好看
return event_add(&lev_e->listener, NULL);
#define EVUTIL_UPCAST(ptr, type, field) \
((type *)(((char*)(ptr)) - evutil_offsetof(type, field)))
event_add有两个参数,后面一个参数是struct timeval的超时参数,如果是NULL就表示无限期等待,这里先就不考虑这种情况。其中最主要做的事情就是调用evmap_io_add_/evmap_signal_add_函数将事件加入到base当中:
// 对照上面,返回-1 error, 0 没有实际操作, 1 真实添加了
res = evmap_io_add_(base, ev->ev_fd, ev);
res = evmap_signal_add_(base, (int)ev->ev_fd, ev);
// 上面函数的核心操作
GET_IO_SLOT_AND_CTOR(ctx, io, fd, evmap_io, evmap_io_init,
evsel->fdinfo_len);
evsel->add(base, ev->ev_fd,
old, (ev->ev_events & EV_ET) | res, extra);
首先,由于Libevent的设计是跨平台的,而Windows和Linux对socket和fd的表达和处理方式不同,GET_IO_SLOT_AND_CTOR的行为也有差异:Windows使用的是hashtable维护着struct event_map_entry结构,而Linux平台就直接是用的指针数组(数组,元素类型是指针),用fd作为偏移来索引,指针指向的结构按需分配,十分的简洁高效。
/* Used to map signal numbers to a list of events. If EVMAP_USE_HT is not
defined, this structure is also used as event_io_map, which maps fds to a
list of events.
*/
struct event_signal_map {
/* An array of evmap_io * or of evmap_signal *; empty entries are
* set to NULL. */
void **entries;
/* The number of entries available in entries */
int nentries;
};
/** Mapping from file descriptors to enabled (added) events */
struct event_io_map io;
/** Mapping from signal numbers to enabled (added) events. */
struct event_signal_map sigmap;
在每一个event_base结构体中,都定义了struct event_signal_map类型的两个成员io和sigmap(Linux平台),用于信号量和FD与events之间的事件映射。然后看GET_IO_SLOT_AND_CTOR(GET_SIGNAL_SLOT_AND_CTOR)这个宏,查询或者创建fd对应的struct evmap_io对象ctx,将当前的事件和之前的事件进行合并,并调用evsel->add进行更新(最终反应到底层epoll上面就是epoll_ctl命令进行操作了),并把当前的event结构体插入到前面找到的ctx->events链表当中。
三、建立主事件循环
无论是主线程,还是对于每个线程池用自己的event_base,最终都会调用这个函数作为主循环进行事件处理。
event_base_loop(main_base, 0);
先不考虑那些FLAG(控制何时推出啊啥的),在event_base_loop主要做的事情是:
/* F1 */ event_queue_make_later_events_active(base);
/* F2 */ res = evsel->dispatch(base, tv_p);
/* F3 */ timeout_process(base);
F1的函数主要是在Libevent中引用了Deferred Callback机制,操作上就是从event_base的active_later_queue队列中将事件取出来,然后添加到activequeues[evcb->evcb_pri]对应优先级队列上面。
F2对于epoll类型,就是调用epoll_dispatch函数:首先调用epoll_apply_changes(base);对event_base->changelist上面挂靠的所有对fd的事件修改都执行底层修改使之生效;然后用res = epoll_wait(epollop->epfd, events, epollop->nevents, timeout);获取被激活的事件;对获取到的每个fd的事件,提取被激活的事件类型,然后调用evmap_io_active_(base, events[i].data.fd, ev | EV_ET);函数处理。
evmap_io_active_(struct event_base *base, evutil_socket_t fd, short events)
{
struct event_io_map *io = &base->io;
struct evmap_io *ctx;
struct event *ev;
GET_IO_SLOT(ctx, io, fd, evmap_io);
LIST_FOREACH(ev, &ctx->events, ev_io_next) {
if (ev->ev_events & events)
event_active_nolock_(ev, ev->ev_events & events, 1);
}
}
执行的结果就是,这个fd对应的evmap_io上的所有事件,以及Deferred Callback事件,都被收集添加到event_base->activequeues[evcb->evcb_pri]队列中去。
于是乎,最后的好戏就是:
if (N_ACTIVE_CALLBACKS(base)) {
int n = event_process_active(base);
在event_process_active中调用了event_process_active_single_queue。当然作者考虑的细节还是比较细腻的:如果当前被调度的活动事件过多,就考虑在timer和maxcb两个维度上限制本轮的事件处理量,而在event_process_active_single_queue中,会不断从事件链表中取出事件处理(包括执行对应的回调函数)。从实现方式上看来,按照优先级从高到低的顺序,每一轮只处理一个最高优先级非空事件队列中的事件,然后就返回了。这样看来,如果高优先级的事件太多太活跃,那么低优先级的事件还是会有被饿死的风险。
四、基于bufferevent的普通socket读写事件
bufferevent使得网络的开发变的很方便,无论是从事件还是底层的evbuffer都提供了一套丰富灵活的接口。但是需要注意的是bufferevent目前只能用于TCP连接的类型,对于UDP只能手动建立struct event事件,然后设置事件和回调函数了,而且在回调函数中,一般也只能调用sendto/recvfrom等操作接口。
struct event_base *base = evconnlistener_get_base(listener);
struct bufferevent *bev =
bufferevent_socket_new(base, fd, 0 /*BEV_OPT_CLOSE_ON_FREE*/);
bufferevent_setcb(bev, bufferread_cb, NULL, bufferevent_cb, NULL);
bufferevent_enable(bev, EV_READ|EV_WRITE);
上面算是在网络开发中用的最频繁的了,比如在listener的connection回调函数中,接收到一个新的套接字fd,那么就对这个套接字设置bufferevent事件,设置对应的回调函数。
在bufferevent_socket_new函数中:
struct bufferevent_private *bufev_p;
struct bufferevent *bufev;
bufferevent_init_common_(bufev_p, base, &bufferevent_ops_socket, options);
bufev_p->bev;
event_assign(&bufev->ev_read, bufev->ev_base, fd,
EV_READ|EV_PERSIST|EV_FINALIZE, bufferevent_readcb, bufev);
event_assign(&bufev->ev_write, bufev->ev_base, fd,
EV_WRITE|EV_PERSIST|EV_FINALIZE, bufferevent_writecb, bufev);
evbuffer_add_cb(bufev->output, bufferevent_socket_outbuf_cb, bufev);
evbuffer_freeze(bufev->input, 0);
evbuffer_freeze(bufev->output, 1);
跟之前的struct evconnlistener_event一样,这里返回给用户空间可用的struct bufferevent也是struct bufferevent_private的一部分,不过相比listener的单个struct event,这里的bufferevent内容成员要复杂的多,其中我们比较熟悉的有:input、output两个evbuffer,可以调用evbuffer的族函数进行相关的高级处理;此外还设置了be_ops为bufferevent_ops_socket,而enabled使能的时间中默认为EV_WRITE,所以EV_READ需要手动enable;接下来设置bufev_private->deferred的callback回调函数和调用参数为bufferevent_run_deferred_callbacks_locked和bufev_private。
最后的两个event_assign分别将ev_read和ev_write两个event的回调函数设置为了bufferevent_readcb/bufferevent_writecb。由于EV_WRITE默认是使能的,所以还调用了evbuffer_add_cb设置其默认的回调函数为bufferevent_socket_outbuf_cb。为了安全,还将两个evbuffer先冻结起来,准备工作还未就绪,所以此时还不允许数据传输。
其实,正如上面的例子,对于bufferevent,通常的写操作就使用其默认的callback就可以了,实际开发当中我们最关心的是读事件,因为我们要接收数据处理数据(即便只是转发操作),而写数据只要准备好要发送的数据,底层的写就让其自动处理就可以了。
说到底,这里的bufferevent和evconnlistener类似,也是采用了两段式设计:在bufferevent中的ev_read/write被激活调度的时候,其自动执行的是bufferevent_readcb/writecb函数,在这些标准函数中会做一些的预先处理操作,比如evbuffer_read/evbuffer_write_atmost的读写,到最后通过bufferevent_trigger_nolock_调用用户设定的回调函数。然后你可能感兴趣EV_WRITE默认的bufferevent_socket_outbuf_cb干了啥?查看其代码,其实也就是:检查确保当前ev_write是否是pending的,如果不是就bufferevent_add_event_变成pending状态就好了。我们知道,epoll事件最底层是操作系统直接驱动的,所以如果底层驱动发现socket是可写的,就可以调度底层发送数据了,因此这个函数实际上其实啥都没做。
最后的bufferevent_enable(bev, EV_READ|EV_WRITE);跟之前的evconnlistener_enable也大差不差的,最终都是通过be_socket_enable->bufferevent_add_event_->event_add方式,将event加入到对应的event_base上面去,使之变为pending状态。
五、小结
Libevent的代码实现的十分精妙,注释也比较多,当然也有一些参数和符号尚无注释,自己暂时也没能意会,本文尚有很多需要补充之处。
哎,想想自己epoll+线程池两个c文件搞定,而Libevent把这个做的如此之精妙(且尚无线程池模型),不得不由衷的让人敬佩:牛人怎么就那么牛呢?这也使得我坚信,初学者和企业应用之间总隔着一条沟沟需要跨越,如果没有项目带着你走,那就可以靠读这些开源项目的代码来弥合,希望大家都能愉快的玩耍和成长!
本文完!