为什么会想去学习Async库和Node.js的同步编程?在用Node.js写接口/UI自动化用例的时候,在测试接口前后需要调用接口做数据准备/清除工作,所以存在如下场景:调用接口A,然后调用接口B,然后调用测试接口,最后调用数据清除接口。这些接口调用之间有顺序依赖关系,后一个接口必须等待前一个接口执行完成才能调用。而如果用原生Node.js实现,需要多层的嵌套回调,写完的代码也不符合易读的标准,于是想到去了解Node.js的方法如何去同步执行(weird idea!在Java里面,只有异步编程才会是需要学习的)。
Node.js的异步:
首先来一段代码看看Node.js的异步机制:
// Register a timer: the callback is queued to run roughly 3 seconds later,
// while setTimeout itself returns immediately without blocking.
const announceEventA = () => {
  console.log('event A occurs');
};
setTimeout(announceEventA, 3000);

// Executed right away, i.e. before the timer callback above fires.
console.log('event B occurs');
作为一名Java开发人员,对这段代码的期望的执行结果是:
- sleep 3秒
- event A occurs
- event B occurs
但是实际执行结果为(setTimeout只是注册了一个3秒后触发的回调,并不会阻塞后面的代码):
- event B occurs
- (约3秒后)event A occurs
ES6提供了Promise,使用Promise可以保证A事件触发之后才会触发B事件:
// Wrap the delayed "event A" in a Promise so follow-up work can be chained on it.
const q = new Promise((resolve, reject) => {
  setTimeout(() => {
    console.log('event A occurs');
    resolve(); // fulfil the promise only after event A has been reported
  }, 3000);
});

// Runs once the promise has been fulfilled, i.e. after resolve() above.
const onFulfilled = () => {
  console.log('event B occurs');
};

// Would run instead if the promise were rejected.
const onRejected = (err) => {
  console.log('err');
};

q.then(onFulfilled, onRejected);
可以把Promise对象比喻为一个容器,容器里面有一个异步的操作,只有在这个异步操作发出信号(调用resolve或者reject)之后,通过then方法注册的对应回调才会被执行。
思考下面的代码:
// Event A is reported after a 3 second delay; the promise is fulfilled right after.
const q = new Promise((resolve, reject) => {
  const fireEventA = () => {
    console.log('event A occurs');
    resolve();
  };
  setTimeout(fireEventA, 3000);
});

// then() only registers the handlers here; they are not invoked synchronously.
q.then(
  () => console.log('event B occurs'),
  (err) => console.log('err'),
);

// Plain synchronous statement that follows the then() registration.
console.log('event C occurs');
运行结果(then只是注册回调,并不会阻塞后面的代码,所以event C最先输出):
- event C occurs
- (约3秒后)event A occurs
- event B occurs
除了Promise,通过Async库也可以让Node.js的方法按顺序(同步)执行。
Async实现同步
Async的语法糖有点类似Java中的线程池,提交任务,由线程池来控制任务的执行......
// Create a thread pool with a fixed size of 5 worker threads.
ExecutorService service = Executors.newFixedThreadPool(5);
// Hand a task to the pool; the pool, not the caller, controls when it is executed.
service.submit(new Runnable() {
@Override
public void run() {
// Task body is intentionally left empty in this illustration.
}
});
async支持series/parallel/parallelLimit/waterfall等各种模型
- series:串行
可以接受数组作为参数,需要串行处理的方法作为数组的元素
const async = require('async');

// Run two tasks strictly one after another with async.series (array form).
async.series(
  [
    // Task A: completes asynchronously after 3 seconds.
    (callback) => {
      setTimeout(() => {
        console.log('event A occurs');
        callback(null, 'A'); // error-first convention: (error, result)
      }, 3000);
    },
    // Task B: completes synchronously; it is only started once task A has called back.
    (callback) => {
      console.log('event B occurs');
      callback(null, 'B');
    },
  ],
  (err, results) => {
    // A failing task ends the series early, so report the error instead of
    // printing incomplete results.
    if (err) {
      console.error('async.series failed:', err);
      return;
    }
    // results is the array of task results, in task order.
    console.log(`event ${results[0]}${results[1]} occurs`);
  },
);

// Runs immediately: async.series does not block the caller while task A is pending.
console.log('event D occurs');
async.series也可以接受对象作为参数,代码如下:
// Same series as the array form, but tasks are given as named properties of an object.
async.series(
  {
    // Task A: completes asynchronously after 3 seconds.
    A(callback) {
      setTimeout(() => {
        console.log('event A occurs');
        callback(null, 'A'); // error-first convention: (error, result)
      }, 3000);
    },
    // Task B: completes synchronously once it is started.
    B(callback) {
      console.log('event B occurs');
      callback(null, 'B');
    },
  },
  (err, results) => {
    // A failing task ends the series early, so report the error instead of
    // printing incomplete results.
    if (err) {
      console.error('async.series failed:', err);
      return;
    }
    // results is keyed by task name: results.A is the result of task A.
    console.log(`event ${results.A}${results.B} occurs`);
  },
);

// Runs immediately: async.series does not block the caller while task A is pending.
console.log('event D occurs');
我们再看看series的注释:
/**
* Run the functions in the `tasks` collection in series, each one running once
* the previous function has completed. If any functions in the series pass an
* error to its callback, no more functions are run, and `callback` is
* immediately called with the value of the error. Otherwise, `callback`
* receives an array of results when `tasks` have completed.
*
* It is also possible to use an object instead of an array. Each property will
* be run as a function, and the results will be passed to the final `callback`
* as an object instead of an array. This can be a more readable way of handling
* results from {@link async.series}.
*
* **Note** that while many implementations preserve the order of object
* properties, the [ECMAScript Language Specification](http://www.ecma-international.org/ecma-262/5.1/#sec-8.6)
* explicitly states that
*
* > The mechanics and order of enumerating the properties is not specified.
*
* So if you rely on the order in which your series of functions are executed,
* and want this to work on all platforms, consider using an array.
*
* @name series
* @static
* @memberOf module:ControlFlow
* @method
* @category Control Flow
* @param {Array|Iterable|Object} tasks - A collection containing
* [async functions]{@link AsyncFunction} to run in series.
* Each function can complete with any number of optional `result` values.
* @param {Function} [callback] - An optional callback to run once all the
* functions have completed. This function gets a results array (or object)
* containing all the result arguments passed to the `task` callbacks. Invoked
* with (err, result).
* @example
* async.series([
* function(callback) {
* // do some stuff ...
* callback(null, 'one');
* },
* function(callback) {
* // do some more stuff ...
* callback(null, 'two');
* }
* ],
* // optional callback
* function(err, results) {
* // results is now equal to ['one', 'two']
* });
*
* async.series({
* one: function(callback) {
* setTimeout(function() {
* callback(null, 1);
* }, 200);
* },
* two: function(callback){
* setTimeout(function() {
* callback(null, 2);
* }, 100);
* }
* }, function(err, results) {
* // results is now equal to: {one: 1, two: 2}
* });
*/
function series(tasks, callback) {
// Delegates to the shared `_parallel` helper with `eachOfSeries` as its first argument.
// NOTE(review): `_parallel` and `eachOfSeries` are defined elsewhere in the bundle;
// presumably `eachOfSeries` is what makes the tasks run one at a time — confirm there.
_parallel(eachOfSeries, tasks, callback);
}
翻译下要点:
1、串行执行
2、如果任何一个方法在callback中返回了一个异常错误,停止后续方法的执行,且async.series的回调立即执行。
- parallel:并行
// Task A: reports itself after a 3 second timer, then signals completion.
const delayedTaskA = (callback) => {
  setTimeout(() => {
    console.log('event A occurs');
    callback(null, 'A');
  }, 3000);
};

// Task B: reports itself and signals completion synchronously.
const immediateTaskB = (callback) => {
  console.log('event B occurs');
  callback(null, 'B');
};

// Final callback: prints a marker showing that it has been invoked.
const onAllDone = (err, result) => {
  console.log('event C occurs');
};

async.parallel([delayedTaskA, immediateTaskB], onAllDone);

// Synchronous statement after the async.parallel call.
console.log('event D occurs');
async.parallel的注释:
/**
* Run the `tasks` collection of functions in parallel, without waiting until
* the previous function has completed. If any of the functions pass an error to
* its callback, the main `callback` is immediately called with the value of the
* error. Once the `tasks` have completed, the results are passed to the final
* `callback` as an array.
*
* **Note:** `parallel` is about kicking-off I/O tasks in parallel, not about
* parallel execution of code. If your tasks do not use any timers or perform
* any I/O, they will actually be executed in series. Any synchronous setup
* sections for each task will happen one after the other. JavaScript remains
* single-threaded.
*
* **Hint:** Use [`reflect`]{@link module:Utils.reflect} to continue the
* execution of other tasks when a task fails.
*
* It is also possible to use an object instead of an array. Each property will
* be run as a function and the results will be passed to the final `callback`
* as an object instead of an array. This can be a more readable way of handling
* results from {@link async.parallel}.
*
* @name parallel
* @static
* @memberOf module:ControlFlow
* @method
* @category Control Flow
* @param {Array|Iterable|Object} tasks - A collection of
* [async functions]{@link AsyncFunction} to run.
* Each async function can complete with any number of optional `result` values.
* @param {Function} [callback] - An optional callback to run once all the
* functions have completed successfully. This function gets a results array
* (or object) containing all the result arguments passed to the task callbacks.
* Invoked with (err, results).
*
* @example
* async.parallel([
* function(callback) {
* setTimeout(function() {
* callback(null, 'one');
* }, 200);
* },
* function(callback) {
* setTimeout(function() {
* callback(null, 'two');
* }, 100);
* }
* ],
* // optional callback
* function(err, results) {
* // the results array will equal ['one','two'] even though
* // the second function had a shorter timeout.
* });
*
* // an example using an object instead of an array
* async.parallel({
* one: function(callback) {
* setTimeout(function() {
* callback(null, 1);
* }, 200);
* },
* two: function(callback) {
* setTimeout(function() {
* callback(null, 2);
* }, 100);
* }
* }, function(err, results) {
* // results is now equal to: {one: 1, two: 2}
* });
*/
function parallelLimit(tasks, callback) {
// NOTE: despite its local name, this is the function exported as `async.parallel`
// (see `exports.parallel = parallelLimit`); it takes no `limit` argument.
// The limit-taking variant is the separate `parallelLimit$1`.
_parallel(eachOf, tasks, callback);
}
要点:
1、并行执行,所有任务执行完后,立即执行回调函数。
2、如果有一个任务执行异常报错,立即执行回调函数。
- parallelLimit:并行执行各个方法,但是同时并行的任务数量有上限
// Upper bound on the number of tasks allowed to be in flight at the same time.
const MAX_CONCURRENCY = 1;

// Task A: reports itself after a 3 second timer, then signals completion.
const delayedTaskA = (callback) => {
  setTimeout(() => {
    console.log('event A occurs');
    callback(null, 'A');
  }, 3000);
};

// Task B: reports itself and signals completion synchronously.
const immediateTaskB = (callback) => {
  console.log('event B occurs');
  callback(null, 'B');
};

// Final callback: prints a marker showing that it has been invoked.
const onAllDone = (err, result) => {
  console.log('event C occurs');
};

async.parallelLimit([delayedTaskA, immediateTaskB], MAX_CONCURRENCY, onAllDone);

// Synchronous statement after the async.parallelLimit call.
console.log('event D occurs');
parallelLimit的注释:
/**
* The same as [`parallel`]{@link module:ControlFlow.parallel} but runs a maximum of `limit` async operations at a
* time.
*
* @name parallelLimit
* @static
* @memberOf module:ControlFlow
* @method
* @see [async.parallel]{@link module:ControlFlow.parallel}
* @category Control Flow
* @param {Array|Iterable|Object} tasks - A collection of
* [async functions]{@link AsyncFunction} to run.
* Each async function can complete with any number of optional `result` values.
* @param {number} limit - The maximum number of async operations at a time.
* @param {Function} [callback] - An optional callback to run once all the
* functions have completed successfully. This function gets a results array
* (or object) containing all the result arguments passed to the task callbacks.
* Invoked with (err, results).
*/
function parallelLimit$1(tasks, limit, callback) {
// Exported as `async.parallelLimit` (see `exports.parallelLimit = parallelLimit$1`).
// `_eachOfLimit(limit)` is evaluated first and its return value is handed to the
// shared `_parallel` helper together with the tasks and the final callback.
_parallel(_eachOfLimit(limit), tasks, callback);
}
要点:
1、处理逻辑和parallel方法相同,只是同一时间并行执行的任务数有上限。
- waterfall:串行,前一个任务的结果传递给后一个任务。
// Run tasks in order, handing each task's result to the next one as an argument.
async.waterfall(
  [
    // First task: completes after 3 seconds and passes 'B' on to the next task.
    (callback) => {
      setTimeout(() => {
        console.log('event A occurs');
        callback(null, 'B'); // (error, argument for the next task)
      }, 3000);
    },
    // Second task: receives the previous result as arg1 (here 'B').
    (arg1, callback) => {
      console.log(`event ${arg1} occurs`);
      callback(null, 'C');
    },
  ],
  (err, result) => {
    // On failure there is no final result to print; report the error instead of
    // logging "event undefined occurs".
    if (err) {
      console.error('async.waterfall failed:', err);
      return;
    }
    // result is the value passed by the last task (here 'C').
    console.log(`event ${result} occurs`);
  },
);

// Runs immediately: async.waterfall does not block the caller while the first task is pending.
console.log('event D occurs');
async暴露哪些接口
async除了上面常用的几个接口之外,从/async/dist/async.js可以看到async暴露的所有接口:
// Public surface of the bundle: every property assigned to `exports` below is a
// name made available to consumers of the module.
// The right-hand-side identifiers are defined elsewhere in the bundle (not shown here).
exports['default'] = index;
exports.applyEach = applyEach;
exports.applyEachSeries = applyEachSeries;
exports.apply = apply;
exports.asyncify = asyncify;
exports.auto = auto;
exports.autoInject = autoInject;
exports.cargo = cargo;
exports.compose = compose;
exports.concat = concat;
exports.concatLimit = concatLimit;
exports.concatSeries = concatSeries;
exports.constant = constant;
exports.detect = detect;
exports.detectLimit = detectLimit;
exports.detectSeries = detectSeries;
exports.dir = dir;
exports.doDuring = doDuring;
exports.doUntil = doUntil;
exports.doWhilst = doWhilst;
exports.during = during;
// NOTE: the local identifier `eachLimit` is exported as `each`; the function
// exported as `eachLimit` is the `$1`-suffixed local on the next line.
exports.each = eachLimit;
exports.eachLimit = eachLimit$1;
exports.eachOf = eachOf;
exports.eachOfLimit = eachOfLimit;
exports.eachOfSeries = eachOfSeries;
exports.eachSeries = eachSeries;
exports.ensureAsync = ensureAsync;
exports.every = every;
exports.everyLimit = everyLimit;
exports.everySeries = everySeries;
exports.filter = filter;
exports.filterLimit = filterLimit;
exports.filterSeries = filterSeries;
exports.forever = forever;
exports.groupBy = groupBy;
exports.groupByLimit = groupByLimit;
exports.groupBySeries = groupBySeries;
exports.log = log;
exports.map = map;
exports.mapLimit = mapLimit;
exports.mapSeries = mapSeries;
exports.mapValues = mapValues;
exports.mapValuesLimit = mapValuesLimit;
exports.mapValuesSeries = mapValuesSeries;
exports.memoize = memoize;
exports.nextTick = nextTick;
// NOTE: same naming pattern as `each` above — the local `parallelLimit` is exported
// as `parallel`, and `parallelLimit$1` is exported as `parallelLimit`.
exports.parallel = parallelLimit;
exports.parallelLimit = parallelLimit$1;
exports.priorityQueue = priorityQueue;
exports.queue = queue$1;
exports.race = race;
exports.reduce = reduce;
exports.reduceRight = reduceRight;
exports.reflect = reflect;
exports.reflectAll = reflectAll;
exports.reject = reject;
exports.rejectLimit = rejectLimit;
exports.rejectSeries = rejectSeries;
exports.retry = retry;
exports.retryable = retryable;
exports.seq = seq;
exports.series = series;
exports.setImmediate = setImmediate$1;
exports.some = some;
exports.someLimit = someLimit;
exports.someSeries = someSeries;
exports.sortBy = sortBy;
exports.timeout = timeout;
exports.times = times;
// NOTE(review): the local identifier here is `timeLimit` (no "s"), unlike the exported
// name `timesLimit` — presumably this mirrors the bundle; verify against dist/async.js.
exports.timesLimit = timeLimit;
exports.timesSeries = timesSeries;
exports.transform = transform;
exports.tryEach = tryEach;
exports.unmemoize = unmemoize;
exports.until = until;
exports.waterfall = waterfall;
exports.whilst = whilst;
// Aliases: each name below points at a function that is already exported above
// under another name (e.g. `all` and `every` are the same function).
exports.all = every;
exports.allLimit = everyLimit;
exports.allSeries = everySeries;
exports.any = some;
exports.anyLimit = someLimit;
exports.anySeries = someSeries;
exports.find = detect;
exports.findLimit = detectLimit;
exports.findSeries = detectSeries;
exports.forEach = eachLimit;
exports.forEachSeries = eachSeries;
exports.forEachLimit = eachLimit$1;
exports.forEachOf = eachOf;
exports.forEachOfSeries = eachOfSeries;
exports.forEachOfLimit = eachOfLimit;
exports.inject = reduce;
exports.foldl = reduce;
exports.foldr = reduceRight;
exports.select = filter;
exports.selectLimit = filterLimit;
exports.selectSeries = filterSeries;
exports.wrapSync = asyncify;
想深入了解的话,可以看下源码中的注释。