Dubbo源码分析(十五) Merger实现

下面我们来说一下Dubbo的Merger实现。在开发中,有这么一种情况,先定义了一个UserService接口,有UserServiceImpl和CategoryUserServiceImpl两种实现,它们又分别属于user和category两个组,consumer将调用这两个服务,并按照自定义策略合并返回结果,作为最终结果。这就需要Dubbo的Merger来实现了。我们先来看一下MergeableClusterInvoker的invoke方法

public Result invoke(final Invocation invocation) throws RpcException {
    // 获得 Invoker 集合
    List<Invoker<T>> invokers = directory.list(invocation);
    // 获得 Merger 拓展名
    String merger = getUrl().getMethodParameter(invocation.getMethodName(), Constants.MERGER_KEY);
    // 若果未配置拓展,直接调用首个可用的 Invoker 对象
    if (ConfigUtils.isEmpty(merger)) { // If a method doesn't have a merger, only invoke one Group
        for (final Invoker<T> invoker : invokers) {
            if (invoker.isAvailable()) {
                return invoker.invoke(invocation);
            }
        }
        return invokers.iterator().next().invoke(invocation);
    }

    // 通过反射,获得返回类型
    Class<?> returnType;
    try {
        returnType = getInterface().getMethod(invocation.getMethodName(), invocation.getParameterTypes()).getReturnType();
    } catch (NoSuchMethodException e) {
        returnType = null;
    }

    // 提交线程池,并行执行,发起 RPC 调用,并添加到 results 中
    Map<String, Future<Result>> results = new HashMap<String, Future<Result>>();
    for (final Invoker<T> invoker : invokers) {
        Future<Result> future = executor.submit(new Callable<Result>() {
            public Result call() {
                // RPC 调用
                return invoker.invoke(new RpcInvocation(invocation, invoker));
            }
        });
        results.put(invoker.getUrl().getServiceKey(), future);
    }

    // 阻塞等待执行执行结果,并添加到 resultList 中
    List<Result> resultList = new ArrayList<Result>(results.size());
    int timeout = getUrl().getMethodParameter(invocation.getMethodName(), Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
    for (Map.Entry<String, Future<Result>> entry : results.entrySet()) {
        Future<Result> future = entry.getValue();
        try {
            Result r = future.get(timeout, TimeUnit.MILLISECONDS);
            if (r.hasException()) { // 异常 Result ,打印错误日志,忽略
                log.error(new StringBuilder(32).append("Invoke ").append(getGroupDescFromServiceKey(entry.getKey())).append(" failed: ").append(r.getException().getMessage()).toString(), r.getException());
            } else { // 正常 Result ,添加到 resultList 中
                resultList.add(r);
            }
        } catch (Exception e) { // 异常,抛出 RpcException 异常
            throw new RpcException(new StringBuilder(32).append("Failed to invoke service ").append(entry.getKey()).append(": ").append(e.getMessage()).toString(), e);
        }
    }

    // 结果大小为空,返回空的 RpcResult
    if (resultList.isEmpty()) {
        return new RpcResult((Object) null);
    // 结果大小为 1 ,返回首个 RpcResult
    } else if (resultList.size() == 1) {
        return resultList.iterator().next();
    }
    // 返回类型为 void ,返回空的 RpcResult
    if (returnType == void.class) {
        return new RpcResult((Object) null);
    }

    Object result;
    // 【第 1 种】基于合并方法
    if (merger.startsWith(".")) {
        // 获得合并方法 Method
        merger = merger.substring(1);
        Method method;
        try {
            method = returnType.getMethod(merger, returnType);
        } catch (NoSuchMethodException e) {
            throw new RpcException(new StringBuilder(32).append("Can not merge result because missing method [ ").append(merger).append(" ] in class [ ").append(returnType.getClass().getName()).append(" ]").toString());
        }
        // 有 Method ,进行合并
        if (method != null) {
            if (!Modifier.isPublic(method.getModifiers())) {
                method.setAccessible(true);
            }
            result = resultList.remove(0).getValue();
            try {
                // 方法返回类型匹配,合并时,修改 result
                if (method.getReturnType() != void.class && method.getReturnType().isAssignableFrom(result.getClass())) {
                    for (Result r : resultList) {
                        result = method.invoke(result, r.getValue());
                    }
                // 方法返回类型不匹配,合并时,不修改 result
                } else {
                    for (Result r : resultList) {
                        method.invoke(result, r.getValue());
                    }
                }
            } catch (Exception e) {
                throw new RpcException(new StringBuilder(32).append("Can not merge result: ").append(e.getMessage()).toString(), e);
            }
        // 无 Method ,抛出 RpcException 异常
        } else {
            throw new RpcException(new StringBuilder(32).append("Can not merge result because missing method [ ").append(merger).append(" ] in class [ ").append(returnType.getClass().getName()).append(" ]").toString());
        }
    // 【第 2 种】基于 Merger
    } else {
        Merger resultMerger;
        // 【第 2.1 种】根据返回值类型自动匹配 Merger
        if (ConfigUtils.isDefault(merger)) {
            resultMerger = MergerFactory.getMerger(returnType);
        // 【第 2.2 种】指定 Merger
        } else {
            resultMerger = ExtensionLoader.getExtensionLoader(Merger.class).getExtension(merger);
        }
        // 有 Merger ,进行合并
        if (resultMerger != null) {
            List<Object> rets = new ArrayList<Object>(resultList.size());
            for (Result r : resultList) {
                rets.add(r.getValue());
            }
            result = resultMerger.merge(rets.toArray((Object[]) Array.newInstance(returnType, 0)));
        // 无 Merger ,抛出 RpcException 异常
        } else {
            throw new RpcException("There is no merger to merge result.");
        }
    }
    // 返回 RpcResult 结果
    return new RpcResult(result);
}

调用了Merger进行结果的合并处理。我们再来看一个类MergerFactory,这个类是生产Merger的工厂类

 public static <T> Merger<T> getMerger(Class<T> returnType) {
    Merger result;
    // 数组类型
    if (returnType.isArray()) {
        Class type = returnType.getComponentType();
        // 从缓存中获得 Merger 对象
        result = mergerCache.get(type);
        if (result == null) {
            loadMergers();
            result = mergerCache.get(type);
        }
        // 获取不到,使用 ArrayMerger
        if (result == null && !type.isPrimitive()) {
            result = ArrayMerger.INSTANCE;
        }
    // 普通类型
    } else {
        // 从缓存中获得 Merger 对象
        result = mergerCache.get(returnType);
        if (result == null) {
            loadMergers();
            result = mergerCache.get(returnType);
        }
    }
    return result;
}

我们再来看一个类ArrayMerger,这个类是数组结果合并的处理类,实现了merge方法

if (others.length == 0) {
        return null;
    }
    int totalLen = 0;
    for (int i = 0; i < others.length; i++) {
        Object item = others[i];
        if (item != null && item.getClass().isArray()) {
            totalLen += Array.getLength(item);
        } else {
            throw new IllegalArgumentException(
                    new StringBuilder(32).append(i + 1)
                            .append("th argument is not an array").toString());
        }
    }

    if (totalLen == 0) {
        return null;
    }

    Class<?> type = others[0].getClass().getComponentType();

    Object result = Array.newInstance(type, totalLen);
    int index = 0;
    for (Object array : others) {
        for (int i = 0; i < Array.getLength(array); i++) {
            Array.set(result, index++, Array.get(array, i));
        }
    }
    return (Object[]) result;

再来看一个类MapMerger,这个类是Map结果合并的处理类

if (items.length == 0) {
        return null;
    }
    // 创建结果 Map
    Map<Object, Object> result = new HashMap<Object, Object>();
    // 合并多个 Map
    for (Map<?, ?> item : items) {
        if (item != null) {
            result.putAll(item);
        }
    }
    return result;

还有很多数据类型的处理类,比如BooleanArrayMerger,ByteArrayMerger,CharArrayMerger等,就不在这里具体说了。
通过Merger实现了对多个返回结果的处理。
Dubbo的Merger机制就介绍到这里了。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,189评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,577评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,857评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,703评论 1 276
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,705评论 5 366
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,620评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,995评论 3 396
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,656评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,898评论 1 298
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,639评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,720评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,395评论 4 319
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,982评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,953评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,195评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 44,907评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,472评论 2 342

推荐阅读更多精彩内容