概述
ServerSocketChannel主要用于服务端,而在客户端,经常打交道的是SocketChannel,这篇文章将介绍SocketChannel是如何实现的。
实例化
在之前介绍SelectorProvider的时候曾经介绍过,NIO channel的创建都是通过SelecorProvider实现的:
public SocketChannel openSocketChannel() throws IOException {
return new SocketChannelImpl(this);
}
当然SocketChannel也提供了快捷方法open:
public static SocketChannel open() throws IOException {
return SelectorProvider.provider().openSocketChannel();
}
public static SocketChannel open(SocketAddress remote)
throws IOException
{
SocketChannel sc = open();//调用SelectorProvider实现,默认是堵塞的
try {
sc.connect(remote);
} catch (Throwable x) {
try {
sc.close();
} catch (Throwable suppressed) {
x.addSuppressed(suppressed);
}
throw x;
}
assert sc.isConnected();
return sc;
}
SocketChannel是个抽象类,SelectorProvider返回的是SocketChannelImpl,继承自SocketChannel;一般情况下,当采用异步方式时,使用不带参数的open方法比较常见,而且会调用configureBlocking设置非堵塞;
SocketChannelImpl构造函数定义如下:
SocketChannelImpl(SelectorProvider sp) throws IOException {
super(sp);
//调用inux的socket函数,true表示TCP
this.fd = Net.socket(true);
//由于FileDescriptor未提供访问fdVal的方法,通过JNI获取
this.fdVal = IOUtil.fdVal(fd);
this.state = ST_UNCONNECTED;//设置状态为未连接
}
connect
调用connect方法连接到远程服务器,其源码如下:
public boolean connect(SocketAddress sa) throws IOException {
int localPort = 0;
//注意加速顺序,整个类保存一致
synchronized (readLock) {
synchronized (writeLock) {
ensureOpenAndUnconnected();//检查连接状态
InetSocketAddress isa = Net.checkAddress(sa);
synchronized (blockingLock()) {
int n = 0;
try {
try {
//支持线程中断,通过设置当前线程的Interruptible blocker属性实现,由于前面已经介绍过多次,此处不再介绍
begin();
synchronized (stateLock) {
//默认为open, 除非调用了close方法
if (!isOpen()) {
return false;
}
//只有未绑定本地地址也就是说未调用bind方法才执行,该方法在ServerSocketChannel中介绍过
if (localAddress == null) {
NetHooks.beforeTcpConnect(fd,
isa.getAddress(),
isa.getPort());
}
//记录当前线程
readerThread = NativeThread.current();
}
for (;;) {
InetAddress ia = isa.getAddress();
if (ia.isAnyLocalAddress())
ia = InetAddress.getLocalHost();
//调用Linux的connect函数实现,如果采用堵塞模式,会一直等待,直到成功或出现异常,后面会介绍
n = Net.connect(fd,
ia,
isa.getPort());
if ( (n == IOStatus.INTERRUPTED)
&& isOpen())
continue;
break;
}
} finally {
//清空readerThread
readerCleanup();
//和begin成对出现,当线程中断时,抛出ClosedByInterruptException
end((n > 0) || (n == IOStatus.UNAVAILABLE));
assert IOStatus.check(n);
}
} catch (IOException x) {
close(); //出现异常,关闭channel
throw x;
}
synchronized (stateLock) {
remoteAddress = isa;
if (n > 0) {//如果连接成功,更新状态为ST_CONNECTED
state = ST_CONNECTED;
//如果未调用bind方法,操作系统内核会自动分配地址和端口;否则返回bind的地址和端口
if (isOpen())
localAddress = Net.localAddress(fd);
return true;
}
//如果是非堵塞模式,而且未立即返回成功,更新状态为ST_PENDING;
//由此可见,该状态只有非堵塞时才会存在
if (!isBlocking())
state = ST_PENDING;
else
assert false;
}
}
return false;
}
}
}
上面的代码中会调用Net.connect方法,该方法最终会调用native方法:
JNIEXPORT jint JNICALL
Java_sun_nio_ch_Net_connect0(JNIEnv *env, jclass clazz, jboolean preferIPv6,
jobject fdo, jobject iao, jint port)
{
SOCKADDR sa;
int sa_len = SOCKADDR_LEN;
int rv;
//地址转换为struct sockaddr格式
if (NET_InetAddressToSockaddr(env, iao, port, (struct sockaddr *) &sa,
&sa_len, preferIPv6) != 0)
{
return IOS_THROWN;
}
//传入fd和sockaddr,与远程服务器建立连接,一般就是TCP三次握手
//如果设置了configureBlocking(false),不会堵塞,否则会堵塞一直到超时或出现异常
rv = connect(fdval(env, fdo), (struct sockaddr *)&sa, sa_len);
if (rv != 0) {//0表示连接成功,失败时通过errno获取具体原因
if (errno == EINPROGRESS) {//非堵塞,连接还未建立(-2)
return IOS_UNAVAILABLE;
} else if (errno == EINTR) {//中断(-3)
return IOS_INTERRUPTED;
}
return handleSocketError(env, errno); //出错
}
return 1;//连接建立,一般TCP连接连接都需要时间,因此除非是本地网络,一般情况下非堵塞模式返回IOS_UNAVAILABLE比较多;
}
从上面可以看到,如果是非堵塞,而且连接未马上建立成功,此时状态为ST_PENDING,那么什么时候会变为ST_CONNECTED呢?是否有什么方法可以查询状态或者等待连接完成呢?
finishConnect
带着上面的问题,我们一起看看finishConnect的实现,代码比较长,我只保留比较重要的部分:
public boolean finishConnect() throws IOException {
synchronized (readLock) {
synchronized (writeLock) {
int n = 0;
try {
try {
synchronized (blockingLock()) {
if (!isBlocking()) {//非堵塞模式
for (;;) {
n = checkConnect(fd, false,
readyToConnect);
if ( (n == IOStatus.INTERRUPTED)
&& isOpen())
continue;
break;//除非被中断,否则退出
}
} else {//堵塞模式
for (;;) {
n = checkConnect(fd, true,
readyToConnect);
if (n == 0) {//除非>0,否则自旋,继续等待
continue;
}
if ( (n == IOStatus.INTERRUPTED)
&& isOpen())
continue;
break;
}
}
}
} finally {
synchronized (stateLock) {
if (state == ST_KILLPENDING) {//调用了close方法
kill();
n = 0;
}
}
}
} catch (IOException x) { //异常发生,关闭channel
close();
throw x;
}
if (n > 0) {//连接成功
synchronized (stateLock) {
state = ST_CONNECTED;//更新状态
if (isOpen())
localAddress = Net.localAddress(fd);
}
return true;
}
return false;
}
}
}
从上面看到,如果是堵塞模式,会一直循环检查状态,直到成功或发生异常;而非堵塞模式下,检查完,马上结束循环;
上面的代码是通过checkConnect检查连接状态,下面看看它是如何实现的:
JNIEXPORT jint JNICALL
Java_sun_nio_ch_SocketChannelImpl_checkConnect(JNIEnv *env, jobject this,
jobject fdo, jboolean block,
jboolean ready)
{
int error = 0;
socklen_t n = sizeof(int);
jint fd = fdval(env, fdo);//获取FileDescriptor中的fd
int result = 0;
struct pollfd poller;
poller.revents = 1;//返回的事件
if (!ready) {
poller.fd = fd;//文件描述符
poller.events = POLLOUT;//请求的事件:写事件
poller.revents = 0;//返回的事件
//第3个参数表示超时时间(毫秒)
//-1表示永远不会超时,0表示立即返回,不阻塞进程
result = poll(&poller, 1, block ? -1 : 0);
if (result < 0) {//小于0表示调用失败
JNU_ThrowIOExceptionWithLastError(env, "Poll failed");
return IOS_THROWN;
}
//非堵塞时,0表示没有准备好的连接
if (!block && (result == 0))
return IOS_UNAVAILABLE;
}
if (poller.revents) {//准备好写或出现错误的socket数量>0
errno = 0;
result = getsockopt(fd, SOL_SOCKET, SO_ERROR, &error, &n);
if (result < 0) {//出错
handleSocketError(env, errno);
return JNI_FALSE;
} else if (error) {//发生错误
handleSocketError(env, error);
return JNI_FALSE;
}
return 1;//socket已经准备好,可写,即连接已经建立好
}
return 0;
}
从上面的源码看到,底层是通过poll查询socket的状态,从而判断连接是否建立成功;
由于在非堵塞模式下,finishConnect方法会立即返回,因此不大建议用循环的方式判断连接是否建立,而是建议注册到Selector,通过ops=OP_CONNECT获取连接完成的SelectionKey,然后调用finishConnect完成连接的建立;
那么finishConnect是否可以不调用呢?答案是否,因为只有finishConnect中会将状态更新为ST_CONNECTED,而在调用read和write时都会对状态进行判断;
另外还有特别说一下的是translateReadyOps方法,在EpollSelectorImpl的doSelect方法中会调用channel的translateAndSetReadyOps方法,在该方法中设置SocketChannel的readyToConnect变量;从上面代码知道,finishConnect的时候,如果发现readyToConnect=true,将不会调用poll来查询状态;