RabbitMQ消费者OOM

近期出现MQ消费者消费时业务中有下载大文件抛出OOM后就没有消费者了,回想好像很多年前就遇到这样的事迹rabbitmq 消费线程无故中断连接 - 简书 (jianshu.com),但是没有研究MQ 消费者源码,这不回弦镖又回来了!

03403AC8.png

,所以就有疑问:
1、是消费线程掉线了吗?如果消费线程掉线不是还可以重新new 线程消费队列消息吗?为什么会没有消费者了;
2、是消费者掉线了吗?如果配置多个消费者是不是就可以避免无法消费队列消息问题?

springboot 自动装配版本

org.springframework.boot:spring-boot-autoconfigure:2.0.2.release

springboot rabbitmq 版本

org.springframework.boot.amqp:spring-rabbit:2.0.3.release

1、首先看rabbitmq 自动装配启动类

@Configuration
@ConditionalOnClass({ RabbitTemplate.class, Channel.class })
@EnableConfigurationProperties(RabbitProperties.class)
@Import(RabbitAnnotationDrivenConfiguration.class)   ##
public class RabbitAutoConfiguration {
        #这个类里面会@Bean  很多工厂类比如 连接工厂,以及配置类,rabbittemplate类,这些暂时不是这篇文章所涉及,我们主要关注的是@Import(RabbitAnnotationDrivenConfiguration.class)这个类
}

3、咱们看下@Import(RabbitAnnotationDrivenConfiguration.class)这个类做了什么,从这开启了消费者的生命

@Configuration
@ConditionalOnClass(EnableRabbit.class)  ## 关注这个注解
class RabbitAnnotationDrivenConfiguration {
}


@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(RabbitBootstrapConfiguration.class)
public @interface EnableRabbit {

#@Import(RabbitBootstrapConfiguration.class) 通过注解引入这个类 ,从这里调到spring-rabbit 包
}

2、咱们看下@Import(RabbitBootstrapConfiguration.class) 这个类做了什么

@Configuration
public class RabbitBootstrapConfiguration {

    @Bean(name = RabbitListenerConfigUtils.RABBIT_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)
    @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
    public RabbitListenerAnnotationBeanPostProcessor rabbitListenerAnnotationProcessor() {
        return new RabbitListenerAnnotationBeanPostProcessor();   
        #通过这个后置处理器进来
    }

    @Bean(name = RabbitListenerConfigUtils.RABBIT_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME)
    public RabbitListenerEndpointRegistry defaultRabbitListenerEndpointRegistry() {
        return new RabbitListenerEndpointRegistry();
    }

}

4、RabbitListenerAnnotationBeanPostProcessor 当spring容器初始化bean时执行后置处理器


    @Override
    public Object postProcessAfterInitialization(final Object bean, final String beanName) throws BeansException {
        Class<?> targetClass = AopUtils.getTargetClass(bean);
        final TypeMetadata metadata = this.typeCache.computeIfAbsent(targetClass, this::buildMetadata);
        for (ListenerMethod lm : metadata.listenerMethods) {
                        ## 拿到所有的@RabbitListener 注解,并遍历处理
            for (RabbitListener rabbitListener : lm.annotations) {
                processAmqpListener(rabbitListener, lm.method, bean, beanName);
            }
        }
        if (metadata.handlerMethods.length > 0) {
            processMultiMethodListeners(metadata.classAnnotations, metadata.handlerMethods, bean, beanName);
        }
        return bean;
    }

    protected void processAmqpListener(RabbitListener rabbitListener, Method method, Object bean, String beanName) {
                #拿到对应的反射对象
        Method methodToUse = checkProxy(method, bean);
        MethodRabbitListenerEndpoint endpoint = new MethodRabbitListenerEndpoint();
        endpoint.setMethod(methodToUse);
        endpoint.setBeanFactory(this.beanFactory);
        endpoint.setReturnExceptions(resolveExpressionAsBoolean(rabbitListener.returnExceptions()));
        String errorHandlerBeanName = resolveExpressionAsString(rabbitListener.errorHandler(), "errorHandler");
        if (StringUtils.hasText(errorHandlerBeanName)) {
            endpoint.setErrorHandler(this.beanFactory.getBean(errorHandlerBeanName, RabbitListenerErrorHandler.class));
        }
                #继续处理监听器
        processListener(endpoint, rabbitListener, bean, methodToUse, beanName);
    }


protected void processListener(MethodRabbitListenerEndpoint endpoint, RabbitListener rabbitListener, Object bean,
            Object adminTarget, String beanName) {
                # 设置原始对象
        endpoint.setBean(bean);
        endpoint.setMessageHandlerMethodFactory(this.messageHandlerMethodFactory);
        endpoint.setId(getEndpointId(rabbitListener));
              # 设置消费的队列
        endpoint.setQueueNames(resolveQueues(rabbitListener));
              #设置 消费者个数
        endpoint.setConcurrency(resolveExpressionAsStringOrInteger(rabbitListener.concurrency(), "concurrency"));
        String group = rabbitListener.group();
        if (StringUtils.hasText(group)) {
            Object resolvedGroup = resolveExpression(group);
            if (resolvedGroup instanceof String) {
                endpoint.setGroup((String) resolvedGroup);
            }
        }
        String autoStartup = rabbitListener.autoStartup();
        if (StringUtils.hasText(autoStartup)) {
            endpoint.setAutoStartup(resolveExpressionAsBoolean(autoStartup));
        }

        endpoint.setExclusive(rabbitListener.exclusive());
        String priority = resolve(rabbitListener.priority());
        if (StringUtils.hasText(priority)) {
            try {
                endpoint.setPriority(Integer.valueOf(priority));
            }
            catch (NumberFormatException ex) {
                throw new BeanInitializationException("Invalid priority value for " +
                        rabbitListener + " (must be an integer)", ex);
            }
        }

        String rabbitAdmin = resolve(rabbitListener.admin());
        if (StringUtils.hasText(rabbitAdmin)) {
            Assert.state(this.beanFactory != null, "BeanFactory must be set to resolve RabbitAdmin by bean name");
            try {
                endpoint.setAdmin(this.beanFactory.getBean(rabbitAdmin, RabbitAdmin.class));
            }
            catch (NoSuchBeanDefinitionException ex) {
                throw new BeanInitializationException("Could not register rabbit listener endpoint on [" +
                        adminTarget + "], no " + RabbitAdmin.class.getSimpleName() + " with id '" +
                        rabbitAdmin + "' was found in the application context", ex);
            }
        }


        RabbitListenerContainerFactory<?> factory = null;
        String containerFactoryBeanName = resolve(rabbitListener.containerFactory());
        if (StringUtils.hasText(containerFactoryBeanName)) {
            Assert.state(this.beanFactory != null, "BeanFactory must be set to obtain container factory by bean name");
            try {
                factory = this.beanFactory.getBean(containerFactoryBeanName, RabbitListenerContainerFactory.class);
            }
            catch (NoSuchBeanDefinitionException ex) {
                throw new BeanInitializationException("Could not register rabbit listener endpoint on [" +
                        adminTarget + "] for bean " + beanName + ", no " + RabbitListenerContainerFactory.class.getSimpleName() + " with id '" +
                        containerFactoryBeanName + "' was found in the application context", ex);
            }
        }
                # 监听注册器开始注册
        this.registrar.registerEndpoint(endpoint, factory);
    }




    /**
     * Register a new {@link RabbitListenerEndpoint} alongside the
     * {@link RabbitListenerContainerFactory} to use to create the underlying container.
     * <p>The {@code factory} may be {@code null} if the default factory has to be
     * used for that endpoint.
     * @param endpoint the {@link RabbitListenerEndpoint} instance to register.
     * @param factory the {@link RabbitListenerContainerFactory} to use.
     */
    public void registerEndpoint(RabbitListenerEndpoint endpoint, RabbitListenerContainerFactory<?> factory) {
        Assert.notNull(endpoint, "Endpoint must be set");
        Assert.hasText(endpoint.getId(), "Endpoint id must be set");
        // Factory may be null, we defer the resolution right before actually creating the container
        AmqpListenerEndpointDescriptor descriptor = new AmqpListenerEndpointDescriptor(endpoint, factory);
        synchronized (this.endpointDescriptors) {
            if (this.startImmediately) { // Register and start immediately
                                #监听器开始注册
                this.endpointRegistry.registerListenerContainer(descriptor.endpoint,
                        resolveContainerFactory(descriptor), true);
            }
            else {
                this.endpointDescriptors.add(descriptor);
            }
        }
    }

5、 ##开始注册监听器容器



    /**
     * Create a message listener container for the given {@link RabbitListenerEndpoint}.
     * <p>This create the necessary infrastructure to honor that endpoint
     * with regards to its configuration.
     * <p>The {@code startImmediately} flag determines if the container should be
     * started immediately.
     * @param endpoint the endpoint to add.
     * @param factory the {@link RabbitListenerContainerFactory} to use.
     * @param startImmediately start the container immediately if necessary
     * @see #getListenerContainers()
     * @see #getListenerContainer(String)
     */
    @SuppressWarnings("unchecked")
    public void registerListenerContainer(RabbitListenerEndpoint endpoint, RabbitListenerContainerFactory<?> factory,
                                          boolean startImmediately) {
        Assert.notNull(endpoint, "Endpoint must not be null");
        Assert.notNull(factory, "Factory must not be null");

        String id = endpoint.getId();
        Assert.hasText(id, "Endpoint id must not be empty");
        synchronized (this.listenerContainers) {
            Assert.state(!this.listenerContainers.containsKey(id),
                    "Another endpoint is already registered with id '" + id + "'");
                        ##在这里创建一个Rabbitmq  的监听器容器
            MessageListenerContainer container = createListenerContainer(endpoint, factory);
            this.listenerContainers.put(id, container);
            if (StringUtils.hasText(endpoint.getGroup()) && this.applicationContext != null) {
                List<MessageListenerContainer> containerGroup;
                if (this.applicationContext.containsBean(endpoint.getGroup())) {
                    containerGroup = this.applicationContext.getBean(endpoint.getGroup(), List.class);
                }
                else {
                    containerGroup = new ArrayList<MessageListenerContainer>();
                    this.applicationContext.getBeanFactory().registerSingleton(endpoint.getGroup(), containerGroup);
                }
                containerGroup.add(container);
            }
            if (startImmediately) {
                                ##开始监听
                startIfNecessary(container);
            }
        }
    }

6、在这里创建一个Rabbitmq 的监听器容器createListenerContainer


    /**
     * Create and start a new {@link MessageListenerContainer} using the specified factory.
     * @param endpoint the endpoint to create a {@link MessageListenerContainer}.
     * @param factory the {@link RabbitListenerContainerFactory} to use.
     * @return the {@link MessageListenerContainer}.
     */
    protected MessageListenerContainer createListenerContainer(RabbitListenerEndpoint endpoint,
            RabbitListenerContainerFactory<?> factory) {
                ## 创建监听容器
        MessageListenerContainer listenerContainer = factory.createListenerContainer(endpoint);

        if (listenerContainer instanceof InitializingBean) {
            try {
                ((InitializingBean) listenerContainer).afterPropertiesSet();
            }
            catch (Exception ex) {
                throw new BeanInitializationException("Failed to initialize message listener container", ex);
            }
        }

        int containerPhase = listenerContainer.getPhase();
        if (containerPhase < Integer.MAX_VALUE) {  // a custom phase value
            if (this.phase < Integer.MAX_VALUE && this.phase != containerPhase) {
                throw new IllegalStateException("Encountered phase mismatch between container factory definitions: " +
                        this.phase + " vs " + containerPhase);
            }
            this.phase = listenerContainer.getPhase();
        }

        return listenerContainer;
    }



    @Override
    public C createListenerContainer(RabbitListenerEndpoint endpoint) {
        C instance = createContainerInstance();
              ##设置连接工厂
        if (this.connectionFactory != null) {
            instance.setConnectionFactory(this.connectionFactory);
        }
        if (this.errorHandler != null) {
            instance.setErrorHandler(this.errorHandler);
        }
        if (this.messageConverter != null) {
            instance.setMessageConverter(this.messageConverter);
        }
##设置ack 模式
        if (this.acknowledgeMode != null) {
            instance.setAcknowledgeMode(this.acknowledgeMode);
        }

        if (this.channelTransacted != null) {
            instance.setChannelTransacted(this.channelTransacted);
        }
        if (this.applicationContext != null) {
            instance.setApplicationContext(this.applicationContext);
        }
        if (this.taskExecutor != null) {
            instance.setTaskExecutor(this.taskExecutor);
        }
        if (this.transactionManager != null) {
            instance.setTransactionManager(this.transactionManager);
        }
## 设置一次网络请求拿多少个消息包
        if (this.prefetchCount != null) {
            instance.setPrefetchCount(this.prefetchCount);
        }
        if (this.defaultRequeueRejected != null) {
            instance.setDefaultRequeueRejected(this.defaultRequeueRejected);
        }
        if (this.adviceChain != null) {
            instance.setAdviceChain(this.adviceChain);
        }
        if (this.recoveryBackOff != null) {
            instance.setRecoveryBackOff(this.recoveryBackOff);
        }
        if (this.mismatchedQueuesFatal != null) {
            instance.setMismatchedQueuesFatal(this.mismatchedQueuesFatal);
        }
        if (this.missingQueuesFatal != null) {
            instance.setMissingQueuesFatal(this.missingQueuesFatal);
        }
        if (this.consumerTagStrategy != null) {
            instance.setConsumerTagStrategy(this.consumerTagStrategy);
        }
        if (this.idleEventInterval != null) {
            instance.setIdleEventInterval(this.idleEventInterval);
        }
        if (this.failedDeclarationRetryInterval != null) {
            instance.setFailedDeclarationRetryInterval(this.failedDeclarationRetryInterval);
        }
        if (this.applicationEventPublisher != null) {
            instance.setApplicationEventPublisher(this.applicationEventPublisher);
        }
        if (endpoint.getAutoStartup() != null) {
            instance.setAutoStartup(endpoint.getAutoStartup());
        }
        else if (this.autoStartup != null) {
            instance.setAutoStartup(this.autoStartup);
        }
        if (this.phase != null) {
            instance.setPhase(this.phase);
        }
        if (this.afterReceivePostProcessors != null) {
            instance.setAfterReceivePostProcessors(this.afterReceivePostProcessors);
        }
        instance.setListenerId(endpoint.getId());

        endpoint.setupListenerContainer(instance);
        if (this.beforeSendReplyPostProcessors != null
                && instance.getMessageListener() instanceof AbstractAdaptableMessageListener) {
            ((AbstractAdaptableMessageListener) instance.getMessageListener())
                    .setBeforeSendReplyPostProcessors(this.beforeSendReplyPostProcessors);
        }
                ##实例化容器
        initializeContainer(instance, endpoint);

        return instance;
    }

    @Override
    protected void initializeContainer(SimpleMessageListenerContainer instance, RabbitListenerEndpoint endpoint) {
        super.initializeContainer(instance, endpoint);

        if (this.txSize != null) {
            instance.setTxSize(this.txSize);
        }
        String concurrency = endpoint.getConcurrency();
        if (concurrency != null) {
            instance.setConcurrency(concurrency);
        }
              #设置消费者个数
        else if (this.concurrentConsumers != null) {
                      
            instance.setConcurrentConsumers(this.concurrentConsumers);
        }
        if ((concurrency == null || !(concurrency.contains("-"))) && this.maxConcurrentConsumers != null) {
            instance.setMaxConcurrentConsumers(this.maxConcurrentConsumers);
        }
        if (this.startConsumerMinInterval != null) {
            instance.setStartConsumerMinInterval(this.startConsumerMinInterval);
        }
        if (this.stopConsumerMinInterval != null) {
            instance.setStopConsumerMinInterval(this.stopConsumerMinInterval);
        }
        if (this.consecutiveActiveTrigger != null) {
            instance.setConsecutiveActiveTrigger(this.consecutiveActiveTrigger);
        }
        if (this.consecutiveIdleTrigger != null) {
            instance.setConsecutiveIdleTrigger(this.consecutiveIdleTrigger);
        }
        if (this.receiveTimeout != null) {
            instance.setReceiveTimeout(this.receiveTimeout);
        }
        if (this.deBatchingEnabled != null) {
            instance.setDeBatchingEnabled(this.deBatchingEnabled);
        }
    }


###做个标记
    public void setConcurrentConsumers(final int concurrentConsumers) {
        Assert.isTrue(concurrentConsumers > 0, "'concurrentConsumers' value must be at least 1 (one)");
        Assert.isTrue(!isExclusive() || concurrentConsumers == 1,
                "When the consumer is exclusive, the concurrency must be 1");
        if (this.maxConcurrentConsumers != null) {
            Assert.isTrue(concurrentConsumers <= this.maxConcurrentConsumers,
                    "'concurrentConsumers' cannot be more than 'maxConcurrentConsumers'");
        }
        synchronized (this.consumersMonitor) {
            if (logger.isDebugEnabled()) {
                logger.debug("Changing consumers from " + this.concurrentConsumers + " to " + concurrentConsumers);
            }
            int delta = this.concurrentConsumers - concurrentConsumers;
            this.concurrentConsumers = concurrentConsumers;
            if (isActive() && this.consumers != null) {
                if (delta > 0) {
                    Iterator<BlockingQueueConsumer> consumerIterator = this.consumers.iterator();
                    while (consumerIterator.hasNext() && delta > 0) {
                        BlockingQueueConsumer consumer = consumerIterator.next();
                        consumer.basicCancel(true);
                        consumerIterator.remove();
                        delta--;
                    }
                }
                else {
                    addAndStartConsumers(-delta);
                }
            }
        }
    }

8、上面创建监听器容器完成后由下方方法开始监听

    @SuppressWarnings("unchecked")
    public void registerListenerContainer(RabbitListenerEndpoint endpoint, RabbitListenerContainerFactory<?> factory,
                                          boolean startImmediately) {
        Assert.notNull(endpoint, "Endpoint must not be null");
        Assert.notNull(factory, "Factory must not be null");

        String id = endpoint.getId();
        Assert.hasText(id, "Endpoint id must not be empty");
        synchronized (this.listenerContainers) {
            Assert.state(!this.listenerContainers.containsKey(id),
                    "Another endpoint is already registered with id '" + id + "'");
            MessageListenerContainer container = createListenerContainer(endpoint, factory);
            this.listenerContainers.put(id, container);
            if (StringUtils.hasText(endpoint.getGroup()) && this.applicationContext != null) {
                List<MessageListenerContainer> containerGroup;
                if (this.applicationContext.containsBean(endpoint.getGroup())) {
                    containerGroup = this.applicationContext.getBean(endpoint.getGroup(), List.class);
                }
                else {
                    containerGroup = new ArrayList<MessageListenerContainer>();
                    this.applicationContext.getBeanFactory().registerSingleton(endpoint.getGroup(), containerGroup);
                }
                containerGroup.add(container);
            }
            if (startImmediately) {
                        #开始监听
                startIfNecessary(container);
            }
        }
    }


    private void startIfNecessary(MessageListenerContainer listenerContainer) {
        if (this.contextRefreshed || listenerContainer.isAutoStartup()) {
            listenerContainer.start();
        }
    }

9、MessageListenerContainer

/**
     * Start this container.
     * @see #doStart
     */
    @Override
    public void start() {
        if (isRunning()) {
            return;
        }
        if (!this.initialized) {
            synchronized (this.lifecycleMonitor) {
                if (!this.initialized) {
                    afterPropertiesSet();
                    this.initialized = true;
                }
            }
        }
        try {
            if (logger.isDebugEnabled()) {
                logger.debug("Starting Rabbit listener container.");
            }
            configureAdminIfNeeded();
            checkMismatchedQueues();
            doStart();
        }
        catch (Exception ex) {
            throw convertRabbitAccessException(ex);
        }
    }

@Override
    protected void doStart() throws Exception {
        if (getMessageListener() instanceof ListenerContainerAware) {
            Collection<String> expectedQueueNames = ((ListenerContainerAware) getMessageListener()).expectedQueueNames();
            if (expectedQueueNames != null) {
                String[] queueNames = getQueueNames();
                Assert.state(expectedQueueNames.size() == queueNames.length,
                        "Listener expects us to be listening on '" + expectedQueueNames + "'; our queues: "
                                + Arrays.asList(queueNames));
                boolean found = false;
                for (String queueName : queueNames) {
                    if (expectedQueueNames.contains(queueName)) {
                        found = true;
                    }
                    else {
                        found = false;
                        break;
                    }
                }
                Assert.state(found, () -> "Listener expects us to be listening on '" + expectedQueueNames + "'; our queues: "
                        + Arrays.asList(queueNames));
            }
        }
        super.doStart();
        synchronized (this.consumersMonitor) {
            if (this.consumers != null) {
                throw new IllegalStateException("A stopped container should not have consumers");
            }
                        #初始化消费者
            int newConsumers = initializeConsumers();
            if (this.consumers == null) {
                logger.info("Consumers were initialized and then cleared " +
                        "(presumably the container was stopped concurrently)");
                return;
            }
            if (newConsumers <= 0) {
                if (logger.isInfoEnabled()) {
                    logger.info("Consumers are already running");
                }
                return;
            }
            Set<AsyncMessageProcessingConsumer> processors = new HashSet<AsyncMessageProcessingConsumer>();
            for (BlockingQueueConsumer consumer : this.consumers) {
                                # 为每个消费者 创建一个线程,交给线程池
                AsyncMessageProcessingConsumer processor = new AsyncMessageProcessingConsumer(consumer);
                processors.add(processor);
                getTaskExecutor().execute(processor);
                if (getApplicationEventPublisher() != null) {
                    getApplicationEventPublisher().publishEvent(new AsyncConsumerStartedEvent(this, consumer));
                }
            }
            for (AsyncMessageProcessingConsumer processor : processors) {
                FatalListenerStartupException startupException = processor.getStartupException();
                if (startupException != null) {
                    throw new AmqpIllegalStateException("Fatal exception on listener startup", startupException);
                }
            }
        }
    }

10、 看消费线程run 的逻辑

        @Override
        public void run() {
            if (!isActive()) {
                return;
            }

            boolean aborted = false;

            int consecutiveIdles = 0;

            int consecutiveMessages = 0;

            this.consumer.setLocallyTransacted(isChannelLocallyTransacted());

            String routingLookupKey = getRoutingLookupKey();
            if (routingLookupKey != null) {
                SimpleResourceHolder.bind(getRoutingConnectionFactory(), routingLookupKey);
            }

            if (this.consumer.getQueueCount() < 1) {
                if (logger.isDebugEnabled()) {
                    logger.debug("Consumer stopping; no queues for " + this.consumer);
                }
                SimpleMessageListenerContainer.this.cancellationLock.release(this.consumer);
                if (getApplicationEventPublisher() != null) {
                    getApplicationEventPublisher().publishEvent(
                            new AsyncConsumerStoppedEvent(SimpleMessageListenerContainer.this, this.consumer));
                }
                this.start.countDown();
                return;
            }

            try {

                try {
                    redeclareElementsIfNecessary();
                    this.consumer.start();
                    this.start.countDown();
                }
                catch (QueuesNotAvailableException e) {
                    if (isMissingQueuesFatal()) {
                        throw e;
                    }
                    else {
                        this.start.countDown();
                        handleStartupFailure(this.consumer.getBackOffExecution());
                        throw e;
                    }
                }
                catch (FatalListenerStartupException ex) {
                    if (isPossibleAuthenticationFailureFatal()) {
                        throw ex;
                    }
                    else {
                        Throwable possibleAuthException = ex.getCause().getCause();
                        if (possibleAuthException == null ||
                                !(possibleAuthException instanceof PossibleAuthenticationFailureException)) {
                            throw ex;
                        }
                        else {
                            this.start.countDown();
                            handleStartupFailure(this.consumer.getBackOffExecution());
                            throw possibleAuthException;
                        }
                    }
                }
                catch (Throwable t) { //NOSONAR
                    this.start.countDown();
                    handleStartupFailure(this.consumer.getBackOffExecution());
                    throw t;
                }

                if (getTransactionManager() != null) {
                    /*
                     * Register the consumer's channel so it will be used by the transaction manager
                     * if it's an instance of RabbitTransactionManager.
                     */
                    ConsumerChannelRegistry.registerConsumerChannel(this.consumer.getChannel(), getConnectionFactory());
                }
                                ## 直接看这里,进入循环
                while (isActive(this.consumer) || this.consumer.hasDelivery() || !this.consumer.cancelled()) {
                    try {
                                                 ## 开始拿消息处理消息
                        boolean receivedOk = receiveAndExecute(this.consumer); // At least one message received
                        if (SimpleMessageListenerContainer.this.maxConcurrentConsumers != null) {
                            if (receivedOk) {
                                if (isActive(this.consumer)) {
                                    consecutiveIdles = 0;
                                    if (consecutiveMessages++ > SimpleMessageListenerContainer.this.consecutiveActiveTrigger) {
                                        considerAddingAConsumer();
                                        consecutiveMessages = 0;
                                    }
                                }
                            }
                            else {
                                consecutiveMessages = 0;
                                if (consecutiveIdles++ > SimpleMessageListenerContainer.this.consecutiveIdleTrigger) {
                                    considerStoppingAConsumer(this.consumer);
                                    consecutiveIdles = 0;
                                }
                            }
                        }
                        long idleEventInterval = getIdleEventInterval();
                        if (idleEventInterval > 0) {
                            if (receivedOk) {
                                updateLastReceive();
                            }
                            else {
                                long now = System.currentTimeMillis();
                                long lastAlertAt = SimpleMessageListenerContainer.this.lastNoMessageAlert.get();
                                long lastReceive = getLastReceive();
                                if (now > lastReceive + idleEventInterval
                                        && now > lastAlertAt + idleEventInterval
                                        && SimpleMessageListenerContainer.this.lastNoMessageAlert
                                        .compareAndSet(lastAlertAt, now)) {
                                    publishIdleContainerEvent(now - lastReceive);
                                }
                            }
                        }
                    }
                    catch (ListenerExecutionFailedException ex) {
                        // Continue to process, otherwise re-throw
                        if (ex.getCause() instanceof NoSuchMethodException) {
                            throw new FatalListenerExecutionException("Invalid listener", ex);
                        }
                    }
                    catch (AmqpRejectAndDontRequeueException rejectEx) {
                        /*
                         *  These will normally be wrapped by an LEFE if thrown by the
                         *  listener, but we will also honor it if thrown by an
                         *  error handler.
                         */
                    }
                }

            }
            catch (InterruptedException e) {
                logger.debug("Consumer thread interrupted, processing stopped.");
                Thread.currentThread().interrupt();
                aborted = true;
                publishConsumerFailedEvent("Consumer thread interrupted, processing stopped", true, e);
            }
            catch (QueuesNotAvailableException ex) {
                logger.error("Consumer received fatal=" + isMismatchedQueuesFatal() + " exception on startup", ex);
                if (isMissingQueuesFatal()) {
                    this.startupException = ex;
                    // Fatal, but no point re-throwing, so just abort.
                    aborted = true;
                }
                publishConsumerFailedEvent("Consumer queue(s) not available", aborted, ex);
            }
            catch (FatalListenerStartupException ex) {
                logger.error("Consumer received fatal exception on startup", ex);
                this.startupException = ex;
                // Fatal, but no point re-throwing, so just abort.
                aborted = true;
                publishConsumerFailedEvent("Consumer received fatal exception on startup", true, ex);
            }
            catch (FatalListenerExecutionException ex) {
                logger.error("Consumer received fatal exception during processing", ex);
                // Fatal, but no point re-throwing, so just abort.
                aborted = true;
                publishConsumerFailedEvent("Consumer received fatal exception during processing", true, ex);
            }
            catch (PossibleAuthenticationFailureException ex) {
                logger.error("Consumer received fatal=" + isPossibleAuthenticationFailureFatal() +
                        " exception during processing", ex);
                if (isPossibleAuthenticationFailureFatal()) {
                    this.startupException =
                            new FatalListenerStartupException("Authentication failure",
                                    new AmqpAuthenticationException(ex));
                    // Fatal, but no point re-throwing, so just abort.
                    aborted = true;
                }
                publishConsumerFailedEvent("Consumer received PossibleAuthenticationFailure during startup", aborted, ex);
            }
            catch (ShutdownSignalException e) {
                if (RabbitUtils.isNormalShutdown(e)) {
                    if (logger.isDebugEnabled()) {
                        logger.debug("Consumer received Shutdown Signal, processing stopped: " + e.getMessage());
                    }
                }
                else {
                    logConsumerException(e);
                }
            }
            catch (AmqpIOException e) {
                if (e.getCause() instanceof IOException && e.getCause().getCause() instanceof ShutdownSignalException
                        && e.getCause().getCause().getMessage().contains("in exclusive use")) {
                    getExclusiveConsumerExceptionLogger().log(logger,
                            "Exclusive consumer failure", e.getCause().getCause());
                    publishConsumerFailedEvent("Consumer raised exception, attempting restart", false, e);
                }
                else {
                    logConsumerException(e);
                }
            }
            catch (Error e) { //NOSONAR
                // ok to catch Error - we're aborting so will stop
                logger.error("Consumer thread error, thread abort.", e);
                publishConsumerFailedEvent("Consumer threw an Error", true, e);
                                #当发送error 时 标记为true,目的是为了下面终止当前线程
                aborted = true;
            }
            catch (Throwable t) { //NOSONAR
                // by now, it must be an exception
                if (isActive()) {
                    logConsumerException(t);
                }
            }
            finally {
                if (getTransactionManager() != null) {
                    ConsumerChannelRegistry.unRegisterConsumerChannel();
                }
            }

            // In all cases count down to allow container to progress beyond startup
            this.start.countDown();

            if (!isActive(this.consumer) || aborted) {
                                # 如果当前贤臣更不是活跃 或者标记为true
                logger.debug("Cancelling " + this.consumer);
                try {
                    this.consumer.stop(); // 将当前线程体停止
                    SimpleMessageListenerContainer.this.cancellationLock.release(this.consumer);
                    if (getApplicationEventPublisher() != null) {
                        getApplicationEventPublisher().publishEvent(
                                new AsyncConsumerStoppedEvent(SimpleMessageListenerContainer.this, this.consumer));
                    }
                }
                catch (AmqpException e) {
                    logger.info("Could not cancel message consumer", e);
                }
                if (aborted && SimpleMessageListenerContainer.this.containerStoppingForAbort
                        .compareAndSet(null, Thread.currentThread())) {
                    logger.error("Stopping container from aborted consumer");
                    stop();
                    SimpleMessageListenerContainer.this.containerStoppingForAbort.set(null);
                    ListenerContainerConsumerFailedEvent event = null;
                    do {
                        try {
                            event = SimpleMessageListenerContainer.this.abortEvents.poll(5, TimeUnit.SECONDS);
                            if (event != null) {
                                SimpleMessageListenerContainer.this.publishConsumerFailedEvent(
                                        event.getReason(), event.isFatal(), event.getThrowable());
                            }
                        }
                        catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                    }
                    while (event != null);
                }
            }
            else {
                                # 如果是非标记的 异常,可以重启当前线程  如果当前只有一个消费线程 并且无法重启则改队列一直无法被消费;
                logger.info("Restarting " + this.consumer);
                restart(this.consumer);
            }

            if (routingLookupKey != null) {
                SimpleResourceHolder.unbind(getRoutingConnectionFactory());
            }
        }

        private void logConsumerException(Throwable t) {
            if (logger.isDebugEnabled()
                    || !(t instanceof AmqpConnectException || t instanceof ConsumerCancelledException)) {
                logger.debug(
                        "Consumer raised exception, processing can restart if the connection factory supports it",
                        t);
            }
            else {
                if (t instanceof ConsumerCancelledException && this.consumer.isNormalCancel()) {
                    if (logger.isDebugEnabled()) {
                        logger.debug(
                                "Consumer raised exception, processing can restart if the connection factory supports it. "
                                        + "Exception summary: " + t);
                    }
                }
                else if (logger.isWarnEnabled()) {
                    logger.warn(
                            "Consumer raised exception, processing can restart if the connection factory supports it. "
                                    + "Exception summary: " + t);
                }
            }
            publishConsumerFailedEvent("Consumer raised exception, attempting restart", false, t);
        }

    }


11、开始获取mq 消息并处理

    private boolean receiveAndExecute(final BlockingQueueConsumer consumer) throws Throwable {

        if (getTransactionManager() != null) {
            try {
                if (this.transactionTemplate == null) {
                    this.transactionTemplate =
                            new TransactionTemplate(getTransactionManager(), getTransactionAttribute());
                }
                return this.transactionTemplate
                        .execute(status -> {
                            RabbitResourceHolder resourceHolder = ConnectionFactoryUtils.bindResourceToTransaction(
                                    new RabbitResourceHolder(consumer.getChannel(), false),
                                    getConnectionFactory(), true);
                            // unbound in ResourceHolderSynchronization.beforeCompletion()
                            try {
                                return doReceiveAndExecute(consumer);
                            }
                            catch (RuntimeException e1) {
                                prepareHolderForRollback(resourceHolder, e1);
                                throw e1;
                            }
                            catch (Throwable e2) { //NOSONAR
                                // ok to catch Throwable here because we re-throw it below
                                throw new WrappedTransactionException(e2);
                            }
                        });
            }
            catch (WrappedTransactionException e) {
                throw e.getCause();
            }
        }
              # 上面是判断是否是事务消息  假定是非事务直接看这个方法
        return doReceiveAndExecute(consumer);

    }
private boolean doReceiveAndExecute(BlockingQueueConsumer consumer) throws Throwable { //NOSONAR

        Channel channel = consumer.getChannel();

        for (int i = 0; i < this.txSize; i++) {

            logger.trace("Waiting for message from consumer.");
                        ## 获取到消息
            Message message = consumer.nextMessage(this.receiveTimeout);
            if (message == null) {
                break;
            }
            try {
                                # 处理消息
                executeListener(channel, message);
            }
            catch (ImmediateAcknowledgeAmqpException e) {
                if (this.logger.isDebugEnabled()) {
                    this.logger.debug("User requested ack for failed delivery '"
                            + e.getMessage() + "': "
                            + message.getMessageProperties().getDeliveryTag());
                }
                break;
            }
            catch (Throwable ex) { //NOSONAR
                if (causeChainHasImmediateAcknowledgeAmqpException(ex)) {
                    if (this.logger.isDebugEnabled()) {
                        this.logger.debug("User requested ack for failed delivery: "
                                + message.getMessageProperties().getDeliveryTag());
                    }
                    break;
                }
                if (getTransactionManager() != null) {
                    if (getTransactionAttribute().rollbackOn(ex)) {
                        RabbitResourceHolder resourceHolder = (RabbitResourceHolder) TransactionSynchronizationManager
                                .getResource(getConnectionFactory());
                        if (resourceHolder != null) {
                            consumer.clearDeliveryTags();
                        }
                        else {
                            /*
                             * If we don't actually have a transaction, we have to roll back
                             * manually. See prepareHolderForRollback().
                             */
                            consumer.rollbackOnExceptionIfNecessary(ex);
                        }
                        throw ex; // encompassing transaction will handle the rollback.
                    }
                    else {
                        if (this.logger.isDebugEnabled()) {
                            this.logger.debug("No rollback for " + ex);
                        }
                        break;
                    }
                }
                else {
                    consumer.rollbackOnExceptionIfNecessary(ex);
                    throw ex;
                }
            }
        }

        return consumer.commitIfNecessary(isChannelLocallyTransacted());

    }

12 、处理消息

    /**
     * Execute the specified listener, committing or rolling back the transaction afterwards (if necessary).
     *
     * @param channel the Rabbit Channel to operate on
     * @param messageIn the received Rabbit Message
     * @throws Exception Any Exception.
     *
     * @see #invokeListener
     * @see #handleListenerException
     */
    protected void executeListener(Channel channel, Message messageIn) throws Exception {
        if (!isRunning()) {
            if (logger.isWarnEnabled()) {
                logger.warn("Rejecting received message because the listener container has been stopped: " + messageIn);
            }
            throw new MessageRejectedWhileStoppingException();
        }
        try {
            Message message = messageIn;
            if (this.afterReceivePostProcessors != null) {
                for (MessagePostProcessor processor : this.afterReceivePostProcessors) {
                    message = processor.postProcessMessage(message);
                    if (message == null) {
                        throw new ImmediateAcknowledgeAmqpException(
                                "Message Post Processor returned 'null', discarding message");
                    }
                }
            }
            Object batchFormat = message.getMessageProperties().getHeaders().get(MessageProperties.SPRING_BATCH_FORMAT);
                        #是否是批处理?上面咱们说到一次拿几个消息  在这里会吧消息包安装mq 自己的结构拆解 
            if (MessageProperties.BATCH_FORMAT_LENGTH_HEADER4.equals(batchFormat) && this.deBatchingEnabled) {
                ByteBuffer byteBuffer = ByteBuffer.wrap(message.getBody());
                MessageProperties messageProperties = message.getMessageProperties();
                messageProperties.getHeaders().remove(MessageProperties.SPRING_BATCH_FORMAT);
                while (byteBuffer.hasRemaining()) {
                    int length = byteBuffer.getInt();
                    if (length < 0 || length > byteBuffer.remaining()) {
                        throw new ListenerExecutionFailedException("Bad batched message received",
                                new MessageConversionException("Insufficient batch data at offset " + byteBuffer.position()),
                                message);
                    }
                    byte[] body = new byte[length];
                    byteBuffer.get(body);
                    messageProperties.setContentLength(length);
                    // Caveat - shared MessageProperties.
                    Message fragment = new Message(body, messageProperties);
                                        # 循环调用监听方法
                    invokeListener(channel, fragment);
                }
            }
            else {
                              # 单个消息包调用监听方法
                invokeListener(channel, message);
            }
        }
        catch (Exception ex) {
            if (messageIn.getMessageProperties().isFinalRetryForMessageWithNoId()) {
                if (this.statefulRetryFatalWithNullMessageId) {
                    throw new FatalListenerExecutionException(
                            "Illegal null id in message. Failed to manage retry for message: " + messageIn);
                }
                else {
                    throw new ListenerExecutionFailedException("Cannot retry message more than once without an ID",
                            new AmqpRejectAndDontRequeueException("Not retryable; rejecting and not requeuing", ex),
                            messageIn);
                }
            }
            handleListenerException(ex);
            throw ex;
        }
    }

    protected void invokeListener(Channel channel, Message message) throws Exception {
        this.proxy.invokeListener(channel, message);
    }

容器实例化bean1.png

13、真正执行监听器


    /**
     * Invoke the specified listener: either as standard MessageListener or (preferably) as SessionAwareMessageListener.
     * @param channel the Rabbit Channel to operate on
     * @param message the received Rabbit Message
     * @throws Exception if thrown by Rabbit API methods
     * @see #setMessageListener
     */
    protected void actualInvokeListener(Channel channel, Message message) throws Exception {
        Object listener = getMessageListener();
        if (listener instanceof ChannelAwareMessageListener) {
            doInvokeListener((ChannelAwareMessageListener) listener, channel, message);
        }
        else if (listener instanceof MessageListener) {
            boolean bindChannel = isExposeListenerChannel() && isChannelLocallyTransacted();
            if (bindChannel) {
                RabbitResourceHolder resourceHolder = new RabbitResourceHolder(channel, false);
                resourceHolder.setSynchronizedWithTransaction(true);
                TransactionSynchronizationManager.bindResource(this.getConnectionFactory(),
                        resourceHolder);
            }
            try {
                doInvokeListener((MessageListener) listener, message);
            }
            finally {
                if (bindChannel) {
                    // unbind if we bound
                    TransactionSynchronizationManager.unbindResource(this.getConnectionFactory());
                }
            }
        }
        else if (listener != null) {
            throw new FatalListenerExecutionException("Only MessageListener and SessionAwareMessageListener supported: "
                    + listener);
        }
        else {
            throw new FatalListenerExecutionException("No message listener specified - see property 'messageListener'");
        }
    }

    protected void doInvokeListener(MessageListener listener, Message message) throws Exception {
        try {
            listener.onMessage(message);
        }
        catch (Exception e) {
            throw wrapToListenerExecutionFailedExceptionIfNeeded(e, message);
        }
    }
        ## AbstractAdaptableMessageListener 类的
    @Override
    public void onMessage(Message message) {
        try {
            onMessage(message, null);
        }
        catch (Exception e) {
            throw new ListenerExecutionFailedException("Listener threw exception", e, message);
        }
    }

15、MessageListenerAdapter

    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        // Check whether the delegate is a MessageListener impl itself.
        // In that case, the adapter will simply act as a pass-through.
        Object delegate = getDelegate();
        if (delegate != this) {
            if (delegate instanceof ChannelAwareMessageListener) {
                if (channel != null) {
                    ((ChannelAwareMessageListener) delegate).onMessage(message, channel);
                    return;
                }
                else if (!(delegate instanceof MessageListener)) {
                    throw new AmqpIllegalStateException("MessageListenerAdapter cannot handle a "
                            + "ChannelAwareMessageListener delegate if it hasn't been invoked with a Channel itself");
                }
            }
            if (delegate instanceof MessageListener) {
                ((MessageListener) delegate).onMessage(message);
                return;
            }
        }

        // Regular case: find a handler method reflectively.
        Object convertedMessage = extractMessage(message);
                #找到@RabbitListtener 注解作用的方法名
        String methodName = getListenerMethodName(message, convertedMessage);
        if (methodName == null) {
            throw new AmqpIllegalStateException("No default listener method specified: "
                    + "Either specify a non-null value for the 'defaultListenerMethod' property or "
                    + "override the 'getListenerMethodName' method.");
        }

        // Invoke the handler method with appropriate arguments.
                #构建方法入参
        Object[] listenerArguments = buildListenerArguments(convertedMessage);
                 #反射方法
        Object result = invokeListenerMethod(methodName, listenerArguments, message);
        if (result != null) {
            handleResult(result, message, channel);
        }
        else {
            logger.trace("No result object given - no result to handle");
        }
    }



    /**
     * Invoke the specified listener method.
     * @param methodName the name of the listener method
     * @param arguments the message arguments to be passed in
     * @param originalMessage the original message
     * @return the result returned from the listener method
     * @throws Exception if thrown by Rabbit API methods
     * @see #getListenerMethodName
     * @see #buildListenerArguments
     */
    protected Object invokeListenerMethod(String methodName, Object[] arguments, Message originalMessage)
            throws Exception {
        try {
            MethodInvoker methodInvoker = new MethodInvoker();
            methodInvoker.setTargetObject(getDelegate());
            methodInvoker.setTargetMethod(methodName);
            methodInvoker.setArguments(arguments);
            methodInvoker.prepare();
            return methodInvoker.invoke();
        }
        catch (InvocationTargetException ex) {
            Throwable targetEx = ex.getTargetException();
            if (targetEx instanceof IOException) {
                throw new AmqpIOException((IOException) targetEx);
            }
            else {
                throw new ListenerExecutionFailedException("Listener method '" + methodName + "' threw exception",
                        targetEx, originalMessage);
            }
        }
        catch (Exception ex) {
            ArrayList<String> arrayClass = new ArrayList<String>();
            if (arguments != null) {
                for (Object argument : arguments) {
                    arrayClass.add(argument.getClass().toString());
                }
            }
            throw new ListenerExecutionFailedException("Failed to invoke target method '" + methodName
                    + "' with argument type = [" + StringUtils.collectionToCommaDelimitedString(arrayClass)
                    + "], value = [" + ObjectUtils.nullSafeToString(arguments) + "]", ex, originalMessage);
        }
    }

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 206,968评论 6 482
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 88,601评论 2 382
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 153,220评论 0 344
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 55,416评论 1 279
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 64,425评论 5 374
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,144评论 1 285
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,432评论 3 401
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,088评论 0 261
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,586评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,028评论 2 325
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,137评论 1 334
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,783评论 4 324
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,343评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,333评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,559评论 1 262
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,595评论 2 355
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,901评论 2 345

推荐阅读更多精彩内容

  • 目录 交换器、队列、绑定的声明 关于消息序列化 同一个队列多消费类型 注解将消息和消息头注入消费者方法 关于消费者...
    哦00阅读 1,464评论 0 0
  • 目前项目采用spring-boot 2.1.6版本,并集成了RabbitMQ的相关功能,至于MQ的相关选型,由...
    花花大脸猫阅读 3,014评论 1 4
  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,601评论 18 139
  • RabbitMq 消息中间件介绍&为什么要使用消息中间件&什么时候使用消息中间件 我们用java来举例子, 打个比...
    葛立恒数阅读 2,167评论 1 4
  • 为了实现服务的异步操作,比较常用的就是消息队列了,之前一直是依葫芦画瓢的调包侠客,这次还是稍微的了解了一下在Spr...
    TesTo阅读 402评论 0 0