Celery分布式任务队列

一、Celery介绍和使用:

Celery 是一个 基于python开发的分布式异步消息任务队列,通过它可以轻松的实现任务的异步处理, 如果你的业务场景中需要用到异步任务,就可以考虑使用celery, 举几个实例场景中可用的例子:

  • 你想对100台机器执行一条批量命令,可能会花很长时间 ,但你不想让你的程序等着结果返回,而是给你返回 一个任务ID,你过一段时间只需要拿着这个任务id就可以拿到任务执行结果, 在任务执行ing进行时,你可以继续做其它的事情。
  • 你想做一个定时任务,比如每天检测一下你们所有客户的资料,如果发现今天 是客户的生日,就给他发个短信祝福

Celery 在执行任务时需要通过一个消息中间件来接收和发送任务消息,以及存储任务结果, 一般使用rabbitMQ or Redis,后面会讲

celery图示:

image.png

Celery有以下优点:

  • 简单:一旦熟悉了celery的工作流程后,配置和使用还是比较简单的
  • 高可用:当任务执行失败或执行过程中发生连接中断,celery 会自动尝试重新执行任务
  • 快速:一个单进程的celery每分钟可处理上百万个任务
  • 灵活: 几乎celery的各个组件都可以被扩展及自定制

Celery基本工作流程图:

image.png

user:用户程序,用于告知celery去执行一个任务。
broker: 存放任务(依赖RabbitMQ或Redis,进行存储)
worker:执行任务

二、基于redis实现的Celery

  • 本机安装redis
    brew install redis
  • 安装Celery:
    pip3 install celery
  • 后台启动redis服务
    redis-server
  • 创建一个celery对象和任务
    创建一个任务文件就叫tasks.py
from celery import Celery

app = Celery('tasks',
             broker='redis://localhost',
             backend='redis://localhost')

@app.task
def my_task(x, y):
    print("running...", x, y)
    return x + y
  • 创建user.py用户程序
from tasks import my_task

# 立即告知celery去执行my_task任务,并传入两个参数
result = my_task.delay(4, 4)
print(result.id)
  • 启动Celery 创建Worker来开始监听并执行任务(要在项目目录里执行)
    celery -A tasks worker --loglevel=info
    在windows是不支持这个命令的,要安装 pip3 install eventle,然后执行:
    celery -A tasks worker --loglevel=info -P eventlet
屏幕快照 2018-11-30 上午1.08.54.png
  • 执行 user.py ,创建一个任务并获取任务ID:
    python3 user.py
屏幕快照 2018-11-30 上午1.22.00.png
屏幕快照 2018-11-30 上午1.19.29.png
  • 查看任务执行情况(注意:代码中的id为执行任务时返回的id值)
from celery.result import AsyncResult
from tasks import app

async = AsyncResult(id="3dd9081b-e62f-4085-84f4-36bba3255df6", app=app)

if async.successful():
   result = async.get()
   print(result)
   # result.forget() # 将结果删除
elif async.failed():
   print('执行失败')
elif async.status == 'PENDING':
   print('任务等待中被执行')
elif async.status == 'RETRY':
   print('任务异常后正在重试')
elif async.status == 'STARTED':
   print('任务已经开始被执行')

是不是还是很懵逼,感觉不知道怎么用在实际的业务中。请往下看,通过Flask实现一个抢购商品的模拟示例,你就领略了celery的强大之处!!!

步骤一:首先创建一个celery_task.py文件用来创建celery对象和task任务:
import time
import random
from celery import Celery

# 创建celery对象
app = Celery('tasks', broker='redis://127.0.0.1:6379', 
                    backend='redis://127.0.0.1:6379')

# 创建任务
@app.task
def create_order(gid):
    time.sleep(10)
    v = random.randint(1,4)
    if v == 2:
        return '抢购成功'
    else:
        return '抢购失败'
步骤二:创建一个manage.py文件用来创建一个flask程序和相应的逻辑:
from flask import Flask, render_template, request
from celery.result import AsyncResult # 异步获取结果
from celery_task import create_order # 导入任务
from celery_task import app as celery_app # 导入celery对象

#实例化flask对象
app = Flask(__name__)

GOODS_LIST = [
    {'id': 1, 'title': '小米手机'},
    {'id': 2, 'title': '小米手环'},
    {'id': 3, 'title': '小米电视'},
]

@app.route('/goods')
def goods():
    return render_template('goods.html', gds=GOODS_LIST)

@app.route('/buy')
def buy():
    gid = request.args.get('gid') # 获取前端用户要购买的商品id
    result = create_order.delay(gid) #执行抢购任务
    return render_template('tips.html', task_id=result.id)

@app.route('/check')
def check():
    task_id = request.args.get('task') #获得参数
    async = AsyncResult(id=task_id, app=celery_app)#查看任务结果
    if async.successful():
        result = async.get()# 获取任务结果
        return result
    else:
        return '还在排队等待中'

if __name__ == '__main__':
    app.run()
步骤三:启动Celery 创建Worker来开始监听并执行任务(要在项目目录里执行)

celery -A celery_task worker --loglevel=info

步骤四:启动flask程序,访问 http://127.0.0.1:5000/goods
屏幕快照 2018-12-01 下午8.46.31.png
步骤五:点击购买,实际上就是用户创建了一个任务,并返回一个任务ID,任务被丢进了redis队列中,等待执行任务的worker就会自动执行队列中的任务,请看redis中的数据:
屏幕快照 2018-12-01 下午8.53.20.png
屏幕快照 2018-12-01 下午8.55.35.png
步骤六:点击点我,实际上是通过任务ID来查看任务执行的情况

屏幕快照 2018-12-01 下午8.58.02.png

从这张截图的URL参数可以看出任务ID和redis中的任务ID是一样的,末尾都是22c8

这样就使用celery简单实现了商品抢购的业务示例,有没有清楚一些!!!

三、Celery的定时任务

celery支持定时任务,设定好任务的执行时间,celery就会定时自动帮你执行, 这个定时任务模块叫celery beat

Celery 中启动定时任务有两种方式,(1)在程序中指定(2)在配置文件中指定

1、在程序中指定

写一个脚本periodic_task.py文件:

from celery import Celery
from celery.schedules import crontab

app = Celery("tasks",
             broker='redis://localhost',  # 消息代理
             backend='redis://localhost',  # 结果存储
            )

@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    # Calls test('hello') every 10 seconds.
    sender.add_periodic_task(10.0, test.s('hello'), name='add every 10')

    # Calls test('world') every 30 seconds
    sender.add_periodic_task(30.0, test.s('world'), expires=10)

    # Executes every Monday morning at 7:30 a.m.
    sender.add_periodic_task(
        crontab(hour=7, minute=30, day_of_week=1),
        test.s('Happy Mondays!'),
    )

@app.task
def test(arg):
    print(arg)

add_periodic_task 会添加一条定时任务

任务添加好了,需要让celery单独启动一个进程来定时发起这些任务, 注意, 这里是发起任务,不是执行,这个进程只会不断的去检查你的任务计划, 每发现有任务需要执行了,就发起一个任务调用消息,交给celery worker去执行

  • 启动任务调度器 celery beat(cd 到文件目录)
    celery -A periodic_task beat
屏幕快照 2018-12-01 下午10.23.14.png

此时还差一步,就是还需要启动一个worker,负责执行celery beat发起的任务

  • 启动celery worker来执行任务(cd 到文件目录)
    celery -A periodic_task worker
屏幕快照 2018-12-01 下午10.25.35.png

此时观察worker的输出,是不是每隔一小会,就会执行一次定时任务!

2、在配置文件中指定

  • 首先看一下文件书写的方式:
屏幕快照 2018-12-02 上午1.28.44.png

celery_task/_init_.py

# 拒绝隐式引入,如果celery.py和celery模块名字一样,避免冲突,需要加上这条语句
# 该代码中,名字是不一样的,最好也要不一样
from __future__ import absolute_import
from celery import Celery

app = Celery('tasks',
             broker='redis://localhost',  # 消息代理
             backend='redis://localhost',  # 结果存储
             )

app.config_from_object('celery_task.config')

config.py

from __future__ import absolute_import
from celery.schedules import crontab
from datetime import timedelta

# 使用redis存储任务队列
broker_url = 'redis://127.0.0.1:6379/7'
# 使用redis存储结果
result_backend = 'redis://127.0.0.1:6379/8'

task_serializer = 'json'
result_serializer = 'json'
accept_content = ['json']
# 时区设置
timezone = 'Asia/Shanghai'
# celery默认开启自己的日志
# False表示不关闭
worker_hijack_root_logger = False
# 存储结果过期时间,过期后自动删除
# 单位为秒
result_expires = 60 * 60 * 24

# 导入任务所在文件
imports = [
    'celery_task.tasks.task',
]

# 需要执行任务的配置
beat_schedule = {
    'test1': {
        # 具体需要执行的函数
        # 该函数必须要使用@app.task装饰
        'task': 'celery_task.tasks.task.remove',
        # 定时时间
        # 每分钟执行一次,不能为小数
        'schedule': crontab(minute='*/1'),
        # 或者这么写,每小时执行一次
        # "schedule": crontab(minute=0, hour="*/1")
        # 执行的函数需要的参数
        'args': ("hello",)
    },
}

task.py

from __future__ import absolute_import, unicode_literals
from celery_task import app

@app.task
def remove(path):

    print(path)
    # to do anything
    return True
  • 启动任务调度器 celery beat(cd 到文件目录)
    celery -A celery_task beat
屏幕快照 2018-12-02 上午1.33.14.png
  • 启动celery worker来执行任务(cd 到文件目录)
    celery -A celery_task worker -l info
屏幕快照 2018-12-02 上午1.34.10.png

Django中使用celery 定时任务

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,324评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,303评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,192评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,555评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,569评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,566评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,927评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,583评论 0 257
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,827评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,590评论 2 320
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,669评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,365评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,941评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,928评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,159评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,880评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,399评论 2 342

推荐阅读更多精彩内容