在之前的文章《在IDEA中debug NameSrv、Broker、Producer、Consumer》中,我们debug Producer测试发送时,遇到过一个问题:Broker启动时我们没有配置NameSrv地址,发送程序会报错:No route info of this topic。但当我们配上NameSrv地址后,再次启动,可以正常发送消息。
example.quickstart.Producer的代码是:
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
producer.start();
Message msg = new Message("TopicTest" /* Topic */,
"TagA" /* Tag */,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);
producer.send(msg);
TopicTest之前并未创建过,Broker未配置NameSrv地址,无法发送,而配置NameSrv后则可以正常发送。这中间有2个问题:
1、topic是怎么自动创建的?
2、topic自动创建过程中Broker、NameSrv如何协作配合的?
下面我们开始分析下这个流程。想看结论的可以直接跳到文章最后面。
1、DefaultMQProducerImpl#sendDefaultImpl
private SendResult sendDefaultImpl(
Message msg,
final CommunicationMode communicationMode,
final SendCallback sendCallback,
final long timeout
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
// 省略校验相关代码
// 关键代码,如果获取到topic的路由信息,则发送,否则抛异常
TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
if (topicPublishInfo != null && topicPublishInfo.ok()) {
// 省略发送相关代码
}
List<String> nsList = this.getmQClientFactory().getMQClientAPIImpl().getNameServerAddressList();
if (null == nsList || nsList.isEmpty()) {
throw new MQClientException(
"No name server address, please set it." + FAQUrl.suggestTodo(FAQUrl.NAME_SERVER_ADDR_NOT_EXIST_URL), null).setResponseCode(ClientErrorCode.NO_NAME_SERVER_EXCEPTION);
}
throw new MQClientException("No route info of this topic, " + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO),
null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION);
}
tryToFindTopicPublishInfo是发送的关键,如果获取到topic的信息,则发送,否则就异常。因此之前No route info of this topic的异常,就是Producer获取不到TopicTest的信息,导致发送失败。
那这跟Broker配没配NameSrv地址有什么关系呢,我们接着往下看:
private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
// topicPublishInfoTable是Producer本地缓存的topic信息表
// Producer启动后,会添加默认的topic:TBW102
TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
if (null == topicPublishInfo || !topicPublishInfo.ok()) {
this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
// 未获取到,从NameSrv获取该topic的信息
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
topicPublishInfo = this.topicPublishInfoTable.get(topic);
}
// 获取到了,则返回
if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
return topicPublishInfo;
} else {
// 没获取到,再换种方式从NameSrv获取
// 如果再获取不到,那后续就无法发送了
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
topicPublishInfo = this.topicPublishInfoTable.get(topic);
return topicPublishInfo;
}
}
1、Producer本地topicPublishInfoTable变量中没有TopicTest的信息,只缓存了TBW102(可以认为只有key(topic),实际也无详细信息,还是要从NameSrv获取)
2、尝试从NameSrv获取TopicTest的信息。获取失败,NameSrv中根本没有TopicTest,因为这个topic是Producer发送时设置的,没有同步到NameSrv。
3、再换种方式从NameSrv获取。这里就很关键了,如果获取到了,那么可以执行发送流程,如果还是没有获取到,就会抛No route info of this topic的异常了。
public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault,
DefaultMQProducer defaultMQProducer) {
try {
if (this.lockNamesrv.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
try {
TopicRouteData topicRouteData;
if (isDefault && defaultMQProducer != null) {
// 将入参的topic转换为默认的TBW102,获取TBW102的信息
topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(),
1000 * 3);
// 省略其他代码
} else {
// 直接用入参的topic去获取
topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3);
}
// 省略其他代码
} catch (Exception e) {
if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX) && !topic.equals(MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC)) {
log.warn("updateTopicRouteInfoFromNameServer Exception", e);
}
} finally {
this.lockNamesrv.unlock();
}
} else {
log.warn("updateTopicRouteInfoFromNameServer tryLock timeout {}ms", LOCK_TIMEOUT_MILLIS);
}
} catch (InterruptedException e) {
log.warn("updateTopicRouteInfoFromNameServer Exception", e);
}
return false;
}
1、第1次获取时,isDefault传的false,defaultMQProducer传的null,因此在updateTopicRouteInfoFromNameServer会走else分支,用TopicTest去获取
2、第2次获取时,isDefault传的true,defaultMQProducer也传值了,因此会走if分支,将入参的topic转换为默认的TBW102,获取TBW102的信息
到这里,大概就能建立如下的推断:
1、不管Broker配没配NameSrv地址,获取TopicTest的信息,必失败
2、获取TBW102信息:
2.1、Broker配置了NameSrv地址,成功
2.2、Broker没有配置NameSrv地址,失败
那我们进入NameSrv的源码,看看为什么如此。
2、RouteInfoManager#pickupTopicRouteData
updateTopicRouteInfoFromNameServer最终会发给NameSrv一个GET_ROUTEINTO_BY_TOPIC请求
public TopicRouteData pickupTopicRouteData(final String topic) {
TopicRouteData topicRouteData = new TopicRouteData();
boolean foundQueueData = false;
boolean foundBrokerData = false;
// 省略其他代码
try {
try {
this.lock.readLock().lockInterruptibly();
// 从topicQueueTable获取topic信息
// 有则有,无则null
List<QueueData> queueDataList = this.topicQueueTable.get(topic);
if (queueDataList != null) {
topicRouteData.setQueueDatas(queueDataList);
foundQueueData = true;
// 省略其他代码
}
} finally {
this.lock.readLock().unlock();
}
} catch (Exception e) {
log.error("pickupTopicRouteData Exception", e);
}
log.debug("pickupTopicRouteData {} {}", topic, topicRouteData);
if (foundBrokerData && foundQueueData) {
return topicRouteData;
}
return null;
}
NameSrv从本地变量topicQueueTable中获取对应的topic信息,没有则会返回null。由此可以推断出:
1、NameSrv本地没有TopicTest的信息(显而易见)
2、NameSrv本地记录了TBW102的topic信息
那TBW102的topic信息,NameSrv又是从哪里获取并缓存到本地的呢?
答案:来自REGISTER_BROKER请求
case RequestCode.REGISTER_BROKER:
Version brokerVersion = MQVersion.value2Version(request.getVersion());
if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) {
return this.registerBrokerWithFilterServer(ctx, request);
} else {
return this.registerBroker(ctx, request);
}
3、BrokerController#start
REGISTER_BROKER请求又是谁发给NameSrv的呢?很简单,Broker发的。Broker在启动时,会向NameSrv注册,同时有一个定时任务会定时上报
public void start() throws Exception {
// ...
this.registerBrokerAll(true, false, true);
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
} catch (Throwable e) {
log.error("registerBrokerAll Exception", e);
}
}
}, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);
// ...
}
上报的内容有哪些呢?
# BrokerOuterAPI#registerBrokerAll
RegisterBrokerBody requestBody = new RegisterBrokerBody();
requestBody.setTopicConfigSerializeWrapper(topicConfigWrapper);
# TopicConfigSerializeWrapper
public class TopicConfigSerializeWrapper extends RemotingSerializable {
private ConcurrentMap<String, TopicConfig> topicConfigTable;
private DataVersion dataVersion = new DataVersion();
// ...
}
上报的内容包括TopicConfigSerializeWrapper,它的结构其实跟${user.home}\store\config\topics.json是一样的:
{
"dataVersion":{
"counter":2,
"timestamp":1579838252574
},
"topicConfigTable":{
"SELF_TEST_TOPIC":Object{...},
"DefaultCluster":Object{...},
"RMQ_SYS_TRANS_HALF_TOPIC":Object{...},
"DESKTOP-FJIT15L":Object{...},
"TBW102":{
"order":false,
"perm":7,
"readQueueNums":8,
"topicFilterType":"SINGLE_TAG",
"topicName":"TBW102",
"topicSysFlag":0,
"writeQueueNums":8
},
"BenchmarkTest":Object{...},
"OFFSET_MOVED_EVENT":Object{...}
}
}
TopicConfigManager的构造函数中,默认创建了上面的topic,其中就有TBW102,这些都是在Broker启动的时候就完成了。
因此,当Producer用TBW102去NameSrv获取topic信息时,是可以获取的。因为TBW102是Broker启动时默认创建的,Broker启动时会向NameSrv注册。这也是为什么Broker没配NameSrv时,获取不到TBW102的topic信息。
那获取到了TBW102的topic信息,跟TopicTest又有什么关系呢?TopicTest的信息还是没有啊。让我们继续往下看:
再回到MQClientInstance#updateTopicRouteInfoFromNameServer:
public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault, DefaultMQProducer defaultMQProducer) {
// ...
TopicRouteData topicRouteData;
if (isDefault && defaultMQProducer != null) {
// ①第2次获取TopicTest的信息会走到这里,先转换topic,实际获取的是TBW102
topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(),
1000 * 3);
// 获取成功,修正读写队列数
if (topicRouteData != null) {
for (QueueData data : topicRouteData.getQueueDatas()) {
int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums());
data.setReadQueueNums(queueNums);
data.setWriteQueueNums(queueNums);
}
}
} else {
topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3);
}
if (topicRouteData != null) {
// old为null
TopicRouteData old = this.topicRouteTable.get(topic);
// changed为true
boolean changed = topicRouteDataIsChange(old, topicRouteData);
if (!changed) {
changed = this.isNeedUpdateTopicRouteInfo(topic);
} else {
log.info("the topic[{}] route info changed, old[{}] ,new[{}]", topic, old, topicRouteData);
}
// ②走到该逻辑
if (changed) {
TopicRouteData cloneTopicRouteData = topicRouteData.cloneTopicRouteData();
for (BrokerData bd : topicRouteData.getBrokerDatas()) {
this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs());
}
// Update Pub info
{
// ③将topic的route信息转换为publish信息。实际是用了TBW102的route信息,给TopicTest用
TopicPublishInfo publishInfo = topicRouteData2TopicPublishInfo(topic, topicRouteData);
publishInfo.setHaveTopicRouterInfo(true);
Iterator<Entry<String, MQProducerInner>> it = this.producerTable.entrySet().iterator();
while (it.hasNext()) {
Entry<String, MQProducerInner> entry = it.next();
MQProducerInner impl = entry.getValue();
if (impl != null) {
// ④更新到本地
impl.updateTopicPublishInfo(topic, publishInfo);
}
}
}
// ...
}
}
// ...
}
在第2步获取到的数据结构为:
第3步,转换后的数据结构为:
更新本地变量:topicPublishInfoTable
public void updateTopicPublishInfo(final String topic, final TopicPublishInfo info) {
if (info != null && topic != null) {
TopicPublishInfo prev = this.topicPublishInfoTable.put(topic, info);
if (prev != null) {
log.info("updateTopicPublishInfo prev is not null, " + prev.toString());
}
}
}
获取TBW102的topic信息,当成是TopicTest的。也可以说是,TopicTest继承了TBW102的配置信息。因此TopicTest的信息就有了。
private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
// Producer启动后,会添加默认的topic:TBW102,但具体的信息还是没有,需要从NameSrv获取
TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
if (null == topicPublishInfo || !topicPublishInfo.ok()) {
this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
// 未获取到TopicTest,从NameSrv获取该topic的信息,还是未获取到
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
topicPublishInfo = this.topicPublishInfoTable.get(topic);
}
if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
return topicPublishInfo;
} else {
// 没获取到,向NameSrv获取TBW102的信息,当成是TopicTest的信息
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
// 这次有TopicTest的信息了
topicPublishInfo = this.topicPublishInfoTable.get(topic);
return topicPublishInfo;
}
}
分析完上面所有的逻辑后,就到了发送消息的步骤。
4、发送流程
发送消息的请求到达Broker后,会有一步msgCheck的过程
protected RemotingCommand msgCheck(final ChannelHandlerContext ctx,
// ...
// ①Broker本地没有TopicTest,得到null
TopicConfig topicConfig =
this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
if (null == topicConfig) {
// ...
// ②自动创建topic,实际就是创建一个topicConfig对象,存放到本地map,并同步到NameSrv
topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageMethod(
requestHeader.getTopic(),
requestHeader.getDefaultTopic(),
RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
requestHeader.getDefaultTopicQueueNums(), topicSysFlag);
// ...
// ③如果步骤2失败了,那就会报TOPIC_NOT_EXIST的错误
if (null == topicConfig) {
response.setCode(ResponseCode.TOPIC_NOT_EXIST);
response.setRemark("topic[" + requestHeader.getTopic() + "] not exist, apply first please!"
+ FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL));
return response;
}
}
// ...
return response;
}
msgCheck做了如下校验:
1、从本地获取TopicTest的信息,得到null
2、自动创建topic,实际就是创建一个topicConfig对象,存放到本地map,并同步到NameSrv。如果topic已经缓存在本地map了,就直接返回,不需要创建
3、如果步骤2失败了,那就会报TOPIC_NOT_EXIST的错误,就不会进行下面的消息刷盘落地
下面看下createTopicInSendMessageMethod的流程:
public TopicConfig createTopicInSendMessageMethod(final String topic, final String defaultTopic,
final String remoteAddress, final int clientDefaultTopicQueueNums, final int topicSysFlag) {
// ...
// 一开始topicConfigTable还未缓存TopicTest
topicConfig = this.topicConfigTable.get(topic);
if (topicConfig != null)
return topicConfig;
// 还是要依赖TBW102,来建立TopicTest的TopicConfig对象
TopicConfig defaultTopicConfig = this.topicConfigTable.get(defaultTopic);
if (defaultTopicConfig != null) {
if (defaultTopic.equals(MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC)) {
if (!this.brokerController.getBrokerConfig().isAutoCreateTopicEnable()) {
defaultTopicConfig.setPerm(PermName.PERM_READ | PermName.PERM_WRITE);
}
}
if (PermName.isInherited(defaultTopicConfig.getPerm())) {
topicConfig = new TopicConfig(topic);
int queueNums =
clientDefaultTopicQueueNums > defaultTopicConfig.getWriteQueueNums() ? defaultTopicConfig
.getWriteQueueNums() : clientDefaultTopicQueueNums;
if (queueNums < 0) {
queueNums = 0;
}
topicConfig.setReadQueueNums(queueNums);
topicConfig.setWriteQueueNums(queueNums);
int perm = defaultTopicConfig.getPerm();
perm &= ~PermName.PERM_INHERIT;
topicConfig.setPerm(perm);
topicConfig.setTopicSysFlag(topicSysFlag);
topicConfig.setTopicFilterType(defaultTopicConfig.getTopicFilterType());
}
}
if (topicConfig != null) {
log.info("Create new topic by default topic:[{}] config:[{}] producer:[{}]", defaultTopic, topicConfig, remoteAddress);
// 缓存到本地
this.topicConfigTable.put(topic, topicConfig);
this.dataVersion.nextVersion();
createNew = true;
// 持久化到${user.home}\store\config\topics.json
this.persist();
}
// 同步到NameSrv
if (createNew) {
this.brokerController.registerBrokerAll(false, true, true);
}
return topicConfig;
}
主要的逻辑是:
1、topicConfigTable还未缓存TopicTest
2、还是要依赖TBW102,来建立TopicTest的TopicConfig对象
3、缓存到本地
4、持久化到${user.home}\store\config\topics.json
5、同步到NameSrv
到这里,校验也通过了,下一步就是消息刷盘落地了,由于不在本文分析范围内,就不作展开了。
5、TBW102是为何物?
TBW102是Broker启动时,当autoCreateTopicEnable的配置为true时,会自动创建该默认topic
public TopicConfigManager(BrokerController brokerController) {
this.brokerController = brokerController;
// ...
{
// MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC
if (this.brokerController.getBrokerConfig().isAutoCreateTopicEnable()) {
String topic = MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC;
TopicConfig topicConfig = new TopicConfig(topic);
this.systemTopicList.add(topic);
topicConfig.setReadQueueNums(this.brokerController.getBrokerConfig()
.getDefaultTopicQueueNums());
topicConfig.setWriteQueueNums(this.brokerController.getBrokerConfig()
.getDefaultTopicQueueNums());
int perm = PermName.PERM_INHERIT | PermName.PERM_READ | PermName.PERM_WRITE;
topicConfig.setPerm(perm);
this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
}
}
// ...
}
autoCreateTopicEnable的默认值是true,可以同步外部配置文件,让Broker启动时加载,来改变该值。我理解的TBW102的作用是当开启自动创建topic功能,发送时用了未配置的topic,可以让该topic继承默认TBW102的配置,实现消息的发送。
6、总结
发送未配置topic的消息,流程图如下:
1、Broker配不配NameSrv的区别在于:配了NameSrv后,Broker会把启动默认创建的topic同步的NameSrv,而后续Producer发送时会向NameSrv查询topic信息,当查询未配置的topic信息时,Producer会将topic转换成默认的TBW102进行查询,让topic继承它的配置。
2、Broker在开启autoCreateTopicEnable的配置后(默认是开启的),才会自动创建topic,同样是继承默认TBW102的配置。
因此,要正常发送未配置topic的消息,有2个点:正确配置Broker的NameSrv地址,开启autoCreateTopicEnable。