多任务服务器的几种实现方式

前两篇文章介绍了 Python 中 UDP 套接字和 TCP 套接字,并在此基础上实现了简单的客户端和服务端,本文接着介绍服务端多任务处理的几种实现方式,算是对这方面知识的一个总结,本文将介绍以下几种实现方式:

  • 多进程服务端
  • 多线程服务端
  • 单进程/线程非阻塞服务端
  • select 实现 IO 多路复用型服务端
  • epoll 事件订阅型服务端
  • gevent 协程型服务端

客户端编写

在开始编写服务端之前,首先准备一份通用的客户端代码,后面创建的服务端都使用这份代码测试:

from socket import *

def main():
    cSocket = socket(AF_INET, SOCK_STREAM)
    cSocket.connect(("192.168.2.142",3001))
    while True:
        msg = input("Enter Message:")
        if msg == "q!":
            break
        else:
            cSocket.send(msg.encode("utf-8"))

    cSocket.close()

if __name__ == '__main__':
    main()

多进程型服务端

多进程型服务端的特点是每收到一个 Socket 连接,就为其创建一个独立的进程进行服务:

from socket import *
from multiprocessing import Process

# Server 类负责接收客户端请求,并未每个客户端创建套接字
class Server():
    @classmethod
    def __prepareSocket(cls):
        cls.sSocket = socket(AF_INET, SOCK_STREAM)
        cls.sSocket.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
        cls.sSocket.bind(("",3001))
        cls.sSocket.listen(5)

    @classmethod
    def startServer(cls):
        cls.__prepareSocket()
        while True:
            # 监听客户端请求
            clientSocket,clientAddr = cls.sSocket.accept()
            print("%s 已连入,正在接受消息..."%clientAddr[1])
            # 创建 SocketHander 实例,并将新创建的套接字传入
            cp = SocketHander(clientSocket,clientAddr)
            cp.start()

# SocketHander 类用来为每一个客户端提供独立的服务
class SocketHander(Process):
    def __init__(self,clientSocket,clientAddr):
        Process.__init__(self)
        self.clientSocket = clientSocket
        self.clientAddr = clientAddr

    def run(self):
        # 监听异常信息
        try:
            while True:
                recvMsg = self.clientSocket.recv(1024)
                print("%s:%s"%(self.clientAddr[0],recvMsg.decode("utf-8")))
                self.clientSocket.send("ding~".encode("utf-8"))
        except:
            print("%s 已断开连接~"%self.clientAddr[0])
        finally:
            self.clientSocket.close()

if __name__ == '__main__':
    Server.startServer()

多线程型服务器

只需对上面的代码做一丁点儿修改,就可以实现一个多线程服务器:

from socket import *
from threading import Thread

# Server 类负责接收客户端请求,并未每个客户端创建套接字
class Server():
    @classmethod
    def __prepareSocket(cls):
        cls.sSocket = socket(AF_INET, SOCK_STREAM)
        cls.sSocket.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
        cls.sSocket.bind(("",3001))
        cls.sSocket.listen(5)

    @classmethod
    def startServer(cls):
        cls.__prepareSocket()
        while True:
            # 监听客户端请求
            clientSocket,clientAddr = cls.sSocket.accept()
            print("%s 已连入,正在接受消息..."%clientAddr[1])
            # 创建 SocketHander 实例,并将新创建的套接字传入
            cp = SocketHander(clientSocket,clientAddr)
            cp.start()

# SocketHander 类用来为每一个客户端提供独立的服务
class SocketHander(Thread):
    def __init__(self,clientSocket,clientAddr):
        Thread.__init__(self)
        self.clientSocket = clientSocket
        self.clientAddr = clientAddr

    def run(self):
        # 监听异常信息
        try:
            while True:
                recvMsg = self.clientSocket.recv(1024)
                print("%s:%s"%(self.clientAddr[0],recvMsg.decode("utf-8")))
                self.clientSocket.send("ding~".encode("utf-8"))
        except:
            print("%s 已断开连接~"%self.clientAddr[0])
        finally:
            self.clientSocket.close()

if __name__ == '__main__':
    Server.startServer()

单进程/线程非阻塞服务端

前面的代码中为什么要使用多进程或者多线程呢?这是因为 Socket 对象的 accept 方法和 recv 方法是阻塞的,如果将二者放在一个进程或线程中,势必会造成阻塞。于是我们把 accept 方法放在一个进程/线程中执行,将 recv 方法放在另一个进程/线程中执行,就避免了阻塞。
如果 accpetrecv 方法是不阻塞的,不就可以解决这个问题了吗?是的,我们可以在创建套接字后,调用其的 setblocking 方法,传入一个参数 False,这时这两个方法就不阻塞了。
另外,客户端连接成功后不一定立马向服务端发送消息,因此我们并不能确定合适调用 Socket 对象的 recv 方法,为了解决这个问题,我们可以在客户端连接成功后,将创建好的客户端 Socket 装入一个列表中,然后每隔一段时间遍历此列表即可。
下面是实现代码:

from socket import *
from time import sleep

class Server():
    # 存放客户端 Socket 对象
    clientSockets = []

    @classmethod
    def __prepareSocket(cls):
        cls.sSocket = socket(AF_INET, SOCK_STREAM)
        cls.sSocket.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
        # 将服务端 socket 设置为非阻塞
        cls.sSocket.setblocking(False)
        cls.sSocket.bind(("",3001))
        cls.sSocket.listen(5)
    @classmethod
    def startServer(cls):
        cls.__prepareSocket()
        # 轮询 检查有没有新的连接
        while True:
            # 监听客户端请求
            try:
                clientSocket,clientAddr = cls.sSocket.accept()
            except:
                pass
            else:
                print("%s 已连入,正在接受消息..."%clientAddr[1])
                # 如果不发生异常,说明有了新的连接,将新建的 Socket 对象设置为非阻塞
                clientSocket.setblocking(False)
                # 将此对象添加到列表中,等待轮询
                cls.clientSockets.append((clientSocket,clientAddr))
            # 每个 0.3 秒遍历一次列表,判断是否能接受消息
            sleep(0.3)
            cls.__handleSocket()

    @classmethod
    def __handleSocket(cls):
        for clientSocket,clientAddr in cls.clientSockets:
            try:
                recvMsg = clientSocket.recv(1024)
            except:
                pass
            else:
                if not len(recvMsg):
                    print("%s 已断开连接..."%clientAddr[1])
                    # 关闭客户端 Socket
                    clientSocket.close()
                    # 断开连接后,从列表中移除该 Socket 对象
                    cls.clientSockets.remove((clientSocket,clientAddr))
                else:
                    clientSocket.send("ding~".encode("utf-8"))
                    print("%s:%s"%(clientAddr[1],recvMsg.decode("utf-8")))

if __name__ == '__main__':
    Server.startServer()

需要注意的是,将 Socket 对象设置为非阻塞后,在使用其的 acceptrecv 方法时需要加上异常处理,这是因为使用非阻塞 Socket 后,轮询时如果没有新的连接或者没有客户端发送消息,将会引发异常,需要我们进行异常捕获。

select 实现 IO 多路复用型服务端

上面我们采用非阻塞 Socket 和轮询列表的方式实现了一个单进程/线程的非阻塞服务器,其实,操作系统底层也提供了一个 select 模块,用来帮我们检测哪些套接字发生了变化,可以对其进行操作,由于是操作系统底层帮我们完成的,效率上比上面的手动循环更高。
select 的用法很简单,我们只需调用 select 模块中的 select 方法,该方法接收三个列表作为参数,并对这三个列表进行监听。这三个列表依次为可读的套接字列表、可写的套接字列表、异常套接字列表。
select 方法是阻塞的,在其接收的三个列表中有状态变化时,它会返回三个列表,列表中的元素是发生了状态变化的元素,返回的三个列表一次对应于发生变化的刻度套接字列表、发生变化的可写套接字列表和发生变化的异常套接字列表。
基本用法:

readble,writeable,exceptional = select(readbleList, writeableList, exceptionalList)

下面看一个例子:

from socket import *
from select import select

class Server():
    # 存放客户端 Socket 对象
    readableSocketsList = []

    @classmethod
    def __prepareSocket(cls):
        cls.sSocket = socket(AF_INET, SOCK_STREAM)
        cls.sSocket.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
        cls.sSocket.bind(("",3001))
        cls.sSocket.listen(5)
        cls.readableSocketsList.append(cls.sSocket)
    @classmethod
    def startServer(cls):
        cls.__prepareSocket()
        # 轮询 检查有没有新的连接
        while True:
            readableSockets,writeableSockets,exceptionalSocket = select(cls.readableSocketsList,[],[])
            cls.__handleSocket(readableSockets)

    @classmethod
    def __handleSocket(cls,readableSockets):
        for sock in readableSockets:
            # 如果是 sSocket 发生了变化,说明有新的客户端连接
            if sock == cls.sSocket:
                clientSocket,clientAddr = cls.sSocket.accept()
                print("%s 已连入..."%clientAddr[1])
                # 将新建的客户端 Sockets 对象存入 readableSocketsList 中
                cls.readableSocketsList.append(clientSocket)
            # 如果不是 sSocket 发生了变化,这说明有客户端向服务端发送了消息
            else:
                try:
                    recvMsg = sock.recv(1024)
                    # 判断客户端是否断开了了解
                    if not len(recvMsg):
                        sock.close()
                        cls.readableSocketsList.remove(sock)
                    else:
                        # 向客户端回执消息
                        sock.send("ding~".encode("utf-8"))
                        print("%s:%s"%(sock.getpeername()[1],recvMsg.decode("utf-8")))
                except:
                    pass

if __name__ == '__main__':
    Server.startServer()

上面我们给 select 函数传入一个 readableSocketsList 列表,当此列表中有 Socket 对象发生了状态变化时,我们会立马得到一个发生了变化的 Socket 对象的列表,可以对此列表中的 Socket 对象进行操作。

epoll 事件订阅型服务端

上面我们使用 select 模块完成了一个IO多路复用,对套接字对象变化的检测是操作系统内部检测的,但本质上仍然是对套接字列表进行遍历操作,效率并不高,并且使用 select 实现并发时还有并发量限制,一般 32 位机器是 1024 个,64 位机器是 2048 个。
select 的作用是什么呢?无非就是通过对套接字列表的遍历,找出发生状态变化的套接字,那么我们也可以换一种方式:不主动遍历套接字,而是在哪个套接字发生变化的时候再通知系统,系统拿到变化的套接字后再通知我们的 Python 程序,效率自然更高。
epoll 的使用仍然依赖于 select,在使用 epoll 前需要创建一个 epoll 对象:

epoll = select.epoll()

然后向 epoll 中注册事件:

epoll.register( Socket 对象的文件描述符, 操作方式 )

获取 Socket 对象的文件描述符:

fno = socketObj.fileno()

epoll 对文件描述符的操作是由三个常量来表示的:

  • select.EPOLLIN (可读)
  • select.EPOLLOUT (可写)
  • select.EPOLLET (ET模式)

epoll对文件描述符的操作有两种模式:LT(level trigger)和ET(edge trigger)。LT模式是默认模式,LT模式与ET模式的区别如下:

  • LT模式:当epoll检测到描述符事件发生并将此事件通知应用程序,应用程序可以不立即处理该事件。下次调用epoll时,会再次响应应用程序并通知此事件。
  • ET模式:当epoll检测到描述符事件发生并将此事件通知应用程序,应用程序必须立即处理该事件。如果不处理,下次调用epoll时,不会再次响应应用程序并通知此事件。
from socket import *
import select

class Server():
    # 存放已连接的客户端 Socket 对象
    clientSocktsList = {}
    # 存放客户端的地址信息
    clietnAddrList = {}

    @classmethod
    def __prepareSocket(cls):
        cls.sSocket = socket(AF_INET, SOCK_STREAM)
        cls.sSocket.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
        cls.sSocket.bind(("",3001))
        cls.sSocket.listen(5)
        # 创建 epoll 对象
        cls.epoll = select.epoll()
        # 将服务端 Socket 添加到 epoll 事件监听
        cls.epoll.register(cls.sSocket.fileno(),select.EPOLLIN | select.EPOLLET)

    @classmethod
    def startServer(cls):
        cls.__prepareSocket()
        # 进行轮询操作,处理事件响应
        while True:
            # poll 方法是阻塞方法
            # 当有套接字发生状态变化时,会通过时间通知操作系统,操作系统将这些发生变化的套接字对象返还给程序
            socketList = cls.epoll.poll()
            cls.__handleSocket(socketList)


    @classmethod
    def __handleSocket(cls,socketList):
        for fno,event in socketList:
            # 如果文件描述符等于服务端 Socket 对象的文件描述符,说明有新的客户端连接
            if fno == cls.sSocket.fileno():
                clientSocket,clientAddr = cls.sSocket.accept()
                print("%s 已连接..."%clientAddr[1])
                # 将客户端 Socket 对象和客户端地址信息添加到 clientSocktsList 和 clietnAddrList
                # 以该套接字的文件描述符作为key
                socketFno = clientSocket.fileno()
                cls.clientSocktsList[socketFno] = clientSocket
                cls.clietnAddrList[socketFno] = clientAddr
                # 将新建的客户端 Socket 对象注册到 epoll 事件监听
                cls.epoll.register(clientSocket.fileno(),select.EPOLLIN | select.EPOLLET)
            
            # 如果 Socket 对象的文件描述符和服务端 Socket 对象不一致,说明客户端向服务端发送了消息
            # 判断 event 类型,作出相应的处理
            elif event == select.EPOLLIN:
                clientSocket = cls.clientSocktsList[fno]
                addr = cls.clietnAddrList[fno][1]
                try:
                    recvMsg = clientSocket.recv(1024)
                    if not len(recvMsg):
                        print("%s 已断开连接"%addr)
                        clientSocket.close()
                        del cls.clientSocktsList[fno]
                    else:
                        print("%s:%s"%(addr,recvMsg.decode("utf-8")))
                except:
                    pass

if __name__ == '__main__':
    Server.startServer()

可见,使用 epoll 和使用 select 的处理方式都是一致:获取到发生了变化的套接字列表,然后进行相应的处理。区别只在于操作系统内部对于 epollselect 的处理方式的不同。

gevent 协程型服务器

我们也可以 gevent 这个协程库来实现一个多任务处理的服务器,首先需要安装 gevent

pip install gevent

需要注意,使用 gevent 实现服务器时,需要使用 gevent 库提供的 socket,而不是系统自带的 socket
代码如下:

from gevent import socket,monkey,spawn
# 使用 gevent 在执行代码之前,需要首先调用 monkey 下的 patch_all 方法
monkey.patch_all()

class Server():
    @classmethod
    def __prepareSocket(cls):
        # 使用 genvent 提供的 socket
        cls.sSocket = socket.socket()
        # 将服务端 socket 设置为非阻塞
        cls.sSocket.bind(("",3001))
        cls.sSocket.listen(5)

    @classmethod
    def startServer(cls):
        cls.__prepareSocket()
        while True:
            clientSocket,clientAddr = cls.sSocket.accept()
            print("%s 已连接..."%clientAddr[1])
            # 处理连接,需要使用 gevent 的 spawn 方法调用
            spawn(cls.__handleSocket,clientSocket,clientAddr)

    @classmethod
    def __handleSocket(cls,clientSocket,clientAddr):
        while True:
            try:
                recvMsg = clientSocket.recv(1024)
                if not len(recvMsg):
                    print("%s 已断开连接..."%clientAddr[1])
                    clientSocket.close()
                    break
                else:
                    print("%s:%s"%(clientAddr[1],recvMsg.decode("utf-8")))
            except:
                pass

if __name__ == '__main__':
    Server.startServer()

使用 gevent 协程实现多任务服务器有以下几个注意点:

  • 需要使用 gevent 模块中提供的 socket
  • 需要使用 gevent 模块中的 spawn 函数去调用目标函数
  • 在代码执行之前需要调用 monkey 下的 patch_all 方法

总结

本文介绍了 Python 中实现多任务服务器的几种常用方式,包括多进程/多线程实现、非阻塞 Socket 实现、select 实现、epoll 实现和 gevent 实现。算是一个记录,以后忘记了可以随时回头查阅。

完。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 206,968评论 6 482
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 88,601评论 2 382
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 153,220评论 0 344
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 55,416评论 1 279
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 64,425评论 5 374
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,144评论 1 285
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,432评论 3 401
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,088评论 0 261
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,586评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,028评论 2 325
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,137评论 1 334
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,783评论 4 324
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,343评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,333评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,559评论 1 262
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,595评论 2 355
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,901评论 2 345

推荐阅读更多精彩内容