Linux系统编程11:I/O复用

0. 背景

  • 阻塞IO操作
    通常IO操作(比如readwrite)都是阻塞I/O的,当调用read时,如果没有数据收到,线程或者进程就会被挂起,直到收到数据。

    阻塞IO.png

    当服务器处理1000个连接,但是只有很少连接执行IO操作,那么需要1000个线程或进程来处理1000个连接,而1000个线程大部分是被挂起的。

  • 线程内存和切换开销
    由于CPU的核数或超线程数一般都不大,比如4,8,16,32,64,128,比如4个核要跑1000个线程,那么每个线程的时间槽非常短,而线程切换非常频繁。

    • 线程是有内存开销的,1个线程可能需要512K(或2M)存放栈,那么1000个线程就要512M(或2G)内存。
    • 线程的切换,或者说上下文切换是有CPU开销的,当大量时间花在上下文切换的时候,分配给真正的操作的CPU就要少很多。

1. IO多路复用

I/O多路复用:多路网络连接复用一个IO线程。

使用一个线程来检查I/O流(Socket)的就绪状态。通过记录跟踪每个I/O流(Socket)的状态,来同时管理多个I/O流 。

MUX=multiplexing

多个Socket复用功能是在内核驱动实现的。

IO多路复用

在处理1000个连接时,只需要1个线程监控就绪状态,就绪的连接开一个线程处理就可以了,这样需要的线程数大大减少,减少了内存开销和上下文切换的CPU开销。

I/O ready 事件的通知是以一个监听集合为单位完成的。multiplex 的是监听集合,并非 I/O 本身。

  • 优点
    开销低
  • 缺点
    编程复杂度高

2. 分类

2.1 Select模式

2.1.1 结构体

fd_set:描述符集合(long类型数组)

2.1.2 函数

监听描述符事件,如果描述符集合中没有就绪,等待;反之,函数返回,把描述符集合清空,并设置已经就绪的描述符(设置为1)。

int select(int maxfd,fd_set *rdset,fd_set *wrset,fd_set *exset,struct timeval *timeout)
  • 参数
No. 参数 含义
1 maxfd 需要监视的最大的文件描述符值+1
2 rdset 需要检测的可读文件描述符的集合
3 wrset 需要检测的可写文件描述符的集合
4 exset 需要检测的异常文件描述符的集合
5 timeout 超时时间
  • 返回值
No. 返回值 含义
1 -1 出错
2 =0 超时
3 >0 获取到数据

2.1.3 宏定义

No. 参数 含义
1 FD_ZERO(fd_set *fdset) 清空文件描述符集
2 FD_SET(int fd,fd_set *fdset) 设置监听的描述符(把监听的描述符设置为1)
3 FD_CLR(int fd,fd_set *fdset) 清除监听的描述符(把监听的描述符设置为0)
4 FD_ISSET(int fd,fd_set *fdset) 判断描述符是否设置(判断描述符是否设置为1)
5 FD_SETSIZE 256

2.1.4 就绪条件

2.1.5 编码流程

  1. 定义描述符集
  2. 清空描述符集
  3. 设置指定的描述符并获取最大的描述符值+1
  4. 等待描述符就绪
  5. 判断已就绪的描述符,并做对应处理。

2.1.6 代码结构

// 定义描述符集
fd_set rset;
 
// 清空描述符集
FD_ZERO(&rset);
 
// 设置描述符 
FD_SET(fd1,&rset);
FD_SET(fd2,&rset);
 
// 获取最大描述符+1
int maxfdp1 = max(fd1,fd2) + 1;
 
// 等待描述符就绪
if(select(maxfdp1,&rset,NULL,NULL,NULL) > 0){
 
    // 判断已就绪的描述符
    if(FD_ISSET(fd1,&rset)){
            // do somthing
    }
    if(FD_ISSET(fd2,&rset)){
            // do somthing
    }
}

2.1.7 注意

  1. fd_set可容纳最大描述符数为FD_SETSIZE
  2. 每一次select()前,必须重新设置描述符,如果设置了新的描述符,需要重新计算maxfdp1
  3. 如果删除了描述符,需要把对应描述符的fd_set,执行FD_CLR操作

2.1.8 缺点

  • 需要修改传入的参数数组
  • 不能确切指定有数据的socket
  • 只能监视FD_SETSIZE个链接
  • 线程不安全

2.1.9 示例

  • select()实现服务器tcp_server_select.cpp
#include <stdio.h>
#include <unistd.h>
#include <fcntl.h>
#include <string.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#include <sys/select.h>

#define max(a,b) ((a)>(b)?(a):(b))
void show_info(int connfd){
    struct sockaddr_in local_addr;
    bzero(&local_addr,sizeof(local_addr));
    socklen_t local_addr_len = sizeof(local_addr);
    getsockname(connfd,(struct sockaddr*)&local_addr,&local_addr_len);
    printf("server local %s:%d\n",inet_ntoa(local_addr.sin_addr),ntohs(local_addr.sin_port));
    
    struct sockaddr_in peer_addr;
    bzero(&peer_addr,sizeof(peer_addr));
    socklen_t peer_addr_len = sizeof(peer_addr);
    getpeername(connfd,(struct sockaddr*)&peer_addr,&peer_addr_len);
    printf("server peer %s:%d\n",inet_ntoa(peer_addr.sin_addr),ntohs(peer_addr.sin_port));
}
int main(int argc,char* argv[]){
    if(3 != argc){
        printf("usage:%s <ip> <#port>\n",argv[0]);
        return 1;
    }

    int listenfd = socket(AF_INET,SOCK_STREAM,0);
    if(-1 == listenfd){
        perror("listenfd open err");
        return 1;
    }
    printf("socket create OK\n");
    
    int flag = 1;
    setsockopt(listenfd,SOL_SOCKET,SO_REUSEADDR,&flag,sizeof(flag));    

    struct sockaddr_in local_addr;
    bzero(&local_addr,sizeof(local_addr));
    local_addr.sin_family = AF_INET;
    local_addr.sin_addr.s_addr = inet_addr(argv[1]);
    local_addr.sin_port = htons(atoi(argv[2]));

    if(-1 == bind(listenfd,(struct sockaddr*)&local_addr,sizeof(local_addr))){
        perror("bind err");
        return 1;
    }
    printf("bind OK\n");

    if(-1 == listen(listenfd,10)){
        perror("listen err");
        return 1;
    }
    printf("listen OK\n");

    fd_set rfds;
    FD_ZERO(&rfds);
    
    FD_SET(listenfd,&rfds);
    int maxfdp1 = listenfd + 1;
    int connfds[FD_SETSIZE-1];  
    size_t connfds_cnt = 0;
    for(;;){
        int i;
        FD_SET(listenfd,&rfds);
        for(i=0;i<connfds_cnt;i++){
            FD_SET(connfds[i],&rfds);
            printf("FD_SET(%d)\n",connfds[i]);
        }
        printf("before select:%lu\n",rfds);
        if(-1 == select(maxfdp1,&rfds,NULL,NULL,NULL)){
            perror("select error");
            return 1;
        }
        printf("after select:%lu\n",rfds);
        if(FD_ISSET(listenfd,&rfds)){
            printf("listenfd ready\n");
            struct sockaddr_in remote_addr;
            bzero(&remote_addr,sizeof(remote_addr));
            socklen_t remote_addr_len = sizeof(remote_addr);
            int connfd = accept(listenfd,(struct sockaddr*)&remote_addr,&remote_addr_len);
            if(-1 == connfd){
                perror("accept err");
                //return 1;
            }

            if(connfds_cnt+1 == FD_SETSIZE-1){
                fprintf(stderr,"connfd size over %d\n",FD_SETSIZE-1);
                close(connfds[i]);
            }else{
                connfds[connfds_cnt++] = connfd;                
                maxfdp1 = max(connfd,maxfdp1-1)+1;
                show_info(connfd);
            }
        }
        for(i=0;i<connfds_cnt;i++){
            if(-1 == connfds[i]){
                continue;
            }
            if(FD_ISSET(connfds[i],&rfds)){

                char buf[BUFSIZ];
                bzero(buf,BUFSIZ);
                ssize_t len;
                if((len = read(connfds[i],buf,BUFSIZ-1)) == -1){
                    perror("read err");
                    //return 1;
                }
                if(0 == len){
                    printf("close %d\n",connfds[i]);
                    close(connfds[i]);
                    FD_CLR(connfds[i],&rfds);
                    memcpy(connfds+i,connfd+i+1,connfds_cnt-i-1);
                    connfds_cnt--;
                    i--;//数组发生变化,重新判断i的fd
                    continue;
                }
                printf("server recv:%s\n",buf);
                
                int fd = open(buf,O_RDONLY);
                if(-1 == fd){
                    perror("open file err");
                }
                struct stat file_stat;
                fstat(fd,&file_stat);
                if(-1 == sendfile(connfds[i],fd,NULL,file_stat.st_size)){
                    perror("sendfile err");
                }
                printf("server send file %s ok\n",buf);
                close(fd);
            }
        }
    }
    close(listenfd);

}

2.2 Poll模式

2.2.1 背景

优点
  • 不需要不修改传入的参数数组
  • 可以监视任意个链接cat /proc/sys/fs/file-max
缺点
  • 不能确切指定有数据的socket
  • 线程不安全

2.2.2 结构体

  • struct pollfd
成员 含义
fd 描述符
events 监听事件,主要用于设置监听事件
revents 实际触发的事件,用于判断触发的事件

2.2.3 函数

int poll(struct pollfd *fdarray, unsigned long nfds, int timeout)
  • 参数
No. 参数 含义
1 fdarray struct pollfd数组指针
2 nfds 数组元素个数
3 timeout 等待时间。INFTIM:永远等待;0:立即返回;>0:等待秒数;
  • struct pollfd
No. 参数 含义
1 fd 文件描述符
2 events 监视fd事件,监视事件可以是输入事件,可以是输出事件。
3 revents fd实际发生的事件,监视事件可以是输入事件,可以是输出事件,还可以是出错事件。
  • fd输入事件
No. 参数 含义
1 POLLRDNORM 普通数据
2 POLLRDBAND 优先级带数据
3 POLLIN 普通或者优先级带数据
  • fd输出事件
No. 参数 含义
1 POLLWRNORM 普通数据
2 POLLWRBAND 优先级带数据
3 POLLOUT 普通或者优先级带数据
  • fd出错事件
No. 参数 含义
1 POLLERR 发生错误
2 POLLHUP 发生挂起
3 POLLNVAL 描述符非法
  • 返回值
No. 返回值 含义
1 0 超时
2 -1 出错
3 正数 就绪描述符个数

2.2.4 概念

数据类型

No. 类型 实例
1 普通数据 正规的TCP数据,所有的UDP数据
2 优先级带数据 TCP带外数据

TCP带外数据数据中的急救车

2.2.5 编码流程

  1. 定义pollfd结构体数组
  2. 初始化pollfd结构体数组
  3. 设置监听poll事件
  4. 等待poll事件
  5. 判断触发事件的描述符,并做对应处理。

2.2.6 触发事件

2.2.7 代码结构

// 定义pollfd结构体数组
struct pollfd pollfds[OPEN_MAX];
 
// 初始化pollfd结构体数组
int i;
for(i=0;i<OPEN_MAX;i++){
    pollfds[i].fd = -1;
}
int pollfds_cnt = 0;
 
// 设置监听事件
pollfds[0].fd = fd1;
pollfds[0].event = POLLIN;
pollfds_cnt++;
pollfds[1].fd = fd1;
pollfds[1].event = POLLIN;
pollfds_cnt++;
 
// 等待poll事件
if(poll(pollfds,pollfds_cnt,INFTIM)>0){
    int i;
    for(i=0;i<pollfds_cnt;i++){
        // 判断触发事件的描述符
        if(pollfds[i].fd == fd1 && pollfds[i].revent & POLLIN){
            // do something
        }
        if(pollfds[i].fd == fd2 && pollfds[i].revent & POLLIN){
            // do something
        }
    }
}

2.2.8 注意

struct pollfd数组的最大数是OPEN_MAX(或者linux/fs.h中的INR_OPEN_MAX )
还可以通过如下方式查看:

  • cat /proc/sys/fs/file-max
  • ulimit

2.2.9 示例

  • poll()实现服务器tcp_server_poll.cpp
#include <stdio.h>
#include <unistd.h>
#include <fcntl.h>
#include <string.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#include <sys/poll.h>
#include <linux/fs.h>

#define max(a,b) ((a)>(b)?(a):(b))
void show_info(int connfd){
    struct sockaddr_in local_addr;
    bzero(&local_addr,sizeof(local_addr));
    socklen_t local_addr_len = sizeof(local_addr);
    getsockname(connfd,(struct sockaddr*)&local_addr,&local_addr_len);
    printf("server local %s:%d\n",inet_ntoa(local_addr.sin_addr),ntohs(local_addr.sin_port));
    
    struct sockaddr_in peer_addr;
    bzero(&peer_addr,sizeof(peer_addr));
    socklen_t peer_addr_len = sizeof(peer_addr);
    getpeername(connfd,(struct sockaddr*)&peer_addr,&peer_addr_len);
    printf("server peer %s:%d\n",inet_ntoa(peer_addr.sin_addr),ntohs(peer_addr.sin_port));
}
int main(int argc,char* argv[]){
    if(3 != argc){
        printf("usage:%s <ip> <#port>\n",argv[0]);
        return 1;
    }

    int listenfd = socket(AF_INET,SOCK_STREAM,0);
    if(-1 == listenfd){
        perror("listenfd open err");
        return 1;
    }
    printf("socket create OK\n");
    
    int flag = 1;
    setsockopt(listenfd,SOL_SOCKET,SO_REUSEADDR,&flag,sizeof(flag));    

    struct sockaddr_in local_addr;
    bzero(&local_addr,sizeof(local_addr));
    local_addr.sin_family = AF_INET;
    local_addr.sin_addr.s_addr = inet_addr(argv[1]);
    local_addr.sin_port = htons(atoi(argv[2]));

    if(-1 == bind(listenfd,(struct sockaddr*)&local_addr,sizeof(local_addr))){
        perror("bind err");
        return 1;
    }
    printf("bind OK\n");

    if(-1 == listen(listenfd,10)){
        perror("listen err");
        return 1;
    }
    printf("listen OK\n");

    struct pollfd poll_fd[INR_OPEN_MAX];
    poll_fd[0].fd = listenfd;
    poll_fd[0].events = POLLIN;
    size_t poll_fd_cnt = 1;

    for(;;){
        if(-1 != poll(poll_fd,poll_fd_cnt,-1)){
            if(poll_fd[0].revents == POLLIN){
                printf("accept listenfd\n");
                int connfd = accept(listenfd,NULL,NULL);
                if(-1 == connfd){
                    perror("accept err");
                }else{
                    if(poll_fd_cnt+1 == INR_OPEN_MAX){
                        fprintf(stderr,"connfd size over %d",INR_OPEN_MAX);
                        close(connfd);
                    }else{
                        poll_fd[poll_fd_cnt].fd = connfd;
                        poll_fd[poll_fd_cnt].events = POLLIN;
                        poll_fd_cnt++;
                    }
                }
            }
            int i;
            for(i=1;i<poll_fd_cnt;i++){
                if(poll_fd[i].revents & POLLIN){    
                    char buf[BUFSIZ];
                    bzero(buf,BUFSIZ);
                    ssize_t len;
                    printf("read connfd %d\n",poll_fd[i].fd);
                    if((len = read(poll_fd[i].fd,buf,BUFSIZ-1)) == -1){
                        perror("read err");
                        //return 1;
                    }
                    if(0 == len){
                        printf("close %d\n",poll_fd[i].fd);
                        printf("%d vs %d\n",poll_fd[i].revents,poll_fd[i].revents);
                        close(poll_fd[i].fd);
                        printf("%d vs %d\n",poll_fd[i].revents,poll_fd[i].revents);
                        memcpy(poll_fd+i,poll_fd+i+1,poll_fd_cnt-i-1);
                        poll_fd_cnt--;
                        i--;//数组发生变化,重新判断i的fd
                        continue;
                        //break;
                    }
                    printf("server recv:%s\n",buf);
                    
                    int fd = open(buf,O_RDONLY);
                    if(-1 == fd){
                        perror("open file err");
                    }
                    struct stat file_stat;
                    fstat(fd,&file_stat);
                    if(-1 == sendfile(poll_fd[i].fd,fd,NULL,file_stat.st_size)){
                        perror("sendfile err");
                    }
                    printf("server send file %s ok\n",buf);
                    close(fd);
                }
            }
        }
    }

    close(listenfd);
}
  • poll()实现客户端tcp_client_poll.cpp
#include <stdio.h>
#include <string.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#include <sys/select.h>
#include <math.h>
#include <unistd.h>
#include <fcntl.h>
#include <sys/poll.h>

#define max(a,b) ((a)>(b)?(a):(b))

void show_info(int connfd){
    struct sockaddr_in local_addr;
    bzero(&local_addr,sizeof(local_addr));
    socklen_t local_addr_len = sizeof(local_addr);
    getsockname(connfd,(struct sockaddr*)&local_addr,&local_addr_len);
    printf("client local %s:%d\n",inet_ntoa(local_addr.sin_addr),ntohs(local_addr.sin_port));
    
    struct sockaddr_in peer_addr;
    bzero(&peer_addr,sizeof(peer_addr));
    socklen_t peer_addr_len = sizeof(peer_addr);
    getpeername(connfd,(struct sockaddr*)&peer_addr,&peer_addr_len);
    printf("clinet peer %s:%d\n",inet_ntoa(peer_addr.sin_addr),ntohs(peer_addr.sin_port));
}
int main(int argc,char* argv[]){
    if(3 != argc){
        printf("usage:%s <ip> <#port> \n",argv[0]);
        return 1;
    }

    int connfd = socket(AF_INET,SOCK_STREAM,0);
    if(-1 == connfd){
        perror("socket err");
        return 1;
    }
    struct sockaddr_in remote_addr;
    bzero(&remote_addr,sizeof(remote_addr));
    remote_addr.sin_family = AF_INET;
    remote_addr.sin_addr.s_addr = inet_addr(argv[1]);
    remote_addr.sin_port = htons(atoi(argv[2]));    
    if(-1 == connect(connfd,(struct sockaddr*)&remote_addr,sizeof(remote_addr))){
        perror("connect err");
        return 1;
    }

    show_info(connfd);

    printf("connfd:%d\n",connfd);

    struct pollfd poll_fds[2];

    poll_fds[0].fd = connfd;
    poll_fds[0].events = POLLIN;
    poll_fds[1].fd = STDIN_FILENO;
    poll_fds[1].events = POLLIN;

    char buf[BUFSIZ];
    for(;;){
        if(-1 != poll(poll_fds,2,-1)){
            if(poll_fds[0].fd == connfd && poll_fds[0].revents & POLLRDNORM){
                bzero(buf,BUFSIZ);
                printf("recv msg\n");
                ssize_t n;
                if((n = read(connfd,buf,BUFSIZ)) == -1){
                    perror("read err");
                    return 1;
                }else if(0 == n){
                    printf("server close\n");
                    break;
                }
                printf("client recv:%s\n",buf);
            }
            if(poll_fds[1].fd == STDIN_FILENO && poll_fds[1].revents & POLLRDNORM){
                bzero(buf,BUFSIZ);
                printf("send msg\n");
                fgets(buf,BUFSIZ,stdin);
                write(connfd,buf,strlen(buf)-1);
            }
        }
    }

    close(connfd);
}

2.3 Epool模式

2.3.1 背景

优点
  • 能确切指定有数据的socket
  • 线程安全

2.3.2 结构体

struct epoll_event

成员 含义
data.fd 描述符
events 设置/获取epoll事件

结构体epoll_event被用于注册所感兴趣的事件和回传所发生待处理的事件,定义如下:

    typedef union epoll_data {
        void *ptr;
         int fd;
         __uint32_t u32;
         __uint64_t u64;
     } epoll_data_t;//保存触发事件的某个文件描述符相关的数据
     struct epoll_event {
         __uint32_t events;      /* epoll event */
         epoll_data_t data;      /* User data variable */
     };

其中events表示感兴趣的事件和被触发的事件,可能的取值为:

No. 事件 含义
1 EPOLLIN 表示对应的文件描述符可以读
2 EPOLLOUT 表示对应的文件描述符可以写
3 EPOLLPRI 表示对应的文件描述符有紧急的数可读
4 EPOLLERR 表示对应的文件描述符发生错误
5 EPOLLHUP 表示对应的文件描述符被挂断
6 EPOLLET ET的epoll工作模

2.3.2 函数

No. 操作 函数
1 创建 int epoll_create(int size)
2 控制 int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event)
3 轮询I/O事件 int epoll_wait(int epfd,struct epoll_event events,int maxevents,int timeout)
①创建
int epoll_create(int size)
  • 参数
No. 参数 含义
1 size 监听的数目
  • 返回值
    文件描述符

可以在/proc/PID/fd/查看

② 控制
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event)
  • 参数
No. 参数 含义
1 epfd epoll文件描述符
2 op 操作
3 fd 关联的文件描述符
4 event 指向epoll_event的指针
  • 操作
No. 参数 含义
1 EPOLL_CTL_ADD 注册
2 EPOLL_CTL_MOD 修改
3 EPOLL_CTL_DEL 删除
  • 返回值
No. 返回值 含义
1 0 成功
2 -1 失败
③ 轮询I/O事件
int epoll_wait(int epfd,struct epoll_event events,int maxevents,int timeout)
  • 参数
No. 参数 含义
1 epfd epoll文件描述符
2 epoll_event 用于回传代处理事件的数组
3 maxevents 每次能处理的事件数
4 timeout 等待I/O事件发生的超时值ms,-1:永不超时;0: 立即返回
  • 返回值
No. 返回值 含义
1 正数 发生事件数
2 -1 错误

2.3.3 编码流程

  1. 创建epoll描述符
  2. 注册epoll事件
  3. 等待epoll事件
  4. 判断触发epoll事件的描述符和事件
  5. 关闭epoll描述符

2.3.4 触发条件

  • ET(Edge Triggered)模式--边沿触发
No. 操作 触发条件
1 接收缓冲区状态变化时触发读事件,即空的接收缓冲区刚接收到数据时触发读事件
2 发送缓冲区状态变化时触发写事件,即满的缓冲区刚空出空间时触发读事件
  • LT(Level Triggered)模式--水平触发
No. 操作 触发条件
1 接收缓冲区不为空,有数据可读,读事件一直触发
2 发送缓冲区不满,可以继续写入数据,写事件一直触发

2.3.5 代码结构

LT代码结构
// 创建epoll描述符
int epollfd = epoll_create(OPEN_MAX);
 
// 注册epoll事件
struct epoll_event evt;
evt.data.fd = fd1;
evt.events = EPOLLIN;
epoll_ctl(epollfd,EPOLL_CTL_ADD,fd1,&evt);
evt.data.fd = fd2;
evt.events = EPOLLIN;
epoll_ctl(epollfd,EPOLL_CTL_ADD,fd2,&evt);
 
// 等待epoll事件
int evts_cnt = 2;
struct epoll_event evts[evts_cnt];
int fd_cnt = epoll_wait(epollfd,&evts,evts_cnt,-1);:
int i;
for(i=0;i<fd_cnt,i++){
        // 判断触发epoll事件的描述符和事件
    if(evts[i].data.fd == fd1 && evts[i].events & EPOLLIN){
        // do something
    }
    if(evts[i].data.fd == fd2 && evts[i].events & EPOLLIN){
        // do something
    }
}
 
// 关闭epoll描述符
close(epollfd);
  • 示例
#include <stdio.h>
#include <unistd.h>
#include <string.h>
#include <fcntl.h>
#include <sys/epoll.h>

int main(int argc,char* argv[]){
    
    int flag = EPOLLIN;
    int c;
    
    while((c = getopt(argc,argv,"e")) != -1){
        switch(c){
        case 'e':
            flag |= EPOLLET;
            break;
        }
    }

    if(optind != argc - 1){
        printf("usage:%s -e <#size>\n",argv[0]);
        return 1;
    }

    ssize_t size = atoi(argv[optind]);
    if(size <= 0){
        printf("size must > 0\n");
        return 1;
    }

    int epoll_fd = epoll_create(1);

    struct epoll_event evt;
    evt.data.fd = STDIN_FILENO;
    evt.events = flag;
    if(EPOLLET & flag){
        fcntl(STDIN_FILENO,F_SETFL,O_NONBLOCK);
    }
    epoll_ctl(epoll_fd,EPOLL_CTL_ADD,STDIN_FILENO,&evt);

    char buf[size+1];
    for(;;){
        struct epoll_event out_evt;
        epoll_wait(epoll_fd,&out_evt,1,-1);
        if(out_evt.events & EPOLLIN){
            bzero(buf,size+1);
            read(STDIN_FILENO,buf,size);
            printf("%s\n",buf);
        }
    }

    close(epoll_fd);
}
  • ET

ET模式的文件句柄必须是非阻塞的。

// 创建epoll描述符
int epollfd = epoll_create(OPEN_MAX);
 
// 注册epoll事件
struct epoll_event evt;
evt.data.fd = fd1;// fd1必须是非阻塞的
evt.events = EPOLLIN | EPOLLET;
epoll_ctl(epollfd,EPOLL_CTL_ADD,fd1,&evt);
evt.data.fd = fd2;// fd2必须是非阻塞的
evt.events = EPOLLIN | EPOLLET;
epoll_ctl(epollfd,EPOLL_CTL_ADD,fd2,&evt);
 
// 等待epoll事件
int evts_cnt = 2;
struct epoll_event evts[evts_cnt];
int fd_cnt = epoll_wait(epollfd,&evts,evts_cnt,-1);
int i;
for(i=0;i<fd_cnt,i++){
        // 判断触发epoll事件的描述符和事件
    if(evts[i].data.fd == fd1 && evts[i].events & EPOLLIN){
        // do something
    }
    if(evts[i].data.fd == fd2 && evts[i].events & EPOLLIN){
        // do something
    }
}
 
// 关闭epoll描述符
close(epollfd);
  • 示例
#include <stdio.h>
#include <unistd.h>
#include <string.h>
#include <errno.h>
#include <fcntl.h>
#include <sys/epoll.h>

int main(int argc,char* argv[]){
    
    int flag = EPOLLIN;
    int c;
    
    while((c = getopt(argc,argv,"e")) != -1){
        switch(c){
        case 'e':
            flag |= EPOLLET;
            break;
        }
    }

    if(optind != argc - 1){
        printf("usage:%s -e <#size>\n",argv[0]);
        return 1;
    }

    ssize_t size = atoi(argv[optind]);
    if(size <= 0){
        printf("size must > 0\n");
        return 1;
    }

    int epoll_fd = epoll_create(1);

    struct epoll_event evt;
    evt.data.fd = STDIN_FILENO;
    evt.events = flag;
    if(EPOLLET & flag){
        fcntl(STDIN_FILENO,F_SETFL,O_NONBLOCK);
    }
    epoll_ctl(epoll_fd,EPOLL_CTL_ADD,STDIN_FILENO,&evt);

    char buf[size+1];
    for(;;){
        struct epoll_event out_evt;
        epoll_wait(epoll_fd,&out_evt,1,-1);
        if(out_evt.events & EPOLLIN){
            for(;;){
                bzero(buf,size+1);
                ssize_t n = read(STDIN_FILENO,buf,size);
                if(n > 0){
                    printf("%s\n",buf);
                }else if(n <= 0){
                    if(EAGAIN == errno){
                        printf("EAGAIN\n");
                        break;
                    } 
                    perror("read err");
                    break;
                }
            }
        }
    }

    close(epoll_fd);
}

2.3.6 示例

  • epoll()实现服务器tcp_server_epoll.cpp
#include <stdio.h>
#include <unistd.h>
#include <fcntl.h>
#include <string.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#include <sys/epoll.h>
#include <linux/fs.h>

#define max(a,b) ((a)>(b)?(a):(b))
void show_info(int connfd){
    struct sockaddr_in local_addr;
    bzero(&local_addr,sizeof(local_addr));
    socklen_t local_addr_len = sizeof(local_addr);
    getsockname(connfd,(struct sockaddr*)&local_addr,&local_addr_len);
    printf("server local %s:%d\n",inet_ntoa(local_addr.sin_addr),ntohs(local_addr.sin_port));
    
    struct sockaddr_in peer_addr;
    bzero(&peer_addr,sizeof(peer_addr));
    socklen_t peer_addr_len = sizeof(peer_addr);
    getpeername(connfd,(struct sockaddr*)&peer_addr,&peer_addr_len);
    printf("server peer %s:%d\n",inet_ntoa(peer_addr.sin_addr),ntohs(peer_addr.sin_port));
}
int main(int argc,char* argv[]){
    if(3 != argc){
        printf("usage:%s <ip> <#port>\n",argv[0]);
        return 1;
    }

    int listenfd = socket(AF_INET,SOCK_STREAM,0);
    if(-1 == listenfd){
        perror("listenfd open err");
        return 1;
    }
    printf("socket create OK\n");
    
    int flag = 1;
    setsockopt(listenfd,SOL_SOCKET,SO_REUSEADDR,&flag,sizeof(flag));    

    struct sockaddr_in local_addr;
    bzero(&local_addr,sizeof(local_addr));
    local_addr.sin_family = AF_INET;
    local_addr.sin_addr.s_addr = inet_addr(argv[1]);
    local_addr.sin_port = htons(atoi(argv[2]));

    if(-1 == bind(listenfd,(struct sockaddr*)&local_addr,sizeof(local_addr))){
        perror("bind err");
        return 1;
    }
    printf("bind OK\n");

    if(-1 == listen(listenfd,10)){
        perror("listen err");
        return 1;
    }
    printf("listen OK\n");


    int epoll_fd = epoll_create(INR_OPEN_MAX);

    struct epoll_event evt;
    evt.data.fd = listenfd;
    evt.events = EPOLLIN;
    epoll_ctl(epoll_fd,EPOLL_CTL_ADD,listenfd,&evt);

    int out_evts_cnt = 1;
    for(;;){
        struct epoll_event out_evts[out_evts_cnt];
        int fd_cnt = epoll_wait(epoll_fd,out_evts,out_evts_cnt,-1);
        int i;
        for(i=0;i<fd_cnt;i++){
            if(out_evts[i].data.fd == listenfd && out_evts[i].events & EPOLLIN){

                printf("accept listenfd\n");
                int connfd = accept(listenfd,NULL,NULL);
                if(-1 == connfd){
                    perror("accept err");
                }else{
                    if(out_evts_cnt+1 == INR_OPEN_MAX){
                        fprintf(stderr,"connfd size over %d",INR_OPEN_MAX);
                        close(connfd);
                    }else{
                        struct epoll_event evt;
                        evt.data.fd = connfd;
                        evt.events = EPOLLIN;
                        epoll_ctl(epoll_fd,EPOLL_CTL_ADD,connfd,&evt);
                        out_evts_cnt++;
                    }
                }
            }else if(out_evts[i].events & EPOLLIN){

                    char buf[BUFSIZ];
                    bzero(buf,BUFSIZ);
                    ssize_t len;
                    printf("read connfd %d\n",out_evts[i].data.fd);
                    if((len = read(out_evts[i].data.fd,buf,BUFSIZ-1)) == -1){
                        perror("read err");
                        //return 1;
                    }
                    if(0 == len){
                        printf("close %d\n",out_evts[i].data.fd);
                        close(out_evts[i].data.fd);
                        epoll_ctl(epoll_fd,EPOLL_CTL_DEL,out_evts[i].data.fd,out_evts+i);
                        continue;
                    }
                    printf("server recv:%s\n",buf);
                    
                    int fd = open(buf,O_RDONLY);
                    if(-1 == fd){
                        perror("open file err");
                    }
                    struct stat file_stat;
                    fstat(fd,&file_stat);
                    if(-1 == sendfile(out_evts[i].data.fd,fd,NULL,file_stat.st_size)){
                        perror("sendfile err");
                    }
                    printf("server send file %s ok\n",buf);
                    close(fd);
            }
        }
    }
    
    close(epoll_fd);
    close(listenfd);
}
  • epoll()实现客户端tcp_client_epoll.cpp
#include <stdio.h>
#include <string.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#include <sys/select.h>
#include <math.h>
#include <unistd.h>
#include <fcntl.h>
#include <sys/epoll.h>

#define max(a,b) ((a)>(b)?(a):(b))

void show_info(int connfd){
    struct sockaddr_in local_addr;
    bzero(&local_addr,sizeof(local_addr));
    socklen_t local_addr_len = sizeof(local_addr);
    getsockname(connfd,(struct sockaddr*)&local_addr,&local_addr_len);
    printf("client local %s:%d\n",inet_ntoa(local_addr.sin_addr),ntohs(local_addr.sin_port));
    
    struct sockaddr_in peer_addr;
    bzero(&peer_addr,sizeof(peer_addr));
    socklen_t peer_addr_len = sizeof(peer_addr);
    getpeername(connfd,(struct sockaddr*)&peer_addr,&peer_addr_len);
    printf("clinet peer %s:%d\n",inet_ntoa(peer_addr.sin_addr),ntohs(peer_addr.sin_port));
}
int main(int argc,char* argv[]){
    if(3 != argc){
        printf("usage:%s <ip> <#port> \n",argv[0]);
        return 1;
    }

    int connfd = socket(AF_INET,SOCK_STREAM,0);
    if(-1 == connfd){
        perror("socket err");
        return 1;
    }
    struct sockaddr_in remote_addr;
    bzero(&remote_addr,sizeof(remote_addr));
    remote_addr.sin_family = AF_INET;
    remote_addr.sin_addr.s_addr = inet_addr(argv[1]);
    remote_addr.sin_port = htons(atoi(argv[2]));    
    if(-1 == connect(connfd,(struct sockaddr*)&remote_addr,sizeof(remote_addr))){
        perror("connect err");
        return 1;
    }

    show_info(connfd);

    printf("connfd:%d\n",connfd);

    struct epoll_event in_evts[2];

    in_evts[0].data.fd = connfd;
    in_evts[0].events = EPOLLIN;
    in_evts[1].data.fd = STDIN_FILENO;
    in_evts[1].events = EPOLLIN;
    
    int epoll_fd = epoll_create(2);

    epoll_ctl(epoll_fd,EPOLL_CTL_ADD,connfd,in_evts);
    epoll_ctl(epoll_fd,EPOLL_CTL_ADD,STDIN_FILENO,in_evts+1);

    char buf[BUFSIZ];
    for(;;){
        struct epoll_event out_evts[2];
        int fd_cnt = epoll_wait(epoll_fd,out_evts,2,-1);
        
        int i;
        for(i=0;i<fd_cnt;i++){
            if(out_evts[i].data.fd == connfd && out_evts[i].events & EPOLLIN){
                
                bzero(buf,BUFSIZ);
                printf("recv msg\n");
                ssize_t n;
                if((n = read(connfd,buf,BUFSIZ)) == -1){
                    perror("read err");
                    return 1;
                }else if(0 == n){
                    printf("server close\n");
                    break;
                }
                printf("client recv:%s\n",buf);
            }
            if(out_evts[i].data.fd == STDIN_FILENO && out_evts[i].events & EPOLLIN){

                bzero(buf,BUFSIZ);
                printf("send msg\n");
                fgets(buf,BUFSIZ,stdin);
                write(connfd,buf,strlen(buf)-1);
            }
        }       
    }

    close(connfd);
}

3. 比较

3.1 select IO多路复用

  • 缺点
    • 只能监视FD_SETSIZE个连接
    • 不能确切指定有数据的socket
    • 每次需要修改传入的fd_set
    • 线程不安全

3.2 poll IO多路复用

  • 优点

    • 不需要不修改传入的pollfd数组
    • 可以监视任意个连接
  • 缺点

    • 不能确切指定有数据的socket
    • 线程不安全

3.3 epoll IO多路复用

  • 优点
    • 能确切指定有数据的socket
    • 线程安全
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,732评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,496评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,264评论 0 338
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,807评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,806评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,675评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,029评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,683评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 41,704评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,666评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,773评论 1 332
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,413评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,016评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,978评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,204评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,083评论 2 350
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,503评论 2 343

推荐阅读更多精彩内容