[epoll 源码走读] epoll 实现原理

文章主要对 tcp 通信进行 epoll 源码走读。

Linux 源码:Linux 5.7 版本。epoll 核心源码:eventpoll.h / eventpoll.c


1. 应用场景

epoll 应用,适合海量用户,一个时间段内部分活跃的用户群体。

例如 app,正常用户并不是 24 小时都拿起手机玩个不停,可能玩一下,又去干别的事,回头又玩一下,断断续续地操作。即便正在使用 app 也不是连续产生读写通信事件,可能手指点击几下页面,页面产生需要的内容,用户就去浏览内容,不再操作了。换句话说,在海量用户里,同一个时间段内,很可能只有一小部分用户正在活跃,而在这一小部分活跃用户里,又只有一小撮人同时点击页面上的操作。那 epoll 管理海量用户,只需要将这一小撮人产生的事件,及时通知 appserver 处理逻辑即可。

问题:同样场景,如果用户是机器人,24 小时持续工作,这种场景下使用 epoll 还合适吗?


2. 预备知识


3. 使用

  • 接口。
接口 描述
epoll_create 创建 epoll。
epoll_ctl fd 事件注册函数,用户通过这个函数关注 fd 读写事件。
epoll_wait 阻塞等待 fd 事件发生。
  • 使用流程。
epoll 使用流程

🔥文章来源:wenfh2020.com


4. 事件

常用事件注释可以请参考 epoll_ctl 文档

// eventpoll.h
#define EPOLLIN     (__force __poll_t)0x00000001
#define EPOLLOUT    (__force __poll_t)0x00000004
#define EPOLLERR    (__force __poll_t)0x00000008
#define EPOLLHUP    (__force __poll_t)0x00000010
#define EPOLLRDHUP  (__force __poll_t)0x00002000
#define EPOLLEXCLUSIVE  ((__force __poll_t)(1U << 28))
#define EPOLLET     ((__force __poll_t)(1U << 31))
事件 描述
EPOLLIN 有可读数据到来。
EPOLLOUT 有数据要写。
EPOLLERR 该文件描述符发生错误。
EPOLLHUP 该文件描述符被挂断。常见 socket 被关闭(read == 0)。
EPOLLRDHUP 对端已关闭链接,或者用 shutdown 关闭了写链接。
EPOLLEXCLUSIVE 唯一唤醒事件,主要为了解决 epoll_wait 惊群问题。多线程下多个 epoll_wait 同时等待,只唤醒一个 epoll_wait 执行。 该事件只支持 epoll_ctl 添加操作 EPOLL_CTL_ADD。
EPOLLET 边缘触发模式。

通过 tcp_poll 函数,可以看到 socket 事件对应的相关事件逻辑。

// tcp.c
/*
 *    Wait for a TCP event.
 *
 *    Note that we don't need to lock the socket, as the upper poll layers
 *    take care of normal races (between the test and the event) and we don't
 *    go look at any of the socket buffers directly.
 */
__poll_t tcp_poll(struct file *file, struct socket *sock, poll_table *wait) {
    __poll_t mask;
    struct sock *sk = sock->sk;
    const struct tcp_sock *tp = tcp_sk(sk);
    int state;

    // fd 添加等待事件,关联事件回调。
    sock_poll_wait(file, sock, wait);

    // socket 对应事件逻辑。
    state = inet_sk_state_load(sk);
    if (state == TCP_LISTEN)
        return inet_csk_listen_poll(sk);

    /* Socket is not locked. We are protected from async events
     * by poll logic and correct handling of state changes
     * made by other threads is impossible in any case.
     */

    mask = 0;

    /*
     * EPOLLHUP is certainly not done right. But poll() doesn't
     * have a notion of HUP in just one direction, and for a
     * socket the read side is more interesting.
     *
     * Some poll() documentation says that EPOLLHUP is incompatible
     * with the EPOLLOUT/POLLWR flags, so somebody should check this
     * all. But careful, it tends to be safer to return too many
     * bits than too few, and you can easily break real applications
     * if you don't tell them that something has hung up!
     *
     * Check-me.
     *
     * Check number 1. EPOLLHUP is _UNMASKABLE_ event (see UNIX98 and
     * our fs/select.c). It means that after we received EOF,
     * poll always returns immediately, making impossible poll() on write()
     * in state CLOSE_WAIT. One solution is evident --- to set EPOLLHUP
     * if and only if shutdown has been made in both directions.
     * Actually, it is interesting to look how Solaris and DUX
     * solve this dilemma. I would prefer, if EPOLLHUP were maskable,
     * then we could set it on SND_SHUTDOWN. BTW examples given
     * in Stevens' books assume exactly this behaviour, it explains
     * why EPOLLHUP is incompatible with EPOLLOUT.    --ANK
     *
     * NOTE. Check for TCP_CLOSE is added. The goal is to prevent
     * blocking on fresh not-connected or disconnected socket. --ANK
     */
    if (sk->sk_shutdown == SHUTDOWN_MASK || state == TCP_CLOSE)
        mask |= EPOLLHUP;
    if (sk->sk_shutdown & RCV_SHUTDOWN)
        mask |= EPOLLIN | EPOLLRDNORM | EPOLLRDHUP;

    /* Connected or passive Fast Open socket? */
    if (state != TCP_SYN_SENT &&
        (state != TCP_SYN_RECV || rcu_access_pointer(tp->fastopen_rsk))) {
        int target = sock_rcvlowat(sk, 0, INT_MAX);

        if (READ_ONCE(tp->urg_seq) == READ_ONCE(tp->copied_seq) &&
            !sock_flag(sk, SOCK_URGINLINE) &&
            tp->urg_data)
            target++;

        if (tcp_stream_is_readable(tp, target, sk))
            mask |= EPOLLIN | EPOLLRDNORM;

        if (!(sk->sk_shutdown & SEND_SHUTDOWN)) {
            if (sk_stream_is_writeable(sk)) {
                mask |= EPOLLOUT | EPOLLWRNORM;
            } else {  /* send SIGIO later */
                sk_set_bit(SOCKWQ_ASYNC_NOSPACE, sk);
                set_bit(SOCK_NOSPACE, &sk->sk_socket->flags);

                /* Race breaker. If space is freed after
                 * wspace test but before the flags are set,
                 * IO signal will be lost. Memory barrier
                 * pairs with the input side.
                 */
                smp_mb__after_atomic();
                if (sk_stream_is_writeable(sk))
                    mask |= EPOLLOUT | EPOLLWRNORM;
            }
        } else
            mask |= EPOLLOUT | EPOLLWRNORM;

        if (tp->urg_data & TCP_URG_VALID)
            mask |= EPOLLPRI;
    } else if (state == TCP_SYN_SENT && inet_sk(sk)->defer_connect) {
        /* Active TCP fastopen socket with defer_connect
         * Return EPOLLOUT so application can call write()
         * in order for kernel to generate SYN+data
         */
        mask |= EPOLLOUT | EPOLLWRNORM;
    }
    /* This barrier is coupled with smp_wmb() in tcp_reset() */
    smp_rmb();
    if (sk->sk_err || !skb_queue_empty_lockless(&sk->sk_error_queue))
        mask |= EPOLLERR;

    return mask;
}
EXPORT_SYMBOL(tcp_poll);

5. 源码工作流程

epoll 源码工作流程

6. 数据结构

6.1. eventpoll

/*
 * This structure is stored inside the "private_data" member of the file
 * structure and represents the main data structure for the eventpoll
 * interface.
 */
struct eventpoll {
    /*
     * This mutex is used to ensure that files are not removed
     * while epoll is using them. This is held during the event
     * collection loop, the file cleanup path, the epoll file exit
     * code and the ctl operations.
     */
    struct mutex mtx;

    /* Wait queue used by sys_epoll_wait() */
    wait_queue_head_t wq;

    /* Wait queue used by file->poll() */
    wait_queue_head_t poll_wait;

    /* List of ready file descriptors */
    struct list_head rdllist;

    /* Lock which protects rdllist and ovflist */
    rwlock_t lock;

    /* RB tree root used to store monitored fd structs */
    struct rb_root_cached rbr;

    /*
     * This is a single linked list that chains all the "struct epitem" that
     * happened while transferring ready events to userspace w/out
     * holding ->lock.
     */
    struct epitem *ovflist;

    /* wakeup_source used when ep_scan_ready_list is running */
    struct wakeup_source *ws;

    /* The user that created the eventpoll descriptor */
    struct user_struct *user;

    struct file *file;

    /* used to optimize loop detection check */
    int visited;
    struct list_head visited_list_link;

#ifdef CONFIG_NET_RX_BUSY_POLL
    /* used to track busy poll napi_id */
    unsigned int napi_id;
#endif
};
成员 描述
mtx 互斥变量,避免在遍历 epi 节点时(例如 ep_send_events),epi 被删除。
wq 等待队列,当 epoll_wait 没发现就绪事件需要处理,添加等待事件,需要睡眠阻塞等待唤醒进程。
poll_wait 等待队列,当epoll_ctl 监听的是另外一个 epoll fd 时使用。
rdllist 就绪列表,产生了用户注册的 fd读写事件的 epi 链表。
ovflist 单链表,当 rdllist 被锁定遍历,向用户空间发送数据时,rdllist 不允许被修改,新触发的就绪 epitem 被 ovflist 串联起来,等待 rdllist 被处理完了,重新将 ovflist 数据写入 rdllist。 详看 ep_scan_ready_list 逻辑。
user 创建 eventpoll 的用户结构信息。
lock 锁,保护 rdllist 和 ovflist 。
rbr 红黑树根结点,管理 fd 结点。
file eventpoll 对应的文件结构,Linux 一切皆文件,用 vfs 管理数据。
napi_id 应用于中断缓解技术。

6.2. epitem

fd 事件管理节点。可以添加到红黑树,也可以串联成就绪列表或其它列表。

/*
 * Each file descriptor added to the eventpoll interface will
 * have an entry of this type linked to the "rbr" RB tree.
 * Avoid increasing the size of this struct, there can be many thousands
 * of these on a server and we do not want this to take another cache line.
 */
struct epitem {
    union {
        /* RB tree node links this structure to the eventpoll RB tree */
        struct rb_node rbn;
        /* Used to free the struct epitem */
        struct rcu_head rcu;
    };

    /* List header used to link this structure to the eventpoll ready list */
    struct list_head rdllink;

    /*
     * Works together "struct eventpoll"->ovflist in keeping the
     * single linked chain of items.
     */
    struct epitem *next;

    /* The file descriptor information this item refers to */
    struct epoll_filefd ffd;

    /* Number of active wait queue attached to poll operations */
    int nwait;

    /* List containing poll wait queues */
    struct list_head pwqlist;

    /* The "container" of this item */
    struct eventpoll *ep;

    /* List header used to link this item to the "struct file" items list */
    struct list_head fllink;

    /* wakeup_source used when EPOLLWAKEUP is set */
    struct wakeup_source __rcu *ws;

    /* The structure that describe the interested events and the source fd */
    struct epoll_event event;
};
成员 描述
rbn 连接红黑树结构节点。
rdllink 就绪队列节点,用于将 epitem 串联成就绪队列列表。
next 指向下一个单链表节点的指针。配合 eventpoll 的 ovflist 使用。
ffd 记录节点对应的 fd 和 file 文件信息。
nwait 等待队列个数。
pwqlist 等待事件回调队列。当数据进入网卡,底层中断执行 ep_poll_callback。
ep eventpoll 指针,epitem 关联 eventpoll。
fllink epoll 文件链表结点,与 epoll 文件链表进行关联 file.f_ep_links。参考 fs.h, struct file 结构。
ws EPOLLWAKEUP 模式下使用。
event 用户关注的事件。

6.3. epoll_filefd

fd 对应 file 文件结构,Linux 一切皆文件,采用了 vfs (虚拟文件系统)管理文件或设备。

struct epoll_filefd {
    struct file *file;
    int fd;
} __packed;

6.4. epoll_event

用户关注的 epoll 事件结构。

struct epoll_event {
    __poll_t events;
    __u64 data;
} EPOLL_PACKED;
成员 描述
events 事件集合
data fd

6.5. poll_table_struct

就绪事件处理结构。

/* poll.h
 * Do not touch the structure directly, use the access functions
 * poll_does_not_wait() and poll_requested_events() instead.
 */
typedef struct poll_table_struct {
    poll_queue_proc _qproc;
    __poll_t _key;
} poll_table;

/*
 * structures and helpers for f_op->poll implementations
 */
typedef void (*poll_queue_proc)(struct file *, wait_queue_head_t *, struct poll_table_struct *);
成员 描述
_qproc 处理函数,可以指向 ep_ptable_queue_proc 函数,或者空。
_key 事件组合。

6.6. ep_pqueue

包装就绪事件处理结构,关联 epitem。

/* Wrapper struct used by poll queueing */
struct ep_pqueue {
    poll_table pt;
    struct epitem *epi;
};
成员 描述
pt 就绪事件处理结构。
epi epitem 对应节点。

7. 关键函数

函数 描述
eventpoll_init 初始化 epoll 模块。eventpoll 作为 Linux 内核的一部分,模块化管理。
do_epoll_create 为 eventpoll 结构分配资源。
do_epoll_ctl epoll 管理 fd 事件接口。
do_epoll_wait 有条件阻塞等待 fd 事件发生,返回对fd 和对应事件数据。
ep_item_poll 获取 fd 就绪事件,并关联 fd 和事件触发回调函数 ep_poll_callback。
ep_poll_callback fd 事件回调函数。当底层收到数据,中断调用 fd 关联的 ep_poll_callback 回调函数,如果事件是用户关注的事件,会将 fd 对应的 epi 结点添加进就绪队列,然后唤醒阻塞等待的 epoll_wait 处理。
ep_send_events 遍历就绪列表,拷贝内核空间就绪数据到用户空间。结合 ep_scan_ready_list 和 ep_send_events_proc 使用。
ep_scan_ready_list 遍历就绪列表。当 fd 收到数据,回调 ep_poll_callback,如果事件是用户关注的,那么将 fd 对应的 epi 结点添加到就绪队列,ep_scan_ready_list 会遍历这个就绪列表,将数据从内核空间拷贝到用户空间,或者其它操作。
ep_send_events_proc 内核将就绪列表数据,发送到用户空间。结合 ep_scan_ready_list 使用。LT/ET 模式在这个函数里实现。
ep_ptable_queue_proc 添加 fd 的等待事件到等待队列,关联 fd 与回调函数 ep_poll_callback。

8. 核心源码

8.1. 初始化

添加 epoll 模块到内核,slab 算法为 epoll 分配资源。

static int __init eventpoll_init(void) {
    struct sysinfo si;
    ...
    /* Allocates slab cache used to allocate "struct epitem" items */
    epi_cache = kmem_cache_create("eventpoll_epi", sizeof(struct epitem),
            0, SLAB_HWCACHE_ALIGN|SLAB_PANIC|SLAB_ACCOUNT, NULL);

    /* Allocates slab cache used to allocate "struct eppoll_entry" */
    pwq_cache = kmem_cache_create("eventpoll_pwq",
        sizeof(struct eppoll_entry), 0, SLAB_PANIC|SLAB_ACCOUNT, NULL);

    return 0;
}
fs_initcall(eventpoll_init);

8.2. epoll_create

创建 eventpoll 对象,关联文件资源。

static int do_epoll_create(int flags) {
    int error, fd;
    struct eventpoll *ep = NULL;
    struct file *file;
    ...
    // slab 算法为 eventpoll 结构分配内存,并初始化 eventpoll 成员数据。
    error = ep_alloc(&ep);
    if (error < 0)
        return error;

    // 分配一个空闲的文件描述符。
    fd = get_unused_fd_flags(O_RDWR | (flags & O_CLOEXEC));
    if (fd < 0) {
        error = fd;
        goto out_free_ep;
    }

    // slab 分配一个新的文件结构对象(struct file *)
    file = anon_inode_getfile("[eventpoll]", &eventpoll_fops, ep,
                 O_RDWR | (flags & O_CLOEXEC));
    if (IS_ERR(file)) {
        error = PTR_ERR(file);
        goto out_free_fd;
    }
    ep->file = file;

    // fd 与 file* 结构进行绑定。
    fd_install(fd, file);
    return fd;
    ...
}

8.3. epoll_ctl

fd 对应的事件管理(增删改)。

  • 添加 fd 事件管理流程:fd 关联回调 ep_poll_callback。
fd -> socket -> poll -> ep_ptable_queue_proc -> wait_queue -> ep_poll_callback
  • 触发了 fd 关注的事件回调处理。
driver -> ep_poll_callback -> waitup -> epoll_wait(wake up)

SYSCALL_DEFINE4(epoll_ctl, int, epfd, int, op, int, fd,
        struct epoll_event __user *, event) {
    struct epoll_event epds;

    // 为了 event 数据的安全性,将数据进行拷贝,再进行逻辑处理。
    if (ep_op_has_event(op) &&
        copy_from_user(&epds, event, sizeof(struct epoll_event)))
        return -EFAULT;

    return do_epoll_ctl(epfd, op, fd, &epds, false);
}

int do_epoll_ctl(int epfd, int op, int fd, struct epoll_event *epds, bool nonblock) {
    int error;
    int full_check = 0;
    struct fd f, tf;
    struct eventpoll *ep;
    struct epitem *epi;
    struct eventpoll *tep = NULL;
    ...
    // 检查参数合法性。
    ...
    // 在 do_epoll_create 实现里 anon_inode_getfile 将 private_data 与 eventpoll 关联。
    ep = f.file->private_data;
    ...
    // 红黑树检查 fd 是否已经被添加。
    epi = ep_find(ep, tf.file, fd);

    error = -EINVAL;
    switch (op) {
    case EPOLL_CTL_ADD:
        if (!epi) {
            /* epoll 如果没有添加过该 fd,就添加到红黑树进行管理。
             * 事件默认关注异常处理(EPOLLERR | EPOLLHUP)。*/
            epds->events |= EPOLLERR | EPOLLHUP;
            error = ep_insert(ep, epds, tf.file, fd, full_check);
        } else
            error = -EEXIST;
        if (full_check)
            clear_tfile_check_list();
        break;
    case EPOLL_CTL_DEL:
        if (epi)
            error = ep_remove(ep, epi);
        else
            error = -ENOENT;
        break;
    case EPOLL_CTL_MOD:
        if (epi) {
            if (!(epi->event.events & EPOLLEXCLUSIVE)) {
                epds->events |= EPOLLERR | EPOLLHUP;
                error = ep_modify(ep, epi, epds);
            }
        } else
            error = -ENOENT;
        break;
    }
    ...
    return error;
}

static int ep_insert(struct eventpoll *ep, const struct epoll_event *event,
             struct file *tfile, int fd, int full_check) {
    // epoll 管理 fd 和对应事件节点 epitem 数据结构。
    struct epitem *epi;
    struct ep_pqueue epq;
    ...
    epq.epi = epi;

    // 初始化就绪事件处理函数调用。poll() 接口调用 ep_ptable_queue_proc。
    init_poll_funcptr(&epq.pt, ep_ptable_queue_proc);

    // 添加等待队列,如果 fd 有用户关注的事件发生,返回对应 fd 关注的事件 revents。
    revents = ep_item_poll(epi, &epq.pt, 1);
    ...
    // 将当前节点,添加到 epoll 文件钩子,将 epoll 文件与 fd 对应文件串联起来。
    list_add_tail_rcu(&epi->fllink, &tfile->f_ep_links);

    // 将节点添加进二叉树
    ep_rbtree_insert(ep, epi);

    // 如果有关注的事件发生,将节点关联到就绪事件列表。
    if (revents && !ep_is_linked(epi)) {
        list_add_tail(&epi->rdllink, &ep->rdllist);
        ep_pm_stay_awake(epi);

        /* 如果进程正在睡眠等待,唤醒它去处理就绪事件。睡眠事件 ep->wq 在 epoll_wait 中添加*/
        if (waitqueue_active(&ep->wq))
            // 唤醒进程
            wake_up(&ep->wq);

        // 如果监控的是另外一个 epoll_create 的 fd,有就绪事件,也唤醒进程。
        if (waitqueue_active(&ep->poll_wait))
            pwake++;
    }
    ...
    if (pwake)
        ep_poll_safewake(&ep->poll_wait);

    return 0;
}

8.4. ep_item_poll

fd 节点就绪事件处理。

static __poll_t ep_item_poll(const struct epitem *epi, poll_table *pt, int depth) {
    struct eventpoll *ep;
    bool locked;

    pt->_key = epi->event.events;
    if (!is_file_epoll(epi->ffd.file)) {
        // 非 epoll fd,tcp_poll 检查 socket 就绪事件,fd 关联回调函数 ep_poll_callback。
        return vfs_poll(epi->ffd.file, pt) & epi->event.events;
    } else {
        // epoll 嵌套。epoll_ctl 添加关注了另外一个 epoll 的 fd(epfd)。
        ep = epi->ffd.file->private_data;
        poll_wait(epi->ffd.file, &ep->poll_wait, pt);
        locked = pt && (pt->_qproc == ep_ptable_queue_proc);

        return ep_scan_ready_list(epi->ffd.file->private_data,
                    ep_read_events_proc, &depth, depth, locked) &
            epi->event.events;
    }
}

// vfs - Virtual Filesystem Switch(Linux 虚拟文件系统)
// poll.h 就绪事件处理函数。
static inline __poll_t vfs_poll(struct file *file, struct poll_table_struct *pt) {
    if (unlikely(!file->f_op->poll))
        return DEFAULT_POLLMASK;
    // 这里的 poll 函数指针指向 tcp_poll 函数。
    return file->f_op->poll(file, pt);
}

// tcp.c
// tcp 就绪事件获取函数。
__poll_t tcp_poll(struct file *file, struct socket *sock, poll_table *wait) {
    __poll_t mask;
    struct sock *sk = sock->sk;
    const struct tcp_sock *tp = tcp_sk(sk);
    int state;

    /* 添加等待队列和关联事件回调函数 ep_poll_callback
     *(只有 epoll_ctl EPOLL_CTL_ADD 的情况下,才会添加等待事件,否则 wait == NULL)*/
    sock_poll_wait(file, sock, wait);

    // 检查 fd 是否有事件发生。
    state = inet_sk_state_load(sk);
    if (state == TCP_LISTEN)
        return inet_csk_listen_poll(sk);
    ...
}

// socket.h
static inline void sock_poll_wait(struct file *filp, struct socket *sock, poll_table *p) {
    // ep_insert 调用 ep_item_poll 才会插入等待事件。
    if (!poll_does_not_wait(p)) {
        poll_wait(filp, &sock->wq.wait, p);
        ...
    }
}

// poll.h
static inline void poll_wait(struct file * filp, wait_queue_head_t * wait_address, poll_table *p) {
    if (p && p->_qproc && wait_address)
        // _qproc ---> ep_ptable_queue_proc
        p->_qproc(filp, wait_address, p);
}

8.5. ep_ptable_queue_proc

socket 的等待队列关联回调函数 ep_poll_callback

static void ep_ptable_queue_proc(struct file *file, wait_queue_head_t *whead, poll_table *pt) {
    struct epitem *epi = ep_item_from_epqueue(pt);
    struct eppoll_entry *pwq;

    if (epi->nwait >= 0 && (pwq = kmem_cache_alloc(pwq_cache, GFP_KERNEL))) {
        // 关联等待队列和ep_poll_callback。
        init_waitqueue_func_entry(&pwq->wait, ep_poll_callback);

        // whead ---> socket->wq.wait
        pwq->whead = whead;
        pwq->base = epi;

        /* 等待事件,添加到等待队列。EPOLLEXCLUSIVE 为了解决 epoll_wait 惊群问题。
         * 如果多线程同时调用 epoll_wait,那么 fd 应该设置 EPOLLEXCLUSIVE 事件。 */
        if (epi->event.events & EPOLLEXCLUSIVE) {
            add_wait_queue_exclusive(whead, &pwq->wait);
        } else {
            add_wait_queue(whead, &pwq->wait);
        }

        /* 等待事件,关联 epitem。epitem 为什么要有一个等待队列呢,
         * 因为有可能一个进程里存在多个 epoll 实例同时 epoll_ctl 关注一个 fd。*/
        list_add_tail(&pwq->llink, &epi->pwqlist);
        epi->nwait++;
    } else {
        /* We have to signal that an error occurred */
        epi->nwait = -1;
    }
}

8.6. epoll_wait

SYSCALL_DEFINE4(epoll_wait, int, epfd, struct epoll_event __user *, events,
        int, maxevents, int, timeout) {
    return do_epoll_wait(epfd, events, maxevents, timeout);
}

static int do_epoll_wait(int epfd, struct epoll_event __user *events,
             int maxevents, int timeout) {
    ...
    // timeout 阻塞等待处理并返回就绪事件。
    error = ep_poll(ep, events, maxevents, timeout);
    ...
}

static int ep_poll(struct eventpoll *ep, struct epoll_event __user *events,
           int maxevents, long timeout) {
    int res = 0, eavail, timed_out = 0;
    u64 slack = 0;
    bool waiter = false;
    wait_queue_entry_t wait;
    ktime_t expires, *to = NULL;

    // 计算 timeout 睡眠时间。如果有就绪事件,处理并发送到用户空间。
    ...

fetch_events:

    if (!ep_events_available(ep))
        // napi 中断缓解技术,避免网卡频繁中断 cpu,提高数据获取的效率。这里为了积攒网络数据进行返回。
        ep_busy_loop(ep, timed_out);

    // 检查就绪队列是否有数据。
    eavail = ep_events_available(ep);
    if (eavail)
        // 如果有就绪事件了,就直接不用睡眠等待了,进入发送环节。
        goto send_events;

    ...

    // 没有就绪事件发生,需要睡眠等待。
    if (!waiter) {
        waiter = true;
        // 等待事件,关联当前进程。
        init_waitqueue_entry(&wait, current);

        spin_lock_irq(&ep->wq.lock);
        // 添加等待事件。(为了解决惊群效应,所以等待事件添加了 WQ_FLAG_EXCLUSIVE 标识。查看 __wake_up_common 实现。)
        __add_wait_queue_exclusive(&ep->wq, &wait);
        spin_unlock_irq(&ep->wq.lock);
    }

    for (;;) {
        /*
         * We don't want to sleep if the ep_poll_callback() sends us
         * a wakeup in between. That's why we set the task state
         * to TASK_INTERRUPTIBLE before doing the checks.
         */

        // 设置当前进程状态为等待状态,可以被信号解除等待。
        set_current_state(TASK_INTERRUPTIBLE);
        /*
         * Always short-circuit for fatal signals to allow
         * threads to make a timely exit without the chance of
         * finding more events available and fetching
         * repeatedly.
         */

        // 信号中断,不要执行睡眠了。
        if (fatal_signal_pending(current)) {
            res = -EINTR;
            break;
        }

        // 检查就绪队列。
        eavail = ep_events_available(ep);
        if (eavail)
            break;

        // 信号中断,不要执行睡眠了。
        if (signal_pending(current)) {
            res = -EINTR;
            break;
        }

        // 进程进入睡眠状态。
        if (!schedule_hrtimeout_range(to, slack, HRTIMER_MODE_ABS)) {
            timed_out = 1;
            break;
        }
    }

    // 进程等待超时,或者被唤醒,设置进程进入运行状态,等待内核调度运行。
    __set_current_state(TASK_RUNNING);

send_events:
    /*
     * Try to transfer events to user space. In case we get 0 events and
     * there's still timeout left over, we go trying again in search of
     * more luck.
     */

    // 有就绪事件就发送到用户空间,否则继续获取数据直到超时。
    if (!res && eavail && !(res = ep_send_events(ep, events, maxevents)) &&
        !timed_out)
        goto fetch_events;

    // 从等待队列中,删除等待事件。
    if (waiter) {
        spin_lock_irq(&ep->wq.lock);
        __remove_wait_queue(&ep->wq, &wait);
        spin_unlock_irq(&ep->wq.lock);
    }

    return res;
}

/* Used by the ep_send_events() function as callback private data */
struct ep_send_events_data {
    int maxevents;
    struct epoll_event __user *events;
    int res;
};

static int ep_send_events(struct eventpoll *ep,
              struct epoll_event __user *events, int maxevents) {
    struct ep_send_events_data esed;

    esed.maxevents = maxevents;
    esed.events = events;

    // 遍历事件就绪列表,发送就绪事件到用户空间。
    ep_scan_ready_list(ep, ep_send_events_proc, &esed, 0, false);
    return esed.res;
}

8.7. ep_scan_ready_list

遍历就绪列表,处理 sproc 函数。这里 sproc 函数指针的使用,是为了减少代码冗余,将 ep_scan_ready_list 做成一个通用的函数。

// 
static __poll_t ep_scan_ready_list(struct eventpoll *ep,
                  __poll_t (*sproc)(struct eventpoll *,
                       struct list_head *, void *),
                  void *priv, int depth, bool ep_locked) {
    __poll_t res;
    struct epitem *epi, *nepi;
    LIST_HEAD(txlist);
    ...
    // 将就绪队列分片链接到 txlist 链表中。
    list_splice_init(&ep->rdllist, &txlist);
    res = (*sproc)(ep, &txlist, priv);
    ...
    // 在处理 sproc 回调处理过程中,可能产生新的就绪事件被写入 ovflist,将 ovflist 回写 rdllist。
    for (nepi = READ_ONCE(ep->ovflist); (epi = nepi) != NULL;
         nepi = epi->next, epi->next = EP_UNACTIVE_PTR) {
        if (!ep_is_linked(epi)) {
            list_add(&epi->rdllink, &ep->rdllist);
            ep_pm_stay_awake(epi);
        }
    }
    ...
    // txlist 在 epitem 回调中,可能没有完全处理完,那么重新放回到 rdllist,下次处理。
    list_splice(&txlist, &ep->rdllist);
    ...
}

8.8. ep_send_events_proc

处理就绪列表,将数据从内核空间拷贝到用户空间。

static __poll_t ep_send_events_proc(struct eventpoll *ep, struct list_head *head, void *priv) {
    struct ep_send_events_data *esed = priv;
    __poll_t revents;
    struct epitem *epi, *tmp;
    struct epoll_event __user *uevent = esed->events;
    struct wakeup_source *ws;
    poll_table pt;
    init_poll_funcptr(&pt, NULL);
    ...

    // 遍历处理 txlist(原 ep->rdllist 数据)就绪队列结点,获取事件拷贝到用户空间。
    list_for_each_entry_safe (epi, tmp, head, rdllink) {
        if (esed->res >= esed->maxevents)
            break;
        ...
        // 先从就绪队列中删除 epi,如果是 LT 模式,就绪事件还没处理完,再把它添加回去。
        list_del_init(&epi->rdllink);

        // 获取 epi 对应 fd 的就绪事件。
        revents = ep_item_poll(epi, &pt, 1);
        if (!revents)
            continue;

        // 内核空间向用户空间传递数据。__put_user 成功拷贝返回 0。
        if (__put_user(revents, &uevent->events) ||
            __put_user(epi->event.data, &uevent->data)) {
            // 如果拷贝失败,继续保存在就绪列表里。
            list_add(&epi->rdllink, head);
            ep_pm_stay_awake(epi);
            if (!esed->res)
                esed->res = -EFAULT;
            return 0;
        }

        // 成功处理就绪事件的 fd 个数。
        esed->res++;
        uevent++;
        if (epi->event.events & EPOLLONESHOT)
            // #define EP_PRIVATE_BITS (EPOLLWAKEUP | EPOLLONESHOT | EPOLLET | EPOLLEXCLUSIVE)
            epi->event.events &= EP_PRIVATE_BITS;
        else if (!(epi->event.events & EPOLLET)) {
            /*
             * If this file has been added with Level
             * Trigger mode, we need to insert back inside
             * the ready list, so that the next call to
             * epoll_wait() will check again the events
             * availability. At this point, no one can insert
             * into ep->rdllist besides us. The epoll_ctl()
             * callers are locked out by
             * ep_scan_ready_list() holding "mtx" and the
             * poll callback will queue them in ep->ovflist.
             */
            /* lt 模式下,当前事件被处理完后,不会从就绪列表中删除,留待下一次 epoll_wait
             * 调用,再查看是否还有事件没处理,如果没有事件了就从就绪列表中删除。
             * 在遍历事件的过程中,不能写 ep->rdllist,因为已经上锁,只能把新的就绪信息
             * 添加到 ep->ovflist */
            list_add_tail(&epi->rdllink, &ep->rdllist);
            ep_pm_stay_awake(epi);
        }
    }

    return 0;
}

8.9. ep_poll_callback

fd 事件回调。当 fd 有网络事件发生,就会通过等待队列,进行回调。参考 __wake_up_common,如果事件是用户关注的事件,回调会唤醒进程进行处理。

static int ep_poll_callback(wait_queue_entry_t *wait, unsigned mode, int sync, void *key) {
    int pwake = 0;
    struct epitem *epi = ep_item_from_wait(wait);
    struct eventpoll *ep = epi->ep;
    __poll_t pollflags = key_to_poll(key);
    unsigned long flags;
    int ewake = 0;

    // 禁止本地中断并获得指定读锁。
    read_lock_irqsave(&ep->lock, flags);

    ep_set_busy_poll_napi_id(epi);

    // #define EP_PRIVATE_BITS (EPOLLWAKEUP | EPOLLONESHOT | EPOLLET | EPOLLEXCLUSIVE)
    // 如果 fd 没有关注除了 EP_PRIVATE_BITS 之外的事件,那么走解锁流程。
    if (!(epi->event.events & ~EP_PRIVATE_BITS))
        goto out_unlock;

    // 如果回调的事件,不是用户关注的 fd 事件,那么走解锁流程。
    if (pollflags && !(pollflags & epi->event.events))
        goto out_unlock;

    /*
     * If we are transferring events to userspace, we can hold no locks
     * (because we're accessing user memory, and because of linux f_op->poll()
     * semantics). All the events that happen during that period of time are
     * chained in ep->ovflist and requeued later on.
     */
    // 当内核空间向用户空间拷贝数据时,不添加 epi 到 rdllist,将它添加到 ovflist。
    if (READ_ONCE(ep->ovflist) != EP_UNACTIVE_PTR) {
        if (epi->next == EP_UNACTIVE_PTR && chain_epi_lockless(epi))
            ep_pm_stay_awake_rcu(epi);
        goto out_unlock;
    }

    // epi 已经加入就绪链表就不需要添加了。
    if (!ep_is_linked(epi) &&
        list_add_tail_lockless(&epi->rdllink, &ep->rdllist)) {
        ep_pm_stay_awake_rcu(epi);
    }

    // 当回调事件是用户关注的事件,那么需要唤醒进程处理。

    // ep->wq 在 epoll_wait 时添加,当没有就绪事件,epoll_wait 进行睡眠等待唤醒。
    if (waitqueue_active(&ep->wq)) {
        if ((epi->event.events & EPOLLEXCLUSIVE) &&
            !(pollflags & POLLFREE)) {
            // #define EPOLLINOUT_BITS (EPOLLIN | EPOLLOUT)
            switch (pollflags & EPOLLINOUT_BITS) {
            case EPOLLIN:
                if (epi->event.events & EPOLLIN)
                    ewake = 1;
                break;
            case EPOLLOUT:
                if (epi->event.events & EPOLLOUT)
                    ewake = 1;
                break;
            case 0:
                ewake = 1;
                break;
            }
        }
        wake_up(&ep->wq);
    }

    // ep->poll_wait 是 epoll 监控另外一个 epoll fd 的等待队列。如果触发事件,也需要唤醒进程处理。
    if (waitqueue_active(&ep->poll_wait))
        pwake++;

out_unlock:
    read_unlock_irqrestore(&ep->lock, flags);

    /* We have to call this outside the lock */
    if (pwake)
        ep_poll_safewake(&ep->poll_wait);

    if (!(epi->event.events & EPOLLEXCLUSIVE))
        ewake = 1;

    if (pollflags & POLLFREE) {
        /*
         * If we race with ep_remove_wait_queue() it can miss
         * ->whead = NULL and do another remove_wait_queue() after
         * us, so we can't use __remove_wait_queue().
         */
        list_del_init(&wait->entry);
        /*
         * ->whead != NULL protects us from the race with ep_free()
         * or ep_remove(), ep_remove_wait_queue() takes whead->lock
         * held by the caller. Once we nullify it, nothing protects
         * ep/epi or even wait.
         */
        smp_store_release(&ep_pwq_from_wait(wait)->whead, NULL);
    }

    return ewake;
}

9. 参考

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 205,386评论 6 479
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,939评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,851评论 0 341
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,953评论 1 278
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,971评论 5 369
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,784评论 1 283
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,126评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,765评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,148评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,744评论 2 323
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,858评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,479评论 4 322
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,080评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,053评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,278评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,245评论 2 352
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,590评论 2 343