简析RabbitMQ消息中间件

RabbitMQ官网

一、简介:

.  AMQP,即Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然。 AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。 RabbitMQ是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。 下面将重点介绍RabbitMQ中的一些基础概念,了解了这些概念,是使用好RabbitMQ的基础。

ConnectionFactory、Connection、Channel
  ConnectionFactory、Connection、Channel都是RabbitMQ对外提供的API中最基本的对象。Connection是RabbitMQ的socket链接,它封装了socket协议相关部分逻辑。ConnectionFactory为Connection的制造工厂。 Channel是我们与RabbitMQ打交道的最重要的一个接口,我们大部分的业务操作是在Channel这个接口中完成的,包括定义Queue、定义Exchange、绑定Queue与Exchange、发布消息等。

二、python安装、启动RabbitMQ

.   关于python的队列,内置的有两种,一种是线程queue,另一种是进程queue,但是这两种queue都是只能在同一个进程下的线程间或者父进程与子进程之间进行队列通讯,并不能进行程序与程序之间的信息交换,这时候我们就需要一个中间件,来实现程序之间的通讯。
  RabbitMQ并不是python内置的模块,而是一个需要你额外安装模块pika,安装完毕后可通过python中内置的pika模块来调用MQ发送或接收队列请求。

  • pika是RabbitMQ团队编写的官方Python AMQP库。是由Erlang语言开发的,RabbitMQ依赖Erlang环境运行,所以需要先安装Erlang,再下载RabbitMQ,然后再下载pika模块
    brew install erlang
    brew install rabbitmq
    pip3 install pika

  • 启动Rabbitmq:
    cd /usr/local/Cellar/rabbitmq/3.7.9执行
    sbin/rabbitmq-server

  • RabbitMQ 启动插件:
    待RabbitMQ 的启动完毕之后,另起终端cd /usr/local/sbin/执行
    sudo ./rabbitmq-plugins enable rabbitmq_management(执行一次以后不用再次执行)

  • 登陆管理界面
    浏览器输入:http://localhost:15672/账号密码初始默认都为guest

三、python操作RabbitMQ

1、RabbitMQ的简单模式:

一个Producer向queue发送一个message,一个consumer从该queue接收message并打印
image.png

producer.py

import pika

# 第一步:链接rabbitmq 获取控制rabbitmq的channel对象
connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost'))
channel = connection.channel()

# 第二步:在rabbitmq中生成一个名为队列
channel.queue_declare(queue='hello')

# 向rabbitmq中名为hello的队列插入一个消息:'Hello World!'
channel.basic_publish(exchange='',
                      routing_key='hello',
                      body='5')
print(" [x] has Sent")
connection.close()

consumer.py

import pika

# 第一步:获取控制rabbitmq的channel对象
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

# 第二步:在rabbitmq中生成一个名hello为队列,无则创建,有则pass
channel.queue_declare(queue='hello')

# 回调处理函数
def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)

# 定义相关参数
channel.basic_consume(callback,
                      queue='hello',
                      no_ack=True  # 无应答模式
                      )

print(' [*] Waiting for messages. To exit press CTRL+C')
# 程序开启,处于等待状态,监听对应队列数据,有数据就调用回调函数执行
channel.start_consuming()

相关参数:
  (1)no-ack = False,如果消费者遇到情况(its channel is closed, connection is closed, or TCP connection is lost)挂掉了,那么,RabbitMQ会重新将该任务添加到队列中。
应答模式下,回调函数处理完成后再通知队列将任务删除掉,只在consumer中修改代码即可

consumer.py

import pika

# 第一步:获取控制rabbitmq的channel对象
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

# 第二步:在rabbitmq中生成一个名hello为队列,无则创建,有则pass
channel.queue_declare(queue='hello')

# 回调处理函数
def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    #应答模式下,回调函数处理完成后通知队列将任务删除掉
    ch.basic_ack(delivery_tag = method.delivery_tag)

# 定义相关参数
channel.basic_consume(callback,
                      queue='hello',
                      no_ack=False  # 应答模式
                      )

print(' [*] Waiting for messages. To exit press CTRL+C')
# 程序开启,处于等待状态,监听对应队列数据,有数据就调用回调函数执行
channel.start_consuming()

|  (2) durable :消息不丢失
在rabbitmq中生成一个名为队列,加上参数:durable=True、
在basic_publish中参数:properties=pika.BasicProperties(delivery_mode=2) 就可以持久化了
持久化设置主要是在producer中修改参数,consumer中只要在生成队列时加上durable=True参数就行

producer.py

import pika
# 第一步:链接rabbitmq 获取控制rabbitmq的channel对象
connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost'))
channel = connection.channel()

# 第二步:在rabbitmq中生成一个名为队列,加上durable=True就可以持久化了
channel.queue_declare(queue='hello',durable=True)

# 向rabbitmq中名为hello的队列插入一个消息:'Hello World!'
channel.basic_publish(exchange='',
                      routing_key='hello',
                      body='5',
                      properties=pika.BasicProperties(
                          delivery_mode=2, # make message persistent
                      )
print(" [x] has Sent")
connection.close()

consumer.py

import pika

# 第一步:获取控制rabbitmq的channel对象
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

# 第二步:在rabbitmq中生成一个名hello为队列,无则创建,有则pass
channel.queue_declare(queue='hello',durable=True)

# 回调处理函数
def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    #应答模式下,回调函数处理完成后通知队列将任务删除掉
    ch.basic_ack(delivery_tag = method.delivery_tag)

# 定义相关参数
channel.basic_consume(callback,
                      queue='hello',
                      no_ack=False  # 应答模式
                      )

print(' [*] Waiting for messages. To exit press CTRL+C')
# 程序开启,处于等待状态,监听对应队列数据,有数据就调用回调函数执行
channel.start_consuming()

|  (3) 消息获取顺序(只在consumer中修改即可)
  默认消息队列里的数据是按照顺序被消费者拿走,例如:消费者1 去队列中获取 奇数序列的任务,消费者2去队列中获取偶数序列的任务。
channel.basic_qos(prefetch_count=1) 表示谁来谁取,不再按照奇偶数排列

consumer.py

import pika

# 第一步:获取控制rabbitmq的channel对象
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

# 第二步:在rabbitmq中生成一个名hello为队列,无则创建,有则pass
channel.queue_declare(queue='hello',durable=True)

# 回调处理函数
def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    #应答模式下,回调函数处理完成后通知队列将任务删除掉
    ch.basic_ack(delivery_tag = method.delivery_tag)

# 修改消息获取的顺序,由默认模式顺序改为谁先来谁取
channel.basic_qos(prefetch_count=1) 

# 定义相关参数
channel.basic_consume(callback,
                      queue='hello',
                      no_ack=False  # 应答模式
                      )

print(' [*] Waiting for messages. To exit press CTRL+C')
# 程序开启,处于等待状态,监听对应队列数据,有数据就调用回调函数执行
channel.start_consuming()

2、 RabbitMQ的交换机模式:

交换机模式使用:
  producer : 创建链接后不用再创建队列了,直接创建一个交换机,指定交换机fanout模式
  consumer : 创建一个随机的属于自己的队列,然后把交换机和生成的队列进行绑定

交换机模式下共有分发订阅模式、关键字模式、模糊匹配模式三种,下面来一一了解

  • 分发订阅模式:
image.png

  发布订阅和简单的消息队列区别在于,发布订阅会将消息发送给所有的订阅者,而消息队列中的数据被消费一次便消失。所以,RabbitMQ实现发布和订阅时,会为每一个订阅者创建一个队列,而发布者发布消息时,会将消息放置在所有相关队列中。

producer.py

import pika
#获取链接拿到channel对象
connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()
#创建交换机,设置模式
channel.exchange_declare(exchange='logs',
                         exchange_type="fanout")

message = "info: Hello World!"
channel.basic_publish(exchange='logs',
                      routing_key='',
                      body=message)
print(" [x] Sent %r" % message)
connection.close()

consumer.py

import pika
#创建链接,获取channel对象
connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()
# 创建交换机,设定模式
channel.exchange_declare(exchange='logs',
                         exchange_type="fanout")

#重点来了:每个consumer创建一个随机的属于自己的队列,result是自己创建的队列的名字
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue#队列名字
print("queue_name",queue_name)
#把交换机和生成的队列进行绑定
channel.queue_bind(exchange='logs',
                   queue=queue_name)

print(' [*] Waiting for logs. To exit press CTRL+C')

#回调函数
def callback(ch, method, properties, body):
    print(" [x] %r" % body)

channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)

channel.start_consuming()
  • 关键字模式:
      之前事例,发送消息时明确指定某个队列并向其中发送消息,RabbitMQ还支持根据关键字发送,

队列绑定关键字,发送者将数据根据关键字发送到消息exchange,exchange根据 关键字 判定应该将数据发送至指定队列。

image.png

关键字模式使用:
  producer : 创建链接后不用再创建队列了,直接创建一个交换机,指定交换机direct模式,basic_publish中加上routing_key关键字
  consumer : 创建一个随机的属于自己的队列,然后把交换机和生成的队列进行绑定,加上绑定关键字参数routing_key关键字

producer.py

import pika
#创建链接,得到channel对象
connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()
#创建交换机,设定模式
channel.exchange_declare(exchange='direct_logs',
                         exchange_type="direct")

message = "info: Hello World!"
channel.basic_publish(exchange='direct_logs',
                      routing_key='info',
                      body=message)
print(" [x] Sent %r" % message)
connection.close()

consumer.py

import pika
# 创建链接,得到channel对象
connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

#创建交换机,指定direct模式
channel.exchange_declare(exchange='direct_logs',
                         exchange_type="direct")
#重点来了:每个consumer创建一个随机的属于自己的队列,result是自己创建的队列的名字
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

#把交换机和生成的队列进行绑定,加上绑定关键字参数routing_key
channel.queue_bind(exchange='direct_logs',
                   queue=queue_name,
                   routing_key="info")

print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))

channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)

channel.start_consuming()
  • 模糊匹配模式:(类似关键字模式)
      在模糊匹配类型下,可以让队列绑定几个模糊的关键字,之后发送者将数据发送到exchange,exchange将传入”路由值“和 ”关键字“进行匹配,匹配成功,则将数据发送到指定队列。

# 表示可以匹配 0 个 或 多个 单词
* 表示只能匹配 一个 单词


image.png

3、 基于RabbitMQ的RPC模式

  • (RPC) Remote Procedure Call Protocol 远程过程调用协议
      在一个大型的公司,系统由大大小小的服务构成,不同的团队维护不同的代码,部署在不同的机器。但是在做开发时候往往要用到其它团队的方法,因为已经有了实现。但是这些服务部署不同的机器上,想要调用就需要网络通信,这些代码繁琐且复杂,一不小心就会写的很低效。RPC协议定义了规划,其它的公司都给出了不同的实现。比如微软的wcf,以及现在火热的WebApi。

  • 在RabbitMQ中RPC的实现也是很简单高效的,现在我们的客户端、服务端都是消息发布者与消息接收者。

首先客户端通过RPC向服务端发出请求

我这里有一堆东西需要你给我处理一下,correlation_id:这是我的请求标识,erply_to:你处理完过后把结果返回到这个队列中。

服务端拿到了请求,开始处理并返回

correlation_id:这是你的请求标识 ,原封不动的给你。 这时候客户端用自己的correlation_id与服务端返回的id进行对比。是我的,就接收。
image.png
  • Callback queue 回调队列
      一个客户端向服务器发送请求,服务器端处理请求后,将其处理结果保存在一个存储体中。而客户端为了获得处理结果,那么客户在向服务器发送请求时,同时发送一个回调队列地址reply_to。
  • Correlation id 关联标识
      一个客户端可能会发送多个请求给服务器,当服务器处理完后,客户端无法辨别在回调队列中的响应具体和那个请求时对应的。为了处理这种情况,客户端在发送每个请求时,同时会附带一个独有correlation_id属性,这样客户端在回调队列中根据correlation_id字段的值就可以分辨此响应属于哪个请求。

服务器端

import pika

# 建立连接,服务器地址为localhost,可指定ip地址
connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))

# 建立会话
channel = connection.channel()

# 声明RPC请求队列
channel.queue_declare(queue='rpc_queue')

# 数据处理方法
def fib(n):
    if n == 0:
        return 0
    elif n == 1:
        return 1
    else:
        return fib(n-1) + fib(n-2)

# 对RPC请求队列中的请求进行处理
def on_request(ch, method, props, body):
    n = int(body)

    print(" [.] fib(%s)" % n)

    # 调用数据处理方法
    response = fib(n)

    # 将处理结果(响应)发送到回调队列
    ch.basic_publish(exchange='',
                     routing_key=props.reply_to,
                     properties=pika.BasicProperties(correlation_id = \
                                                         props.correlation_id),
                     body=str(response))
    ch.basic_ack(delivery_tag = method.delivery_tag)

# 负载均衡,同一时刻发送给该服务器的请求不超过一个
channel.basic_qos(prefetch_count=1)

channel.basic_consume(on_request, queue='rpc_queue')

print(" [x] Awaiting RPC requests")
channel.start_consuming()

客户端

import pika
import uuid

class FibonacciRpcClient(object):
    def __init__(self):
        ”“”
        客户端启动时,创建回调队列,会开启会话用于发送RPC请求以及接受响应   
        “”“
       
        # 建立连接,指定服务器的ip地址
        self.connection = pika.BlockingConnection(pika.ConnectionParameters(
                host='localhost'))
                
        # 建立一个会话,每个channel代表一个会话任务
        self.channel = self.connection.channel()
        
        # 声明回调队列,再次声明的原因是,服务器和客户端可能先后开启,该声明是幂等的,多次声明,但只生效一次
        result = self.channel.queue_declare(exclusive=True)
        # 将次队列指定为当前客户端的回调队列
        self.callback_queue = result.method.queue
        
        # 客户端订阅回调队列,当回调队列中有响应时,调用`on_response`方法对响应进行处理; 
        self.channel.basic_consume(self.on_response, no_ack=True,
                                   queue=self.callback_queue)

    # 对回调队列中的响应进行处理的函数
    def on_response(self, ch, method, props, body):
        if self.corr_id == props.correlation_id:
            self.response = body

    # 发出RPC请求
    def call(self, n):
    
        # 初始化 response
        self.response = None
        
        #生成correlation_id 
        self.corr_id = str(uuid.uuid4())
        
        # 发送RPC请求内容到RPC请求队列`rpc_queue`,同时发送的还有`reply_to`和`correlation_id`
        self.channel.basic_publish(exchange='',
                                   routing_key='rpc_queue',
                                   properties=pika.BasicProperties(
                                         reply_to = self.callback_queue,
                                         correlation_id = self.corr_id,
                                         ),
                                   body=str(n))
                                   
        while self.response is None:
            self.connection.process_data_events()
        return int(self.response)

# 建立客户端
fibonacci_rpc = FibonacciRpcClient()

# 发送RPC请求
print(" [x] Requesting fib(30)")
response = fibonacci_rpc.call(30)
print(" [.] Got %r" % response)

参考文章
https://www.cnblogs.com/yuanchenqi/articles/8507109.html

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 199,271评论 5 466
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 83,725评论 2 376
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 146,252评论 0 328
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 53,634评论 1 270
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 62,549评论 5 359
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 47,985评论 1 275
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,471评论 3 390
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,128评论 0 254
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,257评论 1 294
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,233评论 2 317
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,235评论 1 328
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,940评论 3 316
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,528评论 3 302
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,623评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,858评论 1 255
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,245评论 2 344
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 41,790评论 2 339

推荐阅读更多精彩内容

  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,559评论 18 139
  • 背景介绍 Kafka简介 Kafka是一种分布式的,基于发布/订阅的消息系统。主要设计目标如下: 以时间复杂度为O...
    高广超阅读 12,806评论 8 167
  • rabbitMQ是一款基于AMQP协议的消息中间件,它能够在应用之间提供可靠的消息传输。在易用性,扩展性,高可用性...
    点融黑帮阅读 2,977评论 3 41
  • 来源 RabbitMQ是用Erlang实现的一个高并发高可靠AMQP消息队列服务器。支持消息的持久化、事务、拥塞控...
    jiangmo阅读 10,337评论 2 34
  • 姓名:周小蓬 16019110037 转载自:http://blog.csdn.net/YChenFeng/art...
    aeytifiw阅读 34,695评论 13 425