序
本文主要研究一下nacos的DistroConsistencyServiceImpl
ConsistencyService
nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/consistency/ConsistencyService.java
public interface ConsistencyService {
/**
* Put a data related to a key to Nacos cluster
*
* @param key key of data, this key should be globally unique
* @param value value of data
* @throws NacosException
* @see
*/
void put(String key, Record value) throws NacosException;
/**
* Remove a data from Nacos cluster
*
* @param key key of data
* @throws NacosException
*/
void remove(String key) throws NacosException;
/**
* Get a data from Nacos cluster
*
* @param key key of data
* @return data related to the key
* @throws NacosException
*/
Datum get(String key) throws NacosException;
/**
* Listen for changes of a data
*
* @param key key of data
* @param listener callback of data change
* @throws NacosException
*/
void listen(String key, RecordListener listener) throws NacosException;
/**
* Cancel listening of a data
*
* @param key key of data
* @param listener callback of data change
* @throws NacosException
*/
void unlisten(String key, RecordListener listener) throws NacosException;
/**
* Tell the status of this consistency service
*
* @return true if available
*/
boolean isAvailable();
}
- ConsistencyService定义了put、remove、get、listen、unlisten、isAvailable方法
EphemeralConsistencyService
nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/EphemeralConsistencyService.java
public interface EphemeralConsistencyService extends ConsistencyService {
}
- EphemeralConsistencyService接口继承了ConsistencyService接口
DistroConsistencyServiceImpl
nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/distro/DistroConsistencyServiceImpl.java
@org.springframework.stereotype.Service("distroConsistencyService")
public class DistroConsistencyServiceImpl implements EphemeralConsistencyService {
private ScheduledExecutorService executor = new ScheduledThreadPoolExecutor(1, new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setDaemon(true);
t.setName("com.alibaba.nacos.naming.distro.notifier");
return t;
}
});
@Autowired
private DistroMapper distroMapper;
@Autowired
private DataStore dataStore;
@Autowired
private TaskDispatcher taskDispatcher;
@Autowired
private DataSyncer dataSyncer;
@Autowired
private Serializer serializer;
@Autowired
private ServerListManager serverListManager;
@Autowired
private SwitchDomain switchDomain;
@Autowired
private GlobalConfig globalConfig;
private boolean initialized = false;
public volatile Notifier notifier = new Notifier();
private Map<String, CopyOnWriteArrayList<RecordListener>> listeners = new ConcurrentHashMap<>();
private Map<String, String> syncChecksumTasks = new ConcurrentHashMap<>(16);
@PostConstruct
public void init() {
GlobalExecutor.submit(new Runnable() {
@Override
public void run() {
try {
load();
} catch (Exception e) {
Loggers.DISTRO.error("load data failed.", e);
}
}
});
executor.submit(notifier);
}
public void load() throws Exception {
if (SystemUtils.STANDALONE_MODE) {
initialized = true;
return;
}
// size = 1 means only myself in the list, we need at least one another server alive:
while (serverListManager.getHealthyServers().size() <= 1) {
Thread.sleep(1000L);
Loggers.DISTRO.info("waiting server list init...");
}
for (Server server : serverListManager.getHealthyServers()) {
if (NetUtils.localServer().equals(server.getKey())) {
continue;
}
if (Loggers.DISTRO.isDebugEnabled()) {
Loggers.DISTRO.debug("sync from " + server);
}
// try sync data from remote server:
if (syncAllDataFromRemote(server)) {
initialized = true;
return;
}
}
}
//......
public boolean syncAllDataFromRemote(Server server) {
try {
byte[] data = NamingProxy.getAllData(server.getKey());
processData(data);
return true;
} catch (Exception e) {
Loggers.DISTRO.error("sync full data from " + server + " failed!", e);
return false;
}
}
public void processData(byte[] data) throws Exception {
if (data.length > 0) {
Map<String, Datum<Instances>> datumMap =
serializer.deserializeMap(data, Instances.class);
for (Map.Entry<String, Datum<Instances>> entry : datumMap.entrySet()) {
dataStore.put(entry.getKey(), entry.getValue());
if (!listeners.containsKey(entry.getKey())) {
// pretty sure the service not exist:
if (switchDomain.isDefaultInstanceEphemeral()) {
// create empty service
Loggers.DISTRO.info("creating service {}", entry.getKey());
Service service = new Service();
String serviceName = KeyBuilder.getServiceName(entry.getKey());
String namespaceId = KeyBuilder.getNamespace(entry.getKey());
service.setName(serviceName);
service.setNamespaceId(namespaceId);
service.setGroupName(Constants.DEFAULT_GROUP);
// now validate the service. if failed, exception will be thrown
service.setLastModifiedMillis(System.currentTimeMillis());
service.recalculateChecksum();
listeners.get(KeyBuilder.SERVICE_META_KEY_PREFIX).get(0)
.onChange(KeyBuilder.buildServiceMetaKey(namespaceId, serviceName), service);
}
}
}
for (Map.Entry<String, Datum<Instances>> entry : datumMap.entrySet()) {
if (!listeners.containsKey(entry.getKey())) {
// Should not happen:
Loggers.DISTRO.warn("listener of {} not found.", entry.getKey());
continue;
}
try {
for (RecordListener listener : listeners.get(entry.getKey())) {
listener.onChange(entry.getKey(), entry.getValue().value);
}
} catch (Exception e) {
Loggers.DISTRO.error("[NACOS-DISTRO] error while execute listener of key: {}", entry.getKey(), e);
continue;
}
// Update data store if listener executed successfully:
dataStore.put(entry.getKey(), entry.getValue());
}
}
}
//......
@Override
public void put(String key, Record value) throws NacosException {
onPut(key, value);
taskDispatcher.addTask(key);
}
@Override
public void remove(String key) throws NacosException {
onRemove(key);
listeners.remove(key);
}
@Override
public Datum get(String key) throws NacosException {
return dataStore.get(key);
}
//......
@Override
public void listen(String key, RecordListener listener) throws NacosException {
if (!listeners.containsKey(key)) {
listeners.put(key, new CopyOnWriteArrayList<>());
}
if (listeners.get(key).contains(listener)) {
return;
}
listeners.get(key).add(listener);
}
@Override
public void unlisten(String key, RecordListener listener) throws NacosException {
if (!listeners.containsKey(key)) {
return;
}
for (RecordListener recordListener : listeners.get(key)) {
if (recordListener.equals(listener)) {
listeners.get(key).remove(listener);
break;
}
}
}
@Override
public boolean isAvailable() {
return isInitialized() || ServerStatus.UP.name().equals(switchDomain.getOverriddenServerStatus());
}
//......
}
- DistroConsistencyServiceImpl实现了EphemeralConsistencyService接口
- 其init方法会异步执行load方法,该方法会执行syncAllDataFromRemote进行初始化,该方法会通过NamingProxy.getAllData获取data,然后执行processData,它主要是执行回调然后往dataStore添加数据;init方法最后会异步执行Notifier
- 其put方法会执行onPut方法及taskDispatcher.addTask(key);其remove方法会执行onRemove方法即listeners.remove(key);其get方法直接从dataStore读取;其listen会添加RecordListener;其unlisten则会移除RecordListener;其isAvailable会通过isInitialized及ServerStatus.UP状态来判断
Notifier
nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/distro/DistroConsistencyServiceImpl.java
public class Notifier implements Runnable {
private ConcurrentHashMap<String, String> services = new ConcurrentHashMap<>(10 * 1024);
private BlockingQueue<Pair> tasks = new LinkedBlockingQueue<Pair>(1024 * 1024);
public void addTask(String datumKey, ApplyAction action) {
if (services.containsKey(datumKey) && action == ApplyAction.CHANGE) {
return;
}
if (action == ApplyAction.CHANGE) {
services.put(datumKey, StringUtils.EMPTY);
}
tasks.add(Pair.with(datumKey, action));
}
public int getTaskSize() {
return tasks.size();
}
@Override
public void run() {
Loggers.DISTRO.info("distro notifier started");
while (true) {
try {
Pair pair = tasks.take();
if (pair == null) {
continue;
}
String datumKey = (String) pair.getValue0();
ApplyAction action = (ApplyAction) pair.getValue1();
services.remove(datumKey);
int count = 0;
if (!listeners.containsKey(datumKey)) {
continue;
}
for (RecordListener listener : listeners.get(datumKey)) {
count++;
try {
if (action == ApplyAction.CHANGE) {
listener.onChange(datumKey, dataStore.get(datumKey).value);
continue;
}
if (action == ApplyAction.DELETE) {
listener.onDelete(datumKey);
continue;
}
} catch (Throwable e) {
Loggers.DISTRO.error("[NACOS-DISTRO] error while notifying listener of key: {}", datumKey, e);
}
}
if (Loggers.DISTRO.isDebugEnabled()) {
Loggers.DISTRO.debug("[NACOS-DISTRO] datum change notified, key: {}, listener count: {}, action: {}",
datumKey, count, action.name());
}
} catch (Throwable e) {
Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);
}
}
}
}
- Notifier实现了Runnable接口,其run方法会从LinkedBlockingQueue取task,然后挨个执行listener回调
小结
- DistroConsistencyServiceImpl实现了EphemeralConsistencyService接口
- 其init方法会异步执行load方法,该方法会执行syncAllDataFromRemote进行初始化,该方法会通过NamingProxy.getAllData获取data,然后执行processData,它主要是执行回调然后往dataStore添加数据;init方法最后会异步执行Notifier
- 其put方法会执行onPut方法及taskDispatcher.addTask(key);其remove方法会执行onRemove方法即listeners.remove(key);其get方法直接从dataStore读取;其listen会添加RecordListener;其unlisten则会移除RecordListener;其isAvailable会通过isInitialized及ServerStatus.UP状态来判断