序
本文主要研究一下nacos ServiceManager的updateInstance
ServiceManager
nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/core/ServiceManager.java
/**
 * Excerpt of the naming-module service registry showing the instance update path:
 * {@link #updateInstance} validates the request, {@link #addInstance} merges the instance into the
 * stored instance list and writes the result through the {@code consistencyService}.
 */
@Component
@DependsOn("nacosApplicationContext")
public class ServiceManager implements RecordListener<Service> {

    /**
     * {@code Map<namespace, Map<group::serviceName, Service>>}
     */
    private Map<String, Map<String, Service>> serviceMap = new ConcurrentHashMap<>();

    private LinkedBlockingDeque<ServiceKey> toBeUpdatedServicesQueue = new LinkedBlockingDeque<>(1024 * 1024);

    private Synchronizer synchronizer = new ServiceStatusSynchronizer();

    private final Lock lock = new ReentrantLock();

    @Resource(name = "consistencyDelegate")
    private ConsistencyService consistencyService;

    @Autowired
    private SwitchDomain switchDomain;

    @Autowired
    private DistroMapper distroMapper;

    @Autowired
    private ServerListManager serverListManager;

    @Autowired
    private PushService pushService;

    private final Object putServiceLock = new Object();

    //......

    /**
     * Updates an instance that is already registered under the given service.
     *
     * @param namespaceId namespace the service belongs to
     * @param serviceName name of the service
     * @param instance    instance carrying the new attributes
     * @throws NacosException with {@code INVALID_PARAM} when the service does not exist or when
     *                        {@code service.allIPs()} does not contain the instance
     */
    public void updateInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {
        Service service = getService(namespaceId, serviceName);
        if (service == null) {
            throw new NacosException(NacosException.INVALID_PARAM,
                "service not found, namespace: " + namespaceId + ", service: " + serviceName);
        }
        if (!service.allIPs().contains(instance)) {
            throw new NacosException(NacosException.INVALID_PARAM, "instance not exist: " + instance);
        }
        // An update is stored the same way as an add: the entry with the same datum key is overwritten.
        addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
    }

    /**
     * Merges the given instances into the service's instance list and writes the merged list to the
     * consistency service under the instance-list key.
     *
     * @param namespaceId namespace the service belongs to
     * @param serviceName name of the service
     * @param ephemeral   selects the ephemeral or persistent instance list
     * @param ips         instances to add or overwrite
     * @throws NacosException with {@code INVALID_PARAM} when the service does not exist, or whatever the
     *                        consistency service raises
     */
    public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips) throws NacosException {
        String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);
        Service service = getService(namespaceId, serviceName);
        // Fail with the same error updateInstance reports instead of a NullPointerException
        // further down in updateIpAddresses.
        if (service == null) {
            throw new NacosException(NacosException.INVALID_PARAM,
                "service not found, namespace: " + namespaceId + ", service: " + serviceName);
        }
        List<Instance> instanceList = addIpAddresses(service, ephemeral, ips);
        Instances instances = new Instances();
        instances.setInstanceList(instanceList);
        consistencyService.put(key, instances);
    }

    /**
     * Shorthand for {@link #updateIpAddresses} with the {@code UPDATE_INSTANCE_ACTION_ADD} action.
     *
     * @param service   service whose instance list is being changed
     * @param ephemeral selects the ephemeral or persistent instance list
     * @param ips       instances to add or overwrite
     * @return the merged instance list
     * @throws NacosException propagated from the consistency service
     */
    public List<Instance> addIpAddresses(Service service, boolean ephemeral, Instance... ips) throws NacosException {
        return updateIpAddresses(service, UtilsAndCommons.UPDATE_INSTANCE_ACTION_ADD, ephemeral, ips);
    }

    /**
     * Computes the new instance list for a service by applying {@code action} for every given instance
     * on top of the list currently held by the consistency service.
     *
     * @param service   service whose instance list is being changed; must not be null
     * @param action    {@code UPDATE_INSTANCE_ACTION_REMOVE} removes the instances, any other value
     *                  adds or overwrites them
     * @param ephemeral selects the ephemeral or persistent instance list
     * @param ips       instances to apply
     * @return a new list holding the resulting instances
     * @throws NacosException           propagated from the consistency service
     * @throws IllegalArgumentException when the action is ADD and the resulting list is empty
     */
    public List<Instance> updateIpAddresses(Service service, String action, boolean ephemeral, Instance... ips) throws NacosException {
        Datum datum = consistencyService.get(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), ephemeral));
        Map<String, Instance> oldInstanceMap = new HashMap<>(16);

        // Index the instances the service currently holds by their IP address string.
        List<Instance> currentIPs = service.allIPs(ephemeral);
        Map<String, Instance> map = new ConcurrentHashMap<>(currentIPs.size());
        for (Instance instance : currentIPs) {
            map.put(instance.toIPAddr(), instance);
        }
        if (datum != null) {
            // NOTE(review): setValid is defined outside this excerpt; presumably it reconciles the stored
            // list with the in-memory instances and returns it keyed by datum key - confirm.
            oldInstanceMap = setValid(((Instances) datum.value).getInstanceList(), map);
        }

        // Work on a separate map so oldInstanceMap is left untouched. This is a shallow copy:
        // the Instance objects themselves are shared.
        HashMap<String, Instance> instanceMap = new HashMap<>(oldInstanceMap.size());
        instanceMap.putAll(oldInstanceMap);

        for (Instance instance : ips) {
            // Create the instance's cluster on the fly when the service does not know it yet.
            if (!service.getClusterMap().containsKey(instance.getClusterName())) {
                Cluster cluster = new Cluster(instance.getClusterName(), service);
                cluster.init();
                service.getClusterMap().put(instance.getClusterName(), cluster);
                Loggers.SRV_LOG.warn("cluster: {} not found, ip: {}, will create new cluster with default configuration.",
                    instance.getClusterName(), instance.toJSON());
            }
            if (UtilsAndCommons.UPDATE_INSTANCE_ACTION_REMOVE.equals(action)) {
                instanceMap.remove(instance.getDatumKey());
            } else {
                instanceMap.put(instance.getDatumKey(), instance);
            }
        }

        if (instanceMap.isEmpty() && UtilsAndCommons.UPDATE_INSTANCE_ACTION_ADD.equals(action)) {
            throw new IllegalArgumentException("ip list can not be empty, service: " + service.getName() + ", ip list: "
                + JSON.toJSONString(instanceMap.values()));
        }
        return new ArrayList<>(instanceMap.values());
    }

    //......
}
- updateInstance会通过service.allIPs().contains(instance)校验要更新的instance是否存在,不存在则抛出NacosException,存在则执行addInstance方法
- addInstance方法它会获取service,然后执行addIpAddresses,最后执行consistencyService.put;addIpAddresses调用的是updateIpAddresses方法,其action参数为UtilsAndCommons.UPDATE_INSTANCE_ACTION_ADD
- updateIpAddresses方法首先从consistencyService获取datum,然后通过service.allIPs方法获取currentIPs,之后根据datum设置oldInstanceMap,对于UtilsAndCommons.UPDATE_INSTANCE_ACTION_REMOVE类型执行删除,其余的action则将instance放入到instanceMap中
DistroConsistencyServiceImpl.put
nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/distro/DistroConsistencyServiceImpl.java
/**
 * Excerpt of the consistency service registered as {@code distroConsistencyService}, showing the
 * write path: {@link #put} stores the value locally via {@link #onPut} and then hands the key to the
 * {@code taskDispatcher}.
 */
@org.springframework.stereotype.Service("distroConsistencyService")
public class DistroConsistencyServiceImpl implements EphemeralConsistencyService {

    /** Single-threaded scheduler whose daemon thread is named after the distro notifier. */
    private ScheduledExecutorService executor = new ScheduledThreadPoolExecutor(1, new ThreadFactory() {
        @Override
        public Thread newThread(Runnable r) {
            Thread notifierThread = new Thread(r);
            notifierThread.setName("com.alibaba.nacos.naming.distro.notifier");
            notifierThread.setDaemon(true);
            return notifierThread;
        }
    });

    @Autowired
    private DistroMapper distroMapper;

    @Autowired
    private DataStore dataStore;

    @Autowired
    private TaskDispatcher taskDispatcher;

    @Autowired
    private DataSyncer dataSyncer;

    @Autowired
    private Serializer serializer;

    @Autowired
    private ServerListManager serverListManager;

    @Autowired
    private SwitchDomain switchDomain;

    @Autowired
    private GlobalConfig globalConfig;

    private boolean initialized = false;

    public volatile Notifier notifier = new Notifier();

    /** Listeners registered per key. */
    private Map<String, CopyOnWriteArrayList<RecordListener>> listeners = new ConcurrentHashMap<>();

    private Map<String, String> syncChecksumTasks = new ConcurrentHashMap<>(16);

    //......

    /**
     * Applies the write locally and then queues the key on the task dispatcher.
     *
     * @param key   key being written
     * @param value record to store under the key
     * @throws NacosException declared by the method signature
     */
    public void put(String key, Record value) throws NacosException {
        onPut(key, value);
        taskDispatcher.addTask(key);
    }

    /**
     * Stores the value locally and schedules a CHANGE notification when the key has listeners.
     *
     * @param key   key being written
     * @param value record to store; cast to {@code Instances} when the key is an ephemeral
     *              instance-list key
     */
    public void onPut(String key, Record value) {
        final boolean ephemeralListKey = KeyBuilder.matchEphemeralInstanceListKey(key);
        if (ephemeralListKey) {
            // Wrap the value in a fresh datum, bump its timestamp once and store it.
            Datum<Instances> fresh = new Datum<>();
            fresh.key = key;
            fresh.value = (Instances) value;
            fresh.timestamp.incrementAndGet();
            dataStore.put(key, fresh);
        }

        // Only keys with registered listeners produce a notification task.
        if (listeners.containsKey(key)) {
            notifier.addTask(key, ApplyAction.CHANGE);
        }
    }

    //......
}
- DistroConsistencyServiceImpl的put方法会先执行onPut,然后执行taskDispatcher.addTask(key);onPut在判断key是ephemeralInstanceListKey时会创建一个Datum,递增其timestamp,然后放到dataStore中;之后如果listeners中不包含该key则直接返回,否则调用notifier.addTask(key, ApplyAction.CHANGE)
Notifier.addTask
nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/distro/DistroConsistencyServiceImpl.java
/**
* Queue-driven worker that notifies the listeners registered for a datum key about changes and deletions.
* <p>
* NOTE(review): {@code listeners} and {@code dataStore} are not declared in this class; presumably they are
* fields of the enclosing DistroConsistencyServiceImpl - confirm.
*/
public class Notifier implements Runnable {
// Keys that currently have a CHANGE task queued; addTask uses it to skip duplicate CHANGE tasks.
private ConcurrentHashMap<String, String> services = new ConcurrentHashMap<>(10 * 1024);
// Pending (datumKey, action) pairs, bounded to 1024 * 1024 entries.
private BlockingQueue<Pair> tasks = new LinkedBlockingQueue<Pair>(1024 * 1024);
/**
* Queues a notification task for the given key.
* A CHANGE task is skipped when the key is already recorded in {@code services}.
*
* @param datumKey key whose listeners should be notified
* @param action   kind of notification to deliver
*/
public void addTask(String datumKey, ApplyAction action) {
if (services.containsKey(datumKey) && action == ApplyAction.CHANGE) {
return;
}
if (action == ApplyAction.CHANGE) {
services.put(datumKey, StringUtils.EMPTY);
}
// BlockingQueue.add throws IllegalStateException when the bounded queue is full.
tasks.add(Pair.with(datumKey, action));
}
/**
* @return number of tasks currently waiting in the queue
*/
public int getTaskSize() {
return tasks.size();
}
/**
* Runs forever: takes the next task, removes its key from {@code services} and invokes
* {@code onChange} or {@code onDelete} on every listener registered for that key.
* Failures are logged and never end the loop.
*/
@Override
public void run() {
Loggers.DISTRO.info("distro notifier started");
while (true) {
try {
// Blocks until a task is available.
Pair pair = tasks.take();
// Defensive check; BlockingQueue.take() is not documented to return null.
if (pair == null) {
continue;
}
String datumKey = (String) pair.getValue0();
ApplyAction action = (ApplyAction) pair.getValue1();
// Remove the key before notifying so a CHANGE arriving from now on is queued again by addTask.
services.remove(datumKey);
int count = 0;
if (!listeners.containsKey(datumKey)) {
continue;
}
for (RecordListener listener : listeners.get(datumKey)) {
count++;
try {
if (action == ApplyAction.CHANGE) {
listener.onChange(datumKey, dataStore.get(datumKey).value);
continue;
}
if (action == ApplyAction.DELETE) {
listener.onDelete(datumKey);
continue;
}
} catch (Throwable e) {
// A failing listener is logged; the remaining listeners are still notified.
Loggers.DISTRO.error("[NACOS-DISTRO] error while notifying listener of key: {}", datumKey, e);
}
}
if (Loggers.DISTRO.isDebugEnabled()) {
Loggers.DISTRO.debug("[NACOS-DISTRO] datum change notified, key: {}, listener count: {}, action: {}",
datumKey, count, action.name());
}
} catch (Throwable e) {
// NOTE(review): this also catches the InterruptedException thrown by tasks.take(), so an
// interrupt is logged and the loop keeps running - confirm this is intended.
Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);
}
}
}
}
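- Notifier的addTask方法对于action为ApplyAction.CHANGE且datumKey已在services当中的会直接返回;对于action为ApplyAction.CHANGE且不在services当中的会先放入到services当中,最后添加到tasks;run方法会不断从tasks取出数据,先将datumKey从services中移除,然后对该key的每个listener执行相应的onChange或onDelete回调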
TaskDispatcher.addTask
nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/distro/TaskDispatcher.java
/**
 * Distributes changed data keys over one {@link TaskScheduler} per available processor; each scheduler
 * batches the keys it receives and submits sync tasks for the other servers to the {@code dataSyncer}.
 */
@Component
public class TaskDispatcher {

    @Autowired
    private GlobalConfig partitionConfig;

    @Autowired
    private DataSyncer dataSyncer;

    private final List<TaskScheduler> taskSchedulerList = new ArrayList<>();

    private final int cpuCoreCount = Runtime.getRuntime().availableProcessors();

    /**
     * Creates one scheduler per available processor and submits each to the global executor.
     */
    @PostConstruct
    public void init() {
        for (int i = 0; i < cpuCoreCount; i++) {
            TaskScheduler taskScheduler = new TaskScheduler(i);
            taskSchedulerList.add(taskScheduler);
            GlobalExecutor.submitTaskDispatch(taskScheduler);
        }
    }

    /**
     * Routes the key to one of the schedulers.
     *
     * @param key data key that changed and needs to be synced
     */
    public void addTask(String key) {
        // NOTE(review): shakeUp is defined elsewhere; presumably it maps the key to an index in
        // [0, cpuCoreCount) - confirm.
        taskSchedulerList.get(UtilsAndCommons.shakeUp(key, cpuCoreCount)).addTask(key);
    }

    /**
     * Worker that drains its own bounded queue of keys and turns them into batched sync tasks.
     */
    public class TaskScheduler implements Runnable {

        private final int index;

        /** Number of keys collected in the current batch. */
        private int dataSize = 0;

        /** Wall-clock time in milliseconds of the last dispatch. */
        private long lastDispatchTime = 0L;

        private final BlockingQueue<String> queue = new LinkedBlockingQueue<>(128 * 1024);

        public TaskScheduler(int index) {
            this.index = index;
        }

        /**
         * Queues a key for syncing. The queue is bounded; when it is full the key cannot be accepted,
         * which is now logged instead of being dropped silently.
         *
         * @param key data key to sync
         */
        public void addTask(String key) {
            if (!queue.offer(key)) {
                Loggers.DISTRO.warn("[NACOS-DISTRO] task queue of scheduler {} is full, sync task dropped, key: {}",
                    index, key);
            }
        }

        public int getIndex() {
            return index;
        }

        /**
         * Runs forever: polls a key (waiting at most the configured dispatch period), adds it to the
         * current batch and, when the batch reaches the configured key count or the dispatch period
         * has elapsed since the last dispatch, submits one sync task per remote server.
         */
        @Override
        public void run() {
            List<String> keys = new ArrayList<>();
            while (true) {
                try {
                    String key = queue.poll(partitionConfig.getTaskDispatchPeriod(),
                        TimeUnit.MILLISECONDS);

                    if (Loggers.DISTRO.isDebugEnabled() && StringUtils.isNotBlank(key)) {
                        Loggers.DISTRO.debug("got key: {}", key);
                    }

                    // Without any known server there is nothing to sync to; the polled key is discarded.
                    if (dataSyncer.getServers() == null || dataSyncer.getServers().isEmpty()) {
                        continue;
                    }

                    // poll timed out (null) or the key is blank.
                    if (StringUtils.isBlank(key)) {
                        continue;
                    }

                    // Start a fresh list for each batch: the previous list is still referenced by
                    // the sync tasks that were already submitted.
                    if (dataSize == 0) {
                        keys = new ArrayList<>();
                    }

                    keys.add(key);
                    dataSize++;

                    // The dispatch condition is only evaluated when a key arrives.
                    if (dataSize == partitionConfig.getBatchSyncKeyCount() ||
                        (System.currentTimeMillis() - lastDispatchTime) > partitionConfig.getTaskDispatchPeriod()) {

                        for (Server member : dataSyncer.getServers()) {
                            // Never sync to ourselves.
                            if (NetUtils.localServer().equals(member.getKey())) {
                                continue;
                            }
                            SyncTask syncTask = new SyncTask();
                            syncTask.setKeys(keys);
                            syncTask.setTargetServer(member.getKey());

                            // key is known to be non-blank here, so only the log level needs checking.
                            if (Loggers.DISTRO.isDebugEnabled()) {
                                Loggers.DISTRO.debug("add sync task: {}", JSON.toJSONString(syncTask));
                            }

                            dataSyncer.submit(syncTask, 0);
                        }
                        lastDispatchTime = System.currentTimeMillis();
                        dataSize = 0;
                    }
                } catch (Exception e) {
                    // NOTE(review): this also catches the InterruptedException thrown by queue.poll,
                    // so an interrupt is logged and the loop keeps running - confirm this is intended.
                    Loggers.DISTRO.error("dispatch sync task failed.", e);
                }
            }
        }
    }
}
- TaskDispatcher的addTask方法会从taskSchedulerList获取指定的TaskScheduler,然后执行其addTask方法;TaskScheduler的addTask方法会往queue中添加数据,而run方法则不断从queue取数据,然后通过dataSyncer执行syncTask
SyncTask
nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/distro/SyncTask.java
/**
 * Mutable description of one sync job: the data keys to send, the server to send them to, and
 * bookkeeping for retries.
 */
public class SyncTask {

    /** Data keys covered by this task; stored by reference, not copied. */
    private List<String> keys;

    /** Retry counter; starts at 0. */
    private int retryCount;

    /** Timestamp of the last execution; starts at 0. */
    private long lastExecuteTime;

    /** Server this task is addressed to. */
    private String targetServer;

    // ----- getters -----

    public List<String> getKeys() {
        return this.keys;
    }

    public int getRetryCount() {
        return this.retryCount;
    }

    public long getLastExecuteTime() {
        return this.lastExecuteTime;
    }

    public String getTargetServer() {
        return this.targetServer;
    }

    // ----- setters -----

    public void setKeys(List<String> keys) {
        this.keys = keys;
    }

    public void setRetryCount(int retryCount) {
        this.retryCount = retryCount;
    }

    public void setLastExecuteTime(long lastExecuteTime) {
        this.lastExecuteTime = lastExecuteTime;
    }

    public void setTargetServer(String targetServer) {
        this.targetServer = targetServer;
    }
}
- SyncTask包含了keys、targetServer属性,其中targetServer用于告诉DataSyncer该往哪个server执行sync操作
小结
- updateInstance会通过service.allIPs().contains(instance)校验要更新的instance是否存在,不存在则抛出NacosException,存在则执行addInstance方法
- addInstance方法它会获取service,然后执行addIpAddresses,最后执行consistencyService.put;addIpAddresses调用的是updateIpAddresses方法,其action参数为UtilsAndCommons.UPDATE_INSTANCE_ACTION_ADD
- updateIpAddresses方法首先从consistencyService获取datum,然后通过service.allIPs方法获取currentIPs,之后根据datum设置oldInstanceMap,对于UtilsAndCommons.UPDATE_INSTANCE_ACTION_REMOVE类型执行删除,其余的action则将instance放入到instanceMap中