Dubbo2.7的泛化调用和超时机制

路途虽遥远,将来更美好
微信公号号:九点半的马拉

在传统模式下,Dubbo消费端需要调用某一远程服务器端的方法时,消费端也需要额外导入服务类接口API,Dubbo也由此实现了面向接口代理的高性能RPC调用。

但是当服务消费端没有服务接口或方法参数类型时,无法使用上述的方式进行服务调用,针对该场景,Dubbo使用泛化调用方法进行服务调用。

Dubbo在进行泛化调用时,将相关信息封装到Map对象中,并利用GenericService接口处理。

举个例子

服务器端配置:

<bean id="helloserviceimpl" class="org.apache.dubbo.samples.generic.call.impl.HelloServiceImpl"/>

<dubbo:service interface="org.apache.dubbo.samples.generic.call.api.HelloService" ref="helloserviceimpl"/>

服务器端服务具体实现类:

public class HelloServiceImpl implements HelloService {
    @Override
    public CompletableFuture<String> sayHelloAsync(String name) {
        CompletableFuture<String> future = new CompletableFuture<>();
        new Thread(() -> {
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            future.complete("sayHelloAsync: " + name);
        }).start();
        return future;
    }

消费端配置:

public class GenericCallConsumer {
    private static GenericService genericService;
    public static void main(String[] args) throws Exception {
        ApplicationConfig applicationConfig = new ApplicationConfig();
        applicationConfig.setName("generic-call-consumer");
        RegistryConfig registryConfig = new RegistryConfig();
        registryConfig.setAddress("zookeeper://127.0.0.1:2181");
        ReferenceConfig<GenericService> referenceConfig = new ReferenceConfig<>();
    referenceConfig.setInterface("org.apache.dubbo.samples.generic.call.api.HelloService");
        applicationConfig.setRegistry(registryConfig);
        referenceConfig.setApplication(applicationConfig);
        // 开启泛化
        referenceConfig.setGeneric(true);
        referenceConfig.setAsync(true);
        referenceConfig.setTimeout(7000);
        genericService = referenceConfig.get();
        invokeAsyncSayHelloAsync();
public static void invokeAsyncSayHelloAsync() throws Exception {
        CompletableFuture<Object> future = genericService.$invokeAsync("sayHelloAsync",
                new String[]{"java.lang.String"}, new Object[]{"world"});
        CountDownLatch latch = new CountDownLatch(1);
        future.whenComplete((value, t) -> {
           System.err.println("invokeAsyncSayHelloAsync(whenComplete): " + value);
            latch.countDown();
        });
        latch.await();
}

原理分析

消费端通过一个代理对象进行服务调用,

1)执行InvokerInvocationHandler#invoke方法,

2)调用MockClusterInvoker#invoke方法

3)调用AbstractCluster$InterceptorInvokerNode#invoke方法

code1.png

这里新增了一个ClusterInterceptor,与Filter不同,它在一个特定的地址或invoker被选择之前的较外层执行逻辑,在服务发现之前拦截请求。

3.1)调用ConsumerContextClusterInterceptor#before方法。
从RpcContext设置invocation,并设置localAddress和invoker(默认FailoverClusterInvoker),清除RpcContext内部的SERVER_LOCAL上下文内容。

3.2)调用ClusterInterceptor#intercept方法

code2.png

默认调用FailoverClusterInvoker#doinvoke方法。

在该方法中从RegistryDirectory中获取invoker列表,然后获取负载均衡LoadBalance(默认RandomLoadBalance), 选择一个invoker,进行服务调用。

4)调用InvokerWrapper#invoke方法,之后执行一个Filter链

4.1)调用ConsumerContextFilter#invoke方法,

设置RpcContext中的invoker变量问当前invoker(默认是ProtocolFilterWrapper),设置invocation。

从RpcContext中获取timeout-countdown变量,如果存在,则转化为TimeCountDown对象,判断该调用是否超时,如果超时,则返回一个AsyncRpcResult对象,记录一个异常。

4.2)调用FutureFilter#invoke方法和MonitorFilter#invoke方法。

4.3)调用GenericImplFilter#invoke方法,这里是泛化调用在客户端的主要核心步骤

4.3.1)从url中获取generic字段,调用方法不是$invoke,也不是$invokeAysnc时:

4.3.1.1)重新创建一个RpcInvocation,在attributes变量中添加GENERIC_IMPL_MARKER值,设置为true,其中:attributes变量参数类型为Map<Object,Object>,并且该变量只在调用者端,不会出现在线路上。

4.3.1.2)获取调用的方法名,调用的参数类型和参数值,对参数类型进行解析修改,效果如下:

java.lang.Object[][].class => "java.lang.Object[][]"

4.3.1.3)如果泛化调用方式为bean方式,遍历参数值,并序列化为JavaBeanDescriptor类型数据;

如果是其他调用方式,深入对象,将复杂类型转化为简单类型。

4.3.1.4)如果方法返回类型是CompletableFuture,则设置方法名为$invokeAysnc;其他情况设置方法名为$invoke

4.3.1.5)将参数类型设置为new Class<?>[]{String.class, String[].class, Object[].class};,这样转化为传统的泛化调用方式,并将参数值设置为类似new Object[]{methodName, types, args}的格式。

4.3.2)当调用方法为$invoke或者$invokeAysnc,并且方法参数变量数量为3个时,首先获取泛化参数,然后判断泛化调用方式:

4.3.2.1)如果是nativejava方式,判断参数是否为byte[]类型;如果不是,则说明参数传递异常。

4.3.2.2)如果是bean方式,则判断参数是否为JavaBeanDescriptor类型;如果不是,则说明参数传递异常。

4.3.3)在RpcInvocation中的attachment中设置是否泛化调用。

4.4) 调用invoker#invoke方法

当远程调用返回结果时,会触发onResponse方法。

从url中获取generic参数值,从invocation中获取方法名,方法参数类型,参数值GENERIC_IMPL是否存在。

如果参数值GENERIC_IMPL存在,并且为true:
从invoker中获取接口类型,当方法不是$invoke也不是$invokeAysnc,并且接口父类型为GenericService时,从invoker中的interface参数中获取真实的interface,并转化为Class类型。

之后,是所有不同调用方式的统一处理。

获取调用的方法Method,如果调用方式是Bean方式:
判断appReponse的value是否为JavaBeanDescriptor类型,如果是,将该value进行反序列化,重新赋值;如果不是,则抛出异常。

如果是其他调用方式,则使用PojoUtils工具类进行反序列化。

5)调用AsyncToSyncInvoker#invoke方法

6)调用DubboInvoker#invoke方法,发起远程调用。

服务端收到请求后,在最终调用AbstractProxyInvoker#invoke方法之前,会先执行一个过滤器链,和上述的消费端的类似,其中会经过一个GenericFilter,该类是服务端实现泛化调用功能的重要步骤

GenericFilter中,首先判断方法名是否为$invoke或者$invokeAsync,由此来判定是否为泛化调用,如果方法名不是这两个,则直接调用下一个invoker;如果是,则执行下面的逻辑。

获取参数名称、参数类型、参数值,通过反射获取调用方法,根据不同的调用方式进行反序列化,获取实际调用方法的相关信息,然后将RpcInvocation中的相关信息进行替换:

code3.png

Dubbo2.7下的超时机制

在上述中的ConsumerContextFilter#invoke中涉及到了超时情况的处理,使用了TimeoutCountDown类,是2020.5.1日提交的信息。

Dubbo2.6版本中,在HeaderExchangeChannel中进行远程调用前,会创建一个DefaultFuture对象,里面有一个静态代码块,创建一个线程,执行RemotingInvocationTimeoutScan任务,轮询FUTURES集合,通过DefaultFuture记录的开始时间与当前时间进行计算,判断是否超时,如果超时,则直接创建一个超时的Response,并将该DefaultFuture从FUTURES集合中移除。

当服务端在一定时间内执行完逻辑后,会发送给客户端,在此之前,客户端通过定时任务已经将相关信息从FUTURES集合中移除,所以这次服务端发送过来的信息在FUTURES集合中查找不到,所以不做处理,服务端的这次发送显得有些多余,对于客户端来说是无用的。

所以在2020.5.1日,提交了上述代码来解决这一问题。

code4.png

DubboInvoker#doInvoke方法中进行远程远程调用前,会计算timeout。

code5.png

首先从RpcContext中获取timeout-countdown变量值

如果为空:
----》 从url中通过timeout参数获取超时时间,默认是1000,
----》从url中获取enable-timeout-countdown参数值,默认是false,通过该参数开启新的超时机制(使用了上述的TimeoutCountDown)
----》如果开启了,在attachments变量里添加_TO变量,值为计算后的timeout

如果不为空:
将其转化为 TimeoutCountDown对象,计算剩余的有效时间,将其设置为新的timeout,并将其添加到_TO变量,值为计算后的timeout。

code6.png

那TimeoutCountDown对象在什么时候被创建的呢?

在服务端的ContextFilter。

从RpcInvocation中获取“_TO”变量的值,如果不为-1,则在RpcContext中创建一个TimeoutCountDown。

code7.png

在后续的TimeoutFilter中,从RpcContext中获取TimeoutCountDown,如果超时了,则清空处理的结果。

code8.png

在消费端的ConsumerContextFilter中,在进行远程调用前,同样从RpcContext中获取TimeOutCountDown,当过期时,直接返回一个异常,而不再进行远程调用。

code10.png

疑问点

但是最后有一个疑问,即当超时后,服务端仍然会发送给客户端,虽然结果已经被清空,(可能自己的理解问题)。

code11.png

下面的这个建议感觉挺好的,但是在Dubbo在没有发现类似的机制。


code12.png
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,547评论 6 477
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,399评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,428评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,599评论 1 274
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,612评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,577评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,941评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,603评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,852评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,605评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,693评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,375评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,955评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,936评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,172评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 43,970评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,414评论 2 342

推荐阅读更多精彩内容