一、阻塞 I/O 、非阻塞 I/O
1.1 客户端代码
代码写入 tcp_client.py 文件,这是通用代码:
from socket import socket, AF_INET, SOCK_STREAM
ADDR = ('localhost', 1234)
tcp_client_sock = socket(AF_INET, SOCK_STREAM)
tcp_client_sock.connect(ADDR)
while True:
data = input('输入内容:')
if not data:
break
tcp_client_sock.send(data.encode())
data = tcp_client_sock.recv(1024)
if not data:
break
print(data.decode())
tcp_client_sock.close()
1.2 I/O 阻塞(同步阻塞)
注意,这块儿的阻塞指的是系统调用的阻塞。服务器套接字的 accept 和 recv 方法在执行时,首先要等待客户端发送数据(连接请求也是数据),数据在内核空间准备好之前的这段时间是一个阻塞,这个阻塞被称为系统调用阻塞,也就是用户代码需要用内核态的权限读写一些数据时的阻塞,这个阻塞可能持续很久,它是由客户端决定的。内核空间中的数据准备好之后,用户态代码需要将数据从内核空间复制到用户空间,这步需要 CPU 在内核态完成,这也是一个阻塞,相比前一个阻塞,这个耗时要小很多,所谓异步 I/O 就是针对这个阻塞
阻塞型服务端代码写入 tcp_server.py 文件:
import time
from socket import socket, AF_INET, SOCK_STREAM
# HOST 变量是空白的,这是对 bind 方法的标识,表示它可以使用任何可用的地址
HOST = ''
# 选择一个随机的未被占用的端口号
PORT = 21567
# 将缓冲区大小设置为 1KB
BUFSIZ = 1024
# 主机端口元组
ADDR = (HOST, PORT)
# 定义 TCP 服务器套接字
tcp_server_sock = socket(AF_INET, SOCK_STREAM)
# 将地址绑定到套接字上
tcp_server_sock.bind(ADDR)
# 开启服务器监听,在连接被转接或拒绝之前,传入连接请求的最大数是 5
tcp_server_sock.listen(5)
# 进入监听状态后,等待客户端连接
while True:
print('Waiting for connection...')
# 下一行为阻塞运行状态,等待客户端的连接,阻塞分两段,上文已述
# 服务器套接字相当于总机接线员,接到客户电话后转给分机客服
# 当成功接入一个客户端,accept 方法会返回一个临时服务端套接字和对方地址
tcp_extension_sock, addr = tcp_server_sock.accept()
# 如果此时另一个客户端向服务器发送连接请求,也是可以的
# 请求数由 listen 方法的参数决定
# 连接成功后保持等待,前一个已连接的客户端断开连接后,才会处理下一个
print('...connected from: {}'.format(addr))
while True:
# 临时服务端套接字的 recv 方法获得客户端传来的数据
# 此方法也是阻塞运行,每次接收固定量的数据,直到全部接收完毕
data = tcp_extension_sock.recv(BUFSIZ)
if not data:
break
# 临时服务端套接字的 send 方法向客户端发送数据
# data 为二进制对象,将其转换为 UTF-8 格式,再整体转换为二进制对象
tcp_extension_sock.send('{} {}'.format(
time.ctime(), data.decode()).encode())
# while 循环结束后,关闭临时服务器套接字
tcp_extension_sock.close()
tcp_server_sock.close()
1.3 I/O 非阻塞
设置非阻塞很简单,使用套接字的 setblocking 方法即可。非阻塞需要配合 while 无限循环,到这步为止,非阻塞模式相比阻塞模式没有任何益处,反而增加了 CPU 的占用。它是实现 I/O 复用的基础,所以至关重要。另外,非阻塞模型每次只能处理一个客户端连接,前一个连接关闭后才能处理下一个
注意这块儿说的 “非阻塞” 指的是内核空间中接收客户端的数据这步,数据从内核空间复制到用户空间这步还是阻塞的
将非阻塞服务端代码写入 tcp_server_noblock.py 文件中:
import time
from socket import socket, AF_INET, SOCK_STREAM
ADDR = ('', 1234)
tcp_server_sock = socket(AF_INET, SOCK_STREAM)
tcp_server_sock.setblocking(False) # 设置非阻塞,注意是系统调用非阻塞
tcp_server_sock.bind(ADDR)
tcp_server_sock.listen()
print('Waiting for connection...')
while True:
try:
try:
# 设置非阻塞后,套接字的 accept 方法进行系统调用后立即返回
# accept 方法会触发 BlockingIOError 异常
# 捕获这个异常,继续 while 循环,直到系统空间收到客户端连接
tcp_extension_sock, addr = tcp_server_sock.accept()
except BlockingIOError:
continue
except KeyboardInterrupt:
break
print('------ connected from {}'.format(addr))
while True:
try:
# 同上,设置非阻塞后,套接字的 recv 方法进行系统调用后立即返回
# recv 方法会触发 BlockingIOError 异常
# 捕获这个异常,继续 while 循环,直到系统空间收到客户端消息
data = tcp_extension_sock.recv(1024)
except BlockingIOError:
continue
if not data:
break
print('收到消息:{}'.format(data.decode()))
tcp_extension_sock.send('{} {}'.format(
time.ctime(), data.decode()).encode())
print('------ {} connection closed.'.format(addr))
tcp_extension_sock.close()
print('\nEnd')
tcp_server_sock.close()
二、I/O 多路复用
I/O 多路复用可以做到在一个线程中监视多个文件描述符,当其中一个或者多个描述符准备好之后,内核会通知用户进程,用户进程来处理数据。和多进程、多线程相比,I/O 多路复用的系统开销比较小,无需新建进程或线程,不必进行进程或线程的上下文切换。目前 Linux 系统中常见的几种 I/O 多路复用机制的系统调用有 select、poll、epoll 。epoll 相较于 select 和 poll 性能要好一些,它也是唯一一个不支持在 Windows 操作系统中使用的系统调用
文件描述符(File Descriptor)是计算机科学中的一个术语,是一个用于表述指向文件的引用的抽象化概念。文件描述符在形式上是一个非负整数,实际上它是一个指向 “文件打开记录表” 的索引值,“文件打开记录表” 是由内核为每个进程所维护的该进程打开文件的记录表。当程序打开一个现有文件或者创建一个新文件时,内核向进程返回一个文件描述符。在程序设计中,一些涉及底层的程序编写往往会围绕着文件描述符展开,不过文件描述符这一概念往往只适用于类 Unix 操作系统。在一个进程里,每个套接字对应一个独一无二且固定不变的文件描述符,不过前一个套接字关闭后,后一个套接字可以用它的文件描述符
I/O 多路复用机制中,一个线程可以监视多个描述符,一旦某个描述符就绪(一般是读就绪或者写就绪),系统内核能够通知程序进行相应的读写操作。但 select、pselect、poll、epoll 本质上都是同步 I/O ,因为它们都需要在读写事件就绪后自己负责进行读写,也就是说这个读写过程是阻塞的,而异步 I/O 则无需自己进行读写,系统内核会负责把数据从内核空间拷贝到用户空间,然后再通知用户进程过来处理数据
2.1 select
select 是最早出现的系统调用,它可以在大多数系统中使用。select.select 方法监视的文件描述符分三类:writefds 可读、readfds 可写、和 exceptfds 异常。该方法调用后处于阻塞状态,直到有描述符就绪(有套接字可读、可写、异常)或者超时(timeout 参数指定等待时间)。select.select 方法的返回值是元组,元组中有三个列表,列表里的元素是描述符已就绪的套接字
老话说:“select 的主要缺点在于单个进程能够监视的文件描述符的数量存在最大限制,在 Linux 系统中为 1024 或 2048 ,可以通过修改宏定义甚至重新编译内核的方式提升这一限制,但是这样也会造成效率的降低” ,这么说虽然对,但是换成 “select 适用于连接数为 1K 的使用场景” 更为合理,监视文件描述符的数量限制是有意为之,因为套接字存储在类似 Python 列表的线性表里,对 socket 进行扫描时采用轮询的方法,管理的连接数量越多,扫描效率相较 epoll 的红黑树算法越弱。实际上 select 在 1K 连接量、高连接活跃度的场景中的表现要优于 poll 和 epoll ,所谓尺有所短寸有所长
将以下代码写入 tcp_server_select.py 文件:
import time
import select
from socket import socket, AF_INET, SOCK_STREAM
ADDR = ('', 1234)
tcp_server_sock = socket(AF_INET, SOCK_STREAM)
tcp_server_sock.setblocking(False)
tcp_server_sock.bind(ADDR)
tcp_server_sock.listen()
print(dir(tcp_server_sock))
rlist = [tcp_server_sock] # 这三个列表作为 select.select 的参数
wlist = []
xlist = []
print('Waiting for connection...')
# 这个 while 循环是核心
# 每当有客户端发起连接请求、关闭连接、发送消息,这块儿 while 都会循环一次
while True:
print('--------------------')
# 套接字的 fileno 方法的返回值就是其文件描述符,一个正整数
# 在一个进程中,每个套接字都有独一无二且固定不变的文件描述符
print('rlist:', ['套接字FD: {}'.format(s.fileno()) for s in rlist])
print('wlist:', ['套接字FD: {}'.format(s.fileno()) for s in wlist])
try:
# select.select 方法阻塞运行,直到三个参数列表里的套接字有情况出现
# 所谓的有情况,就是套接字文件描述符准备就绪,可读、可写、异常
# 在一个线程内,此方法可以监视上千个描述符,比多线程模式节省大量 CPU 开销
# 此方法的返回值是元组,元组里是三个列表,列表里是有情况的套接字
t = select.select(rlist, wlist, xlist)
readable, writable, exceptional = t
except KeyboardInterrupt:
break
print('readable:', ['套接字FD {}'.format(s.fileno()) for s in readable])
print('writable:', ['套接字FD {}'.format(s.fileno()) for s in writable])
# 处理有情况的可读套接字
for sock in readable:
# TCP 服务端的套接字有两种,一种是等待客户端连接的主套接字
# 另一种是负责处理连接后的一系列事务的临时套接字
# 如果是主套接字有情况,甭问,肯定是有客户端连接请求,它就是干这个的
# 这种情况,主套接字的 accept 方法会返回一个新建临时套接字
if sock is tcp_server_sock:
tcp_extension_sock, addr = sock.accept()
print('收到连接:{}'.format(addr))
# 把临时套接字设为非阻塞,注意是系统调用那块儿的非阻塞
tcp_extension_sock.setblocking(False)
# 把临时套接字加到作为 select.select 方法的参数的列表里
rlist.append(tcp_extension_sock)
# 如果有情况的不是主套接字,那就是临时套接字了
# 肯定是客户端发来消息了,接收消息
# 接收完毕,该套接字自动可写就绪,也就是要把它放到 wlist 列表里
# 下一个 while 循环就处理这个可写套接字
else:
data = sock.recv(1024)
if data:
print('收到数据:{}'.format(data.decode()))
if sock not in wlist:
wlist.append(sock)
else:
# 如果消息接收完毕,移除临时套接字并关闭
rlist.remove(sock)
sock.close()
# 处理有情况的可写套接字
for sock in writable:
print('Server send ...')
sock.send('{}'.format(time.ctime()).encode())
# 处理完,把套接字移除 wlist
wlist.remove(sock)
# 处理有情况的异常套接字
for sock in exceptional:
print('Exceptional sock', sock)
rlist.remove(sock)
if sock in wlist:
wlist.remove(sock)
sock.close()
print('====================')
print('\nEnd')
tcp_server_sock.close()
2.2 poll
poll 本质上和 select 没有区别,但是它是基于链表来存储的,没有最大连接数的限制。poll 会将大量的文件描述符和对应的事件描述符整体复制到内核地址空间,然后查询每个文件描述符的状态,如果文件描述符的读写准备就绪,就将其放入等待队列中。如果遍历完所有的文件描述符均未就绪,则挂起当前进程,直到有可读写套接字或者超时唤醒进程再次遍历文件描述符链表
链表是一种无序数据类型,相比线性表,前者的查询速度要快得多。链表由一系列结点组成,每个结点包括数据域和指针域两部分,链表比线性表占用空间更多,相当于是拿空间换时间,这一点类似于 Python 字典和列表
select 和 poll 都需要通过遍历文件描述符来获取已经就绪的 socket ,大多数情况下会有大量的客户端连接,但只有少数的是活跃的,即连接活跃度低,这样随着描述符的增多,性能也会下降
Python 的 select 模块也实现了 poll 的接口,将代码写入 tcp_server_poll.py 文件:
import select
import socket
import time
server_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_sock.setblocking(False)
server_sock.bind(('', 1234))
server_sock.listen()
connections = {} # 存放新建临时分支套接字,key 是文件描述符,value 是套接字
poll = select.poll() # 创建 poll 对象
# 将主套接字 server_sock 的文件描述符注册到读事件链表里
# 注册套接字描述符之后,poll 会监视套接字的相关活动
# poll.register 方法接收俩参数:套接字的文件描述符、事件位掩码
# 针对套接字的三个事件及其位掩码(正整数):
# POLLIN 可读 1 ,POLLOUT 可写 4 ,POLLHUP 关闭 16
poll.register(server_sock.fileno(), select.POLLIN)
print('Waiting for connection...')
while True:
time.sleep(1)
print('\n========循环开始========')
try:
events = poll.poll()
except KeyboardInterrupt:
break
# 获取发生事件的套接字的文件描述符 fd 和事件位掩码 flag
for fd, flag in events:
print('--------套接字FD: {} 位掩码: {}'.format(fd, flag))
# 如果主套接字有事件就绪,一定是可读就绪,它只负责接收客户端连接
if fd == server_sock.fileno():
# 接收客户端的连接请求,创建新的临时套接字用来发送和接收数据
extension_sock, addr = server_sock.accept()
# print('extension_sock.getpeername: {}'.format(
# extension_sock.getpeername()))
print('收到连接,客户端地址:', addr)
# 将新的临时套接字设置为非阻塞模式
extension_sock.setblocking(0)
# 注册新的临时套接字,监视其可读事件
poll.register(extension_sock.fileno(), select.POLLIN)
# 把新建临时分支套接字加入到存储连接的字典里
connections[extension_sock.fileno()] = extension_sock
# 如果临时套接字可读事件就绪,也就是说客户端发送数据过来
elif flag & select.POLLIN:
# 利用文件描述符从存储连接的字典里获取可读事件就绪的临时套接字
extension_sock= connections[fd]
# 接收数据这块儿需要注意,如果客户端突然关闭连接
# 对应的临时套接字会同时可读和关闭就绪,即事件位掩码为 17
# 此时套接字的 recv 方法在运行时会触发 ConnectionResetError 异常
# 捕获这个异常,使程序向下执行,顺利关闭套接字
try:
data = extension_sock.recv(1024)
except ConnectionResetError:
pass
if data:
print('套接字 {} 收到数据: {}'.format(fd, data.decode()))
# 套接字接收数据后,可写事件会立刻就绪
# poll 转而监视其可写事件,下个 while 循环会处理
poll.modify(fd, select.POLLOUT)
# 如果套接字可写就绪,也就是套接字收到客户端发来的数据后
if flag & select.POLLOUT:
print("套接字 {} 发送数据...".format(fd))
# 利用文件描述符,从存储连接的字典里获取可读就绪的临时套接字
extension_sock = connections[fd]
# 发送数据
extension_sock.send('收到信息,这是模拟信息'.encode())
# 套接字向客户端发送消息后,poll 转为监视其可读事件
poll.modify(fd, select.POLLIN)
# 如果客户端关闭,临时套接字的关闭事件会就绪
elif flag & select.POLLHUP:
print('套接字 {} 关闭'.format(fd))
# 注销文件描述符对应的套接字,不再监视之
poll.unregister(fd)
# 关闭套接字
connections[fd].close()
# 将套接字从存储连接的字典里移除
del connections[fd]
print('--------connections: {}'.format(list(connections.keys())))
print('========循环结束========')
# 注销主套接字
poll.unregister(server_sock.fileno())
# 关闭主套接字
server_sock.close()
print('\nEnd')
2.3 epool & kqueue
epoll 和 kqueue 是 poll 的改进版,分别适用于 Linux 系统和 macOS 系统,以下描述统称 epoll 。
相对于 select 和 poll 来说,epoll 更加灵活,没有最大并发连接的数量限制,能打开的 FD 的上限远大于 1024(1G 内存上能监听约十万个端口)。epoll 使用一个文件描述符管理多个描述符,将用户关系的文件描述符的事件存放到内核的一个事件表中,这样在用户空间和内核空间的 copy 只需一次。在 select / poll 中,进程只有在调用一定的方法后,内核才对所有监视的文件描述符进行扫描;而 epoll 中,当事件就绪时内核会采用回调机制迅速激活这个文件描述符。相比遍历文件描述符,回调机制要高效得多
注意:在连接并发量不高且连接活跃度比较高的场景中,epoll 的表现并不会比 select / poll 优秀
Python 的 select 模块也实现了 epoll 和 kqueue 的接口,将代码写入 tcp_server_epoll.py 文件:
import select
import socket
import time
# Linux 平台使用 select.epoll 接口,将其封装为 Epoll 类
class Epoll():
def __init__(self,):
# 初始化实例时,创建 epoll 对象,等同于 poll 对象
self._epoll = select.epoll() # edge polling 边缘轮询对象
# 获取文件描述符,这个方法在该程序中没有用到
def fileno(self):
return self._epoll.fileno()
# 注册临时套接字,监听某种事件,参数为文件描述符和事件位掩码
def register(self, fd, flag):
self._epoll.register(fd, flag)
# 注销临时套接字,参数为文件描述符
def unregister(self, fd):
self._epoll.unregister(fd)
# 修改临时套接字的监听事件类型
def modify(self, fd, flag):
self._epoll.modify(fd, flag)
def poll(self, timeout=None):
# epoll 对象的 poll 方法的作用等同于 poll 的 poll 方法
# 监听已注册的套接字,该方法阻塞运行
# 直到有事件就绪,返回就绪事件的列表
events = self._epoll.poll(timeout)
# 这个字典用来保存处理后的就绪事件
events_ = {}
# 就绪事件为何要处理?因为 epoll 与 kqueue 所提供的事件位掩码不同
# 所以统一改成 poll 提供的事件位掩码
for fd, flag in events:
# 可读事件就绪
if flag & select.EPOLLIN:
# 将事件位掩码转换为 POLLIN
events_[fd] = events_.get(fd, 0) | select.POLLIN
# 可写事件就绪
if flag & select.EPOLLOUT:
# 将事件位掩码转换为 POLLOUT
events_[fd] = events_.get(fd, 0) | select.POLLOUT
# 关闭事件就绪
if flag & select.EPOLLHUP:
# 将事件位掩码转换为 POLLOUT
events_[fd] = events_.get(fd, 0) | select.POLLHUP
return events_.items()
# Mac OS 平台使用 select.kqueue 接口,将其封装为 Kqueue 类
# kqueue 的接口和 poll 的接口相差比较大
class Kqueue():
def __init__(self):
# 初始化实例时,创建 kqueue 对象,等同于 poll 对象
self._kqueue = select.kqueue() # kernel queue 内核队列对象
# 保存被监听的临时套接字的字典,这个在 poll 和 epoll 里没有
# 因为在撤销监视套接字事件时,需要用到事件位掩码
# 字段的 key 是文件描述符,value 就是事件位掩码
self._active = {}
# 获取文件描述符,这个方法在该程序中没有用到
def fileno(self):
return self._kqueue.fileno()
# 注册临时套接字,监听某种事件,参数为文件描述符和事件位掩码
def register(self, fd, flag):
if fd not in self._active:
# _control 方法监听套接字的某种事件,参数为 KQ_EV_ADD
self._control(fd, flag, select.KQ_EV_ADD)
self._active[fd] = flag
# 注销文件描述符,不再监听对应的套接字事件
def unregister(self, fd):
events = self._active.pop(fd)
self._control(fd, events, select.KQ_EV_DELETE)
# 修改文件描述符注册的事件
# kqueue 没有 modify 方法,通过先注销再注册事件实现
def modify(self, fd, flag):
self.unregister(fd)
self.register(fd, flag)
# 该自定义方法使用 kqueue.control 方法来添加和删除事件监听
# 参数为套接字的文件描述符、事件位掩码、行为常数
# 行为常数有很多,这里用到两个:
# KQ_EV_ADD 值为 1 ,添加监听;KQ_EV_DELETE 值为 2 ,撤销监听
def _control(self, fd, flag, flags):
kevents = []
# 如果事件位掩码匹配可读
if flag & select.POLLIN:
# select.kevent 方法的返回值是内核事件,也就是需要添加或删除的事件
# 把返回值添加到 kevents 列表,后面使用 kqueue 的 control 方法处理
# 该方法的参数:套接字的文件描述符、过滤常数、行为常数
# 过滤常数是负整数,添加可读事件监听使用 select.KQ_FILTER_READ
# 其作用跟 poll.register 中的 select.POLLIN 是一样的
kevents.append(select.kevent(
fd, filter = select.KQ_FILTER_READ, flags=flags))
# 如果事件位掩码匹配可写
if flag & select.POLLOUT:
kevents.append(select.kevent(
fd, filter = select.KQ_FILTER_WRITE, flags=flags))
for kevent in kevents:
# control 方法处理 kevent 列表里的所有事件
# 第一个参数为内核事件的可迭代对象
# 第二个参数是最大事件数,必须是 0 或正整数
# 第二个参数如果为负会抛出异常,固定值 0 表示无限制
# 第三个参数 timeout 没有写
self._kqueue.control([kevent], 0)
# 监听事件,获取发生事件的文件描述符
def poll(self, timeout=None):
# kqueue 对象的 control 方法等同于 poll 的 poll 方法
# 监听已注册的套接字,该方法阻塞运行
# 直到有事件就绪,返回就绪事件的列表 kevents ,这和上面的 epoll 一样
kevents = self._kqueue.control(None, 1000, timeout)
# 这个字典用来保存处理后的就绪事件
events_ = {}
# 同 epool 一样,处理就绪事件是为了统一事件位掩码
for kevent in kevents:
# kevent.ident 为事件标识,属性值为对应的文件描述符
fd = kevent.ident
# kevent.filter 为过滤标识,属性值为过滤常数
if kevent.filter == select.KQ_FILTER_READ:
events_[fd] = events_.get(fd, 0) | select.POLLIN
if kevent.filter == select.KQ_FILTER_WRITE:
events_[fd] = events_.get(fd, 0) | select.POLLOUT
return events_.items()
# 关闭 kqueue 对象,作用不详
def close(self):
self._kqueue.close()
server_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_sock.setblocking(False)
server_sock.bind(('localhost', 1234))
server_sock.listen()
print('server listening !!!')
connections = {}
# 根据不同平台使用对应的接口
if hasattr(select, 'epoll'):
print('Linux')
p = Epoll()
elif hasattr(select, 'kqueue'):
print('Mac OS')
p = Kqueue()
# server_sock注册读事件
p.register(server_sock.fileno(), select.POLLIN)
while True:
time.sleep(1)
print('\n========循环开始========')
try:
events = p.poll()
except KeyboardInterrupt:
break
# 获取发生事件的文件描述符和相应的事件
for fd, flag in events:
print('--------套接字FD: {} 位掩码: {}'.format(fd, flag))
# 如果主套接字有事件就绪,一定是可读就绪,它只负责接收客户端连接
if fd == server_sock.fileno():
# 接收客户端的连接请求,创建新的临时套接字用来发送和接收数据
extension_sock, addr = server_sock.accept()
print('收到连接,客户端地址:', addr)
# 将新的临时套接字设置为非阻塞模式
extension_sock.setblocking(0)
# 注册新的临时套接字,监听其可读事件
p.register(extension_sock.fileno(), select.POLLIN)
# 把新建临时分支套接字加入到存储连接的字典里
connections[extension_sock.fileno()] = extension_sock
# 如果临时套接字可读事件就绪,也就是客户端发来数据
elif flag & select.POLLIN:
# 利用文件描述符从 connections 里获取对应的临时套接字
extension_sock = connections[fd]
# 接收数据这块儿需要注意,如果客户端突然关闭连接
# 对应的临时套接字会可读就绪,即事件位掩码为 1
data = extension_sock.recv(1024)
if data:
print('套接字 {} 收到数据: {}'.format(fd, data.decode()))
# 套接字收到数据后,可写事件立即就绪
# 转而监听其可写事件,下个 while 循环会处理
p.modify(fd, select.POLLOUT)
else:
# 如果收到的数据是 None ,可能客户端已关闭,连接已断开
# 注销文件描述符对应的套接字,不再监听相关事件
p.unregister(fd)
# 关闭套接字
extension_sock.close()
# 将套接字从 connections 里移出
del connections[fd]
print('套接字 {} 已关闭'.format(fd))
# 如果套接字可写事件就绪
if flag & select.POLLOUT:
print("套接字 {} 发送数据...".format(fd))
extension_sock = connections[fd]
# 向客户端发送数据
extension_sock.send('收到信息,这是模拟信息'.encode())
# 套接字向客户端发送消息后,poll 转为监听其可读事件
p.modify(fd, select.POLLIN)
# 如果客户端关闭,临时套接字的关闭事件会就绪
# 注意,关闭事件无法使用 poll.modify 方法主动设置,只能被触发
if flag & select.POLLHUP:
print('套接字 {} 已关闭'.format(fd))
# 注销文件描述符对应的套接字,不再监听相关事件
p.unregister(fd)
# 关闭套接字
extension_sock.close()
# 将套接字从 connections 里移出
del connections[fd]
print('--------connections: {}'.format(list(connections.keys())))
print('========循环结束========')
# 撤销对主套接字的事件监听
p.unregister(server_sock.fileno())
# 关闭系统调用对象,作用不详
# p.close()
# 关闭主套接字服务端
server_sock.close()
print('\nEnd')
2.4 select、poll、epoll 比较
名称 | 支持一个进程所能打开的最大连接数 |
---|---|
select | 单个进程支持的最大连接数由 FD_SETSIZE 控制,默认值为 1024 / 2048 ,虽然可以修改,但默认值已经是性能最佳的数量了 |
poll | 最大连接数无限制,但和 select 一样,连接数过大会导致性能下降 |
epoll | 最大连接数有限制,但数量很大,1G 内存支持大约十万个连接 |
名称 | FD 剧增后的效率表现 |
---|---|
select | 每次调用都会对连接的 FD 进行遍历,类似于 Python 列表,随着连接数增多,耗时呈线性增加 |
poll | 同上 |
epoll | 采用回调机制,只有已就绪的事件才会主动调用回调函数。连接活跃度低的情况下,性能较佳;连接活跃度高时,大量回调会导致性能回下降 |
名称 | 消息传递方式 |
---|---|
select | 内核将事件就绪消息传递到用户空间,用户空间执行代码控制内核将内核空间的数据复制到用户空间 |
poll | 同上 |
epoll | 内核和用户共用一块内存空间 |