Django接入Celery实现任务管理,并且启用flower进行任务的可视化查看。另外任务的结果除了保存在Redis中之外,我们也可以选在保存在MySQL等持久化数据库,方便后续做统计分析,结果查询等等操作
全栈运维 DailyJobOps - Django通过celery实现任务注册和管理
关于配置项的存放
按照官网提示的demo实现接入过程可能会遇到参数告警或者不生效的问题,原因在于
Celery will still be able to read old configuration files until Celery 6.0. Afterwards, support for the old configuration files will be removed. We provide the celery upgrade command that should handle plenty of cases (including Django).
也就是配置的时候两种方式配置项写法不一样
如果是在Django的settings配置文件中,写入采用CELERY_
开头参数项都是大写的方式,比如
BROKER_URL = 'redis://127.0.0.1:6379'
CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379'
CELERY_ACCEPT_CONTENT = ['application/json']
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TASK_SERIALIZER = 'json'
CELERY_TASK_TIME_OUT = 30 * 60
CELERY_TIMEZONE = 'Asia/Shanghai'
但是如果是在放到一个独立的配置文件中,比如这里demo演示的config.py
broker_url = 'redis://127.0.0.1:6379'
result_backend = 'redis://127.0.0.1:6379'
accept_content = ['application/json']
result_serializer = 'json'
task_serializer = 'json'
task_time_out = 30 * 60
timezone = 'Asia/Shanghai'
# imports = (
# 'celery_app.task_add',
# )
注意这里也可以手动指定导入那些task,如果项目中的task太多的时候我们都是按照一定的规范创建之后,采用app.autodiscover_tasks()实现
这里先上基础版的demo
目录结构如下, 这里忽略了Django默认创建的一些基本文件
demo_djcelery
├── cmdb
│ ├── __init__.py
| ... ...
│ ├── import_tasks.py
| ... ...
├── demo_djcelery
│ ├── __init__.py
| ... ...
│ ├── celery.py
│ ├── config.py
| ... ...
└── manage.py
这里需要特殊说明的是,注意 cmdb 应用下的 import_tasks.py
,这样命名是为了区分 手动导入
和 自动发现
。
demo_djcelery/celery.py
# -*- coding: utf-8 -*-
from celery import Celery
from demo_djcelery import config
app = Celery('DemoCelery')
# 这种通过模块的方式导入是需要添加引号
# app.config_from_object('demo_djcelery.config')
# 这种import的方式config是不需要添加引号的
app.config_from_object(config)
# app.autodiscover_tasks()
- 这里的
app.autodiscover_tasks()
是注销的, 在如下config.py中启用了imports
- 注意导入config对象两种方式的区别
config.py
broker_url = 'redis://127.0.0.1:6379/1'
result_backend = 'redis://127.0.0.1:6379/1'
accept_content = ['application/json']
result_serializer = 'json'
task_serializer = 'json'
task_time_out = 30 * 60
timezone = 'Asia/Shanghai'
imports = (
'cmdb.import_tasks',
)
核心: 需要在 demo_djcelery 项目下的 init.py 中导入 celery app,目的是Django项目启动的时候能加载celery
from .celery import app as celery_app
__all__ = ('celery_app')
这个时候运行 celery -A demo_djcelery worker -l INFO
从结果中我们看到 task import_tasks
注册到了 celery
思考题
在现有代码逻辑上,注销 config.py 中的imports配置,开启 celery.py中的 app.autodiscover_tasks() ,能否实现任务的自动注册呢?
休息5分钟我们继续... ...
_ _ _
摸鱼回来,我们继续...
上面的思考题,有没有尝试实践以下呢?
答案是无法注册
任务 import_tasks。因为采用autodiscover的方式需要对任务创建有一定的要求
自动注册任务
在每个应用目录下创建一个tasks.py文件,文件的任务会会被自动注册到celery
这个时候我们在 cmdb 应用下新增 tasks.py 文件,内容如下
from celery import shared_task
@shared_task
def auto_task():
print("auto register task from cmdb tasks.py")
然后注销 config.py 中的imports配置项,开启 celery.py 中的 app.autodiscover_tasks() 然后启动 celery worker看看有没有自动注册上任务 cmdb.tasks.auto_task
命令行输入如下命令,然后回车
celery -A demo_djcelery worker -l INFO
What happened??? 为啥也没有注册上呢?
大家先想想为什么呢?
不告诉你答案....
不告诉你答案....
不告诉你答案....
嗯,是否记得这节开头说的话
在每个应用目录下创建一个tasks.py文件,文件的任务会会被自动注册到celery
所以:
1、需要在Django的 INSTALLED_APPS
中注册上 cmdb
2、其实还有个关键的一步,就是在celery.py中配置os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'demo_djcelery.settings')
目的是为了让celery能获取Django的settings配置文件,找到找到都安装了那些应用,然后丛这些安装的应用下去查找tasks.py文件,进行任务的自动注册
配置任务结果写入MySQL数据库
结果写入我们知道是用result_backend
配置项,之前我们配置的Redis,然后在Redis中可以看到任务执行的结果,类似如下:
7) "celery-task-meta-49b5034e-fa78-4bcf-b1ea-c9f990b2eb38"
8) "celery-task-meta-a58fa0ff-2051-47b2-b662-b191fdf2281e"
127.0.0.1:6379> get celery-task-meta-49b5034e-fa78-4bcf-b1ea-c9f990b2eb38
"{\"status\": \"SUCCESS\", \"result\": \"User Created\", \"traceback\": null, \"children\": [], \"date_done\": \"2022-01-30T06:28:26.109653\", \"task_id\": \"49b5034e-fa78-4bcf-b1ea-c9f990b2eb38\"}"
配置写入MySQL或者其他数据库,可以参考官网说明 https://docs.celeryproject.org/en/master/userguide/configuration.html#conf-database-result-backend
1、先安装相关第三方模块
pip install django-celery-results
2、然后配置到Django的 INSTALLED_APPS中,注意这里实际是下划线
INSTALLED_APPS = [
... ...,
'cmdb',
'django_celery_results',
]
3、 创建表
python manage.py migrate django_celery_results
4、然后在config.py中修改配置
result_backend = 'db+mysql://democelery:Democelery*2099@192.168.3.108:3306/demo_celery'
主要密码中不能有 @
和 #
特殊符号,否则会报错
5、然后重启 celery worker,在进行测试就可以看到写入到数据了
mysql> select * from celery_taskmeta ;
+----+--------------------------------------+---------+-----------------------------+---------------------+-----------+------+------+--------+--------+---------+-------+
| id | task_id | status | result | date_done | traceback | name | args | kwargs | worker | retries | queue |
+----+--------------------------------------+---------+-----------------------------+---------------------+-----------+------+------+--------+--------+---------+-------+
| 1 | ea3f73d9-b3f4-41ef-a5eb-0beac8e599c2 | SUCCESS | �� �
User Created�. | 2022-01-30 07:37:58 | NULL | NULL | NULL | NULL | NULL | NULL | NULL |
+----+--------------------------------------+---------+-----------------------------+---------------------+-----------+------+------+--------+--------+---------+-------+
6 rows in set (0.05 sec)
这里需要注意:
虽然数据库migrate的时候生成了 表 django_celery_results_taskresult
,但是最终却又自动生成了表celery_taskmeta
,写入到该表种了。
刚开始自己在 taskresult 查了半天没有数据还以为哪里出错了...
celery flower
flower的具体介绍看官网就行 https://flower-docs-cn.readthedocs.io/zh/latest/ 这里不多说
目前flower已经集成在celery中,启动很简单
celery -A demo_djcelery flower
然后通过本地的5555端口就可以访问