服务引用有两种:
1、直连方式引用
2、基于注册中心引用
引用服务时机
1、 ReferenceBean 的 afterPropertiesSet 方法时引用服务
org.apache.dubbo.config.spring.ReferenceBean#afterPropertiesSet
由于ReferenceBean实现了InitializingBean,这里是spring bean生命周期中,调用afterPropertiesSet执行初始化方法的一个切入点。
调用shouldInit方法查看init参数是否为true,如果为true将会在这里初始化。默认是false
2、 ReferenceBean 对应的服务提供者类被注入到服务消费者类中的引用(默认情况)
org.apache.dubbo.config.ReferenceConfig#get
其主要逻辑是在init方法,经历获取服务配置、Invoker 创建、代理类创建等步骤。
init
org.apache.dubbo.config.ReferenceConfig#init
1、检查本地存根checkStubAndLocal
2、检查Mock checkMock
3、添加 side、协议版本信息、时间戳和进程号等信息到 map 中,如果不是泛化服务将获取版本,获取接口方法列表,并添加到 map 中,将 ApplicationConfig、ConsumerConfig、ReferenceConfig 等对象的字段信息添加到 map 中
4、methods参数配置处理,
5、获取服务消费者 ip 地址、存储 attributes 到系统上下文中
6、调用createProxy创建代理类ref
引用服务
org.apache.dubbo.config.ReferenceConfig#createProxy
1、本地JVM引用
2、url不为空,则是点对点的服务地址。@Reference中指定了url属性
3、如果是注册中心地址,则在url中添加一个refer参数。
首先获取注册url。然后把引入的map数据属性放到refer后与注册url拼接,并添加到urls中。
如果是服务地址,有可能url中配置了参数,map中表示的服务消费者消费服务时的参数,这里所以需要合并mergeUrl
registry://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?application=dubbo-demo-consumer-application&dubbo=2.0.2&pid=9484&refer=application%3Ddubbo-demo-consumer-application%26dubbo%3D2.0.2%26group%3Dg1%26interface%3Dorg.apache.dubbo.demo.DemoService%26lazy%3Dfalse%26methods%3DsayHello%26pid%3D9484%26register.ip%3D192.168.1.2%26release%3D2.7.0%26revision%3D1.1.1%26side%3Dconsumer%26sticky%3Dfalse%26timestamp%3D1596342012598%26version%3D1.1.1®istry=zookeeper&release=2.7.0×tamp=1596342266342
4、@Reference中的protocol属性表示使用哪个协议调用服务,如果不是本地调用协议,则把注册中心地址找出来
如果只有一个注册服务url,则直接调用Protocol的自适应类,调用ref引用方法,这里会调用到RegistryProtocol、DubboProtocol
5、如果有多个注册服务url,则会进行遍历urls。生成的执行体invoker添加到invokers集合中。并对最后一个registryURL,进行Cluster包裹
6、得到执行体invoker,并通过ProxyFactory生成代理类
生成的invoker实例,就是为了调用。这里最终的包裹
MockClustInvoker包裹服务字典RegistryDirectory和集群容错Cluster的FailoverClusterInvoker,FailoverClusterInvoker也包裹了服务字典RegistryDirectory
所以说,在根据PROXY_FACTORY.getProxy获取invoker的代理类之前,生成invoker才是代理类的核心。
动态服务目录
RegistryProtocol的包裹类ProtocolFilterWrapper和ProtocolListenerWrapper(最外包裹)就不看了,也是利用SPI
1、refer
org.apache.dubbo.registry.integration.RegistryProtocol#refer
①、把url地址由registry://开头转换成 zookeeper://开头
zookeeper://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?application=dubbo-demo-consumer-application&dubbo=2.0.2&pid=10732&refer=application%3Ddubbo-demo-consumer-application%26dubbo%3D2.0.2%26group%3Dg1%26interface%3Dorg.apache.dubbo.demo.DemoService%26lazy%3Dfalse%26methods%3DsayHello%26pid%3D10732%26register.ip%3D192.168.1.2%26release%3D2.7.0%26revision%3D1.1.1%26side%3Dconsumer%26sticky%3Dfalse%26timestamp%3D1596345426916%26version%3D1.1.1&release=2.7.0×tamp=1596345429789
②、从url获取注册类registry
③、 qs表示 查询字符, 表示url中的参数,表示消费者引入服务所配置的参数。如果有group有多个值,则这里的cluster为MergeableCluster
④、doRefer方法调用
2、doRefer
org.apache.dubbo.registry.integration.RegistryProtocol#doRefer
①、创建动态服务目录动态RegistryDirectory,并设置registry、protocol值。RegistryDirectory实现了 NotifyListener 接口。当注册中心服务配置发生变化后,RegistryDirectory 可收到与当前服务相关的变化。收到变更通知后,RegistryDirectory 可根据配置变更信息刷新 Invoker 列表。在服务消费端最核心的就是这个服务目录
②、获取订阅subscribeUrl、并注册到zookeeper
consumer://192.168.1.2/org.apache.dubbo.demo.DemoService?application=dubbo-demo-consumer-application&dubbo=2.0.2&group=g1&interface=org.apache.dubbo.demo.DemoService&lazy=false&methods=sayHello&pid=10732&release=2.7.0&revision=1.1.1&side=consumer&sticky=false×tamp=1596345426916&version=1.1.1
a、register
org.apache.dubbo.registry.support.FailbackRegistry#register
b、doRegister
org.apache.dubbo.registry.zookeeper.ZookeeperRegistry#doRegister
注册路径是:
/dubbo/org.apache.dubbo.demo.DemoService/consumers/consumer%3A%2F%2F192.168.1.2%2Forg.apache.dubbo.demo.DemoService%3Fapplication%3Ddubbo-demo-consumer-application%26category%3Dconsumers%26check%3Dfalse%26dubbo%3D2.0.2%26group%3Dg1%26interface%3Dorg.apache.dubbo.demo.DemoService%26lazy%3Dfalse%26methods%3DsayHello%26pid%3D10732%26release%3D2.7.0%26revision%3D1.1.1%26side%3Dconsumer%26sticky%3Dfalse%26timestamp%3D1596345426916%26version%3D1.1.1
③、构建路由链
④、服务目录需要订阅的几个路径
⑤、cluster包裹服务目录
构建路由链
路由链是动态服务目录的一个属性值,在引入服务时可以根据路由条件进行过滤
org.apache.dubbo.registry.integration.RegistryDirectory#buildRouterChain
org.apache.dubbo.rpc.cluster.RouterChain#buildChain
org.apache.dubbo.rpc.cluster.RouterChain#RouterChain
根据路由工厂RouterFactory接口的扩展实现类,得到四个:
MockRouterFactory、TagRouterFactory 标签路由、AppRouterFactory应用条件路由、ServiceRouterFactory服务条件路由
利用RouterFactory根据url生成各个类型的Router、并把routers按priority进行排序
MockInvokersSelector(priority=负最大整数)、TagRouter(priority=100)、ServiceRouter(priority=140)、AppRouter(priority=150)
服务目录订阅路径
服务目录需要订阅的几个路径
服务提供者目录:/dubbo/org.apache.dubbo.demo.DemoService/providers
老版本动态配置目录:/dubbo/org.apache.dubbo.demo.DemoService/configurators
老版本路由器目录:/dubbo/org.apache.dubbo.demo.DemoService/routers
0、subscribe
org.apache.dubbo.registry.integration.RegistryDirectory#subscribe
此时有url多了category=providers,configurators,routers参数
consumer://192.168.1.2/org.apache.dubbo.demo.DemoService?application=dubbo-demo-consumer-application&category=providers,configurators,routers&dubbo=2.0.2&group=g1&interface=org.apache.dubbo.demo.DemoService&lazy=false&methods=sayHello&pid=10224&release=2.7.0&revision=1.1.1&side=consumer&sticky=false×tamp=1596347742946&version=1.1.1
1、ConsumerConfigurationListener监听器,
ConsumerConfigurationListener是RegistryDirectory的一个属性值,在创建RegistryDirectory对象时创建
key是dubbo-demo-consumer-application.configurators,所以监听应用动态配置路径是/dubbo/config/dubbo/dubbo-demo-consumer-application.configurators
2、ReferenceConfigurationListener监听器
key是org.apache.dubbo.demo.DemoService:1.1.1:g1.configurators,所以监听路径服务动态配置目录是/dubbo/config/dubbo/org.apache.dubbo.demo.DemoService:1.1.1:g1.configurators
3、subscribe
org.apache.dubbo.registry.support.FailbackRegistry#subscribe
①、subscribe
org.apache.dubbo.registry.support.FailbackRegistry#subscribe
②、doSubscribe
org.apache.dubbo.registry.zookeeper.ZookeeperRegistry#doSubscribe
a、首先判断服务接口是不是*,这里不会走
b、走else逻辑
从消费者url根据category获取路径(category=providers,configurators,routers)
得到服务提供者端配置路径(providers服务提供者、configurators服务提供者配置路径、routers路由配置路径)
/dubbo/org.apache.dubbo.demo.DemoService/providers
/dubbo/org.apache.dubbo.demo.DemoService/configurators
/dubbo/org.apache.dubbo.demo.DemoService/routers
对下面三个路径进行遍历,并创建路径,在添加urls之前,会首先通过toUrlsWithEmpty进行Empty协议替换。
c、toUrlsWithEmpty
org.apache.dubbo.registry.zookeeper.ZookeeperRegistry#toUrlsWithEmpty
为空,则用
d、toUrlsWithoutEmpty
对引入/dubbo/org.apache.dubbo.demo.DemoService/providers所有服务提供者进行遍历
org.apache.dubbo.registry.zookeeper.ZookeeperRegistry#toUrlsWithoutEmpty
e、isMatch
org.apache.dubbo.common.utils.UrlUtils#isMatch
服务提供者与消费者引入是否匹配
首先根据consumerInterface和providerInterface
根据category、根据enabled,默认是true
consumerUrl中category=providers,configurators,routers
providerUrl 得到默认category=providers
consumerUrl = "consumer://192.168.1.2/org.apache.dubbo.demo.DemoService?application=dubbo-demo-consumer-application&category=providers,configurators,routers&dubbo=2.0.2&group=g1&interface=org.apache.dubbo.demo.DemoService&lazy=false&methods=sayHello&pid=9320&release=2.7.0&revision=1.1.1&side=consumer&sticky=false×tamp=1596349730827&version=1.1.1"
providerUrl = "dubbo://192.168.1.2:20881/org.apache.dubbo.demo.DemoService?anyhost=true&application=dubbo-demo-provider1-application&bean.name=ServiceBean:org.apache.dubbo.demo.DemoService:1.1.1:g1&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&group=g1&interface=org.apache.dubbo.demo.DemoService&logger=log4j&methods=sayHello&pid=9596&release=2.7.0&revision=1.1.1&sayHello.loadbalance=random&sayHello.return=true&side=provider&timeout=3000×tamp=1596340240853&version=1.1.1"
根据group、version、classifier。三个同时符合则返回true
③、notify
org.apache.dubbo.registry.support.FailbackRegistry#notify
a、doNotify
org.apache.dubbo.registry.support.FailbackRegistry#doNotify
b、notify
org.apache.dubbo.registry.support.AbstractRegistry#notify
分类category
c、对routers、configurators、providers调用notify方法
org.apache.dubbo.registry.integration.RegistryDirectory#notify
获取动态配置URL,生成configurators、获取老版本路由URL,生成Router,并添加到路由链中、获取服务提供者URL
d、refreshOverrideAndInvoker
这里只要看providers,根据动态配置手动触发一次刷新invoker。
org.apache.dubbo.registry.integration.RegistryDirectory#refreshOverrideAndInvoker
e、refreshInvoker
org.apache.dubbo.registry.integration.RegistryDirectory#refreshInvoker
这里会Protocol进行过滤,并且调用DubboProtocol.refer方法得到Invoker
得到的newInvokers设置到RegistryDirectory的routerChain和invokers中
f、toInvokers
org.apache.dubbo.registry.integration.RegistryDirectory#toInvokers
遍历当前服务所有的服务提供者URL、当前消费者如果手动配置了Protocol,那么则进行匹配
当前Protocol是否在应用中存在对应的扩展点
如果当前服务提供者URL缓存中localUrlInvokerMap没有,则之前没有生产过Invoker。则会调用Protocol的refer方法生成一个Invoker
会调用到DubboProtocol#refer方法,DubboProtocol继承了AbstractProtocol,DubboProtocol不存在refer方法。会调用到父类AbstractProtocol的refer方法
客户端ExchangeClient创建
DubboInvoker创建
1、refer
org.apache.dubbo.rpc.protocol.AbstractProtocol#refer
DubboInvoker是异步的,而AsyncToSyncInvoker会封装为同步的
2、protocolBindingRefer
org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol#protocolBindingRefer
通过getClients(url)方法获取ExchangeClient客户端。这里是个数组形式, 在DubboInvoker发送请求时会轮询clients去发送数据,如果有多个的话。ExchangeClient 实际上并不具备通信能力,它需要基于更底层的客户端实例进行通信例如NettyClient
3、getClients
org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol#getClients
connections表示建立几个socket连接,在DubboProtocol中,每构造一个Client就会去和Server建立一个Socket连接
如果connections为0,这时会去获取shareconnections参数,默认为1。默认情况下,使用共享客户端实例。如果不使用共享客户端会调用initClient方法,getSharedClient内部也会调用initClient
4、getSharedClient
org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol#getSharedClient
5、buildReferenceCountExchangeClientList
org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol#buildReferenceCountExchangeClientList
6、buildReferenceCountExchangeClient
org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol#buildReferenceCountExchangeClient
initClient生成一个ExchangeClient,并用ReferenceCountExchangeClient包装。ReferenceCountExchangeClient是引用计数功能的 ExchangeClient
7、initClient
org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol#initClient
从url得到client值,默认为netty。添加编解码和心跳包参数到 url 中,编码方式是DubboCodec.NAME、心跳heartbeat值是60 * 1000ms。并检测客户端类型是否存在,不存在则抛出异常。
获取 lazy 配置,并根据配置值决定创建的客户端类型,LazyConnectExchangeClient
Exchangers建立连接,把url和requestHandler传入。requestHandler是DubboProtocol的内部类
8、connect
org.apache.dubbo.remoting.exchange.Exchangers#connect
得到一个HeaderExchanger(Exchanger的SPI默认是header)去connect
9、connect
org.apache.dubbo.remoting.exchange.support.header.HeaderExchanger#connect
这里进行以下调用,分别如下:
创建HeaderExchangeHandler 对象、 创建 DecodeHandler 对象、通过 Transporters 构建 Client 实例、 创建 HeaderExchangeClient 对象。
用NettyTransporter去connect。DecodeHandler解码,是把InputStream解析成AppResponse对象
10、connect
org.apache.dubbo.remoting.Transporters#connect
11、connect
org.apache.dubbo.remoting.transport.netty4.NettyTransporter#connect
12、NettyClient
org.apache.dubbo.remoting.transport.netty4.NettyClient#NettyClient
初始化和开始netty
13、AbstractClient
org.apache.dubbo.remoting.transport.AbstractClient#AbstractClient
14、doOpen
org.apache.dubbo.remoting.transport.netty4.NettyClient#doOpen
netty启动
15、connect
org.apache.dubbo.remoting.transport.AbstractClient#connect
16、doConnect
org.apache.dubbo.remoting.transport.netty4.NettyClient#doConnect
所以得到的DubboInvoker结构是
MockClusterInvoker
RegistryDirectory
Collections$UnmodifiableRandomAccessList
routerChain
MockInvokersSelector
TagRouter
AppRouter
ServiceRouter
Collections$UnmodifiableRandomAccessList
RegistryDirectory$InvokerDelegate
ListenerInvokerWrapper
ProtocolFilterWrapper$CallbackRegistrationInvoker
ProtocolFilterWrapper$1
AsyncToSyncInvoker
DubboInvoker
DubboInvoker
ExchangeClient[1]
ReferenceCountExchangeClient
HeaderExchangeClient
NettyClient
创建代理
PROXY_FACTORY.getProxy(invoker)
1、getProxy
org.apache.dubbo.rpc.proxy.wrapper.StubProxyFactoryWrapper#getProxy(org.apache.dubbo.rpc.Invoker<T>)
2、getProxy
org.apache.dubbo.rpc.proxy.AbstractProxyFactory#getProxy(org.apache.dubbo.rpc.Invoker<T>)
3、getProxy
org.apache.dubbo.rpc.proxy.AbstractProxyFactory#getProxy(org.apache.dubbo.rpc.Invoker<T>, boolean)
4、getProxy
org.apache.dubbo.rpc.proxy.javassist.JavassistProxyFactory#getProxy
invoker通过InvokerInvocationHandler包装 最终生成的代理类
这里也是调用的开始,通过sayHello方法会执行InvocationHandler handler的invoke方法
package org.apache.dubbo.common.bytecode;
public class proxy0 implements org.apache.dubbo.demo.DemoService {
public static java.lang.reflect.Method[] methods;
private java.lang.reflect.InvocationHandler handler;
public proxy0() {
}
public proxy0(java.lang.reflect.InvocationHandler arg0) {
handler = $1;
}
public java.lang.String sayHello(java.lang.String arg0) {
Object[] args = new Object[1];
args[0] = ($w) $1;
Object ret = handler.invoke(this, methods[0], args);
return (java.lang.String) ret;
}
}
总结
本片主要分析了dubbo消费端的从注册中心引入的过程。需要明白一点,引入是为调用做准备的。
引入层级较为多,其核心思想就是Invoker的生成。最终ref引入的是Invoker经过InvokerInvocationHandler包装的代理类,通过Javassist生成。
其中,Invoker是通过Protocol.ref生成。具体Protocol通过SPI原理实现,首先经过RegistryProtocol#refer、再经过DubboProtocol#refer方法。
在RegistryProtocol中主要是生成动态服务目录RegistryDirectory,并为RegistryDirectory设置registry、protocol、消费url等,并设置路由链。由于服务目录实现了NotifyListener,是个监听器。需要对新老版本的服务动态配置路径及服务配置路径、路由等的监听。动态服务目录RegistryDirectory订阅后,会直接触发一次,这次会调用到DubboProtocol的refer方法。最终得到的RegistryDirectory需要cluster进行包装。
在DubboProtocol引入中,主要是对客户端实例的获取。涉及到Exchanger 、传输协议Transporter等SPI,最终会调用NettyClient创建netty(默认)客户端,并进行连接。对DubboInvoker包装主要是AsyncToSyncInvoker