Netty之三概述与高性能架构设计

个人专题目录


1. Netty概述与高性能架构设计

1.1 原生NIO 存在的问题

  1. NIO 的类库和 API 繁杂,使用麻烦:需要熟练掌握 Selector、ServerSocketChannel、SocketChannel、ByteBuffer 等。

  2. 需要具备其他的额外技能:要熟悉 Java 多线程编程,因为 NIO 编程涉及到 Reactor 模式,你必须对多线程和网络编程非常熟悉,才能编写出高质量的 NIO 程序。

  3. 开发工作量和难度都非常大:例如客户端面临断连重连、网络闪断、半包读写、失败缓存、网络拥塞和异常流的处理等等。

  4. JDK NIO 的 Bug:例如臭名昭著的 Epoll Bug,它会导致 Selector 空轮询,最终导致 CPU 100%。直到 JDK 1.7 版本该问题仍旧存在,没有被根本解决。

1.2 Netty 的优点

Netty 对 JDK 自带的 NIO 的 API 进行了封装,解决了上述问题。

  1. 设计优雅:适用于各种传输类型的统一 API 阻塞和非阻塞 Socket;基于灵活且可扩展的事件模型,可以清晰地分离关注点;高度可定制的线程模型 - 单线程,一个或多个线程池.

  2. 使用方便:详细记录的 Javadoc,用户指南和示例;没有其他依赖项,JDK 5(Netty 3.x)或 6(Netty 4.x)就足够了。

  3. 高性能、吞吐量更高:延迟更低;减少资源消耗;最小化不必要的内存复制。

  4. 安全:完整的 SSL/TLS 和 StartTLS 支持。

  5. 社区活跃、不断更新:社区活跃,版本迭代周期短,发现的 Bug 可以被及时修复,同时,更多的新功能会被加入

1.3 简介

Netty是由JBOSS提供的一个java开源框架。Netty提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络服务器和客户端程序。

也就是说,Netty 是一个基于NIO的客户、服务器端编程框架,使用Netty 可以确保你快速和简单的开发出一个网络应用,例如实现了某种协议的客户,服务端应用。Netty相当简化和流线化了网络应用的编程开发过程,例如,TCP和UDP的socket服务开发。

“快速”和“简单”并不用产生维护性或性能上的问题。Netty 是一个吸收了多种协议的实现经验,这些协议包括FTP,SMTP,HTTP,各种二进制,文本协议,并经过相当精心设计的项目,最终,Netty 成功的找到了一种方式,在保证易于开发的同时还保证了其应用的性能,稳定性和伸缩性。

Netty从4.x版本开始,需要使用JDK1.6及以上版本提供基础支撑。

在设计上:针对多种传输类型的统一接口 - 阻塞和非阻塞;简单但更强大的线程模型;真正的无连接的数据报套接字支持;链接逻辑支持复用;

在性能上:比核心 Java API 更好的吞吐量,较低的延时;资源消耗更少,这个得益于共享池和重用;减少内存拷贝

在健壮性上:消除由于慢,快,或重载连接产生的 OutOfMemoryError;消除经常发现在 NIO 在高速网络中的应用中的不公平的读/写比

在安全上:完整的 SSL / TLS 和 StartTLS 的支持

且已得到大量商业应用的真实验证,如:Hadoop项目的Avro(RPC框架)、Dubbo、Dubbox等RPC框架。

Netty的官网是:http://netty.io

有三方提供的中文翻译Netty用户手册(官网提供源信息):http://ifeve.com/netty5-user-guide/

1.4 Netty架构

Netty 采用了比较典型的三层网络架构进行设计,逻辑架构图如下所示:


image
  1. 第一层,Reactor 通信调度层,它由一系列辅助类完成,包括 Reactor 线程 NioEventLoop 以及其父类、NioSocketChannel/NioServerSocketChannel 以及其父 类、ByteBuffer 以及由其衍生出来的各种 Buffer、Unsafe以及其衍生出的各种内部类等。该层的主要职责就是监听网络的读写和连接操作,负责将网络层的数据 读取到内存缓冲区中,然后触发各种网络事件,例如连接创建、连接激活、读事 件、写事件等等,将这些事件触发到 PipeLine 中,由 PipeLine 充当的职责链来 进行后续的处理。
  2. 第二层,职责链 PipeLine,它负责事件在职责链中的有序传播,同时负责动态的 编排职责链,职责链可以选择监听和处理自己关心的事件,它可以拦截处理和向 后/向前传播事件,不同的应用的 Handler 节点的功能也不同,通常情况下,往往 会开发编解码 Hanlder 用于消息的编解码,它可以将外部的协议消息转换成内部 的 POJO 对象,这样上层业务侧只需要关心处理业务逻辑即可,不需要感知底层 的协议差异和线程模型差异,实现了架构层面的分层隔离。
  3. 第三层,业务逻辑处理层。可以分为两类:
  4. 纯粹的业务逻辑 处理,例如订单处理。
  5. 应用层协议管理,例如HTTP协议、FTP协议等。

接下来,我从影响通信性能的三个方面(I/O模型、线程调度模型、序列化方式)来谈谈Netty的架构。

1.5 线程模型

image-20200331145219613.png

Netty中支持单线程模型,多线程模型,主从多线程模型。

在大多数场景下,并行多线程处理可以提升系统的并发性能。但是,如果对于共享资源的并发访问处理不当,会带来严重的锁竞争,这最终会导致性能的下降。为了尽可能的避免锁竞争带来的性能损耗,可以通过串行化设计,即消息的处理尽可能在同一个线程内完成,期间不进行线程切换,这样就避免了多线程竞争和同步锁。

为了尽可能提升性能,Netty采用了串行无锁化设计,在I/O线程内部进行串行操作,避免多线程竞争导致的性能下降。表面上看,串行化设计似乎CPU利用率不高,并发程度不够。但是,通过调整NIO线程池的线程参数,可以同时启动多个串行化的线程并行运行,这种局部无锁化的串行线程设计相比一个队列-多个工作线程模型性能更优。

Reactor单线程模型

在ServerBootstrap调用方法group的时候,传递的参数是同一个线程组,且在构造线程组的时候,构造参数为1,这种开发方式,就是一个单线程模型。

个人机开发测试使用。不推荐。

image-20200327102216076.png

Reactor多线程模型

在ServerBootstrap调用方法group的时候,传递的参数是两个不同的线程组。负责监听的acceptor线程组,线程数为1,也就是构造参数为1。负责处理客户端任务的线程组,线程数大于1,也就是构造参数大于1。这种开发方式,就是多线程模型。

长连接,且客户端数量较少,连接持续时间较长情况下使用。如:企业内部交流应用。

image-20200327102919755.png

Reactor主从多线程模型

在ServerBootstrap调用方法group的时候,传递的参数是两个不同的线程组。负责监听的acceptor线程组,线程数大于1,也就是构造参数大于1。负责处理客户端任务的线程组,线程数大于1,也就是构造参数大于1。这种开发方式,就是主从多线程模型。

长连接,客户端数量相对较多,连接持续时间比较长的情况下使用。如:对外提供服务的相册服务器。

image-20200327103354987.png

Netty模型

Netty 线程模式(Netty 主要基于主从Reactor 多线程模型做了一定的改进,其中主从Reactor 多线程模型有多个Reactor)

image-20200331145441055.png
  1. Netty抽象出两组线程池 BossGroup 专门负责接收客户端的连接, WorkerGroup 专门负责网络的读写

  2. BossGroup 和 WorkerGroup 类型都是 NioEventLoopGroup

  3. NioEventLoopGroup 相当于一个事件循环组, 这个组中含有多个事件循环 ,每一个事件循环是 NioEventLoop

  4. NioEventLoop 表示一个不断循环的执行处理任务的线程, 每个NioEventLoop 都有一个selector , 用于监听绑定在其上的socket的网络通讯

  5. NioEventLoopGroup 可以有多个线程, 即可以含有多个NioEventLoop

  6. 每个Boss NioEventLoop 循环执行的步骤有3步

    1.轮询accept 事件

    2.处理accept 事件 , 与client建立连接 , 生成NioScocketChannel , 并将其注册到某个worker NIOEventLoop 上的 selector

    3.处理任务队列的任务 , 即 runAllTasks

  7. 每个 Worker NIOEventLoop 循环执行的步骤

    1.轮询read, write 事件

    2.处理i/o事件, 即read , write 事件,在对应NioScocketChannel 处理

    3.处理任务队列的任务 , 即 runAllTasks

  8. 每个Worker NIOEventLoop处理业务时,会使用pipeline(管道), pipeline 中包含了 channel , 即通过pipeline 可以获取到对应通道, 管道中维护了很多的 处理器

Netty快速入门实例-TCP服务

public class NettyServer {
    public static void main(String[] args) throws Exception {


        //创建BossGroup 和 WorkerGroup
        //说明
        //1. 创建两个线程组 bossGroup 和 workerGroup
        //2. bossGroup 只是处理连接请求 , 真正的和客户端业务处理,会交给 workerGroup完成
        //3. 两个都是无限循环
        //4. bossGroup 和 workerGroup 含有的子线程(NioEventLoop)的个数
        //   默认实际 cpu核数 * 2
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        //8
        EventLoopGroup workerGroup = new NioEventLoopGroup();


        try {
            //创建服务器端的启动对象,配置参数
            ServerBootstrap bootstrap = new ServerBootstrap();

            //使用链式编程来进行设置//设置两个线程组
            bootstrap.group(bossGroup, workerGroup)
                    //使用NioSocketChannel 作为服务器的通道实现
                    .channel(NioServerSocketChannel.class)
                    // 设置线程队列得到连接个数
                    .option(ChannelOption.SO_BACKLOG, 128)
                    //设置保持活动连接状态
                    .childOption(ChannelOption.SO_KEEPALIVE, true)
                    // .handler(null) // 该 handler对应 bossGroup , childHandler 对应 workerGroup
                    //创建一个通道初始化对象(匿名对象)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        //给pipeline 设置处理器
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            //可以使用一个集合管理 SocketChannel, 再推送消息时,可以将业务加入到各个channel 对应的 NIOEventLoop 的 taskQueue 或者 scheduleTaskQueue
                            System.out.println("SocketChannel hashcode=" + ch.hashCode());
                            ch.pipeline().addLast(new NettyServerHandler());
                        }
                    });
            // 给我们的workerGroup 的 EventLoop 对应的管道设置处理器

            System.out.println(".....服务器 is ready...");

            //绑定一个端口并且同步, 生成了一个 ChannelFuture 对象
            //启动服务器(并绑定端口)
            ChannelFuture cf = bootstrap.bind(6668).sync();

            //给cf 注册监听器,监控我们关心的事件
            cf.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    if (cf.isSuccess()) {
                        System.out.println("监听端口 6668 成功");
                    } else {
                        System.out.println("监听端口 6668 失败");
                    }
                }
            });
            //对关闭通道进行监听
            cf.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

}
/**
 * 说明
 * 1. 我们自定义一个Handler 需要继续netty 规定好的某个HandlerAdapter(规范)
 * 2. 这时我们自定义一个Handler , 才能称为一个handler
 *
 * @author Administrator
 */
public class NettyServerHandler extends ChannelInboundHandlerAdapter {

    public static final DateTimeFormatter DATE_TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS");

    /**
     * 读取数据实际(这里我们可以读取客户端发送的消息)
     * 1. ChannelHandlerContext ctx:上下文对象, 含有 管道pipeline , 通道channel, 地址
     * 2. Object msg: 就是客户端发送的数据 默认Object
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

        //比如这里我们有一个非常耗时长的业务-> 异步执行 -> 提交该channel 对应的NIOEventLoop 的 taskQueue中,

        //解决方案1 用户程序自定义的普通任务
        ctx.channel().eventLoop().execute(new Runnable() {
            @Override
            public void run() {
                try {
                    System.out.println(DATE_TIME_FORMATTER.format(LocalDateTime.now()));
                    System.out.println("服务器发送线程2 " + Thread.currentThread().getName());
                    Thread.sleep(5 * 1000);
                    ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客户端~(>^ω^<)喵2", CharsetUtil.UTF_8));
                    System.out.println("channel code=" + ctx.channel().hashCode());
                } catch (Exception ex) {
                    System.out.println("发生异常" + ex.getMessage());
                }
            }
        });

        ctx.channel().eventLoop().execute(new Runnable() {
            @Override
            public void run() {
                try {
                    System.out.println(DATE_TIME_FORMATTER.format(LocalDateTime.now()));
                    System.out.println("服务器发送线程3 " + Thread.currentThread().getName());
                    Thread.sleep(5 * 1000);
                    ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客户端~(>^ω^<)喵3", CharsetUtil.UTF_8));
                    System.out.println("channel code=" + ctx.channel().hashCode());
                } catch (Exception ex) {
                    System.out.println("发生异常" + ex.getMessage());
                }
            }
        });

        //解决方案2 : 用户自定义定时任务 -》 该任务是提交到 scheduleTaskQueue中
        ctx.channel().eventLoop().schedule(new Runnable() {
            @Override
            public void run() {
                try {
                    System.out.println(DATE_TIME_FORMATTER.format(LocalDateTime.now()));
                    System.out.println("服务器发送线程4 " + Thread.currentThread().getName());
                    Thread.sleep(5 * 1000);
                    ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客户端~(>^ω^<)喵4", CharsetUtil.UTF_8));
                    System.out.println("channel code=" + ctx.channel().hashCode());
                } catch (Exception ex) {
                    System.out.println("发生异常" + ex.getMessage());
                }
            }
        }, 5, TimeUnit.SECONDS);


        System.out.println("go on ...");


        System.out.println("服务器读取线程 " + Thread.currentThread().getName() + " channel =" + ctx.channel());
        System.out.println("server ctx =" + ctx);
        System.out.println("看看channel 和 pipeline的关系");
        Channel channel = ctx.channel();
        //本质是一个双向链接, 出站入站
        ChannelPipeline pipeline = ctx.pipeline();


        //将 msg 转成一个 ByteBuf
        //ByteBuf 是 Netty 提供的,不是 NIO 的 ByteBuffer.
        ByteBuf buf = (ByteBuf) msg;
        System.out.println("客户端发送消息是:" + buf.toString(CharsetUtil.UTF_8));
        System.out.println("客户端地址:" + channel.remoteAddress());
    }

    /**
     * 数据读取完毕
     */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {

        //writeAndFlush 是 write + flush
        //将数据写入到缓存,并刷新
        //一般讲,我们对这个发送的数据进行编码
        ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客户端~(>^ω^<)喵1", CharsetUtil.UTF_8));
    }

    /**
     * 处理异常, 一般是需要关闭通道
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }
}
public class NettyClient {
    public static void main(String[] args) throws Exception {

        //客户端需要一个事件循环组
        EventLoopGroup group = new NioEventLoopGroup();

        try {
            //创建客户端启动对象
            //注意客户端使用的不是 ServerBootstrap 而是 Bootstrap
            Bootstrap bootstrap = new Bootstrap();

            //设置相关参数 设置线程组
            bootstrap.group(group)
                    // 设置客户端通道的实现类(反射)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            //加入自己的处理器
                            ch.pipeline().addLast(new NettyClientHandler());
                        }
                    });

            System.out.println("客户端 ok..");

            //启动客户端去连接服务器端
            //关于 ChannelFuture 要分析,涉及到netty的异步模型
            ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6668).sync();
            //给关闭通道进行监听
            channelFuture.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully();
        }
    }
}

public class NettyClientHandler extends ChannelInboundHandlerAdapter {

    /**
     * 当通道就绪就会触发该方法
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("client " + ctx);
        ctx.writeAndFlush(Unpooled.copiedBuffer("hello, server: (>^ω^<)喵", CharsetUtil.UTF_8));
    }

    /**
     * 当通道有读取事件时,会触发
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf buf = (ByteBuf) msg;
        System.out.println("服务器回复的消息:" + buf.toString(CharsetUtil.UTF_8));
        System.out.println("服务器的地址: " + ctx.channel().remoteAddress());
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

任务队列中的Task 有3 种典型使用场景

  1. 用户程序自定义的普通任务 [举例说明]

  2. 用户自定义定时任务

  3. 非当前 Reactor 线程调用 Channel 的各种方法

例如在推送系统的业务线程里面,根据用户的标识,找到对应的 Channel 引用,然后调用 Write 类方法向该用户推送消息,就会进入到这种场景。最终的Write 会提交到任务队列中后被异步消费

方案再说明

  1. Netty 抽象出两组线程池,BossGroup 专门负责接收客户端连接,WorkerGroup 专门负责网络读写操作。

  2. NioEventLoop 表示一个不断循环执行处理任务的线程,每个 NioEventLoop 都有一个 selector,用于监听绑定在其上的 socket 网络通道。

  3. NioEventLoop 内部采用串行化设计,从消息的读取->解码->处理->编码->发送,始终由 IO 线程 NioEventLoop 负责

    • NioEventLoopGroup 下包含多个 NioEventLoop
    • 每个 NioEventLoop 中包含有一个 Selector,一个 taskQueue
    • 每个 NioEventLoop 的 Selector 上可以注册监听多个 NioChannel
    • 每个 NioChannel 只会绑定在唯一的 NioEventLoop 上
    • 每个 NioChannel 都绑定有一个自己的 ChannelPipeline

异步模型

基本介绍

  1. 异步的概念和同步相对。当一个异步过程调用发出后,调用者不能立刻得到结果。实际处理这个调用的组件在完成后,通过状态、通知和回调来通知调用者。

  2. Netty 中的 I/O 操作是异步的,包括 Bind、Write、Connect 等操作会简单的返回一个 ChannelFuture。

  3. 调用者并不能立刻获得结果,而是通过 Future-Listener 机制,用户可以方便的主动获取或者通过通知机制获得 IO 操作结果

  4. Netty 的异步模型是建立在 future 和 callback 的之上的。callback 就是回调。重点说 Future,它的核心思想是:假设一个方法 fun,计算过程可能非常耗时,等待 fun返回显然不合适。那么可以在调用 fun 的时候,立马返回一个 Future,后续可以通过 Future去监控方法 fun 的处理过程(即 : Future-Listener 机制)

Future 说明

  1. 表示异步的执行结果, 可以通过它提供的方法来检测执行是否完成,比如检索计算等等.

  2. ChannelFuture 是一个接口 : public interface ChannelFuture extends Future<Void>
    我们可以添加监听器,当监听的事件发生时,就会通知到监听器. 案例说明

工作原理说明

  1. 在使用 Netty 进行编程时,拦截操作和转换出入站数据只需要您提供 callback 或利用future 即可。这使得链式操作简单、高效, 并有利于编写可重用的、通用的代码。

  2. Netty 框架的目标就是让你的业务逻辑从网络基础应用编码中分离出来、解脱出来

Future-Listener机制

  1. 当 Future 对象刚刚创建时,处于非完成状态,调用者可以通过返回的 ChannelFuture 来获取操作执行的状态,注册监听函数来执行完成后的操作。

  2. 常见有如下操作

  • 通过 isDone 方法来判断当前操作是否完成;
  • 通过 isSuccess 方法来判断已完成的当前操作是否成功;
  • 通过 getCause 方法来获取已完成的当前操作失败的原因;
  • 通过 isCancelled 方法来判断已完成的当前操作是否被取消;
  • 通过 addListener 方法来注册监听器,当操作已完成(isDone 方法返回完成),将会通知指定的监听器;如果 Future 对象已完成,则通知指定的监听器
  1. 举例说明
serverBootstrap.bind(port).addListener(future -> {
       if(future.isSuccess()) {
           System.out.println(newDate() + ": 端口["+ port + "]绑定成功!");
       } else{
           System.err.println("端口["+ port + "]绑定失败!");
       }
   });

小结:相比传统阻塞 I/O,执行 I/O 操作后线程会被阻塞住, 直到操作完成;异步处理的好处是不会造成线程阻塞,线程在 I/O 操作期间可以执行别的程序,在高并发情形下会更稳定和更高的吞吐量

快速入门实例-HTTP 服务

public class TestServer {
    public static void main(String[] args) throws Exception {

        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new TestServerInitializer());

            ChannelFuture channelFuture = serverBootstrap.bind(6666).sync();

            channelFuture.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}
/**
 * 说明
 * 1. SimpleChannelInboundHandler 是 ChannelInboundHandlerAdapter
 * 2. HttpObject 客户端和服务器端相互通讯的数据被封装成 HttpObject
 *
 * @author Administrator
 */
public class TestHttpServerHandler extends SimpleChannelInboundHandler<HttpObject> {

    /**
     * channelRead0 读取客户端数据
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Exception {


        System.out.println("对应的channel=" + ctx.channel() + " pipeline=" + ctx
                .pipeline() + " 通过pipeline获取channel" + ctx.pipeline().channel());

        System.out.println("当前ctx的handler=" + ctx.handler());

        //判断 msg 是不是 httpRequest请求
        if (msg instanceof HttpRequest) {

            System.out.println("ctx 类型=" + ctx.getClass());

            System.out.println("pipeline hashcode" + ctx.pipeline().hashCode() + " TestHttpServerHandler hash=" + this.hashCode());

            System.out.println("msg 类型=" + msg.getClass());
            System.out.println("客户端地址" + ctx.channel().remoteAddress());

            //获取到
            HttpRequest httpRequest = (HttpRequest) msg;
            //获取uri, 过滤指定的资源
            URI uri = new URI(httpRequest.uri());
            if ("/favicon.ico".equals(uri.getPath())) {
                System.out.println("请求了 favicon.ico, 不做响应");
                return;
            }
            //回复信息给浏览器 [http协议]

            ByteBuf content = Unpooled.copiedBuffer("hello, 我是服务器", CharsetUtil.UTF_8);

            //构造一个http的相应,即 HttpResponse
            FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, content);

            response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain");
            response.headers().set(HttpHeaderNames.CONTENT_LENGTH, content.readableBytes());

            //将构建好 response返回
            ctx.writeAndFlush(response);
        }
    }

}
public class TestServerInitializer extends ChannelInitializer<SocketChannel> {

    @Override
    protected void initChannel(SocketChannel ch) throws Exception {

        //向管道加入处理器
        //得到管道
        ChannelPipeline pipeline = ch.pipeline();

        //加入一个netty 提供的httpServerCodec codec =>[coder - decoder]
        //1. HttpServerCodec 是netty 提供的处理http的 编-解码器
        pipeline.addLast("MyHttpServerCodec", new HttpServerCodec());
        //2. 增加一个自定义的handler
        pipeline.addLast("MyTestHttpServerHandler", new TestHttpServerHandler());

        System.out.println("ok~~~~");
    }
}

1.6 流量整形

流量整形(Traffic Shaping)是一种主动调整流量输出速率的措施。Netty的流量整形有两个作用:

  • 防止由于上下游网元性能不均衡导致下游网元被压垮,业务流程中断;
  • 防止由于通信模块接收消息过快,后端业务线程处理不及时导致的“撑死”问题。

流量整形的原理示意图如下:

image

流量整形(Traffic Shaping)是一种主动调整流量输出速率的措施。一个典型应用是基于下游网络结点的TP指标来控制本地流量的输出。流量整形与流量监管的主要区别在于,流量整形对流量监管中需要丢弃的报文进行缓存——通常是将它们放入缓冲区或队列内,也称流量整形(Traffic Shaping,简称TS)。当令牌桶有足够的令牌时,再均匀的向外发送这些被缓存的报文。流量整形与流量监管的另一区别是,整形可能会增加延迟,而监管几乎不引入额外的延迟。

Netty支持两种流量整形模式:

  • 全局流量整形:全局流量整形的作用范围是进程级的,无论你创建了多少个Channel,它的作用域针对所有的Channel。用户可以通过参数设置:报文的接收速率、报文的发送速率、整形周期。[GlobalChannelTrafficShapingHandler]
  • 链路级流量整形:单链路流量整形与全局流量整形的最大区别就是它以单个链路为作用域,可以对不同的链路设置不同的整形策略。[ChannelTrafficShapingHandler针对于每个channel]

1.7 优雅停机

Netty的优雅停机三部曲:

  1. 不再接收新消息
  2. 退出前的预处理操作
  3. 资源的释放操作
image

Java的优雅停机通常通过注册JDK的ShutdownHook来实现,当系统接收到退出指令后,首先标记系统处于退出状态,不再接收新的消息,然后将积压的消息处理完,最后调用资源回收接口将资源销毁,最后各线程退出执行。

通常优雅退出需要有超时控制机制,例如30S,如果到达超时时间仍然没有完成退出前的资源回收等操作,则由停机脚本直接调用kill -9 pid,强制退出。

在实际项目中,Netty作为高性能的异步NIO通信框架,往往用作基础通信框架负责各种协议的接入、解析和调度等,例如在RPC和分布式服务框架中,往往会使用Netty作为内部私有协议的基础通信框架。
当应用进程优雅退出时,作为通信框架的Netty也需要优雅退出,主要原因如下:

  • 尽快的释放NIO线程、句柄等资源;
  • 如果使用flush做批量消息发送,需要将积攒在发送队列中的待发送消息发送完成;
  • 正在write或者read的消息,需要继续处理;
  • 设置在NioEventLoop线程调度器中的定时任务,需要执行或者清理

1.8 Netty架构剖析之安全性

Netty面临的安全挑战:

  • 对第三方开放
  • 作为应用层协议的基础通信框架
image

安全威胁场景分析:

  • 对第三方开放的通信框架:如果使用Netty做RPC框架或者私有协议栈,RPC框架面向非授信的第三方开放,例如将内部的一些能力通过服务对外开放出去,此时就需要进行安全认证,如果开放的是公网IP,对于安全性要求非常高的一些服务,例如在线支付、订购等,需要通过SSL/TLS进行通信。
  • 应用层协议的安全性:作为高性能、异步事件驱动的NIO框架,Netty非常适合构建上层的应用层协议。由于绝大多数应用层协议都是公有的,这意味着底层的Netty需要向上层提供通信层的安全传输功能。

SSL/TLS

Netty安全传输特性:

  • 支持SSL V2和V3
  • 支持TLS
  • 支持SSL单向认证、双向认证和第三方CA认证。

Netty通过SslHandler提供了对SSL的支持,它支持的SSL协议类型包括:SSL V2、SSL V3和TLS。

  • 单向认证:单向认证,即客户端只验证服务端的合法性,服务端不验证客户端。
  • 双向认证:与单向认证不同的是服务端也需要对客户端进行安全认证。这就意味着客户端的自签名证书也需要导入到服务端的数字证书仓库中。
  • CA认证:基于自签名的SSL双向认证,只要客户端或者服务端修改了密钥和证书,就需要重新进行签名和证书交换,这种调试和维护工作量是非常大的。因此,在实际的商用系统中往往会使用第三方CA证书颁发机构进行签名和验证。我们的浏览器就保存了几个常用的CA_ROOT。每次连接到网站时只要这个网站的证书是经过这些CA_ROOT签名过的。就可以通过验证了。

可扩展的安全特性

通过Netty的扩展特性,可以自定义安全策略:

  • IP地址黑名单机制
  • 接入认证
  • 敏感信息加密或者过滤机制

IP地址黑名单是比较常用的弱安全保护策略,它的特点就是服务端在与客户端通信的过程中,对客户端的IP地址进行校验,如果发现对方IP在黑名单列表中,则拒绝与其通信,关闭链路。

接入认证策略非常多,通常是较强的安全认证策略,例如基于用户名+密码的认证,认证内容往往采用加密的方式,例如Base64+AES等。

Netty架构剖析之扩展性

通过Netty的扩展特性,可以自定义安全策略:

  • 线程模型可扩展
  • 序列化方式可扩展
  • 上层协议栈可扩展
  • 提供大量的网络事件切面,方便用户功能扩展

Netty的架构可扩展性设计理念如下:

  • 判断扩展点,事先预留相关扩展接口,给用户二次定制和扩展使用;
  • 主要功能点都基于接口编程,方便用户定制和扩展。

1.9 数据安全性之滑动窗口协议

我们假设一个场景,客户端每次请求服务端必须得到服务端的一个响应,由于TCP的数据发送和数据接收是异步的,就存在必须存在一个等待响应的过程。该过程根据实现方式不同可以分为一下几类(部分是错误案例):

  • 每次发送一个数据包,然后进入休眠(sleep)或者阻塞(await)状态,直到响应回来或者超时,整个调用链结束。此场景是典型的一问一答的场景,效率极其低下;
  • 读写分离,写模块只负责写,读模块则负责接收响应,然后做后续的处理。此种场景能尽可能的利用带宽进行读写。但是此场景不坐控速操作可能导致大量报文丢失或者重复发送。
  • 实现类似于Windowed Protocol。此窗口是以上两种方案的折中版,即允许一定数量的批量发送,又能保证数据的完整性。
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 206,126评论 6 481
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 88,254评论 2 382
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 152,445评论 0 341
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 55,185评论 1 278
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 64,178评论 5 371
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,970评论 1 284
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,276评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,927评论 0 259
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,400评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,883评论 2 323
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,997评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,646评论 4 322
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,213评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,204评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,423评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,423评论 2 352
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,722评论 2 345

推荐阅读更多精彩内容