-
先看官网两张图【引用来自官网】:
暴露服务的入口自然就清楚了
官网说明:
- 1.首先 ServiceConfig 类拿到对外提供服务的实际类 ref(如:HelloWorldImpl),然后通过 ProxyFactory 类的 getInvoker 方法使用 ref 生成一个 AbstractProxyInvoker 实例,到这一步就完成具体服务到 Invoker 的转化。接下来就是 Invoker 转换到 Exporter 的过程。
- 2.Dubbo 处理服务暴露的关键就在 Invoker 转换到 Exporter 的过程,上图中的红色部分。下面我们以 Dubbo 和 RMI 这两种典型协议的实现来进行说明:
一.概览
- 1.暴露主要入口在ServiceBean,该bean实现了spring相关的接口,主要关注下InitializingBean和ApplicationListener,那也就说明在容器启动初始化完成之后会收到容器发送的监听通知,进而执行监听方法,也就是在这个监听方法里实现了服务的暴露。
- 2.拿到容器的bean后,也就是ref指向的那个配置bean或者@Sevice配置的那个bean,反正就是给spring托管的bean;进而将其封装成invoker;
- 3.拿到invoker之后,就将其转成Exporter,这里当然就要缓存起来,缓存的key就是invoker的url了,至于什么是url,后面就清楚了
- 4.拿到Exporter后就启动Server服务,开启端口,请求来到时,根据请求信息生成key,到缓存查找Exporter,找到Invoker完成调用。
- 5.这里会很奇怪,此处没有注册中心,简单点认知:其实注册中心也就管理下地址并进行变更时通知而已;脱离了注册中心,Exporter照样暴露服务开启端口等待调用。
二.容器初始化
当容器初始化OK,ContextRefreshEvent触发监听事件,ServiceBean执行onApplicationEvent事 件方法,进而进行export。
此处其实利用了上篇讲的SPI扩展,ServiceConfig初始化时,首先会先初始化静态变量protocol和proxyFactory,这两个变量的初始化就是通过dubbo的spi扩展机制得到的,因此此处可以提前跟踪下代码就理解了,此处不再做笔录。
protocol扩展出来的类形态是这样的
package com.alibaba.dubbo.rpc;
import com.alibaba.dubbo.common.extension.ExtensionLoader;
public class Protocol$Adpative implements com.alibaba.dubbo.rpc.Protocol {
public com.alibaba.dubbo.rpc.Invoker refer(java.lang.Class arg0, com.alibaba.dubbo.common.URL arg1) throws java.lang.Class {
if (arg1 == null) throw new IllegalArgumentException("url == null");
com.alibaba.dubbo.common.URL url = arg1;
String extName = ( url.getProtocol() == null ? "dubbo" : url.getProtocol() );
if(extName == null) throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])");
com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);
return extension.refer(arg0, arg1);
}
public com.alibaba.dubbo.rpc.Exporter export(com.alibaba.dubbo.rpc.Invoker arg0) throws com.alibaba.dubbo.rpc.Invoker {
if (arg0 == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null");
if (arg0.getUrl() == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null");com.alibaba.dubbo.common.URL url = arg0.getUrl();
//根据URL配置信息获取Protocol协议,默认是dubbo
String extName = ( url.getProtocol() == null ? "dubbo" : url.getProtocol() );
if(extName == null) throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])");
//根据协议名,获取Protocol的实现
//获得Protocol的实现过程中,会对Protocol先进行依赖注入,然后进行Wrapper包装,最后返回被修改过的Protocol
//包装经过了ProtocolFilterWrapper,ProtocolListenerWrapper,RegistryProtocol
com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);
return extension.export(arg0);
}
public void destroy() {
throw new UnsupportedOperationException("method public abstract void com.alibaba.dubbo.rpc.Protocol.destroy() of interface com.alibaba.dubbo.rpc.Protocol is not adaptive method!");
}
public int getDefaultPort() {
throw new UnsupportedOperationException("method public abstract int com.alibaba.dubbo.rpc.Protocol.getDefaultPort() of interface com.alibaba.dubbo.rpc.Protocol is not adaptive method!");
}
}
-
proxyFactory扩展出来的类形态是这样的:
package com.alibaba.dubbo.rpc; import com.alibaba.dubbo.common.extension.ExtensionLoader; public class ProxyFactory$Adpative implements com.alibaba.dubbo.rpc.ProxyFactory { public com.alibaba.dubbo.rpc.Invoker getInvoker(java.lang.Object arg0, java.lang.Class arg1, com.alibaba.dubbo.common.URL arg2) throws java.lang.Object { if (arg2 == null) throw new IllegalArgumentException("url == null"); com.alibaba.dubbo.common.URL url = arg2; String extName = url.getParameter("proxy", "javassist"); if(extName == null) throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.ProxyFactory) name from url(" + url.toString() + ") use keys([proxy])"); com.alibaba.dubbo.rpc.ProxyFactory extension = (com.alibaba.dubbo.rpc.ProxyFactory)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.ProxyFactory.class).getExtension(extName); return extension.getInvoker(arg0, arg1, arg2); } public java.lang.Object getProxy(com.alibaba.dubbo.rpc.Invoker arg0) throws com.alibaba.dubbo.rpc.Invoker { if (arg0 == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null"); if (arg0.getUrl() == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null");com.alibaba.dubbo.common.URL url = arg0.getUrl(); String extName = url.getParameter("proxy", "javassist"); if(extName == null) throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.ProxyFactory) name from url(" + url.toString() + ") use keys([proxy])"); com.alibaba.dubbo.rpc.ProxyFactory extension = (com.alibaba.dubbo.rpc.ProxyFactory)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.ProxyFactory.class).getExtension(extName); return extension.getProxy(arg0); } }
看下具体扩展的是哪个类吧
/** * ProxyFactory. (API/SPI, Singleton, ThreadSafe) */ @SPI("javassist") public interface ProxyFactory { /** * create proxy. * * @param invoker * @return proxy */ @Adaptive({Constants.PROXY_KEY}) <T> T getProxy(Invoker<T> invoker) throws RpcException; /** * create invoker. * * @param <T> * @param proxy * @param type * @param url * @return invoker */ @Adaptive({Constants.PROXY_KEY}) <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) throws RpcException; }
默认的处理配置很明显是javassist,后面debug就可以方便打断点了。
同时跟着上面生成的类描述,后面用到的时候跟这这样的代码执行逻辑就比较好跟踪,不然执行debug有点蒙。
二.服务暴露
1.暴露原理
- 处理过程大概分为以下几个主要点,官网图只是个大致流程:
- 1.反复检查及准备相关环境配置
- 2.加载相关的注册中心,这里充当注册中心的就多了
- 3.暴露本地服务
- 4.暴露远程服务(此处启动netty,打开端口)
- 5.连接注册中心并注册
- 6.监听注册中心
按照上面的步骤详细笔录一下
- ServiceBean监听事件触发时执行了下面方法:
/***
* 就比如监听spring容器初始化完成
* 没有设置延迟或者延迟为-1,dubbo会在Spring实例化完bean之后,在刷新容器最后一步发布ContextRefreshEvent事件的时候,通知实现了ApplicationListener的类进行回调onApplicationEvent,dubbo会在这个方法中发布服务。
*/
@Override
public void onApplicationEvent(ContextRefreshedEvent event) {
if (isDelay() && !isExported() && !isUnexported()) {
if (logger.isInfoEnabled()) {
logger.info("The service ready on spring started. service: " + getInterface());
}
export();
}
}
显然有一个延迟配置选项,延迟是否配置也会影响这里的入口,没有延迟时的入口就是这个监听,设置了延迟时入口就是afterPropertySet();其实最后的重点都是export();
-
2.export()
/** * 暴露服务 */ public synchronized void export() { if (provider != null) { if (export == null) { export = provider.getExport(); } if (delay == null) { delay = provider.getDelay(); } } if (export != null && !export) { return; } if (delay != null && delay > 0) { delayExportExecutor.schedule(new Runnable() { @Override public void run() { doExport(); } }, delay, TimeUnit.MILLISECONDS); } else { doExport(); } }
这个方法没啥,直接看最后一行doExport好了。
/** * 先执行一系列的检查方法,然后调用doExportUrls方法。 * 检查方法会检测dubbo的配置是否在Spring配置文件中声明,没有的话读取properties文件初始化。 */ protected synchronized void doExport() { ... checkApplication(); checkRegistry(); checkProtocol(); appendProperties(this); checkStubAndMock(interfaceClass); if (path == null || path.length() == 0) { path = interfaceName; } // 重点 doExportUrls(); ProviderModel providerModel = new ProviderModel(getUniqueServiceName(), this, ref); ApplicationModel.initProviderModel(getUniqueServiceName(), providerModel); }
很显然这个方法前面省略的那一大坨就是检查环境配置及准备环境配置了,重点依然在最后doExportUrls();
-
doExportUrls()
@SuppressWarnings({"unchecked", "rawtypes"}) private void doExportUrls() { /** 获取所有的注册中心url */ List<URL> registryURLs = loadRegistries(true); for (ProtocolConfig protocolConfig : protocols) { doExportUrlsFor1Protocol(protocolConfig, registryURLs); } }
这里有个点很有意思:就是这里会去加载所有的注册中心,为啥会这么做呢,看看我们的配置:
因为我们显示指明了registry这一配置,跟下代码如何处理:
```
protected List<URL> loadRegistries(boolean provider) {
checkRegistry();
List<URL> registryList = new ArrayList<URL>();
if (registries != null && !registries.isEmpty()) {
for (RegistryConfig config : registries) {
String address = config.getAddress();
if (address == null || address.length() == 0) {
address = Constants.ANYHOST_VALUE;
}
String sysaddress = System.getProperty("dubbo.registry.address");
if (sysaddress != null && sysaddress.length() > 0) {
address = sysaddress;
}
if (address.length() > 0 && !RegistryConfig.NO_AVAILABLE.equalsIgnoreCase(address)) {
Map<String, String> map = new HashMap<String, String>();
appendParameters(map, application);
appendParameters(map, config);
map.put("path", RegistryService.class.getName());
map.put("dubbo", Version.getVersion());
map.put(Constants.TIMESTAMP_KEY, String.valueOf(System.currentTimeMillis()));
if (ConfigUtils.getPid() > 0) {
map.put(Constants.PID_KEY, String.valueOf(ConfigUtils.getPid()));
}
if (!map.containsKey("protocol")) {
if (ExtensionLoader.getExtensionLoader(RegistryFactory.class).hasExtension("remote")) {
map.put("protocol", "remote");
} else {
map.put("protocol", "dubbo");
}
}
List<URL> urls = UrlUtils.parseURLs(address, map);
for (URL url : urls) {
url = url.addParameter(Constants.REGISTRY_KEY, url.getProtocol());
url = url.setProtocol(Constants.REGISTRY_PROTOCOL);
if ((provider && url.getParameter(Constants.REGISTER_KEY, true))
|| (!provider && url.getParameter(Constants.SUBSCRIBE_KEY, true))) {
registryList.add(url);
}
}
}
}
}
return registryList;
}
```
直接进来就是这样的效果,很显然对于在配置中指定了registry属性的,会在加载spring BeanDefinition的时候就加载了注册中心。
返回的注册中心URL是这样的:
后面重点来了:doExportUrlsFor1Protocol 这里就区分协议进行服务的暴露了
-
doExportUrlsFor1Protocol
下面代码有点长,还是直接copy出来,因为有些注释直接看的比较方便/** * 服务发布 -- 远程暴露 & 本地暴露 * 本地暴露是暴露在JVM中,不需要网络通信. * 远程暴露是将ip,端口等信息暴露给远程客户端,调用时需要网络通信. * * 根据不同的协议将服务以URL形式暴露。如果scope配置为none则不暴露,如果服务未配置成remote,则本地暴露exportLocal,如果未配置成local,则注册服务registryProcotol。 */ private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) { ... // don't export when none is configured if (!Constants.SCOPE_NONE.toString().equalsIgnoreCase(scope)) { /** 根据配置选择暴露服务方式 */ // export to local if the config is not remote (export to remote only when config is remote) if (!Constants.SCOPE_REMOTE.toString().equalsIgnoreCase(scope)) { /** 本地暴露 **/ exportLocal(url); } /** 远程暴露 */ // export to remote if the config is not local (export to local only when config is local) if (!Constants.SCOPE_LOCAL.toString().equalsIgnoreCase(scope)) { if (logger.isInfoEnabled()) { logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url); } if (registryURLs != null && !registryURLs.isEmpty()) { for (URL registryURL : registryURLs) { url = url.addParameterIfAbsent(Constants.DYNAMIC_KEY, registryURL.getParameter(Constants.DYNAMIC_KEY)); URL monitorUrl = loadMonitor(registryURL); if (monitorUrl != null) { url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString()); } if (logger.isInfoEnabled()) { logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL); } Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString())); DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this); Exporter<?> exporter = protocol.export(wrapperInvoker); exporters.add(exporter); } } else { Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url); DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this); Exporter<?> exporter = protocol.export(wrapperInvoker); exporters.add(exporter); } } } this.urls.add(url); }
既然是RPC,远程暴露必然无疑,那为啥会有本地暴露? 其实解释很简单,自己调用自己难道也要走网不成,一个JVM的速度跟处理比网络肯定要好吧。
前面一大段都是都是根据配置为服务最终暴露做准备,不多说,后面没省略的才是重点代码直接进入本地暴露和远程暴露
2.本地暴露
-
exportLocal(URL url):
@SuppressWarnings({"unchecked", "rawtypes"}) private void exportLocal(URL url) { if (!Constants.LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) { URL local = URL.valueOf(url.toFullString()) .setProtocol(Constants.LOCAL_PROTOCOL) .setHost(LOCALHOST) .setPort(0); ServiceClassHolder.getInstance().pushServiceClass(getServiceClass(ref)); /** // 1.首先还是先获得Invoker // 2.然后导出成Exporter,并缓存 // 3.这里的proxyFactory实际是JavassistProxyFactory **/ Exporter<?> exporter = protocol.export(proxyFactory.getInvoker(ref, (Class) interfaceClass, local)); exporters.add(exporter); logger.info("Export dubbo service " + interfaceClass.getName() + " to local registry"); } }
debug到这个url,它的值是这样的:
跟注释中讲的一样,做了这几件事:
- 1.获得Invoker
- 2.导出成Exporter,并缓存
这里的proxyFactory实际是JavassistProxyFactory前面已经讲过为啥了。那就看看getInvoker做了什么?
/** * JavaassistRpcProxyFactory * * 让用户像以本地调用方式调用远程服务,就必须使用代理,然后说到动态代理,一般我们就想到两种,一种是JDK的动态代理,一种是CGLIB的动态代理,那我们看看两者有什么特点. * 1.JDK的动态代理代理的对象必须要实现一个接口,而针对于没有接口的类,则可用CGLIB. * 2.CGLIB其原理也很简单,对指定的目标类生成一个子类,并覆盖其中方法实现增强,但由于采用的是继承,所以不能对final修饰的类进行代理. * 3.除了以上两种大家都很熟悉的方式外,其实还有一种方式,就是javassist生成字节码来实现代理 */ public class JavassistProxyFactory extends AbstractProxyFactory { @Override @SuppressWarnings("unchecked") public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) { return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker)); } @Override public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) { // TODO Wrapper cannot handle this scenario correctly: the classname contains '$' Wrapper类不能处理带$的类名 /** * 1.首先对实现类做一个包装,生成一个包装后的类。 * 2.然后新创建一个Invoker实例,这个Invoker中包含着生成的Wrapper类,Wrapper类中有具体的实现类 */ final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type); return new AbstractProxyInvoker<T>(proxy, type, url) { /** * 返回一个Invoker实例,doInvoke方法中直接返回上面wrapper的invokeMethod * 关于生成的wrapper,请看下面列出的生成的代码,其中invokeMethod方法中就有实现类对实际方法的调用 */ @Override protected Object doInvoke(T proxy, String methodName,Class<?>[] parameterTypes,Object[] arguments) throws Throwable { return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments); } }; } }
注释很清楚,返回一个包装类的Invoker,里面包装的是具体的实现类,执行逻辑当然也是通过包装调用到具体逻辑了。
获取到invoker就执行生成Expoter了。
这里稍微有点复杂,export链是这样的:ProtocolListenerWrapper-->ProtocolFilterWrapper-->InjvmProtocol
中间过程大致为相关的过滤操作,看到最后的一个InjvmExporter
/** * InjvmExporter */ class InjvmExporter<T> extends AbstractExporter<T> { private final String key; private final Map<String, Exporter<?>> exporterMap; /** * 利用exporterMap缓存了exporter, */ InjvmExporter(Invoker<T> invoker, String key, Map<String, Exporter<?>> exporterMap) { super(invoker); this.key = key; this.exporterMap = exporterMap; exporterMap.put(key, this); } @Override public void unexport() { super.unexport(); exporterMap.remove(key); } }
生成了Exporter并缓存,用下面的debug结束本地暴露。
3.远程暴露
-
1.获取Invoker过程省略,跟本地暴露基本一致。依然用的JavassistProxyFactory,这里也贴出来JdkProxyFactory,做个比较。
public class JdkProxyFactory extends AbstractProxyFactory { @Override @SuppressWarnings("unchecked") public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) { return (T) Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), interfaces, new InvokerInvocationHandler(invoker)); } @Override public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) { return new AbstractProxyInvoker<T>(proxy, type, url) { @Override protected Object doInvoke(T proxy, String methodName, Class<?>[] parameterTypes, Object[] arguments) throws Throwable { Method method = proxy.getClass().getMethod(methodName, parameterTypes); return method.invoke(proxy, arguments); } }; } }
-
2.export过程
可以debug看到ProtocolListenerWrapper跟ProtocolFilterWrapper对于Registry类型的Invoker不做任何处理,直接调用具体协议进行处理。
-
3.进入RegistryProtocol
/** * 暴露服务 */ @Override public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException { // export invoker // 这里就交给了具体的协议去暴露服务 final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker); URL registryUrl = getRegistryUrl(originInvoker); //registry provider --> /** 注册开始 链接註冊中心 eg: zookeeper */ //registry provider //根据invoker中的url获取Registry实例 //并且连接到注册中心 //此时提供者作为消费者引用注册中心核心服务RegistryService final Registry registry = getRegistry(originInvoker); /** 注册到注册中心的URL */ final URL registedProviderUrl = getRegistedProviderUrl(originInvoker); //to judge to delay publish whether or not boolean register = registedProviderUrl.getParameter("register", true); /** 将originInvoker加入本地缓存 */ ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl, registedProviderUrl); if (register) { //调用远端注册中心的register方法进行服务注册 //若有消费者订阅此服务,则推送消息让消费者引用此服务。 //注册中心缓存了所有提供者注册的服务以供消费者发现。 register(registryUrl, registedProviderUrl); ProviderConsumerRegTable.getProviderWrapper(originInvoker).setReg(true); } // Subscribe the override data // FIXME 提供者订阅时,会影响同一JVM即暴露服务又引用统一服务的场景,因为subscribed以服务名为缓存的key,导致订阅信息覆盖 // FIXME When the provider subscribes, it will affect the scene : a certain JVM exposes the service and call the same service. Because the subscribed is cached key with the name of the service, it causes the subscription information to cover. /** 一旦registedProviderUrl有变化,就重新组装返回URL,否则返回原来值 */ final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registedProviderUrl); /** 构造服务变化通知Listener */ final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker); /** 将Listener缓存 */ overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener); /** 为更新本地的zookeeper信息缓存文件,而发起的订阅请求 //提供者向注册中心订阅所有注册服务的覆盖配置*/ registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener); // 保证每次export都返回一个新的export实例 // 返回暴露后的Exporter给上层ServiceConfig进行缓存,便于后期撤销暴露。 //Ensure that a new exporter instance is returned every time export return new DestroyableExporter<T>(exporter, originInvoker, overrideSubscribeUrl, registedProviderUrl); }
看注释很明显做了这几件事:
- 1.交给具体的协议去暴露服务,也就是开启服务,打开端口,等待请求了
- 2.获取注册中心地址URl,并连接到注册中心
- 3.注册自己到注册中心
- 4.获取配置覆盖地址,设置监听器进行覆盖监听
- 5.订阅覆盖配置
- 6.返回暴露的Exporter
-
3.1 交给具体协议进行服务暴露:doLocalExport(originInvoker)
继续跟踪@SuppressWarnings("unchecked") private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker) { String key = getCacheKey(originInvoker); ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key); if (exporter == null) { synchronized (bounds) { exporter = (ExporterChangeableWrapper<T>) bounds.get(key); if (exporter == null) { //得到一个Invoker代理,里面包含原来的Invoker final Invoker<?> invokerDelegete = new InvokerDelegete<T>(originInvoker, getProviderUrl(originInvoker)); //此处protocol还是最上面生成的代码,调用代码中的export方法,会根据协议名选择调用具体的实现类 //这里我们需要调用DubboProtocol的export方法 //这里的使用具体协议进行导出的invoker是个代理invoker //导出完之后,返回一个新的ExporterChangeableWrapper实例 exporter = new ExporterChangeableWrapper<T>((Exporter<T>) protocol.export(invokerDelegete), originInvoker); bounds.put(key, exporter); } } } return exporter; }
先把生成的invoker包装成InvokerDelegete代理,交由扩展的protocol去暴露
看暴露逻辑
很明显这里要经过invoker链的构造。构造完之后进入到具体协议的暴露了。这里有个点需要说明下:最ServiceConfig中Export进来的时候是invoker是register型,所以这里不做处理,在这里会将具体的dubbo url提取出来再经过链路构造暴露,所以这里会进行相关处理了,就是下面这行代码。final Invoker<?> invokerDelegete = new InvokerDelegete<T>(originInvoker, getProviderUrl(originInvoker));
进入具体协议暴露:DubboProtocol
-
3.2 具体协议暴露服务
/** * 暴露服务 */ @Override public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException { URL url = invoker.getUrl(); // export service. // key由serviceName,port,version,group组成 // 当nio客户端发起远程调用时,nio服务端通过此key来决定调用哪个Exporter,也就是执行的Invoker。 // dubbo.common.hello.service.HelloService:20880 String key = serviceKey(url); //将Invoker转换成Exporter //直接new一个新实例 //没做啥处理,就是做一些赋值操作 //这里的exporter就包含了invoker DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap); // 缓存要暴露的服务,key是上面生成的 exporterMap.put(key, exporter); //export an stub service for dispatching event //是否支持本地存根 //远程服务后,客户端通常只剩下接口,而实现全在服务器端, //但提供方有些时候想在客户端也执行部分逻辑,比如:做ThreadLocal缓存, //提前验证参数,调用失败后伪造容错数据等等,此时就需要在API中带上Stub, //客户端生成Proxy实例,会把Proxy通过构造函数传给Stub, //然后把Stub暴露组给用户,Stub可以决定要不要去调Proxy。 Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY, Constants.DEFAULT_STUB_EVENT); Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false); if (isStubSupportEvent && !isCallbackservice) { String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY); if (stubServiceMethods == null || stubServiceMethods.length() == 0) { if (logger.isWarnEnabled()) { logger.warn(new IllegalStateException("consumer [" + url.getParameter(Constants.INTERFACE_KEY) + "], has set stubproxy support event ,but no stub methods founded.")); } } else { stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods); } } /** 打开socket,供调用方调用 -> 根据URL绑定IP与端口,建立NIO框架的Server*/ openServer(url); optimizeSerialization(url); /** 远程服务结束 */ return exporter; }
注释已经解释了做了哪些事情,很清楚。存根的用处借用官网一张图及配置实现说明下:
具体配置:
提供实现:
package com.foo;
public class BarServiceStub implements BarService {
private final BarService barService;
// 构造函数传入真正的远程代理对象
public (BarService barService) {
this.barService = barService;
}
public String sayHello(String name) {
// 此代码在客户端执行, 你可以在客户端做ThreadLocal本地缓存,或预先验证参数是否合法,等等
try {
return barService.sayHello(name);
} catch (Exception e) {
// 你可以容错,可以做任何AOP拦截事项
return "容错数据";
}
}
}
还是回到暴露的主体:
- 1.获得Exporter并缓存
- 2.打开socket,供调用方调用,即:根据URL绑定IP与端口,建立NIO框架的Server
看第二点:
/** 创建NIO Server进行监听 */
private void openServer(URL url) {
// find server.
// key是IP:PORT
// 192.168.110.197:20880
String key = url.getAddress();
// client 也可以暴露一个只有server可以调用的服务
// client can export a service which's only for server to invoke
boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true);
if (isServer) {
ExchangeServer server = serverMap.get(key);
//同一JVM中,同协议的服务,共享同一个Server,
//第一个暴露服务的时候创建server,
//以后相同协议的服务都使用同一个server
if (server == null) {
serverMap.put(key, createServer(url));
} else {
// server支持reset,配合override功能使用
// server supports reset, use together with override
//同协议的服务后来暴露服务的则使用第一次创建的同一Server
//server支持reset,配合override功能使用
//accept、idleTimeout、threads、heartbeat参数的变化会引起Server的属性发生变化
//这时需要重新设置Server
server.reset(url);
}
}
}
划重点:createServer(URL url)
private ExchangeServer createServer(URL url) {
// 默认开启server关闭时发送readonly事件
// send readonly event when server closes, it's enabled by default
url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString());
// 开启默认的heartbeat
// enable heartbeat by default
url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));
String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER);
if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str))
throw new RpcException("Unsupported server type: " + str + ", url: " + url);
url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME);
ExchangeServer server;
try {
//Exchangers是门面类,里面封装的是Exchanger的逻辑。
//Exchanger默认只有一个实现HeaderExchanger.
//Exchanger负责数据交换和网络通信。
//从Protocol进入Exchanger,标志着程序进入了remote层。
//这里requestHandler是ExchangeHandlerAdapter
// 封装信息转换,Dubbo的Exchanger层
server = Exchangers.bind(url, requestHandler);
} catch (RemotingException e) {
throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
}
str = url.getParameter(Constants.CLIENT_KEY);
if (str != null && str.length() > 0) {
Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
if (!supportedTypes.contains(str)) {
throw new RpcException("Unsupported client type: " + str);
}
}
return server;
}
/**
* 封装请求响应模式,同步转异步
* getExchanger方法根据url获取到一个默认的实现HeaderExchanger
* 调用HeaderExchanger的bind方法
*/
public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
if (url == null) {
throw new IllegalArgumentException("url == null");
}
if (handler == null) {
throw new IllegalArgumentException("handler == null");
}
url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
//getExchanger方法根据url获取到一个默认的实现HeaderExchanger
//调用HeaderExchanger的bind方法
return getExchanger(url).bind(url, handler);
}
最后进入到这里
/**
* doOpen方法创建Netty的Server端并打开,具体的事情就交给Netty去处理了
*/
@Override
protected void doOpen() throws Throwable {
NettyHelper.setNettyLoggerFactory();
//boss线程池
ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true));
//worker线程池
ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true));
//ChannelFactory,没有指定工作者线程数量,就使用cpu+1
ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss, worker, getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS));
bootstrap = new ServerBootstrap(channelFactory);
final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
channels = nettyHandler.getChannels();
// https://issues.jboss.org/browse/NETTY-365
// https://issues.jboss.org/browse/NETTY-379
// final Timer timer = new HashedWheelTimer(new NamedThreadFactory("NettyIdleTimer", true));
bootstrap.setOption("child.tcpNoDelay", true);
bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
@Override
public ChannelPipeline getPipeline() {
NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
ChannelPipeline pipeline = Channels.pipeline();
/*int idleTimeout = getIdleTimeout();
if (idleTimeout > 10000) {
pipeline.addLast("timer", new IdleStateHandler(timer, idleTimeout / 1000, 0, 0));
}*/
pipeline.addLast("decoder", adapter.getDecoder());
pipeline.addLast("encoder", adapter.getEncoder());
pipeline.addLast("handler", nettyHandler);
return pipeline;
}
});
// bind
channel = bootstrap.bind(getBindAddress());
}
到这里服务暴露逻辑就完成了,完了就可以接受请求并进行处理了。下面是注册到注册中心了
-
3.3 注册
回到RegistryProtocol的export方法
贴代码
@Override public void register(URL url) { super.register(url); failedRegistered.remove(url); failedUnregistered.remove(url); try { /** 向服务器发起注册请求 调用子类具体实现,发送注册请求*/ // Sending a registration request to the server side doRegister(url); } catch (Exception e) { Throwable t = e; /** 如果开启了启动时检测,则直接抛出异常 */ // If the startup detection is opened, the Exception is thrown directly. boolean check = getUrl().getParameter(Constants.CHECK_KEY, true) && url.getParameter(Constants.CHECK_KEY, true) && !Constants.CONSUMER_PROTOCOL.equals(url.getProtocol()); boolean skipFailback = t instanceof SkipFailbackWrapperException; if (check || skipFailback) { if (skipFailback) { t = t.getCause(); } throw new IllegalStateException("Failed to register " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t); } else { logger.error("Failed to register " + url + ", waiting for retry, cause: " + t.getMessage(), t); } /** 记录失败列表,定时重试 */ // Record a failed registration request to a failed list, retry regularly, failedRegistered.add(url); } }
第一行继续调用父类AbstractRegistry的构造方法,不做复杂事,就缓存
完了之后继续回到钩子方法
具体就到具体的注册中心了,比如ZookeeperRegistry,MulticastRegistry等。这里常用的是ZK,那就拿ZK举例:@Override protected void doRegister(URL url) { try { /** 注册即创建节点 */ /** toUrlPath: /dubbo/dubbo.common.hello.service.HelloService/providers/dubbo%3A%2F%2F192.168.1.100%3A20880%2F dubbo.common.hello.service.HelloService%3Fanyhost%3Dtrue%26application%3Ddubbo-provider%26 application.version%3D1.0%26dubbo%3D2.5.3%26environment%3Dproduct%26interface%3D dubbo.common.hello.service.HelloService%26methods%3DsayHello%26 organization%3Dchina%26owner%3Dcheng.xi%26pid%3D8920%26side%3Dprovider%26timestamp%3D1489828029449 默认创建的节点是临时节点*/ zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true)); } catch (Throwable e) { throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e); } }
看最后一行,注册就是创建节点
/** 创建节点 */ /** * 在分布式系统中,我们常常需要知道某个机器是否可用,传统的开发中,可以通过Ping某个主机来实现,Ping得通说明对方是可用的,相反是不可用的; * ZK 中我们让所有的机其都注册一个临时节点,我们判断一个机器是否可用,我们只需要判断这个节点在ZK中是否存在就可以了,不需要直接去连接需要检查的机器,降低系统的复杂度 * */ @Override public void create(String path, boolean ephemeral) { if (!ephemeral) { if (checkExists(path)) { return; } } int i = path.lastIndexOf('/'); if (i > 0) { create(path.substring(0, i), false); } if (ephemeral) { createEphemeral(path); } else { createPersistent(path); } }
zk节点路径看上面代码就很清晰了,以全路径不断往下创建,到最后具体时就创建临时节点。
@Override public void createEphemeral(String path) { try { client.createEphemeral(path); } catch (ZkNodeExistsException e) { } }
catch里面的空逻辑解决的就是防止多个provider竞争创建相同的父级持久节点。
这样将自己注册到注册中心就完事了 -
3.4 监听
provider在注册到注册中心之后,registry会去订阅覆盖配置的服务,之后就会在/dubbo/xxx.xxx.service/XxxService节点下多一个configurators节点
这一步也是缓存注册中心信息到本地的一个重要步骤,这就是为啥注册中心挂了,provider与consumer还能通信的原理所在了。看代码
进入父类:FailbackRegistry
继续到父类缓存处理映射关系
回到FailbackRegistey中的钩子方法
核心子类处理,依然以ZK为例@Override protected void doSubscribe(final URL url, final NotifyListener listener) { try { if (Constants.ANY_VALUE.equals(url.getServiceInterface())) { String root = toRootPath(); ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url); if (listeners == null) { zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>()); listeners = zkListeners.get(url); } //将zkClient的事件IZkChildListener转换到registry事件NotifyListener ChildListener zkListener = listeners.get(listener); if (zkListener == null) { listeners.putIfAbsent(listener, new ChildListener() { @Override public void childChanged(String parentPath, List<String> currentChilds) { for (String child : currentChilds) { child = URL.decode(child); if (!anyServices.contains(child)) { anyServices.add(child); subscribe(url.setPath(child).addParameters(Constants.INTERFACE_KEY, child, Constants.CHECK_KEY, String.valueOf(false)), listener); } } } }); zkListener = listeners.get(listener); } zkClient.create(root, false); List<String> services = zkClient.addChildListener(root, zkListener); if (services != null && !services.isEmpty()) { for (String service : services) { service = URL.decode(service); anyServices.add(service); subscribe(url.setPath(service).addParameters(Constants.INTERFACE_KEY, service, Constants.CHECK_KEY, String.valueOf(false)), listener); } } } else { List<URL> urls = new ArrayList<URL>(); // 这里的path分别为providers,routers,configurators三种 /** 分别对providers,routers,configurators三种不同类型的进行订阅,也就是往zookeeper中注册节点,注册之前先给url添加监听器。最后是订阅完之后进行通知 */ for (String path : toCategoriesPath(url)) { ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url); if (listeners == null) { zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>()); listeners = zkListeners.get(url); } ChildListener zkListener = listeners.get(listener); if (zkListener == null) { listeners.putIfAbsent(listener, new ChildListener() { /** * 这里设置了监听回调的地址,即回调给FailbackRegistry中的notify * 当关注的路径的下增减节点,就会触发回调,然后通过notify方法,进行业务数据的变更逻辑 */ @Override public void childChanged(String parentPath, List<String> currentChilds) { ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds)); } }); zkListener = listeners.get(listener); } /** 创建持久节点 */ //创建三个节点 // /dubbo/.../providers/ // /dubbo/.../configurators/ // /dubbo/.../routers/ //上面三个路径会被消费者端监听,当提供者,配置,路由发生变化之后, //注册中心会通知消费者刷新本地缓存。 zkClient.create(path, false); /** 开始对该节点设置监听 */ List<String> children = zkClient.addChildListener(path, zkListener); if (children != null) { urls.addAll(toUrlsWithEmpty(url, path, children)); } } /** 下面要开始更新新的服务信息,服务启动和节点更新回调(前面设置了回调到这里)都会调用到这里 */ notify(url, listener, urls); } } catch (Throwable e) { throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e); } }
划重点
这里初始化了ChildListener,并且当关注的路径的下增减节点,就会触发回调,然后通过notify方法,进行业务数据的变更逻辑,这里notify里的参数linstener就是下图圈定的linstener
启动时,触发一下监听事件,用以更新回调
经过一系列父类方法触发之后,核心逻辑调用到AbstractRegistry
/** 开始更新本地缓存文件的信息 */ protected void notify(URL url, NotifyListener listener, List<URL> urls) { if (url == null) { throw new IllegalArgumentException("notify url == null"); } if (listener == null) { throw new IllegalArgumentException("notify listener == null"); } if ((urls == null || urls.isEmpty()) && !Constants.ANY_VALUE.equals(url.getServiceInterface())) { logger.warn("Ignore empty notify urls for subscribe url " + url); return; } if (logger.isInfoEnabled()) { logger.info("Notify urls for subscribe url " + url + ", urls: " + urls); } Map<String, List<URL>> result = new HashMap<String, List<URL>>(); // 获取catagory列表,providers,routers,configurators for (URL u : urls) { if (UrlUtils.isMatch(url, u)) { // 不同类型的数据分开通知,providers,consumers,routers,overrides // 允许只通知其中一种类型,但该类型的数据必须是全量的,不是增量的。 String category = u.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY); List<URL> categoryList = result.get(category); if (categoryList == null) { categoryList = new ArrayList<URL>(); result.put(category, categoryList); } categoryList.add(u); } } if (result.size() == 0) { return; } // 已经通知过 Map<String, List<URL>> categoryNotified = notified.get(url); if (categoryNotified == null) { notified.putIfAbsent(url, new ConcurrentHashMap<String, List<URL>>()); categoryNotified = notified.get(url); } //对这里得到的providers,configurators,routers分别进行通知 for (Map.Entry<String, List<URL>> entry : result.entrySet()) { String category = entry.getKey(); List<URL> categoryList = entry.getValue(); categoryNotified.put(category, categoryList); /** 更新本地缓存文件,用以保证procider 与 consumer 的通信 */ saveProperties(url); //上面获取到的监听器进行通知 --> 到RegistryDirectory中查看notify方法 /** * 对于消费者来说这里listener是RegistryDirectory * 而对于服务提供者来说这里是OverrideListener,是RegistryProtocol的内部类 */ listener.notify(categoryList); } }
看圈定的重点
- 1.更新本地配置,解决了注册中心挂了也能通信的问题,不多做解释。
- 2.触发监听器的监听事件,这一步在消费端做详细解释
- 整个服务暴露过程结束。