深入浅出 tornado源码(2): 采用 epoll 代理构建高并发网络模型

我们在前篇文章中实现了一个简单的 client-server 模型,最后我们也抛出了一个疑问,这样的 server 非常低效,一次只能处理一个连接。当某个client 连接到 server 后,server 等待 client 发送数据给它,但是如果此时 client 并没有准备好数据,server 只能等待,反之亦然。这种等待的模式我们称之为“阻塞”,很显然阻塞的模式非常低效,但有没有非阻塞的模式呢?

1 阻塞和非阻塞

对于阻塞和非阻塞,网上有一个很形象的比喻,就是说好比你在等快递,阻塞模式就是快递如果不到,你就不能做其他事情。非阻塞模式就是在这段时间里面,你可以做其他事情,比如上网、打游戏、睡觉等,很显然非阻塞的模式会效率更高。
 非阻塞的模式也分两种,第一种就是忙轮询,因为你不知道快递什么时候来,所以你每5分钟就跟快递打一次电话进行询问,另外一种就是我们这篇文章讲的 epoll 模型,在等待快递到达的时间内,你尽可以做其他任何事情,包括睡觉,当快递到达时,你就会被告知。

那么阻塞在操作系统中到底是如何进行的呢?假设有一个管道,进程A为管道的写入方,B为管道的读出方。

管道示意图

 假设一开始内核缓冲区是空的,B作为读出方,被阻塞着。然后首先A往管道写入,这时候内核缓冲区由空的状态变到非空状态,内核就会产生一个事件告诉B该醒来了,这个事件姑且称之为“缓冲区非空”。
 但是“缓冲区非空”事件通知B后,B却还没有读出数据;且内核许诺了不能把写入管道中的数据丢掉这个时候,A写入的数据会滞留在内核缓冲区中,如果内核也缓冲区满了,B仍未开始读数据,最终内核缓冲区会被填满,这个时候会产生一个I/O事件,告诉进程A,你该等等(阻塞)了,我们把这个事件定义为“缓冲区满”。
 假设后来B终于开始读数据了,于是内核的缓冲区空了出来,这时候内核会告诉A,内核缓冲区有空位了,你可以从长眠中醒来了,继续写数据了,我们把这个事件叫做“缓冲区非满”
 也许事件Y1已经通知了A,但是A也没有数据写入了,而B继续读出数据,知道内核缓冲区空了。这个时候内核就告诉B,你需要阻塞了!,我们把这个时间定为“缓冲区空”。

这四个情形涵盖了四个I/O事件,缓冲区满,缓冲区空,缓冲区非空,缓冲区非满,这四个I/O事件是进行阻塞同步的根本。那么在我们的 client-server 模型是怎样发生阻塞的呢?
 socket 之间的通信就像这个管道,两端的 socket 会进入读取和写入。但请注意的是,写入仅仅表示数据被复制到了内核中的 TCP 发送缓冲区,至于什么时候发送到网络,什么时候被对方主机接收,什么时候被对方进程读取,系统调用层面不会给与任何通知。由于缓冲区的大小是有限的,当该socket的写入缓冲区满时会发生阻塞。所以,如果接收端进程从socket读数据的速度跟不上发送端进程向socket写数据的速度,会导致发送端write调用阻塞。
 读取阻塞则相对来说非常容易理解,就是该 socket 的读取缓冲区中没有数据时发生阻塞,通常是因为发送端的数据没有到达。如果想对缓冲区有一个感性的了解,可以在 Linux 下执行如下命令,查看本机 socket 的发送和读取缓冲区大小。如下图所示:

查看socket缓冲区

既然 socket 在读写的过程中会存在阻塞,那么如何进行非阻塞的socket 读写呢?很简单,我们可以记录所有这些流,通过写一个 for 循环把所有socket流从头到尾问一遍。但这样的做法显然不好,因为某些 socket 没有数据,则只会浪费 CPU 的时间。那怎么解决这个问题呢?答案就是引进一个 代理,通过代理来观察许多流的I/O 事件,在空闲的时候把当前线程阻塞掉,当一个或多个流有 I/O 事件时,就从阻塞态醒来,这个代理就是 select, poll 和 epoll 模型。

2 select, poll, epoll 代理

select 和 poll

select最早于1983年出现在4.2BSD中,它通过一个select()系统调用来监视多个文件描述符的数组,当select()返回后,该数组中就绪的文件描述符便会被内核修改标志位,使得进程可以获得这些文件描述符从而进行后续的读写操作。

while true {
    select(streams[])
    for i in streams[] {
        if i has data
        read until unavailable
    }
}

select的优点是支持目前几乎所有的平台,缺点主要有如下2个:
 1)单个进程能够监视的文件描述符的数量存在最大限制,在Linux上一般为1024,不过可以通过修改宏定义甚至重新编译内核的方式提升这一限制。
 2)select 所维护的存储大量文件描述符的数据结构,随着文件描述符数量的增大,其复制的开销也线性增长。同时,由于网络响应时间的延迟使得大量TCP连接处于非活跃状态,但调用select()会对所有socket进行一次线性扫描,所以这也浪费了一定的开销。
 poll则在1986年诞生于System V Release 3,它和select在本质上没有多大差别,但是poll没有最大文件描述符数量的限制。

epoll

epoll是Linux 2.6 开始出现的为处理大批量文件描述符而作了改进的poll,是Linux下多路复用IO接口select/poll的增强版本,它能显著提高程序在大量并发连接中只有少量活跃的情况下的系统CPU利用率。另一点原因就是获取事件的时候,它无须遍历整个被侦听的描述符集,只要遍历那些被内核IO事件异步唤醒而加入Ready队列的描述符集合就行了。
 在select/poll中,进程只有在调用一定的方法后,内核才对所有监视的文件描述符进行扫描,而epoll事先通过epoll_ctl()来注册一个文件描述符,一旦基于某个文件描述符就绪时,内核会采用类似callback的回调机制,迅速激活这个文件描述符,当进程调用epoll_wait()时便得到通知。

伪代码如下:

while true {
    active_stream[] = epoll_wait(epollfd)
    for i in active_stream[] {
        read or write till
    }
}

3 采用 select 和 epoll 代理重构网络并发模型

通过上面的分析,我们知道采用代理可以一次处理多个连接,那么到底是如何实现的呢,我们以上节课的代码为基础,分别使用 select 和 epoll 代理进行重构,并比较它们之间的区别。先给出代码如下:

  1. select 代理实现的 server代码:server_select.py
#coding:utf-8
import socket
from time import ctime
import select
import Queue

HOST = ''
PORT = 21567
BUFSIZE = 1024
ADDR = ('127.0.0.1', PORT)

# 服务器端创建 socket
serverSock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
serverSock.bind(ADDR)
serverSock.listen(5)

inputs = [serverSock]

outputs = []

timeout = 20

message_queues = {}

while inputs:
    print "doing select ..."

    readable, writable, exceptional = select.select(inputs, outputs, inputs, timeout)

    for s in readable:

        if s is serverSock:
            server2client_Sock, addr = serverSock.accept()
            print " Connection from "
            server2client_Sock.setblocking(0)
            inputs.append(server2client_Sock)

            message_queues[server2client_Sock] = Queue.Queue()

        else:
            server2client_Sock = s

            data = server2client_Sock.recv(BUFSIZE)

            # 如果数据接收完,则退出 recv, 进入到下一个连接
            if data:
                # server2client_Sock.send('[%s] %s' % (ctime(), data))
                print "Received data from ", server2client_Sock.getpeername()
                data = '[%s] %s' % (ctime(), data)

                message_queues[server2client_Sock].put(data)


                # 将建立连接的 socket 放入到可以写的 socket 列表中
                if server2client_Sock not in outputs:
                    outputs.append(server2client_Sock)
            else:
                if server2client_Sock in outputs:
                    outputs.remove(server2client_Sock)

                inputs.remove(server2client_Sock)

                server2client_Sock.close()

                del message_queues[server2client_Sock]

    if s in writable:
        try:
            next_msg = message_queues[s].get_nowait()
        except Queue.Empty:
            print " " , s.getpeername() , 'queue empty'
            outputs.remove(s)
        else:
            print " sending " , next_msg , " to ", s.getpeername()
            s.send(next_msg)

serverSock.close()
  1. epoll代理实现的 server代码:server_epoll.py
#coding:utf-8
import socket
from time import ctime
import select
import Queue

HOST = ''
PORT = 21567
BUFSIZE = 1024
ADDR = ('127.0.0.1', PORT)

# 服务器端创建 socket
serverSock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
serverSock.bind(ADDR)
serverSock.listen(5)

timeout = 1000 # millisecond

message_queues = {}

# key state of socket io
READ_ONLY = ( select.POLLIN | select.POLLPRI | select.POLLHUP | select.POLLERR)
READ_WRITE = (READ_ONLY|select.POLLOUT)


poller = select.poll()
poller.register(serverSock, READ_ONLY)

fd_to_sockets = {serverSock.fileno(): serverSock, }

while True:
    print "Waiting for next event ..."
    events = poller.poll(timeout)

    for fd, flag in events:
        s = fd_to_sockets[fd]
        if flag & (select.POLLIN | select.POLLPRI):
            if s is serverSock:
                server2client_Sock, addr = serverSock.accept()
                print "Connetion from ", addr
                server2client_Sock.setblocking(0)

                fd_to_sockets[server2client_Sock.fileno()] = server2client_Sock
                poller.register(server2client_Sock, READ_ONLY)

                message_queues[server2client_Sock] = Queue.Queue()

            else:
                server2client_Sock = s

                data = server2client_Sock.recv(BUFSIZE)

                if data:
                    print "Received data from ", server2client_Sock.getpeername()
                    data = '[%s] %s' % (ctime(), data)

                    message_queues[server2client_Sock].put(data)


                    # 将建立连接的 socket 放入到可以写的 socket 列表中
                    poller.modify(server2client_Sock, READ_WRITE)
                else:
                    poller.unregister(server2client_Sock)
                    server2client_Sock.close()
                    del message_queues[server2client_Sock]
        else:
            try:
                next_msg = message_queues[s].get_nowait()
            except Queue.Empty:
                print " " , s.getpeername() , 'queue empty'
                poller.modify(s, READ_ONLY)
            else:
                print " sending " , next_msg , " to ", s.getpeername()
                s.send(next_msg)

serverSock.close()

对于 select 代理最核心的调用就是

readable, writable, exceptional = select.select(inputs, outputs, inputs, timeout)

该调用将可读可写的socket存储到 readable 和 writable 列表中,从而我们可以直接调用这些 socket的 recv 和 send 时不会发生阻塞。注意除了 serverSock 只读以外,其他 socket 都会存在同时存在于 inputs 和 outputs 列表中。

对于 epoll 代理最核心的就是

events = poller.poll(timeout)

该调用不需要输入观察的 socket,它是之前通过 register 来指定的。和 select 模式中的代码一样,这些 socket 都是可读可写的,通过如下代码实现:

poller.modify(server2client_Sock, READ_WRITE)

4 问题分析

至此使用 epoll 代理重构我们的 server已经做完了,大家可以同时运行多个 client.py 进行交互,我们可以观察到该 server 具备了同时处理多个客户端连接的能力,而且所有的 socket 都是非阻塞的。
  然而,对于这样一个简单的服务(将客户端发送的数据加上时间戳返回)我们的代码结构看起来已经非常复杂,所以如果要处理我们实际业务中的逻辑处理简直是无法想象。
 另外,对于超高的并发请求,仅仅采用 epoll 模型是不够的,我们还必须使用多进程多线程等方式来充分利用系统资源。
 关于后面这个问题会是本系列文章重点讨论的部分,也是tornado 源码中的核心部分,我们会在稍微后面文章中去讨论。接下来的几篇文章中我们会尝试先梳理代码结构的问题,真正进入到 tornado 源码的部分,看看 tornado 的如何进行架构的。与其他torando源码文章不同的是,我们不会直接按模块挨个挨个分析代码,这种方式既晦涩难懂又非常低效,而是尝试从零开始搭建它,也就是说我们会仿造 tornado 的框架结构,从最简单开始,一步一步去实现 tornado,从而可以从内部逐步理清 tornado 这个优秀高并发框架的脉络。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 206,378评论 6 481
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 88,356评论 2 382
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 152,702评论 0 342
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 55,259评论 1 279
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 64,263评论 5 371
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,036评论 1 285
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,349评论 3 400
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,979评论 0 259
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,469评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,938评论 2 323
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,059评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,703评论 4 323
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,257评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,262评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,485评论 1 262
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,501评论 2 354
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,792评论 2 345

推荐阅读更多精彩内容

  • 本文摘抄自linux基础编程 IO概念 Linux的内核将所有外部设备都可以看做一个文件来操作。那么我们对与外部设...
    VD2012阅读 1,017评论 0 2
  • 转自: http://www.jianshu.com/p/486b0965c296 http://www.jia...
    demop阅读 3,874评论 1 21
  • 上一篇《聊聊同步、异步、阻塞与非阻塞》[https://www.jianshu.com/p/aed6067eeac...
    七寸知架构阅读 139,828评论 57 445
  • 本文摘抄自linux基础编程 IO概念 Linux的内核将所有外部设备都可以看做一个文件来操作。那么我们对与外部设...
    lintong阅读 1,564评论 0 4
  • CSS继承:子元素继承父元素的属性样式。 一、无继承性的属性 1、display:规定元素应该生成的框的类型 2、...
    辉夜乀阅读 271评论 0 0