kqueue in libevent

[TOC]
kqueue 是 FreeBSD 上的一种的多路复用机制。它是针对传统的 select/poll 处理大量的文件描述符性能较低效而开发出来的。注册一批描述符到 kqueue 以后,当其中的描述符状态发生变化时,kqueue 将一次性通知应用程序哪些描述符可读、可写或出错了。

kqueue知识点说明

kqueue体系只有三样东西:struct kevent结构体,EV_SET宏以及kevent函数。即 kqueue()、kevent() 两个系统调用和 struct kevent 结构。

kevent

struct kevent { 
   uintptr_t ident;    /* 事件 ID */ 
   short   filter;    /* 事件过滤器 */ 
    /*
    添加到kqueue中:EV_ADD, 从kqueue中删除:EV_DELETE, 这两种是主要的行为
    一次性事件:EV_ONESHOT, 此事件是或操作, 指定了该事件, kevent()返回后, 事件会从kqueue中删除
    更新事件: EV_CLEAR,此事件是或操作, 手册上的解释是,当事件通知给用户后,事件的状态会被重置。可以用在类似      于epoll的ET模式,也可以用在描述符有时会出错的情况。
    其他事件: EOF事件:EV_EOF, 错误事件:EV_ERROR(返回值)
    */
   u_short  flags;    /* 行为标识 */ 
    
   u_int   fflags;    /* 过滤器标识值 */ 
   intptr_t data;     /* 过滤器数据 */ 
   void   *udata;    /* 应用透传数据 */ 
 }; 
在一个 kqueue 中,{ident, filter} 确定一个唯一的事件。

kevent函数

/* kevent 提供向内核注册反注册事件和返回就绪事件或错误事件: 
kq: kqueue 的文件描述符。 
changelist: 要注册 / 反注册的事件数组; 
nchanges: changelist 的元素个数。 
eventlist: 满足条件的通知事件数组; 
nevents: eventlist 的元素个数。 
timeout: 等待事件到来时的超时时间,0,立刻返回;NULL,一直等待;有一个具体值,等待 timespec 时间值。 返回值:可用事件的个数。*/
int kevent(int kq, const struct kevent *changelist, int nchanges, 
        struct kevent *eventlist, int nevents, 
        const struct timespec *timeout);
kqueue的参考文档

APUE:KQUEUE / FreeBSD

IBM kqueue

kqueue.c

struct kqop {
    struct kevent *changes;//有事件发生的队列
    int nchanges;//发生事情的个数。这样可以遍历事件队列
    struct kevent *events;//监控队列
    int nevents;//监控队列长度
    int kq;// kq队列
} kqop;

所有的方法

void *kq_init   (void);
int kq_add  (void *, struct event *);
int kq_del  (void *, struct event *);
int kq_recalc   (void *, int);
int kq_dispatch (void *, struct timeval *); 

将该类注册为eventop的一个实例。

struct eventop kqops = {
    "kqueue",
    kq_init,
    kq_add,
    kq_del,
    kq_recalc,
    kq_dispatch
};

kq_init

  1. 先检查环节变量,是否设置为kqueue可用。
  2. 初始化一个kqueue的队列。
  3. 初始化kevent的事件队列。
void *
kq_init(void)
{
    int kq;

    /* Disable kqueue when this environment variable is set */
    if (getenv("EVENT_NOKQUEUE"))
        return (NULL);

    memset(&kqop, 0, sizeof(kqop));

    /* Initalize the kernel queue */
    
    if ((kq = kqueue()) == -1) {
        log_error("kqueue");
        return (NULL);
    }

    kqop.kq = kq;

    /* Initalize fields */
    kqop.changes = malloc(NEVENT * sizeof(struct kevent));
    if (kqop.changes == NULL)
        return (NULL);
    kqop.events = malloc(NEVENT * sizeof(struct kevent));
    if (kqop.events == NULL) {
        free (kqop.changes);
        return (NULL);
    }
    kqop.nevents = NEVENT;

    return (&kqop);
}

kq_recal

这个类,不像select。发生一次事件后,需要重新注册事件。所以直接返回0。

int
kq_recalc(void *arg, int max)
{
    return (0);
}

kq_add

int
kq_add(void *arg, struct event *ev)
{
    struct kqop *kqop = arg;//获取this指针。
    struct kevent kev;//待添加事件

    //检查是否为信号事件。
    if (ev->ev_events & EV_SIGNAL) {
        int nsignal = EVENT_SIGNAL(ev);//获取句柄

        memset(&kev, 0, sizeof(kev));
        kev.ident = nsignal;//identifier for this event
        kev.filter = EVFILT_SIGNAL;// 有很多,EVFILT_READ,EVFILT_WRITE,EVFILT_TIMER等
        kev.flags = EV_ADD;//指定事件操作类型,比如EV_ADD,EV_ENABLE, EV_DELETE等 
        if (!(ev->ev_events & EV_PERSIST))
            kev.flags |= EV_ONESHOT;
        kev.udata = ev;//将ev当做参数了。
        
        if (kq_insert(kqop, &kev) == -1)
            return (-1);
        //注册信号行为。该信号函数是个空事件。可能留着以后用吧。
        if (signal(nsignal, kq_sighandler) == SIG_ERR)
            return (-1);

        ev->ev_flags |= EVLIST_X_KQINKERNEL;//给libevent的event的ev_flags添加
        return (0);
    }

    if (ev->ev_events & EV_READ) {
        memset(&kev, 0, sizeof(kev));
        kev.ident = ev->ev_fd;
        kev.filter = EVFILT_READ;
        kev.flags = EV_ADD;
        if (!(ev->ev_events & EV_PERSIST))
            kev.flags |= EV_ONESHOT;
        kev.udata = ev;
        
        if (kq_insert(kqop, &kev) == -1)
            return (-1);

        ev->ev_flags |= EVLIST_X_KQINKERNEL;
    }

    if (ev->ev_events & EV_WRITE) {
        memset(&kev, 0, sizeof(kev));
        kev.ident = ev->ev_fd;
        kev.filter = EVFILT_WRITE;
        kev.flags = EV_ADD;
        if (!(ev->ev_events & EV_PERSIST))
            kev.flags |= EV_ONESHOT;
        kev.udata = ev;
        
        if (kq_insert(kqop, &kev) == -1)
            return (-1);

        ev->ev_flags |= EVLIST_X_KQINKERNEL;
    }

    return (0);
}

kq_insert

int
kq_insert(struct kqop *kqop, struct kevent *kev)
{
    int nevents = kqop->nevents;

    if (kqop->nchanges == nevents) {
        struct kevent *newchange;
        struct kevent *newresult;
        //有事件过来,翻倍增加。
        nevents *= 2;

        newchange = realloc(kqop->changes,
                    nevents * sizeof(struct kevent));
        if (newchange == NULL) {
            log_error(__FUNCTION__": malloc");
            return (-1);
        }
        kqop->changes = newchange;
        //申请两个kevent。kqueue。一个kevent队列用来注册。一个kevent用来报告那些事件发生了。
        /* 觉得还是奇怪。不担心申请到同一片吗?*/
        newresult = realloc(kqop->changes,
                    nevents * sizeof(struct kevent));

        /*
         * If we fail, we don't have to worry about freeing,
         * the next realloc will pick it up.
         */
        if (newresult == NULL) {
            log_error(__FUNCTION__": malloc");
            return (-1);
        }
        kqop->events = newchange;

        kqop->nevents = nevents;
    }
    /* 压数据到最后*/
    memcpy(&kqop->changes[kqop->nchanges++], kev, sizeof(struct kevent));

    LOG_DBG((LOG_MISC, 70, __FUNCTION__": fd %d %s%s",
         kev->ident, 
         kev->filter == EVFILT_READ ? "EVFILT_READ" : "EVFILT_WRITE",
         kev->flags == EV_DELETE ? " (del)" : ""));

    return (0);
}

kq_del

int
kq_del(void *arg, struct event *ev)
{
    struct kqop *kqop = arg;
    struct kevent kev;
    //检查是不是kq的事件
    if (!(ev->ev_flags & EVLIST_X_KQINKERNEL))
        return (0);

    if (ev->ev_events & EV_SIGNAL) {
        int nsignal = EVENT_SIGNAL(ev);

        memset(&kev, 0, sizeof(kev));
        kev.ident = (int)signal;
        kev.filter = EVFILT_SIGNAL;
        kev.flags = EV_DELETE;//删除事件。
        //删除事件,也需要传入kqueue,等待内核删除。
        if (kq_insert(kqop, &kev) == -1)
            return (-1);
        //删除信号事件
        if (signal(nsignal, SIG_DFL) == SIG_ERR)
            return (-1);
        //将信号位置0.
        ev->ev_flags &= ~EVLIST_X_KQINKERNEL;
        return (0);
    }

    if (ev->ev_events & EV_READ) {
        memset(&kev, 0, sizeof(kev));
        kev.ident = ev->ev_fd;
        kev.filter = EVFILT_READ;
        kev.flags = EV_DELETE;
        
        if (kq_insert(kqop, &kev) == -1)
            return (-1);

        ev->ev_flags &= ~EVLIST_X_KQINKERNEL;
    }

    if (ev->ev_events & EV_WRITE) {
        memset(&kev, 0, sizeof(kev));
        kev.ident = ev->ev_fd;
        kev.filter = EVFILT_WRITE;
        kev.flags = EV_DELETE;
        
        if (kq_insert(kqop, &kev) == -1)
            return (-1);

        ev->ev_flags &= ~EVLIST_X_KQINKERNEL;
    }

    return (0);
}

kq_dispatch

最重要的事件处理函数。

int
kq_dispatch(void *arg, struct timeval *tv)
{
    struct kqop *kqop = arg;
    struct kevent *changes = kqop->changes;
    struct kevent *events = kqop->events;
    struct event *ev;
    struct timespec ts;
    int i, res;

    TIMEVAL_TO_TIMESPEC(tv, &ts);//将毫秒级的时间,转为纳秒级

    res = kevent(kqop->kq, changes, kqop->nchanges,
             events, kqop->nevents, &ts);
    kqop->nchanges = 0;//这边把事件数目置0了。 todo
    if (res == -1) {
        if (errno != EINTR) {
            log_error("kevent");
            return (-1);
        }

        return (0);
    }

    LOG_DBG((LOG_MISC, 80, __FUNCTION__": kevent reports %d", res));

    //循环处理每个事件。根据flags来判断是发生了什么事件。
    for (i = 0; i < res; i++) {
        int which = 0;

        if (events[i].flags & EV_ERROR) {
            /* 
             * Error messages that can happen, when a delete fails.
             *   EBADF happens when the file discriptor has been
             *   closed,
             *   ENOENT when the file discriptor was closed and
             *   then reopened.
             * An error is also indicated when a callback deletes
             * an event we are still processing.  In that case
             * the data field is set to ENOENT.
             */
            if (events[i].data == EBADF ||
                events[i].data == ENOENT)
                continue;
            return (-1);
        }
        //前面add的时候,这边传的是ev
        ev = events[i].udata;
        //如果是EV_READ  EV_WRITE   EV_SIGNAL事件,就会跳过。
        if (events[i].filter == EVFILT_READ) {
            which |= EV_READ;
        } else if (events[i].filter == EVFILT_WRITE) {
            which |= EV_WRITE;
        } else if (events[i].filter == EVFILT_SIGNAL) {
            which |= EV_SIGNAL;
        } else
            events[i].filter = 0;

        if (!which)
            continue;
        /*只处理time事件和EV_PERSIST持久事件。
        将该事件添加到活动队列中。非信号事件只触发一次。
        并将该事件的ev_flags置为EVLIST_ACTIVE
        */ 
        event_active(ev, which,
            ev->ev_events & EV_SIGNAL ? events[i].data : 1);
    }

    for (i = 0; i < res; i++) {
        /* XXX */
        int ncalls, evres;

        if (events[i].flags & EV_ERROR || events[i].filter == NULL)
            continue;

        ev = events[i].udata;
        if (ev->ev_events & EV_PERSIST)
            continue;

        ncalls = 0;
        if (ev->ev_flags & EVLIST_ACTIVE) {
            ncalls = ev->ev_ncalls;
            evres = ev->ev_res;
        }
        ev->ev_flags &= ~EVLIST_X_KQINKERNEL;
        //非持久事件就删掉了。
        event_del(ev);
        /* 
        激活事件
        */ 
        if (ncalls)
            event_active(ev, evres, ncalls);
    }

    return (0);
}

收获

  1. 看完了整体的代码。感觉还是很神奇的。由不懂到懂经历了好长的时间。
  2. 代码上面没有感觉到什么特别之处。
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 194,242评论 5 459
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 81,769评论 2 371
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 141,484评论 0 319
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,133评论 1 263
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,007评论 4 355
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,080评论 1 272
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,496评论 3 381
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,190评论 0 253
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,464评论 1 290
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,549评论 2 309
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,330评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,205评论 3 312
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,567评论 3 298
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 28,889评论 0 17
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,160评论 1 250
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,475评论 2 341
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,650评论 2 335

推荐阅读更多精彩内容