Dubbo 协议
协议概览
Dubbo 框架定义了私有的RPC协议,其中请求和响应协议的具体内容我们使用表格来展示。
协议详情
-
Magic - Magic High & Magic Low (16 bits)
标识协议版本号,Dubbo 协议:0xdabb
-
Req/Res (1 bit)
标识是请求或响应。请求: 1; 响应: 0。
-
2 Way (1 bit)
仅在 Req/Res 为1(请求)时才有用,标记是否期望从服务器返回值。如果需要来自服务器的返回值,则设置为1。
-
Event (1 bit)
标识是否是事件消息,例如,心跳事件。如果这是一个事件,则设置为1。
-
Serialization ID (5 bit)
标识序列化类型:比如 fastjson 的值为6。
-
Status (8 bits)
仅在 Req/Res 为0(响应)时有用,用于标识响应的状态。
- 20 - OK
- 30 - CLIENT_TIMEOUT
- 31 - SERVER_TIMEOUT
- 40 - BAD_REQUEST
- 50 - BAD_RESPONSE
- 60 - SERVICE_NOT_FOUND
- 70 - SERVICE_ERROR
- 80 - SERVER_ERROR
- 90 - CLIENT_ERROR
- 100 - SERVER_THREADPOOL_EXHAUSTED_ERROR
-
Request ID (64 bits)
标识唯一请求。类型为long。
-
Data Length (32 bits)
序列化后的内容长度(可变部分),按字节计数。int类型。
-
Variable Part
被特定的序列化类型(由序列化 ID 标识)序列化后,每个部分都是一个 byte [] 或者 byte
- 如果是请求包 ( Req/Res = 1),则每个部分依次为:
- Dubbo version
- Service name
- Service version
- Method name
- Method parameter types
- Method arguments
- Attachments
- 如果是响应包(Req/Res = 0),则每个部分依次为:
- 返回值类型(byte),标识从服务器端返回的值类型:
- 返回空值:RESPONSE_NULL_VALUE 2
- 正常响应值: RESPONSE_VALUE 1
- 异常:RESPONSE_WITH_EXCEPTION 0
- 返回值:从服务端返回的响应bytes
- 返回值类型(byte),标识从服务器端返回的值类型:
- 如果是请求包 ( Req/Res = 1),则每个部分依次为:
源码分析
DubboCodec类图
- DubboCodec的类关系如上图,核心的逻辑在ExchangeCodec和DubboCodec。
NettyClient
public class NettyClient extends AbstractClient {
private static final NioEventLoopGroup nioEventLoopGroup = new NioEventLoopGroup(Constants.DEFAULT_IO_THREADS, new DefaultThreadFactory("NettyClientWorker", true));
private Bootstrap bootstrap;
private volatile Channel channel; // volatile, please copy reference to use
public NettyClient(final URL url, final ChannelHandler handler) throws RemotingException {
super(url, wrapChannelHandler(url, handler));
}
@Override
protected void doOpen() throws Throwable {
final NettyClientHandler nettyClientHandler = new NettyClientHandler(getUrl(), this);
bootstrap = new Bootstrap();
bootstrap.group(nioEventLoopGroup)
.option(ChannelOption.SO_KEEPALIVE, true)
.option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
//.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, getTimeout())
.channel(NioSocketChannel.class);
if (getConnectTimeout() < 3000) {
bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000);
} else {
bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, getConnectTimeout());
}
bootstrap.handler(new ChannelInitializer() {
@Override
protected void initChannel(Channel ch) throws Exception {
NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this);
ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
.addLast("decoder", adapter.getDecoder())
// NettyCodecAdapter.getEncoder()返回InternalEncoder对象
.addLast("encoder", adapter.getEncoder())
.addLast("handler", nettyClientHandler);
}
});
}
}
- NettyClient的NettyCodecAdapter对象,在编码过程中会使用adapter.getEncoder(),在解码过程中会使用adapter.getDecoder()。
- 核心继续关注下NettyCodecAdapter。
NettyCodecAdapter
final class NettyCodecAdapter {
private final ChannelHandler encoder = new InternalEncoder();
private final ChannelHandler decoder = new InternalDecoder();
// DubboCountCodec对象
private final Codec2 codec;
private final URL url;
private final com.alibaba.dubbo.remoting.ChannelHandler handler;
public NettyCodecAdapter(Codec2 codec, URL url, com.alibaba.dubbo.remoting.ChannelHandler handler) {
this.codec = codec;
this.url = url;
this.handler = handler;
}
public ChannelHandler getEncoder() {
return encoder;
}
public ChannelHandler getDecoder() {
return decoder;
}
private class InternalEncoder extends MessageToByteEncoder {
@Override
protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception {
com.alibaba.dubbo.remoting.buffer.ChannelBuffer buffer = new NettyBackedChannelBuffer(out);
Channel ch = ctx.channel();
NettyChannel channel = NettyChannel.getOrAddChannel(ch, url, handler);
try {
// DubboCountCodec对象
codec.encode(channel, buffer, msg);
} finally {
NettyChannel.removeChannelIfDisconnected(ch);
}
}
}
private class InternalDecoder extends ByteToMessageDecoder {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf input, List<Object> out) throws Exception {
ChannelBuffer message = new NettyBackedChannelBuffer(input);
NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
Object msg;
int saveReaderIndex;
try {
// decode object.
do {
saveReaderIndex = message.readerIndex();
try {
// DubboCountCodec对象
msg = codec.decode(channel, message);
} catch (IOException e) {
throw e;
}
if (msg == Codec2.DecodeResult.NEED_MORE_INPUT) {
message.readerIndex(saveReaderIndex);
break;
} else {
//is it possible to go here ?
if (saveReaderIndex == message.readerIndex()) {
throw new IOException("Decode without read data.");
}
if (msg != null) {
out.add(msg);
}
}
} while (message.readable());
} finally {
NettyChannel.removeChannelIfDisconnected(ctx.channel());
}
}
}
}
- NettyCodecAdapter包含编码对象InternalEncoder。
- NettyCodecAdapter包含解码对象InternalDecoder。
- NettyCodecAdapter的codec对象为DubboCountCodec。
Consumer编码过程
- Dubbo Client Consumer请求编码调用栈
=> NettyCodecAdapter.InternalEncoder.encode
=> DubboCountCodec.encode
=> ExchangeCodec.encode
=> ExchangeCodec.encodeRequest
=> DubboCodec.encodeRequestData
DubboCountCodec
dubbo=com.alibaba.dubbo.rpc.protocol.dubbo.DubboCountCodec
public final class DubboCountCodec implements Codec2 {
private DubboCodec codec = new DubboCodec();
@Override
public void encode(Channel channel, ChannelBuffer buffer, Object msg) throws IOException {
// DubboCodec对象
codec.encode(channel, buffer, msg);
}
}
- DubboCountCodec调用DubboCodec的encode()方法进行编码。
- DubboCodec的encode()调用的父类ExchangeCodec的encode()方法进行编码。
ExchangeCodec
public class ExchangeCodec extends TelnetCodec {
// header length.
protected static final int HEADER_LENGTH = 16;
// magic header.
protected static final short MAGIC = (short) 0xdabb;
protected static final byte MAGIC_HIGH = Bytes.short2bytes(MAGIC)[0];
protected static final byte MAGIC_LOW = Bytes.short2bytes(MAGIC)[1];
// message flag.
protected static final byte FLAG_REQUEST = (byte) 0x80;
protected static final byte FLAG_TWOWAY = (byte) 0x40;
protected static final byte FLAG_EVENT = (byte) 0x20;
protected static final int SERIALIZATION_MASK = 0x1f;
public void encode(Channel channel, ChannelBuffer buffer, Object msg) throws IOException {
if (msg instanceof Request) {
encodeRequest(channel, buffer, (Request) msg);
} else if (msg instanceof Response) {
encodeResponse(channel, buffer, (Response) msg);
} else {
super.encode(channel, buffer, msg);
}
}
protected void encodeRequest(Channel channel, ChannelBuffer buffer, Request req) throws IOException {
// 获取序列化的对象
// hessian2=com.alibaba.dubbo.common.serialize.hessian2.Hessian2Serialization
Serialization serialization = getSerialization(channel);
// header. HEADER_LENGTH = 16;
// 头部的长度为16个字节
byte[] header = new byte[HEADER_LENGTH];
// 第1-2个字节:一个魔数数字(就是一个固定的数字)
Bytes.short2bytes(MAGIC, header);
// 第3个字节:双向或单向的标记。
// 双向是指请求有去有回,有返回值。单向是指请求有去无回,没有返回值。
header[2] = (byte) (FLAG_REQUEST | serialization.getContentTypeId());
if (req.isTwoWay()) header[2] |= FLAG_TWOWAY;
if (req.isEvent()) header[2] |= FLAG_EVENT;
// 第4个字节:在request请求中空着
// 第5-12个字节:请求id,long型的8个字节。
// 异步变同步的全局唯一ID,用来做Consumer和Provider的来回通信标记。
Bytes.long2bytes(req.getId(), header, 4);
// 编码请求数据
int savedWriteIndex = buffer.writerIndex();
// 从HEADER_LENGTH第17个字节的位置开始写请求数据
buffer.writerIndex(savedWriteIndex + HEADER_LENGTH);
ChannelBufferOutputStream bos = new ChannelBufferOutputStream(buffer);
ObjectOutput out = serialization.serialize(channel.getUrl(), bos);
if (req.isEvent()) {
encodeEventData(channel, out, req.getData());
} else {
encodeRequestData(channel, out, req.getData(), req.getVersion());
}
out.flushBuffer();
if (out instanceof Cleanable) {
((Cleanable) out).cleanup();
}
bos.flush();
bos.close();
int len = bos.writtenBytes();
// 检查请求大小是否超过限制,最大8M
checkPayload(channel, len);
// 第13-16个字节:消息体长度,请求数据的长度。
Bytes.int2bytes(len, header, 12);
// write
buffer.writerIndex(savedWriteIndex);
buffer.writeBytes(header); // 写入头部数据
// 设置buffer的下一次位置,跳过头部数据和实体数据
buffer.writerIndex(savedWriteIndex + HEADER_LENGTH + len);
}
}
- ExchangeCodec的encode()实际执行的是encodeRequest()方法。
DubboCodec
public class DubboCodec extends ExchangeCodec implements Codec2 {
@Override
protected void encodeRequestData(Channel channel, ObjectOutput out, Object data, String version) throws IOException {
RpcInvocation inv = (RpcInvocation) data;
// 版本号
out.writeUTF(version);
// 服务路径
out.writeUTF(inv.getAttachment(Constants.PATH_KEY));
out.writeUTF(inv.getAttachment(Constants.VERSION_KEY));
// 方法名
out.writeUTF(inv.getMethodName());
// 方法参数类型
out.writeUTF(ReflectUtils.getDesc(inv.getParameterTypes()));
// 方法参数值
Object[] args = inv.getArguments();
if (args != null)
for (int i = 0; i < args.length; i++) {
out.writeObject(encodeInvocationArgument(channel, inv, i));
}
// 方法透传数据
out.writeObject(inv.getAttachments());
}
}
- 在对请求进行编码时,会把版本号、服务路径、方法名、方法参数等信息都进行编码,其中参数类型是用JVM中的类型表示方法来编码的。
Consumer解码过程
- Dubbo Client Consumer响应解码调用栈
=> NettyCodecAdapter.InternalDecoder. decode
=>DubboCountCodec.decode
=>ExchangeCodec.decode
=>DubboCodec.decodeBody
=>DecodeableRPCResult.decode
DubboCountCodec
dubbo=com.alibaba.dubbo.rpc.protocol.dubbo.DubboCountCodec
public final class DubboCountCodec implements Codec2 {
private DubboCodec codec = new DubboCodec();
@Override
public Object decode(Channel channel, ChannelBuffer buffer) throws IOException {
int save = buffer.readerIndex();
MultiMessage result = MultiMessage.create();
do {
// DubboCodec对象
Object obj = codec.decode(channel, buffer);
if (Codec2.DecodeResult.NEED_MORE_INPUT == obj) {
buffer.readerIndex(save);
break;
} else {
result.addMessage(obj);
logMessageLength(obj, buffer.readerIndex() - save);
save = buffer.readerIndex();
}
} while (true);
if (result.isEmpty()) {
return Codec2.DecodeResult.NEED_MORE_INPUT;
}
if (result.size() == 1) {
return result.get(0);
}
return result;
}
}
- DubboCountCodec.decode()执行DubboCodec.decode()解码。
- DubboCodec.decode()执行父类的ExchangeCodec.decode()。
ExchangeCodec
public class ExchangeCodec extends TelnetCodec {
// header length.
protected static final int HEADER_LENGTH = 16;
// magic header.
protected static final short MAGIC = (short) 0xdabb;
protected static final byte MAGIC_HIGH = Bytes.short2bytes(MAGIC)[0];
protected static final byte MAGIC_LOW = Bytes.short2bytes(MAGIC)[1];
// message flag.
protected static final byte FLAG_REQUEST = (byte) 0x80;
protected static final byte FLAG_TWOWAY = (byte) 0x40;
protected static final byte FLAG_EVENT = (byte) 0x20;
protected static final int SERIALIZATION_MASK = 0x1f;
@Override
public Object decode(Channel channel, ChannelBuffer buffer) throws IOException {
int readable = buffer.readableBytes();
byte[] header = new byte[Math.min(readable, HEADER_LENGTH)];
buffer.readBytes(header);
return decode(channel, buffer, readable, header);
}
@Override
protected Object decode(Channel channel, ChannelBuffer buffer, int readable, byte[] header) throws IOException {
// 检查魔数,如果不是dubbo协议,则使用父类的解码方法
if (readable > 0 && header[0] != MAGIC_HIGH
|| readable > 1 && header[1] != MAGIC_LOW) {
int length = header.length;
if (header.length < readable) {
header = Bytes.copyOf(header, readable);
buffer.readBytes(header, length, readable - length);
}
for (int i = 1; i < header.length - 1; i++) {
if (header[i] == MAGIC_HIGH && header[i + 1] == MAGIC_LOW) {
buffer.readerIndex(buffer.readerIndex() - header.length + i);
header = Bytes.copyOf(header, i);
break;
}
}
return super.decode(channel, buffer, readable, header);
}
// dubbo 协议解析
if (readable < HEADER_LENGTH) {
return DecodeResult.NEED_MORE_INPUT;
}
// 获取数据的实际长度
int len = Bytes.bytes2int(header, 12);
checkPayload(channel, len);
// 实际数据包长度 = 头部长度 + 数据长度
int tt = len + HEADER_LENGTH;
if (readable < tt) {
return DecodeResult.NEED_MORE_INPUT;
}
// limit input stream.
ChannelBufferInputStream is = new ChannelBufferInputStream(buffer, len);
try {
return decodeBody(channel, is, header);
} finally {
// 省略相关代码
}
}
}
DubboCodec
public class DubboCodec extends ExchangeCodec implements Codec2 {
@Override
protected Object decodeBody(Channel channel, InputStream is, byte[] header) throws IOException {
// 读取消息标记为和协议类型
byte flag = header[2], proto = (byte) (flag & SERIALIZATION_MASK);
// 读取请求id
long id = Bytes.bytes2long(header, 4);
if ((flag & FLAG_REQUEST) == 0) {
// 客户端对响应进行解码
Response res = new Response(id);
if ((flag & FLAG_EVENT) != 0) {
res.setEvent(Response.HEARTBEAT_EVENT);
}
// 读取响应状态
byte status = header[3];
res.setStatus(status);
try {
if (status == Response.OK) {
Object data;
if (res.isHeartbeat()) {
data = decodeHeartbeatData(channel, CodecSupport.deserialize(channel.getUrl(), is, proto));
} else if (res.isEvent()) {
data = decodeEventData(channel, CodecSupport.deserialize(channel.getUrl(), is, proto));
} else {
DecodeableRpcResult result;
// 根据decode.in.io的配置决定何时进行解码
if (channel.getUrl().getParameter(
Constants.DECODE_IN_IO_THREAD_KEY,
Constants.DEFAULT_DECODE_IN_IO_THREAD)) {
result = new DecodeableRpcResult(channel, res, is,
(Invocation) getRequestData(id), proto);
// 执行解码
result.decode();
} else {
// 解码相应的响应值
result = new DecodeableRpcResult(channel, res,
new UnsafeByteArrayInputStream(readMessageData(is)),
(Invocation) getRequestData(id), proto);
}
data = result;
}
res.setResult(data);
} else {
res.setErrorMessage(CodecSupport.deserialize(channel.getUrl(), is, proto).readUTF());
}
} catch (Throwable t) {
// 省略其他的代码
return res;
} else {
// decode request.
// 省略其他的代码
}
}
}
DecodeableRpcResult
public class DecodeableRpcResult extends RpcResult implements Codec, Decodeable {
private Channel channel;
private byte serializationType;
private InputStream inputStream;
private Response response;
private Invocation invocation;
private volatile boolean hasDecoded;
public DecodeableRpcResult(Channel channel, Response response, InputStream is, Invocation invocation, byte id) {
Assert.notNull(channel, "channel == null");
Assert.notNull(response, "response == null");
Assert.notNull(is, "inputStream == null");
this.channel = channel;
this.response = response;
this.inputStream = is;
this.invocation = invocation;
this.serializationType = id;
}
@Override
public Object decode(Channel channel, InputStream input) throws IOException {
// 获取序列化对象
ObjectInput in = CodecSupport.getSerialization(channel.getUrl(), serializationType)
.deserialize(channel.getUrl(), input);
byte flag = in.readByte();
switch (flag) {
case DubboCodec.RESPONSE_NULL_VALUE:
break;
case DubboCodec.RESPONSE_VALUE:
try {
Type[] returnType = RpcUtils.getReturnTypes(invocation);
setValue(returnType == null || returnType.length == 0 ? in.readObject() :
(returnType.length == 1 ? in.readObject((Class<?>) returnType[0])
: in.readObject((Class<?>) returnType[0], returnType[1])));
} catch (ClassNotFoundException e) {
throw new IOException(StringUtils.toString("Read response data failed.", e));
}
break;
case DubboCodec.RESPONSE_WITH_EXCEPTION:
try {
Object obj = in.readObject();
if (obj instanceof Throwable == false)
throw new IOException("Response data error, expect Throwable, but get " + obj);
setException((Throwable) obj);
} catch (ClassNotFoundException e) {
throw new IOException(StringUtils.toString("Read response data failed.", e));
}
break;
case DubboCodec.RESPONSE_NULL_VALUE_WITH_ATTACHMENTS:
try {
setAttachments((Map<String, String>) in.readObject(Map.class));
} catch (ClassNotFoundException e) {
throw new IOException(StringUtils.toString("Read response data failed.", e));
}
break;
case DubboCodec.RESPONSE_VALUE_WITH_ATTACHMENTS:
try {
Type[] returnType = RpcUtils.getReturnTypes(invocation);
setValue(returnType == null || returnType.length == 0 ? in.readObject() :
(returnType.length == 1 ? in.readObject((Class<?>) returnType[0])
: in.readObject((Class<?>) returnType[0], returnType[1])));
setAttachments((Map<String, String>) in.readObject(Map.class));
} catch (ClassNotFoundException e) {
throw new IOException(StringUtils.toString("Read response data failed.", e));
}
break;
case DubboCodec.RESPONSE_WITH_EXCEPTION_WITH_ATTACHMENTS:
try {
Object obj = in.readObject();
if (obj instanceof Throwable == false)
throw new IOException("Response data error, expect Throwable, but get " + obj);
setException((Throwable) obj);
setAttachments((Map<String, String>) in.readObject(Map.class));
} catch (ClassNotFoundException e) {
throw new IOException(StringUtils.toString("Read response data failed.", e));
}
break;
default:
throw new IOException("Unknown result flag, expect '0' '1' '2' '3' '4' '5', get " + flag);
}
if (in instanceof Cleanable) {
((Cleanable) in).cleanup();
}
return this;
}
@Override
public void decode() throws Exception {
if (!hasDecoded && channel != null && inputStream != null) {
try {
decode(channel, inputStream);
} catch (Throwable e) {
if (log.isWarnEnabled()) {
log.warn("Decode rpc result failed: " + e.getMessage(), e);
}
response.setStatus(Response.CLIENT_ERROR);
response.setErrorMessage(StringUtils.toString(e));
} finally {
hasDecoded = true;
}
}
}
}