抽空看了下tornado的源码,会将几个关键部分以专栏文章的形式记录下来,算是对学习的一个汇总,也希望能对使用tornado的朋友有所启迪,文中有任何说的不到位的地方,欢迎私信或者评论指出。
看开源项目代码时,我会直接选择最原始的版本,tornadoweb/tornado,因为我认为最核心的功能往往在v1.0.0都具备,后续很少有对大功能的改进,而且随着各路开源人士代码的提交,项目的代码风格未必会完全的一致。
本文会介绍 ioloop.py, 看下文之前,需要你已经了解了Linux的IO模型,如果没有的话推荐看下《Unix的网络编程卷》。
那么开始吧!
# Choose a poll implementation. Use epoll if it is available, fall back to
# select() for non-Linux platforms
if hasattr(select, "epoll"):
# Python 2.6+ on Linux
_poll = select.epoll
elif hasattr(select, "kqueue"):
# Python 2.6+ on BSD or Mac
_poll = _KQueue
else:
try:
# Linux systems with our C module installed
import epoll
_poll = _EPoll
except:
# All other systems
import sys
if "linux" in sys.platform:
logging.warning("epoll module not found; using select()")
_poll = _Select
Linux中的epoll模型在不同的平台上有不同的叫法,Linux下叫epoll,mac或者bsd上叫kqueue,它们本质上都是IO复用的一种形式,Python 2.6+的select库中包含了对应的实现,但是2.6以下的版本没有对应的实现,tornado使用C语言模块的实现并简单包装了下,2.6以下版本就用tornado包装的epoll。上述代码就是干这个事情的,根据系统平台和Python的版本选择对应的epoll实现。
那么对应的epoll实现都包含了哪些功能呢?我们看下其中的_EPOLL,这是其中的一个实现,tornado对底层做了包装。
class _EPoll(object):
"""An epoll-based event loop using our C module for Python 2.5 systems"""
_EPOLL_CTL_ADD = 1
_EPOLL_CTL_DEL = 2
_EPOLL_CTL_MOD = 3
def __init__(self):
self._epoll_fd = epoll.epoll_create()
def fileno(self):
return self._epoll_fd
def register(self, fd, events):
epoll.epoll_ctl(self._epoll_fd, self._EPOLL_CTL_ADD, fd, events)
def modify(self, fd, events):
epoll.epoll_ctl(self._epoll_fd, self._EPOLL_CTL_MOD, fd, events)
def unregister(self, fd):
epoll.epoll_ctl(self._epoll_fd, self._EPOLL_CTL_DEL, fd, 0)
def poll(self, timeout):
return epoll.epoll_wait(self._epoll_fd, int(timeout * 1000))
我们可以看出,对外提供的就是epoll常规的几个功能,了解过epoll的一看便知。
接下来我们看下IOLOOP这个类的代码,按照从上到下的顺序:
# Constants from the epoll module
_EPOLLIN = 0x001
_EPOLLPRI = 0x002
_EPOLLOUT = 0x004
_EPOLLERR = 0x008
_EPOLLHUP = 0x010
_EPOLLRDHUP = 0x2000
_EPOLLONESHOT = (1 << 30)
_EPOLLET = (1 << 31)
# Our events map exactly to the epoll events
NONE = 0
READ = _EPOLLIN
WRITE = _EPOLLOUT
ERROR = _EPOLLERR | _EPOLLHUP | _EPOLLRDHUP
这里定义了epoll中的事件,我们比较关注的是_EPOLLIN和_EPOLLOUT,分别表示我们关心的fd(文件描述符)可写了或者可读了。
def __init__(self, impl=None):
self._impl = impl or _poll()
if hasattr(self._impl, 'fileno'):
self._set_close_exec(self._impl.fileno())
self._handlers = {}
self._events = {}
self._callbacks = set()
self._timeouts = []
self._running = False
self._stopped = False
self._blocking_log_threshold = None
# Create a pipe that we send bogus data to when we want to wake
# the I/O loop when it is idle
if os.name != 'nt':
r, w = os.pipe()
self._set_nonblocking(r)
self._set_nonblocking(w)
self._set_close_exec(r)
self._set_close_exec(w)
self._waker_reader = os.fdopen(r, "r", 0)
self._waker_writer = os.fdopen(w, "w", 0)
else:
self._waker_reader = self._waker_writer = win32_support.Pipe()
r = self._waker_writer.reader_fd
self.add_handler(r, self._read_waker, self.READ)
这里解释两个地方。
_set_close_exec方法是干嘛的?
def _set_close_exec(self, fd):
flags = fcntl.fcntl(fd, fcntl.F_GETFD)
fcntl.fcntl(fd, fcntl.F_SETFD, flags | fcntl.FD_CLOEXEC)
Linux在fork子进程时使用写时复制的策略,假设原有进程对某个文件持有fd,fork后,子进程也会有相应的fd对应那个文件,但是子进程如果仅仅是被fork来exec的,那么在exec时会有新的上下文及变量,原来持有的那个文件描述符就不见了,设置了这个位,当子进程执行exec的时候,所持有的fd就会被关闭。
r, w = os.pipe()是在干嘛?
开启了一个管道,并设置读写均为nonblocking,当read或write被阻塞,将返回-1或者EAGAIN错误。
设置这个管道为了高效率的让IO Loop停止循环,只要在通道另一侧写点什么,就会阻塞poll()方法。
@classmethod
def instance(cls):
"""Returns a global IOLoop instance.
Most single-threaded applications have a single, global IOLoop.
Use this method instead of passing around IOLoop instances
throughout your code.
A common pattern for classes that depend on IOLoops is to use
a default argument to enable programs with multiple IOLoops
but not require the argument for simpler applications:
class MyClass(object):
def __init__(self, io_loop=None):
self.io_loop = io_loop or IOLoop.instance()
"""
if not hasattr(cls, "_instance"):
cls._instance = cls()
return cls._instance
instance方法就是实现单例的,不多介绍。
def add_handler(self, fd, handler, events):
"""Registers the given handler to receive the given events for fd."""
self._handlers[fd] = handler
self._impl.register(fd, events | self.ERROR)
def update_handler(self, fd, events):
"""Changes the events we listen for fd."""
self._impl.modify(fd, events | self.ERROR)
def remove_handler(self, fd):
"""Stop listening for events on fd."""
self._handlers.pop(fd, None)
self._events.pop(fd, None)
try:
self._impl.unregister(fd)
except (OSError, IOError):
logging.debug("Error deleting fd from IOLoop", exc_info=True)
这几个方法,就是负责给指定的fd绑定对应的handler以及监听的事件的。
def set_blocking_log_threshold(self, s):
"""Logs a stack trace if the ioloop is blocked for more than s seconds.
Pass None to disable. Requires python 2.6 on a unixy platform.
"""
if not hasattr(signal, "setitimer"):
logging.error("set_blocking_log_threshold requires a signal module "
"with the setitimer method")
return
self._blocking_log_threshold = s
if s is not None:
signal.signal(signal.SIGALRM, self._handle_alarm)
def _handle_alarm(self, signal, frame):
logging.warning('IOLoop blocked for %f seconds in\n%s',
self._blocking_log_threshold,
''.join(traceback.format_stack(frame)))
使用signal模块来监控Ioloop的block时间,超过某个时间就会触发我们自己定义的handler。signal.SIGALRM和signal.ITIMER_REAL一般配合使用。
加下来就是最重要的start方法,start方法下还有几个小方法,将在这里一并介绍。
def start(self):
"""Starts the I/O loop.
The loop will run until one of the I/O handlers calls stop(), which
will make the loop stop after the current event iteration completes.
"""
if self._stopped:
self._stopped = False
return
self._running = True
while True:
# Never use an infinite timeout here - it can stall epoll
poll_timeout = 0.2
# Prevent IO event starvation by delaying new callbacks
# to the next iteration of the event loop.
callbacks = list(self._callbacks)
for callback in callbacks:
# A callback can add or remove other callbacks
if callback in self._callbacks:
self._callbacks.remove(callback)
self._run_callback(callback)
if self._callbacks:
poll_timeout = 0.0
if self._timeouts:
now = time.time()
while self._timeouts and self._timeouts[0].deadline <= now:
timeout = self._timeouts.pop(0)
self._run_callback(timeout.callback)
if self._timeouts:
milliseconds = self._timeouts[0].deadline - now
poll_timeout = min(milliseconds, poll_timeout)
if not self._running:
break
if self._blocking_log_threshold is not None:
# clear alarm so it doesn't fire while poll is waiting for
# events.
signal.setitimer(signal.ITIMER_REAL, 0, 0)
try:
event_pairs = self._impl.poll(poll_timeout)
except Exception, e:
# Depending on python version and IOLoop implementation,
# different exception types may be thrown and there are
# two ways EINTR might be signaled:
# * e.errno == errno.EINTR
# * e.args is like (errno.EINTR, 'Interrupted system call')
if (getattr(e, 'errno') == errno.EINTR or
(isinstance(getattr(e, 'args'), tuple) and
len(e.args) == 2 and e.args[0] == errno.EINTR)):
logging.warning("Interrupted system call", exc_info=1)
continue
else:
raise
if self._blocking_log_threshold is not None:
signal.setitimer(signal.ITIMER_REAL,
self._blocking_log_threshold, 0)
# Pop one fd at a time from the set of pending fds and run
# its handler. Since that handler may perform actions on
# other file descriptors, there may be reentrant calls to
# this IOLoop that update self._events
self._events.update(event_pairs)
while self._events:
fd, events = self._events.popitem()
try:
self._handlers[fd](fd, events)
except (KeyboardInterrupt, SystemExit):
raise
except (OSError, IOError), e:
if e[0] == errno.EPIPE:
# Happens when the client closes the connection
pass
else:
logging.error("Exception in I/O handler for fd %d",
fd, exc_info=True)
except:
logging.error("Exception in I/O handler for fd %d",
fd, exc_info=True)
# reset the stopped flag so another start/stop pair can be issued
self._stopped = False
if self._blocking_log_threshold is not None:
signal.setitimer(signal.ITIMER_REAL, 0, 0)
_callbacks保存了一些函数,这些函数会在下一次IO loop事件循环前被调用,在任何时候任何线程中调用都是安全的,可用于将一些控件传输到ioloop的线程中。
_timeouts用户保存执行函数和deadline的对应关系,和_callbacks相比,它指定了函数执行时间,而_callback是在下一次Ioloop循环前立刻执行。
关于poll_timeout时间的设置问题
=0表示无论有没有就绪时间立刻返回
我们可以看到默认是0.2,当有_callback可以执行,我们把它设置为0,再看下过多长时间_timeout中有函数可以执行,取最小时间。
简单概括就是,如果_callback和_timeout都没有方法可以执行,就默认0.2,如果有方法可以执行,默认等待时间就是最快会执行方法到现在的时间间隔。
剩下 的部分就是用poll函数拿到就绪事件,然后用signal.ITIMER_REAL计时,开始处理,处理时候使用pop方法,而不是遍历,原因是fd和handler的映射关系可能在遍历过程修改,执行完成后,reset _stopped设置为false,关闭计时器。
关于ioloop的介绍就到这里,不正之处欢迎指出。