如前文 Celery 源码学习(一)架构分析 所言,celery 能保证高吞吐量和高性能,主要依托两个方面:1. 多进程,2.事件驱动。
在 celery 中,多进程主要指的是一个主进程负责调度,然后多个从进程负责消费具体的任务。也就是前文中我说的调度器(Main process)和工作进程(Worker processes)。我们今天主要就是看来一下 celery 这部分的源码。
我先说明下版本,celery 是 4.2.0,broker 和 result backend 都是用的 redis。
当我们在命令行执行一个常见的启动命令:
# 在包含任务文件的目录下
➜ celery -A tasks.example worker -c 2 --l info
然后我们用 linux 的 ps 命令查看此时的相关进程
# 在包含任务文件的目录下
➜ ps -ef | grep -E "celery|PID"
UID PID PPID C STIME TTY TIME CMD
501 1344 331 0 6:27PM ?? 0:00.98 celery -A tasks.example worker -c 2 --l info
501 1348 1344 0 6:27PM ?? 0:00.01 celery -A tasks.example worker -c 2 --l info
501 1349 1344 0 6:27PM ?? 0:00.01 celery -A tasks.example worker -c 2 --l info
可以清楚的看到 celery 为我们启动了 3 个进程,PID 分别为 1344,1348,1349。
我们可以做一个明显的推断,PID 为 1344 的进程,应当是主进程,也就是调度器。
PID 为 1348,1349 的 PPID 为 1344,说明这两个进程就是主进程派生(fork)出来的从进程。
常见的手段多进程不是 celery 原创的,对于任何一个大型项目,基于主从的多进程模式都是十分常见的,这是一套十分成熟的工业化做法。
为什么这么做?
其实就是如前文所说,为了充分发挥多核计算的优势,并在一定程度上提升程序的并发能力,缓解 IO 的压力。
怎么做?
业内的常见方案叫做 prefork,也就是预生成。预生成指的是,主进程在执行具体的业务逻辑之前,先提前 fork 出来一堆子进程,并把他们存起来集中管理,形成一个进程池。平常的时候这些子进程都是 休眠(asleep) 状态,只有当主进程派发任务的时候,会唤醒(awake)其中的一个子进程,并通过进程间通讯的手段,向子进程传输相应的任务数据。
我们先假设一下,如果不使用预生成,会有什么问题?
每当一个任务到来,主进程都会去临时产生一个子进程,复制一份上下文数据,然后传输任务给这个子进程。当子进程执行后,主进程再去销毁掉这个子进程的所有上下文数据。频繁的对内存数据进行操作,上下文切换,会导致系统的性能很差。所以,人们基本都会使用预生成的方式。
celery 源码结构
.
./bootsteps.py # 流程控制相关的数据结构
./signals.py # 基于各组件之间观察者模式的数据结构
./app # 各种基础组件,粒度较细
./platforms.py
./bin # celery 命令行的启动命令需要用到的模块引导文件
./security
./local.py
./backends # 存放任务结果的相关数据结构
./__init__.py
./five.py
./utils
./contrib
./result.py
./concurrency # 并发模式的相关数据结构
./_state.py
./task # 任务的数据结构
./exceptions.py
./fixups
./worker # 消费者相关的数据结构
./events # 集群内监听事件的相关数据结构
./states.py
./apps # 按功能拆分出来的三个基础模块的数据结构,分别为 worker,multi,beat
./loaders
./__main__.py # 程序主入口
./beat.py
./canvas.py
./schedules.py
具体实现因为我们的主角是 celery,我们会更侧重这一套流程应该如何用 python 去实现。因为 celery 代码很多,有各种各样的功能组件混杂其中,所以我只会挑取我认为有必要讲的源码实现。对于整个流程,读者有兴趣的话可以自行去研究。
当我们在命令行敲下 :
celery -A tasks.example worker -c 2 --l info
首先,会通过一系列的命令行解析的方法,提取出我们上面那个命令需要运行的模块 (即Worker 模块,具体流程因为过于复杂,就不展开讲了),解析到一个 Worker 的数据结构,并创建对应的实例,其中我们主要关注 start 方法。
# celery/apps/worker.py
# ...省略
class WorkController(object):
"""Unmanaged worker instance."""
# ...省略
class Blueprint(bootsteps.Blueprint):
"""Worker bootstep blueprint."""
# 这是默认的 worker DAG 流程,会根据传入的命令行参数不同有不同的执行流程
name = 'Worker'
default_steps = {
'celery.worker.components:Hub',
'celery.worker.components:Pool',
'celery.worker.components:Beat',
'celery.worker.components:Timer',
'celery.worker.components:StateDB',
'celery.worker.components:Consumer',
'celery.worker.autoscale:WorkerComponent',
}
# ... 省略
class Worker(WorkController):
"""Worker as a program."""
# ... 省略
def start(self):
try:
self.blueprint.start(self) # 重点关注!
except WorkerTerminate:
self.terminate()
except Exception as exc:
logger.critical('Unrecoverable error: %r', exc, exc_info=True)
self.stop(exitcode=EX_FAILURE)
except SystemExit as exc:
self.stop(exitcode=exc.code)
except KeyboardInterrupt:
self.stop(exitcode=EX_FAILURE)
# ... 省略
Worker 的 start 方法中,其实就是执行了一个 self.blueprint 的 start 方法,这里面的 blueprint,是 celery 自己实现的一个 有向无环图(DAG)的数据结构,说起来复杂,其实功能简单描述下就是:根据命令行传入的不同参数,初始化不同的组件(step),并执行这些组件的初始化方法。其实就是一个对流程控制的面向对象的封装。
我们的这个启动命令产生的 DAG,会按顺序加载三个组件,Hub,Pool,Consumer(这些组件的数据结构可以在 celery/worker/components.py 找到)。Consumer 和 Hub 是我之后会详细讲的,我们这次主要讲一下 Pool 这个组件。这个组件基本囊括了 celery 多进程 prefork 的实现。
self.blueprint.start(self) 中,这个 blueprint 的数据结构定义如下,其中我们重点关注 start 方法:
# celery/bootsteps.py
# ... 省略
class Blueprint(object):
"""Blueprint containing bootsteps that can be applied to objects.
Arguments:
steps Sequence[Union[str, Step]]: List of steps.
name (str): Set explicit name for this blueprint.
on_start (Callable): Optional callback applied after blueprint start.
on_close (Callable): Optional callback applied before blueprint close.
on_stopped (Callable): Optional callback applied after
blueprint stopped.
"""
GraphFormatter = StepFormatter
name = None
state = None
started = 0
default_steps = set()
state_to_name = {
0: 'initializing',
RUN: 'running',
CLOSE: 'closing',
TERMINATE: 'terminating',
}
def __init__(self, steps=None, name=None,
on_start=None, on_close=None, on_stopped=None):
self.name = name or self.name or qualname(type(self))
self.types = set(steps or []) | set(self.default_steps)
self.on_start = on_start
self.on_close = on_close
self.on_stopped = on_stopped
self.shutdown_complete = Event()
self.steps = {}
def start(self, parent): # 重点关注!
self.state = RUN
if self.on_start:
self.on_start()
for i, step in enumerate(s for s in parent.steps if s is not None):
self._debug('Starting %s', step.alias)
self.started = i + 1
step.start(parent)
logger.debug('^-- substep ok')
# ...省略
start 方法中的 parent.steps,其实就是 Hub,Pool,Consumer 这三个组件的实例组成的列表。我们可以看到,其实就是依次调用这三个组件实例的 start 方法(。。。celery 的作者特别喜欢把方法名叫做 start)我们直接去 components.py 文件中查看 class Pool(bootsteps.StartStopStep) 组件的源码,会发现这个 start 方法还是藏的很隐蔽的。
因为不是很直观,且过程非常曲折,我这里就不详细描述具体过程了,直接说结论:这个 start 方法最终会调用 celery/concurrency/prefork.py中的TaskPool 类下的 on_start 方法。我们可以看下这个 on_start 方法:
# celery/concurrency/prefork.py
# ...省略
class TaskPool(BasePool):
"""Multiprocessing Pool implementation."""
Pool = AsynPool
BlockingPool = BlockingPool
uses_semaphore = True
write_stats = None
def on_start(self):
forking_enable(self.forking_enable)
Pool = (self.BlockingPool if self.options.get('threads', True)
else self.Pool)
# 重点关注下面这个!
P = self._pool = Pool(processes=self.limit,
initializer=process_initializer,
on_process_exit=process_destructor,
enable_timeouts=True,
synack=False,
**self.options)
# Create proxy methods
self.on_apply = P.apply_async
self.maintain_pool = P.maintain_pool
self.terminate_job = P.terminate_job
self.grow = P.grow
self.shrink = P.shrink
self.flush = getattr(P, 'flush', None) # FIXME add to billiard
# ...省略
到了这里我们就清楚多了,主要是执行了 Pool 的实例化。其实这个实例化就是 prefork 的具体实现。这个 Pool 其实就是 AsyncPool,源码在下面:
# celery/concurrency/asynpool.py
# ...省略
class AsynPool(_pool.Pool):
"""AsyncIO Pool (no threads)."""
ResultHandler = ResultHandler
Worker = Worker
def WorkerProcess(self, worker):
worker = super(AsynPool, self).WorkerProcess(worker)
worker.dead = False
return worker
def __init__(self, processes=None, synack=False,
sched_strategy=None, *args, **kwargs):
self.sched_strategy = SCHED_STRATEGIES.get(sched_strategy,
sched_strategy)
processes = self.cpu_count() if processes is None else processes
self.synack = synack
# create queue-pairs for all our processes in advance.
# 重点!创建多个读写的管道
self._queues = {
self.create_process_queues(): None for _ in range(processes)
}
# 省略
super(AsynPool, self).__init__(processes, *args, **kwargs) # 重点
for proc in self._pool: # 重点
# create initial mappings, these will be updated
# as processes are recycled, or found lost elsewhere.
self._fileno_to_outq[proc.outqR_fd] = proc
self._fileno_to_synq[proc.synqW_fd] = proc
# 省略
# ... 省略
看到这里,可能有的小伙伴就懵了,说好的 fork 呢?说好的 进程间通讯呢?
别急,其实 fork 和进程间通讯都藏在上面那一坨代码里了processes = self.cpu_count() if processes is None else processes 这个 processes 的值,就是需要 fork 的子进程数量,默认是 cpu 核数,如果在命令行制定了 -c 参数,则是 -c 参数的值,在本例子中,为 2。
self.create_process_queues(): None for _ in range(processes) 其实就是创建出来了一堆读和写的管道,具体逻辑在 billiard/connection.py 文件中,因为逻辑较复杂,所以本文就省略了。
根据流向的不同和主进程与子进程的不同,之后会分别关闭对应的的一端的管道,比如父进程把写关闭,子进程就把读关闭。并会用抽象的数据结构进行封装以便于管理。这个数据结构的实例用来为主进程和即将 fork 的子进程提供双向的数据传输。
同样的,会根据子进程的数量创建出多个管道实例来。其中有个比较奇怪的一点就是,我在父进程关闭了一端的管道,fork 了之后,结果在子进程还是可以用这一端。
这个也许是 fork 的子进程不继承父进程的管道关闭状态?其中最重要的方法是 super(AsynPool, self).init(processes, *args, **kwargs) 中执行的 self._create_worker_process(i),这里面就是 fork 的关键所在。相关源码如下:
# 这个类在 celery 的依赖库 billiard 中的 pool.py 文件中
# billiard/pool.py
class Pool(object):
'''
Class which supports an async version of applying functions to arguments.
'''
# 省略
def __init__(self, processes=None, initializer=None, initargs=(),
maxtasksperchild=None, timeout=None, soft_timeout=None,
lost_worker_timeout=None,
max_restarts=None, max_restart_freq=1,
on_process_up=None,
on_process_down=None,
on_timeout_set=None,
on_timeout_cancel=None,
threads=True,
semaphore=None,
putlocks=False,
allow_restart=False,
synack=False,
on_process_exit=None,
context=None,
max_memory_per_child=None,
enable_timeouts=False,
**kwargs):
# 省略
# 重点关注!
for i in range(self._processes): #cityblack !important
self._create_worker_process(i)
def _create_worker_process(self, i):
sentinel = self._ctx.Event() if self.allow_restart else None
inq, outq, synq = self.get_process_queues()
w = self.WorkerProcess(self.Worker(
inq, outq, synq, self._initializer, self._initargs,
self._maxtasksperchild, sentinel, self._on_process_exit,
# Need to handle all signals if using the ipc semaphore,
# to make sure the semaphore is released.
sigprotection=self.threads,
wrap_exception=self._wrap_exception,
max_memory_per_child=self._max_memory_per_child,
))
self._pool.append(w)
self._process_register_queues(w, (inq, outq, synq))
w.name = w.name.replace('Process', 'PoolWorker')
w.daemon = True
w.index = i
w.start() # 重点关注!
self._poolctrl[w.pid] = sentinel
if self.on_process_up:
self.on_process_up(w)
return w
inq, outq, synq = self.get_process_queues() 拿到的是一个读和写的管道的抽象对象。这个管道是之前预先创建好的(就是上面 self.create_process_queues() 创建的)。
主要是给即将 fork 的子进程用的,子进程会监听这管道数据结构抽象实例中的读事件,还可以从写管道写数据。
w,也就是 self.WorkerProcess 的实例,其实是对 fork 出来的子进程的一个抽象封装。用来方便快捷的管理子进程,抽象成一个进程池,这个 w 会记录 fork 出来的子进程的一些 meta 信息,比如 pid,管道的读写的 fd 等等,并注册在主进程中,主进程可以利用它进行任务分发。w.start() 中包含具体的 fork 过程,相关源码在:
#billiard/process.py
# 省略
class BaseProcess(object):
# 省略
def start(self):
'''
Start child process
'''
assert self._popen is None, 'cannot start a process twice'
assert self._parent_pid == os.getpid(), \
'can only start a process object created by current process'
_cleanup()
self._popen = self._Popen(self) # 重点关注!
self._sentinel = self._popen.sentinel
_children.add(self)
# 省略
我们看到其中主要是 self._popen = self._Popen(self) 比较重要,我们看下 Popen 的源码:\
# billiard/popen_fork.py
# 省略
class Popen(object):
method = 'fork'
sentinel = None
def __init__(self, process_obj):
sys.stdout.flush()
sys.stderr.flush()
self.returncode = None
self._launch(process_obj)
# 省略
def _launch(self, process_obj):
code = 1
parent_r, child_w = os.pipe()
self.pid = os.fork()
if self.pid == 0:
try:
os.close(parent_r)
if 'random' in sys.modules:
import random
random.seed()
code = process_obj._bootstrap()
finally:
os._exit(code)
else:
os.close(child_w)
self.sentinel = parent_r
看到这里我们应该明白了。在执行 launch 方法的时候,会使用 os.fork() 派生出一个子进程,并且使用 ps.pipe() 创建出一对读写的管道,之后通过比较 self.pid 是否为 0,在主进程和子进程中执行不同的逻辑。子进程关闭 读 管道,之后执行 process_obj._bootstrap() 方法。
然后就是 process_obj._bootstrap(),这个方法就是子进程执行的最后一个方法。当子进程执行完这个方法后,这个子进程已经进入了可用状态,随时等待着从主进程的管道接受任务。具体的流程比较复杂,我直接展示 process_obj._bootstrap() 的最后一步的源码,他会执行 workloop 方法,进入一个无限的循环:
# billiard/pool.py
# 省略
#
# Code run by worker processes
#
class Worker(object):
# 省略
def workloop(self, debug=debug, now=monotonic, pid=None):
pid = pid or os.getpid()
put = self.outq.put
inqW_fd = self.inqW_fd
synqW_fd = self.synqW_fd
maxtasks = self.maxtasks
max_memory_per_child = self.max_memory_per_child or 0
prepare_result = self.prepare_result
wait_for_job = self.wait_for_job
_wait_for_syn = self.wait_for_syn
def wait_for_syn(jid):
i = 0
while 1:
if i > 60:
error('!!!WAIT FOR ACK TIMEOUT: job:%r fd:%r!!!',
jid, self.synq._reader.fileno(), exc_info=1)
req = _wait_for_syn()
if req:
type_, args = req
if type_ == NACK:
return False
assert type_ == ACK
return True
i += 1
completed = 0
while maxtasks is None or (maxtasks and completed < maxtasks):
req = wait_for_job()
if req:
type_, args_ = req
assert type_ == TASK
job, i, fun, args, kwargs = args_
put((ACK, (job, i, now(), pid, synqW_fd)))
if _wait_for_syn:
confirm = wait_for_syn(job)
if not confirm:
continue # received NACK
try:
result = (True, prepare_result(fun(*args, **kwargs)))
except Exception:
result = (False, ExceptionInfo())
try:
put((READY, (job, i, result, inqW_fd)))
except Exception as exc:
_, _, tb = sys.exc_info()
try:
wrapped = MaybeEncodingError(exc, result[1])
einfo = ExceptionInfo((
MaybeEncodingError, wrapped, tb,
))
put((READY, (job, i, (False, einfo), inqW_fd)))
finally:
del(tb)
completed += 1
if max_memory_per_child > 0:
used_kb = mem_rss()
if used_kb <= 0:
error('worker unable to determine memory usage')
if used_kb > 0 and used_kb > max_memory_per_child:
error(MAXMEM_USED_FMT.format(
used_kb, max_memory_per_child))
return EX_RECYCLE
debug('worker exiting after %d tasks', completed)
if maxtasks:
return EX_RECYCLE if completed == maxtasks else EX_FAILURE
return EX_OK
# 省略
这个 workloop 其实很明显,就是监听读管道的数据(主进程从这个管道的另一端写),然后执行对应的回调,期间会调用 put 方法,往写管道同步状态(主进程可以从管道的另一端读这个数据).