组件
Producer: 生产者生产消息
Consumer:消费者消费消息
Exchange:交换器,RabbitMQ 中的消息不会直接投递到 Queue 中,中间需要经过 Exchange,由 Exchange 把消息分配到对应的 Queue 中。如果 Producer 生成的消息路由不到对应的队列,可能会返回给 Producer 或者直接丢弃掉。
RoutingKey:路由键,生产者将消息发送给交换器时,一般会指定 RoutingKey,用来指定路由规则。RoutingKey 要与 BindingKey 联合使用才会最终生效。
BindingKey:绑定键
RabbitMQ 需要通过 Binding(绑定),将 Exchange 和 Queue 关联起来,在绑定时一般会指定 BindingKey。在 Producer 发送消息时,BindingKey 就是RoutingKey。
一个 Exchange 和多个 Queue 绑定时,允许指定相同的 BindingKey。
BindingKey 在某些情况下是不生效的,例如 fanout 类型的交换器会无视 BindingKey,将消息路由到绑定到这个交换器上的所有队列中。
Queue:消息队列
消息队列用来保存消息,直到发送给消费者,它是消息的容器,也是消息的终点。一条消息可以投递到一个或者多个消息队列,等待消费者将其消费掉。
多个消费者订阅同一个队列时,队列中的消息会被均摊给多个消费者,而不是每个消费者都能收到消息,这样避免消息重复被消费。
Broker:消息中间件服务节点,即 RabbitMQ 服务节点。
Exchange Types(交换器类型)
RabbitMQ 的交换机类型。
fanout:广播模式,它会把所有发送到 Exchange 的消息路由到所有与该 Exchange 绑定的 Queue,fanout 是速度最快的交换机。fanout 通常用来广播消息。
direct:直接路由,它会将消息路由到那些 BindingKey 与 RoutingKey 完全匹配的队列中。
topic:主题路由,BindingKey 和 RoutingKey 约定它们都是有点号"."分割的字符串。例如“test.rabbitmq.client”,BindingKey 和 RoutingKey 中可以存在两种特殊字符""和"#",用于模糊匹配。其中""匹配一个单词,即相邻两个点之间的内容,"#"批次零个或多个单词,例如"test.#"可以匹配以"test."开始的任意路由。
headers:消息头路由,它不依赖路由键进行路由,而是根据发送消息内容中的 headers 属性进行匹配,通过对比消息的 headers 属性与 Exchange 绑定 Queue 时指定的属性是否一致来进行路由。
headers 类型的交换器性能会很差,而且也不实用。
RabbitMQ 的一些机制
TTL:time to live,生存时间,是 RabbitMQ 支持的消息过期时间。
在发送消息时设置,通过配置消息体的 properties 指定消息的过期时间。
在创建 Exchange 时设置,从消息进入队列开始计算,超过超时时间,消息会自动清楚。
DLX(死信队列):dead-letter-exchange,当一个消息成为死信后,会被重新 publish 到另一个 exchange上,这个 exchange 就是死信队列。
死信队列是一个正常的 exchange,它能在任何队列上被指定。当这个队列中有死信,RabbitMQ 会自动将消息重新发布到设置的 exchange 上去,进而被路由到另一个队列。
消息变成死信的几种情况:
消息被拒绝(basic.reject/basic.nack)并且requeue=false
消息TTL过期
队列达到最大长度
死信队列使用:
在声明的 queue 中指定死信队列的 exchange
声明死信 exchange、queue 并绑定
消费端ACK、NACK、Reject:
ack 是手动确认消息被消费
nack 是拒绝确认消息被消费,一般可以通过 requeue 参数把消息重新返回到 Broker,一般来说,实际应用会设置为 false,不返回队列。
reject 是拒绝确认消息被消费,与 nack类似。
生产者Confirm机制:生产者投递消息后,如果 Broker 收到消息,会给生产者一个成功或者失败的应答,生产者根据应答来确认消息是否投递成功,以确定后续操作,重发或者记录日志。生产者Confirm机制,这是在投递方保证消息可靠性的核心。
return消息机制:当一些消息不可路由的时候,例如 exchange 不存在,或者路由键不存在,导致消息不可达,可以使用 return listener。通过chennel.addReturnListener(ReturnListener rl)传入已经重写过handleReturn方法的ReturnListener,处理不可达消息。
消费端消息的获取方式 pull/push:可以通过while循环 consumer.nextDelivery() 模拟进行pull消息,但是死循环会消耗CPU资源。也可以通过自定义 Consumer 等待 RabbitMQ 推送消息过来(现在默认使用 push 方式)。
持久化:queue、exchange、message 都支持持久化
持久化的 queue、exchange 在 RabbitMQ 的 Broker 重启后,queue 和 exchange 依然存在。
消息的持久化取决于消息本身的持久化模式,如果消息以持久化模式发布,会对性能造成一定的影响。
消费端限流:如果 RabbitMQ 瞬间将大量数据推给消费端,消费端无法处理,会导致消费端服务器被压垮。RabbitMQ 提供了 qos 功能,即在非自动确认消息的前提下,如果一定数量的消息没有被确认前,不进行消费新的消息。
顺序消息:顺序发送必须保证消息投递到相同的队列,且只能有一个消费者,每次只消费一条信息,手工ACK后,再处理下一条消息。
重复消费:保证消息不被重复消费的关键是保证消息的幂等性,
解决方案:
把消息作为唯一主键,落库到数据库,如果重复会发生主键冲突。
将消息set到redis中,set操作本身就是幂等。
用第三方介质做消费记录,例如给消息分配全局id,将消息 id,message 以 K-V 形式存入,消费前,先查询有没有消费记录。
RibbitMQ集群
普通集群:多个节点之间只共享元数据,即队列结构,消息实体只存在其中一个节点中。例如 rabbit01、rabbit02,当消息进入 rabbit01 后,consumer 连接到 rabbit02 进行消费,会临时在两个节点之间进行消息传输,所以 consumer 尽量连接每一个节点,从中取消息。同一个逻辑队列,要在多个节点建立物理 queue,否则物理连接谁,出口总在 rabbit01,会产生瓶颈。
当rabbit01节点故障后,rabbit02节点无法取到rabbit01节点中还未消费的消息实体。如果做了消息持久化,那么得等rabbit01节点恢复,然后才可被消费。如果没有持久化的话,就会产生消息丢失的现象。
镜像集群:在普通集群的基础上,把需要的队列做成镜像队列,消息实体会主动在镜像节点间同步,而不是在客户端取数据时临时拉取,也就是说多少节点消息就会备份多少份。该模式带来的副作用也很明显,除了降低系统性能外,如果镜像队列数量过多,加之大量的消息进入,集群内部的网络带宽将会被这种同步通讯大大消耗掉。所以在对可靠性要求较高的场合中适用。
作者:南橘ryc 链接:https://juejin.cn/post/6844904121917505550 来源:稀土掘金 著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。
保证消息的投递成功
确认应答机制(生产者Confirm机制)、消费端ACK、NACK、Reject
消息自动补偿机制,(return消息机制、死信队列)
持久化机制,exchange、queue、message 都可以持久化到磁盘
消息落库机制,将消息写入数据库持久化,同时对消息进行确认,保证消息消费成功
延迟投递机制,借助死信队列和TTL实现延迟投递,在TTL后,数据被投递进死信队列,然后再处理
保证消息的消费成功
1.消息消费确认应答模式,当消息消费成功后手动或者自动ACK,或者NACK,保证消息被成功消费
参考文章1:Rabbitmq 原理解析与使用