Dubbo 异步调用过程

开篇

  • 本篇是基于Dubbo-2.6.7版本的异步调用的分析,在这个过程中会涉及异步的调用过程和响应过程的分析。
  • 文章中会有一部分简单的例子,用于讲解异步的是使用方式。


异步调用说明

Dubbo 异步调用过程
  • 关注userThread的行为,用户发出调用后,IOThread会在上下文RpcContext中设置Future,对应上图中步骤1.2.3。
  • 用户从RpcContext中取得Future,然后wait这个Future其它的事情都由IOThread完成,对应上图中步骤4.5。
  • server端响应后会把调用结果设置在RpcContext上下文当中,同时通知UserThread线程。

异步回调使用案例

<dubbo:reference id="fooService" interface="com.alibaba.foo.FooService">
      <dubbo:method name="findFoo" async="true" />
</dubbo:reference>
<dubbo:reference id="barService" interface="com.alibaba.bar.BarService">
      <dubbo:method name="findBar" async="true" />
</dubbo:reference>
// 此方法应该返回Foo,但异步后会立刻返回NULL
fooService.findFoo(fooId);
// 立刻得到当前调用的Future实例,当发生新的调用时这个东西将会被覆盖
Future<Foo> fooFuture = RpcContext.getContext().getFuture();
 
// 调用另一个服务的方法
barService.findBar(barId);
// 立刻得到当前调用的Future
Future<Bar> barFuture = RpcContext.getContext().getFuture();
 
// 此时,两个服务的方法在并发执行
// 等待第一个调用完成,线程会进入Sleep状态,当调用完成后被唤醒。
Foo foo = fooFuture.get();
// 同上
Bar bar = barFuture.get();
// 假如第一个调用需要等待5秒,第二个等待6秒,则整个调用过程完成的时间是6秒。
  • 1、异步调用的实现步骤先执行fooService.findFoo()的执行服务调用。
  • 2、获取RPC上下文RpcContext.getContext().getFuture()。
  • 3、通过future.get()方法获取执行结果,如果当时没有结果当前线程就会被挂起。


Dubbo异步调用栈

  • Consumer => InvokerInvocationHandler =>DubboInvoker =>HeaderExchangeClient。

InvokerInvocationHandler

public class InvokerInvocationHandler implements InvocationHandler {

    private final Invoker<?> invoker;

    public InvokerInvocationHandler(Invoker<?> handler) {
        this.invoker = handler;
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        String methodName = method.getName();
        Class<?>[] parameterTypes = method.getParameterTypes();
        if (method.getDeclaringClass() == Object.class) {
            return method.invoke(invoker, args);
        }
        if ("toString".equals(methodName) && parameterTypes.length == 0) {
            return invoker.toString();
        }
        if ("hashCode".equals(methodName) && parameterTypes.length == 0) {
            return invoker.hashCode();
        }
        if ("equals".equals(methodName) && parameterTypes.length == 1) {
            return invoker.equals(args[0]);
        }
        return invoker.invoke(new RpcInvocation(method, args)).recreate();
    }

}
  • InvokerInvocationHandler的invoke()方法创建rpc调用的RpcInvocation对象,这个对象会在单次调用的过程中传递,相当于单次调用的上下文。

RpcInvocation

public class RpcInvocation implements Invocation, Serializable {

    private static final long serialVersionUID = -4355285085441097045L;
    // 方法名字
    private String methodName;
    // 参数类型
    private Class<?>[] parameterTypes;
    // 参数值
    private Object[] arguments;
    // 上下文透传的参数值
    private Map<String, String> attachments;

    private transient Invoker<?> invoker;

    public RpcInvocation() {
    }

    public RpcInvocation(Method method, Object[] arguments) {
        this(method.getName(), method.getParameterTypes(), arguments, null, null);
    }

    public RpcInvocation(Method method, Object[] arguments, Map<String, String> attachment) {
        this(method.getName(), method.getParameterTypes(), arguments, attachment, null);
    }

    public RpcInvocation(String methodName, Class<?>[] parameterTypes, Object[] arguments) {
        this(methodName, parameterTypes, arguments, null, null);
    }

    public RpcInvocation(String methodName, Class<?>[] parameterTypes, Object[] arguments, Map<String, String> attachments) {
        this(methodName, parameterTypes, arguments, attachments, null);
    }

    public RpcInvocation(String methodName, Class<?>[] parameterTypes, Object[] arguments, Map<String, String> attachments, Invoker<?> invoker) {
        this.methodName = methodName;
        this.parameterTypes = parameterTypes == null ? new Class<?>[0] : parameterTypes;
        this.arguments = arguments == null ? new Object[0] : arguments;
        this.attachments = attachments == null ? new HashMap<String, String>() : attachments;
        this.invoker = invoker;
    }
}
RpcInvocation
  • RpcInvocation的核心变量包括方法名、参数类型、参数值、附带上下文数据。
  • methodName为方法名。
  • parameterTypes为参数类型。
  • arguments为参数值。
  • attachments为附带上下文数据。


异步调用流程

dubbo异步调用.jpg


DubboInvoker

public class DubboInvoker<T> extends AbstractInvoker<T> {
    protected Result doInvoke(final Invocation invocation) throws Throwable {
        RpcInvocation inv = (RpcInvocation) invocation;
        final String methodName = RpcUtils.getMethodName(invocation);
        inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());
        inv.setAttachment(Constants.VERSION_KEY, version);

        ExchangeClient currentClient;
        if (clients.length == 1) {
            currentClient = clients[0];
        } else {
            currentClient = clients[index.getAndIncrement() % clients.length];
        }
        try {
            // 是否异步
            boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
            // 是否单向
            boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
            // 超时时间
            int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);

            if (isOneway) {
                // 处理单向发送
                boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
                currentClient.send(inv, isSent);
                RpcContext.getContext().setFuture(null);
                return new RpcResult();
            } else if (isAsync) {
                // 处理异步发送
                ResponseFuture future = currentClient.request(inv, timeout);
                RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));
                return new RpcResult();
            } else {
                // 处理同步发送
                RpcContext.getContext().setFuture(null);
                return (Result) currentClient.request(inv, timeout).get();
            }
        } catch (TimeoutException e) {
            throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
        } catch (RemotingException e) {
            throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }
}
  • 异步调用执行ResponseFuture future = currentClient.request(inv, timeout)发送请求。
  • 异步调用执行RpcContext.getContext().setFuture(new FutureAdapter<Object>(future))保存future到RpcContext
  • 同步调用执行RpcContext.getContext().setFuture(null)设置RpcContext为空。
  • 同步调用执行currentClient.request(inv, timeout).get()等待同步消息结果。
  • 同步执行和异步执行的差别在于同步发送requst之后执行get()同步等待结果,异步执行发送request之后保存future到RpcContext上下文。


HeaderExchangeChannel

final class HeaderExchangeChannel implements ExchangeChannel {

    private static final Logger logger = LoggerFactory.getLogger(HeaderExchangeChannel.class);

    private static final String CHANNEL_KEY = HeaderExchangeChannel.class.getName() + ".CHANNEL";

    private final Channel channel;

    private volatile boolean closed = false;

    HeaderExchangeChannel(Channel channel) {
        if (channel == null) {
            throw new IllegalArgumentException("channel == null");
        }
        this.channel = channel;
    }

    public ResponseFuture request(Object request, int timeout) throws RemotingException {
        if (closed) {
            throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");
        }
        // create request.
        Request req = new Request();
        req.setVersion(Version.getProtocolVersion());
        req.setTwoWay(true);
        req.setData(request);
        // 保存channel、req等信息到DefaultFuture对象当中
        DefaultFuture future = new DefaultFuture(channel, req, timeout);
        try {
            // 调用底层逻辑发送消息
            channel.send(req);
        } catch (RemotingException e) {
            future.cancel();
            throw e;
        }
        return future;
    }
}
  • HeaderExchangeChannel的request()方法内部会创建Request对象,核心变量包括version、data的变量。
  • channel.send()方法中channel指的是NettyClient对象。
  • HeaderExchangeChannel.request()方法返回DefaultFuture对象,用于保存异步至上下文的RpcContext当中
  • HeaderExchangeChannel的request()方法内部创建Request对象,创建DefaultFuture对象(包含request对象),调用NettyClient.send()异步发送消息。


Request

public class Request {

    public static final String HEARTBEAT_EVENT = null;

    public static final String READONLY_EVENT = "R";

    private static final AtomicLong INVOKE_ID = new AtomicLong(0);

    private final long mId;

    private String mVersion;

    private boolean mTwoWay = true;

    private boolean mEvent = false;

    private boolean mBroken = false;

    private Object mData;

    public Request() {
        mId = newId();
    }

    public Request(long id) {
        mId = id;
    }

    private static long newId() {
        // getAndIncrement() When it grows to MAX_VALUE, it will grow to MIN_VALUE, and the negative can be used as ID
        return INVOKE_ID.getAndIncrement();
    }

    public long getId() {
        return mId;
    }
}
  • Request对象的核心字段INVOKE_ID,全局静态用于记录标识request对象的唯一性。
  • Request对象的核心变量如上图所示,其中mData保存RpcInvocation对象。


DefaultFuture

public class DefaultFuture implements ResponseFuture {

    private static final Logger logger = LoggerFactory.getLogger(DefaultFuture.class);
    // 保存等待响应的Channel
    private static final Map<Long, Channel> CHANNELS = new ConcurrentHashMap<Long, Channel>();
    // 保存等待响应的DefaultFuture
    private static final Map<Long, DefaultFuture> FUTURES = new ConcurrentHashMap<Long, DefaultFuture>();

    static {
        // 超时检测线程
        Thread th = new Thread(new RemotingInvocationTimeoutScan(), "DubboResponseTimeoutScanTimer");
        th.setDaemon(true);
        th.start();
    }

    // invoke id.
    private final long id;
    private final Channel channel;
    private final Request request;
    private final int timeout;
    // 核心的lock和done字段
    private final Lock lock = new ReentrantLock();
    private final Condition done = lock.newCondition();

    private final long start = System.currentTimeMillis();
    private volatile long sent;
    private volatile Response response;
    private volatile ResponseCallback callback;

    public DefaultFuture(Channel channel, Request request, int timeout) {
        this.channel = channel;
        this.request = request;
        this.id = request.getId();
        this.timeout = timeout > 0 ? timeout : channel.getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
        // put into waiting map.
        FUTURES.put(id, this);
        CHANNELS.put(id, channel);
    }

    // 超时检测线程RemotingInvocationTimeoutScan
    private static class RemotingInvocationTimeoutScan implements Runnable {

        @Override
        public void run() {
            while (true) {
                try {
                    // 遍历所有保存的DefaultFuture对象检测超时
                    for (DefaultFuture future : FUTURES.values()) {
                        if (future == null || future.isDone()) {
                            continue;
                        }

                        // 检测超时的处理逻辑
                        if (System.currentTimeMillis() - future.getStartTimestamp() > future.getTimeout()) {
                            // 创建Response对象,唯一标识符为Request的唯一标识mId
                            // future.getId() 等价于request.getId()
                            Response timeoutResponse = new Response(future.getId());
                            // set timeout status.
                            timeoutResponse.setStatus(future.isSent() ? Response.SERVER_TIMEOUT : Response.CLIENT_TIMEOUT);
                            timeoutResponse.setErrorMessage(future.getTimeoutMessage(true));
                            // 调用DefaultFuture.received执行超时响应逻辑
                            DefaultFuture.received(future.getChannel(), timeoutResponse);
                        }
                    }
                    Thread.sleep(30);
                } catch (Throwable e) {
                }
            }
        }
    }

    // 处理数据接受的逻辑或者超时响应的逻辑received => doReceived
    public static void received(Channel channel, Response response) {
        try {
            DefaultFuture future = FUTURES.remove(response.getId());
            if (future != null) {
                future.doReceived(response);
            } else {
            }
        } finally {
            CHANNELS.remove(response.getId());
        }
    }

    // 内部通过done.signal()方式通知等待的线程异步结果。
    private void doReceived(Response res) {
        // 通过lock来实现互斥
        lock.lock();
        try {
            response = res;
            if (done != null) {
                // 通知等待线程
                done.signal();
            }
        } finally {
            lock.unlock();
        }
        // 如果配置回调函数就执行回调函数callback
        if (callback != null) {
            invokeCallback(callback);
        }
    }

    // 以下逻辑是同步等待的逻辑
    public Object get() throws RemotingException {
        return get(timeout);
    }

    @Override
    public Object get(int timeout) throws RemotingException {
        if (timeout <= 0) {
            timeout = Constants.DEFAULT_TIMEOUT;
        }
        if (!isDone()) {
            long start = System.currentTimeMillis();
            lock.lock();
            try {
                while (!isDone()) {
                    done.await(timeout, TimeUnit.MILLISECONDS);
                    if (isDone() || System.currentTimeMillis() - start > timeout) {
                        break;
                    }
                }
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            } finally {
                lock.unlock();
            }
            if (!isDone()) {
                throw new TimeoutException(sent > 0, channel, getTimeoutMessage(false));
            }
        }
        return returnFromResponse();
    }

    private Object returnFromResponse() throws RemotingException {
        Response res = response;
        if (res == null) {
            throw new IllegalStateException("response cannot be null");
        }
        if (res.getStatus() == Response.OK) {
            return res.getResult();
        }
        if (res.getStatus() == Response.CLIENT_TIMEOUT || res.getStatus() == Response.SERVER_TIMEOUT) {
            throw new TimeoutException(res.getStatus() == Response.SERVER_TIMEOUT, channel, res.getErrorMessage());
        }
        throw new RemotingException(channel, res.getErrorMessage());
    }
}
  • DefaultFuture作为异步实现的核心,本质上通过ReentrantLock来实现异步通知。
  • Lock lock = new ReentrantLock(),互斥锁用于保证单个DefaultFuture的线程安全。
  • Condition done = lock.newCondition(),用于单个DefaultFuture的唤醒通知机制。
  • DefaultFuture包含全局唯一的静态线程RemotingInvocationTimeoutScan用于扫描超时的DefaultFuture对象。
  • DefaultFuture包含静态变量FUTURES保存所有请求的DefaultFuture对象。
  • RemotingInvocationTimeoutScan扫描超时线程后会执行DefaultFuture的received => doReceived流程进行响应。
  • 正常响应返回的处理流程会执行DefaultFuture的received => doReceived流程进行响应。
  • DefaultFuture的get方法用于执行等待操作,通过done.await()方法实现。


正常返回的处理流程

public class HeaderExchangeHandler implements ChannelHandlerDelegate {

    protected static final Logger logger = LoggerFactory.getLogger(HeaderExchangeHandler.class);

    public static String KEY_READ_TIMESTAMP = HeartbeatHandler.KEY_READ_TIMESTAMP;

    public static String KEY_WRITE_TIMESTAMP = HeartbeatHandler.KEY_WRITE_TIMESTAMP;

    private final ExchangeHandler handler;

    public HeaderExchangeHandler(ExchangeHandler handler) {
        if (handler == null) {
            throw new IllegalArgumentException("handler == null");
        }
        this.handler = handler;
    }

    static void handleResponse(Channel channel, Response response) throws RemotingException {
        if (response != null && !response.isHeartbeat()) {
            DefaultFuture.received(channel, response);
        }
    }

    public void received(Channel channel, Object message) throws RemotingException {
        channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
        ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
        try {
            if (message instanceof Request) {
                // handle request.
                Request request = (Request) message;
                if (request.isEvent()) {
                    handlerEvent(channel, request);
                } else {
                    if (request.isTwoWay()) {
                        Response response = handleRequest(exchangeChannel, request);
                        channel.send(response);
                    } else {
                        handler.received(exchangeChannel, request.getData());
                    }
                }
            } else if (message instanceof Response) {
                handleResponse(channel, (Response) message);
            } else if (message instanceof String) {
                if (isClientSide(channel)) {
                    Exception e = new Exception("Dubbo client can not supported string message: " + message + " in channel: " + channel + ", url: " + channel.getUrl());
                    logger.error(e.getMessage(), e);
                } else {
                    String echo = handler.telnet(channel, (String) message);
                    if (echo != null && echo.length() > 0) {
                        channel.send(echo);
                    }
                }
            } else {
                handler.received(exchangeChannel, message);
            }
        } finally {
            HeaderExchangeChannel.removeChannelIfDisconnected(channel);
        }
    }
}
  • 正常响应的时候通过received => handleResponse执行到DefaultFuture的received进行响应。


结论

  • Dubbo的异步调用流程的底层核心借助于NettyClient的异步过程。
  • Dubbo的异步调用流程的每个请求对象Request都有唯一的标识符(使用递增的数字标识)。
  • Dubbo的异步调用流程的核心逻辑通过DefaultFuture来完成,底层逻辑是通过ReentrantLock来实现的。


参考

Dubbo异步调用(七)

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 195,980评论 5 462
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 82,422评论 2 373
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 143,130评论 0 325
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,553评论 1 267
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,408评论 5 358
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,326评论 1 273
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,720评论 3 386
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,373评论 0 254
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,678评论 1 294
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,722评论 2 312
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,486评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,335评论 3 313
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,738评论 3 299
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,009评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,283评论 1 251
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,692评论 2 342
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,893评论 2 335

推荐阅读更多精彩内容