高阶函数
:把函数参数作为参数,或作为返回值
偏函数
: 将传入参数作为判断或者其他逻辑条件
注意点
异常处理
异步I/O有两个处理阶段,中间有事件循环调度,异步方法在提交请求后就返回了,try/catch只能捕获当次事件循环内的异常,不能捕获callback的异常
try {
req.body = JSON.parse(buf, options.reviver);
//callback()不能放在这,若callback异常,会导致它多次执行
} catch (err) {
err.body = buf;
err.status = 400;
return callback(err);
}
callback();
阻塞代码
这么写会持续占用CPU资源,破坏事件循环的调度,因为Node是单线程,会导致其余请求得不到响应
// TODO
var start = new Date();
while (new Date() - start < 1000) {
// TODO
}
// 需要阻塞的代码
异步编程解决方案
事件
Node中事件的发布通常是伴随事件循环而异步触发的
- 事件与侦听器是多对多的,设置过多的侦听器会导致过多的CPU占用
- 要给EventEmitter对象添加error事件和处理,若触发error事件不处理,会引起线程退出\
- 使用once()添加的侦听器只会执行一次,在执行后就会将相关的事件移除
使用事件的途径
- 实例化events
var events =require(‘events’);
var emitter = new events.EventEmitter();
emitter.on('do',function(value){console.log(value)});
emitter.emit('do','doing');
- 继承events模块
var util = require('util')
var events = require('events');
function Stream() {
events.EventEmitter.call(this);
}
util.inherits(Stream, events.EventEmitter);
Stream.prototype.test = function() {
var self = this;
try {
throw 111;
} catch (error) {
self.emit('error')
}
}
var stream = new Stream();
stream.on('error', function() {
console.error('asdasd')
});
stream.test();
利用Once()解决雪崩问题
- 在高访问量、大并发量的情况下缓存突然失效, 大量的请求同时涌入数据库中,数据库无法承受,影响网站速度
- 在缓存中无数据时,访问量大的话,同条SQL可能会被执行多次,此时可以将所有请求放入事件队列,利用once()来绑定,SQL在执行时,新到的相同调用只需在队列中等待数据,一旦查询结束,得到的结果可以被这些调用共同使用
var proxy = new events.EventEmitter();
var status = "ready";
var select = function(callback) {
proxy.once("selected", callback);
if (status === "ready") {
status = "pending";
db.select("SQL", function(results) {
proxy.emit("selected", results);
status = "ready";
});
}
};
可能会因为侦听器过多引发警号,调用setMaxListeners(0)移除或设置大的阈值
EventProxy 处理多事件对一侦听器,适用于实例化事件
- all() 当每个事件都被触发了才会执行,只执行一次
var proxy = new EventProxy();
proxy.all("template", "data", "resources", function(template, data, resources) { // TODO
});
- tail() 首次也是需要每个事件都被触发,之后只要某个事件触发,就会用最新的数据执行
- after() 事件执行多少次后执行侦听器的单一事件组合订阅
var proxy = new EventProxy();
proxy.after("data", 10, function(datas) { // TODO
});
- not()
- any()
EventProxy对异常处理的优化
- fail()
ep.fail(callback);
//等价于
ep.fail(function(err) {
callback(err);
});
//又等价于
ep.bind('error', function(err) {
// 卸载所有处理函数
ep.unbind();
// 异常回调
callback(err);
});
- done()
ep.done('tpl');
//等价于
function(err, content) {
if (err) {
// 发生异常,交给error事件处理函数处理
return ep.emit('error', err);
}
ep.emit('tpl', content);
}
done()接受函数参数
ep.done(function(content) {
// TODO
// 无需考虑异常
ep.emit('tpl', content);
});
//等价于
function(err, content) {
if (err) {
// 发生异常 给error事件的处理函数处理
return ep.emit('error', err);
}
(function(content) {
// TODO
// 无需考虑异常
ep.emit('tpl', content);
}(content));
}
代码对比
exports.getContent = function(callback) {
var ep = new EventProxy();
ep.all('tpl', function(tpl, data) {
// 成功回调
callback(null, {
template: tpl,
data: data
});
});
// 帧听error事件
ep.bind('error', function(err) {
// 卸载 所有处理函数
ep.unbind();
// 异常回调
callback(err);
});
fs.readFile('template.tpl', 'utf-8', function(err, content) {
if (err) {
// 发生异常,给error事件的处理函数处理
return ep.emit('error', err);
}
ep.emit('tpl', content);
});
};
exports.getContent = function(callback) {
var ep = new EventProxy();
ep.all('tpl', function(tpl, data) {
// 成功回调
callback(null, {
template: tpl,
data: data
});
});
//绑定错误处理函数
ep.fail(callback);
fs.readFile('template.tpl', 'utf-8', ep.done('tpl'));
};
Promise/Deferred(应用参见:es6 Promise)
- 先执行异步调用,后传递处理方法
- then()方法只接受function对象,继续返回promise()对象,可选progress事件传入
- Promise是高级接口,依靠低级接口事件来实现
//then
var Promise = function() {
EventEmitter.call(this);
};
util.inherits(Promise, EventEmitter);
Promise.prototype.then = function(fulfilledHandler, errorHandler, progressHandler) {
if (typeof fulfilledHandler === 'function') {
// 用once()方法 保证成功回调只执行一次
this.once('success', fulfilledHandler);
}
if (typeof errorHandler === 'function') {
// 用once()方法 保证成功回调只执行一次
this.once('error', errorHandler);
}
if (typeof progressHandler === 'function') {
this.on('progress', progressHandler);
}
return this;
};
//实现这些功能的对象被称为Deferred,即延迟对象
var Deferred = function() {
this.state = 'unfulfilled';
this.promise = new Promise();
};
Deferred.prototype.resolve = function(obj) {
this.state = 'fulfilled';
this.promise.emit('success', obj);
};
Deferred.prototype.reject = function(err) {
this.state = 'failed';
this.promise.emit('error', err);
};
Deferred.prototype.progress = function(data) {
this.promise.emit('progress', data);
};
对res改造成promise
Deferred用于内部,维护异步模型的状态,Promise作用于外部,通过then()暴露给外部以添加自定义逻辑
var promisify = function(res) {
var deferred = new Deferred();
var result = '';
res.on('data', function(chunk) {
result += chunk;
deferred.progress(chunk);
});
res.on('end', function() {
deferred.resolve(result);
});
res.on('error', function(err) {
deferred.reject(err);
});
return deferred.promise;//更改内部状态的行为由定义者处理
};
//then调用的是promise
promisify(res).then(function() {
// Done
}, function(err) {
// Error
}, function(chunk) {
// progress
console.log('BODY: ' + chunk);
});
Promise多异步协作,all()
Deferred.prototype.all = function(promises) {
var count = promises.length;
var that = this;
var results = [];
promises.forEach(function(promise, i) {
promise.then(function(data) {
count--;
results[i] = data;
if (count === 0) {
that.resolve(results);
}
}, function(err) {
that.reject(err);
});
});
return this.promise;
};
//all()返回resolve()结果集
Promise链式调用
- 前一个的调用的结果,作为下一个调用的开始,后一个then的回调函数会等待前一个promise的状态变化而运行
将API Promise化
// smooth(fs.readFile);
var smooth = function(method) {
return function() {
var deferred = new Deferred();
var args = Array.prototype.slice.call(arguments, 1); //跳过第一个参数
args.push(deferred.callback());
method.apply(null, args);
return deferred.promise;
};
};
bluebrid的 promisify可以将api promise化
var Promise = require('bluebird')
fs.readFileAsync = Promise.promisify(fs.readFie, fs)
var Promise = require('bluebird')
Promise.promisifyAll(fs)
async
串行执行
:series()
async.series([
function(callback) {
fs.readFile('file1.txt', 'utf-8', callback);
},
function(callback) {
fs.readFile('file2.txt', 'utf-8', callback);
}
], function(err, results) {
// results => [file1.txt, file2.txt]
});
传入的callback()不是由使用者指定,callback()执行时将结果保存然后执行下一个调用,最终的回调函数执行时,保存的结果以数组传入,一旦异常结束所有调用,将异常传递给最终函数的第一个参数
并行执行
:parallel()
async.parallel([function(callback) {
fs.readFile('file1.txt', 'utf-8', callback);
},
function(callback) {
fs.readFile('file2.txt', 'utf-8', callback);
}
], function(err, results) {
// results => [file1.txt, file2.txt]
});
//等价于
var counter = 2;
var results = [];
var done = function(index, value) {
results[index] = value;
counter--;
if (counter === 0) {
callback(null, results);
}
};
// 只传递第一个异常
var hasErr = false;
var fail = function(err) {
if (!hasErr) {
hasErr = true;
callback(err);
}
};
fs.readFile('file1.txt', 'utf-8', function(err, content) {
if (err) {
return fail(err);
}
done(0, content);
});
fs.readFile('file2.txt', 'utf-8', function(err, data) {
if (err) {
return fail(err);
}
done(1, data);
});
一旦异步调用异常,,就将异常作为第一个参数传给最终回调函数,结果为数组
异步调用 依赖
:当前一个的结果是后一个调用的输入 waterfall()
async.waterfall([function(callback) {
fs.readFile('file1.txt', 'utf-8', function(err, content) {
callback(err, content);
});
},
function(arg1, callback) {
// arg1 => file2.txt
fs.readFile(arg1, 'utf-8', function(err, content) {
callback(err, content);
});
},
function(arg1, callback) {
// arg1 => file3.txt
fs.readFile(arg1, 'utf-8', function(err, content) {
callback(err, content);
});
}
], function(err, result) {
// result => file4.txt
});
自动依赖处理
auto()
{
readConfig: function() {}, //读取配置
connectMongoDB: function() {},//连接mongo
connectRedis: function() {},//连接redis
complieAsserts: function() {},//编译静态
uploadAsserts: function() {},//上传静态到cdn
startup: function() {}//启动
}
var deps = {
readConfig: function(callback) {
// read config file
callback();
},
connectMongoDB: ['readConfig', function(callback) {
// connect to mongodb
callback();
}],
connectRedis: ['readConfig', function(callback) {
// connect to redis callback();
图灵社区会员 Eric Liu(guangqiang.dev @gmail.com) 专享 尊重版权
}],
complieAsserts: function(callback) {
// complie asserts
callback();
},
uploadAsserts: ['complieAsserts', function(callback) {
// upload to assert 2 callback();
}],
startup: ['connectMongoDB', 'connectRedis', 'uploadAsserts', function(callback) {
// startup
}]
};
//auto根据依赖关系自动分析
async.auto(deps);
Step
串行
Step(
function readFile1() {
fs.readFile('file1.txt', 'utf-8', this);
},
function readFile2(err, content) {
fs.readFile('file2.txt', 'utf-8', this);
},
function done(err, content) {
console.log(content);
}
);
this是Step内部的一个next()方法,将调用结果作为下一个任务的参数并调用
并行
parallel()
Step(
function readFile1() {
fs.readFile('file1.txt', 'utf-8', this.parallel());
fs.readFile('file2.txt', 'utf-8', this.parallel());
},
function done(err, content1, content2) {
// content1 => file1
// content2 => file2
console.log(arguments);
});
如果异步方法传回结果为多个参数,step只取前两个。parallel()原理是每次执行时将内部计数器加1,