【RabbitMQ-9】自定义配置线程池(线程池资源不足-MQ初始化队列&&MQ动态扩容影响)

1. 源码注意点

源码一:启动消费者

@Override 
protected void doStart() {
    checkListenerContainerAware();
    super.doStart();
    synchronized(this.consumersMonitor) {
        if (this.consumers != null) {
            throw new IllegalStateException("A stopped container should not have consumers");
        }
        int newConsumers = initializeConsumers();
        if (this.consumers == null) {
            logger.info("Consumers were initialized and then cleared " + "(presumably the container was stopped concurrently)");
            return;
        }
        if (newConsumers <= 0) {
            if (logger.isInfoEnabled()) {
                logger.info("Consumers are already running");
            }
            return;
        }
        Set < AsyncMessageProcessingConsumer > processors = new HashSet < AsyncMessageProcessingConsumer > ();
        //每一个消费者,创建ConcurrentConsumers个线程
        for (BlockingQueueConsumer consumer: this.consumers) {
            AsyncMessageProcessingConsumer processor = new AsyncMessageProcessingConsumer(consumer);
            processors.add(processor);
            //使用配置的线程池去开启线程
            //(便会执行run方法,run方法中启动成功会使得processor的CountDownLatch-1)
            getTaskExecutor().execute(processor);
            if (getApplicationEventPublisher() != null) {
                getApplicationEventPublisher().publishEvent(new AsyncConsumerStartedEvent(this, consumer));
            }
        }
        //判断所有的线程是否执行run()方法启动消费者成功?没有成功的话,阻塞,直至所有消费者成功。
        waitForConsumersToStart(processors);
    }
}

若核心线程数满了,但是依旧有消费者等待启动,那么会在waitForConsumersToStart阻塞。

源码二:串行阻塞

private void waitForConsumersToStart(Set < AsyncMessageProcessingConsumer > processors) {
    for (AsyncMessageProcessingConsumer processor: processors) {
        FatalListenerStartupException startupException = null;
        try {
            startupException = processor.getStartupException();
        } catch(TimeoutException e) {
            throw RabbitExceptionTranslator.convertRabbitAccessException(e);
        } catch(InterruptedException e) {
            Thread.currentThread().interrupt();
            throw RabbitExceptionTranslator.convertRabbitAccessException(e);
        }
        if (startupException != null) {
            throw new AmqpIllegalStateException("Fatal exception on listener startup", startupException);
        }
    }
}

源码三:使用CountDownLatch阻塞

private FatalListenerStartupException getStartupException() throws TimeoutException,
InterruptedException {
    if (!this.start.await(SimpleMessageListenerContainer.this.consumerStartTimeout, TimeUnit.MILLISECONDS)) {
        logger.error("Consumer failed to start in " + SimpleMessageListenerContainer.this.consumerStartTimeout + " milliseconds; does the task executor have enough threads to support the container " + "concurrency?");
    }
    return this.startupException;
}

启动的线程是串行的阻塞。

例如:线程池只存在1个线程,但某个队列消费者需要10个线程。

  1. 创建消费者线程;
  2. 使用配置的线程池启动消费者;
  3. 发布创建消费者的消息;
  4. 串行阻塞判断所有消费者是否创建完毕(默认60s);
  5. 理论是等待9*60s的时间,唯一的消费者才会开始执行;

注意点:

  1. 队列抢占线程池线程顺序是按队列初始化顺序决定的,即先初始化的队列先占用线程池资源。若线程不足,MQ打印Consumer failed to start in 60000 milliseconds; does the task executor have enough threads to support the container concurrency?信息。

  2. 配置的线程池资源被消费者占用后,是不会被释放的,while循环会一直监听MQ消息。

配置MQ的线程池不应该配置阻塞队列,因为getTaskExecutor().execute(processor);使用线程池启动线程,若核心线程满了之后,会使用阻塞队列。而使用阻塞队列,会导致消费者不能被启动。

2. 实现方式

配置线程池模式:

@Slf4j
@Configuration
public class RabbitConfig {

    @Autowired
    private CachingConnectionFactory connectionFactory;

        @Bean(name = "singleListenerContainer")
    public SimpleRabbitListenerContainerFactory listenerContainerFactory() {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        /* setConnectionFactory:设置spring-amqp的ConnectionFactory。 */
        factory.setConnectionFactory(connectionFactory);
        factory.setConcurrentConsumers(2);
        factory.setPrefetchCount(1);
        factory.setDefaultRequeueRejected(true);
        //使用自定义线程池来启动消费者。
        factory.setTaskExecutor(taskExecutor());
        factory.setAcknowledgeMode(AcknowledgeMode.AUTO);
        return factory;
    }


    @Bean("correctTaskExecutor")
    @Primary
    public TaskExecutor taskExecutor() {
        ThreadPoolTaskExecutor executor = new MyThreadPoolTaskExecutor();
        // 设置核心线程数
        executor.setCorePoolSize(100);
        // 设置最大线程数
        executor.setMaxPoolSize(100);
        // 设置队列容量
        executor.setQueueCapacity(0);
        // 设置线程活跃时间(秒)
        executor.setKeepAliveSeconds(300);
        // 设置默认线程名称
        executor.setThreadNamePrefix("thread-xx-");
        // 设置拒绝策略rejection-policy:当pool已经达到max size的时候,丢弃
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
        // 等待所有任务结束后再关闭线程池
        executor.setWaitForTasksToCompleteOnShutdown(true);
        return executor;
    }

}

3. MQ自动扩容的影响

上面说到,mq在启动时创建消费者时由于线程池资源不足,会导致阻塞(影响该queue的消费消息)。

那么若是代码中配置了factory.setMaxConcurrentConsumers(2);,扩容时发现线程池资源不足,有什么影响呢?

3.1 源码分析

  1. 消费者线程循环的消费消息

源码位置org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.AsyncMessageProcessingConsumer#run

@Override // NOSONAR - complexity - many catch blocks
public void run() { // NOSONAR - line count
    if (!isActive()) {
        return;
    }
    ...
    try {
        initialize();
        //每个消费者线程循环的去获取消息
        while (isActive(this.consumer) || this.consumer.hasDelivery() || !this.consumer.cancelled()) {
            mainLoop();
        }
    } ...
}
  1. 循环体的操作

注意receiveAndExecute()方法的返回值是checkAdjust()方法的请求参数,那么理解MQ动态扩容,就必须先明白receiveAndExecute()的逻辑以及返回值的含义。

private void mainLoop() throws Exception { // NOSONAR Exception
    try {
        //该方法是获取消息,并执行业务操作(并发送ACK或NACK到MQ)。返回值true表示已经消费消息;false表示未获取到消息
        boolean receivedOk = receiveAndExecute(this.consumer); // At least one message received
        //判断是否配置了maxConcurrentConsumers,是否进行动态扩容
        if (SimpleMessageListenerContainer.this.maxConcurrentConsumers != null) {
            checkAdjust(receivedOk);
        }
        ...
    }
}

2.1 receiveAndExecute—业务逻辑

该方法会执行业务逻辑,并发送ACK或NACK到MQ中。完成一个消息的消费。
但是即使发送ACK后,依旧在mainLoop()循环中,需要完成后续逻辑才能消费下一个消息。(注:不是向MQ发送ACK或NACK后立即去消费后续消息!!!)

private boolean receiveAndExecute(final BlockingQueueConsumer consumer) throws Exception { // NOSONAR
    PlatformTransactionManager transactionManager = getTransactionManager();
    if (transactionManager != null) {...事务操作,不关注
    }
    //接受消息并进行处理
    return doReceiveAndExecute(consumer);
}

若是执行nextMessage()没有获取到消息,那么执行break操作,最终会导致上面的receiveAndExecute()方法返回false。而receiveAndExecute()的值可以决定是否动态扩容

private boolean doReceiveAndExecute(BlockingQueueConsumer consumer) throws Exception { //NOSONAR
    Channel channel = consumer.getChannel();
    //默认txSize=1
    for (int i = 0; i < this.txSize; i++) {

        //在内存队列中获取消息
        Message message = consumer.nextMessage(this.receiveTimeout);
        //未获取到消息,开始下次循环
        if (message == null) {
            break;
        }
        try {
            //执行业务逻辑  
            executeListener(channel, message);
        } 
       ...catch操作,不关注
    }
    //没有获取到消息,这个方法会返回false
    return consumer.commitIfNecessary(isChannelLocallyTransacted());
}
public boolean commitIfNecessary(boolean locallyTransacted) throws IOException {
    //此处直接返回false
    if (this.deliveryTags.isEmpty()) {
        return false;
    }
    ...
    return true;
}

在内存中获取消息

由于配置了setPrefetchCount参数,所以内存会去MQ中预取配置的消息数,放到本地的BlockingQueue中。
配置详见:
【RabbitMQ-2】RabbitMQ的并发参数(concurrency和prefetch)

未获取到消息

public Message nextMessage(long timeout) throws InterruptedException,
ShutdownSignalException {
    if (logger.isTraceEnabled()) {
        logger.trace("Retrieving delivery for " + this);
    }
    checkShutdown();
    if (this.missingQueues.size() > 0) {
        checkMissingQueues();
    }
    //poll的API描述:检索并删除此队列的头,等待指定的等待时间(如有必要)使元素变为可用。
    Message message = handle(this.queue.poll(timeout, TimeUnit.MILLISECONDS));
    //cancelled默认false不会执行改逻辑
    if (message == null && this.cancelled.get()) {
        throw new ConsumerCancelledException();
    }
    //未获取到消息返回null。
    return message;
}

2.2 checkAdjust()—动态扩容业务

由上面源码可以若是没有获取到消息,receivedOk返回false(注:若是获取到消息,但是NACK,receivedOk返回值依旧是true)。

如何保证是连续获取或者连续空转的?
答案:因为mainloop()一直循环,每次均在本地queue获取消息(最长阻塞1s)。若连续9次均未获取到消息,第10次获取到消息,那么会重置consecutiveIdles=0

private void checkAdjust(boolean receivedOk) {
    //成功获取到消息
    if (receivedOk) {
        if (isActive(this.consumer)) {
            //连续空转标识设置为0
            this.consecutiveIdles = 0;
            //consecutiveActiveTrigger默认为10
            if (this.consecutiveMessages++>SimpleMessageListenerContainer.this.consecutiveActiveTrigger) {
               //开启一个消费者线程
                considerAddingAConsumer();
                //练习消费的标识设置为0
                this.consecutiveMessages = 0;
            }
        }
    } else {
        this.consecutiveMessages = 0;
        if (this.consecutiveIdles++>SimpleMessageListenerContainer.this.consecutiveIdleTrigger) {
            considerStoppingAConsumer(this.consumer);
            this.consecutiveIdles = 0;
        }
    }
}

开启一个消费者线程:

private void considerAddingAConsumer() {
    //加锁
    synchronized(this.consumersMonitor) {
      //若是当前consumers数量小于配置maxConcurrentConsumers
        if (this.consumers != null && this.maxConcurrentConsumers != null && this.consumers.size() < this.maxConcurrentConsumers) {
            long now = System.currentTimeMillis();
            //开启消费者有间隔时间
            if (this.lastConsumerStarted + this.startConsumerMinInterval < now) {
                //增加消费者。
                this.addAndStartConsumers(1);
                this.lastConsumerStarted = now;
            }
        }
    }
}

开启消费者的操作

protected void addAndStartConsumers(int delta) {
    synchronized(this.consumersMonitor) {
        if (this.consumers != null) {
            //每一次循环均是创建一个消费者
            for (int i = 0; i < delta; i++) {
                 //判断是否创建消费者
                if (this.maxConcurrentConsumers != null && this.consumers.size() >= this.maxConcurrentConsumers) {
                    break;
                }
                //创建消费者
                BlockingQueueConsumer consumer = createBlockingQueueConsumer();
                //(核心)属性的consumers+1
                this.consumers.add(consumer);
                AsyncMessageProcessingConsumer processor = new AsyncMessageProcessingConsumer(consumer);
                if (logger.isDebugEnabled()) {
                    logger.debug("Starting a new consumer: " + consumer);
                }  
                //使用内部的线程池执行
                getTaskExecutor().execute(processor);
                //发布创建消费者事件
                if (this.getApplicationEventPublisher() != null) {
                    this.getApplicationEventPublisher().publishEvent(new AsyncConsumerStartedEvent(this, consumer));
                }
                try {
                    //线程执行完run方法后,线程中的CountDownLatch-1。
                    //若线程池没有资源,那么会在此处阻塞(默认60s)
                    //阻塞完毕,startupException返回null。即成功创建
                    FatalListenerStartupException startupException = processor.getStartupException();
                    //若是线程池资源不足,只是返回null,不会执行下面分支。
                    if (startupException != null) {
                        this.consumers.remove(consumer);
                        throw new AmqpIllegalStateException("Fatal exception on listener startup", startupException);
                    }
                } catch(InterruptedException ie) {
                    Thread.currentThread().interrupt();
                } catch(Exception e) {
                    consumer.stop();
                    logger.error("Error starting new consumer", e);
                    this.cancellationLock.release(consumer);
                    this.consumers.remove(consumer);
                }
            }
        }
    }
}
  1. 上述代码中,创建消费者线程是同步的流程,即某个消费者线程加锁去创建。某创建时线程池没有资源,会阻塞消费者线程。
  2. 若线程池没有资源,阻塞完毕后,只是打印异常日志,并抛出异常,此时内存中消费者个数为n+1个,但是只有n个线程可以消费消息
  3. 当连续10次空转时consecutiveIdles =10,且消费者线程n+1,会回收临时扩展的消费者线程。
private void considerStoppingAConsumer(BlockingQueueConsumer consumer) {
    synchronized(this.consumersMonitor) {
        if (this.consumers != null && this.consumers.size() > this.concurrentConsumers) {
            long now = System.currentTimeMillis();
            if (this.lastConsumerStopped + this.stopConsumerMinInterval < now) {
                //回收消费者的核心方式
                consumer.basicCancel(true);
                //本地消费者集合移除消费者
                this.consumers.remove(consumer);
                if (logger.isDebugEnabled()) {
                    logger.debug("Idle consumer terminating: " + consumer);
                }
                this.lastConsumerStopped = now;
            }
        }
    }
}

上面说到,内存中的消费者数量n+1,但是有效的消费者n个。当回收消费者时会回收有效的消费者使得内存消费者数量n个,有效消费者数量n-1个。

若是线程池资源不足,且配置了消费者动态扩展参数后,最终会导致有效的消费者数量为0,导致消息的大量积压!!!

注:RabbitMQ使用默认的new SimpleAsyncTaskExecutor()开启消费者线程,即每当使用线程是,均是new出来的。

总结:不推荐使用自定义配置的线程池,若使用,每次增加队列时均需要注意配置好线程数。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,590评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 86,808评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,151评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,779评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,773评论 5 367
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,656评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,022评论 3 398
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,678评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 41,038评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,659评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,756评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,411评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,005评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,973评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,203评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,053评论 2 350
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,495评论 2 343

推荐阅读更多精彩内容