0. 背景
-
阻塞IO操作
通常IO操作(比如read
和write
)都是阻塞I/O的,当调用read
时,如果没有数据收到,线程或者进程就会被挂起,直到收到数据。
当服务器处理1000个连接,但是只有很少连接执行IO操作,那么需要1000个线程或进程来处理1000个连接,而1000个线程大部分是被挂起的。
-
线程内存和切换开销
由于CPU的核数或超线程数一般都不大,比如4,8,16,32,64,128,比如4个核要跑1000个线程,那么每个线程的时间槽非常短,而线程切换非常频繁。
- 线程是有内存开销的,1个线程可能需要512K(或2M)存放栈,那么1000个线程就要512M(或2G)内存。
- 线程的切换,或者说上下文切换是有CPU开销的,当大量时间花在上下文切换的时候,分配给真正的操作的CPU就要少很多。
1. IO多路复用
I/O多路复用:多路网络连接复用一个IO线程。
使用一个线程来检查I/O流(Socket)的就绪状态。通过记录跟踪每个I/O流(Socket)的状态,来同时管理多个I/O流 。
多个Socket复用功能是在内核驱动实现的。
在处理1000个连接时,只需要1个线程监控就绪状态,就绪的连接开一个线程处理就可以了,这样需要的线程数大大减少,减少了内存开销和上下文切换的CPU开销。
I/O ready 事件的通知是以一个监听集合为单位完成的。multiplex 的是监听集合,并非 I/O 本身。
2. 分类
2.1 Select模式
2.1.1 结构体
fd_set
:描述符集合(long
类型数组)
2.1.2 函数
监听描述符事件,如果描述符集合中没有就绪,等待;反之,函数返回,把描述符集合清空,并设置已经就绪的描述符(设置为1
)。
int select(int maxfd,fd_set *rdset,fd_set *wrset,fd_set *exset,struct timeval *timeout)
No. |
参数 |
含义 |
1 |
maxfd |
需要监视的最大的文件描述符值+1 |
2 |
rdset |
需要检测的可读文件描述符的集合 |
3 |
wrset |
需要检测的可写文件描述符的集合 |
4 |
exset |
需要检测的异常文件描述符的集合 |
5 |
timeout |
超时时间 |
No. |
返回值 |
含义 |
1 |
-1 |
出错 |
2 |
=0 |
超时 |
3 |
>0 |
获取到数据 |
2.1.3 宏定义
No. |
参数 |
含义 |
1 |
FD_ZERO(fd_set *fdset) |
清空文件描述符集 |
2 |
FD_SET(int fd,fd_set *fdset) |
设置监听的描述符(把监听的描述符设置为1) |
3 |
FD_CLR(int fd,fd_set *fdset) |
清除监听的描述符(把监听的描述符设置为0) |
4 |
FD_ISSET(int fd,fd_set *fdset) |
判断描述符是否设置(判断描述符是否设置为1) |
5 |
FD_SETSIZE |
256 |
2.1.4 就绪条件
2.1.5 编码流程
- 定义描述符集
- 清空描述符集
- 设置指定的描述符并获取最大的描述符值+1
- 等待描述符就绪
- 判断已就绪的描述符,并做对应处理。
2.1.6 代码结构
// 定义描述符集
fd_set rset;
// 清空描述符集
FD_ZERO(&rset);
// 设置描述符
FD_SET(fd1,&rset);
FD_SET(fd2,&rset);
// 获取最大描述符+1
int maxfdp1 = max(fd1,fd2) + 1;
// 等待描述符就绪
if(select(maxfdp1,&rset,NULL,NULL,NULL) > 0){
// 判断已就绪的描述符
if(FD_ISSET(fd1,&rset)){
// do somthing
}
if(FD_ISSET(fd2,&rset)){
// do somthing
}
}
2.1.7 注意
-
fd_set
可容纳最大描述符数为FD_SETSIZE
。
- 每一次
select()
前,必须重新设置描述符,如果设置了新的描述符,需要重新计算maxfdp1
。
- 如果删除了描述符,需要把对应描述符的fd_set,执行FD_CLR操作
2.1.8 缺点
- 需要修改传入的参数数组
- 不能确切指定有数据的socket
- 只能监视
FD_SETSIZE
个链接
- 线程不安全
2.1.9 示例
-
select()
实现服务器tcp_server_select.cpp
#include <stdio.h>
#include <unistd.h>
#include <fcntl.h>
#include <string.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#include <sys/select.h>
#define max(a,b) ((a)>(b)?(a):(b))
void show_info(int connfd){
struct sockaddr_in local_addr;
bzero(&local_addr,sizeof(local_addr));
socklen_t local_addr_len = sizeof(local_addr);
getsockname(connfd,(struct sockaddr*)&local_addr,&local_addr_len);
printf("server local %s:%d\n",inet_ntoa(local_addr.sin_addr),ntohs(local_addr.sin_port));
struct sockaddr_in peer_addr;
bzero(&peer_addr,sizeof(peer_addr));
socklen_t peer_addr_len = sizeof(peer_addr);
getpeername(connfd,(struct sockaddr*)&peer_addr,&peer_addr_len);
printf("server peer %s:%d\n",inet_ntoa(peer_addr.sin_addr),ntohs(peer_addr.sin_port));
}
int main(int argc,char* argv[]){
if(3 != argc){
printf("usage:%s <ip> <#port>\n",argv[0]);
return 1;
}
int listenfd = socket(AF_INET,SOCK_STREAM,0);
if(-1 == listenfd){
perror("listenfd open err");
return 1;
}
printf("socket create OK\n");
int flag = 1;
setsockopt(listenfd,SOL_SOCKET,SO_REUSEADDR,&flag,sizeof(flag));
struct sockaddr_in local_addr;
bzero(&local_addr,sizeof(local_addr));
local_addr.sin_family = AF_INET;
local_addr.sin_addr.s_addr = inet_addr(argv[1]);
local_addr.sin_port = htons(atoi(argv[2]));
if(-1 == bind(listenfd,(struct sockaddr*)&local_addr,sizeof(local_addr))){
perror("bind err");
return 1;
}
printf("bind OK\n");
if(-1 == listen(listenfd,10)){
perror("listen err");
return 1;
}
printf("listen OK\n");
fd_set rfds;
FD_ZERO(&rfds);
FD_SET(listenfd,&rfds);
int maxfdp1 = listenfd + 1;
int connfds[FD_SETSIZE-1];
size_t connfds_cnt = 0;
for(;;){
int i;
FD_SET(listenfd,&rfds);
for(i=0;i<connfds_cnt;i++){
FD_SET(connfds[i],&rfds);
printf("FD_SET(%d)\n",connfds[i]);
}
printf("before select:%lu\n",rfds);
if(-1 == select(maxfdp1,&rfds,NULL,NULL,NULL)){
perror("select error");
return 1;
}
printf("after select:%lu\n",rfds);
if(FD_ISSET(listenfd,&rfds)){
printf("listenfd ready\n");
struct sockaddr_in remote_addr;
bzero(&remote_addr,sizeof(remote_addr));
socklen_t remote_addr_len = sizeof(remote_addr);
int connfd = accept(listenfd,(struct sockaddr*)&remote_addr,&remote_addr_len);
if(-1 == connfd){
perror("accept err");
//return 1;
}
if(connfds_cnt+1 == FD_SETSIZE-1){
fprintf(stderr,"connfd size over %d\n",FD_SETSIZE-1);
close(connfds[i]);
}else{
connfds[connfds_cnt++] = connfd;
maxfdp1 = max(connfd,maxfdp1-1)+1;
show_info(connfd);
}
}
for(i=0;i<connfds_cnt;i++){
if(-1 == connfds[i]){
continue;
}
if(FD_ISSET(connfds[i],&rfds)){
char buf[BUFSIZ];
bzero(buf,BUFSIZ);
ssize_t len;
if((len = read(connfds[i],buf,BUFSIZ-1)) == -1){
perror("read err");
//return 1;
}
if(0 == len){
printf("close %d\n",connfds[i]);
close(connfds[i]);
FD_CLR(connfds[i],&rfds);
memcpy(connfds+i,connfd+i+1,connfds_cnt-i-1);
connfds_cnt--;
i--;//数组发生变化,重新判断i的fd
continue;
}
printf("server recv:%s\n",buf);
int fd = open(buf,O_RDONLY);
if(-1 == fd){
perror("open file err");
}
struct stat file_stat;
fstat(fd,&file_stat);
if(-1 == sendfile(connfds[i],fd,NULL,file_stat.st_size)){
perror("sendfile err");
}
printf("server send file %s ok\n",buf);
close(fd);
}
}
}
close(listenfd);
}
2.2 Poll模式
2.2.1 背景
优点
- 不需要不修改传入的参数数组
- 可以监视任意个链接
cat /proc/sys/fs/file-max
缺点
2.2.2 结构体
成员 |
含义 |
fd |
描述符 |
events |
监听事件,主要用于设置监听事件 |
revents |
实际触发的事件,用于判断触发的事件 |
2.2.3 函数
int poll(struct pollfd *fdarray, unsigned long nfds, int timeout)
No. |
参数 |
含义 |
1 |
fdarray |
struct pollfd 数组指针 |
2 |
nfds |
数组元素个数 |
3 |
timeout |
等待时间。INFTIM :永远等待;0 :立即返回;>0 :等待秒数; |
No. |
参数 |
含义 |
1 |
fd |
文件描述符 |
2 |
events |
监视fd事件,监视事件可以是输入事件,可以是输出事件。 |
3 |
revents |
fd实际发生的事件,监视事件可以是输入事件,可以是输出事件,还可以是出错事件。 |
No. |
参数 |
含义 |
1 |
POLLRDNORM |
普通数据 |
2 |
POLLRDBAND |
优先级带数据 |
3 |
POLLIN |
普通或者优先级带数据 |
No. |
参数 |
含义 |
1 |
POLLWRNORM |
普通数据 |
2 |
POLLWRBAND |
优先级带数据 |
3 |
POLLOUT |
普通或者优先级带数据 |
No. |
参数 |
含义 |
1 |
POLLERR |
发生错误 |
2 |
POLLHUP |
发生挂起 |
3 |
POLLNVAL |
描述符非法 |
No. |
返回值 |
含义 |
1 |
0 |
超时 |
2 |
-1 |
出错 |
3 |
正数 |
就绪描述符个数 |
2.2.4 概念
数据类型
No. |
类型 |
实例 |
1 |
普通数据 |
正规的TCP数据,所有的UDP数据 |
2 |
优先级带数据 |
TCP带外数据 |
TCP带外数据数据中的急救车
2.2.5 编码流程
- 定义
pollfd
结构体数组
- 初始化
pollfd
结构体数组
- 设置监听poll事件
- 等待poll事件
- 判断触发事件的描述符,并做对应处理。
2.2.6 触发事件
2.2.7 代码结构
// 定义pollfd结构体数组
struct pollfd pollfds[OPEN_MAX];
// 初始化pollfd结构体数组
int i;
for(i=0;i<OPEN_MAX;i++){
pollfds[i].fd = -1;
}
int pollfds_cnt = 0;
// 设置监听事件
pollfds[0].fd = fd1;
pollfds[0].event = POLLIN;
pollfds_cnt++;
pollfds[1].fd = fd1;
pollfds[1].event = POLLIN;
pollfds_cnt++;
// 等待poll事件
if(poll(pollfds,pollfds_cnt,INFTIM)>0){
int i;
for(i=0;i<pollfds_cnt;i++){
// 判断触发事件的描述符
if(pollfds[i].fd == fd1 && pollfds[i].revent & POLLIN){
// do something
}
if(pollfds[i].fd == fd2 && pollfds[i].revent & POLLIN){
// do something
}
}
}
2.2.8 注意
struct pollfd
数组的最大数是OPEN_MAX
(或者linux/fs.h
中的INR_OPEN_MAX
)
还可以通过如下方式查看:
cat /proc/sys/fs/file-max
ulimit
2.2.9 示例
-
poll()
实现服务器tcp_server_poll.cpp
#include <stdio.h>
#include <unistd.h>
#include <fcntl.h>
#include <string.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#include <sys/poll.h>
#include <linux/fs.h>
#define max(a,b) ((a)>(b)?(a):(b))
void show_info(int connfd){
struct sockaddr_in local_addr;
bzero(&local_addr,sizeof(local_addr));
socklen_t local_addr_len = sizeof(local_addr);
getsockname(connfd,(struct sockaddr*)&local_addr,&local_addr_len);
printf("server local %s:%d\n",inet_ntoa(local_addr.sin_addr),ntohs(local_addr.sin_port));
struct sockaddr_in peer_addr;
bzero(&peer_addr,sizeof(peer_addr));
socklen_t peer_addr_len = sizeof(peer_addr);
getpeername(connfd,(struct sockaddr*)&peer_addr,&peer_addr_len);
printf("server peer %s:%d\n",inet_ntoa(peer_addr.sin_addr),ntohs(peer_addr.sin_port));
}
int main(int argc,char* argv[]){
if(3 != argc){
printf("usage:%s <ip> <#port>\n",argv[0]);
return 1;
}
int listenfd = socket(AF_INET,SOCK_STREAM,0);
if(-1 == listenfd){
perror("listenfd open err");
return 1;
}
printf("socket create OK\n");
int flag = 1;
setsockopt(listenfd,SOL_SOCKET,SO_REUSEADDR,&flag,sizeof(flag));
struct sockaddr_in local_addr;
bzero(&local_addr,sizeof(local_addr));
local_addr.sin_family = AF_INET;
local_addr.sin_addr.s_addr = inet_addr(argv[1]);
local_addr.sin_port = htons(atoi(argv[2]));
if(-1 == bind(listenfd,(struct sockaddr*)&local_addr,sizeof(local_addr))){
perror("bind err");
return 1;
}
printf("bind OK\n");
if(-1 == listen(listenfd,10)){
perror("listen err");
return 1;
}
printf("listen OK\n");
struct pollfd poll_fd[INR_OPEN_MAX];
poll_fd[0].fd = listenfd;
poll_fd[0].events = POLLIN;
size_t poll_fd_cnt = 1;
for(;;){
if(-1 != poll(poll_fd,poll_fd_cnt,-1)){
if(poll_fd[0].revents == POLLIN){
printf("accept listenfd\n");
int connfd = accept(listenfd,NULL,NULL);
if(-1 == connfd){
perror("accept err");
}else{
if(poll_fd_cnt+1 == INR_OPEN_MAX){
fprintf(stderr,"connfd size over %d",INR_OPEN_MAX);
close(connfd);
}else{
poll_fd[poll_fd_cnt].fd = connfd;
poll_fd[poll_fd_cnt].events = POLLIN;
poll_fd_cnt++;
}
}
}
int i;
for(i=1;i<poll_fd_cnt;i++){
if(poll_fd[i].revents & POLLIN){
char buf[BUFSIZ];
bzero(buf,BUFSIZ);
ssize_t len;
printf("read connfd %d\n",poll_fd[i].fd);
if((len = read(poll_fd[i].fd,buf,BUFSIZ-1)) == -1){
perror("read err");
//return 1;
}
if(0 == len){
printf("close %d\n",poll_fd[i].fd);
printf("%d vs %d\n",poll_fd[i].revents,poll_fd[i].revents);
close(poll_fd[i].fd);
printf("%d vs %d\n",poll_fd[i].revents,poll_fd[i].revents);
memcpy(poll_fd+i,poll_fd+i+1,poll_fd_cnt-i-1);
poll_fd_cnt--;
i--;//数组发生变化,重新判断i的fd
continue;
//break;
}
printf("server recv:%s\n",buf);
int fd = open(buf,O_RDONLY);
if(-1 == fd){
perror("open file err");
}
struct stat file_stat;
fstat(fd,&file_stat);
if(-1 == sendfile(poll_fd[i].fd,fd,NULL,file_stat.st_size)){
perror("sendfile err");
}
printf("server send file %s ok\n",buf);
close(fd);
}
}
}
}
close(listenfd);
}
-
poll()
实现客户端tcp_client_poll.cpp
#include <stdio.h>
#include <string.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#include <sys/select.h>
#include <math.h>
#include <unistd.h>
#include <fcntl.h>
#include <sys/poll.h>
#define max(a,b) ((a)>(b)?(a):(b))
void show_info(int connfd){
struct sockaddr_in local_addr;
bzero(&local_addr,sizeof(local_addr));
socklen_t local_addr_len = sizeof(local_addr);
getsockname(connfd,(struct sockaddr*)&local_addr,&local_addr_len);
printf("client local %s:%d\n",inet_ntoa(local_addr.sin_addr),ntohs(local_addr.sin_port));
struct sockaddr_in peer_addr;
bzero(&peer_addr,sizeof(peer_addr));
socklen_t peer_addr_len = sizeof(peer_addr);
getpeername(connfd,(struct sockaddr*)&peer_addr,&peer_addr_len);
printf("clinet peer %s:%d\n",inet_ntoa(peer_addr.sin_addr),ntohs(peer_addr.sin_port));
}
int main(int argc,char* argv[]){
if(3 != argc){
printf("usage:%s <ip> <#port> \n",argv[0]);
return 1;
}
int connfd = socket(AF_INET,SOCK_STREAM,0);
if(-1 == connfd){
perror("socket err");
return 1;
}
struct sockaddr_in remote_addr;
bzero(&remote_addr,sizeof(remote_addr));
remote_addr.sin_family = AF_INET;
remote_addr.sin_addr.s_addr = inet_addr(argv[1]);
remote_addr.sin_port = htons(atoi(argv[2]));
if(-1 == connect(connfd,(struct sockaddr*)&remote_addr,sizeof(remote_addr))){
perror("connect err");
return 1;
}
show_info(connfd);
printf("connfd:%d\n",connfd);
struct pollfd poll_fds[2];
poll_fds[0].fd = connfd;
poll_fds[0].events = POLLIN;
poll_fds[1].fd = STDIN_FILENO;
poll_fds[1].events = POLLIN;
char buf[BUFSIZ];
for(;;){
if(-1 != poll(poll_fds,2,-1)){
if(poll_fds[0].fd == connfd && poll_fds[0].revents & POLLRDNORM){
bzero(buf,BUFSIZ);
printf("recv msg\n");
ssize_t n;
if((n = read(connfd,buf,BUFSIZ)) == -1){
perror("read err");
return 1;
}else if(0 == n){
printf("server close\n");
break;
}
printf("client recv:%s\n",buf);
}
if(poll_fds[1].fd == STDIN_FILENO && poll_fds[1].revents & POLLRDNORM){
bzero(buf,BUFSIZ);
printf("send msg\n");
fgets(buf,BUFSIZ,stdin);
write(connfd,buf,strlen(buf)-1);
}
}
}
close(connfd);
}
2.3 Epool模式
2.3.1 背景
优点
2.3.2 结构体
struct epoll_event
成员 |
含义 |
data.fd |
描述符 |
events |
设置/获取epoll事件 |
结构体epoll_event
被用于注册所感兴趣的事件和回传所发生待处理的事件,定义如下:
typedef union epoll_data {
void *ptr;
int fd;
__uint32_t u32;
__uint64_t u64;
} epoll_data_t;//保存触发事件的某个文件描述符相关的数据
struct epoll_event {
__uint32_t events; /* epoll event */
epoll_data_t data; /* User data variable */
};
其中events表示感兴趣的事件和被触发的事件,可能的取值为:
No. |
事件 |
含义 |
1 |
EPOLLIN |
表示对应的文件描述符可以读 |
2 |
EPOLLOUT |
表示对应的文件描述符可以写 |
3 |
EPOLLPRI |
表示对应的文件描述符有紧急的数可读 |
4 |
EPOLLERR |
表示对应的文件描述符发生错误 |
5 |
EPOLLHUP |
表示对应的文件描述符被挂断 |
6 |
EPOLLET |
ET的epoll工作模 |
2.3.2 函数
No. |
操作 |
函数 |
1 |
创建 |
int epoll_create(int size) |
2 |
控制 |
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event) |
3 |
轮询I/O事件 |
int epoll_wait(int epfd,struct epoll_event events,int maxevents,int timeout) |
①创建
int epoll_create(int size)
可以在/proc/PID/fd/
查看
② 控制
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event)
No. |
参数 |
含义 |
1 |
epfd |
epoll文件描述符 |
2 |
op |
操作 |
3 |
fd |
关联的文件描述符 |
4 |
event |
指向epoll_event 的指针 |
No. |
参数 |
含义 |
1 |
EPOLL_CTL_ADD |
注册 |
2 |
EPOLL_CTL_MOD |
修改 |
3 |
EPOLL_CTL_DEL |
删除 |
No. |
返回值 |
含义 |
1 |
0 |
成功 |
2 |
-1 |
失败 |
③ 轮询I/O事件
int epoll_wait(int epfd,struct epoll_event events,int maxevents,int timeout)
No. |
参数 |
含义 |
1 |
epfd |
epoll 文件描述符 |
2 |
epoll_event |
用于回传代处理事件的数组 |
3 |
maxevents |
每次能处理的事件数 |
4 |
timeout |
等待I/O事件发生的超时值ms,-1 :永不超时;0 : 立即返回 |
No. |
返回值 |
含义 |
1 |
正数 |
发生事件数 |
2 |
-1 |
错误 |
2.3.3 编码流程
- 创建epoll描述符
- 注册epoll事件
- 等待epoll事件
- 判断触发epoll事件的描述符和事件
- 关闭epoll描述符
2.3.4 触发条件
- ET(Edge Triggered)模式--边沿触发
No. |
操作 |
触发条件 |
1 |
读 |
接收缓冲区状态变化时触发读事件,即空的接收缓冲区刚接收到数据时触发读事件 |
2 |
写 |
发送缓冲区状态变化时触发写事件,即满的缓冲区刚空出空间时触发读事件 |
- LT(Level Triggered)模式--水平触发
No. |
操作 |
触发条件 |
1 |
读 |
接收缓冲区不为空,有数据可读,读事件一直触发 |
2 |
写 |
发送缓冲区不满,可以继续写入数据,写事件一直触发 |
2.3.5 代码结构
LT代码结构
// 创建epoll描述符
int epollfd = epoll_create(OPEN_MAX);
// 注册epoll事件
struct epoll_event evt;
evt.data.fd = fd1;
evt.events = EPOLLIN;
epoll_ctl(epollfd,EPOLL_CTL_ADD,fd1,&evt);
evt.data.fd = fd2;
evt.events = EPOLLIN;
epoll_ctl(epollfd,EPOLL_CTL_ADD,fd2,&evt);
// 等待epoll事件
int evts_cnt = 2;
struct epoll_event evts[evts_cnt];
int fd_cnt = epoll_wait(epollfd,&evts,evts_cnt,-1);:
int i;
for(i=0;i<fd_cnt,i++){
// 判断触发epoll事件的描述符和事件
if(evts[i].data.fd == fd1 && evts[i].events & EPOLLIN){
// do something
}
if(evts[i].data.fd == fd2 && evts[i].events & EPOLLIN){
// do something
}
}
// 关闭epoll描述符
close(epollfd);
#include <stdio.h>
#include <unistd.h>
#include <string.h>
#include <fcntl.h>
#include <sys/epoll.h>
int main(int argc,char* argv[]){
int flag = EPOLLIN;
int c;
while((c = getopt(argc,argv,"e")) != -1){
switch(c){
case 'e':
flag |= EPOLLET;
break;
}
}
if(optind != argc - 1){
printf("usage:%s -e <#size>\n",argv[0]);
return 1;
}
ssize_t size = atoi(argv[optind]);
if(size <= 0){
printf("size must > 0\n");
return 1;
}
int epoll_fd = epoll_create(1);
struct epoll_event evt;
evt.data.fd = STDIN_FILENO;
evt.events = flag;
if(EPOLLET & flag){
fcntl(STDIN_FILENO,F_SETFL,O_NONBLOCK);
}
epoll_ctl(epoll_fd,EPOLL_CTL_ADD,STDIN_FILENO,&evt);
char buf[size+1];
for(;;){
struct epoll_event out_evt;
epoll_wait(epoll_fd,&out_evt,1,-1);
if(out_evt.events & EPOLLIN){
bzero(buf,size+1);
read(STDIN_FILENO,buf,size);
printf("%s\n",buf);
}
}
close(epoll_fd);
}
ET模式的文件句柄必须是非阻塞的。
// 创建epoll描述符
int epollfd = epoll_create(OPEN_MAX);
// 注册epoll事件
struct epoll_event evt;
evt.data.fd = fd1;// fd1必须是非阻塞的
evt.events = EPOLLIN | EPOLLET;
epoll_ctl(epollfd,EPOLL_CTL_ADD,fd1,&evt);
evt.data.fd = fd2;// fd2必须是非阻塞的
evt.events = EPOLLIN | EPOLLET;
epoll_ctl(epollfd,EPOLL_CTL_ADD,fd2,&evt);
// 等待epoll事件
int evts_cnt = 2;
struct epoll_event evts[evts_cnt];
int fd_cnt = epoll_wait(epollfd,&evts,evts_cnt,-1);
int i;
for(i=0;i<fd_cnt,i++){
// 判断触发epoll事件的描述符和事件
if(evts[i].data.fd == fd1 && evts[i].events & EPOLLIN){
// do something
}
if(evts[i].data.fd == fd2 && evts[i].events & EPOLLIN){
// do something
}
}
// 关闭epoll描述符
close(epollfd);
#include <stdio.h>
#include <unistd.h>
#include <string.h>
#include <errno.h>
#include <fcntl.h>
#include <sys/epoll.h>
int main(int argc,char* argv[]){
int flag = EPOLLIN;
int c;
while((c = getopt(argc,argv,"e")) != -1){
switch(c){
case 'e':
flag |= EPOLLET;
break;
}
}
if(optind != argc - 1){
printf("usage:%s -e <#size>\n",argv[0]);
return 1;
}
ssize_t size = atoi(argv[optind]);
if(size <= 0){
printf("size must > 0\n");
return 1;
}
int epoll_fd = epoll_create(1);
struct epoll_event evt;
evt.data.fd = STDIN_FILENO;
evt.events = flag;
if(EPOLLET & flag){
fcntl(STDIN_FILENO,F_SETFL,O_NONBLOCK);
}
epoll_ctl(epoll_fd,EPOLL_CTL_ADD,STDIN_FILENO,&evt);
char buf[size+1];
for(;;){
struct epoll_event out_evt;
epoll_wait(epoll_fd,&out_evt,1,-1);
if(out_evt.events & EPOLLIN){
for(;;){
bzero(buf,size+1);
ssize_t n = read(STDIN_FILENO,buf,size);
if(n > 0){
printf("%s\n",buf);
}else if(n <= 0){
if(EAGAIN == errno){
printf("EAGAIN\n");
break;
}
perror("read err");
break;
}
}
}
}
close(epoll_fd);
}
2.3.6 示例
-
epoll()
实现服务器tcp_server_epoll.cpp
#include <stdio.h>
#include <unistd.h>
#include <fcntl.h>
#include <string.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#include <sys/epoll.h>
#include <linux/fs.h>
#define max(a,b) ((a)>(b)?(a):(b))
void show_info(int connfd){
struct sockaddr_in local_addr;
bzero(&local_addr,sizeof(local_addr));
socklen_t local_addr_len = sizeof(local_addr);
getsockname(connfd,(struct sockaddr*)&local_addr,&local_addr_len);
printf("server local %s:%d\n",inet_ntoa(local_addr.sin_addr),ntohs(local_addr.sin_port));
struct sockaddr_in peer_addr;
bzero(&peer_addr,sizeof(peer_addr));
socklen_t peer_addr_len = sizeof(peer_addr);
getpeername(connfd,(struct sockaddr*)&peer_addr,&peer_addr_len);
printf("server peer %s:%d\n",inet_ntoa(peer_addr.sin_addr),ntohs(peer_addr.sin_port));
}
int main(int argc,char* argv[]){
if(3 != argc){
printf("usage:%s <ip> <#port>\n",argv[0]);
return 1;
}
int listenfd = socket(AF_INET,SOCK_STREAM,0);
if(-1 == listenfd){
perror("listenfd open err");
return 1;
}
printf("socket create OK\n");
int flag = 1;
setsockopt(listenfd,SOL_SOCKET,SO_REUSEADDR,&flag,sizeof(flag));
struct sockaddr_in local_addr;
bzero(&local_addr,sizeof(local_addr));
local_addr.sin_family = AF_INET;
local_addr.sin_addr.s_addr = inet_addr(argv[1]);
local_addr.sin_port = htons(atoi(argv[2]));
if(-1 == bind(listenfd,(struct sockaddr*)&local_addr,sizeof(local_addr))){
perror("bind err");
return 1;
}
printf("bind OK\n");
if(-1 == listen(listenfd,10)){
perror("listen err");
return 1;
}
printf("listen OK\n");
int epoll_fd = epoll_create(INR_OPEN_MAX);
struct epoll_event evt;
evt.data.fd = listenfd;
evt.events = EPOLLIN;
epoll_ctl(epoll_fd,EPOLL_CTL_ADD,listenfd,&evt);
int out_evts_cnt = 1;
for(;;){
struct epoll_event out_evts[out_evts_cnt];
int fd_cnt = epoll_wait(epoll_fd,out_evts,out_evts_cnt,-1);
int i;
for(i=0;i<fd_cnt;i++){
if(out_evts[i].data.fd == listenfd && out_evts[i].events & EPOLLIN){
printf("accept listenfd\n");
int connfd = accept(listenfd,NULL,NULL);
if(-1 == connfd){
perror("accept err");
}else{
if(out_evts_cnt+1 == INR_OPEN_MAX){
fprintf(stderr,"connfd size over %d",INR_OPEN_MAX);
close(connfd);
}else{
struct epoll_event evt;
evt.data.fd = connfd;
evt.events = EPOLLIN;
epoll_ctl(epoll_fd,EPOLL_CTL_ADD,connfd,&evt);
out_evts_cnt++;
}
}
}else if(out_evts[i].events & EPOLLIN){
char buf[BUFSIZ];
bzero(buf,BUFSIZ);
ssize_t len;
printf("read connfd %d\n",out_evts[i].data.fd);
if((len = read(out_evts[i].data.fd,buf,BUFSIZ-1)) == -1){
perror("read err");
//return 1;
}
if(0 == len){
printf("close %d\n",out_evts[i].data.fd);
close(out_evts[i].data.fd);
epoll_ctl(epoll_fd,EPOLL_CTL_DEL,out_evts[i].data.fd,out_evts+i);
continue;
}
printf("server recv:%s\n",buf);
int fd = open(buf,O_RDONLY);
if(-1 == fd){
perror("open file err");
}
struct stat file_stat;
fstat(fd,&file_stat);
if(-1 == sendfile(out_evts[i].data.fd,fd,NULL,file_stat.st_size)){
perror("sendfile err");
}
printf("server send file %s ok\n",buf);
close(fd);
}
}
}
close(epoll_fd);
close(listenfd);
}
-
epoll()
实现客户端tcp_client_epoll.cpp
#include <stdio.h>
#include <string.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#include <sys/select.h>
#include <math.h>
#include <unistd.h>
#include <fcntl.h>
#include <sys/epoll.h>
#define max(a,b) ((a)>(b)?(a):(b))
void show_info(int connfd){
struct sockaddr_in local_addr;
bzero(&local_addr,sizeof(local_addr));
socklen_t local_addr_len = sizeof(local_addr);
getsockname(connfd,(struct sockaddr*)&local_addr,&local_addr_len);
printf("client local %s:%d\n",inet_ntoa(local_addr.sin_addr),ntohs(local_addr.sin_port));
struct sockaddr_in peer_addr;
bzero(&peer_addr,sizeof(peer_addr));
socklen_t peer_addr_len = sizeof(peer_addr);
getpeername(connfd,(struct sockaddr*)&peer_addr,&peer_addr_len);
printf("clinet peer %s:%d\n",inet_ntoa(peer_addr.sin_addr),ntohs(peer_addr.sin_port));
}
int main(int argc,char* argv[]){
if(3 != argc){
printf("usage:%s <ip> <#port> \n",argv[0]);
return 1;
}
int connfd = socket(AF_INET,SOCK_STREAM,0);
if(-1 == connfd){
perror("socket err");
return 1;
}
struct sockaddr_in remote_addr;
bzero(&remote_addr,sizeof(remote_addr));
remote_addr.sin_family = AF_INET;
remote_addr.sin_addr.s_addr = inet_addr(argv[1]);
remote_addr.sin_port = htons(atoi(argv[2]));
if(-1 == connect(connfd,(struct sockaddr*)&remote_addr,sizeof(remote_addr))){
perror("connect err");
return 1;
}
show_info(connfd);
printf("connfd:%d\n",connfd);
struct epoll_event in_evts[2];
in_evts[0].data.fd = connfd;
in_evts[0].events = EPOLLIN;
in_evts[1].data.fd = STDIN_FILENO;
in_evts[1].events = EPOLLIN;
int epoll_fd = epoll_create(2);
epoll_ctl(epoll_fd,EPOLL_CTL_ADD,connfd,in_evts);
epoll_ctl(epoll_fd,EPOLL_CTL_ADD,STDIN_FILENO,in_evts+1);
char buf[BUFSIZ];
for(;;){
struct epoll_event out_evts[2];
int fd_cnt = epoll_wait(epoll_fd,out_evts,2,-1);
int i;
for(i=0;i<fd_cnt;i++){
if(out_evts[i].data.fd == connfd && out_evts[i].events & EPOLLIN){
bzero(buf,BUFSIZ);
printf("recv msg\n");
ssize_t n;
if((n = read(connfd,buf,BUFSIZ)) == -1){
perror("read err");
return 1;
}else if(0 == n){
printf("server close\n");
break;
}
printf("client recv:%s\n",buf);
}
if(out_evts[i].data.fd == STDIN_FILENO && out_evts[i].events & EPOLLIN){
bzero(buf,BUFSIZ);
printf("send msg\n");
fgets(buf,BUFSIZ,stdin);
write(connfd,buf,strlen(buf)-1);
}
}
}
close(connfd);
}
3. 比较
3.1 select IO多路复用
- 缺点
- 只能监视
FD_SETSIZE
个连接
- 不能确切指定有数据的
socket
- 每次需要修改传入的
fd_set
- 线程不安全
3.2 poll IO多路复用
-
优点
- 不需要不修改传入的
pollfd
数组
- 可以监视任意个连接
-
缺点
3.3 epoll IO多路复用