dubbo剖析:五 网络通信之 -- 请求发送与接收

注:文章中使用的dubbo源码版本为2.5.4

零、文章目录

  • Consumer发送请求
  • Provider接收请求并发送响应
  • Consumer接收响应

一、Consumer发送请求

1.1 代码入口

  • dubbo剖析:二 服务引用 中讲到,服务引用方根据引用接口DemoService,使用dubbo的代理工厂类JavassistProxyFactory.getProxy()创建出该接口的动态代理对象。
  • 当用户想调用DemoService的相关方法时,实际是调用了代理对象的相关方法,从InvokerInvocationHandler.invoke()进入Consumer请求发送流程。

1.2 整体流程

Consumer发送请求流程图
  • 上图从上往下展示了服务引用方发送一个RPC请求的关键步骤,经历了“代理层”、“集群层”、“过滤监听扩展点”、“调用协议层”、“信息交换层”、“网络传输层”。
  • 紫色实线条表示各层关键类的方法调用,蓝色虚线表示关键类的初始化过程。

1)代理执行(InvokerInvocationHandler.invoke):

  • 服务引用的过程中,由ReferenceConfig使用JavassistProxyFactory为引用接口创建了代理对象;
  • 服务引用方调用dubbo代理类DemoService.sayHello时,实际执行InvokerInvocationHandler.invoke()方法,即这是Consumer发送请求的起点;
  • InvokerInvocationHandler内包含一个Invoker,在JavassistProxyFactory.getProxy()过程中通过其构造器注入,该Invoker为一个集群路由功能的AbstractClusterInvoker

2)集群容错+负载均衡(AbstractClusterInvoker.invoke):

  • 服务引用的过程中,由RegistryProtocol使用Cluster.join()创建集群InvokerClusterExtensionLoader.getExtensionLoader(Cluster.class).getExtension("mergeable")动态生成;
  • 集群Invoker根据负载均衡算法有多种不同实现类(failover、failfast、failsafe、failback),具体使用哪一种由对应的Cluster实现决定;
  • AbstractClusterInvoker通过Directory.list()方法获取请求路径对应的Invoker列表;
  • AbstractClusterInvoker再通过LoadBalance.select()方法从多个Invoker中选取一个做本次调用,即负载均衡算法(Random、RoundRobin、LeastActive);

3)Filter链扩展点(ProtocolFilterWrapper + ProtocolListenerWrapper):

  • ReferenceConfig进行服务引用的过程中,通过refProtocol.refer()创建Invoker对象;
  • refprotocol.refer()先后经过修饰类ProtocolFilterWrapperProtocolListenerWrapper,最后执行RegistryProtocolProtocolFilterWrapperProtocolListenerWrapper就是Dubbo引入的扩展点;
  • 扩展点对请求发送和接收的核心功能流程无影响,目的是以插件的方式进行一些辅助功能处理,这里不再进一步展开;

4)调用协议层执行(AbstractInvoker.invoke):

  • 经过集群路由和扩展点,现在将直接执行AbstractInvoker.invoke方法,开始真正的远程调用了;
  • 服务引用的过程中,由RegistryDirectory使用Protocol.refer()创建远程执行AbstractInvokerProtocol默认采用default实现,即DubboProtocol
  • AbstractInvoker有多种协议的具体实现(dubbo、rmi、hessian、http),具体使用哪一种协议由对应的Protocol实现决定,默认采用dubbo协议为DubboInvoker
  • DubboInvoker中包含了ExchangeClient的引用,通过DubboInvoker的构造器注入;

5)交换层执行(ExchangeClient.request):

  • 远程执行Invoker通过其引用的ExchangeClient.request完成远程调用请求的发送并得到ResponseFuture,然后调用ResponseFuture.get()得到 远程调用结果Result
  • 服务引用的过程中,由DubboProtocol使用Exchanger.connect()创建ExchangeClient
  • Exchanger的实现类为HeaderExchanger,由ExtensionLoader.getExtensionLoader(Exchanger.class).getExtension(type)动态生成;
  • ExchangeClientClient的基础上封装了请求响应模式(其以Request、Response、ResponseFuture为核心,后续单独文章讲解),这也是交换层的核心功能;

6)网络层执行(Client.send):

  • 交换层ExchangeClient.request封装请求响应模式后,最终依赖网络层Client.send将请求消息通过网络发送给服务提供方;
  • 服务引用的过程中,由HeaderExchanger使用Transporter.connect()创建Client并完成初始连接操作,Client有多种网络层实现(netty、mina...),具体使用哪一种由对应的Transporter实现决定;
  • Transporter有多种网络层实现(netty、mina...),由ExtensionLoader.getExtensionLoader(Transporter.class).getAdaptiveExtension()动态生成,默认为NettyTransporter
  • 最后,NettyClient使用其包含的底层NettyChannel完成网络消息发送的功能;

二、Provider接收请求并发送响应

2.1 代码入口

  • dubbo剖析:一 服务发布 中讲到,服务提供方通过NettyServer完成服务端创建及监听工作。
  • NettyServerdoOpen()阶段创建了网络事件处理器NettyHandler,当服务端收到客户端消息时,将触发NettyHandlermessageReceived()方法。

2.2 整体流程

接收请求流程图
  • 上图从上往下表示了服务提供方接收到一个网络请求时的处理步骤,经历了一个Handler处理器链,链中的每个Handler负责实现自己的处理功能。

1)Netty网络事件处理器(NettyHandler):

  • 继承自Netty的原生网络时间处理器实现类SimpleChannelHandler,定义了网络建连(channelConnected)、断连(channelDisconnected)、消息接收(messageReceived)、异常(exceptionCaught)等事件处理方法;
  • 维护了<ip:port, channel>的对应关系Map<String, Channel>channels,在网络建连/断连时进行相应put/remove操作,并暴露给NettyServer使用;
  • 接收到网络消息时,执行messageReceived()方法,将Netty的原生Channel转换为Dubbo封装的NettyChannel,并将事件传递给其包含的ChannelHandler处理;

2)复合消息处理器(MultiMessageHandler):

    public void received(Channel channel, Object message) throws RemotingException {
        if (message instanceof MultiMessage) {
            MultiMessage list = (MultiMessage) message;
            for (Object obj : list) {
                handler.received(channel, obj);
            }
        } else {
            handler.received(channel, message);
        }
    }
  • 处理MultiMessage,将其拆分成多个Message处理;

3)心跳消息处理器(HeartbeatHandler):

  • 消息收发时重置当前通道的最新消息收发时间,用于配合HeaderExchangeServerHeaderExchangeClient中的心跳检测任务HeartBeatTask
  • 拦截并处理心跳请求/响应消息。对心跳请求消息,构建对应的心跳响应消息并通过Channel发送回去;对心跳响应消息,仅记录日志后返回,不做功能上的处理;

4)业务线程转换处理器(AllChannelHandler):

  • Dubbo通过该处理器完成了 IO线程业务线程 的解耦!
  • 内部封装了业务线程池,默认使用FixedThreadPool
public class FixedThreadPool implements ThreadPool {

    public Executor getExecutor(URL url) {
        String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
        int threads = url.getParameter(Constants.THREADS_KEY, Constants.DEFAULT_THREADS);
        int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);
        return new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS,
                queues == 0 ? new SynchronousQueue<Runnable>() :
                        (queues < 0 ? new LinkedBlockingQueue<Runnable>()
                                : new LinkedBlockingQueue<Runnable>(queues)),
                new NamedThreadFactory(name, true), new AbortPolicyWithReport(name, url));
    }

}

注意点:
a)线程池默认业务线程数为200
b)队列默认采用SynchronousQueue

  • 将接收到的网络消息事件封装成可执行任务ChannelEventRunnable,交由业务线程池处理;

5)业务解码处理器(DecodeHandler):

  • 进行业务请求响应的解码工作;
  • RequestResponse中携带的消息体或结果体,如果其实现了Decodeable接口,则进行一次解码处理;

6)交换层请求响应处理器(HeaderExchangeHandler):

  • 交换层真正完成请求响应收发功能的处理器!
  • 将网络层Channel转换为交换层ExchangeChannel,为其增加了请求响应方法request()
  • 判断收到的网络消息类型,根据类型分别执行不同的处理逻辑;
            if (message instanceof Request) {
                Request request = (Request) message;
                if (request.isEvent()) {
                    handlerEvent(channel, request);
                } else {
                    //case a: 请求响应模型的请求处理
                    if (request.isTwoWay()) {
                        Response response = handleRequest(exchangeChannel, request);
                        channel.send(response);
                    } 
                    //case b: 单向消息接收的处理
                    else {
                        handler.received(exchangeChannel, request.getData());
                    }
                }
            } else if (message instanceof Response) {
                //case c: 请求响应模型的响应处理
                handleResponse(channel, (Response) message);
            }

a)请求响应模型的Request消息:调用ExchangeHandlerAdapter.reply()获取执行结果Result -->
将本地执行结果Result封装成RPC响应Response --> 通过channel.send()发送RPC响应;

    Response handleRequest(ExchangeChannel channel, Request req) throws RemotingException {
        Response res = new Response(req.getId(), req.getVersion());
        Object msg = req.getData();
        try {
            // 调用```ExchangeHandlerAdapter.reply()```获取执行结果```Result```
            Object result = handler.reply(channel, msg);
            res.setStatus(Response.OK);
            res.setResult(result);
        } catch (Throwable e) {
            res.setStatus(Response.SERVICE_ERROR);
            res.setErrorMessage(StringUtils.toString(e));
        }
        //将本地执行结果```Result```封装成RPC响应```Response```
        return res;
    }

b)单向请求消息的处理:调用ExchangeHandlerAdapter.received()处理请求消息,如果该消息是Invocation则执行reply()逻辑但不主动发送RPC响应Response

        public void received(Channel channel, Object message) throws RemotingException {
            if (message instanceof Invocation) {
                reply((ExchangeChannel) channel, message);
            } else {
                super.received(channel, message);
            }
        }

c)请求响应模型的Response消息:调用DefaultFuture.received()处理响应消息。
...注:请求响应模型(Request,Response,DufaultFuture)相关后续专门分析,此处不展开...

7)真正本地实现类方法的执行(ExchangeHandlerAdapter):

  • ExchangeHandlerAdapterDubboProtocol创建,并实现了reply()方法;
  • reply()方法,实际通过RPC调用参数InvocationDubboProtocol.exporterMap中获取到对应的本地实现DubboExporter --> 进而获取到对应的本地执行AbstractProxyInvoker --> 最终通过AbstractProxyInvoker.invoke()方法,以反射的方式执行真正实现类的对应方法,完成RPC请求。

三、Consumer接收响应

整体流程与 “Provider接收请求” 一样,唯一的区别是在 交换层请求响应处理器(HeaderExchangeHandler)步骤中会执行 “分支c:请求响应模型的Response消息”,将Response交由DefaultFuture处理。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 194,242评论 5 459
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 81,769评论 2 371
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 141,484评论 0 319
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,133评论 1 263
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,007评论 4 355
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,080评论 1 272
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,496评论 3 381
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,190评论 0 253
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,464评论 1 290
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,549评论 2 309
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,330评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,205评论 3 312
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,567评论 3 298
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 28,889评论 0 17
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,160评论 1 250
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,475评论 2 341
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,650评论 2 335

推荐阅读更多精彩内容

  • dubbo暴露服务有两种情况,一种是设置了延迟暴露(比如delay="5000"),另外一种是没有设置延迟暴露或者...
    加大装益达阅读 21,242评论 5 36
  • 去年10月Carol带着三个小伙伴来上初级班时,本来我也应该一起来,可是那时候我和精油还没有很好地链接上,我就不想...
    晓晓Akatsuki阅读 646评论 0 0
  • 1. 时序图简介   时序图(Sequence Diagram)是显示对象之间交互的图,这些对象是按时间顺序排列的...
    GuoYuebo阅读 1,505评论 0 1
  • 我想,最美的爱情是你在想着我的时候恰好我也在想你,放假这么多天了,每次看着视频中的你,都会莫名的想笑。看着你傻...
    丶玩世不恭阅读 535评论 0 1
  • 是否有一种目光 只有你我才有 在最淡然的相视里 感受最深沉的关切 是否有一条道路 只有我和你相依而行 在最平凡的脚...
    花倦琳琅阅读 171评论 0 2