nacos服务端注册源码

Nacos服务端就是一个springboot项目,提供了很多http就请路径
本次只讲解如何注册客户端
关注InstanceController类
这就是在客户端提到的服务注册路径

@CanDistro
@PostMapping
@Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
public String register(HttpServletRequest request) throws Exception {
    
    final String namespaceId = WebUtils
            .optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
    final String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
    NamingUtils.checkServiceNameFormat(serviceName);
    
    final Instance instance = parseInstance(request);
    
    serviceManager.registerInstance(namespaceId, serviceName, instance);
    return "ok";
}

Service和instance的联系


image.png

registerInstance方法

public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {
    //创建空的服务结构,servic是双层map ,这里就是把namespace group赋值
    createEmptyService(namespaceId, serviceName, instance.isEphemeral());
    //获取服务
    Service service = getService(namespaceId, serviceName);
    
    if (service == null) {
        throw new NacosException(NacosException.INVALID_PARAM,
                "service not found, namespace: " + namespaceId + ", service: " + serviceName);
    }
    
    addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
}

创建service,并将service存放进serviceMap中
createEmptyService(namespaceId, serviceName, instance.isEphemeral());

public void createServiceIfAbsent(String namespaceId, String serviceName, boolean local, Cluster cluster)
        throws NacosException {
//getService里面可以看到serviceMap是双层map
// Map(namespace, Map(group::serviceName, Service)). 
// 一层是namespace 二层是 group value就是每一个注册的客户端
    Service service = getService(namespaceId, serviceName);
    if (service == null) {
        
        Loggers.SRV_LOG.info("creating empty service {}:{}", namespaceId, serviceName);
        service = new Service();
//为service赋值名称
        service.setName(serviceName);
//赋值namespace
        service.setNamespaceId(namespaceId);
//赋值group
        service.setGroupName(NamingUtils.getGroupName(serviceName));
        // now validate the service. if failed, exception will be thrown
        service.setLastModifiedMillis(System.currentTimeMillis());
        service.recalculateChecksum();
        if (cluster != null) {
            cluster.setService(service);
            service.getClusterMap().put(cluster.getName(), cluster);
        }
        service.validate();
        //存放并初始化service
        putServiceAndInit(service);
        if (!local) {
            addOrReplaceService(service);
        }
    }
}

putServiceAndInit方法

private void putServiceAndInit(Service service) throws NacosException {
//存放service到serviceMap,里面用的锁的双重检查方式,此时service,没有instance
    putService(service);
//service的初始化
    service.init();
    consistencyService
            .listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), true), service);
    consistencyService
            .listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), false), service);
    Loggers.SRV_LOG.info("[NEW-SERVICE] {}", service.toJson());
}

Service.init方法,主要是健康检查,后续会分析到,这里要提到的是,大量用到了多线程,在看源码的时候,主要看run方法

public void init() {
//健康检查,
    HealthCheckReactor.scheduleCheck(clientBeatCheckTask);
    for (Map.Entry<String, Cluster> entry : clusterMap.entrySet()) {
        entry.getValue().setService(this);
        entry.getValue().init();
    }
}

回到ServiceManager#registerInstance方法
addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);将service存放进

public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips)
        throws NacosException {
    //定义key,因为默认是临时节点ephemeral为true
//所以返回的key为
// INSTANCE_LIST_KEY_PREFIX + EPHEMERAL_KEY_PREFIX + namespaceId + 
//NAMESPACE_KEY_CONNECTOR + serviceName;
    String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);
    //获取service
    Service service = getService(namespaceId, serviceName);
    //如果是同一个service,是加锁的
    synchronized (service) {
//instanceList是service中没有的ips
        List<Instance> instanceList = addIpAddresses(service, ephemeral, ips);
        
        Instances instances = new Instances();
        instances.setInstanceList(instanceList);
        //将instancts存放进service中
        consistencyService.put(key, instances);
    }
}

consistencyService.put(key, instances); 存放instance到service中
这里的consistencyService 是DelegateConsistencyServiceImpl
Put方法, mapConsistencyService(key)返回的是DistroConsistencyServiceImpl

@Override
public void put(String key, Record value) throws NacosException {
    mapConsistencyService(key).put(key, value);
}

DistroConsistencyServiceImpl的put方法

@Override
public void put(String key, Record value) throws NacosException {
//存放instance的关键方法
    onPut(key, value);
    distroProtocol.sync(new DistroKey(key, KeyBuilder.INSTANCE_LIST_KEY_PREFIX), DataOperation.CHANGE,
            globalConfig.getTaskDispatchPeriod() / 2);
}

public void onPut(String key, Record value) {
    //instance是Record的实现来,这里的value就是instance
// 将传进来的instance  put进dataStore
    if (KeyBuilder.matchEphemeralInstanceListKey(key)) {
        Datum<Instances> datum = new Datum<>();
        datum.value = (Instances) value;
        datum.key = key;
        datum.timestamp.incrementAndGet();
        dataStore.put(key, datum);
    }
    
    if (!listeners.containsKey(key)) {
        return;
    }
    //添加任务,这里入参是CHANGE
    notifier.addTask(key, DataOperation.CHANGE);
}

public void addTask(String datumKey, DataOperation action) {
    //是否包含datumKey,如果有证明已经注册过了
// services在Notifier 的run方法中,有删除掉了,这样会快速的过滤掉重复注册
    if (services.containsKey(datumKey) && action == DataOperation.CHANGE) {
        return;
    }
//如果是CHANGE,存放进service,后面又会删除掉
    if (action == DataOperation.CHANGE) {
        services.put(datumKey, StringUtils.EMPTY);
    }
//task是阻塞队列
    tasks.offer(Pair.with(datumKey, action));
}

Notifier是runnable接口的实现类, 那我们就关注一下 run方法

@Override
public void run() {
    Loggers.DISTRO.info("distro notifier started");
    
    for (; ; ) {
//当tasks队列中有数据的时候,会从队列里面拉去,否则阻塞,不回浪费cpu
        try {
            Pair<String, DataOperation> pair = tasks.take();
//重要方法
            handle(pair);
        } catch (Throwable e) {
            Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);
        }
    }
}

handle方法

private void handle(Pair<String, DataOperation> pair) {
    try {
        String datumKey = pair.getValue0();
        DataOperation action = pair.getValue1();
        //从services删除,还记得上上一个代码块中再存放
        services.remove(datumKey);
        
        int count = 0;
        
        if (!listeners.containsKey(datumKey)) {
            return;
        }
        
        for (RecordListener listener : listeners.get(datumKey)) {
            
            count++;
            
            try {
                if (action == DataOperation.CHANGE) {
//因为CHANGE 执行onChange
                    listener.onChange(datumKey, dataStore.get(datumKey).value);
                    continue;
                }
                
                if (action == DataOperation.DELETE) {
                    listener.onDelete(datumKey);
                    continue;
                }
            } catch (Throwable e) {
                Loggers.DISTRO.error("[NACOS-DISTRO] error while notifying listener of key: {}", datumKey, e);
            }
        }
        
        if (Loggers.DISTRO.isDebugEnabled()) {
            Loggers.DISTRO
                    .debug("[NACOS-DISTRO] datum change notified, key: {}, listener count: {}, action: {}",
                            datumKey, count, action.name());
        }
    } catch (Throwable e) {
        Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);
    }
}

listener.onChange(datumKey, dataStore.get(datumKey).value);

@Override
public void onChange(String key, Instances value) throws Exception {
    
    Loggers.SRV_LOG.info("[NACOS-RAFT] datum is changed, key: {}, value: {}", key, value);
    //遍历instance,这里instances是serviceMap没有的
    for (Instance instance : value.getInstanceList()) {
        
        if (instance == null) {
            // Reject this abnormal instance list:
            throw new RuntimeException("got null instance " + key);
        }
        //权重,double类型,的上限
        if (instance.getWeight() > 10000.0D) {
            instance.setWeight(10000.0D);
        }
        //权重,double类型,的下限
        if (instance.getWeight() < 0.01D && instance.getWeight() > 0.0D) {
            instance.setWeight(0.01D);
        }
    }
    //注册进serviceMap中,我们说过真正的客户端是存放进serviceMap中的
    updateIPs(value.getInstanceList(), KeyBuilder.matchEphemeralInstanceListKey(key));
    
    recalculateChecksum();
}

updateIPs(value.getInstanceList(),方法是将新的客户端收集起来,再收集的时候,有个副本的概念
为什么会有副本的概念呢,从刚才的源码我们看出来,首先是创建一个空的service,存放进servcieMap中,在利用线程池执行task任务,将新的客户端存放进空的service中,这个存放是一个过程包括健康检查,判断新服务是否已经存在(重启客户端的时候,serviceMap并不回马上把重启的服务删除,就存在重启的客户端信息重新注册的时候,其实在serviceMap还是存在的)等一系列的操作,如果在这一系列的操作的过程中service是不完整的,如果这个时候客户端拉去服务列表,就可能出现不完整的service,针对这个情况,可以在创建和拉去接口,同时执行一段代码,这段代码加锁,是可以解决一致性的问题,但是效率低下,nacos采用的就是副本的方式,在添加instance的时候,操作的只是副本,当添加完成之后,在替换掉service里的instance,这样虽然会出现客户端拉去的instance列表是过期的,但是实现了高可用.
那关于副本的概念,在代码中是如何实现的

public void updateIps(List<Instance> ips, boolean ephemeral) {
//判断是否是临时节点,获取相应的instance集合
    Set<Instance> toUpdateInstances = ephemeral ? ephemeralInstances : persistentInstances;
//这个oldIpMap就是副本,下面关于副本的操作在后续的文章会分析到,在代码的最后,
//把最新的instance集合判断是否是临时节点来赋值ephemeralInstances或者
//persistentInstances
    HashMap<String, Instance> oldIpMap = new HashMap<>(toUpdateInstances.size());

    for (Instance ip : toUpdateInstances) {
        oldIpMap.put(ip.getDatumKey(), ip);
    }

    List<Instance> updatedIPs = updatedIps(ips, oldIpMap.values());
    if (updatedIPs.size() > 0) {
        for (Instance ip : updatedIPs) {
            Instance oldIP = oldIpMap.get(ip.getDatumKey());

            // do not update the ip validation status of updated ips
            // because the checker has the most precise result
            // Only when ip is not marked, don't we update the health status of IP:
            if (!ip.isMarked()) {
                ip.setHealthy(oldIP.isHealthy());
            }

            if (ip.isHealthy() != oldIP.isHealthy()) {
                // ip validation status updated
                Loggers.EVT_LOG.info("{} {SYNC} IP-{} {}:{}@{}", getService().getName(),
                        (ip.isHealthy() ? "ENABLED" : "DISABLED"), ip.getIp(), ip.getPort(), getName());
            }

            if (ip.getWeight() != oldIP.getWeight()) {
                // ip validation status updated
                Loggers.EVT_LOG.info("{} {SYNC} {IP-UPDATED} {}->{}", getService().getName(), oldIP.toString(),
                        ip.toString());
            }
        }
    }

    List<Instance> newIPs = subtract(ips, oldIpMap.values());
    if (newIPs.size() > 0) {
        Loggers.EVT_LOG
                .info("{} {SYNC} {IP-NEW} cluster: {}, new ips size: {}, content: {}", getService().getName(),
                        getName(), newIPs.size(), newIPs.toString());

        for (Instance ip : newIPs) {
            HealthCheckStatus.reset(ip);
        }
    }

    List<Instance> deadIPs = subtract(oldIpMap.values(), ips);

    if (deadIPs.size() > 0) {
        Loggers.EVT_LOG
                .info("{} {SYNC} {IP-DEAD} cluster: {}, dead ips size: {}, content: {}", getService().getName(),
                        getName(), deadIPs.size(), deadIPs.toString());

        for (Instance ip : deadIPs) {
            HealthCheckStatus.remv(ip);
        }
    }

    toUpdateInstances = new HashSet<>(ips);
//赋值最新的instance集合
    if (ephemeral) {
        ephemeralInstances = toUpdateInstances;
    } else {
        persistentInstances = toUpdateInstances;
    }
}
  

服务端是如何提供客户端列表
InstanceController# ObjectNode list(HttpServletRequest request)


image.png

doSrvIpxt方法中srvedIPs = service.srvIPs(Arrays.asList(StringUtils.split(clusters, ",")));代码从service中获取临时节点和持久节点的所有列表
大家可以跟进代码,最终发现allIPs方法,可以看到把临时节点和持久节点都取出来了

public List<Instance> allIPs() {
    List<Instance> allInstances = new ArrayList<>();
    allInstances.addAll(persistentInstances);
    allInstances.addAll(ephemeralInstances);
    return allInstances;
}


©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 199,902评论 5 468
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 84,037评论 2 377
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 146,978评论 0 332
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 53,867评论 1 272
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 62,763评论 5 360
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,104评论 1 277
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,565评论 3 390
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,236评论 0 254
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,379评论 1 294
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,313评论 2 317
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,363评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,034评论 3 315
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,637评论 3 303
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,719评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,952评论 1 255
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,371评论 2 346
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 41,948评论 2 341

推荐阅读更多精彩内容