第一步操作当然是安装:pip install celery
新建一个celery文件夹(包),包含任务,配置,主函数(启动)文件
1. 首先编写配置文件,如:config.py
broker_url = "redis://ip:6379/4" # 将任务队列用redis存储
2. 编写主函数文件,如:main.py
from celery import Celery
import os
# 如果找不到项目中的celery文件就需要配置环境变量
if not os.getenv('DJANGO_SETTINGS_MODULE'):
os.environ['DJANGO_SETTINGS_MODULE'] = 'dj31_env.settings.dev'
# 创建实例,这里可以不传参数也不会出现问题
celery_app = Celery('sms_code')
# 加载配置,将配置文件中的配置信息加载
celery_app.config_from_object('celery.config')
# 注册任务,可以提交多个任务,所以用列表类型
# 将任务函数注册
celery_app.autodiscover_tasks(['celery.sms'])
3. 新建sms文件夹(包)编写任务文件,task.py
# 将主函数实例化的对象导入
from celery_tasks.main import celery_app
# 将发送短信的接口导入
from dj31_env.utils.yuntongxun.sms import CCP
# 导入日志模块做记录
import logging
logger = logging.getLogger('django')
# 用装饰器的方法提交任务
# bing=True,固定写法
# name=任务函数名
# retry_backoff=间隔发送时间
@celery_app.task(bind=True, name='send_sms_code', retry_backoff=3)
# 实现短信发送的功能
def send_sms_code(self, mobile, sms_code):
global send_res
try:
send_res = CCP().send_template_sms(mobile, [sms_code, 5], 1)
except Exception as e:
logger.error(e)
# 有异常触发3次
# raise self.retry(exc=e, max_retries=3)
raise self.retry(exc=Exception('异常信息:%s' % send_res))
if send_res !=0:
raise self.retry(exc=Exception('发送短信失败:%s' % send_res), max_retries=3)
return send_res
4. 实现生产者功能
# 在另外需要实现发送短信功能的文件中导入任务函数
from celery.sms import send_sms_code
# 调用delay方法开启任务,参数为send_sms_code需要传递的参数
send_sms_code.delay(参数:mobile, sms_code)
5. 启动消费者服务
celery -A celery.main worker -l info
注意事项:如果出现报错信息请参考如下链接解决
https://www.jianshu.com/p/66e271dbdf2d