一、概要
前面简单介绍了channel的总体设计,其中channel里面涉及到一个核心的组件EventLoop。EventLoop的主要作用是处理channel的IO操作。
Will handle all the I/O operations for a {@link Channel} once registered.
二、源码分析
EventLoop
EventLoop的作用:
- 处理channel的I/O操作
- 一个EventLoop一般会处理多个channel
/**
* Will handle all the I/O operations for a {@link Channel} once registered.
*
* One {@link EventLoop} instance will usually handle more than one {@link Channel} but this may depend on
* implementation details and internals.
*
*/
public interface EventLoop extends OrderedEventExecutor, EventLoopGroup {
@Override
EventLoopGroup parent();
}
DefaultEventLoop
DefaultEventLoop是EventLoop的默认实现。我们看下它的类图。
@Override
protected void run() {
for (;;) {
//从队列获取task并执行
Runnable task = takeTask();
if (task != null) {
task.run();
//更新最后执行时间
updateLastExecutionTime();
}
//退出
if (confirmShutdown()) {
break;
}
}
}
核心方法是run的这个方法。这里有几个问题:
- task是什么时候被写入的?
- task到底做了什么事情?
- DefaultEventLoop.run的方法是什么时候执行的?
带着这几个问题,我们继续往下看。
SingleThreadEventExecutor
我们追踪DefaultEventLoop的父类,最终找到task添加的方法。
/**
* Add a task to the task queue, or throws a {@link RejectedExecutionException} if this instance was shutdown
* before.
*/
protected void addTask(Runnable task) {
if (task == null) {
throw new NullPointerException("task");
}
if (!offerTask(task)) {
reject(task);
}
}
final boolean offerTask(Runnable task) {
if (isShutdown()) {
reject();
}
return taskQueue.offer(task);
}
...
@Override
public void execute(Runnable task) {
if (task == null) {
throw new NullPointerException("task");
}
boolean inEventLoop = inEventLoop();
//添加task
addTask(task);
if (!inEventLoop) {
//启动线程
startThread();
if (isShutdown()) {
boolean reject = false;
try {
if (removeTask(task)) {
reject = true;
}
} catch (UnsupportedOperationException e) {
// The task queue does not support removal so the best thing we can do is to just move on and
// hope we will be able to pick-up the task before its completely terminated.
// In worst case we will log on termination.
}
if (reject) {
reject();
}
}
}
if (!addTaskWakesUp && wakesUpForTask(task)) {
wakeup(inEventLoop);
}
}
调用execute方法把task添加到队列中。调用execute的方法有点多,我们通过增加日志的方式来看到底添加的task具体是什么东西。
AbstractChannel提交了一个register事件。
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
汇总了一下添加的task:
- register
- pipeline的addHandler
- channel的bind事件(绑定地址)
- pipeline.fireChannelActive();(channel激活)
- channel.connect(remoteAddress, connectPromise);(建立连接)
- LocalChannel serve(final LocalChannel peer),最终触发pipeline.fireChannelRead(m)方法
三、总结
EventLoop顾名思义就是"事件循环",通过一个队列和一个线程(loop)实现了事件的提交和事件的执行异步。