SOFA 源码分析 — 调用方式

前言

SOFARPC 提供了多种调用方式满足不同的场景。

例如,同步阻塞调用;异步 future 调用,Callback 回调调用,Oneway 调用。

每种调用模式都有对应的场景。类似于单进程中的调用模式。在单进程中,我们可以使用 JDK 的 Future 实现异步,可以通过观察者实现回调。

那么,在 RPC 框架中,这些功能如何实现呢?

我们一个个开始看。

同步调用 Sync

每个 ConsumerConfig 都有一个 invokeType 属性,在 SOFA 中,对应的常量是 RpcConstants.INVOKER_TYPE_SYNC

直接受到这个属性影响的是 AbstractCluster 的 doSendMsg 方法。该方法会从 request 中取出 invokeType 属性,而这个值是什么时候设置进入这个 request 的呢?在 DefaultClientProxyInvoker 类中的 decorateRequest 方法里:

if (!consumerConfig.isGeneric()) {
    // 找到调用类型, generic的时候类型在filter里进行判断
    request.setInvokeType(consumerConfig.getMethodInvokeType(request.getMethodName()));
}

这里判断,如果不是泛型调用,则将 config 中的值设置到 request 中。

回到 DefaultClientProxyInvoker 中,如果是同步调用(默认),则调用 ClientTransport 的 syncSend 方法,并等待一个 response。通过层层调用,最终会来到 BaseRemoting 类的 invokeSync 方法。

该方法会创建一个 Future ,并将这个 Future 放到 Connection(和 Channel 是一对一的关系) 的里一个 Map 中,key 是一个 id, value 则是 Future,然后发送数据给服务器,当 Channel 收到服务器返回的数据时,会重 Channle 中取出对应的 Channel 中的 Connection 对象,然后取出 Connection 对象中对应 key 的 Future,这个 Key 来自服务端传递过来的,类似于 cookie。

很明显,这么做能很大的提高xiaolv,例如,客户端并发使用同一个 Channel 向服务端发送数据,客户端不用串行等待数据的返回,实际上,对于 Bolt 来说,调用是异步的,每次用户调用都会传递一个 Future,Channel 发送数据完之后,不必等待数据返回,而是继续发送数据,当 Channel 收到服务端返回的数据后,便从 Channel 中取出对应的 Future(服务端传递来的 ID 标识),完成 Future 的唤醒。

而对于用户来说,看起来是同步的,因为 Future 会阻塞等待结果,类似 JDK 的 Future。只是用户感知不到而已,在等待的过程中,Channel 或者说 Connection 一直在工作,例如发送数据,例如处理别的 Future 发送来的数据。

而整个网络层的效率也是非常的高,不涉及任何的业务操作,写出数据是异步的,拿到数据,只需唤醒等待的线程即可。

总结一下,整个同步调用的过程是:客户端发送数据的时候,会创建一个 Future,并将这个 Future 放进到 Connection 的 Map 中,key 为 Future ID,value 是 Future(一个 Connection 绑定一个 Channel),然后异步向服务端发送数据,发送完毕后 Channel 又会立即响应其他的调用。当 Channel 收到服务端的响应数据,Channle 会取出 Connection ,找到对应 ID 的 Future ,这个 ID 就是服务端和客户端的约定。找到 Future 后,便将返回值注入到 Future 的 result 中,然后唤醒阻塞等待返回值的线程。

异步调用 Future

异步调用的 invokeType 对应的常量是 RpcConstants.INVOKER_TYPE_FUTURE,异步调用会立即返回一个 Future,然后通过 get 方法阻塞等待返回值。使用方式是:BoltResponseFuture future1 = (BoltResponseFuture) SofaResponseFuture.getFuture();

具体源码在 AbstractCluster 类的 doSendMsg 方法中:

else if (RpcConstants.INVOKER_TYPE_FUTURE.equals(invokeType)) {
    // 开始调用
    ResponseFuture future = transport.asyncSend(request, timeout);
    // 放入线程上下文
    RpcInternalContext.getContext().setFuture(future);
    response = new SofaResponse();

当 Bolt 返回一个 Future 后,将其设置到 ThreadLocal 中,然后返回一个空的 response。用户在使用 SofaResponseFuture 获取 Future 的时候,其实就是从 RpcInvokeContext 中获取。可以看出,不是一个上下文,这里 RpcInvokeContext 是面向业务开发者使用的,而 RpcInternalContext 则是框架使用的。在 DefaultClientProxyInvoker 的 decorateResponse 方法中,会进行上下文转换,将框架内部上下文的数据复制到业务上下文中。

关键在于异步调用,在 BoltClientTransport 的 doInvokeAsync 方法中,会根据是否含有监听器判断是异步还是回调,如果没有监听器,则是异步 Future 模式,而如果是异步的话,就会转成回调模式(因为实现类似)。

具体实现则是创建一个 BoltFutureInvokeCallback 对象,该对象内部有一个 onResponse 方法,该方法会在任务完成之后回调。同时也会创建一个 BoltResponseFuture 对象,该对象会有 setXXX 方法,在 Callback 的 onResponse 方法中会调用 setXXX 方法,进行结果赋值和唤醒。

onResponse 会由一个 RpcInvokeCallbackListener 对象唤醒,在他的内部类 CallbackTask 的 run 方法中,会异步回调这个方法。

而这个对象的创建时机在 BaseRemoting 的 invokeWithCallback 方法的第一行:调用 createInvokeFuture 方法。具体代码如下:

protected InvokeFuture createInvokeFuture(Connection conn, RemotingCommand request, InvokeContext invokeContext, InvokeCallback invokeCallback) {

    return new DefaultInvokeFuture(
        request.getId(), 
        new RpcInvokeCallbackListener( RemotingUtil.parseRemoteAddress(conn.getChannel())), 
        invokeCallback, 
        request.getProtocolCode().getFirstByte(), 
        this.getCommandFactory(), 
        invokeContext);
}   

在 RpcResponseProcessor 类中,在 doProcess 方法收到数据之后,会调用 future 的 executeInvokeCallback 方法。和同步方式不同的是,虽然都是一个 InvokeFuture 实现,同步只会调用 putResponse 赋值并唤醒。

而 Future 的模式则要复杂的多,该方法会执行监听器(RpcInvokeCallbackListener)的 onResponse 方法,并传递自己。 而在监听器的 onResponse 方法中,会创建一个任务,异步执行 Future 中的任务。

我们当前的 Future 任务则是拿到返回值,并唤醒阻塞线程。

整体依赖和流程大概如下图:

image.png

总结一下异步调用的过程:异步调用在 SOFA 内部包装成了回调的方式,使用一个 Callback 封装了 面向用户的 ResponsFuture, 再使用一个监听器封装 Callback, 监听器监听 InvokeFuture ,当 InvokeFuture 响应服务器的时候,会回调监听器,监听器再回调 Callback,Callback 再唤醒 ResponseFuture,用户就可以得到数据了。

单向 oneWay

单向的使用一般是不关心结果的,使用方式则是很简单的将 invokeType 设置成 RpcConstants.INVOKER_TYPE_ONEWAY,。

SOFA 内部的区别处理也在 AbstractCluster 类的 doSendMsg 方法中,具体代码如下:

// 单向调用
else if (RpcConstants.INVOKER_TYPE_ONEWAY.equals(invokeType)) {
    long start = RpcRuntimeContext.now();
    try {
        transport.oneWaySend(request, timeout);
        response = new SofaResponse();
    } finally {
        if (RpcInternalContext.isAttachmentEnable()) {
            long elapsed = RpcRuntimeContext.now() - start;
            context.setAttachment(RpcConstants.INTERNAL_KEY_CLIENT_ELAPSE, elapsed);
        }
    }
}

从代码中,可以看到,这里只是返回了一个空的对象。

而在 Bolt 中的实现也非常的简单,仅仅是调用了 Channel 的 writeAndFlush 方法,只有在失败的时候,才会由响应,成功是没有任何响应的。代码如下:

protected void oneway(final Connection conn, final RemotingCommand request) {
    try {
        conn.getChannel().writeAndFlush(request).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture f) throws Exception {
                if (!f.isSuccess()) {
                    logger.error("Invoke send failed. The address is {}",
                        RemotingUtil.parseRemoteAddress(conn.getChannel()), f.cause());
                }
            }
        });
    } catch (Exception e) {
        if (null == conn) {
            logger.error("Conn is null");
        } else {
            logger.error("Exception caught when sending invocation. The address is {}",
                RemotingUtil.parseRemoteAddress(conn.getChannel()), e);
        }
    }
};

单向调用就是这么简单。所以,如果你试图获取返回值,那么得到的永远是 Nnll。

回调 Callback

在 SOFA 中, 有 3 种回调级别,目的是应用各种不同的场景。

  1. 接口级别,即调用这个接口的所有方法,都会引发回调。
  2. 方法级别,针对某个方法,会引发回调。
  3. 调用级别,某次调用,会引发回调。
使用方式:
  1. 接口级别:只需要设置 ConsumerConfig 的 onReturn 方法即可。例如下面这样,子类实现 SofaResponseCallback 即可:
SofaResponseCallback sofaResponseCallbackImpl = new SofaResponseCallbackImpl();

ConsumerConfig<HelloService> consumer = new ConsumerConfig<HelloService>()
            .setInterfaceId(HelloService.class.getName())
            .setInvokeType(RpcConstants.INVOKER_TYPE_CALLBACK)
            .setOnReturn(sofaResponseCallbackImpl)
  1. 方法级别:需要单独创建一个 MethodConfig,然后设置到 ConsumerConfig 的 Methods 属性中:
SofaResponseCallback sofaResponseCallbackImpl = new SofaResponseCallbackImpl();

MethodConfig methodConfig = new MethodConfig();
methodConfig.setName("sayHello")
            .setInvokeType(RpcConstants.INVOKER_TYPE_CALLBACK)
            .setOnReturn(sofaResponseCallbackImpl );

ConsumerConfig<HelloService> consumerConfig = new ConsumerConfig<HelloService>()
            .setApplication(applicationConfig)
            .setInterfaceId(HelloService.class.getName())
            .setTimeout(5000)
            .setMethods(Collections.singletonList(methodConfig))
            .setDirectUrl("bolt://127.0.0.1:22222?appName=future-server");

只需要像上面一样,设置 sayHello 方法的回调实现,然后将方法配置设置到 ConsumerConfig 中。就可以在每次调用该方法时,引发回调。

  1. 调用级别:他的优先级最高,每次调用都会引发回调。使用方式如下:
SofaResponseCallback sofaResponseCallbackImpl = new SofaResponseCallbackImpl();

RpcInvokeContext.getContext().setResponseCallback(sofaResponseCallbackImpl);

可以看出,通过在上下文的设置,就能生效,每次调用都会引发回调。

知道了如何使用,再来看看如何实现。

源码分析回调

主要逻辑还是在 AbstractCluster 的 doSendMsg 中。

// Callback调用
else if (RpcConstants.INVOKER_TYPE_CALLBACK.equals(invokeType)) {
    // 调用级别回调监听器
    SofaResponseCallback sofaResponseCallback = request.getSofaResponseCallback();
    if (sofaResponseCallback == null) {
        SofaResponseCallback methodResponseCallback = consumerConfig
            .getMethodOnreturn(request.getMethodName());
        if (methodResponseCallback != null) { // 方法的Callback
            request.setSofaResponseCallback(methodResponseCallback);
        }
    }
    transport.asyncSend(request, timeout);
    response = new SofaResponse();
}   

从源码中,可以看出,优先使用调用级别的回调,如果没有,才寻找方法级别的回调。而接口级别的回调则是默认机制。

因此,他们的优先级排序为 调用级别 ---> 方法级别 ----> 回调级别。

request 中的回调设置在 DefaultClientProxyInvoker 的 decorateRequest 方法中,即装饰 request,具体代码如下:

RpcInvokeContext invokeCtx = RpcInvokeContext.peekContext();
RpcInternalContext internalContext = RpcInternalContext.getContext();
if (invokeCtx != null) {
    // 如果用户设置了调用级别回调函数
    SofaResponseCallback responseCallback = invokeCtx.getResponseCallback();
    if (responseCallback != null) {
        request.setSofaResponseCallback(responseCallback);
        invokeCtx.setResponseCallback(null); // 一次性用完
        invokeCtx.put(RemotingConstants.INVOKE_CTX_IS_ASYNC_CHAIN,
            isSendableResponseCallback(responseCallback));
    }

从 RpcInvokeContext 复制到 request 中。且用完之后,立即删除,这点需要注意!!也就是说,仅仅使用一次。

那么 consumerConfig.getMethodOnreturn()方法会根据方法名获取到一个回调对对象,如何实现呢?

    /**
     * 得到方法名对应的自定义参数列表
     *
     * @param methodName 方法名,不支持重载
     * @return method onReturn
     */
    public SofaResponseCallback getMethodOnreturn(String methodName) {
        return (SofaResponseCallback) getMethodConfigValue(methodName, RpcConstants.CONFIG_KEY_ONRETURN,
            getOnReturn());
    }
    /**
     * 得到方法级配置,找不到则返回默认值
     *
     * @param methodName   方法名
     * @param configKey    配置key,例如参数
     * @param defaultValue 默认值
     * @return 配置值 method config value
     */
    public Object getMethodConfigValue(String methodName, String configKey, Object defaultValue) {
        Object value = getMethodConfigValue(methodName, configKey);
        return value == null ? defaultValue : value;
    }
    public SofaResponseCallback getOnReturn() {
        return onReturn;
    }
    /**
     * 得到方法级配置,找不到则返回null
     *
     * @param methodName 方法名
     * @param configKey  配置key,例如参数
     * @return 配置值 method config value
     */
    public Object getMethodConfigValue(String methodName, String configKey) {
        if (configValueCache == null) {
            return null;
        }
        String key = buildmkey(methodName, configKey);
        return configValueCache.get(key);
    }

以上就是获取方法级回调的实现。

通过获取 configValueCache 的值,如果没有,则使用接口级别的回调设置。即默认值。那么,我们好像没有设置过这个 configValueCache 的值,从哪里来的呢?

在 AbstractInterfaceConfig 的 getConfigValueCache 方法中,我们能够看到从 methods Map 中转移数据到 configValueCache 中。而这个方法的调用时机则是 FilterInvoker 的构造方法中:

    protected FilterInvoker(AbstractInterfaceConfig config) {
        this.config = config;
        if (config != null) {
            this.configContext = config.getConfigValueCache(false);
        }
    }

具体转移代码入下:

public synchronized Map<String, Object> getConfigValueCache(boolean rebuild) {
    if (configValueCache != null && !rebuild) {
        return configValueCache;
    }
    Map<String, Object> context = new HashMap<String, Object>(32);
    Map<String, String> providerParams = getParameters();
    if (providerParams != null) {
        context.putAll(providerParams); // 复制接口的自定义参数
    }
    Map<String, MethodConfig> methodConfigs = getMethods();
    if (CommonUtils.isNotEmpty(methodConfigs)) {
        for (MethodConfig methodConfig : methodConfigs.values()) {
            String prefix = RpcConstants.HIDE_KEY_PREFIX + methodConfig.getName() + RpcConstants.HIDE_KEY_PREFIX;
            Map<String, String> methodparam = methodConfig.getParameters();
            if (methodparam != null) { // 复制方法级自定义参数
                for (Map.Entry<String, String> entry : methodparam.entrySet()) {
                    context.put(prefix + entry.getKey(), entry.getValue());
                }
            }
            // 复制方法级参数属性
            BeanUtils.copyPropertiesToMap(methodConfig, prefix, context);
        }
    }
    // 复制接口级参数属性
    BeanUtils.copyPropertiesToMap(this, StringUtils.EMPTY, context);
    configValueCache = Collections.unmodifiableMap(context);
    return configValueCache;
}

如果 configValueCache 没有初始化或者需要重建,则执行下面的逻辑,首先拿到 methods Map,然后 for 循环到 configValueCache Map 中。

那么如何使用呢?

注意,此时 ,Cluster 的 doSendMsg 方法中,request 已经有回调对象了,注意,回调 Callback 和 Future 调用的是同一个方法,即 transport.asyncSend(request, timeout);

在 BoltClientTransport 的 doInvokeAsync 方法中,会判断 request 中,是否含有回调,如果有,则执行回调逻辑,如果没有,则将 Future 转成回调。

而回调的逻辑是,根据监听器等属性创建一个 BoltInvokerCallback 对象包装 SofaResponseCallback 对象,然后,再创建一个 RpcInvokeCallbackListener 对象包装他,再创建一个 InvokeFuture 包装监听器 发起 RPC 调用。

InvokeFuture 会在 RpcResponseProcessor 类的 doProcess 方法被回调,这点逻辑和异步 Future 的逻辑类似。回调他的 putResponse 方法和 executeInvokeCallback 方法。

然后,在 executeInvokeCallback 方法中,则调用 RpcInvokeCallbackListener 的 onResponse 方法。

RpcInvokeCallbackListener 则创建一个 CallbackTask 任务,提交给线程池,任务内容是执行 BoltInvokerCallback 的 onResponse 方法。

BoltInvokerCallback 则会调用 SofaResponseCallback 的 的 onAppXXXX 方法。完成最终用户的调用。

整体设计图和流程图和异步 Future 类似(内部是 Callback 实现)。

image.png

可以看到,和异步 Future 唯一不同的就是面向用户的 API 不同和内部一个 Callback 不同。
异步 Future 面向用户的 API 是 BoltResponseFuture 。
回调 Callback 面向用户的是 SofaResponseCallback。

但内部的原理都是一样的。都是使用 RpcInvokeCallbackListener 和 DefaultInvokeFuture 回调实现。

DefaultInvokeFuture 设置在 Channel 中,当收到返回值后,便调用他的方法,接着完成这个链式返回,直到调用用户设置的 API。

总结

今天我们分析了 SOFA 的调用 API,包括同步,异步 Future,回调 Callback,单向 oneWay。

其中单向实现最简单,同步也比较简单。

异步的内部实现和回调的内部实现完全一样,只是展示给用户的 API 不同。都是通过观察者模式实现的。

我们思考一下: 为什么要使用这么多层次包装呢?

首先 InvokeFuture 和 RpcInvokeCallbackListener 是面向网络层的。也就是 Bolt 内部的接口。

他们对于 SOFA—RPC 来讲,是统一的。

而 SOFA 对于异步和回调则使用了不同的 API:
异步使用了 BoltResponseFuture 和 BoltFutureInovkeCallback。
回调使用 SofaResponseCallback 和 BoltInvokerCallback。

为什么在 Bolt 层和用户层需要再加 InvokeCallback 这一层呢?

InvokeCallback 这一层可以将用户层和网络层很好的解耦,如果网络层发生变化,那么,只需要变动中间层就可以了,用户层不会受到影响。

好了,今天就到这里。bye!

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 200,045评论 5 468
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 84,114评论 2 377
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 147,120评论 0 332
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 53,902评论 1 272
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 62,828评论 5 360
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,132评论 1 277
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,590评论 3 390
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,258评论 0 254
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,408评论 1 294
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,335评论 2 317
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,385评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,068评论 3 315
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,660评论 3 303
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,747评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,967评论 1 255
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,406评论 2 346
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 41,970评论 2 341

推荐阅读更多精彩内容

  • 前言 很多朋友对异步编程都处于“听说很强大”的认知状态。鲜有在生产项目中使用它。而使用它的同学,则大多数都停留在知...
    星星在线阅读 2,848评论 2 39
  • 1 什么是异步编程 通过学习相关概念,我们逐步解释异步编程是什么。 1.1 阻塞 程序未得到所需计算资源时被挂起的...
    hugoren阅读 2,643评论 2 10
  • http://liuxing.info/2017/06/30/Spring%20AMQP%E4%B8%AD%E6%...
    sherlock_6981阅读 15,844评论 2 11
  • 上篇 中篇 下篇 1 什么是异步编程 1.1 阻塞 程序未得到所需计算资源时被挂起的状态。 程序在等待某个操作完成...
    秦时明星阅读 990评论 0 3
  • git 开发流程 配置个人信息$ git config --global user.name "用户名"$ git...
    majun00阅读 216评论 0 1