4.1 TCP粘包/拆包
TCP是个"流"协议,是没有界限的一串数据。所以一个业务上认为的一个完整的包可能会被拆分成多个包发送,多个完整的包也可能被封装成一个大的数据包发送,这就是TCP粘包/拆包问题。
4.1.1 TCP粘包/拆包问题说明
假设客户端分别发送了两个数据包D1和D2给服务端,由于服务端一次读取到字节数不确定,故可能存在以下情况:
( 1 )服务端费两次读取到了两个独立的数据包,分别是D1和D2,没有粘包和拆包
( 2 )服务端一次性读取到了D1和D2粘合在一起的数据包,这种现象称为粘包
( 3 )服务端分两次读取到了两个数据包,第一次读取了完成的D1和D2的部分内容,第二次读取了D2的剩余内容,这种称为拆包
( 4 )如果服务端接收窗口非常小,数据包较大,可能发生多次拆包。
4.1.3 粘包问题的解决策略
犹豫底层的TCP无法理解上层业务数据,这个问题只能通过上层的应用协议栈设计来解决,归纳如下 :
( 1 )消息定长,例如每个报文大小固定200字节,如果不够,空格补位
( 2 )在包尾增加回车换行符进行分割,如FTP协议
( 3 )将消息分为消息头和消息体,消息头包含标识消息长度的字段
( 4 )更复杂的应用层协议
4.2 未考虑TCP 粘包导致的异常案例
下面是一个时间服务器的demo,并没有考虑读半包问题。按照设计初衷,我们的本来意图是读取一条消息后,记一次数,然后发送应答消息给客户端,而且请求消息应该为"QUERY TIME ORDER"才会返回时间,否则返回"BAD ORDER"
4.2.1 TimeServer 部分代码
public class TimeServer {
public void bind(int port) throws Exception {
//配置服务端的NIO线程组
//NioEventLoopGroup是个线程组,它包含了一组NIO线程,专门用于网络事件的处理,
//实际上它们就是Reactor线程组。
//这里创建两个的原因是一个用于服务端接受客户端的连接,另一个用于进行SocketChannel的网络读写。
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
//创建ServerBootstrap对象,它是Netty用于启动NIO服务端的辅助启动类,目的是降低服务端的开发复杂度。
ServerBootstrap b = new ServerBootstrap();
//调用ServerBootstrap的group方法,将两个NIO线程组当作入参传递到ServerBootstrap中。
//接着设置创建的Channel为NioServerSocketChannel,它的功能对应于JDK NIO类库中的ServerSocketChannel类。
//然后配置NioServerSocketChannel的TCP参数,此处将它的backlog设置为1024,
//最后绑定I/O事件的处理类ChildChannelHandler,它的作用类似于Reactor模式中的handler类,
//主要用于处理网络I/O事件,例如记录日志、对消息进行编解码等。
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
.childHandler(new ChildChannelHandler());
//绑定端口,同步等待成功
//服务端启动辅助类配置完成之后,调用它的bind方法绑定监听端口
//随后,调用它的同步阻塞方法sync等待绑定操作完成。
//完成之后Netty会返回一个ChannelFuture,它的功能类似于JDK的java.util.concurrent.Future,主要用于异步操作的通知回调。
ChannelFuture f = b.bind(port).sync();
//等待服务端监听端口关闭
//使用f.channel().closeFuture().sync()方法进行阻塞,等待服务端链路关闭之后main函数才退出。
f.channel().closeFuture().sync();
} finally {
//优雅退出,释放线程池资源
//调用NIO线程组的shutdownGracefully进行优雅退出,它会释放跟shutdownGracefully相关联的资源。
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
private class ChildChannelHandler extends ChannelInitializer {
@Override
protected void initChannel(Channel channel) throws Exception {
channel.pipeline().addLast(new TimeServerHandler());
}
}
public static void main(String[] args) throws Exception {
new TimeServer().bind(8080);
}
}
public class TimeServerHandler extends ChannelInboundHandlerAdapter {
private int counter;
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//做类型转换,将msg转换成Netty的ByteBuf对象。
//ByteBuf类似于JDK中的java.nio.ByteBuffer 对象,不过它提供了更加强大和灵活的功能。
ByteBuf buf = (ByteBuf) msg;
//通过ByteBuf的readableBytes方法可以获取缓冲区可读的字节数,
//根据可读的字节数创建byte数组
byte[] req = new byte[buf.readableBytes()];
//通过ByteBuf的readBytes方法将缓冲区中的字节数组复制到新建的byte数组中
buf.readBytes(req);
//通过new String构造函数获取请求消息。
String body = new String(req, "UTF-8").substring(0, req.length - System.getProperty("line.separator").length());
System.out.println("The time server receive order : " + body + " ; the counter is : " + ++counter);
//如果是"QUERY TIME ORDER"则创建应答消息,
String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body) ? new java.util.Date(
System.currentTimeMillis()).toString() : "BAD ORDER";
currentTime = currentTime + System.getProperty("line.separator");
ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes());
//通过ChannelHandlerContext的write方法异步发送应答消息给客户端。
ctx.writeAndFlush(resp);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}
4.2.2 TimeClient部分代码
客户端部分代码需要注意的是TimeClientHandler.channelActive方法,在链路建立成功后,循环发送一百条消息,每发送一次刷新一次,保证每条消息写到Channel中,按照我们的设计服务端应该接收到100条查询时间的消息。而在客户端接收消息channelRead方法中,我们每接收一条消息打印一次计数器,我们初衷是看到打印100次系统时间。
public class TimeClient {
public void connect(int port, String host) throws Exception {
// 配置客户端NIO线程组
//首先创建客户端处理I/O读写的NioEventLoop Group线程组
EventLoopGroup group = new NioEventLoopGroup();
try {
//继续创建客户端辅助启动类Bootstrap,随后需要对其进行配置。
//与服务端不同的是,它的Channel需要设置为NioSocketChannel
//然后为其添加handler,此处为了简单直接创建匿名内部类,实现initChannel方法,
//其作用是当创建NioSocketChannel成功之后,
//在初始化它的时候将它的ChannelHandler设置到ChannelPipeline中,用于处理网络I/O事件。
Bootstrap b = new Bootstrap();
b.group(group).channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new TimeClientHandler());
}
});
// 发起异步连接操作
//客户端启动辅助类设置完成之后,调用connect方法发起异步连接,
//然后调用同步方法等待连接成功。
ChannelFuture f = b.connect(host, port).sync();
// 等待客户端链路关闭
//当客户端连接关闭之后,客户端主函数退出.
f.channel().closeFuture().sync();
} finally {
// 优雅退出,释放NIO线程组
//在退出之前,释放NIO线程组的资源。
group.shutdownGracefully();
}
}
public static void main(String[] args) throws Exception {
int port = 8080;
if (args !=null && args.length > 0){
port = Integer.valueOf(args[0]);
}
new TimeClient().connect(port,"127.0.0.1");
}
}
public class TimeClientHandler extends ChannelInboundHandlerAdapter{
private int counter;
private byte[] req;
public TimeClientHandler() {
req = ("QUERY TIME ORDER" + System.getProperty("line.separator")).getBytes();
}
//当客户端和服务端TCP链路建立成功之后,Netty的NIO线程会调用channelActive方法
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ByteBuf message = null;
for (int i = 0; i < 100; i++) {
message = Unpooled.buffer(req.length);
message.writeBytes(req);
ctx.writeAndFlush(message);
}
}
//当服务端返回应答消息时,channelRead方法被调用
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//从Netty的ByteBuf中读取并打印应答消息。
ByteBuf buf = (ByteBuf) msg;
byte[] req = new byte[buf.readableBytes()];
buf.readBytes(req);
String body = new String(req, "UTF-8");
System.out.println("Now is : " + body + " ; the counter is : " + ++counter);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}
4.2.3 运行结果
运行结果显然不如我们意,发生了粘包现象 ,服务端显示结果接受了两条消息,客户端只接收到了一条返回消息,可见客户端的请求消息和服务端的返回消息都发生了粘包:
The time server receive order : QUERY TIME ORDER
QUERY TIME ORDER
QUERY TIME ORDER
......
QUE ; the counter is : 1
The time server receive order : Y TIME ORDER
QUERY TIME ORDER
QUERY TIME ORDER
......
QUERY TIME ORDER ; the counter is : 2
4.3 利用LineBasedFrameDecoder解决粘包问题
为了解决半包读写问题,Netty默认提供了多种解码器用于处理半包,下面我们来修正时间服务器
4.3.1 支持TCP粘包的TimeServer
直接看代码,只是在ChildChannelHandler2添加LineBasedFrameDecoder和StringDecoder两个解码器
public class TimeServer {
public void bind(int port) throws Exception {
//配置服务端的NIO线程组
//NioEventLoopGroup是个线程组,它包含了一组NIO线程,专门用于网络事件的处理,
//实际上它们就是Reactor线程组。
//这里创建两个的原因是一个用于服务端接受客户端的连接,另一个用于进行SocketChannel的网络读写。
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
//创建ServerBootstrap对象,它是Netty用于启动NIO服务端的辅助启动类,目的是降低服务端的开发复杂度。
ServerBootstrap b = new ServerBootstrap();
//调用ServerBootstrap的group方法,将两个NIO线程组当作入参传递到ServerBootstrap中。
//接着设置创建的Channel为NioServerSocketChannel,它的功能对应于JDK NIO类库中的ServerSocketChannel类。
//然后配置NioServerSocketChannel的TCP参数,此处将它的backlog设置为1024,
//最后绑定I/O事件的处理类ChildChannelHandler,它的作用类似于Reactor模式中的handler类,
//主要用于处理网络I/O事件,例如记录日志、对消息进行编解码等。
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
.childHandler(new ChildChannelHandler2());
//绑定端口,同步等待成功
//服务端启动辅助类配置完成之后,调用它的bind方法绑定监听端口
//随后,调用它的同步阻塞方法sync等待绑定操作完成。
//完成之后Netty会返回一个ChannelFuture,它的功能类似于JDK的java.util.concurrent.Future,主要用于异步操作的通知回调。
ChannelFuture f = b.bind(port).sync();
//等待服务端监听端口关闭
//使用f.channel().closeFuture().sync()方法进行阻塞,等待服务端链路关闭之后main函数才退出。
f.channel().closeFuture().sync();
} finally {
//优雅退出,释放线程池资源
//调用NIO线程组的shutdownGracefully进行优雅退出,它会释放跟shutdownGracefully相关联的资源。
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
/**
* 添加LineBasedFrameDecoder和StringDecoder两个解码器
* 更多请看TimeServerHandler代码修改
*/
private class ChildChannelHandler2 extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel channel) throws Exception {
channel.pipeline().addLast(new LineBasedFrameDecoder(1024));
channel.pipeline().addLast(new StringDecoder());
channel.pipeline().addLast(new TimeServerHandler());
}
}
public static void main(String[] args) throws Exception {
new TimeServer().bind(8080);
}
}
public class TimeServerHandler extends ChannelInboundHandlerAdapter {
private int counter;
/**
* 解决半包问题 不对msg做额外处理和编码
* @param ctx
* @param msg
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
String body = (String) msg;
System.out.println("The time server receive order : " + body + " ; the counter is : " + ++counter);
String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body) ? new java.util.Date(
System.currentTimeMillis()).toString() : "BAD ORDER";
currentTime = currentTime + System.getProperty("line.separator");
ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes());
ctx.writeAndFlush(resp);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}
4.3.2 支持TCP粘包的TimeClient
客户端部分代码类似,也是加上两个编码器:
public class TimeClient {
public void connect(int port, String host) throws Exception {
// 配置客户端NIO线程组
//首先创建客户端处理I/O读写的NioEventLoop Group线程组
EventLoopGroup group = new NioEventLoopGroup();
try {
//继续创建客户端辅助启动类Bootstrap,随后需要对其进行配置。
//与服务端不同的是,它的Channel需要设置为NioSocketChannel
//然后为其添加handler,此处为了简单直接创建匿名内部类,实现initChannel方法,
//其作用是当创建NioSocketChannel成功之后,
//在初始化它的时候将它的ChannelHandler设置到ChannelPipeline中,用于处理网络I/O事件。
Bootstrap b = new Bootstrap();
b.group(group).channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
//添加LineBasedFrameDecoder和StringDecoder解码器 更多看TimeClentander代码修改
ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
ch.pipeline().addLast(new StringDecoder());
ch.pipeline().addLast(new TimeClientHandler());
}
});
// 发起异步连接操作
//客户端启动辅助类设置完成之后,调用connect方法发起异步连接,
//然后调用同步方法等待连接成功。
ChannelFuture f = b.connect(host, port).sync();
// 等待客户端链路关闭
//当客户端连接关闭之后,客户端主函数退出.
f.channel().closeFuture().sync();
} finally {
// 优雅退出,释放NIO线程组
//在退出之前,释放NIO线程组的资源。
group.shutdownGracefully();
}
}
public static void main(String[] args) throws Exception {
int port = 8080;
if (args !=null && args.length > 0){
port = Integer.valueOf(args[0]);
}
new TimeClient().connect(port,"127.0.0.1");
}
}
public class TimeClientHandler extends ChannelInboundHandlerAdapter{
private int counter;
private byte[] req;
public TimeClientHandler() {
req = ("QUERY TIME ORDER" + System.getProperty("line.separator")).getBytes();
}
//当客户端和服务端TCP链路建立成功之后,Netty的NIO线程会调用channelActive方法
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ByteBuf message = null;
for (int i = 0; i < 100; i++) {
message = Unpooled.buffer(req.length);
message.writeBytes(req);
ctx.writeAndFlush(message);
}
}
/**
* 直接打印msg 不做额外处理
* @param ctx
* @param msg
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//从Netty的ByteBuf中读取并打印应答消息。
String body = (String) msg;
System.out.println("Now is : " + body + " ; the counter is : " + ++counter);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}
4.3.3 运行支持粘包的时间服务器程序
结果如下,运行结果客户端 服务端 都完全符合预期 :
The time server receive order : QUERY TIME ORDER ; the counter is : 1
The time server receive order : QUERY TIME ORDER ; the counter is : 2
......
The time server receive order : QUERY TIME ORDER ; the counter is : 99
The time server receive order : QUERY TIME ORDER ; the counter is : 100
Now is : Thu Dec 06 18:14:52 CST 2018 ; the counter is : 1
Now is : Thu Dec 06 18:14:52 CST 2018 ; the counter is : 2
......
Now is : Thu Dec 06 18:14:52 CST 2018 ; the counter is : 99
Now is : Thu Dec 06 18:14:52 CST 2018 ; the counter is : 100
4.3.4 LineBasedFrameDecoder和StringDecoder原理分析
LineBasedFrameDecoder工作原理是一次便利ByteBuf中的可读字节,以"\n" 或者"\r\n"为结束位置,中间字节组成一行,是已换行符为结束标志的解码器。还可以设置单行最大长度,超过最大长度还没有换行符,就会抛出异常。
StringDecoder的功能也很简单,将接收到的对象转换成字符串,然后在调用后面的Handler。LineBasedFrameDecoder+StringDecoder其实就是组合成按行切换的文本解码器。
另外,Netty还提供了其他解码器,如果发送的消息不是以换行符结束,也可以解决半包问题。