本文将继续上一篇《Netty、Redis、Zookeeper高并发实战》(三)整理第三章的内容。
SocketChannel套接字通道
在NIO中,涉及网络连接的通道有两个,一个是SocketChannel负责连接传输,另一个是ServerSocketChannel负责连接的监听。
下面就不逐一的介绍其相应方法了,直接上代码。
使用SocketChannel发送文件的实践
客户端发送文件
package com.netty.echo.nio.niodemo;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.Date;
public class NioSendClient {
private static Charset charset = Charset.forName("UTF-8");
public static void sendFile(String path) {
FileChannel fileChannel = null;
SocketChannel socketChannel = null;
try {
File file = new File(path);
if (!file.exists()) {
return;
}
fileChannel = new FileInputStream(file).getChannel();
socketChannel = SocketChannel.open();
socketChannel.socket().connect(new InetSocketAddress("127.0.0.1",8080));
socketChannel.configureBlocking(false);
while (!socketChannel.finishConnect()) {
System.out.println("正在尝试连接...." + new Date().getTime());
}
//发送文件名称
ByteBuffer fileNameByteBuffer = charset.encode(file.getName());
socketChannel.write(fileNameByteBuffer);
//发送文件长度
ByteBuffer fileByteBuffer = ByteBuffer.allocate(1024);
fileByteBuffer.putLong(file.length());
fileByteBuffer.flip();
socketChannel.write(fileByteBuffer);
fileByteBuffer.clear();
//发送文件内容
int length = 0;
while ((length = fileChannel.read(fileByteBuffer)) > 0) {
fileByteBuffer.flip();
socketChannel.write(fileByteBuffer);
fileByteBuffer.clear();
}
} catch (IOException e) {
e.printStackTrace();
}finally {
try {
closeQuietly(fileChannel);
socketChannel.shutdownOutput();
} catch (IOException e) {
e.printStackTrace();
}
closeQuietly(socketChannel);
}
}
public static void closeQuietly(java.io.Closeable o) {
if (null == o) return;
try {
o.close();
} catch (IOException e) {
e.printStackTrace();
}
}
服务端监听链接,接受文件
package com.netty.echo.nio.niodemo;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.nio.charset.Charset;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
public class NioReceiveServer {
private Charset charset = Charset.forName("UTF-8");
/**
* 服务器端保存的客户端对象,对应一个客户端文件
*/
static class Client {
//文件名称
String fileName;
//长度
long fileLength;
//开始传输的时间
long startTime;
//客户端的地址
InetSocketAddress remoteAddress;
//输出的文件通道
FileChannel outChannel;
}
private ByteBuffer buffer
= ByteBuffer.allocate(10240);
//使用Map保存每个客户端传输,当OP_READ通道可读时,根据channel找到对应的对象
Map<SelectableChannel, Client> clientMap = new HashMap<SelectableChannel, Client>();
public void startServer() throws IOException {
// 1、获取Selector选择器
Selector selector = Selector.open();
// 2、获取通道
ServerSocketChannel serverChannel = ServerSocketChannel.open();
ServerSocket serverSocket = serverChannel.socket();
// 3.设置为非阻塞
serverChannel.configureBlocking(false);
// 4、绑定连接
InetSocketAddress address
= new InetSocketAddress(8080);
serverSocket.bind(address);
// 5、将通道注册到选择器上,并注册的IO事件为:“接收新连接”
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
// 6、轮询感兴趣的I/O就绪事件(选择键集合)
while (selector.select() > 0) {
// 7、获取选择键集合
Iterator<SelectionKey> it = selector.selectedKeys().iterator();
while (it.hasNext()) {
// 8、获取单个的选择键,并处理
SelectionKey key = it.next();
// 9、判断key是具体的什么事件,是否为新连接事件
if (key.isAcceptable()) {
// 10、若接受的事件是“新连接”事件,就获取客户端新连接
ServerSocketChannel server = (ServerSocketChannel) key.channel();
SocketChannel socketChannel = server.accept();
if (socketChannel == null) continue;
// 11、客户端新连接,切换为非阻塞模式
socketChannel.configureBlocking(false);
// 12、将客户端新连接通道注册到selector选择器上
SelectionKey selectionKey =
socketChannel.register(selector, SelectionKey.OP_READ);
// 余下为业务处理
Client client = new Client();
client.remoteAddress
= (InetSocketAddress) socketChannel.getRemoteAddress();
clientMap.put(socketChannel, client);
} else if (key.isReadable()) {
processData(key);
}
// NIO的特点只会累加,已选择的键的集合不会删除
// 如果不删除,下一次又会被select函数选中
it.remove();
}
}
}
/**
* 处理客户端传输过来的数据
*/
private void processData(SelectionKey key) throws IOException {
Client client = clientMap.get(key.channel());
SocketChannel socketChannel = (SocketChannel) key.channel();
int num = 0;
try {
buffer.clear();
while ((num = socketChannel.read(buffer)) > 0) {
buffer.flip();
//客户端发送过来的,首先是文件名
if (null == client.fileName) {
// 文件名
String fileName = charset.decode(buffer).toString();
// String destPath = IOUtil.getResourcePath(NioDemoConfig.SOCKET_RECEIVE_PATH);
File directory = new File("C:\\Users\\cnjia\\Desktop\\管理系统接口文档\\123");
if (!directory.exists()) {
directory.mkdir();
}
client.fileName = fileName;
String fullName = directory.getAbsolutePath()
+ File.separatorChar + fileName;
File file = new File(fullName);
FileChannel fileChannel = new FileOutputStream(file).getChannel();
client.outChannel = fileChannel;
}
//客户端发送过来的,其次是文件长度
else if (0 == client.fileLength) {
// 文件长度
long fileLength = buffer.getLong();
client.fileLength = fileLength;
client.startTime = System.currentTimeMillis();
}
//客户端发送过来的,最后是文件内容
else {
// 写入文件
client.outChannel.write(buffer);
}
buffer.clear();
}
// key.cancel();
} catch (IOException e) {
key.cancel();
e.printStackTrace();
return;
}
// 调用close为-1 到达末尾
if (num == -1) {
closeQuietly(client.outChannel);
System.out.println("上传完毕");
key.cancel();
long endTime = System.currentTimeMillis();
}
}
public static void closeQuietly(java.io.Closeable o) {
if (null == o) return;
try {
o.close();
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* 入口
*
* @param args
*/
public static void main(String[] args) throws Exception {
NioReceiveServer server = new NioReceiveServer();
server.startServer();
}
}