dubbo 服务导出

目录
dubbo 拓展机制 SPI
dubbo 自适应拓展机制
dubbo 服务导出
dubbo 服务引用
dubbo 服务字典
dubbo 服务路由
dubbo 集群容错策略
dubbo 负载均衡
dubbo 服务调用过程

Dubbo 服务导出过程始于 Spring 容器发布刷新事件,Dubbo 在接收到事件后,会立即执行服务导出逻辑。整个逻辑大致可分为三个部分,第一部分是前置工作,主要用于检查参数,组装 URL。第二部分是导出服务,包含导出服务到本地 (JVM),和导出服务到远程两个过程。第三部分是向注册中心注册服务,用于服务发现。

ServiceBean是整个服务导出的核心类,它实现了

  • InitializingBean:从applicationContext中获取如protocol,module等配置信息,并且在注册监听上下文刷新的事件,失败时立即进行服务导出服务。
  • DisposableBean:bean的摧毁(2.7.3版本中已经是空实现了,因为前面已经注册了挂钩)。
  • ApplicationContextAware:保存applicationContext对象,注册挂钩当 jvm 关闭时关闭所有的链接以及摧毁已经注册了的 url 地址,注册监听上下文刷新的事件。
  • ApplicationListener<ContextRefreshedEvent>:监听上下文的刷新,判断是否需要导出服务。
  • BeanNameAware:设置 beanName。
  • ApplicationEventPublisherAware:用于发布事件,当导出服务完成时,发布一个ServiceBeanExportedEvent事件,dubbo监听到这个事件后会查看这个事件包含的类是否在本地调用中,如果在就立刻执行服务引入。

等接口。

大致流程:


image.png

首先在ServiceBean这个类初始化之前,将自己作为ApplicationListener添加到容器中,然后在容器启动完成时,监听容器刷新完成的事件,也就是ServiceBean 的 onApplicationEvent方法。onApplicationEvent 是一个事件响应方法,该方法会在收到 Spring 上下文刷新事件后执行服务导出操作。

    @Override
public void setApplicationContext(ApplicationContext applicationContext) {
    this.applicationContext = applicationContext;
    // 保存 applicationContext,并且在 jvm 关闭时,删除所有的register service和关闭所有的连接
    SpringExtensionFactory.addApplicationContext(applicationContext);
    // 当上下文状态变化时,添加监听事件
    supportedApplicationListener = addApplicationListener(applicationContext, this);
}

@Override
/**
 * 监听ContextRefreshedEvent事件,当所有的bean都初始化完成并被成功装载或后会触发该事件
 */
public void onApplicationEvent(ContextRefreshedEvent event) {
    // 是否已导出 && 是不是已被取消导出
    if (!isExported() && !isUnexported()) {
        if (logger.isInfoEnabled()) {
            logger.info("The service ready on spring started. service: " + getInterface());
        }
        // 导出服务
        export();
    }
}

代码比较简单,继续向下看export方法

public void export() {
    super.export();
    // Publish ServiceBeanExportedEvent
    // 发布事件,spring拦截事件调用 referenceBean 的 get() 进行服务引入
    publishExportEvent();
}

这里主要分析的是super.export方法。

public synchronized void export() {
    // 检测和修改配置
    checkAndUpdateSubConfigs();

    // 是否显示的使用AbstractServiceBuilder设置export,
    // 否则使用<dubbo:provider export="" />的配置
    if (!shouldExport()) {
        return;
    }

    // 是否显示的使用AbstractServiceBuilder设置delay,
    // 否则使用<dubbo:provider delay="" />的配置
    if (shouldDelay()) {
        // 延迟发布服务
        DELAY_EXPORT_EXECUTOR.schedule(this::doExport, getDelay(), TimeUnit.MILLISECONDS);
    } else {
        // 发布服务
        doExport();
    }
}

继续看doExport方法,该方法中先是做了一些状态判断,最主要的是其中的doExportUrls方法

private void doExportUrls() {
    // 多注册中心组装注册中心的url
    List<URL> registryURLs = loadRegistries(true);
    // 多协议
    for (ProtocolConfig protocolConfig : protocols) {
        // 获取protocol中配置的contextpath,缺省获取provider中的contextpath
        // group/(contextpath/path或path):version
        String pathKey = URL.buildKey(getContextPath(protocolConfig).map(p -> p + "/" + path).orElse(path), group, version);
        // 服务提供者模型
        ProviderModel providerModel = new ProviderModel(pathKey, ref, interfaceClass);
        // 保存pathKey和providerModel的映射关系
        ApplicationModel.initProviderModel(pathKey, providerModel);
        // 组装 URL
        doExportUrlsFor1Protocol(protocolConfig, registryURLs);
    }
}

首先看看loadRegistries方法

loadRegistries 方法主要包含如下的逻辑:
1.检测是否存在注册中心配置类,不存在则抛出异常
2.构建参数映射集合,也就是 map
3.构建注册中心链接列表
4.遍历链接列表,并根据条件决定是否将其添加到 registryList 中

protected List<URL> loadRegistries(boolean provider) {
    // check && override if necessary
    List<URL> registryList = new ArrayList<URL>();
    if (CollectionUtils.isNotEmpty(registries)) {
        for (RegistryConfig config : registries) {
            String address = config.getAddress();
            if (StringUtils.isEmpty(address)) {
                // 若 address 为空,则将其设为 0.0.0.0
                address = ANYHOST_VALUE;
            }
            // address不为n/a
            if (!RegistryConfig.NO_AVAILABLE.equalsIgnoreCase(address)) {
                Map<String, String> map = new HashMap<String, String>();
                // 添加 ApplicationConfig 中的字段信息到 map 中
                appendParameters(map, application);
                // 添加 RegistryConfig 字段信息到 map 中
                appendParameters(map, config);
                // 添加 path,protocol 等信息到 map 中
                map.put(PATH_KEY, RegistryService.class.getName());
                appendRuntimeParameters(map);
                if (!map.containsKey(PROTOCOL_KEY)) {
                    map.put(PROTOCOL_KEY, DUBBO_PROTOCOL);
                }

                // 解析得到 URL 列表,address 可能包含多个注册中心 ip,
                // 因此解析得到的是一个 URL 列表
                List<URL> urls = UrlUtils.parseURLs(address, map);

                for (URL url : urls) {
                    url = URLBuilder.from(url)
                            .addParameter(REGISTRY_KEY, url.getProtocol())
                            // 将 URL 协议头设置为 registry
                            .setProtocol(REGISTRY_PROTOCOL)
                            .build();
                    // 通过判断条件,决定是否添加 url 到 registryList 中,条件如下:
                    // (服务提供者 && register = true 或 null)
                    //    || (非服务提供者 && subscribe = true 或 null)
                    if ((provider && url.getParameter(REGISTER_KEY, true))
                            || (!provider && url.getParameter(SUBSCRIBE_KEY, true))) {
                        registryList.add(url);
                    }
                }
            }
        }
    }
    return registryList;
}

回到doExportUrls中,在通过注册中心配置了对应的URL之后,就是通过doExportUrlsFor1Protocol方法在各个协议的基础上向所有的注册中心注册服务。

URL 是 Dubbo 配置的载体,通过 URL 可让 Dubbo 的各种配置在各个模块之间传递

private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
    // 默认使用dubbo协议
    String name = protocolConfig.getName();
    if (StringUtils.isEmpty(name)) {
        name = DUBBO;
    }

    Map<String, String> map = new HashMap<String, String>();
    // 添加 side、版本、时间戳以及进程号等信息到 map 中
    map.put(SIDE_KEY, PROVIDER_SIDE);

    // 添加时间戳,dubbo 版本,pid等运行时参数
    appendRuntimeParameters(map);
    // 通过反射将对象的字段信息添加到 map 中
    appendParameters(map, metrics);
    appendParameters(map, application);
    appendParameters(map, module);
    // remove 'default.' prefix for configs from ProviderConfig
    // appendParameters(map, provider, Constants.DEFAULT_KEY);
    appendParameters(map, provider);
    appendParameters(map, protocolConfig);
    appendParameters(map, this);
    // methods 为 MethodConfig 集合,MethodConfig 中存储了 <dubbo:method> 标签的配置信息
    // 这段代码用于添加 Callback 配置到 map 中
    if (CollectionUtils.isNotEmpty(methods)) {
        for (MethodConfig method : methods) {
            // 添加 MethodConfig 对象的字段信息到 map 中,键 = 方法名.属性名。
            // 比如存储 <dubbo:method name="sayHello" retries="2"> 对应的 MethodConfig,
            // 键 = sayHello.retries,map = {"sayHello.retries": 2, "xxx": "yyy"}
            appendParameters(map, method, method.getName());
            String retryKey = method.getName() + ".retry";
            if (map.containsKey(retryKey)) {
                String retryValue = map.remove(retryKey);
                // 检测 MethodConfig retry 是否为 false,若是,则设置重试次数为0
                if (Boolean.FALSE.toString().equals(retryValue)) {
                    map.put(method.getName() + ".retries", "0");
                }
            }
            // 获取 ArgumentConfig 列表
            // 设置方法的参数和值到map中
            List<ArgumentConfig> arguments = method.getArguments();
            if (CollectionUtils.isNotEmpty(arguments)) {
                for (ArgumentConfig argument : arguments) {
                    // convert argument type
                    if (argument.getType() != null && argument.getType().length() > 0) {
                        Method[] methods = interfaceClass.getMethods();
                        // visit all methods
                        if (methods != null && methods.length > 0) {
                            for (int i = 0; i < methods.length; i++) {
                                String methodName = methods[i].getName();
                                // target the method, and get its signature
                                if (methodName.equals(method.getName())) {
                                    Class<?>[] argtypes = methods[i].getParameterTypes();
                                    // one callback in the method
                                    // 检测 ArgumentConfig 中的 type 属性与方法参数列表中的参数名称是否一致,不一致则抛出异常
                                    if (argument.getIndex() != -1) {
                                        if (argtypes[argument.getIndex()].getName().equals(argument.getType())) {
                                            // 添加 ArgumentConfig 字段信息到 map 中,
                                            // 键前缀 = 方法名.index,比如:
                                            // map = {"sayHello.3": true}
                                            appendParameters(map, argument, method.getName() + "." + argument.getIndex());
                                        } else {
                                            throw new IllegalArgumentException("Argument config error : the index attribute and type attribute not match :index :" + argument.getIndex() + ", type:" + argument.getType());
                                        }
                                    } else {
                                        // multiple callbacks in the method
                                        // 从参数类型列表中查找类型名称为 argument.type 的参数
                                        for (int j = 0; j < argtypes.length; j++) {
                                            Class<?> argclazz = argtypes[j];
                                            if (argclazz.getName().equals(argument.getType())) {
                                                appendParameters(map, argument, method.getName() + "." + j);
                                                if (argument.getIndex() != -1 && argument.getIndex() != j) {
                                                    throw new IllegalArgumentException("Argument config error : the index attribute and type attribute not match :index :" + argument.getIndex() + ", type:" + argument.getType());
                                                }
                                            }
                                        }
                                    }
                                }
                            }
                        }
                    } else if (argument.getIndex() != -1) {
                        // 添加 ArgumentConfig 字段信息到 map 中
                        appendParameters(map, argument, method.getName() + "." + argument.getIndex());
                    } else {
                        throw new IllegalArgumentException("Argument config must set index or type attribute.eg: <dubbo:argument index='0' .../> or <dubbo:argument type=xxx .../>");
                    }

                }
            }
        } // end of methods for
    }

    // 检测 generic 是否为 "true",并根据检测结果向 map 中添加不同的信息
    if (ProtocolUtils.isGeneric(generic)) {
        map.put(GENERIC_KEY, generic);
        map.put(METHODS_KEY, ANY_VALUE);
    } else {
        String revision = Version.getVersion(interfaceClass, version);
        if (revision != null && revision.length() > 0) {
            map.put(REVISION_KEY, revision);
        }

        // 为接口生成包裹类 Wrapper,Wrapper 中包含了接口的详细信息,比如接口方法名数组,字段信息等
        String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames();
        if (methods.length == 0) {
            logger.warn("No method found in service interface " + interfaceClass.getName());
            map.put(METHODS_KEY, ANY_VALUE);
        } else {
            // 将逗号作为分隔符连接方法名,并将连接后的字符串放入 map 中
            map.put(METHODS_KEY, StringUtils.join(new HashSet<String>(Arrays.asList(methods)), ","));
        }
    }
    // 添加 token 到 map 中
    if (!ConfigUtils.isEmpty(token)) {
        if (ConfigUtils.isDefault(token)) {
            // 生成随机token
            map.put(TOKEN_KEY, UUID.randomUUID().toString());
        } else {
            map.put(TOKEN_KEY, token);
        }
    }
    // export service
    // 获取 host 和 port
    String host = this.findConfigedHosts(protocolConfig, registryURLs, map);
    Integer port = this.findConfigedPorts(protocolConfig, name, map);
    // 组装 URL
    URL url = new URL(name, host, port, getContextPath(protocolConfig).map(p -> p + "/" + path).orElse(path), map);

    if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
            .hasExtension(url.getProtocol())) {
        // 加载 ConfiguratorFactory,并生成 Configurator 实例,然后通过实例配置 url
        url = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
                .getExtension(url.getProtocol()).getConfigurator(url).configure(url);
    }

    String scope = url.getParameter(SCOPE_KEY);
    // don't export when none is configured
    // 当scope为none时,不导出服务
    if (!SCOPE_NONE.equalsIgnoreCase(scope)) {

        // export to local if the config is not remote (export to remote only when config is remote)
        // scope != remote,导出到本地
        if (!SCOPE_REMOTE.equalsIgnoreCase(scope)) {
            exportLocal(url);
        }
        // export to remote if the config is not local (export to local only when config is local)
        // scope != local,导出到远程
        if (!SCOPE_LOCAL.equalsIgnoreCase(scope)) {
            if (CollectionUtils.isNotEmpty(registryURLs)) {
                for (URL registryURL : registryURLs) {
                    //if protocol is only injvm ,not register
                    if (LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) {
                        continue;
                    }
                    // dynamic配置,是否开启动态注册
                    url = url.addParameterIfAbsent(DYNAMIC_KEY, registryURL.getParameter(DYNAMIC_KEY));
                    // 加载监视器链接
                    URL monitorUrl = loadMonitor(registryURL);
                    if (monitorUrl != null) {
                        // 将监视器链接作为参数添加到 url 中
                        url = url.addParameterAndEncoded(MONITOR_KEY, monitorUrl.toFullString());
                    }
                    if (logger.isInfoEnabled()) {
                        if (url.getParameter(REGISTER_KEY, true)) {
                            logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL);
                        } else {
                            logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
                        }
                    }

                    // For providers, this is used to enable custom proxy to generate invoker
                    String proxy = url.getParameter(PROXY_KEY);
                    if (StringUtils.isNotEmpty(proxy)) {
                        registryURL = registryURL.addParameter(PROXY_KEY, proxy);
                    }

                    // 为服务提供类(ref)生成 Invoker
                    Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString()));
                    // DelegateProviderMetaDataInvoker 用于持有 Invoker 和 ServiceConfig
                    DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);

                    // 导出服务,并生成 Exporter
                    Exporter<?> exporter = protocol.export(wrapperInvoker);
                    exporters.add(exporter);
                }
            }
            // 不存在注册中心,仅导出服务
            else {
                if (logger.isInfoEnabled()) {
                    logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
                }
                Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, url);
                DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);

                Exporter<?> exporter = protocol.export(wrapperInvoker);
                exporters.add(exporter);
            }
            /**
             * @since 2.7.0
             * ServiceData Store
             * 2.7 以后精简url中的配置项
             */
            MetadataReportService metadataReportService = null;
            if ((metadataReportService = getMetadataReportService()) != null) {
                metadataReportService.publishProvider(url);
            }
        }
    }
    this.urls.add(url);
}

代码很长主要分成两部分来看

  • getWrapper对类生成代理,当服务调用方调用方法的时候,会首先调用代理类,代理类再调用对应的方法
  • 执行服务导出和服务注册

1.getWrapper

public static Wrapper getWrapper(Class<?> c) {
    // can not wrapper on dynamic class.
    while (ClassGenerator.isDynamicClass(c))
    {
        c = c.getSuperclass();
    }

    if (c == Object.class) {
        return OBJECT_WRAPPER;
    }

    Wrapper ret = WRAPPER_MAP.get(c);
    if (ret == null) {
        ret = makeWrapper(c);
        WRAPPER_MAP.put(c, ret);
    }
    return ret;
}

通过makeWrapper方法生成代理类,然后保存代理类到WRAPPER_MAP缓存中

private static Wrapper makeWrapper(Class<?> c) {
    // 检测 c 是否为基本类型,若是则抛出异常
    if (c.isPrimitive()) {
        throw new IllegalArgumentException("Can not create wrapper for primitive type: " + c);
    }

    String name = c.getName();
    ClassLoader cl = ClassUtils.getClassLoader(c);

    // c1 用于存储 setPropertyValue 方法代码
    StringBuilder c1 = new StringBuilder("public void setPropertyValue(Object o, String n, Object v){ ");
    // c2 用于存储 getPropertyValue 方法代码
    StringBuilder c2 = new StringBuilder("public Object getPropertyValue(Object o, String n){ ");
    // c3 用于存储 invokeMethod 方法代码
    StringBuilder c3 = new StringBuilder("public Object invokeMethod(Object o, String n, Class[] p, Object[] v) throws " + InvocationTargetException.class.getName() + "{ ");

    // 生成类型转换代码及异常捕捉代码,比如:
    //   DemoService w; try { w = ((DemoServcie) $1); }}catch(Throwable e){ throw new IllegalArgumentException(e); }
    c1.append(name).append(" w; try{ w = ((").append(name).append(")$1); }catch(Throwable e){ throw new IllegalArgumentException(e); }");
    c2.append(name).append(" w; try{ w = ((").append(name).append(")$1); }catch(Throwable e){ throw new IllegalArgumentException(e); }");
    c3.append(name).append(" w; try{ w = ((").append(name).append(")$1); }catch(Throwable e){ throw new IllegalArgumentException(e); }");

    // pts 用于存储成员变量名和类型
    Map<String, Class<?>> pts = new HashMap<>(); // <property name, property types>
    // ms 用于存储方法描述信息(可理解为方法签名)及 Method 实例
    Map<String, Method> ms = new LinkedHashMap<>(); // <method desc, Method instance>
    // mns 为方法名列表
    List<String> mns = new ArrayList<>(); // method names.
    // dmns 用于存储“定义在当前类中的方法”的名称
    List<String> dmns = new ArrayList<>(); // declaring method names.

    // get all public field.
    // 获取 public 访问级别的字段,并为所有字段生成条件判断语句
    for (Field f : c.getFields()) {
        String fn = f.getName();
        Class<?> ft = f.getType();
        // 忽略关键字 static 或 transient 修饰的变量
        if (Modifier.isStatic(f.getModifiers()) || Modifier.isTransient(f.getModifiers())) {
            continue;
        }

        // 生成条件判断及赋值语句,比如:
        // if( $2.equals("name") ) { w.name = (java.lang.String) $3; return;}
        // if( $2.equals("age") ) { w.age = ((Number) $3).intValue(); return;}
        c1.append(" if( $2.equals(\"").append(fn).append("\") ){ w.").append(fn).append("=").append(arg(ft, "$3")).append("; return; }");
        // 生成条件判断及返回语句,比如:
        // if( $2.equals("name") ) { return ($w)w.name; }
        c2.append(" if( $2.equals(\"").append(fn).append("\") ){ return ($w)w.").append(fn).append("; }");
        // 存储 <字段名, 字段类型> 键值对到 pts 中
        pts.put(fn, ft);
    }

    Method[] methods = c.getMethods();
    // get all public method.
    // 检测 c 中是否包含在当前类中声明的方法
    boolean hasMethod = hasMethods(methods);
    if (hasMethod) {
        c3.append(" try{");
        for (Method m : methods) {
            //ignore Object's method.
            // 忽略 Object 中定义的方法
            if (m.getDeclaringClass() == Object.class) {
                continue;
            }

            String mn = m.getName();
            // 生成方法名判断语句,比如:
            // if ( "sayHello".equals( $2 )
            c3.append(" if( \"").append(mn).append("\".equals( $2 ) ");
            int len = m.getParameterTypes().length;
            // 生成“运行时传入的参数数量与方法参数列表长度”判断语句,比如:
            // && $3.length == 2
            c3.append(" && ").append(" $3.length == ").append(len);

            boolean override = false;
            for (Method m2 : methods) {
                // 检测方法是否存在重载情况,条件为:方法对象不同 && 方法名相同
                if (m != m2 && m.getName().equals(m2.getName())) {
                    override = true;
                    break;
                }
            }
            // 对重载方法进行处理,考虑下面的方法:
            //    1. void sayHello(Integer, String)
            //    2. void sayHello(Integer, Integer)
            // 方法名相同,参数列表长度也相同,因此不能仅通过这两项判断两个方法是否相等。
            // 需要进一步判断方法的参数类型
            if (override) {
                if (len > 0) {
                    for (int l = 0; l < len; l++) {
                        // 生成参数类型进行检测代码,比如:
                        // && $3[0].getName().equals("java.lang.Integer")
                        //    && $3[1].getName().equals("java.lang.String")
                        c3.append(" && ").append(" $3[").append(l).append("].getName().equals(\"")
                                .append(m.getParameterTypes()[l].getName()).append("\")");
                    }
                }
            }

            // 添加 ) {,完成方法判断语句,此时生成的代码可能如下(已格式化):
            // if ("sayHello".equals($2)
            //     && $3.length == 2
            //     && $3[0].getName().equals("java.lang.Integer")
            //     && $3[1].getName().equals("java.lang.String")) {
            c3.append(" ) { ");

            // 根据返回值类型生成目标方法调用语句
            if (m.getReturnType() == Void.TYPE) {
                // w.sayHello((java.lang.Integer)$4[0], (java.lang.String)$4[1]); return null;
                c3.append(" w.").append(mn).append('(').append(args(m.getParameterTypes(), "$4")).append(");").append(" return null;");
            } else {
                // return w.sayHello((java.lang.Integer)$4[0], (java.lang.String)$4[1]);
                c3.append(" return ($w)w.").append(mn).append('(').append(args(m.getParameterTypes(), "$4")).append(");");
            }

            // 添加 }, 生成的代码形如(已格式化):
            // if ("sayHello".equals($2)
            //     && $3.length == 2
            //     && $3[0].getName().equals("java.lang.Integer")
            //     && $3[1].getName().equals("java.lang.String")) {
            //
            //     w.sayHello((java.lang.Integer)$4[0], (java.lang.String)$4[1]);
            //     return null;
            // }
            c3.append(" }");

            // 添加方法名到 mns 集合中
            mns.add(mn);
            // 检测当前方法是否在 c 中被声明的
            if (m.getDeclaringClass() == c) {
                // 若是,则将当前方法名添加到 dmns 中
                dmns.add(mn);
            }
            ms.put(ReflectUtils.getDesc(m), m);
        }
        // 添加异常捕捉语句
        c3.append(" } catch(Throwable e) { ");
        c3.append("     throw new java.lang.reflect.InvocationTargetException(e); ");
        c3.append(" }");
    }

    // 添加 NoSuchMethodException 异常抛出代码
    c3.append(" throw new " + NoSuchMethodException.class.getName() + "(\"Not found method \\\"\"+$2+\"\\\" in class " + c.getName() + ".\"); }");

    // deal with get/set method.
    Matcher matcher;
    for (Map.Entry<String, Method> entry : ms.entrySet()) {
        String md = entry.getKey();
        Method method = entry.getValue();
        // 匹配以 get 开头的方法
        if ((matcher = ReflectUtils.GETTER_METHOD_DESC_PATTERN.matcher(md)).matches()) {
            // 获取属性名
            String pn = propertyName(matcher.group(1));
            // 生成属性判断以及返回语句,示例如下:
            // if( $2.equals("name") ) { return ($w).w.getName(); }
            c2.append(" if( $2.equals(\"").append(pn).append("\") ){ return ($w)w.").append(method.getName()).append("(); }");
            pts.put(pn, method.getReturnType());
        }
        // 匹配以 is/has/can 开头的方法
        else if ((matcher = ReflectUtils.IS_HAS_CAN_METHOD_DESC_PATTERN.matcher(md)).matches()) {
            String pn = propertyName(matcher.group(1));
            // 生成属性判断以及返回语句,示例如下:
            // if( $2.equals("dream") ) { return ($w).w.hasDream(); }
            c2.append(" if( $2.equals(\"").append(pn).append("\") ){ return ($w)w.").append(method.getName()).append("(); }");
            pts.put(pn, method.getReturnType());
        }
        // 匹配以 set 开头的方法
        else if ((matcher = ReflectUtils.SETTER_METHOD_DESC_PATTERN.matcher(md)).matches()) {
            Class<?> pt = method.getParameterTypes()[0];
            String pn = propertyName(matcher.group(1));
            // 生成属性判断以及 setter 调用语句,示例如下:
            // if( $2.equals("name") ) { w.setName((java.lang.String)$3); return; }
            c1.append(" if( $2.equals(\"").append(pn).append("\") ){ w.").append(method.getName()).append("(").append(arg(pt, "$3")).append("); return; }");
            pts.put(pn, pt);
        }
    }
    // 添加 NoSuchPropertyException 异常抛出代码
    c1.append(" throw new " + NoSuchPropertyException.class.getName() + "(\"Not found property \\\"\"+$2+\"\\\" field or setter method in class " + c.getName() + ".\"); }");
    c2.append(" throw new " + NoSuchPropertyException.class.getName() + "(\"Not found property \\\"\"+$2+\"\\\" field or setter method in class " + c.getName() + ".\"); }");

    // make class
    long id = WRAPPER_CLASS_COUNTER.getAndIncrement();
    // 创建类生成器
    ClassGenerator cc = ClassGenerator.newInstance(cl);
    // 设置类名及超类
    cc.setClassName((Modifier.isPublic(c.getModifiers()) ? Wrapper.class.getName() : c.getName() + "$sw") + id);
    cc.setSuperClass(Wrapper.class);

    // 添加默认构造方法
    cc.addDefaultConstructor();
    // 添加字段
    cc.addField("public static String[] pns;"); // property name array.
    cc.addField("public static " + Map.class.getName() + " pts;"); // property type map.
    cc.addField("public static String[] mns;"); // all method name array.
    cc.addField("public static String[] dmns;"); // declared method name array.
    for (int i = 0, len = ms.size(); i < len; i++) {
        cc.addField("public static Class[] mts" + i + ";");
    }

    // 添加方法代码
    cc.addMethod("public String[] getPropertyNames(){ return pns; }");
    cc.addMethod("public boolean hasProperty(String n){ return pts.containsKey($1); }");
    cc.addMethod("public Class getPropertyType(String n){ return (Class)pts.get($1); }");
    cc.addMethod("public String[] getMethodNames(){ return mns; }");
    cc.addMethod("public String[] getDeclaredMethodNames(){ return dmns; }");
    cc.addMethod(c1.toString());
    cc.addMethod(c2.toString());
    cc.addMethod(c3.toString());

    try {
        // 生成类
        Class<?> wc = cc.toClass();
        // setup static field.
        // 设置字段值
        wc.getField("pts").set(null, pts);
        wc.getField("pns").set(null, pts.keySet().toArray(new String[0]));
        wc.getField("mns").set(null, mns.toArray(new String[0]));
        wc.getField("dmns").set(null, dmns.toArray(new String[0]));
        int ix = 0;
        for (Method m : ms.values()) {
            wc.getField("mts" + ix++).set(null, m.getParameterTypes());
        }
        // 创建 Wrapper 实例
        return (Wrapper) wc.newInstance();
    } catch (RuntimeException e) {
        throw e;
    } catch (Throwable e) {
        throw new RuntimeException(e.getMessage(), e);
    } finally {
        cc.release();
        ms.clear();
        mns.clear();
        dmns.clear();
    }
}

例如源码demo中的DemoService生成的wrapper类如下:

public class Wrapper0
extends Wrapper
implements ClassGenerator.DC {
    public static String[] pns;
    public static Map pts;
    public static String[] mns;
    public static String[] dmns;
    public static Class[] mts0;

    public String[] getPropertyNames() {
        return pns;
    }

    public boolean hasProperty(String string) {
        return pts.containsKey(string);
    }

    public Class getPropertyType(String string) {
        return (Class)pts.get(string);
    }

    public String[] getMethodNames() {
        return mns;
    }

    public String[] getDeclaredMethodNames() {
        return dmns;
    }

    public void setPropertyValue(Object object, String string, Object object2) {
        try {
            DemoService demoService = (DemoService)object;
        }
        catch (Throwable throwable) {
            throw new IllegalArgumentException(throwable);
        }
        throw new NoSuchPropertyException(new StringBuffer().append("Not found property \"").append(string).append("\" field or setter method in class org.apache.dubbo.demo.DemoService.").toString());
    }

    public Object getPropertyValue(Object object, String string) {
        try {
            DemoService demoService = (DemoService)object;
        }
        catch (Throwable throwable) {
            throw new IllegalArgumentException(throwable);
        }
        throw new NoSuchPropertyException(new StringBuffer().append("Not found property \"").append(string).append("\" field or setter method in class org.apache.dubbo.demo.DemoService.").toString());
    }

    public Object invokeMethod(Object object, String string, Class[] arrclass, Object[] arrobject) throws InvocationTargetException {
        DemoService demoService;
        try {
            demoService = (DemoService)object;
        }
        catch (Throwable throwable) {
            throw new IllegalArgumentException(throwable);
        }
        try {
            if ("sayHello".equals(string) && arrclass.length == 1) {
                return demoService.sayHello((String)arrobject[0]);
            }
        }
        catch (Throwable throwable) {
            throw new InvocationTargetException(throwable);
        }
        throw new NoSuchMethodException(new StringBuffer().append("Not found method \"").append(string).append("\" in class org.apache.dubbo.demo.DemoService.").toString());
    }
}

2.导出服务到本地

根据执行顺序先来看看exportLocal

private void exportLocal(URL url) {
    // 设置协议头为 injvm
    URL local = URLBuilder.from(url)
            .setProtocol(LOCAL_PROTOCOL)
            .setHost(LOCALHOST_VALUE)
            .setPort(0)
            .build();
    // 创建 Invoker,并导出服务,这里的 protocol 会在运行时调用 InjvmProtocol 的 export 方法
    Exporter<?> exporter = protocol.export(
            PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, local));
    exporters.add(exporter);
    logger.info("Export dubbo service " + interfaceClass.getName() + " to local registry url : " + local);
}

调用的是InjvmProtocol的实现,把生成的invoker保存在AbstractExporter中,当调用invoker的doInvoke方法时会通过wrapper类调用具体实现类的方法

3.导出服务到远程

Invoker 是由 ProxyFactory 创建而来。ProxyFactory有多个实现类,那么这里使用的是哪个实现类呢?答案就在前文的创建自适应类中,查看前文生成的ProxyFactory自适应类,发现是根据URL的proxy参数获取对应的实现类,没有配置则默认取JavassistProxyFactory。下面到 JavassistProxyFactory 代码中,探索 Invoker 的创建过程。

public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
    // TODO Wrapper cannot handle this scenario correctly: the classname contains '$'
    // 为目标类创建 Wrapper
    final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
    // 创建匿名 Invoker 类对象,并实现 doInvoke 方法
    return new AbstractProxyInvoker<T>(proxy, type, url) {
        @Override
        protected Object doInvoke(T proxy, String methodName,
                                  Class<?>[] parameterTypes,
                                  Object[] arguments) throws Throwable {
            // 调用 Wrapper 的 invokeMethod 方法,invokeMethod 最终会调用目标方法
            return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
        }
    };
}

这里依然是通过getWrapper生成代理类,然后再使用AbstractProxyInvoker包装一下,AbstractProxyInvoker也就是直接调用代理类的invokeMethod方法。
回到doExportUrlsFor1Protocol中,接下来是导出服务到远程

Exporter<?> exporter = protocol.export(wrapperInvoker);

其中这里用到了前面提到的protocol的自适应生成类,生成类export方法代码例子如下

public Exporter export(Invoker invoker) throws RpcException {
    String string;
    if (invoker == null) {
        throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument == null");
    }
    if (invoker.getUrl() == null) {
        throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument getUrl() == null");
    }
    URL uRL = invoker.getUrl();
    String string2 = string = uRL.getProtocol() == null ? "dubbo" : uRL.getProtocol();
    if (string == null) {
        throw new IllegalStateException(new StringBuffer().append("Failed to get extension (org.apache.dubbo.rpc.Protocol) name from url (").append(uRL.toString()).append(") use keys([protocol])").toString());
    }
    Protocol protocol = (Protocol)ExtensionLoader.getExtensionLoader(Protocol.class).getExtension(string);
    return protocol.export(invoker);
}

从这里看出会根据wrapperInvoker中的url属性的protocol元素路由到具体的实现类。而在前面的loadRegistries方法会把url的protocol设置为registry。所以接下来就到了RegistryProtocol的export方法进行远程发布,接下来就看这个export方法(PS:这里从自适应类之后是先到ProtocolListenerWrapper然后到ProtocolFilterWrapper,因为它们两是protocol这个类的包装类,但是在provider端export方法什么也不会做,所以这里直接分析RegistryProtocol的export)。

@Override
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
    // 获取注册中心 URL,以 zookeeper 注册中心为例,得到的示例 URL 如下:
    // zookeeper://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-provider&dubbo=2.0.2&export=dubbo%3A%2F%2F172.17.48.52%3A20880%2Fcom.alibaba.dubbo.demo.DemoService%3Fanyhost%3Dtrue%26application%3Ddemo-provider
    URL registryUrl = getRegistryUrl(originInvoker);
    // url to export locally
    // 获取已注册的服务提供者 URL,比如:
    // dubbo://172.17.48.52:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&dubbo=2.0.2&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello
    URL providerUrl = getProviderUrl(originInvoker);

    // Subscribe the override data
    // FIXME When the provider subscribes, it will affect the scene : a certain JVM exposes the service and call
    //  the same service. Because the subscribed is cached key with the name of the service, it causes the
    //  subscription information to cover.
    // 获取订阅 URL,比如:
    // provider://172.17.48.52:20880/com.alibaba.dubbo.demo.DemoService?category=configurators&check=false&anyhost=true&application=demo-provider&dubbo=2.0.2&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello
    final URL overrideSubscribeUrl = getSubscribedOverrideUrl(providerUrl);
    // 向注册中心进行订阅 override 数据
    final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
    // 创建监听器
    overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);

    // 根据配置修改providerUrl,
    providerUrl = overrideUrlWithConfig(providerUrl, overrideSubscribeListener);
    //export invoker
    final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl);

    // url to registry
    final Registry registry = getRegistry(originInvoker);
    final URL registeredProviderUrl = getRegisteredProviderUrl(providerUrl, registryUrl);
    ProviderInvokerWrapper<T> providerInvokerWrapper = ProviderConsumerRegTable.registerProvider(originInvoker,
            registryUrl, registeredProviderUrl);
    //to judge if we need to delay publish
    boolean register = providerUrl.getParameter(REGISTER_KEY, true);
    if (register) {
        register(registryUrl, registeredProviderUrl);
        providerInvokerWrapper.setReg(true);
    }

    // Deprecated! Subscribe to override rules in 2.6.x or before.
    registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);

    exporter.setRegisterUrl(registeredProviderUrl);
    exporter.setSubscribeUrl(overrideSubscribeUrl);
    //Ensure that a new exporter instance is returned every time export
    return new DestroyableExporter<>(exporter);
}

先来看下doLocalExport方法

private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker, URL providerUrl) {
    String key = getCacheKey(originInvoker);

    return (ExporterChangeableWrapper<T>) bounds.computeIfAbsent(key, s -> {
        Invoker<?> invokerDelegate = new InvokerDelegate<>(originInvoker, providerUrl);
        // 调用 protocol 的 export 方法导出服务
        return new ExporterChangeableWrapper<>((Exporter<T>) protocol.export(invokerDelegate), originInvoker);
    });
}

假设运行时协议为 dubbo,此处的 protocol 变量会在运行时加载 DubboProtocol,并调用 DubboProtocol 的 export 方法。所以,接下来目光转移到 DubboProtocol 的 export 方法上,相关分析如下

public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
    URL url = invoker.getUrl();

    // export service.
    // 获取服务标识,理解成服务坐标也行。由服务组名,服务名,服务版本号以及端口组成。比如:
    // demoGroup/com.alibaba.dubbo.demo.DemoService:1.0.1:20880
    String key = serviceKey(url);
    // 创建 DubboExporter
    DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
    // 将 <key, exporter> 键值对放入缓存中
    exporterMap.put(key, exporter);

    //export an stub service for dispatching event
    // 本地存根相关代码
    Boolean isStubSupportEvent = url.getParameter(STUB_EVENT_KEY, DEFAULT_STUB_EVENT);
    Boolean isCallbackservice = url.getParameter(IS_CALLBACK_SERVICE, false);
    if (isStubSupportEvent && !isCallbackservice) {
        String stubServiceMethods = url.getParameter(STUB_EVENT_METHODS_KEY);
        if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
            if (logger.isWarnEnabled()) {
                logger.warn(new IllegalStateException("consumer [" + url.getParameter(INTERFACE_KEY) +
                        "], has set stubproxy support event ,but no stub methods founded."));
            }

        } else {
            stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
        }
    }

    // 启动服务器
    openServer(url);
    // 优化序列化
    optimizeSerialization(url);

    return exporter;
}

重点看看openServer方法,别的逻辑不理解也不影响理解服务导出的流程

private void openServer(URL url) {
    // find server.
    // 获取 host:port,并将其作为服务器实例的 key,用于标识当前的服务器实例
    String key = url.getAddress();
    //client can export a service which's only for server to invoke
    boolean isServer = url.getParameter(IS_SERVER_KEY, true);
    if (isServer) {
        // 访问缓存
        ExchangeServer server = serverMap.get(key);
        if (server == null) {
            synchronized (this) {
                server = serverMap.get(key);
                if (server == null) {
                    // 创建服务器实例
                    serverMap.put(key, createServer(url));
                }
            }
        } else {
            // server supports reset, use together with override
            // 服务器已创建,则根据 url 中的配置重置服务器
            server.reset(url);
        }
    }
}

在同一台机器上(单网卡),同一个端口上仅允许启动一个服务器实例。若某个端口上已有服务器实例,此时则调用 reset 方法重置服务器的一些配置。接下来看看createServer。

private ExchangeServer createServer(URL url) {
    url = URLBuilder.from(url)
            // send readonly event when server closes, it's enabled by default
            .addParameterIfAbsent(CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString())
            // enable heartbeat by default
            // 添加心跳检测配置到 url 中
            .addParameterIfAbsent(HEARTBEAT_KEY, String.valueOf(DEFAULT_HEARTBEAT))
            // 添加编码解码器参数
            .addParameter(CODEC_KEY, DubboCodec.NAME)
            .build();
    // 获取 server 参数,默认为 netty
    String str = url.getParameter(SERVER_KEY, DEFAULT_REMOTING_SERVER);

    // 通过 SPI 检测是否存在 server 参数所代表的 Transporter 拓展,不存在则抛出异常
    if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
        throw new RpcException("Unsupported server type: " + str + ", url: " + url);
    }

    ExchangeServer server;
    try {
        // 创建 ExchangeServer
        server = Exchangers.bind(url, requestHandler);
    } catch (RemotingException e) {
        throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
    }

    // 获取 client 参数,可指定 netty,mina
    str = url.getParameter(CLIENT_KEY);
    if (str != null && str.length() > 0) {
        // 获取所有的 Transporter 实现类名称集合,比如 supportedTypes = [netty, mina]
        Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
        // 检测当前 Dubbo 所支持的 Transporter 实现类名称列表中,
        // 是否包含 client 所表示的 Transporter,若不包含,则抛出异常
        if (!supportedTypes.contains(str)) {
            throw new RpcException("Unsupported client type: " + str);
        }
    }

    return server;
}

createServer 包含三个核心的逻辑。第一是检测是否存在 server 参数所代表的 Transporter 拓展,不存在则抛出异常。第二是创建服务器实例。第三是检测是否支持 client 参数所表示的 Transporter 拓展,不存在也是抛出异常。
接下来继续看bind方法

public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
    if (url == null) {
        throw new IllegalArgumentException("url == null");
    }
    if (handler == null) {
        throw new IllegalArgumentException("handler == null");
    }
    // 获取 Exchanger,默认为 HeaderExchanger。
    // 紧接着调用 HeaderExchanger 的 bind 方法创建 ExchangeServer 实例
    url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
    return getExchanger(url).bind(url, handler);
}

这里又是通过dubbo SPI加载Exchanger的实现类,默认实现是HeaderExchanger。接下来继续看HeaderExchanger的bind方法

@Override
public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
    return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
}

这里只有一行代码,但是包含了三个逻辑:

  • new HeaderExchangeHandler(handler),
  • new DecodeHandler(new HeaderExchangeHandler(handler))
  • Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler)))
    仅需关心 Transporters 的 bind 方法逻辑。
public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException {
    if (url == null) {
        throw new IllegalArgumentException("url == null");
    }
    if (handlers == null || handlers.length == 0) {
        throw new IllegalArgumentException("handlers == null");
    }
    ChannelHandler handler;
    if (handlers.length == 1) {
        handler = handlers[0];
    } else {
        // 如果 handlers 元素数量大于1,则创建 ChannelHandler 分发器
        handler = new ChannelHandlerDispatcher(handlers);
    }
    // 获取自适应 Transporter 实例,并调用实例方法
    return getTransporter().bind(url, handler);
}

自适应拓展加载 Transporter,默认为 NettyTransporter

public Server bind(URL url, ChannelHandler listener) throws RemotingException {
    // 创建 NettyServer
    return new NettyServer(url, listener);
}

public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
    // you can customize name and type of client thread pool by THREAD_NAME_KEY and THREADPOOL_KEY in CommonConstants.
    // the handler will be warped: MultiMessageHandler->HeartbeatHandler->handler
    super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));
}

接下来看看ChannelHandlers.wrap,这个方法很有意思,是dubbo的线程派发模型的实现。

public class ChannelHandlers {

    private static ChannelHandlers INSTANCE = new ChannelHandlers();

    protected ChannelHandlers() {
    }

    public static ChannelHandler wrap(ChannelHandler handler, URL url) {
        return ChannelHandlers.getInstance().wrapInternal(handler, url);
    }

    protected static ChannelHandlers getInstance() {
        return INSTANCE;
    }

    static void setTestingChannelHandlers(ChannelHandlers instance) {
        INSTANCE = instance;
    }

    protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) {
        // 一层又一层的装饰器,只关注最里面的dispatch方法
        return new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class)
                .getAdaptiveExtension().dispatch(handler, url)));
    }
}

这里默认的自适应类是AllDispatcher,在后面的服务调用过程,将会详细讲解一下。回到NettyServer,这里调用了父类AbstractServer的构造方法。

public AbstractServer(URL url, ChannelHandler handler) throws RemotingException {
    super(url, handler);
    localAddress = getUrl().toInetSocketAddress();

    // 获取 ip 和端口
    String bindIp = getUrl().getParameter(Constants.BIND_IP_KEY, getUrl().getHost());
    int bindPort = getUrl().getParameter(Constants.BIND_PORT_KEY, getUrl().getPort());
    if (url.getParameter(ANYHOST_KEY, false) || NetUtils.isInvalidLocalHost(bindIp)) {
        // 设置 ip 为 0.0.0.0
        bindIp = ANYHOST_VALUE;
    }
    bindAddress = new InetSocketAddress(bindIp, bindPort);
    // 获取最大可接受连接数
    this.accepts = url.getParameter(ACCEPTS_KEY, DEFAULT_ACCEPTS);
    this.idleTimeout = url.getParameter(IDLE_TIMEOUT_KEY, DEFAULT_IDLE_TIMEOUT);
    try {
        // 调用模板方法 doOpen 启动服务器
        doOpen();
        if (logger.isInfoEnabled()) {
            logger.info("Start " + getClass().getSimpleName() + " bind " + getBindAddress() + ", export " + getLocalAddress());
        }
    } catch (Throwable t) {
        throw new RemotingException(url.toInetSocketAddress(), null, "Failed to bind " + getClass().getSimpleName()
                + " on " + getLocalAddress() + ", cause: " + t.getMessage(), t);
    }
    //fixme replace this with better method
    DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();
    executor = (ExecutorService) dataStore.get(Constants.EXECUTOR_SERVICE_COMPONENT_KEY, Integer.toString(url.getPort()));
}

doOpen方法由子类实现,下面回到nettyservice中

protected void doOpen() throws Throwable {
    // 创建 ServerBootstrap
    bootstrap = new ServerBootstrap();

    // 创建 boss 和 worker 线程池
    bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("NettyServerBoss", true));
    workerGroup = new NioEventLoopGroup(getUrl().getPositiveParameter(IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),
            new DefaultThreadFactory("NettyServerWorker", true));

    final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);
    channels = nettyServerHandler.getChannels();

    // 设置 PipelineFactory
    bootstrap.group(bossGroup, workerGroup)
            .channel(NioServerSocketChannel.class)
            .childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
            .childOption(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
            .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
            .childHandler(new ChannelInitializer<NioSocketChannel>() {
                @Override
                protected void initChannel(NioSocketChannel ch) throws Exception {
                    // FIXME: should we use getTimeout()?
                    int idleTimeout = UrlUtils.getIdleTimeout(getUrl());
                    NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
                    ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
                            .addLast("decoder", adapter.getDecoder())
                            .addLast("encoder", adapter.getEncoder())
                            .addLast("server-idle-handler", new IdleStateHandler(0, 0, idleTimeout, MILLISECONDS))
                            .addLast("handler", nettyServerHandler);
                }
            });
    // bind
    // 绑定到指定的 ip 和端口上
    ChannelFuture channelFuture = bootstrap.bind(getBindAddress());
    channelFuture.syncUninterruptibly();
    channel = channelFuture.channel();

}

到此就已经完成了在指定的端口开启netty服务的过程,以及配置了一系列层层包装的ChannelHandler。
回到RegistryProtocol的export,继续看服务注册和数据订阅的逻辑

public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
     .......
    // url to registry
    final Registry registry = getRegistry(originInvoker);
    final URL registeredProviderUrl = getRegisteredProviderUrl(providerUrl, registryUrl);
    ProviderInvokerWrapper<T> providerInvokerWrapper = ProviderConsumerRegTable.registerProvider(originInvoker,
            registryUrl, registeredProviderUrl);
    //to judge if we need to delay publish
    boolean register = providerUrl.getParameter(REGISTER_KEY, true);
    if (register) {
        // 注册服务
        register(registryUrl, registeredProviderUrl);
        providerInvokerWrapper.setReg(true);
    }

    // Deprecated! Subscribe to override rules in 2.6.x or before.
    registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);

    exporter.setRegisterUrl(registeredProviderUrl);
    exporter.setSubscribeUrl(overrideSubscribeUrl);
    //Ensure that a new exporter instance is returned every time export
    return new DestroyableExporter<>(exporter);
}
public void register(URL registryUrl, URL registeredProviderUrl) {
    // 获取 Registry
    Registry registry = registryFactory.getRegistry(registryUrl);
    // 注册服务
    registry.register(registeredProviderUrl);
}

以 Zookeeper 注册中心为例进行分析。下面先来看一下 getRegistry 方法的源码,这个方法由 ZookeeperRegistryFactory 的父类 AbstractRegistryFactory 实现。

public Registry getRegistry(URL url) {
    url = URLBuilder.from(url)
            .setPath(RegistryService.class.getName())
            .addParameter(INTERFACE_KEY, RegistryService.class.getName())
            .removeParameters(EXPORT_KEY, REFER_KEY)
            .build();
    String key = url.toServiceStringWithoutResolving();
    // Lock the registry access process to ensure a single instance of the registry
    LOCK.lock();
    try {
        // 访问缓存
        Registry registry = REGISTRIES.get(key);
        if (registry != null) {
            return registry;
        }
        //create registry by spi/ioc
        // 缓存未命中,创建 Registry 实例
        registry = createRegistry(url);
        if (registry == null) {
            throw new IllegalStateException("Can not create registry " + url);
        }
        // 写入缓存
        REGISTRIES.put(key, registry);
        return registry;
    } finally {
        // Release the lock
        LOCK.unlock();
    }
}

接下来去ZookeeperRegistryFactory中继续看createRegistry方法

public class ZookeeperRegistryFactory extends AbstractRegistryFactory {

    public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) {
        super(url);
        if (url.isAnyHost()) {
            throw new IllegalStateException("registry address == null");
        }
        // 获取组名,默认为 dubbo
        String group = url.getParameter(GROUP_KEY, DEFAULT_ROOT);
        if (!group.startsWith(PATH_SEPARATOR)) {
            // group = "/" + group
            group = PATH_SEPARATOR + group;
        }
        this.root = group;
        // 创建 Zookeeper 客户端,默认为 CuratorZookeeperTransporter
        zkClient = zookeeperTransporter.connect(url);
        // 添加状态监听器
        zkClient.addStateListener(state -> {
            if (state == StateListener.RECONNECTED) {
                try {
                    recover();
                } catch (Exception e) {
                    logger.error(e.getMessage(), e);
                }
            }
        });
    }

    // zookeeperTransporter 由 SPI 在运行时注入,类型为 ZookeeperTransporter$Adaptive
    private ZookeeperTransporter zookeeperTransporter;

    /**
     * Invisible injection of zookeeper client via IOC/SPI
     * @param zookeeperTransporter
     */
    public void setZookeeperTransporter(ZookeeperTransporter zookeeperTransporter) {
        this.zookeeperTransporter = zookeeperTransporter;
    }

    @Override
    public Registry createRegistry(URL url) {
        // 创建 ZookeeperRegistry
        return new ZookeeperRegistry(url, zookeeperTransporter);
    }

}

这里提一下recover()方法,在重连zookeeper后会做两个动作。

  • 1.添加所有的注册连接到失败连接集合中,并且创建定时timer默认5秒中后重新建立连接,建立成功则从失败连接集合中删除。
  • 2.通知directory,刷新配置,通知成功则从失败集合删除。

接下来看看zookeeperTransporter.connect方法怎么获取zkclient的

public ZookeeperClient connect(URL url) {
    ZookeeperClient zookeeperClient;
    List<String> addressList = getURLBackupAddress(url);
    // The field define the zookeeper server , including protocol, host, port, username, password
    // 根据连接地址从缓存获取zkclient连接
    if ((zookeeperClient = fetchAndUpdateZookeeperClientCache(addressList)) != null && zookeeperClient.isConnected()) {
        logger.info("find valid zookeeper client from the cache for address: " + url);
        return zookeeperClient;
    }
    // avoid creating too many connections, so add lock
    synchronized (zookeeperClientMap) {
        if ((zookeeperClient = fetchAndUpdateZookeeperClientCache(addressList)) != null && zookeeperClient.isConnected()) {
            logger.info("find valid zookeeper client from the cache for address: " + url);
            return zookeeperClient;
        }
        
        // 缓存中获取不到时新建
        zookeeperClient = createZookeeperClient(toClientURL(url));
        logger.info("No valid zookeeper client found from cache, therefore create a new client for url. " + url);
        // 写进缓存
        writeToClientMap(addressList, zookeeperClient);
    }
    return zookeeperClient;
}

其中createZookeeperClient由子类实现,继续看CuratorZookeeperClient的createZookeeperClient方法

public ZookeeperClient createZookeeperClient(URL url) {
    return new CuratorZookeeperClient(url);
}

public CuratorZookeeperClient(URL url) {
    super(url);
    try {
        int timeout = url.getParameter(TIMEOUT_KEY, 5000);
        // 创建 CuratorFramework 构造器
        CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder()
                .connectString(url.getBackupAddress())
                .retryPolicy(new RetryNTimes(1, 1000))
                .connectionTimeoutMs(timeout);
        String authority = url.getAuthority();
        if (authority != null && authority.length() > 0) {
            builder = builder.authorization("digest", authority.getBytes());
        }
        // 构建 CuratorFramework 实例
        client = builder.build();
        // 添加监听器
        client.getConnectionStateListenable().addListener(new ConnectionStateListener() {
            @Override
            public void stateChanged(CuratorFramework client, ConnectionState state) {
                if (state == ConnectionState.LOST) {
                    CuratorZookeeperClient.this.stateChanged(StateListener.DISCONNECTED);
                } else if (state == ConnectionState.CONNECTED) {
                    CuratorZookeeperClient.this.stateChanged(StateListener.CONNECTED);
                } else if (state == ConnectionState.RECONNECTED) {
                    CuratorZookeeperClient.this.stateChanged(StateListener.RECONNECTED);
                }
            }
        });
        // 启动客户端
        client.start();
        boolean connected = client.blockUntilConnected(timeout, TimeUnit.MILLISECONDS);
        if (!connected) {
            throw new IllegalStateException("zookeeper not connected");
        }
    } catch (Exception e) {
        throw new IllegalStateException(e.getMessage(), e);
    }
}

获取到Registry对象后就是调用registry.register正式的注册服务,继续看FailbackRegistry(ZookeeperRegistry的父类)的register方法

public void register(URL url) {
    super.register(url);
    removeFailedRegistered(url);
    removeFailedUnregistered(url);
    try {
        // Sending a registration request to the server side
        // 模板方法,由子类实现
        doRegister(url);
    } catch (Exception e) {
        Throwable t = e;

        // If the startup detection is opened, the Exception is thrown directly.
        // 获取 check 参数,若 check = true 将会直接抛出异常
        boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
                && url.getParameter(Constants.CHECK_KEY, true)
                && !CONSUMER_PROTOCOL.equals(url.getProtocol());
        boolean skipFailback = t instanceof SkipFailbackWrapperException;
        if (check || skipFailback) {
            if (skipFailback) {
                t = t.getCause();
            }
            throw new IllegalStateException("Failed to register " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t);
        } else {
            logger.error("Failed to register " + url + ", waiting for retry, cause: " + t.getMessage(), t);
        }

        // Record a failed registration request to a failed list, retry regularly
        // 记录注册失败的链接
        addFailedRegistered(url);
    }
}

接着看doRegister,回到 FailbackRegistry 子类 ZookeeperRegistry 中

protected void doRegister(URL url) {
    try {
        // 通过 Zookeeper 客户端创建节点,节点路径由 toUrlPath 方法生成,路径格式如下:
        //   /${group}/${serviceInterface}/providers/${url}
        // 比如
        //   /dubbo/org.apache.dubbo.DemoService/providers/dubbo%3A%2F%2F127.0.0.1......
        zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true));
    } catch (Throwable e) {
        throw new RpcException("Failed to register...");
    }
}

到此,dubbo服务就把自己的信息通过zkClient注册到了特定的路径下,并且注册的是zk的临时节点。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,732评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,496评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,264评论 0 338
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,807评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,806评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,675评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,029评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,683评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 41,704评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,666评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,773评论 1 332
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,413评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,016评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,978评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,204评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,083评论 2 350
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,503评论 2 343

推荐阅读更多精彩内容