上节介绍了在命名队列中发送和获取消息,本节介绍创建一个工作队列,然后分发任务到多个消费者(consumer)
-
编辑生产者
new_task.py
#!/usr/bin/env python3
# coding=utf-8
import pika
import sys
# 获取终端输入的信息,上传到队列
message = ' '.join(sys.argv[1:]) or 'Hello World!'
connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
channel = connection.channel()
channel.queue_declare(queue='hello')
channel.basic_publish(exchange='', routing_key='hello', body=message)
print("[x] Sent '%s'" % message)
connection.close()
-
编辑消费者
worker.py
#!/usr/bin/env python3
# coding=utf-8
import pika
import time
connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
channel = connection.channel()
channel.queue_declare(queue='hello')
def callback(ch, method, properties, body):
print('[x] Received %r' % body)
# 通过统计body中点符号的数量来调用time.sleep(),用来模拟真实环境中处理任务的耗时
time.sleep(body.count(b'.'))
print('[x] Done')
channel.basic_consume(callback, queue='hello', no_ack=True)
print('[*] Waiting for messgaes. To exit press CTRL+C')
channel.start_consuming()
-
轮询调度规则 (Round-robin)
执行以上代码,同时启动一个生产者(new_task.py)和两个消费者(worker.py),可以发现RabbitMQ通过轮询调度的方式将消息分发给消费者。
# 生产者
python3 new_task.py This is a test.
python3 new_task.py This is a test..
python3 new_task.py This is a test...
python3 new_task.py This is a test....
python3 new_task.py This is a test.....
# 消费者1
> python3 worker.py
[*] Waiting for messgaes. To exit press CTRL+C
[x] Received b'This is a test.'
[x] Done
[x] Received b'This is a test...'
[x] Done
[x] Received b'This is a test.....'
[x] Done
# 消费者2
> python3 worker.py
[*] Waiting for messgaes. To exit press CTRL+C
[x] Received b'This is a test..'
[x] Done
[x] Received b'This is a test....'
[x] Done
优点:使用 Task Queue 可以轻易的实现平行计算。当工作队列积压时,只需要添加更多的consumer来解决。
-
消息确认
当消息被consumer取走之后,RabbitMQ会将此消息标记为deletion。这种情况下,如果consumer挂掉,消息没有处理成功,就等于丢失了。但这是不可取的,为确保消息不丢失,RabbitMQ支持consumer在处理完消息之后返回确认,告诉RabbitMQ,此消息已被处理,并且可以删除。
当一个consumer挂掉(它的channel关闭、连接断开或者TCP链接断开),没有发送ack,RabbitMQ会认为此消息没有被处理成功并且会再次放入队列。如果此时有其他consumer在线,则会分发此消息到其他consumer。这样就会避免消息丢失。
Manual message acknowledgments
默认是打开的。之前的代码中使用了 no_ack=True
来关闭ACK。移除此配置,当完成任务之后将会发送ack确认。
#!/usr/bin/env python3
# coding=utf-8
import pika
import time
connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
channel = connection.channel()
channel.queue_declare(queue='hello')
def callback(ch, method, properties, body):
print('[x] Received %r' % body)
time.sleep(body.count(b'.'))
print('[x] Done')
ch.basic_ack(delivery_tag = method.delivery_tag) # 任务处理完之后,发送确认
channel.basic_consume(callback, queue='hello') # 去除 no_ack
print('[*] Waiting for messgaes. To exit press CTRL+C')
channel.start_consuming()
一个常犯的错误是忘记了basic_ack,非常严重,因为无法释放掉未确认的消息,所以消息将被重复投递到客户端,会消耗越来越多的内存。可以通过
rabbitmqctl
工具调试,显示出为确认的消息:rabbitmqctl list_queues name messages_ready messages_unacknowledged
-
消息持久化
之前的程序,任务在服务停止之后会丢失。如果RabbitMQ quit或crash,队列和消息将被丢失。要想确保消息不丢失,需要同时标记队列和消息为持久化
- 首先确保队列不会丢失:
channel.queue_declare(queue='task_queue', durable=True)
# 添加 durable 参数
注意: 之前已经申明了"hello"队列,不能使用不同的参数进行再次申明,否则会报错。可以考虑换一个队列名,例如:task_queue
queue_declare
需要在producer和consumer两端一起修改
- 然后标记消息为持久化,添加 delivery_mode 参数,值为2
#!/usr/bin/env python3
# coding=utf-8
import pika
import sys
message = ' '.join(sys.argv[1:]) or 'Hello World!'
connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
channel = connection.channel()
# 在producer和cosumer两端一起添加 durable=True,确保队列不丢失
channel.queue_declare(queue='task_queue', durable=True)
channel.basic_publish(exchange='',
routing_key='task_queue',
body=message,
properties=pika.BasicProperties(
delivery_mode = 2, # 使消息持久化
))
print("[x] Sent '%s'" % message)
connection.close()
注: 这种办法不能完全确保消息不丢失,在RabbitMQ接受消息到保存到磁盘的这个时间窗口中,如果发生意外,消息可能还在缓存中并没有将消息保存到disk中。这种方法不算很健壮,但是可以满足绝大多数简单的任务队列。如果需要完全的不丢失,可以使用 [ publisher confirms ]
-
均匀的分发
在consumer中添加: channel.basic_qos(prefetch_count=1)
,告诉RabbitMQ不将多个消息分发给同一个consumer。如果处理较繁杂,则队列会堵塞,可以通过添加多个consumer或者使用 [ message TTL ]
参考文档:http://www.rabbitmq.com/tutorials/tutorial-two-python.html