序
本文主要研究一下rocketmq的pullFromWhichNodeTable
pullFromWhichNodeTable
rocketmq-all-4.6.0-source-release/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java
/**
 * Consumer-side helper that post-processes pull results and remembers, per
 * message queue, which broker id the consumer should pull from next.
 */
public class PullAPIWrapper {
    private final InternalLogger log = ClientLogger.getLog();
    private final MQClientInstance mQClientFactory;
    private final String consumerGroup;
    private final boolean unitMode;
    /** Latest suggested broker id per queue; written by {@link #updatePullFromWhichNode}. */
    private ConcurrentMap<MessageQueue, AtomicLong/* brokerId */> pullFromWhichNodeTable =
        new ConcurrentHashMap<MessageQueue, AtomicLong>(32);
    /** When true, {@link #recalculatePullFromWhichNode} ignores the table and returns {@link #defaultBrokerId}. */
    private volatile boolean connectBrokerByUser = false;
    private volatile long defaultBrokerId = MixAll.MASTER_ID;
    private Random random = new Random(System.currentTimeMillis());
    private ArrayList<FilterMessageHook> filterMessageHookList = new ArrayList<FilterMessageHook>();
    //......

    /**
     * Records the broker suggestion carried by the pull result and, when messages
     * were found, decodes them, re-filters them by tag, runs the filter hooks and
     * stamps each message with the queue's min/max offsets.
     *
     * @param mq               queue the result was pulled from
     * @param pullResult       result to process; must actually be a {@code PullResultExt}
     * @param subscriptionData subscription whose tag set drives the client-side re-filter
     * @return the same {@code pullResult} instance, with its message list populated
     *         (status FOUND only) and its raw message binary cleared
     */
    public PullResult processPullResult(final MessageQueue mq, final PullResult pullResult,
        final SubscriptionData subscriptionData) {
        PullResultExt pullResultExt = (PullResultExt) pullResult;

        // Remember the suggestion regardless of the pull status.
        this.updatePullFromWhichNode(mq, pullResultExt.getSuggestWhichBrokerId());
        if (PullStatus.FOUND == pullResult.getPullStatus()) {
            ByteBuffer byteBuffer = ByteBuffer.wrap(pullResultExt.getMessageBinary());
            List<MessageExt> msgList = MessageDecoder.decodes(byteBuffer);

            List<MessageExt> msgListFilterAgain = msgList;
            if (!subscriptionData.getTagsSet().isEmpty() && !subscriptionData.isClassFilterMode()) {
                // Keep only messages whose tag is in the subscribed tag set;
                // messages without a tag are dropped in this mode.
                msgListFilterAgain = new ArrayList<MessageExt>(msgList.size());
                for (MessageExt msg : msgList) {
                    if (msg.getTags() != null && subscriptionData.getTagsSet().contains(msg.getTags())) {
                        msgListFilterAgain.add(msg);
                    }
                }
            }

            if (this.hasHook()) {
                FilterMessageContext filterMessageContext = new FilterMessageContext();
                filterMessageContext.setUnitMode(unitMode);
                filterMessageContext.setMsgList(msgListFilterAgain);
                this.executeHook(filterMessageContext);
            }

            // The offsets belong to the pull result, not to a single message, so
            // format them once instead of once per message.
            final String minOffset = Long.toString(pullResult.getMinOffset());
            final String maxOffset = Long.toString(pullResult.getMaxOffset());
            for (MessageExt msg : msgListFilterAgain) {
                String traFlag = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
                if (Boolean.parseBoolean(traFlag)) {
                    msg.setTransactionId(msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX));
                }
                MessageAccessor.putProperty(msg, MessageConst.PROPERTY_MIN_OFFSET, minOffset);
                MessageAccessor.putProperty(msg, MessageConst.PROPERTY_MAX_OFFSET, maxOffset);
            }

            pullResultExt.setMsgFoundList(msgListFilterAgain);
        }

        // Drop the reference to the raw payload; it is not needed after decoding.
        pullResultExt.setMessageBinary(null);

        return pullResult;
    }

    /**
     * Stores {@code brokerId} as the broker to pull from for {@code mq}.
     *
     * <p>The first insertion uses {@code putIfAbsent} so that two threads racing on
     * a queue with no entry yet cannot replace each other's {@code AtomicLong};
     * the loser of the race updates the instance that was actually installed.
     *
     * @param mq       queue whose suggestion is updated
     * @param brokerId broker id to pull from next
     */
    public void updatePullFromWhichNode(final MessageQueue mq, final long brokerId) {
        AtomicLong suggest = this.pullFromWhichNodeTable.get(mq);
        if (null == suggest) {
            suggest = this.pullFromWhichNodeTable.putIfAbsent(mq, new AtomicLong(brokerId));
            if (null == suggest) {
                // Our new entry was installed and already holds brokerId.
                return;
            }
        }
        suggest.set(brokerId);
    }

    /**
     * Decides which broker id to pull from for {@code mq}.
     *
     * @param mq queue about to be pulled
     * @return {@code defaultBrokerId} when connectBrokerByUser is set; otherwise the
     *         stored suggestion for the queue, or {@code MixAll.MASTER_ID} if none exists
     */
    public long recalculatePullFromWhichNode(final MessageQueue mq) {
        if (this.isConnectBrokerByUser()) {
            return this.defaultBrokerId;
        }

        AtomicLong suggest = this.pullFromWhichNodeTable.get(mq);
        if (suggest != null) {
            return suggest.get();
        }

        return MixAll.MASTER_ID;
    }

    public boolean isConnectBrokerByUser() {
        return connectBrokerByUser;
    }
    //......
}
- PullAPIWrapper定义了pullFromWhichNodeTable,其key为MessageQueue,value为AtomicLong类型的brokerId
- processPullResult方法会使用pullResultExt.getSuggestWhichBrokerId()来执行updatePullFromWhichNode;updatePullFromWhichNode会更新指定MessageQueue的brokerId
- recalculatePullFromWhichNode方法在isConnectBrokerByUser为true时直接返回defaultBrokerId(默认值为MixAll.MASTER_ID),否则从pullFromWhichNodeTable取对应的brokerId,取不到则返回MixAll.MASTER_ID
PullResultExt
rocketmq-all-4.6.0-source-release/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullResultExt.java
/**
 * A {@link PullResult} that additionally carries the broker id suggested for the
 * next pull and the raw, still-encoded message payload.
 */
public class PullResultExt extends PullResult {
    /** Broker id suggested for the next pull; fixed at construction time. */
    private final long suggestWhichBrokerId;
    /** Raw message bytes; may be replaced (for example cleared) via the setter. */
    private byte[] messageBinary;

    /**
     * @param pullStatus           status of the pull
     * @param nextBeginOffset      passed through to {@code PullResult}
     * @param minOffset            passed through to {@code PullResult}
     * @param maxOffset            passed through to {@code PullResult}
     * @param msgFoundList         passed through to {@code PullResult}
     * @param suggestWhichBrokerId broker id suggested for the next pull
     * @param messageBinary        raw message payload
     */
    public PullResultExt(PullStatus pullStatus, long nextBeginOffset, long minOffset, long maxOffset,
        List<MessageExt> msgFoundList, final long suggestWhichBrokerId, final byte[] messageBinary) {
        super(pullStatus, nextBeginOffset, minOffset, maxOffset, msgFoundList);
        this.messageBinary = messageBinary;
        this.suggestWhichBrokerId = suggestWhichBrokerId;
    }

    /** @return the broker id suggested for the next pull */
    public long getSuggestWhichBrokerId() {
        return this.suggestWhichBrokerId;
    }

    /** @return the raw message payload currently held, possibly {@code null} */
    public byte[] getMessageBinary() {
        return this.messageBinary;
    }

    /** @param messageBinary new raw payload reference */
    public void setMessageBinary(byte[] messageBinary) {
        this.messageBinary = messageBinary;
    }
}
- PullResultExt定义了suggestWhichBrokerId属性
processPullResponse
rocketmq-all-4.6.0-source-release/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
public class MQClientAPIImpl {
    private final static InternalLogger log = ClientLogger.getLog();
    private static boolean sendSmartMsg =
        Boolean.parseBoolean(System.getProperty("org.apache.rocketmq.client.sendSmartMsg", "true"));
    //......

    /**
     * Converts a pull response from the broker into a {@code PullResultExt}.
     *
     * <p>The response code is mapped to a {@code PullStatus} first; an unmapped code
     * raises {@code MQBrokerException} before the custom header is decoded.
     *
     * @param response remoting response returned for a pull request
     * @return a {@code PullResultExt} built from the decoded header and the response
     *         body, with a {@code null} message list
     * @throws MQBrokerException        if the response code is not one of the handled codes
     * @throws RemotingCommandException declared for the custom header decoding
     */
    private PullResult processPullResponse(
        final RemotingCommand response) throws MQBrokerException, RemotingCommandException {
        final int code = response.getCode();
        final PullStatus status;
        if (code == ResponseCode.SUCCESS) {
            status = PullStatus.FOUND;
        } else if (code == ResponseCode.PULL_NOT_FOUND) {
            status = PullStatus.NO_NEW_MSG;
        } else if (code == ResponseCode.PULL_RETRY_IMMEDIATELY) {
            status = PullStatus.NO_MATCHED_MSG;
        } else if (code == ResponseCode.PULL_OFFSET_MOVED) {
            status = PullStatus.OFFSET_ILLEGAL;
        } else {
            throw new MQBrokerException(response.getCode(), response.getRemark());
        }

        final PullMessageResponseHeader header =
            (PullMessageResponseHeader) response.decodeCommandCustomHeader(PullMessageResponseHeader.class);

        return new PullResultExt(
            status,
            header.getNextBeginOffset(),
            header.getMinOffset(),
            header.getMaxOffset(),
            null,
            header.getSuggestWhichBrokerId(),
            response.getBody());
    }
    //......
}
- processPullResponse方法会使用responseHeader.getSuggestWhichBrokerId()来创建PullResultExt并返回
PullMessageResponseHeader
rocketmq-all-4.6.0-source-release/common/src/main/java/org/apache/rocketmq/common/protocol/header/PullMessageResponseHeader.java
/**
 * Custom header of a pull-message response. The client decodes it from the
 * response and feeds its values into the pull result.
 *
 * NOTE(review): @CFNotNull presumably marks fields that must be present when the
 * header is decoded - confirm against the annotation's definition.
 */
public class PullMessageResponseHeader implements CommandCustomHeader {
// Broker id the broker suggests the consumer pull from next.
@CFNotNull
private Long suggestWhichBrokerId;
// Becomes the nextBeginOffset of the client-side pull result.
@CFNotNull
private Long nextBeginOffset;
// Becomes the minOffset of the client-side pull result.
@CFNotNull
private Long minOffset;
// Becomes the maxOffset of the client-side pull result.
@CFNotNull
private Long maxOffset;
//......
}
- PullMessageResponseHeader定义了suggestWhichBrokerId属性
processRequest
rocketmq-all-4.6.0-source-release/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
/**
 * Broker-side processor for pull requests. Only the part of processRequest that
 * chooses the suggested broker id for the response header is shown here.
 */
public class PullMessageProcessor implements NettyRequestProcessor {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private final BrokerController brokerController;
private List<ConsumeMessageHook> consumeMessageHookList;
//......
// Excerpt: response, responseHeader, getMessageResult and subscriptionGroupConfig
// are set up in the elided part of the method.
private RemotingCommand processRequest(final Channel channel, RemotingCommand request, boolean brokerAllowSuspend)
throws RemotingCommandException {
//......
// Step 1: initial suggestion based on the store's hint. This value does not
// survive: step 3 below assigns suggestWhichBrokerId again on every path.
if (getMessageResult.isSuggestPullingFromSlave()) {
responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getWhichBrokerWhenConsumeSlowly());
} else {
responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);
}
// Step 2: a slave with slave reads disabled answers PULL_RETRY_IMMEDIATELY.
// The response code set here is kept; the broker id set here is overwritten
// in step 3 with the same MASTER_ID value.
switch (this.brokerController.getMessageStoreConfig().getBrokerRole()) {
case ASYNC_MASTER:
case SYNC_MASTER:
break;
case SLAVE:
if (!this.brokerController.getBrokerConfig().isSlaveReadEnable()) {
response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);
responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);
}
break;
}
// Step 3: final decision. With slave reads enabled, suggest either the
// "consume slowly" broker or the group's configured brokerId; with slave
// reads disabled, always suggest the master.
if (this.brokerController.getBrokerConfig().isSlaveReadEnable()) {
// consume too slow ,redirect to another machine
if (getMessageResult.isSuggestPullingFromSlave()) {
responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getWhichBrokerWhenConsumeSlowly());
}
// consume ok
else {
responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getBrokerId());
}
} else {
responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);
}
//......
}
//......
}
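- processRequest先根据getMessageResult.isSuggestPullingFromSlave()把responseHeader的suggestWhichBrokerId设置为subscriptionGroupConfig.getWhichBrokerWhenConsumeSlowly()或MixAll.MASTER_ID;如果当前broker是SLAVE且slaveReadEnable为false,则把响应码设置为PULL_RETRY_IMMEDIATELY;最后的if/else会再次覆盖suggestWhichBrokerId:slaveReadEnable为true时,isSuggestPullingFromSlave()为true取subscriptionGroupConfig.getWhichBrokerWhenConsumeSlowly(),否则取subscriptionGroupConfig.getBrokerId();slaveReadEnable为false时固定为MixAll.MASTER_ID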
whichBrokerWhenConsumeSlowly
rocketmq-all-4.6.0-source-release/common/src/main/java/org/apache/rocketmq/common/subscription/SubscriptionGroupConfig.java
/**
 * Per-consumer-group subscription settings held by the broker.
 *
 * NOTE(review): only brokerId and whichBrokerWhenConsumeSlowly are exercised by
 * the code shown alongside this class; the meaning of the other fields is
 * inferred from their names only - confirm before relying on it.
 */
public class SubscriptionGroupConfig {
private String groupName;
private boolean consumeEnable = true;
private boolean consumeFromMinEnable = true;
private boolean consumeBroadcastEnable = true;
private int retryQueueNums = 1;
private int retryMaxTimes = 16;
// Broker id suggested to the consumer when slave reads are enabled and the
// store does not suggest pulling from a slave; defaults to the master id.
private long brokerId = MixAll.MASTER_ID;
// Broker id suggested to the consumer when the store suggests pulling from a
// slave; defaults to 1.
private long whichBrokerWhenConsumeSlowly = 1;
private boolean notifyConsumerIdsChangedEnable = true;
//......
}
- SubscriptionGroupConfig的whichBrokerWhenConsumeSlowly默认值为1,而MixAll.MASTER_ID则为0
小结
- PullAPIWrapper定义了pullFromWhichNodeTable,其key为MessageQueue,value为AtomicLong类型的brokerId
- processPullResult方法会使用pullResultExt.getSuggestWhichBrokerId()来执行updatePullFromWhichNode;updatePullFromWhichNode会更新指定MessageQueue的brokerId
- recalculatePullFromWhichNode方法在isConnectBrokerByUser为true时直接返回defaultBrokerId(默认值为MixAll.MASTER_ID),否则从pullFromWhichNodeTable取对应的brokerId,取不到则返回MixAll.MASTER_ID