概述
代码实现方式:
- BIO:服务端阻塞式监听到一个客户端,就单独开启一个子线程(或者丢到线程池)阻塞式的监听客户端的消息,客户端连接成功以后,也是阻塞式的监听服务端写入的消息。
- NIO:服务端把自己绑定在Selector上,然后重写4个方法监听和响应客户端的Accept()、read()、write()的事件,循环遍历Selector的事件,来响应客户端的行为
- AIO:服务器端绑定一个ConnectHandler,连接以后,给该线程绑定一个ReadHandler,和一个WriteHandler,涉及的类比较多,功能单一。
特点:
- 复杂度: BIO < NIO < AIO
- 线程的占用:NIO最少 < AIO < BIO
- 响应的即时性:BIO最快 > AIO > NIO最慢
阻塞式多线程TCPsocket编程原理
- ServerSocket创建TCP的服务器,调用阻塞式的accept()方法监听连接的客户端。
- 客户端和服务端通过Socket对象的输入流、输出流进行内容交互 socket.getInputStream()和socket.getOutputStream()
BIO
服务器端:用线程池去管理多个接入的客户端,把接收到的客户端Socket作为参数,用处理类在单独的线程里面和客户端交互
- 初始化服务端
- while(true)+ socket.accept的方式,阻塞式的等待新客户的接入
- 一旦有一个先Clinet接入的话,则从线程池中创建一个线程,来和Client交互,一般是读取信息,也可以输出固定的信息,比如服务器已经收到
- 工作子线程也是while(true)+buf.readLine(),阻塞和循环式的读取Clinet发过来的消息
public static void main(String[] args) {
ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(10);
try {
ServerSocket socket = new ServerSocket(5000);
while (true) {
Socket client = socket.accept();
newFixedThreadPool.execute(new ClientHandler(client));
}
}
catch (IOException ioe) {
ioe.printStackTrace();
}
}
交互类:
public class ClientHandler implements Runnable {
private Socket client = null;
private String address;
//通过构造函数注入接收到的客户端Socket对象
public ClientHandler(Socket client) {
this.client = client;
}
@Override
public void run() {
try {
String host = client.getInetAddress().toString();
String port = Integer.toString(client.getPort());
PrintStream out = new PrintStream(client.getOutputStream());
System.out.println("Get Connection");
BufferedReader buf = new BufferedReader(new InputStreamReader(client.getInputStream()));
address = host + ":" + port;
System.out.println("get connection from" + address);
while (true) {
//阻塞式遍历读取客户端发过来的消息
String str = buf.readLine();
System.out.println(str);
if (str != null) {
out.println(str);
out.flush();
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
客户端:
public class TcpClient1 {
public static void main(String[] args) throws IOException {
//尝试连接服务端 127.0.1.1:5000的服务器
Socket client = new Socket("127.0.0.1", 5000);
//获取输出流,往服务端写数据
PrintStream out = new PrintStream(client.getOutputStream());
BufferedReader buf = new BufferedReader(new InputStreamReader(client.getInputStream()));
String[] msgs = {"你好,我是client1"};
for (String msg : msgs) {
out.println(msg);
out.flush();
while (true) {
String echo = buf.readLine();
if (echo != null) {
System.out.println(echo);
}
}
}
}
}
NIO
NIO server端的代码逻辑
- 创建和初始化Selector和ServerSocketChannel,给Server绑定Accept事件
- 主线程While(true) 从selector中读取发生的事件,一旦获取到就触发监听
- 如果阻塞过程中触发了事件,就遍历事件按照事件的类型来处理(连接事件、读取事件、写事件),处理完成删除
- 处理连接操作,就可以从key里面获取到Server,从Server.accept返回ClientChannel,就可以和客户端端交互了
服务端的操作(读取客户端的消息,和往客户端的Channel里面写消息)
- 获取到ClientChannel需要首先注册READ的监听,就可以读取到客户端发过来的消息,socketChannel.register(key.selector(), SelectionKey.OP_READ, ByteBuffer.allocate(bufSize));
- 也可以直接往clientChannel里面写内容, socketChannel.write(ByteBuffer.wrap(("welcome".getBytes())));
客户端的操作:
- 初始化客户端和Selector,类似的绑定OP_CONNECT的监听,需要在监听事件中 finishConnect()的方法才算连接成功
- 连接成功之后,要在Client端绑定OP_READ监听,这样如果Server往里面写数据,就可以读取到。 socketChannel.register(key.selector(), SelectionKey.OP_READ, ByteBuffer);
- 也可以直接往Channel里面写数据,这样服务端也可以收的到 channel.write(ByteBuffer.wrap(new String("say hi from client").getBytes()));
相关代码
- 服务端代码
public static void main(String[] args) throws IOException {
Selector selector = Selector.open();
ServerSocketChannel serverSocketChannel=ServerSocketChannel.open();
serverSocketChannel.bind(new InetSocketAddress(10083));
serverSocketChannel.configureBlocking(false);
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
ServerSelectorProtocol protocol = new ServerSelectorProtocol(BUFF_SIZE);
while (true) {
selector.select();
Iterator<SelectionKey> keyIter = selector.selectedKeys().iterator();
while (keyIter.hasNext()) {
SelectionKey key = keyIter.next();
keyIter.remove();
if (key.isAcceptable()){
protocol.handleAccept(key);
}
if (key.isReadable()){
protocol.handleRead(key);
}
}
}
}
- 服务端处理类ServerSelectorProtocol
public void handleAccept(SelectionKey key) throws IOException {
System.out.println("Accept");
//根据key.channel() 获取Server,再根据Server.accept()获取Clinet
SocketChannel socketChannel = ((ServerSocketChannel)key.channel()).accept();
socketChannel.configureBlocking(false);
//附着一个ByteBuffer,把接收到的客户端socketChannel也注册在selector上,监听客户端channel的read操作
socketChannel.register(key.selector(), SelectionKey.OP_READ, ByteBuffer.allocate(bufSize));
System.out.println("create new session from "+socketChannel.getRemoteAddress()+"\n");
//往ClinetChannle里面写数据
socketChannel.write(ByteBuffer.wrap(("welcome".getBytes())));
}
public void handleRead(SelectionKey key) throws IOException {
SocketChannel clntChan = (SocketChannel) key.channel();
//获取该信道所关联的附件,这里为缓冲区
ByteBuffer buf = (ByteBuffer) key.attachment();
buf.clear();
long bytesRead = clntChan.read(buf);
//如果read()方法返回-1,说明客户端关闭了连接,那么客户端已经接收到了与自己发送字节数相等的数据,可以安全地关闭
if (bytesRead == -1){
clntChan.close();
}else if(bytesRead > 0){
buf.flip();
String result = "";
while(bytesRead>0){
byte[] data = buf.array();
result+=new String(data);
bytesRead = clntChan.read(buf);
}
System.out.println(result);
}
}
- 客户端代码
public static void main(String[] args) throws IOException {
SocketChannel clientChannel = SocketChannel.open();
Selector selector = Selector.open();
ClientHandler handler=new ClientHandler();
clientChannel.configureBlocking(false);
clientChannel.connect(new InetSocketAddress("127.0.0.1", 10083));
clientChannel.register(selector, SelectionKey.OP_CONNECT);
while (true) {
selector.select();
Iterator<SelectionKey> keyIter = selector.selectedKeys().iterator();
while (keyIter.hasNext()) {
SelectionKey key = keyIter.next();
keyIter.remove();
if (key.isConnectable()){
handler.handleConnect(key);
}
if (key.isReadable()){
handler.handleRead(key);
}
}
}
}
- 客户端Handler
public void handleConnect(SelectionKey key) throws IOException {
SocketChannel channel=(SocketChannel)key.channel();
//如果正在连接,则完成连接
if(channel.isConnectionPending()){
channel.finishConnect();
}
System.out.println("Connected");
//ServerChannel的key,有accept接入
SocketChannel socketChannel = (SocketChannel) key.channel();
socketChannel.configureBlocking(false);
//附着一个ByteBuffer,把接收到的客户端socketChannel也注册在selector上,监听客户端channel的read操作
socketChannel.register(key.selector(), SelectionKey.OP_READ, ByteBuffer.allocate(BUFF_SIZE));
channel.write(ByteBuffer.wrap(new String("say hi from client").getBytes()));
}
public void handleRead(SelectionKey key) throws IOException {
SocketChannel channel = (SocketChannel)key.channel();
ByteBuffer buffer = ByteBuffer.allocate(200);
channel.read(buffer);
byte[] data = buffer.array();
String message = new String(data);
System.out.println("recevie message from server:, size:" + buffer.position() + " msg: " + message);
}
AIO Socket编程
- AIO Socket编程支持2种方式,将来式和回调式,我演示的是回调式
- 参与的类比较多,但是职责单一,AcceptHandler监听客户的连接行为,ReadHandler监听客户的写入行为,WriteHandler没什么用,只是作为服务端向客户端写消息成功后,系统调用成功的反馈
- 因为服务端的行为很多都是要重复的,而回调函数都是一次性的,比如异步监听客户端的accept、read行为,所以每次回调之后都需要重复绑定
服务端:
- Server 服务创建和初始化AsynchronousServerSocketChannel,并且绑定AcceptHandler,监听客户的连接行为
- AcceptHandler 监听到客户连接后,获取到客户的AsynchronousSocketChannel,然后对客户绑定ReadHandler
- ReadHandler 监听客户往服务端发消息,读取到之后,会发起一个反馈消息,告知客户端已经收到你的消息,此时服务端写消息,需要绑定一个WriteHandler作为写入完成的反馈
- WriteHandler 因为服务端要往客户端写反馈消息,该写入操作需要有个异步回调操作,WriteHandler 里面输出 “写入完成”
public class Server {
public final static int PORT = 8001;
public final static String IP = "127.0.0.1";
public static void main(String[] args) throws IOException {
AsynchronousServerSocketChannel server = AsynchronousServerSocketChannel.open().bind(new InetSocketAddress(IP,PORT));
System.out.println("Server listen on "+PORT);
server.accept(null,new AcceptHandler(server));
while(true){
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
public class AcceptHandler implements CompletionHandler<AsynchronousSocketChannel, Server> {
final ByteBuffer buffer = ByteBuffer.allocate(1024);
private AsynchronousServerSocketChannel server = null;
AcceptHandler(AsynchronousServerSocketChannel server){
this.server=server;
}
@Override
public void completed(AsynchronousSocketChannel socket, Server attachment) {
server.accept(null,this);
try {
System.out.println("有客户端连接:" + socket.getRemoteAddress().toString());
} catch (IOException e1) {
e1.printStackTrace();
}
startRead(socket);
}
@Override
public void failed(Throwable exc, Server attachment) {}
public void startRead(AsynchronousSocketChannel socket) {
ByteBuffer clientBuffer = ByteBuffer.allocate(1024);
ReadHandler rd=new ReadHandler(socket);
socket.read(clientBuffer, clientBuffer, rd);
try {
} catch (Exception e) {
e.printStackTrace();
}
}
}
public class ReadHandler implements CompletionHandler<Integer,ByteBuffer>{
private AsynchronousSocketChannel socket;
public String msg;
public ReadHandler(AsynchronousSocketChannel socket) {
this.socket = socket;
}
private CharsetDecoder decoder = Charset.forName("UTF-8").newDecoder();
//回调函数里面的buf参数,就是Client写的内容,调用 msg=decoder.decode(buf).toString(); 读取客户端的内容
@Override
public void completed(Integer i, ByteBuffer buf) {
if (i > 0) {
socket.read(buf, buf, this);
buf.flip();
try {
msg=decoder.decode(buf).toString();
System.out.println("收到" +socket.getRemoteAddress().toString() + "的消息:" + msg);
buf.compact();
} catch (CharacterCodingException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
try {
write(socket);
} catch (UnsupportedEncodingException ex) {
Logger.getLogger(ReadHandler.class.getName()).log(Level.SEVERE, null, ex);
}
} else if (i == -1) {
try {
System.out.println("客户端断线:" + socket.getRemoteAddress().toString());
buf = null;
} catch (IOException e) {
e.printStackTrace();
}
}
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) { }
public void write(AsynchronousSocketChannel socket) throws UnsupportedEncodingException{
String sendString="server recieve your message:"+msg;
ByteBuffer clientBuffer=ByteBuffer.wrap(sendString.getBytes("UTF-8"));
socket.write(clientBuffer, clientBuffer, new WriteHandler(socket));
}
}
public class WriteHandler implements CompletionHandler<Integer,ByteBuffer> {
private AsynchronousSocketChannel socket;
public WriteHandler(AsynchronousSocketChannel socket) {
this.socket = socket;
}
@Override
public void completed(Integer i, ByteBuffer buf) {
if (i > 0) {
System.out.println("往客户端发送消息成功");
} else if (i == -1) {
try {
System.out.println("对端断线:" + socket.getRemoteAddress().toString());
buf = null;
} catch (IOException e) {
e.printStackTrace();
}
}
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {}
}
客户端
相对来说比较简单,直接用匿名内部类来解决,connect事件绑定一个回调,连接成功后写入一段话,写入成功后,再绑一个read的监听,来接受客户端的消息
public static void main(String[] args) throws IOException {
final AsynchronousSocketChannel client = AsynchronousSocketChannel.open();
InetSocketAddress serverAddress = new InetSocketAddress("127.0.0.1",8001);
CompletionHandler<Void, ? super Object> handler = new CompletionHandler<Void,Object>(){
@Override
public void completed(Void result, Object attachment) {
client.write(ByteBuffer.wrap("Hello".getBytes()),null,
new CompletionHandler<Integer,Object>(){
@Override
public void completed(Integer result,Object attachment) {
final ByteBuffer buffer = ByteBuffer.allocate(1024);
client.read(buffer,buffer,new CompletionHandler<Integer,ByteBuffer>(){
@Override
public void completed(Integer result,
ByteBuffer attachment) {
buffer.flip();
System.out.println(new String(buffer.array()));
try {
client.close();
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void failed(Throwable exc,ByteBuffer attachment) {}
});
}
@Override
public void failed(Throwable exc, Object attachment) {}
});
}
@Override
public void failed(Throwable exc, Object attachment) {}
};
client.connect(serverAddress, null, handler);
}