近期出现MQ消费者消费时业务中有下载大文件抛出OOM后就没有消费者了,回想好像很多年前就遇到这样的事迹rabbitmq 消费线程无故中断连接 - 简书 (jianshu.com),但是没有研究MQ 消费者源码,这不回弦镖又回来了!
,所以就有疑问:
1、是消费线程掉线了吗?如果消费线程掉线不是还可以重新new 线程消费队列消息吗?为什么会没有消费者了;
2、是消费者掉线了吗?如果配置多个消费者是不是就可以避免无法消费队列消息问题?
springboot 自动装配版本
org.springframework.boot:spring-boot-autoconfigure:2.0.2.release
springboot rabbitmq 版本
org.springframework.boot.amqp:spring-rabbit:2.0.3.release
1、首先看rabbitmq 自动装配启动类
@Configuration
@ConditionalOnClass({ RabbitTemplate.class, Channel.class })
@EnableConfigurationProperties(RabbitProperties.class)
@Import(RabbitAnnotationDrivenConfiguration.class) ##
public class RabbitAutoConfiguration {
#这个类里面会@Bean 很多工厂类比如 连接工厂,以及配置类,rabbittemplate类,这些暂时不是这篇文章所涉及,我们主要关注的是@Import(RabbitAnnotationDrivenConfiguration.class)这个类
}
3、咱们看下@Import(RabbitAnnotationDrivenConfiguration.class)这个类做了什么,从这开启了消费者的生命
@Configuration
@ConditionalOnClass(EnableRabbit.class) ## 关注这个注解
class RabbitAnnotationDrivenConfiguration {
}
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(RabbitBootstrapConfiguration.class)
public @interface EnableRabbit {
#@Import(RabbitBootstrapConfiguration.class) 通过注解引入这个类 ,从这里调到spring-rabbit 包
}
2、咱们看下@Import(RabbitBootstrapConfiguration.class) 这个类做了什么
@Configuration
public class RabbitBootstrapConfiguration {
@Bean(name = RabbitListenerConfigUtils.RABBIT_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public RabbitListenerAnnotationBeanPostProcessor rabbitListenerAnnotationProcessor() {
return new RabbitListenerAnnotationBeanPostProcessor();
#通过这个后置处理器进来
}
@Bean(name = RabbitListenerConfigUtils.RABBIT_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME)
public RabbitListenerEndpointRegistry defaultRabbitListenerEndpointRegistry() {
return new RabbitListenerEndpointRegistry();
}
}
4、RabbitListenerAnnotationBeanPostProcessor 当spring容器初始化bean时执行后置处理器
@Override
public Object postProcessAfterInitialization(final Object bean, final String beanName) throws BeansException {
Class<?> targetClass = AopUtils.getTargetClass(bean);
final TypeMetadata metadata = this.typeCache.computeIfAbsent(targetClass, this::buildMetadata);
for (ListenerMethod lm : metadata.listenerMethods) {
## 拿到所有的@RabbitListener 注解,并遍历处理
for (RabbitListener rabbitListener : lm.annotations) {
processAmqpListener(rabbitListener, lm.method, bean, beanName);
}
}
if (metadata.handlerMethods.length > 0) {
processMultiMethodListeners(metadata.classAnnotations, metadata.handlerMethods, bean, beanName);
}
return bean;
}
protected void processAmqpListener(RabbitListener rabbitListener, Method method, Object bean, String beanName) {
#拿到对应的反射对象
Method methodToUse = checkProxy(method, bean);
MethodRabbitListenerEndpoint endpoint = new MethodRabbitListenerEndpoint();
endpoint.setMethod(methodToUse);
endpoint.setBeanFactory(this.beanFactory);
endpoint.setReturnExceptions(resolveExpressionAsBoolean(rabbitListener.returnExceptions()));
String errorHandlerBeanName = resolveExpressionAsString(rabbitListener.errorHandler(), "errorHandler");
if (StringUtils.hasText(errorHandlerBeanName)) {
endpoint.setErrorHandler(this.beanFactory.getBean(errorHandlerBeanName, RabbitListenerErrorHandler.class));
}
#继续处理监听器
processListener(endpoint, rabbitListener, bean, methodToUse, beanName);
}
protected void processListener(MethodRabbitListenerEndpoint endpoint, RabbitListener rabbitListener, Object bean,
Object adminTarget, String beanName) {
# 设置原始对象
endpoint.setBean(bean);
endpoint.setMessageHandlerMethodFactory(this.messageHandlerMethodFactory);
endpoint.setId(getEndpointId(rabbitListener));
# 设置消费的队列
endpoint.setQueueNames(resolveQueues(rabbitListener));
#设置 消费者个数
endpoint.setConcurrency(resolveExpressionAsStringOrInteger(rabbitListener.concurrency(), "concurrency"));
String group = rabbitListener.group();
if (StringUtils.hasText(group)) {
Object resolvedGroup = resolveExpression(group);
if (resolvedGroup instanceof String) {
endpoint.setGroup((String) resolvedGroup);
}
}
String autoStartup = rabbitListener.autoStartup();
if (StringUtils.hasText(autoStartup)) {
endpoint.setAutoStartup(resolveExpressionAsBoolean(autoStartup));
}
endpoint.setExclusive(rabbitListener.exclusive());
String priority = resolve(rabbitListener.priority());
if (StringUtils.hasText(priority)) {
try {
endpoint.setPriority(Integer.valueOf(priority));
}
catch (NumberFormatException ex) {
throw new BeanInitializationException("Invalid priority value for " +
rabbitListener + " (must be an integer)", ex);
}
}
String rabbitAdmin = resolve(rabbitListener.admin());
if (StringUtils.hasText(rabbitAdmin)) {
Assert.state(this.beanFactory != null, "BeanFactory must be set to resolve RabbitAdmin by bean name");
try {
endpoint.setAdmin(this.beanFactory.getBean(rabbitAdmin, RabbitAdmin.class));
}
catch (NoSuchBeanDefinitionException ex) {
throw new BeanInitializationException("Could not register rabbit listener endpoint on [" +
adminTarget + "], no " + RabbitAdmin.class.getSimpleName() + " with id '" +
rabbitAdmin + "' was found in the application context", ex);
}
}
RabbitListenerContainerFactory<?> factory = null;
String containerFactoryBeanName = resolve(rabbitListener.containerFactory());
if (StringUtils.hasText(containerFactoryBeanName)) {
Assert.state(this.beanFactory != null, "BeanFactory must be set to obtain container factory by bean name");
try {
factory = this.beanFactory.getBean(containerFactoryBeanName, RabbitListenerContainerFactory.class);
}
catch (NoSuchBeanDefinitionException ex) {
throw new BeanInitializationException("Could not register rabbit listener endpoint on [" +
adminTarget + "] for bean " + beanName + ", no " + RabbitListenerContainerFactory.class.getSimpleName() + " with id '" +
containerFactoryBeanName + "' was found in the application context", ex);
}
}
# 监听注册器开始注册
this.registrar.registerEndpoint(endpoint, factory);
}
/**
* Register a new {@link RabbitListenerEndpoint} alongside the
* {@link RabbitListenerContainerFactory} to use to create the underlying container.
* <p>The {@code factory} may be {@code null} if the default factory has to be
* used for that endpoint.
* @param endpoint the {@link RabbitListenerEndpoint} instance to register.
* @param factory the {@link RabbitListenerContainerFactory} to use.
*/
public void registerEndpoint(RabbitListenerEndpoint endpoint, RabbitListenerContainerFactory<?> factory) {
Assert.notNull(endpoint, "Endpoint must be set");
Assert.hasText(endpoint.getId(), "Endpoint id must be set");
// Factory may be null, we defer the resolution right before actually creating the container
AmqpListenerEndpointDescriptor descriptor = new AmqpListenerEndpointDescriptor(endpoint, factory);
synchronized (this.endpointDescriptors) {
if (this.startImmediately) { // Register and start immediately
#监听器开始注册
this.endpointRegistry.registerListenerContainer(descriptor.endpoint,
resolveContainerFactory(descriptor), true);
}
else {
this.endpointDescriptors.add(descriptor);
}
}
}
5、 ##开始注册监听器容器
/**
* Create a message listener container for the given {@link RabbitListenerEndpoint}.
* <p>This create the necessary infrastructure to honor that endpoint
* with regards to its configuration.
* <p>The {@code startImmediately} flag determines if the container should be
* started immediately.
* @param endpoint the endpoint to add.
* @param factory the {@link RabbitListenerContainerFactory} to use.
* @param startImmediately start the container immediately if necessary
* @see #getListenerContainers()
* @see #getListenerContainer(String)
*/
@SuppressWarnings("unchecked")
public void registerListenerContainer(RabbitListenerEndpoint endpoint, RabbitListenerContainerFactory<?> factory,
boolean startImmediately) {
Assert.notNull(endpoint, "Endpoint must not be null");
Assert.notNull(factory, "Factory must not be null");
String id = endpoint.getId();
Assert.hasText(id, "Endpoint id must not be empty");
synchronized (this.listenerContainers) {
Assert.state(!this.listenerContainers.containsKey(id),
"Another endpoint is already registered with id '" + id + "'");
##在这里创建一个Rabbitmq 的监听器容器
MessageListenerContainer container = createListenerContainer(endpoint, factory);
this.listenerContainers.put(id, container);
if (StringUtils.hasText(endpoint.getGroup()) && this.applicationContext != null) {
List<MessageListenerContainer> containerGroup;
if (this.applicationContext.containsBean(endpoint.getGroup())) {
containerGroup = this.applicationContext.getBean(endpoint.getGroup(), List.class);
}
else {
containerGroup = new ArrayList<MessageListenerContainer>();
this.applicationContext.getBeanFactory().registerSingleton(endpoint.getGroup(), containerGroup);
}
containerGroup.add(container);
}
if (startImmediately) {
##开始监听
startIfNecessary(container);
}
}
}
6、在这里创建一个Rabbitmq 的监听器容器createListenerContainer
/**
* Create and start a new {@link MessageListenerContainer} using the specified factory.
* @param endpoint the endpoint to create a {@link MessageListenerContainer}.
* @param factory the {@link RabbitListenerContainerFactory} to use.
* @return the {@link MessageListenerContainer}.
*/
protected MessageListenerContainer createListenerContainer(RabbitListenerEndpoint endpoint,
RabbitListenerContainerFactory<?> factory) {
## 创建监听容器
MessageListenerContainer listenerContainer = factory.createListenerContainer(endpoint);
if (listenerContainer instanceof InitializingBean) {
try {
((InitializingBean) listenerContainer).afterPropertiesSet();
}
catch (Exception ex) {
throw new BeanInitializationException("Failed to initialize message listener container", ex);
}
}
int containerPhase = listenerContainer.getPhase();
if (containerPhase < Integer.MAX_VALUE) { // a custom phase value
if (this.phase < Integer.MAX_VALUE && this.phase != containerPhase) {
throw new IllegalStateException("Encountered phase mismatch between container factory definitions: " +
this.phase + " vs " + containerPhase);
}
this.phase = listenerContainer.getPhase();
}
return listenerContainer;
}
@Override
public C createListenerContainer(RabbitListenerEndpoint endpoint) {
C instance = createContainerInstance();
##设置连接工厂
if (this.connectionFactory != null) {
instance.setConnectionFactory(this.connectionFactory);
}
if (this.errorHandler != null) {
instance.setErrorHandler(this.errorHandler);
}
if (this.messageConverter != null) {
instance.setMessageConverter(this.messageConverter);
}
##设置ack 模式
if (this.acknowledgeMode != null) {
instance.setAcknowledgeMode(this.acknowledgeMode);
}
if (this.channelTransacted != null) {
instance.setChannelTransacted(this.channelTransacted);
}
if (this.applicationContext != null) {
instance.setApplicationContext(this.applicationContext);
}
if (this.taskExecutor != null) {
instance.setTaskExecutor(this.taskExecutor);
}
if (this.transactionManager != null) {
instance.setTransactionManager(this.transactionManager);
}
## 设置一次网络请求拿多少个消息包
if (this.prefetchCount != null) {
instance.setPrefetchCount(this.prefetchCount);
}
if (this.defaultRequeueRejected != null) {
instance.setDefaultRequeueRejected(this.defaultRequeueRejected);
}
if (this.adviceChain != null) {
instance.setAdviceChain(this.adviceChain);
}
if (this.recoveryBackOff != null) {
instance.setRecoveryBackOff(this.recoveryBackOff);
}
if (this.mismatchedQueuesFatal != null) {
instance.setMismatchedQueuesFatal(this.mismatchedQueuesFatal);
}
if (this.missingQueuesFatal != null) {
instance.setMissingQueuesFatal(this.missingQueuesFatal);
}
if (this.consumerTagStrategy != null) {
instance.setConsumerTagStrategy(this.consumerTagStrategy);
}
if (this.idleEventInterval != null) {
instance.setIdleEventInterval(this.idleEventInterval);
}
if (this.failedDeclarationRetryInterval != null) {
instance.setFailedDeclarationRetryInterval(this.failedDeclarationRetryInterval);
}
if (this.applicationEventPublisher != null) {
instance.setApplicationEventPublisher(this.applicationEventPublisher);
}
if (endpoint.getAutoStartup() != null) {
instance.setAutoStartup(endpoint.getAutoStartup());
}
else if (this.autoStartup != null) {
instance.setAutoStartup(this.autoStartup);
}
if (this.phase != null) {
instance.setPhase(this.phase);
}
if (this.afterReceivePostProcessors != null) {
instance.setAfterReceivePostProcessors(this.afterReceivePostProcessors);
}
instance.setListenerId(endpoint.getId());
endpoint.setupListenerContainer(instance);
if (this.beforeSendReplyPostProcessors != null
&& instance.getMessageListener() instanceof AbstractAdaptableMessageListener) {
((AbstractAdaptableMessageListener) instance.getMessageListener())
.setBeforeSendReplyPostProcessors(this.beforeSendReplyPostProcessors);
}
##实例化容器
initializeContainer(instance, endpoint);
return instance;
}
@Override
protected void initializeContainer(SimpleMessageListenerContainer instance, RabbitListenerEndpoint endpoint) {
super.initializeContainer(instance, endpoint);
if (this.txSize != null) {
instance.setTxSize(this.txSize);
}
String concurrency = endpoint.getConcurrency();
if (concurrency != null) {
instance.setConcurrency(concurrency);
}
#设置消费者个数
else if (this.concurrentConsumers != null) {
instance.setConcurrentConsumers(this.concurrentConsumers);
}
if ((concurrency == null || !(concurrency.contains("-"))) && this.maxConcurrentConsumers != null) {
instance.setMaxConcurrentConsumers(this.maxConcurrentConsumers);
}
if (this.startConsumerMinInterval != null) {
instance.setStartConsumerMinInterval(this.startConsumerMinInterval);
}
if (this.stopConsumerMinInterval != null) {
instance.setStopConsumerMinInterval(this.stopConsumerMinInterval);
}
if (this.consecutiveActiveTrigger != null) {
instance.setConsecutiveActiveTrigger(this.consecutiveActiveTrigger);
}
if (this.consecutiveIdleTrigger != null) {
instance.setConsecutiveIdleTrigger(this.consecutiveIdleTrigger);
}
if (this.receiveTimeout != null) {
instance.setReceiveTimeout(this.receiveTimeout);
}
if (this.deBatchingEnabled != null) {
instance.setDeBatchingEnabled(this.deBatchingEnabled);
}
}
###做个标记
public void setConcurrentConsumers(final int concurrentConsumers) {
Assert.isTrue(concurrentConsumers > 0, "'concurrentConsumers' value must be at least 1 (one)");
Assert.isTrue(!isExclusive() || concurrentConsumers == 1,
"When the consumer is exclusive, the concurrency must be 1");
if (this.maxConcurrentConsumers != null) {
Assert.isTrue(concurrentConsumers <= this.maxConcurrentConsumers,
"'concurrentConsumers' cannot be more than 'maxConcurrentConsumers'");
}
synchronized (this.consumersMonitor) {
if (logger.isDebugEnabled()) {
logger.debug("Changing consumers from " + this.concurrentConsumers + " to " + concurrentConsumers);
}
int delta = this.concurrentConsumers - concurrentConsumers;
this.concurrentConsumers = concurrentConsumers;
if (isActive() && this.consumers != null) {
if (delta > 0) {
Iterator<BlockingQueueConsumer> consumerIterator = this.consumers.iterator();
while (consumerIterator.hasNext() && delta > 0) {
BlockingQueueConsumer consumer = consumerIterator.next();
consumer.basicCancel(true);
consumerIterator.remove();
delta--;
}
}
else {
addAndStartConsumers(-delta);
}
}
}
}
8、上面创建监听器容器完成后由下方方法开始监听
@SuppressWarnings("unchecked")
public void registerListenerContainer(RabbitListenerEndpoint endpoint, RabbitListenerContainerFactory<?> factory,
boolean startImmediately) {
Assert.notNull(endpoint, "Endpoint must not be null");
Assert.notNull(factory, "Factory must not be null");
String id = endpoint.getId();
Assert.hasText(id, "Endpoint id must not be empty");
synchronized (this.listenerContainers) {
Assert.state(!this.listenerContainers.containsKey(id),
"Another endpoint is already registered with id '" + id + "'");
MessageListenerContainer container = createListenerContainer(endpoint, factory);
this.listenerContainers.put(id, container);
if (StringUtils.hasText(endpoint.getGroup()) && this.applicationContext != null) {
List<MessageListenerContainer> containerGroup;
if (this.applicationContext.containsBean(endpoint.getGroup())) {
containerGroup = this.applicationContext.getBean(endpoint.getGroup(), List.class);
}
else {
containerGroup = new ArrayList<MessageListenerContainer>();
this.applicationContext.getBeanFactory().registerSingleton(endpoint.getGroup(), containerGroup);
}
containerGroup.add(container);
}
if (startImmediately) {
#开始监听
startIfNecessary(container);
}
}
}
private void startIfNecessary(MessageListenerContainer listenerContainer) {
if (this.contextRefreshed || listenerContainer.isAutoStartup()) {
listenerContainer.start();
}
}
9、MessageListenerContainer
/**
* Start this container.
* @see #doStart
*/
@Override
public void start() {
if (isRunning()) {
return;
}
if (!this.initialized) {
synchronized (this.lifecycleMonitor) {
if (!this.initialized) {
afterPropertiesSet();
this.initialized = true;
}
}
}
try {
if (logger.isDebugEnabled()) {
logger.debug("Starting Rabbit listener container.");
}
configureAdminIfNeeded();
checkMismatchedQueues();
doStart();
}
catch (Exception ex) {
throw convertRabbitAccessException(ex);
}
}
@Override
protected void doStart() throws Exception {
if (getMessageListener() instanceof ListenerContainerAware) {
Collection<String> expectedQueueNames = ((ListenerContainerAware) getMessageListener()).expectedQueueNames();
if (expectedQueueNames != null) {
String[] queueNames = getQueueNames();
Assert.state(expectedQueueNames.size() == queueNames.length,
"Listener expects us to be listening on '" + expectedQueueNames + "'; our queues: "
+ Arrays.asList(queueNames));
boolean found = false;
for (String queueName : queueNames) {
if (expectedQueueNames.contains(queueName)) {
found = true;
}
else {
found = false;
break;
}
}
Assert.state(found, () -> "Listener expects us to be listening on '" + expectedQueueNames + "'; our queues: "
+ Arrays.asList(queueNames));
}
}
super.doStart();
synchronized (this.consumersMonitor) {
if (this.consumers != null) {
throw new IllegalStateException("A stopped container should not have consumers");
}
#初始化消费者
int newConsumers = initializeConsumers();
if (this.consumers == null) {
logger.info("Consumers were initialized and then cleared " +
"(presumably the container was stopped concurrently)");
return;
}
if (newConsumers <= 0) {
if (logger.isInfoEnabled()) {
logger.info("Consumers are already running");
}
return;
}
Set<AsyncMessageProcessingConsumer> processors = new HashSet<AsyncMessageProcessingConsumer>();
for (BlockingQueueConsumer consumer : this.consumers) {
# 为每个消费者 创建一个线程,交给线程池
AsyncMessageProcessingConsumer processor = new AsyncMessageProcessingConsumer(consumer);
processors.add(processor);
getTaskExecutor().execute(processor);
if (getApplicationEventPublisher() != null) {
getApplicationEventPublisher().publishEvent(new AsyncConsumerStartedEvent(this, consumer));
}
}
for (AsyncMessageProcessingConsumer processor : processors) {
FatalListenerStartupException startupException = processor.getStartupException();
if (startupException != null) {
throw new AmqpIllegalStateException("Fatal exception on listener startup", startupException);
}
}
}
}
10、 看消费线程run 的逻辑
@Override
public void run() {
if (!isActive()) {
return;
}
boolean aborted = false;
int consecutiveIdles = 0;
int consecutiveMessages = 0;
this.consumer.setLocallyTransacted(isChannelLocallyTransacted());
String routingLookupKey = getRoutingLookupKey();
if (routingLookupKey != null) {
SimpleResourceHolder.bind(getRoutingConnectionFactory(), routingLookupKey);
}
if (this.consumer.getQueueCount() < 1) {
if (logger.isDebugEnabled()) {
logger.debug("Consumer stopping; no queues for " + this.consumer);
}
SimpleMessageListenerContainer.this.cancellationLock.release(this.consumer);
if (getApplicationEventPublisher() != null) {
getApplicationEventPublisher().publishEvent(
new AsyncConsumerStoppedEvent(SimpleMessageListenerContainer.this, this.consumer));
}
this.start.countDown();
return;
}
try {
try {
redeclareElementsIfNecessary();
this.consumer.start();
this.start.countDown();
}
catch (QueuesNotAvailableException e) {
if (isMissingQueuesFatal()) {
throw e;
}
else {
this.start.countDown();
handleStartupFailure(this.consumer.getBackOffExecution());
throw e;
}
}
catch (FatalListenerStartupException ex) {
if (isPossibleAuthenticationFailureFatal()) {
throw ex;
}
else {
Throwable possibleAuthException = ex.getCause().getCause();
if (possibleAuthException == null ||
!(possibleAuthException instanceof PossibleAuthenticationFailureException)) {
throw ex;
}
else {
this.start.countDown();
handleStartupFailure(this.consumer.getBackOffExecution());
throw possibleAuthException;
}
}
}
catch (Throwable t) { //NOSONAR
this.start.countDown();
handleStartupFailure(this.consumer.getBackOffExecution());
throw t;
}
if (getTransactionManager() != null) {
/*
* Register the consumer's channel so it will be used by the transaction manager
* if it's an instance of RabbitTransactionManager.
*/
ConsumerChannelRegistry.registerConsumerChannel(this.consumer.getChannel(), getConnectionFactory());
}
## 直接看这里,进入循环
while (isActive(this.consumer) || this.consumer.hasDelivery() || !this.consumer.cancelled()) {
try {
## 开始拿消息处理消息
boolean receivedOk = receiveAndExecute(this.consumer); // At least one message received
if (SimpleMessageListenerContainer.this.maxConcurrentConsumers != null) {
if (receivedOk) {
if (isActive(this.consumer)) {
consecutiveIdles = 0;
if (consecutiveMessages++ > SimpleMessageListenerContainer.this.consecutiveActiveTrigger) {
considerAddingAConsumer();
consecutiveMessages = 0;
}
}
}
else {
consecutiveMessages = 0;
if (consecutiveIdles++ > SimpleMessageListenerContainer.this.consecutiveIdleTrigger) {
considerStoppingAConsumer(this.consumer);
consecutiveIdles = 0;
}
}
}
long idleEventInterval = getIdleEventInterval();
if (idleEventInterval > 0) {
if (receivedOk) {
updateLastReceive();
}
else {
long now = System.currentTimeMillis();
long lastAlertAt = SimpleMessageListenerContainer.this.lastNoMessageAlert.get();
long lastReceive = getLastReceive();
if (now > lastReceive + idleEventInterval
&& now > lastAlertAt + idleEventInterval
&& SimpleMessageListenerContainer.this.lastNoMessageAlert
.compareAndSet(lastAlertAt, now)) {
publishIdleContainerEvent(now - lastReceive);
}
}
}
}
catch (ListenerExecutionFailedException ex) {
// Continue to process, otherwise re-throw
if (ex.getCause() instanceof NoSuchMethodException) {
throw new FatalListenerExecutionException("Invalid listener", ex);
}
}
catch (AmqpRejectAndDontRequeueException rejectEx) {
/*
* These will normally be wrapped by an LEFE if thrown by the
* listener, but we will also honor it if thrown by an
* error handler.
*/
}
}
}
catch (InterruptedException e) {
logger.debug("Consumer thread interrupted, processing stopped.");
Thread.currentThread().interrupt();
aborted = true;
publishConsumerFailedEvent("Consumer thread interrupted, processing stopped", true, e);
}
catch (QueuesNotAvailableException ex) {
logger.error("Consumer received fatal=" + isMismatchedQueuesFatal() + " exception on startup", ex);
if (isMissingQueuesFatal()) {
this.startupException = ex;
// Fatal, but no point re-throwing, so just abort.
aborted = true;
}
publishConsumerFailedEvent("Consumer queue(s) not available", aborted, ex);
}
catch (FatalListenerStartupException ex) {
logger.error("Consumer received fatal exception on startup", ex);
this.startupException = ex;
// Fatal, but no point re-throwing, so just abort.
aborted = true;
publishConsumerFailedEvent("Consumer received fatal exception on startup", true, ex);
}
catch (FatalListenerExecutionException ex) {
logger.error("Consumer received fatal exception during processing", ex);
// Fatal, but no point re-throwing, so just abort.
aborted = true;
publishConsumerFailedEvent("Consumer received fatal exception during processing", true, ex);
}
catch (PossibleAuthenticationFailureException ex) {
logger.error("Consumer received fatal=" + isPossibleAuthenticationFailureFatal() +
" exception during processing", ex);
if (isPossibleAuthenticationFailureFatal()) {
this.startupException =
new FatalListenerStartupException("Authentication failure",
new AmqpAuthenticationException(ex));
// Fatal, but no point re-throwing, so just abort.
aborted = true;
}
publishConsumerFailedEvent("Consumer received PossibleAuthenticationFailure during startup", aborted, ex);
}
catch (ShutdownSignalException e) {
if (RabbitUtils.isNormalShutdown(e)) {
if (logger.isDebugEnabled()) {
logger.debug("Consumer received Shutdown Signal, processing stopped: " + e.getMessage());
}
}
else {
logConsumerException(e);
}
}
catch (AmqpIOException e) {
if (e.getCause() instanceof IOException && e.getCause().getCause() instanceof ShutdownSignalException
&& e.getCause().getCause().getMessage().contains("in exclusive use")) {
getExclusiveConsumerExceptionLogger().log(logger,
"Exclusive consumer failure", e.getCause().getCause());
publishConsumerFailedEvent("Consumer raised exception, attempting restart", false, e);
}
else {
logConsumerException(e);
}
}
catch (Error e) { //NOSONAR
// ok to catch Error - we're aborting so will stop
logger.error("Consumer thread error, thread abort.", e);
publishConsumerFailedEvent("Consumer threw an Error", true, e);
#当发送error 时 标记为true,目的是为了下面终止当前线程
aborted = true;
}
catch (Throwable t) { //NOSONAR
// by now, it must be an exception
if (isActive()) {
logConsumerException(t);
}
}
finally {
if (getTransactionManager() != null) {
ConsumerChannelRegistry.unRegisterConsumerChannel();
}
}
// In all cases count down to allow container to progress beyond startup
this.start.countDown();
if (!isActive(this.consumer) || aborted) {
# 如果当前贤臣更不是活跃 或者标记为true
logger.debug("Cancelling " + this.consumer);
try {
this.consumer.stop(); // 将当前线程体停止
SimpleMessageListenerContainer.this.cancellationLock.release(this.consumer);
if (getApplicationEventPublisher() != null) {
getApplicationEventPublisher().publishEvent(
new AsyncConsumerStoppedEvent(SimpleMessageListenerContainer.this, this.consumer));
}
}
catch (AmqpException e) {
logger.info("Could not cancel message consumer", e);
}
if (aborted && SimpleMessageListenerContainer.this.containerStoppingForAbort
.compareAndSet(null, Thread.currentThread())) {
logger.error("Stopping container from aborted consumer");
stop();
SimpleMessageListenerContainer.this.containerStoppingForAbort.set(null);
ListenerContainerConsumerFailedEvent event = null;
do {
try {
event = SimpleMessageListenerContainer.this.abortEvents.poll(5, TimeUnit.SECONDS);
if (event != null) {
SimpleMessageListenerContainer.this.publishConsumerFailedEvent(
event.getReason(), event.isFatal(), event.getThrowable());
}
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
while (event != null);
}
}
else {
# 如果是非标记的 异常,可以重启当前线程 如果当前只有一个消费线程 并且无法重启则改队列一直无法被消费;
logger.info("Restarting " + this.consumer);
restart(this.consumer);
}
if (routingLookupKey != null) {
SimpleResourceHolder.unbind(getRoutingConnectionFactory());
}
}
private void logConsumerException(Throwable t) {
if (logger.isDebugEnabled()
|| !(t instanceof AmqpConnectException || t instanceof ConsumerCancelledException)) {
logger.debug(
"Consumer raised exception, processing can restart if the connection factory supports it",
t);
}
else {
if (t instanceof ConsumerCancelledException && this.consumer.isNormalCancel()) {
if (logger.isDebugEnabled()) {
logger.debug(
"Consumer raised exception, processing can restart if the connection factory supports it. "
+ "Exception summary: " + t);
}
}
else if (logger.isWarnEnabled()) {
logger.warn(
"Consumer raised exception, processing can restart if the connection factory supports it. "
+ "Exception summary: " + t);
}
}
publishConsumerFailedEvent("Consumer raised exception, attempting restart", false, t);
}
}
11、开始获取mq 消息并处理
private boolean receiveAndExecute(final BlockingQueueConsumer consumer) throws Throwable {
if (getTransactionManager() != null) {
try {
if (this.transactionTemplate == null) {
this.transactionTemplate =
new TransactionTemplate(getTransactionManager(), getTransactionAttribute());
}
return this.transactionTemplate
.execute(status -> {
RabbitResourceHolder resourceHolder = ConnectionFactoryUtils.bindResourceToTransaction(
new RabbitResourceHolder(consumer.getChannel(), false),
getConnectionFactory(), true);
// unbound in ResourceHolderSynchronization.beforeCompletion()
try {
return doReceiveAndExecute(consumer);
}
catch (RuntimeException e1) {
prepareHolderForRollback(resourceHolder, e1);
throw e1;
}
catch (Throwable e2) { //NOSONAR
// ok to catch Throwable here because we re-throw it below
throw new WrappedTransactionException(e2);
}
});
}
catch (WrappedTransactionException e) {
throw e.getCause();
}
}
# 上面是判断是否是事务消息 假定是非事务直接看这个方法
return doReceiveAndExecute(consumer);
}
private boolean doReceiveAndExecute(BlockingQueueConsumer consumer) throws Throwable { //NOSONAR
Channel channel = consumer.getChannel();
for (int i = 0; i < this.txSize; i++) {
logger.trace("Waiting for message from consumer.");
## 获取到消息
Message message = consumer.nextMessage(this.receiveTimeout);
if (message == null) {
break;
}
try {
# 处理消息
executeListener(channel, message);
}
catch (ImmediateAcknowledgeAmqpException e) {
if (this.logger.isDebugEnabled()) {
this.logger.debug("User requested ack for failed delivery '"
+ e.getMessage() + "': "
+ message.getMessageProperties().getDeliveryTag());
}
break;
}
catch (Throwable ex) { //NOSONAR
if (causeChainHasImmediateAcknowledgeAmqpException(ex)) {
if (this.logger.isDebugEnabled()) {
this.logger.debug("User requested ack for failed delivery: "
+ message.getMessageProperties().getDeliveryTag());
}
break;
}
if (getTransactionManager() != null) {
if (getTransactionAttribute().rollbackOn(ex)) {
RabbitResourceHolder resourceHolder = (RabbitResourceHolder) TransactionSynchronizationManager
.getResource(getConnectionFactory());
if (resourceHolder != null) {
consumer.clearDeliveryTags();
}
else {
/*
* If we don't actually have a transaction, we have to roll back
* manually. See prepareHolderForRollback().
*/
consumer.rollbackOnExceptionIfNecessary(ex);
}
throw ex; // encompassing transaction will handle the rollback.
}
else {
if (this.logger.isDebugEnabled()) {
this.logger.debug("No rollback for " + ex);
}
break;
}
}
else {
consumer.rollbackOnExceptionIfNecessary(ex);
throw ex;
}
}
}
return consumer.commitIfNecessary(isChannelLocallyTransacted());
}
12 、处理消息
/**
* Execute the specified listener, committing or rolling back the transaction afterwards (if necessary).
*
* @param channel the Rabbit Channel to operate on
* @param messageIn the received Rabbit Message
* @throws Exception Any Exception.
*
* @see #invokeListener
* @see #handleListenerException
*/
protected void executeListener(Channel channel, Message messageIn) throws Exception {
if (!isRunning()) {
if (logger.isWarnEnabled()) {
logger.warn("Rejecting received message because the listener container has been stopped: " + messageIn);
}
throw new MessageRejectedWhileStoppingException();
}
try {
Message message = messageIn;
if (this.afterReceivePostProcessors != null) {
for (MessagePostProcessor processor : this.afterReceivePostProcessors) {
message = processor.postProcessMessage(message);
if (message == null) {
throw new ImmediateAcknowledgeAmqpException(
"Message Post Processor returned 'null', discarding message");
}
}
}
Object batchFormat = message.getMessageProperties().getHeaders().get(MessageProperties.SPRING_BATCH_FORMAT);
#是否是批处理?上面咱们说到一次拿几个消息 在这里会吧消息包安装mq 自己的结构拆解
if (MessageProperties.BATCH_FORMAT_LENGTH_HEADER4.equals(batchFormat) && this.deBatchingEnabled) {
ByteBuffer byteBuffer = ByteBuffer.wrap(message.getBody());
MessageProperties messageProperties = message.getMessageProperties();
messageProperties.getHeaders().remove(MessageProperties.SPRING_BATCH_FORMAT);
while (byteBuffer.hasRemaining()) {
int length = byteBuffer.getInt();
if (length < 0 || length > byteBuffer.remaining()) {
throw new ListenerExecutionFailedException("Bad batched message received",
new MessageConversionException("Insufficient batch data at offset " + byteBuffer.position()),
message);
}
byte[] body = new byte[length];
byteBuffer.get(body);
messageProperties.setContentLength(length);
// Caveat - shared MessageProperties.
Message fragment = new Message(body, messageProperties);
# 循环调用监听方法
invokeListener(channel, fragment);
}
}
else {
# 单个消息包调用监听方法
invokeListener(channel, message);
}
}
catch (Exception ex) {
if (messageIn.getMessageProperties().isFinalRetryForMessageWithNoId()) {
if (this.statefulRetryFatalWithNullMessageId) {
throw new FatalListenerExecutionException(
"Illegal null id in message. Failed to manage retry for message: " + messageIn);
}
else {
throw new ListenerExecutionFailedException("Cannot retry message more than once without an ID",
new AmqpRejectAndDontRequeueException("Not retryable; rejecting and not requeuing", ex),
messageIn);
}
}
handleListenerException(ex);
throw ex;
}
}
protected void invokeListener(Channel channel, Message message) throws Exception {
this.proxy.invokeListener(channel, message);
}
13、真正执行监听器
/**
* Invoke the specified listener: either as standard MessageListener or (preferably) as SessionAwareMessageListener.
* @param channel the Rabbit Channel to operate on
* @param message the received Rabbit Message
* @throws Exception if thrown by Rabbit API methods
* @see #setMessageListener
*/
protected void actualInvokeListener(Channel channel, Message message) throws Exception {
Object listener = getMessageListener();
if (listener instanceof ChannelAwareMessageListener) {
doInvokeListener((ChannelAwareMessageListener) listener, channel, message);
}
else if (listener instanceof MessageListener) {
boolean bindChannel = isExposeListenerChannel() && isChannelLocallyTransacted();
if (bindChannel) {
RabbitResourceHolder resourceHolder = new RabbitResourceHolder(channel, false);
resourceHolder.setSynchronizedWithTransaction(true);
TransactionSynchronizationManager.bindResource(this.getConnectionFactory(),
resourceHolder);
}
try {
doInvokeListener((MessageListener) listener, message);
}
finally {
if (bindChannel) {
// unbind if we bound
TransactionSynchronizationManager.unbindResource(this.getConnectionFactory());
}
}
}
else if (listener != null) {
throw new FatalListenerExecutionException("Only MessageListener and SessionAwareMessageListener supported: "
+ listener);
}
else {
throw new FatalListenerExecutionException("No message listener specified - see property 'messageListener'");
}
}
protected void doInvokeListener(MessageListener listener, Message message) throws Exception {
try {
listener.onMessage(message);
}
catch (Exception e) {
throw wrapToListenerExecutionFailedExceptionIfNeeded(e, message);
}
}
## AbstractAdaptableMessageListener 类的
@Override
public void onMessage(Message message) {
try {
onMessage(message, null);
}
catch (Exception e) {
throw new ListenerExecutionFailedException("Listener threw exception", e, message);
}
}
15、MessageListenerAdapter
@Override
public void onMessage(Message message, Channel channel) throws Exception {
// Check whether the delegate is a MessageListener impl itself.
// In that case, the adapter will simply act as a pass-through.
Object delegate = getDelegate();
if (delegate != this) {
if (delegate instanceof ChannelAwareMessageListener) {
if (channel != null) {
((ChannelAwareMessageListener) delegate).onMessage(message, channel);
return;
}
else if (!(delegate instanceof MessageListener)) {
throw new AmqpIllegalStateException("MessageListenerAdapter cannot handle a "
+ "ChannelAwareMessageListener delegate if it hasn't been invoked with a Channel itself");
}
}
if (delegate instanceof MessageListener) {
((MessageListener) delegate).onMessage(message);
return;
}
}
// Regular case: find a handler method reflectively.
Object convertedMessage = extractMessage(message);
#找到@RabbitListtener 注解作用的方法名
String methodName = getListenerMethodName(message, convertedMessage);
if (methodName == null) {
throw new AmqpIllegalStateException("No default listener method specified: "
+ "Either specify a non-null value for the 'defaultListenerMethod' property or "
+ "override the 'getListenerMethodName' method.");
}
// Invoke the handler method with appropriate arguments.
#构建方法入参
Object[] listenerArguments = buildListenerArguments(convertedMessage);
#反射方法
Object result = invokeListenerMethod(methodName, listenerArguments, message);
if (result != null) {
handleResult(result, message, channel);
}
else {
logger.trace("No result object given - no result to handle");
}
}
/**
* Invoke the specified listener method.
* @param methodName the name of the listener method
* @param arguments the message arguments to be passed in
* @param originalMessage the original message
* @return the result returned from the listener method
* @throws Exception if thrown by Rabbit API methods
* @see #getListenerMethodName
* @see #buildListenerArguments
*/
protected Object invokeListenerMethod(String methodName, Object[] arguments, Message originalMessage)
throws Exception {
try {
MethodInvoker methodInvoker = new MethodInvoker();
methodInvoker.setTargetObject(getDelegate());
methodInvoker.setTargetMethod(methodName);
methodInvoker.setArguments(arguments);
methodInvoker.prepare();
return methodInvoker.invoke();
}
catch (InvocationTargetException ex) {
Throwable targetEx = ex.getTargetException();
if (targetEx instanceof IOException) {
throw new AmqpIOException((IOException) targetEx);
}
else {
throw new ListenerExecutionFailedException("Listener method '" + methodName + "' threw exception",
targetEx, originalMessage);
}
}
catch (Exception ex) {
ArrayList<String> arrayClass = new ArrayList<String>();
if (arguments != null) {
for (Object argument : arguments) {
arrayClass.add(argument.getClass().toString());
}
}
throw new ListenerExecutionFailedException("Failed to invoke target method '" + methodName
+ "' with argument type = [" + StringUtils.collectionToCommaDelimitedString(arrayClass)
+ "], value = [" + ObjectUtils.nullSafeToString(arguments) + "]", ex, originalMessage);
}
}