nacos-服务注册发现源码分析-注册请求接口-03

nacos-服务注册发现源码分析-启动注册-02分析了nacos在springboot 启动后,通过事件机制,通知到启动事件监听器,在事件监听器里面完成了服务的注册。现在我们来分析一下,服务注册的具体访问接口。

  • NacosServiceRegistry 服务注册接口
/**
 * @author xiaojing
 * @author <a href="mailto:mercyblitz@gmail.com">Mercy</a>
 */
public class NacosServiceRegistry implements ServiceRegistry<Registration> {

    private static final Logger log = LoggerFactory.getLogger(NacosServiceRegistry.class);

    private final NacosDiscoveryProperties nacosDiscoveryProperties;

    private final NamingService namingService;

    public NacosServiceRegistry(NacosDiscoveryProperties nacosDiscoveryProperties) {
        this.nacosDiscoveryProperties = nacosDiscoveryProperties;
        this.namingService = nacosDiscoveryProperties.namingServiceInstance(); // 依赖这个接口实现服务注册请求
    }
        // 服务注册
    @Override
    public void register(Registration registration) {

        if (StringUtils.isEmpty(registration.getServiceId())) {
            log.warn("No service to register for nacos client...");
            return;
        }

        String serviceId = registration.getServiceId();
        String group = nacosDiscoveryProperties.getGroup();

        Instance instance = getNacosInstanceFromRegistration(registration);

        try {
            namingService.registerInstance(serviceId, group, instance); // 注册接口
            log.info("nacos registry, {} {} {}:{} register finished", group, serviceId,
                    instance.getIp(), instance.getPort());
        }
        catch (Exception e) {
            log.error("nacos registry, {} register failed...{},", serviceId,
                    registration.toString(), e);
            // rethrow a RuntimeException if the registration is failed.
            // issue : https://github.com/alibaba/spring-cloud-alibaba/issues/1132
            rethrowRuntimeException(e);
        }
    }
  • this.namingService = nacosDiscoveryProperties.namingServiceInstance(); 实例化
public NamingService namingServiceInstance() {

        if (null != namingService) {
            return namingService;
        }

        try {  // 继续看这个方法
            namingService = NacosFactory.createNamingService(getNacosProperties());
        }
        catch (Exception e) {
            log.error("create naming service error!properties={},e=,", this, e);
            return null;
        }
        return namingService;
    }
  • namingService = NacosFactory.createNamingService(getNacosProperties());
/**
     * Create naming service
     *
     * @param properties init param
     * @return Naming
     * @throws NacosException Exception
     */
    public static NamingService createNamingService(Properties properties) throws NacosException {
        return NamingFactory.createNamingService(properties);
    }

    public static NamingService createNamingService(Properties properties) throws NacosException {
        try { // 创建的对象
            Class<?> driverImplClass = Class.forName("com.alibaba.nacos.client.naming.NacosNamingService");
            Constructor constructor = driverImplClass.getConstructor(Properties.class);
            NamingService vendorImpl = (NamingService)constructor.newInstance(properties);
            return vendorImpl;
        } catch (Throwable e) {
            throw new NacosException(NacosException.CLIENT_INVALID_PARAM, e);
        }
    }

com.alibaba.nacos.client.naming.NacosNamingService 从上面的方法我们可以看出 this.namingService = nacosDiscoveryProperties.namingServiceInstance(); 这里的namingService 就是com.alibaba.nacos.client.naming.NacosNamingService。ok ,回国头去看看这句话namingService.registerInstance(serviceId, group, instance); 也就是说这个方法就是 com.alibaba.nacos.client.naming.NacosNamingService#registerInstance(serviceId, group, instance);

  • registerInstance(serviceId, group, instance);
.......
private NamingProxy serverProxy; //代理对象,这个对象就是发送底层http请求的

    public NacosNamingService(String serverList) { //构造
        Properties properties = new Properties();
        properties.setProperty(PropertyKeyConst.SERVER_ADDR, serverList);

        init(properties);
    }

    public NacosNamingService(Properties properties) { // 构造
        init(properties); // 调用下面的方法
    }

    private void init(Properties properties) {
        namespace = InitUtils.initNamespaceForNaming(properties);
        initServerAddr(properties);
        InitUtils.initWebRootContext();
        initCacheDir();
        initLogName(properties);

        eventDispatcher = new EventDispatcher();
        serverProxy = new NamingProxy(namespace, endpoint, serverList); // 初始化 代理对象`serverList` 注册中心服务地址集合,也就是nacos 服务端集合
        serverProxy.setProperties(properties);
        beatReactor = new BeatReactor(serverProxy, initClientBeatThreadCount(properties));
        hostReactor = new HostReactor(eventDispatcher, serverProxy, cacheDir, isLoadCacheAtStart(properties), initPollingThreadCount(properties));
    }
// 负责服务注册
@Override
    public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {

        if (instance.isEphemeral()) {
            BeatInfo beatInfo = new BeatInfo();
            beatInfo.setServiceName(NamingUtils.getGroupedName(serviceName, groupName));
            beatInfo.setIp(instance.getIp());
            beatInfo.setPort(instance.getPort());
            beatInfo.setCluster(instance.getClusterName());
            beatInfo.setWeight(instance.getWeight());
            beatInfo.setMetadata(instance.getMetadata());
            beatInfo.setScheduled(false);
            long instanceInterval = instance.getInstanceHeartBeatInterval();
            beatInfo.setPeriod(instanceInterval == 0 ? DEFAULT_HEART_BEAT_INTERVAL : instanceInterval);
// 添加心跳
            beatReactor.addBeatInfo(NamingUtils.getGroupedName(serviceName, groupName), beatInfo);
        }
// 我们看到这里,就知道注册是继续调用这句话
        serverProxy.registerService(NamingUtils.getGroupedName(serviceName, groupName), groupName, instance);
    }
  • NamingProxy http 服务代理 的registerService
public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {

        NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance: {}",
            namespaceId, serviceName, instance);
// 组装请求参数
        final Map<String, String> params = new HashMap<String, String>(9);
        params.put(CommonParams.NAMESPACE_ID, namespaceId);
        params.put(CommonParams.SERVICE_NAME, serviceName);
        params.put(CommonParams.GROUP_NAME, groupName);
        params.put(CommonParams.CLUSTER_NAME, instance.getClusterName());
        params.put("ip", instance.getIp()); //服务的ip
        params.put("port", String.valueOf(instance.getPort())); // 端口
        params.put("weight", String.valueOf(instance.getWeight()));
        params.put("enable", String.valueOf(instance.isEnabled()));
        params.put("healthy", String.valueOf(instance.isHealthy()));
        params.put("ephemeral", String.valueOf(instance.isEphemeral()));
        params.put("metadata", JSON.toJSONString(instance.getMetadata()));
// 发送服务注册请求 post 请求
        reqAPI(UtilAndComs.NACOS_URL_INSTANCE, params, HttpMethod.POST);

    }
  • reqAPI(UtilAndComs.NACOS_URL_INSTANCE, params, HttpMethod.POST);
public String reqAPI(String api, Map<String, String> params, String method) throws NacosException {

        List<String> snapshot = serversFromEndpoint;
        if (!CollectionUtils.isEmpty(serverList)) {
            snapshot = serverList;
        }

        return reqAPI(api, params, snapshot, method); // 发送请求
    }
  • reqAPI(api, params, snapshot, method);
public String reqAPI(String api, Map<String, String> params, List<String> servers, String method) {

        params.put(CommonParams.NAMESPACE_ID, getNamespaceId());

        if (CollectionUtils.isEmpty(servers) && StringUtils.isEmpty(nacosDomain)) {
            throw new IllegalArgumentException("no server available");
        }

        Exception exception = new Exception();

        if (servers != null && !servers.isEmpty()) {

            Random random = new Random(System.currentTimeMillis());
            int index = random.nextInt(servers.size()); // 随机取一个服务地址进行注册

            for (int i = 0; i < servers.size(); i++) { 
                String server = servers.get(index);
                try {
                    return callServer(api, params, server, method); // 如果注册失败,就执行下面的逻辑,记录日志,并取 下一个服务地址,直到注册成功,return
                } catch (NacosException e) {
                    exception = e;
                    NAMING_LOGGER.error("request {} failed.", server, e);
                } catch (Exception e) {
                    exception = e;
                    NAMING_LOGGER.error("request {} failed.", server, e);
                }
// 如果 上面 注册失败,就再取一个服务地址进行注册
                index = (index + 1) % servers.size();
            }

            throw new IllegalStateException("failed to req API:" + api + " after all servers(" + servers + ") tried: "
                + exception.getMessage());
        }

        for (int i = 0; i < UtilAndComs.REQUEST_DOMAIN_RETRY_COUNT; i++) {
            try {
                return callServer(api, params, nacosDomain);
            } catch (Exception e) {
                exception = e;
                NAMING_LOGGER.error("[NA] req api:" + api + " failed, server(" + nacosDomain, e);
            }
        }

        throw new IllegalStateException("failed to req API:/api/" + api + " after all servers(" + servers + ") tried: "
            + exception.getMessage());

    }

上面代码是服务注册的逻辑,大致的意思是:注册请求随机发送的服务器,如果第一次就注册成功,就return,否则继续遍历服务列表,继续注册,直到注册成功。下面看看callServer

  • callServer
 public String callServer(String api, Map<String, String> params, String curServer, String method)
        throws NacosException {
        long start = System.currentTimeMillis();
        long end = 0;
        checkSignature(params);
        List<String> headers = builderHeaders();

        String url;
        if (curServer.startsWith(UtilAndComs.HTTPS) || curServer.startsWith(UtilAndComs.HTTP)) {
            url = curServer + api; // 请求地址
        } else {
            if (!curServer.contains(UtilAndComs.SERVER_ADDR_IP_SPLITER)) {
                curServer = curServer + UtilAndComs.SERVER_ADDR_IP_SPLITER + serverPort;
            }
            url = HttpClient.getPrefix() + curServer + api;
        }
// 发送http 请求
        HttpClient.HttpResult result = HttpClient.request(url, headers, params, UtilAndComs.ENCODING, method);
        end = System.currentTimeMillis();

        MetricsMonitor.getNamingRequestMonitor(method, url, String.valueOf(result.code))
            .observe(end - start);

        if (HttpURLConnection.HTTP_OK == result.code) {
            return result.content; // 请求成功,返回数据
        }

        if (HttpURLConnection.HTTP_NOT_MODIFIED == result.code) {
            return StringUtils.EMPTY;
        }

        throw new NacosException(NacosException.SERVER_ERROR, "failed to req API:"
            + curServer + api + ". code:"
            + result.code + " msg: " + result.content);
    }
  • 请求接口UtilAndComs.NACOS_URL_INSTANCE
public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {

        NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance: {}",
            namespaceId, serviceName, instance);

        final Map<String, String> params = new HashMap<String, String>(9);
        params.put(CommonParams.NAMESPACE_ID, namespaceId);
        params.put(CommonParams.SERVICE_NAME, serviceName);
        params.put(CommonParams.GROUP_NAME, groupName);
        params.put(CommonParams.CLUSTER_NAME, instance.getClusterName());
        params.put("ip", instance.getIp());
        params.put("port", String.valueOf(instance.getPort()));
        params.put("weight", String.valueOf(instance.getWeight()));
        params.put("enable", String.valueOf(instance.isEnabled()));
        params.put("healthy", String.valueOf(instance.isHealthy()));
        params.put("ephemeral", String.valueOf(instance.isEphemeral()));
        params.put("metadata", JSON.toJSONString(instance.getMetadata()));
// url = UtilAndComs.NACOS_URL_INSTANCE
        reqAPI(UtilAndComs.NACOS_URL_INSTANCE, params, HttpMethod.POST);

    }
  • UtilAndComs.NACOS_URL_INSTANCE
    public static String WEB_CONTEXT = "/nacos";

    public static String NACOS_URL_BASE = WEB_CONTEXT + "/v1/ns";

    public static String NACOS_URL_INSTANCE = NACOS_URL_BASE + "/instance";

/nacos/v1/ns/instance 接口地址,请求方式:POST。
ok,到这里我们就分析完了,最底层服务注册请求逻辑。主要有:1.随机将服务注册到一个服务器,直到注册成功。2. 请求接口是:/nacos/v1/ns/instance,请求方式是post,通信协议采用http 协议。

  • 总结:
    我们使用nacos 的时候,发现我们服务一启动后,就会注册服务到注册中心,于是我们就分析了一下,nacos 服务注册的逻辑,主要有一下几点:
  1. 服务启动就注册,原因是 springboot 启动后发送启动事件,然后事件监听器里发送服务注册请求。
  2. 服务底层通信,采用http 协议
  3. 服务注册的时候,随机注册到一个nacos 服务器,如果第一次注册失败,就继续取下一服务器地址,发送请求进行注册,直到注册成功,便返回。
    注意:有一段代码没有分析:
for (int i = 0; i < UtilAndComs.REQUEST_DOMAIN_RETRY_COUNT; i++) {
            try {
                return callServer(api, params, nacosDomain);
            } catch (Exception e) {
                exception = e;
                NAMING_LOGGER.error("[NA] req api:" + api + " failed, server(" + nacosDomain, e);
            }
        }

        throw new IllegalStateException("failed to req API:/api/" + api + " after all servers(" + servers + ") tried: "
            + exception.getMessage());

如果有兴趣的可以 nacosDomain 取得是啥值。上面的逻辑是没有获取到nacos 服务器地址时的执行逻辑。

public String reqAPI(String api, Map<String, String> params, List<String> servers, String method) {

        params.put(CommonParams.NAMESPACE_ID, getNamespaceId());

        if (CollectionUtils.isEmpty(servers) && StringUtils.isEmpty(nacosDomain)) {
            throw new IllegalArgumentException("no server available");
        }

        Exception exception = new Exception();

        if (servers != null && !servers.isEmpty()) { // 服务器地址不为空

            Random random = new Random(System.currentTimeMillis());
            int index = random.nextInt(servers.size());

            for (int i = 0; i < servers.size(); i++) {
                String server = servers.get(index);
                try {
                    return callServer(api, params, server, method);
                } catch (NacosException e) {
                    exception = e;
                    NAMING_LOGGER.error("request {} failed.", server, e);
                } catch (Exception e) {
                    exception = e;
                    NAMING_LOGGER.error("request {} failed.", server, e);
                }

                index = (index + 1) % servers.size();
            }

            throw new IllegalStateException("failed to req API:" + api + " after all servers(" + servers + ") tried: "
                + exception.getMessage());
        }
// 服务端地址为空的时候,执行这段逻辑
        for (int i = 0; i < UtilAndComs.REQUEST_DOMAIN_RETRY_COUNT; i++) { // 循环注册3次
            try {
                return callServer(api, params, nacosDomain);
            } catch (Exception e) {
                exception = e;
                NAMING_LOGGER.error("[NA] req api:" + api + " failed, server(" + nacosDomain, e);
            }
        }

        throw new IllegalStateException("failed to req API:/api/" + api + " after all servers(" + servers + ") tried: "
            + exception.getMessage());

    }
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 205,236评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,867评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,715评论 0 340
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,899评论 1 278
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,895评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,733评论 1 283
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,085评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,722评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,025评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,696评论 2 323
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,816评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,447评论 4 322
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,057评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,009评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,254评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,204评论 2 352
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,561评论 2 343