下面我们来说一下Dubbo的Merger实现。在开发中,有这么一种情况,先定义了一个UserService接口,有UserServiceImpl和CategoryUserServiceImpl两种实现,它们又分别属于user和category两个组,consumer将调用这两个服务,并按照自定义策略合并返回结果,作为最终结果。这就需要Dubbo的Merger来实现了。我们先来看一下MergeableClusterInvoker的invoke方法
public Result invoke(final Invocation invocation) throws RpcException {
// 获得 Invoker 集合
List<Invoker<T>> invokers = directory.list(invocation);
// 获得 Merger 拓展名
String merger = getUrl().getMethodParameter(invocation.getMethodName(), Constants.MERGER_KEY);
// 若果未配置拓展,直接调用首个可用的 Invoker 对象
if (ConfigUtils.isEmpty(merger)) { // If a method doesn't have a merger, only invoke one Group
for (final Invoker<T> invoker : invokers) {
if (invoker.isAvailable()) {
return invoker.invoke(invocation);
}
}
return invokers.iterator().next().invoke(invocation);
}
// 通过反射,获得返回类型
Class<?> returnType;
try {
returnType = getInterface().getMethod(invocation.getMethodName(), invocation.getParameterTypes()).getReturnType();
} catch (NoSuchMethodException e) {
returnType = null;
}
// 提交线程池,并行执行,发起 RPC 调用,并添加到 results 中
Map<String, Future<Result>> results = new HashMap<String, Future<Result>>();
for (final Invoker<T> invoker : invokers) {
Future<Result> future = executor.submit(new Callable<Result>() {
public Result call() {
// RPC 调用
return invoker.invoke(new RpcInvocation(invocation, invoker));
}
});
results.put(invoker.getUrl().getServiceKey(), future);
}
// 阻塞等待执行执行结果,并添加到 resultList 中
List<Result> resultList = new ArrayList<Result>(results.size());
int timeout = getUrl().getMethodParameter(invocation.getMethodName(), Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
for (Map.Entry<String, Future<Result>> entry : results.entrySet()) {
Future<Result> future = entry.getValue();
try {
Result r = future.get(timeout, TimeUnit.MILLISECONDS);
if (r.hasException()) { // 异常 Result ,打印错误日志,忽略
log.error(new StringBuilder(32).append("Invoke ").append(getGroupDescFromServiceKey(entry.getKey())).append(" failed: ").append(r.getException().getMessage()).toString(), r.getException());
} else { // 正常 Result ,添加到 resultList 中
resultList.add(r);
}
} catch (Exception e) { // 异常,抛出 RpcException 异常
throw new RpcException(new StringBuilder(32).append("Failed to invoke service ").append(entry.getKey()).append(": ").append(e.getMessage()).toString(), e);
}
}
// 结果大小为空,返回空的 RpcResult
if (resultList.isEmpty()) {
return new RpcResult((Object) null);
// 结果大小为 1 ,返回首个 RpcResult
} else if (resultList.size() == 1) {
return resultList.iterator().next();
}
// 返回类型为 void ,返回空的 RpcResult
if (returnType == void.class) {
return new RpcResult((Object) null);
}
Object result;
// 【第 1 种】基于合并方法
if (merger.startsWith(".")) {
// 获得合并方法 Method
merger = merger.substring(1);
Method method;
try {
method = returnType.getMethod(merger, returnType);
} catch (NoSuchMethodException e) {
throw new RpcException(new StringBuilder(32).append("Can not merge result because missing method [ ").append(merger).append(" ] in class [ ").append(returnType.getClass().getName()).append(" ]").toString());
}
// 有 Method ,进行合并
if (method != null) {
if (!Modifier.isPublic(method.getModifiers())) {
method.setAccessible(true);
}
result = resultList.remove(0).getValue();
try {
// 方法返回类型匹配,合并时,修改 result
if (method.getReturnType() != void.class && method.getReturnType().isAssignableFrom(result.getClass())) {
for (Result r : resultList) {
result = method.invoke(result, r.getValue());
}
// 方法返回类型不匹配,合并时,不修改 result
} else {
for (Result r : resultList) {
method.invoke(result, r.getValue());
}
}
} catch (Exception e) {
throw new RpcException(new StringBuilder(32).append("Can not merge result: ").append(e.getMessage()).toString(), e);
}
// 无 Method ,抛出 RpcException 异常
} else {
throw new RpcException(new StringBuilder(32).append("Can not merge result because missing method [ ").append(merger).append(" ] in class [ ").append(returnType.getClass().getName()).append(" ]").toString());
}
// 【第 2 种】基于 Merger
} else {
Merger resultMerger;
// 【第 2.1 种】根据返回值类型自动匹配 Merger
if (ConfigUtils.isDefault(merger)) {
resultMerger = MergerFactory.getMerger(returnType);
// 【第 2.2 种】指定 Merger
} else {
resultMerger = ExtensionLoader.getExtensionLoader(Merger.class).getExtension(merger);
}
// 有 Merger ,进行合并
if (resultMerger != null) {
List<Object> rets = new ArrayList<Object>(resultList.size());
for (Result r : resultList) {
rets.add(r.getValue());
}
result = resultMerger.merge(rets.toArray((Object[]) Array.newInstance(returnType, 0)));
// 无 Merger ,抛出 RpcException 异常
} else {
throw new RpcException("There is no merger to merge result.");
}
}
// 返回 RpcResult 结果
return new RpcResult(result);
}
调用了Merger进行结果的合并处理。我们再来看一个类MergerFactory,这个类是生产Merger的工厂类
public static <T> Merger<T> getMerger(Class<T> returnType) {
Merger result;
// 数组类型
if (returnType.isArray()) {
Class type = returnType.getComponentType();
// 从缓存中获得 Merger 对象
result = mergerCache.get(type);
if (result == null) {
loadMergers();
result = mergerCache.get(type);
}
// 获取不到,使用 ArrayMerger
if (result == null && !type.isPrimitive()) {
result = ArrayMerger.INSTANCE;
}
// 普通类型
} else {
// 从缓存中获得 Merger 对象
result = mergerCache.get(returnType);
if (result == null) {
loadMergers();
result = mergerCache.get(returnType);
}
}
return result;
}
我们再来看一个类ArrayMerger,这个类是数组结果合并的处理类,实现了merge方法
if (others.length == 0) {
return null;
}
int totalLen = 0;
for (int i = 0; i < others.length; i++) {
Object item = others[i];
if (item != null && item.getClass().isArray()) {
totalLen += Array.getLength(item);
} else {
throw new IllegalArgumentException(
new StringBuilder(32).append(i + 1)
.append("th argument is not an array").toString());
}
}
if (totalLen == 0) {
return null;
}
Class<?> type = others[0].getClass().getComponentType();
Object result = Array.newInstance(type, totalLen);
int index = 0;
for (Object array : others) {
for (int i = 0; i < Array.getLength(array); i++) {
Array.set(result, index++, Array.get(array, i));
}
}
return (Object[]) result;
再来看一个类MapMerger,这个类是Map结果合并的处理类
if (items.length == 0) {
return null;
}
// 创建结果 Map
Map<Object, Object> result = new HashMap<Object, Object>();
// 合并多个 Map
for (Map<?, ?> item : items) {
if (item != null) {
result.putAll(item);
}
}
return result;
还有很多数据类型的处理类,比如BooleanArrayMerger,ByteArrayMerger,CharArrayMerger等,就不在这里具体说了。
通过Merger实现了对多个返回结果的处理。
Dubbo的Merger机制就介绍到这里了。