Chapter4 TCP粘包/拆包问题的解决之道

4.1 TCP粘包/拆包

TCP是个"流"协议,是没有界限的一串数据。所以一个业务上认为的一个完整的包可能会被拆分成多个包发送,多个完整的包也可能被封装成一个大的数据包发送,这就是TCP粘包/拆包问题。

4.1.1 TCP粘包/拆包问题说明

假设客户端分别发送了两个数据包D1和D2给服务端,由于服务端一次读取到字节数不确定,故可能存在以下情况:
( 1 )服务端费两次读取到了两个独立的数据包,分别是D1和D2,没有粘包和拆包
( 2 )服务端一次性读取到了D1和D2粘合在一起的数据包,这种现象称为粘包
( 3 )服务端分两次读取到了两个数据包,第一次读取了完成的D1和D2的部分内容,第二次读取了D2的剩余内容,这种称为拆包
( 4 )如果服务端接收窗口非常小,数据包较大,可能发生多次拆包。


粘包拆包示意图.png
4.1.3 粘包问题的解决策略

犹豫底层的TCP无法理解上层业务数据,这个问题只能通过上层的应用协议栈设计来解决,归纳如下 :
( 1 )消息定长,例如每个报文大小固定200字节,如果不够,空格补位
( 2 )在包尾增加回车换行符进行分割,如FTP协议
( 3 )将消息分为消息头和消息体,消息头包含标识消息长度的字段
( 4 )更复杂的应用层协议

4.2 未考虑TCP 粘包导致的异常案例

下面是一个时间服务器的demo,并没有考虑读半包问题。按照设计初衷,我们的本来意图是读取一条消息后,记一次数,然后发送应答消息给客户端,而且请求消息应该为"QUERY TIME ORDER"才会返回时间,否则返回"BAD ORDER"

4.2.1 TimeServer 部分代码
public class TimeServer {
    public void bind(int port) throws Exception {
        //配置服务端的NIO线程组
        //NioEventLoopGroup是个线程组,它包含了一组NIO线程,专门用于网络事件的处理,
        //实际上它们就是Reactor线程组。
        //这里创建两个的原因是一个用于服务端接受客户端的连接,另一个用于进行SocketChannel的网络读写。
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            //创建ServerBootstrap对象,它是Netty用于启动NIO服务端的辅助启动类,目的是降低服务端的开发复杂度。
            ServerBootstrap b = new ServerBootstrap();
            //调用ServerBootstrap的group方法,将两个NIO线程组当作入参传递到ServerBootstrap中。
            //接着设置创建的Channel为NioServerSocketChannel,它的功能对应于JDK NIO类库中的ServerSocketChannel类。
            //然后配置NioServerSocketChannel的TCP参数,此处将它的backlog设置为1024,
            //最后绑定I/O事件的处理类ChildChannelHandler,它的作用类似于Reactor模式中的handler类,
            //主要用于处理网络I/O事件,例如记录日志、对消息进行编解码等。
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    .childHandler(new ChildChannelHandler());
            //绑定端口,同步等待成功
            //服务端启动辅助类配置完成之后,调用它的bind方法绑定监听端口
            //随后,调用它的同步阻塞方法sync等待绑定操作完成。
            //完成之后Netty会返回一个ChannelFuture,它的功能类似于JDK的java.util.concurrent.Future,主要用于异步操作的通知回调。
            ChannelFuture f = b.bind(port).sync();

            //等待服务端监听端口关闭
            //使用f.channel().closeFuture().sync()方法进行阻塞,等待服务端链路关闭之后main函数才退出。
            f.channel().closeFuture().sync();
        } finally {
            //优雅退出,释放线程池资源
            //调用NIO线程组的shutdownGracefully进行优雅退出,它会释放跟shutdownGracefully相关联的资源。
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    private class ChildChannelHandler extends ChannelInitializer {
        @Override
        protected void initChannel(Channel channel) throws Exception {
            channel.pipeline().addLast(new TimeServerHandler());
        }
    }

    public static void main(String[] args) throws Exception {
        new TimeServer().bind(8080);
    }
}

public class TimeServerHandler extends ChannelInboundHandlerAdapter {
    private int counter;

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        //做类型转换,将msg转换成Netty的ByteBuf对象。
        //ByteBuf类似于JDK中的java.nio.ByteBuffer 对象,不过它提供了更加强大和灵活的功能。
        ByteBuf buf = (ByteBuf) msg;
        //通过ByteBuf的readableBytes方法可以获取缓冲区可读的字节数,
        //根据可读的字节数创建byte数组
        byte[] req = new byte[buf.readableBytes()];
        //通过ByteBuf的readBytes方法将缓冲区中的字节数组复制到新建的byte数组中
        buf.readBytes(req);
        //通过new String构造函数获取请求消息。
        String body = new String(req, "UTF-8").substring(0, req.length - System.getProperty("line.separator").length());
        System.out.println("The time server receive order : " + body + " ; the counter is : " + ++counter);
        //如果是"QUERY TIME ORDER"则创建应答消息,
        String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body) ? new java.util.Date(
                System.currentTimeMillis()).toString() : "BAD ORDER";
        currentTime = currentTime + System.getProperty("line.separator");
        ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes());
        //通过ChannelHandlerContext的write方法异步发送应答消息给客户端。
        ctx.writeAndFlush(resp);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }
}
4.2.2 TimeClient部分代码

客户端部分代码需要注意的是TimeClientHandler.channelActive方法,在链路建立成功后,循环发送一百条消息,每发送一次刷新一次,保证每条消息写到Channel中,按照我们的设计服务端应该接收到100条查询时间的消息。而在客户端接收消息channelRead方法中,我们每接收一条消息打印一次计数器,我们初衷是看到打印100次系统时间。

public class TimeClient {
    public void connect(int port, String host) throws Exception {
        // 配置客户端NIO线程组
        //首先创建客户端处理I/O读写的NioEventLoop Group线程组
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            //继续创建客户端辅助启动类Bootstrap,随后需要对其进行配置。
            //与服务端不同的是,它的Channel需要设置为NioSocketChannel
            //然后为其添加handler,此处为了简单直接创建匿名内部类,实现initChannel方法,
            //其作用是当创建NioSocketChannel成功之后,
            //在初始化它的时候将它的ChannelHandler设置到ChannelPipeline中,用于处理网络I/O事件。
            Bootstrap b = new Bootstrap();
            b.group(group).channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new TimeClientHandler());
                        }
                    });

            // 发起异步连接操作
            //客户端启动辅助类设置完成之后,调用connect方法发起异步连接,
            //然后调用同步方法等待连接成功。
            ChannelFuture f = b.connect(host, port).sync();

            // 等待客户端链路关闭
            //当客户端连接关闭之后,客户端主函数退出.
            f.channel().closeFuture().sync();
        } finally {
            // 优雅退出,释放NIO线程组
            //在退出之前,释放NIO线程组的资源。
            group.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws Exception {
        int port = 8080;
        if (args !=null && args.length > 0){
            port = Integer.valueOf(args[0]);
        }
        new TimeClient().connect(port,"127.0.0.1");
    }
}

public class TimeClientHandler extends ChannelInboundHandlerAdapter{

    private int counter;
    private byte[] req;

    public TimeClientHandler() {
        req = ("QUERY TIME ORDER" + System.getProperty("line.separator")).getBytes();
    }

    //当客户端和服务端TCP链路建立成功之后,Netty的NIO线程会调用channelActive方法
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ByteBuf message = null;
        for (int i = 0; i < 100; i++) {
            message = Unpooled.buffer(req.length);
            message.writeBytes(req);
            ctx.writeAndFlush(message);
        }
    }

    //当服务端返回应答消息时,channelRead方法被调用
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        //从Netty的ByteBuf中读取并打印应答消息。
        ByteBuf buf = (ByteBuf) msg;
        byte[] req = new byte[buf.readableBytes()];
        buf.readBytes(req);
        String body = new String(req, "UTF-8");
        System.out.println("Now is : " + body + " ; the counter is : " + ++counter);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }
}
4.2.3 运行结果

运行结果显然不如我们意,发生了粘包现象 ,服务端显示结果接受了两条消息,客户端只接收到了一条返回消息,可见客户端的请求消息和服务端的返回消息都发生了粘包:

The time server receive order : QUERY TIME ORDER
QUERY TIME ORDER
QUERY TIME ORDER
......
QUE ; the counter is : 1
The time server receive order : Y TIME ORDER
QUERY TIME ORDER
QUERY TIME ORDER
......
QUERY TIME ORDER ; the counter is : 2

4.3 利用LineBasedFrameDecoder解决粘包问题

为了解决半包读写问题,Netty默认提供了多种解码器用于处理半包,下面我们来修正时间服务器

4.3.1 支持TCP粘包的TimeServer

直接看代码,只是在ChildChannelHandler2添加LineBasedFrameDecoder和StringDecoder两个解码器

public class TimeServer {
    public void bind(int port) throws Exception {
        //配置服务端的NIO线程组
        //NioEventLoopGroup是个线程组,它包含了一组NIO线程,专门用于网络事件的处理,
        //实际上它们就是Reactor线程组。
        //这里创建两个的原因是一个用于服务端接受客户端的连接,另一个用于进行SocketChannel的网络读写。
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            //创建ServerBootstrap对象,它是Netty用于启动NIO服务端的辅助启动类,目的是降低服务端的开发复杂度。
            ServerBootstrap b = new ServerBootstrap();
            //调用ServerBootstrap的group方法,将两个NIO线程组当作入参传递到ServerBootstrap中。
            //接着设置创建的Channel为NioServerSocketChannel,它的功能对应于JDK NIO类库中的ServerSocketChannel类。
            //然后配置NioServerSocketChannel的TCP参数,此处将它的backlog设置为1024,
            //最后绑定I/O事件的处理类ChildChannelHandler,它的作用类似于Reactor模式中的handler类,
            //主要用于处理网络I/O事件,例如记录日志、对消息进行编解码等。
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    .childHandler(new ChildChannelHandler2());
            //绑定端口,同步等待成功
            //服务端启动辅助类配置完成之后,调用它的bind方法绑定监听端口
            //随后,调用它的同步阻塞方法sync等待绑定操作完成。
            //完成之后Netty会返回一个ChannelFuture,它的功能类似于JDK的java.util.concurrent.Future,主要用于异步操作的通知回调。
            ChannelFuture f = b.bind(port).sync();

            //等待服务端监听端口关闭
            //使用f.channel().closeFuture().sync()方法进行阻塞,等待服务端链路关闭之后main函数才退出。
            f.channel().closeFuture().sync();
        } finally {
            //优雅退出,释放线程池资源
            //调用NIO线程组的shutdownGracefully进行优雅退出,它会释放跟shutdownGracefully相关联的资源。
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    /**
     * 添加LineBasedFrameDecoder和StringDecoder两个解码器
     * 更多请看TimeServerHandler代码修改
     */
    private class ChildChannelHandler2 extends ChannelInitializer<SocketChannel> {
        @Override
        protected void initChannel(SocketChannel channel) throws Exception {
            channel.pipeline().addLast(new LineBasedFrameDecoder(1024));
            channel.pipeline().addLast(new StringDecoder());
            channel.pipeline().addLast(new TimeServerHandler());
        }
    }

    public static void main(String[] args) throws Exception {
        new TimeServer().bind(8080);
    }
}

public class TimeServerHandler extends ChannelInboundHandlerAdapter {
    private int counter;

    /**
     * 解决半包问题 不对msg做额外处理和编码
     * @param ctx
     * @param msg
     * @throws Exception
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        String body = (String) msg;
        System.out.println("The time server receive order : " + body + " ; the counter is : " + ++counter);
        String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body) ? new java.util.Date(
                System.currentTimeMillis()).toString() : "BAD ORDER";
        currentTime = currentTime + System.getProperty("line.separator");
        ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes());
        ctx.writeAndFlush(resp);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }
}
4.3.2 支持TCP粘包的TimeClient

客户端部分代码类似,也是加上两个编码器:

public class TimeClient {
    public void connect(int port, String host) throws Exception {
        // 配置客户端NIO线程组
        //首先创建客户端处理I/O读写的NioEventLoop Group线程组
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            //继续创建客户端辅助启动类Bootstrap,随后需要对其进行配置。
            //与服务端不同的是,它的Channel需要设置为NioSocketChannel
            //然后为其添加handler,此处为了简单直接创建匿名内部类,实现initChannel方法,
            //其作用是当创建NioSocketChannel成功之后,
            //在初始化它的时候将它的ChannelHandler设置到ChannelPipeline中,用于处理网络I/O事件。
            Bootstrap b = new Bootstrap();
            b.group(group).channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            //添加LineBasedFrameDecoder和StringDecoder解码器 更多看TimeClentander代码修改
                            ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
                            ch.pipeline().addLast(new StringDecoder());
                            ch.pipeline().addLast(new TimeClientHandler());
                        }
                    });

            // 发起异步连接操作
            //客户端启动辅助类设置完成之后,调用connect方法发起异步连接,
            //然后调用同步方法等待连接成功。
            ChannelFuture f = b.connect(host, port).sync();

            // 等待客户端链路关闭
            //当客户端连接关闭之后,客户端主函数退出.
            f.channel().closeFuture().sync();
        } finally {
            // 优雅退出,释放NIO线程组
            //在退出之前,释放NIO线程组的资源。
            group.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws Exception {
        int port = 8080;
        if (args !=null && args.length > 0){
            port = Integer.valueOf(args[0]);
        }
        new TimeClient().connect(port,"127.0.0.1");
    }
}

public class TimeClientHandler extends ChannelInboundHandlerAdapter{

    private int counter;
    private byte[] req;

    public TimeClientHandler() {
        req = ("QUERY TIME ORDER" + System.getProperty("line.separator")).getBytes();
    }

    //当客户端和服务端TCP链路建立成功之后,Netty的NIO线程会调用channelActive方法
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ByteBuf message = null;
        for (int i = 0; i < 100; i++) {
            message = Unpooled.buffer(req.length);
            message.writeBytes(req);
            ctx.writeAndFlush(message);
        }
    }

    /**
     * 直接打印msg 不做额外处理
     * @param ctx
     * @param msg
     * @throws Exception
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        //从Netty的ByteBuf中读取并打印应答消息。
        String body = (String) msg;
        System.out.println("Now is : " + body + " ; the counter is : " + ++counter);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }
}
4.3.3 运行支持粘包的时间服务器程序

结果如下,运行结果客户端 服务端 都完全符合预期 :

The time server receive order : QUERY TIME ORDER ; the counter is : 1
The time server receive order : QUERY TIME ORDER ; the counter is : 2
......
The time server receive order : QUERY TIME ORDER ; the counter is : 99
The time server receive order : QUERY TIME ORDER ; the counter is : 100

Now is : Thu Dec 06 18:14:52 CST 2018 ; the counter is : 1
Now is : Thu Dec 06 18:14:52 CST 2018 ; the counter is : 2
......
Now is : Thu Dec 06 18:14:52 CST 2018 ; the counter is : 99
Now is : Thu Dec 06 18:14:52 CST 2018 ; the counter is : 100
4.3.4 LineBasedFrameDecoder和StringDecoder原理分析

LineBasedFrameDecoder工作原理是一次便利ByteBuf中的可读字节,以"\n" 或者"\r\n"为结束位置,中间字节组成一行,是已换行符为结束标志的解码器。还可以设置单行最大长度,超过最大长度还没有换行符,就会抛出异常。

StringDecoder的功能也很简单,将接收到的对象转换成字符串,然后在调用后面的Handler。LineBasedFrameDecoder+StringDecoder其实就是组合成按行切换的文本解码器。

另外,Netty还提供了其他解码器,如果发送的消息不是以换行符结束,也可以解决半包问题。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 206,214评论 6 481
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 88,307评论 2 382
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 152,543评论 0 341
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 55,221评论 1 279
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 64,224评论 5 371
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,007评论 1 284
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,313评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,956评论 0 259
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,441评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,925评论 2 323
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,018评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,685评论 4 322
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,234评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,240评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,464评论 1 261
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,467评论 2 352
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,762评论 2 345

推荐阅读更多精彩内容

  • 爱自己才是人生很重要很重要的事情。我们要无条件的爱自己。 小的时候,我们爱自己,被自己充满,我们是那样的快乐。 然...
    吴美化阅读 704评论 0 1
  • 我们的利润来自于我们的谈判,促成谈判的三个先决条件:第一,僵局,第二.双方都确信靠一方之力无法解决僵局,第三,对方...
    主持人梓惟阅读 164评论 0 0
  • 一般老公回家,我就挺忙的。瞎忙。事实并没有做什么事,连饭都是老公顿顿做给我吃。心里乱忙。 反正就是他在家,我就充实...
    小渔海棠阅读 350评论 0 1
  • 本来不想对外说,但还是忍不住跟淑芸表达了最近总是“莫名悲凉”的情绪。 没有叙述原因。原因只想放自己这里就好。 想到...
    正山小种ww阅读 172评论 0 0