在程序运行过程中,要执行一个很久的任务,但是我们又不想主程序被阻塞,常见的方法是多线程。可是当并发量过大时,多线程也会扛不住,必须要用线程池来限制并发个数,而且多线程对共享资源的使用也是很麻烦的事情。还有就是前面几篇介绍过的协程,但是协程毕竟还是在同一线程内执行的,如果一个任务本身就要执行很长时间,而不是因为等待IO被挂起,那其他协程照样无法得到运行。本文要介绍一个强大的分布式任务队列Celery,它可以让任务的执行同主程序完全脱离,甚至不在同一台主机内。它通过队列来调度任务,不用担心并发量高时系统负载过大。它可以用来处理复杂系统性能问题,却又相当灵活易用。
架构组成:
参考引用:http://www.bjhee.com/celery.html
一个完整的Celery分布式队列架构应该包含一下几个模块:
- 消息中间人 Broker
消息中间人,就是任务调度队列,通常以独立服务形式出现。它是一个生产者消费者模式,即主程序将任务放入队列中,而后台职程则会从队列中取出任务并执行。任务可以按顺序调度,也可以按计划时间调度。Celery组件本身并不提供队列服务,你需要集成第三方消息中间件。Celery推荐的有RabbitMQ和Redis,另外也支持MongoDB、SQLAlchemy、Memcached等,但不推荐。 - 任务执行单元 Worker,也叫职程
即执行任务的程序,可以有多个并发。它实时监控消息队列,获取队列中调度的任务,并执行它。 - 执行结果存储 Backend
由于任务的执行同主程序分开,如果主程序想获取任务执行的结果,就必须通过中间件存储。同消息中间人一样,存储也可以使用RabbitMQ、Redis、MongoDB、SQLAlchemy、Memcached等,建议使用带持久化功能的存储中间件。(另外,并非所有的任务执行都需要保存结果,这个模块可以不配置。)
安装CELERY
pip install celery
pip install django=celery=3.x.x
备注:经过搜索发现是因为winsows是不支持celery4的。参照的回答在这https://github.com/celery/celery/issues/3551
Framework Integration | |
---|---|
Django | django-celery |
Pyramid | pyramid_celery |
Pylons | celery-pylons |
Flask | not needed |
web2py | web2py-celery |
Tornado | tornado-celery |
为了支持redis
pip install 'celery[redis]'
help:
celery help
celery worker --help
- 然后,我们编写任务代码TASKS.PY
from celery import Celery
app = Celery('tasks',
broker='amqp://guest@localhost//',
backend='redis://localhost:6379/0')
@app.task
def add(x, y):
return x + y
启动后台职程
职程会监听消息中间人队列并等待任务调度,启动命令为:
$ celery worker -A tasks --loglevel=info --concurrency=5
解释:
- 参数”-A”指定了Celery实例的位置,更建议你指定Celery对象名称,如”-A tasks.app”。
- 参数”loglevel”指定了日志等级,也可以不加,默认为warning。
- 参数”concurrency”指定最大并发数,默认为CPU核数。
输入指令:
- 任务发送到消息中间人队列
>>> from tasks import add
>>> add.delay(2, 5)
<AsyncResult: 4c079d93-fd5f-47f0-8b93-c77a0112eb4e>
这个”delay()”方法会将任务发送到消息中间人队列,并由之前启动的后台职程来执行。所以这时Python控制台上只会返回”AsyncResult”信息。如果你看下之前职程的启动窗口,你会看到多了条日志”Task tasks.add[4c079d93-fd5f-47f0-8b93-c77a0112eb4e] succeeded in 0.0211374238133s: 7″。说明”add”任务已经被调度并执行成功,并且返回7。
- 配置了后台结果存储(backend),我们可以通过如下方法获取任务执行后的返回值:
>>> result=add.delay(2, 5)
>>> result.ready()
True
>>> result.get(timeout=1)
7
- 关于并发
任务的并发默认采用多进程方式,Celery也支持gevent或者eventlet协程并发。方法是在启动职程时使用”-P”参数:
celery worker -A tasks --loglevel=info -P gevent -c 100
通过”-P gevent”我们就将并发改为了gevent方式了;”-c 100″同之前介绍的”concurrency”参数,指定了并发个数。
- 关于后台
配置了Redis存储,那让我们去Redis里看看Celery任务执行的结果是怎么存储的吧。通过”keys celery*”,可以查到所有属于celery的键值.
一条记录详细内容是:
#JSon序列化存在Redis:
"{\"status\": \"SUCCESS\", \"traceback\": null, \"result\": 7, \"task_id\": \"4c079d93-fd5f-47f0-8b93-c77a0112eb4e\", \"children\": []}"
关于配置 : 三种方式可供选择
- 单个参数配置
app.conf.CELERY_BROKER_URL = 'amqp://guest@localhost//'
app.conf.CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'
- 多个参数配置
app.conf.update(
CELERY_BROKER_URL = 'amqp://guest@localhost//',
CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'
)
- 从配置文件中获取
先将配置项放入配置文件中,如”celeryconfig.py”
BROKER_URL='amqp://guest@localhost//'
CELERY_RESULT_BACKEND='redis://localhost:6379/0'
然后导入到celery对象中:
app.config_from_object('celeryconfig')
资料参考:
https://blog.csdn.net/yeyingcai/article/details/78647553(很容易懂)
https://www.jianshu.com/p/b7f843f21c46
https://www.jianshu.com/p/f1f2cd1cd491(实践例子)
https://blog.csdn.net/weixin_43688726/article/details/89242366
from future import absolute_import : 在 3.0 以前的旧版本中启用相对导入等特性所必须的 future 语句。