gunicorn源码阅读

入口

首先程序的入口为gunicorn/app/wsgiapp这个模块。

def run():
    """\
    The ``gunicorn`` command line runner for launching Gunicorn with
    generic WSGI applications.
    """
    from gunicorn.app.wsgiapp import WSGIApplication
    WSGIApplication("%(prog)s [OPTIONS] [APP_MODULE]").run()


if __name__ == '__main__':
    run()

WSGIApplication这个类继承于Application,然后继承于BaseApplication.而且这三个类只有BaseApplication是有构造函数的。

def __init__(self, usage=None, prog=None):
    self.usage = usage
    self.cfg = None
    self.callable = None
    self.prog = prog
    self.logger = None
    self.do_load_config()

这里面useage, prog就是两个字符串,忽略,其他的下面分析。赋值完后进入do_load_config方法。这个方法做了两件事,第一件是将一个Config对象赋值给self.cfg参数。这个对象可以从命令行中解析参数,将一些配置绑定。第二件是调用一个在Application中才实现的方法load_config。这个方法通过各种途径将参数绑定到cfg对象中,其中包括调用一次WSGIApplicagion的init方法,同样也是绑定相关参数。

但这里有个比较神奇的技巧,关于cfg的,一开始没看懂,看到后来发现cfg中包含了很多可以使用的方法,却不知道是什么时候偷偷绑定上来的。现在来仔细看一下,之前说过了,cfg就是一个Config对象。

KNOWN_SETTINGS = []

def make_settings(ignore=None):
    settings = {}
    ignore = ignore or ()
    for s in KNOWN_SETTINGS:
        setting = s()
        if setting.name in ignore:
            continue
        settings[setting.name] = setting.copy()
    return settings

class Config(object):

    def __init__(self, usage=None, prog=None):
        self.settings = make_settings()
        ...

目前来看,KNOWN_SETTINGS是一个空列表,所以self.settings也应该是一个空字典。但其实不然。

class SettingMeta(type):
    def __new__(cls, name, bases, attrs):
        super_new = super(SettingMeta, cls).__new__
        parents = [b for b in bases if isinstance(b, SettingMeta)]
        if not parents:
            return super_new(cls, name, bases, attrs)

        attrs["order"] = len(KNOWN_SETTINGS)
        attrs["validator"] = staticmethod(attrs["validator"])

        new_class = super_new(cls, name, bases, attrs)
        new_class.fmt_desc(attrs.get("desc", ""))
        KNOWN_SETTINGS.append(new_class)
        return new_class

class Setting(object):
    pass

Setting = SettingMeta('Setting', (Setting,), {})

class Workers(Setting):
    name = 'xxx'
    ...
    validator = xxx
    pass

config.py这个模块中还有很多个类似Workers一样的类,结构都是差不多的,首先都是继承Setting类,而Setting类是一个由SettingMeta创造出来的类,大家应该都知道创造类是new这个方法来完成的,这里也不例外,在new方法中,通过type这个元类来生成一个新的类,并通过attrs["validator"] = staticmethod(attrs["validator"])来给类绑定一个方法。同时将新的Setting类加入KNOWN_SETTINGS中,这样后续定义的类似Workers的类都会被加入列表中,从而绑定到cfg这个对象上。

简单的说,在调用run方法之前,初始化了一些参数,主要是给cfg这个对象绑定了很多熟悉和方法。

run方法

run方法最终调用的是Arbiter对象的run方法,创建Arbiter对象时传入Application对象作为参数。根据类注释,可以很清楚的了解这个类的主要作用。

class Arbiter(object):
    """
    Arbiter maintain the workers processes alive. It launches or
    kills them if needed. It also manages application reloading
    via SIGHUP/USR2.
    """
    ...
        def run(self):
        "Main master loop."
        self.start()
        ...
        try:
            self.manage_workers()

            while True:
                self.maybe_promote_master()

                ...
        ...
        except Exception:
            ...
            sys.exit(-1)

Arbiter的run方法中,先调用start()来创建socket监听,然后通过manage_workers()来控制worker的数量,现在来看下manage_workers的代码。

    def manage_workers(self):
        """\
        Maintain the number of workers by spawning or killing
        as required.
        """
        if len(self.WORKERS.keys()) < self.num_workers:
            self.spawn_workers()
        workers = self.WORKERS.items()
        workers = sorted(workers, key=lambda w: w[1].age)
        while len(workers) > self.num_workers:
            (pid, _) = workers.pop(0)
            self.kill_worker(pid, signal.SIGTERM)
        ...

    def spawn_workers(self):
        """\
        Spawn new workers as needed.

        This is where a worker process leaves the main loop
        of the master process.
        """

        for _ in range(self.num_workers - len(self.WORKERS.keys())):
            self.spawn_worker()
            time.sleep(0.1 * random.random())

    def spawn_worker(self):
        self.worker_age += 1
        worker = self.worker_class(self.worker_age, self.pid, self.LISTENERS,
                                   self.app, self.timeout / 2.0,
                                   self.cfg, self.log)
        self.cfg.pre_fork(self, worker)
        pid = os.fork()
        if pid != 0:
            worker.pid = pid
            self.WORKERS[pid] = worker
            return pid

        # Do not inherit the temporary files of other workers
        for sibling in self.WORKERS.values():
            sibling.tmp.close()

        # Process Child
        worker.pid = os.getpid()
        try:
            util._setproctitle("worker [%s]" % self.proc_name)
            self.log.info("Booting worker with pid: %s", worker.pid)
            self.cfg.post_fork(self, worker)
            worker.init_process()
            sys.exit(0)
        except SystemExit:
            raise
        ...

如果worker少于cfg.num_workers,调用spawn_workers方法增加worker数量,增加的方法就是os.fork()。
如果数量大于cfg.num_workers,根据worker.age的属性排序后kill一个worker。

我们主要看下增加worker的过程,增加worker是通过调用os.fork来实现的,调用os.fork的进程称为主进程,生成的进程称为子进程,对于这两个进程,os.fork的返回值是不一样的,子进程的返回值是0,父进程返回的是子进程的进程id。所以如果是主进程则记录子进程id后返回到run里的无限循环。如果是子进程,则成为一个worker进程,执行worker.init_process()。正常情况不会执行sys.exit(0)语句。

我们现在回到刚才os.fork的主进程,他执行完os.fork后就返回到run里的无限循环.

        try:
            self.manage_workers()

            while True:
                self.maybe_promote_master()

                sig = self.SIG_QUEUE.pop(0) if self.SIG_QUEUE else None
                if sig is None:
                    self.sleep()
                    self.murder_workers()
                    self.manage_workers()
                    continue

                if sig not in self.SIG_NAMES:
                    self.log.info("Ignoring unknown signal: %s", sig)
                    continue

                signame = self.SIG_NAMES.get(sig)
                handler = getattr(self, "handle_%s" % signame, None)
                if not handler:
                    self.log.error("Unhandled signal: %s", signame)
                    continue
                self.log.info("Handling signal: %s", signame)
                handler()
                self.wakeup()
        except StopIteration:
            self.halt()
        except KeyboardInterrupt:
            self.halt()
        except HaltServer as inst:
            self.halt(reason=inst.reason, exit_status=inst.exit_status)
        except SystemExit:
            raise
        except Exception:
            self.log.info("Unhandled exception in main loop",
                          exc_info=True)
            self.stop(False)
            if self.pidfile is not None:
                self.pidfile.unlink()
            sys.exit(-1)

主进程在执行maybe_promote_master方法,将自己标识为master进程,然后根据信号量来进行一些控制进程的操作。如果信号量为空,则通过sleep方法进入睡眠状态,sleep的代码是这样的:

    def sleep(self):
        """\
        Sleep until PIPE is readable or we timeout.
        A readable PIPE means a signal occurred.
        """
        try:
            ready = select.select([self.PIPE[0]], [], [], 1.0)
            if not ready[0]:
                return
            while os.read(self.PIPE[0], 1):
                pass
        except (select.error, OSError) as e:
            # TODO: select.error is a subclass of OSError since Python 3.3.
            error_number = getattr(e, 'errno', e.args[0])
            if error_number not in [errno.EAGAIN, errno.EINTR]:
                raise
        except KeyboardInterrupt:
            sys.exit()

循环的监听管道,如果有信号量就退出循环,关于select这一块我也不是很清楚。退出循环后回到上一段的循环中,首先保持worker的数量为配置信息里的值,然后读取信号量的名字,根据不同的名字调用不同的hander方法。之后不断的重复,master进程大概就是这样。

Worker进程

通过上面的分析,可以看出来worker进程才是真正用来处理请求的进程,入口是worker.init_process().这个worker的来历大概是这样的,worker -> self.work_class(*args) -> self.cfg.worker_class() -> util.load_class()。util.load_class接受一个字符串参数,是配置中的worker_class变量,默认为SyncWorker。但是也能变成gevent, threadworker等更高效的worker.我们先看下默认的SyncWorker的逻辑是怎么样的。

所有的worker模块都在gunicorn/workers包中。SyncWorker继承自base.Worker.SyncWorker的init_process()方法来自于父类。

    def init_process(self):
        """\
        If you override this method in a subclass, the last statement
        in the function should be to call this method with
        super(MyWorkerClass, self).init_process() so that the ``run()``
        loop is initiated.
        """

        # set environment' variables
        if self.cfg.env:
            for k, v in self.cfg.env.items():
                os.environ[k] = v

        util.set_owner_process(self.cfg.uid, self.cfg.gid,
                               initgroups=self.cfg.initgroups)

        # Reseed the random number generator
        util.seed()

        # For waking ourselves up
        self.PIPE = os.pipe()
        for p in self.PIPE:
            util.set_non_blocking(p)
            util.close_on_exec(p)

        # Prevent fd inheritance
        for s in self.sockets:
            util.close_on_exec(s)
        util.close_on_exec(self.tmp.fileno())

        self.wait_fds = self.sockets + [self.PIPE[0]]

        self.log.close_on_exec()

        self.init_signals()

        # start the reloader
        if self.cfg.reload:
            def changed(fname):
                self.log.info("Worker reloading: %s modified", fname)
                self.alive = False
                self.cfg.worker_int(self)
                time.sleep(0.1)
                sys.exit(0)

            reloader_cls = reloader_engines[self.cfg.reload_engine]
            self.reloader = reloader_cls(extra_files=self.cfg.reload_extra_files,
                                         callback=changed)
            self.reloader.start()

        self.load_wsgi()
        self.cfg.post_worker_init(self)

        # Enter main run loop
        self.booted = True
        self.run()
  1. init_signals()注册信号量
  2. load_wsgi(): self.wsgi = self.app.wsgi(),一般就是python框架里起的app,比如Flask里的app = Flask(__name__).
  3. run(). 现在我们到syncworker的run方法看一看。
    def run(self):
        timeout = self.timeout or 0.5

        for s in self.sockets:
            s.setblocking(0)

        if len(self.sockets) > 1:
            self.run_for_multiple(timeout)
        else:
            self.run_for_one(timeout)
    
    def run_for_one(self, timeout):
        listener = self.sockets[0]
        while self.alive:
            self.notify()
            try:
                self.accept(listener)
                continue
            except EnvironmentError as e:
                if e.errno not in (errno.EAGAIN, errno.ECONNABORTED,
                        errno.EWOULDBLOCK):
                    raise

            if not self.is_parent_alive():
                return

            try:
                self.wait(timeout)
            except StopWaiting:
                return

    def run_for_multiple(self, timeout):
        while self.alive:
            self.notify()
            try:
                ready = self.wait(timeout)
            except StopWaiting:
                return

            if ready is not None:
                for listener in ready:
                    if listener == self.PIPE[0]:
                        continue

                    try:
                        self.accept(listener)
                    except EnvironmentError as e:
                        if e.errno not in (errno.EAGAIN, errno.ECONNABORTED,
                                errno.EWOULDBLOCK):
                            raise

            if not self.is_parent_alive():
                return

我把一些注释删了,run方法之后进入的两个方法同样也都是无限循环,不断的接收socket。accept方法很简洁,就是在建立连接的socket上获取client端的地址等信息,并设置socket为阻塞的,也就是同一时间只能处理一个请求。然后调用handle方法处理请求,handle方法如下:

    def handle(self, listener, client, addr):
        req = None
        try:
            if self.cfg.is_ssl:
                client = ssl.wrap_socket(client, server_side=True,
                    **self.cfg.ssl_options)

            parser = http.RequestParser(self.cfg, client)
            req = six.next(parser)
            self.handle_request(listener, req, client, addr)
        except http.errors.NoMoreData as e:
            self.log.debug("Ignored premature client disconnection. %s", e)
        except StopIteration as e:
            self.log.debug("Closing connection. %s", e)
        except ssl.SSLError as e:
            if e.args[0] == ssl.SSL_ERROR_EOF:
                self.log.debug("ssl connection closed")
                client.close()
            else:
                self.log.debug("Error processing SSL request.")
                self.handle_error(req, client, addr, e)
        except EnvironmentError as e:
            if e.errno not in (errno.EPIPE, errno.ECONNRESET):
                self.log.exception("Socket error processing request.")
            else:
                if e.errno == errno.ECONNRESET:
                    self.log.debug("Ignoring connection reset")
                else:
                    self.log.debug("Ignoring EPIPE")
        except Exception as e:
            self.handle_error(req, client, addr, e)
        finally:
            util.close(client)

    def handle_request(self, listener, req, client, addr):
        environ = {}
        resp = None
        try:
            self.cfg.pre_request(self, req)
            request_start = datetime.now()
            resp, environ = wsgi.create(req, client, addr,
                    listener.getsockname(), self.cfg)
            # Force the connection closed until someone shows
            # a buffering proxy that supports Keep-Alive to
            # the backend.
            resp.force_close()
            self.nr += 1
            if self.nr >= self.max_requests:
                self.log.info("Autorestarting worker after current request.")
                self.alive = False
            respiter = self.wsgi(environ, resp.start_response)
            try:
                if isinstance(respiter, environ['wsgi.file_wrapper']):
                    resp.write_file(respiter)
                else:
                    for item in respiter:
                        resp.write(item)
                resp.close()
                request_time = datetime.now() - request_start
                self.log.access(resp, req, environ, request_time)
            finally:
                if hasattr(respiter, "close"):
                    respiter.close()
        except EnvironmentError:
            # pass to next try-except level
            six.reraise(*sys.exc_info())
        except Exception:
            if resp and resp.headers_sent:
                # If the requests have already been sent, we should close the
                # connection to indicate the error.
                self.log.exception("Error handling request")
                try:
                    client.shutdown(socket.SHUT_RDWR)
                    client.close()
                except EnvironmentError:
                    pass
                raise StopIteration()
            raise
        finally:
            try:
                self.cfg.post_request(self, req, environ, resp)
            except Exception:
                self.log.exception("Exception in post_request hook")

了解过wsgi协议的应该知道,服务器是如何跟框架交互的。简单的说就是服务器会调用一个方法并传入两个参数,第一个参数为environ,这个参数包含了所有请求有关的信息,比如headers, body等等。第二个参数是一个回调函数,后台服务处理完业务后调用这个函数将response传给服务器,服务器再传给客户端。但是这里还有很多细节,水平有限,看不大明白,但是整体的流程应该还是很清楚。所以这里先parser了http请求的相关信息,保存在environ中,然后生成回调函数resp.strt_response,然后调用wsgi(environ, resp.start_response)。这里的wsgi就是框架中的app.

GeventWorker

我最近接触到的是配合gevent起一个服务,所以我也分析一下geventworker的逻辑。首先geventworker继承自asyncworker,asyncworker继承自base.worker。上面提到了,默认的worker是一个阻塞的模型,同一时间只能处理一个请求,所以效率比较低,生产环境一般不会使用。

AsyncWorker

AsyncWorker的构造函数先是调用了父类的构造函数,然后又添加了一个额外的参数worker_connections,这个参数也是在cfg中设置的,且只在eventletgevent两种模式下起作用,作用是限制最大的同时的客户端连接数。

前面的SyncWorker的init_process是继承自worker。但是GeventWorker重写了这个方法。用过gevent的应该知道,gevent底层实现的方法叫做猴子补丁-monkey_patch。修改了大多数的底层库,将一些阻塞的底层实现,重新换成非阻塞的。所以GeventWorker先是打上补丁,然后调用worker的init_process方法,最终进入GeventWorker的run方法开始执行处理请求任务。run方法代码如下:

    def run(self):
        ...
        for s in self.sockets:
            s.setblocking(1)
            pool = Pool(self.worker_connections)
            if self.server_class is not None:
                environ = base_environ(self.cfg)
                environ.update({
                    "wsgi.multithread": True,
                    "SERVER_SOFTWARE": VERSION,
                })
                server = self.server_class(
                    s, application=self.wsgi, spawn=pool, log=self.log,
                    handler_class=self.wsgi_handler, environ=environ,
                    **ssl_args)
            else:
                hfun = partial(self.handle, s)
                server = StreamServer(s, handle=hfun, spawn=pool, **ssl_args)

            server.start()
            servers.append(server)

        while self.alive:
            self.notify()
            gevent.sleep(1.0)

        try:
            # Stop accepting requests
            for server in servers:
                if hasattr(server, 'close'):  # gevent 1.0
                    server.close()
                if hasattr(server, 'kill'):  # gevent < 1.0
                    server.kill()

            # Handle current requests until graceful_timeout
            ts = time.time()
            while time.time() - ts <= self.cfg.graceful_timeout:
                accepting = 0
                for server in servers:
                    if server.pool.free_count() != server.pool.size:
                        accepting += 1

                # if no server is accepting a connection, we can exit
                if not accepting:
                    return

                self.notify()
                gevent.sleep(1.0)

            # Force kill all active the handlers
            self.log.warning("Worker graceful timeout (pid:%s)" % self.pid)
            for server in servers:
                server.stop(timeout=1)
        except:
            pass
  1. 创建tcpServer。并用pool限制了最大连接数。这个server的实现在gevent中,没看懂。
  2. hfun这个方法,是一个绑定了参数的handle,是asyncWorker的handle。过程跟前面的同步的差不多。但是遇到阻塞是gevent会帮助切换,所以提高了并发量。
  3. 创建完server进入无限循环,notify网上查了一下说是给Arbiter发信号的,这里我不大懂。

总结

gunicorn代码比较多,且有很多底层的东西。很多地方不懂,都跳过了,分析可能也有很多错误,看到可以指出。

相比于之前看过的flask、request、tornado等等。gunicorn显然难很多,也没有那么清晰,有很多方法,参数来的不明不白;而且跟gevent牵扯很大,gevent的代码更加难懂。

但应该还是有点收获吧,虽然暂时没察觉到~

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 199,902评论 5 468
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 84,037评论 2 377
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 146,978评论 0 332
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 53,867评论 1 272
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 62,763评论 5 360
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,104评论 1 277
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,565评论 3 390
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,236评论 0 254
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,379评论 1 294
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,313评论 2 317
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,363评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,034评论 3 315
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,637评论 3 303
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,719评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,952评论 1 255
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,371评论 2 346
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 41,948评论 2 341

推荐阅读更多精彩内容

  • 作者:詹聪聪 序言: 本人工作中需要用到flask-socketio,在学习英文文档时发现,flask-socke...
    Python中文社区阅读 12,571评论 6 39
  • 必备的理论基础 1.操作系统作用: 隐藏丑陋复杂的硬件接口,提供良好的抽象接口。 管理调度进程,并将多个进程对硬件...
    drfung阅读 3,511评论 0 5
  • 谈论WEB编程的时候常说天天在写CGI,那么CGI是什么呢?可能很多时候并不会去深究这些基础概念,再比如除了CGI...
    __七把刀__阅读 2,176评论 2 11
  • 四九天的酷寒在人们的抱怨声中收敛了些许,渐渐地有些回温,依然有激烈的北风敲打着夜的窗。整理旧物,在日记本中...
    雨村绿澜阅读 529评论 0 5
  • 人到中年,格局已定。既然成不了南飞雁那就安心做一只寻常雀。不攀比不抱怨,你若安好便是晴天。学最好的别人,做最好的自...
    佳的后花园阅读 1,872评论 0 7