前面的章节里面对zk的介绍很少,这边会介绍serviceBean在export的过程中,到底向zk写了什么,以及订阅了什么。zk的功能无非是信息的存储和变更通知。
我们还是从ServiceBean的export方法进行跟踪。会一直的跟踪到ServiceConfig类的doExportUrls方法。如下
private void doExportUrls() {
List<URL> registryURLs = loadRegistries(true);
for (ProtocolConfig protocolConfig : protocols) {
doExportUrlsFor1Protocol(protocolConfig, registryURLs);
}
}
由于我们只配置了zk一个注册中心,registryURLs返回如下
在ServiceConfig的doExportUrlsFor1Protocol方法的三句非常重要
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
Exporter<?> exporter = protocol.export(wrapperInvoker)
前两句就是将此ref封装成Invoker,只有将ref封装成Invoker之后,才会将此invoker的信息写入到zk里面去。
由于Invoker里面封装的url的protocol为registry,不难猜出。
Exporter<?> exporter = protocol.export(wrapperInvoker);
会调到RegistryProtocol类的export方法。那么核心的方法都封装在RegistryProtocol类的export方法,如下
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
//export invoker 在本地暴露的意思是,在本地启动一个nettyService进行暴露
final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);
//拿到zk的地址
URL registryUrl = getRegistryUrl(originInvoker);
//registry provider 这个就是注册provider
//根据originInvoker的url拿到Registry ZookeeperRegistry(其实就是连接zk service
的客户端,里面封装了各种的信息)
final Registry registry = getRegistry(originInvoker);
//服务端暴露的dubbo服务的本地信息
final URL registeredProviderUrl = getRegisteredProviderUrl(originInvoker);
//to judge to delay publish whether or not
boolean register = registeredProviderUrl.getParameter("register", true);
ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl, registeredProviderUrl);
if (register) {
//这里是核心,去注册
register(registryUrl, registeredProviderUrl);
ProviderConsumerRegTable.getProviderWrapper(originInvoker).setReg(true);
}
// Subscribe the override data
// FIXME When the provider subscribes, it will affect the scene : a certain JVM exposes the service and call the same service. Because the subscribed is cached key with the name of the service, it causes the subscription information to cover.
final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registeredProviderUrl);
final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
//Ensure that a new exporter instance is returned every time export
return new DestroyableExporter<T>(exporter, originInvoker, overrideSubscribeUrl, registeredProviderUrl);
}
其实上面的这个方法,就两个目的
1 在本地进行暴露,启动nettySevice,这个在前面已经讲过了,这里带过
2 在zk上面进行注册,这样消费者就能看到自己。
我们看下 register(registryUrl, registeredProviderUrl)方法
public void register(URL registryUrl, URL registedProviderUrl) {
//这个就是前面说的ZookeeperRegistry
Registry registry = registryFactory.getRegistry(registryUrl);
//registedProviderUrl,这个就是本地dubbo服务的url信息。
registry.register(registedProviderUrl);
}
而 registry.register(registedProviderUrl);最终会进入到ZookeeperRegistry的doRegister方法,如下
protected void doRegister(URL url) {
try {
zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true));
} catch (Throwable e) {
throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
而toUrlPath(url)返回值为,如下
可以看到,我们再
/dubbo/com.ilsp.order.service.ContractService/providers节点下添加了此dubbo服务的本地信息。
经过这步之后,那消费者通过查询/dubbo/com.ilsp.order.service.ContractService/providers节点就知道有哪些消费者进行注册了。
而在RegistryProtocol类的export方法里面还有一句重要的语句
registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
overrideSubscribeUrl路径添加监听器overrideSubscribeListener进行监听。
而这个方法最终也会进入ZookeeperRegistry的doSubscribe方法,源码如下
protected void doSubscribe(final URL url, final NotifyListener listener) {
try {
if (Constants.ANY_VALUE.equals(url.getServiceInterface())) {
String root = toRootPath();
ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
if (listeners == null) {
zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>());
listeners = zkListeners.get(url);
}
ChildListener zkListener = listeners.get(listener);
if (zkListener == null) {
listeners.putIfAbsent(listener, new ChildListener() {
@Override
public void childChanged(String parentPath, List<String> currentChilds) {
for (String child : currentChilds) {
child = URL.decode(child);
if (!anyServices.contains(child)) {
anyServices.add(child);
subscribe(url.setPath(child).addParameters(Constants.INTERFACE_KEY, child,
Constants.CHECK_KEY, String.valueOf(false)), listener);
}
}
}
});
zkListener = listeners.get(listener);
}
zkClient.create(root, false);
List<String> services = zkClient.addChildListener(root, zkListener);
if (services != null && !services.isEmpty()) {
for (String service : services) {
service = URL.decode(service);
anyServices.add(service);
subscribe(url.setPath(service).addParameters(Constants.INTERFACE_KEY, service,
Constants.CHECK_KEY, String.valueOf(false)), listener);
}
}
} else {
List<URL> urls = new ArrayList<URL>();
//path就是/dubbo/com.ilsp.order.service.ContractService/configurators
for (String path : toCategoriesPath(url)) {
ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
if (listeners == null) {
zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>());
listeners = zkListeners.get(url);
}
ChildListener zkListener = listeners.get(listener);
if (zkListener == null) {
listeners.putIfAbsent(listener, new ChildListener() {
@Override
public void childChanged(String parentPath, List<String> currentChilds) {
ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds));
}
});
zkListener = listeners.get(listener);
}
//创建/dubbo/com.ilsp.order.service.ContractService/configurators的zk path
zkClient.create(path, false);
//对该path添加监听器
List<String> children = zkClient.addChildListener(path, zkListener);
if (children != null) {
urls.addAll(toUrlsWithEmpty(url, path, children));
}
}
//回调监听器
notify(url, listener, urls);
}
} catch (Throwable e) {
throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
而这个notify(url, listener, urls)会回调到RegistryProtocol的内部私有类OverrideListener。
如果configurators的配置发生了修改,同时也会修改本地暴露的dubbo服务的信息。
如果我们打开dubbo控制台,对于dubbo服务来说,只关心如下的两项。
ok 下一篇我们会看下 当ReferenceBean在refer的时候,到底想zk写了什么,订阅了什么