例子
public class Consumer1 {
public static void main(String[] args) throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumerGroup1");
consumer.setNamesrvAddr("192.168.137.3:9876;192.168.137.4:9876");
consumer.setVipChannelEnabled(false);
consumer.setMessageModel(MessageModel.CLUSTERING);
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.subscribe("TestData1", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
Message message = list.get(0);
try {
String body = new String(message.getBody(), RemotingHelper.DEFAULT_CHARSET);
System.out.println(body);
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
}
}
看一下DefaultMQPushConsumerImpl.start()的具体实现
public synchronized void start() throws MQClientException {
switch(this.serviceState) {
case CREATE_JUST:
this.log.info("the consumer [{}] start beginning. messageModel={}, isUnitMode={}", new Object[]{this.defaultMQPushConsumer.getConsumerGroup(), this.defaultMQPushConsumer.getMessageModel(), this.defaultMQPushConsumer.isUnitMode()});
this.serviceState = ServiceState.START_FAILED;
this.checkConfig();
//将topic tags等订阅信息传递给rebalance
this.copySubscription();
if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) {
this.defaultMQPushConsumer.changeInstanceNameToPID();
}
//创建MQClientInstance实例,相同的IP和进程只会产生一个MQClientInstance实例
this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);
this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());
this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
//为rebalance设置负载均衡策略
this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());
this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);
//拉取消息会用到PullAPIWrapper
this.pullAPIWrapper = new PullAPIWrapper(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup(), this.isUnitMode());
this.pullAPIWrapper.registerFilterMessageHook(this.filterMessageHookList);
if (this.defaultMQPushConsumer.getOffsetStore() != null) {
this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
} else {
switch(this.defaultMQPushConsumer.getMessageModel()) {
case BROADCASTING:
this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
break;
case CLUSTERING:
this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
}
this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);
}
this.offsetStore.load();
//若是顺序消费,则实例化ConsumeMessageOrderlyService
//若是普通消费,则实例化ConsumeMessageConcurrentlyService
if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
this.consumeOrderly = true;
this.consumeMessageService = new ConsumeMessageOrderlyService(this, (MessageListenerOrderly)this.getMessageListenerInner());
} else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
this.consumeOrderly = false;
this.consumeMessageService = new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently)this.getMessageListenerInner());
}
this.consumeMessageService.start();
boolean registerOK = this.mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);
if (!registerOK) {
this.serviceState = ServiceState.CREATE_JUST;
this.consumeMessageService.shutdown();
throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup() + "] has been created before, specify another name please." + FAQUrl.suggestTodo("http://rocketmq.apache.org/docs/faq/"), (Throwable)null);
} else {
this.mQClientFactory.start();
this.log.info("the consumer [{}] start OK.", this.defaultMQPushConsumer.getConsumerGroup());
this.serviceState = ServiceState.RUNNING;
}
default:
this.updateTopicSubscribeInfoWhenSubscriptionChanged();
this.mQClientFactory.checkClientInBroker();
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
this.mQClientFactory.rebalanceImmediately();
return;
case RUNNING:
case SHUTDOWN_ALREADY:
case START_FAILED:
throw new MQClientException("The PushConsumer service state not OK, maybe started once, " + this.serviceState + FAQUrl.suggestTodo("http://rocketmq.apache.org/docs/faq/"), (Throwable)null);
}
}
继续看MQClientInstance.start()
public void start() throws MQClientException {
synchronized(this) {
switch(this.serviceState) {
case CREATE_JUST:
this.serviceState = ServiceState.START_FAILED;
if (null == this.clientConfig.getNamesrvAddr()) {
this.mQClientAPIImpl.fetchNameServerAddr();
}
this.mQClientAPIImpl.start();
//开启一系列定时任务(更新topic路由信息、将队列的消费偏移量更新到broker)
this.startScheduledTask();
this.pullMessageService.start();
this.rebalanceService.start();
this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
this.log.info("the client factory [{}] start OK", this.clientId);
this.serviceState = ServiceState.RUNNING;
case RUNNING:
case SHUTDOWN_ALREADY:
default:
return;
case START_FAILED:
throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", (Throwable)null);
}
}
}
pullMessageService会在获取PullRequest时一直阻塞在pullRequestQueue,直到rebalance将PullRequest放入此阻塞队列
public void run() {
this.log.info(this.getServiceName() + " service started");
while(!this.isStopped()) {
try {
//pullmessageservice会一直阻塞在此获取PullRequest
PullRequest pullRequest = (PullRequest)this.pullRequestQueue.take();
this.pullMessage(pullRequest);
} catch (InterruptedException var2) {
;
} catch (Exception var3) {
this.log.error("Pull Message Service Run Method exception", var3);
}
}
this.log.info(this.getServiceName() + " service end");
}
RebalanceService会每隔20秒做一次负载均衡,并且将PullRequest放入pullMessageService中
public void run() {
this.log.info(this.getServiceName() + " service started");
while(!this.isStopped()) {
this.waitForRunning(waitInterval);
this.mqClientFactory.doRebalance();
}
this.log.info(this.getServiceName() + " service end");
}
private void rebalanceByTopic(String topic, boolean isOrder) {
Set mqSet;
switch(this.messageModel) {
//可以看出广播模式是没有负载均衡策略的
case BROADCASTING:
mqSet = (Set)this.topicSubscribeInfoTable.get(topic);
if (mqSet != null) {
boolean changed = this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder);
if (changed) {
this.messageQueueChanged(topic, mqSet, mqSet);
log.info("messageQueueChanged {} {} {} {}", new Object[]{this.consumerGroup, topic, mqSet, mqSet});
}
} else {
log.warn("doRebalance, {}, but the topic[{}] not exist.", this.consumerGroup, topic);
}
break;
case CLUSTERING:
mqSet = (Set)this.topicSubscribeInfoTable.get(topic);
//注意:是根据topic以及消费者组得到所有的消费者客户端
List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, this.consumerGroup);
if (null == mqSet && !topic.startsWith("%RETRY%")) {
log.warn("doRebalance, {}, but the topic[{}] not exist.", this.consumerGroup, topic);
}
if (null == cidAll) {
log.warn("doRebalance, {} {}, get consumer id list failed", this.consumerGroup, topic);
}
if (mqSet != null && cidAll != null) {
List<MessageQueue> mqAll = new ArrayList();
mqAll.addAll(mqSet);
Collections.sort(mqAll);
Collections.sort(cidAll);
AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;
List allocateResult = null;
try {
//负载均衡,计算出本消费者应该去哪个MessageQueue拉取消息
allocateResult = strategy.allocate(this.consumerGroup, this.mQClientFactory.getClientId(), mqAll, cidAll);
} catch (Throwable var10) {
log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(), var10);
return;
}
Set<MessageQueue> allocateResultSet = new HashSet();
if (allocateResult != null) {
allocateResultSet.addAll(allocateResult);
}
boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
if (changed) {
log.info("rebalanced result changed. allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={}, mqAllSize={}, cidAllSize={}, rebalanceResultSize={}, rebalanceResultSet={}", new Object[]{strategy.getName(), this.consumerGroup, topic, this.mQClientFactory.getClientId(), mqSet.size(), cidAll.size(), allocateResultSet.size(), allocateResultSet});
this.messageQueueChanged(topic, mqSet, allocateResultSet);
}
}
}
}
注意:List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, this.consumerGroup);根据topic以及消费者组得到所有的消费者客户端,这可能就是同一个消费者组的消费逻辑要一致的原因,另外也就允许可以多个消费者组可以订阅同一个topic,两个消费者组之间是广播消费,消费者组内部是集群消费。
private boolean updateProcessQueueTableInRebalance(String topic, Set<MessageQueue> mqSet, boolean isOrder) {
boolean changed = false;
Iterator it = this.processQueueTable.entrySet().iterator();
while(it.hasNext()) {
Entry<MessageQueue, ProcessQueue> next = (Entry)it.next();
MessageQueue mq = (MessageQueue)next.getKey();
ProcessQueue pq = (ProcessQueue)next.getValue();
if (mq.getTopic().equals(topic)) {
if (!mqSet.contains(mq)) {
pq.setDropped(true);
if (this.removeUnnecessaryMessageQueue(mq, pq)) {
it.remove();
changed = true;
log.info("doRebalance, {}, remove unnecessary mq, {}", this.consumerGroup, mq);
}
} else if (pq.isPullExpired()) {
switch(this.consumeType()) {
case CONSUME_ACTIVELY:
default:
break;
case CONSUME_PASSIVELY:
pq.setDropped(true);
if (this.removeUnnecessaryMessageQueue(mq, pq)) {
it.remove();
changed = true;
log.error("[BUG]doRebalance, {}, remove unnecessary mq, {}, because pull is pause, so try to fixed it", this.consumerGroup, mq);
}
}
}
}
}
List<PullRequest> pullRequestList = new ArrayList();
Iterator var15 = mqSet.iterator();
while(true) {
while(true) {
MessageQueue mq;
do {
if (!var15.hasNext()) {
this.dispatchPullRequest(pullRequestList);
return changed;
}
mq = (MessageQueue)var15.next();
} while(this.processQueueTable.containsKey(mq));
if (isOrder && !this.lock(mq)) {
log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", this.consumerGroup, mq);
} else {
this.removeDirtyOffset(mq);
ProcessQueue pq = new ProcessQueue();
long nextOffset = this.computePullFromWhere(mq);
if (nextOffset >= 0L) {
ProcessQueue pre = (ProcessQueue)this.processQueueTable.putIfAbsent(mq, pq);
if (pre != null) {
log.info("doRebalance, {}, mq already exists, {}", this.consumerGroup, mq);
} else {
log.info("doRebalance, {}, add a new mq, {}", this.consumerGroup, mq);
PullRequest pullRequest = new PullRequest();
pullRequest.setConsumerGroup(this.consumerGroup);
pullRequest.setNextOffset(nextOffset);
pullRequest.setMessageQueue(mq);
pullRequest.setProcessQueue(pq);
pullRequestList.add(pullRequest);
changed = true;
}
} else {
log.warn("doRebalance, {}, add new mq failed, {}", this.consumerGroup, mq);
}
}
}
}
}
根据传入的mqSet为每一个MessageQueue创建一个ProcessQueue,并且构造PullRequest,将PullRequest传入PullMessageService阻塞队列
dispatchPullRequest(pullRequestList);在RebalancePullImpl没有任何实现
public void dispatchPullRequest(List<PullRequest> pullRequestList) {
Iterator var2 = pullRequestList.iterator();
while(var2.hasNext()) {
PullRequest pullRequest = (PullRequest)var2.next();
this.defaultMQPushConsumerImpl.executePullRequestImmediately(pullRequest);
log.info("doRebalance, {}, add a new pull request {}", this.consumerGroup, pullRequest);
}
}
public void executePullRequestImmediately(PullRequest pullRequest) {
try {
this.pullRequestQueue.put(pullRequest);
} catch (InterruptedException var3) {
this.log.error("executePullRequestImmediately pullRequestQueue.put", var3);
}
}
PullMessageService从阻塞状态改变成运行状态,继续执行拉取消息逻辑
public void pullMessage(final PullRequest pullRequest) {
final ProcessQueue processQueue = pullRequest.getProcessQueue();
if (processQueue.isDropped()) {
this.log.info("the pull request[{}] is dropped.", pullRequest.toString());
} else {
pullRequest.getProcessQueue().setLastPullTimestamp(System.currentTimeMillis());
try {
this.makeSureStateOK();
} catch (MQClientException var20) {
this.log.warn("pullMessage exception, consumer state not ok", var20);
this.executePullRequestLater(pullRequest, 3000L);
return;
}
if (this.isPause()) {
this.log.warn("consumer was paused, execute pull request later. instanceName={}, group={}", this.defaultMQPushConsumer.getInstanceName(), this.defaultMQPushConsumer.getConsumerGroup());
this.executePullRequestLater(pullRequest, 1000L);
} else {
long cachedMessageCount = processQueue.getMsgCount().get();
long cachedMessageSizeInMiB = processQueue.getMsgSize().get() / 1048576L;
if (cachedMessageCount > (long)this.defaultMQPushConsumer.getPullThresholdForQueue()) {
this.executePullRequestLater(pullRequest, 50L);
if (this.queueFlowControlTimes++ % 1000L == 0L) {
this.log.warn("the cached message count exceeds the threshold {}, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}", new Object[]{this.defaultMQPushConsumer.getPullThresholdForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, this.queueFlowControlTimes});
}
} else if (cachedMessageSizeInMiB > (long)this.defaultMQPushConsumer.getPullThresholdSizeForQueue()) {
this.executePullRequestLater(pullRequest, 50L);
if (this.queueFlowControlTimes++ % 1000L == 0L) {
this.log.warn("the cached message size exceeds the threshold {} MiB, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}", new Object[]{this.defaultMQPushConsumer.getPullThresholdSizeForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, this.queueFlowControlTimes});
}
} else {
if (!this.consumeOrderly) {
if (processQueue.getMaxSpan() > (long)this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan()) {
this.executePullRequestLater(pullRequest, 50L);
if (this.queueMaxSpanFlowControlTimes++ % 1000L == 0L) {
this.log.warn("the queue's messages, span too long, so do flow control, minOffset={}, maxOffset={}, maxSpan={}, pullRequest={}, flowControlTimes={}", new Object[]{processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), processQueue.getMaxSpan(), pullRequest, this.queueMaxSpanFlowControlTimes});
}
return;
}
} else {
if (!processQueue.isLocked()) {
this.executePullRequestLater(pullRequest, 3000L);
this.log.info("pull message later because not locked in broker, {}", pullRequest);
return;
}
if (!pullRequest.isLockedFirst()) {
long offset = this.rebalanceImpl.computePullFromWhere(pullRequest.getMessageQueue());
boolean brokerBusy = offset < pullRequest.getNextOffset();
this.log.info("the first time to pull message, so fix offset from broker. pullRequest: {} NewOffset: {} brokerBusy: {}", new Object[]{pullRequest, offset, brokerBusy});
if (brokerBusy) {
this.log.info("[NOTIFYME]the first time to pull message, but pull request offset larger than broker consume offset. pullRequest: {} NewOffset: {}", pullRequest, offset);
}
pullRequest.setLockedFirst(true);
pullRequest.setNextOffset(offset);
}
}
final SubscriptionData subscriptionData = (SubscriptionData)this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
if (null == subscriptionData) {
this.executePullRequestLater(pullRequest, 3000L);
this.log.warn("find the consumer's subscription failed, {}", pullRequest);
} else {
final long beginTimestamp = System.currentTimeMillis();
PullCallback pullCallback = new PullCallback() {
public void onSuccess(PullResult pullResult) {
if (pullResult != null) {
pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult, subscriptionData);
switch(pullResult.getPullStatus()) {
case FOUND:
long prevRequestOffset = pullRequest.getNextOffset();
pullRequest.setNextOffset(pullResult.getNextBeginOffset());
long pullRT = System.currentTimeMillis() - beginTimestamp;
DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(), pullRequest.getMessageQueue().getTopic(), pullRT);
long firstMsgOffset = 9223372036854775807L;
if (pullResult.getMsgFoundList() != null && !pullResult.getMsgFoundList().isEmpty()) {
firstMsgOffset = ((MessageExt)pullResult.getMsgFoundList().get(0)).getQueueOffset();
DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(), pullRequest.getMessageQueue().getTopic(), (long)pullResult.getMsgFoundList().size());
boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(pullResult.getMsgFoundList(), processQueue, pullRequest.getMessageQueue(), dispatchToConsume);
if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0L) {
DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
} else {
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
}
} else {
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
}
if (pullResult.getNextBeginOffset() < prevRequestOffset || firstMsgOffset < prevRequestOffset) {
DefaultMQPushConsumerImpl.this.log.warn("[BUG] pull message result maybe data wrong, nextBeginOffset: {} firstMsgOffset: {} prevRequestOffset: {}", new Object[]{pullResult.getNextBeginOffset(), firstMsgOffset, prevRequestOffset});
}
break;
case NO_NEW_MSG:
pullRequest.setNextOffset(pullResult.getNextBeginOffset());
DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
break;
case NO_MATCHED_MSG:
pullRequest.setNextOffset(pullResult.getNextBeginOffset());
DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
break;
case OFFSET_ILLEGAL:
DefaultMQPushConsumerImpl.this.log.warn("the pull request offset illegal, {} {}", pullRequest.toString(), pullResult.toString());
pullRequest.setNextOffset(pullResult.getNextBeginOffset());
pullRequest.getProcessQueue().setDropped(true);
DefaultMQPushConsumerImpl.this.executeTaskLater(new Runnable() {
public void run() {
try {
DefaultMQPushConsumerImpl.this.offsetStore.updateOffset(pullRequest.getMessageQueue(), pullRequest.getNextOffset(), false);
DefaultMQPushConsumerImpl.this.offsetStore.persist(pullRequest.getMessageQueue());
DefaultMQPushConsumerImpl.this.rebalanceImpl.removeProcessQueue(pullRequest.getMessageQueue());
DefaultMQPushConsumerImpl.this.log.warn("fix the pull request offset, {}", pullRequest);
} catch (Throwable var2) {
DefaultMQPushConsumerImpl.this.log.error("executeTaskLater Exception", var2);
}
}
}, 10000L);
}
}
}
public void onException(Throwable e) {
if (!pullRequest.getMessageQueue().getTopic().startsWith("%RETRY%")) {
DefaultMQPushConsumerImpl.this.log.warn("execute the pull request exception", e);
}
DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, 3000L);
}
};
boolean commitOffsetEnable = false;
long commitOffsetValue = 0L;
if (MessageModel.CLUSTERING == this.defaultMQPushConsumer.getMessageModel()) {
commitOffsetValue = this.offsetStore.readOffset(pullRequest.getMessageQueue(), ReadOffsetType.READ_FROM_MEMORY);
if (commitOffsetValue > 0L) {
commitOffsetEnable = true;
}
}
String subExpression = null;
boolean classFilter = false;
SubscriptionData sd = (SubscriptionData)this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
if (sd != null) {
if (this.defaultMQPushConsumer.isPostSubscriptionWhenPull() && !sd.isClassFilterMode()) {
subExpression = sd.getSubString();
}
classFilter = sd.isClassFilterMode();
}
int sysFlag = PullSysFlag.buildSysFlag(commitOffsetEnable, true, subExpression != null, classFilter);
try {
this.pullAPIWrapper.pullKernelImpl(pullRequest.getMessageQueue(), subExpression, subscriptionData.getExpressionType(), subscriptionData.getSubVersion(), pullRequest.getNextOffset(), this.defaultMQPushConsumer.getPullBatchSize(), sysFlag, commitOffsetValue, 15000L, 30000L, CommunicationMode.ASYNC, pullCallback);
} catch (Exception var19) {
this.log.error("pullKernelImpl exception", var19);
this.executePullRequestLater(pullRequest, 3000L);
}
}
}
}
}
}
构造消息拉取的回调PullCallback,继续调用pullAPIWrapper.pullKernelImpl构造拉取消息的请求
public PullResult pullKernelImpl(MessageQueue mq, String subExpression, String expressionType, long subVersion, long offset, int maxNums, int sysFlag, long commitOffset, long brokerSuspendMaxTimeMillis, long timeoutMillis, CommunicationMode communicationMode, PullCallback pullCallback) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), this.recalculatePullFromWhichNode(mq), false);
if (null == findBrokerResult) {
this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), this.recalculatePullFromWhichNode(mq), false);
}
if (findBrokerResult != null) {
if (!ExpressionType.isTagType(expressionType) && findBrokerResult.getBrokerVersion() < Version.V4_1_0_SNAPSHOT.ordinal()) {
throw new MQClientException("The broker[" + mq.getBrokerName() + ", " + findBrokerResult.getBrokerVersion() + "] does not upgrade to support for filter message by " + expressionType, (Throwable)null);
} else {
int sysFlagInner = sysFlag;
if (findBrokerResult.isSlave()) {
sysFlagInner = PullSysFlag.clearCommitOffsetFlag(sysFlag);
}
PullMessageRequestHeader requestHeader = new PullMessageRequestHeader();
requestHeader.setConsumerGroup(this.consumerGroup);
requestHeader.setTopic(mq.getTopic());
requestHeader.setQueueId(mq.getQueueId());
requestHeader.setQueueOffset(offset);
requestHeader.setMaxMsgNums(maxNums);
requestHeader.setSysFlag(sysFlagInner);
requestHeader.setCommitOffset(commitOffset);
requestHeader.setSuspendTimeoutMillis(brokerSuspendMaxTimeMillis);
requestHeader.setSubscription(subExpression);
requestHeader.setSubVersion(subVersion);
requestHeader.setExpressionType(expressionType);
String brokerAddr = findBrokerResult.getBrokerAddr();
if (PullSysFlag.hasClassFilterFlag(sysFlagInner)) {
brokerAddr = this.computPullFromWhichFilterServer(mq.getTopic(), brokerAddr);
}
PullResult pullResult = this.mQClientFactory.getMQClientAPIImpl().pullMessage(brokerAddr, requestHeader, timeoutMillis, communicationMode, pullCallback);
return pullResult;
}
} else {
throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", (Throwable)null);
}
}
异步拉取消息
public PullResult pullMessage(String addr, PullMessageRequestHeader requestHeader, long timeoutMillis, CommunicationMode communicationMode, PullCallback pullCallback) throws RemotingException, MQBrokerException, InterruptedException {
RemotingCommand request = RemotingCommand.createRequestCommand(11, requestHeader);
switch(communicationMode) {
case ONEWAY:
assert false;
return null;
case ASYNC:
this.pullMessageAsync(addr, request, timeoutMillis, pullCallback);
return null;
case SYNC:
return this.pullMessageSync(addr, request, timeoutMillis);
default:
assert false;
return null;
}
}
回调PullCallback的onSuccess()方法,提交消费请求到消费线程池。
public void submitConsumeRequest(List<MessageExt> msgs, ProcessQueue processQueue, MessageQueue messageQueue, boolean dispatchToConsume) {
int consumeBatchSize = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
if (msgs.size() <= consumeBatchSize) {
ConsumeMessageConcurrentlyService.ConsumeRequest consumeRequest = new ConsumeMessageConcurrentlyService.ConsumeRequest(msgs, processQueue, messageQueue);
try {
this.consumeExecutor.submit(consumeRequest);
} catch (RejectedExecutionException var10) {
this.submitConsumeRequestLater(consumeRequest);
}
} else {
int total = 0;
while(total < msgs.size()) {
List<MessageExt> msgThis = new ArrayList(consumeBatchSize);
for(int i = 0; i < consumeBatchSize && total < msgs.size(); ++total) {
msgThis.add(msgs.get(total));
++i;
}
ConsumeMessageConcurrentlyService.ConsumeRequest consumeRequest = new ConsumeMessageConcurrentlyService.ConsumeRequest(msgThis, processQueue, messageQueue);
try {
this.consumeExecutor.submit(consumeRequest);
} catch (RejectedExecutionException var11) {
while(total < msgs.size()) {
msgThis.add(msgs.get(total));
++total;
}
this.submitConsumeRequestLater(consumeRequest);
}
}
}
}
执行defaultMQPushConsumer的MessageListenerConcurrently监听
public void run() {
if (this.processQueue.isDropped()) {
ConsumeMessageConcurrentlyService.log.info("the message queue not be able to consume, because it's dropped. group={} {}", ConsumeMessageConcurrentlyService.this.consumerGroup, this.messageQueue);
} else {
MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener;
ConsumeConcurrentlyContext context = new ConsumeConcurrentlyContext(this.messageQueue);
ConsumeConcurrentlyStatus status = null;
ConsumeMessageContext consumeMessageContext = null;
if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
consumeMessageContext = new ConsumeMessageContext();
consumeMessageContext.setConsumerGroup(ConsumeMessageConcurrentlyService.this.defaultMQPushConsumer.getConsumerGroup());
consumeMessageContext.setProps(new HashMap());
consumeMessageContext.setMq(this.messageQueue);
consumeMessageContext.setMsgList(this.msgs);
consumeMessageContext.setSuccess(false);
ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext);
}
long beginTimestamp = System.currentTimeMillis();
boolean hasException = false;
ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;
try {
ConsumeMessageConcurrentlyService.this.resetRetryTopic(this.msgs);
if (this.msgs != null && !this.msgs.isEmpty()) {
Iterator var9 = this.msgs.iterator();
while(var9.hasNext()) {
MessageExt msg = (MessageExt)var9.next();
MessageAccessor.setConsumeStartTimeStamp(msg, String.valueOf(System.currentTimeMillis()));
}
}
status = listener.consumeMessage(Collections.unmodifiableList(this.msgs), context);
} catch (Throwable var11) {
ConsumeMessageConcurrentlyService.log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}", new Object[]{RemotingHelper.exceptionSimpleDesc(var11), ConsumeMessageConcurrentlyService.this.consumerGroup, this.msgs, this.messageQueue});
hasException = true;
}
long consumeRT = System.currentTimeMillis() - beginTimestamp;
if (null == status) {
if (hasException) {
returnType = ConsumeReturnType.EXCEPTION;
} else {
returnType = ConsumeReturnType.RETURNNULL;
}
} else if (consumeRT >= ConsumeMessageConcurrentlyService.this.defaultMQPushConsumer.getConsumeTimeout() * 60L * 1000L) {
returnType = ConsumeReturnType.TIME_OUT;
} else if (ConsumeConcurrentlyStatus.RECONSUME_LATER == status) {
returnType = ConsumeReturnType.FAILED;
} else if (ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status) {
returnType = ConsumeReturnType.SUCCESS;
}
if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
consumeMessageContext.getProps().put("ConsumeContextType", returnType.name());
}
if (null == status) {
ConsumeMessageConcurrentlyService.log.warn("consumeMessage return null, Group: {} Msgs: {} MQ: {}", new Object[]{ConsumeMessageConcurrentlyService.this.consumerGroup, this.msgs, this.messageQueue});
status = ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
consumeMessageContext.setStatus(status.toString());
consumeMessageContext.setSuccess(ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status);
ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookAfter(consumeMessageContext);
}
ConsumeMessageConcurrentlyService.this.getConsumerStatsManager().incConsumeRT(ConsumeMessageConcurrentlyService.this.consumerGroup, this.messageQueue.getTopic(), consumeRT);
if (!this.processQueue.isDropped()) {
ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);
} else {
ConsumeMessageConcurrentlyService.log.warn("processQueue is dropped without process consume result. messageQueue={}, msgs={}", this.messageQueue, this.msgs);
}
}
}