最近在实验室中做一个项目(心酸啊,本想安安静静的清清闲闲的毕业的,没想到。。。),要把实验室发过的论文中的一些算法集成到一个 Web 服务器上,用户可以上传数据,还可以在 Web 上看到计算结果的可视化图表。整个服务器的后台是使用 Django 框架来搭建的,这些算法需要处理一定量的数据,使用了 numpy,pandas,scipy 等数值计算库,每一组数据的处理有时候需要跑好几个小时。为了合理的调度这些算法,我们这里使用了 Celery。
1. Django 处理 Request 的基本流程
上面的这一张是网络上的 Django 处理 request 的流程示意图。大致意思就是:
浏览器发起 http 请求 ----> http handling(request 解析) ----> url 匹配(正则匹配找到对应的 View) ----> 在View中进行逻辑的处理与数据计算(包括调用 Model 类进行数据库的增删改查)----> 将数据推送到 template,返回对应的 template/response。
对于一些简单的操作,可以放在 View 中处理。在View处理任务时用户处于等待状态,直到页面返回结果。但是对于一些复杂的操作,则在 View 中应该先返回 response,再在后台处理任务。用户无需等待。当任务处理完成时,我们可以再通过 Ajax 之类的方式告知用户。
Celery 就是基于 Python 开发的一个分布式任务队列框架,支持使用任务队列的方式在分布的机器/进程/线程上执行任务调度。
2. Celery
上图是 Celery 的基本架构,它采用典型的生产生--消费者模式,主要由三部分组成:broker(消息队列)、workers(消费者:处理任务)、backend(存储结果)。实际应用中,用户从 Web 前端发起一个请求,我们只需要将请求所要处理的任务丢入任务队列 broker 中,由空闲的 worker 去处理任务即可,处理的结果会暂存在后台数据库 backend 中。我们可以在一台机器或多台机器上同时起多个 worker 进程来实现分布式地并行处理任务。
3. 安装 Celery
安装过程就是直接按照官网上的文档安装即可。我这里用的均是目前的最新稳定版。
- macOS Sierra 10.12.3
- Django 1.10
- Celery 4.0.2
在早前版本的 Celery 中,有一个专门供 Django 使用的 Celery 版本:django-celery。但是在现在 Celery 已经统一为一个版本,所以直接安装原生的 Celery 即可:
pip install celery
Celery 推荐使用 RabbitMQ,Redis,Amazon SQS,Zookeeper,这几个作为 broker,但是只有前两个支持在生产环境使用。下面的表格对比了几种 broker。
Name | Status | Monitoring | Remote Control |
---|---|---|---|
RabbitMQ | Stable | Yes | Yes |
Redis | Stable | Yes | Yes |
Amazon SQS | Stable | No | No |
Zookeeper | Experimental | No | No |
我是使用 Redis 作为 broker 的。除了安装 redis 之外,还应该安装 redis 的 python 支持库。
安装 Redis:
brew install redis
安装 redis 的 python 支持库:
pip install redis
输入 redis-server
来开启 redis。当你看见下面的图案时,就说明成功开启了 redis。redis 默认监听 6379 端口。开启之后可以用 ctrl+c 来退出。
4. 把 Celery 配置到 Django 上
假设你有一个项目 proj:
- proj/
- proj/__init__.py
- proj/settings.py
- proj/urls.py
- manage.py
Celery 建议在 proj/proj/celery.py
上定义一个 Celery 的实例。
文件 proj/proj/celery.py:
from __future__ import absolute_import, unicode_literals
import os
from celery import Celery
# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj.settings')
app = Celery('proj')
# Using a string here means the worker don't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
# should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')
# Load task modules from all registered Django app configs.
app.autodiscover_tasks()
@app.task(bind=True)
def debug_task(self):
print('Request: {0!r}'.format(self.request))
然后再在proj/proj/__init__.py
做一些配置。
文件 proj/proj/__init__.py:
from __future__ import absolute_import, unicode_literals
# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celery import app as celery_app
__all__ = ['celery_app']
完成上面的步骤之后,可以在命令行输入:
celery worker -A proj -l info
正常情况下,应该会出现类似于下图的输出。
ok,接下来,为了让 celery 中执行的任务的结果返回我们的 Django,我们还应该安装 django-celery-results
。
pip install django-celery-results
再在 proj/proj/settings.py:
中做如下的设置:
文件proj/proj/settings.py:
# Celery 设置
CELERY_BROKER_URL = 'redis://localhost:6379/0'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_BACKEND = 'django-db'
CELERY_TIMEZONE = 'Asia/Shanghai'
INSTALLED_APPS = [
...
...
'django_celery_results'
]
再 migrate 一下:
migrate django_celery_results
5. 加入一个耗时任务
在你的 app 的目录下,新建一个 tasks.py
文件。在里面加入一个耗时的任务:
from __future__ import absolute_import, unicode_literals
from celery import shared_task
# 模拟一个耗时操作
@shared_task
def longtime_test():
...
# 在这里进行一些耗时操作
...
在 views.py
中,写成这样:
def test_view(request):
# do something
longtime_test.delay()
return render(request, 'template.html', {'data': data})
这样之后,就会先返回 html 模版,再在后台计算数据了。