epoll源码分析以及在Redis中的实现

1.概述

这篇文章分析一下linux中epoll的实现原理,主要为了增强自己对网络调用的理解。业界使用epoll的框架比较多,随便就能列出来很多,比如jdk的nio在linux下的实现,以及netty、redis等涉及到长链接网络请求的地方,我们都可以直接使用epoll。文末会从redis源码简单看看如何使用epoll做IO多路复用实现高并发。

2.具体实现

参考官方文档描述:

The central concept of the epoll API is the epoll instance, an inkernel data structure which, from a user-space perspective, can be considered as a container for two lists

所以 其实就是epoll就是内核的一个数据结构。从用户空间的角度,其实就是两个链表。所以基本上就是维护两个链表就可以了。 理解完这段话,我们也就能理解Epoll提供了三个方法了:

create是初始化这个内核的数据结构。返回一个fd。众所周知,unix万物皆文件。所以,这里创建返回的是一个文件fd。每次操作我们只需要传入fd,内核便能拿到epoll对应的数据结构。

epoll_ctl就是对其中一个链表的操作。这个链表存放用户感兴趣的io事件。当然在注册事件的时候,会有一些其他的操作。后面详细解释

epoll_wait,则是返回就绪事件(感兴趣的事件)。然后让应用层去处理。

int epoll_create(int size);

int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);

int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);

struct epoll_event {

    __uint32_t events;

    epoll_data_t data;

};

epoll_create方法

SYSCALL_DEFINE1(epoll_create1, int, flags)

{

    int error, fd;

    struct eventpoll *ep = NULL;

    struct file *file;

    // 创建内部数据结构eventpoll

    error = ep_alloc(&ep);

    //查询未使用的fd

    fd = get_unused_fd_flags(O_RDWR | (flags & O_CLOEXEC));

    file = anon_inode_getfile("[eventpoll]", &eventpoll_fops, ep,

                O_RDWR | (flags & O_CLOEXEC));

    ep->file = file;

    fd_install(fd, file);  //建立fd和file的关联关系

    return fd;

out_free_fd:

    put_unused_fd(fd);

out_free_ep:

    ep_free(ep);

    return error;

}

简单的说一下这个方法。首先这个方法返回一个文件描述符。我们使用这个文件描述符就可以找到对应的结构体。也就是一块内存区域。epoll的所有数据都会保存在这里。 这块内存区域用eventpoll结构体表示。 所以这个方法逻辑如下:

1.创建eventpoll结构体。初始化结构体相应的数据

2.查询一个未使用的fd,然后为epoll创建一个文件。将file的->private_data指向ep。创建文件的过程就不多说

3.将ep->file指向file。其实就是绑定一下。

4.将fd和file关联。这样我们就能通过fd找到对应的file。并找到ep对应的结构体(内存区域) 这里再说一下 file的private_data其实在设备驱动程序中非常重要,它可以指向一块自定义的数据结构。这也就是保证了一个设备驱动程序可以适配多个设备的原因。因为不同的设备可能存在不同的属性。epoll这里用private_data指向自己的数据结构是完全没有问题的。

eventpoll结构体内容如下。后面遇到详细说。

struct eventpoll {

    spinlock_t lock;

    struct mutex mtx;

    wait_queue_head_t wq; //sys_epoll_wait()使用的等待队列

    wait_queue_head_t poll_wait; //file->poll()使用的等待队列

    struct list_head rdllist; //所有准备就绪的文件描述符列表

    struct rb_root rbr; //用于储存已监控fd的红黑树根节点


    struct epitem *ovflist; //用于监听文件的结构。如果rdllist被锁定,临时事件会被连接到这里

    struct wakeup_source *ws; // 当ep_scan_ready_list运行时使用wakeup_source

    struct user_struct *user; //创建eventpoll描述符的用户

    struct file *file;

    int visited;          //用于优化循环检测检查

    struct list_head visited_list_link;

};

epoll_ctl方法

该方法主要就是对监听事件进行增删改。

SYSCALL_DEFINE4(epoll_ctl, int, epfd, int, op, int, fd,

        struct epoll_event __user *, event)

{

    int error;

    int full_check = 0;

    struct fd f, tf;

    struct eventpoll *ep;   

    struct epitem *epi;   

    struct epoll_event epds;

    struct eventpoll *tep = NULL;

    error = -EFAULT;

    //如果不是删除操作,将用户空间的epoll_event 拷贝到内核

    if (ep_op_has_event(op) &&

        copy_from_user(&epds, event, sizeof(struct epoll_event)))

    f = fdget(epfd); //epfd对应的文件

    tf = fdget(fd); //fd对应的文件.

    ...

    ep = f.file->private_data; // 取出epoll_create过程创建的ep

    ...

    epi = ep_find(ep, tf.file, fd); //ep红黑树中查看该fd

    switch (op) {

    case EPOLL_CTL_ADD:

        if (!epi) {

            epds.events |= POLLERR | POLLHUP;

            error = ep_insert(ep, &epds, tf.file, fd, full_check);

        }

        if (full_check)

            clear_tfile_check_list();

        break;

    case EPOLL_CTL_DEL:

        if (epi)

            error = ep_remove(ep, epi);

        break;

    case EPOLL_CTL_MOD:

        if (epi) {

            epds.events |= POLLERR | POLLHUP;

            error = ep_modify(ep, epi, &epds);

        }

        break;

    }

    mutex_unlock(&ep->mtx);

    fdput(tf);

    fdput(f);

    ...

    return error;

}

分享更多关于C/C++ Linux后端开发网络底层原理知识学习提升 点击 学习资料 获取,完善技术栈,内容知识点包括Linux,Nginx,ZeroMQ,MySQL,Redis,线程池,MongoDB,ZK,Linux内核,CDN,P2P,epoll,Docker,TCP/IP,协程,DPDK等等。

上面省略了一些判断的代码。主要核心就是根据不同的事件类型执行不同的函数。

epoll调用ep_find从红黑树拿到对应的epi。如果已经存在,则不需要进行add。如果不存在,也不可进行remove和modify操作。整个过程会加锁。因为是红黑树,所以查找以及插入性能都是logn级别。所以对于高并发场景,也是能实现快速注册监听的。下面我们分别看一下这三个操作的逻辑。

ep_insert操作

顾名思义,就是加入监听事件。

static int ep_insert(struct eventpoll *ep, struct epoll_event *event,

            struct file *tfile, int fd, int full_check)

{

    int error, revents, pwake = 0;

    unsigned long flags;

    long user_watches;

    struct epitem *epi;

    struct ep_pqueue epq; //[小节2.4.5]

    user_watches = atomic_long_read(&ep->user->epoll_watches);

    if (unlikely(user_watches >= max_user_watches))

        return -ENOSPC;

    if (!(epi = kmem_cache_alloc(epi_cache, GFP_KERNEL)))

        return -ENOMEM;

    //构造并填充epi结构体

    INIT_LIST_HEAD(&epi->rdllink);

    INIT_LIST_HEAD(&epi->fllink);

    INIT_LIST_HEAD(&epi->pwqlist);

    epi->ep = ep;

    ep_set_ffd(&epi->ffd, tfile, fd); // 将tfile和fd都赋值给ffd

    epi->event = *event;

    epi->nwait = 0;

    epi->next = EP_UNACTIVE_PTR;

    if (epi->event.events & EPOLLWAKEUP) {

        error = ep_create_wakeup_source(epi);

    } else {

        RCU_INIT_POINTER(epi->ws, NULL);

    }

    epq.epi = epi;

    //设置轮询回调函数

    init_poll_funcptr(&epq.pt, ep_ptable_queue_proc);

    //执行poll方法

    revents = ep_item_poll(epi, &epq.pt);

    spin_lock(&tfile->f_lock);

    list_add_tail_rcu(&epi->fllink, &tfile->f_ep_links);

    spin_unlock(&tfile->f_lock);

    ep_rbtree_insert(ep, epi); //将将当前epi添加到RB树

    spin_lock_irqsave(&ep->lock, flags);

    //事件就绪 并且 epi的就绪队列有数据

    if ((revents & event->events) && !ep_is_linked(&epi->rdllink)) {

        list_add_tail(&epi->rdllink, &ep->rdllist);

        ep_pm_stay_awake(epi);

        //唤醒正在等待文件就绪,即调用epoll_wait的进程

        if (waitqueue_active(&ep->wq))

            wake_up_locked(&ep->wq);

        if (waitqueue_active(&ep->poll_wait))

            pwake++;

    }

    spin_unlock_irqrestore(&ep->lock, flags);

    atomic_long_inc(&ep->user->epoll_watches);

    if (pwake)

        ep_poll_safewake(&ep->poll_wait); //唤醒等待eventpoll文件就绪的进程

    return 0;

...

}

1.首先会初始化一个epi,对于目标文件的监听都是需要通过epi去维护。一个文件的监听对应一个epi。并且保存在ep的红黑树中。

struct epitem {

    union {

        struct rb_node rbn; //RB树节点将此结构链接到eventpoll RB树

        struct rcu_head rcu; //用于释放结构体epitem

    };

    struct list_head rdllink; //时间的就绪队列,主要就是链接到eventpoll的rdllist

    struct epitem *next; //配合eventpoll中的ovflist一起使用来保持单向链的条目

    struct epoll_filefd ffd; //该结构 监听的文件描述符信息,每一个socket fd都会对应一个epitem 。就是通过这个结构关联

    int nwait; //附加到poll轮询中的活跃等待队列数

    struct list_head pwqlist; //用于保存被监听文件的等待队列

    struct eventpoll *ep;  //epi所属的ep

    struct list_head fllink; //主要是为了实现一个文件被多个epoll监听。将该结构链接到文件的f_ep_link。

    struct wakeup_source __rcu *ws; //设置EPOLLWAKEUP时使用的wakeup_source

    struct epoll_event event; //监控的事件和文件描述符

};

2.初始化结束后,会将文件的fd以及对应文件指针绑定到epi的ffd中。主要作用就是将fd和改epi绑定起来。

struct epoll_filefd {

    struct file *file;

    int fd;

} __packed;

3.为epq的pt(其实就是一个poll_table)注册对应的函数ep_ptable_queue_proc。

struct ep_pqueue {

    poll_table pt;

    struct epitem *epi;

};

typedef struct poll_table_struct {

    poll_queue_proc _qproc;

    unsigned long _key;

} poll_table;

这里epq是一个结构体,里面绑定了epi以及poll_table。poll_table主要注册了ep_ptable_queue_proc函数。_key用于记录事件。 所以epq就保存了epi,以及对应的ep_ptable_queue_proc。后续执行回调函数的时候,我们可以通过poll_table的地址拿到对应的epq,最终拿到对应的epi,这也就是定义这个结构的目的。

4.调用ep_item_poll方法。这个方法我简单说一下。他会调用文件系统的poll方法。

static inline unsigned int ep_item_poll(struct epitem *epi, poll_table *pt)

{

    pt->_key = epi->event.events;、

    return epi->ffd.file->f_op->poll(epi->ffd.file, pt) & epi->event.events;

}

不同的驱动程序,都会有自己的poll方法,如果是TCP套接字,这个poll方法就是tcp_poll。在TCP中,会周期性的调用这个方法,调用频率取决于协议栈中断频率的设置。一旦有事件到达后,对应的tcp_poll方法被调用,tcp_poll方法会回调用sock_poll_wait(),该方法会调用这里注册的ep_ptable_queue_proc方法。epoll其实就是通过此机制实现将自己的回调函数加入到文件的waitqueue中的。这也是ep_ptable_queue_proc的目的。

5.调用ep_item_poll后会返回revents。也就是该fd触发的事件。如果有我们感兴趣的事件,会将其插入到ep的rdllist中。如果有进程正在等待文件的就绪状态,也就是调用epoll_wait睡眠的进程。那么会唤醒等待进程。

if ((revents & event->events) && !ep_is_linked(&epi->rdllink)) {

        list_add_tail(&epi->rdllink, &ep->rdllist);

        ep_pm_stay_awake(epi);

        //唤醒正在等待文件就绪,即调用epoll_wait的进程

        if (waitqueue_active(&ep->wq))

            wake_up_locked(&ep->wq);

        if (waitqueue_active(&ep->poll_wait))

            pwake++;

    }

ep_ptable_queue_proc方法

整个过程其实就是通过文件的poll方法绑定ep_ptable_queue_proc函数 。当该文件描述符对应的文件有事件到达后,回调用这个函数

Ps:其中file就是对应文件的结构体。当然 多个fd可以指向通过file结构。多个file可以同时指向同一个innode节点。在linux中,一个文件的内容是通过innode去定义描述的。file只是我们对文件操作的时候创建出来的。这个概念大家需要清楚。

static void ep_ptable_queue_proc(struct file *file, wait_queue_head_t *whead,

                poll_table *pt)

{

    struct epitem *epi = ep_item_from_epqueue(pt);

    struct eppoll_entry *pwq; 

    if (epi->nwait >= 0 && (pwq = kmem_cache_alloc(pwq_cache, GFP_KERNEL))) {

        //初始化回调方法

        init_waitqueue_func_entry(&pwq->wait, ep_poll_callback);

        pwq->whead = whead;

        pwq->base = epi;

        //将ep_poll_callback放入等待队列whead

        add_wait_queue(whead, &pwq->wait);

        //将llink 放入epi->pwqlist的尾部

        list_add_tail(&pwq->llink, &epi->pwqlist);

        epi->nwait++;

    } else {

        epi->nwait = -1; //标记错误发生

    }

}

static inline void

init_waitqueue_func_entry(wait_queue_t *q, wait_queue_func_t func)

{

    q->flags    = 0;

    q->private    = NULL;

    q->func        = func;

}

ep_ptable_queue_proc有三个参数。 file就是监听文件的指针,whead为该fd对应的设备等待队列。pt就是我们当时调用文件的poll传入的东西。

在ep_ptable_queue_proc,引入了eppoll_entry。也就是pwq。pwq主要完成epi和epi事件发生时callback函数的关联。

从上面代码可以看出。首先根据pt拿到对应的epi。然后通过pwq将三者关联。

最后通过add_wait_queue方法,将eppoll_entry挂在到fd的设备等待队列上。也就是注册epoll的回调函数。

所以这个方法的主要目标就是将eppoll_entry挂在到fd的设备等待队列上。当设备有硬件数据到达时,硬件中断处理函数会唤醒该队列上等待的进程时,会调用唤醒函数ep_poll_callback。

分享更多关于C/C++ Linux后端开发网络底层原理知识学习提升 点击 学习资料 获取,完善技术栈,内容知识点包括Linux,Nginx,ZeroMQ,MySQL,Redis,线程池,MongoDB,ZK,Linux内核,CDN,P2P,epoll,Docker,TCP/IP,协程,DPDK等等。

ep_poll_callback方法

这个函数主要功能就是,当被监听文件的事件就绪,将文件对应的epi加入到就绪队列。当应用层调用epoll_wait()的时候,内核会将就绪队列的事件拷贝到用户空间。报告给应用。

static int ep_poll_callback(wait_queue_t *wait, unsigned mode, int sync, void *key)

{

    int pwake = 0;

    unsigned long flags;

    struct epitem *epi = ep_item_from_wait(wait);

    struct eventpoll *ep = epi->ep;

    spin_lock_irqsave(&ep->lock, flags);

    if (unlikely(ep->ovflist != EP_UNACTIVE_PTR)) {

        if (epi->next == EP_UNACTIVE_PTR) {

            epi->next = ep->ovflist;

            ep->ovflist = epi;

            if (epi->ws) {

                __pm_stay_awake(ep->ws);

            }

        }

        goto out_unlock;

    }

    //如果此文件已在就绪列表中,很快就会退出

    if (!ep_is_linked(&epi->rdllink)) {

        //将epi就绪事件 插入到ep就绪队列

        list_add_tail(&epi->rdllink, &ep->rdllist);

        ep_pm_stay_awake_rcu(epi);

    }

    // 如果活跃,唤醒eventpoll等待队列和 ->poll()等待队列

    if (waitqueue_active(&ep->wq))

        wake_up_locked(&ep->wq);  //当队列不为空,则唤醒进程

    if (waitqueue_active(&ep->poll_wait))

        pwake++;

out_unlock:

    spin_unlock_irqrestore(&ep->lock, flags);

    if (pwake)

        ep_poll_safewake(&ep->poll_wait);

    if ((unsigned long)key & POLLFREE) {

        list_del_init(&wait->task_list); //删除相应的wait

        smp_store_release(&ep_pwq_from_wait(wait)->whead, NULL);

    }

    return 1;

}

//判断等待队列是否为空

static inline int waitqueue_active(wait_queue_head_t *q)

{

    return !list_empty(&q->task_list);

}

3.epoll实现总结

透过现象看本质,其实epoll的灵魂就是ep_item_poll和ep_poll_callback。

epoll依赖虚拟文件系统的ep_item_poll。将ep_poll_callback注册到对应文件的waitqueue中。当对应文件有数据到来。注册的函数就会被调用。epoll的回调会将对应文件的epi加入到就绪队列。

当用户调用epoll_wait(),epoll会加锁,将队列数据传递到用户空间,这个时间到的事件会被挂到ovflist中。

4.Redis使用Epoll

具体实现在ae_epoll.c中

typedef struct aeApiState {

    // epoll_event 实例描述符

    int epfd;

    // 事件槽

    struct epoll_event *events;

} aeApiState;

aeApiCreate方法

redis在初始化server的时候会调用aeCreateEventLoop方法。aeCreateEventLoop回调用aeApiCreate去创建epoll实例。

static int aeApiCreate(aeEventLoop *eventLoop) {

    aeApiState *state = zmalloc(sizeof(aeApiState));

    if (!state) return -1;

    state->events = zmalloc(sizeof(struct kevent)*eventLoop->setsize);

    if (!state->events) {

        zfree(state);

        return -1;

    }

    state->kqfd = kqueue();

    if (state->kqfd == -1) {

        zfree(state->events);

        zfree(state);

        return -1;

    }

    eventLoop->apidata = state;

    return 0;   

}

aeApiAddEvent方法

这个方法是关联事件到epoll,所以会调用epoll的ctl方法

static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {

    aeApiState *state = eventLoop->apidata;

    struct epoll_event ee;

    /* If the fd was already monitored for some event, we need a MOD

    * operation. Otherwise we need an ADD operation.

    *

    * 如果 fd 没有关联任何事件,那么这是一个 ADD 操作。

    * 如果已经关联了某个/某些事件,那么这是一个 MOD 操作。

    */

    int op = eventLoop->events[fd].mask == AE_NONE ?

            EPOLL_CTL_ADD : EPOLL_CTL_MOD;

    // 注册事件到 epoll

    ee.events = 0;

    mask |= eventLoop->events[fd].mask; /* Merge old events */

    if (mask & AE_READABLE) ee.events |= EPOLLIN;

    if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;

    ee.data.u64 = 0; /* avoid valgrind warning */

    ee.data.fd = fd;

    if (epoll_ctl(state->epfd,op,fd,&ee) == -1) return -1;

    return 0;

}

这个方法在redis服务创建一个新的客户端的时候会调用。会注册这个客户端的读事件。

当redis需要给客户端写数据的时候会调用prepareClientToWrite方法。这个方法主要是注册对应fd的写事件。

如果注册失败,redis就不会将数据写入缓冲。

如果对应套件字可写,那么redis的事件循环就会将缓冲区新数据写入socket。

aeMain方法

Redis事件处理器的主循环。

void aeMain(aeEventLoop *eventLoop) {

    eventLoop->stop = 0;

    while (!eventLoop->stop) {

        // 如果有需要在事件处理前执行的函数,那么运行它

        if (eventLoop->beforesleep != NULL)

            eventLoop->beforesleep(eventLoop);

        // 开始处理事件

        aeProcessEvents(eventLoop, AE_ALL_EVENTS);

    }

}

这个方法最终会调用epoll_wait()获取对应事件并执行。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 205,236评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,867评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,715评论 0 340
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,899评论 1 278
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,895评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,733评论 1 283
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,085评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,722评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,025评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,696评论 2 323
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,816评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,447评论 4 322
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,057评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,009评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,254评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,204评论 2 352
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,561评论 2 343

推荐阅读更多精彩内容