在操作系统的底层异步通过信号量、信息等方式得到广泛应用。但在主流高级编程语言中,基于程序员的思维方式异步并不多见。
异步I/O的优势
- 良好的用户体验:在浏览器中,JavaScript 的执行与UI渲染共用一个线程,所以异步可消除UI阻塞的问题;第二当页面所需的资源路劲较多时,可通过异步并发提升资源获取总时长。
- 资源分配:并发操作之前主流的做法是多线程(或进程)并行,但多线程会存在死锁及状态同步等问题,多进程的资源开销也很大。Node 通过类似工作子进程(线程)的概念来实现异步I/O。
异步I/O的实现
实现原理
- 基于轮询:read / select / poll / epoll (轮询-休眠-通过事件唤醒) / kqueue (FreeBSD 下的 epoll)
- 基于事件:程序发起非阻塞调用后,继续向下执行,当异步操作完成后通过信号或回调函数将数据回传给程序。实现:Linux 下的 AIO,但仅支持内核I/O。
Node中异步I/O的实现
在 libuv 0.1中:Node 在 *unix 平台下自行设计线程池来模拟异步I/O; 在 Window 下通过操作系统层的 IOCP (内部也是线程池原理) 进行异步操作。通过 libuv 实现抽象兼容。在libuv 1.0 中: 两个平台都是用统一的线程池来处理异步任务。所以 Node 的单线程只是 JavaScript 执行在单线程中,内部完成异步任务都需要线程池的支持。
Node 的异步I/O
事件循环
Node 进程启动时会创建一个循环,每执行一次循环体的过程称为 Tick,每个 Tick 的过程就是查看是否有事件或关联的回调函数待处理,有执行,无进入下个循环。如果判断不再有任何事件需要处理则退出进程。
观察者
Tick 通过 观察者
来获知是否有事件需要处理。每个事件循环中可存在多个观察者。
请求对象
在调用 Node 异步接口的时候,会创建一个请求对象用于保存所有异步调用的状态,包括等待线程池分配执行,操作完成后的回调函数。
这里以 fs.open
为例
内部调用代码:
fs.open = function(path, flags, mode, callback_) {
var callback = makeCallback(arguments[arguments.length - 1]);
mode = modeNum(mode, 0o666);
if (!nullCheck(path, callback)) return;
var req = new FSReqWrap();
req.oncomplete = callback;
binding.open(pathModule._makeLong(path),
stringToFlags(flags),
mode,
req);
};
其中 FSReqWrap
即为请求对象。其继承自 ReqWrap
class FSReqWrap: public ReqWrap<uv_fs_t> {
public:
enum Ownership { COPY, MOVE };
inline static FSReqWrap* New(Environment* env,
Local<Object> req,
const char* syscall,
const char* data = nullptr,
enum encoding encoding = UTF8,
Ownership ownership = COPY);
inline void Dispose();
};
uv_fs_t
结构体:
/* uv_fs_t is a subclass of uv_req_t. */
struct uv_fs_s {
UV_REQ_FIELDS
uv_fs_type fs_type;
uv_loop_t* loop;
uv_fs_cb cb;
ssize_t result;
void* ptr;
const char* path;
uv_stat_t statbuf; /* Stores the result of uv_fs_stat() and uv_fs_fstat(). */
UV_FS_PRIVATE_FIELDS
};
#define UV_REQ_FIELDS \
/* public */ \
void* data; \
/* read-only */ \
uv_req_type type; \
/* private */ \
void* active_queue[2]; \
void* reserved[4]; \
UV_REQ_PRIVATE_FIELDS \
oncomplete
用于保存回调函数。
执行回调函数
当线程池执行完响应的操作后,将处理结果存储在 req->result
中,发起回调,再调用存储在 reqWrap 中的回调函数
req_wrap->MakeCallback(env->oncomplete_string(), argc, argv);
非 I/O 的异步 API
定时器 setTimeout() / setInterval()
setTimeout() 和 setInterval() 被调用时会创建一个uv_timer插入到定时器观察者
中。每次 Tick 执行时,会从观察者中迭代取出uv_timer,检测其是否超过定时时间,如果超过,就形成事件,执行回调函数。正是基于该原理所以 JavaScript 中的定时器是不精确的。
通过 lib/internel/bootstrap_node.js
下 setupGlobalTimeouts
函数引入,具体处理逻辑在 lib/timers.js
中。
function setupGlobalTimeouts() {
const timers = NativeModule.require('timers');
global.clearImmediate = timers.clearImmediate;
global.clearInterval = timers.clearInterval;
global.clearTimeout = timers.clearTimeout;
global.setImmediate = timers.setImmediate;
global.setInterval = timers.setInterval;
global.setTimeout = timers.setTimeout;
}
所有特定 timeout 的 timer 存放在一个 timeoutList 中,每个 timeoutList 都绑定一个 c++ 下的 uv_timer
,之后 uv_timer 被加入到 event_loop 的 handle_queue 中,事件循环会检测该 uv_timer 的 timeout 时间是否到了,检测到 timeout 时 uv_timer 会调起 listOnTimeout
函数,去检测对应的 timeoutList 中是否有 timer
定时任务到期需要执行的,有则将其从该定时列表中剔除掉,再执行该 timer 的回调函数。
function listOnTimeout() {
var list = this._list;
var msecs = list.msecs;
var now = TimerWrap.now();
var diff, timer;
while (timer = L.peek(list)) {
diff = now - timer._idleStart;
// Check if this loop iteration is too early for the next timer.
// This happens if there are more timers scheduled for later in the list.
if (diff < msecs) {
// 由于该定时列表中较早加入的timer到期,`内部timer` 调起该list的回调函数,其他后加入的timer就被提早通知,
// 发起 `内部核心timer` 新的 start, 等待事件下次调起
}
// The actual logic for when a timeout happens.
L.remove(timer);
assert(timer !== L.peek(list));
if (!timer._onTimeout) continue;
// 调用实际的回调函数
// 当回调函数执行出错时,在下一个 tick 重新执行该 timeoutList 的 listOnTimeout 函数
tryOnTimeout(timer, list);
setInterval() 与 setTimeout() 使用相同的逻辑处理,只是当 timeout 执行完回调函数后,检测到 timer 的 _repeat
属性不为空,则再将其加入 timeoutList 中。
未搞懂的地方:内部 uv_timer 与 事件轮询的调用逻辑
setImmediate()
所有 immediate 被存放到一个 immediateQueue
的 linked list 中, 事件循环执行到 uv_check
,回调到 processImmediate
函数
unction createImmediate(args, callback) {
// declaring it `const immediate` causes v6.0.0 to deoptimize this function
var immediate = new Immediate();
immediate._callback = callback;
immediate._argv = args;
immediate._onImmediate = callback;
if (!process._needImmediateCallback) {
process._needImmediateCallback = true;
process._immediateCallback = processImmediate;
}
immediateQueue.append(immediate);
return immediate;
}
processImmediate 函数中检测 immediateQueue 中待执行的任务并执行。
function processImmediate() {
var immediate = immediateQueue.head;
var tail = immediateQueue.tail;
var domain;
// Clear the linked list early in case new `setImmediate()` calls occur while
// immediate callbacks are executed
immediateQueue.head = immediateQueue.tail = null;
while (immediate) {
domain = immediate.domain;
if (!immediate._onImmediate) {
immediate = immediate._idleNext;
continue;
}
if (domain)
domain.enter();
immediate._callback = immediate._onImmediate;
// Save next in case `clearImmediate(immediate)` is called from callback
var next = immediate._idleNext;
tryOnImmediate(immediate, tail);
if (domain)
domain.exit();
// If `clearImmediate(immediate)` wasn't called from the callback, use the
// `immediate`'s next item
if (immediate._idleNext)
immediate = immediate._idleNext;
else
immediate = next;
}
// Only round-trip to C++ land if we have to. Calling clearImmediate() on an
// immediate that's in |queue| is okay. Worst case is we make a superfluous
// call to NeedImmediateCallbackSetter().
if (!immediateQueue.head) {
process._needImmediateCallback = false;
}
}
注意:《深入浅出Node.js》书中说:setImmediate() 在每轮循环中执行链表中的一个回调函数,这里看到 while (immediate)
, 所以 node v6.x (事实上从 v0.12.x 开始)中已经不是这样的了。以下代码可用来校验。
// 用于校验的代码
process.nextTick(function () {
console.log('nextTick -- 1')
})
setImmediate(function () {
console.log('setImmediate -- 1')
process.nextTick(function () {
console.log('nextTick -- run')
})
})
setImmediate(function () {
console.log('setImmediate -- 2')
})
process.nextTick(function () {
console.log('nextTick -- 2')
})
console.log('start')
/* log
start
nextTick -- 1
nextTick -- 2
setImmediate -- 1
setImmediate -- 2
nextTick -- run
*/
process.nextTick()
调用 process.nextTick() 时,会将相关参数 push 到 nextTickQueue
中,然后当事件循环进入下一个 tick 时调用 _tickCallback
函数
创建一个 nextTick 回调任务
function nextTick(callback) {
if (typeof callback !== 'function')
throw new TypeError('callback is not a function');
// on the way out, don't bother. it won't get fired anyway.
if (process._exiting)
return;
var args;
if (arguments.length > 1) {
args = new Array(arguments.length - 1);
for (var i = 1; i < arguments.length; i++)
args[i - 1] = arguments[i];
}
nextTickQueue.push({
callback,
domain: process.domain || null,
args
});
tickInfo[kLength]++;
}
处理 nextTickQueue 中的回调任务,这里 while (tickInfo[kIndex] < tickInfo[kLength])
可以看出在一个 tick 中是一次性处理掉所有的当前 nextTickQueue 中的任务。
function _tickCallback() {
var callback, args, tock;
do {
while (tickInfo[kIndex] < tickInfo[kLength]) {
tock = nextTickQueue[tickInfo[kIndex]++];
callback = tock.callback;
args = tock.args;
// Using separate callback execution functions allows direct
// callback invocation with small numbers of arguments to avoid the
// performance hit associated with using `fn.apply()`
_combinedTickCallback(args, callback); // 执行 nextTcik 的回调
if (1e4 < tickInfo[kIndex])
tickDone();
}
tickDone();
_runMicrotasks(); // 重新启动 v8 的微任务
emitPendingUnhandledRejections(); // 处理当前 tick 中的 promise 任务
} while (tickInfo[kLength] !== 0);
}
Promise
Promise 任务同 nextTick 一样都是通过 v8 MicroTasks 来执行