14. dubbo源码-集群容错之MergeableCluster

简介

在dubbo官方的用户手册中,提到了使用MergeableCluster的场景--分组聚合:

按组合并返回结果 ,比如菜单服务,接口一样,但有多种实现,用group区分,现在消费者需从每种group中调用一次返回结果,合并结果返回,这样就可以实现聚合菜单项。

功能示意图如下:


merger示意图

用法

定义菜单接口方式:

/**
 * @author wangzhenfei9
 * @version 1.0.0
 * @since 2018年05月25日
 */
public interface MenuService {

    List<Menu> getMenuList(String traceId);

}

这个接口有两个实现类:HotMenuServiceImplColdMenuServiceImpl;前者返回结果为:[{"id":1,"name":"青椒炒肉"},{"id":2,"name":"剁椒鱼头"},{"id":3,"name":"口味虾"}],后者返回结果为:[{"id":101,"name":"凉拌黄瓜"},{"id":102,"name":"凉拌木耳"}]

Provider暴露服务--一个服务属于group-hot,一个服务属于group-cold

<dubbo:service interface="com.alibaba.dubbo.demo.MenuService" ref="hotMenuServiceImpl"  group="group-hot"/>
<dubbo:service interface="com.alibaba.dubbo.demo.MenuService" ref="coldMenuServiceImpl" group="group-cold"/>

笔者测试时启动了两个Provider,所以总计有四个服务,dubbo-monitor监控显示如下:


dubbo-monitor服务列表

Consumer调用服务:

<dubbo:reference id="menuService" interface="com.alibaba.dubbo.demo.MenuService"
                 retries="0" timeout="1800000" merger="list" group="*" cluster="mergeable"/>

几个重要的配置说明:

  • merger: merger="list"指定merge方式,可以自定义,也可以指定com.alibaba.dubbo.rpc.cluster.Merger文件中申明的方式;
  • group: group="*"表示调用接口com.alibaba.dubbo.demo.MenuService所有的分组服务,由于只有group-hot和group-cold两个分组,这里也可以配置为group="group-hot,group-cold";
  • cluster: cluster="mergeable"即指定集群容错模式为MergeableCluster模式,也就是本文分析的模式;
  • timeout: timeout="1800000"即超时时间,之所以设置这么大,是为了debug源码过程中不会发生超时,此配置不适用于生产环境;

com.alibaba.dubbo.rpc.cluster.Merger文件内容如下:

list=com.alibaba.dubbo.rpc.cluster.merger.ListMerger
set=com.alibaba.dubbo.rpc.cluster.merger.SetMerger
map=com.alibaba.dubbo.rpc.cluster.merger.MapMerger
byte=com.alibaba.dubbo.rpc.cluster.merger.ByteArrayMerger
char=com.alibaba.dubbo.rpc.cluster.merger.CharArrayMerger
short=com.alibaba.dubbo.rpc.cluster.merger.ShortArrayMerger
int=com.alibaba.dubbo.rpc.cluster.merger.IntArrayMerger
long=com.alibaba.dubbo.rpc.cluster.merger.LongArrayMerger
float=com.alibaba.dubbo.rpc.cluster.merger.FloatArrayMerger
double=com.alibaba.dubbo.rpc.cluster.merger.DoubleArrayMerger
boolean=com.alibaba.dubbo.rpc.cluster.merger.BooleanArrayMerger

这里需要指出的一点,dubbo官方在2017年11月份对这个文件有过修改,修改记录请戳:合并结果问题,应该是笔误,建议修复,修改内容如下图所示,所以老版本和新版本的merger="list"效果不一样:

Merger bug fix

  • 运行结果
    [{"id":101,"name":"凉拌黄瓜"},{"id":102,"name":"凉拌木耳"},{"id":1,"name":"青椒炒肉"},{"id":2,"name":"剁椒鱼头"},{"id":3,"name":"口味虾"}],从运行结果可以看出,合并了HotMenuServiceImplColdMenuServiceImpl两个不同group服务的结果;

源码分析

核心源码在MergeableClusterInvoker.java中,源码如下所示:

@Override
@SuppressWarnings("rawtypes")
public Result invoke(final Invocation invocation) throws RpcException {
    // 拿到可用的Invoker集合
    List<Invoker<T>> invokers = directory.list(invocation);

    // 得到配置的merger参数值
    String merger = getUrl().getMethodParameter( invocation.getMethodName(), Constants.MERGER_KEY );
    // 如果方法不需要Merge,退化为只调一个group即可--选择第一个有效的Invoker调用并返回结果
    if ( ConfigUtils.isEmpty(merger) ) {
        for(final Invoker<T> invoker : invokers ) {
            if (invoker.isAvailable()) {
                return invoker.invoke(invocation);
            }
        }
        // 如果没有任意Invoker满足isAvailable(), 那么尝试调用第一个Invoker(多尝试一下, 多一次机会)
        return invokers.iterator().next().invoke(invocation);
    }
    // 得到方法的返回类型
    Class<?> returnType;
    try {
        returnType = getInterface().getMethod(
                invocation.getMethodName(), invocation.getParameterTypes() ).getReturnType();
    } catch ( NoSuchMethodException e ) {
        returnType = null;
    }

    // 由于我们调用的服务, 有两个不同的group, 且没有申明version, 所以这个map的key有两个值
    Map<String, Future<Result>> results = new HashMap<String, Future<Result>>();
    for( final Invoker<T> invoker : invokers ) {
        // 线程池方法异步调用
        Future<Result> future = executor.submit( new Callable<Result>() {
            public Result call() throws Exception {
                return invoker.invoke(new RpcInvocation(invocation, invoker));
            }
        } );
        // serviceKey非常重要--serviceKey的值为: groupName/serviceInterface:version,
        // 如果version没有申明, serviceKey的值为: groupName/serviceInterface
        // 如果group没有申明, serviceKey的值为: serviceInterface:version
        // 如果version和group都没有申明, serviceKey的值为: serviceInterface
        results.put( invoker.getUrl().getServiceKey(), future );
    }

    Object result = null;

    // 保存异步执行结果集合
    List<Result> resultList = new ArrayList<Result>( results.size() );
    
    int timeout = getUrl().getMethodParameter( invocation.getMethodName(), Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT );
    for ( Map.Entry<String, Future<Result>> entry : results.entrySet() ) {
        Future<Result> future = entry.getValue();
        try {
            Result r = future.get(timeout, TimeUnit.MILLISECONDS);
            // 如果异步执行有异常(包括超时), 那么输出error级别的日志, 不影响最终的结果(只是部分数据缺失)
            if (r.hasException()) {
                log.error(new StringBuilder(32).append("Invoke ")
                              .append(getGroupDescFromServiceKey(entry.getKey()))
                              .append(" failed: ")
                              .append(r.getException().getMessage()).toString(),
                          r.getException());
            } else {
                resultList.add(r);
            }
        } catch ( Exception e ) {
            throw new RpcException( new StringBuilder( 32 )
                                            .append( "Failed to invoke service " )
                                            .append( entry.getKey() )
                                            .append( ": " )
                                            .append( e.getMessage() ).toString(),
                                    e );
        }
    }
    
    if (resultList.size() == 0) {
        // 如果没有结果, 那么new一个result为null的RpcResult返回即可
        return new RpcResult((Object)null);
    } else if (resultList.size() == 1) {
        // 如果只有一个结果, 那么直接返回即可
        return resultList.iterator().next();
    }
    // 如果返回类型为void, 那么new一个result为null的RpcResult返回即可
    if (returnType == void.class) {
        return new RpcResult((Object)null);
    }
    // 如果merger的值是以.开头, 例如merger=".addAll", 这段逻辑就是调用结果类型的原生方法, 例如服务的返回结果是List<Menu>,即list类型,那么merger=".addAll"就是调用List集合的.addAll()。
    if ( merger.startsWith(".") ) {
        merger = merger.substring(1);
        Method method;
        try {
            // 首先得到调用方法,如果方法不存在,则抛出异常
            method = returnType.getMethod( merger, returnType );
        } catch ( NoSuchMethodException e ) {
            throw new RpcException(... ...);
        }
        if ( method != null ) {
            if ( !Modifier.isPublic( method.getModifiers() ) ) {
                method.setAccessible( true );
            }
            // 先取得第一个结果
            result = resultList.remove( 0 ).getValue();
            try {
                // 如果merger=".addAll"指定的方法返回类型不为void,且和dubbo服务接口方法返回类型是相同类型,以测试代码为例,.addAll()返回类型是boolean,而dubbo服务接口方法返回类型是List,所以这里的if条件分支为false
                if ( method.getReturnType() != void.class
                        && method.getReturnType().isAssignableFrom( result.getClass() ) ) {
                    // 遍历剩余的结果集
                    for ( Result r : resultList ) {
                        // 根据配置的merger值,例如merger=".addAll",依次对剩余结果集调用addAll()方法
                        result = method.invoke( result, r.getValue() );
                    }
                } else {
                    // 遍历剩余的结果集
                    for ( Result r : resultList ) {
                        // 根据配置的merger值,例如merger=".addAll",依次对剩余结果集调用addAll()方法
                        method.invoke( result, r.getValue() );
                    }
                }
            } catch ( Exception e ) {
                throw new RpcException( 
                        new StringBuilder( 32 )
                                .append( "Can not merge result: " )
                                .append( e.getMessage() ).toString(), 
                        e );
            }
        } else {
            throw new RpcException(... ...);
        }
    } else {
        // 如果merger申明为不以.开头, 例如merger="list"
        Merger resultMerger;
        // true和default都是默认值(大小写不敏感)
        if (ConfigUtils.isDefault(merger)) {
            resultMerger = MergerFactory.getMerger(returnType);
        } else {
            // 如果merger配置的是com.alibaba.dubbo.rpc.cluster.Merger文件中申明的值, 例如merger="list", 或者merger="map"
            resultMerger = ExtensionLoader.getExtensionLoader(Merger.class).getExtension(merger);
        }
        if (resultMerger != null) {
            List<Object> rets = new ArrayList<Object>(resultList.size());
            for(Result r : resultList) {
                rets.add(r.getValue());
            }
            // 根据不用的merger实现, 合并结果
            result = resultMerger.merge(
                    rets.toArray((Object[])Array.newInstance(returnType, 0)));
        } else {
            // 如果申明一些未知的merger, 那么抛出异常
            throw new RpcException( "There is no merger to merge result." );
        }
    }
    return new RpcResult( result );
}

这一段代码还是很有借鉴意义的,比如支付宝获取支付方式(支付方式有多种,例如余额,红包,优惠券等),假设每种支付方式需要通过实时调用远程服务获取可用性,就可以模拟这种方式进行调用,美滋滋_

注意

在条件分支if ( merger.startsWith(".") ) {}中,有一段逻辑:method = returnType.getMethod( merger, returnType );,即从dubbo服务接口方法返回类型即java.util.List中查找merger配置的方法,例如.addAll,我们先看一下debug过程各变量的值:

debug过程中变量的值

dubbo源码中method = returnType.getMethod( merger, returnType );调用Method method = getMethod0(name, parameterTypes, true);,再调用Method res = privateGetMethodRecursive(name, parameterTypes, includeStaticMethods, interfaceCandidates);,最后调用searchMethods(privateGetDeclaredMethods(true), name, parameterTypes)),得到最后方法匹配的核心逻辑如下:

private static Method searchMethods(Method[] methods, String name, Class<?>[] parameterTypes)
{
    Method res = null;
    String internedName = name.intern();
    for (int i = 0; i < methods.length; i++) {
        Method m = methods[i];
        if (m.getName() == internedName
            && arrayContentsEq(parameterTypes, m.getParameterTypes())
            && (res == null
                || res.getReturnType().isAssignableFrom(m.getReturnType())))
            res = m;
    }

    return (res == null ? res : getReflectionFactory().copyMethod(res));
}


private static boolean arrayContentsEq(Object[] a1, Object[] a2) {
    if (a1 == null) {
        return a2 == null || a2.length == 0;
    }

    if (a2 == null) {
        return a1.length == 0;
    }

    if (a1.length != a2.length) {
        return false;
    }

    for (int i = 0; i < a1.length; i++) {
        if (a1[i] != a2[i]) {
            return false;
        }
    }

    return true;
}

从searchMethods()源码可知,方法匹配需要满足几个条件:

  1. 方法名一样,即m.getName() == internedName。配置的是merger=".addAll",而List中也有addAll方法,这个条件符合;
  2. 寻找的方法参数类型和dubbo服务接口方法的返回类型完全一致(不能是继承完全),即arrayContentsEq(parameterTypes, m.getParameterTypes())。List中.addAll()方法参数类型是Collection(boolean addAll(Collection<? extends E> c);),而dubbo服务接口方法的返回类型是List类型,虽然List继承自Collection,但是并不等于,即arrayContentsEq()返回的还是false;

由上面的分析可知,如果要merger=".addAll"能够正常工作,那么只需要将dubbo服务的返回类型改成Collection即可,例如:

Collection<Menu> getMenuList(String traceId);

自定义merger实现

如果com.alibaba.dubbo.rpc.cluster.Merger文件集中方法无法满足需求,需要自定义实现,那么还是和dubbo其他扩展实现一样,依赖SPI。只需要一下几步实现即可:

  1. step1
    在consumer侧的resources/META-INF/dubbo目录下,创建名为com.alibaba.dubbo.rpc.cluster.Merger的文件,且内容为:
    afei=com.afei.consumer.merger.AfeiMerger
  2. step2
    实现AfeiMerger,参考dubbo源码中若干Merger.java的实现类即可,例如:
public class AfeiMerger implements Merger<List<?>> {

    private final Logger log = LoggerFactory.getLogger(this.getClass());

    private static final int TOP_COUNT = 3;

    /**
     * 只随机取合并后的三个结果
     */
    public List<Object> merge(List<?>... items) {
        List<Object> result = new ArrayList<Object>();
        for (List<?> item : items) {
            if (item != null) {
                result.addAll(item);
            }
        }

        List<Integer> randList = new ArrayList<Integer>();
        for (int i=0; i<result.size(); i++) {
            randList.add(i);
        }
        log.info("before shuffle: "+randList);
        Collections.shuffle(randList);
        log.info("after shuffle: "+randList);

        int resultSize = TOP_COUNT > result.size()?result.size() : TOP_COUNT;
        List<Object> finalResult = new ArrayList<Object>(resultSize);
        for(int i=0; i<resultSize; i++){
            finalResult.add(result.get(randList.get(i)));
        }
        return finalResult;
    }

}
  1. step3
    最后一步非常简单,<dubbo:reference/>中配置merger="afei"即可,这个merger的值,对应step1文件内容中的key;
<dubbo:reference id="menuService" interface="com.alibaba.dubbo.demo.MenuService"
                 retries="0" timeout="1800000" merger="afei" group="group-hot,group-cold" cluster="mergeable"/>
  1. step4
    just run it。 多运行几次,可以看到不到的结果,即达到了随机取3个结果的目的:
[{"id":101,"name":"凉拌黄瓜"},{"id":102,"name":"凉拌木耳"},{"id":2,"name":"剁椒鱼头"}]
[{"id":101,"name":"凉拌黄瓜"},{"id":3,"name":"口味虾"},{"id":2,"name":"剁椒鱼头"}]
[{"id":2,"name":"剁椒鱼头"},{"id":102,"name":"凉拌木耳"},{"id":3,"name":"口味虾"}]
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,590评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 86,808评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,151评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,779评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,773评论 5 367
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,656评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,022评论 3 398
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,678评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 41,038评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,659评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,756评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,411评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,005评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,973评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,203评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,053评论 2 350
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,495评论 2 343

推荐阅读更多精彩内容