rabbitMq高级

保证消息的可靠性

发送者重连

有的时候由于网络波动,可能会出现发送者连接MQ失败的情况。通过配置我们可以开启连接失败后的重连机制:

spring:
  rabbitmg:
     connection-timeout: 1s #设置MQ的连接超时时间
     template:
        retry:
            enabled:true #开启超时重试机制
            initial-interval:1000ms#失败后的初始等待时间
            multiplier:1 #失败后下次的等待时长倍数,下次等待时长=         initial-interval * multiplier
            max-attempts:3#最大重试次数

注意:

当网络不稳定的时候,利用重试机制可以有效提高消息发送的成功率。不过SpringAMOP提供的重试机制是阻塞式的重试,也就是说多次重试等待的过程中,当前线程是被阻塞的,会影响业务性能。如果对于业务性能有要求,建议禁用重试机制。如果一定要使用,请合理配置等待时长和重试次数,当然也可以考虑使用异步线程来执行发送消息的代码。

发送者确认

SpringAMQP提供了Publisher Confirm和Publisher Return两种确认机制。开启确机制认后,当发送者发送消息给MO后,MO会返回确认结果给发送者。返回的结果有以下几种情况:

  • 消息投递到了MQ,但是路由失败。此时会通过PublisherReturn返回路由异常原因,然后返回ACK,告知投递成功
  • 临时消息投递到了MQ,并且入队成功,返回ACK,告知投递成功
  • 持久消息投递到了MQ,并且入队完成持久化,返回ACK,告知投递成功
  • 其它情况都会返回NACK,告知投递失败

SpringAMQP实现发送者确认

1.在publisher这个微服务的application.yml中添加配置:

spring:
  rabbitmg:
    publisher-confirm-type:correlated#开publisher confirm机制,并设  置confirm类型
    publisher-returns:true#开启publisher return机制

配置说明:

  • 这里publisher-confirm-type有三种模式可选:
  • none:关闭confirm机制
  • simple:同步阻塞等待MQ的回执消息
  • correlated:MO异步回调方式返回回执消息

SpringAMQP实现发送者确认

2.每个RabbitTemplate只能配置一个ReturnCallback,因此需要在项目启动过程中配置:

@slf4j
@AllArgsConstructor
@Configuration
public class MgConfig {
  private final RabbitTemplate rabbitTemplate;
  @Postconstruct
  public vaid init(){
    rabbfiTemplate,setReturnsCallback(new RabbitTemplate.ReturnsCallback(){
      @Override
      public void returnedMessage(ReturnedMessage returned{
          log.error("触发return callback,");
          log.debug("exchange:{}",returned.getExchange());
          log.debug("routingKey:{}",returned.getRoutingKey());
          log.debug("message:{}",returned.getMessage());
          log.debug("replyCode:{}",returned.getReplyCode());
          log.debug("replyText:{},returned.getReplyText());
      });
  }
}

SpringAMQP实现发送者确认

3发送消息,指定消息ID、消息ConfirmCallback

@Test
void testPublisherConfirm()throws InterruptedException {
    //1.创建CorrelationData
    CorrelationData cd = new CorrelationData(UUID.randomUUID().toString());
    //2.给Future活加confirmcallback
    cd.getFuture().addcallback(new ListenableFutureCallback<CorrelationData.Confirm>(){
        @Overridepublic void onFailure(Throwable ex){
            //2.1.Future发生异常时的处理逻辑,基本不会触发
            log.error("handle message ack fail",ex);
        }
        @Override
        public void onSuccess(CorrelationData.Confirm result){
            // 2.2.Future接收到回执的处理逻辑,参数中的result就是回执内容
            if(result.isAck()){// result.isAck(),boolean类型,true代表ack回执,false 代表 nack回执
                log.debug("发送消息成功,收到 ack!");
            }else{//result.getReason(),String类型,返回nack时的导常描述    
                log.error("发送消息失败,收到 nack,reason :{}",result.getReason());
            });
            //3.发送消息
            rabbitTemplate.convertAndSend("hmall.direct", "red1", "hello", cd);
}

MQ的可靠性

在默认情况下,RabbitMQ会将接收到的信息保存在内存中以降低消息收发的延迟。这样会导致两个问题:

  • 一旦MO宕机,内存中的消息会丢失
  • 内存空间有限,当消费者故障或处理过慢时,会导致消息积压,引发MO阻塞


数据持久化

RabbitMO实现数据持久化包括3个方面

交换机默认就是持久化

队列默认也是持久化

消息,第一个是非持久化,第二个是持久化

代码默认SpringAmqp默认是持久化,如果想非持久化的话,需要自定义构建消息。

//1.自定义构建消息
Message message =MessageBuilder
        .withBody("hello,springAMqp".getBytes(Standardcharsets.UTF_8))
        .setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT)
        .build();
// 2.发送消息
for(inti=0;i<1000000;i++){
        rabbitTemplate.convertAndSend( routingKey: "simple.queue", message);
}

推荐持久化:

  • 交换机持久化
  • 队列持久化
  • 消息持久化

Lazy Queue

从RabbitMQ的3.6.0版本开始,就增加了LazyQueue的概念,也就是惰性队列
惰性队列的特征如下:

  • 接收到消息后直接存入磁盘,不再存储到内存
  • 消费者要消费消息时才会从磁盘中读取并加载到内存(可以提前缓存部分消息到内存,最多2048条)
    在3.12版本后,所有队列都是LazyQueue模式,无法更改

Lazy Queue

要设置一个队列为惰性队列,只需要在声明队列时,指定x-queue-mode属性为lazy即可:


界面开启Lazy模式

代码开启Lazy模式

RabbitMQ如何保证消息的可靠性

  • 首先通过配置可以让交换机、队列、以及发送的消息都持久化。这样队列中的消息会持久化到磁盘,MO重启消息依然存在,
  • RabbitMQ在3.6版本引入了LazyQueue,并且在3.12版本后会称为队列的默认模式。LazyQueue会将所有消息都持久化。
  • 开启持久化和生产者确认时,RabbitM0只有在消息持久化完成后才会给生产者返回ACK回执

消费者确认机制

消费者确认机制(Consumer Acknowledgement)是为了确认消费者是否成功处理消息。当消费者处理消息结束后应该向RabbitMO发送一个回执,告知RabbitMQ自己消息处理状态:

  • ack:成功处理消息,RabbitMQ从队列中删除该消息
  • nack:消息处理失败,RabbitMO需要再次投递消息
  • reject:消息处理失败并拒绝该消息,RabbitMQ从队列中删除该消息


消费者确认机制

SpringAMOP已经实现了消息确认功能。并允许我们通过配置文件选择ACK处理方式,有三种方式:

  • none:不处理。即消息投递给消费者后立刻ack,消息会立刻从MQ删除。非常不安全,不建议使用
  • manual:手动模式。需要自己在业务代码中调用api,发送ackreject,存在业务入侵,但更灵活
  • auto:自动模式。SpringAMOP利用AOP对我们的消息处理逻辑做了环绕增强,当业务正常执行时则自动返回ack
    当业务出现异常时,根据异常判断返回不同结果:
  • 如果是业务异常,会自动返回nack
  • 如果是消息处理或校验异常,自动返回reject

失败重试机制

SpringAMOP提供了消费者失败重试机制,在消费者出现异常时利用本地重试,而不是无限的requeue到mg。我们可以通过在application.yaml文件中添加配置来开启重试机制:

spring:
    rabbitmg :
        listener:
            simple:
                prefetch: 1
                retry:
                    enabled:true #开启消费者失败重试
                    initial-interval:1000ms#初始的失败等待时长为1秒
                    multiplier:1 #下次失败的等待时长倍数,下次等待时长      =multiplier *last-interval
                    max-attempts:3#最大重试次数
                    stateless:true #true无状态:false有状态。如果业务中包含事务,这里改为false

失败消息处理策略

在开启重试模式后,重试次数耗尽,如果消息依然失败,则需要有MessageRecoverer接口来处理,它包含三种不同的实现:

  • RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息。默认就是这种方式
  • ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队
  • RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机


失败消息处理策略

将失败处理策略改为RepublishMessageRecoverer:

  • 首先,定义接收失败消息的交换机、队列及其绑定关系,此处略:
  • 然后,定义RepublishMessageRecoverer:
@Bean
public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate) {
      return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error"):
}

业务幂等性

幂等是一个数学概念,用函数表达式来描述是这样的:f(x)=f(f(x))。在程序开发中,则是指同一个业务,执A行一次或多次对业务状态的影响是一致的。


唯一消息id

方案一,是给每个消息都设置一个唯一id,利用id区分是否是重复消息

  • ①每一条消息都生成一个唯一的id,与消息一起投递给消费者。
  • ②消费者接收到消息后处理自己的业务,业务处理成功后将消息ID保存到数据库
  • ③如果下次又收到相同消息,去数据库查询判断是否存在,存在则为重复消息放弃处理。
@#Bean
public MessageConverter messageConverter(){
        // 1.定义消息转换器
        Jackson2JsonMessageConverter jimc = new Jackson2JsonMessageConverter();
        //2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息
        jjmc.setCreateMessageIds(true);
        return jjmc;
}

接收消息的时候

@RabbitListener(queues ="simple.queue")
public void listensimpleQueue(Message message){
        log.info("监听到simple.queue的消息:ID:【{}】",message.getMessageProperties().getMessageId());
        log.info("监听到simple.queue的消息:【{}】",new string(message.getBody()));
),

业务判断

方案二,是结合业务逻辑,基于业务本身做判断。以我们的余额支付业务为例:


应该改为:

如何保证支付服务与交易服务之间的订单状态一致性?

  • 首先,支付服务会正在用户支付成功以后利用MO消息通知交易服务完成订单状态同步
  • 其次,为了保证MO消息的可靠性,我们采用了生产者确认机制、消费者确认、消费者失败重试等策略,确保消息投递和处理的可靠性。同时也开启了MO的持久化,避免因服务宕机导致消息丢失。
  • 最后,我们还在交易服务更新订单状态时做了业务幂等判断,避免因消息重复消费导致订单状态异常。

如果交易服务消息处理失败,有没有什么兜底方案?

延迟消息

延迟消息:发送者发送消息时指定一个时间,消费者不会立刻收到消息,而是在指定时间之后才收到消息
延迟任务:设置在一定时间之后才执行的任务

用延迟消息作为兜底,实现一致性

死信交换机

当一个队列中的消息满足下列情况之一时,就会成为死信(dead letter):

  • 消费者使用basic.reject或 basic.nack声明消费失败,并且消息的requeue参数设置为false
  • 消息是一个过期消息(达到了队列或消息本身设置的过期时间),超时无人消费
  • 要投递的队列消息堆积满了,最早的消息可能成为死信
    如果队列通过dead-letter-exchange属性指定了一个交换机,那么该队列中的死信就会投递到这个交换机中。这个交换机称为死信交换机(Dead LetterExchange,简称DLX)。
    利用死信交换机机制实现延迟消息

绑定死信交换机代码:

@Bean
public Queue normalQueue(){
        return QueueBuilder
                    .durable( name: "normal.queue")
                    .deadLetterExchange( dlx: "dlx.direct") //死信交换机
                    .build();
}
@Test
void testSendDelayMessage(){
      rabbitTemplate,convertAndsend( exchange: "normal,direct", routingKey: "hi", message: "hello", message ->{
           message.getMessageProperties().setExpiration("10000");//设置过期时间ms
           return message;
      });
}

延迟消息插件

这个插件可以将普通交换机改造为支持延迟消息功能的交换机,当消息投递到交换机后可以暂存一定时间,到期后再投递到队列。


4.2.DelayExchange插件

基于死信队列虽然可以实现延迟消息,但是太麻烦了。因此RabbitMQ社区提供了一个延迟消息插件来实现相同的效果。
官方文档说明:
https://blog.rabbitmg.com/posts/2015/04/scheduling-messages-with-rabbitmg
Scheduling Messages with RabbitMO|RabbitMO -Blog
Menu features Get Started Support Community Docs Blog * Understanding memory use with RabbitM0 3.4 New Credit flow Setings onRabbitMQ 3.5.5 » Scheduling Messages with RabbitMQ Tweet Follow @RabbitMQ

4.2.1.下载

插件下载地址:
https://github.com/rabbitmg/rabbitmg-delayed-message-exchange
由于我们安装的MQ是 3.8版本,因此这里下载 3.8.17 版本:

4.2.2.安装

因为我们是基于Docker安装,所以需要先查看RabbitMQ的插件目录对应的数据卷。

Shell
l docker volume inspect mg-plugins

结果如下:

(
    {
      "CreatedAt":"2024-06-19T09:22:59+08:00",
      "Driver": "local",
      "Labels":null,
      "Mountpoint":"/var/lib/docker/volumes/mg-plugins/_data",
      "Name": "mg-plugins",
      "Options":null,
      "Scope":"local"
    }
)

插件目录被挂载到了 /var/lib/docker/volumes/mg-plugins/_data 这个目录,我们上传插件到该目录
接下来执行命令,安装插件:

Shell
| docker exec -it mg rabbitmg-plugins enable rabbitmg_delayed_message_exchange

延迟消息插件

这个插件可以将普通交换机改造为支持延迟消息功能的交换机,当消息投递到交换机后可以暂存一定时间,到期后再投递到队列。

@RabbitListener(bindings =QueueBinding(
        value =Oueue(name ="delay.queue",durable ="true"),
        exchange = @Exchange(name = "delay.direct", delayed = "true"),
        key ="delay"
))
public void listenDelayMessage(String msg){
      log.info("接收到delay.queue的延迟消息:{}",msg);
}
@Bean
public DirectExchange delayExchange(){
      return ExchangeBuilder
               .directExchange("delay.direct")
               .delayed()// 设置delay的属性为true
               .durable(true)//持久化
               .build();
}

延迟消息插件

发送消息时需要通过消息头x-delay来设置过期时间:

@Test
void testPublisherDelayMessage(){
        // 1.创建消息
        String message ="hello,delayed message";
        //2.发送消息,利用消息后置处理器添加消息头
        rabbitTemplate.convertAndSend("delay.direct", "delay", message, new MessagePostProcessor(){
            @Override
            public Message postProcessMessage(Message message)throws AmgpException{
                // 添加延迟消息属性
                message.getMessageProperties().setDelay(5000);
                return message;
            });
}
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 206,214评论 6 481
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 88,307评论 2 382
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 152,543评论 0 341
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 55,221评论 1 279
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 64,224评论 5 371
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,007评论 1 284
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,313评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,956评论 0 259
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,441评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,925评论 2 323
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,018评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,685评论 4 322
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,234评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,240评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,464评论 1 261
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,467评论 2 352
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,762评论 2 345

推荐阅读更多精彩内容

  • 楔子 本篇是消息队列RabbitMQ的第四弹。 RabbitMQ我已经写了三篇了,基础的收发消息和基础的概念我都已...
    前度天下阅读 346评论 0 0
  • RabbitMQ 高级特性 消息可靠性 我们可以从以下几方面来保证消息的可靠性: 客户端代码中的异常捕获,包括生产...
    左师兄zuosx阅读 580评论 0 0
  • 消息如何保障100%的投递成功? 什么是生产端的可靠性投递? 1、保障消息的成功发出 2、保障MQ节点的成功接收 ...
    小波同学阅读 733评论 0 9
  • 0. 前言 本文内容分为如下三部分RabbitMQ高级特性 消息可靠性投递Consumer ACK消费端限流TTL...
    abboo阅读 1,511评论 1 8
  • 消息可靠性投递 使用RabbitMQ的时候,作为消息发送方希望杜绝任何消息丢失或者投递失败场景,RabbitMQ为...
    爷爷的心里只有奶奶阅读 199评论 0 0