Nacos 服务端本质上就是一个 Spring Boot 项目，对外提供了很多 HTTP 请求路径。
本文只讲解服务端如何处理客户端的注册请求。
重点关注 InstanceController 类。
下面的 register 方法，就是客户端篇中提到的服务注册路径（POST 请求）所对应的处理方法：
/**
 * Handles the HTTP request that registers a single service instance.
 *
 * <p>The namespace is optional and falls back to {@code Constants.DEFAULT_NAMESPACE_ID}. The service
 * name is read via {@code WebUtils.required} and its format is checked. The instance is parsed from
 * the request parameters and handed to the service manager.
 *
 * @param request the incoming HTTP request
 * @return the literal string {@code "ok"} after the instance has been passed to the service manager
 * @throws Exception propagated from parameter reading, name validation, instance parsing or registration
 */
@CanDistro
@PostMapping
@Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
public String register(HttpServletRequest request) throws Exception {
    final String namespace =
            WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
    final String svcName = WebUtils.required(request, CommonParams.SERVICE_NAME);
    NamingUtils.checkServiceNameFormat(svcName);

    final Instance toRegister = parseInstance(request);
    serviceManager.registerInstance(namespace, svcName, toRegister);
    return "ok";
}
Service和instance的联系
registerInstance方法
/**
 * Registers {@code instance} under the given namespace and service name.
 *
 * <p>First calls createEmptyService, which is expected to create the service in serviceMap if it
 * does not exist yet. Then looks the service up and adds the instance to it.
 *
 * @param namespaceId namespace of the service
 * @param serviceName grouped service name
 * @param instance    the instance to register; its ephemeral flag selects ephemeral vs persistent storage
 * @throws NacosException with {@code INVALID_PARAM} if the service cannot be found after creation
 */
public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {
    // Ensure a (possibly still empty) Service exists for this namespace/service name.
    createEmptyService(namespaceId, serviceName, instance.isEphemeral());

    Service target = getService(namespaceId, serviceName);
    if (target != null) {
        addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
        return;
    }
    throw new NacosException(NacosException.INVALID_PARAM,
            "service not found, namespace: " + namespaceId + ", service: " + serviceName);
}
创建 service，并将 service 存放进 serviceMap 中。createEmptyService 内部调用的是下面的 createServiceIfAbsent 方法（cluster 参数传 null）：
// NOTE(review): entry point used by registerInstance; it presumably delegates to
// createServiceIfAbsent(namespaceId, serviceName, local, null), quoted next — confirm in ServiceManager.
createEmptyService(namespaceId, serviceName, instance.isEphemeral());
/**
 * Creates and registers an empty {@link Service} for the given namespace and name, unless one already exists.
 *
 * <p>serviceMap is a two-level map, {@code Map(namespace, Map(group::serviceName, Service))}. The
 * value is the Service object, which later holds the registered instances. If the service is
 * already present, this method does nothing.
 *
 * @param namespaceId namespace the service belongs to
 * @param serviceName grouped service name; the group is derived from it via NamingUtils.getGroupName
 * @param local       when {@code false}, the new service is additionally passed to addOrReplaceService
 * @param cluster     optional cluster to attach to the new service; may be {@code null}
 * @throws NacosException propagated from the calls made while creating and storing the service
 */
public void createServiceIfAbsent(String namespaceId, String serviceName, boolean local, Cluster cluster)
        throws NacosException {
    Service existing = getService(namespaceId, serviceName);
    if (existing != null) {
        return;
    }

    Loggers.SRV_LOG.info("creating empty service {}:{}", namespaceId, serviceName);
    Service created = new Service();
    created.setName(serviceName);
    created.setNamespaceId(namespaceId);
    created.setGroupName(NamingUtils.getGroupName(serviceName));
    created.setLastModifiedMillis(System.currentTimeMillis());
    created.recalculateChecksum();

    if (cluster != null) {
        cluster.setService(created);
        created.getClusterMap().put(cluster.getName(), cluster);
    }

    // Validate the fully populated service; an exception is thrown if validation fails.
    created.validate();
    // Store it in serviceMap, initialise it and register it as a listener for its instance-list keys.
    putServiceAndInit(created);
    if (!local) {
        addOrReplaceService(created);
    }
}
putServiceAndInit方法
/**
 * Stores {@code service} in serviceMap, initialises it, and subscribes it to changes of both its
 * ephemeral and its persistent instance-list key.
 *
 * @param service the service to store (freshly created by createServiceIfAbsent, so it has no instances yet)
 * @throws NacosException declared for the store / listener-registration calls made here
 */
private void putServiceAndInit(Service service) throws NacosException {
    putService(service);
    // Schedules the client-beat check task and initialises the service's clusters.
    service.init();
    // Ephemeral key first, then persistent key.
    for (boolean ephemeral : new boolean[] {true, false}) {
        consistencyService.listen(
                KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), ephemeral), service);
    }
    Loggers.SRV_LOG.info("[NEW-SERVICE] {}", service.toJson());
}
Service.init 方法主要负责健康检查（后续文章会详细分析）。这里要提一下，Nacos 源码中大量使用了多线程，阅读源码时主要关注各个任务的 run 方法。
/**
 * Initialises this service: schedules its client-beat check task, then re-binds every cluster in
 * clusterMap to this service and initialises it.
 */
public void init() {
    HealthCheckReactor.scheduleCheck(clientBeatCheckTask);
    for (Cluster cluster : clusterMap.values()) {
        cluster.setService(this);
        cluster.init();
    }
}
回到ServiceManager#registerInstance方法
addInstance(namespaceId, serviceName, instance.isEphemeral(), instance); 这一步将 instance 存放进 service 中：
/**
 * Adds the given instances to a service and publishes the resulting instance list through the
 * consistency service.
 *
 * <p>The published list is expected to be the service's complete instance list after the addition,
 * not just the new instances. Downstream, updateIps replaces the whole instance set with the list it
 * receives and treats every previously known instance missing from it as dead.
 *
 * @param namespaceId namespace of the service
 * @param serviceName grouped service name
 * @param ephemeral   whether the instances are ephemeral; selects the ephemeral or persistent key
 * @param ips         instances to add
 * @throws NacosException with {@code INVALID_PARAM} if the service does not exist, or propagated from
 *                        the consistency service
 */
public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips)
        throws NacosException {
    // For ephemeral instances the key presumably has the form
    // INSTANCE_LIST_KEY_PREFIX + EPHEMERAL_KEY_PREFIX + namespaceId + NAMESPACE_KEY_CONNECTOR + serviceName.
    String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);

    Service service = getService(namespaceId, serviceName);
    if (service == null) {
        // Fail with a descriptive error instead of a NullPointerException from synchronized (null).
        throw new NacosException(NacosException.INVALID_PARAM,
                "service not found, namespace: " + namespaceId + ", service: " + serviceName);
    }

    // Additions to the same service are serialised by locking on it, presumably so each call
    // builds its list from the latest instance state.
    synchronized (service) {
        List<Instance> instanceList = addIpAddresses(service, ephemeral, ips);
        Instances instances = new Instances();
        instances.setInstanceList(instanceList);
        consistencyService.put(key, instances);
    }
}
consistencyService.put(key, instances); 将 instances 交给一致性服务，最终更新到 service 中。
这里的 consistencyService 是 DelegateConsistencyServiceImpl。
它的 put 方法中，对于临时实例的 key，mapConsistencyService(key) 返回的是 DistroConsistencyServiceImpl：
/**
 * Routes the write to the consistency service chosen by mapConsistencyService(key).
 * For the ephemeral instance-list keys followed in this walkthrough that is DistroConsistencyServiceImpl.
 */
@Override
public void put(String key, Record value) throws NacosException {
mapConsistencyService(key).put(key, value);
}
DistroConsistencyServiceImpl的put方法
/**
 * Applies the record locally and then schedules a sync of the change through distroProtocol
 * (presumably replication to the other cluster members).
 *
 * <p>Applying locally means onPut: for ephemeral instance-list keys the value is stored in
 * dataStore, and a CHANGE notification is queued if listeners exist for the key.
 *
 * @param key   instance-list key
 * @param value the new record (an {@code Instances} list for instance-list keys)
 * @throws NacosException declared by the ConsistencyService contract
 */
@Override
public void put(String key, Record value) throws NacosException {
    onPut(key, value);
    DistroKey distroKey = new DistroKey(key, KeyBuilder.INSTANCE_LIST_KEY_PREFIX);
    // The third argument is presumably a delay: half of the configured task dispatch period.
    distroProtocol.sync(distroKey, DataOperation.CHANGE, globalConfig.getTaskDispatchPeriod() / 2);
}
/**
 * Applies a record locally.
 *
 * <p>For ephemeral instance-list keys, the record is wrapped in a new Datum (timestamp incremented)
 * and stored in dataStore. Then, if any listener is registered for the key, a CHANGE notification
 * task is queued.
 *
 * @param key   datum key
 * @param value record to apply; for ephemeral instance-list keys it must be an {@code Instances} object
 *              (it is cast to one), not a single instance
 */
public void onPut(String key, Record value) {
    if (KeyBuilder.matchEphemeralInstanceListKey(key)) {
        Datum<Instances> datum = new Datum<>();
        datum.key = key;
        datum.value = (Instances) value;
        datum.timestamp.incrementAndGet();
        dataStore.put(key, datum);
    }

    if (listeners.containsKey(key)) {
        notifier.addTask(key, DataOperation.CHANGE);
    }
}
/**
 * Queues a notification task for {@code datumKey} on the {@code tasks} queue.
 *
 * <p>CHANGE tasks are de-duplicated. While a marker for the key is present in {@code services}, a
 * further CHANGE request is dropped. The marker is set here and removed when the Notifier handles
 * a task for that key. Other actions are always queued.
 *
 * @param datumKey key whose listeners should be notified
 * @param action   the operation to notify about
 */
public void addTask(String datumKey, DataOperation action) {
    boolean isChange = action == DataOperation.CHANGE;
    if (isChange) {
        if (services.containsKey(datumKey)) {
            return;
        }
        services.put(datumKey, StringUtils.EMPTY);
    }
    tasks.offer(Pair.with(datumKey, action));
}
Notifier 是 Runnable 接口的实现类，所以我们重点关注它的 run 方法：
/**
 * Notifier loop: blocks on the {@code tasks} queue and handles each queued (key, action) pair in turn.
 *
 * <p>An error while handling a single task is logged and the loop continues. If the thread is
 * interrupted while waiting for a task, the interrupt flag is restored and the loop exits. The old
 * catch-all swallowed the InterruptedException, which cleared the flag and made the thread
 * impossible to stop.
 */
@Override
public void run() {
    Loggers.DISTRO.info("distro notifier started");
    while (true) {
        Pair<String, DataOperation> pair;
        try {
            // Blocks until a task is available, so an idle notifier does not burn CPU.
            pair = tasks.take();
        } catch (InterruptedException e) {
            // Restore the interrupt status and stop instead of looping forever.
            Thread.currentThread().interrupt();
            Loggers.DISTRO.warn("[NACOS-DISTRO] distro notifier interrupted, stopping");
            return;
        }
        try {
            handle(pair);
        } catch (Throwable e) {
            Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);
        }
    }
}
handle方法
/**
 * Handles one queued notification task.
 *
 * <p>First clears the key's de-duplication marker in {@code services}, so a later CHANGE for the
 * key can be queued again. Then, for every listener registered for the key, calls onChange (for
 * CHANGE, passing the value stored in dataStore) or onDelete (for DELETE). A failure in one
 * listener is logged and does not stop the others. Any other error is logged too, so this method
 * never throws.
 *
 * @param pair (datum key, operation) taken from the task queue
 */
private void handle(Pair<String, DataOperation> pair) {
    try {
        final String datumKey = pair.getValue0();
        final DataOperation action = pair.getValue1();
        services.remove(datumKey);

        if (!listeners.containsKey(datumKey)) {
            return;
        }

        int notified = 0;
        for (RecordListener listener : listeners.get(datumKey)) {
            notified++;
            try {
                if (action == DataOperation.CHANGE) {
                    listener.onChange(datumKey, dataStore.get(datumKey).value);
                } else if (action == DataOperation.DELETE) {
                    listener.onDelete(datumKey);
                }
            } catch (Throwable e) {
                Loggers.DISTRO.error("[NACOS-DISTRO] error while notifying listener of key: {}", datumKey, e);
            }
        }

        if (Loggers.DISTRO.isDebugEnabled()) {
            Loggers.DISTRO.debug("[NACOS-DISTRO] datum change notified, key: {}, listener count: {}, action: {}",
                    datumKey, notified, action.name());
        }
    } catch (Throwable e) {
        Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);
    }
}
// The call traced next: for CHANGE tasks, handle() passes the key and the value stored in dataStore to each listener.
listener.onChange(datumKey, dataStore.get(datumKey).value);
/**
 * Called when the instance list stored under {@code key} changes.
 *
 * <p>Rejects the whole list if it contains a {@code null} entry and clamps every weight into the
 * supported range. Then passes the list to updateIPs, which gets the ephemeral flag derived from the
 * key, and recalculates the checksum. The list is the full instance list for the key, not only
 * newly added instances.
 *
 * @param key   instance-list key; decides whether the instances are treated as ephemeral
 * @param value the new instance list
 * @throws RuntimeException if the list contains a {@code null} instance
 */
@Override
public void onChange(String key, Instances value) throws Exception {
    Loggers.SRV_LOG.info("[NACOS-RAFT] datum is changed, key: {}, value: {}", key, value);

    for (Instance instance : value.getInstanceList()) {
        if (instance == null) {
            // Reject this abnormal instance list:
            throw new RuntimeException("got null instance " + key);
        }
        clampWeight(instance);
    }

    updateIPs(value.getInstanceList(), KeyBuilder.matchEphemeralInstanceListKey(key));
    recalculateChecksum();
}

/** Clamps an instance weight: above 10000 becomes 10000; strictly between 0 and 0.01 becomes 0.01. */
private static void clampWeight(Instance instance) {
    double weight = instance.getWeight();
    if (weight > 10000.0D) {
        instance.setWeight(10000.0D);
    } else if (weight > 0.0D && weight < 0.01D) {
        instance.setWeight(0.01D);
    }
}
updateIPs(value.getInstanceList(), ...) 方法负责把最新的实例列表更新到 service 中（Service#updateIPs 会按集群分组，最终调用下面的 Cluster#updateIps）。在更新的时候，有一个副本的概念。
为什么会有副本的概念呢？从刚才的源码可以看出，服务端首先创建一个空的 service 并存放进 serviceMap 中，再通过异步任务把新的实例放进这个 service 中。这个存放是一个过程，包括健康检查、判断新实例是否已经存在等一系列操作。例如客户端重启时，serviceMap 并不会马上删除重启前的实例，所以重启后的客户端重新注册时，它的信息其实还在 serviceMap 中。如果在这一系列操作的过程中 service 是不完整的，而此时恰好有客户端来拉取服务列表，就可能拉取到不完整的 service。针对这种情况，可以让创建和拉取接口都执行同一段加锁的代码，这样能解决一致性问题，但效率低下。Nacos 采用的是副本的方式：添加 instance 时操作的只是副本，添加完成之后再用副本替换掉 service 里的 instance 集合。这样客户端虽然可能拉取到过期的 instance 列表，但读操作无需加锁，保证了可用性。
那么，副本的概念在代码中是如何实现的呢？
/**
 * Replaces the ephemeral or persistent instance set held by this object with {@code ips}.
 * The log lines label getName() as the "cluster", so this is presumably Cluster#updateIps.
 *
 * <p>{@code ips} is treated as the complete new list:
 * <ul>
 *   <li>Instances returned by updatedIps get health/weight handling. These are presumably the ones
 *       present in both the old and new lists. An unmarked instance inherits the previous health
 *       flag, because the health checker's result is considered authoritative. Health and weight
 *       changes are logged.</li>
 *   <li>Instances in {@code ips} but not in the old set are new; their health-check status is reset.</li>
 *   <li>Instances in the old set but not in {@code ips} are dead; their health-check status is removed.</li>
 * </ul>
 *
 * <p>This is the "replica" idea: the old set object is never modified. A new HashSet is built from
 * {@code ips} and the field reference is swapped at the end. Presumably this lets readers such as
 * allIPs() keep iterating the old set while an update is in progress. NOTE(review): confirm the
 * field's visibility guarantees (e.g. volatile) in the class.
 *
 * @param ips       complete new instance list
 * @param ephemeral {@code true} to replace the ephemeral set, {@code false} for the persistent set
 */
public void updateIps(List<Instance> ips, boolean ephemeral) {
// Pick the current set for the requested kind (ephemeral or persistent).
Set<Instance> toUpdateInstances = ephemeral ? ephemeralInstances : persistentInstances;
// Snapshot of the current instances keyed by datum key; the current set itself is not modified below.
// At the end a brand-new set is assigned to ephemeralInstances or persistentInstances.
HashMap<String, Instance> oldIpMap = new HashMap<>(toUpdateInstances.size());
for (Instance ip : toUpdateInstances) {
oldIpMap.put(ip.getDatumKey(), ip);
}
// NOTE(review): assumes updatedIps only returns instances whose datum key is in oldIpMap;
// otherwise oldIP below would be null — confirm.
List<Instance> updatedIPs = updatedIps(ips, oldIpMap.values());
if (updatedIPs.size() > 0) {
for (Instance ip : updatedIPs) {
Instance oldIP = oldIpMap.get(ip.getDatumKey());
// do not update the ip validation status of updated ips
// because the checker has the most precise result
// Only when ip is not marked, don't we update the health status of IP:
// i.e. an unmarked instance inherits the previous health flag; a marked one keeps its own.
if (!ip.isMarked()) {
ip.setHealthy(oldIP.isHealthy());
}
if (ip.isHealthy() != oldIP.isHealthy()) {
// ip validation status updated
Loggers.EVT_LOG.info("{} {SYNC} IP-{} {}:{}@{}", getService().getName(),
(ip.isHealthy() ? "ENABLED" : "DISABLED"), ip.getIp(), ip.getPort(), getName());
}
if (ip.getWeight() != oldIP.getWeight()) {
// weight changed: log old and new instance
Loggers.EVT_LOG.info("{} {SYNC} {IP-UPDATED} {}->{}", getService().getName(), oldIP.toString(),
ip.toString());
}
}
}
// Instances that were not known before: reset their health-check status.
List<Instance> newIPs = subtract(ips, oldIpMap.values());
if (newIPs.size() > 0) {
Loggers.EVT_LOG
.info("{} {SYNC} {IP-NEW} cluster: {}, new ips size: {}, content: {}", getService().getName(),
getName(), newIPs.size(), newIPs.toString());
for (Instance ip : newIPs) {
HealthCheckStatus.reset(ip);
}
}
// Instances that are no longer in the list: drop their health-check status.
List<Instance> deadIPs = subtract(oldIpMap.values(), ips);
if (deadIPs.size() > 0) {
Loggers.EVT_LOG
.info("{} {SYNC} {IP-DEAD} cluster: {}, dead ips size: {}, content: {}", getService().getName(),
getName(), deadIPs.size(), deadIPs.toString());
for (Instance ip : deadIPs) {
HealthCheckStatus.remv(ip);
}
}
toUpdateInstances = new HashSet<>(ips);
// Publish the new set by swapping the field reference; the previous set object is left untouched.
if (ephemeral) {
ephemeralInstances = toUpdateInstances;
} else {
persistentInstances = toUpdateInstances;
}
}
服务端是如何向客户端提供实例列表的？
入口是 InstanceController#list(HttpServletRequest request)，它返回一个 ObjectNode。
在 doSrvIpxt 方法中，srvedIPs = service.srvIPs(Arrays.asList(StringUtils.split(clusters, ","))); 这行代码从 service 中获取指定集群下临时实例和持久实例的完整列表。
大家可以跟进代码，最终会找到 allIPs 方法，可以看到它先取出持久实例、再取出临时实例，合并后一起返回：
/**
 * Returns all instances: every persistent instance followed by every ephemeral instance.
 *
 * @return a newly created, mutable list; modifying it does not affect the instance sets held here
 */
public List<Instance> allIPs() {
    List<Instance> combined = new ArrayList<>(persistentInstances);
    combined.addAll(ephemeralInstances);
    return combined;
}