1、nettey的线程模型
1.1、Rector模式
Reactor模式首先是事件驱动的,有一个或多个并发输入源,有一个Service Handler,有多个Request Handlers;这个Service Handler会同步的将输入的请求(Event)多路复用的分发给相应的Request Handler。
单线程模型:
单线程模型下,所有的IO操作都由同一个Reactor线程来完成,其主要职责如下:
- 作为服务端,接收客户端的TCP连接;
- 作为客户端,向服务端发起TCP连接;
- 读取通信对端的请求或者应答消息;
- 向通信对端发送消息请求或者应答消息。
Reactor单线程模型原理图如下:
如图所示,由于Reactor模式使用的是异步非阻塞IO,所有的IO操作都不会导致阻塞。通常Reactor线程中聚合了多路复用器负责监听网络事件,当有新连接到来时,触发连接事件,Disdatcher负责使用Acceptor接受客户端连接,建立通信链路;当I/O事件就绪后,Disdatcher负责将事件分发到对应的event handler上负责处理。
该模型的缺点很明显,不适用于高负载、高并发的应用场景;由于只有一个Reactor线程,一旦挂彩,整个系统通信模块将不可用。
多线程模型:
该模型的特点:
- 专门由一个Reactor线程-Acceptor线程用于监听服务端,接收客户端连接请求;
- 网络I/O操作读、写等由Reactor线程池负责处理;
- 一个Reactor线程可同时处理多条链路,但一条链路只能对应一个Reactor线程,这样可避免并发操作问题。
绝大多数场景下,Reactor多线程模型都可以满足性能需求,但是,在极个别特殊场景中,一个Reactor线程负责监听和处理所有的客户端连接可能会存在性能问题。例如并发百万客户端连接,或者服务端需要对客户端握手进行安全认证,但是认证本身非常损耗性能。因此,诞生了第三种线程模型。
主从多线程模型:
该模型的特点:
- 服务端使用一个独立的主Reactor线程池来处理客户端连接,当服务端收到连接请求时,从主线程池中随机选择一个Reactor线程作为Acceptor线程处理连接;
- 链路建立成功后,将新创建的SocketChannel注册到sub reactor线程池的某个Reactor线程上,由它处理后续的I/O操作。
具体介绍请参照:https://www.jianshu.com/p/b4de9b85c79d
1.2、netty中的Reactor模式
Netty同时支持Reactor单线程模型 、Reactor多线程模型和Reactor主从多线程模型,用户可根据启动参数配置在这三种模型之间切换。Netty线程模型原理图如下:
2、NioEventLoopGroup相关源码学习
NioEventLoopGroup类继承图:
2.1、NioEventLoopGroup功能介绍
NioEventLoopGroup主要包含两方面的功能:
- 注册Channel,主要是将Channel与某个线程池绑定等;
- 线程池管理及任务管理等;
2.1.1、注册Channel
NioEventLoopGroup主要通过EventLoopGroup的register()方法进行Channel注册,具体实现是通过EventLoop的register()将Channel注册到对应EventLoop的Selector上,由Selector来调度Channel的相关事件,如读、写、Accept等事件。
而EventLoopGroup的设计是,它包含多个EventLoop(每一个EventLoop通常内部包含一个线程),在执行上述注册过程中是需要选择其中的一个EventLoop来执行上述注册行为,这里就出现了一个选择策略的问题,该选择策略接口是EventExecutorChooser,你也可以自定义一个实现。
2.1.2、线程池管理及任务执行
EventLoopGroup继承了EventExecutorGroup,EventExecutorGroup也是EventExecutor的集合,EventExecutorGroup也是掌管着EventExecutor的初始化工作,EventExecutorGroup对于Runnable任务的执行也是选择内部中的一个EventExecutor来做具体的执行工作。
netty中很多任务都是异步执行的,一旦当前线程要对某个EventLoop执行相关操作,如注册Channel到某个EventLoop,如果当前线程和所要操作的EventLoop内部的线程不是同一个,则当前线程就仅仅向EventLoop提交一个注册任务,对外返回一个ChannelFuture。
2.2、NioEventLoopGroup源码学习
NioEventLoopGroup中主要有以下实现:
2.2.1、线程组创建及初始化
主要是调用父类MultithreadEventLoopGroup的构造方法进行初始,而MultithreadEventLoopGroup又调用MultithreadEventExecutorGroup对线程组进行初始化,其中包括EventExecutor数组初始化、EventExecutorChooser初始化及相关线程监听器初始化等;
2.2.2、IoRatio初始化
源码如下:
public void setIoRatio(int ioRatio) {
for (EventExecutor e: this) {
((NioEventLoop) e).setIoRatio(ioRatio);
}
}
其主要是调用NioEventLoop的方法对执行时间占比进行设置;
2.2.3、重建selector
源码如下:
public void rebuildSelectors() {
for (EventExecutor e: this) {
((NioEventLoop) e).rebuildSelector();
}
}
主要是调用NioEventLoop的方法进行重建;
2.2.3、创建子执行器
源码如下:
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
return new NioEventLoop(this, executor, (SelectorProvider) args[0],
((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
}
NioEventLoopGroup创建NioEventLoop类型的子执行器;
2.3、MultithreadEventExecutorGroup源码学习
2.3.1、基本属性
private final EventExecutor[] children;
private final Set<EventExecutor> readonlyChildren;
private final AtomicInteger terminatedChildren = new AtomicInteger();
private final Promise<?> terminationFuture = new DefaultPromise(GlobalEventExecutor.INSTANCE);
private final EventExecutorChooserFactory.EventExecutorChooser chooser;
children:EventExecutor类型的子执行器数组
readonlyChildren:只读执行器
terminatedChildren:已终止的执行器个数;
terminationFuture:线程终止异步结果
chooser:子执行器选择器;
2.3.2、构造函数
实现源码:
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
EventExecutorChooserFactory chooserFactory, Object... args) {
if (nThreads <= 0) {
throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
}
if (executor == null) {
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
}
children = new EventExecutor[nThreads];
for (int i = 0; i < nThreads; i ++) {
boolean success = false;
try {
children[i] = newChild(executor, args);
success = true;
} catch (Exception e) {
// TODO: Think about if this is a good exception type
throw new IllegalStateException("failed to create a child event loop", e);
} finally {
if (!success) {
for (int j = 0; j < i; j ++) {
children[j].shutdownGracefully();
}
for (int j = 0; j < i; j ++) {
EventExecutor e = children[j];
try {
while (!e.isTerminated()) {
e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
}
} catch (InterruptedException interrupted) {
// Let the caller handle the interruption.
Thread.currentThread().interrupt();
break;
}
}
}
}
}
chooser = chooserFactory.newChooser(children);
final FutureListener<Object> terminationListener = new FutureListener<Object>() {
@Override
public void operationComplete(Future<Object> future) throws Exception {
if (terminatedChildren.incrementAndGet() == children.length) {
terminationFuture.setSuccess(null);
}
}
};
for (EventExecutor e: children) {
e.terminationFuture().addListener(terminationListener);
}
Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
Collections.addAll(childrenSet, children);
readonlyChildren = Collections.unmodifiableSet(childrenSet);
}
处理流程:
- 初始化NioEventLoop的执行线程,本处执行线程类型为:ThreadPerTaskExecutor,顾名思义,既每个任务一个线程,以下为任务提交处理源码:
@Override
public void execute(Runnable command) {
threadFactory.newThread(command).start();
}
由以上源码可知,每个任务都会创建一个线程进行执行。
- 执行器组初始化,其调用子类的newChild()方法获取执行器,具体为调用NioEventLoopGroup的newChild()方法创建EventLoop类型的执行器。
- 初始化执行器选择器,默认选择器实现是通过DefaultEventExecutorChooserFactory的newChooser()获取的,其有两种具体的实现:PowerOfTwoEventExecutorChooser(基于原子自增通线程数做与运算获取),GenericEventExecutorChooser(做取模运算获取);
- 给每个子执行器添加终止监听器,当所有子执行器都有终止时,设置NioEventLoopGroup的终止异步结果。
2.3.3、其他实现
- 获取下个执行器:通过chooser获取
public EventExecutor next() {
return chooser.next();
}
- 获取执行器个数:
public final int executorCount() {
return children.length;
}
2.4、其他实现类
MultithreadEventLoopGroup:
- 默认执行器组个数初始化:
主要对默认执行器组的个数进行初始化,默认为cpu的processor个数的2倍。
- Channel注册:
具体调用EventLoop的register()方法将Channel注册到EventLoop中。
3、NioEventLoop相关源码学习
NioEventLoop类继承图:
3.1、NioEventLoop功能介绍
NioEventLoop并不是一个纯粹的I/O线程,它除了负责I/O读写之外,还兼顾处理以下两类任务:
- 系统Task:通过调用NioEventLoop的execute(Runnable task)方法实现,Netty有很多系统Task,创建它们的主要原因是:当I/O线程和用户线程同时操作网络资源时,为防止并发操作导致的锁的竞争,将用户线程的操作封装成Task放入消息队列中,由I/O线程复制执行,这样就实现了局部无锁化。
- 定时任务:通过调用NioEventLoop的schedule(Runnable command,long delay,TimeUnit unit)方法实现。
3.2、NioEventLoop源码学习
3.2.1、基本属性
private static final int SELECTOR_AUTO_REBUILD_THRESHOLD;
private final IntSupplier selectNowSupplier;
private Selector selector;
private Selector unwrappedSelector;
private SelectedSelectionKeySet selectedKeys;
private final SelectorProvider provider;
private final AtomicBoolean wakenUp = new AtomicBoolean();
private final SelectStrategy selectStrategy;
private volatile int ioRatio = 50;
private int cancelledKeys;
private boolean needsToSelectAgain;
SELECTOR_AUTO_REBUILD_THRESHOLD:重建selector的select次数上限;
selectNowSupplier:selectNow()的包装实现;
selector:本EventLoop的Selector;
unwrappedSelector:未包装的jdk原生Selector;
selectedKeys:Selector获取到的SelectedSelectionKeySet;
provider:Selector的提供者;
wakenUp:Selector的wakeup标志;
selectStrategy:select的处理策略;
ioRatio:I/O任务与其他任务执行时间占比;
cancelledKeys:已取消的keys计数;
needsToSelectAgain:是否需要再次select;
3.2.2、构造函数
NioEventLoop构造函数源码:
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
if (selectorProvider == null) {
throw new NullPointerException("selectorProvider");
}
if (strategy == null) {
throw new NullPointerException("selectStrategy");
}
provider = selectorProvider;
final SelectorTuple selectorTuple = openSelector();
selector = selectorTuple.selector;
unwrappedSelector = selectorTuple.unwrappedSelector;
selectStrategy = strategy;
}
provider :默认实现为SelectorProvider.provider()(在NioEventLoopGroup中设定)即调用jdk的默认实现;
openSelector:构建本EventLoop对应的Selector,selectorTuple包含原生unwrappedSelector和netty包装实现的selector。
selectStrategy:select的处理策略,其具体实现类为:DefaultSelectStrategy(在NioEventLoopGroup中设定);
3.2.3、主要方法实现解析
3.2.3.1、openSelector()实现
openSelector()实现源码:
private SelectorTuple openSelector() {
final Selector unwrappedSelector;
try {
unwrappedSelector = provider.openSelector();
} catch (IOException e) {
throw new ChannelException("failed to open a new selector", e);
}
if (DISABLE_KEYSET_OPTIMIZATION) {
return new SelectorTuple(unwrappedSelector);
}
Object maybeSelectorImplClass = AccessController.doPrivileged(new PrivilegedAction<Object>() {
@Override
public Object run() {
try {
return Class.forName(
"sun.nio.ch.SelectorImpl",
false,
PlatformDependent.getSystemClassLoader());
} catch (Throwable cause) {
return cause;
}
}
});
if (!(maybeSelectorImplClass instanceof Class) ||
// ensure the current selector implementation is what we can instrument.
!((Class<?>) maybeSelectorImplClass).isAssignableFrom(unwrappedSelector.getClass())) {
if (maybeSelectorImplClass instanceof Throwable) {
Throwable t = (Throwable) maybeSelectorImplClass;
logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, t);
}
return new SelectorTuple(unwrappedSelector);
}
final Class<?> selectorImplClass = (Class<?>) maybeSelectorImplClass;
final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();
Object maybeException = AccessController.doPrivileged(new PrivilegedAction<Object>() {
@Override
public Object run() {
try {
Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");
Throwable cause = ReflectionUtil.trySetAccessible(selectedKeysField, true);
if (cause != null) {
return cause;
}
cause = ReflectionUtil.trySetAccessible(publicSelectedKeysField, true);
if (cause != null) {
return cause;
}
selectedKeysField.set(unwrappedSelector, selectedKeySet);
publicSelectedKeysField.set(unwrappedSelector, selectedKeySet);
return null;
} catch (NoSuchFieldException e) {
return e;
} catch (IllegalAccessException e) {
return e;
}
}
});
if (maybeException instanceof Exception) {
selectedKeys = null;
Exception e = (Exception) maybeException;
logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, e);
return new SelectorTuple(unwrappedSelector);
}
selectedKeys = selectedKeySet;
logger.trace("instrumented a special java.util.Set into: {}", unwrappedSelector);
return new SelectorTuple(unwrappedSelector,
new SelectedSelectionKeySetSelector(unwrappedSelector, selectedKeySet));
}
主要处理流程:
- 调用provider(SelectorProvider)的openSelector()方法获取Selector,即调用jdk的SelectorProvider的实现获取Selector;
- DISABLE_KEYSET_OPTIMIZATION为true表示禁止SelectedKeys的优化处理,则直接返回原生的Selector实现。
- 利用AccessController获取平台相关的Selector实现类:sun.nio.ch.SelectorImpl;
- 如果未获取到sun.nio.ch.SelectorImpl类型的实现或其不是jdk原生Selector实现或其子类实现,则直接返回原生的Selector实现;
- 利用AccessController实现sun.nio.ch.SelectorImpl的包装器类,将sun.nio.ch.SelectorImpl中的selectedKeys和publicSelectedKeys属性替换为本地的SelectedSelectionKeySet,这样实现将Selector实现中的底层SelectedSelectionKeySet替换为NioEventLoop中的属性。
- Selector的包装器类实现失败,则返回jdk原生的Selector实现,否则返回包装后的本地实现SelectedSelectionKeySetSelector。
3.2.3.2、run()实现
run()实现源码:
protected void run() {
for (;;) {
try {
switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
case SelectStrategy.CONTINUE:
continue;
case SelectStrategy.SELECT:
select(wakenUp.getAndSet(false));
// 'wakenUp.compareAndSet(false, true)' is always evaluated
// before calling 'selector.wakeup()' to reduce the wake-up
// overhead. (Selector.wakeup() is an expensive operation.)
//
// However, there is a race condition in this approach.
// The race condition is triggered when 'wakenUp' is set to
// true too early.
//
// 'wakenUp' is set to true too early if:
// 1) Selector is waken up between 'wakenUp.set(false)' and
// 'selector.select(...)'. (BAD)
// 2) Selector is waken up between 'selector.select(...)' and
// 'if (wakenUp.get()) { ... }'. (OK)
//
// In the first case, 'wakenUp' is set to true and the
// following 'selector.select(...)' will wake up immediately.
// Until 'wakenUp' is set to false again in the next round,
// 'wakenUp.compareAndSet(false, true)' will fail, and therefore
// any attempt to wake up the Selector will fail, too, causing
// the following 'selector.select(...)' call to block
// unnecessarily.
//
// To fix this problem, we wake up the selector again if wakenUp
// is true immediately after selector.select(...).
// It is inefficient in that it wakes up the selector for both
// the first case (BAD - wake-up required) and the second case
// (OK - no wake-up required).
if (wakenUp.get()) {
selector.wakeup();
}
// fall through
default:
}
cancelledKeys = 0;
needsToSelectAgain = false;
final int ioRatio = this.ioRatio;
if (ioRatio == 100) {
try {
processSelectedKeys();
} finally {
// Ensure we always run tasks.
runAllTasks();
}
} else {
final long ioStartTime = System.nanoTime();
try {
processSelectedKeys();
} finally {
// Ensure we always run tasks.
final long ioTime = System.nanoTime() - ioStartTime;
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
}
} catch (Throwable t) {
handleLoopException(t);
}
// Always handle shutdown even if the loop processing threw an exception.
try {
if (isShuttingDown()) {
closeAll();
if (confirmShutdown()) {
return;
}
}
} catch (Throwable t) {
handleLoopException(t);
}
}
}
主要处理流程:
- 通过selectStrategy.calculateStrategy()方法决定select的处理方式,当EventLoop中有未处理的任务时时,直接调用selectorNowSupplier.get(),其实际调用selectNow(),而selectNow()实际调用selector.selectNow()这个非阻塞方法;
- 当返回值为SelectStrategy.CONTINUE则继续进行下一次处理,当为SelectStrategy.SELECT时立即进行select()操作。
- 其他情况下,则调用processSelectedKeys()处理I/O任务,调用runAllTask()处理EventLoop中的任务。
3.2.3.3、select()实现
select实现源码:
private void select(boolean oldWakenUp) throws IOException {
Selector selector = this.selector;
try {
int selectCnt = 0;
long currentTimeNanos = System.nanoTime();
long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);
for (;;) {
long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
if (timeoutMillis <= 0) {
if (selectCnt == 0) {
selector.selectNow();
selectCnt = 1;
}
break;
}
// If a task was submitted when wakenUp value was true, the task didn't get a chance to call
// Selector#wakeup. So we need to check task queue again before executing select operation.
// If we don't, the task might be pended until select operation was timed out.
// It might be pended until idle timeout if IdleStateHandler existed in pipeline.
if (hasTasks() && wakenUp.compareAndSet(false, true)) {
selector.selectNow();
selectCnt = 1;
break;
}
int selectedKeys = selector.select(timeoutMillis);
selectCnt ++;
if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
// - Selected something,
// - waken up by user, or
// - the task queue has a pending task.
// - a scheduled task is ready for processing
break;
}
if (Thread.interrupted()) {
// Thread was interrupted so reset selected keys and break so we not run into a busy loop.
// As this is most likely a bug in the handler of the user or it's client library we will
// also log it.
//
// See https://github.com/netty/netty/issues/2426
if (logger.isDebugEnabled()) {
logger.debug("Selector.select() returned prematurely because " +
"Thread.currentThread().interrupt() was called. Use " +
"NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.");
}
selectCnt = 1;
break;
}
long time = System.nanoTime();
if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
// timeoutMillis elapsed without anything selected.
selectCnt = 1;
} else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
// The selector returned prematurely many times in a row.
// Rebuild the selector to work around the problem.
logger.warn(
"Selector.select() returned prematurely {} times in a row; rebuilding Selector {}.",
selectCnt, selector);
rebuildSelector();
selector = this.selector;
// Select again to populate selectedKeys.
selector.selectNow();
selectCnt = 1;
break;
}
currentTimeNanos = time;
}
if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) {
if (logger.isDebugEnabled()) {
logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
selectCnt - 1, selector);
}
}
} catch (CancelledKeyException e) {
if (logger.isDebugEnabled()) {
logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",
selector, e);
}
// Harmless exception - log anyway
}
}
主要处理流程:
- 通过delayNanos()获取处理的超时时间,当有任务时,获取的是任务的超时时间;当无任务时,获取的默认超时时间1秒;
- 当超时时间小于或等于0,当未进行select操作,立即进行selectNow()操作,然后则退出循环;
- 若EventLoop有任务,且wakeUp为false,则立即进行selectNow操作并退出循环;
- 进行selector.select()操作,直到有I/O事件或超时,select计数加1;
- 当有I/O事件或被用户唤醒或有任务需要处理,则终止循环;
- 当处理未超时,则初始化select计数为1,并继续循环处理;否则若select计数大于重建selector的限值,表示遇到了select的bug,则调用rebuildSelector()进行selector的重建,并立即调用selectcNow()进行处理。
3.2.3.4、rebuildSelector()实现
rebuildSelector0()源码实现:
private void rebuildSelector0() {
final Selector oldSelector = selector;
final SelectorTuple newSelectorTuple;
if (oldSelector == null) {
return;
}
try {
newSelectorTuple = openSelector();
} catch (Exception e) {
logger.warn("Failed to create a new Selector.", e);
return;
}
// Register all channels to the new Selector.
int nChannels = 0;
for (SelectionKey key: oldSelector.keys()) {
Object a = key.attachment();
try {
if (!key.isValid() || key.channel().keyFor(newSelectorTuple.unwrappedSelector) != null) {
continue;
}
int interestOps = key.interestOps();
key.cancel();
SelectionKey newKey = key.channel().register(newSelectorTuple.unwrappedSelector, interestOps, a);
if (a instanceof AbstractNioChannel) {
// Update SelectionKey
((AbstractNioChannel) a).selectionKey = newKey;
}
nChannels ++;
} catch (Exception e) {
logger.warn("Failed to re-register a Channel to the new Selector.", e);
if (a instanceof AbstractNioChannel) {
AbstractNioChannel ch = (AbstractNioChannel) a;
ch.unsafe().close(ch.unsafe().voidPromise());
} else {
@SuppressWarnings("unchecked")
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
invokeChannelUnregistered(task, key, e);
}
}
}
selector = newSelectorTuple.selector;
unwrappedSelector = newSelectorTuple.unwrappedSelector;
try {
// time to close the old selector as everything else is registered to the new one
oldSelector.close();
} catch (Throwable t) {
if (logger.isWarnEnabled()) {
logger.warn("Failed to close the old Selector.", t);
}
}
if (logger.isInfoEnabled()) {
logger.info("Migrated " + nChannels + " channel(s) to the new Selector.");
}
}
主要处理流程:
- 调用openSelector()获取新的SelectorTuple对象;
- 遍历就的Selector中的SelectionKey,获取感兴趣的事件,对旧的SelectionKey做取消操作;
- 将原Channel注册到新的Selector中,若SelectionKey中的attachment为AbstractNioChannel,则设置其对于的selectionKey为新的;
- 若注册失败则关闭对于的Channel或发起ChannelUnregistered事件。
- 将EventLoop中的selector及unwrappedSelector替换为新的,并关闭就的Selector。
3.2.3.5、I/O事件处理实现
processSelectedKeys()源码实现:
private void processSelectedKeys() {
if (selectedKeys != null) {
processSelectedKeysOptimized();
} else {
processSelectedKeysPlain(selector.selectedKeys());
}
}
当DISABLE_KEYSET_OPTIMIZATION为false,即启用了Keyset优化处理并优化处理成功,则调用processSelectedKeysOptimized()对I/O事件进行处理,否则调用processSelectedKeysPlain(selector.selectedKeys())对I/O事件进行处理;
processSelectedKeysOptimized()实现源码:
private void processSelectedKeysOptimized() {
for (int i = 0; i < selectedKeys.size; ++i) {
final SelectionKey k = selectedKeys.keys[i];
// null out entry in the array to allow to have it GC'ed once the Channel close
// See https://github.com/netty/netty/issues/2363
selectedKeys.keys[i] = null;
final Object a = k.attachment();
if (a instanceof AbstractNioChannel) {
processSelectedKey(k, (AbstractNioChannel) a);
} else {
@SuppressWarnings("unchecked")
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
processSelectedKey(k, task);
}
if (needsToSelectAgain) {
// null out entries in the array to allow to have it GC'ed once the Channel close
// See https://github.com/netty/netty/issues/2363
selectedKeys.reset(i + 1);
selectAgain();
i = -1;
}
}
}
processSelectedKeysPlain()实现源码:
private void processSelectedKeysPlain(Set<SelectionKey> selectedKeys) {
// check if the set is empty and if so just return to not create garbage by
// creating a new Iterator every time even if there is nothing to process.
// See https://github.com/netty/netty/issues/597
if (selectedKeys.isEmpty()) {
return;
}
Iterator<SelectionKey> i = selectedKeys.iterator();
for (;;) {
final SelectionKey k = i.next();
final Object a = k.attachment();
i.remove();
if (a instanceof AbstractNioChannel) {
processSelectedKey(k, (AbstractNioChannel) a);
} else {
@SuppressWarnings("unchecked")
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
processSelectedKey(k, task);
}
if (!i.hasNext()) {
break;
}
if (needsToSelectAgain) {
selectAgain();
selectedKeys = selector.selectedKeys();
// Create the iterator again to avoid ConcurrentModificationException
if (selectedKeys.isEmpty()) {
break;
} else {
i = selectedKeys.iterator();
}
}
}
}
processSelectedKeysOptimized()与processSelectedKeysPlain()处理大同小异,主要是遍历事件的SelectedKeys对事件进行处理;
3.2.3.6、任务处理实现
实现源码:
protected boolean runAllTasks() {
assert inEventLoop();
boolean fetchedAll;
boolean ranAtLeastOne = false;
do {
fetchedAll = fetchFromScheduledTaskQueue();
if (runAllTasksFrom(taskQueue)) {
ranAtLeastOne = true;
}
} while (!fetchedAll); // keep on processing until we fetched all scheduled tasks.
if (ranAtLeastOne) {
lastExecutionTime = ScheduledFutureTask.nanoTime();
}
afterRunningAllTasks();
return ranAtLeastOne;
}
主要处理流程:
- 调用fetchFromScheduledTaskQueue()获取任务,并将任务填充到taskQueue中,直到taskQueue填满;
- 调用runAllTasksFrom()批量处理任务;
4、NioEventLoopGroup启动流程分析
启动调用流程如下:
up中的newChild()新建事件循环器并组成group;
(4)、当用户调用通过Bootstrap进行bind或connect操作时,实际是向NioEventLoopGroup中添加任务;
(5)、最终向NioEventLoop中添加任务,即调用SingleThreadEventExecutor的execute()添加任务;
(6)、execute()中会调用startThread()启动线程;
(7)、startThread()中调用NioEventLoop的run()方法;
(8)、NioEventLoop的run()中循环进行I/O事件处理及任务处理;
相关阅读:
Netty源码愫读(一)ByteBuf相关源码学习 【https://www.jianshu.com/p/016daa404957】
Netty源码愫读(二)Channel相关源码学习【https://www.jianshu.com/p/02eac974258e】
Netty源码愫读(三)ChannelPipeline、ChannelHandlerContext相关源码学习【https://www.jianshu.com/p/be82d0fcdbcc】
Netty源码愫读(四)ChannelHandler相关源码学习【https://www.jianshu.com/p/6ee0a3b9d73a】
Netty源码愫读(六)ServerBootstrap相关源码学习【https://www.jianshu.com/p/a71a9a0291f3】
参考书籍:
《Netty实战》
《Netty权威指南》
参考博客: