1. 结构
消息服务器包含以下角色:
- vhost:虚拟机,不同虚拟机内可以有各自独立的配置,如权限
- exchange:用于消息分发
- queue:缓存消息
- route:队列与exchange的绑定关系,exchange收到消息后根据消息的route-key将消息转发到相应的队列。
- channel: 在client与消息服务器之间的tcp连接中,开辟多个channel,每个线程使用其中一个channel,避免连接数过多。
2.工作流程
- producer产生消息,指定消息的exchange和routekey,将消息发到指定exchange
- exchange收到消息,匹配routekey的队列,将消息发送到队列。
- consumer从监听的队列获取消息并消费。
3. exchange
direct:发送到路由键完全匹配的队列
fanout:广播到当前exchange绑定的所有队列
topic:可使用通配符部分匹配route-key,将消息发送至匹配的队列。
4. 可靠性保证
1.broker确认:生产者到broker的可靠性保证。
2.broker持久化:防止broker宕机丢失数据。
3.消费者确认
消费者接收每一条消息后都必须进行确认,只有消费者确认了消息,RabbitMQ才能安全地把消息从队列中删除。这里并没有用到超时机制,RabbitMQ仅通过Consumer的连接中断来确认是否需 要重新发送消息。也就是说,只要连接不中断,RabbitMQ给了Consumer足够长 的时间来处理消息。保证数据的最终一致性。
消费者确认机制有两个须注意的问题:
(1) 如果消费者接收到消息,在确认之前断开了连接或取消订阅,RabbitMQ会 认为消息没有被分发,然后重新分发给下一个订阅的消费者。
(2) 如果消费者接收到消息却没有确认消息,连接也未断开,则RabbitMQ认为 该消费者繁忙,将不会给该消费者分发更多的消息。
确认和持久化不能完全保证可靠,broker中已确认的消息,在节点宕机数据无法恢复时,消息发生丢失。
解决方案:
- 方案1. 队列需要镜像,保证高可用
- 方案2. 消息补偿。发送、接收消息时,需要记录消息状态,检查是否被消费。
5.消息重复消费
第4节主要保证消息至少被消费一次,但由于重发机制,消息可能会重复消费。
解决方案:幂等性保证,消息中携带唯一业务id,消费时检查id是否已消费。
场景:
- 插入数据库:数据库建立unique key约束
- 非幂等的update操作:如扣款50等更新,先检查消息id是否已被消费。
6. 延迟队列
- ttl
可以对queue和message分别设置ttl,过期的消息称为dead letter。 - 死信队列
可以为死信指定交换机和route-key,重新将死信进行路由,发送到队列处理。 - 成为deal letter条件
- 有过期message
- 队列长度达到最大值
- 消息被拒绝且不再重新入队
7.工作模式
simple:一个队列一个消费者
worker:一个队列多个消费者,争抢消息
发布订阅:通过队列与exchange的绑定关系完成消息分发。
8.顺序消费
需求:生产者产生一组消息,组内消息需要被顺序消费。
方案:生产者到队列、队列到消费者都需要顺序发送,并在前一条消息处理完成之后返回信号,获取下一条消息。
producer -> queue:生产的消息需放入本地队列缓存,发送一条消息时加锁;broker收到消息并发送到队列后,返回确认;生产者收到确认后释放锁发送下一条消息。
queue -> consumer: 同样须加锁,收到消费者的确认后,释放锁发送下一条消息。
此时,一个队列只能有一个消费者。如果需要多个消费者,则需要建立多个队列。生产者根据消息分组id求哈希值,将同一分组的消息发送到相同队列。
9.集群
普通集群:某节点挂了,则节点上消息不可用,需重启节点。
镜像模式:同一队列在多个节点上建立数据备份