紧跟前文,先把前面缺的例子补全,接着一步一步分析。
这里的内容比较少,直接以注释的形式补充在每一行上,有必要的地方会在底下补上图文。
public void dispatch(SelectionKey key) throws IOException, InterruptedException {
if (key.isAcceptable()) {
//前文中已经轮训过一遍updateList,通过
//ski.channel.translateAndSetReadyOps已经将准备好的操作set到key中,
//这里的key有4种操作OP_READ,OP_WRITE,OP_CONNECT,OP_ACCEPT,
//这里只有ServerSocketChannel支持accept操作,对应的操作如最底下
ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
//accept操作必定是ServerSocketChannel
// 接受一个连接. 底下详细讲
SocketChannel sc = ssc.accept();
// 对新的连接的channel注册read事件.
sc.configureBlocking(false);
//这里的selector是另一个了
sc.register(readBell.getSelector(), SelectionKey.OP_READ);
// 如果读取线程还没有启动,那就启动一个读取线程.
synchronized(NioServer.this) {
if (!NioServer.this.isReadBellRunning) {
NioServer.this.isReadBellRunning = true;
new Thread(readBell).start();
}
}
} else if (key.isReadable()) {
// 这是一个read事件,并且这个事件是注册在socketchannel上的.
SocketChannel sc = (SocketChannel) key.channel(); //这个channel就是 ssc.accept()对应的那个
// 写数据到buffer
int count = sc.read(temp);
if (count < 0) {
// 客户端已经断开连接.
key.cancel();
sc.close();
return;
}
// 切换buffer到读状态,内部指针归位.
temp.flip();
String msg = Charset.forName("UTF-8").decode(temp).toString();
System.out.println("Server received ["+msg+"] from client address:" + sc.getRemoteAddress());
Thread.sleep(1000);
// echo back.
sc.write(ByteBuffer.wrap(msg.getBytes(Charset.forName("UTF-8"))));
// 清空buffer
temp.clear();
}
}
}
核心语句 ssc.accept()
public SocketChannel accept() throws IOException {
synchronized (lock) {//加锁
if (!isOpen())
throw new ClosedChannelException();
if (!isBound())
throw new NotYetBoundException();
SocketChannel sc = null;
int n = 0;
FileDescriptor newfd = new FileDescriptor();
InetSocketAddress[] isaa = new InetSocketAddress[1];
try {
begin();//和下面的end()配对,用于线程被中断时关闭channel,这里另开一篇再讲
if (!isOpen())
return null;
thread = NativeThread.current();
for (;;) {
n = accept0(this.fd, newfd, isaa);//native函数,解释:
// Accepts a new connection, setting the given file descriptor to refer to
// the new socket and setting isaa[0] to the socket's remote address.
// Returns 1 on success, or IOStatus.UNAVAILABLE (if non-blocking and no
// connections are pending) or IOStatus.INTERRUPTED.
if ((n == IOStatus.INTERRUPTED) && isOpen())//如果状态是INTERRUPTED且channel还是open的则继续尝试
continue;
break;
}
} finally {
thread = 0;
end(n > 0);
assert IOStatus.check(n);
}
if (n < 1)
return null;
IOUtil.configureBlocking(newfd, true);
InetSocketAddress isa = isaa[0];
//新建一个socketchannel供read/write操作使用
sc = new SocketChannelImpl(provider(), newfd, isa);
//IP端口权限检测
SecurityManager sm = System.getSecurityManager();
if (sm != null) {
try {
sm.checkAccept(isa.getAddress().getHostAddress(),
isa.getPort());
} catch (SecurityException x) {
sc.close();
throw x;
}
}
return sc;
}
}
就绪操作与通道对应关系:
OP_ACCEPT | OP_CONNECT | OP_WRITE | OP_READ | |
---|---|---|---|---|
客户端 SocketChannel | Y | Y | Y | |
服务端 ServerSocketChannel | Y | |||
服务端 SocketChannel | Y | Y |