应用
需要一个celery实例,即应用。这个应用是使用所有东西的进入点,例如创建任务、管理工作进程,必须可被其他模块引入。
tasks.py
# coding: utf8
from celery import Celery
app = Celery('tasks', broker='pyamqp://guest@localhost//')
# 传入的tasks参数即当前的模块名称,broker即为消息队列的地址
# ampb(RabbitMQ) redis
# 下面创建任务
@app.task
def add(x, y):
return x + y
执行程序,启动工作进程:
celery -A tasks worker --loglevel=info
调用
from tasks import add
add.delay(4, 4)
现在task是被之前启动的工作进程来执行,返回值是一个AsyncResult,可以用来判断任务的状态、等待该任务执行完毕或是获得它的返回值。默认是不返回的,需要配置result backend,也可以在工作进程的命令行输出窗口中看到。
保存结果
可以使用很多backend 例如Django的ORM、SQLAlchemy,Redis,RabbitMQ。
app = Celery('tasks', backend='rpc://', broker='pyamqp://')
# 这边使用的backend是RabbitMQ的rpc远程调用
result = add.delay(4, 4) # 现在就可以获得返回的result了
result.ready() # 判断任务是否执行完成
result.get(timeout=1) # 等待任务执行(一般不用)
# 如果任务出错了这边也会直接获得异常 或:
result.get(propagate=False) # 不抛出
result.traceback # 再获得异常信息
配置
app.conf.task_serializer = 'json'
# 设置task的序列化方式
# 一次设置很多选项
app.conf.update(
task_serializer='json',
accept_content=['json'],
result_serializer='json',
)
使用配置模块:
app.config_from_object('celeryconfig')
celeryconfig.py
broke_url = 'pyamqp://'
task_serializer = 'json'
task_serializer = 'json'
result_serializer = 'json'
accept_content = ['json']
timezone = 'Asia/Shanghai'
enable_utc = True
task_routes = {
'tasks.add': 'low-priority', # 把任务路由到某个队列
}
task_annotations = {
'tasks.add': {'rate_limit': '10/m'} # 限制该任务的发送速度
}
如果想测试配置文件是否有语法问题,和普通的py文件一样,使用:
python -m celeryconfig