背景
服务暴露网上已经有很多文章了,大而全,我们这里主要抓细节😄。
疑问
暴露过程做了些啥?
是先启动服务还是先连接注册中心?
服务下线怎么感知注册中心?
暴露
我们从 org.apache.dubbo.config.ServiceConfig#doExportUrls() 方法进去
/**
 * Exports this service once for every configured protocol.
 *
 * <p>More than one {@code <dubbo:protocol>} element may be configured, e.g.
 * {@code <dubbo:protocol name="dubbo" port="20880"/>} together with
 * {@code <dubbo:protocol name="rest" port="20881"/>}, so that the same service can be
 * offered to different kinds of clients (for instance a PHP client and a Dubbo client)
 * at the same time.
 */
private void doExportUrls() {
    // The registry URLs are loaded once and handed to every per-protocol export.
    List<URL> registries = loadRegistries(true);
    for (ProtocolConfig config : protocols) {
        doExportUrlsFor1Protocol(config, registries);
    }
}
进入 doExportUrlsFor1Protocol()中,这个方法大家一定要进去瞅一眼,和我们写的代码也差不多,方法长度太长,而且循环嵌套很深。
//org.apache.dubbo.config.ServiceConfig#doExportUrlsFor1Protocol
/**
 * Exports this service for one protocol configuration.
 *
 * <p>Steps, in order: collect all configuration into a parameter map, build the provider
 * URL from it, export to the local JVM unless the scope is remote, export remotely unless
 * the scope is local (once per registry URL, or directly when no registry is configured),
 * publish the URL to the metadata report service when one is available, and finally record
 * the URL in {@code urls}. Nothing is exported when the scope is "none".
 *
 * @param protocolConfig the protocol to export with; an empty name falls back to DUBBO
 * @param registryURLs   the registry URLs to export through; may be empty
 */
private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
String name = protocolConfig.getName();
if (StringUtils.isEmpty(name)) {// no protocol name configured: default to dubbo
name = DUBBO;
}
Map<String, String> map = new HashMap<String, String>();
map.put(SIDE_KEY, PROVIDER_SIDE);
// put every piece of configuration into the map that becomes the URL's key=value parameters
appendRuntimeParameters(map);
appendParameters(map, metrics);
appendParameters(map, application);
appendParameters(map, module);
appendParameters(map, provider);
appendParameters(map, protocolConfig);
appendParameters(map, this);
if (ProtocolUtils.isGeneric(generic)) {// generic service: every method name is accepted
map.put(GENERIC_KEY, generic);
map.put(METHODS_KEY, ANY_VALUE);
} else {// regular service: record revision and the interface's method names
String revision = Version.getVersion(interfaceClass, version);
if (revision != null && revision.length() > 0) {
map.put(REVISION_KEY, revision);
}
String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames();
if (methods.length == 0) {
logger.warn("No method found in service interface " + interfaceClass.getName());
map.put(METHODS_KEY, ANY_VALUE);
} else {
// the HashSet removes duplicate (overloaded) method names before joining
map.put(METHODS_KEY, StringUtils.join(new HashSet<String>(Arrays.asList(methods)), ","));
}
}
// token: dubbo supports token verification; a "default" token is replaced by a random UUID
if (!ConfigUtils.isEmpty(token)) {
if (ConfigUtils.isDefault(token)) {
map.put(TOKEN_KEY, UUID.randomUUID().toString());
} else {
map.put(TOKEN_KEY, token);
}
}
String host = this.findConfigedHosts(protocolConfig, registryURLs, map);
Integer port = this.findConfigedPorts(protocolConfig, name, map);
// the provider URL: protocol name, host, port, (context path + "/" +) path, and all parameters
URL url = new URL(name, host, port, getContextPath(protocolConfig).map(p -> p + "/" + path).orElse(path), map);
// let a ConfiguratorFactory registered under this protocol name adjust the URL, if one exists
if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
.hasExtension(url.getProtocol())) {
url = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
.getExtension(url.getProtocol()).getConfigurator(url).configure(url);
}
String scope = url.getParameter(SCOPE_KEY);
// don't export when none is configured
if (!SCOPE_NONE.equalsIgnoreCase(scope)) {
if (!SCOPE_REMOTE.equalsIgnoreCase(scope)) {
exportLocal(url);// export to the local JVM first
}
// export to remote if the config is not local (export to local only when config is local)
if (!SCOPE_LOCAL.equalsIgnoreCase(scope)) {
if (CollectionUtils.isNotEmpty(registryURLs)) {
// several registries are supported as well, e.g. export the service inside the cluster
// and also to a shared platform used by other business lines
for (URL registryURL : registryURLs) {
//if protocol is only injvm ,not register
if (LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) {
continue;
}
url = url.addParameterIfAbsent(DYNAMIC_KEY, registryURL.getParameter(DYNAMIC_KEY));
// load the monitor configuration
URL monitorUrl = loadMonitor(registryURL);
if (monitorUrl != null) {
url = url.addParameterAndEncoded(MONITOR_KEY, monitorUrl.toFullString());
}
// proxy implementation used to invoke the target bean; copied onto the registry URL when set
String proxy = url.getParameter(PROXY_KEY);
if (StringUtils.isNotEmpty(proxy)) {
registryURL = registryURL.addParameter(PROXY_KEY, proxy);
}
// build the invoker; the provider URL travels as the encoded EXPORT_KEY parameter of the registry URL
Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString()));
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
// export the service
Exporter<?> exporter = protocol.export(wrapperInvoker);
exporters.add(exporter);
}
} else {
// no registry configured: export with the provider URL itself
Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, url);
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
Exporter<?> exporter = protocol.export(wrapperInvoker);
exporters.add(exporter);
}
// store the publication info in the metadata report service, when one is available
MetadataReportService metadataReportService = null;
if ((metadataReportService = getMetadataReportService()) != null) {
metadataReportService.publishProvider(url);
}
}
}
this.urls.add(url);
}
本地暴露 exportLocal(url)
//org.apache.dubbo.config.ServiceConfig#exportLocal
/**
 * Exports the service inside the local JVM.
 *
 * <p>A copy of {@code url} is built with the protocol switched to {@code LOCAL_PROTOCOL},
 * the host set to {@code LOCALHOST_VALUE} and the port set to 0; the resulting exporter is
 * appended to {@code exporters}.
 *
 * @param url the provider URL to derive the local URL from
 */
private void exportLocal(URL url) {
    // The protocol is set manually so that the adaptive protocol.export(...) call below
    // can select the matching Protocol implementation.
    URL injvmUrl = URLBuilder.from(url)
            .setProtocol(LOCAL_PROTOCOL)
            .setHost(LOCALHOST_VALUE)
            .setPort(0)
            .build();
    Invoker<?> localInvoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, injvmUrl);
    Exporter<?> localExporter = protocol.export(localInvoker);
    exporters.add(localExporter);
}
// Adaptive extension of the Protocol interface, obtained from its ExtensionLoader.
static Protocol protocol = ExtensionLoader
.getExtensionLoader(Protocol.class).getAdaptiveExtension();
// Adaptive extension of the ProxyFactory interface, obtained from its ExtensionLoader.
static ProxyFactory PROXY_FACTORY = ExtensionLoader
.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension();
protocol 静态变量为 Protocol 接口的自适应扩展点,调用 protocol.export(Invoker<T> invoker) 时会根据传入的 invoker 信息(其 URL 的协议)决定去往哪个实现类。而 invoker 传入的值为
PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, local)。PROXY_FACTORY 静态变量也是 ProxyFactory 的自适应扩展点,从下面可以看到 @Adaptive 标注在方法上(方法级扩展点);这里我们并没有自定义过 proxy 属性,所以默认实现为 javassist=JavassistProxyFactory(这里忽略各种包装器)。
// Extension point for creating invokers; "javassist" is the name given to @SPI.
// NOTE(review): presumably that name is the default extension — confirm against the Dubbo SPI docs.
@SPI("javassist")
public interface ProxyFactory {
// Adaptive method keyed on "proxy".
// NOTE(review): presumably the implementation is picked from the URL's "proxy" parameter — confirm against the Dubbo SPI docs.
@Adaptive({"proxy"})
<T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) ;
}
进到JavassistProxyFactory 的 getInvoker实现中。
//org.apache.dubbo.rpc.proxy.javassist.JavassistProxyFactory#getInvoker
/**
 * Wraps a service implementation into an {@code Invoker}.
 *
 * @param proxy the real service implementation (e.g. a HelloServiceImpl instance)
 * @param type  the service interface
 * @param url   the URL the invoker is created for
 * @return an {@code AbstractProxyInvoker} whose {@code doInvoke} delegates to a {@code Wrapper}
 */
public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
    // A '$' in the class name is taken to mean "this is itself a generated/proxy class";
    // in that case only the interface type is wrapped, otherwise the implementation class.
    final Class<?> targetClass;
    if (proxy.getClass().getName().indexOf('$') < 0) {
        targetClass = proxy.getClass();
    } else {
        targetClass = type;
    }
    // Wrapper for the chosen class (per the article, generated with javassist).
    final Wrapper wrapper = Wrapper.getWrapper(targetClass);
    // Anonymous AbstractProxyInvoker (an Invoker) that captures the wrapper.
    return new AbstractProxyInvoker<T>(proxy, type, url) {
        @Override
        protected Object doInvoke(T target, String method,
                                  Class<?>[] paramTypes,
                                  Object[] args) throws Throwable {
            return wrapper.invokeMethod(target, method, paramTypes, args);
        }
    };
}
上面这种匿名的写法可能不够具体,我们通过自定义类的方式去实现它,更具象点
//org.apache.dubbo.rpc.proxy.javassist.JavassistProxyFactory#getInvoker
/**
 * Same as the anonymous-class version of getInvoker, but returning a named invoker class
 * so the wrapping chain is easier to describe.
 */
public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
    final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
    return new MyProxyInvoker<>(proxy, type, url, wrapper);
}

/**
 * Named equivalent of the anonymous {@code AbstractProxyInvoker} subclass: every invocation
 * is forwarded to the {@code Wrapper} passed to the constructor.
 *
 * <p>The class is generic so that it lines up with {@code AbstractProxyInvoker<T>} and
 * {@code Invoker<T>} without raw types or unchecked conversions.
 *
 * @param <T> the service interface type
 */
public class MyProxyInvoker<T> extends AbstractProxyInvoker<T> {
    /** Wrapper that performs the actual method dispatch; never reassigned. */
    private final Wrapper wrapper;

    /**
     * @param proxy   the service implementation to invoke
     * @param type    the service interface
     * @param url     the URL this invoker is created for
     * @param wrapper the wrapper used to dispatch method calls on {@code proxy}
     */
    public MyProxyInvoker(T proxy, Class<T> type, URL url, Wrapper wrapper) {
        super(proxy, type, url);
        this.wrapper = wrapper;
    }

    @Override
    protected Object doInvoke(T proxy, String methodName, Class<?>[] parameterTypes, Object[] arguments) throws Throwable {
        return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
    }
}
这样写的效果是一样的 JavassistProxyFactory#getInvoker()方法返回的是 MyProxyInvoker 对象,后面我们就用该对象来描述分析。
回到 Exporter<?> exporter = protocol.export(
PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, local));中,表达式变成了Exporter<?> exporter = protocol.export(MyProxyInvoker),MyProxyInvoker中的url对象为local
// The local URL built in exportLocal, shown here with the protocol written out as "injvm".
URL local = URLBuilder.from(url)
.setProtocol("injvm")
.setHost(LOCALHOST_VALUE)
.setPort(0)
.build();
所以protocol.export()的实现类为InjvmProtocol
/**
 * Exports the invoker inside the local JVM.
 *
 * @param invoker the invoker to export
 * @return an {@code InjvmExporter} created with the invoker, the service key of its URL
 *         and this protocol's {@code exporterMap}
 */
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
    final String serviceKey = invoker.getUrl().getServiceKey();
    return new InjvmExporter<T>(invoker, serviceKey, exporterMap);
}
该方法返回 InjvmExporter,最后执行 exporters.add(exporter),将InjvmExporter(这里其实外面会包装一层ListenerExporterWrapper包装器) 对象暴露到map中结束了jvm本地暴露。
远程暴露
我们再来看看远程暴露的区别
// as in the local export, this returns a MyProxyInvoker instance
Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString()));
// unlike the local export, the MyProxyInvoker instance is wrapped in a DelegateProviderMetaDataInvoker
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
// same call as in the local export; the difference is that the protocol of wrapperInvoker's url is not injvm
Exporter<?> exporter = protocol.export(wrapperInvoker);
exporters.add(exporter);
我们 debug 看看 wrapperInvoker 中的 url#protocol 是啥
Protocol 为registry,所以流程会进入到 RegistryProtocol#export(同样这里也会有Wrapper包装)我们debug进去,这个方法内容太丰富了,这里我们先只分析服务暴露
/**
 * Registry-protocol export (excerpt: the part after the local export is elided).
 *
 * <p>Resolves the registry URL and the provider URL from the invoker, registers an
 * OverrideListener for the provider's override-subscribe URL, applies it to the provider
 * URL, and then exports the service through doLocalExport.
 */
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
URL registryUrl = getRegistryUrl(originInvoker);
// the URL that is to be exported to the registry
URL providerUrl = getProviderUrl(originInvoker);
final URL overrideSubscribeUrl = getSubscribedOverrideUrl(providerUrl);
final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
providerUrl = overrideUrlWithConfig(providerUrl, overrideSubscribeListener);
// export the service
final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl);
//...
return new DestroyableExporter<>(exporter);
}
暴露服务 doLocalExport()
//org.apache.dubbo.registry.integration.RegistryProtocol#doLocalExport
/**
 * Exports the invoker with its provider URL, at most once per cache key.
 *
 * @param originInvoker the invoker handed to the registry protocol
 * @param providerUrl   the provider URL to export with (e.g. dubbo://...)
 * @return the cached or newly created exporter wrapper
 */
private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker, URL providerUrl) {
    // A key derived from the provider URL prevents exporting the same service twice.
    final String cacheKey = getCacheKey(originInvoker);
    return (ExporterChangeableWrapper<T>) bounds.computeIfAbsent(cacheKey, ignoredKey -> {
        // Wrap the invoker once more so that it carries the provider URL.
        Invoker<?> invokerDelegate = new InvokerDelegate<>(originInvoker, providerUrl);
        // Per the article, protocol.export passes through the protocol wrappers and ends up
        // in DubboProtocol#export.
        Exporter<T> exported = (Exporter<T>) protocol.export(invokerDelegate);
        return new ExporterChangeableWrapper<>(exported, originInvoker);
    });
}
/**
 * Builds the cache key for an invoker: the full string of its provider URL with the
 * "dynamic" and "enabled" parameters removed.
 */
private String getCacheKey(final Invoker<?> originInvoker) {
    return getProviderUrl(originInvoker)
            .removeParameters("dynamic", "enabled")
            .toFullString();
}
这里originInvoker为 DelegateProviderMetaDataInvoker(MyProxyInvoker(HelloServiceImpl@xx)),invokerDelegate再次包装为InvokerDelegate(DelegateProviderMetaDataInvoker(MyProxyInvoker(HelloServiceImpl@xx))),我们继续debug,到了ProtocolFilterWrapper#export
/**
 * Exports the invoker through the wrapped protocol. Invokers whose URL uses the registry
 * protocol are passed on untouched; all others are first wrapped in the provider-side
 * filter chain.
 */
@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
    final boolean registryExport = REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol());
    final Invoker<T> target = registryExport
            ? invoker
            : buildInvokerChain(invoker, SERVICE_FILTER_KEY, CommonConstants.PROVIDER);
    return protocol.export(target);
}
private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
Invoker<T> last = invoker;
List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
if (!filters.isEmpty()) {
for (int i = filters.size() - 1; i >= 0; i--) {
//.....
};
}
}
return new CallbackRegistrationInvoker<>(last, filters);
}
buildInvokerChain()会将InvokerDelegate关联多个Filter过滤器,然后包装为CallbackRegistrationInvoker对象返回,我们接着debug,最后到了DubboProtocol#export(),此时的invoker为CallbackRegistrationInvoker(InvokerDelegate(DelegateProviderMetaDataInvoker(MyProxyInvoker(HelloServiceImpl@xx))))
//org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol#export
/**
 * Exports the invoker over the dubbo protocol: registers a DubboExporter under the
 * service key, records stub-event methods when configured, opens the server for the URL
 * and sets up serialization optimization.
 *
 * @param invoker the invoker to export (at this point, per the article, a CallbackRegistrationInvoker)
 * @return the exporter stored in {@code exporterMap}
 */
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
URL url = invoker.getUrl();
// build the service key, e.g. com.poizon.study.api.service.HelloService:20880; it does not depend on the method
String key = serviceKey(url);
// wrap the invoker in a DubboExporter and store it in the map;
// this map matters: it is the entry point for locating the service when a call comes in later
DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
exporterMap.put(key, exporter);
//export an stub service for dispatching event
Boolean isStubSupportEvent = url.getParameter(STUB_EVENT_KEY, DEFAULT_STUB_EVENT);
Boolean isCallbackservice = url.getParameter(IS_CALLBACK_SERVICE, false);
if (isStubSupportEvent && !isCallbackservice) {
String stubServiceMethods = url.getParameter(STUB_EVENT_METHODS_KEY);
if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
// stub events requested but no stub methods configured: only warn, keep exporting
if (logger.isWarnEnabled()) {
logger.warn(new IllegalStateException("consumer [" + url.getParameter(INTERFACE_KEY) +
"], has set stubproxy support event ,but no stub methods founded."));
}
} else {
stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
}
}
// start the server, i.e. have netty listen on the service port (20880 in this example)
openServer(url);
// load the configured serialization (per the article, hessian2 by default)
optimizeSerialization(url);
return exporter;
}
//org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol#openServer
// Opens the server for the URL (excerpt: the code that derives `key` is elided).
private void openServer(URL url) {
//..... createServer() creates the server, which is then cached under `key`
serverMap.put(key, createServer(url));
}
//org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol#createServer
/**
 * Binds an exchange server for the URL using this protocol's {@code requestHandler}.
 *
 * @param url the URL to bind
 * @return the bound server
 * @throws RpcException when binding fails; the RemotingException is kept as the cause
 */
private ExchangeServer createServer(URL url) {
    try {
        return Exchangers.bind(url, requestHandler);
    } catch (RemotingException e) {
        throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
    }
}
//org.apache.dubbo.remoting.exchange.support.header.HeaderExchanger#bind
/**
 * Binds a server for the URL. The handler is wrapped in a {@code HeaderExchangeHandler}
 * and then a {@code DecodeHandler} before being handed to {@code Transporters.bind}; the
 * result is wrapped in a {@code HeaderExchangeServer}.
 */
public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
    HeaderExchangeHandler exchangeHandler = new HeaderExchangeHandler(handler);
    DecodeHandler decodeHandler = new DecodeHandler(exchangeHandler);
    return new HeaderExchangeServer(Transporters.bind(url, decodeHandler));
}
//org....remoting.Transporters#bind(URL, ChannelHandler...)
/**
 * Binds a server for the URL through the transporter returned by {@code getTransporter()}.
 *
 * @param url      the URL to bind
 * @param handlers one handler is used as is; any other number of handlers is combined
 *                 into a {@code ChannelHandlerDispatcher}
 */
public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException {
    final ChannelHandler handler = handlers.length == 1
            ? handlers[0]
            : new ChannelHandlerDispatcher(handlers);
    // Per the article, the transporter resolves to the netty4 implementation by default.
    return getTransporter().bind(url, handler);
}
//org.apache.dubbo.remoting.transport.netty4.NettyTransporter#connect
// Creates a NettyClient for the URL and listener.
// NOTE(review): this is the client-side entry point. Transporters#bind calls
// getTransporter().bind(url, handler), so the export path presumably goes through the
// transporter's bind(...) method rather than connect(...) — confirm and quote that one instead.
public Client connect(URL url, ChannelHandler listener) throws RemotingException {
return new NettyClient(url, listener);
}
//org.apache.dubbo.remoting.transport.AbstractClient#AbstractClient
public AbstractClient(URL url, ChannelHandler handler) throws RemotingException {
super(url, handler);
needReconnect = url.getParameter(Constants.SEND_RECONNECT_KEY, false);
try {
doOpen();
//org.apache.dubbo.remoting.transport.netty4.NettyClient#doOpen
protected void doOpen() throws Throwable {
final NettyClientHandler nettyClientHandler = new NettyClientHandler(getUrl(), this);
bootstrap = new Bootstrap();
bootstrap.group(nioEventLoopGroup)
.option(ChannelOption.SO_KEEPALIVE, true)
.option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.channel(NioSocketChannel.class);
if (getConnectTimeout() < 3000) {
bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000);
} else {
bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, getConnectTimeout());
}
bootstrap.handler(new ChannelInitializer() {
@Override
protected void initChannel(Channel ch) throws Exception {
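跟到最后看到了熟悉的 netty 启动,这里有好多我们熟悉的配置,比如第一篇文章我们说到的心跳实现 IdleStateHandler,以及心跳默认时间 UrlUtils.getHeartbeat(getUrl()),还有 netty 的自定义 handler nettyClientHandler(没错,这个 handler 就是处理 dubbo 消费者请求的)。需要注意:上面贴出的 NettyTransporter#connect / AbstractClient / NettyClient#doOpen 是客户端(消费者)一侧的代码;服务暴露时 Transporters#bind 调用的是 getTransporter().bind(url, handler),实际走的是与之对称的服务端路径 NettyTransporter#bind → NettyServer#doOpen(使用 ServerBootstrap,对应的自定义 handler 为 NettyServerHandler),整体结构与这里的客户端代码类似。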
总结
总结下,我们一根线走到底,走到了最后的 socket 启动,并将 DubboExporter 放入了 map 中,最终层层包装为 DestroyableExporter(ExporterChangeableWrapper(ListenerExporterWrapper(DubboExporter(CallbackRegistrationInvoker(InvokerDelegate(DelegateProviderMetaDataInvoker(MyProxyInvoker(HelloServiceImpl@xx))))))));嵌套虽然多了点,但是这些 Wrapper 类都是为了扩展各自的小功能,后面我们挑几个分析。
后面将分析注册中心和Wrapper 等功能。