1. NIO
阅读本节前,请先阅读我的NIO基础文章:https://www.jianshu.com/nb/8788241
NIO是Java中的一种同步非阻塞IO,NIO是面向buffer的非阻塞IO。其中最重要的的三个核心概念是:Channel,Buffer和Selector。
Channel
Channel类似于BIO中的流,可以从中读取或者写入数据。但它和流有以下区别:
- Channel是双向的,既可以读又可以写,而流是单向的。
- Channel可以进行异步的读写。
- 对Channel的读写必须通过buffer对象。
在Java NIO中Channel主要有如下几种类型:
- FileChannel:从文件读取数据的
- DatagramChannel:读写UDP网络协议数据
- SocketChannel:读写TCP网络协议数据
- ServerSocketChannel:可以监听TCP连接
Buffer
Buffer是NIO中用于存放待读写数据的容器。数据总是从Channel中读取到Buffer中,或者从Buffer中写入到Channel中。
常见Buffer的实现类包括:ByteBuffer,CharBuffer,DoubleBuffer,FloatBuffer,IntBuffer,LongBuffer,ShortBuffer等。
其中ByteBuffer又包含两个实现类:HeapBuffer和MappedByteBuffer(以及其实现类DirectBuffer)。
HeapBuffer这种缓冲区是分配在堆上面的,而DirectBuffer则是直接指向了一块堆外直接内存。
零拷贝技术
下图展示了一个IO读写流程:
- DMA
read
读取磁盘文件内容到内核缓冲区 - 拷贝内核缓冲区数据到应用进程缓冲区(内核态和用户态的切换)
- 从应用进程缓冲区copy数据到socket缓冲区(内核态和用户态的切换)
-
DMA copy
给网卡发送
可以清楚得看到,上述IO流程包含两次用户态和内核态上下文切换,在高并发场景下,这些会很致命。因此,Linux提出了零拷贝的概念:即避免用户态和内核态的切换,直接在内核中进行数据传递。Linux提供了两个函数mmap
和sendfile
来实现零拷贝:
- mmap: 内存映射文件,即将文件的一段直接映射到内存,内核和应用进程共用同一块内存地址
- sendfile: 从上图的内核缓冲区直接复制到socket缓冲区, 不需要向应用进程缓冲区拷贝
mmap
传统的IO操作都是在内核准备好数据后,将数据从内核中拷贝一份到用户空间中。而直接内存(mmap技术)将文件直接映射到内核空间的内存,返回一个操作地址(address),省去了内核空间拷贝到用户空间这一步操作。如下图所示:
在NIO中,MappedByteBuffer则对应着mmap
技术。下面是MappedByteBuffer的使用例子:
FileChannel f1 = new FileInputStream(file1).getChannel();
// FileOutputStream打开的FileChannel只能写入
FileChannel f2 = new FileOutputStream(file2).getChannel();)
// 将file1的数据全部映射成ByteBuffer
MappedByteBuffer mbb = f1.map(MapMode.READ_ONLY, 0, file.length());
// 将buffer里的数据写入到file2中
f2.write(mbb);
mbb.clear();
HeapBuffer的数据结构类似于:
public Class HeapBuffer {
byte[] data;
int position, limit, int capacity;
}
而DirectBuffer则直接指向一个内存地址:
public Class DirectBuffer {
long address;
int position, limit, int capacity;
}
当我们把一个Direct Buffer写入Channel的时候,就好比是“内核缓冲区”的内容直接写入了Channel,这样显然快了,减少了数据拷贝。而当我们把一个Heap Buffer写入Channel的时候,实际上底层实现会先构建一个临时的Direct Buffer,然后把Heap Buffer的内容复制到这个临时的Direct Buffer上,再把这个Direct Buffer写出去。当然,如果我们多次调用write方法,把一个Heap Buffer写入Channel,底层实现可以重复使用临时的Direct Buffer,这样不至于因为频繁地创建和销毁Direct Buffer影响性能。
Direct Buffer创建和销毁的代价很高,所以要用在尽可能重用的地方。 比如周期长传输文件大采用Direct Buffer,不然一般情况下就直接用heap buffer 就好。
sendfile
sendfile
不存在内存映射, 同时保留了mmap
的不需要来回拷贝优点,适用于应用进程不需要对读取的数据做任何处理的场景。如图:
Java中Channel.transferTo(Channel destination)
对应着sendfile
技术。
Selector
Selector用于监听多个Channel的事件。
2. Reactor模型
Reactor模型中主要有三种角色:
- Reactor:内部封装了一个selector,循环调用select方法获得就绪channel。然后将就绪channel dispatch给对应handler执行真的读写逻辑。
- Acceptor:监听客户端连接,并为客户端的SocketChannel向Reactor注册对应的handler。
- Handlers:真正执行非阻塞读/写任务逻辑。
Reactor模型从复杂程度又可以分为三种:单Reactor单线程模型,单Reactor多线程模型和多Reactor多线程模型。
2.1 单Reactor单线程模型
下面是其实现:
/**
* 等待事件到来,分发事件处理
*/
class Reactor implements Runnable {
private Reactor() throws Exception {
SelectionKey sk =
serverSocket.register(selector,
SelectionKey.OP_ACCEPT);
// attach Acceptor 处理新连接
sk.attach(new Acceptor());
}
public void run() {
try {
while (!Thread.interrupted()) {
selector.select();
Set selected = selector.selectedKeys();
Iterator it = selected.iterator();
while (it.hasNext()) {
it.remove();
//分发事件处理
dispatch((SelectionKey) (it.next()));
}
}
} catch (IOException ex) {
//do something
}
}
void dispatch(SelectionKey k) {
// 若是连接事件获取是acceptor
// 若是IO读写事件获取是handler
Runnable runnable = (Runnable) (k.attachment());
if (runnable != null) {
runnable.run();
}
}
}
/**
* 连接事件就绪,处理连接事件
*/
class Acceptor implements Runnable {
@Override
public void run() {
try {
SocketChannel c = serverSocket.accept();
if (c != null) {// 注册读写
new Handler(c, selector);
}
} catch (Exception e) {
}
}
}
/**
* 处理读写业务逻辑
*/
class Handler implements Runnable {
public static final int READING = 0, WRITING = 1;
int state;
final SocketChannel socket;
final SelectionKey sk;
public Handler(SocketChannel socket, Selector sl) throws Exception {
this.state = READING;
this.socket = socket;
sk = socket.register(selector, SelectionKey.OP_READ);
sk.attach(this);
socket.configureBlocking(false);
}
@Override
public void run() {
if (state == READING) {
read();
} else if (state == WRITING) {
write();
}
}
private void read() {
process();
//下一步处理写事件
sk.interestOps(SelectionKey.OP_WRITE);
this.state = WRITING;
}
private void write() {
process();
//下一步处理读事件
sk.interestOps(SelectionKey.OP_READ);
this.state = READING;
}
/**
* task 业务处理
*/
public void process() {
//do something
}
}
这是最基本的单Reactor单线程模型。其中Reactor线程,负责多路分离Socket,有新的客户端连接触发accept事件之后,Reactor交由Acceptor进行处理。当有IO读写事件就绪后则交给Hanlder 处理。
Acceptor主要任务就是构建Handler ,在获取到和client相关的SocketChannel之后 ,注册读写事件到Reactor(Selector)上,并绑定对应的Hanlder。对应的SocketChannel有读写事件之后,Reactor再交给对应的Hanlder进行处理。
2.2 单Reactor多线程模型
单Reactor多线程模型,在单Reactor的基础上,增加了一个Worker线程池,用于Handler的执行。
/**
* 多线程处理读写业务逻辑
*/
class MultiThreadHandler implements Runnable {
public static final int READING = 0, WRITING = 1;
int state;
final SocketChannel socket;
final SelectionKey sk;
//多线程处理业务逻辑
ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
public MultiThreadHandler(SocketChannel socket, Selector sl) throws Exception {
this.state = READING;
this.socket = socket;
sk = socket.register(selector, SelectionKey.OP_READ);
sk.attach(this);
socket.configureBlocking(false);
}
@Override
public void run() {
if (state == READING) {
read();
} else if (state == WRITING) {
write();
}
}
private void read() {
//任务异步处理
executorService.submit(() -> process());
//下一步处理写事件
sk.interestOps(SelectionKey.OP_WRITE);
this.state = WRITING;
}
private void write() {
//任务异步处理
executorService.submit(() -> process());
//下一步处理读事件
sk.interestOps(SelectionKey.OP_READ);
this.state = READING;
}
/**
* task 业务处理
*/
public void process() {
//do IO ,task,queue something
}
}
相对于第一种单线程的模式来说,在处理业务逻辑,也就是获取到IO的读写事件之后,交由线程池来处理,这样可以减小主reactor的性能开销,从而更专注的做事件分发工作了,从而提升整个应用的吞吐。
2.3 多Reactor多线程模式
相比较于第二种模型,多Reactor多线程模式将Reactor分为两种:
- MainReactor:负责监听socket连接,用来处理新连接的建立,将建立的socketChannel指定注册给SubReactor。
- SubReactor:维护自己的selector, 基于MainReactor注册的socketChannel,监听读写就绪事件,读写就绪后将Handler扔给worker线程池来完成。
/**
* 多work 连接事件Acceptor,处理连接事件
*/
class MultiWorkThreadAcceptor implements Runnable {
// cpu线程数相同多work线程
int workCount =Runtime.getRuntime().availableProcessors();
SubReactor[] workThreadHandlers = new SubReactor[workCount];
volatile int nextHandler = 0;
public MultiWorkThreadAcceptor() {
this.init();
}
public void init() {
nextHandler = 0;
for (int i = 0; i < workThreadHandlers.length; i++) {
try {
workThreadHandlers[i] = new SubReactor();
} catch (Exception e) {
}
}
}
@Override
public void run() {
try {
SocketChannel c = serverSocket.accept();
if (c != null) {// 注册读写
synchronized (c) {
// 顺序获取SubReactor,然后注册channel
SubReactor work = workThreadHandlers[nextHandler];
work.registerChannel(c);
nextHandler++;
if (nextHandler >= workThreadHandlers.length) {
nextHandler = 0;
}
}
}
} catch (Exception e) {
}
}
}
/**
* 多work线程处理读写业务逻辑
*/
class SubReactor implements Runnable {
final Selector mySelector;
//多线程处理业务逻辑
int workCount =Runtime.getRuntime().availableProcessors();
ExecutorService executorService = Executors.newFixedThreadPool(workCount);
public SubReactor() throws Exception {
// 每个SubReactor 一个selector
this.mySelector = SelectorProvider.provider().openSelector();
}
/**
* 注册chanel
*
* @param sc
* @throws Exception
*/
public void registerChannel(SocketChannel sc) throws Exception {
sc.register(mySelector, SelectionKey.OP_READ | SelectionKey.OP_CONNECT);
}
@Override
public void run() {
while (true) {
try {
//每个SubReactor 自己做事件分派处理读写事件
selector.select();
Set<SelectionKey> keys = selector.selectedKeys();
Iterator<SelectionKey> iterator = keys.iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
iterator.remove();
if (key.isReadable()) {
read();
} else if (key.isWritable()) {
write();
}
}
} catch (Exception e) {
}
}
}
private void read() {
//任务异步处理
executorService.submit(() -> process());
}
private void write() {
//任务异步处理
executorService.submit(() -> process());
}
/**
* task 业务处理
*/
public void process() {
//do IO ,task,queue something
}
}
在多Reactor多线程模型中,MainReactor 主要是用来处理网络IO建立连接操作,而SubReactor则主要复杂监听IO就绪事件,分派任务执行。此种模型中,每个模块的工作更加专一,耦合度更低,性能和稳定性也大量的提升,支持的可并发客户端数量可达到上百万级别。
3. Netty
首先关于Netty的使用demo,请参考:https://www.jianshu.com/p/e58674eb4c7a
Netty的架构类似于多Reactor多线程模型,但是Netty默认不使用Worker线程池执行Handler,而是直接使用IO线程执行读写任务。下图是Netty的线程模型:
如图所示,Netty中包含两个NioEventLoopGroup,一个是boss,另一个是worker。boss负责监听网络连接,而worker负责分发读写事件。每一个NioEventLoopGroup都包含多个NioEventLoop,一个NioEventLoop本质上是一个包含了一个Selector的SingleThreadPool。
事实上,boss类型的NioEventLoopGroup通常只包含一个NioEventLoop。
每个boss NioEventLoop循环执行的任务包含3步:
- 第1步:轮询accept事件;
- 第2步:处理io任务,即accept事件,与client建立连接,生成NioSocketChannel,并将NioSocketChannel注册到某个worker NioEventLoop的selector上;
- 第3步:处理任务队列中的任务,runAllTasks。任务队列中的任务包括用户调用eventloop.execute或schedule执行的任务,或者其它线程提交到该eventloop的任务。
每个worker NioEventLoop循环执行的任务包含3步:
- 第1步:轮询read、write事件;
- 第2步:处理io任务,即read、write事件,在NioSocketChannel可读、可写事件发生时进行处理;
- 第3步:处理任务队列中的任务,runAllTasks。
Client端的Netty架构图如下:
client端启动时connect到server,建立NioSocketChannel,并注册到某个NioEventLoop的selector上。client端只包含1个NioEventLoopGroup,每个NioEventLoop循环执行的任务包含3步:
- 第1步:轮询connect、read、write事件;
- 第2步:处理io任务,即connect、read、write事件,在NioSocketChannel连接建立、可读、可写事件发生时进行处理;
- 第3步:处理非io任务,runAllTasks。
3.1 Netty模式
下面是多Reactor的使用模式:
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
bossGroup中只有一个线程(EventLoop),而workerGroup中的线程是 CPU 核心数乘以2, 因此对应的到 Reactor 线程模型中,我们知道,这样设置的 NioEventLoopGroup 其实就是多Reactor模型。
下面是单Reactor单线程的使用模式:
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup).channel(NioServerSocketChannel.class)
注意, 我们实例化了一个 NioEventLoopGroup,构造器参数是1,表示 NioEventLoopGroup 的线程池大小是1。然后接着我们调用 b.group(bossGroup) 设置了服务器端的 EventLoopGroup。此时bossGroup和workerGroup就是同一个NioEventLoopGroup,并且这个 NioEventLoopGroup只有一个线程(EventLoop),那么对应到Reactor的线程模型中,就相当于单Reactor单线程模型。
3.2 耗时任务
由于Netty中的EventLoop既要处理IO,又要执行Handler。因此需要使用特殊手段执行耗时任务。主要有两种方式:
- Handler中加入自定义线程池
- Pipeline中加入线程池
方法一:自定义线程池
public class ServerBusinessThreadPoolHandler extends SimpleChannelInboundHandler {
public static final ChannelHandler INSTANCE = new ServerBusinessThreadPoolHandler();
private static ExecutorService threadPool = Executors.newFixedThreadPool(1000);
@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
ByteBuf data = Unpooled.directBuffer();
data.writeBytes(msg);
threadPool.submit(() -> {
try {
//耗时的操作
Thread.sleep(1 * 1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
Object result = getResult(data);
ctx.channel().writeAndFlush(result);
});
}
}
方法二:Pipeline中加入线程池
ServerBootstrap bootstrap = new ServerBootstrap();
EventLoopGroup boss = new NioEventLoopGroup();
EventLoopGroup worker = new NioEventLoopGroup();
bootstrap.group(boss, worker);
EventLoopGroup businessGroup = new NioEventLoopGroup(1000); //大小为1000的线程池
bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline p = socketChannel.pipeline();
p.addLast(businessGroup, new NettyServerHandler()); // 添加NettyServerHandler,用来处理Server端接收和处理消息的逻辑
}
});
如图,通过ChannelPipeline.addLast(EventExecutorGroup group, ChannelHandler handler)
API为对应的Handler提供了优先选择的executor。如果直接ChannelPipeline.addLast(ChannelHandler handler)
方法,那么Handler执行时默认使用对应的NioEventLoop来执行。而通过ChannelPipeline.addLast(EventExecutorGroup group, ChannelHandler handler)
API Netty将使用给定的EventExecutorGroup来执行handler。
3.3 Netty避免线程切换
为了尽可能的提升性能,Netty在很多地方进行了无锁化设计,例如在IO线程内部进行串行操作,避免多线程竞争导致的性能下降问题。表面上看,串行化设计似乎CPU利用率不高,并发程度不够,但是,通过调整NIO线程池的线程参数,可以同时启动多个串行化的线程并行运行,这种局部无锁化的串行线程设计相比一个队列---多个工作线程的模型性能更优。
Netty的NioEventLoop读取到消息之后,直接调用ChannelPipeline的fireChannelRead(Object msg)。只要用户不主动切换线程,一直都是由NioEventLoop调用用户的Handler,期间不进行线程切换。这种串行处理方式避免了多线程操作导致的锁的竞争,从性能角度看是最优的。
4. tomcat
Tomcat也采用了Reactor模型的设计理念,如下图所示:
如图所示,Tomcat线程模型中包含四个关键角色:
- Acceptor:负责处理Socket连接。获得SocketChannel对象,然后封装在一个tomcat的实现类org.apache.tomcat.util.net.NioChannel对象中。然后将NioChannel对象封装在一个PollerEvent对象中,并将PollerEvent对象压入Poller Event Queue里。
- Poller:每一个Poller线程都维护了一个Selector对象,主要负责消费Event Queue中的数据,并注册到内部的Selector上,然后不断监听Socket读写就绪事件。读写就绪后,将就绪的SocketChannel传递给Worker线程池执行读写任务。
- Poller Event Queue:存储PollerEvent对象的消息队列。注意,这个消息队列实际上存储在Poller中,Poller中包含了一个
private final SynchronizedQueue<PollerEvent> events = new SynchronizedQueue<>();
属性用于存储PollerEvent。并且Poller开放了void addEvent(PollerEvent event)
方法,从而Acceptor能够将PollerEvent传递给Poller。 - Worker:实际的IO读写线程。
多个Worker线程,有时候也叫IO线程,就是专门负责IO读写的。一种实现方式就是像Netty一样,每个Worker线程都有自己的Selector,可以负责多个连接的IO读写事件,每个连接归属于某个线程。另一种方式实现方式就是有专门的线程负责IO事件监听,这些线程有自己的Selector,一旦监听到有IO读写事件,并不是像第一种实现方式那样(自己去执行IO操作),而是将IO操作封装成一个Runnable交给Worker线程池来执行,这种情况每个连接可能会被多个线程同时操作,相比第一种并发性提高了,但是也可能引来多线程问题,在处理上要更加谨慎些。tomcat的NIO模型就是第二种。
参考文章:
- https://blog.csdn.net/Pengjx2014/article/details/79179129#nioeventloop
- https://www.jianshu.com/p/03bb8a945b37
- https://www.jianshu.com/p/e58674eb4c7a
- https://www.jianshu.com/p/727bbc7454dc
- https://blog.tolvyou.cn/2018/11/16/netty-asyc-callback/
- https://blog.csdn.net/qq_16681169/article/details/75003640
- https://blog.csdn.net/yanlinwang/article/details/46382889