按照dubbo官网的介绍,如下
Apache Dubbo 是一款高性能、轻量级的开源 Java 服务框架
记得最开始,dubbo是把自己定位成一款高性能的rpc框架,我们现在还是按照rpc的定位进行分析,dubbo的整个对外的框架非常的简单对称美,如下
但是内部的实现非常的复杂,如下
优秀的框架都是相似的,整体对外的框架非常的简单,但是内部设计非常的复杂,将简单留给客户,将复杂封装给自己。
Make It Simple
按照上图的介绍,除了service和config层,其它的层都是spi,所谓spi就是支持客户自定义的替换,等于说整个duubo可以认为是个戴高乐积木,而每一层的实现都是可以替换成用户自己的技术栈。这样也方便各个公司在引入dubbo的时候进行定制化的改造。
整个调用链从上往下分为十层,下面依次简单的介绍
1 config层,如下图,dubbo提供了对模块的配置能力,最重要的是ServiceConfig(provide端)与ReferenceConfig(consumer端)的配置能力,当然还有MonitorConfig(监控中心的配置),ApplicationConfig(全局的应用配置),RegistryConfig(注册中心的配置),ProtocolConfig(协议配置)等,dubbo中提供的配置类图依赖如下
2 proxy代理层,主要是为了服务接口的透明代理,对外提供方便和透明的引用,生成服务的客户端的stub和服务端的Skeleton,dubbo中提供的类图依赖如下
3 registry 注册中心层:封装服务地址的注册与发现,以服务 URL 为中心,扩展接口为 RegistryFactory, Registry, RegistryService,dubbo支持多注册中心,dubbo中针对RegistryFactory的类依赖图如下
4 cluster 路由层:封装多个提供者的路由及负载均衡,并桥接注册中心,以 Invoker 为中心,扩展接口为 Cluster, Directory, Router, LoadBalance,针对Cluster的实现类图如下
默认为FailOverCluster(失败重试),当然我们可以配置自己的策略
5monitor 监控层 略
6 protocol 远程调用层:封装 RPC 调用,以 Invocation, Result 为中心,扩展接口为 Protocol, Invoker, Exporter,我们代码的大部分都在这一层进行分析,Protocol的类图依赖关系如下
7 exchange 信息交换层:封装请求响应模式,同步转异步,以 Request, Response 为中心,扩展接口为 Exchanger, ExchangeChannel, ExchangeClient, ExchangeServer,主要是对transport层的request和response的封装。
8 transport 网络传输层:抽象 mina 和 netty 为统一接口,以 Message 为中心,扩展接口为 Channel, Transporter, Client, Server, Codec
9 serialize 数据序列化层
针对上面的类图的依赖,我们可以发现依赖图大多都是矮胖的,等于说dubbo给我们默认的实现了很多的策略,我们只需要按需取用即可(策略模式)
整个源码包的模块如下
。
dubbo主要是服务的暴露和发现调用,整个服务的暴露时序图如下
而服务的发现引用调用时序图如下
DDD
在 Dubbo 的核心领域模型中:
- Protocol 是服务域,它是 Invoker 暴露和引用的主功能入口,它负责 Invoker 的生命周期管理。可以认为Protocol实现了对Invoke的封装
- Invoker 是实体域,它是 Dubbo 的核心模型,其它模型都向它靠扰,或转换成它,它代表一个可执行体,可向它发起 invoke 调用,它有可能是一个本地的实现,也可能是一个远程的实现,也可能一个集群实现。
- Invocation 是会话域,它持有调用过程中的变量,比如方法名,参数等。
我们的代码的分析也主要集中在Protocol和Invoker上面。
当我们希望将某个service暴露成dubbo接口的时候,只需要使用dubbo的注解@service即可,然后该service会被注册到Spring里面成为一个bean,同时也会生成一个ServiceBean,其refer这个bean,所以我们的分析也重点在ServiceBean,ServiceBean的继承关系如下
由于ServiceBean实现了InitializingBean,所以在其属性填充完毕之后,执行afterPropertiesSet(),在afterPropertiesSet主要是对各种配置进行检查填充和校验。
而又由于ServiceBean实现了ApplicationListener<ContextRefreshedEvent>,所以在监听到ContextRefreshedEvent之后,如下
Class ServiceBean
@Override
public void onApplicationEvent(ContextRefreshedEvent event) {
if (isDelay() && !isExported() && !isUnexported()) {
if (logger.isInfoEnabled()) {
logger.info("The service ready on spring started. service: " + getInterface());
}
export();//暴露和注册服务
}
}
可以看到export方法是其最核心的方法。我们接下来对export方法进行进一步的分析。
export是在ServiceBean的父类ServiceConfig里面实现的,如下
Class ServiceConfig
public synchronized void export() {
if (provider != null) {
if (export == null) {
export = provider.getExport();
}
if (delay == null) {
delay = provider.getDelay();
}
}
if (export != null && !export) {
return;
}
//是否需要延迟暴露,如果需要的话使用线程池异步线程实现暴露
//如果在启动的时候比较慢,可以设置延迟暴露的方式
if (delay != null && delay > 0) {
delayExportExecutor.schedule(new Runnable() {
@Override
public void run() {
doExport();
}
}, delay, TimeUnit.MILLISECONDS);
} else {
doExport();
}
}
protected synchronized void doExport() {
****
checkApplication();
checkRegistry();
checkProtocol();
appendProperties(this);
checkStub(interfaceClass);
checkMock(interfaceClass);
if (path == null || path.length() == 0) {
path = interfaceName;
}
//核心
doExportUrls();
ProviderModel providerModel = new ProviderModel(getUniqueServiceName(), this, ref);
ApplicationModel.initProviderModel(getUniqueServiceName(), providerModel);
}
在doExport方法里面又做了很多的校验,而最重要的方法就是doExportUrls。
private void doExportUrls() {
List<URL> registryURLs = loadRegistries(true);
for (ProtocolConfig protocolConfig : protocols) {
doExportUrlsFor1Protocol(protocolConfig, registryURLs);
}
}
如上我们可以看到,dubbo是支持多注册中心和多protocol的,针对每一个注册地址,每一个protocol,都会调用一次 doExportUrlsFor1Protocol(protocolConfig, registryURLs)。
一般来来说,不是非常特殊的场景,我们一般都是单注册中心(默认是zk),单Protocol(dubbo,注意这里是rpc协议),如下如,我的电脑就是配置的单注册中心,单rpc协议(dubbo)
而接下来的doExportUrlsFor1Protocol方法写的比较混乱,代码比较长,我们一行行看一下。
Class ServiceConfig
doExportUrlsFor1Protocol 分段分析如下
String name = protocolConfig.getName();
if (name == null || name.length() == 0) {
name = "dubbo";
}
Map<String, String> map = new HashMap<String, String>();
map.put(Constants.SIDE_KEY, Constants.PROVIDER_SIDE);
map.put(Constants.DUBBO_VERSION_KEY, Version.getProtocolVersion());
map.put(Constants.TIMESTAMP_KEY, String.valueOf(System.currentTimeMillis()));
if (ConfigUtils.getPid() > 0) {
map.put(Constants.PID_KEY, String.valueOf(ConfigUtils.getPid()));
}
appendParameters(map, application); //收集application配置信息
appendParameters(map, module); //收集module配置信息
appendParameters(map, provider, Constants.DEFAULT_KEY); //收集provider配置信息
appendParameters(map, protocolConfig); //收集protocolConfig配置信息
appendParameters(map, this); //收集ServiceConfig配置信息
可以看到依次将 application --->module--->provider--->protocolConfig--->ServiceConfig
的信息收集到map里面来,如果存在重复的配置信息,后面的配置会覆盖前面的配置,所以针对一些全局的缺省配置我们可以配置在前面,而一些很细节的配置,我们配置在后面即可。
在ServiceConfig里面可以针对具体的method进行进一步的配置,如使用注解配置,样例如下
@Service(methods = [@Method(name = "orderCancel",retries = 2)])
在进行服务暴露的过程中,有一段代码如下
exportLocal(url);
private void exportLocal(URL url) {
if (!Constants.LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) {
URL local = URL.valueOf(url.toFullString())
.setProtocol(Constants.LOCAL_PROTOCOL)
.setHost(LOCALHOST)
.setPort(0);
StaticContext.getContext(Constants.SERVICE_IMPL_CLASS).put(url.getServiceKey(), getServiceClass(ref));
Exporter<?> exporter = protocol.export(
proxyFactory.getInvoker(ref, (Class) interfaceClass, local));
exporters.add(exporter);
logger.info("Export dubbo service " + interfaceClass.getName() + " to local registry");
}
}
其中
protocol.export(
proxyFactory.getInvoker(ref, (Class) interfaceClass, local));
会将ref就是真实处理的bean,stub interfaceClass 和local url进行封装成一个Invoker就行暴露,而在dubbo中最重要对象就是这个Invoker。
而在暴露Invoker的时候,首先要拿到proxyFactory,其中proxyFactory的初始化语句如下
private static final ProxyFactory proxyFactory = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension();
这个proxyFactory是动态生成的(也是真牛)手动的生成java文件,然后编译加载,这种使用代码的方式来动态生成代码的方式更加的灵活,在我的电脑上,动态生成的这个proxyFactory的代码如下
package com.alibaba.dubbo.rpc;
import com.alibaba.dubbo.common.extension.ExtensionLoader;
public class ProxyFactory$Adaptive implements com.alibaba.dubbo.rpc.ProxyFactory {
public java.lang.Object getProxy(com.alibaba.dubbo.rpc.Invoker arg0) throws com.alibaba.dubbo.rpc.RpcException {
if (arg0 == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null");
if (arg0.getUrl() == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null");
com.alibaba.dubbo.common.URL url = arg0.getUrl();
String extName = url.getParameter("proxy", "javassist");
if(extName == null) throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.ProxyFactory) name from url(" + url.toString() + ") use keys([proxy])");
com.alibaba.dubbo.rpc.ProxyFactory extension = (com.alibaba.dubbo.rpc.ProxyFactory)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.ProxyFactory.class).getExtension(extName);
return extension.getProxy(arg0);
}
public java.lang.Object getProxy(com.alibaba.dubbo.rpc.Invoker arg0, boolean arg1) throws com.alibaba.dubbo.rpc.RpcException {
if (arg0 == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null");
if (arg0.getUrl() == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null");com.alibaba.dubbo.common.URL url = arg0.getUrl();
String extName = url.getParameter("proxy", "javassist");
if(extName == null) throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.ProxyFactory) name from url(" + url.toString() + ") use keys([proxy])");
com.alibaba.dubbo.rpc.ProxyFactory extension = (com.alibaba.dubbo.rpc.ProxyFactory)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.ProxyFactory.class).getExtension(extName);
return extension.getProxy(arg0, arg1);
}
public com.alibaba.dubbo.rpc.Invoker getInvoker(java.lang.Object arg0, java.lang.Class arg1, com.alibaba.dubbo.common.URL arg2) throws com.alibaba.dubbo.rpc.RpcException {
if (arg2 == null) throw new IllegalArgumentException("url == null");
com.alibaba.dubbo.common.URL url = arg2;
String extName = url.getParameter("proxy", "javassist");
if(extName == null) throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.ProxyFactory) name from url(" + url.toString() + ") use keys([proxy])");
com.alibaba.dubbo.rpc.ProxyFactory extension = (com.alibaba.dubbo.rpc.ProxyFactory)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.ProxyFactory.class).getExtension(extName);
return extension.getInvoker(arg0, arg1, arg2);
}
}
从上面的代码我们可以认为 ProxyFactory$Adaptive就是对JavassistProxyFactory的简单的代理。
而在拿到Invoke之后,使用protocol对其生命周期进行管理,我们使用同样的方法,看protocol动态生成的代码如下
package com.alibaba.dubbo.rpc;
import com.alibaba.dubbo.common.extension.ExtensionLoader;
public class Protocol$Adaptive implements com.alibaba.dubbo.rpc.Protocol {
public void destroy() {throw new UnsupportedOperationException("method public abstract void com.alibaba.dubbo.rpc.Protocol.destroy() of interface com.alibaba.dubbo.rpc.Protocol is not adaptive method!");
}
public int getDefaultPort() {throw new UnsupportedOperationException("method public abstract int com.alibaba.dubbo.rpc.Protocol.getDefaultPort() of interface com.alibaba.dubbo.rpc.Protocol is not adaptive method!");
}
public com.alibaba.dubbo.rpc.Exporter export(com.alibaba.dubbo.rpc.Invoker arg0) throws com.alibaba.dubbo.rpc.RpcException {
if (arg0 == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null");
if (arg0.getUrl() == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null");
com.alibaba.dubbo.common.URL url = arg0.getUrl();
String extName = ( url.getProtocol() == null ? "dubbo" : url.getProtocol() );
if(extName == null) throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])");
com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);
return extension.export(arg0);
}
public com.alibaba.dubbo.rpc.Invoker refer(java.lang.Class arg0, com.alibaba.dubbo.common.URL arg1) throws com.alibaba.dubbo.rpc.RpcException {
if (arg1 == null) throw new IllegalArgumentException("url == null");
com.alibaba.dubbo.common.URL url = arg1;
String extName = ( url.getProtocol() == null ? "dubbo" : url.getProtocol() );
if(extName == null) throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])");
com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);
return extension.refer(arg0, arg1);
}
}
有上面的代码我们可以知道,最终会调用InjvmProtocol的export方法,分析到最后我们发现其实这个方法将该service在当前的jvm中暴露出来。
最重要的是接下的在注册中心的暴露,源码如下
Class ServiceConfig
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
Exporter<?> exporter = protocol.export(wrapperInvoker);
在我 的本机上registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString())的结果是
registry://***:2181/com.alibaba.dubbo.registry.RegistryService?application=contract-logistics-wms&backup=****:2181,***:2181&dubbo=2.0.2&export=dubbo%3A%2F%2F10.10.131.127%3A20880%2Fcom.**.wms.dubbo.service.OrderCancelService%3Fanyhost%3Dtrue%26application%3Dcontract-logistics-wms%26bean.name%3DServiceBean%3Acom.zto.wms.dubbo.service.OrderCancelService%26bind.ip%3D10.10.131.127%26bind.port%3D20880%26default.delay%3D-1%26default.retries%3D2%26default.service.filter%3DCatTransaction%26delay%3D-1%26dubbo%3D2.0.2%26generic%3Dfalse%26interface%3Dcom.zto.wms.dubbo.service.OrderCancelService%26logger%3Dslf4j%26methods%3DorderCancel%26orderCancel.retries%3D2%26orderCancel.return%3Dtrue%26pid%3D24140%26side%3Dprovider%26timestamp%3D1618222873762&logger=slf4j&pid=24140®ister=true®istry=zookeeper&subscribe=true×tamp=1618222873751
最终拿到的invoker如下图
第二句
DelegateProviderMetaDataInvoker(invoker, this);
其实就是将当前的serviceConfig最为metadata跟invoke一起封装起来(典型的装饰模式)
第三句
Exporter<?> exporter = protocol.export(wrapperInvoker)
根据前面的分析,最终会调到RegistryProtocol的export方法,其代码如下
Class RegistryProtocol
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
//export invoker
final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);
URL registryUrl = getRegistryUrl(originInvoker);
//registry provider
final Registry registry = getRegistry(originInvoker);
final URL registeredProviderUrl = getRegisteredProviderUrl(originInvoker);
//to judge to delay publish whether or not
boolean register = registeredProviderUrl.getParameter("register", true);
ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl, registeredProviderUrl);
if (register) {
register(registryUrl, registeredProviderUrl);
ProviderConsumerRegTable.getProviderWrapper(originInvoker).setReg(true);
}
// Subscribe the override data
// FIXME When the provider subscribes, it will affect the scene : a certain JVM exposes the service and call the same service. Because the subscribed is cached key with the name of the service, it causes the subscription information to cover.
final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registeredProviderUrl);
final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
//Ensure that a new exporter instance is returned every time export
return new DestroyableExporter<T>(exporter, originInvoker, overrideSubscribeUrl, registeredProviderUrl);
}
在该方法中主要分两步
第一步 将服务在本地暴露出来(如果不暴露出来,别人无法调用)
第二步 将服务注册到zk(如果不注册,别人不知道有这个服务)
先看第一步
private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker) {
String key = getCacheKey(originInvoker);
ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
if (exporter == null) {
synchronized (bounds) {
exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
if (exporter == null) {
final Invoker<?> invokerDelegete = new InvokerDelegete<T>(originInvoker, getProviderUrl(originInvoker));
//这一步是关键,由于invokerDelegete.getUrl.getProtol = "dubbo"
//所以最终的调用了DubboProtocol的export方法
exporter = new ExporterChangeableWrapper<T>((Exporter<T>) protocol.export(invokerDelegete), originInvoker);
bounds.put(key, exporter);
}
}
}
return exporter;
}
我们跟到DubboProtocol的export的方法里面去,代码如下
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
URL url = invoker.getUrl();
// export service.
String key = serviceKey(url);
DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
exporterMap.put(key, exporter);
//export an stub service for dispatching event
Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY, Constants.DEFAULT_STUB_EVENT);
Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false);
if (isStubSupportEvent && !isCallbackservice) {
String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY);
if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
if (logger.isWarnEnabled()) {
logger.warn(new IllegalStateException("consumer [" + url.getParameter(Constants.INTERFACE_KEY) +
"], has set stubproxy support event ,but no stub methods founded."));
}
} else {
stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
}
}
//最重要的是这句
openServer(url);
optimizeSerialization(url);
return exporter;
}
我们跟到openServer(url)里面去
private void openServer(URL url) {
// find server.
String key = url.getAddress();
//client can export a service which's only for server to invoke
boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true);
if (isServer) {
ExchangeServer server = serverMap.get(key);
if (server == null) {
serverMap.put(key, createServer(url));
} else {
// server supports reset, use together with override
server.reset(url);
}
}
}
首先看当前ip:port下是否开启了本地服务,如果没有那么调用createServer,
在dubbo中默认使用的是netty server,代码如下
private ExchangeServer createServer(URL url) {
// send readonly event when server closes, it's enabled by default
url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString());
// enable heartbeat by default
url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));
String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER);
if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str))
throw new RpcException("Unsupported server type: " + str + ", url: " + url);
url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME);
ExchangeServer server;
try {
//这句绑定url(包含ip和port)和请求处理器
server = Exchangers.bind(url, requestHandler);
} catch (RemotingException e) {
throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
}
str = url.getParameter(Constants.CLIENT_KEY);
if (str != null && str.length() > 0) {
Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
if (!supportedTypes.contains(str)) {
throw new RpcException("Unsupported client type: " + str);
}
}
return server;
}
Class Exchangers
public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
if (url == null) {
throw new IllegalArgumentException("url == null");
}
if (handler == null) {
throw new IllegalArgumentException("handler == null");
}
url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
return getExchanger(url).bind(url, handler);
}
Class HeaderExchanger
@Override
public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
}
public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException {
if (url == null) {
throw new IllegalArgumentException("url == null");
}
if (handlers == null || handlers.length == 0) {
throw new IllegalArgumentException("handlers == null");
}
ChannelHandler handler;
if (handlers.length == 1) {
handler = handlers[0];
} else {
handler = new ChannelHandlerDispatcher(handlers);
}
return getTransporter().bind(url, handler);
}
而getTransporter()最终会返回netty4 的NettyTransporter
如下
Class NettyTransporter
@Override
public Server bind(URL url, ChannelHandler listener) throws RemotingException {
return new NettyServer(url, listener);
}
Class NettyServer
public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));
}
Class AbstractServer
public AbstractServer(URL url, ChannelHandler handler) throws RemotingException {
super(url, handler);
localAddress = getUrl().toInetSocketAddress();
String bindIp = getUrl().getParameter(Constants.BIND_IP_KEY, getUrl().getHost());
int bindPort = getUrl().getParameter(Constants.BIND_PORT_KEY, getUrl().getPort());
if (url.getParameter(Constants.ANYHOST_KEY, false) || NetUtils.isInvalidLocalHost(bindIp)) {
bindIp = NetUtils.ANYHOST;
}
bindAddress = new InetSocketAddress(bindIp, bindPort);
this.accepts = url.getParameter(Constants.ACCEPTS_KEY, Constants.DEFAULT_ACCEPTS);
this.idleTimeout = url.getParameter(Constants.IDLE_TIMEOUT_KEY, Constants.DEFAULT_IDLE_TIMEOUT);
try {
//这句最重要
doOpen();
if (logger.isInfoEnabled()) {
logger.info("Start " + getClass().getSimpleName() + " bind " + getBindAddress() + ", export " + getLocalAddress());
}
} catch (Throwable t) {
throw new RemotingException(url.toInetSocketAddress(), null, "Failed to bind " + getClass().getSimpleName()
+ " on " + getLocalAddress() + ", cause: " + t.getMessage(), t);
}
//fixme replace this with better method
DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();
executor = (ExecutorService) dataStore.get(Constants.EXECUTOR_SERVICE_COMPONENT_KEY, Integer.toString(url.getPort()));
}
如果你能看到这里,真的很佩服你,终于看到了我们熟悉的netty代码,后面我们会专门的针对netty做介绍(现在我还了解的不深入)
protected void doOpen() throws Throwable {
bootstrap = new ServerBootstrap();
bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("NettyServerBoss", true));
workerGroup = new NioEventLoopGroup(getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),
new DefaultThreadFactory("NettyServerWorker", true));
final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);
channels = nettyServerHandler.getChannels();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
.childOption(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
.addLast("decoder", adapter.getDecoder())
.addLast("encoder", adapter.getEncoder())
.addLast("handler", nettyServerHandler);
}
});
// bind
ChannelFuture channelFuture = bootstrap.bind(getBindAddress());
channelFuture.syncUninterruptibly();
channel = channelFuture.channel();
}
通过如上的操作之后,在本地起了个netty服务,理论上我们现在就可以接受rpc请求了,但是我们还需要回到开头,将服务注册到zk上面去,回到开头。
@Class RegistryProtocol
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
//export invoker 前面讲的一大堆就是讲的这个
final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);
//找到注册的zk地址
URL registryUrl = getRegistryUrl(originInvoker);
//registry provider
final Registry registry = getRegistry(originInvoker);
final URL registeredProviderUrl = getRegisteredProviderUrl(originInvoker);
//to judge to delay publish whether or not
boolean register = registeredProviderUrl.getParameter("register", true);
ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl, registeredProviderUrl);
if (register) {
register(registryUrl, registeredProviderUrl);
ProviderConsumerRegTable.getProviderWrapper(originInvoker).setReg(true);
}
// Subscribe the override data
// FIXME When the provider subscribes, it will affect the scene : a certain JVM exposes the service and call the same service. Because the subscribed is cached key with the name of the service, it causes the subscription information to cover.
final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registeredProviderUrl);
final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
//Ensure that a new exporter instance is returned every time export
return new DestroyableExporter<T>(exporter, originInvoker, overrideSubscribeUrl, registeredProviderUrl);
}
最终在ZookeeperRegistry调用doRegister进行注册
代码如下
protected void doRegister(URL url) {
try {
zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true));
} catch (Throwable e) {
throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
经过上面的操作之后,最终会在
/dubbo/com.***.wms.dubbo.service.OrderCancelService/providers 永久节点下注册一个临时节点信息,巧妙的利用了zk的永久节点和临时节点的特征,可以动态的增减注册信息。
然后在此path下注册监听器防止暴露的url被重写(这个逻辑可以先忽略,我也没搞清楚)
最终返回一个DestroyableExporter,可以在返回的时候取消所有注册信息。
如上就是一个dubbo service暴露的全过程,后面我们接着介绍一个service unexport的全过程。