概述
RabbitMQ是目前非常热门的一款消息中间件,不管是互联网行业还是传统行业都在大量地使用 。 RabbitMQ 凭借其高可靠、易扩展、高可用及丰富的功能特性受到越来越多企业的青睐
-
RabbitMQ的起源
RabbitMQ是采用Erlang语言实现AMQP (Advanced Message Queuing Protocol,高级消息队列协议)的消息中间件,它最初起源于金融系统,用于在分布式系统中存储转发消息
在此之前,有一些消息中间件的商业实现,比如微软的MSMQ(MicroSoftMessageQueue)、IBM的WebSphere 等。 由于高昂的价格,一般只应用于大型组织机构,它们需要可靠性、解藕及实时消息通信的功能。由于商业壁垒,商业MQ供应商想要解决应用互通的问题,而不是去创建标准来实现不同的MQ产品间的互通,或者允许应用程序更改MQ平台
为了打破这个壁垒,同时为了能够让消息在各个消息队列平台间互融互通, JMS CJava Message Service) 应运而生 。 JMS试图通过提供公共JavaAPI的方式,隐藏单独MQ产品供应商提供的实际接口,从而跨越了壁垒,以及解决了互通问题。从技术上讲,Java应用程序只需针对JMS API编程,选择合适的MQ驱动即可,JMS会打理好其他部分 。 ActiveMQ就是JMS的一种实现 。 不过尝试使用单独标准化接口来胶合众多不同的接口,最终会暴露出问题,使得应用程序变得更加脆弱。 所以急需一种新的消息通信标准化方案
在2006年6月,由Cisco、 Redhat、iMatix等联合制定了AMQP的公开标准,由此AMQP登上了历史的舞台 。 它是应用层协议的一个开放标准,以解决众多消息中间件的需求和拓扑结构问题 。 它为面向消息的中间件设计,基于此协议的客户端与消息中间件可传递消息,并不受产品、开发语言等条件的限制
RabbitMQ 最初版本实现了AMQP的一个关键特性:使用协议本身就可以对队列和交换器 (Exchange) 这样的资源进行配置 。 对于商业MQ供应商来说,资源配置需要通过管理终端的特定工具才能完成 。RabbitMQ的资源配置能力使其成为构建分布式应用的最完美的通信总线,特别有助于充分利用基于云的资源和进行快速开发 -
RabbitMQ特性
1.可靠性
RabbitMQ使用一些机制来保证可靠性, 如持久化、传输确认及发布确认等,在后续的文章中我们将深入讲述如何保证RabbitMQ消息的可靠性
2.灵活的路由
在消息进入队列之前,通过交换器来路由消息
对于典型的路由功能,RabbitMQ己经提供了一些内置的交换器来实现。针对更复杂的路由功能,可以将多个交换器绑定在一起,也可以通过插件机制来实现自己的交换器
3.扩展性
多个RabbitMQ节点可以组成一个集群,也可以根据实际业务情况动态地扩展 集群中节点
4.高可用
队列可以在集群中的机器上设置镜像,使得在部分节点出现问题的情况下队列仍然可用
5.多种协议
RabbitMQ除了原生支持AMQP协议,还支持STOMP, MQTT等多种消息中间件协议
6.多语言客户端
RabbitMQ几乎支持所有常用语言,比如 Java、 Python、 Ruby、 PHP、 C#、 JavaScript等
7.管理界面
RabbitMQ提供了一个易用的用户界面,使得用户可以监控和管理消息、集群中的节点等。
8.插件机制
RabbitMQ 提供了许多插件 , 以实现从多方面进行扩展,当然也可以编写自己的插件。
RabbitMQ的基本模型
所有MQ产品从模型抽象上来说都是一样的过程:生产者(producer)创建消息,然后发布到队列(queue)中,最后将消息发送到监听的消费者,消费者(consumer)来消费某个队列中的消息
上面只是一个宏观上抽象的描述,而具体到RabbitMQ则有更详细的内部结构:
想要理解RabbitMQ,我们需要认识一些基本概念:
-
1.消息(Message)
消息由标签(label)和消息体(payload)组成。
标签由一系列的可选属性组成,这些属性包括routing-key(路由键)、priority(相对于其他消息的优先权)、delivery-mode(指出该消息可能需要持久性存储)等
消息体一般是一个带有业务逻辑结构的数据,比如一个JSON字符串,当然也可以进一步对这个消息体进行序列化操作 -
2.生产者(Publisher/Producer)
创建消息的一方称为生产者,生产者把消息交由RabbitMQ,RabbitMQ之后会根据标签把消息发送给感兴趣的消费者 (Consumer) -
3.交换器(Exchange)
用来接收生产者发送的消息并负责将这些消息路由给服务器中的队列,如果路由不到,则返回给生产者,或直接丢弃(视具体配置而定) -
4.绑定(Binding)
RabbitMQ中通过绑定将交换器Exchange与队列Queue关联起来,在绑定的时候一般会指定一个绑定键 (BindingKey),这样 RabbitMQ就知道如何正确地将消息路由到队列了,如下图:
生产者将消息发送给交换器时,需要一个RoutingKey,当BindingKey和RoutingKey相匹配时, 消息会被路由到对应的队列中。在绑定多个队列到同一个交换器的时候, 这些绑定允许使用相同的BindingKey。BindingKey 并不是在所有的情况下都生效,它依赖于交换器类型 ,比如fanout类型的交换器就会无视 BindingKey,而是将消息路由到所有绑定到该交换器的队列中。
需要注意的是:在某些情形下 , RoutingKey 与 BindingKey 可以看作同 一个东西 。例如如下代码片段
/**
* FileName: MessageProducer
* Author: TP
* Date: 2019-06-07 18:56
* Description:消息生产者
*/
public class MessageProducer {
//交换器名称
private static final String EXCHANGE_NAME = "exchange_demo";
//路由键
private static final String ROUTING_KEY = "routingKey_demo";
//队列名称
private static final String QUEUE_NAME = "queue_demo";
//IP地址
private static final String IP_ADDRESS = "127.0.0.1";
//端口(RabbitMQ服务端口默认为5672)
private static final int PORT = 5672;
//用户名
private static final String USER_NAME = "admin";
//密码
private static final String PASS_WORD = "admin";
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
//创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(IP_ADDRESS);
connectionFactory.setPort(PORT);
connectionFactory.setUsername(USER_NAME);
connectionFactory.setPassword(PASS_WORD);
//获取连接
Connection connection = connectionFactory.newConnection();
//创建信道
Channel channel = connection.createChannel();
//创建一个类型为direct、持久化的、非自动删除的交换器
channel.exchangeDeclare(EXCHANGE_NAME, "direct", true, false, null);
//创建一个持久化、非排他、非自动删除的队列
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
//将交换机与队列通过路由键绑定(绑定key和路由key使用同一个)
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);
//发送一条持久化的消息:HELLO Rabbit MQ
String message = "HELLO Rabbit MQ";
channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
//关闭资源
channel.close();
connection.close();
}
}
以上代码声明了一个direct类型的交换器,然后将交换器和队列绑定起来。channel.queueBind方法中本该使用BindingKey,却和channel.basicPublish方法同样使用了RoutingKey,这样做的潜台词是:这里的RoutingKey和BindingKey是同一个东西。在direct交换器类型下, RoutingKey和BindingKey需要完全匹配才能使用,所以上面代码中采用了此种写法会显得方便许多。
所以在很多人的代码中可能习惯性地将BindingKey写成RoutingKey,尤其是在使用 direct类型的交换器的时候。
但是在topic交换器类型下,RoutingKey和BindingKey之间需要做模糊匹配,两者并不是相同的。
-
5.路由键(RoutingKey)
生产者将消息发给交换器的时候,一般会指定一个RoutingKey,用来指定这个消息的路由规则,而这个 RoutingKey需要与交换器类型和绑定键 (BindingKey) 联合使用才能最终生效。
在交换器类型和绑定键 (BindingKey) 固定的情况下,生产者可以在发送消息给交换器时, 通过指定 RoutingKey来决定消息流向哪里。 -
6.队列(Queue)
消息队列用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将其取走。 -
7.连接(Connection)
网络连接,比如一个TCP连接。 -
8.信道(Channel)
多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的TCP连接内地虚拟连接,AMQP 命令都是通过信道发出去的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过信道完成。因为对于操作系统来说建立和销毁 TCP 都是非常昂贵的开销,所以引入了信道的概念,以复用一条 TCP 连接。 -
9.消费者(Consumer)
消费消息的一方称为消费者,表示一个从消息队列中取得消息的客户端应用程序。 -
10.虚拟主机(Virtual Host)
虚拟主机,表示一批交换器、消息队列和相关对象。虚拟主机是共享相同的身份认证和加密环境的独立服务器域。每个 vhost 本质上就是一个mini版的RabbitMQ服务器,拥有自己的队列、交换器、绑定和权限机制。vhost是AMQP概念的基础,必须在连接时指定,RabbitMQ 默认的vhost是 / -
11.服务节点(Broker)
对于RabbitMQ来说,一个RabbitMQBroker可以简单地看作一个RabbitMQ服务节点, 或者RabbitMQ服务实例。 大多数情况下也可以将一个RabbitMQ Broker看作一台RabbitMQ服务器 。
整个消息队列的运转过程大致如下:
交换器(Exchange)类型
RabbitMQ 常用的交换器类型有 fanout、 direct、 topic、 headers 这四种 。 AMQP 协议里还提
到另外两种类型: System 和自定义,这里不予描述。对于这四种类型下面一一阐述。
-
fanout
这种类型会把所有发送到该交换器的消息路由到所有与该交换器绑定的队列中,很像子网广播,每台子网内的主机都获得了一份复制的消息,fanout 类型转发消息是最快的。
-
direct
direct类型的交换器路由规则也很简单,它会把消息路由到那些 BindingKey和 RoutingKey 完全匹配的队列中。
以下图为例:交换器的类型为direct,如果我们发送一条消息,并在发送消息的时候设置路由键为" warning",则消息会路由到Queuel和Queue2
如果在发送消息的时候设置路由键为"info" 或者 "debug",消息只会路由到Queue2。 如果以其他的路由键发送消息,则消息不会路由到这两个队列中。
-
topic
前面讲到direct类型的交换器路由规则是完全匹配BindingKey和RoutingKey,但是这种严格的匹配方式在很多情况下不能满足实际业务的需求。 topic类型的交换器在匹配规则上进行了扩展,它与direct类型的交换器相似,也是将消息路由到BindingKey和RoutingKey相匹配的队列中,但这里的匹配规则有些不同,它约定:
1.RoutingKey为一个点号"."分隔的字符串(被点号"."分隔开的每一段独立的字符串称为一个单词
如"com.rabbitmq.client","java.util.concurrent"、"com.hidden.client"
2. BindingKey和RoutingKey一样也是点号"."分隔的字符串
3. BindingKey中可以存在两种特殊字符串""和"#"用于做模糊匹配
其中""用于匹配一个单词,“#”用于匹配多个单词(可以是0个)
例如:
结果:
路由键为"com.rabbitmq.client" 的消息会同时路由到Queuel和Queue2
路由键为"com.hidden.client" 的消息只会路由到Queue2中
路由键为"com.hidden.demo" 的消息只会路由到Queue2中
路由键为 "java.rabbitmq.demo" 的消息只会路由到Queuel中
路由键为" java.util.concurrent" 的消息将会被丢弃或者返回给生产者(需要设置mandatory参数) ,因为它没有匹配任何路由键 headers
headers 类型的交换器不依赖于路由键的匹配规则来路由消息,而是根据发送的消息内容中
的headers属性进行匹配。在绑定队列和交换器时制定一组键值对,当发送消息到交换器时,RabbitMQ 会获取到该消息的headers(也是一个键值对的形式) ,对比其中的键值对是否完全匹配队列和交换器绑定时指定的键值对,如果完全匹配则消息会路由到该队列,否则不会路由到该队列。headers 类型的交换器性能会很差,而且也不实用,基本上不会看到它的存在。
RabbitMQ的运转流程
生产者端
- 生产者连接到RabbitMQ Broker,建立一个连接(Connection),开启一个信道 (Channel)
- 生产者声明一个交换器,并设置相关属性,比如交换机类型、是否持久化等
- 生产者声明一个队列井设置相关属性,比如是否排他、是否持久化、是否自动删除等
- 生产者通过路由键将交换器和队列绑定起来
- 生产者发送消息至 RabbitMQ Broker,其中包含路由键、交换器等信息
- 相应的交换器根据接收到的路由键查找相匹配的队列
- 如果找到,则将从 生产者发送过来的消息存入相应的队列中
- 如果没有找到,则根据生产者配置的属性选择丢弃还是回退给生产者
- 关闭信道
- 关闭连接
消费者端
- 消费者连接到 RabbitMQ Broker,建立一个连接(Connection),开启一个信道(Channel)。
- 消费者向RabbitMQ Broker请求消费相应队列中的消息,可能会设置相应的回调函数, 以及做一些准备工作
- 等待 RabbitMQ Broker回应并投递相应队列中的消息
- 消费者确认 (ack) 接收到的消息
- RabbitMQ从队列中删除相应己经被确认的消息
- 关闭信道
- 关闭连接
在接下来的文章中我们将讲解RabbitMQ的安装与演示Java客户端的使用