dubbo服务的调用过程之consumer端发送消息

dubbo的引用过程中可以看到,最终通过<dubbo:service >引用到的bean为InvokerInvocationHandler,。
在调用过程中,组装调用的参数组成RpcInvocation,然后传递到对应的Invoker中。

public class InvokerInvocationHandler implements InvocationHandler {

    private final Invoker<?> invoker;

    public InvokerInvocationHandler(Invoker<?> handler) {
        this.invoker = handler;
    }

    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        //忽略很多代码
        return invoker.invoke(new RpcInvocation(method, args)).recreate();
    }
}
//RpcInvocation的格式
//RpcInvocation [methodName=sayName, parameterTypes=[class java.lang.String], arguments=[hello], attachments={}]

实质在调用过程中,首先会调用MockClusterInvoker,然后传递到MockClusterInvoker对象中,判断是否需要Mock,若不需要Mock,则直接调用FailoverClusterInvoker处理逻辑。

public class MockClusterInvoker<T> implements Invoker<T> {

    private static final Logger logger = LoggerFactory.getLogger(MockClusterInvoker.class);

    private final Directory<T> directory;

    private final Invoker<T> invoker;

    public Result invoke(Invocation invocation) throws RpcException {
        Result result = null;
        //是否需要Mock
        String value = directory.getUrl().getMethodParameter(invocation.getMethodName(), Constants.MOCK_KEY, Boolean.FALSE.toString()).trim();
        if (value.length() == 0 || value.equalsIgnoreCase("false")) {
            //no mock
            result = this.invoker.invoke(invocation);
        } else if (value.startsWith("force")) {
            result = doMockInvoke(invocation, null);
        } else {
            //fail-mock
            try {
                result = this.invoker.invoke(invocation);
            } catch (RpcException e) {
                    result = doMockInvoke(invocation, e);
                }
            }
        }
        return result;
    }   
}

FailoverClusterInvoker继承于AbstractClusterInvoker,首先会调用该父类的invoker方法,根据扩展类,获取LoadBalance算法,和下游系统的列表,然后交由FailoverClusterInvoker处理具体逻辑。

public abstract class AbstractClusterInvoker<T> implements Invoker<T> {
  
    public Result invoke(final Invocation invocation) throws RpcException {
        checkWhetherDestroyed();
        LoadBalance loadbalance;
        //获取所有Invoker列表
        List<Invoker<T>> invokers = list(invocation);
        if (invokers != null && invokers.size() > 0) {
            loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl()
                    .getMethodParameter(invocation.getMethodName(), Constants.LOADBALANCE_KEY, Constants.DEFAULT_LOADBALANCE));
        } else {
            loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(Constants.DEFAULT_LOADBALANCE);
        }
        RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
        return doInvoke(invocation, invokers, loadbalance);
    }
}

protected List<Invoker<T>> list(Invocation invocation) throws RpcException {
        List<Invoker<T>> invokers = directory.list(invocation);
        return invokers;
}

FailoverClusterInvoker处理中,首先根据路由算法选择需要调用的invoker,然后将选择的invoker保存在RpcContext上下文中,调用选择的invoker

public class FailoverClusterInvoker<T> extends AbstractClusterInvoker<T> {

    private static final Logger logger = LoggerFactory.getLogger(FailoverClusterInvoker.class);

    public FailoverClusterInvoker(Directory<T> directory) {
        super(directory);
    }

    public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
        List<Invoker<T>> copyinvokers = invokers;
        //检查copyinvokers是否为null
        checkInvokers(copyinvokers, invocation);
        //获取重试次数
        int len = getUrl().getMethodParameter(invocation.getMethodName(), Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + 1;
        if (len <= 0) {
            len = 1;
        }
        RpcException le = null; // last exception.
        List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyinvokers.size()); // invoked invokers.
        for (int i = 0; i < len; i++) {
            if (i > 0) {
                checkWhetherDestroyed();
                copyinvokers = list(invocation);
                checkInvokers(copyinvokers, invocation);
            }
            //根据路由算法选择一个invoker对象。
            Invoker<T> invoker = select(loadbalance, invocation, copyinvokers, invoked);
            //标记为已吊用过
            invoked.add(invoker);
            //将invokers放在RpcContext的上下文中
            RpcContext.getContext().setInvokers((List) invoked);
            try {
                Result result = invoker.invoke(invocation);
                return result;
            } catch (RpcException e) {
            }
        }
       
    }
}
protected Invoker<T> select(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException {
        if (invokers == null || invokers.size() == 0)
            return null;
        //忽略代码
        Invoker<T> invoker = doselect(loadbalance, invocation, invokers, selected);
        //忽略代码
        return invoker;
}

private Invoker<T> doselect(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException {
        if (invokers == null || invokers.size() == 0)
            return null;
        if (invokers.size() == 1)
            return invokers.get(0);
        if (invokers.size() == 2 && selected != null && selected.size() > 0) {
            //如果有两个服务提供,则用交替提供服务
            return selected.get(0) == invokers.get(0) ? invokers.get(1) : invokers.get(0);
        }
        //路由算法,选择一个invoker,默认随机算法
        Invoker<T> invoker = loadbalance.select(invokers, getUrl(), invocation);
        if ((selected != null && selected.contains(invoker))
                || (!invoker.isAvailable() && getUrl() != null && availablecheck)) {
            try {
                Invoker<T> rinvoker = reselect(loadbalance, invocation, invokers, selected, availablecheck);
                if (rinvoker != null) {
                    invoker = rinvoker;
                } else {
                    int index = invokers.indexOf(invoker);
                    invoker = index < invokers.size() - 1 ? invokers.get(index + 1) : invoker;
                 
                }
            } 
        }
        return invoker;
    }
}

public class RandomLoadBalance extends AbstractLoadBalance {

    public static final String NAME = "random";

    private final Random random = new Random();

    protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
        int length = invokers.size(); // Number of invokers
        //忽略权重算法        
        return invokers.get(random.nextInt(length));
    }
}

FailOverClusterInvoker调用完成之后,经过一系列的WrapperFilter,(InvokerWrapper,ListenerInvokerWrapper,ProtocolFilterWrapper,ConsumerContextFilter等)最终调用了AbstractInvokerinvoke,然后调用到了DubboInvokerinvoke方法,在ConsumerContextFilter中,设置了RpcContext的相关信息,代码如下:

public class ConsumerContextFilter implements Filter {
    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
        RpcContext.getContext()
                .setInvoker(invoker)
                .setInvocation(invocation)
                .setLocalAddress(NetUtils.getLocalHost(), 0)
                .setRemoteAddress(invoker.getUrl().getHost(),
                        invoker.getUrl().getPort());
        if (invocation instanceof RpcInvocation) {
            ((RpcInvocation) invocation).setInvoker(invoker);
        }
        try {
            return invoker.invoke(invocation);
        } finally {
            RpcContext.getContext().clearAttachments();
        }
    }
}

AbstractInvoker.java
public Result invoke(Invocation inv) throws RpcException {
    RpcInvocation invocation = (RpcInvocation) inv;
    invocation.setInvoker(this);
    if (attachment != null && attachment.size() > 0) {
        invocation.addAttachmentsIfAbsent(attachment);
    }
    Map<String, String> context = RpcContext.getContext().getAttachments();
    //记录调用深度
    RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
    //调用DubboInvoker
    return doInvoke(invocation);
    
}

//DubboInvoker.java
@Override
protected Result doInvoke(final Invocation invocation) throws Throwable {
    RpcInvocation inv = (RpcInvocation) invocation;
    final String methodName = RpcUtils.getMethodName(invocation);
    inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());
    inv.setAttachment(Constants.VERSION_KEY, version);
    ExchangeClient currentClient;
    if (clients.length == 1) {
        currentClient = clients[0];
    } else {
        currentClient = clients[index.getAndIncrement() % clients.length];
    }
    RpcContext.getContext().setFuture(null);
    //netty通信 inv中保存了所有相关信息
    // //RpcInvocation [methodName=sayName, parameterTypes=[class java.lang.String], arguments=[hello], attachments={path=com.alibaba.dubbo.config.api.DemoService, interface=com.alibaba.dubbo.config.api.DemoService, version=0.0.0}]
    return (Result) currentClient.request(inv, timeout).get();
}

currentClient为ReferenceCountExchangeClient实例,最终通过一系列包装HeaderExchangeClient,HeaderExchangeChannel,最终在HeaderExchangeChannel中封装Request,调用抽象的AbstractPeerNettyClientNettyChannel,将数据发送给服务端(调用netty通信)。

//HeaderExchangeClient.java
public ResponseFuture request(Object request) throws RemotingException {
    return channel.request(request);
}


HeaderExchangeChannel.java
public ResponseFuture request(Object request, int timeout) throws RemotingException {
    //忽略若干代码
    Request req = new Request();
    req.setVersion("2.0.0");
    req.setTwoWay(true);
    req.setData(request);
    DefaultFuture future = new DefaultFuture(channel, req, timeout);
    channel.send(req);
    //Request [id=10, version=2.0.0, twoway=true, event=false, broken=false, data=RpcInvocation [methodName=sayName, parameterTypes=[class java.lang.String], arguments=[hello], attachments={path=com.alibaba.dubbo.config.api.DemoService, interface=com.alibaba.dubbo.config.api.DemoService, version=0.0.0}]]
    return future;
}

NettyClient.java
public void send(Object message, boolean sent) throws RemotingException {
        if (send_reconnect && !isConnected()) {
            connect();
        }
        Channel channel = getChannel();
        channel.send(message, sent);
    }
    
NettyClient.java
@Override
protected com.alibaba.dubbo.remoting.Channel getChannel() {
    Channel c = channel;
    if (c == null || !c.isConnected())
        return null;
    return NettyChannel.getOrAddChannel(c, getUrl(), this);
}

NettyChannel.java
public void send(Object message, boolean sent) throws RemotingException {
    //忽略若干代码
   //channel 为org.jboss.netty.channel.Channel 
   //Request [id=6, version=2.0.0, twoway=true, event=false, broken=false, data=RpcInvocation [methodName=sayName, parameterTypes=[class java.lang.String], arguments=[hello], attachments={path=com.alibaba.dubbo.config.api.DemoService, interface=com.alibaba.dubbo.config.api.DemoService, version=0.0.0}]]
    ChannelFuture future = channel.write(message);
    //忽略若干代码 
}

到这里,消费方就将信息已经发送给了服务端。编解码部分暂时不分析。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 199,902评论 5 468
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 84,037评论 2 377
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 146,978评论 0 332
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 53,867评论 1 272
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 62,763评论 5 360
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,104评论 1 277
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,565评论 3 390
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,236评论 0 254
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,379评论 1 294
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,313评论 2 317
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,363评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,034评论 3 315
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,637评论 3 303
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,719评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,952评论 1 255
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,371评论 2 346
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 41,948评论 2 341

推荐阅读更多精彩内容