【spring cloud 配置中心 + rabbit mq】网络断连恢复引起的配置无法动态刷新

最近帮别人看一个问题,其项目使用了 rabbit mq,一是业务代码使用;二是配合 spring cloud config & spring cloud bus做配置动态刷新。
在测试环境偶尔会瞬时的网络中断,在几秒内即恢复,但之后项目日志内会一直报 rabbit mq 的重连错误:一开始的报错是 :

#method<channel.close>(reply-code=405, reply-text=RESOURCE_LOCKED - cannot obtain exclusive access to locked queue 'springCloudBus.anonymous.xxxxx' in vhost '/', class-id=50, method-id=20)

过了一段时候后,报错变成 :

#method<channel.close>(reply-code=405, reply-text=NOT_FOUND.....

经过测试发现,业务队列收发没有收到影响,结合报错内容内的 springCloudBus.anonymous.xxxxxx ,明显是和 spring cloud bus 相关的队列,项目里用到的地方只有配置中心的配置动态刷新,一测果然配置无法动态刷新了。

原因分析:

第一个报错里,rabbit mq 返回了405 RESOURCE_LOCKED,搜一下就知道是因为 rabbit 排他性 队列的特性,通过查阅 spring cloud bus 源码确认了这一点(RabbitExchangeQueueProvisioner.provisionConsumerDestination):

@Override
    public ConsumerDestination provisionConsumerDestination(String name, String group,
            ExtendedConsumerProperties<RabbitConsumerProperties> properties) {
        boolean anonymous = !StringUtils.hasText(group);
        String  baseQueueName = anonymous ? groupedName(name, ANONYMOUS_GROUP_NAME_GENERATOR.generateName())
                    : properties.getExtension().isQueueNameGroupOnly() ? group : groupedName(name, group);
        if (this.logger.isInfoEnabled()) {
            this.logger.info("declaring queue for inbound: " + baseQueueName + ", bound to: " + name);
        }
        String prefix = properties.getExtension().getPrefix();
        final String exchangeName = applyPrefix(prefix, name);
        Exchange exchange = buildExchange(properties.getExtension(), exchangeName);
        if (properties.getExtension().isDeclareExchange()) {
            declareExchange(exchangeName, exchange);
        }
        String queueName = applyPrefix(prefix, baseQueueName);
        boolean partitioned = !anonymous && properties.isPartitioned();
        boolean durable = !anonymous && properties.getExtension().isDurableSubscription();
        Queue queue;
        if (anonymous) {
            queue = new Queue(queueName, false, true, true, queueArgs(queueName, properties.getExtension(), false));
        }
        else {
            if (partitioned) {
                String partitionSuffix = "-" + properties.getInstanceIndex();
                queueName += partitionSuffix;
            }
            if (durable) {
                queue = new Queue(queueName, true, false, false,
                        queueArgs(queueName, properties.getExtension(), false));
            }
            else {
                queue = new Queue(queueName, false, false, true,
                        queueArgs(queueName, properties.getExtension(), false));
            }
        }
        declareQueue(queueName, queue);
        Binding binding = null;
        if (properties.getExtension().isBindQueue()) {
            binding = declareConsumerBindings(name, properties, exchange, partitioned, queue);
        }
        if (durable) {
            autoBindDLQ(applyPrefix(properties.getExtension().getPrefix(), baseQueueName), queueName,
                    properties.getExtension());
        }
        return new RabbitConsumerDestination(queue, binding);
    }

可以看到 group 参数为空的时候,就会自动创建匿名的排他队列。

那么为什么第二个报错变成了 rabbit 返回 404 NOT FOUND?

首先,第一段报错和第二段报错的间隔一般很稳定,三次报 405 后就会变成 404,跟异常栈对应的源码,可以发现这段间隔对应的配置:

private int declarationRetries = 3;

这个字段是 spring 封装的 rabbit mq 包的 BlockingQueueConsumer 类,这个封装的消费者类,会在与指定队列绑定消费连接时,试图重声明队列,重试间隔默认 5000ms,在它试图重新声明这个匿名的排他队列时,会被无情的返回 405 拒绝连接,即使这个排他队列是它之前创建...
在三次重试过后差不多的时间点,这个队列会自动删除,没错,这个匿名队列不但是排他性 的,而且是 自动删除 的。在 rabbit 之后的消费者重连尝试中,就会返回 404 找不到指定队列的报错。

注意:需要区分 rabbit 消费者重连 重试和 队列重声明 重试机制。

  • 消费者重连 若不进行手动配置,在 RabbitAdmin 中就可以看到其实也是代码中写死的——5次尝试,但不管怎样肯定要比队列的重试周期长。

解决方案:

废了这么多话,各位很容易就能想出一个解决方案:把 队列重声明 的次数配置多一些就好了嘛,等旧的匿名排他队列自动删掉了,就可以正常的重声明出新的匿名排他队列了。那让我们看看怎么配、配完效果是什么?

尝试解决:加大 队列重声明 的次数

配置
在 github 的 spring cloud bus 的仓库并没有找到相关配置,但在 spring cloud stream binder rabbit仓库找到了:


该配置项是:spring.cloud.stream.rabbit.bingings.<channelName>.consumer.queue-declaration-retries,那么问题来了,这个 <channelName> 填什么???
只好从配置项注入代码附近入手,通过打断点在运行时的拿参数出来,可以知道 spring cloud bus 通过 spring cloud stream binder rabbit 创建队列的 channelName 属性值为 springCloudBusInput,那么带入配置一下,设一个很大数如999999即可。
结果
运行测试,依然在几次 405 报错后返回 404......
继续从源码分析入手,队列重声明调用的是 Channel.queueDeclarePassive 方法,搜了下发现用这个方法声明队列时,如果队列不存在就会报404....WTF....

最终解决:配置指定名称队列

那么现在还能怎样解决这个问题?第一次声明匿名队列的源码中有一段(RabbitExchangeQueueProvisioner.provisionConsumerDestination):

boolean anonymous = !StringUtils.hasText(group);
.......
if (anonymous) {
     queue = new Queue(queueName, false, true, true, queueArgs(queueName, properties.getExtension(), false));
}else {
     if (partitioned) {
         String partitionSuffix = "-" + properties.getInstanceIndex();
         queueName += partitionSuffix;
     }
     if (durable) {
         queue = new Queue(queueName, true, false, false, queueArgs(queueName, properties.getExtension(), false));
     }else {
         queue = new Queue(queueName, false, false, queueArgs(queueName, properties.getExtension(), false));
      }
}

可以看出非匿名队列是不会设置 排他性自动删除 的,而group 这个参数不为空时,就会用 group 为队列名进行声明。这个参数同样是可以配置的,同样在之前的github仓库中有介绍:

简言之:如果该值设为true,则使用 group 作为队列的名称。

为指定 channel 配置 group 没有找到可用配置,但可以通过设置一个全局默认 group 做到同样效果:

spring.cloud.stream.default.group=springCloudBus-${spring.application.name}-${spring.cloud.client.ip-address}-${server.port}

完整配置

spring:
  cloud:
    stream:
      default:
        group: ${spring.application.name}-${spring.cloud.client.ip-address}-${server.port}
      rabbit:
        bindings:
          springCloudBusInput:
            consumer:
              # 队列声明重试次数
              queue-declaration-retries: 2000
              # 重试间隔(ms)
              recovery-interval: 5000
              # 为true时,使用‘group’作为配置刷新队列的名称
              queue-name-group-only: true

经测试,网络断连恢复后,程序即时地恢复了消费连接,没有报错。

小小的感悟

其实整个问题解决下来,最后只用了一段配置就搞定了。官方文档里几十个配置项,一个个读下去每个都像是解决的方式,甚至有一些官方文档没介绍的配置,其实都直接可以写在配置文件里,启动时会自动注入。
总而言之,现在的开发工作很少离得开功能完善的开源库了,阅读源码是发现、解决问题的不二手段,甚至通过源码,你可以发现一些巧妙的间接的解决方式。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 206,482评论 6 481
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 88,377评论 2 382
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 152,762评论 0 342
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 55,273评论 1 279
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 64,289评论 5 373
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,046评论 1 285
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,351评论 3 400
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,988评论 0 259
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,476评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,948评论 2 324
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,064评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,712评论 4 323
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,261评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,264评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,486评论 1 262
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,511评论 2 354
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,802评论 2 345