RabbitMQ -- part2 [ Work Queues]

上节介绍了在命名队列中发送和获取消息,本节介绍创建一个工作队列,然后分发任务到多个消费者(consumer)

work queues
  • 编辑生产者 new_task.py

#!/usr/bin/env python3
# coding=utf-8

import pika
import sys

# 获取终端输入的信息,上传到队列
message = ' '.join(sys.argv[1:]) or 'Hello World!'

connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
channel = connection.channel()

channel.queue_declare(queue='hello')

channel.basic_publish(exchange='', routing_key='hello', body=message)

print("[x] Sent '%s'" % message)

connection.close()
  • 编辑消费者 worker.py

#!/usr/bin/env python3
# coding=utf-8

import pika
import time

connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
channel = connection.channel()

channel.queue_declare(queue='hello')

def callback(ch, method, properties, body):
    print('[x] Received %r' % body)
    # 通过统计body中点符号的数量来调用time.sleep(),用来模拟真实环境中处理任务的耗时
    time.sleep(body.count(b'.'))    
    print('[x] Done')

channel.basic_consume(callback, queue='hello', no_ack=True)

print('[*] Waiting for messgaes. To exit press CTRL+C')

channel.start_consuming()
  • 轮询调度规则 (Round-robin)

执行以上代码,同时启动一个生产者(new_task.py)和两个消费者(worker.py),可以发现RabbitMQ通过轮询调度的方式将消息分发给消费者。

# 生产者
python3 new_task.py This is a test.
python3 new_task.py This is a test..
python3 new_task.py This is a test...
python3 new_task.py This is a test....
python3 new_task.py This is a test.....
# 消费者1
> python3 worker.py
[*] Waiting for messgaes. To exit press CTRL+C
[x] Received b'This is a test.'
[x] Done
[x] Received b'This is a test...'
[x] Done
[x] Received b'This is a test.....'
[x] Done
# 消费者2
> python3 worker.py
[*] Waiting for messgaes. To exit press CTRL+C
[x] Received b'This is a test..'
[x] Done
[x] Received b'This is a test....'
[x] Done

优点:使用 Task Queue 可以轻易的实现平行计算。当工作队列积压时,只需要添加更多的consumer来解决。

  • 消息确认

当消息被consumer取走之后,RabbitMQ会将此消息标记为deletion。这种情况下,如果consumer挂掉,消息没有处理成功,就等于丢失了。但这是不可取的,为确保消息不丢失,RabbitMQ支持consumer在处理完消息之后返回确认,告诉RabbitMQ,此消息已被处理,并且可以删除。
当一个consumer挂掉(它的channel关闭、连接断开或者TCP链接断开),没有发送ack,RabbitMQ会认为此消息没有被处理成功并且会再次放入队列。如果此时有其他consumer在线,则会分发此消息到其他consumer。这样就会避免消息丢失。

Manual message acknowledgments 默认是打开的。之前的代码中使用了 no_ack=True 来关闭ACK。移除此配置,当完成任务之后将会发送ack确认。

#!/usr/bin/env python3
# coding=utf-8

import pika
import time

connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
channel = connection.channel()

channel.queue_declare(queue='hello')

def callback(ch, method, properties, body):
    print('[x] Received %r' % body)
    time.sleep(body.count(b'.'))
    print('[x] Done')
    ch.basic_ack(delivery_tag = method.delivery_tag)    # 任务处理完之后,发送确认

channel.basic_consume(callback, queue='hello')      # 去除 no_ack

print('[*] Waiting for messgaes. To exit press CTRL+C')

channel.start_consuming()

一个常犯的错误是忘记了basic_ack,非常严重,因为无法释放掉未确认的消息,所以消息将被重复投递到客户端,会消耗越来越多的内存。可以通过rabbitmqctl工具调试,显示出为确认的消息:rabbitmqctl list_queues name messages_ready messages_unacknowledged

  • 消息持久化

之前的程序,任务在服务停止之后会丢失。如果RabbitMQ quit或crash,队列和消息将被丢失。要想确保消息不丢失,需要同时标记队列和消息为持久化

  1. 首先确保队列不会丢失:

channel.queue_declare(queue='task_queue', durable=True) # 添加 durable 参数

注意: 之前已经申明了"hello"队列,不能使用不同的参数进行再次申明,否则会报错。可以考虑换一个队列名,例如:task_queue

queue_declare 需要在producer和consumer两端一起修改

  1. 然后标记消息为持久化,添加 delivery_mode 参数,值为2
#!/usr/bin/env python3
# coding=utf-8

import pika
import sys

message = ' '.join(sys.argv[1:]) or 'Hello World!'

connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
channel = connection.channel()

 # 在producer和cosumer两端一起添加 durable=True,确保队列不丢失
channel.queue_declare(queue='task_queue', durable=True)    

channel.basic_publish(exchange='',
                      routing_key='task_queue',
                      body=message,
                      properties=pika.BasicProperties(
                          delivery_mode = 2,    # 使消息持久化
                      ))

print("[x] Sent '%s'" % message)

connection.close()

注: 这种办法不能完全确保消息不丢失,在RabbitMQ接受消息到保存到磁盘的这个时间窗口中,如果发生意外,消息可能还在缓存中并没有将消息保存到disk中。这种方法不算很健壮,但是可以满足绝大多数简单的任务队列。如果需要完全的不丢失,可以使用 [ publisher confirms ]

  • 均匀的分发

在consumer中添加: channel.basic_qos(prefetch_count=1) ,告诉RabbitMQ不将多个消息分发给同一个consumer。如果处理较繁杂,则队列会堵塞,可以通过添加多个consumer或者使用 [ message TTL ]


参考文档:http://www.rabbitmq.com/tutorials/tutorial-two-python.html

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 206,126评论 6 481
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 88,254评论 2 382
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 152,445评论 0 341
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 55,185评论 1 278
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 64,178评论 5 371
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,970评论 1 284
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,276评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,927评论 0 259
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,400评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,883评论 2 323
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,997评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,646评论 4 322
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,213评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,204评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,423评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,423评论 2 352
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,722评论 2 345

推荐阅读更多精彩内容

  • 来源 RabbitMQ是用Erlang实现的一个高并发高可靠AMQP消息队列服务器。支持消息的持久化、事务、拥塞控...
    jiangmo阅读 10,344评论 2 34
  • 什么叫消息队列 消息(Message)是指在应用间传送的数据。消息可以非常简单,比如只包含文本字符串,也可以更复杂...
    lijun_m阅读 1,335评论 0 1
  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,599评论 18 139
  • rabbitMQ是一款基于AMQP协议的消息中间件,它能够在应用之间提供可靠的消息传输。在易用性,扩展性,高可用性...
    点融黑帮阅读 2,985评论 3 41
  • 应用场景 异步处理 场景说明:用户注册后,需要发注册邮件和注册短信,传统的做法有两种: 1.串行的方式 2.并行的...
    lijun_m阅读 1,808评论 0 3