1. TCP粘包、拆包图解
假设客户端分别发送了两个数据包D1和D2给服务端,由于服务端一次读取到字节数是不确定的,故可能存在以下四种情况:
1.服务端分两次读取到了两个独立的数据包,分别是D1和D2,没有粘包和拆包
2.服务端一次接受到了两个数据包,D1和D2粘合在一起,称之为TCP粘包
3.服务端分两次读取到了数据包,第一次读取到了完整的D1包和D2包的部分内容,第二次读取到了D2包的剩余内容,这称之为TCP拆包
4.服务端分两次读取到了数据包,第一次读取到了D1包的部分内容D1_1,第二次读取到了D1包的剩余部分内容D1_2和完整的D2包。
特别要注意的是,如果TCP的接受滑窗非常小,而数据包D1和D2比较大,很有可能会发生第五种情况,即服务端分多次才能将D1和D2包完全接受,期间发生多次拆包。
2. 粘包、拆包产生原因
2.1 滑动窗口
TCP流量控制主要使用滑动窗口协议,滑动窗口是接受数据端使用的窗口大小,用来告诉发送端接口端的缓存大小,以此可以控制发送端发送数据的大小,从而达到流量控制的目的。这个窗口的大小就是我们以此传输几个数据。对所有数据帧按顺序赋予编号。发送方在发送过程中始终保持着一个发送窗口,只有落在发送方窗口的帧才允许被发送;同时接受方也维护一个窗口,只有落在接受窗口内的帧才允许接收。
滑动窗口是如何造成粘包、拆包的
粘包:假设发送方的每256 bytes表示一个完整的报文,接收方由于数据处理不及时,这256个字节的数据都会被缓存到SO_RCVBUF(接收缓存区)中。如果接收方的SO_RCVBUF中缓存了多个报文,那么对于接收方而言,这就是粘包。
拆包:考虑另外一种情况,假设接收方的窗口只剩了128,意味着发送方最多还可以发送128字节,而由于发送方的数据大小是256字节,因此只能发送前128字节,等到接收方ack后,才能发送剩余字节。这就造成了拆包。
2.2 MSS和MTU分片
MSS: 是Maximum Segement Size缩写,表示TCP报文中data部分的最大长度,是TCP协议在OSI五层网络
模型中传输层对一次可以发送的最大数据的限制。
MTU: 最大传输单元是,Maxitum Transmission Unit的简写,是OSI五层网络模型中链路层(datalink layer)对一次可以发送的最大数据的限制。
对于应用层来说,只关心发送的数据DATA,将数据写入socket在内核中的发送缓冲区SO_SNDBUF即返回,操作系统会将SO_SNDBUF中的数据取出来进行发送。传输层会在DATA前面加上TCP Header,构成一个完整的TCP报文。
当数据到达网络层(network layer)时,网络层会在TCP报文的基础上再添加一个IP Header,也就是将自己的网络地址加入到报文中。到数据链路层时,还会加上Datalink Header和CRC。
当到达物理层时,会将SMAC(Source Machine,数据发送方的MAC地址),DMAC(Destination Machine,数据接受方的MAC地址 )和Type域加入。
可以发现数据在发送前,每一层都会在上一层的基础上增加一些内容,下图演示了MSS、MTU在这个过程中的作用。
[图片上传失败...(image-7a5058-1618037626721)]
MTU是以太网传输数据方面的限制,每个以太网帧都有最小的大小64bytes最大不能超过1518bytes。刨去以太网帧的帧头 (DMAC目的MAC地址48bit=6Bytes+SMAC源MAC地址48bit=6Bytes+Type域2bytes)14Bytes和帧尾 CRC校验部分4Bytes(这个部分有时候大家也把它叫做FCS),那么剩下承载上层协议的地方也就是Data域最大就只能有1500Bytes这个值 我们就把它称之为MTU。
由于MTU限制了一次最多可以发送1500个字节,而TCP协议在发送DATA时,还会加上额外的TCP Header和Ip Header,因此刨去这两个部分,就是TCP协议一次可以发送的实际应用数据的最大大小,也就是MSS
MSS长度 = MTU长度 - IP Header - TCP Header
TCP Header的长度是20字节,IPv4中IP Header长度是20字节,IPV6中IP Header长度是40字节,因此:在IPV4中,以太网MSS可以达到1460byte;在IPV6中,以太网MSS可以达到1440byte。
需要注意的是MSS表示的一次可以发送的DATA的最大长度,而不是DATA的真实长度。发送方发送数据时,当SO_SNDBUF中的数据量大于MSS时,操作系统会将数据进行拆分,使得每一部分都小于MSS,这就是拆包,然后每一部分都加上TCP Header,构成多个完整的TCP报文进行发送,当然经过网络层和数据链路层的时候,还会分别加上相应的内容。
需要注意: 默认情况下,与外部通信的网卡的MTU大小是1500个字节。而本地回环地址的MTU大小为65535,这是因为本地测试时数据不需要走网卡,所以不受到1500的限制。
3. Nagle算法
TCP/IP协议中,无论发送多少数据,总是要在数据(DATA)前面加上协议头(TCP Header+IP Header),同时,对方接收到数据,也需要发送ACK表示确认。
即使从键盘输入的一个字符,占用一个字节,可能在传输上造成41字节的包,其中包括1字节的有用信息和40字节的首部数据。这种情况转变成了4000%的消耗,这样的情况对于重负载的网络来是无法接受的。
为了尽可能的利用网络带宽,TCP总是希望尽可能的发送足够大的数据。(一个连接会设置MSS参数,因此,TCP/IP希望每次都能够以MSS尺寸的数据块来发送数据)。
3.1 Nagle算法——尽可能发送大块数据,避免网络中充斥着许多小数据块
3.2 Nagle算法的规则:
- 如果SO_SNDBUF(发送缓冲区)中的数据长度达到MSS,则允许发送;
- 如果该SO_SNDBUF中含有FIN,表示请求关闭连接,则先将SO_SNDBUF中的剩余数据发送,再关闭;
- 设置了TCP_NODELAY=true选项,则允许发送。TCP_NODELAY是取消TCP的确认延迟机制,相当于禁用了Nagle 算法。
- 未设置TCP_CORK选项时,若所有发出去的小数据包(包长度小于MSS)均被确认,则允许发送;
上述条件都未满足,但发生了超时(一般为200ms),则立即发送。
4. 粘包问题的解决策略
- 消息定长,每个报文固定长度,不够的空格补齐
- 在包尾部增加换车换行进行分割
- 将消息提分为消息头和消息体,消息头中包含表示总长度
- 更复杂的应用层协议
5. netty 解决TCP粘包问题
拆包原理:
- 如果当前读取的数据不足以拼接成一个完整的业务数据包,那就保留该数据,继续从tcp缓冲区中读取,直到得到一个完整的数据包
- 如果当前读到的数据加上已经读取的数据足够拼接成一个数据包,那就将已经读取的数据拼接上本次读取的数据,够成一个完整的业务数据包传递到业务逻辑,多余的数据仍然保留,以便和下次读到的数据尝试拼接
5.1 ByteToMessageDecoder
ByteToMessageDecoder
:netty 中的拆包也是如上这个原理,内部会有一个累加器,每次读取到数据都会不断累加,然后尝试对累加到的数据进行拆包,拆成一个完整的业务数据包,这个基类叫做 ByteToMessageDecoder
累加器
ByteToMessageDecoder
中定义了两个累加器
public static final Cumulator MERGE_CUMULATOR = ...;
public static final Cumulator COMPOSITE_CUMULATOR = ...;
MERGE_CUMULATOR
的原理是每次都将读取到的数据通过内存拷贝的方式,拼接到一个大的字节容器中,这个字节容器在 ByteToMessageDecoder
中叫做 cumulation
。
public static final Cumulator MERGE_CUMULATOR = new Cumulator() {
@Override
public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) {
if (!cumulation.isReadable() && in.isContiguous()) {
// If cumulation is empty and input buffer is contiguous, use it directly
cumulation.release();
return in;
}
try {
final int required = in.readableBytes();
// 判断是否需要扩容
if (required > cumulation.maxWritableBytes() ||
(required > cumulation.maxFastWritableBytes() && cumulation.refCnt() > 1) ||
cumulation.isReadOnly()) {
// Expand cumulation (by replacing it) under the following conditions:
// - cumulation cannot be resized to accommodate the additional data
// - cumulation can be expanded with a reallocation operation to accommodate but the buffer is
// assumed to be shared (e.g. refCnt() > 1) and the reallocation may not be safe.
return expandCumulation(alloc, cumulation, in);
}
//累加
cumulation.writeBytes(in, in.readerIndex(), required);
in.readerIndex(in.writerIndex());
return cumulation;
} finally {
// We must release in in all cases as otherwise it may produce a leak if writeBytes(...) throw
// for whatever release (for example because of OutOfMemoryError)
in.release();
}
}
};
扩容:
static ByteBuf expandCumulation(ByteBufAllocator alloc, ByteBuf oldCumulation, ByteBuf in) {
int oldBytes = oldCumulation.readableBytes();
int newBytes = in.readableBytes();
int totalBytes = oldBytes + newBytes;
ByteBuf newCumulation = alloc.buffer(alloc.calculateNewCapacity(totalBytes, MAX_VALUE));
ByteBuf toRelease = newCumulation;
try {
// This avoids redundant checks and stack depth compared to calling writeBytes(...)
newCumulation.setBytes(0, oldCumulation, oldCumulation.readerIndex(), oldBytes)
.setBytes(oldBytes, in, in.readerIndex(), newBytes)
.writerIndex(totalBytes);
in.readerIndex(in.writerIndex());
toRelease = oldCumulation;
return newCumulation;
} finally {
toRelease.release();
}
}
扩容也是一个内存拷贝操作,新增的大小即是新读取数据的大小
拆包
累加器原理清楚之后。回到主流程,channelRead
方法,channelRead
方法是每次从TCP缓冲区读到数据都会调用的方法,触发点在AbstractNioByteChannel
的read
方法中,里面有个while
循环不断读取,读取到一次就触发一次channelRead
。
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof ByteBuf) {
CodecOutputList out = CodecOutputList.newInstance();
try {
//1.累加数据
first = cumulation == null;
cumulation = cumulator.cumulate(ctx.alloc(), first ? Unpooled.EMPTY_BUFFER : cumulation, (ByteBuf) msg);
//2. 将累加到的数据传递给业务进行拆包
callDecode(ctx, cumulation, out);
} catch (DecoderException e) {
throw e;
} catch (Exception e) {
throw new DecoderException(e);
} finally {
try {
//3.netty会在每次读取到一次数据,业务拆包之后对字节字节容器做清理,清理部分的代码如下
if (cumulation != null && !cumulation.isReadable()) {
//如果字节容器当前已无数据可读取,直接销毁字节容器,并且标注一下当前字节容器一次数据也没读取
numReads = 0;
cumulation.release();
cumulation = null;
} else if (++numReads >= discardAfterReads) {
// We did enough reads already try to discard some bytes so we not risk to see a OOME.
// See https://github.com/netty/netty/issues/4275
//字节容器中仍然有未被业务拆包器读取的数据,那就做一次压缩,有效数据段整体移到容器首部
numReads = 0;
discardSomeReadBytes();
}
//4. 传递业务数据包给业务解码器处理
int size = out.size();
firedChannelRead |= out.insertSinceRecycled();
fireChannelRead(ctx, out, size);
} finally {
out.recycle();
}
}
} else {
ctx.fireChannelRead(msg);
}
}
方法体可以分为以下几个逻辑步骤
1.累加数据
2.将累加到的数据传递给业务进行业务拆包
3.清理字节容器
4.传递业务数据包给业务解码器处理
压缩,有效数据段整体移到容器首部
discardSomeReadBytes之前,字节累加器中的数据分布
+--------------+----------+----------+
| readed | unreaded | writable |
+--------------+----------+----------+
discardSomeReadBytes之后,字节容器中的数据分布
+----------+-------------------------+
| unreaded | writable |
+----------+-------------------------+
这样字节容器又可以承载更多的数据了
callDecode 字节容器的数据拆分成业务数据包塞到业务数据容器out中
protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
try {
while (in.isReadable()) {
int outSize = out.size();
if (outSize > 0) {
fireChannelRead(ctx, out, outSize);
out.clear();
// Check if this handler was removed before continuing with decoding.
// If it was removed, it is not safe to continue to operate on the buffer.
//
// See:
// - https://github.com/netty/netty/issues/4635
if (ctx.isRemoved()) {
break;
}
outSize = 0;
}
int oldInputLength = in.readableBytes();
//进行拆包 传进去的是当前读取到的未被消费的所有的数据,以及业务协议包容器
decodeRemovalReentryProtection(ctx, in, out);
// Check if this handler was removed before continuing the loop.
// If it was removed, it is not safe to continue to operate on the buffer.
//
// See https://github.com/netty/netty/issues/1664
if (ctx.isRemoved()) {
break;
}
if (outSize == out.size()) {
//一个是拆包器什么数据也没读取,可能数据还不够业务拆包器处理,直接break等待新的数据
if (oldInputLength == in.readableBytes()) {
break;
} else {
//拆包器已读取部分数据,说明解码器仍然在工作,继续解码
continue;
}
}
if (oldInputLength == in.readableBytes()) {
throw new DecoderException(
StringUtil.simpleClassName(getClass()) +
".decode() did not read anything but decoded a message.");
}
if (isSingleDecode()) {
break;
}
}
} catch (DecoderException e) {
throw e;
} catch (Exception cause) {
throw new DecoderException(cause);
}
}
5.LineBasedFramDecoder源码
ByteOrder byteOrder
ByteOrder.BIG_ENDIAN
int maxFrameLength
Integer.MAX_VALUE 包的最大长度,超出包的最大长度netty将会做一些特殊处理
int lengthFieldOffset
0 长度域的偏移量
int lengthFieldLength
4 长度域长度
int lengthAdjustment
0 长度域的偏移量矫正。 如果长度域的值,除了包含有效数据域的长度外,还包含了其他域(如长度域自身)长度,那么,就需要进行矫正。矫正的值为:包长 - 长度域的值 – 长度域偏移 – 长度域长。
int initialBytesToStrip
12 表示netty拿到一个完整的数据包之后向业务解码器传递之前,应该跳过多少字节
boolean failFast
true 超过最大maxFrameLength是是否报错
https://blog.csdn.net/john1337/article/details/102806307
1.基于长度的拆包
上面这类数据包协议比较常见的,前面几个字节表示数据包的长度(不包括长度域),后面是具体的数据。拆完之后数据包是一个完整的带有长度域的数据包(之后即可传递到应用层解码器进行解码),创建一个如下方式的
LengthFieldBasedFrameDecoder
即可实现这类协议
new LengthFieldBasedFrameDecoder(Integer.MAX, 0, 4);
/**
* @param maxFrameLength 包的最大长度
* the maximum length of the frame. If the length of the frame is
* greater than this value, {@link TooLongFrameException} will be
* thrown.
* @param lengthFieldOffset 长度域的偏移量 在这里是0,表示无偏移
* the offset of the length field
* @param lengthFieldLength 长度域长度 这里是4,表示长度域的长度为4
* the length of the length field
*/
public LengthFieldBasedFrameDecoder(
int maxFrameLength,
int lengthFieldOffset, int lengthFieldLength) {
this(maxFrameLength, lengthFieldOffset, lengthFieldLength, 0, 0);
}
2. 基于长度的截断拆包
度域被截掉,我们只需要指定另外一个参数就可以实现,这个参数叫做
initialBytesToStrip
,表示netty拿到一个完整的数据包之后向业务解码器传递之前,应该跳过多少字节
new LengthFieldBasedFrameDecoder(Integer.MAX, 0, 4, 0, 4);
int maxFrameLength
int lengthFieldOffset
int lengthFieldLength
int lengthAdjustment
int initialBytesToStrip
3.基于偏移长度的拆包
下面这种方式二进制协议是更为普遍的,前面几个固定字节表示协议头,通常包含一些magicNumber,protocol version 之类的meta信息,紧跟着后面的是一个长度域,表示包体有多少字节的数据
只需要基于第一种情况,调整第二个参数既可以实现
new LengthFieldBasedFrameDecoder(Integer.MAX, 4, 4);
lengthFieldOffset
是4,表示跳过4个字节之后的才是长度域
4.基于可调整长度的拆包
即长度域在前,header在后,这种情况又是如何来调整参数达到我们想要的拆包效果呢?
1.长度域在数据包最前面表示无偏移,lengthFieldOffset 为 0
2.长度域的长度为3,即lengthFieldLength为3
2.长度域表示的包体的长度略过了header,这里有另外一个参数,叫做 lengthAdjustment,包体长度调整的大小,长度域的数值表示的长度加上这个修正值表示的就是带header的包,这里是 12+2,header和包体一共占14个字节
new LengthFieldBasedFrameDecoder(Integer.MAX, 0, 3, 2, 0);
5.基于偏移可调整长度的截断拆包
更变态一点的二进制协议带有两个header,比如下面这种
拆完之后,HDR1 丢弃,长度域丢弃,只剩下第二个header和有效包体,这种协议中,一般HDR1可以表示magicNumber,表示应用只接受以该magicNumber开头的二进制数据,rpc里面用的比较多
我们仍然可以通过设置netty的参数实现
1.长度域偏移为1,那么 lengthFieldOffset
为1
2.长度域长度为2,那么lengthFieldLength
为2
3.长度域表示的包体的长度略过了HDR2
,但是拆包的时候HDR2也被netty当作是包体的的一部分来拆,HDR2的长度为1,那么lengthAdjustment
为1
4.拆完之后,截掉了前面三个字节,那么initialBytesToStrip
为 3 (长度为2 + HDR2为1 = 3)
最后,代码实现为
new LengthFieldBasedFrameDecoder(Integer.MAX, 1, 2, 1, 3);
6.LengthFieldBasedFrameDecoder 源码
protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
//discardingTooLongFrame 默认初始化为false
if (discardingTooLongFrame) {
discardingTooLongFrame(in);
}
//in.readableBytes() 可以字节的长度
//lengthFieldEndOffset 长度字段结束偏移量
//如果当前可读的字节数< 长度域开始的字节数 返回等待, null表示返回等待
if (in.readableBytes() < lengthFieldEndOffset) {
return null;
}
// 拿到长度域的实际字节偏移
int actualLengthFieldOffset = in.readerIndex() + lengthFieldOffset;
//得到 未调整的数据帧长度 其实就是读物数据域标识的值,代表本次内容的数据长度
// 拿到实际的未调整过的包长度
long frameLength = getUnadjustedFrameLength(in, actualLengthFieldOffset, lengthFieldLength, byteOrder);
if (frameLength < 0) {
failOnNegativeLengthField(in, frameLength, lengthFieldEndOffset);
}
//机上偏移量
frameLength += lengthAdjustment + lengthFieldEndOffset;
if (frameLength < lengthFieldEndOffset) {
failOnFrameLengthLessThanLengthFieldEndOffset(in, frameLength, lengthFieldEndOffset);
}
if (frameLength > maxFrameLength) {
exceededFrameLength(in, frameLength);
return null;
}
// never overflows because it's less than maxFrameLength
int frameLengthInt = (int) frameLength;
//如果当前可读字节 < 本次数据包的字节数,则说明发生了粘包问题,返回 等待下一个包 然后读取完成数组
if (in.readableBytes() < frameLengthInt) {
return null;
}
//需要丢弃的字节数 > 当前数据包的长度 抛出异常
if (initialBytesToStrip > frameLengthInt) {
failOnFrameLengthLessThanInitialBytesToStrip(in, frameLength, initialBytesToStrip);
}
//跳过需要丢弃的字节数
in.skipBytes(initialBytesToStrip);
// extract frame
int readerIndex = in.readerIndex();
//actualFrameLength 数据包有用数据的长度
int actualFrameLength = frameLengthInt - initialBytesToStrip;
//读取数据
ByteBuf frame = extractFrame(ctx, in, readerIndex, actualFrameLength);
in.readerIndex(readerIndex + actualFrameLength);
return frame;
}
// 数据包长度超出最大包长度,进入丢弃模式
private void exceededFrameLength(ByteBuf in, long frameLength) {
long discard = frameLength - in.readableBytes();
tooLongFrameLength = frameLength;
if (discard < 0) {
// 当前可读字节已达到frameLength,直接跳过frameLength个字节,丢弃之后,后面有可能就是一个合法的数据包
// buffer contains more bytes then the frameLength so we can discard all now
in.skipBytes((int) frameLength);
} else {
// 当前可读字节未达到frameLength,说明后面未读到的字节也需要丢弃,进入丢弃模式,先把当前累积的字节全部丢弃
// Enter the discard mode and discard everything received so far.
discardingTooLongFrame = true;
// bytesToDiscard表示下次还需要丢弃多少字节
bytesToDiscard = discard;
in.skipBytes(in.readableBytes());
}
failIfNecessary(true);
}
https://www.jianshu.com/p/a0a51fd79f62
https://blog.csdn.net/e_wsq/article/details/77854547
https://www.cnblogs.com/651434092qq/p/11067528.html