NameServer架构设计--源码解读

Broker消息服务器在启动时向所有NameServer注册,消息生产者(Producer)在发送消息之前先从NameServer获取Broker服务器地址列表,然后根据负载均衡算法从列表中选择一台消息服务器进行消息发送。NameServer与每台Broker服务器保持长连接,并间隔30s检测Broker是否存活,如果检测到Broker宕机,则从路由注册表中将其移除。但是路由变化不会马上通知消息生产者,为什么要这么设计呢?这是为了降低NameServer实现的复杂性,在消息发送端提供容错机制来保证消息发送的高可用性。

上面这段话是截取自《RocketMQ技术内幕》2.1节:NameServer架构设计,主要叙述了NameServer主要的架构设计和实现思路。本文就上面这段话进行拆解,一一进行源码解读。

1、Broker消息服务器在启动时向所有NameServer注册

// BrokerController#start
public void start() throws Exception {
    
    // ...
    
    this.registerBrokerAll(true, false, true);

    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

        @Override
        public void run() {
            try {
                BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
            } catch (Throwable e) {
                log.error("registerBrokerAll Exception", e);
            }
        }
    }, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);

    // ...
}

Broker在启动时会调用registerBrokerAll函数,该函数实际会向NameServer发送REGISTER_BROKER请求,进行注册,注册的内容有哪些呢?

{
    "dataVersion":{
        "counter":2,
        "timestamp":1579838252574
    },
    "topicConfigTable":{
        "SELF_TEST_TOPIC":Object{...},
        "DefaultCluster":Object{...},
        "RMQ_SYS_TRANS_HALF_TOPIC":Object{...},
        "DESKTOP-FJIT15L":Object{...},
        "TBW102":{
            "order":false,
            "perm":7,
            "readQueueNums":8,
            "topicFilterType":"SINGLE_TAG",
            "topicName":"TBW102",
            "topicSysFlag":0,
            "writeQueueNums":8
        },
        "BenchmarkTest":Object{...},
        "OFFSET_MOVED_EVENT":Object{...}
    }
}

除此之外,还有brokerAddr、brokerId、brokerName、clusterName等。

在Broker启动时,还会启动一个定时任务,进行定时注册上报,默认30s执行一次。

向所有NameServer注册体现在,通过一个for循环,遍历所有NameServer,向其注册:

// BrokerOuterAPI#registerBrokerAll
public List<RegisterBrokerResult> registerBrokerAll(...) {
    
    // ...

    // 来自启动配置
    List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();

    for (final String namesrvAddr : nameServerAddressList) {
        brokerOuterExecutor.execute(new Runnable() {
            @Override
            public void run() {
                // ...
                
                RegisterBrokerResult result = registerBroker(namesrvAddr,oneway, timeoutMills,requestHeader,body);
                
                // ...
            }
        });
    }

    // ...
}

2、NameServer与每台Broker服务器保持长连接

看Broker端的代码

// BrokerController#start
public void start() throws Exception {
    
    // ...
    
    if (this.brokerOuterAPI != null) {
        this.brokerOuterAPI.start();
    }
    
    // ...
}

start函数实际是Netty的初始化:NettyRemotingClient#start。而实际的网络连接,则进行了本地缓存,如果连接已存在且可以,复用之,否则会重新创建新的连接:

// NettyRemotingClient#getAndCreateChannel
private Channel getAndCreateChannel(final String addr) throws InterruptedException {
    if (null == addr) {
        return getAndCreateNameserverChannel();
    }

    ChannelWrapper cw = this.channelTables.get(addr);
    if (cw != null && cw.isOK()) {
        return cw.getChannel();
    }

    return this.createChannel(addr);
}

// NettyRemotingClient#createChannel
private Channel createChannel(final String addr) throws InterruptedException {
    // ...
    
    if (createNewConnection) {
        ChannelFuture channelFuture = this.bootstrap.connect(RemotingHelper.string2SocketAddress(addr));
        
        cw = new ChannelWrapper(channelFuture);
        this.channelTables.put(addr, cw);
    }
     
    // ...
}

channelTables的key是server地址(ip:port),因此对于多个NameServer,就有多个KV对。

3、并间隔30s检测Broker是否存活,如果检测到Broker宕机,则从路由注册表中将其移除。但是路由变化不会马上通知消息生产者

// NamesrvController#initialize
public boolean initialize() {

    // ...

    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

        @Override
        public void run() {
            NamesrvController.this.routeInfoManager.scanNotActiveBroker();
        }
    }, 5, 10, TimeUnit.SECONDS);

    // ...
}

书中“并间隔30s检测Broker是否存活”的描述并不准确,Broker定时注册的间隔是30s,而NameServer定时检测Broker是否宕机的间隔是10s。

public void scanNotActiveBroker() {
    Iterator<Entry<String, BrokerLiveInfo>> it = this.brokerLiveTable.entrySet().iterator();
    while (it.hasNext()) {
        Entry<String, BrokerLiveInfo> next = it.next();
        long last = next.getValue().getLastUpdateTimestamp();
        if ((last + BROKER_CHANNEL_EXPIRED_TIME /* 120秒 */) < System.currentTimeMillis()) {
            RemotingUtil.closeChannel(next.getValue().getChannel());
            it.remove();
            log.warn("The broker channel expired, {} {}ms", next.getKey(), BROKER_CHANNEL_EXPIRED_TIME);
            this.onChannelDestroy(next.getKey(), next.getValue().getChannel());
        }
    }
}

NameServer每隔10秒扫描brokerLiveTable,如果Broker的上次更新时间距今超过120秒,则认为Broker失效,移除该Broker,删除与该Broker相关的路由信息,关闭与Broker连接。

Broker的上次更新时间是什么时候更新的?答案是,Broker有一个30秒的定时注册逻辑,NameServer在收到请求后,会更新这个时间戳(RouteInfoManager#registerBroker)。

关于"路由变化不会马上通知消息生产者"的描述,首先,NameServer检测到Broker宕机是有延迟的,连续120s没收到Broker的心跳,才会认为异常进行移除。其次,NameServer检测到宕机后不会通知给Producer,需要依靠Producer自身的定时任务去更新topic的路由信息,这个间隔是30s。也就是说,在这个间隔内,Produer是不知道Broker已经宕机了。因此需要在发送时,有额外的逻辑保障发送的高可用。

// MQClientInstance#startScheduledTask
private void startScheduledTask() {
    // ...
    
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

        @Override
        public void run() {
            try {
                MQClientInstance.this.updateTopicRouteInfoFromNameServer();
            } catch (Exception e) {
                log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e);
            }
        }
    }, 10, this.clientConfig.getPollNameServerInterval(), TimeUnit.MILLISECONDS);

    // ...
}

4、消息生产者(Producer)在发送消息之前先从NameServer获取Broker服务器地址列表,然后根据负载均衡算法从列表中选择一台消息服务器进行消息发送

5、在消息发送端提供容错机制来保证消息发送的高可用性

Broker选择、消息队列选择、容错发送,我们合一起讲,用example.quickstart.Producer进行分析。

// DefaultMQProducerImpl#sendDefaultImpl
private SendResult sendDefaultImpl(...) {
    // ...
    
    // 获取到topic路由信息,才能执行下一步发送
    TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
    if (topicPublishInfo != null && topicPublishInfo.ok()) {
        // ...
        
        // 重试次数
        int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
        MessageQueue mq = null;
        for (; times < timesTotal; times++) {
            
            // ...
            
            // 选择队列
            String lastBrokerName = null == mq ? null : mq.getBrokerName();
            MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
            mq = mqSelected;

            // ...
            
            // 发送
            this.sendKernelImpl(msg, mq, ...);
            
            // ...
         }
    }
    
    // ...
}

大致的流程是:

1、获取topic的路由信息

2、采用重试机制,for循环执行

  • 选择消息队列

  • 发送消息

获取topic路由信息:在发送消息之前,需要知道要发送到哪个Broker,这部分逻辑都封装在tryToFindTopicPublishInfo,具体的分析可以参考RocketMQ自动创建topic,或者《RocketMQ技术内幕》3.4.2节。

获取到的路由信息数据结构如下:


topicPublisInfo数据结构.png

再看下messageQueueList的数据是什么样的。由MQClientInstance#topicRouteData2TopicPublishInfo的逻辑,我们知道messageQueueList中的元素是按brokerName + queueId排序的,举例说明:

[
    {
        "brokerName":"broker-a",
        "queueId":0
    },
    {
        "brokerName":"broker-a",
        "queueId":1
    },
    {
        "brokerName":"broker-b",
        "queueId":0
    },
    {
        "brokerName":"broker-b",
        "queueId":1
    }
]

发到哪个Broker确定后,需要确定发到哪个队列。

public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
    if (lastBrokerName == null) {
        return selectOneMessageQueue();
    } else {
        // 触发重试时,lastBrokerName就不为null
        int index = this.sendWhichQueue.getAndIncrement();
        
        for (int i = 0; i < this.messageQueueList.size(); i++) {
            // pos指针往后移
            int pos = Math.abs(index++) % this.messageQueueList.size();
            if (pos < 0)
                pos = 0;
            MessageQueue mq = this.messageQueueList.get(pos);
            // 取brokerName不相同的
            if (!mq.getBrokerName().equals(lastBrokerName)) {
                return mq;
            }
        }
        return selectOneMessageQueue();
    }
}

public MessageQueue selectOneMessageQueue() {
    // 第1次时,会随机生成一个index,后面发送一次就+1
    int index = this.sendWhichQueue.getAndIncrement();
    // 取模,分配到其中一个队列中
    int pos = Math.abs(index) % this.messageQueueList.size();
    if (pos < 0)
        pos = 0;
    return this.messageQueueList.get(pos);
}

public int getAndIncrement() {
    Integer index = this.threadLocalIndex.get();
    if (null == index) {
        // 随机数
        index = Math.abs(random.nextInt());
        if (index < 0)
            index = 0;
        this.threadLocalIndex.set(index);
    }

    index = Math.abs(index + 1);
    if (index < 0)
        index = 0;

    this.threadLocalIndex.set(index);
    return index;
}

对于消息发送成功的:第1次发送时,会先随机生成一个index,用index%messageQueueList.size(),确定落到哪个队列。后续消息index会累加,消息就发送到别的队列去了。

对于消息发送失败的:重试时,会从messageQueueList依次往后取brokerName跟之前不一样的那个队列。如果只有1个Broker,那最终发送的还是之前那个Broker。

测试发送4条消息,消息队列共4个,发送情况如下:


消息队列选择.png

总结一下就是:如果Broker宕机,如果Producer发送前从NameServer获取到了最新的topic路由信息,那么发送不会有问题;如果发送时,topic路由信息还是包含了宕机的Broker,那会触发重试机制,发送时选择不同的Broker再去发送。

在消息队列选择中,还有另外一种机制:Broker故障延迟机制,sendLatencyFaultEnable默认时不开启的。它主要是在Broker宕机期间,如果一次消息发送失败后,可以将该Broker暂时排除在消息队列的选择范围中。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,772评论 6 477
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,458评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,610评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,640评论 1 276
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,657评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,590评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,962评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,631评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,870评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,611评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,704评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,386评论 4 319
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,969评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,944评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,179评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 44,742评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,440评论 2 342

推荐阅读更多精彩内容