Broker消息服务器在启动时向所有NameServer注册,消息生产者(Producer)在发送消息之前先从NameServer获取Broker服务器地址列表,然后根据负载均衡算法从列表中选择一台消息服务器进行消息发送。NameServer与每台Broker服务器保持长连接,并间隔30s检测Broker是否存活,如果检测到Broker宕机,则从路由注册表中将其移除。但是路由变化不会马上通知消息生产者,为什么要这么设计呢?这是为了降低NameServer实现的复杂性,在消息发送端提供容错机制来保证消息发送的高可用性。
上面这段话是截取自《RocketMQ技术内幕》2.1节:NameServer架构设计,主要叙述了NameServer主要的架构设计和实现思路。本文就上面这段话进行拆解,一一进行源码解读。
1、Broker消息服务器在启动时向所有NameServer注册
// BrokerController#start
public void start() throws Exception {
// ...
this.registerBrokerAll(true, false, true);
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
} catch (Throwable e) {
log.error("registerBrokerAll Exception", e);
}
}
}, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);
// ...
}
Broker在启动时会调用registerBrokerAll函数,该函数实际会向NameServer发送REGISTER_BROKER请求,进行注册,注册的内容有哪些呢?
{
"dataVersion":{
"counter":2,
"timestamp":1579838252574
},
"topicConfigTable":{
"SELF_TEST_TOPIC":Object{...},
"DefaultCluster":Object{...},
"RMQ_SYS_TRANS_HALF_TOPIC":Object{...},
"DESKTOP-FJIT15L":Object{...},
"TBW102":{
"order":false,
"perm":7,
"readQueueNums":8,
"topicFilterType":"SINGLE_TAG",
"topicName":"TBW102",
"topicSysFlag":0,
"writeQueueNums":8
},
"BenchmarkTest":Object{...},
"OFFSET_MOVED_EVENT":Object{...}
}
}
除此之外,还有brokerAddr、brokerId、brokerName、clusterName等。
在Broker启动时,还会启动一个定时任务,进行定时注册上报,默认30s执行一次。
向所有NameServer注册体现在,通过一个for循环,遍历所有NameServer,向其注册:
// BrokerOuterAPI#registerBrokerAll
public List<RegisterBrokerResult> registerBrokerAll(...) {
// ...
// 来自启动配置
List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();
for (final String namesrvAddr : nameServerAddressList) {
brokerOuterExecutor.execute(new Runnable() {
@Override
public void run() {
// ...
RegisterBrokerResult result = registerBroker(namesrvAddr,oneway, timeoutMills,requestHeader,body);
// ...
}
});
}
// ...
}
2、NameServer与每台Broker服务器保持长连接
看Broker端的代码
// BrokerController#start
public void start() throws Exception {
// ...
if (this.brokerOuterAPI != null) {
this.brokerOuterAPI.start();
}
// ...
}
start函数实际是Netty的初始化:NettyRemotingClient#start。而实际的网络连接,则进行了本地缓存,如果连接已存在且可以,复用之,否则会重新创建新的连接:
// NettyRemotingClient#getAndCreateChannel
private Channel getAndCreateChannel(final String addr) throws InterruptedException {
if (null == addr) {
return getAndCreateNameserverChannel();
}
ChannelWrapper cw = this.channelTables.get(addr);
if (cw != null && cw.isOK()) {
return cw.getChannel();
}
return this.createChannel(addr);
}
// NettyRemotingClient#createChannel
private Channel createChannel(final String addr) throws InterruptedException {
// ...
if (createNewConnection) {
ChannelFuture channelFuture = this.bootstrap.connect(RemotingHelper.string2SocketAddress(addr));
cw = new ChannelWrapper(channelFuture);
this.channelTables.put(addr, cw);
}
// ...
}
channelTables的key是server地址(ip:port),因此对于多个NameServer,就有多个KV对。
3、并间隔30s检测Broker是否存活,如果检测到Broker宕机,则从路由注册表中将其移除。但是路由变化不会马上通知消息生产者
// NamesrvController#initialize
public boolean initialize() {
// ...
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
NamesrvController.this.routeInfoManager.scanNotActiveBroker();
}
}, 5, 10, TimeUnit.SECONDS);
// ...
}
书中“并间隔30s检测Broker是否存活”的描述并不准确,Broker定时注册的间隔是30s,而NameServer定时检测Broker是否宕机的间隔是10s。
public void scanNotActiveBroker() {
Iterator<Entry<String, BrokerLiveInfo>> it = this.brokerLiveTable.entrySet().iterator();
while (it.hasNext()) {
Entry<String, BrokerLiveInfo> next = it.next();
long last = next.getValue().getLastUpdateTimestamp();
if ((last + BROKER_CHANNEL_EXPIRED_TIME /* 120秒 */) < System.currentTimeMillis()) {
RemotingUtil.closeChannel(next.getValue().getChannel());
it.remove();
log.warn("The broker channel expired, {} {}ms", next.getKey(), BROKER_CHANNEL_EXPIRED_TIME);
this.onChannelDestroy(next.getKey(), next.getValue().getChannel());
}
}
}
NameServer每隔10秒扫描brokerLiveTable,如果Broker的上次更新时间距今超过120秒,则认为Broker失效,移除该Broker,删除与该Broker相关的路由信息,关闭与Broker连接。
Broker的上次更新时间是什么时候更新的?答案是,Broker有一个30秒的定时注册逻辑,NameServer在收到请求后,会更新这个时间戳(RouteInfoManager#registerBroker)。
关于"路由变化不会马上通知消息生产者"的描述,首先,NameServer检测到Broker宕机是有延迟的,连续120s没收到Broker的心跳,才会认为异常进行移除。其次,NameServer检测到宕机后不会通知给Producer,需要依靠Producer自身的定时任务去更新topic的路由信息,这个间隔是30s。也就是说,在这个间隔内,Produer是不知道Broker已经宕机了。因此需要在发送时,有额外的逻辑保障发送的高可用。
// MQClientInstance#startScheduledTask
private void startScheduledTask() {
// ...
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
MQClientInstance.this.updateTopicRouteInfoFromNameServer();
} catch (Exception e) {
log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e);
}
}
}, 10, this.clientConfig.getPollNameServerInterval(), TimeUnit.MILLISECONDS);
// ...
}
4、消息生产者(Producer)在发送消息之前先从NameServer获取Broker服务器地址列表,然后根据负载均衡算法从列表中选择一台消息服务器进行消息发送
5、在消息发送端提供容错机制来保证消息发送的高可用性
Broker选择、消息队列选择、容错发送,我们合一起讲,用example.quickstart.Producer进行分析。
// DefaultMQProducerImpl#sendDefaultImpl
private SendResult sendDefaultImpl(...) {
// ...
// 获取到topic路由信息,才能执行下一步发送
TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
if (topicPublishInfo != null && topicPublishInfo.ok()) {
// ...
// 重试次数
int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
MessageQueue mq = null;
for (; times < timesTotal; times++) {
// ...
// 选择队列
String lastBrokerName = null == mq ? null : mq.getBrokerName();
MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
mq = mqSelected;
// ...
// 发送
this.sendKernelImpl(msg, mq, ...);
// ...
}
}
// ...
}
大致的流程是:
1、获取topic的路由信息
2、采用重试机制,for循环执行
选择消息队列
发送消息
获取topic路由信息:在发送消息之前,需要知道要发送到哪个Broker,这部分逻辑都封装在tryToFindTopicPublishInfo,具体的分析可以参考RocketMQ自动创建topic,或者《RocketMQ技术内幕》3.4.2节。
获取到的路由信息数据结构如下:
再看下messageQueueList的数据是什么样的。由MQClientInstance#topicRouteData2TopicPublishInfo的逻辑,我们知道messageQueueList中的元素是按brokerName + queueId排序的,举例说明:
[
{
"brokerName":"broker-a",
"queueId":0
},
{
"brokerName":"broker-a",
"queueId":1
},
{
"brokerName":"broker-b",
"queueId":0
},
{
"brokerName":"broker-b",
"queueId":1
}
]
发到哪个Broker确定后,需要确定发到哪个队列。
public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
if (lastBrokerName == null) {
return selectOneMessageQueue();
} else {
// 触发重试时,lastBrokerName就不为null
int index = this.sendWhichQueue.getAndIncrement();
for (int i = 0; i < this.messageQueueList.size(); i++) {
// pos指针往后移
int pos = Math.abs(index++) % this.messageQueueList.size();
if (pos < 0)
pos = 0;
MessageQueue mq = this.messageQueueList.get(pos);
// 取brokerName不相同的
if (!mq.getBrokerName().equals(lastBrokerName)) {
return mq;
}
}
return selectOneMessageQueue();
}
}
public MessageQueue selectOneMessageQueue() {
// 第1次时,会随机生成一个index,后面发送一次就+1
int index = this.sendWhichQueue.getAndIncrement();
// 取模,分配到其中一个队列中
int pos = Math.abs(index) % this.messageQueueList.size();
if (pos < 0)
pos = 0;
return this.messageQueueList.get(pos);
}
public int getAndIncrement() {
Integer index = this.threadLocalIndex.get();
if (null == index) {
// 随机数
index = Math.abs(random.nextInt());
if (index < 0)
index = 0;
this.threadLocalIndex.set(index);
}
index = Math.abs(index + 1);
if (index < 0)
index = 0;
this.threadLocalIndex.set(index);
return index;
}
对于消息发送成功的:第1次发送时,会先随机生成一个index,用index%messageQueueList.size(),确定落到哪个队列。后续消息index会累加,消息就发送到别的队列去了。
对于消息发送失败的:重试时,会从messageQueueList依次往后取brokerName跟之前不一样的那个队列。如果只有1个Broker,那最终发送的还是之前那个Broker。
测试发送4条消息,消息队列共4个,发送情况如下:
总结一下就是:如果Broker宕机,如果Producer发送前从NameServer获取到了最新的topic路由信息,那么发送不会有问题;如果发送时,topic路由信息还是包含了宕机的Broker,那会触发重试机制,发送时选择不同的Broker再去发送。
在消息队列选择中,还有另外一种机制:Broker故障延迟机制,sendLatencyFaultEnable默认时不开启的。它主要是在Broker宕机期间,如果一次消息发送失败后,可以将该Broker暂时排除在消息队列的选择范围中。