完成了服务的初始化之后,生成了代理,生成的动态代理类对接口的方法进行了包装,每次调用都会调用到InvocationHandler的invoke()方法,此方法中会进行远程服务调用一些列复杂过程,诸如网络通信,编码,解码,序列化等,然后将结果返回。在InvokerInvocationHandler.invoker()方法中,最后调用invoker.invoke(new RpcInvocation(method, args)),首先invoker为构造InvokerInvocationHandler传入的,具体类型为MockClusterInvoker,然后调用MockClusterInvoker.invoke-->FailfastClusterInvoker.doInvoke()
public Result invoke(Invocation invocation) throws RpcException {
Result result = null;
String value = directory.getUrl().getMethodParameter(invocation.getMethodName(), Constants.MOCK_KEY, Boolean.FALSE.toString()).trim();
if (value.length() == 0 || value.equalsIgnoreCase("false")){//没有mock
//no mock
result = this.invoker.invoke(invocation);
..................................................................
}
然后调用到FailfastClusterInvoker.invoke(),此对象持有接口服务的RegistryDirectory,里面含有远程提供者的具体信息,是在服务消费者初始化时,通过订阅zk相应节点得到的。此方法的调用过程比较复杂,会调用负载均衡算法,根据一定的策略,选取一个提供者,生成DubboInvoker对象。调用过程:AbstractClusterInvoker.invoke()--> FailfastClusterInvoker.doInvoke()
public Result invoke(final Invocation invocation) throws RpcException {
checkWhetherDestroyed();
LoadBalance loadbalance;//加载loadbalance
List<Invoker<T>> invokers = list(invocation);//根据method加载invokers
if (invokers != null && invokers.size() > 0) {//默认采用随机策略的loadbalance
loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl()
.getMethodParameter(invocation.getMethodName(),Constants.LOADBALANCE_KEY, Constants.DEFAULT_LOADBALANCE));
} else {
loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(Constants.DEFAULT_LOADBALANCE);
}
RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
/** invocation:方法,参数类型,参数
* invokers:此接口提供端的Invoker;
* loadbalance:均衡负载
*/
return doInvoke(invocation, invokers, loadbalance);
}
下面看如何加载invokers的,进入到 list(invocation)方法,调用过程:AbstractDirectory.list()-->RegistryDirectory.doList()
public List<Invoker<T>> doList(Invocation invocation) {
........................................................
List<Invoker<T>> invokers = null;
// 本地method-->lnvokers对应关系
Map<String, List<Invoker<T>>> localMethodInvokerMap = this.methodInvokerMap;
if (localMethodInvokerMap != null && localMethodInvokerMap.size() > 0) {
//获取方法名称和参数
String methodName = RpcUtils.getMethodName(invocation);
Object[] args = RpcUtils.getArguments(invocation);
if(args != null && args.length > 0 && args[0] != null
&& (args[0] instanceof String || args[0].getClass().isEnum())) {
invokers = localMethodInvokerMap.get(methodName + "." + args[0]); // 可根据第一个参数枚举路由
}
if(invokers == null) {//根据方法获取
invokers = localMethodInvokerMap.get(methodName);
}
if(invokers == null) {//获取*匹配的
invokers = localMethodInvokerMap.get(Constants.ANY_VALUE);
}
if(invokers == null) {//获取所有
Iterator<List<Invoker<T>>> iterator = localMethodInvokerMap.values().iterator();
if (iterator.hasNext()) {
invokers = iterator.next();
}
}
}
return invokers == null ? new ArrayList<Invoker<T>>(0) : invokers;
}
下面回到AbstractDirectory.list(),在这个方法中进行路由规则的匹配,匹配规则,路由规则也是spi机制的,有脚本、文件、表达式,一般通过服务治理中心配置,与接口服务关联,具体参见dubbo官方文档。路由匹配完成之后,获取到provider端的所有可用的Invokers,回到AbstractClusterInvoker.invoke()方法,最终返回provider端可用的Invokers,然后调用FailfastClusterInvoker.doInvoke():
public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
checkInvokers(invokers, invocation);//检查invokers的可用性
Invoker<T> invoker = select(loadbalance, invocation, invokers, null);
try {
return invoker.invoke(invocation);
} catch (Throwable e) {
if (e instanceof RpcException && ((RpcException)e).isBiz()) { // biz exception.
throw (RpcException) e;
}
.........................
}
}
select(loadbalance, invocation, invokers, null)会根据loadbalance进行负载均衡算法匹配,此处为随机选择:
com.alibaba.dubbo.rpc.cluster.loadbalance.RandomLoadBalance
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
int length = invokers.size(); // 总个数
int totalWeight = 0; // 总权重
boolean sameWeight = true; // 权重是否都一样
for (int i = 0; i < length; i++) {
int weight = getWeight(invokers.get(i), invocation);
totalWeight += weight; // 累计总权重
if (sameWeight && i > 0
&& weight != getWeight(invokers.get(i - 1), invocation)) {
sameWeight = false; // 计算所有权重是否一样
}
}
if (totalWeight > 0 && ! sameWeight) {
// 如果权重不相同且权重大于0则按总权重数随机
int offset = random.nextInt(totalWeight);
// 并确定随机值落在哪个片断上
for (int i = 0; i < length; i++) {
offset -= getWeight(invokers.get(i), invocation);
if (offset < 0) {
return invokers.get(i);
}
}
}
// 如果权重相同或权重为0则均等随机
return invokers.get(random.nextInt(length));
}
回到FailfastClusterInvoker.doInvoke(),根据负载均衡算法返回一个Invoker,为dubboInvoker,下面进行DubboInvoke.invoke()的调用
protected Result doInvoke(final Invocation invocation) throws Throwable {
//RpcInvocation封装了调用方法的对象和参数类型,具体参数
RpcInvocation inv = (RpcInvocation) invocation;
//获取具体的方法名称
final String methodName = RpcUtils.getMethodName(invocation);
inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());
inv.setAttachment(Constants.VERSION_KEY, version);
//调用客户端,在refer中已经初始化好了,为HeaderExchangeClient
ExchangeClient currentClient;
if (clients.length == 1) {
currentClient = clients[0];
} else {
currentClient = clients[index.getAndIncrement() % clients.length];
}
try {
boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY,Constants.DEFAULT_TIMEOUT);
if (isOneway) {//没有返回参数
boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
currentClient.send(inv, isSent);
RpcContext.getContext().setFuture(null);
return new RpcResult();
} else if (isAsync) {//异步
ResponseFuture future = currentClient.request(inv, timeout) ;
RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));
return new RpcResult();
} else {//同步返回
RpcContext.getContext().setFuture(null);
return (Result) currentClient.request(inv, timeout).get();
}
} catch (TimeoutException e) {
throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
} catch (RemotingException e) {
throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
下面分析常用的同步返回,进入HeaderExchangeClient.request()方法:
public ResponseFuture request(Object request, int timeout) throws RemotingException {
return channel.request(request, timeout);
}
上面方法的channel为HeaderExchangeChannel,在HeaderExchangeClient初始化的时候直接初始化的,里面封装了NettyClient:
com.alibaba.dubbo.remoting.exchange.support.header.HeaderExchangeChannel
public ResponseFuture request(Object request, int timeout) throws RemotingException {
if (closed) {
throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");
}
// create request.
Request req = new Request();
req.setVersion("2.0.0");
req.setTwoWay(true);
req.setData(request);//request为RpcInvocation对象
DefaultFuture future = new DefaultFuture(channel, req, timeout);
try{
channel.send(req);
}catch (RemotingException e) {
future.cancel();
throw e;
}
return future;
}
上面方法中channel为NettyClient,进入NettyClient.send()-->NettyChannel.send(),channel是netty的一个组件,负责客户端与服务端之间的链路传递,调用Netty框架的IO事件之后会触发Netty框架的IO事件处理链。
com.alibaba.dubbo.remoting.transport.netty.NettyChannel:
public void send(Object message, boolean sent) throws RemotingException {
super.send(message, sent);
boolean success = true;
int timeout = 0;
try {
//调用netty channel写入请求
ChannelFuture future = channel.write(message);
if (sent) {//是否等待请求消息发出,默认不等待
timeout = getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
success = future.await(timeout);
}
Throwable cause = future.getCause();
if (cause != null) {// 等待消息发出,消息发送失败将抛出异常
throw cause;
}
} catch (Throwable e) {
throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress() + ", cause: " + e.getMessage(), e);
}
if(! success) {
throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress()
+ "in timeout(" + timeout + "ms) limit");
}
}
下面来分析消费端和提供端网络请求的部分,利用netty的encoder和decoder,来写入协议内容和请求数据,这里涉及到序列化机制,dubbo默认使用hessian序列化,过程为:消费端将请求编码-->发送-->提供端将请求解码-->响应请求-->提供端响应编码-->发送-->消费端响应解码-->结果返回
1) 消费端请求编码
消费端初始化nettyclient时,添加了事件处理链:NettyCodecAdapter.decoder-->NettyCodecAdapter.encoder-->NettyHandler,NettyCodecAdapter.decoder为上行事件处理器,NettyCodecAdapter.encoder为下行事件处理器,NettyHandler为上下行事件处理器,chanel.write()是一个下行事件,NettyCodecAdapter.decoder和NettyHandler将会被调用,调用顺序为NettyHandler-->NettyCodecAdapter.decoder
调用NettyHandler中的writeRequested方法:(从注释可以看出)
/**
* Invoked when a message object (e.g: {@link ChannelBuffer}) was received
* from a remote peer.
*/
@Override
public void writeRequested(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
super.writeRequested(ctx, e);
NettyChannel channel = NettyChannel.getOrAddChannel(ctx.getChannel(), url, handler);
try {
handler.sent(channel, e.getMessage());
} finally {
NettyChannel.removeChannelIfDisconnected(ctx.getChannel());
}
}
handler.sent()调用链路为:NettyClient.send()-->AbstractPeer.send()
public void sent(Channel ch, Object msg) throws RemotingException {
if (closed) {
return;
}
handler.sent(ch, msg);
}
此handler为DubboProtocol一路传过来的,包装了几层
public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
}
调用链路为:DecodeHandler.send()-->HeaderExchangeHandler.send()-->ExchangeHandlerAdapter.send(),最后调到ChannelHandlerAdapter.send()没有任何操作,NettyHandler就调用完毕了。下面看核心encoder的处理,调用方法为:InternalEncoder.encode()
protected Object encode(ChannelHandlerContext ctx, Channel ch, Object msg) throws Exception {
com.alibaba.dubbo.remoting.buffer.ChannelBuffer buffer =
com.alibaba.dubbo.remoting.buffer.ChannelBuffers.dynamicBuffer(1024);
NettyChannel channel = NettyChannel.getOrAddChannel(ch, url, handler);
try {
codec.encode(channel, buffer, msg);
} finally {
NettyChannel.removeChannelIfDisconnected(ch);
}
return ChannelBuffers.wrappedBuffer(buffer.toByteBuffer());
}
codec是NettyCodecAdapter初始化时传过来的,类型为DubboCountCodec,DubboCountCodec.encode()会调用到DubboCodec.encode()
public void encode(Channel channel, ChannelBuffer buffer, Object msg) throws IOException {
if (msg instanceof Request) {
encodeRequest(channel, buffer, (Request) msg);
} else if (msg instanceof Response) {
encodeResponse(channel, buffer, (Response) msg);
} else {
super.encode(channel, buffer, msg);
}
}
接着调用到encodeRequest(),下面的方法是按照dubbo协议进行编码,dubbo协议头,长度为16个字节,共128位,的内容如下:
0-15:魔数
16-20:序列化ID
21:是否事件数据
22:需要响应标识
23:标识是请求还是响应
24-31:状态(req为空)
32-95:请求id
96-127:实体数据长度
protected void encodeRequest(Channel channel, ChannelBuffer buffer, Request req) throws IOException {
Serialization serialization = getSerialization(channel);
// header.
byte[] header = new byte[HEADER_LENGTH];//16个字节的消息头
// set magic number.
Bytes.short2bytes(MAGIC, header);//前两个字节魔数
// 第三个字节标识序列化id、请求信息等,对应16-23位
header[2] = (byte) (FLAG_REQUEST | serialization.getContentTypeId());
if (req.isTwoWay()) header[2] |= FLAG_TWOWAY;//
if (req.isEvent()) header[2] |= FLAG_EVENT;
//32-95 请求id
Bytes.long2bytes(req.getId(), header, 4);
// encode request data.
int savedWriteIndex = buffer.writerIndex();
buffer.writerIndex(savedWriteIndex + HEADER_LENGTH);
ChannelBufferOutputStream bos = new ChannelBufferOutputStream(buffer);
ObjectOutput out = serialization.serialize(channel.getUrl(), bos);
if (req.isEvent()) {//序列化数据
encodeEventData(channel, out, req.getData());
} else {
encodeRequestData(channel, out, req.getData());
}
out.flushBuffer();
bos.flush();
bos.close();
int len = bos.writtenBytes();
checkPayload(channel, len);//检查请求数据实体长度
Bytes.int2bytes(len, header, 12);//数据实体长度写入请求头
// write
buffer.writerIndex(savedWriteIndex);
buffer.writeBytes(header); // write header.
buffer.writerIndex(savedWriteIndex + HEADER_LENGTH + len);
}
下面进入 encodeRequestData(channel, out, req.getData()),看看序列化,默认使用hessian2序列化,消息体数据包含dubbo版本号、接口名称、接口版本、方法名称、参数类型列表、参数、附加信息,把它们按顺序依次序列化,数据写入到类型为ChannelBuffer的buffer参数中,接着将ChannelBuffer封装成netty的org.jboss.netty.buffer.ChannelBuffer,netty会将数据写到链路发送到服务端。
@Override
protected void encodeRequestData(Channel channel, ObjectOutput out, Object data) throws IOException {
RpcInvocation inv = (RpcInvocation) data;
out.writeUTF(inv.getAttachment(Constants.DUBBO_VERSION_KEY, DUBBO_VERSION));
out.writeUTF(inv.getAttachment(Constants.PATH_KEY));
out.writeUTF(inv.getAttachment(Constants.VERSION_KEY));
out.writeUTF(inv.getMethodName());
out.writeUTF(ReflectUtils.getDesc(inv.getParameterTypes()));
Object[] args = inv.getArguments();
if (args != null)
for (int i = 0; i < args.length; i++){
out.writeObject(encodeInvocationArgument(channel, inv, i));
}
out.writeObject(inv.getAttachments());
}
2) 提供端响应请求
此步骤包括:提供端将请求解码,响应请求,将结果编码,返回到消费端
2.1 提供端将请求解码
客户端将数据发送到服务端,netty服务端会触发decoder和handler两个处理器,进入com.alibaba.dubbo.remoting.transport.netty.NettyCodecAdapter.InternalDecoder.messageReceived()方法,
首先将netty的ChannelBuffer转成dubbo的ChannelBuffer,这里需要处理半包问题。
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent event) throws Exception {
.................................................
NettyChannel channel = NettyChannel.getOrAddChannel(ctx.getChannel(), url, handler);
Object msg;
int saveReaderIndex;
try {
// decode object.
do {//循环处理半包问题
saveReaderIndex = message.readerIndex();//保存当前解析的位置等待链路的下次IO事件
try {
msg = codec.decode(channel, message);如果数据不完整返回NEED_MORE_INPUT
} catch (IOException e) {
buffer = com.alibaba.dubbo.remoting.buffer.ChannelBuffers.EMPTY_BUFFER;
throw e;
}
if (msg == Codec2.DecodeResult.NEED_MORE_INPUT) {//从上次解析位置读取
message.readerIndex(saveReaderIndex);
break;
} else {
if (saveReaderIndex == message.readerIndex()) {
buffer = com.alibaba.dubbo.remoting.buffer.ChannelBuffers.EMPTY_BUFFER;
throw new IOException("Decode without read data.");
}
if (msg != null) {
Channels.fireMessageReceived(ctx, msg, event.getRemoteAddress());
}
}
} while (message.readable());
.................................................
}
DubboCodec.decode()->ExchangeCodec.decode(),进入到ExchangeCodec.decode():
protected Object decode(Channel channel, ChannelBuffer buffer, int readable, byte[] header) throws IOException {
....................................................//检查协议头中的长度、魔数等
ChannelBufferInputStream is = new ChannelBufferInputStream(buffer, len);
return decodeBody(channel, is, header);
}
下面进入decodeBody(),首先初始化一个Request对象,创建DecodeableRpcInvocation对象,此对象为rpcInvocation的子类,调用此对象的decode()序列化出请求的method,paramtype等信息 set到DecodeableRpcInvocation中,最后将此对象set到request的data对象中。
protected Object decodeBody(Channel channel, InputStream is, byte[] header) throws IOException {
byte flag = header[2], proto = (byte) (flag & SERIALIZATION_MASK);
Serialization s = CodecSupport.getSerialization(channel.getUrl(), proto);
// get request id.
long id = Bytes.bytes2long(header, 4);
if ((flag & FLAG_REQUEST) == 0) {
// decode response.
}else {
Object data;
......................................
DecodeableRpcInvocation inv;
if (channel.getUrl().getParameter(
Constants.DECODE_IN_IO_THREAD_KEY,
Constants.DEFAULT_DECODE_IN_IO_THREAD)) {
inv = new DecodeableRpcInvocation(channel, req, is, proto);
inv.decode();
} else {
inv = new DecodeableRpcInvocation(channel, req,
new UnsafeByteArrayInputStream(readMessageData(is)), proto);
}
data = inv;
}
req.setData(data);
..................................
return req;
}
回调InternalDecoder.messageReceived()方法,调用Channels.fireMessageReceived()方法,激活下一个处理器的messageReceived事件,并且把解码后的对象封装在MessageEvent中。
2.2 提供端处理请求
decoder之后,继续出发netty的下一个处理链handler,进入NettyHandler的messageReceived()方法,
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
NettyChannel channel = NettyChannel.getOrAddChannel(ctx.getChannel(), url, handler);
try {
handler.received(channel, e.getMessage());
} finally {
NettyChannel.removeChannelIfDisconnected(ctx.getChannel());
}
}
此handler为createServer传递过来的,封装了多层,涉及到服务调用线程的初始化,具体可看《dubbo 线程模型浅析》,此处只分析任务丢到线程池的执行过程:DecodeHandler->HeaderExchangeHandler->ExchangeHandler
public class HeaderExchanger implements Exchanger {
public static final String NAME = "header";
public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
}
public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
}
}
最终调到HeaderExchangeHandler.receive()方法,此处的message为刚才decoder传递过来的request类型
public void received(Channel channel, Object message) throws RemotingException {
channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
try {
if (message instanceof Request) {
// handle request.
Request request = (Request) message;
if (request.isEvent()) {
handlerEvent(channel, request);
} else {
if (request.isTwoWay()) {
Response response = handleRequest(exchangeChannel, request);
channel.send(response);
} else {
handler.received(exchangeChannel, request.getData());
}
}
}
.......................................................
}
进入handleRequest(),取出request中的data对象,为invocation,然后进入到DubboProtocol内部类ExchangeHandlerAdapter的reply()方法
Response handleRequest(ExchangeChannel channel, Request req) throws RemotingException {
Response res = new Response(req.getId(), req.getVersion());
..........................................................
// find handler by message class.
Object msg = req.getData();
try {
// handle data.
Object result = handler.reply(channel, msg);
res.setStatus(Response.OK);
res.setResult(result);
} catch (Throwable e) {
res.setStatus(Response.SERVICE_ERROR);
res.setErrorMessage(StringUtils.toString(e));
}
return res;
}
在DubboProtocol.ExchangeHandlerAdapter.reply()中,传入的参数分别是ExchangeChannel、Invocation,根据这两个参数拼装serviceKey,然后从exporterMap中找到Invoker,exportMap在服务发布的时候,DubboProtocol.export()方法put进去的。
private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() {
public Object reply(ExchangeChannel channel, Object message) throws RemotingException {
if (message instanceof Invocation) {
Invocation inv = (Invocation) message;
Invoker<?> invoker = getInvoker(channel, inv);
//如果是callback 需要处理高版本调用低版本的问题
if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))){
String methodsStr = invoker.getUrl().getParameters().get("methods");
boolean hasMethod = false;
if (methodsStr == null || methodsStr.indexOf(",") == -1){
hasMethod = inv.getMethodName().equals(methodsStr);
} else {
String[] methods = methodsStr.split(",");
for (String method : methods){
if (inv.getMethodName().equals(method)){
hasMethod = true;
break;
}
}
}
if (!hasMethod){
logger.warn(new IllegalStateException("The methodName "+inv.getMethodName()+" not found in callback service interface ,invoke will be ignored. please update the api interface. url is:" + invoker.getUrl()) +" ,invocation is :"+inv );
return null;
}
}
RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
return invoker.invoke(inv);//执行调用
}
...................................................
此invoker为包装类型,包含filter执行链、提供端接口实现类的包装类,执行invoke()方法,先调用filter执行链,然后调用AbstractProxyInvoker实现类的doInvoke()方法,进行调用的包装类的invokeMethod()方法,包装类持有具体实现类,至此调用完毕,回到HeaderExchangeHandler.receive()方法,调用channel.send()写到客户端
Response response = handleRequest(exchangeChannel, request);
channel.send(response);
写到客户端仍然会触发netty事件链,接着调用NettyCodecAdapter.encoder和NettyHandler,InternalEncoder.encode()-->ExchangeCodec.encode():
public void encode(Channel channel, ChannelBuffer buffer, Object msg) throws IOException {
if (msg instanceof Request) {
encodeRequest(channel, buffer, (Request) msg);
} else if (msg instanceof Response) {
encodeResponse(channel, buffer, (Response) msg);
} else {
super.encode(channel, buffer, msg);
}
}
在encodeResponse()中,与消费端一样,将协议头,结果作为消息体写入ChannelBuffer中,在encodeResponse()中调用了encodeResponseData(),进行序列化
protected void encodeResponseData(Channel channel, ObjectOutput out, Object data) throws IOException {
Result result = (Result) data;
Throwable th = result.getException();
if (th == null) {//处理结果没有异常
Object ret = result.getValue();
if (ret == null) {//结果为null
out.writeByte(RESPONSE_NULL_VALUE);
} else {//结果不为空
out.writeByte(RESPONSE_VALUE);
out.writeObject(ret);
}
} else {//处理结果异常
out.writeByte(RESPONSE_WITH_EXCEPTION);
out.writeObject(th);
}
}
回到InternalEncoder.encode()中,调用ChannelBuffers.wrappedBuffer(buffer.toByteBuffer()),将ChannelBuffer写回链路,服务端所有处理结束。
2.3消费端响应结果
服务端写数据到客户端调用netty事件处理器:NettyCodecAdapter.decoder和NettyHandler,调用链路:NettyCodecAdapter.InternalDecoder. messageReceived()-->ExchangeCodec.decode()-->DubboCodec.decodeBody()
protected Object decodeBody(Channel channel, InputStream is, byte[] header) throws IOException {
byte flag = header[2], proto = (byte) (flag & SERIALIZATION_MASK);
Serialization s = CodecSupport.getSerialization(channel.getUrl(), proto);
// get request id.
long id = Bytes.bytes2long(header, 4);
if ((flag & FLAG_REQUEST) == 0) {
// decode response.
Response res = new Response(id);
if ((flag & FLAG_EVENT) != 0) {
res.setEvent(Response.HEARTBEAT_EVENT);
}
// get status.
byte status = header[3];
res.setStatus(status);
if (status == Response.OK) {
try {
Object data;
............................................
DecodeableRpcResult result;
if (channel.getUrl().getParameter(
Constants.DECODE_IN_IO_THREAD_KEY,
Constants.DEFAULT_DECODE_IN_IO_THREAD)) {
result = new DecodeableRpcResult(channel, res, is,
(Invocation)getRequestData(id), proto);
result.decode();//DecodeableRpcResult.decode()
}
.......................................
data = result;
}
res.setResult(data);
.....................................................
return res;
} else{
}
}
上面的方法会调用DecodeableRpcResult.decode(),调完之后,将DecodeableRpcResult set到response对象的data属性中,DecodeableRpcResult为RpcResult的子类,进入到DecodeableRpcResult.decode():
public Object decode(Channel channel, InputStream input) throws IOException {
ObjectInput in = CodecSupport.getSerialization(channel.getUrl(), serializationType)
.deserialize(channel.getUrl(), input);
byte flag = in.readByte();
switch (flag) {
case DubboCodec.RESPONSE_NULL_VALUE://正常返回,空值
break;
case DubboCodec.RESPONSE_VALUE://正常返回,有值
try {
Type[] returnType = RpcUtils.getReturnTypes(invocation);
setValue(returnType == null || returnType.length == 0 ? in.readObject() :
(returnType.length == 1 ? in.readObject((Class<?>) returnType[0])
: in.readObject((Class<?>) returnType[0], returnType[1])));
} catch (ClassNotFoundException e) {
throw new IOException(StringUtils.toString("Read response data failed.", e));
}
break;
case DubboCodec.RESPONSE_WITH_EXCEPTION://异常返回
..................................
default:
throw new IOException("Unknown result flag, expect '0' '1' '2', get " + flag);
}
return this;
}
在执行结果有值,正常返回的情况下,会序列化出返回值,调用setValue()设置到result中。
然后触发NettyHandler事件,调用NettyHandler中的messageReceived():
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
NettyChannel channel = NettyChannel.getOrAddChannel(ctx.getChannel(), url, handler);
try {
handler.received(channel, e.getMessage());
} finally {
NettyChannel.removeChannelIfDisconnected(ctx.getChannel());
}
}
这个handler的封装关系在前面已经说明:DecodeHandler->HeaderExchangeHandler->ExchangeHandler,一路调用直到HeaderExchangeHandler.received():
public void received(Channel channel, Object message) throws RemotingException {
........................
else if (message instanceof Response) {
handleResponse(channel, (Response) message);
}
.......................
}
handleResponse()的调用链路为:DefaultFuture.received()-->DefaultFuture.doReceive():
private void doReceived(Response res) {
lock.lock();
try {
response = res;//将resposne设置到DefaultFuture中
if (done != null) {
done.signal();//唤起消费端调用等待线程
}
} finally {
lock.unlock();
}
if (callback != null) {
invokeCallback(callback);
}
}
调用 done.signal()的作用是唤起消费端调用等待线程,在消费端向提供端发送调用请求后,会调用DefaultFuture.get阻塞等待响应结果,回到DubboInvoker.doInvoke()方法:
protected Result doInvoke(final Invocation invocation) throws Throwable {
..............................
boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY,Constants.DEFAULT_TIMEOUT);
if (isOneway) {
.............................
} else if (isAsync) {
.............................
} else {
RpcContext.getContext().setFuture(null);
return (Result) currentClient.request(inv, timeout).get();
}
}
currentClient.request()最终调到HeaderExchangeChannel.request(Object request, int timeout):
public ResponseFuture request(Object request, int timeout) throws RemotingException {
...........................................
// create request.
Request req = new Request();//初始化的时候会设置一个id
req.setVersion("2.0.0");
req.setTwoWay(true);
req.setData(request);//RpcInvocation对象
//DefaultFuture初始化的时候会将id 当前对象保存起来,以备返回的时候使用rpcInvocation
DefaultFuture future = new DefaultFuture(channel, req, timeout);
try{
channel.send(req);//调用netty发送
}catch (RemotingException e) {
future.cancel();
throw e;
}
return future;
}
可以看到返回的是DefaultFuture对象,进入此对象的get()方法:
public Object get(int timeout) throws RemotingException {
..............................................
while (! isDone()) {//等待的条件为response为null
done.await(timeout, TimeUnit.MILLISECONDS);
if (isDone() || System.currentTimeMillis() - start > timeout) {
break;
}
}
}
..............................................
return returnFromResponse();
}
private Object returnFromResponse() throws RemotingException {
Response res = response;
if (res == null) {
throw new IllegalStateException("response cannot be null");
}
if (res.getStatus() == Response.OK) {
//see Dubbocodec.decodebody(),返回 DecodeableRpcResult
return res.getResult();
}
if (res.getStatus() == Response.CLIENT_TIMEOUT || res.getStatus() == Response.SERVER_TIMEOUT) {
throw new TimeoutException(res.getStatus() == Response.SERVER_TIMEOUT, channel, res.getErrorMessage());
}
throw new RemotingException(channel, res.getErrorMessage());
}
回到DubboInvoker.doInvoke(),将结果返回,回到最初的InvokerInvocationHandler.invoke()
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
String methodName = method.getName();
Class<?>[] parameterTypes = method.getParameterTypes();
if (method.getDeclaringClass() == Object.class) {
return method.invoke(invoker, args);
}
if ("toString".equals(methodName) && parameterTypes.length == 0) {
return invoker.toString();
}
if ("hashCode".equals(methodName) && parameterTypes.length == 0) {
return invoker.hashCode();
}
if ("equals".equals(methodName) && parameterTypes.length == 1) {
return invoker.equals(args[0]);
}
return invoker.invoke(new RpcInvocation(method, args)).recreate();
}
经过一些列复杂的过程,执行 invoker.invoke(new RpcInvocation(method, args))完成,返回ecodeableRpcResult,然后调用它的recreate()获取result值,此值就是提供端的执行结果,具体可以回顾上面的流程,将具体的结果返回,整个rpc调用流程完毕。
参考:
http://blog.csdn.net/pentiumchen/article/details/53227844
http://www.imooc.com/article/details/id/22597
http://blog.kazaff.me/2015/02/02/dubbo%E7%9A%84%E6%9C%8D%E5%8A%A1%E6%B2%BB%E7%90%86%E7%BB%86%E8%8A%82/
http://blog.csdn.net/manzhizhen/article/details/73436619