RPC框架设计

1. Socket回顾与I/0模型

1.1 Socket网络编程回顾

1.1.1 Socket概述

Socket,套接字就是两台主机之间逻辑连接的端点。TCP/IP协议是传输层协议,主要解决数据如何在网络中传输,而HTTP是应用层协议,主要解决如何包装数据。Socket是通信的基石,是支持TCP/IP协议的网络通信的基本操作单元。它是网络通信过程中端点的抽象表示,包含进行网络通信必须的五种信息:连接使用的协议、本地主机的IP地址、本地进程的协议端口、远程主机的IP地址、远程进程的协议端口。

1.1.2 Socket整体流程

Socket编程主要涉及到客户端和服务端两个方面,首先是在服务器端创建一个服务器套接字(ServerSocket),并把它附加到一个端口上,服务器从这个端口监听连接。端口号的范围是0到65536,但是0到1024是为特权服务保留的端口号,可以选择任意一个当前没有被其他进程使用的端口。
客户端请求与服务器进行连接的时候,根据服务器的域名或者IP地址,加上端口号,打开一个套接字。当服务器接受连接后,服务器和客户端之间的通信就像输入输出流一样进行操作。


Socket通信流程

1.1.3 代码实现

服务端

package com.socket.demo;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ServerDemo {

    public static void main(String[] args) throws IOException {
        // 线程池:核心线程数10,最大线程数2000(等待队列用完时使用),线程保持活跃时间,活跃时间单位, 等待队列(LinkedBlockingQueue 无上限)
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(10, 2000,  3000, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
        // 创建服务端监听
        ServerSocket serverSocket = new ServerSocket(8888);
        System.out.println("服务器已经启动");
        // 持续监听
        while(true) {
            // 阻塞式监听一直到有客户端访问
            Socket socket = serverSocket.accept();
            System.out.println("有客户端连接");
            // 使用线程池的方式异步处理socket通信,避免阻塞其他客户端的访问
            threadPoolExecutor.execute(new Runnable() {
                @Override
                public void run() {
                    handler(socket);
                }
            });
        }
    }

    /**
     * 处理套接字
     * @param socket
     */
    private static void handler(Socket socket) {
        try {
            doHandler(socket);
        } catch (Exception e){
            System.out.println("处理套接字出现异常,异常消息" + e);
        } finally {
            try {
                socket.close();
            } catch (IOException e) {
                System.out.println("socket 关闭出现异常,异常消息" + e);
                e.printStackTrace();
            }
        }
    }

    private static void doHandler(Socket socket) throws IOException {
        InputStream inputStream = socket.getInputStream();
        // 按字节读取
        byte [] b = new byte[1024];
        int len;
        StringBuffer buffer = new StringBuffer();
        do {
            len = inputStream.read(b);
            buffer.append(new String(b, 0, len));
        } while (len > b.length);
        System.out.println("客户端:" + buffer.toString());
        // 服务端给予回复
        OutputStream outputStream = socket.getOutputStream();
        outputStream.write("已读不回".getBytes());
    }

}

客户端

package com.socket.demo;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;

public class ClientDemo {

    public static void main(String[] args) throws IOException {
        Socket socket = new Socket("localhost",8888);
        // 发送消息
        OutputStream outputStream = socket.getOutputStream();
        outputStream.write("请问一下这个需求你了解吗".getBytes());
        // 接收消息
        InputStream inputStream = socket.getInputStream();
        byte [] b = new byte[1024];
        int len;
        StringBuffer buffer = new StringBuffer();
        do {
            len = inputStream.read(b);
            buffer.append(new String(b, 0, len));
        } while (len > b.length);
        System.out.println("服务端端:" + buffer.toString());
        socket.close();
    }

}

1.2 I/O模型

1.2.1 I/O模型说明

就是用什么样的通道进行数据的发送和接收,很大程度上决定了程序通信的性能
Java 共支持 3 种网络编程模型/IO 模式:BIO(同步并阻塞)、NIO(同步非阻塞)、AIO(异步非阻塞)

阻塞与非阻塞
主要指的是访问IO的线程是否会阻塞(或处于等待)
线程访问资源,该资源是否准备就绪的一种处理方式

阻塞式与非阻塞示意图

同步和异步
主要是指的数据的请求方式
同步和异步是指访问数据的一种机制

同步与异步示意图

1.2.2 BIO(同步并阻塞)

Java BIO就是传统的 socket编程.
BIO(blocking I/O) : 同步阻塞,服务器实现模式为一个连接一个线程,即客户端有连接请求时服务器端就需要启动一个线程进行处理,如果这个连接不做任何事情会造成不必要的线程开销,可以通过线程池机制改善(实现多个客户连接服务器)。

工作机制

BIO工作机制

BIO问题分析

  1. 每个请求都需要创建独立的线程,与对应的客户端进行数据 Read,业务处理,数据 Write
  2. 并发数较大时,需要创建大量线程来处理连接,系统资源占用较大
  3. 连接建立后,如果当前线程暂时没有数据可读,则线程就阻塞在 Read 操作上,造成线程资源浪费

1.2.3 NIO(同步非阻塞)

同步非阻塞,服务器实现模式为一个线程处理多个请求(连接),即客户端发送的连接请求都会注册到多路复用器上,多路复用器轮询到连接有 I/O 请求就进行处理


image.png

1.2.4 AIO(异步非阻塞)

AIO 引入异步通道的概念,采用了 Proactor 模式,简化了程序编写,有效的请求才启动线程,它的特点是先由操作系统完成后才通知服务端程序启动线程去处理,一般适用于连接数较多且连接时间较长
的应用
Proactor 模式是一个消息异步通知的设计模式,Proactor 通知的不是就绪事件,而是操作完成事件,这也就是操作系统异步 IO 的主要模型。

1.2.5 BIO、NIO、AIO 适用场景分析

  1. BIO(同步并阻塞) 方式适用于连接数目比较小且固定的架构,这种方式对服务器资源要求比较高,并发局限于应用中,JDK1.4以前的唯一选择,但程序简单易理解
  2. NIO(同步非阻塞) 方式适用于连接数目多且连接比较短(轻操作)的架构,比如聊天服务器,弹幕系统,服务器间通讯等。编程比较复杂,JDK1.4 开始支持
  3. AIO(异步非阻塞) 方式使用于连接数目多且连接比较长(重操作)的架构,比如相册服务器,充分调用 OS 参与并发操作, 编程比较复杂,JDK7 开始支持。

2. NIO编程

Java NIO 全称java non-blocking IO ,是指 JDK 提供的新 API。从 JDK1.4 开始,Java 提供了一系列改进的输入/输出的新特性,被统称为 NIO(即 New IO),是同步非阻塞的.

  1. NIO 有三大核心部分:Channel(通道),Buffer(缓冲区), Selector(选择器)
  2. NIO是 面向缓冲区编程的。数据读取到一个缓冲区中,需要时可在缓冲区中前后移动,这就增加了处理过程中的灵活性,使用它可以提供非阻塞式的高伸缩性网络
  3. Java NIO 的非阻塞模式,使一个线程从某通道发送请求或者读取数据,但是它仅能得到目前可用的数据,如果目前没有数据可用时,就什么都不会获取,而不是保持线程阻塞,所以直至数据变的可以读取之前,该线程可以继续做其他的事情。 非阻塞写也是如此,一个线程请求写入一些数据到某通道,但不需要等待它完全写入, 这个线程同时可以去做别的事情。通俗理解:NIO 是可以做到用一个线程来处理多个操作的。假设有 10000 个请求过来,根据实际情况,可以分配50 或者 100 个线程来处理。不像之前的阻塞 IO 那样,非得分配 10000 个

2.2 NIO和 BIO的比较

  1. BIO 以流的方式处理数据,而 NIO 以缓冲区的方式处理数据,缓冲区 I/O 的效率比流 I/O 高很多
  2. BIO 是阻塞的,NIO则是非阻塞的
  3. BIO 基于字节流和字符流进行操作,而 NIO 基于 Channel(通道)和 Buffer(缓冲区)进行操作,数据总是从通道读取到缓冲区中,或者从缓冲区写入到通道中。Selector(选择器)用于监听多个通道的事件(比如:连接请求, 数据到达等),因此使用单个线程就可以监听多个客户端通道

2.3 NIO 三大核心原理示意图

一张图描述 NIO 的 Selector 、 Channel 和 Buffer 的关系


image.png
  1. 每个 channel 都会对应一个 Buffer
  2. Selector 对应一个线程, 一个线程对应多个 channel(连接)
  3. 每个 channel 都注册到 Selector选择器上
  4. Selector不断轮询查看Channel上的事件, 事件是通道Channel非常重要的概念
  5. Selector 会根据不同的事件,完成不同的处理操作
  6. Buffer 就是一个内存块 , 底层是有一个数组
  7. 数据的读取写入是通过 Buffer, 这个和 BIO , BIO 中要么是输入流,或者是输出流, 不能双向,但是NIO 的 Buffer 是可以读也可以写 , channel 是双向的.

2.4 缓冲区(Buffer)

2.4.1 基本介绍

缓冲区(Buffer):缓冲区本质上是一个可以读写数据的内存块,可以理解成是一个数组,该对象提供了一组方法,可以更轻松地使用内存块,,缓冲区对象内置了一些机制,能够跟踪和记录缓冲区的状态变化情况。Channel 提供从网络读取数据的渠道,但是读取或写入的数据都必须经由 Buffer.


image.png

2.4.2 Buffer常用API介绍

1. Buffer 类及其子类

image.png

在 NIO 中,Buffer是一个顶层父类,它是一个抽象类, 类的层级关系图,常用的缓冲区分别对应byte,short, int, long,float,double,char 7种.

2. 缓冲区对象创建

方法名 说明
static ByteBuffer allocate(长度) 创建byte类型的指定长度的缓冲区
static ByteBuffer wrap(byte[] array) 创建一个有内容的byte类型缓冲区

示例代码:

public static void main(String[] args) {
        ByteBuffer buffer = ByteBuffer.allocate(5);
        for (int i = 0; i < 5; i++) {
            System.out.println(buffer.get());
        }
        //在此调用会报错,不信你放开注释试试。因为buffer的内容已经读到了边界,内有一个指针控制。
        // System.out.println(byteBuffer.get());
        ByteBuffer wrap = ByteBuffer.wrap("lagou".getBytes());
        for (int i = 0; i < "lagou".getBytes().length; i++) {
            System.out.println(wrap.get());
        }
    }

3. 缓冲区对象添加数据

方法名 说明
int position()
position(intnewPosition)
获得当前要操作的索引
修改当前要操作的索引位置
int limit()
limit(int newLimit)
最多能操作到哪个索引
修改最多能操作的索引位置
int capacity() 返回缓冲区的总长度
int remaining()
boolean hasRemaining()
还有多少能操作索引个数
是否还有能操作
put(byte b)
put(byte[] src)
添加一个字节
添加字节数组

图解:

image.png

示例代码

public static void main(String[] args) {

        ByteBuffer byteBuffer = ByteBuffer.allocate(10);
        showBuffer(byteBuffer);
        //修改当前索引位置
        // 添加一个字节
        byteBuffer.put((byte) 97);
        showBuffer(byteBuffer);
        //添加一个字节数组
        byteBuffer.put("abc".getBytes());
        showBuffer(byteBuffer);
        //当添加超过缓冲区的长度时会报错
        byteBuffer.put("012345".getBytes());
        showBuffer(byteBuffer);
        // 如果缓存区存满后, 可以调整position位置可以重复写,这样会覆盖之前存入索引的对应的值
        byteBuffer.position(0);
        byteBuffer.put("012345".getBytes());
        showBuffer(byteBuffer);
    }

    public static void showBuffer(ByteBuffer byteBuffer){
        //0 获取当前索引所在位置
        System.out.println("position: " + byteBuffer.position());
        //10 最多能操作到哪个索引
        System.out.println("limit: " + byteBuffer.limit());
        //10 返回缓冲区总长度
        System.out.println("capacity: " + byteBuffer.capacity());
        //10 还有多少个能操作
        System.out.println("remaining: " + byteBuffer.remaining());
    }

4. 缓冲区对象读取数据

方法名 说明
flip() 写切换读模式 limit设置position位置, position设置0
get() 读一个字节
get(byte[] dst) 读多个字节
get(int index) 读指定索引的字节
rewind() 将position设置为0,可以重复读
clear() 切换写模式 position设置为0 , limit 设置为 capacity
array() 将缓冲区转换成字节数组返回

图解:flip()方法


切换为读模式示意图

图解:clear()方法


切换为写模式

实例代码:

public static void main(String[] args) {
        // 1.创建一个指定长度的缓冲区
        ByteBuffer buffer = ByteBuffer.allocate(10);
        buffer.put("0123".getBytes());
        showBuffer(buffer);
        // 2.切换读模式
        System.out.println("切换为读模式--------------");
        buffer.flip();
        showBuffer(buffer);
        for (int i = 0; i < buffer.limit(); i++) {
            System.out.println(buffer.get());
        }
        //读取完毕后.继续读取会报错,超过limit值
        // System.out.println(buffer.get());
        // 读取指定索引字节
        System.out.println("读取指定索引字节--------------");
        System.out.println(buffer.get(1));
        System.out.println("读取多个字节--------------");
        // 重复读取
        buffer.rewind();
        byte[] bytes = new byte[4];
        buffer.get(bytes);
        System.out.println(new String(bytes));
        // 将缓冲区转化字节数组返回
        System.out.println("将缓冲区转化字节数组返回--------------");
        byte[] array = buffer.array();
        System.out.println(new String(array));
        // 切换写模式,覆盖之前索引所在位置的值
        System.out.println("写模式--------------");
        buffer.clear();
        buffer.put("abc".getBytes());
        System.out.println(new String(buffer.array()));
    }

    public static void showBuffer(ByteBuffer byteBuffer) {
        //0 获取当前索引所在位置
        System.out.println("position: " + byteBuffer.position());
        //10 最多能操作到哪个索引
        System.out.println("limit: " + byteBuffer.limit());
        //10 返回缓冲区总长度
        System.out.println("capacity: " + byteBuffer.capacity());
        //10 还有多少个能操作
        System.out.println("remaining: " + byteBuffer.remaining());
    }

注意事项:

  1. capacity:容量(长度)limit: 界限(最多能读/写到哪里)posotion:位置(读/写哪个索引)
  2. 获取缓冲区里面数据之前,需要调用flip方法
  3. 再次写数据之前,需要调用clear方法,但是数据还未消失,等再次写入数据,被覆盖了才会消失。

2.5 通道(Channel)

2.5.1 基本介绍

通常来说NIO中的所有IO都是从 Channel(通道) 开始的。NIO 的通道类似于流,但有些区别如下:

  1. 通道可以读也可以写,流一般来说是单向的(只能读或者写,所以之前我们用流进行IO操作的时候需要分别创建一个输入流和一个输出流)
  2. 通道可以异步读写
  3. 通道总是基于缓冲区Buffer来读写


    image.png

2.5.2 Channel常用类介绍

  1. Channel接口常用实现类
    FileChannel ,用于文件的数据读写
    DatagramChannel ,用于 UDP 的数据读写
    ServerSocketChannel、SocketChannel :用于 TCP 的数据读写

  2. SocketChannel 与ServerSocketChannel
    类似 Socke和ServerSocket,可以完成客户端与服务端数据的通信工作.

2.5.3 ServerSocketChannel

服务端实现步骤:

  1. 打开一个服务端通道
  2. 绑定对应的端口号
  3. 通道默认是阻塞的,需要设置为非阻塞
  4. 检查是否有客户端连接 有客户端连接会返回对应的通道
  5. 获取客户端传递过来的数据,并把数据放在byteBuffer这个缓冲区中
  6. 给客户端回写数据
  7. 释放资源

代码实现:

服务端

public static void main(String[] args) throws IOException, InterruptedException {
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.bind(new InetSocketAddress(8888));
        serverSocketChannel.configureBlocking(false);
        System.out.println("服务器启动成功");
        while(true){
            SocketChannel socketChannel = serverSocketChannel.accept();
            if(socketChannel == null){
                System.out.println("没有客户连接,去做别的事情");
                Thread.sleep(1000);
                continue;
            }
            // 创建Buffer
            ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
            //返回值:
            // 正数: 表示本次读到的有效字节个数.
            // 0 : 表示本次没有读到有效字节.
            // -1 : 表示读到了末尾
            int read = socketChannel.read(byteBuffer);
            System.out.println("客户端消息:" + new String(byteBuffer.array(), 0, read, StandardCharsets.UTF_8));
            socketChannel.write(ByteBuffer.wrap("没 钱".getBytes(StandardCharsets.UTF_8)));
            socketChannel.close();
        }

    }

客户端

public class NIOClient {

    public static void main(String[] args) throws IOException {
        SocketChannel socketChannel = SocketChannel.open();
        socketChannel.connect(new InetSocketAddress("127.0.0.1", 8888));
        socketChannel.write(ByteBuffer.wrap("老板, 该还钱 拉!".getBytes(StandardCharsets.UTF_8)));
        ByteBuffer readBuffer = ByteBuffer.allocate(1024);
        int read=socketChannel.read(readBuffer);
        System.out.println("服务端消息:" + new String(readBuffer.array(), 0, read, StandardCharsets.UTF_8));
        socketChannel.close();
    }

}

2.6 Selector (选择器)

2.6.1 基本介绍

可以用一个线程,处理多个的客户端连接,就会使用到NIO的Selector(选择器). Selector 能够检测多个注册的服务端通道上是否有事件发生,如果有事件发生,便获取事件然后针对每个事件进行相应的处理。这样就可以只用一个单线程去管理多个通道,也就是管理多个连接和请求。


image.png

在这种没有选择器的情况下,对应每个连接对应一个处理线程. 但是连接并不能马上就会发送信息,所以还会产生资源浪费

image.png

只有在通道真正有读写事件发生时,才会进行读写,就大大地减少了系统开销,并且不必为每个连接都创建一个线程,不用去维护多个线程, 避免了多线程之间的上下文切换导致的开销

2.6.2 常用API介绍

  1. Selector 类是一个抽象类


    image.png

    常用方法:

  • Selector.open() : //得到一个选择器对象
  • selector.select() : //阻塞 监控所有注册的通道,当有对应的事件操作时, 会将SelectionKey放入集合内部并返回事件数量
  • selector.select(1000): //阻塞 1000 毫秒,监控所有注册的通道,当有对应的事件操作时, 会将SelectionKey放入集合内部并返回
  • selector.selectedKeys() : // 返回存有SelectionKey的集合
  1. SelectionKey


    image.png

常用方法

  • SelectionKey.isAcceptable(): 是否是接收连接就绪事件
  • SelectionKey.isConnectable(): 是否是连接就绪事件
  • SelectionKey.isReadable(): 是否是读就绪事件
  • SelectionKey.isWritable(): 是否是写就绪事件

SelectionKey中定义的4种事件:

  • SelectionKey.OP_ACCEPT —— 接收连接就绪事件,表示服务器监听到了客户连接,服务器可以接收这个连接了
  • SelectionKey.OP_CONNECT —— 连接就绪事件,表示客户端与服务器的连接已经建立成功
  • SelectionKey.OP_READ —— 读就绪事件,表示通道中已经有了可读的数据,可以执行读操作了(通道目前有数据,可以进行读操作了)
  • SelectionKey.OP_WRITE —— 写就绪事件,表示已经可以向通道写数据了(通道目前可以用于写操作)

2.6.3 Selector 编码

服务端实现步骤:

  1. 打开一个服务端通道
  2. 绑定对应的端口号
  3. 通道默认是阻塞的,需要设置为非阻塞
  4. 创建选择器
  5. 将服务端通道注册到选择器上,并指定注册监听的事件为OP_ACCEPT
  6. 检查选择器是否有事件
  7. 获取事件集合
  8. 判断事件是否是客户端连接事件SelectionKey.isAcceptable()
  9. 得到客户端通道,并将通道注册到选择器上, 并指定监听事件为OP_READ
  10. 判断是否是客户端读就绪事件SelectionKey.isReadable()
  11. 得到客户端通道,读取数据到缓冲区
  12. 给客户端回写数据
  13. 从集合中删除对应的事件, 因为防止二次处理.
    代码示例
    服务端
package nio2;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.Set;

/**
 * @author chenyi
 * @Description
 * @date 2021/12/27 20:00
 */
public class NIOServer {

    public static void main(String[] args) throws IOException {
        // 打开服务通道
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        // 绑定端口号
        serverSocketChannel.bind(new InetSocketAddress(8888));
        // 设置为非阻塞
        serverSocketChannel.configureBlocking(false);
        // 创建hygiene选择器
        Selector selector = Selector.open();
        // 将服务端通道注册到选择器上,并指定注册监听的事件为OP_ACCEPT
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
        System.out.println("服务器启动成功");
        while(true) {
            if(selector.select(2000) == 0) {
                continue;
            }

            Set<SelectionKey> selectionKeys = selector.selectedKeys();
            Iterator<SelectionKey> iterator = selectionKeys.iterator();
            while(iterator.hasNext()) {
                SelectionKey selectionKey = iterator.next();
                if (selectionKey.isAcceptable()) {
                    SocketChannel socketChannel = serverSocketChannel.accept();
                    System.out.println("客户端已连接......" + socketChannel);
                    // 必须设置通道为非阻塞, 因为selector需要轮询监听每个通道的事件
                    socketChannel.configureBlocking(false);
                    // 并指定监听事件为OP_READ
                    socketChannel.register(selector, SelectionKey.OP_READ);
                }
                if (selectionKey.isReadable()) {
                    SocketChannel socketChannel = (SocketChannel)selectionKey.channel();
                    System.out.println("客户端读已就绪......" + socketChannel);
                    ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
                    int read = socketChannel.read(byteBuffer);
                    if (read > 0) {
                        System.out.println("客户端消息:" + new String(byteBuffer.array(), 0, read, StandardCharsets.UTF_8));
                        // 给客户端回写数据
                        socketChannel.write(ByteBuffer.wrap("没 钱".getBytes(StandardCharsets.UTF_8))); socketChannel.close();
                    }
                }
                // 从集合中删除对应的事件, 因为防止二次处理.
                iterator.remove();
            }
        }

    }

}

3. Netty核心原理

3.1 Netty 介绍

3.1.1 原生 NIO 存在的问题

  1. NIO 的类库和 API 繁杂,使用麻烦:需要熟练掌握 Selector、ServerSocketChannel、SocketChannel、ByteBuffer等。
  2. 需要具备其他的额外技能:要熟悉 Java 多线程编程,因为 NIO 编程涉及到 Reactor 模式,你必须对多线程和网络编程非常熟悉,才能编写出高质量的 NIO 程序。
  3. 开发工作量和难度都非常大:例如客户端面临断连重连、网络闪断、半包读写、失败缓存、网络拥塞和异常流的处理等等。
  4. JDK NIO 的 Bug:臭名昭著的 Epoll Bug,它会导致 Selector 空轮询,最终导致 CPU 100%。直到JDK 1.7版本该问题仍旧存在,没有被根本解决

在NIO中通过Selector的轮询当前是否有IO事件,根据JDK NIO api描述,Selector的select方法会一直阻塞,直到IO事件达到或超时,但是在Linux平台上这里有时会出现问题,在某些场景下select方法会直接返回,即使没有超时并且也没有IO事件到达,这就是著名的epoll bug,这是一个比较严重的bug,它会导致线程陷入死循环,会让CPU飙到100%,极大地影响系统的可靠性,到目前为止,JDK都没有完全解决这个问题。

3.1.2 概述

Netty 是由 JBOSS 提供的一个 Java 开源框架。Netty 提供异步的、基于事件驱动的网络应用程序框架,用以快速开发高性能、高可靠性的网络 IO 程序。 Netty 是一个基于 NIO 的网络编程框架,使用Netty 可以帮助你快速、简单的开发出一 个网络应用,相当于简化和流程化了 NIO 的开发过程。 作为当前最流行的 NIO 框架,Netty 在互联网领域、大数据分布式计算领域、游戏行业、 通信行业等获得了广泛的应用,知名的 Elasticsearch 、Dubbo 框架内部都采用了 Netty。

image.png

从图中就能看出 Netty 的强大之处:零拷贝、可拓展事件模型;支持 TCP、UDP、HTTP、WebSocket 等协议;提供安全传输、压缩、大文件传输、编解码支持等等。

具备如下优点:

  1. 设计优雅,提供阻塞和非阻塞的 Socket;提供灵活可拓展的事件模型;提供高度可定制的线程模型。
  2. 具备更高的性能和更大的吞吐量,使用零拷贝技术最小化不必要的内存复制,减少资源的消耗。
  3. 提供安全传输特性。
  4. 支持多种主流协议;预置多种编解码功能,支持用户开发私有协议。

3.2 线程模型

3.2.1 线程模型基本介绍

不同的线程模式,对程序的性能有很大影响,在学习Netty线程模式之前,首先讲解下 各个线程模式, 最后看看 Netty 线程模型有什么优越性.目前存在的线程模型有:

  • 传统阻塞 I/O 服务模型
  • Reactor 模型
    根据 Reactor 的数量和处理资源池线程的数量不同,有 3 种典型的实现
    • 单 Reactor 单线程
    • 单 Reactor 多线程
    • 主从 Reactor 多线程

3.2.2 传统阻塞 I/O 服务模型

采用阻塞 IO 模式获取输入的数据, 每个连接都需要独立的线程完成数据的输入 , 业务处理和数据返回工作。

image.png

示例代码
服务端

package com.socket.demo;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ServerDemo {

    public static void main(String[] args) throws IOException {
        // 线程池:核心线程数10,最大线程数2000(等待队列用完时使用),线程保持活跃时间,活跃时间单位, 等待队列(LinkedBlockingQueue 无上限)
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(10, 2000,  3000, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
        // 创建服务端监听
        ServerSocket serverSocket = new ServerSocket(8888);
        System.out.println("服务器已经启动");
        // 持续监听
        while(true) {
            // 阻塞式监听一直到有客户端访问
            Socket socket = serverSocket.accept();
            System.out.println("有客户端连接");
            // 使用线程池的方式异步处理socket通信,避免阻塞其他客户端的访问
            threadPoolExecutor.execute(new Runnable() {
                @Override
                public void run() {
                    handler(socket);
                }
            });
        }
    }

    /**
     * 处理套接字
     * @param socket
     */
    private static void handler(Socket socket) {
        try {
            doHandler(socket);
        } catch (Exception e){
            System.out.println("处理套接字出现异常,异常消息" + e);
        } finally {
            try {
                socket.close();
            } catch (IOException e) {
                System.out.println("socket 关闭出现异常,异常消息" + e);
                e.printStackTrace();
            }
        }
    }

    private static void doHandler(Socket socket) throws IOException {
        InputStream inputStream = socket.getInputStream();
        // 按字节读取
        byte [] b = new byte[1024];
        int len;
        StringBuffer buffer = new StringBuffer();
        do {
            len = inputStream.read(b);
            buffer.append(new String(b, 0, len));
        } while (len > b.length);
        System.out.println("客户端:" + buffer.toString());
        // 服务端给予回复
        OutputStream outputStream = socket.getOutputStream();
        outputStream.write("已读不回".getBytes());
    }

}

存在问题:

  1. 当并发数很大,就会创建大量的线程,占用很大系统资源
  2. 连接创建后,如果当前线程暂时没有数据可读,该线程会阻塞在 read 操作,造成线程资源浪费

3.2.3 Reactor 模型

Reactor 模式,通过一个或多个输入同时传递给服务处理器的模式 , 服务器端程序处理传入的多个请求,并将它们同步分派到相应的处理线程, 因此 Reactor 模式也叫 Dispatcher模式. Reactor 模式使用IO 复用监听事件, 收到事件后,分发给某个线程(进程), 这点就是网络服务器高并发处理关键。

  1. 单 Reactor 单线程


    image.png
  • Selector是可以实现应用程序通过一个阻塞对象监听多路连接请求
  • Reactor 对象通过 Selector监控客户端请求事件,收到事件后通过 Dispatch 进行分发
  • 是建立连接请求事件,则由 Acceptor 通过 Accept 处理连接请求,然后创建一个 Handler 对象处理连接完成后的后续业务处理
  • Handler 会完成 Read→业务处理→Send 的完整业务流程

示例代码
Reactor

package nio3;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

class Reactor implements Runnable {
    final Selector selector;
    final ServerSocketChannel serverSocket;

    Reactor(int port) throws IOException { //Reactor初始化
        selector = Selector.open();
        serverSocket = ServerSocketChannel.open();
        serverSocket.socket().bind(new InetSocketAddress(port));
        //非阻塞
        serverSocket.configureBlocking(false);
        //分步处理,第一步,接收accept事件
        SelectionKey sk = serverSocket.register(selector, SelectionKey.OP_ACCEPT);
        //attach callback object, Acceptor
        sk.attach(new Acceptor());
    }

    @Override
    public void run() {
        try {
            while (!Thread.interrupted()) {
                selector.select();
                Set selected = selector.selectedKeys();
                Iterator it = selected.iterator();
                while (it.hasNext()) {
                    //Reactor负责dispatch收到的事件
                    dispatch((SelectionKey)(it.next()));
                }
                selected.clear();
            }
        } catch (IOException ex) { /* ... */ }
    }

    void dispatch(SelectionKey k) {
        Runnable r = (Runnable)(k.attachment());
        //调用之前注册的callback对象
        if (r != null) {
            r.run();
        }
    }

    // inner class
    class Acceptor implements Runnable {
        @Override
        public void run() {
            try {
                SocketChannel channel = serverSocket.accept();
                if (channel != null) {
                    new Handler(selector, channel);
                }
            } catch (IOException ex) { /* ... */ }
        }
    }
}

Handler

package nio3;


import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;

class Handler implements Runnable {
    final SocketChannel channel;
    final SelectionKey sk;
    ByteBuffer input = ByteBuffer.allocate(SystemConfig.INPUT_SIZE);
    ByteBuffer output = ByteBuffer.allocate(SystemConfig.SEND_SIZE);
    static final int READING = 0, SENDING = 1;
    int state = READING;

    Handler(Selector selector, SocketChannel c) throws IOException {
        channel = c;
        c.configureBlocking(false);
        // Optionally try first read now
        sk = channel.register(selector, 0);
        //将Handler作为callback对象
        sk.attach(this);
        //第二步,注册Read就绪事件
        sk.interestOps(SelectionKey.OP_READ);
        selector.wakeup();
    }

    boolean inputIsComplete() {
        /* ... */
        return false;
    }

    boolean outputIsComplete() {

        /* ... */
        return false;
    }

    void process() {
        /* ... */
        return;
    }

    @Override
    public void run() {
        try {
            if (state == READING) {
                read();
            } else if (state == SENDING) {
                send();
            }
        } catch (IOException ex) { /* ... */ }
    }

    void read() throws IOException {
        channel.read(input);
        if (inputIsComplete()) {

            process();

            state = SENDING;
            // Normally also do first write now

            //第三步,接收write就绪事件
            sk.interestOps(SelectionKey.OP_WRITE);
        }
    }

    void send() throws IOException {
        channel.write(output);

        //write完就结束了, 关闭select key
        if (outputIsComplete()) {
            sk.cancel();
        }
    }

}

优点:
模型简单,没有多线程、进程通信、竞争的问题,全部都在一个线程中完成

缺点:

  1. 性能问题: 只有一个线程,无法完全发挥多核 CPU 的性能。Handler 在处理某个连接上的业务时,整个进程无法处理其他连接事件,很容易导致性能瓶颈
  2. 可靠性问题: 线程意外终止或者进入死循环,会导致整个系统通信模块不可用,不能接收和处理外部消息,造成节点故障
  1. 单 Reactor多线程


    image.png
  • Reactor 对象通过 selector 监控客户端请求事件, 收到事件后,通过 dispatch 进行分发
  • 如果建立连接请求, 则右 Acceptor 通过accept 处理连接请求
  • 如果不是连接请求,则由 reactor 分发调用连接对应的 handler 来处理
  • handler 只负责响应事件,不做具体的业务处理, 通过 read 读取数据后,会分发给后面的
  • worker 线程池的某个线程处理业务
  • worker 线程池会分配独立线程完成真正的业务,并将结果返回给 handler
  • handler 收到响应后,通过 send 将结果返回给 client

示例代码

package nio4;


import nio3.SystemConfig;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.concurrent.*;

class MthreadHandler implements Runnable {
    final SocketChannel channel;
    final SelectionKey selectionKey;
    ByteBuffer input = ByteBuffer.allocate(SystemConfig.INPUT_SIZE);
    ByteBuffer output = ByteBuffer.allocate(SystemConfig.SEND_SIZE);
    static final int READING = 0, SENDING = 1;
    int state = READING;


    ExecutorService pool = new ThreadPoolExecutor(2, 2, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
    static final int PROCESSING = 3;

    MthreadHandler(Selector selector, SocketChannel c) throws IOException {
        channel = c;
        c.configureBlocking(false);
        // Optionally try first read now
        selectionKey = channel.register(selector, 0);
        //将Handler作为callback对象
        selectionKey.attach(this);
        //第二步,注册Read就绪事件
        selectionKey.interestOps(SelectionKey.OP_READ);
        selector.wakeup();
    }

    boolean inputIsComplete() {
        /* ... */
        return false;
    }

    boolean outputIsComplete() {

        /* ... */
        return false;
    }

    void process() {
        /* ... */
        return;
    }

    @Override
    public void run() {
        try {
            if (state == READING) {
                read();
            } else if (state == SENDING) {
                send();
            }
        } catch (IOException ex) { /* ... */ }
    }


    synchronized void read() throws IOException {
        // ...
        channel.read(input);
        if (inputIsComplete()) {
            state = PROCESSING;
            //使用线程pool异步执行
            pool.execute(new Processer());
        }
    }

    void send() throws IOException {
        channel.write(output);

        //write完就结束了, 关闭select key
        if (outputIsComplete()) {
            selectionKey.cancel();
        }
    }

    synchronized void processAndHandOff() {
        process();
        state = SENDING;
        // or rebind attachment
        //process完,开始等待write就绪
        selectionKey.interestOps(SelectionKey.OP_WRITE);
    }

    class Processer implements Runnable {
        public void run() {
            processAndHandOff();
        }

    }

}

优点:
可以充分的利用多核 cpu 的处理能力
缺点:
多线程数据共享和访问比较复杂, reactor 处理所有的事件的监听和响应,在单线程运行, 在高并发场景容易出现性能瓶颈

  1. 主从 Reactor 多线程


    image.png
  • Reactor 主线程 MainReactor 对象通过 select 监听客户端连接事件,收到事件后,通过Acceptor 处理客户端连接事件
  • 当 Acceptor 处理完客户端连接事件之后(与客户端建立好 Socket 连接),MainReactor 将连接分配给 SubReactor。(即:MainReactor 只负责监听客户端连接请求,和客户端建立连接之后将连接交由 SubReactor 监听后面的 IO 事件。)
  • SubReactor 将连接加入到自己的连接队列进行监听,并创建 Handler 对各种事件进行处理
  • 当连接上有新事件发生的时候,SubReactor 就会调用对应的 Handler 处理
  • Handler 通过 read 从连接上读取请求数据,将请求数据分发给 Worker 线程池进行业务处理
  • Worker 线程池会分配独立线程来完成真正的业务处理,并将处理结果返回给 Handler。
  • Handler 通过 send 向客户端发送响应数据
  • 一个 MainReactor 可以对应多个 SubReactor,即一个 MainReactor 线程可以对应多个SubReactor 线程

示例代码

package nio4;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

class MthreadReactor implements Runnable {

    //subReactors集合, 一个selector代表一个subReactor
    Selector[] selectors = new Selector[2];
    int next = 0;
    final ServerSocketChannel serverSocket;

    MthreadReactor(int port) throws IOException { //Reactor初始化
        selectors[0] = Selector.open();
        selectors[1] = Selector.open();
        serverSocket = ServerSocketChannel.open();
        serverSocket.socket().bind(new InetSocketAddress(port));
        //非阻塞
        serverSocket.configureBlocking(false);
        //分步处理,第一步,接收accept事件
        SelectionKey sk = serverSocket.register(selectors[0], SelectionKey.OP_ACCEPT);
        //attach callback object, Acceptor
        sk.attach(new Acceptor());
    }

    @Override
    public void run() {
        try {
            while (!Thread.interrupted()) {
                for (int i = 0; i < 2; i++) {
                    selectors[i].select();
                    Set selected = selectors[i].selectedKeys();
                    Iterator it = selected.iterator();
                    while (it.hasNext()) {
                        //Reactor负责dispatch收到的事件
                        dispatch((SelectionKey)(it.next()));
                    }
                    selected.clear();
                }

            }
        } catch (IOException ex) { /* ... */ }
    }

    void dispatch(SelectionKey k) {
        Runnable r = (Runnable)(k.attachment());
        //调用之前注册的callback对象
        if (r != null) {
            r.run();
        }
    }


    class Acceptor implements Runnable { // ...
        @Override
        public synchronized void run() {
            try {
                SocketChannel connection = serverSocket.accept(); //主selector负责accept
                if (connection != null) {
                    new MthreadHandler(selectors[1], connection); //选个subReactor去负责接收到的connection
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

    }

}

优点:

  1. MainReactor 线程与 SubReactor 线程的数据交互简单职责明确,MainReactor 线程只需要
    接收新连接,SubReactor 线程完成后续的业务处理
  2. MainReactor 线程与 SubReactor 线程的数据交互简单, MainReactor 线程只需要把新连接
    传给 SubReactor 线程,SubReactor 线程无需返回数据
  3. 多个 SubReactor 线程能够应对更高的并发请求

缺点:
这种模式的缺点是编程复杂度较高。但是由于其优点明显,在许多项目中被广泛使用,包括Nginx、Memcached、Netty 等。这种模式也被叫做服务器的 1+M+N 线程模式,即使用该模式开发的服务器包含一个(或多个,1 只是表示相对较少)连接建立线程+M 个 IO 线程+N 个业务处理线程。这是业界成熟的服务器程序设计模式。

3.2.4 Netty线程模型

Netty 的设计主要基于主从 Reactor 多线程模式,并做了一定的改进。

  1. 简单版Netty模型


    image.png
  • BossGroup 线程维护 Selector,ServerSocketChannel 注册到这个 Selector 上,只关注连接建立请求事件(主 Reactor)
  • 当接收到来自客户端的连接建立请求事件的时候,通过 ServerSocketChannel.accept 方法获得对应的 SocketChannel,并封装成 NioSocketChannel 注册到 WorkerGroup 线程中的
  • Selector,每个 Selector 运行在一个线程中(从 Reactor) 当 WorkerGroup 线程中的 Selector 监听到自己感兴趣的 IO 事件后,就调用 Handler 进行
    处理
  1. 进阶版Netty模型


    image.png
  • 有两组线程池:BossGroup 和 WorkerGroup,BossGroup 中的线程专门负责和客户端建立连接,WorkerGroup 中的线程专门负责处理连接上的读写
  • BossGroup 和 WorkerGroup 含有多个不断循环的执行事件处理的线程,每个线程都包含一个 Selector,用于监听注册在其上的 Channel
  • 每个 BossGroup 中的线程循环执行以下三个步骤
    • 轮训注册在其上的 ServerSocketChannel 的 accept 事件(OP_ACCEPT 事件)
    • 处理 accept 事件,与客户端建立连接,生成一个 NioSocketChannel,并将其注册到WorkerGroup 中某个线程上的 Selector 上
    • 再去以此循环处理任务队列中的下一个事件
  • 每个 WorkerGroup 中的线程循环执行以下三个步骤
    • 轮训注册在其上的 NioSocketChannel 的 read/write 事件(OP_READ/OP_WRITE 事件)
    • 在对应的 NioSocketChannel 上处理 read/write 事件
    • 再去以此循环处理任务队列中的下一个事件
  1. 详细版Netty模型


    image.png
  • Netty 抽象出两组线程池:BossGroup 和 WorkerGroup,也可以叫做
    BossNioEventLoopGroup 和 WorkerNioEventLoopGroup。每个线程池中都有NioEventLoop 线程。BossGroup 中的线程专门负责和客户端建立连接,WorkerGroup 中的线程专门负责处理连接上的读写。BossGroup 和 WorkerGroup 的类型都是NioEventLoopGroup
  • NioEventLoopGroup 相当于一个事件循环组,这个组中含有多个事件循环,每个事件循环就是一个 NioEventLoop
  • NioEventLoop 表示一个不断循环的执行事件处理的线程,每个 NioEventLoop 都包含一个Selector,用于监听注册在其上的 Socket 网络连接(Channel)
  • NioEventLoopGroup 可以含有多个线程,即可以含有多个 NioEventLoop
  • 每个 BossNioEventLoop 中循环执行以下三个步骤
    • select:轮训注册在其上的 ServerSocketChannel 的 accept 事件(OP_ACCEPT 事件)
    • processSelectedKeys:处理 accept 事件,与客户端建立连接,生成一个
      NioSocketChannel,并将其注册到某个 WorkerNioEventLoop 上的 Selector 上
    • runAllTasks:再去以此循环处理任务队列中的其他任务
  • 每个 WorkerNioEventLoop 中循环执行以下三个步骤
    • select:轮训注册在其上的 NioSocketChannel 的 read/write 事件
      (OP_READ/OP_WRITE 事件)
    • processSelectedKeys:在对应的 NioSocketChannel 上处理 read/write 事件
    • runAllTasks:再去以此循环处理任务队列中的其他任务
  • 在以上两个processSelectedKeys步骤中,会使用 Pipeline(管道),Pipeline 中引用了Channel,即通过 Pipeline 可以获取到对应的 Channel,Pipeline 中维护了很多的处理器(拦截处理器、过滤处理器、自定义处理器等)。

3.3 核心API介绍

3.3.1 ChannelHandler及其实现类

ChannelHandler 接口定义了许多事件处理的方法,我们可以通过重写这些方法去实现具 体的业务逻辑。API 关系如下图所示


image.png

Netty开发中需要自定义一个 Handler 类去实现 ChannelOutboundHandle或ChannelInboundHandle接口或其子接口或其实现类,然后通过重写相应方法实现业务逻辑,我们接下来看看一般都需要重写哪些方法

  • public void channelActive(ChannelHandlerContext ctx),通道就绪事件
  • public void channelRead(ChannelHandlerContext ctx, Object msg),通道读取数据事件
  • public void channelReadComplete(ChannelHandlerContext ctx) ,数据读取完毕事件
  • public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause),通道发生异常事件

3.3.2 ChannelPipeline

ChannelPipeline 是一个 Handler 的集合,它负责处理和拦截 inbound 或者 outbound 的事件和操作,相当于一个贯穿 Netty 的责任链


image.png

如果客户端和服务器的Handler是一样的,消息从客户端到服务端或者反过来,每个Inbound类型或Outbound类型的Handler只会经过一次,混合类型的Handler(实现了Inbound和Outbound的Handler)会经过两次。准确的说ChannelPipeline中是一个ChannelHandlerContext,每个上下文对象中有ChannelHandler. InboundHandler是按照Pipleline的加载顺序的顺序执行, OutboundHandler是按照Pipeline的加载顺序,逆序执行

3.3.3 ChannelHandlerContext

这 是 事 件 处 理 器 上 下 文 对 象 , Pipeline 链 中 的 实 际 处 理 节 点 。 每 个 处 理 节 点ChannelHandlerContext 中 包 含 一 个 具 体 的 事 件 处 理 器 ChannelHandler ,同时ChannelHandlerContext 中也绑定了对应的 ChannelPipeline和 Channel 的信息,方便对ChannelHandler 进行调用。常用方法如下所示:

  • ChannelFuture close(),关闭通道
  • ChannelOutboundInvoker flush(),刷新
  • ChannelFuture writeAndFlush(Object msg) , 将 数 据 写 到 ChannelPipeline 中 当 前
  • ChannelHandler 的下一个 ChannelHandler 开始处理(出站)

3.3.4 ChannelOption

Netty 在创建 Channel 实例后,一般都需要设置 ChannelOption 参数。ChannelOption 是 Socket 的标准参数,而非 Netty 独创的。常用的参数配置有:

  • ChannelOption.SO_BACKLOG
    对应 TCP/IP 协议 listen 函数中的 backlog 参数,用来初始化服务器可连接队列大小。服务端处理客户端连接请求是顺序处理的,所以同一时间只能处理一个客户端连接。多个客户 端来的时候,服务端将不能处理的客户端连接请求放在队列中等待处理,backlog 参数指定 了队列的大小。
  • ChannelOption.SO_KEEPALIVE ,一直保持连接活动状态。该参数用于设置TCP连接,当设置该选项以后,连接会测试链接的状态,这个选项用于可能长时间没有数据交流的连接。当设置该选项以后,如果在两小时内没有数据的通信时,TCP会自动发送一个活动探测数据报文。

3.3.5 ChannelFuture

表示 Channel 中异步 I/O 操作的结果,在 Netty 中所有的 I/O 操作都是异步的,I/O 的调用会直接返
回,调用者并不能立刻获得结果,但是可以通过 ChannelFuture 来获取 I/O 操作 的处理状态。
常用方法如下所示:

  • Channel channel(),返回当前正在进行 IO 操作的通道
  • ChannelFuture sync(),等待异步操作执行完毕,将异步改为同步

3.3.6 EventLoopGroup和实现类NioEventLoopGroup

EventLoopGroup 是一组 EventLoop 的抽象,Netty 为了更好的利用多核 CPU 资源,一般 会有多个EventLoop 同时工作,每个 EventLoop 维护着一个 Selector 实例。

EventLoopGroup 提供 next 接口,可以从组里面按照一定规则获取其中一个 EventLoop 来处理任务。在 Netty 服务器端编程中,我们一般都需要提供两个 EventLoopGroup,例如:

BossEventLoopGroup 和 WorkerEventLoopGroup。 通常一个服务端口即一个 ServerSocketChannel对应一个Selector 和一个EventLoop线程。 BossEventLoop 负责接收客户端的连接并将SocketChannel 交给 WorkerEventLoopGroup 来进 行 IO 处理,如下图所示:


image.png

BossEventLoopGroup 通常是一个单线程的 EventLoop,EventLoop 维护着一个注册了ServerSocketChannel 的 Selector 实例,BossEventLoop 不断轮询 Selector 将连接事件分离出来, 通常是 OP_ACCEPT 事件,然后将接收到的 SocketChannel 交给 WorkerEventLoopGroup,WorkerEventLoopGroup 会由 next 选择其中一个 EventLoopGroup 来将这个 SocketChannel 注册到其维护的 Selector 并对其后续的 IO 事件进行处理。

一般情况下我们都是用实现类NioEventLoopGroup.
常用方法如下所示:

  • public NioEventLoopGroup(),构造方法,创建线程组
  • public Future<?> shutdownGracefully(),断开连接,关闭线程

3.3.7 ServerBootstrap和Bootstrap

ServerBootstrap 是 Netty 中的服务器端启动助手,通过它可以完成服务器端的各种配置;Bootstrap 是 Netty 中的客户端启动助手,通过它可以完成客户端的各种配置。常用方法如下 所示:

  • public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroupchildGroup), 该方法用于服务器端,用来设置两个 EventLoop
  • public B group(EventLoopGroup group) ,该方法用于客户端,用来设置一个 EventLoop
  • public B channel(Class<? extends C> channelClass),该方法用来设置一个服务器端的通道 实现
  • public B option(ChannelOption option, T value),用来给 ServerChannel 添加配置
  • public ServerBootstrap childOption(ChannelOption childOption, T value),用来给接收到的通道添加配置
  • public ServerBootstrap childHandler(ChannelHandler childHandler),该方法用来设置业务 处理类(自定义的 handler)
  • public ChannelFuture bind(int inetPort) ,该方法用于服务器端,用来设置占用的端口号
  • public ChannelFuture connect(String inetHost, int inetPort) ,该方法用于客户端,用来连 接服务器端

3.3.8 Unpooled类

这是 Netty 提供的一个专门用来操作缓冲区的工具类,常用方法如下所示:

  • public static ByteBuf copiedBuffer(CharSequence string, Charset charset),通过给定的数据 和字符编码返回一个 ByteBuf 对象(类似于 NIO 中的 ByteBuffer 对象)

3.4 Netty入门案例

TimeServerHandler

package com.demo.time;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

/**
 * @author chenyi
 * @Description
 * @date 2021/12/28 15:34
 */
public class TimeServerHandler extends ChannelInboundHandlerAdapter {

    // (1) 如前所述,该channelActive()方法将在建立连接并准备好生成流量时调用。让我们在这个方法中写一个 32 位整数来表示当前时间
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        // (2)要发送新消息,我们需要分配一个新的缓冲区来包含该消息。
        // 我们将要写入一个 32 位整数,因此我们需要ByteBuf容量至少为 4 个字节的 a。
        // 获取当前ByteBufAllocator通孔ChannelHandlerContext.alloc()并分配一个新缓冲区。
        final ByteBuf time = ctx.alloc().buffer(4);
        time.writeInt((int)(System.currentTimeMillis()/ 1000L + 2208988800L));
        // (3)像往常一样,我们编写构造的消息。
        // 但是等等,翻转在哪里?java.nio.ByteBuffer.flip()之前我们不是在蔚来发消息之前先打电话吗?
        // ByteBuf没有这样的方法,因为它有两个指针;一个用于读操作,另一个用于写操作。
        // 当你写一些东西ByteBuf而读者索引不会改变时,作者索引会增加。读取器索引和写入器索引分别表示消息开始和结束的位置。
        //
        // 相比之下,NIO 缓冲区没有提供一种干净的方法来确定消息内容的开始和结束位置,而无需调用翻转方法。
        // 当您忘记翻转缓冲区时,您会遇到麻烦,因为不会发送任何数据或不正确的数据。
        // 这种错误在 Netty 中不会发生,因为我们针对不同的操作类型有不同的指针。
        // 当你习惯了它时,你会发现它会让你的生活变得更轻松——一种没有生气的生活!
        //
        // 另一点要注意的是ChannelHandlerContext.write()(and writeAndFlush()) 方法返回一个ChannelFuture.
        // AChannelFuture表示尚未发生的 I/O 操作。这意味着,任何请求的操作可能尚未执行,
        // 因为所有操作在 Netty 中都是异步的。例如,以下代码甚至可能在发送消息之前关闭连接:
        //
        // 因此,您需要在close()方法ChannelFuture返回的完成后调用该方法write(),并在写入操作完成时通知其侦听器。
        // 请注意,close()也可能不会立即关闭连接,而是返回一个ChannelFuture.
        ChannelFuture channelFuture = ctx.writeAndFlush(time);
        // (4) 那么当写请求完成时我们如何得到通知?这就像ChannelFutureListener在返回的ChannelFuture.
        // 在这里,我们创建了一个新的匿名函数ChannelFutureListener,它Channel在操作完成时关闭。
        channelFuture.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                assert channelFuture == future;
                ctx.close();
            }
        });
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }

}

TimeServer

package com.demo.time;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

/**
 * @author chenyi
 * @Description
 * @date 2021/12/28 14:23
 */
public class TimeServer {

    private int port;
    public TimeServer(int port) {
        this.port = port;
    }

    public void run() {
        // (1)NioEventLoopGroup是一个处理 I/O 操作的多线程事件循环。
        // Netty为不同类型的传输提供了EventLoopGroup各种实现。
        // 我们在这个例子中实现了一个服务器端应用程序,因此NioEventLoopGroup将使用两个。
        // 第一个,通常称为“boss”,接受传入连接。
        // 第二个,通常称为“worker”,一旦boss接受连接并将接受的连接注册到worker,就会处理已接受连接的流量。
        // 使用了多少线程以及它们如何映射到创建的Channels 取决于EventLoopGroup实现,甚至可以通过构造函数进行配置。
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
           // (2)ServerBootstrap是一个设置服务器的辅助类。
            // 您可以直接使用Channel设置服务器。但是,请注意,这是一个乏味的过程,在大多数情况下您不需要这样做。
           ServerBootstrap serverBootstrap = new ServerBootstrap();
           serverBootstrap.group(bossGroup, workerGroup)
                          // (3)在这里,我们指定使用NioServerSocketChannel用于实例化新的Channel类以接受传入连接。
                          .channel(NioServerSocketChannel.class)
                          // (4)此处指定的处理程序将始终由新接受的Channel.
                          // 这ChannelInitializer是一个特殊的处理程序,旨在帮助用户配置新的Channel.
                          // 您很可能希望通过添加一些处理程序来配置ChannelPipeline新的Channel,
                          // 例如DiscardServerHandler实现您的网络应用程序。
                          // 随着应用程序变得复杂,您可能会向管道添加更多处理程序,并最终将此匿名类提取到顶级类中。
                          .childHandler(new ChannelInitializer<SocketChannel>() {
                              @Override
                              protected void initChannel(SocketChannel socketChannel) throws Exception {
                                  socketChannel.pipeline().addLast(new TimeServerHandler());
                              }
                          })
                          // (5)您还可以设置特定于Channel实现的参数。
                          // 我们正在编写一个 TCP/IP 服务器,因此我们可以设置套接字选项,例如tcpNoDelay和keepAlive。
                          // 请参阅 apidocsChannelOption和具体ChannelConfig实现以获取有关支持的ChannelOptions的概述。
                          .option(ChannelOption.SO_BACKLOG, 128)
                          // (6)你有没有注意到option()和childOption()?
                          // option()用于NioServerSocketChannel接受传入连接的 。
                          // childOption()是为Channelparent 接受的 ServerChannel,NioSocketChannel在这种情况下。
                          .childOption(ChannelOption.SO_KEEPALIVE, true);

            // 我们现在准备好了。剩下的就是绑定到端口并启动服务器。
            // 在这里,我们绑定到机器8080中所有网卡(网络接口卡)的端口。您现在可以bind()根据需要多次调用该方法(使用不同的绑定地址。)
            // Bind and start to accept incoming connections.
            ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
            System.out.println("服务器启动成功");
            // Wait until the server socket is closed.
            // In this example, this does not happen, but you can do that to gracefully
            // shut down your server.
            channelFuture.channel().closeFuture().sync();
        } catch (Exception e){
            e.printStackTrace();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws Exception {
        int port = 8888;
        if (args.length > 0) {
            port = Integer.parseInt(args[0]);
        }

        new TimeServer(port).run();
    }

}

TimeClientHandler

package com.demo.time;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

import java.util.Date;

public class TimeClientHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        // (1) 在 TCP/IP 中,Netty 将从对等方发送的数据读取到ByteBuf.
        ByteBuf m = (ByteBuf) msg;
        try {
            long currentTimeMillis = (m.readUnsignedInt() - 2208988800L) * 1000L;
            System.out.println(new Date(currentTimeMillis));
            ctx.close();
        } finally {
            m.release();
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}

TimeClient

package com.demo.time;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

public class TimeClient {
    public static void main(String[] args) throws Exception {
        String host = "127.0.01";
        int port = 8888;
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        
        try {
            // (1) Bootstrap类似于,ServerBootstrap除了它用于非服务器通道,例如客户端或无连接通道。
            Bootstrap b = new Bootstrap();
            // (2) 如果仅指定一个EventLoopGroup,它将同时用作老板组和工人组。但是,老板工人不用于客户端
            b.group(workerGroup);
            // (3) 而不是NioServerSocketChannel,NioSocketChannel用于创建客户端Channel.
            b.channel(NioSocketChannel.class);
            // (4) 请注意,我们childOption()在此处不使用与 ServerBootstrap 不同的方法,
            // 因为客户端SocketChannel没有父级。
            b.option(ChannelOption.SO_KEEPALIVE, true);
            b.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(new TimeClientHandler());
                }
            });

            // 我们应该调用connect()方法而不是bind()方法。
            // Start the client.
            ChannelFuture f = b.connect(host, port).sync(); // (5)

            // Wait until the connection is closed.
            f.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
        }
    }
}

3.5 Netty异步模型

3.5.1 基本介绍

异步的概念和同步相对。当一个异步过程调用发出后,调用者不能立刻得到结果。实际处理这个调用的组件在完成后,通过状态、通知和回调来通知调用者。


image.png

Netty 中的 I/O 操作是异步的,包括 Bind、Write、Connect 等操作会简单的返回一个ChannelFuture。调用者并不能立刻获得结果,而是通过 Future-Listener 机制,用户可以方便的主动获取或者通过通知机制获得IO 操作结果. Netty 的异步模型是建立在 future 和 callback 的之上的。
callback 就是回调。重点说 Future,它的核心思想是:假设一个方法 fun,计算过程可能非常耗时,等待 fun 返回显然不合适。那么可以在调用 fun 的时候,立马返回一个 Future,后续可以通过 Future 去监控方法 fun 的处理过程(即 : Future-Listener 机制)

3.5.2 Future 和Future-Listener

  1. Future
    表示异步的执行结果, 可以通过它提供的方法来检测执行是否完成,ChannelFuture 是他的一个子接口. ChannelFuture 是一个接口 ,可以添加监听器,当监听的事件发生时,就会通知到监听器当 Future 对象刚刚创建时,处于非完成状态,调用者可以通过返回的 ChannelFuture 来获取操作执行的状态, 注册监听函数来执行完成后的操作。

常用方法有:

  • sync 方法, 阻塞等待程序结果反回
  • isDone 方法来判断当前操作是否完成;
  • isSuccess 方法来判断已完成的当前操作是否成功;
  • getCause 方法来获取已完成的当前操作失败的原因;
  • isCancelled 方法来判断已完成的当前操作是否被取消;
  • addListener 方法来注册监听器,当操作已完成(isDone 方法返回完成),将会通知指定的监听器;如果Future 对象已完成,则通知指定的监听器
  1. Future-Listener 机制
    给Future添加监听器,监听操作结果
            ChannelFuture channelFuture = serverBootstrap.bind(port);
            channelFuture.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    if (future.isSuccess()) {
                        System.out.println("端口绑定成功!");
                    } else {
                        System.out.println("端口绑定失败!");
                    }
                }
            });

4. Netty高级应用

4.1 Netty编解码器

4.1.1 Java的编解码

  1. 编码(Encode)称为序列化, 它将对象序列化为字节数组,用于网络传输、数据持久化或者其它用途。
  2. 解码(Decode)称为反序列化,它把从网络、磁盘等读取的字节数组还原成原始对象(通常是原始对象的拷贝),以方便后续的业务逻辑操作。


    image.png

    java序列化对象只需要实现java.io.Serializable接口并生成序列化ID,这个类就能够通过java.io.ObjectInput和java.io.ObjectOutput序列化和反序列化。
    Java序列化目的:1.网络传输。2.对象持久化。
    Java序列化缺点:1.无法跨语言。 2.序列化后码流太大。3.序列化性能太低。
    Java序列化仅仅是Java编解码技术的一种,由于它的种种缺陷,衍生出了多种编解码技术和框架,这些编解码框架实现消息的高效序列化。

4.1.2 Netty编解码器

  1. 概念
    在网络应用中需要实现某种编解码器,将原始字节数据与自定义的消息对象进行互相转换。网络中都是以字节码的数据形式来传输数据的,服务器编码数据后发送到客户端,客户端需要对数据进行解码。
    对于Netty而言,编解码器由两部分组成:编码器、解码器。
  • 解码器:负责将消息从字节或其他序列形式转成指定的消息对象。
  • 编码器:将消息对象转成字节或其他序列形式在网络上传输。
    Netty 的编(解)码器实现了 ChannelHandlerAdapter,也是一种特殊的 ChannelHandler,所以依赖于 ChannelPipeline,可以将多个编(解)码器链接在一起,以实现复杂的转换逻辑。

Netty里面的编解码: 解码器:负责处理“入站 InboundHandler”数据。 编码器:负责“出站OutboundHandler” 数据。

  1. 解码器(Decoder)
    解码器负责 解码“入站”数据从一种格式到另一种格式,解码器处理入站数据是抽象ChannelInboundHandler的实现。需要将解码器放在ChannelPipeline中。对于解码器,Netty中主要提供了抽象基类ByteToMessageDecoder和MessageToMessageDecoder


    image.png
  • ByteToMessageDecoder: 用于将字节转为消息,需要检查缓冲区是否有足够的字节
    ReplayingDecoder: 继承ByteToMessageDecoder,不需要检查缓冲区是否有足够的字节,但 是 ReplayingDecoder速度略慢于ByteToMessageDecoder,同时不是所有的ByteBuf都支持。
  • 项目复杂性高则使用ReplayingDecoder,否则使用ByteToMessageDecoder
  • MessageToMessageDecoder: 用于从一种消息解码为另外一种消息(例如POJO到POJO)

核心方法:

decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out)

代码实现:
解码器:

package com.lagou;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageDecoder;
import io.netty.util.CharsetUtil;

import java.util.List;

/*** 消息解码-可以将字符串消息进行在进行解码. 只有消息入站时才会进行解码 */
public class MessageDecoder extends MessageToMessageDecoder<ByteBuf> {
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        System.out.println("正在进行消息解码");
        out.add(in.toString(CharsetUtil.UTF_8));
    }
}

使用解码器

protected void initChannel(SocketChannel ch) throws Exception {
        //8. 向pipeline中添加自定义业务处理handler
        // 添加解码器
        ch.pipeline().addLast(new MessageDecoder());
        // 业务处理handler
        ch.pipeline().addLast(new NettyServerHandler()); }
    }
  1. 编码器(Encoder)
    与ByteToMessageDecoder和MessageToMessageDecoder相对应,Netty提供了对应的编码器实现MessageToByteEncoder和MessageToMessageEncoder,二者都实现ChannelOutboundHandler接口。


    image.png

抽象编码器

  • MessageToByteEncoder: 将消息转化成字节
  • MessageToMessageEncoder: 用于从一种消息编码为另外一种消息(例如POJO到POJO)

核心方法:

encode(ChannelHandlerContext ctx, String msg, List<Object> out)

代码实现:
编码器:

package com.lagou;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageEncoder;
import io.netty.util.CharsetUtil;

import java.util.List;

/*** 编码器 */
public class MessageEncoder extends MessageToMessageEncoder<String> {
    @Override
    protected void encode(ChannelHandlerContext ctx, String msg, List<Object> out) throws Exception {
        System.out.println("消息进行消息编码");
        out.add(Unpooled.copiedBuffer(msg, CharsetUtil.UTF_8));
    }

}

消息发送:

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ChannelFuture future = ctx.writeAndFlush("你好呀.我是Netty客户端");
        future.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if (future.isSuccess()) {
                    System.out.println("数据发送成功!");
                } else {
                    System.out.println("数据发送失败!");
                }
            }
        });
    }

使用编码器

protected void initChannel(SocketChannel ch) throws Exception {
        //8. 向pipeline中添加自定义业务处理handler
        // 添加编码器
        ch.pipeline().addLast(new MessageEncoder ());
        // 业务处理handler
        ch.pipeline().addLast(new NettyServerHandler()); }
    }

4. 编码解码器Codec
编码解码器: 同时具有编码与解码功能,特点同时实现了ChannelInboundHandler和ChannelOutboundHandler接口,因此在数据输入和输出时都能进行处理。

image.png

Netty提供提供了一个ChannelDuplexHandler适配器类,编码解码器的抽象基类
ByteToMessageCodec ,MessageToMessageCodec都继承与此类.

代码实现:

package com.lagou;

import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageCodec;
import io.netty.util.CharsetUtil;

import java.util.List;

/**
 * @author chenyi
 * @Description
 * @date 2021/12/28 16:55
 */
public class MessageCoder extends MessageToMessageCodec {


    @Override
    protected void encode(ChannelHandlerContext channelHandlerContext, Object msg, List out) throws Exception {
        System.out.println("正在进行消息解码");
        String str = (String) msg;
        out.add(Unpooled.copiedBuffer(str, CharsetUtil.UTF_8));
    }
    @Override
    protected void decode(ChannelHandlerContext channelHandlerContext, Object msg, List out) throws Exception {
        System.out.println("消息进行消息编码");
        String str = (String) msg;
        out.add(Unpooled.copiedBuffer(str, CharsetUtil.UTF_8));
    }

}

使用编码解码器

protected void initChannel(SocketChannel ch) throws Exception {
        //8. 向pipeline中添加自定义业务处理handler
        // 添加编码解码器
        ch.pipeline().addLast(new MessageCoder());
        // 业务处理handler
        ch.pipeline().addLast(new NettyServerHandler()); }
    }

4.2 Netty案例-群聊天室

案例要求:

  1. 编写一个 Netty 群聊系统,实现服务器端和客户端之间的数据简单通讯
  2. 实现多人群聊
  3. 服务器端:可以监测用户上线,离线,并实现消息转发功能
  4. 客户端:可以发送消息给其它所有用户,同时可以接受其它用户发送的消息

4.2.1 聊天室服务端编写

  1. NettyChatServerHandler
package com.lagou.chart;

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

import java.util.ArrayList;
import java.util.List;

/**
 * @author chenyi
 * @Description
 * @date 2021/12/28 17:07
 */
public class NettyChatServerHandler extends SimpleChannelInboundHandler {

    public static List<Channel> channelList = new ArrayList<>();

    /*** 通道就绪事件 ** @param ctx * @throws Exception */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();
        //当有新的客户端连接的时候, 将通道放入集合
        channelList.add(channel);
        System.out.println("[Server]:" + channel.remoteAddress()
                                                .toString()
                                                .substring(1) + "在线.");
    }

    /*** 通道未就绪--channel下线 ** @param ctx * @throws Exception */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();
        //当有客户端断开连接的时候,就移除对应的通道
        channelList.remove(channel);
        System.out.println("[Server]:" + channel.remoteAddress()
                                                .toString()
                                                .substring(1) + "下线.");
    }

    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, Object msg) throws Exception {
        //当前发送消息的通道, 当前发送的客户端连接
        Channel channel = channelHandlerContext.channel();
        for (Channel channel1 : channelList) {
            //排除自身通道
            if (channel != channel1) {
                channel1.writeAndFlush("[" + channel.remoteAddress()
                                                    .toString()
                                                    .substring(1) + "]说:" + msg);
            }
        }
    }

    /*** 异常处理事件 ** @param ctx * @param cause * @throws Exception */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        Channel channel = ctx.channel();
        //移除集合
        channelList.remove(channel);
        System.out.println("[Server]:" + channel.remoteAddress().toString().substring(1) + "异常.");
    }

}

NettyChatServer

package com.lagou.chart;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

/**
 * @author chenyi
 * @Description 聊天室服务端
 * @date 2021/12/28 17:00
 */
public class NettyChatServer {

    private int port;
    public NettyChatServer(int port) {
        this.port = port;
    }

    public void run() {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup, workerGroup)
                           .channel(NioServerSocketChannel.class)
                           .childHandler(new ChannelInitializer<SocketChannel>() {
                               @Override
                               protected void initChannel(SocketChannel socketChannel) {
                                   socketChannel.pipeline().addLast(new StringDecoder());
                                   socketChannel.pipeline().addLast(new StringEncoder());
                                   socketChannel.pipeline().addLast(new NettyChatServerHandler());
                               }
                           })
                           .option(ChannelOption.SO_BACKLOG, 128)
                           .childOption(ChannelOption.SO_KEEPALIVE, true);

            ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
            System.out.println("服务器启动成功");
            channelFuture.channel().closeFuture().sync();
        } catch (Exception e){
            e.printStackTrace();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        new NettyChatServer(9998).run();
    }

}

NettyChatClientHandler

package com.lagou.chart;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

/**
 * @author chenyi
 * @Description
 * @date 2021/12/28 17:18
 */
public class NettyChatClientHandler extends SimpleChannelInboundHandler<String> {
    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, String s) throws Exception {
        System.out.println(s);
    }

}

NettyChatClient

package com.lagou.chart;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

import java.util.Scanner;

/**
 * @author chenyi
 * @Description 聊天室客户端
 * @date 2021/12/28 17:14
 */
public class NettyChatClient {

    private String ip;
    private int port;
    public NettyChatClient(String ip, int port) {
        this.ip = ip;
        this.port = port;
    }

    public void run() throws InterruptedException {
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(workerGroup);
            b.channel(NioSocketChannel.class);
            b.option(ChannelOption.SO_KEEPALIVE, true);
            b.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel socketChannel) throws Exception {
                    socketChannel.pipeline().addLast(new StringDecoder());
                    socketChannel.pipeline().addLast(new StringEncoder());
                    socketChannel.pipeline().addLast(new NettyChatClientHandler());
                }
            });
            ChannelFuture f = b.connect(ip, port).sync();
            sendMessage(f.channel());
            f.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
        }
    }

    private void sendMessage(Channel channel) {
        Scanner scanner = new Scanner(System.in);
        while (scanner.hasNextLine()) {
            String msg = scanner.nextLine();
            //向服务端发送消息
            channel.writeAndFlush(msg);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        new NettyChatClient("127.0.01", 9998).run();
    }

}

4.3 基于Netty的Http服务器开发

4.3.1 介绍

Netty的HTTP协议栈无论在性能还是可靠性上,都表现优异,非常适合在非Web容器的场景下应用,相比于传统的Tomcat、Jetty等Web容器,它更加轻量和小巧,灵活性和定制性也更好。


image.png

4.3.2 功能需求

  1. Netty 服务器在 8080 端口监听
  2. 浏览器发出请求 "http://localhost:8080/ "
  3. 服务器可以回复消息给客户端 "Hello! 我是Netty服务器 " ,并对特定请求资源进行过滤.

示例代码
HttpHelloWorldServerHandler

/*
 * Copyright 2013 The Netty Project
 *
 * The Netty Project licenses this file to you under the Apache License,
 * version 2.0 (the "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at:
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */
package com.lagou.http;

import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.*;

import static io.netty.handler.codec.http.HttpHeaderNames.*;
import static io.netty.handler.codec.http.HttpHeaderValues.*;
import static io.netty.handler.codec.http.HttpHeaderValues.KEEP_ALIVE;
import static io.netty.handler.codec.http.HttpResponseStatus.OK;

public class HttpHelloWorldServerHandler extends SimpleChannelInboundHandler<HttpObject> {
    private static final byte[] CONTENT = { 'H', 'e', 'l', 'l', 'o', ' ', 'W', 'o', 'r', 'l', 'd' };

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
        ctx.flush();
    }

    @Override
    public void channelRead0(ChannelHandlerContext ctx, HttpObject msg) {
        if (msg instanceof HttpRequest) {
            HttpRequest req = (HttpRequest) msg;

            boolean keepAlive = HttpUtil.isKeepAlive(req);
            FullHttpResponse response = new DefaultFullHttpResponse(req.protocolVersion(), OK,
                                                                    Unpooled.wrappedBuffer(CONTENT));
            response.headers()
                    .set(CONTENT_TYPE, TEXT_PLAIN)
                    .setInt(CONTENT_LENGTH, response.content().readableBytes());

            if (keepAlive) {
                if (!req.protocolVersion().isKeepAliveDefault()) {
                    response.headers().set(CONNECTION, KEEP_ALIVE);
                }
            } else {
                // Tell the client we're going to close the connection.
                response.headers().set(CONNECTION, CLOSE);
            }

            ChannelFuture f = ctx.write(response);

            if (!keepAlive) {
                f.addListener(ChannelFutureListener.CLOSE);
            }
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}

HttpHelloWorldServerInitializer

/*
 * Copyright 2013 The Netty Project
 *
 * The Netty Project licenses this file to you under the Apache License,
 * version 2.0 (the "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at:
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */
package com.lagou.http;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.HttpServerExpectContinueHandler;
import io.netty.handler.ssl.SslContext;

public class HttpHelloWorldServerInitializer extends ChannelInitializer<SocketChannel> {

    private final SslContext sslCtx;

    public HttpHelloWorldServerInitializer(SslContext sslCtx) {
        this.sslCtx = sslCtx;
    }

    @Override
    public void initChannel(SocketChannel ch) {
        ChannelPipeline p = ch.pipeline();
        if (sslCtx != null) {
            p.addLast(sslCtx.newHandler(ch.alloc()));
        }
        p.addLast(new HttpServerCodec());
        p.addLast(new HttpServerExpectContinueHandler());
        p.addLast(new HttpHelloWorldServerHandler());
    }
}

HttpHelloWorldServer

/*
 * Copyright 2012 The Netty Project
 *
 * The Netty Project licenses this file to you under the Apache License,
 * version 2.0 (the "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at:
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */
package com.lagou.http;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.util.SelfSignedCertificate;

/**
 * An HTTP server that sends back the content of the received HTTP request
 * in a pretty plaintext form.
 */
public final class HttpHelloWorldServer {

    static final boolean SSL = System.getProperty("ssl") != null;
    static final int PORT = Integer.parseInt(System.getProperty("port", SSL? "8443" : "8888"));

    public static void main(String[] args) throws Exception {
        // Configure SSL.
        final SslContext sslCtx;
        if (SSL) {
            SelfSignedCertificate ssc = new SelfSignedCertificate();
            sslCtx = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()).build();
        } else {
            sslCtx = null;
        }

        // Configure the server.
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.option(ChannelOption.SO_BACKLOG, 1024);
            b.group(bossGroup, workerGroup)
             .channel(NioServerSocketChannel.class)
             .handler(new LoggingHandler(LogLevel.INFO))
             .childHandler(new HttpHelloWorldServerInitializer(sslCtx));

            Channel ch = b.bind(PORT).sync().channel();

            System.err.println("Open your web browser and navigate to " +
                    (SSL? "https" : "http") + "://127.0.0.1:" + PORT + '/');

            ch.closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

4.4 基于Netty的WebSocket开发网页版聊天室

略过,前端使用websocket插件,后端使用netty遵守同一套协议即可

4.5 Netty中粘包和拆包的解决方案

4.5.1 粘包和拆包简介

粘包和拆包是TCP网络编程中不可避免的,无论是服务端还是客户端,当我们读取或者发送消息的时候,都需要考虑TCP底层的粘包/拆包机制。
TCP是个“流”协议,所谓流,就是没有界限的一串数据。TCP底层并不了解上层业务数据的具体含义,它会根据TCP缓冲区的实际情况进行包的划分,所以在业务上认为,一个完整的包可能会被TCP拆分成多个包进行发送,也有可能把多个小的包封装成一个大的数据包发送,这就是所谓的TCP粘包和拆包问题。
如图所示,假设客户端分别发送了两个数据包D1和D2给服务端,由于服务端一次读取到的字节数是不确定的,故可能存在以下4种情况。


image.png

TCP粘包和拆包产生的原因:
数据从发送方到接收方需要经过操作系统的缓冲区,而造成粘包和拆包的主要原因就在这个缓冲区上。粘包可以理解为缓冲区数据堆积,导致多个请求数据粘在一起,而拆包可以理解为发送的数据大于缓冲区,进行拆分处理。

4.5.2 粘包和拆包代码演示

  1. 粘包
    客户端
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        for (int i = 0; i < 10; i++) {
            ctx.writeAndFlush(Unpooled.copiedBuffer("你好呀.我是Netty客户端" + i, CharsetUtil.UTF_8));
        }
    }

服务端

  public int count = 0;
 /*** 通道读取事件 *
  * @param ctx 
  * @param msg 
  * @throws Exception */ 
  @Override 
  public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { 
        ByteBuf byteBuf = (ByteBuf) msg; 
        System.out.println("客户端发送过来的消息:" + byteBuf.toString(CharsetUtil.UTF_8)); 
        System.out.println("读取次数:"+(++count)); 
  }

运行结果


image.png

拆包
客户端

@Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        //一次发送102400字节数据
        byte[] bytes = new byte[102400];
        Arrays.fill(bytes, (byte)10);
        for (int i = 0; i < 10; i++) {
            ctx.writeAndFlush(Unpooled.copiedBuffer(bytes));
        }
    }

服务端

  public int count = 0;
 /*** 通道读取事件 *
  * @param ctx 
  * @param msg 
  * @throws Exception */ 
  @Override 
  public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { 
        ByteBuf byteBuf = (ByteBuf) msg; 
        System.out.println("客户端发送过来的消息:" + byteBuf.toString(CharsetUtil.UTF_8)); 
        System.out.println("读取次数:"+(++count)); 
  }

运行结果


image.png

当客户端发送的数据包比较大的时候, 读取了18次, 应该读取10次, 则发送拆包事件.

4.5.3 粘包和拆包的解决方法

  1. 业内解决方案
    由于底层的TCP无法理解上层的业务数据,所以在底层是无法保证数据包不被拆分和重组的,这个问题只能通过上层的应用协议栈设计来解决,根据业界的主流协议的解决方案,可以归纳如下。
  • 消息长度固定,累计读取到长度和为定长LEN的报文后,就认为读取到了一个完整的信息
  • 将换行符作为消息结束符
  • 将特殊的分隔符作为消息的结束标志,回车换行符就是一种特殊的结束分隔符
  • 通过在消息头中定义长度字段来标识消息的总长度
  1. Netty中的粘包和拆包解决方案
    Netty提供了4种解码器来解决,分别如下:
  • 固定长度的拆包器 FixedLengthFrameDecoder,每个应用层数据包的都拆分成都是固定长度的大小
  • 行拆包器 LineBasedFrameDecoder,每个应用层数据包,都以换行符作为分隔符,进行分割拆分
  • 分隔符拆包器 DelimiterBasedFrameDecoder,每个应用层数据包,都通过自定义的分隔符,进行分割拆分
  • 基于数据包长度的拆包器 LengthFieldBasedFrameDecoder,将应用层数据包的长度,作为接收端应用层数据包的拆分依据。按照应用层数据包的大小,拆包。这个拆包器,有一个要求,就是应用层协议中包含数据包的长度
  1. 代码实现
    LineBasedFrameDecoder解码器
ch.pipeline().addLast(new LineBasedFrameDecoder(2048));
ctx.writeAndFlush(Unpooled.copiedBuffer("你好呀,我是Netty客户端"+i+"\n", CharsetUtil.UTF_8));

DelimiterBasedFrameDecoder解码器

ByteBuf byteBuf = Unpooled.copiedBuffer("$".getBytes(StandardCharsets.UTF_8)); 
ch.pipeline().addLast(new DelimiterBasedFrameDecoder(2048, byteBuf));
ctx.writeAndFlush(Unpooled.copiedBuffer("你好呀,我是Netty客户端"+i+"$", CharsetUtil.UTF_8));

5. 自定义RPC框架

5.1 分布式架构网络通信

在分布式服务框架中,一个最基础的问题就是远程服务是怎么通讯的,在Java领域中有很多可实现远程通讯的技术,例如:RMI、Hessian、SOAP、ESB和JMS等,它们背后到底是基于什么原理实现的呢

5.1.1 基本原理

要实现网络机器间的通讯,首先得来看看计算机系统网络通信的基本原理,在底层层面去看,网络通信需要做的就是将流从一台计算机传输到另外一台计算机,基于传输协议和网络IO来实现,其中传输协议比较出名的有tcp、udp等等,tcp、udp都是在基于Socket概念上为某类应用场景而扩展出的传输协议,网络IO,主要有bio、nio、aio三种方式,所有的分布式应用通讯都基于这个原理而实现。

5.1.2 什么是RPC

RPC全称为remote procedure call,即远程过程调用。借助RPC可以做到像本地调用一样调用远程服务,是一种进程间的通信方式.
比如两台服务器A和B,A服务器上部署一个应用,B服务器上部署一个应用,A服务器上的应用想调用B服务器上的应用提供的方法,由于两个应用不在一个内存空间,不能直接调用,所以需要通过网络来表达调用的语义和传达调用的数据。需要注意的是RPC并不是一个具体的技术,而是指整个网络远程调用过程。


image.png

RPC架构
一个完整的RPC架构里面包含了四个核心的组件,分别是Client,Client Stub,Server以及ServerStub,这个Stub可以理解为存根。

  • 客户端(Client),服务的调用方。
  • 客户端存根(Client Stub),存放服务端的地址消息,再将客户端的请求参数打包成网络消息,然后通过网络远程发送给服务方。
  • 服务端(Server),真正的服务提供者。
  • 服务端存根(Server Stub),接收客户端发送过来的消息,将消息解包,并调用本地的方法。


    image.png
  1. 客户端(client)以本地调用方式(即以接口的方式)调用服务;
  2. 客户端存根(client stub)接收到调用后,负责将方法、参数等组装成能够进行网络传输的消息体(将消息体对象序列化为二进制);
  3. 客户端通过socket将消息发送到服务端;
  4. 服务端存根( server stub)收到消息后进行解码(将消息对象反序列化);
  5. 服务端存根( server stub)根据解码结果调用本地的服务;
  6. 服务处理
  7. 本地服务执行并将结果返回给服务端存根( server stub);
  8. 服务端存根( server stub)将返回结果打包成消息(将结果消息对象序列化);
  9. 服务端(server)通过socket将消息发送到客户端;
  10. 客户端存根(client stub)接收到结果消息,并进行解码(将结果消息发序列化);
  11. 客户端(client)得到最终结果。

RPC的目标是要把2、3、4、5、7、8、9、10这些步骤都封装起来。只剩下1、6、11

注意:无论是何种类型的数据,最终都需要转换成二进制流在网络上进行传输,数据的发送方需要将对象转换为二进制流,而数据的接收方则需要把二进制流再恢复为对象。

在java中RPC框架比较多,常见的有Hessian、gRPC、Dubbo 等,其实对 于RPC框架而言,核心模块就是通讯和序列化

5.1.3 RMI示例

https://www.jianshu.com/p/b7c3229ddaa6

5.2 基于Netty实现RPC框架

https://www.jianshu.com/p/156de4bd087a

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,772评论 6 477
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,458评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,610评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,640评论 1 276
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,657评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,590评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,962评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,631评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,870评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,611评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,704评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,386评论 4 319
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,969评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,944评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,179评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 44,742评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,440评论 2 342

推荐阅读更多精彩内容