dubbo服务启动过程

在项目启动过程中,dubbo服务如何随项目的启动而发布?dubbo如何随着spring容器的初始化而启动。

已知,在项目启动过程中,我们会将dubbo的配置文件写到spring的配置文件里,如下xml文件:

<dubbo:application name="anyname_provider" />
<!-- 使用zookeeper注册中心暴露服务地址 -->
 <dubbo:registry address="zookeeper://127.0.0.1:2181" />
<!-- 用dubbo协议在20880端口暴露服务 -->
<dubbo:protocol name="dubbo" port="20880" />
<!-- 声明需要暴露的服务接口 -->
<dubbo:service interface="com.shxz130.provider.Provider"
                   ref="demoService" />

官方文档中,我们能看到如下:

启动过程.png

也就是说spring启动过程中,随着Spring在初始化过程中,碰到dubbo命名的标签,如(<dubbo:service>,<dubbo:registry>)等标签,会由DubboNamespaceHandler类处理,具体原理见链接Spring自定义标签

DubboBeanDefinitionParser代码如下:

public class DubboNamespaceHandler extends NamespaceHandlerSupport {

    static {
        Version.checkDuplicate(DubboNamespaceHandler.class);
    }

    public void init() {
        registerBeanDefinitionParser("application", new DubboBeanDefinitionParser(ApplicationConfig.class, true));
        registerBeanDefinitionParser("module", new DubboBeanDefinitionParser(ModuleConfig.class, true));
        registerBeanDefinitionParser("registry", new DubboBeanDefinitionParser(RegistryConfig.class, true));
        registerBeanDefinitionParser("monitor", new DubboBeanDefinitionParser(MonitorConfig.class, true));
        registerBeanDefinitionParser("provider", new DubboBeanDefinitionParser(ProviderConfig.class, true));
        registerBeanDefinitionParser("consumer", new DubboBeanDefinitionParser(ConsumerConfig.class, true));
        registerBeanDefinitionParser("protocol", new DubboBeanDefinitionParser(ProtocolConfig.class, true));
        registerBeanDefinitionParser("service", new DubboBeanDefinitionParser(ServiceBean.class, true));
        registerBeanDefinitionParser("reference", new DubboBeanDefinitionParser(ReferenceBean.class, false));
        registerBeanDefinitionParser("annotation", new AnnotationBeanDefinitionParser());
    }
}

遇到不同的标签,会由不同的Parser处理,这里重点看服务发布,这行代码:

registerBeanDefinitionParser("service", new DubboBeanDefinitionParser(ServiceBean.class, true));

也就是说,当Spring容器处理完<dubbo:service>标签后,会在Spring容器中生成一个ServiceBean ,服务的发布也会在ServiceBean中完成。不妨看一下ServiceBean的定义:

public class ServiceBean<T> extends ServiceConfig<T> implements InitializingBean, DisposableBean, ApplicationContextAware, ApplicationListener<ContextRefreshedEvent>, BeanNameAware {
}

该Bean实现了很多接口,关于InitializingBeanDisposableBeanApplicationContextAwareBeanNameAware,这些接口的使用介绍如下链接:

而在Spring初始化完成Bean的组装,会调用InitializingBeanafterPropertiesSet方法,在Spring容器加载完成,会接收到事件ContextRefreshedEvent,调用ApplicationListeneronApplicationEvent方法。

afterPropertiesSet中,和onApplicationEvent中,会调用export(),在export()中,会暴露dubbo服务,具体区别在于是否配置了delay属性,是否延迟暴露,如果delay不为null,或者不为-1时,会在afterPropertiesSet中调用export()暴露dubbo服务,如果为null,或者为-1时,会在Spring容器初始化完成,接收到ContextRefreshedEvent事件,调用onApplicationEvent,暴露dubbo服务。

部分ServiceBean的代码如下:

public class ServiceBean<T> extends ServiceConfig<T> implements InitializingBean, DisposableBean, ApplicationContextAware, ApplicationListener<ContextRefreshedEvent>, BeanNameAware {
 //Spring容器初始化完成,调用
 public void onApplicationEvent(ContextRefreshedEvent event) {
        if (isDelay() && !isExported() && !isUnexported()) {
            if (logger.isInfoEnabled()) {
                logger.info("The service ready on spring started. service: " + getInterface());
            }
            //暴露服务
            export();
        }
    }
    //判断是否延迟发布
  private boolean isDelay() {
        Integer delay = getDelay();
        ProviderConfig provider = getProvider();
        if (delay == null && provider != null) {
            delay = provider.getDelay();
        }
        return supportedApplicationListener && (delay == null || delay == -1);
    }
 //当bean初始化完成调用
 public void afterPropertiesSet() throws Exception {
        //......此处省略10000行代码
        if (!isDelay()) {
            //暴露服务
            export();
        }
    }
}

export(),暴露服务过程中,如果发现有delay属性,则延迟delay时间,暴露服务,如果没有,则直接暴露服务。

 public synchronized void export() {
        //忽略若干行代码
        if (delay != null && delay > 0) {
            //当delay不为null,且大于0时,延迟delay时间,暴露服务
            delayExportExecutor.schedule(new Runnable() {
                public void run() {
                    //暴露服务
                    doExport();
                }
            }, delay, TimeUnit.MILLISECONDS);
        } else {
            //直接暴露服务
            doExport();
        }
    }

而在doExport()中,验证参数,按照不同的Protocol,比如(dubbo,injvm)暴露服务,在不同的zookeeper集群节点上注册自己的服务。

protected synchronized void doExport() {
         //忽略10000行代码
        doExportUrls();
        //忽略10000行代码
    }

 private void doExportUrls() {
        List<URL> registryURLs = loadRegistries(true);
        for (ProtocolConfig protocolConfig : protocols) {
            //按照不同的Protocal暴露服务
            doExportUrlsFor1Protocol(protocolConfig, registryURLs);
        }
    }

这里以dubbo协议为例,看一下发布的过程,在发布过程中,会用一个变量map保存URL的所有变量和value值,然后调用代理工程proxyFactory,获取代理类,然后将invoker转换成exporter,暴露服务,具体如下:

  protocol://host:port/path?key=value&key=value
 private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
        //如果协议类型为null,则默认为dubbo协议
        String name = protocolConfig.getName();
        if (name == null || name.length() == 0) {
            name = "dubbo";
        }
        //map是保存url中key-Value的值
        Map<String, String> map = new HashMap<String, String>();
        //URL中的side属性,有两个值,一个provider,一个consumer,暴露服务的时候为provider
        map.put(Constants.SIDE_KEY, Constants.PROVIDER_SIDE);
        //dubbo的版本号  url中的dubbo
        map.put(Constants.DUBBO_VERSION_KEY, Version.getVersion());
       //url中的timestamp
        map.put(Constants.TIMESTAMP_KEY, String.valueOf(System.currentTimeMillis()));
        //url中的pid
        if (ConfigUtils.getPid() > 0) {
            map.put(Constants.PID_KEY, String.valueOf(ConfigUtils.getPid()));
        }
        //从其他参数中获取参数
        appendParameters(map, application);
        appendParameters(map, module);
        appendParameters(map, provider, Constants.DEFAULT_KEY);
        appendParameters(map, protocolConfig);
        appendParameters(map, this);
        //忽略若干代码
        
        if (ProtocolUtils.isGeneric(generic)) {
            map.put("generic", generic);
            map.put("methods", Constants.ANY_VALUE);
        } else {
            //url中的revesion字段
            String revision = Version.getVersion(interfaceClass, version);
            if (revision != null && revision.length() > 0) {
                map.put("revision", revision);
            }
            //拼接URL中的methods
            String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames();
            if (methods.length == 0) {
                logger.warn("NO method found in service interface " + interfaceClass.getName());
                map.put("methods", Constants.ANY_VALUE);
            } else {
                map.put("methods", StringUtils.join(new HashSet<String>(Arrays.asList(methods)), ","));
            }
        }
        //token 临牌校验
        if (!ConfigUtils.isEmpty(token)) {
            if (ConfigUtils.isDefault(token)) {
                map.put("token", UUID.randomUUID().toString());
            } else {
                map.put("token", token);
            }
        }
        //injvm协议
        if ("injvm".equals(protocolConfig.getName())) {
            protocolConfig.setRegister(false);
            map.put("notify", "false");
        }
        //获取上下文路径
        String contextPath = protocolConfig.getContextpath();
        if ((contextPath == null || contextPath.length() == 0) && provider != null) {
            contextPath = provider.getContextpath();
        }
        //获取主机名
        String host = this.findConfigedHosts(protocolConfig, registryURLs, map);
        //获取端口
        Integer port = this.findConfigedPorts(protocolConfig, name, map);
        //组装URL
        URL url = new URL(name, host, port, (contextPath == null || contextPath.length() == 0 ? "" : contextPath + "/") + path, map);
         //如果url使用的协议存在扩展,调用对应的扩展来修改原url
        if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
                .hasExtension(url.getProtocol())) {
            url = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
                    .getExtension(url.getProtocol()).getConfigurator(url).configure(url);
        }
       
        String scope = url.getParameter(Constants.SCOPE_KEY);
         //配置为none不暴露
        if (!Constants.SCOPE_NONE.toString().equalsIgnoreCase(scope)) {
            if (!Constants.SCOPE_REMOTE.toString().equalsIgnoreCase(scope)) {
                //如果不是remote,则暴露本地服务
                exportLocal(url);
            }
             //如果配置不是local则暴露为远程服务
            if (!Constants.SCOPE_LOCAL.toString().equalsIgnoreCase(scope)) {
               //忽略日志 如果注册中心地址不为null
                if (registryURLs != null && registryURLs.size() > 0) {
                    for (URL registryURL : registryURLs) {
                        url = url.addParameterIfAbsent("dynamic", registryURL.getParameter("dynamic"));
                      //忽略不相干的代码  
                      // 通过代理工厂将ref对象转化成invoker对象
                        Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
                        //代理invoker对象
                        DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
                        // 暴露服务
                        Exporter<?> exporter = protocol.export(wrapperInvoker);
                         //一个服务可能有多个提供者,保存在一起
                        exporters.add(exporter);
                    }
                } else {
                    Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url);
                    DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);

                    Exporter<?> exporter = protocol.export(wrapperInvoker);
                    exporters.add(exporter);
                }
            }
        }
        this.urls.add(url);
    }

doExportUrlsFor1Protocol代码再简化一下,如下:

private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
     Map map=builderUrl();
     / 通过代理工厂将ref对象转化成invoker对象
     Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url);
     //代理invoker对象
     DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
     // 暴露服务
     Exporter<?> exporter = protocol.export(wrapperInvoker);
     //一个服务可能有多个提供者,保存在一起
     exporters.add(exporter);
}

对应官方文档中的设计,就联系起来了。

服务暴露
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url);

而在上面proxyFactory.getInvoker中,很显然是获取到的是接口的代理类。

而在 protocol.export(wrapperInvoker)中,将服务暴露出去。
代码如下:

public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        URL url = invoker.getUrl();
        //忽略若干代码
        //打开服务
        openServer(url);
        optimizeSerialization(url);
        return exporter;
    }
 private void openServer(URL url) {
       
        String key = url.getAddress();
        boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true);
         //是否server端
        if (isServer) {
            ExchangeServer server = serverMap.get(key);
            if (server == null) {
                //如果服务不存在,创建服务
                serverMap.put(key, createServer(url));
            } else {
                server.reset(url);
            }
        }
    }
    private ExchangeServer createServer(URL url) {
        //忽略若干代码
        ExchangeServer server;
        try {
            server = Exchangers.bind(url, requestHandler);
        } catch (RemotingException e) {
            throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
        }
        return server;
    }

而在headerExchangerbind中,调用了Transporters.bind(),一直调用到NettyServer,绑定了端口和链接。

public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
        return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
    }

//Transporters.bind
public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException {
        //忽略很多代码
        return getTransporter().bind(url, handler);
    }
//上段代码的getTransporter()
 public static Transporter getTransporter() {
        return ExtensionLoader.getExtensionLoader(Transporter.class).getAdaptiveExtension();
    }

而在Transporter的定义中看到下面代码:


@SPI("netty")
public interface Transporter {
    @Adaptive({Constants.SERVER_KEY, Constants.TRANSPORTER_KEY})
    Server bind(URL url, ChannelHandler handler) throws RemotingException;
    @Adaptive({Constants.CLIENT_KEY, Constants.TRANSPORTER_KEY})
    Client connect(URL url, ChannelHandler handler) throws RemotingException;
}

所以这里调用的是NettyTransporter,这里启动了一个新的NettyServer

public class NettyTransporter implements Transporter {

    public static final String NAME = "netty4";

    public Server bind(URL url, ChannelHandler listener) throws RemotingException {
        return new NettyServer(url, listener);
    }

    public Client connect(URL url, ChannelHandler listener) throws RemotingException {
        return new NettyClient(url, listener);
    }
}

在NettyServer的构造方法中,调用了父类的构造方法,调用了doOpen()方法指定了端口

public class NettyServer extends AbstractServer implements Server {
  public NettyServer(URL url, ChannelHandler handler) throws   RemotingException {
        super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));
    }
}

public AbstractServer(URL url, ChannelHandler handler) throws RemotingException {
       super(url, handler);
       //忽略很多代码  
      doOpen();
       //忽略很多代码
}


 @Override
    protected void doOpen() throws Throwable {
        NettyHelper.setNettyLoggerFactory();

        bootstrap = new ServerBootstrap();

        bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("NettyServerBoss", true));
        workerGroup = new NioEventLoopGroup(getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),
                new DefaultThreadFactory("NettyServerWorker", true));

        final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);
        channels = nettyServerHandler.getChannels();

        bootstrap.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
                .childOption(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
                .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                .childHandler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel ch) throws Exception {
                        NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
                        ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
                                .addLast("decoder", adapter.getDecoder())
                                .addLast("encoder", adapter.getEncoder())
                                .addLast("handler", nettyServerHandler);
                    }
                });
        // bind
        ChannelFuture channelFuture = bootstrap.bind(getBindAddress());
        channelFuture.syncUninterruptibly();
        channel = channelFuture.channel();

    }

到这里dubbo服务就启动了,但是有一点还是有疑惑,那么,dubbo服务什么时候注册到注册中心的?带着疑惑看了一下官方文档。

拦截器

也就是说,在调用DubboProtocol暴露服务之前,回去调用拦截器,当发现是regiester,则去注册中心注册服务。

   public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
    //如果是registerProtocol,则调用RegisterProtocol.export方法
        if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
            return protocol.export(invoker);
        }
        return new ListenerExporterWrapper<T>(protocol.export(invoker),
                Collections.unmodifiableList(ExtensionLoader.getExtensionLoader(ExporterListener.class)
                        .getActivateExtension(invoker.getUrl(), Constants.EXPORTER_LISTENER_KEY)));
    }

而在RegisterProtocol.export

public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
        //export invoker
        final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);

        URL registryUrl = getRegistryUrl(originInvoker);

        //根据SPI机制获取具体的Registry实例,这里获取到的是ZookeeperRegistry
        final Registry registry = getRegistry(originInvoker);
        final URL registedProviderUrl = getRegistedProviderUrl(originInvoker);
        boolean register = registedProviderUrl.getParameter("register", true);
        if (register) {
            //在这里注册服务
            register(registryUrl, registedProviderUrl);
           //忽略很多代码
        }
        //忽略很多代码
    }

    public void register(URL registryUrl, URL registedProviderUrl) {
        Registry registry = registryFactory.getRegistry(registryUrl);
        registry.register(registedProviderUrl);
    }

ZookeeperRegistry继承父类FailbackRegistry,在父类的register方法中,调用了 doRegister,doRegister中,创建了ZK节点,这样就将自己的服务暴露到注册中心zk上。

  @Override
    public void register(URL url) {
             //忽略很多代码
            doRegister(url);
            //忽略很多代码
    }

 protected void doRegister(URL url) {
        try {
            zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true));
        } catch (Throwable e) {
            throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }

这样,整个dubbo服务就启动了。再回头看官方文档上的说明,就很清楚了。

暴露服务时序图.png

参考文档:http://blog.csdn.net/dachengxi/article/details/62567065
参考文档:http://blog.csdn.net/xcylive520/article/details/52347110
fyi

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 205,033评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,725评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,473评论 0 338
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,846评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,848评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,691评论 1 282
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,053评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,700评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 42,856评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,676评论 2 323
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,787评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,430评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,034评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,990评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,218评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,174评论 2 352
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,526评论 2 343

推荐阅读更多精彩内容

  • 之前都有读过一些Dubbo的源码,但是发现一个问题就是过一段时间后再去看的话非常陌生,而且常常不知道自己上次读到了...
    此鱼不得水阅读 5,196评论 1 0
  • 在上一章中我只是简单论述了一下服务的启动过程的前置内容,主要是做一些参数的检查和URL的封装,然后在文章末尾引出了...
    此鱼不得水阅读 993评论 0 0
  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,598评论 18 139
  • 在上一节中介绍了Dubbo启动过程中的一个操作:将Dubbo服务注册到Zk上。下面就介绍启动过程的另外一个重要操作...
    此鱼不得水阅读 1,212评论 0 0