前两篇文章介绍了 Python 中 UDP 套接字和 TCP 套接字,并在此基础上实现了简单的客户端和服务端,本文接着介绍服务端多任务处理的几种实现方式,算是对这方面知识的一个总结,本文将介绍以下几种实现方式:
- 多进程服务端
- 多线程服务端
- 单进程/线程非阻塞服务端
- select 实现 IO 多路复用型服务端
- epoll 事件订阅型服务端
- gevent 协程型服务端
客户端编写
在开始编写服务端之前,首先准备一份通用的客户端代码,后面创建的服务端都使用这份代码测试:
from socket import *
def main():
cSocket = socket(AF_INET, SOCK_STREAM)
cSocket.connect(("192.168.2.142",3001))
while True:
msg = input("Enter Message:")
if msg == "q!":
break
else:
cSocket.send(msg.encode("utf-8"))
cSocket.close()
if __name__ == '__main__':
main()
多进程型服务端
多进程型服务端的特点是每收到一个 Socket 连接,就为其创建一个独立的进程进行服务:
from socket import *
from multiprocessing import Process
# Server 类负责接收客户端请求,并未每个客户端创建套接字
class Server():
@classmethod
def __prepareSocket(cls):
cls.sSocket = socket(AF_INET, SOCK_STREAM)
cls.sSocket.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
cls.sSocket.bind(("",3001))
cls.sSocket.listen(5)
@classmethod
def startServer(cls):
cls.__prepareSocket()
while True:
# 监听客户端请求
clientSocket,clientAddr = cls.sSocket.accept()
print("%s 已连入,正在接受消息..."%clientAddr[1])
# 创建 SocketHander 实例,并将新创建的套接字传入
cp = SocketHander(clientSocket,clientAddr)
cp.start()
# SocketHander 类用来为每一个客户端提供独立的服务
class SocketHander(Process):
def __init__(self,clientSocket,clientAddr):
Process.__init__(self)
self.clientSocket = clientSocket
self.clientAddr = clientAddr
def run(self):
# 监听异常信息
try:
while True:
recvMsg = self.clientSocket.recv(1024)
print("%s:%s"%(self.clientAddr[0],recvMsg.decode("utf-8")))
self.clientSocket.send("ding~".encode("utf-8"))
except:
print("%s 已断开连接~"%self.clientAddr[0])
finally:
self.clientSocket.close()
if __name__ == '__main__':
Server.startServer()
多线程型服务器
只需对上面的代码做一丁点儿修改,就可以实现一个多线程服务器:
from socket import *
from threading import Thread
# Server 类负责接收客户端请求,并未每个客户端创建套接字
class Server():
@classmethod
def __prepareSocket(cls):
cls.sSocket = socket(AF_INET, SOCK_STREAM)
cls.sSocket.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
cls.sSocket.bind(("",3001))
cls.sSocket.listen(5)
@classmethod
def startServer(cls):
cls.__prepareSocket()
while True:
# 监听客户端请求
clientSocket,clientAddr = cls.sSocket.accept()
print("%s 已连入,正在接受消息..."%clientAddr[1])
# 创建 SocketHander 实例,并将新创建的套接字传入
cp = SocketHander(clientSocket,clientAddr)
cp.start()
# SocketHander 类用来为每一个客户端提供独立的服务
class SocketHander(Thread):
def __init__(self,clientSocket,clientAddr):
Thread.__init__(self)
self.clientSocket = clientSocket
self.clientAddr = clientAddr
def run(self):
# 监听异常信息
try:
while True:
recvMsg = self.clientSocket.recv(1024)
print("%s:%s"%(self.clientAddr[0],recvMsg.decode("utf-8")))
self.clientSocket.send("ding~".encode("utf-8"))
except:
print("%s 已断开连接~"%self.clientAddr[0])
finally:
self.clientSocket.close()
if __name__ == '__main__':
Server.startServer()
单进程/线程非阻塞服务端
前面的代码中为什么要使用多进程或者多线程呢?这是因为 Socket 对象的 accept
方法和 recv
方法是阻塞的,如果将二者放在一个进程或线程中,势必会造成阻塞。于是我们把 accept
方法放在一个进程/线程中执行,将 recv
方法放在另一个进程/线程中执行,就避免了阻塞。
如果 accpet
和 recv
方法是不阻塞的,不就可以解决这个问题了吗?是的,我们可以在创建套接字后,调用其的 setblocking
方法,传入一个参数 False
,这时这两个方法就不阻塞了。
另外,客户端连接成功后不一定立马向服务端发送消息,因此我们并不能确定合适调用 Socket 对象的 recv
方法,为了解决这个问题,我们可以在客户端连接成功后,将创建好的客户端 Socket 装入一个列表中,然后每隔一段时间遍历此列表即可。
下面是实现代码:
from socket import *
from time import sleep
class Server():
# 存放客户端 Socket 对象
clientSockets = []
@classmethod
def __prepareSocket(cls):
cls.sSocket = socket(AF_INET, SOCK_STREAM)
cls.sSocket.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
# 将服务端 socket 设置为非阻塞
cls.sSocket.setblocking(False)
cls.sSocket.bind(("",3001))
cls.sSocket.listen(5)
@classmethod
def startServer(cls):
cls.__prepareSocket()
# 轮询 检查有没有新的连接
while True:
# 监听客户端请求
try:
clientSocket,clientAddr = cls.sSocket.accept()
except:
pass
else:
print("%s 已连入,正在接受消息..."%clientAddr[1])
# 如果不发生异常,说明有了新的连接,将新建的 Socket 对象设置为非阻塞
clientSocket.setblocking(False)
# 将此对象添加到列表中,等待轮询
cls.clientSockets.append((clientSocket,clientAddr))
# 每个 0.3 秒遍历一次列表,判断是否能接受消息
sleep(0.3)
cls.__handleSocket()
@classmethod
def __handleSocket(cls):
for clientSocket,clientAddr in cls.clientSockets:
try:
recvMsg = clientSocket.recv(1024)
except:
pass
else:
if not len(recvMsg):
print("%s 已断开连接..."%clientAddr[1])
# 关闭客户端 Socket
clientSocket.close()
# 断开连接后,从列表中移除该 Socket 对象
cls.clientSockets.remove((clientSocket,clientAddr))
else:
clientSocket.send("ding~".encode("utf-8"))
print("%s:%s"%(clientAddr[1],recvMsg.decode("utf-8")))
if __name__ == '__main__':
Server.startServer()
需要注意的是,将 Socket 对象设置为非阻塞后,在使用其的 accept
和 recv
方法时需要加上异常处理,这是因为使用非阻塞 Socket 后,轮询时如果没有新的连接或者没有客户端发送消息,将会引发异常,需要我们进行异常捕获。
select 实现 IO 多路复用型服务端
上面我们采用非阻塞 Socket 和轮询列表的方式实现了一个单进程/线程的非阻塞服务器,其实,操作系统底层也提供了一个 select
模块,用来帮我们检测哪些套接字发生了变化,可以对其进行操作,由于是操作系统底层帮我们完成的,效率上比上面的手动循环更高。
select
的用法很简单,我们只需调用 select
模块中的 select
方法,该方法接收三个列表作为参数,并对这三个列表进行监听。这三个列表依次为可读的套接字列表、可写的套接字列表、异常套接字列表。
select
方法是阻塞的,在其接收的三个列表中有状态变化时,它会返回三个列表,列表中的元素是发生了状态变化的元素,返回的三个列表一次对应于发生变化的刻度套接字列表、发生变化的可写套接字列表和发生变化的异常套接字列表。
基本用法:
readble,writeable,exceptional = select(readbleList, writeableList, exceptionalList)
下面看一个例子:
from socket import *
from select import select
class Server():
# 存放客户端 Socket 对象
readableSocketsList = []
@classmethod
def __prepareSocket(cls):
cls.sSocket = socket(AF_INET, SOCK_STREAM)
cls.sSocket.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
cls.sSocket.bind(("",3001))
cls.sSocket.listen(5)
cls.readableSocketsList.append(cls.sSocket)
@classmethod
def startServer(cls):
cls.__prepareSocket()
# 轮询 检查有没有新的连接
while True:
readableSockets,writeableSockets,exceptionalSocket = select(cls.readableSocketsList,[],[])
cls.__handleSocket(readableSockets)
@classmethod
def __handleSocket(cls,readableSockets):
for sock in readableSockets:
# 如果是 sSocket 发生了变化,说明有新的客户端连接
if sock == cls.sSocket:
clientSocket,clientAddr = cls.sSocket.accept()
print("%s 已连入..."%clientAddr[1])
# 将新建的客户端 Sockets 对象存入 readableSocketsList 中
cls.readableSocketsList.append(clientSocket)
# 如果不是 sSocket 发生了变化,这说明有客户端向服务端发送了消息
else:
try:
recvMsg = sock.recv(1024)
# 判断客户端是否断开了了解
if not len(recvMsg):
sock.close()
cls.readableSocketsList.remove(sock)
else:
# 向客户端回执消息
sock.send("ding~".encode("utf-8"))
print("%s:%s"%(sock.getpeername()[1],recvMsg.decode("utf-8")))
except:
pass
if __name__ == '__main__':
Server.startServer()
上面我们给 select
函数传入一个 readableSocketsList
列表,当此列表中有 Socket 对象发生了状态变化时,我们会立马得到一个发生了变化的 Socket 对象的列表,可以对此列表中的 Socket 对象进行操作。
epoll 事件订阅型服务端
上面我们使用 select
模块完成了一个IO多路复用,对套接字对象变化的检测是操作系统内部检测的,但本质上仍然是对套接字列表进行遍历操作,效率并不高,并且使用 select
实现并发时还有并发量限制,一般 32 位机器是 1024 个,64 位机器是 2048 个。
select
的作用是什么呢?无非就是通过对套接字列表的遍历,找出发生状态变化的套接字,那么我们也可以换一种方式:不主动遍历套接字,而是在哪个套接字发生变化的时候再通知系统,系统拿到变化的套接字后再通知我们的 Python 程序,效率自然更高。
epoll
的使用仍然依赖于 select
,在使用 epoll
前需要创建一个 epoll
对象:
epoll = select.epoll()
然后向 epoll
中注册事件:
epoll.register( Socket 对象的文件描述符, 操作方式 )
获取 Socket 对象的文件描述符:
fno = socketObj.fileno()
epoll
对文件描述符的操作是由三个常量来表示的:
- select.EPOLLIN (可读)
- select.EPOLLOUT (可写)
- select.EPOLLET (ET模式)
epoll对文件描述符的操作有两种模式:LT(level trigger)和ET(edge trigger)。LT模式是默认模式,LT模式与ET模式的区别如下:
- LT模式:当epoll检测到描述符事件发生并将此事件通知应用程序,应用程序可以不立即处理该事件。下次调用epoll时,会再次响应应用程序并通知此事件。
- ET模式:当epoll检测到描述符事件发生并将此事件通知应用程序,应用程序必须立即处理该事件。如果不处理,下次调用epoll时,不会再次响应应用程序并通知此事件。
from socket import *
import select
class Server():
# 存放已连接的客户端 Socket 对象
clientSocktsList = {}
# 存放客户端的地址信息
clietnAddrList = {}
@classmethod
def __prepareSocket(cls):
cls.sSocket = socket(AF_INET, SOCK_STREAM)
cls.sSocket.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
cls.sSocket.bind(("",3001))
cls.sSocket.listen(5)
# 创建 epoll 对象
cls.epoll = select.epoll()
# 将服务端 Socket 添加到 epoll 事件监听
cls.epoll.register(cls.sSocket.fileno(),select.EPOLLIN | select.EPOLLET)
@classmethod
def startServer(cls):
cls.__prepareSocket()
# 进行轮询操作,处理事件响应
while True:
# poll 方法是阻塞方法
# 当有套接字发生状态变化时,会通过时间通知操作系统,操作系统将这些发生变化的套接字对象返还给程序
socketList = cls.epoll.poll()
cls.__handleSocket(socketList)
@classmethod
def __handleSocket(cls,socketList):
for fno,event in socketList:
# 如果文件描述符等于服务端 Socket 对象的文件描述符,说明有新的客户端连接
if fno == cls.sSocket.fileno():
clientSocket,clientAddr = cls.sSocket.accept()
print("%s 已连接..."%clientAddr[1])
# 将客户端 Socket 对象和客户端地址信息添加到 clientSocktsList 和 clietnAddrList
# 以该套接字的文件描述符作为key
socketFno = clientSocket.fileno()
cls.clientSocktsList[socketFno] = clientSocket
cls.clietnAddrList[socketFno] = clientAddr
# 将新建的客户端 Socket 对象注册到 epoll 事件监听
cls.epoll.register(clientSocket.fileno(),select.EPOLLIN | select.EPOLLET)
# 如果 Socket 对象的文件描述符和服务端 Socket 对象不一致,说明客户端向服务端发送了消息
# 判断 event 类型,作出相应的处理
elif event == select.EPOLLIN:
clientSocket = cls.clientSocktsList[fno]
addr = cls.clietnAddrList[fno][1]
try:
recvMsg = clientSocket.recv(1024)
if not len(recvMsg):
print("%s 已断开连接"%addr)
clientSocket.close()
del cls.clientSocktsList[fno]
else:
print("%s:%s"%(addr,recvMsg.decode("utf-8")))
except:
pass
if __name__ == '__main__':
Server.startServer()
可见,使用 epoll
和使用 select
的处理方式都是一致:获取到发生了变化的套接字列表,然后进行相应的处理。区别只在于操作系统内部对于 epoll
和 select
的处理方式的不同。
gevent 协程型服务器
我们也可以 gevent
这个协程库来实现一个多任务处理的服务器,首先需要安装 gevent
:
pip install gevent
需要注意,使用 gevent
实现服务器时,需要使用 gevent
库提供的 socket
,而不是系统自带的 socket
。
代码如下:
from gevent import socket,monkey,spawn
# 使用 gevent 在执行代码之前,需要首先调用 monkey 下的 patch_all 方法
monkey.patch_all()
class Server():
@classmethod
def __prepareSocket(cls):
# 使用 genvent 提供的 socket
cls.sSocket = socket.socket()
# 将服务端 socket 设置为非阻塞
cls.sSocket.bind(("",3001))
cls.sSocket.listen(5)
@classmethod
def startServer(cls):
cls.__prepareSocket()
while True:
clientSocket,clientAddr = cls.sSocket.accept()
print("%s 已连接..."%clientAddr[1])
# 处理连接,需要使用 gevent 的 spawn 方法调用
spawn(cls.__handleSocket,clientSocket,clientAddr)
@classmethod
def __handleSocket(cls,clientSocket,clientAddr):
while True:
try:
recvMsg = clientSocket.recv(1024)
if not len(recvMsg):
print("%s 已断开连接..."%clientAddr[1])
clientSocket.close()
break
else:
print("%s:%s"%(clientAddr[1],recvMsg.decode("utf-8")))
except:
pass
if __name__ == '__main__':
Server.startServer()
使用 gevent
协程实现多任务服务器有以下几个注意点:
- 需要使用
gevent
模块中提供的socket
- 需要使用
gevent
模块中的spawn
函数去调用目标函数 - 在代码执行之前需要调用
monkey
下的patch_all
方法
总结
本文介绍了 Python 中实现多任务服务器的几种常用方式,包括多进程/多线程实现、非阻塞 Socket 实现、select 实现、epoll 实现和 gevent 实现。算是一个记录,以后忘记了可以随时回头查阅。
完。