TCP粘包和拆包的解决之道:LineBasedFrameDecoder和StringDecoder分别为回车换行符消息解码器和字符串消息解码器
主要内容:
- TCP粘包和拆包的基础知识
- 没有考虑粘包和拆包问题的案例
- 使用Netty解决读半包问题
【TCP粘包和拆包说明】
TCP底层并不了解上层业务数据的具体含义,它会根据TCP缓冲区的实际情况进行包的划分,所以业务上认为,一个完整的包可能被TCP拆分成多个包发送,也能多个小包封装成一个大的数据包发送。
假设客户端发送D1、D2两个数据包给服务端,由于服务端一次读取的字节大小是不确定的,所以就出现了一下四种情况:
- 没有发生粘包和拆包,2次读取到了2个独立的数据包D1、D2;
- 一次读到2个数据包,D1和D2粘在一起,这就是粘包;
- 分2次读取,第一次读取到完整的D1和D2的部分内容;第二次读取D2的剩余部分,这就是拆包;
- 分2次读取,第一次读取D1的部分D1_1;第二次读取D1的剩余D1_2和D2的全部内容;
- 如果此时服务端TCP接收滑窗比较小,而数据包D1和D2比较大的话,就可能导致服务端多次才能将D1和D2接收完,期间发生多次拆包;
【TCP粘包和拆包发生的原因】
- 应用程序write写入的字节大小 > 套接口发送缓冲区的大小
- 进行MSS大小的TCP分段
- 以太网帧的payload大于MTU进行IP分片
【粘包问题的解决策略】
由于底层的TCP无法理解上层业务数据,所以底层无法保证数据包不被拆分和重组,这个问题只能通过上层的应用协议栈设计来解决,业内主流协议的解决方案:
- 消息定长,例如每个报文的大小固定长度为200字节,如果不够,空位不缺
- 在包尾增加回车换行符进行分割,例如FTP协议;
- 将消息分为消息头和消息体,消息头包含表示消息总长度(消息体总长度)的> 字段,通常设计思路为消息头的第一个字段使用int32来表示消息的总长度;
- 更加复杂的应用层协议;
【未考虑TCP粘包导致功能异常的案例】
只copy重要部分代码,其他代码和Time Server是一样的。
在的方法
channelRead
添加如下代码:
ByteBuf buf = (ByteBuf)msg;
byte[] req = new byte[buf.readableBytes()];
buf.readBytes(req);
String body = new String(req, "UTF-8").substring(0, req.length - System.getProperty("line.separator").length());
System.out.println("The time server receive order : " + body + "; the counter is " + ++counter);
String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body) ? new Date(System.currentTimeMillis()).toString() : "BAD ORDER";
currentTime += System.getProperty("line.separator");
ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes());
ctx.write(resp);
在的代码如下:
/**
* Creates a client-side handler.
*/
public TimeCllientHandler() {
// Demo Code 1 : time server
// byte[] req = "QUERY TIME ORDER".getBytes();
// firstMessage = Unpooled.buffer(req.length);
// firstMessage.writeBytes(req);
// Demo Code 2 : Sticky and unpacking bag
req = ("QUERY TIME ORDER" + System.getProperty("line.separator")).getBytes();
}
@Override
public void channelActive(ChannelHandlerContext ctx) {
// Demo Code 2 : Sticky and unpacking bag
ByteBuf message;
for (int i = 0; i < 100; i++){
message = Unpooled.buffer(req.length);
message.writeBytes(req);
ctx.writeAndFlush(message);
}
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws UnsupportedEncodingException {
// Demo Code 1 : time server
// ByteBuf buf = (ByteBuf)msg;
// byte[] req = new byte[buf.readableBytes()];
// buf.readBytes(req);
// String body = new String(req, "UTF-8");
// System.out.println("Now is : " + body);
// Demo Code 2 : Sticky and unpacking bag
ByteBuf buf = (ByteBuf)msg;
byte[] req = new byte[buf.readableBytes()];
buf.readBytes(req);
String body = new String(req, "UTF-8");
System.out.println("Now is : " + body + " ; the counter is : " + ++counter);
}
运行结果,如下,运行结果并不是我们想要的
【利用LineBasedFrameDecoder解决TCP粘包问题】
Netty默认提供了多种编码器用于处理半包,只要熟练掌握这些类库的使用,TCP粘包就会变得很简单,你甚至都不需要去关心它。
在添加如下代码:
// Demo Code 2 : resolve sticky and unpacking bag
p.addLast(new LineBasedFrameDecoder(1024));
p.addLast(new StringDecoder());
在添加如下代码:
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws UnsupportedEncodingException {
//Demo Code 1 : time server
// ByteBuf buf = (ByteBuf)msg;
// byte[] req = new byte[buf.readableBytes()];
// buf.readBytes(req);
// String body = new String(req, "UTF-8");
// System.out.println("This time server receive order : " + body);
// String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body) ? new Date(System.currentTimeMillis()).toString() : "BAD ORDER";
// ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes());
// ctx.write(resp);
//Demo Code 2 : Sticky and unpacking bag
// ByteBuf buf = (ByteBuf)msg;
// byte[] req = new byte[buf.readableBytes()];
// buf.readBytes(req);
// String body = new String(req, "UTF-8").substring(0, req.length - System.getProperty("line.separator").length());
// System.out.println("The time server receive order : " + body + "; the counter is " + ++counter);
// String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body) ? new Date(System.currentTimeMillis()).toString() : "BAD ORDER";
// currentTime += System.getProperty("line.separator");
// ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes());
// ctx.write(resp);
//Demo Code 2 : resolve sticky and unpacking bag
String body = (String) msg;
System.out.println("The time server receive order : " + body + "; the counter is " + ++counter);
String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body) ? new Date(System.currentTimeMillis()).toString() : "BAD ORDER";
currentTime += System.getProperty("line.separator");
ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes());
ctx.write(resp);
}
在添加如下代码:
// Demo Code 2 : resolve sticky and unpacking bag
p.addLast(new LineBasedFrameDecoder(1024));
p.addLast(new StringDecoder());
在添加如下代码:
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws UnsupportedEncodingException {
// Demo Code 1 : time server
// ByteBuf buf = (ByteBuf)msg;
// byte[] req = new byte[buf.readableBytes()];
// buf.readBytes(req);
// String body = new String(req, "UTF-8");
// System.out.println("Now is : " + body);
// Demo Code 2 : Sticky and unpacking bag
// ByteBuf buf = (ByteBuf)msg;
// byte[] req = new byte[buf.readableBytes()];
// buf.readBytes(req);
// String body = new String(req, "UTF-8");
// System.out.println("Now is : " + body + " ; the counter is : " + ++counter);
// Demo Code 2 : resolve sticky and unpacking bag
String body = (String) msg;
System.out.println("Now is : " + body + " ; the counter is : " + ++counter);
}
的原理分析
的工作原理是依次遍历ByteBuf中的可读字节,判断是否有
\n
或者\r\n
,如果有,就在此位置为结束位置,从可读索引到结束位置区间的字节就组成了一行。它是以换行符为结束标志的解码器,支持携带结束符和不携带结束符2种解码方式,同时支持配置单行字节的最大长度。如果连续读取到最大长度后仍然没有发现换行符,就会抛出异常,同时忽略掉之前的读到的异常码流。
就是将接收到的对象转换成字符串,然后继续调用后面的Handler。