1. Source code notes
Source 1: starting the consumers
@Override
protected void doStart() {
checkListenerContainerAware();
super.doStart();
synchronized(this.consumersMonitor) {
if (this.consumers != null) {
throw new IllegalStateException("A stopped container should not have consumers");
}
int newConsumers = initializeConsumers();
if (this.consumers == null) {
logger.info("Consumers were initialized and then cleared " + "(presumably the container was stopped concurrently)");
return;
}
if (newConsumers <= 0) {
if (logger.isInfoEnabled()) {
logger.info("Consumers are already running");
}
return;
}
Set < AsyncMessageProcessingConsumer > processors = new HashSet < AsyncMessageProcessingConsumer > ();
//One processor (and therefore one thread) is created per consumer; the container holds concurrentConsumers consumers
for (BlockingQueueConsumer consumer: this.consumers) {
AsyncMessageProcessingConsumer processor = new AsyncMessageProcessingConsumer(consumer);
processors.add(processor);
//Start the thread on the configured thread pool
//(this invokes run(); once startup succeeds, run() counts down the processor's CountDownLatch)
getTaskExecutor().execute(processor);
if (getApplicationEventPublisher() != null) {
getApplicationEventPublisher().publishEvent(new AsyncConsumerStartedEvent(this, consumer));
}
}
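//Check whether every thread has executed run() and started its consumer. If not, block until each consumer has started or its start timeout elapses.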
waitForConsumersToStart(processors);
}
}
If the core threads are all occupied while consumers are still waiting to start, startup blocks in waitForConsumersToStart.
Source 2: serial blocking
private void waitForConsumersToStart(Set < AsyncMessageProcessingConsumer > processors) {
for (AsyncMessageProcessingConsumer processor: processors) {
FatalListenerStartupException startupException = null;
try {
startupException = processor.getStartupException();
} catch(TimeoutException e) {
throw RabbitExceptionTranslator.convertRabbitAccessException(e);
} catch(InterruptedException e) {
Thread.currentThread().interrupt();
throw RabbitExceptionTranslator.convertRabbitAccessException(e);
}
if (startupException != null) {
throw new AmqpIllegalStateException("Fatal exception on listener startup", startupException);
}
}
}
Source 3: blocking with CountDownLatch
private FatalListenerStartupException getStartupException() throws TimeoutException,
InterruptedException {
if (!this.start.await(SimpleMessageListenerContainer.this.consumerStartTimeout, TimeUnit.MILLISECONDS)) {
logger.error("Consumer failed to start in " + SimpleMessageListenerContainer.this.consumerStartTimeout + " milliseconds; does the task executor have enough threads to support the container " + "concurrency?");
}
return this.startupException;
}
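The thread performing startup blocks serially, waiting on one consumer after another.
Example: the thread pool has only 1 thread, but the container for some queue needs 10 consumer threads.
- The consumer processors are created;
- the configured thread pool is used to start them;
- a consumer-started event is published for each one;
- the container then waits serially for each consumer to start (the default timeout is 60 s per consumer);
- in theory, startup blocks for 9 × 60 s, and in the end only one consumer is actually running.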
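The serial wait described above can be reproduced with plain java.util.concurrent classes. The following is a minimal sketch, not Spring AMQP code: the Processor class and the startAndWait method are hypothetical stand-ins for AsyncMessageProcessingConsumer and waitForConsumersToStart, and the timeout is shortened to 100 ms so the demo finishes quickly.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class SerialStartupWaitDemo {

    /** Hypothetical stand-in for AsyncMessageProcessingConsumer. */
    static class Processor implements Runnable {
        final CountDownLatch start = new CountDownLatch(1);

        @Override
        public void run() {
            start.countDown(); // signal "consumer started"
            try {
                Thread.sleep(Long.MAX_VALUE); // hold the pool thread, like the consume loop
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /** Starts the processors, waits for each in turn, and returns how many started. */
    static int startAndWait(int poolSize, int consumers, long timeoutMillis) {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        List<Processor> processors = new ArrayList<>();
        for (int i = 0; i < consumers; i++) {
            Processor processor = new Processor();
            processors.add(processor);
            pool.execute(processor);
        }
        int started = 0;
        try {
            for (Processor processor : processors) {
                // Serial wait: every processor that never starts costs one full timeout.
                if (processor.start.await(timeoutMillis, TimeUnit.MILLISECONDS)) {
                    started++;
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            pool.shutdownNow(); // interrupts the running processor and drops queued ones
        }
        return started;
    }

    public static void main(String[] args) {
        long begin = System.nanoTime();
        int started = startAndWait(1, 10, 100);
        long elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - begin);
        System.out.println("started=" + started + ", waited about " + elapsedMs + " ms");
    }
}
```

With 1 pool thread and 10 processors, only one processor starts and the caller pays roughly 9 timeouts in sequence, which is the same arithmetic as the 9 × 60 s estimate.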
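Notes:
Queues claim thread-pool threads in the order in which they are initialized: the queue initialized first takes pool threads first. If there are not enough threads, the following message is logged:
Consumer failed to start in 60000 milliseconds; does the task executor have enough threads to support the container concurrency?
Once a consumer occupies a thread from the configured pool, that thread is not released: its while loop keeps listening for MQ messages.
The thread pool configured for MQ should not have a blocking queue, because getTaskExecutor().execute(processor); starts consumers through the pool. Once the core threads are full, new tasks go into the blocking queue, and a consumer parked in that queue is never started.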
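The difference between a queued pool and a queue-less pool can be seen with the JDK's ThreadPoolExecutor alone. The sketch below is illustrative and not taken from Spring AMQP: the class and method names are made up, and it uses the JDK's default AbortPolicy rather than DiscardPolicy so that a rejected task is visible as an exception.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class ExecutorQueueDemo {

    /** Submits never-ending tasks and returns {started, rejected}. */
    static int[] submit(ThreadPoolExecutor pool, int tasks) {
        AtomicInteger started = new AtomicInteger();
        int rejected = 0;
        for (int i = 0; i < tasks; i++) {
            try {
                pool.execute(() -> {
                    started.incrementAndGet();
                    try {
                        Thread.sleep(Long.MAX_VALUE); // never gives the thread back
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            } catch (RejectedExecutionException e) {
                rejected++;
            }
        }
        try {
            Thread.sleep(300); // give accepted tasks time to begin
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        int startedNow = started.get();
        pool.shutdownNow();
        return new int[] {startedNow, rejected};
    }

    /** One thread plus an unbounded queue: extra tasks wait silently. */
    static ThreadPoolExecutor queuedPool() {
        return new ThreadPoolExecutor(1, 1, 0L, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
    }

    /** One thread and no queue: extra tasks are rejected immediately. */
    static ThreadPoolExecutor directHandoffPool() {
        return new ThreadPoolExecutor(1, 1, 0L, TimeUnit.SECONDS, new SynchronousQueue<>());
    }

    public static void main(String[] args) {
        int[] queued = submit(queuedPool(), 3);
        int[] direct = submit(directHandoffPool(), 3);
        System.out.println("queued pool: started=" + queued[0] + ", rejected=" + queued[1]);
        System.out.println("direct pool: started=" + direct[0] + ", rejected=" + direct[1]);
    }
}
```

In the queued pool the two extra tasks are accepted but never run, which mirrors a consumer that never starts. In the queue-less pool the shortage surfaces at submit time instead.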
2. Implementation
Configuring the thread pool:
@Slf4j
@Configuration
public class RabbitConfig {
@Autowired
private CachingConnectionFactory connectionFactory;
@Bean(name = "singleListenerContainer")
public SimpleRabbitListenerContainerFactory listenerContainerFactory() {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
/* setConnectionFactory: sets the spring-amqp ConnectionFactory. */
factory.setConnectionFactory(connectionFactory);
factory.setConcurrentConsumers(2);
factory.setPrefetchCount(1);
factory.setDefaultRequeueRejected(true);
//Use a custom thread pool to start the consumers.
factory.setTaskExecutor(taskExecutor());
factory.setAcknowledgeMode(AcknowledgeMode.AUTO);
return factory;
}
@Bean("correctTaskExecutor")
@Primary
public TaskExecutor taskExecutor() {
ThreadPoolTaskExecutor executor = new MyThreadPoolTaskExecutor();
// Core pool size
executor.setCorePoolSize(100);
// Maximum pool size
executor.setMaxPoolSize(100);
// Queue capacity
executor.setQueueCapacity(0);
// Keep-alive time for idle threads (seconds)
executor.setKeepAliveSeconds(300);
// Thread name prefix
executor.setThreadNamePrefix("thread-xx-");
// Rejection policy: when the pool has reached max size, discard the task
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
// Wait for all tasks to finish before shutting the pool down
executor.setWaitForTasksToCompleteOnShutdown(true);
return executor;
}
}
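3. Impact of automatic consumer scaling
As described above, when consumers are created at startup and the thread pool runs short of threads, startup blocks (which affects message consumption for that queue).
So what happens if the code sets factory.setMaxConcurrentConsumers(2); and the pool turns out to be short of threads at scale-up time?
3.1 Source code analysis
- The consumer thread consumes messages in a loop
Source location: org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.AsyncMessageProcessingConsumer#run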
@Override // NOSONAR - complexity - many catch blocks
public void run() { // NOSONAR - line count
if (!isActive()) {
return;
}
...
try {
initialize();
//Each consumer thread loops to fetch messages
while (isActive(this.consumer) || this.consumer.hasDelivery() || !this.consumer.cancelled()) {
mainLoop();
}
} ...
}
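- What the loop body does
Note that the return value of receiveAndExecute() is the argument passed to checkAdjust(). To understand dynamic scaling, you first need to understand the logic of receiveAndExecute() and the meaning of its return value.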
private void mainLoop() throws Exception { // NOSONAR Exception
try {
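//Fetches a message and runs the business logic (sending ACK or NACK to MQ). true means a message was consumed; false means no message was received
boolean receivedOk = receiveAndExecute(this.consumer); // At least one message received
//If maxConcurrentConsumers is configured, consider dynamic scaling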
if (SimpleMessageListenerContainer.this.maxConcurrentConsumers != null) {
checkAdjust(receivedOk);
}
...
}
}
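2.1 receiveAndExecute: business logic
This method runs the business logic and sends ACK or NACK to MQ, completing the consumption of one message.
Even after the ACK is sent, execution is still inside the current mainLoop() iteration, and the remaining logic must finish before the next message is consumed. (Note: the next message is not consumed immediately after the ACK or NACK is sent to MQ!)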
private boolean receiveAndExecute(final BlockingQueueConsumer consumer) throws Exception { // NOSONAR
PlatformTransactionManager transactionManager = getTransactionManager();
if (transactionManager != null) {...transaction handling, not relevant here
}
//Receive a message and process it
return doReceiveAndExecute(consumer);
}
If nextMessage() returns no message, the loop breaks, which ultimately makes receiveAndExecute() above return false. The return value of receiveAndExecute() in turn determines whether dynamic scaling happens.
private boolean doReceiveAndExecute(BlockingQueueConsumer consumer) throws Exception { //NOSONAR
Channel channel = consumer.getChannel();
//txSize defaults to 1
for (int i = 0; i < this.txSize; i++) {
//Take a message from the in-memory queue
Message message = consumer.nextMessage(this.receiveTimeout);
//No message received: exit the loop
if (message == null) {
break;
}
try {
//Run the business logic
executeListener(channel, message);
}
...catch handling, not relevant here
}
//If no message was received, this method returns false
return consumer.commitIfNecessary(isChannelLocallyTransacted());
}
public boolean commitIfNecessary(boolean locallyTransacted) throws IOException {
//Returns false directly here
if (this.deliveryTags.isEmpty()) {
return false;
}
...
return true;
}
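Taking a message from memory
Because setPrefetchCount is configured, the client prefetches that number of messages from MQ and places them in a local BlockingQueue.
For configuration details, see:
[RabbitMQ-2] RabbitMQ concurrency parameters (concurrency and prefetch)
When no message is received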
public Message nextMessage(long timeout) throws InterruptedException,
ShutdownSignalException {
if (logger.isTraceEnabled()) {
logger.trace("Retrieving delivery for " + this);
}
checkShutdown();
if (this.missingQueues.size() > 0) {
checkMissingQueues();
}
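//poll API description: retrieves and removes the head of this queue, waiting up to the specified wait time if necessary for an element to become available.
Message message = handle(this.queue.poll(timeout, TimeUnit.MILLISECONDS));
//cancelled is false by default, so this branch is not executed
if (message == null && this.cancelled.get()) {
throw new ConsumerCancelledException();
}
//Returns null when no message was received.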
return message;
}
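The timed poll at the heart of nextMessage() behaves the same on any BlockingQueue. Here is a small sketch using only JDK classes: the nextMessage and receivedOk helpers are simplified, hypothetical versions (the real receivedOk comes from commitIfNecessary, as shown earlier), and String stands in for the message type.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class PollTimeoutDemo {

    /** Returns the next element, or null if none arrives within the timeout. */
    static String nextMessage(BlockingQueue<String> queue, long timeoutMillis) {
        try {
            return queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    /** Simplified assumption: a cycle counts as "received" when poll returned an element. */
    static boolean receivedOk(BlockingQueue<String> queue, long timeoutMillis) {
        return nextMessage(queue, timeoutMillis) != null;
    }

    public static void main(String[] args) {
        BlockingQueue<String> local = new LinkedBlockingQueue<>();
        local.offer("m1");
        System.out.println(receivedOk(local, 100)); // true: "m1" was already waiting
        System.out.println(receivedOk(local, 100)); // false: queue empty, poll timed out
    }
}
```

An empty local queue therefore costs the consumer thread one full timeout per loop iteration and yields an idle cycle.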
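2.2 checkAdjust(): dynamic scaling logic
From the source above: if no message was received, receivedOk is false. (Note: if a message was received but NACKed, receivedOk is still true.)
How is it guaranteed that the receives, or the idle cycles, are consecutive?
Answer: mainLoop() loops continuously, and each iteration takes a message from the local queue (blocking for at most 1 s by default). If 9 consecutive iterations receive nothing and the 10th receives a message, consecutiveIdles is reset to 0.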
private void checkAdjust(boolean receivedOk) {
//A message was received
if (receivedOk) {
if (isActive(this.consumer)) {
//Reset the consecutive-idle counter to 0
this.consecutiveIdles = 0;
//consecutiveActiveTrigger defaults to 10
if (this.consecutiveMessages++>SimpleMessageListenerContainer.this.consecutiveActiveTrigger) {
//Consider starting one more consumer thread
considerAddingAConsumer();
//Reset the consecutive-messages counter to 0
this.consecutiveMessages = 0;
}
}
} else {
this.consecutiveMessages = 0;
if (this.consecutiveIdles++>SimpleMessageListenerContainer.this.consecutiveIdleTrigger) {
considerStoppingAConsumer(this.consumer);
this.consecutiveIdles = 0;
}
}
}
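The counter logic of checkAdjust() can be isolated and stepped through without a broker. The sketch below copies only the two counters and the post-increment comparisons from the listing above. The class name, the addConsidered and stopConsidered fields, and the helper methods are hypothetical, and the isActive check is left out.

```java
public class CheckAdjustCounterDemo {
    private final int activeTrigger;
    private final int idleTrigger;
    private int consecutiveMessages;
    private int consecutiveIdles;
    int addConsidered;  // how often considerAddingAConsumer() would have been called
    int stopConsidered; // how often considerStoppingAConsumer() would have been called

    CheckAdjustCounterDemo(int activeTrigger, int idleTrigger) {
        this.activeTrigger = activeTrigger;
        this.idleTrigger = idleTrigger;
    }

    /** Same comparisons as the listing: the old counter value is compared, then incremented. */
    void checkAdjust(boolean receivedOk) {
        if (receivedOk) {
            consecutiveIdles = 0;
            if (consecutiveMessages++ > activeTrigger) {
                addConsidered++;
                consecutiveMessages = 0;
            }
        } else {
            consecutiveMessages = 0;
            if (consecutiveIdles++ > idleTrigger) {
                stopConsidered++;
                consecutiveIdles = 0;
            }
        }
    }

    /** Number of consecutive successful receives before a scale-up is first considered. */
    static int callsUntilFirstAdd(int trigger) {
        CheckAdjustCounterDemo demo = new CheckAdjustCounterDemo(trigger, trigger);
        int calls = 0;
        while (demo.addConsidered == 0) {
            demo.checkAdjust(true);
            calls++;
        }
        return calls;
    }

    /** Runs firstRun idle cycles, one received message, then secondRun idle cycles. */
    static int stopsForIdleRuns(int firstRun, int secondRun) {
        CheckAdjustCounterDemo demo = new CheckAdjustCounterDemo(10, 10);
        for (int i = 0; i < firstRun; i++) {
            demo.checkAdjust(false);
        }
        demo.checkAdjust(true); // a single message resets consecutiveIdles
        for (int i = 0; i < secondRun; i++) {
            demo.checkAdjust(false);
        }
        return demo.stopConsidered;
    }

    public static void main(String[] args) {
        System.out.println(callsUntilFirstAdd(10));  // 12
        System.out.println(stopsForIdleRuns(9, 9));  // 0
        System.out.println(stopsForIdleRuns(12, 0)); // 1
    }
}
```

Because the comparison is `counter++ > trigger`, a trigger of 10 fires on the 12th consecutive cycle in this model, and a single received message in the middle of an idle streak resets the idle count.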
Starting one consumer thread:
private void considerAddingAConsumer() {
//Acquire the lock
synchronized(this.consumersMonitor) {
//Only if the current number of consumers is below the configured maxConcurrentConsumers
if (this.consumers != null && this.maxConcurrentConsumers != null && this.consumers.size() < this.maxConcurrentConsumers) {
long now = System.currentTimeMillis();
//A minimum interval applies between consumer starts
if (this.lastConsumerStarted + this.startConsumerMinInterval < now) {
//Add a consumer.
this.addAndStartConsumers(1);
this.lastConsumerStarted = now;
}
}
}
}
The operation that starts consumers
protected void addAndStartConsumers(int delta) {
synchronized(this.consumersMonitor) {
if (this.consumers != null) {
//Each iteration creates one consumer
for (int i = 0; i < delta; i++) {
//Check whether another consumer may be created
if (this.maxConcurrentConsumers != null && this.consumers.size() >= this.maxConcurrentConsumers) {
break;
}
//Create the consumer
BlockingQueueConsumer consumer = createBlockingQueueConsumer();
//(Key point) the consumers collection grows by 1
this.consumers.add(consumer);
AsyncMessageProcessingConsumer processor = new AsyncMessageProcessingConsumer(consumer);
if (logger.isDebugEnabled()) {
logger.debug("Starting a new consumer: " + consumer);
}
//Execute on the container's task executor
getTaskExecutor().execute(processor);
//Publish the consumer-started event
if (this.getApplicationEventPublisher() != null) {
this.getApplicationEventPublisher().publishEvent(new AsyncConsumerStartedEvent(this, consumer));
}
try {
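//Once the thread executes run(), the processor's CountDownLatch is counted down.
//If the thread pool has no free thread, this call blocks here (60 s by default)
//After the wait times out, startupException is still null, so the consumer is treated as created successfully
FatalListenerStartupException startupException = processor.getStartupException();
//If the pool is merely out of threads, null is returned and the branch below is not executed.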
if (startupException != null) {
this.consumers.remove(consumer);
throw new AmqpIllegalStateException("Fatal exception on listener startup", startupException);
}
} catch(InterruptedException ie) {
Thread.currentThread().interrupt();
} catch(Exception e) {
consumer.stop();
logger.error("Error starting new consumer", e);
this.cancellationLock.release(consumer);
this.consumers.remove(consumer);
}
}
}
}
}
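- In the code above, creating a consumer thread is synchronous: an existing consumer thread takes the lock and creates the new one. If the thread pool has no free thread at that moment, the creating consumer thread is blocked.
- If the thread pool has no free thread, then once the wait times out only an error is logged and no exception is thrown. At this point there are n+1 consumers in memory, but only n threads can consume messages.
- After 10 consecutive idle cycles (consecutiveIdles = 10), with n+1 consumers registered, a temporarily added consumer thread is reclaimed.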
private void considerStoppingAConsumer(BlockingQueueConsumer consumer) {
synchronized(this.consumersMonitor) {
if (this.consumers != null && this.consumers.size() > this.concurrentConsumers) {
long now = System.currentTimeMillis();
if (this.lastConsumerStopped + this.stopConsumerMinInterval < now) {
//The key call that reclaims the consumer
consumer.basicCancel(true);
//Remove the consumer from the local consumer collection
this.consumers.remove(consumer);
if (logger.isDebugEnabled()) {
logger.debug("Idle consumer terminating: " + consumer);
}
this.lastConsumerStopped = now;
}
}
}
}
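As noted above, there are n+1 consumers in memory but only n effective ones. When a consumer is reclaimed, it is an effective one that goes, leaving n consumers in memory and n-1 effective consumers.
If the thread pool is short of threads and dynamic consumer scaling is configured, the number of effective consumers can eventually drop to 0, causing a large message backlog!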
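The bookkeeping behind this argument can be written down as a toy model. It is only a sketch of the reasoning above, not a simulation of Spring AMQP: the class and method names are hypothetical, it assumes maxConcurrentConsumers is larger than concurrentConsumers, and it assumes a consumer added during scale-up never obtains a thread while every scale-down is carried out by a live consumer cancelling itself.

```java
public class PhantomConsumerModel {

    /**
     * Returns the number of effective consumers left after the given number of
     * scale-up/scale-down rounds on an exhausted thread pool.
     */
    static int effectiveAfter(int concurrentConsumers, int rounds) {
        int registered = concurrentConsumers; // size of the container's consumers collection
        int effective = concurrentConsumers;  // consumers that actually own a thread
        for (int i = 0; i < rounds && effective > 0; i++) {
            // Scale-up: the new consumer is registered but never gets a thread.
            registered++;
            // Scale-down: same guard as considerStoppingAConsumer; the caller is a
            // live consumer, so the one removed is an effective one.
            if (registered > concurrentConsumers) {
                registered--;
                effective--;
            }
        }
        return effective;
    }

    public static void main(String[] args) {
        System.out.println(effectiveAfter(2, 1)); // 1
        System.out.println(effectiveAfter(2, 2)); // 0
    }
}
```

Under these assumptions each round trades one working consumer for one that never runs, so a container with concurrentConsumers = 2 has no working consumer after two rounds.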
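Note: by default, Spring AMQP starts consumer threads with new SimpleAsyncTaskExecutor(), which creates a new thread each time one is needed.
Summary: a custom thread pool is not recommended. If you do use one, make sure the thread count is adjusted every time a queue is added.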