NIO selector原理浅析

无阻塞io是使用单线程或者只使用少量的多线程,每个连接共用一个线程,当处于等待(没有事件)的时候线程资源可以释放出来处理别的请求,通过事件驱动模型当有accept/read/write等事件发生后通知(唤醒)主线程分配资源来处理相关事件。java.nio.channels.Selector就是在该模型中事件的观察者,可以将多个SocketChannel的事件注册到一个Selector上,当没有事件发生时Selector处于阻塞状态,当SocketChannel有accept/read/write等事件发生时唤醒Selector。

这个Selector是使用了单线程模型,主要用来描述事件驱动模型,要优化性能需要一个好的线程模型来使用,目前比较好的nio框架有Netty,apache的mina等。线程模型这块后面再分享,这里重点研究Selector的阻塞和唤醒原理。

先看一段简单的Selector使用的代码

selector = Selector.open();

ServerSocketChannel ssc = ServerSocketChannel.open();

ssc.configureBlocking(false);

ssc.socket().bind(new InetSocketAddress(port));

ssc.register(selector, SelectionKey.OP_ACCEPT);

while (true) {

// select()阻塞,等待有事件发生唤醒

int selected = selector.select();

if (selected > 0) {

Iterator selectedKeys = selector.selectedKeys().iterator();

while (selectedKeys.hasNext()) {

SelectionKey key = selectedKeys.next();

if ((key.readyOps() & SelectionKey.OP_ACCEPT) == SelectionKey.OP_ACCEPT) {

// 处理 accept 事件

} else if ((key.readyOps() & SelectionKey.OP_READ) == SelectionKey.OP_READ) {

// 处理 read 事件

} else if ((key.readyOps() & SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE) {

// 处理 write 事件

}

selectedKeys.remove();

}

}

}

代码中关键的几个点在:

Selector.open();

selector.select();

阻塞后唤醒可以通过注册在selector上的socket有事件发生 或者 selector.select(timeOut)超时 或者 selector.wakeup()主动唤醒;

整个阻塞和唤醒的过程涉及到的点非常多,先上一张梳理出的整体图,再进入源码会比较容易理解

现在通过openjdk中的源码来解析上图中的每一个环节:

1. Selector.open()

Selector.java

-----

public static Selector open() throws IOException {

return SelectorProvider.provider().openSelector();

}

先看看SelectorProvider.provider()做了什么:

SelectorProvider.java

-----

public static SelectorProvider provider() {

synchronized (lock) {

if (provider != null)

return provider;

return (SelectorProvider)AccessController

.doPrivileged(new PrivilegedAction() {

public Object run() {

if (loadProviderFromProperty())

return provider;

if (loadProviderAsService())

return provider;

provider = sun.nio.ch.DefaultSelectorProvider.create();

return provider;

}

});

}

}

其中provider = sun.nio.ch.DefaultSelectorProvider.create();会根据操作系统来返回不同的实现类,windows平台就返回WindowsSelectorProvider;

这里主要以windows的实现来梳理整个流程,拿到provider后来看openSelector()中的实现

WindowsSelectorProvider.java

----

public AbstractSelector openSelector() throws IOException {

return new WindowsSelectorImpl(this);

}

WindowsSelectorImpl.java

----

WindowsSelectorImpl(SelectorProvider sp) throws IOException {

super(sp);

pollWrapper = new PollArrayWrapper(INIT_CAP);

wakeupPipe = Pipe.open();

wakeupSourceFd = ((SelChImpl)wakeupPipe.source()).getFDVal();

// Disable the Nagle algorithm so that the wakeup is more immediate

SinkChannelImpl sink = (SinkChannelImpl)wakeupPipe.sink();

(sink.sc).socket().setTcpNoDelay(true);

wakeupSinkFd = ((SelChImpl)sink).getFDVal();

pollWrapper.addWakeupSocket(wakeupSourceFd, 0);

}

这段代码中做了如下几个事情

Pipe.open()打开一个管道(打开管道的实现后面再看);拿到wakeupSourceFd和wakeupSinkFd两个文件描述符;把唤醒端的文件描述符(wakeupSourceFd)放到pollWrapper里;

那么为什么需要一个管道,这个管道是怎么实现的?接下来看Pipe.open()做了什么

Pipe.java

----

public static Pipe open() throws IOException {

return SelectorProvider.provider().openPipe();

}

同样,SelectorProvider.provider()也是获取操作系统相关的实现

SelectorProvider.java

----

public Pipe openPipe() throws IOException {

return new PipeImpl(this);

}

这里还是看windows下的实现

PipeImpl.java

----

PipeImpl(final SelectorProvider sp) throws IOException {

try {

AccessController.doPrivileged(new Initializer(sp));

} catch (PrivilegedActionException x) {

throw (IOException)x.getCause();

}

}

创建了一个PipeImpl对象, AccessController.doPrivileged调用后紧接着会执行initializer的run方法

PipeImpl.Initializer

-----

public Object run() throws IOException {

ServerSocketChannel ssc = null;

SocketChannel sc1 = null;

SocketChannel sc2 = null;

try {

// loopback address

InetAddress lb = InetAddress.getByName("127.0.0.1");

assert(lb.isLoopbackAddress());

// bind ServerSocketChannel to a port on the loopback address

ssc = ServerSocketChannel.open();

ssc.socket().bind(new InetSocketAddress(lb, 0));

// Establish connection (assumes connections are eagerly

// accepted)

InetSocketAddress sa

= new InetSocketAddress(lb, ssc.socket().getLocalPort());

sc1 = SocketChannel.open(sa);

ByteBuffer bb = ByteBuffer.allocate(8);

long secret = rnd.nextLong();

bb.putLong(secret).flip();

sc1.write(bb);

// Get a connection and verify it is legitimate

for (;;) {

sc2 = ssc.accept();

bb.clear();

sc2.read(bb);

bb.rewind();

if (bb.getLong() == secret)

break;

sc2.close();

}

// Create source and sink channels

source = new SourceChannelImpl(sp, sc1);

sink = new SinkChannelImpl(sp, sc2);

} catch (IOException e) {

try {

if (sc1 != null)

sc1.close();

if (sc2 != null)

sc2.close();

} catch (IOException e2) { }

IOException x = new IOException("Unable to establish"

+ " loopback connection");

x.initCause(e);

throw x;

} finally {

try {

if (ssc != null)

ssc.close();

} catch (IOException e2) { }

}

return null;

}

这里即为上图中最下面那部分创建pipe的过程,windows下的实现是创建两个本地的socketChannel,然后连接(链接的过程通过写一个随机long做两个socket的链接校验),两个socketChannel分别实现了管道的source与sink端。

source端由前面提到的WindowsSelectorImpl放到了pollWrapper中(pollWrapper.addWakeupSocket(wakeupSourceFd, 0))

PollArrayWrapper.java

----

private AllocatedNativeObject pollArray; // The fd array

// Adds Windows wakeup socket at a given index.

void addWakeupSocket(int fdVal, int index) {

putDescriptor(index, fdVal);

putEventOps(index, POLLIN);

}

// Access methods for fd structures

void putDescriptor(int i, int fd) {

pollArray.putInt(SIZE_POLLFD * i + FD_OFFSET, fd);

}

void putEventOps(int i, int event) {

pollArray.putShort(SIZE_POLLFD * i + EVENT_OFFSET, (short)event);

}

这里将source的POLLIN事件标识为感兴趣的,当sink端有数据写入时,source对应的文件描述符wakeupSourceFd就会处于就绪状态

Java代码  收藏代码

AllocatedNativeObject.java

----

class AllocatedNativeObject extends NativeObject

AllocatedNativeObject(int size, boolean pageAligned) {

super(size, pageAligned);

}

NativeObject.java

----

protected NativeObject(int size, boolean pageAligned) {

if (!pageAligned) {

this.allocationAddress = unsafe.allocateMemory(size);

this.address = this.allocationAddress;

} else {

int ps = pageSize();

long a = unsafe.allocateMemory(size + ps);

this.allocationAddress = a;

this.address = a + ps - (a & (ps - 1));

}

}

从以上可以看到pollArray是通过unsafe.allocateMemory(size + ps)分配的一块系统内存

到这里完成了Selector.open(),主要完成建立Pipe,并把pipe的wakeupSourceFd放入pollArray中,这个pollArray是Selector的枢纽。这里是以Windows的实现来看,在windows下通过两个链接的socketChannel实现了Pipe,linux下则是直接使用系统的pipe。

2. serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

AbstractSelectableChannel.java --> register() --> SelectorImpl.java

----

protected final SelectionKey register(AbstractSelectableChannel ch,int ops,Object attachment)

{

if (!(ch instanceof SelChImpl))

throw new IllegalSelectorException();

SelectionKeyImpl k = new SelectionKeyImpl((SelChImpl)ch, this);

k.attach(attachment);

synchronized (publicKeys) {

implRegister(k);

}

k.interestOps(ops);

return k;

}

关键是implRegister(k);

WindowsSelectorImpl.java

----

protected void implRegister(SelectionKeyImpl ski) {

growIfNeeded();

channelArray[totalChannels] = ski;

ski.setIndex(totalChannels);

fdMap.put(ski);

keys.add(ski);

pollWrapper.addEntry(totalChannels, ski);

totalChannels++;

}

PollArrayWrapper.java

----

void addEntry(int index, SelectionKeyImpl ski) {

putDescriptor(index, ski.channel.getFDVal());

}

这里把socketChannel的文件描述符放到pollArray中。

3. selector.select();

SelectorImpl.java

----

public int select(long timeout) throws IOException

{

if (timeout < 0)

throw new IllegalArgumentException("Negative timeout");

return lockAndDoSelect((timeout == 0) ? -1 : timeout);

}

private int lockAndDoSelect(long timeout) throws IOException {

synchronized (this) {if (!isOpen())              throw new ClosedSelectorException();          synchronized (publicKeys) {              synchronized (publicSelectedKeys) {                  return doSelect(timeout);              }          }      }  }

其中的doSelector又回到我们的Windows实现:

WindowsSelectorImpl.java

----

protected int doSelect(long timeout) throws IOException {

if (channelArray == null)

throw new ClosedSelectorException();

this.timeout = timeout; // set selector timeout

processDeregisterQueue();

if (interruptTriggered) {

resetWakeupSocket();

return 0;

}

// Calculate number of helper threads needed for poll. If necessary

// threads are created here and start waiting on startLock

adjustThreadsCount();

finishLock.reset(); // reset finishLock

// Wakeup helper threads, waiting on startLock, so they start polling.

// Redundant threads will exit here after wakeup.

startLock.startThreads();

// do polling in the main thread. Main thread is responsible for

// first MAX_SELECTABLE_FDS entries in pollArray.

try {

begin();

try {

subSelector.poll();

} catch (IOException e) {

finishLock.setException(e); // Save this exception

}

// Main thread is out of poll(). Wakeup others and wait for them

if (threads.size() > 0)

finishLock.waitForHelperThreads();

} finally {

end();

}

// Done with poll(). Set wakeupSocket to nonsignaled  for the next run.

finishLock.checkForException();

processDeregisterQueue();

int updated = updateSelectedKeys();

// Done with poll(). Set wakeupSocket to nonsignaled  for the next run.

resetWakeupSocket();

return updated;

}

private int poll() throws IOException{ // poll for the main thread

return poll0(pollWrapper.pollArrayAddress,

Math.min(totalChannels, MAX_SELECTABLE_FDS),

readFds, writeFds, exceptFds, timeout);

}

private native int poll0(long pollAddress, int numfds, int[] readFds, int[] writeFds, int[] exceptFds, long timeout);

其他的都是一些准备工作,关键是subSelector.poll(),最后调用了native的poll0,并把pollWrapper.pollArrayAddress作为参数传给poll0,那么poll0对pollArray做了什么:

WindowsSelectorImpl.c

----

Java_sun_nio_ch_WindowsSelectorImpl_00024SubSelector_poll0(JNIEnv *env, jobject this,

jlong pollAddress, jint numfds,

jintArray returnReadFds, jintArray returnWriteFds,

jintArray returnExceptFds, jlong timeout)

{

// 代码.... 此处省略一万字

/* Call select */

if ((result = select(0 , &readfds, &writefds, &exceptfds, tv)) == SOCKET_ERROR) {

// 代码.... 此处省略一万字

for (i = 0; i < numfds; i++) {

// 代码.... 此处省略一万字

}

}

}

代码已经忘得差不多了,但这里可以看到实现思路是调用c的select方法,这里的select对应于内核中的sys_select调用,sys_select首先将第二三四个参数指向的fd_set拷贝到内核,然后对每个被SET的描述符调用进行poll,并记录在临时结果中(fdset),如果有事件发生,select会将临时结果写到用户空间并返回;当轮询一遍后没有任何事件发生时,如果指定了超时时间,则select会睡眠到超时,睡眠结束后再进行一次轮询,并将临时结果写到用户空间,然后返回。

这里的select就是轮询pollArray中的FD,看有没有事件发生,如果有事件发生收集所有发生事件的FD,退出阻塞。

关于select系统调用参考了《select、poll、epoll的比较》这篇文章,同时看到nio的select在不同平台上的实现不同,在linux上通过epoll可以不用轮询,在第一次调用后,事件信息就会与对应的epoll描述符关联起来,待的描述符上注册回调函数,当事件发生时,回调函数负责把发生的事件存储在就绪事件链表中,最后写到用户空间。

到这里已经比较清楚了,退出阻塞的方式有:regist在selector上的socketChannel处于就绪状态(放在pollArray中的socketChannel的FD就绪) 或者 第1节中放在pollArray中的wakeupSourceFd就绪。前者(socketChannel)就绪唤醒应证了文章开始的阻塞->事件驱动->唤醒的过程,后者(wakeupSourceFd)就是下面要看的主动wakeup。

4. selector.wakeup()

WindowsSelectorImpl.java

----

public Selector wakeup() {

synchronized (interruptLock) {

if (!interruptTriggered) {

setWakeupSocket();

interruptTriggered = true;

}

}

return this;

}

// Sets Windows wakeup socket to a signaled state.

private void setWakeupSocket() {

setWakeupSocket0(wakeupSinkFd);

}

private native void setWakeupSocket0(int wakeupSinkFd);

native实现摘要:

WindowsSelectorImpl.c

----

Java_sun_nio_ch_WindowsSelectorImpl_setWakeupSocket0(JNIEnv *env, jclass this,

jint scoutFd)

{

/* Write one byte into the pipe */

send(scoutFd, (char*)&POLLIN, 1, 0);

}

这里完成了向最开始建立的pipe的sink端写入了一个字节,source文件描述符就会处于就绪状态,poll方法会返回,从而导致select方法返回。(原来自己建立一个socket链着自己另外一个socket就是为了干这事)

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,530评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 86,403评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,120评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,770评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,758评论 5 367
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,649评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,021评论 3 398
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,675评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,931评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,659评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,751评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,410评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,004评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,969评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,203评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,042评论 2 350
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,493评论 2 343

推荐阅读更多精彩内容

  • /Library/Java/JavaVirtualMachines/jdk-9.jdk/Contents/Home...
    光剑书架上的书阅读 3,856评论 2 8
  • 转至元数据结尾创建: 董潇伟,最新修改于: 十二月 23, 2016 转至元数据起始第一章:isa和Class一....
    40c0490e5268阅读 1,678评论 0 9
  • jHipster - 微服务搭建 CC_简书[https://www.jianshu.com/u/be0d56c4...
    quanjj阅读 794评论 0 2
  • ``` /* * ORACLE PROPRIETARY/CONFIDENTIAL. Use is subject ...
    非专业码农阅读 331评论 0 0
  • 今天是三八妇女节。嘻嘻,很高兴地说,抢到了不少红包,而且,还有单独包给我的红包,是我的哥哥和嫂子给我的,我非常感谢...
    醒来的自己阅读 172评论 0 0