Dubbo使用了CompletableFuture,实现了真异步

Dubbo在服务调用时支持同步调用和异步调用等方式。

在Dubbo2.6版本及之前的版本在实现异步调用时存在一定的缺点,实际上是一种假异步。

下面列举一个异步案例。

// 此方法应该返回Foo,但异步后会立刻返回NULL
fooService.findFoo(fooId);
// 立刻得到当前调用的Future实例,当发生新的调用时这个东西将会被覆盖
Future<Foo> fooFuture = RpcContext.getContext().getFuture();

// 调用另一个服务的方法
barService.findBar(barId);
// 立刻得到当前调用的Future
Future<Bar> barFuture = RpcContext.getContext().getFuture();
 
// 此时,两个服务的方法在并发执行
// 等待第一个调用完成,线程会进入Sleep状态,当调用完成后被唤醒。
Foo foo = fooFuture.get();
// 同上
Bar bar = barFuture.get();
// 假如第一个调用需要等待5秒,第二个等待6秒,则整个调用过程完成的时间是6秒。

当调用服务方法后,Dubbo会创建一个DefaultFuture,并将该Future存放到RpcContext中,在用户线程中,如果用户想获取调用结果时,会从RpcContext中获取该Future,并调用get方法,但是如果此时该服务仍没有处理完毕,则会出现阻塞,直到结果返回或调用超时为止。发生阻塞时,该方法的后续步骤则得不到执行。对于异步来说,这显然是不合理的。理想中的异步是如果服务没有处理好,会继续执行用户线程的后续方法,不会阻塞等待。

从Dubbo2.7开始,Dubbo的异步调用开始以CompletableFuture为基础进行实现

DubboInvoker是一个执行体,通过它可以发起远程调用。
在Dubbo2.6的远程调用中,部分代码如下所示(只保留了部分代码):

DubboInvoker类
protected Result doInvoke(final Invocation invocation) throws Throwable {
        RpcInvocation inv = (RpcInvocation) invocation;
        //忽略部分代码
        boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
        boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
        //忽略部分代码
        //单向调用,无返回值
        if (isOneway) {
           boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
           currentClient.send(inv, isSent);
           RpcContext.getContext().setFuture(null);
           return new RpcResult();
        // 异步调用
        } else if (isAsync) {
           ResponseFuture future = currentClient.request(inv, timeout);
           RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));
           return new RpcResult();
        // 同步调用
        } else {
           RpcContext.getContext().setFuture(null);
           return (Result) currentClient.request(inv, timeout).get();
        }     
}

在Dubbo2.6版本及之前的版本中,不管同步调用还是异步调用,都会调用HeaderExchangeClient.request方法,返回一个DefaultFuture对象,不同的点是:异步调用会将该future存放到RpcContext中,并先返回一个空的RpcResult结果。而同步调用不会将该future存放到RpcContext中,而是直接调用该future的get方法,阻塞等待调用结果。

HeaderExchangeChannel类 
public ResponseFuture request(Object request, int timeout) throws RemotingException {
        Request req = new Request();
        req.setVersion(Version.getProtocolVersion());
        req.setTwoWay(true);
        req.setData(request);
        DefaultFuture future = new DefaultFuture(channel, req, timeout); 
        channel.send(req);
        //忽略了部分代码
        return future;
}
DefaultFuture类(忽略了部分代码)
public Object get(int timeout) throws RemotingException {
        if (!isDone()) {
            long start = System.currentTimeMillis();
            lock.lock();
            try {
                while (!isDone()) {
                    done.await(timeout, TimeUnit.MILLISECONDS);
                    if (isDone() || System.currentTimeMillis() - start > timeout) {
                        break;
                    }
                }
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            } finally {
                lock.unlock();
            }
        }
        return returnFromResponse();
}

当服务端处理完信息后,HeaderExchangeHandler会处理发送过来的Response,根据requestId获取对应的DefaultFuture对象,最终调用doReceived方法对结果赋值。利用AQS的条件锁机制,唤醒阻塞线程。

DefaultFuture类
private void doReceived(Response res) {
        lock.lock();
        try {
            response = res;
            if (done != null) {
                done.signal();
            }
        } finally {
            lock.unlock();
        }
        if (callback != null) {
            invokeCallback(callback);
        }
}

在Dubbo2.7版本中,对异步调用进行了改良,使用了CompletableFuture

Dubbo2.7异步调用的一个样例:

// 此调用会立即返回null
asyncService.sayHello("world");
// 拿到调用的Future引用,当结果返回后,会被通知和设置到此Future
CompletableFuture<String> helloFuture = RpcContext.getContext().getCompletableFuture();
// 为Future添加回调
helloFuture.whenComplete((retValue, exception) -> {
    if (exception == null) {
        System.out.println(retValue);
    } else {
        exception.printStackTrace();
    }
});

同样是DubboInvoker发起远程调用,在doInvoke方法中进行了改进:

DubboInvoker2.7.9版本
protected Result doInvoke(final Invocation invocation) throws Throwable {
     RpcInvocation inv = (RpcInvocation) invocation;
     final String methodName = RpcUtils.getMethodName(invocation);
     boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
     //单向调用
     if (isOneway) {
         boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
         currentClient.send(inv, isSent);
         return AsyncRpcResult.newDefaultAsyncResult(invocation);
      //同步调用和异步调用
      } else {
         ExecutorService executor = getCallbackExecutor(getUrl(), inv);
                CompletableFuture<AppResponse> appResponseFuture =
                        currentClient.request(inv, timeout, executor).thenApply(obj -> (AppResponse) obj);           FutureContext.getContext().setCompatibleFuture(appResponseFuture);
        AsyncRpcResult result = new AsyncRpcResult(appResponseFuture, inv);
        result.setExecutor(executor);
        return result;
    } 
}

在Dubbo2.7版本中,DubboInvolnvoker对同步调用和异步调用进行了统一处理,封装成CompletableFuture,并以 AsyncRpcResult返回。

Dubbo2.7版本下HeaderExchangeChannel.request方法与2.6版本相差不大,只是DeafultFuture对象有一点不同,即后续版本继承了 CompletableFuture类。

对于同步调用和异步调用的处理交给AsyncToSyncInvoker类处理。

public Result invoke(Invocation invocation) throws RpcException {
        // 调用DubboInvoker等Invoker返回的调用结果
        Result asyncResult = invoker.invoke(invocation);
        try {
            // 如果是同步调用
            if (InvokeMode.SYNC == ((RpcInvocation) invocation).getInvokeMode()) {
                // 不能使用CompletableFuture#get()方法,否则性能会出现严重下降。
                asyncResult.get(Integer.MAX_VALUE, TimeUnit.MILLISECONDS);
            }
        }
        //忽略了部分代码
        return asyncResult;
    }

不同与Dubbo2.6版本,Dubbo2.7在处理服务端返回结果时放弃了AQS的条件锁机制,改用CompletableFuture类的complete方法去实现。

DefaultFuture类
private void doReceived(Response res) {
        //忽略部分代码
        if (res.getStatus() == Response.OK) {
            // 对CompletableFuture赋值结果
            this.complete(res.getResult());
        } else if (res.getStatus() == Response.CLIENT_TIMEOUT || res.getStatus() == Response.SERVER_TIMEOUT) {
            this.completeExceptionally(new TimeoutException(res.getStatus() == Response.SERVER_TIMEOUT, channel, res.getErrorMessage()));
        } else {
            this.completeExceptionally(new RemotingException(channel, res.getErrorMessage()));
        }
      //忽略部分代码
}

对于上述的Result接口,有两个实现对象,我们在这里进行简单对比分析下。

AsyncRpcResult

此类表示未完成的RPC调用,它将保留此调用的一些上下文信息,例如RpcContext和Invocation,因此,当调用完成并且返回结果时,它可以确保与调用时相同地恢复所有上下文, 是在调用任何回调之前进行的。

当Result实现CompletionStage时,AsyncRpcResult允许您轻松构建异步过滤器链,其状态将完全由基础RPC调用的状态驱动。

AsyncRpcResult不包含任何具体值(由CompletableFuture带来的基础值除外),请将其视为状态传输节点。#getValue()#getException()都是从Result接口继承的,主要实现它们出于兼容性考虑。 因为许多旧式Filter实现很可能直接调用getValue。

AppResponse

Duboo3.0.0中引入了AsyncRpcResult来替换RpcResult,并且RpcResult被替换为AppResponse:AsyncRpcResult是在调用链中实际传递的对象,
AppResponse仅代表业务结果。

AsyncRpcResult是表示未完成的RPC调用的未来,而AppResponse是此调用的实际返回类型。
从理论上讲,AppResponse不必实现Result接口,这主要是出于兼容性目的。

在Dubbo服务暴露中,ProtocolFilterWrapper会构建拦截器链Filter,在调用实际的DubboInvoker之前,会先调用一些构造的Filter,比如ExecuteLimitFilter,限制每个服务中每个方法的最大并发数。下面是Dubbo2.6构建拦截器器链的逻辑:

private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
    Invoker<T> last = invoker;
    List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
    if (!filters.isEmpty()) {
       for (int i = filters.size() - 1; i >= 0; i--) {
           final Filter filter = filters.get(i);
           final Invoker<T> next = last;
           ast = new Invoker<T>() {
                 //忽略部分代码
                 @Override
                 public Result invoke(Invocation invocation) {
                        return filter.invoke(next, invocation);
                 }
                };
            }
        }
        return last;
}

但是在Dubbo2.6版本进行异步调用中,会出现一些问题,因为Dubbo2.6在进行异步调用时,会先返回一个空的RpcResult对象,当某些Filter需要对返回的结果进行处理时,显然在该情景下无法处理结果。Dubbo2.7对这种情况进行了改进。

Dubbo2.7构建拦截器链的逻辑如下所示:

ProtocolFilterWrapper类
private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
        Invoker<T> last = invoker;
        List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
        if (!filters.isEmpty()) {
            for (Filter filter : filters) {
                last = new FilterNode<T>(invoker, last, filter);
            }
        }
        return last;
}

然后解释下在FilterNode中的invoke方法:

@Override
public Result invoke(Invocation invocation) throws RpcException {
      Result asyncResult;
      asyncResult = filter.invoke(next, invocation);
      //忽略部分代码
      return asyncResult.whenCompleteWithContext((r, t) -> {
            //忽略部分代码
            } else if (filter instanceof Filter.Listener) {
                Filter.Listener listener = (Filter.Listener) filter;
                if (t == null) {
                    listener.onResponse(r, invoker, invocation);
                } else {
                    listener.onError(t, invoker, invocation);
                }
            }
        });
}

当异步调用时,以AsyncRpcResult对象传递,通过CompletableFuture#whenComplete实现异步下的逻辑处理。

public Result whenCompleteWithContext(BiConsumer<Result, Throwable> fn){
     // 是CompletableFuture类
     this.responseFuture = this.responseFuture.whenComplete((v, t) -> {
            beforeContext.accept(v, t);
            fn.accept(v, t);
            afterContext.accept(v, t);
      });
      return this;
}

Dubbo异步分析到这里就结束了,感谢大家的阅读。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,324评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,303评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,192评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,555评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,569评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,566评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,927评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,583评论 0 257
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,827评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,590评论 2 320
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,669评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,365评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,941评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,928评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,159评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,880评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,399评论 2 342

推荐阅读更多精彩内容