9.dubbo源码-调用服务

1、RPC调用

dubbo服务调用只需在spring.xml中如下配置后,就可以调用本地方法一样,调用provider提供的远程服务:

<dubbo:reference id="demoService" interface="com.alibaba.dubbo.demo.DemoService" />

dubbo服务调用链路图

dubbo服务调用前部分链路如下图所示,下面根据这张图以调用com.alibaba.dubbo.demo.DemoService.sayHello("afei")为例,一步一步分析dubbo服务的调用过程:

dubbo服务调用-1.png

InvokerInvocationHandler

demoService.sayHello("afei") 这样的RPC调用,被Dubbo代理后,就会调用InvokerInvocationHandler中的invoke()方法。源码如下:

public class InvokerInvocationHandler implements InvocationHandler {

    private final Invoker<?> invoker;
    
    public InvokerInvocationHandler(Invoker<?> handler){
        this.invoker = handler;
    }

    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        // 得到调用的方法名称
        String methodName = method.getName();
        Class<?>[] parameterTypes = method.getParameterTypes();
        if (method.getDeclaringClass() == Object.class) {
            return method.invoke(invoker, args);
        }
        // 调用toString()方法的特殊处理方式
        if ("toString".equals(methodName) && parameterTypes.length == 0) {
            return invoker.toString();
        }
        // 调用hashCode()方法的特殊处理方式
        if ("hashCode".equals(methodName) && parameterTypes.length == 0) {
            return invoker.hashCode();
        }
        // 调用equals()方法的特殊处理方式
        if ("equals".equals(methodName) && parameterTypes.length == 1) {
            return invoker.equals(args[0]);
        }
        // 常规的dubbo调用,都走这里,把调用的方法名称和参数封装成RpcInvocation对象,然后调用MockClusterInvoker中的invoke()方法
        return invoker.invoke(new RpcInvocation(method, args)).recreate();
    }
}

// RpcInvocation的定义如下,包含了一些RPC调用信息:方法名,参数类型,参数值,Dubbo调用的一些附属信息attachments,以及调用的Invoker(attachments和invoker在后面会赋值)
public class RpcInvocation implements Invocation, Serializable {
    private static final long serialVersionUID = -4355285085441097045L;
    private String               methodName;
    private Class<?>[]           parameterTypes;
    private Object[]             arguments;
    private Map<String, String>  attachments;
    private transient Invoker<?> invoker;
    ... ...
}

invoke()方法中Object proxy就是代理的对象;Method method就是本次调用的方法,即DemoService中的sayHello(String)方法;Object[] args就是调用的参数,即"afei",组装成Object[]就是new Object[]{"afei"}。

MockClusterInvoker

接下来分析MockClusterInvoker中的invoke()方法。部分核心源码如下:

public class MockClusterInvoker<T> implements Invoker<T>{

    ... ...
    
    public Result invoke(Invocation invocation) throws RpcException {
        Result result = null;
        // 获取mock的值,默认为false;
        String value = directory.getUrl().getMethodParameter(invocation.getMethodName(), Constants.MOCK_KEY, Boolean.FALSE.toString()).trim();         
        if (value.length() == 0 || value.equalsIgnoreCase("false")){
            // 如果在<dubbo:reference>中没有申明mock(默认方式),或者申明为false,那么走这里的逻辑
            result = this.invoker.invoke(invocation);
        } else if (value.startsWith("force")) {
            // 强制mock调用方式的WARN日志
            if (logger.isWarnEnabled()) {
                logger.info("force-mock: " + invocation.getMethodName() + " force-mock enabled , url : " +  directory.getUrl());
            }
            //force:direct mock
            result = doMockInvoke(invocation, null);
        } else{
            //fail-mock
            try {
                // 普通的mock方式,例如申明mock="com.alibaba.dubbo.demo.consumer.mock.DemoServiceMock",那么在RPC调用抛出RPC异常时才启用mock调用;
                result = this.invoker.invoke(invocation);
            }catch (RpcException e) {
                if (e.isBiz()) {
                    throw e;
                } else {
                    if (logger.isWarnEnabled()) {
                        logger.info("fail-mock: " + invocation.getMethodName() + " fail-mock enabled , url : " +  directory.getUrl(), e);
                    }
                    result = doMockInvoke(invocation, e);
                }
            }
        }
        ... ...
    }
}

mock申明方式:<dubbo:reference id="demoService" interface="com.alibaba.dubbo.demo.DemoService" version="1.0.0" mock="false"/>,从这段源码可知,dubbo提供了三种策略:
1、不需要mock,直接调用AbstractClusterInvoker(默认方式)
2、强制mock方式调用;
3、先AbstractClusterInvoker方式调用,如果有RpcException(比如没有任何可用的Provider),再以mock方式调用;

想要详细了解mock的使用方式,请参考dubbo一些你不一定知道但是很好用的功能中的"本地伪装"

AbstractClusterInvoker

接下来调用AbstractClusterInvoker中的invoke()方法,部分源码如下所示:

public Result invoke(final Invocation invocation) throws RpcException {

    checkWheatherDestoried();

    LoadBalance loadbalance;
    // 从Diectory中得到所有可用的,经过路由过滤的Invoker集合
    List<Invoker<T>> invokers = list(invocation);
    if (invokers != null && invokers.size() > 0) {
        // 如果有可用的Invoker,那么根据第一个Invoker得到其LoadBalance策略
        loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl()
                .getMethodParameter(invocation.getMethodName(),Constants.LOADBALANCE_KEY, Constants.DEFAULT_LOADBALANCE));
    } else {
        // // 如果没有可用的Invoker,那么采用默认的LoadBalance策略(随机策略)
        loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(Constants.DEFAULT_LOADBALANCE);
    }
    // 如果异步调用,那么在attachment中给id赋值(值是自增的,通过AtomicLong.getAndIncrement()得到)
    RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
    // doInvoke()定义在AbstractClusterInvoker中是一个抽象方法,所以这里采用了模板方法设计模式,调用FailoverClusterInvoker(默认是failover集群容错)中的doInvoke()方法
    return doInvoke(invocation, invokers, loadbalance);
}

protected abstract Result doInvoke(Invocation invocation, List<Invoker<T>> invokers,
                                       LoadBalance loadbalance) throws RpcException;

FailoverClusterInvoker

FailoverClusterInvoker的分析,请参考dubbo源码-集群容错,这篇文章对dubbo支持的所有的集群容错处理都一一进行了比较详细的分析;但是不管哪种集群容错处理,接下来都会调用invoker.invoke(invocation)得到Result;RPC调用invoker.invoke(invocation);的调用关系链图如下,根据这张图一步一步分析每个步骤:

dubbo服务调用-2.png

InvokerWrapper

InvokerWrapper中会初始化Consumer端的调用过滤链,然后在FailoverClusterInvoker中调用invoker.invoke(invocation)时一一执行每个Filter:

private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
    Invoker<T> last = invoker;
    // 得到Consumer端的Filter集合
    List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
    if (filters.size() > 0) {
        for (int i = filters.size() - 1; i >= 0; i --) {
            final Filter filter = filters.get(i);
            final Invoker<T> next = last;
            last = new Invoker<T>() {
                ... ...
                // 通过Filter的next()方法遍历执行Filter链上所有的Filter
                public Result invoke(Invocation invocation) throws RpcException {
                    return filter.invoke(next, invocation);
                }
                ... ...
            };
        }
    }
    return last;
}

Filter

Consumer端Filter有ConsumerContextFilter、FutureFilter、MonitorFilter等;这里不一一讲解,里面的业务都比较简单;执行完Filter链后,调用AbstractInvoker中的invoke()方法;

AbstractInvoker

AbstractInvoker中申明了抽象方法:protected abstract Result doInvoke(Invocation invocation) throws Throwable;,所以,这里会以模板方法设计模式调用DubboInvoker中的doInvoker()方法;接下来的调用关系链如下图所示:

dubbo服务调用-3.png

DubboInvoker

DubboInvoker.doInvoke(Invocation)核心源码如下:

@Override
protected Result doInvoke(final Invocation invocation) throws Throwable {
    RpcInvocation inv = (RpcInvocation) invocation;
    final String methodName = RpcUtils.getMethodName(invocation);
    // RpcInvocation中attachments设置path和version并赋值
    inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());
    inv.setAttachment(Constants.VERSION_KEY, version);
    
    ExchangeClient currentClient;
    // 如果只有一个Client,直接选择;如果多个Client,轮询
    if (clients.length == 1) {
        currentClient = clients[0];
    } else {
        currentClient = clients[index.getAndIncrement() % clients.length];
    }
    try {
        // 是否异步调用,默认false
        boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
        // 是否单边调用,即不需要等待返回结果,默认false
        boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
        // 获取Consumer侧的timeout,默认1s
        int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY,Constants.DEFAULT_TIMEOUT);
        if (isOneway) {
            ... ...
        } else if (isAsync) {
            ... ...
        } else {
            // 重点关注这里,即默认实现
            RpcContext.getContext().setFuture(null);
            // 发送请求后,调用DefaultFuture.get()方法获取远程响应的结果
            return (Result) currentClient.request(inv, timeout).get();
        }
    } catch (TimeoutException e) {
        throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
    } catch (RemotingException e) {
        throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
    }
}

说明:currentClient.request(inv, timeout)得到的是ResponseFuture类型结果,调用get()返回Result对象;

HeaderExchangeChannel

HeaderExchangeChannel.request(Object request, int timeout)核心源码如下:

public ResponseFuture request(Object request, int timeout) throws RemotingException {
    ... ...
    // 构造一个准备RPC远程调用的Request类型参数
    Request req = new Request();
    req.setVersion("2.0.0");
    req.setTwoWay(true);
    // 将调用该方法前的RpcInvocation类型请求参数封装到Request中
    req.setData(request);
    DefaultFuture future = new DefaultFuture(channel, req, timeout);
    try{
        channel.send(req);
    }catch (RemotingException e) {
        future.cancel();
        throw e;
    }
    return future;
}

DefaultFuture future = new DefaultFuture(channel, req, timeout);源码解读:

这里比较重要,包含了通过netty调用后,如何拿到调用结果。

public DefaultFuture(Channel channel, Request request, int timeout){
    this.channel = channel;
    this.request = request;
    // request.getId即得到这一次请求的id,id生成方式通过AtomicLong.getAndIncrement()得到;源码参考Request.newId();这个方法会不会溢出?getAndIncrement()增长到MAX_VALUE时,再增长会变为MIN_VALUE,负数也可以做为ID,所以不会溢出;
    this.id = request.getId();
    this.timeout = timeout > 0 ? timeout : channel.getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
    // put into waiting map.
    // dubbo会根据请求ID从这个map中就能拿到对应的响应结果
    FUTURES.put(id, this);
    CHANNELS.put(id, channel);
}

说明:由于请求ID是从AtomicLong取得,所以理论上是唯一的;即使当达到AtomicLong的最大值后又从MIN_VALUE开始,理论上同一个ID对应的请求不可能存在这么长时间从而导致下一次轮回ID碰撞;

AtomicLong溢出问题可以通过下面一段代码验证:

/**
 * @author afei
 */
public class AtomicLongTest {
    public static void main(String[] args) {
        AtomicLong al = new AtomicLong(Long.MAX_VALUE-2);
        for (int i=0; i<5; i++){
            System.out.println(al.getAndIncrement());
        }
    }
}

运行结果如下,达到MAX_VALUE后下一次getAndIncrement()就是MIN_VALUE,所以getAndIncrement()不会溢出:
9223372036854775805
9223372036854775806
9223372036854775807
-9223372036854775808
-9223372036854775807

AbstractClient

Channel准备发送请求消息到远程服务的核心源码:

public void send(Object message, boolean sent) throws RemotingException {
    if (send_reconnect && !isConnected()){
        connect();
    }
    Channel channel = getChannel();
    //TODO getChannel返回的状态是否包含null需要改进
    if (channel == null || ! channel.isConnected()) {
      throw new RemotingException(this, "message can not send, because channel is closed . url:" + getUrl());
    }
    channel.send(message, sent);
}

NettyChannel

NettyChannel.send(Object message, boolean sent)是真正调用Netty把请求消息通过NIO方式发给远程服务的地方,message即dubbo封装的Request类型请求参数,核心属性是mData,为RpcInvocation类型,源码如下:

public void send(Object message, boolean sent) throws RemotingException {
    super.send(message, sent);
    
    boolean success = true;
    int timeout = 0;
    try {
        // 这里就是调用netty的NioClientSocketChannel.write(Object message)方法将请求message发送到Provider
        ChannelFuture future = channel.write(message);
        if (sent) {
            timeout = getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
            success = future.await(timeout);
        }
        Throwable cause = future.getCause();
        if (cause != null) {
            throw cause;
        }
    } 
    ... ...
}

2. 获取结果

通过Netty以NIO方式发送请求后,接下来分析dubbo如果拿到Provider响应的结果,并把结果和请求对应起来(因为是异步调用,不能把结果和请求对应关系搞混淆);由前面分析HeaderExchangeChannel可知,dubbo调用(Result) currentClient.request(inv, timeout).get(),通过ResponseFuture.get()方法得到RpcResult结果,ResponseFuture的实现就是DefaultFuture

DefaultFuture

DefaultFuture中get()方法的核心源码:

public Object get(int timeout) throws RemotingException {
    // 如果Consumer端指定的timeout不大于0,那么设置为默认值1s
    if (timeout <= 0) {
        timeout = Constants.DEFAULT_TIMEOUT;
    }
    // isDone()就是判断 response != null
    if (! isDone()) {
        long start = System.currentTimeMillis();
        // 通过ReentrantLock锁保证线程安全,lock定义为:private final Lock lock = new ReentrantLock();
        lock.lock();
        try {
            while (! isDone()) {
                done.await(timeout, TimeUnit.MILLISECONDS);
                if (isDone() || System.currentTimeMillis() - start > timeout) {
                    break;
                }
            }
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        } finally {
            lock.unlock();
        }
        if (! isDone()) {
            throw new TimeoutException(sent > 0, channel, getTimeoutMessage(false));
        }
    }
    return returnFromResponse();
}

private Object returnFromResponse() throws RemotingException {
    // 全局申明的private volatile Response response就是结果,后面会分析response是怎么被赋值的;
    Response res = response;
    if (res == null) {
        throw new IllegalStateException("response cannot be null");
    }
    // 如果是正常的结果,直接返回
    if (res.getStatus() == Response.OK) {
        return res.getResult();
    }
    // 如果是超时的结果,那么抛出超时异常
    if (res.getStatus() == Response.CLIENT_TIMEOUT || res.getStatus() == Response.SERVER_TIMEOUT) {
        throw new TimeoutException(res.getStatus() == Response.SERVER_TIMEOUT, channel, res.getErrorMessage());
    }
    throw new RemotingException(channel, res.getErrorMessage());
}

HeaderExchangeHandler

发送RPC请求后,在HeaderExchangeHandler.received()中接收Porvider返回的响应结果(通过dubbo源码-NettyClient分析可知,NettyHandler是消息的handler,NettyHandler中的messageReceived()即消息接收方法,经过解码后,最终调用的就是HeaderExchangeHandler.received()),源码如下:

public void received(Channel channel, Object message) throws RemotingException {
    channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
    ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
    try {
        if (message instanceof Request) {
            // handle request.
            ... ...
        } else if (message instanceof Response) {
            // 这里处理响应结果
            handleResponse(channel, (Response) message);
        } 
        ... ...
    } finally {
        HeaderExchangeChannel.removeChannelIfDisconnected(channel);
    }
}

static void handleResponse(Channel channel, Response response) throws RemotingException {
    if (response != null && !response.isHeartbeat()) {
        DefaultFuture.received(channel, response);
    }
}

DefaultFure.received()方法源码:

public static void received(Channel channel, Response response) {
    try {
        // 在response中封装了请求ID,根据请求ID得到DefaultFuture(根据请求id通过remove方式获取DefaultFuture的好处是,获取的同时也清理了FUTURES中这个ID对应的请求信息,防止FUTURES堆积)
        DefaultFuture future = FUTURES.remove(response.getId());
        if (future != null) {
            // 接收Reponse结果,这就是请求id对应的结果
            future.doReceived(response);
        } else {
            logger.warn("The timeout response finally returned at " 
                        + (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date())) 
                        + ", response " + response 
                        + (channel == null ? "" : ", channel: " + channel.getLocalAddress() 
                            + " -> " + channel.getRemoteAddress()));
        }
    } finally {
        CHANNELS.remove(response.getId());
    }
}


private void doReceived(Response res) {
    lock.lock();
    try {
        // 将reponse赋值给申明的:private volatile Response response;这就是请求id对应的结果
        response = res;
        if (done != null) {
            done.signal();
        }
    } finally {
        lock.unlock();
    }
    if (callback != null) {
        invokeCallback(callback);
    }
}
    

3、超时请求清理

对于那些耗时超过Consumer端timeout指定的值,且没有任何响应,dubbo如何处理呢?这些请求如果不处理的话,数据一致会积压在FUTURES这个Map中,dubbo采用的方法是在DefaultFuture中开启一个后台线程,死循环检测,源码如下:

private static class RemotingInvocationTimeoutScan implements Runnable {

    public void run() {
        while (true) {
            try {
                // 只要有请求,那么FUTURES就不为空,那么遍历这些请求
                for (DefaultFuture future : FUTURES.values()) {
                    if (future == null || future.isDone()) {
                        continue;
                    }
                    // 如果耗时超过了Consumer端指定的timeout,那么返回特定status值的Response(future.isSent() ? Response.SERVER_TIMEOUT : Response.CLIENT_TIMEOUT),Consumer拿到这种Response后,判断它是Response.SERVER_TIMEOUT or Response.CLIENT_TIMEOUT,从而抛出TimeoutException异常;
                    if (System.currentTimeMillis() - future.getStartTimestamp() > future.getTimeout()) {
                        // create exception response.
                        Response timeoutResponse = new Response(future.getId());
                        // set timeout status.
                        timeoutResponse.setStatus(future.isSent() ? Response.SERVER_TIMEOUT : Response.CLIENT_TIMEOUT);
                        timeoutResponse.setErrorMessage(future.getTimeoutMessage(true));
                        // handle response.
                        DefaultFuture.received(future.getChannel(), timeoutResponse);
                    }
                }
                Thread.sleep(30);
            } catch (Throwable e) {
                logger.error("Exception when scan the timeout invocation of remoting.", e);
            }
        }
    }
}

static {
    // 静态代码块,即初始化创建名为"DubboResponseTimeoutScanTimer"的线程来获取调用超时的请求,并返回特定status的Response
    Thread th = new Thread(new RemotingInvocationTimeoutScan(), "DubboResponseTimeoutScanTimer");
    th.setDaemon(true);
    th.start();
}
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,293评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,604评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,958评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,729评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,719评论 5 366
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,630评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,000评论 3 397
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,665评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,909评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,646评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,726评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,400评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,986评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,959评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,197评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 44,996评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,481评论 2 342