Publish\Subscribe(消息发布\订阅)
广播策略:每个人都能收到;或是过滤某些人可以接收
一个生产者,对应对个消费者!
exchange type 过滤类型
fanout = 广播
direct = 组播
topic = 规则播
之前的例子都基本都是1对1的消息发送和接收,即消息只能发送到指定的queue里,但有些时候你想让你的消息被所有的Queue收到,类似广播的效果,这时候就要用到exchange了。
exchange在定义的时候是有类型的,以决定到底是哪些Queue符合条件,可以接收消息。
fanout: 所有bind到此exchange的queue都可以接收消息
direct: 通过routingKey和exchange决定的那个唯一的queue可以接收消息
topic:所有符合routingKey(此时可以是一个表达式)的routingKey所bind的queue可以接收消息
headers: 通过headers 来决定把消息发给哪些queue
表达式符号说明:#代表一个或多个字符,*代表任何字符
例:#.a会匹配a.a,aa.a,aaa.a等
*.a会匹配a.a,b.a,c.a等
注:使用RoutingKey为#,Exchange Type为topic的时候相当于使用fanout
广播模式(一个生产者,多个消费者)
1、路由指定为空!所有消息都发给exchange处理转到队列,转到哪个队列就需要exchange指定,所以在建立连接的时候要指定名字。
注意:exchange只负责转发不负责存放消息!如果没有队列绑定消息就会扔掉!
2、自动生成队列名,然后使用完之后再删掉
队列参数exclusive=True唯一的,rabbit 随机生成一个名字。
3、生产者和消费者端都要声明队列,以排除生成者未启动,消费者获取报错的问题
4、生产者发送一条消息,说有的消费者都能接收到!高效,效率的完成发送!
应用场景:新浪微博-订阅模式,只有当前登录的用户才可以收到实时发送的消息
生产者代码
import pika
import sys
credentials = pika.PlainCredentials('alex', 'alex123')
parameters = pika.ConnectionParameters(host='localhost', credentials=credentials)
connection = pika.BlockingConnection(parameters)
channel = connection.channel() # 队列连接通道
channel.exchange_declare(exchange='logs', exchange_type='fanout') # 声明队列 exchange名字和类型
message = ' '.join(sys.argv[1:]) or "info: Hello World!" # 获取外界输入的信息,否则就是hello world
channel.basic_publish(exchange='logs', # 指定exchange的名字
routing_key='', # 注意,不需要指定队列名!
body=message) # 信息
print(" [x] Sent %r" % message)
connection.close()
消费者代码
import pika
credentials = pika.PlainCredentials('alex', 'alex123')
parameters = pika.ConnectionParameters(host='localhost', credentials=credentials)
connection = pika.BlockingConnection(parameters)
channel = connection.channel() # 队列连接通道
channel.exchange_declare(exchange='logs', exchange_type='fanout')
# 不指定queue名字,rabbit会随机分配一个名字,exclusive=True会在使用此queue的消费者断开后,自动将queue删除
queue_obj = channel.queue_declare('',exclusive=True)
queue_name = queue_obj.method.queue # 获取队列名
print('queue name', queue_name, queue_obj) # 打印会列名
channel.queue_bind(queue=queue_name, exchange='logs') # 绑定队列到Exchange
print(' [*] Waiting for logs. To exit press CTRL+C')
def callback(ch, method, properties, body):
print(" [x] %r" % body)
channel.basic_consume(on_message_callback=callback, queue=queue_name, auto_ack=True)
channel.start_consuming()
direct 组播模式:有选择的接收消息(exchange type=direct)
1、有选择的接收消息(exchange type=direct),RabbitMQ还支持根据关键字发送,相当于是添加了一个过滤地带!
即:队列绑定关键字,发送者将数据根据关键字发送到消息exchange,exchange根据 关键字 判定应该将数据发送至指定队列。
2、发什么类型的,什么类型的接收,在接收端运行的时候加参数,指定接收的类型。
3、routing_key = 'xxx' 与广播相比不再为空,队列由执行时手动输入获取,然后路由指定发送到哪个队列。
4、按照类型:生产者发送指定类型的消息;消费者循环绑定队列,如果不存在不接收
例子:就像广播电台,要想接收生产者发送的数据,必须是绑定且在线!如果断开一段时间再接收该电台消息,之前的讯息就不会再收到!
应用场景:日志分类处理逻辑 【注:可以同时存在多个消费者】
生产者代码
# rabbitmq_send.py
import pika
import sys
credentials = pika.PlainCredentials('alex', 'alex123')
parameters = pika.ConnectionParameters(host='localhost', credentials=credentials)
connection = pika.BlockingConnection(parameters)
channel = connection.channel() # 队列连接通道
channel.exchange_declare(exchange='direct_log', exchange_type='direct') # 声明消息队列及类型
log_level = sys.argv[1] if len(sys.argv) > 1 else 'info' # 日志等级
message = ' '.join(sys.argv[1:]) or "info: Hello World!" # 接收手动输入的消息内容
channel.basic_publish(exchange='direct_log',
routing_key=log_level,
body=message)
print(" [x] Sent %r" % message)
connection.close()
消费者代码
# rabbitmq_receive.py
import pika, sys
credentials = pika.PlainCredentials('alex', 'alex123')
parameters = pika.ConnectionParameters(host='localhost', credentials=credentials)
connection = pika.BlockingConnection(parameters)
channel = connection.channel() # 队列连接通道
# 不指定queue名字,rabbit会随机分配一个名字,exclusive=True会在使用此queue的消费者断开后,自动将queue删除
queue_obj = channel.queue_declare('', exclusive=True)
queue_name = queue_obj.method.queue
print('queue name', queue_name, queue_obj)
log_levels = sys.argv[1:] # 日志等级 info warning error danger
# 判断存不存在,不存在退出
if not log_levels:
sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
sys.exit(1)
# 循环绑定队列
for level in log_levels:
channel.queue_bind(exchange='direct_log',
queue=queue_name,
routing_key=level) # 绑定队列到Exchange
print(' [*] Waiting for logs. To exit press CTRL+C')
def callback(ch, method, properties, body):
print(" [x] %r" % body)
channel.basic_consume(on_message_callback=callback, queue=queue_name, auto_ack=True)
channel.start_consuming()
需要用命令行传递参数来启动消费者,以便于接收规定的日志级别的队列消息,如:
python rabbitmq_receive.py info
python rabbitmq_receive.py warning error
生产者也可以用命令行启动以指定发送的级别,如:
python rabbitmq_send.py info
python rabbitmq_send.py error
topic规则播
话题类型,可以根据正则进行更精确的匹配,按照规则过滤。exchange_type = topic,仅改下类型即可!
在topic类型下,可以让队列绑定几个模糊的关键字,之后发送者将数据发送到exchange,exchange将传入”路由值“和 ”关键字“进行匹配,匹配成功,则将数据发送到指定队列。
# 表示可以匹配 0 个 或 多个 单词
* 表示只能匹配 一个 单词
接收所有运行的日志:
python receive_logs_topic.py "#"
接收"mysql"的所有日志:
python receive_logs_topic.py "mysql.*"
或者如果只想接收"critical"日志:
python receive_logs_topic.py "*.critical"
可以创建多个绑定:
python receive_logs_topic.py "mysql.*" "*.critical"
并发出带有routing_key的日志类型"mysql.critical":
python emit_log_topic.py "mysql.critical" "A critical kernel error"
#测试执行如下:
#客户端一:
- python3 receive1.py *.django
#客户端二:
- python3 receive1.py mysql.error
#客户端三:
- python3 receive1.py mysql.*
#服务端:
- python3 receive1.py #匹配相应的客户端
生产者代码
import pika
import sys
credentials = pika.PlainCredentials('alex', 'alex123')
parameters = pika.ConnectionParameters(host='localhost',credentials=credentials)
connection = pika.BlockingConnection(parameters)
channel = connection.channel() #队列连接通道
channel.exchange_declare(exchange='topic_log',exchange_type='topic')
#log_level = sys.argv[1] if len(sys.argv) > 1 else 'info'
log_level = sys.argv[1] if len(sys.argv) > 1 else 'all.info'
message = ' '.join(sys.argv[1:]) or "all.info: Hello World!"
channel.basic_publish(exchange='topic_log',
routing_key=log_level,
body=message)
print(" [x] Sent %r" % message)
connection.close()
消费者代码
import pika,sys
credentials = pika.PlainCredentials('alex', 'alex123')
parameters = pika.ConnectionParameters(host='localhost',credentials=credentials)
connection = pika.BlockingConnection(parameters)
channel = connection.channel() #队列连接通道
#不指定queue名字,rabbit会随机分配一个名字,exclusive=True会在使用此queue的消费者断开后,自动将queue删除
queue_obj = channel.queue_declare('', exclusive=True)
queue_name = queue_obj.method.queue
log_levels = sys.argv[1:] # info warning errr
if not log_levels:
sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
sys.exit(1)
for level in log_levels:
channel.queue_bind(exchange='topic_log',
queue=queue_name,
routing_key=level) #绑定队列到Exchange
print(' [*] Waiting for logs. To exit press CTRL+C')
def callback(ch, method, properties, body):
print(" [x] %r" % body)
channel.basic_consume(on_message_callback=callback,queue=queue_name, auto_ack=True)
channel.start_consuming()