celery学习笔记

Celery

标签(空格分隔): celery


Celery是一个分布式任务队列工具,是一个异步的任务队列基于分布式消息传递。参考官网

1. 基础概念

  • Broker,简单说就是"消息队列",Celery的基本工作是管理分配任务到不同的服务器,至于服务器之间的通信则是交给了如“RabbitMQ” 第三方服务。
  • Task,任务,在Celery中,每个Python function就是一个task,只要在function前面修饰”@task()“,Celery就知道这是一个task,需要异步调用task的时候,task.delay()就可以了,当然有更复杂的调用函数,task.apply_async(),这里面可以指定啥时候调用什么的,这里的调用就是将task加入到queue中,而queue是保存在指定的Broker中的。
  • Worker,Celery将你要异步处理的task加入到一个queue中,然后空闲的Worker就会将queue中的task给取走交给服务器。

2. 任务调用

任务(task)调用有三个API:

#给任务发送消息
apply_async(args[,kwargs[, ...]])

#给任务发送消息的简单版,但是不支持execution options(apply_async有三个部分的参数,第一部分就是task里面的python function的参数,比如add(x,y)的x,y,第二个参数叫作keyword arguments,就是设定一些环境变量,第三个参数就是execution options,也就是这个task本身的执行选项,时间啊之类)
delay(*args, **kwargs)

#类似直接调用的意思,即不是让worker来执行任务,而是当前的进程来执行。
calling(__call__)

关于execution options有这些主要的参数:

Link(callbacks/errbacks)

就是一个任务接着一个,回调任务作为一个partial argument在父任务完成的时候被调用。

add.apply_async((2, 2), link=add.s(16))

即(2 + 2) + 16 = 20

ETA and countdown

eta必须是一个datatime对象。你可以设定一个eta,可以让你的任务在这个eta开始。
countdown是一个整型,它是eta的简版,以秒为单位。
这两个都保证任务会在这个时间后执行,但是这个时间无法非常确定,因为各方面的原因(网络延时,任务队列太繁忙)。

Expiration

expires参数定义了一个可选的到期时间,可以以秒为单位,也可以以datetime。

序列化

数据在客户端于worker之间的传递需要序列化。
Celery内建的支持 pickle,json,yaml和msgpack方式序列化,当然也可以自定义序列化方式(要在Kombu中注册)。
如果要指定某种内建的方式:

>>> add.apply_async((10, 10), serializer='json')

压缩

Celery支持gzip和bzip2,当然你也可以自定义压缩的方式(要在kombu中注册)。
指定某种方式的话:

>>> add.apply_async((2, 2), compression='zlib')

3. Canvas:Designing Workflows

有时候,你需要把一个调用函数的某些信息传递到一个进程或者做为参数传递到另外一个函数的时候,Celery用一种叫subtasks完成这种任务。如:

>>> add.subtask((2,2), countdown=10)
tasks.add(2,2)

简写方式:

>>> add.s(2,2)
tasks.add(2,2)

subtasks的实例也支持调用API,也就是delay和apply_async方法。
但对于subtasks也有一些不同,看例子:

>>> s1 = add.s(2, 2)
>>> res = s1.delay()
>>> res.get()
4

add任务接受两个参数,这个subtasks指定了两个参数,它就完成了一个complete signature。

但是这样:

# incomplete partial: add(?, 2)
>>> s2 = add.s(2)

这种incomplete signatures称之为partials。
这个时候,s2需要另外一个参数才能完成运算,而再次调用的时候,就可以完成了,像这样:

# resolves the partial: add(8, 2)
>>> res = s2.delay(8)
>>> res.get()
10

The Primitives(原语?)

The primitives are subtasks themselves, so that they can be combined in any number of ways to compose complex workflows.

Groups

一个group调用了一系列parallel的任务,看例子:

>>> from celery import group
>>> from proj.tasks import add
>>> group(add.s(i, i) for i in xrange(10))().get()
[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]

Partial group:

>>> g = group(add.s(i) for i in xrange(10))
>>> g(10).get()
[10, 11, 12, 13, 14, 15, 16, 17, 18, 19]

add.s(i)这个就是partial,只有一个参数,所以在g(10)中传递10这个参数之后,每个add.s(i,10),就make a complete signature了。

Chains

任务是可以被链接的,也就是一个任务完成后,就把结果返回给另外一个任务,像这样:

>>> from celery import chain
>>> from proj.tasks import add, mul
# (4 + 4) * 8
>>> chain(add.s(4, 4) | mul.s(8))().get()
64

还有partial chain:

# (? + 4) * 8
>>> g = chain(add.s(4) | mul.s(8))
>>> g(4).get()
64

还有这样的写法也是可以的:

>>> (add.s(4, 4) | mul.s(8))().get()
64

Chords

一个chord就是group带有一个回调:

>>> from celery import chord
>>> from proj.tasks import add, xsum
>>> chord((add.s(i, i) for i in xrange(10)), xsum.s())().get()
90

一个group通过chain另一个任务,就会自动变成一个chord:

>>> (group(add.s(i, i) for i in xrange(10)) | xsum.s())().get()
90

由于这些primitives都是subtask类型,所以可以任意组合成你想要的样子,比如:

>>> upload_document.s(file) | group(apply_filter.s() for filter in filters)

Signatures

signature()用一种能传递给一个函数的方式,包含了一个任务调用的 arguments(参数,即任务本身的参数,像add(x,y)中的参数), keyword arguments(关键字参数,就是debug=false,true这类参数), and execution options(执行选项,比如运行时间countdown,到期时间expirt)。

signatures通常也被叫作"subtasks"。可以这样用:

>>> from celery import signature
>>> signature('tasks.add', args=(2, 2), countdown=10)
tasks.add(2, 2)

或者你直接使用task的subtask方法:

>>> add.subtask((2, 2), countdown=10)
tasks.add(2, 2)

简写版就是这样:

>>> add.s(2, 2)
tasks.add(2, 2)

Immutabel signatures

意思就是partial的任务可以在回调的时候,把参数值再传进来,但有时候并不想得到某个函数的值,这个时候就可以把这个函数的immutable设成true:

>>> add.subtask((2, 2), immutable=True)

也可以简写:

>>> add.si(2, 2)

看例子:

>>> res = (add.si(2, 2) | add.si(4, 4) | add.s(8, 8))()
>>> res.get()
16
>>> res.parent.get()
8
>>> res.parent.parent.get()
4
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,098评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,213评论 2 380
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 149,960评论 0 336
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,519评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,512评论 5 364
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,533评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,914评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,574评论 0 256
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,804评论 1 296
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,563评论 2 319
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,644评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,350评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,933评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,908评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,146评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,847评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,361评论 2 342

推荐阅读更多精彩内容