【Dubbo】服务发布原理

Provider启动流程

通过dubbo的启动日志分析dubbo的服务发布原理

<dubbo:service interface="com.alibaba.dubbo.demo.DemoService" ref="demoService" />
1. 暴露本地服务

[DUBBO] Export dubbo service com.alibaba.dubbo.demo.DemoService to local registry, dubbo version: 2.0.0, current host: 169.254.23.23

2. 暴露远程服务

[DUBBO] Export dubbo service com.alibaba.dubbo.demo.DemoService to url dubbo://169.254.23.23:20881/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&bind.ip=169.254.23.23&bind.port=20881&dubbo=2.0.0&generic=false&interface=com.alibaba.dubbo.demo.DemoService&....., dubbo version: 2.0.0, current host: 169.254.23.23

[DUBBO] Register dubbo service com.alibaba.dubbo.demo.DemoService url dubbo://169.254.23.23:20881/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&..... to registry registry://47.94.102.25:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-provider&dubbo=2.0.0&owner=wliliam&pid=10948&qos.port=22222&registry=zookeeper&timestamp=1527390445496, dubbo version: 2.0.0, current host: 169.254.23.23

3. 启动netty

[DUBBO] Start NettyServer bind /0.0.0.0:20881, export /169.254.23.23:20881, dubbo version: 2.0.0, current host: 169.254.23.23

4. 打开连接zk

INFO zookeeper.ClientCnxn: Opening socket connection to server 47.94.102.25/47.94.102.25:2181. Will not attempt to authenticate using SASL (unknown error)

5. 到zk注册

[DUBBO] Register: dubbo://169.254.23.23:20881/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&dubbo=2.0.0&generic=false&interface=com.alibaba.dubbo.demo.DemoService&loadbalance=roundrobin&methods=sayHello&owner=wliliam&pid=10948&side=provider&timestamp=1527390445523, dubbo version: 2.0.0, current host: 169.254.23.23

6. 监听zk的configurators节点

[DUBBO] Subscribe: provider://169.254.23.23:20881/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&category=configurators&check=false&dubbo=2.0.0&generic=false&interface=com.alibaba.dubbo.demo.DemoService&loadbalance=roundrobin&methods=sayHello&owner=wliliam&pid=10948&side=provider&timestamp=1527390445523, dubbo version: 2.0.0, current host: 169.254.23.23

image.png

重要概念

  • Protocol
    1.export:暴露远程服务(用于服务端),就是将proxyFactory.getInvoker创建的代理类 invoker对象,通过协议暴露给外部。
    2.refer:引用远程服务(用于客户端), 通过proxyFactory.getProxy来创建远程的动态代理类,例如DemoService的远程动态接口。
  • exporter:维护invoder的生命周期。
  • exchanger:信息交换层,封装请求响应模式,同步转异步。
  • transporter:网络传输层,用来抽象netty和mina的统一接口。


    image.png

<dubbo:service/>标签解析

我们发布一个服务使用的是类似这样的配置
<dubbo:service interface="com.alibaba.dubbo.demo.DemoService" ref="demoService" protocol="dubbo1" registry="aliZk"/>
这个自定义schema标签也是服务发布的入口。dubbo命名空间的解析类是DubboNamespaceHandler,在这个类中可以看到给service标签注册的标签解析类为new DubboBeanDefinitionParser(ServiceBean.class, true)

public class DubboNamespaceHandler extends NamespaceHandlerSupport {

    static {
        Version.checkDuplicate(DubboNamespaceHandler.class);
    }

    public void init() {
        registerBeanDefinitionParser("application", new DubboBeanDefinitionParser(ApplicationConfig.class, true));
        registerBeanDefinitionParser("module", new DubboBeanDefinitionParser(ModuleConfig.class, true));
        registerBeanDefinitionParser("registry", new DubboBeanDefinitionParser(RegistryConfig.class, true));
        registerBeanDefinitionParser("monitor", new DubboBeanDefinitionParser(MonitorConfig.class, true));
        registerBeanDefinitionParser("provider", new DubboBeanDefinitionParser(ProviderConfig.class, true));
        registerBeanDefinitionParser("consumer", new DubboBeanDefinitionParser(ConsumerConfig.class, true));
        registerBeanDefinitionParser("protocol", new DubboBeanDefinitionParser(ProtocolConfig.class, true));
        registerBeanDefinitionParser("service", new DubboBeanDefinitionParser(ServiceBean.class, true));
        registerBeanDefinitionParser("reference", new DubboBeanDefinitionParser(ReferenceBean.class, false));
        registerBeanDefinitionParser("annotation", new AnnotationBeanDefinitionParser());
    }

}
public class DubboBeanDefinitionParser implements BeanDefinitionParser {
    private final Class<?> beanClass;
    private final boolean required;
    public BeanDefinition parse(Element element, ParserContext parserContext) {
        return parse(element, parserContext, beanClass, required);
    }
    
    @SuppressWarnings("unchecked")
    private static BeanDefinition parse(Element element, ParserContext parserContext, Class<?> beanClass, boolean required) {
        RootBeanDefinition beanDefinition = new RootBeanDefinition();
        //指定生成bean的class
        beanDefinition.setBeanClass(beanClass);
        beanDefinition.setLazyInit(false);
        ....
    return beanDefinition;
}

解析service标签完成后会生成一个ServiceBean.class的实例并加入spring容器,由于ServiceBean.class实现了ApplicationListener接口,所以当spring容器加载完成后会回调ServiceBean的onApplicationEvent(...)方法,这就是服务暴露的起点。

package com.alibaba.dubbo.config.spring;
public class ServiceBean<T> extends ServiceConfig<T> implements InitializingBean, DisposableBean, ApplicationContextAware, ApplicationListener<ContextRefreshedEvent>, BeanNameAware {
    ......
    public void onApplicationEvent(ContextRefreshedEvent event) {
        if (isDelay() && !isExported() && !isUnexported()) {
            if (logger.isInfoEnabled()) {
                logger.info("The service ready on spring started. service: " + getInterface());
            }
            export();
        }
    }
    ......
}

服务发布

ServiceBean继承自ServiceConfig,ServiceConfig实际就是存储该service的配置


image.png

代码执行流程

ServiceBean.onApplicationEvent
-->export
  -->ServiceConfig.export
    -->doExport
      -->doExportUrls//里面有一个for循环,代表一个服务可以有多个通信协议,例如tcp http协议,默认是tcp协议
        -->loadRegistries(true) //从dubbo.properties里面组装registry的url信息
        -->doExportUrlsFor1Protocol
#com.alibaba.dubbo.config.ServiceConfig#doExportUrls
 private void doExportUrls() {
//读取配置中注册中心的配置信息
        List<URL> registryURLs = loadRegistries(true);
        //一个服务可能有多个通信协议,例如tcp协议和http协议,默认是tcp协议
        //<dubbo:protocol name="dubbo" threads="5" port="20880" dispather="execution" threadpool="cached" dispatcher="execution" id="dubbo" />
        for (ProtocolConfig protocolConfig : protocols) {
            doExportUrlsFor1Protocol(protocolConfig, registryURLs);
        }
    }
    private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
...
  //配置为none不暴露
        if (! Constants.SCOPE_NONE.toString().equalsIgnoreCase(scope)) {

            //配置不是remote的情况下做本地暴露 (配置为remote,则表示只暴露远程服务)
            if (!Constants.SCOPE_REMOTE.toString().equalsIgnoreCase(scope)) {
                //本地服务暴露
                exportLocal(url);
            }
            //如果配置不是local则暴露为远程服务.(配置为local,则表示只暴露本地服务)
            if (! Constants.SCOPE_LOCAL.toString().equalsIgnoreCase(scope) ){
                if (logger.isInfoEnabled()) {
                    logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
                }
                if (registryURLs != null && registryURLs.size() > 0
                        && url.getParameter("register", true)) {
                    for (URL registryURL : registryURLs) {
                        url = url.addParameterIfAbsent("dynamic", registryURL.getParameter("dynamic"));
                        URL monitorUrl = loadMonitor(registryURL);
                        if (monitorUrl != null) {
                            url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString());
                        }
                        if (logger.isInfoEnabled()) {
                            logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL);
                        }
                        Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
                         //远程服务暴露
                        Exporter<?> exporter = protocol.export(invoker);
                        exporters.add(exporter);
                    }
                } 
...
}

暴露本地服务和暴露远程服务的区别是什么?

  1. 暴露本地服务
    指暴露在一个JVM里面,不用通过调用ZK来进行远程通信。例如:在同一个服务,在自己调用自己的接口,就没必要进行网络IP连接来通信;
  2. 暴露远程服务
    指暴露给远程客户端的IP和端口号,通过网络来实现通信。

本地服务暴露

  • 代码执行流程
ServiceConfig.doExportUrlsFor1Protocol
-->exportLocal
  -->proxyFactory.getInvoker(ref, (Class) interfaceClass, local)
    -->ProxyFactory$Adaptive.getInvoker
      -->ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.ProxyFactory.class).getExtension("javassist");
        -->StubProxyFactoryWrapper.getInvoker(proxy, type, url);
          -->proxyFactory.getInvoker(proxy, type, url);
            -->JavassistProxyFactory.getInvoker(T proxy, Class<T> type, URL url)
              -->Wrapper.getWrapper(com.alibaba.dubbo.demo.provider.DemoServiceImpl.class);
                -->makeWrapper(Class<?> c)
              -->new AbstractProxyInvoker<T>(proxy, type, url)
  -->protocol.export
    -->Protocol$Adaptive.export
      -->ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension("injvm")
      -->extension.export(arg0);
        -->ProtocolFilterWrapper.buildInvokerChain(invoker, Constants.SERVICE_FILTER_KEY, Constants.PROVIDER) //创建8个Filter
        -->ProtocolFilterWrapper.export
          -->InjvmProtocol.export
            -->return new InjvmExporter<T>(invoker, invoker.getUrl().getServiceKey(), exporterMap);
            -->目的:exporterMap.put(key, this); //key="com.alibaba.dubbo.demo.DemoService" this = InjvmExporter

代码实在太多没法一一分析>.<,其实跟着上面这个思路debug几遍流程就能很清晰的把握本地服务暴露的脉络。

#com.alibaba.dubbo.config.ServiceConfig#exportLocal
    private void exportLocal(URL url) {
        if (!Constants.LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) {
            URL local = URL.valueOf(url.toFullString())
                    .setProtocol(Constants.LOCAL_PROTOCOL)
                    .setHost(NetUtils.LOCALHOST)
                    .setPort(0);
            //都是动态编译生成的  protocol$Adaptive proxyFactory$Adaptive
            Exporter<?> exporter = protocol.export(
                    proxyFactory.getInvoker(ref, (Class) interfaceClass, local));
            exporters.add(exporter);
            logger.info("Export dubbo service " + interfaceClass.getName() +" to local registry");
        }
    }

proxyFactory:就是为了获取一个接口的代理类,例如获取一个远程接口的代理。它有2个方法,代表2个作用

  • a.getInvoker:针对server端,将服务对象,如DemoServiceImpl包装成一个Invoker对象。
  • b.getProxy :针对client端,创建接口的代理对象,例如DemoService的接口。
#com.alibaba.dubbo.config.ServiceConfig#proxyFactory
private static final ProxyFactory proxyFactory = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension();
#com.alibaba.dubbo.rpc.ProxyFactory
@SPI("javassist")
public interface ProxyFactory {
    @Adaptive({Constants.PROXY_KEY})
    <T> T getProxy(Invoker<T> invoker) throws RpcException;
    @Adaptive({Constants.PROXY_KEY})
    <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) throws RpcException;
}

动态编译生成的proxyFactory$Adaptive,修改log4j的配置文件查看动态编译的adaptive类

<root>
        <level value="DEBUG" />
        <appender-ref ref="CONSOLE" />
    </root>
package com.alibaba.dubbo.rpc;
import com.alibaba.dubbo.common.extension.ExtensionLoader;
public class ProxyFactory$Adpative implements com.alibaba.dubbo.rpc.ProxyFactory {
    public com.alibaba.dubbo.rpc.Invoker getInvoker(java.lang.Object arg0, java.lang.Class arg1, com.alibaba.dubbo.common.URL arg2) throws com.alibaba.dubbo.rpc.RpcException {
        if (arg2 == null) throw new IllegalArgumentException("url == null");
        com.alibaba.dubbo.common.URL url = arg2;
        String extName = url.getParameter("proxy", "javassist");
        if(extName == null) throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.ProxyFactory) name from url(" + url.toString() + ") use keys([proxy])");
        com.alibaba.dubbo.rpc.ProxyFactory extension = (com.alibaba.dubbo.rpc.ProxyFactory)
                ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.ProxyFactory.class)
                        .getExtension(extName);
        return extension.getInvoker(arg0, arg1, arg2);
    }
    public java.lang.Object getProxy(com.alibaba.dubbo.rpc.Invoker arg0) throws com.alibaba.dubbo.rpc.RpcException {
        if (arg0 == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null");
        if (arg0.getUrl() == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null");com.alibaba.dubbo.common.URL url = arg0.getUrl();
        String extName = url.getParameter("proxy", "javassist");
        if(extName == null) throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.ProxyFactory) name from url(" + url.toString() + ") use keys([proxy])");
        com.alibaba.dubbo.rpc.ProxyFactory extension = (com.alibaba.dubbo.rpc.ProxyFactory)
                ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.ProxyFactory.class)
                        .getExtension(extName);
        return extension.getProxy(arg0);
    }
}
  • 创建Invoker
    proxyFactory.getInvoker创建Invoker的时候最终调用了JavassistProxyFactory.getInvoker(T proxy, Class<T> type, URL url)方法(proxy实际上是服务的实现类),最终返回的是AbstractProxyInvoker,并由他实现了doInvoke方法。调用的时候会有这样一个调用链
    Invoker.doInvoke>AbstractProxyInvoker.doInvoke>wrapper.invokeMethod
/**
 * JavaassistRpcProxyFactory
 */
public class JavassistProxyFactory extends AbstractProxyFactory {
    /**
     * @Author pengyunlong
     * @Description 服务引用生成动态代理
     */
    @SuppressWarnings("unchecked")
    public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
        return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
    }

   /**
     * @Author pengyunlong
     * @Description 服务暴露不生成动态代理生成wrapper直接调用本地的方法
     * @param   proxy 服务实例
     * @param   type 服务接口
     */
    public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
        // TODO Wrapper cannot handle this scenario correctly: the classname contains '$'
        final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
        return new AbstractProxyInvoker<T>(proxy, type, url) {
            @Override
            protected Object doInvoke(T proxy, String methodName,
                                      Class<?>[] parameterTypes,
                                      Object[] arguments) throws Throwable {
                return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
            }
        };
    }
}
  • Invoker
    它是一个可执行的对象,能够根据方法的名称、参数得到相应的执行结果。它里面有一个很重要的方法 Result invoke(Invocation invocation),
    Invocation是包含了需要执行的方法和参数等重要信息,目前它只有2个实现类RpcInvocation、MockInvocation。它有3种类型的Invoker:
  1. 本地执行类的Invoker
    server端:要执行 demoService.sayHello,就通过InjvmExporter来进行反射执行demoService.sayHello就可以了。
  2. 远程通信类的Invoker
    client端:要执行 demoService.sayHello,它封装了DubboInvoker进行远程通信,发送要执行的接口给server端。
    server端:采用了AbstractProxyInvoker执行了DemoServiceImpl.sayHello,然后将执行结果返回发送给client.
  3. 多个远程通信执行类的Invoker聚合成集群版的Invoker
    client端:要执行 demoService.sayHello,就要通过AbstractClusterInvoker来进行负载均衡,DubboInvoker进行远程通信,发送要执行的接口给server端。
    server端:采用了AbstractProxyInvoker执行了DemoServiceImpl.sayHello,然后将执行结果返回发送给client.
  • Wrapper wrapper = Wrapper.getWrapper
    也许大家跟我一样会很好奇这个Wrapper wrapper = Wrapper.getWrapper(com.alibaba.dubbo.demo.provider.DemoServiceImpl)得到的wrapper 到底是什么?它又是怎么实现本地方法调用的?
    下面将动态生成的Wrapper导出并反编译才发现了真相:实际上就是依次判断接口的各个方法名和参数中传入的方法名是否匹配,如果匹配就直接调用实例的该方法。其实它类似spring的BeanWrapper,它就是包装了一个接口或一个类,可以通过wrapper对实例对象进行赋值 取值以及制定方法的调用。
//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by Fernflower decompiler)
//
package com.alibaba.dubbo.common.bytecode;
public class Wrapper0 extends Wrapper implements DC {
    ......
    public Object invokeMethod(Object var1, String var2, Class[] var3, Object[] var4) throws InvocationTargetException {
        DemoService var5;
        try {
            var5 = (DemoService)var1;
        } catch (Throwable var8) {
            throw new IllegalArgumentException(var8);
        }
        try {
            if ("sayHello".equals(var2) && var3.length == 1) {
                return var5.sayHello((String)var4[0]);
            }
        } catch (Throwable var9) {
            throw new InvocationTargetException(var9);
        }
        throw new NoSuchMethodException("Not found method \"" + var2 + "\" in class com.alibaba.dubbo.demo.DemoService.");
    }
    ......
}
  • 使用injvm暴露服务
package com.alibaba.dubbo.rpc;
import com.alibaba.dubbo.common.extension.ExtensionLoader;
public class Protocol$Adpative implements com.alibaba.dubbo.rpc.Protocol {
    public void destroy() {throw new UnsupportedOperationException("method public abstract void com.alibaba.dubbo.rpc.Protocol.destroy() of interface com.alibaba.dubbo.rpc.Protocol is not adaptive method!");
    }
    public int getDefaultPort() {throw new UnsupportedOperationException("method public abstract int com.alibaba.dubbo.rpc.Protocol.getDefaultPort() of interface com.alibaba.dubbo.rpc.Protocol is not adaptive method!");
    }
    public com.alibaba.dubbo.rpc.Exporter export(com.alibaba.dubbo.rpc.Invoker arg0) throws com.alibaba.dubbo.rpc.RpcException {
        if (arg0 == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null");
        if (arg0.getUrl() == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null");com.alibaba.dubbo.common.URL url = arg0.getUrl();
        String extName = ( url.getProtocol() == null ? "dubbo" : url.getProtocol() );
        if(extName == null) throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])");
        com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol)
                ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class)
                        .getExtension(extName);
        return extension.export(arg0);
    }
    public com.alibaba.dubbo.rpc.Invoker refer(java.lang.Class arg0, com.alibaba.dubbo.common.URL arg1) throws com.alibaba.dubbo.rpc.RpcException {......
    }
}
  • 过滤器链
    extension.export(arg0);ProtocolFilterWrapper包装了,所以被在服务暴露的时候还给Invoker包装了一个过滤器链,总共应该是8个过滤器;在方法调用的时候会依次调用这个链上所有的filter.invoke
#com.alibaba.dubbo.rpc.protocol.ProtocolFilterWrapper#export
    public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
            return protocol.export(invoker);
        }
        return protocol.export(buildInvokerChain(invoker, Constants.SERVICE_FILTER_KEY, Constants.PROVIDER));
    }
    private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
        Invoker<T> last = invoker;
        List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
        if (!filters.isEmpty()) {
            for (int i = filters.size() - 1; i >= 0; i--) {
                final Filter filter = filters.get(i);
                final Invoker<T> next = last;
                last = new Invoker<T>() {
                .....
                    public Result invoke(Invocation invocation) throws RpcException {
                        return filter.invoke(next, invocation);
                    }
                ......
                };
            }
        }
        return last;
    }
  • 暴露完成加入缓存
#com.alibaba.dubbo.rpc.protocol.injvm.InjvmProtocol#export
  public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        //本地暴露
        return new InjvmExporter<T>(invoker, invoker.getUrl().getServiceKey(), exporterMap);
    }
#com.alibaba.dubbo.rpc.protocol.injvm.InjvmExporter
    InjvmExporter(Invoker<T> invoker, String key, Map<String, Exporter<?>> exporterMap){
        super(invoker);
        this.key = key;
        this.exporterMap = exporterMap;
//key=com.alibaba.dubbo.demo.DemoService, this=InjvmExporter
        exporterMap.put(key, this);
    }

实际是将key=com.alibaba.dubbo.demo.DemoService, this=InjvmExporter存储在com.alibaba.dubbo.rpc.protocol.AbstractProtocol#exporterMap里面
同时com.alibaba.dubbo.config.ServiceConfig#exporters也会存储一份

3. 远程服务暴露

ServiceConfig.doExportUrlsFor1Protocol
-->proxyFactory.getInvoker //执行过程和本地暴露一样
-->protocol.export(wrapperInvoker)
  -->Protocol$Adaptive.export(Invoker arg0)
    -->ExtensionLoader.getExtensionLoader(Protocol.class).getExtension("registry")
      -->extension.export(arg0);
        -->ProtocolFilterWrapper.export
          -->ProtocolListenerWrapper.export  //没有添加过滤器与监听器
            -->RegistryProtocol.export
              -->doLocalExport(originInvoker)
                -->getCacheKey(originInvoker)//读取缓存【key】
                -->protocol.export(invokerDelegete)
                  -->Protocol$Adaptive.export
                    -->ExtensionLoader.getExtensionLoader(Protocol.class).getExtension("dubbo")
                    -->extension.export(arg0)
                      -->ProtocolFilterWrapper.export
                        -->buildInvokerChain //创建8个filter
                          -->ProtocolListenerWrapper.export
1.netty服务暴露的开始----    -->DubboProtocol.export
                              -->serviceKey(url)//组装key=com.alibaba.dubbo.demo.DemoService:20881
                              -->目的:exporterMap.put(key, this); //key="com.alibaba.dubbo.demo.DemoService:20881" this = DubboExporter 本地暴露只有接口名远程暴露有接口名和端口号
                              -->openServer(url)//打开服务
                                -->createServer
2.信息交换层的开始---             -->Exchangers.bind(url, requestHandler)//exchanger是一个信息交换层
                                  -->getExchanger(url)
                                    -->getExchanger("header")
                                      -->ExtensionLoader.getExtensionLoader(Exchanger.class).getExtension("header")
                                  -->HeaderExchanger.bind
                                    -->Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler)))
                                      -->new HeaderExchangeHandler(handler) //仅仅是this.handler = handler;
                                        -->new DecodeHandler
                                          -->AbstractChannelHandlerDelegate//this.handler = handler;
3.网络传输层--------                     -->Transporters.bind
                                          -->getTransporter()
                                            -->ExtensionLoader.getExtensionLoader(Transporter.class).getAdaptiveExtension()
                                        -->Transporter$Adaptive.bind
                                          -->ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.remoting.Transporter.class).getExtension("netty")
                                          -->NettyTransporter.bind
                                            -->new NettyServer(url, listener)
                                              -->AbstractPeer     //this.url = url; this.handler = handler;
                                                -->AbstractEndpoint //codec timeout=1000 connectTimeout=3000
4.打开端口暴露服务---                           --doOpen()
                                                 -->设置NioServerSocketChannelFactory boss worker线程个数为3
                                                 -->设置编解码handler
                                                 -->bootstrap.bind(getBindAddress())
                                      -->new HeaderExchangeServer
                                        -->this.server = NettyServer
                                        -->this.heartbeat=60000
                                        -->heartbeatTimeout=180000
                                        -->startHeatbeatTimer//这是一个心跳定时器,采用了线程池,如果断开就心跳重连
//配置不是remote的情况下做本地暴露 (配置为remote,则表示只暴露远程服务)
            if (!Constants.SCOPE_REMOTE.toString().equalsIgnoreCase(scope)) {
                exportLocal(url);
            }
            //如果配置不是local则暴露为远程服务.(配置为local,则表示只暴露本地服务)
            if (! Constants.SCOPE_LOCAL.toString().equalsIgnoreCase(scope) ){
                if (logger.isInfoEnabled()) {
                    logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
                }
                if (registryURLs != null && registryURLs.size() > 0
                        && url.getParameter("register", true)) {
                    for (URL registryURL : registryURLs) {
           
                        //获取调用本地实例的invoker对象
                        Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
                        //远程暴露
                        Exporter<?> exporter = protocol.export(invoker);
                        exporters.add(exporter);
                    }
                } 

Exporter<?> exporter = protocol.export(invoker); 本地暴露使用的扩展点是injvm而远程暴露使用了registry,但是流程差不多只是包装类暴露时不再包装过滤器链

#com.alibaba.dubbo.rpc.protocol.ProtocolFilterWrapper#export
 public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
            //远程暴露
            return protocol.export(invoker);
        }
        //injvm等
        return protocol.export(buildInvokerChain(invoker, Constants.SERVICE_FILTER_KEY, Constants.PROVIDER));
    }

最终进入目标类RegistryProtocol

#com.alibaba.dubbo.registry.integration.RegistryProtocol#export
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
        //export invoker
        final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);
        //registry provider
        final Registry registry = getRegistry(originInvoker);
        final URL registedProviderUrl = getRegistedProviderUrl(originInvoker);
        registry.register(registedProviderUrl);
        // 订阅override数据
        // FIXME 提供者订阅时,会影响同一JVM即暴露服务,又引用同一服务的的场景,因为subscribed以服务名为缓存的key,导致订阅信息覆盖。
        final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registedProviderUrl);
        final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl);
        overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
        registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
        //保证每次export都返回一个新的exporter实例
        return new Exporter<T>() {
            public Invoker<T> getInvoker() {
                return exporter.getInvoker();
            }
            public void unexport() {
                try {
                    exporter.unexport();
                } catch (Throwable t) {
                    logger.warn(t.getMessage(), t);
                }
                try {
                    registry.unregister(registedProviderUrl);
                } catch (Throwable t) {
                    logger.warn(t.getMessage(), t);
                }
                try {
                    overrideListeners.remove(overrideSubscribeUrl);
                    registry.unsubscribe(overrideSubscribeUrl, overrideSubscribeListener);
                } catch (Throwable t) {
                    logger.warn(t.getMessage(), t);
                }
            }
        };
    }
    
    @SuppressWarnings("unchecked")
    private <T> ExporterChangeableWrapper<T>  doLocalExport(final Invoker<T> originInvoker){
        String key = getCacheKey(originInvoker);
        ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
        if (exporter == null) {
            synchronized (bounds) {
                exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
                if (exporter == null) {
                    final Invoker<?> invokerDelegete = new InvokerDelegete<T>(originInvoker, getProviderUrl(originInvoker));
//进入dubbo的protocol扩展
                    exporter = new ExporterChangeableWrapper<T>((Exporter<T>)protocol.export(invokerDelegete), originInvoker);
                    bounds.put(key, exporter);
                }
            }
        }
        return (ExporterChangeableWrapper<T>) exporter;
    }

进入DubboProtocol.export,将export存储在exporterMap中,本地暴露key是接口名,远程暴露为接口名:端口号com.alibaba.dubbo.demo.DemoService:20880

public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        URL url = invoker.getUrl();
        
        // export service.
        String key = serviceKey(url);
        DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
        exporterMap.put(key, exporter);
        
        //export an stub service for dispaching event
        Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY,Constants.DEFAULT_STUB_EVENT);
        Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false);
        if (isStubSupportEvent && !isCallbackservice){
            String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY);
            if (stubServiceMethods == null || stubServiceMethods.length() == 0 ){
                if (logger.isWarnEnabled()){
                    logger.warn(new IllegalStateException("consumer [" +url.getParameter(Constants.INTERFACE_KEY) +
                            "], has set stubproxy support event ,but no stub methods founded."));
                }
            } else {
                stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
            }
        }
        openServer(url);
        return exporter;
    }

最后进入NettyServer开启服务

#com.alibaba.dubbo.remoting.transport.netty.NettyServer#doOpen
@Override
    protected void doOpen() throws Throwable {
        //参考netty的demo
        NettyHelper.setNettyLoggerFactory();
        ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true));
        ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true));
        //线程个数workerCount=3
        ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss, worker, getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS));
        bootstrap = new ServerBootstrap(channelFactory);
        
        final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
        channels = nettyHandler.getChannels();
        // https://issues.jboss.org/browse/NETTY-365
        // https://issues.jboss.org/browse/NETTY-379
        // final Timer timer = new HashedWheelTimer(new NamedThreadFactory("NettyIdleTimer", true));
        bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
            public ChannelPipeline getPipeline() {
                NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec() ,getUrl(), NettyServer.this);
                ChannelPipeline pipeline = Channels.pipeline();
                /*int idleTimeout = getIdleTimeout();
                if (idleTimeout > 10000) {
                    pipeline.addLast("timer", new IdleStateHandler(timer, idleTimeout / 1000, 0, 0));
                }*/
                //解码
                pipeline.addLast("decoder", adapter.getDecoder());
                //编码
                pipeline.addLast("encoder", adapter.getEncoder());
                //逻辑处理类
                pipeline.addLast("handler", nettyHandler);
                return pipeline;
            }
        });
        // bind
        channel = bootstrap.bind(getBindAddress());
    }

总结

  • Protocol
    1.export:暴露远程服务(用于服务端),就是将proxyFactory.getInvoker创建的代理类 invoker对象,通过协议暴露给外部。
    2.refer:引用远程服务(用于客户端), 通过proxyFactory.getProxy来创建远程的动态代理类,例如DemoService的远程动态接口。
  • exporter:维护invoder的生命周期。
  • exchanger:信息交换层,封装请求响应模式,同步转异步。
  • transporter:网络传输层,用来抽象netty和mina的统一接口。


    image.png
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 199,830评论 5 468
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 83,992评论 2 376
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 146,875评论 0 331
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 53,837评论 1 271
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 62,734评论 5 360
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,091评论 1 277
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,550评论 3 390
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,217评论 0 254
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,368评论 1 294
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,298评论 2 317
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,350评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,027评论 3 315
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,623评论 3 303
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,706评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,940评论 1 255
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,349评论 2 346
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 41,936评论 2 341

推荐阅读更多精彩内容