RocketMQ Push Consumption Source Code Analysis (Part 1)

Example

public class Consumer1 {
    public static void main(String[] args) throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumerGroup1");
        consumer.setNamesrvAddr("192.168.137.3:9876;192.168.137.4:9876");
        consumer.setVipChannelEnabled(false);
        consumer.setMessageModel(MessageModel.CLUSTERING);
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.subscribe("TestData1", "*");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                Message message = list.get(0);
                try {
                    String body = new String(message.getBody(), RemotingHelper.DEFAULT_CHARSET);
                    System.out.println(body);
                } catch (UnsupportedEncodingException e) {
                    e.printStackTrace();
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
    }
}

consumer.start() delegates to DefaultMQPushConsumerImpl.start(). Here is its implementation:

public synchronized void start() throws MQClientException {
        switch(this.serviceState) {
        case CREATE_JUST:
            this.log.info("the consumer [{}] start beginning. messageModel={}, isUnitMode={}", new Object[]{this.defaultMQPushConsumer.getConsumerGroup(), this.defaultMQPushConsumer.getMessageModel(), this.defaultMQPushConsumer.isUnitMode()});
            this.serviceState = ServiceState.START_FAILED;
            this.checkConfig();
            // Copy the subscription info (topic, tags, etc.) into the rebalance implementation
            this.copySubscription();
            if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) {
                this.defaultMQPushConsumer.changeInstanceNameToPID();
            }
            // Create the MQClientInstance; clients with the same IP and process share a single MQClientInstance
            this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);
            this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());
            this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
            // Set the queue allocation (load-balancing) strategy used by rebalance
            this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());
            this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);
            // PullAPIWrapper is used later when pulling messages
            this.pullAPIWrapper = new PullAPIWrapper(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup(), this.isUnitMode());
            this.pullAPIWrapper.registerFilterMessageHook(this.filterMessageHookList);
            if (this.defaultMQPushConsumer.getOffsetStore() != null) {
                this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
            } else {
                switch(this.defaultMQPushConsumer.getMessageModel()) {
                case BROADCASTING:
                    this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
                    break;
                case CLUSTERING:
                    this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
                }

                this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);
            }

            this.offsetStore.load();
            // For orderly consumption, instantiate ConsumeMessageOrderlyService
            // For ordinary (concurrent) consumption, instantiate ConsumeMessageConcurrentlyService
            if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
                this.consumeOrderly = true;
                this.consumeMessageService = new ConsumeMessageOrderlyService(this, (MessageListenerOrderly)this.getMessageListenerInner());
            } else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
                this.consumeOrderly = false;
                this.consumeMessageService = new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently)this.getMessageListenerInner());
            }

            this.consumeMessageService.start();
            boolean registerOK = this.mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);
            if (!registerOK) {
                this.serviceState = ServiceState.CREATE_JUST;
                this.consumeMessageService.shutdown();
                throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup() + "] has been created before, specify another name please." + FAQUrl.suggestTodo("http://rocketmq.apache.org/docs/faq/"), (Throwable)null);
            } else {
                this.mQClientFactory.start();
                this.log.info("the consumer [{}] start OK.", this.defaultMQPushConsumer.getConsumerGroup());
                this.serviceState = ServiceState.RUNNING;
            }
        default:
            this.updateTopicSubscribeInfoWhenSubscriptionChanged();
            this.mQClientFactory.checkClientInBroker();
            this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
            this.mQClientFactory.rebalanceImmediately();
            return;
        case RUNNING:
        case SHUTDOWN_ALREADY:
        case START_FAILED:
            throw new MQClientException("The PushConsumer service state not OK, maybe started once, " + this.serviceState + FAQUrl.suggestTodo("http://rocketmq.apache.org/docs/faq/"), (Throwable)null);
        }
    }
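
The state handling in start() follows a simple pattern: the state is set to START_FAILED before any initialization runs, and only becomes RUNNING once everything has succeeded, so a failure halfway through leaves the consumer in a state that rejects further start() calls. Below is a minimal sketch of that pattern. StartStateSketch and its Runnable parameter are hypothetical names for illustration and are not part of RocketMQ.

```java
final class StartStateSketch {
    // Mirrors the state names seen in the listing above.
    enum ServiceState { CREATE_JUST, RUNNING, SHUTDOWN_ALREADY, START_FAILED }

    private ServiceState state = ServiceState.CREATE_JUST;

    // init stands in for checkConfig(), copySubscription(), etc. (an assumption of this sketch).
    synchronized void start(Runnable init) {
        if (state != ServiceState.CREATE_JUST) {
            throw new IllegalStateException("service state not OK, maybe started once: " + state);
        }
        // Assume failure first; if init throws, the state stays START_FAILED.
        state = ServiceState.START_FAILED;
        init.run();
        state = ServiceState.RUNNING;
    }

    synchronized ServiceState state() {
        return state;
    }

    public static void main(String[] args) {
        StartStateSketch service = new StartStateSketch();
        service.start(() -> System.out.println("initializing"));
        System.out.println(service.state());
    }
}
```

Setting the pessimistic state first means no extra try/catch is needed to record a failed start.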

Next, look at MQClientInstance.start():

public void start() throws MQClientException {
        synchronized(this) {
            switch(this.serviceState) {
            case CREATE_JUST:
                this.serviceState = ServiceState.START_FAILED;
                if (null == this.clientConfig.getNamesrvAddr()) {
                    this.mQClientAPIImpl.fetchNameServerAddr();
                }
                
                this.mQClientAPIImpl.start();
                // Start a series of scheduled tasks (refreshing topic route info, persisting queue consume offsets to the broker)
                this.startScheduledTask();
                this.pullMessageService.start();
                this.rebalanceService.start();
                this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
                this.log.info("the client factory [{}] start OK", this.clientId);
                this.serviceState = ServiceState.RUNNING;
            case RUNNING:
            case SHUTDOWN_ALREADY:
            default:
                return;
            case START_FAILED:
                throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", (Throwable)null);
            }
        }
    }

pullMessageService blocks on pullRequestQueue while waiting for a PullRequest, until rebalance puts a PullRequest into that blocking queue.

public void run() {
        this.log.info(this.getServiceName() + " service started");

        while(!this.isStopped()) {
            try {
                // PullMessageService blocks here until a PullRequest is available
                PullRequest pullRequest = (PullRequest)this.pullRequestQueue.take();
                this.pullMessage(pullRequest);
            } catch (InterruptedException var2) {
                ;
            } catch (Exception var3) {
                this.log.error("Pull Message Service Run Method exception", var3);
            }
        }

        this.log.info(this.getServiceName() + " service end");
    }
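
The hand-off described above is the classic producer/consumer pattern on a blocking queue. The following is a minimal sketch, assuming plain integers stand in for PullRequest objects and a sentinel value stops the loop (the real service checks isStopped() instead). PullQueueHandOffSketch and STOP are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

final class PullQueueHandOffSketch {
    // Sentinel used only by this sketch to end the consumer loop.
    static final int STOP = -1;

    // Feeds the given requests through a blocking queue and returns them in handling order.
    static List<Integer> runOnce(List<Integer> requests) {
        BlockingQueue<Integer> pullRequestQueue = new LinkedBlockingQueue<>();
        List<Integer> handled = new ArrayList<>();

        Thread puller = new Thread(() -> {
            try {
                while (true) {
                    int request = pullRequestQueue.take(); // blocks while the queue is empty
                    if (request == STOP) {
                        return;
                    }
                    handled.add(request); // stands in for pullMessage(pullRequest)
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "pull-sketch");
        puller.start();

        try {
            // Plays the role of rebalance: it produces the requests.
            for (int request : requests) {
                pullRequestQueue.put(request);
            }
            pullRequestQueue.put(STOP);
            puller.join(); // join() makes the puller's writes to 'handled' visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return handled;
    }

    public static void main(String[] args) {
        System.out.println(runOnce(List.of(0, 1, 2)));
    }
}
```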

RebalanceService runs a rebalance every 20 seconds (the default wait interval) and puts the resulting PullRequests into pullMessageService.

public void run() {
        this.log.info(this.getServiceName() + " service started");

        while(!this.isStopped()) {
            this.waitForRunning(waitInterval);
            this.mqClientFactory.doRebalance();
        }

        this.log.info(this.getServiceName() + " service end");
    }

doRebalance() ends up calling RebalanceImpl.rebalanceByTopic() for each subscribed topic:

private void rebalanceByTopic(String topic, boolean isOrder) {
        Set mqSet;
        switch(this.messageModel) {
        // As shown here, broadcasting mode uses no load-balancing strategy: every consumer gets all queues
        case BROADCASTING:
            mqSet = (Set)this.topicSubscribeInfoTable.get(topic);
            if (mqSet != null) {
                boolean changed = this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder);
                if (changed) {
                    this.messageQueueChanged(topic, mqSet, mqSet);
                    log.info("messageQueueChanged {} {} {} {}", new Object[]{this.consumerGroup, topic, mqSet, mqSet});
                }
            } else {
                log.warn("doRebalance, {}, but the topic[{}] not exist.", this.consumerGroup, topic);
            }
            break;
        case CLUSTERING:
            mqSet = (Set)this.topicSubscribeInfoTable.get(topic);
            // Note: all consumer client IDs are looked up by topic and consumer group
            List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, this.consumerGroup);
            if (null == mqSet && !topic.startsWith("%RETRY%")) {
                log.warn("doRebalance, {}, but the topic[{}] not exist.", this.consumerGroup, topic);
            }

            if (null == cidAll) {
                log.warn("doRebalance, {} {}, get consumer id list failed", this.consumerGroup, topic);
            }

            if (mqSet != null && cidAll != null) {
                List<MessageQueue> mqAll = new ArrayList();
                mqAll.addAll(mqSet);
                Collections.sort(mqAll);
                Collections.sort(cidAll);
                AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;
                List allocateResult = null;

                try {
                    // Load balancing: compute which MessageQueues this consumer should pull from
                    allocateResult = strategy.allocate(this.consumerGroup, this.mQClientFactory.getClientId(), mqAll, cidAll);
                } catch (Throwable var10) {
                    log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(), var10);
                    return;
                }

                Set<MessageQueue> allocateResultSet = new HashSet();
                if (allocateResult != null) {
                    allocateResultSet.addAll(allocateResult);
                }
                
                boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
                if (changed) {
                    log.info("rebalanced result changed. allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={}, mqAllSize={}, cidAllSize={}, rebalanceResultSize={}, rebalanceResultSet={}", new Object[]{strategy.getName(), this.consumerGroup, topic, this.mQClientFactory.getClientId(), mqSet.size(), cidAll.size(), allocateResultSet.size(), allocateResultSet});
                    this.messageQueueChanged(topic, mqSet, allocateResultSet);
                }
            }
        }

    }

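Note: List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, this.consumerGroup); looks up all consumer clients by topic and consumer group. This is probably why every consumer in the same group is expected to use the same consumption logic. It also means that multiple consumer groups can subscribe to the same topic: across groups the effect is broadcast consumption (each group receives every message), while within a group the messages are consumed in clustering mode.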
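
Because every client sorts mqAll and cidAll the same way before calling strategy.allocate(), each client can compute its own share independently and the shares do not overlap. Below is a minimal sketch of an averaging allocation, modeled on the idea behind the averaging strategy rather than copied from it. Queues are represented as plain integer IDs, and AverageAllocationSketch is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

final class AverageAllocationSketch {
    // Returns the queue ids assigned to currentCid when queues are split evenly across consumers.
    static List<Integer> allocate(String currentCid, List<Integer> queueIds, List<String> consumerIds) {
        List<Integer> mqAll = new ArrayList<>(queueIds);
        List<String> cidAll = new ArrayList<>(consumerIds);
        // Every client sorts both lists so that all clients see the same ordering.
        Collections.sort(mqAll);
        Collections.sort(cidAll);

        List<Integer> result = new ArrayList<>();
        int index = cidAll.indexOf(currentCid);
        if (index < 0 || mqAll.isEmpty()) {
            return result;
        }

        int mod = mqAll.size() % cidAll.size();
        int base = mqAll.size() / cidAll.size();
        // The first 'mod' consumers take one extra queue each.
        int averageSize = mqAll.size() <= cidAll.size()
                ? 1
                : (mod > 0 && index < mod ? base + 1 : base);
        int startIndex = (mod > 0 && index < mod)
                ? index * averageSize
                : index * averageSize + mod;
        // A negative range means there are more consumers than queues: this consumer gets nothing.
        int range = Math.min(averageSize, mqAll.size() - startIndex);
        for (int i = 0; i < range; i++) {
            result.add(mqAll.get(startIndex + i));
        }
        return result;
    }

    public static void main(String[] args) {
        List<Integer> queues = List.of(0, 1, 2, 3, 4, 5, 6, 7);
        List<String> consumers = List.of("c3", "c1", "c2");
        for (String cid : consumers) {
            System.out.println(cid + " -> " + allocate(cid, queues, consumers));
        }
    }
}
```

When there are more consumers than queues, the surplus consumers receive an empty list and stay idle, which is why adding consumers beyond the queue count does not increase throughput.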

private boolean updateProcessQueueTableInRebalance(String topic, Set<MessageQueue> mqSet, boolean isOrder) {
        boolean changed = false;
        Iterator it = this.processQueueTable.entrySet().iterator();

        while(it.hasNext()) {
            Entry<MessageQueue, ProcessQueue> next = (Entry)it.next();
            MessageQueue mq = (MessageQueue)next.getKey();
            ProcessQueue pq = (ProcessQueue)next.getValue();
            if (mq.getTopic().equals(topic)) {
                if (!mqSet.contains(mq)) {
                    pq.setDropped(true);
                    if (this.removeUnnecessaryMessageQueue(mq, pq)) {
                        it.remove();
                        changed = true;
                        log.info("doRebalance, {}, remove unnecessary mq, {}", this.consumerGroup, mq);
                    }
                } else if (pq.isPullExpired()) {
                    switch(this.consumeType()) {
                    case CONSUME_ACTIVELY:
                    default:
                        break;
                    case CONSUME_PASSIVELY:
                        pq.setDropped(true);
                        if (this.removeUnnecessaryMessageQueue(mq, pq)) {
                            it.remove();
                            changed = true;
                            log.error("[BUG]doRebalance, {}, remove unnecessary mq, {}, because pull is pause, so try to fixed it", this.consumerGroup, mq);
                        }
                    }
                }
            }
        }

        List<PullRequest> pullRequestList = new ArrayList();
        Iterator var15 = mqSet.iterator();

        while(true) {
            while(true) {
                MessageQueue mq;
                do {
                    if (!var15.hasNext()) {
                        this.dispatchPullRequest(pullRequestList);
                        return changed;
                    }

                    mq = (MessageQueue)var15.next();
                } while(this.processQueueTable.containsKey(mq));

                if (isOrder && !this.lock(mq)) {
                    log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", this.consumerGroup, mq);
                } else {
                    this.removeDirtyOffset(mq);
                    ProcessQueue pq = new ProcessQueue();
                    long nextOffset = this.computePullFromWhere(mq);
                    if (nextOffset >= 0L) {
                        ProcessQueue pre = (ProcessQueue)this.processQueueTable.putIfAbsent(mq, pq);
                        if (pre != null) {
                            log.info("doRebalance, {}, mq already exists, {}", this.consumerGroup, mq);
                        } else {
                            log.info("doRebalance, {}, add a new mq, {}", this.consumerGroup, mq);
                            PullRequest pullRequest = new PullRequest();
                            pullRequest.setConsumerGroup(this.consumerGroup);
                            pullRequest.setNextOffset(nextOffset);
                            pullRequest.setMessageQueue(mq);
                            pullRequest.setProcessQueue(pq);
                            pullRequestList.add(pullRequest);
                            changed = true;
                        }
                    } else {
                        log.warn("doRebalance, {}, add new mq failed, {}", this.consumerGroup, mq);
                    }
                }
            }
        }
    }

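For each MessageQueue in the given mqSet that is not yet in processQueueTable, this method creates a ProcessQueue and builds a PullRequest, then passes the PullRequests on to the PullMessageService blocking queue. Queues that are no longer assigned to this consumer are marked as dropped and removed.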
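
The core of the method above is a diff between the currently held queues and the newly assigned ones. Here is a minimal sketch of that diff, assuming integer queue IDs in place of MessageQueue and a starting offset of 0 in place of computePullFromWhere(). ProcessQueueDiffSketch is a hypothetical name, and locking for orderly consumption and the pull-expired check are omitted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;

final class ProcessQueueDiffSketch {
    // queue id -> next pull offset; stands in for processQueueTable.
    final Map<Integer, Long> table = new ConcurrentHashMap<>();

    // Applies a new assignment and returns the queue ids that need a fresh pull request.
    List<Integer> update(Set<Integer> assigned) {
        // Step 1: drop queues that are no longer assigned to this consumer.
        table.keySet().removeIf(queueId -> !assigned.contains(queueId));

        // Step 2: register newly assigned queues; existing ones are left untouched.
        List<Integer> newPullRequests = new ArrayList<>();
        for (int queueId : new TreeSet<>(assigned)) {
            if (table.putIfAbsent(queueId, 0L) == null) {
                newPullRequests.add(queueId);
            }
        }
        return newPullRequests;
    }

    public static void main(String[] args) {
        ProcessQueueDiffSketch sketch = new ProcessQueueDiffSketch();
        System.out.println(sketch.update(Set.of(0, 1)));
        System.out.println(sketch.update(Set.of(1, 2)));
        System.out.println(sketch.table.keySet());
    }
}
```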

dispatchPullRequest(pullRequestList) has an empty implementation in RebalancePullImpl. The push consumer's implementation (RebalancePushImpl) is:
public void dispatchPullRequest(List<PullRequest> pullRequestList) {
        Iterator var2 = pullRequestList.iterator();

        while(var2.hasNext()) {
            PullRequest pullRequest = (PullRequest)var2.next();
            this.defaultMQPushConsumerImpl.executePullRequestImmediately(pullRequest);
            log.info("doRebalance, {}, add a new pull request {}", this.consumerGroup, pullRequest);
        }

    }
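
The call is forwarded to PullMessageService.executePullRequestImmediately(), which puts the request into the blocking queue:

public void executePullRequestImmediately(PullRequest pullRequest) {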
        try {
            this.pullRequestQueue.put(pullRequest);
        } catch (InterruptedException var3) {
            this.log.error("executePullRequestImmediately pullRequestQueue.put", var3);
        }

    }

PullMessageService leaves the blocked state and resumes running, continuing with the message pull logic:

public void pullMessage(final PullRequest pullRequest) {
        final ProcessQueue processQueue = pullRequest.getProcessQueue();
        if (processQueue.isDropped()) {
            this.log.info("the pull request[{}] is dropped.", pullRequest.toString());
        } else {
            pullRequest.getProcessQueue().setLastPullTimestamp(System.currentTimeMillis());

            try {
                this.makeSureStateOK();
            } catch (MQClientException var20) {
                this.log.warn("pullMessage exception, consumer state not ok", var20);
                this.executePullRequestLater(pullRequest, 3000L);
                return;
            }

            if (this.isPause()) {
                this.log.warn("consumer was paused, execute pull request later. instanceName={}, group={}", this.defaultMQPushConsumer.getInstanceName(), this.defaultMQPushConsumer.getConsumerGroup());
                this.executePullRequestLater(pullRequest, 1000L);
            } else {
                long cachedMessageCount = processQueue.getMsgCount().get();
                long cachedMessageSizeInMiB = processQueue.getMsgSize().get() / 1048576L;
                if (cachedMessageCount > (long)this.defaultMQPushConsumer.getPullThresholdForQueue()) {
                    this.executePullRequestLater(pullRequest, 50L);
                    if (this.queueFlowControlTimes++ % 1000L == 0L) {
                        this.log.warn("the cached message count exceeds the threshold {}, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}", new Object[]{this.defaultMQPushConsumer.getPullThresholdForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, this.queueFlowControlTimes});
                    }

                } else if (cachedMessageSizeInMiB > (long)this.defaultMQPushConsumer.getPullThresholdSizeForQueue()) {
                    this.executePullRequestLater(pullRequest, 50L);
                    if (this.queueFlowControlTimes++ % 1000L == 0L) {
                        this.log.warn("the cached message size exceeds the threshold {} MiB, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}", new Object[]{this.defaultMQPushConsumer.getPullThresholdSizeForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, this.queueFlowControlTimes});
                    }

                } else {
                    if (!this.consumeOrderly) {
                        if (processQueue.getMaxSpan() > (long)this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan()) {
                            this.executePullRequestLater(pullRequest, 50L);
                            if (this.queueMaxSpanFlowControlTimes++ % 1000L == 0L) {
                                this.log.warn("the queue's messages, span too long, so do flow control, minOffset={}, maxOffset={}, maxSpan={}, pullRequest={}, flowControlTimes={}", new Object[]{processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), processQueue.getMaxSpan(), pullRequest, this.queueMaxSpanFlowControlTimes});
                            }

                            return;
                        }
                    } else {
                        if (!processQueue.isLocked()) {
                            this.executePullRequestLater(pullRequest, 3000L);
                            this.log.info("pull message later because not locked in broker, {}", pullRequest);
                            return;
                        }

                        if (!pullRequest.isLockedFirst()) {
                            long offset = this.rebalanceImpl.computePullFromWhere(pullRequest.getMessageQueue());
                            boolean brokerBusy = offset < pullRequest.getNextOffset();
                            this.log.info("the first time to pull message, so fix offset from broker. pullRequest: {} NewOffset: {} brokerBusy: {}", new Object[]{pullRequest, offset, brokerBusy});
                            if (brokerBusy) {
                                this.log.info("[NOTIFYME]the first time to pull message, but pull request offset larger than broker consume offset. pullRequest: {} NewOffset: {}", pullRequest, offset);
                            }

                            pullRequest.setLockedFirst(true);
                            pullRequest.setNextOffset(offset);
                        }
                    }

                    final SubscriptionData subscriptionData = (SubscriptionData)this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
                    if (null == subscriptionData) {
                        this.executePullRequestLater(pullRequest, 3000L);
                        this.log.warn("find the consumer's subscription failed, {}", pullRequest);
                    } else {
                        final long beginTimestamp = System.currentTimeMillis();
                        PullCallback pullCallback = new PullCallback() {
                            public void onSuccess(PullResult pullResult) {
                                if (pullResult != null) {
                                    pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult, subscriptionData);
                                    switch(pullResult.getPullStatus()) {
                                    case FOUND:
                                        long prevRequestOffset = pullRequest.getNextOffset();
                                        pullRequest.setNextOffset(pullResult.getNextBeginOffset());
                                        long pullRT = System.currentTimeMillis() - beginTimestamp;
                                        DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(), pullRequest.getMessageQueue().getTopic(), pullRT);
                                        long firstMsgOffset = Long.MAX_VALUE;
                                        if (pullResult.getMsgFoundList() != null && !pullResult.getMsgFoundList().isEmpty()) {
                                            firstMsgOffset = ((MessageExt)pullResult.getMsgFoundList().get(0)).getQueueOffset();
                                            DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(), pullRequest.getMessageQueue().getTopic(), (long)pullResult.getMsgFoundList().size());
                                            boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
                                            DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(pullResult.getMsgFoundList(), processQueue, pullRequest.getMessageQueue(), dispatchToConsume);
                                            if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0L) {
                                                DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
                                            } else {
                                                DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                                            }
                                        } else {
                                            DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                                        }

                                        if (pullResult.getNextBeginOffset() < prevRequestOffset || firstMsgOffset < prevRequestOffset) {
                                            DefaultMQPushConsumerImpl.this.log.warn("[BUG] pull message result maybe data wrong, nextBeginOffset: {} firstMsgOffset: {} prevRequestOffset: {}", new Object[]{pullResult.getNextBeginOffset(), firstMsgOffset, prevRequestOffset});
                                        }
                                        break;
                                    case NO_NEW_MSG:
                                        pullRequest.setNextOffset(pullResult.getNextBeginOffset());
                                        DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);
                                        DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                                        break;
                                    case NO_MATCHED_MSG:
                                        pullRequest.setNextOffset(pullResult.getNextBeginOffset());
                                        DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);
                                        DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                                        break;
                                    case OFFSET_ILLEGAL:
                                        DefaultMQPushConsumerImpl.this.log.warn("the pull request offset illegal, {} {}", pullRequest.toString(), pullResult.toString());
                                        pullRequest.setNextOffset(pullResult.getNextBeginOffset());
                                        pullRequest.getProcessQueue().setDropped(true);
                                        DefaultMQPushConsumerImpl.this.executeTaskLater(new Runnable() {
                                            public void run() {
                                                try {
                                                    DefaultMQPushConsumerImpl.this.offsetStore.updateOffset(pullRequest.getMessageQueue(), pullRequest.getNextOffset(), false);
                                                    DefaultMQPushConsumerImpl.this.offsetStore.persist(pullRequest.getMessageQueue());
                                                    DefaultMQPushConsumerImpl.this.rebalanceImpl.removeProcessQueue(pullRequest.getMessageQueue());
                                                    DefaultMQPushConsumerImpl.this.log.warn("fix the pull request offset, {}", pullRequest);
                                                } catch (Throwable var2) {
                                                    DefaultMQPushConsumerImpl.this.log.error("executeTaskLater Exception", var2);
                                                }

                                            }
                                        }, 10000L);
                                    }
                                }

                            }

                            public void onException(Throwable e) {
                                if (!pullRequest.getMessageQueue().getTopic().startsWith("%RETRY%")) {
                                    DefaultMQPushConsumerImpl.this.log.warn("execute the pull request exception", e);
                                }

                                DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, 3000L);
                            }
                        };
                        boolean commitOffsetEnable = false;
                        long commitOffsetValue = 0L;
                        if (MessageModel.CLUSTERING == this.defaultMQPushConsumer.getMessageModel()) {
                            commitOffsetValue = this.offsetStore.readOffset(pullRequest.getMessageQueue(), ReadOffsetType.READ_FROM_MEMORY);
                            if (commitOffsetValue > 0L) {
                                commitOffsetEnable = true;
                            }
                        }

                        String subExpression = null;
                        boolean classFilter = false;
                        SubscriptionData sd = (SubscriptionData)this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
                        if (sd != null) {
                            if (this.defaultMQPushConsumer.isPostSubscriptionWhenPull() && !sd.isClassFilterMode()) {
                                subExpression = sd.getSubString();
                            }

                            classFilter = sd.isClassFilterMode();
                        }

                        int sysFlag = PullSysFlag.buildSysFlag(commitOffsetEnable, true, subExpression != null, classFilter);

                        try {
                            this.pullAPIWrapper.pullKernelImpl(pullRequest.getMessageQueue(), subExpression, subscriptionData.getExpressionType(), subscriptionData.getSubVersion(), pullRequest.getNextOffset(), this.defaultMQPushConsumer.getPullBatchSize(), sysFlag, commitOffsetValue, 15000L, 30000L, CommunicationMode.ASYNC, pullCallback);
                        } catch (Exception var19) {
                            this.log.error("pullKernelImpl exception", var19);
                            this.executePullRequestLater(pullRequest, 3000L);
                        }

                    }
                }
            }
        }
    }
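
Before building the callback, pullMessage() applies three flow-control checks in order: cached message count, cached message size, and (for concurrent consumption only) the offset span of cached messages. The sketch below isolates that decision. PullFlowControlSketch and Decision are hypothetical names, and the thresholds are passed in as parameters rather than read from the consumer configuration.

```java
final class PullFlowControlSketch {
    enum Decision { PULL_NOW, DELAY_FOR_COUNT, DELAY_FOR_SIZE, DELAY_FOR_SPAN }

    // Decides whether a pull may proceed, checking the limits in the same order as pullMessage().
    static Decision decide(long cachedCount, long cachedBytes, long maxSpan, boolean consumeOrderly,
                           int countThreshold, int sizeThresholdMiB, int spanThreshold) {
        long cachedMiB = cachedBytes / (1024L * 1024L);
        if (cachedCount > countThreshold) {
            return Decision.DELAY_FOR_COUNT;
        }
        if (cachedMiB > sizeThresholdMiB) {
            return Decision.DELAY_FOR_SIZE;
        }
        // The span check applies only to concurrent consumption.
        if (!consumeOrderly && maxSpan > spanThreshold) {
            return Decision.DELAY_FOR_SPAN;
        }
        return Decision.PULL_NOW;
    }

    public static void main(String[] args) {
        // Threshold values here are illustrative inputs chosen for the sketch.
        System.out.println(decide(1001, 0, 0, false, 1000, 100, 2000));
        System.out.println(decide(10, 0, 5000, true, 1000, 100, 2000));
    }
}
```

In the listing above each delayed case re-submits the same PullRequest after 50 ms, so a slow listener throttles pulling without losing the request.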

After building the PullCallback for the pull, pullMessage() calls pullAPIWrapper.pullKernelImpl, which builds the pull request:

public PullResult pullKernelImpl(MessageQueue mq, String subExpression, String expressionType, long subVersion, long offset, int maxNums, int sysFlag, long commitOffset, long brokerSuspendMaxTimeMillis, long timeoutMillis, CommunicationMode communicationMode, PullCallback pullCallback) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), this.recalculatePullFromWhichNode(mq), false);
        if (null == findBrokerResult) {
            this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
            findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), this.recalculatePullFromWhichNode(mq), false);
        }

        if (findBrokerResult != null) {
            if (!ExpressionType.isTagType(expressionType) && findBrokerResult.getBrokerVersion() < Version.V4_1_0_SNAPSHOT.ordinal()) {
                throw new MQClientException("The broker[" + mq.getBrokerName() + ", " + findBrokerResult.getBrokerVersion() + "] does not upgrade to support for filter message by " + expressionType, (Throwable)null);
            } else {
                int sysFlagInner = sysFlag;
                if (findBrokerResult.isSlave()) {
                    sysFlagInner = PullSysFlag.clearCommitOffsetFlag(sysFlag);
                }

                PullMessageRequestHeader requestHeader = new PullMessageRequestHeader();
                requestHeader.setConsumerGroup(this.consumerGroup);
                requestHeader.setTopic(mq.getTopic());
                requestHeader.setQueueId(mq.getQueueId());
                requestHeader.setQueueOffset(offset);
                requestHeader.setMaxMsgNums(maxNums);
                requestHeader.setSysFlag(sysFlagInner);
                requestHeader.setCommitOffset(commitOffset);
                requestHeader.setSuspendTimeoutMillis(brokerSuspendMaxTimeMillis);
                requestHeader.setSubscription(subExpression);
                requestHeader.setSubVersion(subVersion);
                requestHeader.setExpressionType(expressionType);
                String brokerAddr = findBrokerResult.getBrokerAddr();
                if (PullSysFlag.hasClassFilterFlag(sysFlagInner)) {
                    brokerAddr = this.computPullFromWhichFilterServer(mq.getTopic(), brokerAddr);
                }

                PullResult pullResult = this.mQClientFactory.getMQClientAPIImpl().pullMessage(brokerAddr, requestHeader, timeoutMillis, communicationMode, pullCallback);
                return pullResult;
            }
        } else {
            throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", (Throwable)null);
        }
    }

The messages are pulled asynchronously:

public PullResult pullMessage(String addr, PullMessageRequestHeader requestHeader, long timeoutMillis, CommunicationMode communicationMode, PullCallback pullCallback) throws RemotingException, MQBrokerException, InterruptedException {
        // Request code 11 is RequestCode.PULL_MESSAGE (the decompiler inlined the constant)
        RemotingCommand request = RemotingCommand.createRequestCommand(11, requestHeader);
        switch(communicationMode) {
        case ONEWAY:
            assert false;

            return null;
        case ASYNC:
            this.pullMessageAsync(addr, request, timeoutMillis, pullCallback);
            return null;
        case SYNC:
            return this.pullMessageSync(addr, request, timeoutMillis);
        default:
            assert false;

            return null;
        }
    }
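
The contract of this switch, where ASYNC returns null and hands the result to a callback while SYNC returns it directly, can be sketched as follows. Everything here is hypothetical: PullDispatchSketch, fetch() and the String payload only stand in for the real remoting call and PullResult. Unlike the decompiled code, the sketch throws on an unsupported mode instead of using `assert false`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

// Hypothetical sketch of sync/async dispatch; not the RocketMQ implementation.
final class PullDispatchSketch {
    enum Mode { SYNC, ASYNC, ONEWAY }

    // Stand-in for the network round trip to the broker.
    static String fetch(String addr) {
        return "messages-from-" + addr;
    }

    static String pull(String addr, Mode mode, ExecutorService executor, Consumer<String> callback) {
        switch (mode) {
            case ASYNC:
                // The caller gets null now; the result reaches the callback on another thread.
                executor.submit(() -> callback.accept(fetch(addr)));
                return null;
            case SYNC:
                // The caller blocks and receives the result directly.
                return fetch(addr);
            default:
                throw new IllegalArgumentException("unsupported mode: " + mode);
        }
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        CompletableFuture<String> delivered = new CompletableFuture<>();
        String direct = pull("broker-a", Mode.ASYNC, executor, delivered::complete);
        System.out.println("returned directly: " + direct);
        System.out.println("delivered to callback: " + delivered.join());
        executor.shutdown();
    }
}
```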

When the pull succeeds, PullCallback.onSuccess() is called back and submits a consume request to the consumer thread pool.

public void submitConsumeRequest(List<MessageExt> msgs, ProcessQueue processQueue, MessageQueue messageQueue, boolean dispatchToConsume) {
        int consumeBatchSize = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
        if (msgs.size() <= consumeBatchSize) {
            ConsumeMessageConcurrentlyService.ConsumeRequest consumeRequest = new ConsumeMessageConcurrentlyService.ConsumeRequest(msgs, processQueue, messageQueue);

            try {
                this.consumeExecutor.submit(consumeRequest);
            } catch (RejectedExecutionException var10) {
                this.submitConsumeRequestLater(consumeRequest);
            }
        } else {
            int total = 0;

            while(total < msgs.size()) {
                List<MessageExt> msgThis = new ArrayList<>(consumeBatchSize);

                for(int i = 0; i < consumeBatchSize && total < msgs.size(); ++total) {
                    msgThis.add(msgs.get(total));
                    ++i;
                }

                ConsumeMessageConcurrentlyService.ConsumeRequest consumeRequest = new ConsumeMessageConcurrentlyService.ConsumeRequest(msgThis, processQueue, messageQueue);

                try {
                    this.consumeExecutor.submit(consumeRequest);
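                } catch (RejectedExecutionException var11) {
                    // The pool rejected the task: fold all remaining messages into this request and resubmit it later.
                    while(total < msgs.size()) {
                        msgThis.add(msgs.get(total));
                        ++total;
                    }

                    this.submitConsumeRequestLater(consumeRequest);
                }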
            }
        }

    }
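
The batching in submitConsumeRequest() comes down to cutting the pulled list into chunks of at most consumeBatchSize. A minimal sketch of just that splitting step follows. BatchSplitSketch and split() are hypothetical names, and the sketch leaves out the thread pool and the rejected-execution path.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of the batch-splitting step only.
final class BatchSplitSketch {
    // Cut msgs into consecutive batches of at most batchSize elements, preserving order.
    static <T> List<List<T>> split(List<T> msgs, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int start = 0; start < msgs.size(); start += batchSize) {
            int end = Math.min(start + batchSize, msgs.size());
            // Copy the slice so each batch is independent of the source list.
            batches.add(new ArrayList<>(msgs.subList(start, end)));
        }
        return batches;
    }

    public static void main(String[] args) {
        // Five messages with a batch size of two give three consume requests.
        System.out.println(split(Arrays.asList(1, 2, 3, 4, 5), 2)); // prints [[1, 2], [3, 4], [5]]
    }
}
```

Each inner list corresponds to one ConsumeRequest handed to the executor, so a larger batch size means fewer tasks, each carrying more messages per listener call.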

The consume request then runs the MessageListenerConcurrently listener registered on defaultMQPushConsumer.

public void run() {
            if (this.processQueue.isDropped()) {
                ConsumeMessageConcurrentlyService.log.info("the message queue not be able to consume, because it's dropped. group={} {}", ConsumeMessageConcurrentlyService.this.consumerGroup, this.messageQueue);
            } else {
                MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener;
                ConsumeConcurrentlyContext context = new ConsumeConcurrentlyContext(this.messageQueue);
                ConsumeConcurrentlyStatus status = null;
                ConsumeMessageContext consumeMessageContext = null;
                if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
                    consumeMessageContext = new ConsumeMessageContext();
                    consumeMessageContext.setConsumerGroup(ConsumeMessageConcurrentlyService.this.defaultMQPushConsumer.getConsumerGroup());
                    consumeMessageContext.setProps(new HashMap<String, String>());
                    consumeMessageContext.setMq(this.messageQueue);
                    consumeMessageContext.setMsgList(this.msgs);
                    consumeMessageContext.setSuccess(false);
                    ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext);
                }

                long beginTimestamp = System.currentTimeMillis();
                boolean hasException = false;
                ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;

                try {
                    ConsumeMessageConcurrentlyService.this.resetRetryTopic(this.msgs);
                    if (this.msgs != null && !this.msgs.isEmpty()) {
                        // Stamp every message with the time at which consumption starts.
                        for (MessageExt msg : this.msgs) {
                            MessageAccessor.setConsumeStartTimeStamp(msg, String.valueOf(System.currentTimeMillis()));
                        }
                    }

                    status = listener.consumeMessage(Collections.unmodifiableList(this.msgs), context);
                } catch (Throwable var11) {
                    ConsumeMessageConcurrentlyService.log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}", new Object[]{RemotingHelper.exceptionSimpleDesc(var11), ConsumeMessageConcurrentlyService.this.consumerGroup, this.msgs, this.messageQueue});
                    hasException = true;
                }

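                long consumeRT = System.currentTimeMillis() - beginTimestamp;
                // Classify the outcome for hooks; getConsumeTimeout() is in minutes, hence the * 60L * 1000L below.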
                if (null == status) {
                    if (hasException) {
                        returnType = ConsumeReturnType.EXCEPTION;
                    } else {
                        returnType = ConsumeReturnType.RETURNNULL;
                    }
                } else if (consumeRT >= ConsumeMessageConcurrentlyService.this.defaultMQPushConsumer.getConsumeTimeout() * 60L * 1000L) {
                    returnType = ConsumeReturnType.TIME_OUT;
                } else if (ConsumeConcurrentlyStatus.RECONSUME_LATER == status) {
                    returnType = ConsumeReturnType.FAILED;
                } else if (ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status) {
                    returnType = ConsumeReturnType.SUCCESS;
                }

                if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
                    consumeMessageContext.getProps().put("ConsumeContextType", returnType.name());
                }

                if (null == status) {
                    ConsumeMessageConcurrentlyService.log.warn("consumeMessage return null, Group: {} Msgs: {} MQ: {}", new Object[]{ConsumeMessageConcurrentlyService.this.consumerGroup, this.msgs, this.messageQueue});
                    status = ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }

                if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
                    consumeMessageContext.setStatus(status.toString());
                    consumeMessageContext.setSuccess(ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status);
                    ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookAfter(consumeMessageContext);
                }

                ConsumeMessageConcurrentlyService.this.getConsumerStatsManager().incConsumeRT(ConsumeMessageConcurrentlyService.this.consumerGroup, this.messageQueue.getTopic(), consumeRT);
                if (!this.processQueue.isDropped()) {
                    ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);
                } else {
                    ConsumeMessageConcurrentlyService.log.warn("processQueue is dropped without process consume result. messageQueue={}, msgs={}", this.messageQueue, this.msgs);
                }

            }
        }
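
The way run() maps the listener's result onto a ConsumeReturnType can be isolated as a pure function. This is a minimal sketch that mirrors the branch order of the code above. ConsumeReturnSketch and its nested enums are hypothetical stand-ins for the RocketMQ types, with the enum constant names taken from the code shown.

```java
// Hypothetical sketch of the outcome classification in ConsumeRequest.run().
final class ConsumeReturnSketch {
    enum Status { CONSUME_SUCCESS, RECONSUME_LATER }
    enum ReturnType { SUCCESS, TIME_OUT, EXCEPTION, RETURNNULL, FAILED }

    // Same branch order as run(): a null status wins, then timeout, then the status itself.
    static ReturnType classify(Status status, boolean hasException, long consumeRtMillis, long timeoutMinutes) {
        if (status == null) {
            return hasException ? ReturnType.EXCEPTION : ReturnType.RETURNNULL;
        }
        if (consumeRtMillis >= timeoutMinutes * 60L * 1000L) {
            return ReturnType.TIME_OUT;
        }
        if (status == Status.RECONSUME_LATER) {
            return ReturnType.FAILED;
        }
        return ReturnType.SUCCESS;
    }

    // A listener that threw or returned null is treated as asking for a retry.
    static Status effectiveStatus(Status status) {
        return status == null ? Status.RECONSUME_LATER : status;
    }

    public static void main(String[] args) {
        System.out.println(classify(null, true, 5L, 15L));                    // prints EXCEPTION
        System.out.println(classify(null, false, 5L, 15L));                   // prints RETURNNULL
        System.out.println(classify(Status.CONSUME_SUCCESS, false, 5L, 15L)); // prints SUCCESS
        System.out.println(effectiveStatus(null));                            // prints RECONSUME_LATER
    }
}
```

One consequence of the branch order is that a listener which throws never reaches the timeout check, so a slow call that ends in an exception is reported as EXCEPTION rather than TIME_OUT.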