简介
RabbitMQ 是一个由 Erlang 语言(支持高并发)开发的 AMQP 的开源实现。
AMQP :Advanced Message Queue,高级消息队列协议。它是应用层协议的一个开放标准,为面向消息的中间件设计,基于此协议的客户端与消息中间件可传递消息,并不受产品、开发语言等条件的限制。
RabbitMQ 最初起源于金融系统,用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。具体特点包括:
可靠性(Reliability)
RabbitMQ 使用一些机制来保证可靠性,如持久化、传输确认、发布确认。灵活的路由(Flexible Routing)
在消息进入队列之前,通过 Exchange 来路由消息的。对于典型的路由功能,RabbitMQ 已经提供了一些内置的 Exchange 来实现。针对更复杂的路由功能,可以将多个 Exchange 绑定在一起,也通过插件机制实现自己的 Exchange 。消息集群(Clustering)
多个 RabbitMQ 服务器可以组成一个集群,形成一个逻辑 Broker 。高可用(Highly Available Queues)
队列可以在集群中的机器上进行镜像,使得在部分节点出问题的情况下队列仍然可用。多种协议(Multi-protocol)
RabbitMQ 支持多种消息队列协议,比如 STOMP、MQTT 等等。多语言客户端(Many Clients)
RabbitMQ 几乎支持所有常用语言,比如 Java、.NET、Ruby 等等。管理界面(Management UI)
RabbitMQ 提供了一个易用的用户界面,使得用户可以监控和管理消息 Broker 的许多方面。跟踪机制(Tracing)
如果消息异常,RabbitMQ 提供了消息跟踪机制,使用者可以找出发生了什么。插件机制(Plugin System)
RabbitMQ 提供了许多插件,来从多方面进行扩展,也可以编写自己的插件。
基本概念
- Message
消息,消息是不具名的,它由消息头和消息体组成。消息体是不透明的,而消息头则由一系列的可选属性组成,这些属性包括routing-key(路由键)、priority(相对于其他消息的优先权)、delivery-mode(指出该消息可能需要持久性存储)等。 - Publisher
消息的生产者,也是一个向交换器发布消息的客户端应用程序。 - Exchange
交换器,用来接收生产者发送的消息并将这些消息路由给服务器中的队列。 - Binding
绑定,用于消息队列和交换器之间的关联。一个绑定就是基于路由键将交换器和消息队列连接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表。 - Queue
消息队列,用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将其取走。 - Connection
网络连接,比如一个TCP连接。 - Channel
信道,多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的TCP连接内地虚拟连接,AMQP 命令都是通过信道发出去的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过信道完成。因为对于操作系统来说建立和销毁 TCP 都是非常昂贵的开销,所以引入了信道的概念,以复用一条 TCP 连接。 - Consumer
消息的消费者,表示一个从消息队列中取得消息的客户端应用程序。 - Virtual Host
虚拟主机,表示一批交换器、消息队列和相关对象。虚拟主机是共享相同的身份认证和加密环境的独立服务器域。每个 vhost 本质上就是一个 mini 版的 RabbitMQ 服务器,拥有自己的队列、交换器、绑定和权限机制。vhost 是 AMQP 概念的基础,必须在连接时指定,RabbitMQ 默认的 vhost 是 / 。 - Broker
表示消息队列服务器实体。
相比其他MQ,rabbit比较特殊的一点是消息并不直接投递到队列,而是通过交换机去路由队列
六种工作模式
-
简单模式:点对点
-
工作队列模式:多个消费者
-
发布订阅模式:一个交换机广播所有队列
-
路由模式:指定路由键
-
主题模式(路由模式的一种):通配符路由键
-
RPC模式:同步
消息确认机制
生产者确认
生产者发送消息到 RabbitMQ Server 后,RabbitMQ Server 需要对生产者进行消息 Confirm 确认
- 事务机制:类似数据库的事务,性能较低,一般不采用
- Confirm机制:一旦channel进入confirm模式,所有在该信道上面发布的消息都将会被指派一个唯一的ID(从1开始),一旦消息被投递到所有匹配的队列之后,rabbitMQ就会发送一个Ack给生产者(包含消息的唯一ID),这就使得生产者知道消息已经正确到达目的队列了,如果rabiitMQ没能处理该消息,则会发送一个Nack消息给你,你可以进行重试操作;确认可以是普通同步、批量同步或者异步,一般会选择异步接收回调
消费者确认
消费者消费消息后需要对 RabbitMQ Server 进行消息 ACK 确认
- 自动确认:这种模式下,消费者会自动确认收到信息。这时rahbitMQ会立即将消息删除,这种情况下如果消费者出现异常而没能处理该消息,就会丢失该消息
- 手动确认:ACK 或者NACK,如果绑定死信队列NACK且不重新入队则会消息可以进入死信队列,也可以选择NACK重新入队
可靠性
每种MQ都要从三个角度来分析:生产者弄丢数据、消息队列弄丢数据、消费者弄丢数据
生产者丢数据
从生产者弄丢数据这个角度来看,RabbitMQ提供transaction和confirm模式来确保生产者不丢消息。
transaction机制就是说,发送消息前,开启事务(channel.txSelect()),然后发送消息,如果发送过程中出现什么异常,事物就会回滚(channel.txRollback()),如果发送成功则提交事务(channel.txCommit())。然而缺点就是吞吐量下降了。生产上用confirm模式的居多。一旦channel进入confirm模式,所有在该信道上面发布的消息都将会被指派一个唯一的ID(从1开始),一旦消息被投递到所有匹配的队列之后,rabbitMQ就会发送一个Ack给生产者(包含消息的唯一ID),这就使得生产者知道消息已经正确到达目的队列了,如果rabiitMQ没能处理该消息,则会发送一个Nack消息给你,你可以进行重试操作。消息队列丢数据
处理消息队列丢数据的情况,一般是开启持久化磁盘的配置。这个持久化配置可以和confirm机制配合使用,你可以在消息持久化磁盘后,再给生产者发送一个Ack信号。这样,如果消息持久化磁盘之前,rabbitMQ阵亡了,那么生产者收不到Ack信号,生产者会自动重发。
如何持久化:
1、将queue的持久化标识durable设置为true,则代表是一个持久的队列
2、发送消息的时候将deliveryMode=2
这样设置以后,rabbitMQ就算挂了,重启后也能恢复数据消费者丢数据
消费者丢数据一般是因为采用了自动确认消息模式。这种模式下,消费者会自动确认收到信息。这时rahbitMQ会立即将消息删除,这种情况下如果消费者出现异常而没能处理该消息,就会丢失该消息。
至于解决方案,采用手动确认消息即可。
幂等性
自己根据唯一的业务主键或者消息ID利用数据库唯一索引,消费成功后就插入一条记录,或者业务本身就支持幂等,比如DELETE操作等等
顺序性
将需要保持先后顺序的消息放到同一个消息队列中,然后只用一个消费者去消费该队列
推拉模式
在rabbitmq中有两种消息处理的模式
- 推模式:消息中间件主动将消息推送给消费者——实时性、吞吐量高
- 拉模式:消费者主动从消息中间件拉取消息
推模式
1:推模式接收消息是最有效的一种消息处理方式。channel.basicConsume(queneName,consumer)方法将信道(channel)设置成投递模式,直到取消队列的订阅为止;在投递模式期间,当消息到达RabbitMQ时,RabbitMQ会自动地、不断地投递消息给匹配的消费者,而不需要消费端手动来拉取,当然投递消息的个数还是会受到channel.basicQos的限制。
2:推模式将消息提前推送给消费者,消费者必须设置一个缓冲区缓存这些消息。优点是消费者总是有一堆在内存中待处理的消息,所以当真正去消费消息时效率很高。缺点就是缓冲区可能会溢出。
3:由于推模式是信息到达RabbitMQ后,就会立即被投递给匹配的消费者,所以实时性非常好,消费者能及时得到最新的消息。
拉模式
1:如果只想从队列中获取单条消息而不是持续订阅,则可以使用channel.basicGet方法来进行消费消息。
2:拉模式在消费者需要时才去消息中间件拉取消息,这段网络开销会明显增加消息延迟,降低系统吞吐量。
3:由于拉模式需要消费者手动去RabbitMQ中拉取消息,所以实时性较差;消费者难以获取实时消息,具体什么时候能拿到新消息完全取决于消费者什么时候去拉取消息。
死信队列
“死信”是RabbitMQ中的一种消息机制,当你在消费消息时,如果队列里的消息出现以下情况:
- 消息被否定确认,使用 channel.basicNack 或 channel.basicReject ,并且此时requeue 属性被设置为false。
- 消息在队列的存活时间超过设置的TTL时间。
- 消息队列的消息数量已经超过最大队列长度。
- 那么该消息将成为“死信”。
“死信”消息会被RabbitMQ进行特殊处理,如果配置了死信队列信息,那么该消息将会被丢进死信队列中,如果没有配置,则该消息将会被丢弃。
配置:
- 配置业务队列,绑定到业务交换机上
- 为业务队列配置死信交换机和路由key
- 为死信交换机配置死信队列
实现延迟队列
利用消息TTL + 死信队列
延迟队列插件
安装一个插件即可:https://www.rabbitmq.com/community-plugins.html ,下载rabbitmq_delayed_message_exchange插件,然后解压放置到RabbitMQ的插件目录。进入RabbitMQ的安装目录下的sbin目录,执行下面命令让该插件生效,然后重启RabbitMQ。
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
延迟交换机
@Bean
public CustomExchange customExchange() {
Map<String, Object> args = new HashMap<>();
args.put("x-delayed-type", "direct");
return new CustomExchange(DELAYED_EXCHANGE_NAME, "x-delayed-message", true, false, args);
}
部署模式
普通模式
消息实体仅保存在其中一个节点上,如果通过其他节点进行消费,消息由所在节点转发至该节点
镜像模式
消息实体会主动在镜像节点间同步,而不是在客户端取数据时临时拉取,也就是说多少节点消息就会备份多少份。该模式带来的副作用也很明显,除了降低系统性能外,如果镜像队列数量过多,加之大量的消息进入,集群内部的网络带宽将会被这种同步通讯大大消耗掉。所以在对可靠性要求较高的场合中适用
由于镜像队列之间消息自动同步,且内部有选举master机制,即使master节点宕机也不会影响整个集群的使用,达到去中心化的目的,从而有效的防止消息丢失及服务不可用等问题