Nacos源码系列—服务端那些事儿

点赞再看,养成习惯,微信搜索【牧小农】关注我获取更多资讯,风里雨里,小农等你,很高兴能够成为你的朋友。
项目源码地址:公众号回复 nacos,即可免费获取源码

前言

在上节课中,我们讲解了客户端注册服务的大体流程,客户端在注册服务的时候调用的是 NamingService.registerInstance 来完成实例的注册,在最后呢我们知道服务注册是通过 nacos/v1/ns/instance 接口来完成注册的,我们今天来讲解服务端的注册,首先就从这个接口地址开始,来看具体服务端都做了哪些事情

服务注册

image.png

上面是我们从官网中找到的Nacos架构图,从这个图中我们大体可以得出我们要找的接口应该是在NamingService这个服务中,同时我们在项目结构中也可以看到naming这个模块,naming就是实现服务注册的,我们都知道请求路径都是通过controller来进行处理的,而在其中我们可以看到一个InstanceController的这么一个类,那么注册实例肯定会和它有关。可以看到InstanceController类的请求路由即是我们POST请求的路由的部分,如下:

image.png

所以,我们就从开始研究接收请求处理服务注册的源码,我们找到通过RestFul API接口,请求类型为Post,的方法,符合条件的只有InstanceController.register方法,这个方法用来接收用户的请求,并且把收到的信息进行解析,装换成实例信息,然后通过getInstanceOperator().registerInstance进行调用,这个方法也是服务注册的核心

    @CanDistro
    @PostMapping
    @Secured(action = ActionTypes.WRITE)
    public String register(HttpServletRequest request) throws Exception {
        //从 request信息中获取namespaceId,如果没有默认为public
        final String namespaceId = WebUtils
                .optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
        //获取服务名称 格式:“group@@serviceName”
        final String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
        NamingUtils.checkServiceNameFormat(serviceName);
        //将request参数还原成instance实例
        final Instance instance = HttpRequestInstanceBuilder.newBuilder()
                .setDefaultInstanceEphemeral(switchDomain.isDefaultInstanceEphemeral()).setRequest(request).build();
        //【核心】注册服务实例
        getInstanceOperator().registerInstance(namespaceId, serviceName, instance);
        return "ok";
    }

我们先来看一下下面这个核心方法

getInstanceOperator().registerInstance(namespaceId, serviceName, instance);

getInstanceOperator() 这个判断是否走Grpc协议,默认走Grpc,所以我们使用的是instanceServiceV2这个实例对象

  private InstanceOperator getInstanceOperator() {
        return upgradeJudgement.isUseGrpcFeatures() ? instanceServiceV2 : instanceServiceV1;
    }

instanceServiceV2就是InstanceOperatorClientImpl,方法所以我们需要进入的是下面这个实例的处理类

image.png

具体方法如下所示:

    @Override
    public void registerInstance(String namespaceId, String serviceName, Instance instance) {
        //判断是否为临时客户端
        boolean ephemeral = instance.isEphemeral();
        //获取客户端ID
        String clientId = IpPortBasedClient.getClientId(instance.toInetAddr(), ephemeral);
        //通过客户端ID创建客户端连接
        createIpPortClientIfAbsent(clientId);
        //获取服务信息
        Service service = getService(namespaceId, serviceName, ephemeral);
        //注册服务
        clientOperationService.registerInstance(service, instance, clientId);
    }

从Nacos2.0以后,新增了Client模型,管理与该客户机有关的数据内容,如果一个客户机发布了一个服务,那么这个客户机发布的所有服务和订阅者信息都会被更新到一个Client对象中,这个Client对象对应于这个客户机的链接,然后通过事件机制触发索引信息的更新。Client负责管理一个客户机的服务实例注册Publish和服务订阅Subscribe,可以方便地对需要推送的服务范围进行快速聚合,同时一个客户端gRPC长连接对应一个Client,每个Client有自己唯一的 clientId

package com.alibaba.nacos.naming.core.v2.client;
public interface Client {

    // 客户端id
    String getClientId();
    // 是否临时客户端
    boolean isEphemeral();
    //设置客户端更新时间
    void setLastUpdatedTime();
    //获取客户端更新时间
    long getLastUpdatedTime();
    // 服务实例注册
    boolean addServiceInstance(Service service, InstancePublishInfo instancePublishInfo);
    //服务实例移除
    InstancePublishInfo removeServiceInstance(Service service);
    //服务实例查询
    InstancePublishInfo getInstancePublishInfo(Service service);
    Collection<Service> getAllPublishedService();
    // 服务订阅
    boolean addServiceSubscriber(Service service, Subscriber subscriber);
    ///取消订阅
    boolean removeServiceSubscriber(Service service);
    //查询订阅
    Subscriber getSubscriber(Service service);
    Collection<Service> getAllSubscribeService();
    // 生成同步给其他节点的client数据
    ClientSyncData generateSyncData();
    // 是否过期
    boolean isExpire(long currentTime);
    // 释放资源
    void release();

}

知道了Client模型后,我们来接着从clientOperationService.registerInstance(service, instance, clientId);找到对应的具体实现

EphemeralClientOperationServiceImpl.registerInstance()

下面这个方法是具体来负责处理服务注册,我们来详细了解一下:

    @Override
    public void registerInstance(Service service, Instance instance, String clientId) {
        //确保Service单例存在,注意Service的equals和hasCode方法
        Service singleton = ServiceManager.getInstance().getSingleton(service);
        //如果不是临时客户端
        if (!singleton.isEphemeral()) {
            throw new NacosRuntimeException(NacosException.INVALID_PARAM,
                    String.format("Current service %s is persistent service, can't register ephemeral instance.",
                            singleton.getGroupedServiceName()));
        }
        //根据客户端ID找到客户端信息,这个关系在连接建立的时候存储
        Client client = clientManager.getClient(clientId);
        if (!clientIsLegal(client, clientId)) {
            return;
        }
        //将客户端实例模型,装换成服务端实例模型
        InstancePublishInfo instanceInfo = getPublishInfo(instance);
        //将实例存储到client中
        client.addServiceInstance(singleton, instanceInfo);
        //设置最后更新时间
        client.setLastUpdatedTime();
        //建立服务和客户端的关联关系
        NotifyCenter.publishEvent(new ClientOperationEvent.ClientRegisterServiceEvent(singleton, clientId));
        NotifyCenter
                .publishEvent(new MetadataEvent.InstanceMetadataEvent(singleton, instanceInfo.getMetadataId(), false));
    }
  1. ServiceManager.getInstance().getSingleton() 当调用getSingleton的时候会负责管理service的单例,在这里service会重写equlas和hasCode方法作为key
public class ServiceManager {

    //单例service 看service中equals和hasCode方法
    private final ConcurrentHashMap<Service, Service> singletonRepository;
    //namespace下所有的service
    private final ConcurrentHashMap<String, Set<Service>> namespaceSingletonMaps;
    
    //通过Map储存单例的Service
    public Service getSingleton(Service service) {
        singletonRepository.putIfAbsent(service, service);
        Service result = singletonRepository.get(service);
        namespaceSingletonMaps.computeIfAbsent(result.getNamespace(), (namespace) -> new ConcurrentHashSet<>());
        namespaceSingletonMaps.get(result.getNamespace()).add(result);
        return result;
    }
}
  1. service中 equal和hasCode方法,namespace+group+name在服务端是一个单例Service
    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof Service)) {
            return false;
        }
        Service service = (Service) o;
        return namespace.equals(service.namespace) && group.equals(service.group) && name.equals(service.name);
    }
    
    @Override
    public int hashCode() {
        return Objects.hash(namespace, group, name);
    }
  1. clientManager.getClient() 这里对应的实现类为ConnectionBasedClientManager这个实现类负责管理长连接clientId和client模型的映射关系
@Component("connectionBasedClientManager")
public class ConnectionBasedClientManager extends ClientConnectionEventListener implements ClientManager {
    //通过map存储ID和client之间的关联关系
    private final ConcurrentMap<String, ConnectionBasedClient> clients = new ConcurrentHashMap<>();

    //根据clientId查询client
    @Override
    public Client getClient(String clientId) {
        return clients.get(clientId);
    }
}
  1. client.addServiceInstance(); 抽象类为AbstractClient:负责存储当前客户端服务注册表,也就是 service和instance的关系。
  protected final ConcurrentHashMap<Service, InstancePublishInfo> publishers = new ConcurrentHashMap<>(16, 0.75f, 1);、    

  //将service和实例进行关联
    @Override
    public boolean addServiceInstance(Service service, InstancePublishInfo instancePublishInfo) {
        if (null == publishers.put(service, instancePublishInfo)) {
            //监控指标自增实例数
            MetricsMonitor.incrementInstanceCount();
        }
        NotifyCenter.publishEvent(new ClientEvent.ClientChangedEvent(this));
        Loggers.SRV_LOG.info("Client change for service {}, {}", service, getClientId());
        return true;
    }
  1. ClientOperationEvent.ClientRegisterServiceEvent() :这里目的是为了过滤目标服务得到最终instance列表建立service和client的关系,能够方便我们快速查询,同时会触发ClientServiceIndexesManager的监听事件。
image.png

  //服务与发布client的关系
   private final ConcurrentMap<Service, Set<String>> publisherIndexes = new ConcurrentHashMap<>();
    //服务与订阅clientId的关系
    private final ConcurrentMap<Service, Set<String>> subscriberIndexes = new ConcurrentHashMap<>();

    private void handleClientOperation(ClientOperationEvent event) {
        Service service = event.getService();
        String clientId = event.getClientId();
        if (event instanceof ClientOperationEvent.ClientRegisterServiceEvent) {
            addPublisherIndexes(service, clientId);
        } else if (event instanceof ClientOperationEvent.ClientDeregisterServiceEvent) {
            removePublisherIndexes(service, clientId);
        } else if (event instanceof ClientOperationEvent.ClientSubscribeServiceEvent) {
            addSubscriberIndexes(service, clientId);
        } else if (event instanceof ClientOperationEvent.ClientUnsubscribeServiceEvent) {
            removeSubscriberIndexes(service, clientId);
        }
    }
    
        
        private void addPublisherIndexes(Service service, String clientId) {
        publisherIndexes.computeIfAbsent(service, (key) -> new ConcurrentHashSet<>());
        publisherIndexes.get(service).add(clientId);
        NotifyCenter.publishEvent(new ServiceEvent.ServiceChangedEvent(service, true));
    }
    
       private void removePublisherIndexes(Service service, String clientId) {
        if (!publisherIndexes.containsKey(service)) {
            return;
        }
        publisherIndexes.get(service).remove(clientId);
        NotifyCenter.publishEvent(new ServiceEvent.ServiceChangedEvent(service, true));
    }
    
        private void addSubscriberIndexes(Service service, String clientId) {
        subscriberIndexes.computeIfAbsent(service, (key) -> new ConcurrentHashSet<>());
        // Fix #5404, Only first time add need notify event.
        if (subscriberIndexes.get(service).add(clientId)) {
            NotifyCenter.publishEvent(new ServiceEvent.ServiceSubscribedEvent(service, clientId));
        }
    }
    
    private void removeSubscriberIndexes(Service service, String clientId) {
        if (!subscriberIndexes.containsKey(service)) {
            return;
        }
        subscriberIndexes.get(service).remove(clientId);
        if (subscriberIndexes.get(service).isEmpty()) {
            subscriberIndexes.remove(service);
        }
    }

请求流程图:

image.png

服务端监控检查

Nacos作为注册中心不止提供了服务注册和服务发现的功能,还提供了服务可用性检测的功能,在1.0的版本中,临时实例走的是distro协议,客户端向注册中心发送心跳来维持自身的健康(healthy)状态,持久实例则走的是Raft协议存储。

  1. 两种检测机制
  • 客户端主动上报机制
  • 服务器端主动下探机制

客户端主动上报机制:你主动找上级,说你没有打卡(不健康状态)

服务器端主动下探机制:上级检测到你有不打卡的记录,主动来找你

image.png

对于Nacos健康检测机制,我们不能主动去设置,但是健康检查机制是和Nacos的服务实例类型强相关,主要是有两种服务实例:

  • 临时实例:客户端主动上报
  • 持久实例:服务端主动下探

客户端主动上报

临时实例每隔5秒会主动上报自己的健康状态,发送心跳,如果发送心跳的间隔时间超过15秒,Nacos服务器端会将服务标记为亚健康状态,如果超过30S没有发送心跳,那么服务实例会被从服务列表中剔除

在2.0版本以后,持久实例不变,临时实例而是通过长连接来判断实例是否健康。

  1. 长连接: 一个连接上可以连续发送多数据包,在连接保持期间,如果没有数据包发送,需要双方发链路检测包,在Nacos2.0之后,使用Grpc协议代替了http协议。长连接会保持客户端和服务端发送的状态,在源码中ConnectionManager 管理所有客户端的长连接

ConnectionManager: 每3秒检测所有超过20S内没有发生过通讯的客户端,向客户端发起ClientDetectionRequest探测请求,如果客户端在1s内成功响应,则检测通过,否则执行unregister方法移除Connection

如果客户端持续和服务端进行通讯,服务端是不需要主动下探的,只有当客户端没有一直和服务端通信的时候,服务端才会主动下探操作

@Service
public class ConnectionManager extends Subscriber<ConnectionLimitRuleChangeEvent> {

Map<String, Connection> connections = new ConcurrentHashMap<String, Connection>();

   //只要spring容器启动,会触发这个方法
    @PostConstruct
    public void start() {
    // 启动不健康连接排除功能.
    RpcScheduledExecutor.COMMON_SERVER_EXECUTOR.scheduleWithFixedDelay(new Runnable() {
      @Override
      public void run() {
        // 1. 统计过时(20s)连接
         Set<Map.Entry<String, Connection>> entries = connections.entrySet();
        //2.获得需要剔除的IP和端口
        //3.根据限制获取剔除的IP和端口
        //4.如果还是有需要剔除的客户端,则继续执行
        //5.没有活动的客户端执行探测            
        //6.如果没有马上响应,则马上剔除
        //7.剔除后发布ClientDisconnectEvent事件
      }
    });

    }
}

//注销(移出)连接方法
public synchronized void unregister(String connectionId) {
    Connection remove = this.connections.remove(connectionId);
    if (remove != null) {
        String clientIp = remove.getMetaInfo().clientIp;
        AtomicInteger atomicInteger = connectionForClientIp.get(clientIp);
        if (atomicInteger != null) {
            int count = atomicInteger.decrementAndGet();
            if (count <= 0) {
                connectionForClientIp.remove(clientIp);
            }
        }
        remove.close();
        Loggers.REMOTE_DIGEST.info("[{}]Connection unregistered successfully. ", connectionId);
        clientConnectionEventListenerRegistry.notifyClientDisConnected(remove);
    }

当服务端操作移除事件以后,会操作notifyClientDisConnected()方法,主要调用的是clientConnectionEventListener.clientDisConnected(connection)方法,将连接信息传入进去

    public void notifyClientDisConnected(final Connection connection) {
        
        for (ClientConnectionEventListener clientConnectionEventListener : clientConnectionEventListeners) {
            try {
                clientConnectionEventListener.clientDisConnected(connection);
            } catch (Throwable throwable) {
                Loggers.REMOTE.info("[NotifyClientDisConnected] failed for listener {}",
                        clientConnectionEventListener.getName(), throwable);
            }
        }
        

clientConnectionEventListenerd的实现类是ConnectionBasedClientManager,在这里面会出发清除索引缓存等操作

@Component("connectionBasedClientManager")
public class ConnectionBasedClientManager extends ClientConnectionEventListener implements ClientManager {
    @Override
    public boolean clientDisconnected(String clientId) {
        Loggers.SRV_LOG.info("Client connection {} disconnect, remove instances and subscribers", clientId);
        //同步移除client数据
        ConnectionBasedClient client = clients.remove(clientId);
        if (null == client) {
            return true;
        }
        client.release();
        //服务订阅,将变更通知到客户端
        NotifyCenter.publishEvent(new ClientEvent.ClientDisconnectEvent(client));
        return true;
    }
}

总结

到这里Nacos服务端的基础的源码就讲完了,有些地方我们没有展开来讲,在后续的源码讲解中,会给大家详细的进行讲解,今天主要讲解了,服务端注册以及监控检查的基础代码,后面会有最新的内容呈现给大家,如果觉得文中对您有帮助的,记得点赞关注~

今天是母亲节,在这里祝妈妈们,节日快乐!

我是牧小农,怕什么真理无穷,进一步有进一步的欢喜,大家加油

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,189评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,577评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,857评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,703评论 1 276
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,705评论 5 366
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,620评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,995评论 3 396
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,656评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,898评论 1 298
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,639评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,720评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,395评论 4 319
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,982评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,953评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,195评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 44,907评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,472评论 2 342

推荐阅读更多精彩内容