RabbitMQ消息订阅发布

Publish\Subscribe(消息发布\订阅)

广播策略:每个人都能收到;或是过滤某些人可以接收
一个生产者,对应对个消费者!

exchange type 过滤类型
fanout = 广播
direct = 组播
topic = 规则播

之前的例子都基本都是1对1的消息发送和接收,即消息只能发送到指定的queue里,但有些时候你想让你的消息被所有的Queue收到,类似广播的效果,这时候就要用到exchange了。
exchange在定义的时候是有类型的,以决定到底是哪些Queue符合条件,可以接收消息。

fanout: 所有bind到此exchange的queue都可以接收消息
direct: 通过routingKey和exchange决定的那个唯一的queue可以接收消息
topic:所有符合routingKey(此时可以是一个表达式)的routingKey所bind的queue可以接收消息
headers: 通过headers 来决定把消息发给哪些queue
表达式符号说明:#代表一个或多个字符,*代表任何字符

例:#.a会匹配a.a,aa.a,aaa.a等
    *.a会匹配a.a,b.a,c.a等

注:使用RoutingKey为#,Exchange Type为topic的时候相当于使用fanout

广播模式(一个生产者,多个消费者)

1、路由指定为空!所有消息都发给exchange处理转到队列,转到哪个队列就需要exchange指定,所以在建立连接的时候要指定名字。
注意:exchange只负责转发不负责存放消息!如果没有队列绑定消息就会扔掉!
2、自动生成队列名,然后使用完之后再删掉
队列参数exclusive=True唯一的,rabbit 随机生成一个名字。
3、生产者和消费者端都要声明队列,以排除生成者未启动,消费者获取报错的问题
4、生产者发送一条消息,说有的消费者都能接收到!高效,效率的完成发送!

应用场景:新浪微博-订阅模式,只有当前登录的用户才可以收到实时发送的消息

生产者代码

import pika
import sys

credentials = pika.PlainCredentials('alex', 'alex123')

parameters = pika.ConnectionParameters(host='localhost', credentials=credentials)
connection = pika.BlockingConnection(parameters)

channel = connection.channel()  # 队列连接通道

channel.exchange_declare(exchange='logs', exchange_type='fanout')  # 声明队列 exchange名字和类型

message = ' '.join(sys.argv[1:]) or "info: Hello World!"  # 获取外界输入的信息,否则就是hello world

channel.basic_publish(exchange='logs',  # 指定exchange的名字
                      routing_key='',  # 注意,不需要指定队列名!
                      body=message)  # 信息
print(" [x] Sent %r" % message)
connection.close()

消费者代码

import pika

credentials = pika.PlainCredentials('alex', 'alex123')

parameters = pika.ConnectionParameters(host='localhost', credentials=credentials)
connection = pika.BlockingConnection(parameters)

channel = connection.channel()  # 队列连接通道
channel.exchange_declare(exchange='logs', exchange_type='fanout')

# 不指定queue名字,rabbit会随机分配一个名字,exclusive=True会在使用此queue的消费者断开后,自动将queue删除
queue_obj = channel.queue_declare('',exclusive=True)

queue_name = queue_obj.method.queue  # 获取队列名
print('queue name', queue_name, queue_obj)  # 打印会列名

channel.queue_bind(queue=queue_name, exchange='logs')  # 绑定队列到Exchange

print(' [*] Waiting for logs. To exit press CTRL+C')


def callback(ch, method, properties, body):
    print(" [x] %r" % body)


channel.basic_consume(on_message_callback=callback, queue=queue_name, auto_ack=True)

channel.start_consuming()

direct 组播模式:有选择的接收消息(exchange type=direct)

1、有选择的接收消息(exchange type=direct),RabbitMQ还支持根据关键字发送,相当于是添加了一个过滤地带!
即:队列绑定关键字,发送者将数据根据关键字发送到消息exchange,exchange根据 关键字 判定应该将数据发送至指定队列。
2、发什么类型的,什么类型的接收,在接收端运行的时候加参数,指定接收的类型。
3、routing_key = 'xxx' 与广播相比不再为空,队列由执行时手动输入获取,然后路由指定发送到哪个队列。
4、按照类型:生产者发送指定类型的消息;消费者循环绑定队列,如果不存在不接收
例子:就像广播电台,要想接收生产者发送的数据,必须是绑定且在线!如果断开一段时间再接收该电台消息,之前的讯息就不会再收到!

应用场景:日志分类处理逻辑 【注:可以同时存在多个消费者】

生产者代码

# rabbitmq_send.py
import pika
import sys

credentials = pika.PlainCredentials('alex', 'alex123')

parameters = pika.ConnectionParameters(host='localhost', credentials=credentials)
connection = pika.BlockingConnection(parameters)

channel = connection.channel()  # 队列连接通道

channel.exchange_declare(exchange='direct_log', exchange_type='direct')  # 声明消息队列及类型

log_level = sys.argv[1] if len(sys.argv) > 1 else 'info'  # 日志等级

message = ' '.join(sys.argv[1:]) or "info: Hello World!"  # 接收手动输入的消息内容

channel.basic_publish(exchange='direct_log',
                      routing_key=log_level,
                      body=message)
print(" [x] Sent %r" % message)
connection.close()

消费者代码

# rabbitmq_receive.py
import pika, sys

credentials = pika.PlainCredentials('alex', 'alex123')

parameters = pika.ConnectionParameters(host='localhost', credentials=credentials)
connection = pika.BlockingConnection(parameters)

channel = connection.channel()  # 队列连接通道

# 不指定queue名字,rabbit会随机分配一个名字,exclusive=True会在使用此queue的消费者断开后,自动将queue删除
queue_obj = channel.queue_declare('', exclusive=True)

queue_name = queue_obj.method.queue
print('queue name', queue_name, queue_obj)

log_levels = sys.argv[1:]  # 日志等级 info warning error danger

# 判断存不存在,不存在退出
if not log_levels:
    sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
    sys.exit(1)

# 循环绑定队列
for level in log_levels:
    channel.queue_bind(exchange='direct_log',
                       queue=queue_name,
                       routing_key=level)  # 绑定队列到Exchange

print(' [*] Waiting for logs. To exit press CTRL+C')


def callback(ch, method, properties, body):
    print(" [x] %r" % body)


channel.basic_consume(on_message_callback=callback, queue=queue_name, auto_ack=True)
channel.start_consuming()

需要用命令行传递参数来启动消费者,以便于接收规定的日志级别的队列消息,如:

python rabbitmq_receive.py info
python rabbitmq_receive.py warning error

生产者也可以用命令行启动以指定发送的级别,如:

python rabbitmq_send.py info
python rabbitmq_send.py error

topic规则播

话题类型,可以根据正则进行更精确的匹配,按照规则过滤。exchange_type = topic,仅改下类型即可!
在topic类型下,可以让队列绑定几个模糊的关键字,之后发送者将数据发送到exchange,exchange将传入”路由值“和 ”关键字“进行匹配,匹配成功,则将数据发送到指定队列。

# 表示可以匹配 0 个 或 多个 单词
* 表示只能匹配 一个 单词

接收所有运行的日志:
python receive_logs_topic.py "#"
接收"mysql"的所有日志:
python receive_logs_topic.py "mysql.*"
或者如果只想接收"critical"日志:
python receive_logs_topic.py "*.critical"
可以创建多个绑定:
python receive_logs_topic.py "mysql.*" "*.critical"
并发出带有routing_key的日志类型"mysql.critical":
python emit_log_topic.py "mysql.critical" "A critical kernel error"

#测试执行如下:
#客户端一:
    - python3 receive1.py *.django
#客户端二:
    - python3 receive1.py mysql.error
#客户端三:
    - python3 receive1.py mysql.*
     
#服务端:
    - python3 receive1.py  #匹配相应的客户端

生产者代码

import pika
import sys

credentials = pika.PlainCredentials('alex', 'alex123')

parameters = pika.ConnectionParameters(host='localhost',credentials=credentials)
connection = pika.BlockingConnection(parameters)

channel = connection.channel() #队列连接通道

channel.exchange_declare(exchange='topic_log',exchange_type='topic')

#log_level =  sys.argv[1] if len(sys.argv) > 1 else 'info'
log_level =  sys.argv[1] if len(sys.argv) > 1 else 'all.info'

message = ' '.join(sys.argv[1:]) or "all.info: Hello World!"

channel.basic_publish(exchange='topic_log',
                      routing_key=log_level,
                      body=message)
print(" [x] Sent %r" % message)
connection.close()

消费者代码

import pika,sys
credentials = pika.PlainCredentials('alex', 'alex123')

parameters = pika.ConnectionParameters(host='localhost',credentials=credentials)
connection = pika.BlockingConnection(parameters)

channel = connection.channel() #队列连接通道

#不指定queue名字,rabbit会随机分配一个名字,exclusive=True会在使用此queue的消费者断开后,自动将queue删除
queue_obj = channel.queue_declare('', exclusive=True)
queue_name = queue_obj.method.queue

log_levels = sys.argv[1:] # info warning errr

if not log_levels:
    sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
    sys.exit(1)

for level in log_levels:
    channel.queue_bind(exchange='topic_log',
                       queue=queue_name,
                       routing_key=level) #绑定队列到Exchange

print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] %r" % body)

channel.basic_consume(on_message_callback=callback,queue=queue_name, auto_ack=True)
channel.start_consuming()
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,445评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,889评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,047评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,760评论 1 276
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,745评论 5 367
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,638评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,011评论 3 398
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,669评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,923评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,655评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,740评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,406评论 4 320
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,995评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,961评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,197评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,023评论 2 350
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,483评论 2 342