Spring Cloud Stream 进阶配置——高吞吐量(二)——弹性消费者数量

ps:
1、本文示例使用的消息中间件为 Rabbitmq
2、示例代码是以测试用例的形式给出。
3、使用@ActiveProfiles( active_profile(s) ) 让指定配置生效。

背景

前文 Spring Cloud Stream 进阶配置——高吞吐量(一) 说到,最简单的提高消费端吞吐量的方式为:增加消费者数量,但具体增加到多少是很难确定的,因为各种各样的因素都会导致消费者数量冗余或不足,比如在用户访问高峰期,消费者数量肯定需要比较多,而在平时或凌晨,访问量降到低谷,消费者数量肯定就冗余了,甚至,一个队列只在某个定时任务才会用到,那么在平时多消费者基本就只有占用系统资源的作用了。

最大消费者数量

好在,Spring Cloud Stream 为我们提供了一个配置,用于控制 最大消费者数量,当然还有另一个隐藏的使用功能——动态创建/销毁消费者。配置为:spring.cloud.stream.rabbit.bindings.<channelName>.consumer. maxConcurrency,比如:spring.cloud.stream.rabbit.bindings.input.consumer. maxConcurrency。另外,值得注意的是,要区别于配置:spring.cloud.stream.bindings.<channelName>.consumer.concurrency,可以看到 最大消费者数量 配置的前缀多了 rabbit,因为该配置是只针对 Rabbitmq 的,其他的中间件如 Kafka 则没有。

ps: 不同的消息中间件,因为不同的架构,配置大多都不一样,所以针对那些中间件特有的属性配置,前缀需要多加一个 binder 类型前缀,即 rabbitkafka

示例

以下代码可在 源码 查看。

配置

spring:
  application:
    name: scas-data-collection
  profiles:
    active:
      default

  cloud:
    stream:
      binders:
        rabbit:
          type: rabbit
          environment:
            spring:
              rabbitmq:
                host: localhost
                port: 5672
                username: guest
                password: guest

      bindings:
        packetUplinkOutput:
          destination: packetUplinkTopic
          content-type: application/json
          binder: rabbit

        packetUplinkInput:
          destination: packetUplinkTopic
          content-type: application/json
          group: ${spring.application.name}
          binder: rabbit
          consumer:
            concurrency: 3 # 初始/最少/空闲时 消费者数量。默认1

      rabbit:
        bindings:
          packetUplinkInput:
            consumer:
              maxConcurrency: 10 # 默认:1。queue的消费者的最大数量。当前消费者数量不足以及时消费消息时, 会动态增加消费者数量, 直到到达最大数量, 即该配置的值.

代码

消息模型
@Data
@NoArgsConstructor
@AllArgsConstructor
public class PacketModel {
    /**
     * 设备 eui
     */
    private String devEui;

    /**
     * 数据
     */
    private String data;

    // 省略其他字段
}
测试用例
@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest
@ActiveProfiles("maxConcurrency")
@EnableBinding({ScasMaxConcurrencyTest.MessageSink.class, ScasMaxConcurrencyTest.MessageSource.class})
public class ScasMaxConcurrencyTest {

    @Autowired
    private PacketUplinkProducer packetUplinkProducer;

    private Random random = new Random();
    private List<String> devEuis = new ArrayList<>(10);

    @PostConstruct
    private void initDevEuis() {
        devEuis.add("10001");
        devEuis.add("10002");
        devEuis.add("10003");
        devEuis.add("10004");
        devEuis.add("10005");
        devEuis.add("10006");
        devEuis.add("10007");
        devEuis.add("10008");
        devEuis.add("10009");
        devEuis.add("10010");
    }

    @Test
    public void test() throws InterruptedException {

        // 启动时先睡眠20s, 方便在控制面板查看 初始消费者数量
        Thread.sleep(20000);

        for (int i = 0; i < 50000; i++) {
            String devEui = getDevEuis();
            packetUplinkProducer.publish(new PacketModel(devEui, UUID.randomUUID().toString()));
        }

        Thread.sleep(10000000);

    }

    private String getDevEuis() {
        return devEuis.get(random.nextInt(10));
    }

    @Component
    public static class PacketUplinkProducer {

        @Autowired
        private MessageSource messageSource;

        public void publish(PacketModel model) {
        log.info("发布上行数据包消息. model: [{}].", model);
            messageSource.packetUplinkOutput().send(MessageBuilder.withPayload(model).build());
        }

    }

    @Component
    public static class PacketUplinkHandler {

        @StreamListener("packetUplinkInput")
        public void handle(PacketModel model) throws InterruptedException {
            Thread.sleep(20);
            log.info("消费上行数据包消息. model: [{}].", model);
        }

    }

    public interface MessageSink {

        @Input("packetUplinkInput")
        SubscribableChannel packetUplinkInput();

    }

    public interface MessageSource {

        @Output("packetUplinkOutput")
        MessageChannel packetUplinkOutput();

    }

}

运行测试用例

运行测试用例后,访问 Rabbitmq可视化页面 可以看到类似下图的页面:

初始消费者数量

过一段时间后,消费者数量 则动态增加到6,如下图:


消费者数量增加到6

在过段时间会增加到30,即增加到消费者数量的最大值,如下图:


消费者数量增加到最大值

当堆积的消息被消费完,消费者处于空闲状态,于是会逐渐销毁空闲消费者,直到剩余消费者数量等于初始值(3):


开始销毁空闲消费者

销毁空闲消费者直到等于初始值

再次发布消息

因为程序是以测试用例的形式启动的,所以要想不破坏消费者数量,那么需要单独再增加一个测试用例,该测试用例单纯只有消息发布功能。如下:

配置
spring:
  application:
    name: scas-data-collection
  profiles:
    active:
      default

  cloud:
    stream:
      binders:
        rabbit:
          type: rabbit
          environment:
            spring:
              rabbitmq:
                host: localhost
                port: 5672
                username: guest
                password: guest

      bindings:
        packetUplinkOutput:
          destination: packetUplinkTopic
          content-type: application/json
          binder: rabbit
测试用例
@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest
@EnableBinding({ScasOnlyProducerTest.MessageSource.class})
@ActiveProfiles("onlyProducer")
public class ScasOnlyProducerTest {

    @Autowired
    private PacketUplinkProducer packetUplinkProducer;

    private Random random = new Random();
    private List<String> devEuis = new ArrayList<>(10);

    @PostConstruct
    private void initDevEuis() {
        devEuis.add("10001");
        devEuis.add("10002");
        devEuis.add("10003");
        devEuis.add("10004");
        devEuis.add("10005");
        devEuis.add("10006");
        devEuis.add("10007");
        devEuis.add("10008");
        devEuis.add("10009");
        devEuis.add("10010");
    }

    /**
     * 使用该测试用例, 之前降下来的消费者数量, 最后又会升上去
     * @throws InterruptedException
     */
    @Test
    public void test1() throws InterruptedException {

        while (true) {
            int msgCount = 10;
            for (int i = 0; i < msgCount; i++) {
                String devEui = getDevEuis();
                packetUplinkProducer.publish(new PacketModel(devEui, UUID.randomUUID().toString()));
            }
            Thread.sleep(1000);
        }
    }

    /**
     * 使用该测试用例, 消费者数量基本保持不变
     * @throws InterruptedException
     */
    @Test
    public void test2() throws InterruptedException {

        while (true) {
            int msgCount = 3;
            for (int i = 0; i < msgCount; i++) {
                String devEui = getDevEuis();
                packetUplinkProducer.publish(new PacketModel(devEui, UUID.randomUUID().toString()));
            }
            Thread.sleep(3000);
        }
    }

    private String getDevEuis() {
        return devEuis.get(random.nextInt(10));
    }

    @Component
    public static class PacketUplinkProducer {

        @Autowired
        private MessageSource messageSource;

        public void publish(PacketModel model) {
            log.info("发布上行数据包消息. model: [{}].", model);
            messageSource.packetUplinkOutput().send(MessageBuilder.withPayload(model).build());
        }

    }

    public interface MessageSource {

        @Output("packetUplinkOutput")
        MessageChannel packetUplinkOutput();

    }

}

注意,启动下文的测试用例时,之前的测试用例不能关闭,因为要验证动态增加/销毁消费者。

启动测试用例一
消费者数量再次逐渐增加

使用该测试用例, 之前降下来的消费者数量, 最后又会升上去。

启动测试用例二

先关闭上一个测试用例


消费者数量基本维持不变

通过上面2个测试用例的结果,可以推测:在某个时刻,当 消息发布速率 > 消息消费速率 时,如果当前 消费者数量 < 最大消费者数量,那么 Spring Cloud Stream 会帮我们创建新的消费者,直到 消费者数量 = 最大消费者数量。

结论

配置 spring.cloud.stream.rabbit.bindings.<channelName>.consumer. maxConcurrency,可以让应用拥有动态增加/销毁消费者的能力,达到动态控制消费者数量的目的。再配合 spring.cloud.stream.bindings.<channelName>.consumer.concurrency,可以将消费者数量控制在一定范围内。

另外,需要注意的是,maxConcurrency 并不是越大越好,因为该值越大,若一直有大量消息发布,那么 Spring Cloud Stream 则会一直增加消费者,知道消费者数量等于 maxConcurrency,而一个消费者对应了一个线程,线程太多的话,会加载系统的负担,从而可能影响到其他业务。

但是如果不增加消费者数量的话,而消息发布的吞吐量又居高不下,该怎么办呢?实际上,增加消费者数量,其最终目的就是提升消费端的消费力,那我们可以从别的方式入手,比如增加应用实例数量,其实这也是另一种增加消费者数量的方式,只是在其他机器而已;而如果暂时没有增加机器的预算,那么可以考虑另一种方案:Spring Cloud Stream 进阶配置——高吞吐量(三)

推荐阅读

Spring Cloud Stream 进阶配置——高吞吐量(一)——多消费者
Spring Cloud Stream 进阶配置——高吞吐量(三)——批量预取消息(prefetch)

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 199,711评论 5 468
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 83,932评论 2 376
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 146,770评论 0 330
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 53,799评论 1 271
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 62,697评论 5 359
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,069评论 1 276
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,535评论 3 390
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,200评论 0 254
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,353评论 1 294
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,290评论 2 317
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,331评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,020评论 3 315
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,610评论 3 303
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,694评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,927评论 1 255
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,330评论 2 346
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 41,904评论 2 341

推荐阅读更多精彩内容