参考儒猿小文件系统的案例,按照自己的理解写了一下
对接NIO_CLIENT,客户端考虑是高性能,最大效率的传输文件,而服务端考虑的是高并发,服务端和客户端在数量上的不对等,造成了服务端要尽可能的处理多个连接,尽快的响应客户端请求。
客户端设计成Netty类似的架构,Rector模型,一个Selector负责接收Accept请求,然后分发给各个Processor进行请求解析,解析好的请求存入请求队列中,多个IO线程进行竞争拉取
设计成该架构是考虑到IO解析和磁盘写入性能不对等,在普通服务器上,磁盘IO的随机读写能力是一个瓶颈,性能较差,那么我们可以考虑在请求解析和磁盘的随机读写分离开,中间使用请求队列进行通知,那么我们可以动态的加大磁盘IO线程数量来提高磁盘的IO性能,请求解析是基于内存来进行的,性能较高,Processor线程数量可以较小。为什么将Accpet请求之后就分发给Processor线程呢,考虑到NIO的多路复用机制,它会轮询各个通道是否有相关事件发生,如果注册的连接过多,那么单个连接的请求处理速度肯定受到影响,如果分发给不同的线程进行请求解析,那么单个线程的连接数可以得到控制,在尽可能多的维持连接的情况下提高性能。
整个架构如下:
现在存在的问题:
- 大文件,文件大于5M,并发上传,内存吃紧,容易内存溢出,现在采取的措施是异常捕捉,等待内存释放再次读取,但是这个没办法限制住NIO网络请求内存,这个是个问题
- 现在没办法保证一个READ事件能读取完整请求,是将未解析完的请求缓存起来,等待下次READ事件进行解析,但是考虑一个问题,TCP的丢包和重试会造成什么影响呢,没模拟出来,如果真出现这个问题,那么从丢包的那一刻起后续的请求解析都会出问题,这个问题就很严重
- 整个架构还是很脆弱,没有设计容错机制,且对资源过长时间占用没进行处理。
NioServer
package org.zymf.nio.example3.server;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
/**
* @author zhuyuemufeng
* @version 1.0
* @description: IO多路复用分发器
* @date 2021-07-18 0:39
*/
public class NioServer extends Thread{
/**
* 选择器
*/
private Selector selector;
/**
* 监听通道
*/
private ServerSocketChannel socketChannel;
/**
* processor处理线程数量
*/
private int NIO_PROCESSOR_NUM = 3;
/**
* 负责解析请求的Process线程集合
*/
private List<NioProcessor> nioProcessors;
/**
* 磁盘读写线程数量
*/
private int DISK_IO_THREAD_NUM = 5;
/**
* 负责磁盘读写的线程集合
*/
private List<NioIoThread> nioIoThreads;
/**
* 记录当前分配请求的Processors索引
*/
private AtomicInteger nextProcessor;
public NioServer() throws Exception {
//初始化监听请求监听通道
this.selector = Selector.open();
this.socketChannel = ServerSocketChannel.open();
socketChannel.configureBlocking(false);
socketChannel.socket()
.bind(new InetSocketAddress(7899),100);
socketChannel.register(selector, SelectionKey.OP_ACCEPT);
//初始化请求解析processor
nioProcessors = new ArrayList<>(NIO_PROCESSOR_NUM);
for (int i = 0; i < NIO_PROCESSOR_NUM; i++) {
NioProcessor nioProcessor = new NioProcessor(i);
nioProcessors.add(nioProcessor);
NetWorkRespondQueue.get().initRespondQueue(i);
nioProcessor.start();
}
//初始化磁盘读写线程
nioIoThreads = new ArrayList<>(DISK_IO_THREAD_NUM);
for (int i = 0; i < DISK_IO_THREAD_NUM; i++) {
NioIoThread nioIoThread = new NioIoThread();
nioIoThreads.add(nioIoThread);
nioIoThread.start();
}
//初始化轮询计数器
nextProcessor = new AtomicInteger(0);
}
@Override
public void run() {
while (true){
try {
//同步非阻塞获取响应事件
int select = selector.select();
if (select > 0){
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
while (iterator.hasNext()){
SelectionKey key = iterator.next();
iterator.remove();
if (key.isAcceptable()){
ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();
SocketChannel channel = serverSocketChannel.accept();
if (channel != null){
System.out.println(">>>>>>>>>>>>接收Accept请求");
channel.configureBlocking(false);
int index = incrementAndGetProcessor();
NioProcessor nioProcessor = nioProcessors.get(index);
nioProcessor.addChannel(channel);
}
}
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
/**
* @description: 轮询Processor
* @param: * @param:
* @return: int
* @author zhuyuemufeng
* @date: 2021-07-18 1:08
*/
private int incrementAndGetProcessor() {
for (;;) {
int current = nextProcessor.get();
int next = (current + 1) % NIO_PROCESSOR_NUM;
if (nextProcessor.compareAndSet(current, next))
return next;
}
}
}
NioProcessor
package org.zymf.nio.example3.server;
import org.zymf.nio.example3.constant.Constant;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
/**
* @author zhuyuemufeng
* @version 1.0
* @description: 请求监听解析器
* @date 2021-07-18 0:40
*/
public class NioProcessor extends Thread {
/**
* 当前Process的ID
*/
private int processId;
/**
* 多路复用选择器
*/
private Selector selector;
/**
* 等待注册监听的Channel
*/
private ConcurrentLinkedQueue<SocketChannel> channelQueue;
/**
* 未读完的请求
*/
private ConcurrentHashMap<String, NetWorkRequest> cachedRequests;
/**
* 暂存的请求响应
*/
private ConcurrentHashMap<String, ConcurrentLinkedQueue<NetWorkRespond>> cachedResponds;
/**
* address对应SelectKeys映射表
*/
private ConcurrentHashMap<String, SelectionKey> cacheKeys;
public NioProcessor(int processId) throws Exception {
System.out.println(">>>>>>>>>>>>NioProcessor 解析注册Processor已初始化");
this.processId = processId;
this.channelQueue = new ConcurrentLinkedQueue<>();
this.cachedRequests = new ConcurrentHashMap<>();
this.cachedResponds = new ConcurrentHashMap<>();
this.cacheKeys = new ConcurrentHashMap<>();
this.selector = Selector.open();
}
/**
* @description: 加入需要注册监听的通道
* @param: * @param: channel
* @return: void
* @author zhuyuemufeng
* @date: 2021-07-18 1:32
*/
public void addChannel(SocketChannel channel) {
channelQueue.offer(channel);
System.out.println(">>>>>>>>>>>>Processsor 加入Channel");
selector.wakeup();
}
@Override
public void run() {
//这边我们就需要将加入的监听Channnel加入到Selector中
System.out.println(">>>>>>>>>>>>NioProcessor 已启动");
while (true) {
try {
//注册等待处理的连接
registerQueuedClients();
//处理返回响应
cacheQueuedResponse();
//处理连接的请求解析
process();
} catch (Exception e) {
e.printStackTrace();
}
}
}
/**
* @description: 事件监听及相关请求解析
* @param: * @param:
* @return: void
* @author zhuyuemufeng
* @date: 2021-07-18 12:27
*/
private void process() throws Exception {
int select = selector.select(Constant.POLL_BLOCK_MAX_TIME);
if (select > 0) {
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
//防止事件重复处理
iterator.remove();
if (key.isReadable()) {
System.out.println("****************触发Read事件");
//这里的channel都是读写事件,Accept在NioServer中已经处理
SocketChannel channel = (SocketChannel) key.channel();
//获取当前请求的远端地址标识,作为请求缓存的Key
String client = channel.getRemoteAddress().toString();
NetWorkRequest request = null;
if (cachedRequests.contains(client)) {
//说明还要一个请求没有完整读完
request = cachedRequests.get(client);
} else {
request = new NetWorkRequest();
}
request.setChannel(channel);
request.setClient(client);
request.setProcessorId(processId);
request.read();
if (request.complete()) {
//如果读取完整,就要放入请求队列中,让IOThread解析
NetworkRequestQueue.get().offer(request);
//从缓存中移除该请求
cachedRequests.remove(client);
//未读完的数据等待下次再次读
cacheKeys.put(client, key);
} else {
//请求未解析完整,继续关注Read请求
cachedRequests.put(client, request);
}
}
if (key.isWritable()) {
System.out.println("****************触发Write事件");
SocketChannel channel = (SocketChannel) key.channel();
String client = channel.getRemoteAddress().toString();
ConcurrentLinkedQueue<NetWorkRespond> responds = cachedResponds.get(client);
NetWorkRespond respond = null;
while (responds != null && (respond = responds.poll()) != null) {
ByteBuffer content = respond.getContent();
while (content.hasRemaining()) {
channel.write(content);
}
}
//最好手动触发下取消Write,防止死循环打死CPU
key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
}
}
}
}
/**
* @description: 注册等待监听的连接
* @param: * @param:
* @return: void
* @author zhuyuemufeng
* @date: 2021-07-18 12:20
*/
private void registerQueuedClients() throws Exception {
SocketChannel socketChannel = null;
while ((socketChannel = channelQueue.poll()) != null) {
//注册读取事件
System.out.println(">>>>>>>>>>>>Processsor 注册注册读写事件");
socketChannel.register(selector, SelectionKey.OP_READ);
}
}
/**
* @description: 暂存返回的响应,这里是一个请求对应一个响应
* @param: * @param:
* @return: void
* @author zhuyuemufeng
* @date: 2021-07-18 16:14
*/
private void cacheQueuedResponse() throws Exception {
NetWorkRespond respond = null;
while ((respond = NetWorkRespondQueue.get().poll(processId)) != null) {
//放入缓存
ConcurrentLinkedQueue<NetWorkRespond> respondQueue = cachedResponds.get(respond.getClient());
if (respondQueue == null) {
respondQueue = new ConcurrentLinkedQueue<>();
cachedResponds.put(respond.getClient(), respondQueue);
}
respondQueue.offer(respond);
cacheKeys.get(respond.getClient()).interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);
}
}
}
NioIoThread
package org.zymf.nio.example3.server;
import org.zymf.nio.example3.constant.Constant;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
/**
* @author zhuyuemufeng
* @version 1.0
* @description: TODO
* @date 2021-07-18 0:42
*/
public class NioIoThread extends Thread {
private NetworkRequestQueue requestQueue;
public NioIoThread() {
System.out.println(">>>>>>>>>>>>NioIoThread 本地磁盘线程已初始化");
this.requestQueue = NetworkRequestQueue.get();
}
@Override
public void run() {
System.out.println(">>>>>>>>>>>>NioIoThread 已启动");
while (true) {
try {
NetWorkRequest request = requestQueue.poll();
if (request == null) {
Thread.sleep(200);
continue;
}
int type = request.getType();
if (type == Constant.REQUEST_SEND_FILE) {
writeFileToLocalDisk(request);
}
if (type == Constant.REQUEST_READ_FILE) {
readFileFromLocalDisk(request);
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
/**
* @description: 将本地缓冲写入本地磁盘
* @param: * @param: request
* @return: void
* @author zhuyuemufeng
* @date: 2021-07-18 15:08
*/
private void writeFileToLocalDisk(NetWorkRequest request) {
FileOutputStream outputStream = null;
FileChannel fileChannel = null;
NetWorkRespond respond = new NetWorkRespond();
respond.setClient(request.getClient());
try {
outputStream = new FileOutputStream(Constant.BASE_DISK_DIR + request.getFileName());
fileChannel = outputStream.getChannel();
ByteBuffer buffer = request.getCachedFileBuffer();
buffer.flip();
//文件写入
int writeCount = 1;
while (writeCount > 0){
writeCount = fileChannel.write(buffer);
}
//封装响应
byte[] bytes = "success".getBytes();
//32位请求ID + 4位响应状态 + 4位响应长度 + 响应长度
ByteBuffer content = ByteBuffer.allocate(32 + 4 + 4 + bytes.length);
content.put(request.getRequestId().getBytes());
content.putInt(Constant.READ_STATUS_SUCCESS);
content.putInt(bytes.length);
content.put(bytes);
content.flip();
respond.setContent(content);
}catch (Exception e){
e.printStackTrace();
byte[] bytes = "fail".getBytes();
ByteBuffer content = ByteBuffer.allocate(4 + bytes.length);
content.putInt(Constant.READ_STATUS_FAIL);
content.put(bytes);
content.flip();
respond.setContent(content);
}finally {
try {
outputStream.close();
fileChannel.close();
} catch (IOException e) {
e.printStackTrace();
}
}
NetWorkRespondQueue.get().offer(request.getProcessorId(),respond);
}
/**
* @description: 读取本地文件请求
* @param: * @param: request
* @return: void
* @author zhuyuemufeng
* @date: 2021-07-18 15:38
*/
private void readFileFromLocalDisk(NetWorkRequest request) {
String fileName = request.getFileName();
String realFilePath = Constant.BASE_DISK_DIR + fileName;
FileInputStream inputStream = null;
FileChannel fileChannel = null;
NetWorkRespond respond = new NetWorkRespond();
respond.setClient(request.getClient());
try {
inputStream = new FileInputStream(realFilePath);
fileChannel = inputStream.getChannel();
int length = (int) new File(realFilePath).length();
ByteBuffer buffer = ByteBuffer.allocate(32 + 8 + length);
buffer.put(request.getRequestId().getBytes());
buffer.putInt(Constant.READ_STATUS_SUCCESS);
buffer.putInt(length);
int readCount = 1;
while (readCount > 0){
readCount = fileChannel.read(buffer);
}
buffer.flip();
respond.setContent(buffer);
}catch (Exception e){
ByteBuffer buffer = ByteBuffer.allocate(4);
buffer.putInt(Constant.READ_STATUS_FAIL);
buffer.flip();
respond.setContent(buffer);
}
NetWorkRespondQueue.get().offer(request.getProcessorId(),respond);
}
}