原文链接:Understanding Node.js Event-Driven Architecture
作者:Samer Buna
译者:李序锴
本文已获得作者授权,转载请注明出处。
大多数的Node对象(如HTTP requests, responses以及streams)都实现了EventEmitter模块,这样它们就能够触发和监听事件。
事件驱动特性最简单的形式就是通用的Node.js函数当中的一些回调风格(例如:fs.readFile)。在这个类比中,事件会立即启动(当Node准备调用回调时)而回调则充当事件处理器的角色。
当你准备好请调用我,Node!
Node处理异步事件的最初方式就是通过回调。而这已经是JavaScript拥有原生promises支持以及async/await特性之前很就以前的事情了。
回调基本上都是你传递给其它函数的函数。这可能是因为在JavaScript当中函数是第一类对象。
回调并不会在代码当中注明它是异步调用,理解这一点很关键。函数可以通过同步和异步两种方式触发回调。例如:如下是一个宿主函数fileSize,它接收一个回调函数cb,根据条件的不同它可以执行同步和异步两种方式的回调。
function fileSize (fileName, cb) {
if (typeof fileName !== 'string') {
return cb(new TypeError('argument should be string')); // Sync
}
fs.stat(fileName, (err, stats) => {
if (err) { return cb(err); } // Async
cb(null, stats.size); // Async
});
}
但是这是一种会导致预料之外错误的糟糕实践。它使得宿主函数总是以同步或者异步方式执行回调。
我们来探寻一个使用回调风格书写的异步Node函数的典型例子。
const readFileAsArray = function(file, cb) {
fs.readFile(file, function(err, data) {
if (err) {
return cb(err);
}
const lines = data.toString().trim().split('\n');
cb(null, lines);
});
};
readFileAsArray接受文件路径和回调函数作为参数。其读取文件内容并将文件内容分割为数组,再利用该数组调用回调函数。
下方是一个应用实例。假设我们在同级目录下有一个numbers.txt文件,其内容如下:
10
11
12
13
14
15
假设我们的任务是统计该文件当中的奇数个数,我们可以使用readFileAsArray来简化代码:
readFileAsArray('./numbers.txt', (err, lines) => {
if (err) throw err;
const numbers = lines.map(Number);
const oddNumbers = numbers.filter(n => n%2 === 1);
console.log('Odd numbers count:', oddNumbers.length);
});
这段代码将数字内容读取成了字符串数组,之后解析成数字,再之后统计了奇数的数量。
Node回调风格在此处获得了充分应用。该回调的第一个参数是可为空的err参数,我们将该回调作为最后一个参数传递给宿主函数。由于用户的阅读习惯问题,因此你应该在你的函数一直按这种形式书写。让回调作为宿主函数的的最后一个参数,将error对象作为回调函数的第一个参数。
新版JavaScript对于回调的替代形式
在新版JavaScript当中,我们有了promise对象。在异步APIs中Promises可以作为异步回调的一种替代形式。promise对象允许我们分别处理success和error的cases而非在同一处同时传递callback和error两个参数,并且promise也允许我们串联多重异步调用而不是进行嵌套。
如果readFileAsArray函数支持promises,我们可以做如下应用:
readFileAsArray('./numbers.txt')
.then(lines => {
const numbers = lines.map(Number);
const oddNumbers = numbers.filter(n => n%2 === 1);
console.log('Odd numbers count:', oddNumbers.length);
})
.catch(console.error);
我们没有传递callback函数,而是调用了.then函数作为宿主函数的的返回值。这个.then函数通常能让我们达到跟利用带有callback函数的代码同样的效果,而且我们也能够像之前一样在其上做处理。为了处理errors,我们在末尾添加了.catch代码块,当发生错误时我们利用.catch代码块进行处理。
由于新Promise对象的存在,让宿主函数在新版JavaScript支持promise接口变得更加容易。修改如下的readFileAsArray函数让其支持promise接口,以及支持之前已经支持的callback接口。
const readFileAsArray = function(file, cb = () => {}) {
return new Promise((resolve, reject) => {
fs.readFile(file, function(err, data) {
if (err) {
reject(err);
return cb(err);
}
const lines = data.toString().trim().split('\n');
resolve(lines);
cb(null, lines);
});
});
};
因此我们让函数返回一个Promise对象,这个Promise对象包含着fs.readFile异步回调。该promise对象对外暴露两个参数(一个resolve函数以及一个reject函数)。
我们总是会运用promise的reject函数执行回调来处理error,同时也总是利用resolve函数执行回调来处理data。
另外我们在这个例子当中需要为回调参数设置一个默认值,因为这段代码有可能会被用于promise接口。我们可以使用一种简单的空函数作为默认值,如:() => {}。
以async/await方式执行promises
当需要循环嵌套异步函数时,添加promise接口会让你的代码更易于维护。回调则会让情况变得复杂。
Promises稍稍改善了一些这种状况,而函数生成器则带来更多的优化。即是说处理异步代码更新近的替代方式是使用async函数,它能让我们像是以一种同步的方式处理异步的代码,这回让代码更具有可读性。
我们通过async/await方式执行readFileAsArray函数,代码如下:
async function countOdd () {
try {
const lines = await readFileAsArray('./numbers');
const numbers = lines.map(Number);
const oddCount = numbers.filter(n => n%2 === 1).length;
console.log('Odd numbers count:', oddCount);
} catch(err) {
console.error(err);
}
}
countOdd();
我们先创建了一个异步函数,就是在普通函数的function之前加上关键字async。在这个异步函数当中,我们调用readFileAsArray函数(假设它返回了lines变量),为了让这种方式生效,我们使用关键字await。之后,我们继续执行代码就如同对readFileAsArray进行同步调用一样。
为了让代码顺利运行,我们执行异步函数。这让代码变得非常简洁且易于阅读。而为了能够处理errors,我们需要将异步调用包裹在try/catch语句中。
通过这种async/await特性,我们就不必使用其它特殊的API(例如.then和.catch)。我们仅仅需要特别标记一下函数就可以使用纯粹的JavaScript进行编程。
我们可以在任何支持promise接口的函数当中使用async/await特性,但是我们不能将其用于回调风格的异步函数之中(例如:setTimeout)。
EventEmitter模块
EventEmitter是一种支持Node当中对象间通信的模块。EventEmitter是Node异步事件驱动架构的核心。很多Node内置模块都是继承自EventEmitter。
概念很简单:emitter对象发出已经命名好的事件,该事件会触发之前注册好的监听器。因此,一个emitter对象通常有两个主要特性:
- 发出命名事件
- 注册和注销监听函数
为了理解EventEmitter,我们创建一个继承自EventEmitter的类(class)
class MyEmitter extends EventEmitter {
}
Emitter对象是我们通过MyEmitter类(继承自类EventEmitter)实例化生成的。
const myEmitter = new MyEmitter();
在这些emitter对象生命周期的任何阶段,我们都可以通过emit函数发射我们想要发射的任何命名事件。
myEmitter.emit('something-happened');
单次事件的触发表明已经满足某些条件。该条件在触发对象中通常是一种状态变化。
我们可以通过on方法添加监听函数,每当发射器对象触发相关联的命名事件时这些监听函数都会执行。
事件!==异步
我们来看一个例子:
const EventEmitter = require('events');
class WithLog extends EventEmitter {
execute(taskFunc) {
console.log('Before executing');
this.emit('begin');
taskFunc();
this.emit('end');
console.log('After executing');
}
}
const withLog = new WithLog();
withLog.on('begin', () => console.log('About to execute'));
withLog.on('end', () => console.log('Done with execute'));
withLog.execute(() => console.log('*** Executing task ***'));
WithLog类是一个事件发射器。它定义了一个函数执行的实例。该执行函数接受一个参数(一个任务函数)并用日志声明包裹该执行函数。这些日志声明会在函数执行前后触发。
为了查看函数的执行顺序,我们在两个命名事件上添加了监听器,最后会执行一个样本任务以启动其它函数。
如下是函数的输出结果:
Before executing
About to execute
*** Executing task ***
Done with execute
After executing
对于这些输出结果我希望你注意到它们都是同步执行的。这段代码当中没有异步操作。
- 首先输出了"Before executing"
- 以begin命名的事件输出了"About to execute"
- 实际执行行(hang)之后输出了"Executing task"
- 以end命名的事件输出了"Done with execute"
- 最后我们得到了"After executing"
如同plain-old回调一样,事件跟代码是同步还是异步执行没有什么关联。
这点很重要,因为如果我们传入异步taskFunc函数去执行的话,事件的触发将会变得不够精准。
我们可以用带有setImmediate的调用来模拟上面的例子:
// ...
withLog.execute(() => {
setImmediate(() => {
console.log('*** Executing task ***')
});
});
输出就会变成像下面这样:
Before executing
About to execute
Done with execute
After executing
*** Executing task ***
这是错误的。异步调用(其调用了"Done with execute"和"After executing")之后的那几行已经不再准确。
为了在一个异步函数执行完成之后触发事件,我们需要结合基于事件通信的回调机制。下面的例子会进行说明。
使用事件而非回调的好处在于我们可以通过定义多个监听器对同一信号响应多次。用回调实现相同功能的话,我们需要在单个回调当中书写更多的逻辑。事件是一种很好的实现方式,它让应用程序能够通过多个外部插件在其核心之上构建功能。你可以将它们当做hook points,它们会为状态变化作特定的记录。
异步事件
我们现在将这个同步的简单例子改写成更加实用的异步代码。
const fs = require('fs');
const EventEmitter = require('events');
class WithTime extends EventEmitter {
execute(asyncFunc, ...args) {
this.emit('begin');
console.time('execute');
asyncFunc(...args, (err, data) => {
if (err) {
return this.emit('error', err);
}
this.emit('data', data);
console.timeEnd('execute');
this.emit('end');
});
}
}
const withTime = new WithTime();
withTime.on('begin', () => console.log('About to execute'));
withTime.on('end', () => console.log('Done with execute'));
withTime.execute(fs.readFile, __filename);
WithTime类执行了asyncFunc并通过使用console.time以及console.timeEnd记录了asyncFunc的执行时间。在程序执行前后,它触发了事件执行的正确顺序。同时利用error/data事件去处理异步调用当中的常见信号。
我们传入fs.readFile函数(它是异步函数)来测试withTime发射器。而非用回调来处理文件,如此我便能够监听数据对象了。
当执行这段代码时,我们得到事件的正确执行顺序。不出所料,我们获得了指定代码的执行时间,这很有用处:
About to execute
execute: 4.507ms
Done with execute
我们怎样结合带有事件监听器的回调来实现呢?如果asynFunc函数也支持promises的话,我们可以使用async/await特性来实现相同的功能:
class WithTime extends EventEmitter {
async execute(asyncFunc, ...args) {
this.emit('begin');
try {
console.time('execute');
const data = await asyncFunc(...args);
this.emit('data', data);
console.timeEnd('execute');
this.emit('end');
} catch(err) {
this.emit('error', err);
}
}
}
我不太清楚你的情况,但是相较于基于回调和带有.then/.catch的代码段我觉得以上代码更容易理解。async/await特性让我们更接近于JavaScript语言本身,我认为这是一个重大进展。
事件参数和错误
在前面的例子当中,有两个事件都是由额外的参数来触发。
error事件是通过一个error对象触发。
this.emit('error', err);
data事件则是由一个data对象触发。
this.emit('data', data);
我们可以根据需要在命名事件之后使用任意数量的参数,所有参数在我们为这些命名事件注册的监听函数当中都可以使用。
例如,为了处理data事件,我们注册的监听事件将能够获得我们传递给被发射事件的数据参数,而其就是asyncFunc函数暴露的数据对象。
withTime.on('data', (data) => {
// do something with data
});
error事件通常是一种特殊情况。在基于回调的例子当中,如果我们不使用监听器处理error事件的话,node进程实际上会退出。
为了说明这种情况,我们用一个恶性参数对执行方法做再一次调用:
class WithTime extends EventEmitter {
execute(asyncFunc, ...args) {
console.time('execute');
asyncFunc(...args, (err, data) => {
if (err) {
return this.emit('error', err); // Not Handled
}
console.timeEnd('execute');
});
}
}
const withTime = new WithTime();
withTime.execute(fs.readFile, ''); // BAD CALL
withTime.execute(fs.readFile, __filename);
如上代码的第一次执行会引起错误。node进程将会崩溃并退出:
events.js:163
throw er; // Unhandled 'error' event
^
Error: ENOENT: no such file or directory, open ''
第二次执行会受到崩溃影响根本不会继续往下执行。
如果为该特殊error事件注册监听器的话,node进程的行为将会发生变化。例如:
withTime.on('error', (err) => {
// do something with err, for example log it somewhere
console.log(err)
});
如果执行以上代码,第一次执行的错误会被播报,但是node进程不会崩溃和退出。其它的执行调用会正常结束:
{ Error: ENOENT: no such file or directory, open '' errno: -2, code: 'ENOENT', syscall: 'open', path: '' }
execute: 4.276ms
现在基于promise的函数会有不同的行为并且只是输出警告,但是最终会发生变化:
UnhandledPromiseRejectionWarning: Unhandled promise rejection (rejection id: 1): Error: ENOENT: no such file or directory, open ''
DeprecationWarning: Unhandled promise rejections are deprecated. In the future, promise rejections that are not handled will terminate the Node.js process with a non-zero exit code.
处理已触发错误当中异常的另一种方式是为全局uncaughtException进程事件注册一个监听器。然而,全部捕捉errors是一种糟糕的想法。
对uncaughtException的通常建议是避免使用它,但是如果非用不可的话(播报发生的情况或者直接清除),你应该直接让进程退出。
process.on('uncaughtException', (err) => {
// something went unhandled.
// Do any cleanup and exit anyway!
console.error(err); // don't do just that.
// FORCE exit the process too.
process.exit(1);
});
然而,想象一下多个error事件在同一时间发生。这意味着上面的uncaughtException监听器会启动多次,对于cleanup代码这会是个问题。一个典型的例子就是有多次调用用于数据库关闭操作。
EventEmitter模块对外暴露一个一次性的方法。该方法只会启动一次监听器,不会每次都进行响应。因此,这是一个使用uncaughtException的实际用例,因为通过第一个未捕捉的异常我们将会进行清理操作,而且无论如何我们都将退出进程。
监听器的顺序
如果我们为同一事件注册了多个监听器,这些监听器会按照顺序执行。注册的第一个监听器将会第一个执行。
// प्रथम
withTime.on('data', (data) => {
console.log(`Length: ${data.length}`);
});
// दूसरा
withTime.on('data', (data) => {
console.log(`Characters: ${data.toString().length}`);
});
withTime.execute(fs.readFile, __filename);
上方代码中带有"Length"的那一行会先于带有"Characters"的那一行执行,因为这是我们定义监听器的顺序。
如果你需要定义一个新监听器但是需要该监听器第一个执行,你可以使用prependListener方法:
// प्रथम
withTime.on('data', (data) => {
console.log(`Length: ${data.length}`);
});
// दूसरा
withTime.prependListener('data', (data) => {
console.log(`Characters: ${data.toString().length}`);
});
withTime.execute(fs.readFile, __filename);
上方代码会先执行带有"Character"的那一行。
最后,如果需要移除一个监听器,你可以使用removeListener方法。