sanic

sanic

app.run:

def run(
    self,
    host=None,
    port=None,
    debug=False,
    ssl=None,
    sock=None,
    workers=1,
    protocol=None,
    backlog=100,
    stop_event=None,
    register_sys_signals=True,
    access_log=True,
    **kwargs
):
    try:
        self.is_running = True
        if workers == 1:
            if (
                auto_reload
                and os.environ.get("SANIC_SERVER_RUNNING") != "true"
            ):
                reloader_helpers.watchdog(2)
            else:
                serve(**server_settings)
        else:
            serve_multiple(server_settings, workers)
    except BaseException:
        error_logger.exception(
            "Experienced exception while trying to serve"
        )
        raise
    finally:
        self.is_running = False
    logger.info("Server Stopped")

sanic的启动函数除了配置相关的处理,主要是调用了server.serve_multiple函数来启动多个进程来接收服务,serve_multiple的实现如下:

server.serve_mutiple

def serve_multiple(server_settings, workers):
   if server_settings.get("sock") is None:
        sock = socket()
        sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
        sock.bind((server_settings["host"], server_settings["port"]))
        sock.set_inheritable(True)
        server_settings["sock"] = sock
        server_settings["host"] = None
        server_settings["port"] = None

    def sig_handler(signal, frame):
        logger.info("Received signal %s. Shutting down.", Signals(signal).name)
        for process in processes:
            os.kill(process.pid, SIGTERM)

    signal_func(SIGINT, lambda s, f: sig_handler(s, f))
    signal_func(SIGTERM, lambda s, f: sig_handler(s, f))

    processes = []

    for _ in range(workers):
        process = Process(target=serve, kwargs=server_settings)
        process.daemon = True
        process.start()
        processes.append(process)

    for process in processes:
        process.join()

    # the above processes will block this until they're stopped
    for process in processes:
        process.terminate()
    server_settings.get("sock").close()

serve_mutltiple主要是创建socket并绑定端口,然后根据参数中worker数量启动对应数量的进程,每个进程执行serve,serve的代码如下:
server.serve

def serve():
    if not run_async:
        # create new event_loop after fork
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)

    connections = connections if connections is not None else set()
    server = partial(
        protocol,
        loop=loop,
        connections=connections,
        signal=signal,
        request_handler=request_handler,
        error_handler=error_handler,
        request_timeout=request_timeout,
        response_timeout=response_timeout,
        keep_alive_timeout=keep_alive_timeout,
        request_max_size=request_max_size,
        request_class=request_class,
        access_log=access_log,
        keep_alive=keep_alive,
        is_request_stream=is_request_stream,
        router=router,
        websocket_max_size=websocket_max_size,
        websocket_max_queue=websocket_max_queue,
        websocket_read_limit=websocket_read_limit,
        websocket_write_limit=websocket_write_limit,
        state=state,
        debug=debug,
    )

    server_coroutine = loop.create_server(
        server,
        host,
        port,
        ssl=ssl,
        reuse_port=reuse_port,
        sock=sock,
        backlog=backlog,
    )

    try:
        http_server = loop.run_until_complete(server_coroutine)
    except BaseException:
        logger.exception("Unable to start server")
        return

    trigger_events(after_start, loop)

    pid = os.getpid()
    try:
        logger.info("Starting worker [%s]", pid)
        loop.run_forever()
    finally:
        logger.info("Stopping worker [%s]", pid)

        # Run the on_stop function if provided
        trigger_events(before_stop, loop)

        # Wait for event loop to finish and all connections to drain
        http_server.close()

server.serve主要通过asyncio库loop.create_server实现,这时一个针对四层的实现,具体http处理的实现需要用户实现,这个方法需要传入一个处理连接的类,这个类需要实现几个方法:connection_made,connection_lost, pause_writing, resume_writing, data_receved,sanic的http处理定义在server.HttpProtocol中,connection_made将传入的transport对象保存到对象的成员中,方便后续处理,同时启动request_timeout任务,即定时判断请求是否超时了。

   def connection_made(self, transport):
        self.connections.add(self)
        self._request_timeout_handler = self.loop.call_later(
            self.request_timeout, self.request_timeout_callback
        )
        self.transport = transport
        self._last_request_time = current_time

sanic的data_received是借助于httptools库的HttpRequestParser来处理的,

def data_received(self, data):
    ...
    if self.parser is None:
        assert self.request is None
        self.headers = []
        self.parser = HttpRequestParser(self)
    ...        
    try:
        self.parser.feed_data(data)
    except HttpParserError:
        message = "Bad Request"
        if self._debug:
            message += "\n" + traceback.format_exc()
        self.write_error(InvalidUsage(message))

可以通过提供on_headers,on_body,on_message_complete等方法来干预不同阶段的请求处理, sanic中除了on_message_complete阶段,其他阶段用户均不能干涉,主要的做的是构建request对象,将必要的信息赋给这个对象。on_message_complete阶段创建一个任务调用request_handler来处理请求

def on_message_complete(self):
    # Entire request (headers and whole body) is received.
    # We can cancel and remove the request timeout handler now.
    if self._request_timeout_handler:
        self._request_timeout_handler.cancel()
        self._request_timeout_handler = None
    if self.is_request_stream and self._is_stream_handler:
        self._request_stream_task = self.loop.create_task(
            self.request.stream.put(None)
        )
        return
    self.request.body_finish()
    self.execute_request_handler()

def execute_request_handler(self):
    self._response_timeout_handler = self.loop.call_later(
        self.response_timeout, self.response_timeout_callback
    )
    self._last_request_time = current_time
    self._request_handler_task = self.loop.create_task(
        self.request_handler(
            self.request, self.write_response, self.stream_response
        )
    )

request_handler即app中定义的handle_request

async def handle_request(self, request, write_callback, stream_callback):
    ...
    response = None
    cancelled = False
    ...
    handler, args, kwargs, uri = self.router.get(request)
    try:
    ...
        response = handler(request, *args, **kwargs)
        if isawaitable(response):
            response = await response
    ...
    if isinstance(response, StreamingHTTPResponse):
        await stream_callback(response)
    else:
        write_callback(response)

这个方法主要先获取处理对应路由的方法及用户定义的方法,根据是否awaitable来获取response即用户定义路由处理方法的返回,返回对象是一个response.HTTPResponse/StreamingHTTPResponse对象,然后调用write_callback来将结果返回给client,write_callback是server.HttpProtocol的write_response方法,实现如下:

def write_response(self, response):
    ...
    try:
        keep_alive = self.keep_alive
        self.transport.write(
            response.output(
                self.request.version, keep_alive, self.keep_alive_timeout
            )
        )
    ...
    finally:
        if not keep_alive:
            self.transport.close()
            self.transport = None
        else:
            self._keep_alive_timeout_handler = self.loop.call_later(
                self.keep_alive_timeout, self.keep_alive_timeout_callback
            )
            self._last_response_time = current_time
            self.cleanup()

其主要通过transport.write写repsonse.output按照http协议格式输出的bytes response,response.output的定义如下

   def output(self, version="1.1", keep_alive=False, keep_alive_timeout=None):

        timeout_header = b""
        if keep_alive and keep_alive_timeout is not None:
            timeout_header = b"Keep-Alive: %d\r\n" % keep_alive_timeout

        body = b""
        if has_message_body(self.status):
            body = self.body
            self.headers["Content-Length"] = self.headers.get(
                "Content-Length", len(self.body)
            )

        self.headers["Content-Type"] = self.headers.get(
            "Content-Type", self.content_type
        )

        if self.status in (304, 412):
            self.headers = remove_entity_headers(self.headers)

        headers = self._parse_headers()

        if self.status is 200:
            status = b"OK"
        else:
            status = STATUS_CODES.get(self.status, b"UNKNOWN RESPONSE")

        return (
            b"HTTP/%b %d %b\r\n" b"Connection: %b\r\n" b"%b" b"%b\r\n" b"%b"
        ) % (
            version.encode(),
            self.status,
            status,
            b"keep-alive" if keep_alive else b"close",
            timeout_header,
            headers,
            body,
        )

不论是否有body返回header之后都会有一个空行,_parse_headers的返回是以\r\n结尾的,再接%b\r\n构成了一个空行

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,293评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,604评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,958评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,729评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,719评论 5 366
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,630评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,000评论 3 397
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,665评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,909评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,646评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,726评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,400评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,986评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,959评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,197评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 44,996评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,481评论 2 342