NettyClient
dubbo的SOA架构分为Consumer和Provider,Consumer需要通过Netty(默认)把请求信息发送到远程Provider,Provider处理完后需要通过Netty把结果返回给Consumer,Consumer接收到结果返回;这个处理流程的核心实现在NettyClient中,如下图所示,接下来重点分析Consumer如何使用Netty的:
Protocol
协议接口定义如下,所以默认是dubbo协议,当Consumer调用Provider时,通过refer()
得到Invoker;
@SPI("dubbo")
public interface Protocol {
... ...
/**
* 引用远程服务:<br>
* 1. 当用户调用refer()所返回的Invoker对象的invoke()方法时,协议需相应执行同URL远端export()传入的Invoker对象的invoke()方法。<br>
* 2. refer()返回的Invoker由协议实现,协议通常需要在此Invoker中发送远程请求。<br>
* 3. 当url中有设置check=false时,连接失败不能抛出异常,并内部自动恢复。<br>
*
* @param <T> 服务的类型
* @param type 服务的类型
* @param url 远程服务的URL地址
* @return invoker 服务的本地代理
* @throws RpcException 当连接服务提供方失败时抛出
*/
@Adaptive
<T> Invoker<T> refer(Class<T> type, URL url) throws RpcException;
... ...
}
DubboProtocol
DubboProtocol中refer()方法源码如下:
public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException {
// create rpc invoker. 构建DubboInvoker的核心是获取ExchangeClient
DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
invokers.add(invoker);
return invoker;
}
private ExchangeClient[] getClients(URL url){
//是否共享连接
boolean service_share_connect = false;
// 如果<dubbo:reference/>中配置了connections,且值大于1,那么表示不共享连接,默认connections=0,即共享连接
int connections = url.getParameter(Constants.CONNECTIONS_KEY, 0);
//如果connections不配置,则共享连接,否则每服务每连接
if (connections == 0){
service_share_connect = true;
connections = 1;
}
// connections值决定ExchangeClient的数量,共享连接只有一个ExchangeClient
ExchangeClient[] clients = new ExchangeClient[connections];
for (int i = 0; i < clients.length; i++) {
if (service_share_connect){
// getSharedClient(url)会调用initClient(url),所以是否共享连接的区别就在方法getSharedClient(url)中
clients[i] = getSharedClient(url);
} else {
clients[i] = initClient(url);
}
}
return clients;
}
/**
*获取共享连接
*/
private ExchangeClient getSharedClient(URL url){
// 根据访问的URL得到key,例如192.168.0.1:20880
String key = url.getAddress();
// 查看map中是否已经缓存过
ReferenceCountExchangeClient client = referenceClientMap.get(key);
if ( client != null ){
// 如果缓存过且该Client没有关闭,那么Client中refenceCount值+1,然后返回这个Client
if ( !client.isClosed()){
client.incrementAndGetCount();
return client;
} else {
// logger.warn(new IllegalStateException("client is closed,but stay in clientmap .client :"+ client));
// 如果Client已经关闭,那么清理掉map中的缓存;
referenceClientMap.remove(key);
}
}
// 如果map中没有缓存Client,调用initClient()进行初始化
ExchangeClient exchagneclient = initClient(url);
client = new ReferenceCountExchangeClient(exchagneclient, ghostClientMap);
// 将刚才初始化的Client缓存起来,方便下次获取
referenceClientMap.put(key, client);
ghostClientMap.remove(key);
return client;
}
/**
* 创建新连接.
*/
private ExchangeClient initClient(URL url) {
... ...
ExchangeClient client ;
try {
//设置连接应该是lazy(在<dubbo:reference/>中设置lazy="true",默认为false)
if (url.getParameter(Constants.LAZY_CONNECT_KEY, false)){
client = new LazyConnectExchangeClient(url ,requestHandler);
} else {
// 这里为默认创建Client的地方
client = Exchangers.connect(url ,requestHandler);
}
} catch (RemotingException e) {
throw new RpcException("Fail to create remoting client for service(" + url
+ "): " + e.getMessage(), e);
}
return client;
}
Exchangers
public static ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
... ...
url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
return getExchanger(url).connect(url, handler);
}
public static Exchanger getExchanger(URL url) {
// Constants.DEFAULT_EXCHANGER的值为header,所以默认Exchanger实现为HeaderExchanger
String type = url.getParameter(Constants.EXCHANGER_KEY, Constants.DEFAULT_EXCHANGER);
return getExchanger(type);
}
HeaderExchanger
public class HeaderExchanger implements Exchanger {
public static final String NAME = "header";
public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
}
public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
}
}
Transporters
public static Client connect(URL url, ChannelHandler... handlers) throws RemotingException {
if (url == null) {
throw new IllegalArgumentException("url == null");
}
ChannelHandler handler;
if (handlers == null || handlers.length == 0) {
handler = new ChannelHandlerAdapter();
} else if (handlers.length == 1) {
handler = handlers[0];
} else {
handler = new ChannelHandlerDispatcher(handlers);
}
return getTransporter().connect(url, handler);
}
public static Transporter getTransporter() {
// 根据Transporter注解SPI的值可知,Transporter默认是NettyTransporter
return ExtensionLoader.getExtensionLoader(Transporter.class).getAdaptiveExtension();
}
// Transporter注解SPI的值为netty
@SPI("netty")
public interface Transporter {
... ...
}
NettyTransporter
public class NettyTransporter implements Transporter {
public static final String NAME = "netty";
public Server bind(URL url, ChannelHandler listener) throws RemotingException {
return new NettyServer(url, listener);
}
public Client connect(URL url, ChannelHandler listener) throws RemotingException {
return new NettyClient(url, listener);
}
}
NettyClient
@Override
protected void doOpen() throws Throwable {
NettyHelper.setNettyLoggerFactory();
bootstrap = new ClientBootstrap(channelFactory);
// 开启Netty服务的一些config
// @see org.jboss.netty.channel.socket.SocketChannelConfig
bootstrap.setOption("keepAlive", true);
bootstrap.setOption("tcpNoDelay", true);
bootstrap.setOption("connectTimeoutMillis", getTimeout());
final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
public ChannelPipeline getPipeline() {
NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this);
ChannelPipeline pipeline = Channels.pipeline();
// 设置编码&解码
pipeline.addLast("decoder", adapter.getDecoder());
pipeline.addLast("encoder", adapter.getEncoder());
// 设置Netty处理消息的Handler为NettyHandler
pipeline.addLast("handler", nettyHandler);
return pipeline;
}
});
}
NettyServer
Consumer需要通过Netty(默认)把请求信息发送到远程Provider,Provider也是通过Netty接收并处理请求;这个处理流程的核心实现在NettyServer中;在下文中分析;