sanic
app.run:
def run(
self,
host=None,
port=None,
debug=False,
ssl=None,
sock=None,
workers=1,
protocol=None,
backlog=100,
stop_event=None,
register_sys_signals=True,
access_log=True,
**kwargs
):
# Entry point that starts serving, choosing the serving strategy from
# `workers`.
# NOTE(review): this is an abridged excerpt with indentation stripped; the
# code that builds `server_settings` and `auto_reload` is not shown here.
try:
self.is_running = True
if workers == 1:
# Single worker: hand off to the reloader watchdog when auto-reload is on
# and SANIC_SERVER_RUNNING is not already "true"; otherwise serve directly.
if (
auto_reload
and os.environ.get("SANIC_SERVER_RUNNING") != "true"
):
reloader_helpers.watchdog(2)
else:
serve(**server_settings)
else:
# Any other worker count is delegated to serve_multiple.
serve_multiple(server_settings, workers)
except BaseException:
# Log with traceback, then re-raise so the caller still sees the failure.
error_logger.exception(
"Experienced exception while trying to serve"
)
raise
finally:
# Always clear the running flag, whether serving ended normally or not.
self.is_running = False
logger.info("Server Stopped")
sanic的启动函数除了配置相关的处理,主要是根据workers的数量来启动服务:workers为1时直接调用server.serve,否则调用server.serve_multiple函数启动多个进程来提供服务,serve_multiple的实现如下:
server.serve_multiple
def serve_multiple(server_settings, workers):
# Start `workers` processes that each run `serve` with the same settings,
# wait for them to exit, then close the shared socket.
# NOTE(review): indentation was lost in this excerpt, so block nesting is
# not visible here; `signal_func`, `Process` and `serve` come from code
# outside this excerpt.
if server_settings.get("sock") is None:
# No socket supplied: create one, bind it to the configured host/port and
# mark it inheritable (presumably so worker processes can share it).
sock = socket()
sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
sock.bind((server_settings["host"], server_settings["port"]))
sock.set_inheritable(True)
server_settings["sock"] = sock
# host/port are cleared once the socket is stored in the settings
# (presumably so `serve` uses the socket rather than binding again).
server_settings["host"] = None
server_settings["port"] = None
def sig_handler(signal, frame):
# Forward shutdown to every worker by sending it SIGTERM.
logger.info("Received signal %s. Shutting down.", Signals(signal).name)
for process in processes:
os.kill(process.pid, SIGTERM)
# The same handler is registered for both SIGINT and SIGTERM.
signal_func(SIGINT, lambda s, f: sig_handler(s, f))
signal_func(SIGTERM, lambda s, f: sig_handler(s, f))
# `processes` is read by sig_handler above; it is bound here, before any
# worker is started.
processes = []
for _ in range(workers):
process = Process(target=serve, kwargs=server_settings)
process.daemon = True
process.start()
processes.append(process)
for process in processes:
process.join()
# the above processes will block this until they're stopped
for process in processes:
process.terminate()
# Close the listening socket stored in the settings.
server_settings.get("sock").close()
serve_multiple主要是在没有传入sock时创建socket并绑定端口,然后根据参数中worker数量启动对应数量的进程,每个进程执行serve,serve的代码如下:
server.serve
def serve():
# Create the asyncio server for one worker and run its event loop.
# NOTE(review): abridged excerpt — the parameter list has been removed, so
# every name used below (run_async, protocol, host, port, sock, ...) is
# defined outside what is shown; indentation was also stripped.
if not run_async:
# create new event_loop after fork
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
# Fall back to a fresh set when no connection registry was supplied.
connections = connections if connections is not None else set()
# Pre-bind the protocol's constructor arguments; the resulting callable is
# what gets passed to loop.create_server below.
server = partial(
protocol,
loop=loop,
connections=connections,
signal=signal,
request_handler=request_handler,
error_handler=error_handler,
request_timeout=request_timeout,
response_timeout=response_timeout,
keep_alive_timeout=keep_alive_timeout,
request_max_size=request_max_size,
request_class=request_class,
access_log=access_log,
keep_alive=keep_alive,
is_request_stream=is_request_stream,
router=router,
websocket_max_size=websocket_max_size,
websocket_max_queue=websocket_max_queue,
websocket_read_limit=websocket_read_limit,
websocket_write_limit=websocket_write_limit,
state=state,
debug=debug,
)
# Not awaited here: the coroutine is driven by run_until_complete below.
server_coroutine = loop.create_server(
server,
host,
port,
ssl=ssl,
reuse_port=reuse_port,
sock=sock,
backlog=backlog,
)
try:
http_server = loop.run_until_complete(server_coroutine)
except BaseException:
# A failure to start is logged and the function returns without raising.
logger.exception("Unable to start server")
return
trigger_events(after_start, loop)
pid = os.getpid()
try:
logger.info("Starting worker [%s]", pid)
loop.run_forever()
finally:
logger.info("Stopping worker [%s]", pid)
# Run the on_stop function if provided
trigger_events(before_stop, loop)
# Wait for event loop to finish and all connections to drain
http_server.close()
server.serve主要通过asyncio库的loop.create_server实现,这是一个针对四层(传输层)的实现,具体的http处理需要用户自己实现:这个方法需要传入一个处理连接的类,这个类需要实现几个方法:connection_made、connection_lost、pause_writing、resume_writing、data_received。sanic的http处理定义在server.HttpProtocol中,connection_made将传入的transport对象保存到对象的成员中,方便后续处理,同时启动request_timeout任务,即定时判断请求是否超时了。
def connection_made(self, transport):
    """Register a newly opened connection and arm its request timeout.

    Adds this protocol instance to the shared ``connections`` collection,
    schedules ``request_timeout_callback`` to run after ``request_timeout``
    on the loop, stores ``transport`` on the instance, and records
    ``current_time`` as the last request time.
    """
    self.connections.add(self)

    # Resolve the scheduler and its arguments first, then arm the timer.
    schedule = self.loop.call_later
    delay = self.request_timeout
    on_timeout = self.request_timeout_callback
    self._request_timeout_handler = schedule(delay, on_timeout)

    self.transport = transport
    self._last_request_time = current_time
sanic的data_received是借助于httptools库的HttpRequestParser来处理的:
def data_received(self, data):
# Feed newly received bytes into the HTTP parser.
# NOTE(review): abridged excerpt (`...` marks code removed by the author)
# with indentation stripped.
...
if self.parser is None:
# First data for this request: no request object may exist yet; start with
# an empty header list and create a parser that calls back into this object.
assert self.request is None
self.headers = []
self.parser = HttpRequestParser(self)
...
try:
self.parser.feed_data(data)
except HttpParserError:
# Parse failures are reported to the client as an InvalidUsage error; the
# traceback is appended to the message only when debug is enabled.
message = "Bad Request"
if self._debug:
message += "\n" + traceback.format_exc()
self.write_error(InvalidUsage(message))
可以通过提供on_headers、on_body、on_message_complete等方法来干预不同阶段的请求处理。sanic中除了on_message_complete阶段,其他阶段用户均不能干涉,主要做的是构建request对象,将必要的信息赋给这个对象。on_message_complete阶段创建一个任务调用request_handler来处理请求:
def on_message_complete(self):
    """Handle the parser's "message complete" event.

    The whole request (headers and body) has arrived, so the pending
    request-timeout timer is cancelled and cleared. For a streamed request
    handled by a stream handler, a ``None`` marker is queued on the request
    stream as a task; otherwise the body is finalised and the request
    handler is executed.
    """
    pending_timeout = self._request_timeout_handler
    if pending_timeout:
        # Nothing left to time out on the request side.
        pending_timeout.cancel()
        self._request_timeout_handler = None

    streamed = self.is_request_stream and self._is_stream_handler
    if not streamed:
        self.request.body_finish()
        self.execute_request_handler()
        return

    # Streaming case: queue the end-of-stream marker instead of dispatching.
    spawn = self.loop.create_task
    self._request_stream_task = spawn(self.request.stream.put(None))
def execute_request_handler(self):
    """Arm the response timeout and dispatch the request to the handler.

    Schedules ``response_timeout_callback`` after ``response_timeout`` on
    the loop, records ``current_time`` as the last request time, and wraps
    the ``request_handler`` call in a task stored on
    ``_request_handler_task``.
    """
    schedule = self.loop.call_later
    self._response_timeout_handler = schedule(
        self.response_timeout, self.response_timeout_callback
    )
    self._last_request_time = current_time

    # The handler receives the request plus the two response writers.
    spawn = self.loop.create_task
    handler_call = self.request_handler(
        self.request, self.write_response, self.stream_response
    )
    self._request_handler_task = spawn(handler_call)
request_handler即app中定义的handle_request
async def handle_request(self, request, write_callback, stream_callback):
# Route the request to its handler and pass the resulting response to one
# of the two callbacks.
# NOTE(review): abridged excerpt (`...` marks code removed by the author,
# including the `except` clauses of the `try`) with indentation stripped.
...
response = None
cancelled = False
...
# Look up the handler and its call arguments for this request.
handler, args, kwargs, uri = self.router.get(request)
try:
...
# The handler's result is awaited only when it is awaitable.
response = handler(request, *args, **kwargs)
if isawaitable(response):
response = await response
...
# Streaming responses go through the awaited stream callback; everything
# else goes through the plain write callback.
if isinstance(response, StreamingHTTPResponse):
await stream_callback(response)
else:
write_callback(response)
这个方法首先通过router获取处理对应路由的方法,即用户定义的处理方法,然后调用它,并根据返回值是否awaitable来决定是否需要await,从而得到response,即用户定义的路由处理方法的返回。返回对象是一个response.HTTPResponse/StreamingHTTPResponse对象,然后调用write_callback来将结果返回给client,write_callback是server.HttpProtocol的write_response方法,实现如下:
def write_response(self, response):
# Write a complete (non-streaming) response to the transport.
# NOTE(review): abridged excerpt (`...` marks code removed by the author)
# with indentation stripped, so the nesting inside `finally` is not visible.
...
try:
keep_alive = self.keep_alive
# response.output() renders the response bytes from the request's HTTP
# version and the keep-alive settings.
self.transport.write(
response.output(
self.request.version, keep_alive, self.keep_alive_timeout
)
)
...
finally:
if not keep_alive:
# Connection is not being kept alive: close and drop the transport.
self.transport.close()
self.transport = None
else:
# Kept-alive connection: schedule the keep-alive timeout callback and
# record when this response was sent.
self._keep_alive_timeout_handler = self.loop.call_later(
self.keep_alive_timeout, self.keep_alive_timeout_callback
)
self._last_response_time = current_time
self.cleanup()
其主要通过transport.write写出response.output按照http协议格式输出的bytes response,response.output的定义如下:
def output(self, version="1.1", keep_alive=False, keep_alive_timeout=None):
    """Serialize this response into raw HTTP response bytes.

    Side effects: fills in default ``Content-Length`` / ``Content-Type``
    entries on ``self.headers`` when they are absent, and for status 304 or
    412 replaces ``self.headers`` with the result of
    ``remove_entity_headers``.

    Args:
        version: HTTP version string used in the status line.
        keep_alive: when true the ``Connection`` header is ``keep-alive``,
            otherwise ``close``.
        keep_alive_timeout: when not ``None`` and ``keep_alive`` is true, a
            ``Keep-Alive`` header carrying this number is emitted.

    Returns:
        bytes: status line, ``Connection`` header, optional ``Keep-Alive``
        header, the output of ``_parse_headers()``, a CRLF, then the body
        (empty when the status has no message body).
    """
    # NOTE(review): the excerpt this was taken from had lost its indentation;
    # the nesting below (Content-Length set only when the status carries a
    # body, Content-Type set unconditionally) is reconstructed — confirm
    # against the upstream source.
    timeout_header = b""
    if keep_alive and keep_alive_timeout is not None:
        timeout_header = b"Keep-Alive: %d\r\n" % keep_alive_timeout

    body = b""
    if has_message_body(self.status):
        body = self.body
        self.headers["Content-Length"] = self.headers.get(
            "Content-Length", len(self.body)
        )

    self.headers["Content-Type"] = self.headers.get(
        "Content-Type", self.content_type
    )

    if self.status in (304, 412):
        self.headers = remove_entity_headers(self.headers)

    headers = self._parse_headers()

    # Compare by value: `is 200` relied on integer object identity, which is
    # implementation-dependent and triggers a SyntaxWarning on Python 3.8+.
    if self.status == 200:
        status = b"OK"
    else:
        status = STATUS_CODES.get(self.status, b"UNKNOWN RESPONSE")

    return (
        b"HTTP/%b %d %b\r\n" b"Connection: %b\r\n" b"%b" b"%b\r\n" b"%b"
    ) % (
        version.encode(),
        self.status,
        status,
        b"keep-alive" if keep_alive else b"close",
        timeout_header,
        headers,
        body,
    )
不论是否有body,返回的header之后都会有一个空行:_parse_headers的返回是以\r\n结尾的,格式串中headers对应的%b之后紧跟的\r\n就构成了这个空行。