Django+Celery+xadmin实现异步任务和定时任务

Django+Celery+xadmin实现异步任务和定时任务

关注公众号“轻松学编程”了解更多。

一、celery介绍

1、简介

【官网】http://www.celeryproject.org/

Celery 是一个强大的分布式任务队列,它可以让任务的执行完全脱离主程序,甚至可以被分配到其他主机上运行。我们通常使用它来实现异步任务( async task )和定时任务( crontab )。

  • 异步任务:比如发送邮件、短信,或者文件上传, 图像处理等等一些比较耗时的操作 ;

  • 定时任务:需要在特定时间执行的任务。

架构组成如图:

image

2、Celery 主要包含以下几个模块

2.1 任务模块 Task

异步任务通常在业务逻辑中被触发并发往任务队列;

定时任务由 Celery Beat 进程周期性地将任务发往任务队列。

2.2 消息中间件 Broker

Broker,即为任务调度队列,接收任务生产者发来的消息(即任务),将任务存入队列。 Celery 本身不提供队列服务,官方推荐使用 RabbitMQ 和 Redis 等。

2.3 任务执行单元 Worker

Worker 是执行任务的处理单元,它实时监控消息队列,获取队列中调度的任务,并执行它。

2.4 任务结果存储 Backend

Backend 用于存储任务的执行结果,以供查询。同消息中间件一样,存储也可使用RabbitMQ, Redis 和 MongoDB 等。 MQ全称为Message Queue。

消息队列(MQ)是不同的应用程序相互通信的一种方法。

MQ是消费者-生产者模型的一个典型的代表,一端往消息队列中不断写入消息,而另一端则可以读取队列中的消息。

二、异步任务

假设已经有了一个Django项目(我的项目名是MySites),下面演示如何使用Celery实现异步任务。

1、安装celery

pip install celery
pip install django-celery
pip install redis==2.10.6
pip install django_celery_beat

注意:如果出现以下错误,一般是celery版本不对,重新安装较高版本即可。

Running django in virtualenv - ImportError: No module name django.core.management

2、配置settings.py

我使用Redis作为消息队列。

在settings.py中增加:

#django-celery
import djcelery
​
djcelery.setup_loader()
#使用本地redis服务器中的0号数据库,redis密码为123456
BROKER_URL = 'redis://:123456@127.0.0.1:6379/0'
CELERYBEAT_SCHEDULER = 'djcelery.schedulers.DatabaseScheduler'
CELERY_RESULT_BACKEND = 'redis://:123456@127.0.0.1:6379/1'
CELERY_ENABLE_UTC = False
CELERY_TIMEZONE = 'Asia/Shanghai'
CELERY_TASK_RESULT_EXPIRES = 10
CELERYD_LOG_FILE = BASE_DIR + "/logs/celery/celery.log"
CELERYBEAT_LOG_FILE = BASE_DIR + "/logs/celery/beat.log"
CELERY_ACCEPT_CONTENT = ['pickle', 'json', 'msgpack', 'yaml']
​```

其中,当djcelery.setup_loader()运行时,Celery便会去查看INSTALLD_APPS下包含的所有app目录中的tasks.py文件,找到标记为task的方法,将它们注册为celery_task。

BROKER_URL和CELERY_RESULT_BACKEND分别指代你的Broker的代理地址以及Backend(result store)数据存储地址。

在Django中如果没有设置backend,会使用其默认的后台数据库用来存储数据。

注册celery应用:

```python
INSTALLED_APPS = [
 #其它应用
 ...
 #celery应用
 'djcelery',
]

3、创建celery.py文件

在项目中Mysites下创建celery.py文件(与settings.py同级)

image

内容为:

from celery import Celery
from django.conf import settings
import os
​
# 为celery设置环境变量, 改为你项目的settings
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'MySites.settings')
​
# 创建应用
app = Celery('mysites')
​
# 配置应用
app.conf.update(
 # 本地Redis服务器
 BROKER_URL=settings.BROKER_URL,
)
​
app.autodiscover_tasks(settings.INSTALLED_APPS)

4、创建tasks.py文件

在子应用app下创建tasks.py:

image

内容为:

from MySites.celery import app
​
@app.task
def start_running(info):
 print(info)
 print('--->>开始执行任务<<---')
 print('比如发送短信或邮件')
 print('>---任务结束---<')

5、修改views.py

在views.py中增加需要执行的异步任务,比如:

from SitesApp.tasks import start_running
​
#celery测试
class CeleryTask(View):
 def get(self, request):
 print('>=====开始发送请求=====<')
 start_running.delay('发送短信')
 # start_running.apply_async(('发送短信',), countdown=10)  # 10秒后再执行异步任务
 return HttpResponse('<h2> 请求已发送 </h2>')

其实关键代码就一条start_running.delay(参数),当执行这条代码时,系统会把tasks.py中的start_running函数推迟执行,即放入消息队列中。

系统相当于跳过start_running.delay('发送短信')执行后面的语句,这就是异步任务。

6、修改项目下(不是子应用下)的urls.py

from SitesApp import views
​
urlpatterns = [
 #其它url
 ...
 #celery测试url
 url('^celery/',views.CeleryTask.as_view()),
]

7、启动服务

在Terminal中输入:

python manage.py runserver

在另一个Terminal中:

celery -A MySites worker --loglevel=DEBUG
image
image

8、查看异步任务情况

Celery提供了一个工具flower,将各个任务的执行情况、各个worker的健康状态进行监控并以可视化的方式展现,如下图所示:

image

下实现的方式如下:

  1. 安装flower:

pip install flower


2.  启动flower(默认会启动一个webserver,端口为5555):

    在另一个Terminal中:

    ```python
python manage.py celery flower

3.进入http://localhost:5555即可查看。

9、一些错误解决方法

错误1:

Celery ValueError: not enough values to unpack (expected 3, got 0)

看别人描述大概就是说win10上运行celery4.x就会出现这个问题,解决办法如下,原理未知:

先安装一个``eventlet`

pip install eventlet

启动worker时加上参数-P eventlet

celery -A MySites worker --loglevel=DEBUG -P eventlet

三、定时任务

1、配置settings.py

注册django_celery_beat应用:

INSTALLED_APPS = [
 #其它应用
 ...
 #django_celery_beat应用
 'django_celery_beat',
]

2、配置项目目录下的__init__()

from __future__ import absolute_import
from MySites.celery import app as celery_app

import pymysql
pymysql.install_as_MySQLdb()

image

3、修改celery.py文件

修改项目目录(和settings.py)同级的celery.py:

from __future__ import absolute_import

import os
from celery import Celery, platforms
from django.conf import settings

# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'MySites.settings')

# MySites主应用名
app = Celery('mysites')
platforms.C_FORCE_ROOT = True

# 配置应用
app.conf.update(
    # 本地Redis服务器
    BROKER_URL=settings.BROKER_URL,

)

app.config_from_object('django.conf:settings')
app.autodiscover_tasks(settings.INSTALLED_APPS)

@app.task(bind=True)
def debug_task(self):
    print('Request: {0!r}'.format(self.request))

4、修改adminx.py文件

我使用xadmin作为后台管理。增加以下代码:

#adminx.py
from __future__ import absolute_import, unicode_literals
from djcelery.models import (
  TaskState, WorkerState,
  PeriodicTask, IntervalSchedule, CrontabSchedule,
)

#celery

xadmin.site.register(IntervalSchedule) # 存储循环任务设置的时间
xadmin.site.register(CrontabSchedule) # 存储定时任务设置的时间
xadmin.site.register(PeriodicTask) # 存储任务
xadmin.site.register(TaskState) # 存储任务执行状态
xadmin.site.register(WorkerState) # 存储执行任务的worker

5、修改tasks.py文件

from __future__ import absolute_import
from MySites.celery import app
from celery import task, shared_task

@app.task
def start_running(info):
    print(info)
    print('--->>开始执行任务<<---')
    print('比如发送短信或邮件')
    print('>---任务结束---<')

@task
def pushMsg(uid,msg):
    print('推送消息',uid,msg)
    return True

@shared_task
def add(x,y):
    print('加法:',x + y)
    return x + y

@shared_task
def mul(x, y):
    print('乘法',x*y)
    return x * y

6、数据库迁移和创建管理员

python manage.py makemigrations
python manage.py migrate
python manage.py createsuperuser

8、启动服务器

python manage.py runserver

在浏览器打开http://127.0.0.1:10501/xadmin/

Crontabs:定时任务执行时间;

Intervals:简单的间隔执行时间,比如每10秒执行一次;

Periodic tasks:配置定时任务;

Tasks:任务监控;

Workers:运行的worker。

image

配置执行时间:

image

添加定时任务:

image

9、终端启动celery命令

# 启动
#linux下
celery -A MySites worker -B  # MySites为celery和setting所在文件夹名
#Windows下先启动celery-beat
celery -A MySites beat -l debug --max-interval=10
#然后再启动worker
celery -A MySites worker -l debug -P eventlet
#注意:在正式环境下把debug改为info

# 查看注册的task
celery -A MySites inspect registered

# flower监控celery
celery flower
ip:5555

注意:出现以下错误是windows不支持celery4.0以上版本,降低为3.1版本即可

SystemError: <class 'OSError'> returned a result with an error set 
pip uninstall celery
pip install celery==3.1.22

补充:celery4.0以上版本不再支持 Microsoft Windows ,不再支持使用Django ORM作为代理 。

10、服务器使用Supervisor后台运行Celery

pip install supervisor

我们可以使用echo_supervisord_conf命令得到supervisor配置模板,打开终端执行如下Linux shell命令:

echo_supervisord_conf > supervisord.conf

该命令输出文件到当前目录下(当然,你也可以指定绝对路径到具体位置),文件名为supervisord.conf 修改supervisord.conf文件,在文件最后加入:

[program:celery.worker] 
;指定运行目录 
directory=/home/你的项目名称
;运行目录下执行命令
command=celery -A 你的项目名称worker --loglevel info --logfile celery_worker.log

;启动设置 
numprocs=1          ;进程数
autostart=true      ;当supervisor启动时,程序将会自动启动 
autorestart=true    ;自动重启

;停止信号,默认TERM 
;中断:INT (类似于Ctrl+C)(kill -INT pid),退出后会将写文件或日志(推荐) 
;终止:TERM (kill -TERM pid) 
;挂起:HUP (kill -HUP pid),注意与Ctrl+Z/kill -stop pid不同 
;从容停止:QUIT (kill -QUIT pid) 
stopsignal=INT
;输出日志 
stdout_logfile=celery_worker.log 
stdout_logfile_maxbytes=10MB  ;默认最大50M 
stdout_logfile_backups=10     ;日志文件备份数,默认为10 

;错误日志 
redirect_stderr=false         ;为true表示禁止监听错误 
stderr_logfile=celery_worker_err.log 
stderr_logfile_maxbytes=10MB 
stderr_logfile_backups=10

启动supervisor输入如下命令,使用具体的配置文件执行: 先运行虚拟环境

supervisord -c supervisord.conf

关闭supervisor输入如下命令:

supervisorctl -c supervisord.conf shutdown

重启supervisor输入如下命令:

supervisorctl -c supervisord.conf reload

后记

【后记】为了让大家能够轻松学编程,我创建了一个公众号【轻松学编程】,里面有让你快速学会编程的文章,当然也有一些干货提高你的编程水平,也有一些编程项目适合做一些课程设计等课题。

也可加我微信【1257309054】,拉你进群,大家一起交流学习。 如果文章对您有帮助,请我喝杯咖啡吧!

公众号

image
image

关注我,我们一起成长~~

【参考文章】: http://yshblog.com/blog/165 https://www.jb51.net/article/133507.htm

https://blog.csdn.net/Shyllin/article/details/80940643

https://www.cnblogs.com/znicy/p/5626040.html

https://blog.csdn.net/qq_30242609/article/details/79047660

https://www.jb51.net/article/145573.htm

https://www.cnblogs.com/dengshihuang/p/8258621.html

https://blog.csdn.net/yeyingcai/article/details/78647553

http://www.manongjc.com/article/5685.htmlhttps://www.cnblogs.com/huangxiaoxue/p/7266253.html

https://blog.csdn.net/xiemanr/article/details/70331120

http://www.linuxdiyf.com/viewarticle.php?id=562707

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,324评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,303评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,192评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,555评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,569评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,566评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,927评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,583评论 0 257
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,827评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,590评论 2 320
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,669评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,365评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,941评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,928评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,159评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,880评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,399评论 2 342

推荐阅读更多精彩内容