dubbo中提供了Protocol接口,我们看下源码
@SPI("dubbo") //默认的实现是DubboProtocol来实现rpc
public interface Protocol {
int getDefaultPort(); //如果用户没有指定,那么此协议默认使用的端口号 dubbo默认是20880
@Adaptive //负责将其创建的invoker通过某种协议暴露出去
<T> Exporter<T> export(Invoker<T> invoker) throws RpcException;
@Adaptive //根据type(就是我们的dubbo接口类)url(远程provider地址)封装成
//一个invoker
<T> Invoker<T> refer(Class<T> type, URL url) throws RpcException;
//销毁,其实就是销毁其创建的invoker的关联的资源
void destroy();
}
通过上面的接口类我们可以看到dubbo 通过refer封装了rpc调用成一个Invoker,而Invoker是dubbo里面的核心类。
而Invoker就是具体的执行类,当然要发起一个远程调用,在invoker的内部肯定应该是有个client角色的存在,不然如果将reqest发出去呢,在DubboInvoker内部确实有个 ExchangeClient[] clients内部属性。该属性是在构造方法里面初始化的,因为Invoker是由Protocol创建,所以我们看下DubboProtocol是如何的初始化其创建的DubboInvoker里面的ExchangeClient[] clients属性的。
在DubboProtocol的refer(也就是创建invoker的方法)方法里面源码如下
public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException {
optimizeSerialization(url);
// create rpc invoker.
DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
invokers.add(invoker);
return invoker;
}
如上,非常的简单,直接的就new了个DubboInvoker,而ExchangeClient[] clients就是通过getClients(url)方法创建的。
private ExchangeClient[] getClients(URL url) {
//是否共享connections
boolean service_share_connect = false;
int connections = url.getParameter(Constants.CONNECTIONS_KEY, 0);
//默认是共享的
if (connections == 0) {
service_share_connect = true;
connections = 1;
}
ExchangeClient[] clients = new ExchangeClient[connections];
for (int i = 0; i < clients.length; i++) {
if (service_share_connect) {
clients[i] = getSharedClient(url);
} else {
clients[i] = initClient(url);
}
}
return clients;
}
如上有两个方法getSharedClient和initClient,举例如下
Consumer 1---->Provider1
Consumer 2---->Provider1
如果共享连接,那么Consumer 1和Consumer 2使用同一个Client发起连接Provider1,如果不共享,那么Consumer 1和Consumer 2分别创建一个Client发起连接Provider1。
默认是共享模式。
我们看下代码
private ExchangeClient getSharedClient(URL url) {
String key = url.getAddress();
//根据key 地址(ip+port) value ReferenceCountExchangeClient缓存Client
ReferenceCountExchangeClient client = referenceClientMap.get(key);
if (client != null) {
if (!client.isClosed()) {
//引用计数
client.incrementAndGetCount();
return client;
} else {
referenceClientMap.remove(key);
}
}
locks.putIfAbsent(key, new Object());
synchronized (locks.get(key)) {
if (referenceClientMap.containsKey(key)) {
return referenceClientMap.get(key);
}
//在这里初始化Client
ExchangeClient exchangeClient = initClient(url);
client = new ReferenceCountExchangeClient(exchangeClient, ghostClientMap);
referenceClientMap.put(key, client);
ghostClientMap.remove(key);
locks.remove(key);
return client;
}
}
我们接着看下initClient方法
private ExchangeClient initClient(URL url) {
// 使用的client类型,默认是netty
String str = url.getParameter(Constants.CLIENT_KEY, url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_CLIENT));
//使用的编解码器
url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME);
// enable heartbeat by default
url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));
// BIO is not allowed since it has severe performance issue.
if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
throw new RpcException("Unsupported client type: " + str + "," +
" supported client type is " + StringUtils.join(ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions(), " "));
}
ExchangeClient client;
try {
// connection should be lazy 只有在真正的调用的调用的时候才创建这个client
if (url.getParameter(Constants.LAZY_CONNECT_KEY, false)) {
client = new LazyConnectExchangeClient(url, requestHandler);
} else {
//立即创建client
client = Exchangers.connect(url, requestHandler);
}
} catch (RemotingException e) {
throw new RpcException("Fail to create remoting client for service(" + url + "): " + e.getMessage(), e);
}
return client;
}
我们看下ExchangeClient在类中的继承关系,如下
等于说这个ExchangeClient兼具了channel与client的能力,channel是一个连接客户端和服务端的通道,client就是客户端。如果大家熟悉netty,按照netty的概念来理解也可以,虽然在dubbo里面dubbo为了兼容netty与mina等io框架对channl又进行了一次封装和抽象。