在es6中的async
的语法中,可以参照java并发包实现一些有意思的异步工具,辅助在异步场景(一般指请求)下的开发。
由于js是单线程,下面的实现都比java中实现简单 (抛除线程概念)。同时涉及到js的执行机制,宏任务,微任务,async
,promise
相关内容,需要提前具备这些知识。
wait(等待)
异步函数中,等待(相当于java线程中的阻塞)一段时间。
实现代码:
async function wait(time = 0) {
await new Promise(resolve => setTimeout(resolve, time));
// 避免转译成return await, 会在一些safari版本里面报错
return undefined;
}
模拟使用代码:
(async () => {
console.time();
await wait(1000);
console.timeEnd(); // 输出: default: 1.002s
})();
Lock(锁)
模拟java并发包中的Lock
类实现锁操作。 保证同一个锁包围的异步代码执行过程中,同一时刻只有一段代码在执行。
该锁不符合html5的的异步锁接口,而是提供一个java异步包中
Lock
接口的简单实现
推荐使用场景:多个请求执行过程前,保证同一时刻只有一个token
失效验证转换操作。
实现代码:
export type Resolve<T = any> = (value: T | PromiseLike<T>) => void;
export type Reject = (reason?: any) => void;
export interface FlatPromise<T = any> {
promise: Promise<T>;
resolve: Resolve<T>;
reject: Reject;
};
interface Key {
key: number,
resolve: Resolve,
};
/**
* 创建一个扁平的promise
* @returns Prmise
*/
function flatPromise<T = any>(): FlatPromise<T> {
const result: any = {};
const promise = new Promise<T>((resolve, reject) => {
result.resolve = resolve;
result.reject = reject;
});
result.promise = promise;
return result as FlatPromise<T>;
}
class Lock {
keys: Key[] = [];
hasLock: boolean = false;
idCount: number = 0;
constructor() {
this.keys = [];
this.hasLock = false;
this.idCount = 0;
}
_pushKey(resolve: Resolve) {
this.idCount += 1;
const key: Key = {
key: this.idCount,
resolve,
};
this.keys.push(key);
return key;
}
_removeKey(key: Key) {
const index = this.keys.findIndex(item => item.key === key.key);
if (index >= 0) {
this.keys.splice(index, 1);
}
}
/**
* 获取锁.
* 如果当前锁已经锁定,那么就阻塞当前操作
*/
async lock() {
if (this.keys.length || this.hasLock) {
const { promise, resolve } = flatPromise();
this._pushKey(resolve);
await promise;
return null;
}
this.hasLock = true;
return null;
}
/**
* 尝试获取锁.
* 该函数如果没有指定一个有效的time,则立马返回一个结果:如果获取到锁则为true,反之为false.
* 如果指定一个有效的time(time=0有效),则返回一个promise对象,改对象返回的结果为是否获取到锁
* @param time 最长等待时间
*/
tryLock(time?: number) {
if (time === undefined ||
Number.isNaN(Math.floor(time)) || time < 0) {
if (this.hasLock) {
return false;
}
this.lock();
return Promise.resolve(true);
}
if (!this.hasLock && !this.keys.length) {
this.hasLock = true;
return Promise.resolve(true);
}
const asyncFn = async () => {
const { promise, resolve: res } = flatPromise();
const key = this._pushKey(res);
setTimeout(() => {
this._removeKey(key);
key.resolve(false);
}, time);
const isTimeout = await promise;
return isTimeout !== false;
};
return asyncFn();
}
async lockFn(asyncFn: () => Promise<void>) {
await this.lock();
try {
await asyncFn();
} finally {
this.unlock();
}
}
/**
* 释放锁
*/
unlock() {
if (this.keys.length === 0 && this.hasLock === true) {
this.hasLock = false;
return;
}
if (this.keys.length === 0) {
return;
}
const index = Math.floor(Math.random() * this.keys.length);
const key = this.keys[index];
this._removeKey(key);
key.resolve(undefined);
}
toString() {
return `${this.keys.length}-${this.hasLock}`;
}
}
模拟使用代码:
function delay(callback: () => void, time: number) {
return new Promise<void>((resolve) => setTimeout(() => {
callback();
resolve(undefined);
}, time));
}
(async () => {
const lock = new Lock();
const syncResult: string[] = [];
const unSyncResult: string[] = [];
const withLockAsync = async () => {
await lock.lock();
await delay(() => {
syncResult.push('1');
}, Math.random() * 10);
await delay(() => {
syncResult.push('2');
}, Math.random() * 10);
await delay(() => {
syncResult.push('3');
}, Math.random() * 10);
lock.unlock();
};
const withoutLockAsync = async () => {
await delay(() => {
unSyncResult.push('1');
}, Math.random() * 3);
await delay(() => {
unSyncResult.push('2');
}, Math.random() * 3);
await delay(() => {
unSyncResult.push('3');
}, Math.random() * 3);
};
const taskList = [];
for (let i = 0; i < 10; i += 1) {
taskList.push(withLockAsync(), withoutLockAsync());
}
await Promise.all(taskList);
// 输出1,2,3,1,2,3...
// 证明withLockAsync函数中的代码同一时刻只有一个执行,不会被打算
console.log(syncResult);
// 输出的值不一定按照1,2,3,1,2,3...
// 证明在普通的async函数中,await后的代码会被打乱
console.log(unSyncResult);
})();
Semaphore(信号量)
Semaphore
(信号量)是用来控制同时访问特定资源的线程数量,它通过协调各个线程,以保证合理的使用公共资源。
推荐使用场景:一般用于流量的控制,特别是公共资源有限的应用场景。例如控制同一时刻请求的连接数量,假设浏览器的请求连接数上限为10个,多个异步并发请求可以使用Semaphore
来控制请求的异步执行个数最多为10个。
简单实现代码:
class Semaphore {
constructor(permits) {
this.permits = permits;
this.execCount = 0;
this.waitTaskList = [];
}
async acquire() {
const that = this;
this.execCount += 1;
if (this.execCount <= this.permits) {
// 为了保证按照调用顺序执行
// 如果有等待的,那么先执行等待的,当前的挂起
// 没有则快速通过
if (that.waitTaskList.length !== 0) {
const waitTask = this.waitTaskList.pop();
waitTask();
await new Promise((resolve) => {
that.waitTaskList.push(resolve);
});
}
return;
}
await new Promise((resolve) => {
that.waitTaskList.push(resolve);
});
}
release() {
this.execCount -= 1;
if (this.execCount < 0) {
this.execCount = 0;
}
if (this.waitTaskList.length === 0) {
return;
}
const waitTask = this.waitTaskList.pop();
waitTask();
}
}
模拟一个复杂的页面中复杂的请求场景:
(async () => {
const semaphore = new Semaphore(5);
let doCount = 0;
let currntCount = 0;
const request = async (id) => {
await semaphore.acquire();
currntCount++;
console.log(`执行请求${id}, 正在执行的个数${currntCount}`);
await new Promise(resolve => setTimeout(resolve, Math.random() * 1000 + 500));
semaphore.release();
currntCount--;
doCount++;
console.log(`执行请求${id}结束,已经执行${doCount}次`);
};
const arr = new Array(10).fill(1);
setTimeout(() => {
// 依次执行10个请求
arr.forEach((_, index) => {
request(`1-${index}`);
});
}, 300)
// 随机触发10个请求
let index = 0;
const timerId = setInterval(() => {
if (index > 9) {
clearInterval(timerId);
return;
}
request(`2-${index}`);
index++;
}, Math.random() * 300 + 300);
// 同时执行10个请求
await Promise.all(arr.map(async (_, index) => {
await request(`3-${index}`);
}));
// 等待上面的内容全部执行, 资源释放后,在执行4个请求
setTimeout(() => {
const lastArr = new Array(4).fill(1);
lastArr.forEach((_, index) => {
request(`4-${index}`);
});
}, 5000);
})();
执行结果:
执行请求3-0, 正在执行的个数1
执行请求3-1, 正在执行的个数2
执行请求3-2, 正在执行的个数3
执行请求3-3, 正在执行的个数4
执行请求3-4, 正在执行的个数5
执行请求3-2结束,已经执行1次
执行请求2-1, 正在执行的个数5
执行请求3-3结束,已经执行2次
执行请求2-0, 正在执行的个数5
...
执行请求3-8结束,已经执行29次
执行请求3-6结束,已经执行30次
执行请求4-0, 正在执行的个数1
执行请求4-1, 正在执行的个数2
执行请求4-2, 正在执行的个数3
执行请求4-3, 正在执行的个数4
执行请求4-3结束,已经执行31次
执行请求4-0结束,已经执行32次
执行请求4-2结束,已经执行33次
执行请求4-1结束,已经执行34次
CountDownLatch(倒计时闭锁)和CyclicBarrier(循环栅栏)
在es的async
中,一般情景的CountDownLatch
可以直接用Promise.all
替代。
使用场景:复杂的Promise.all
需求场景,支持在离散的多个异步函数中灵活使用(本人还没有碰到过),从而摆脱Promise.all
在使用时需要一个promise
的iterable
类型的输入。
代码实现:
class CountDownLatch {
constructor(count) {
this.count = count;
this.waitTaskList = [];
}
countDown() {
this.count--;
if (this.count <= 0) {
this.waitTaskList.forEach(task => task());
}
}
// 避免使用关键字,所以命名跟java中不一样
async awaitExec() {
if (this.count <= 0) {
return;
}
const that = this;
await new Promise((resolve) => {
that.waitTaskList.push(resolve);
});
}
}
模拟使用代码:
(async () => {
const countDownLatch = new CountDownLatch(10);
const request = async (id) => {
console.log(`执行请求${id}`);
await new Promise(resolve => setTimeout(resolve, Math.random() * 1000 + 500));
console.log(`执行请求${id}结束`);
countDownLatch.countDown();
};
const task = new Array(10).fill(1);
// 后续的代码等同于
// await Promise.all(task.map(async (_, index) => {
// await request(index);
// }));
task.forEach((_1, index) => {
request(index);
});
await countDownLatch.awaitExec();
console.log('执行完毕');
})();
而CyclicBarrier
抛除线程相关概念后,核心功能就是一个可以重复使用的CountDownLatch
,这里就不实现了。
异步函数同步等待依次执行
如果需要对多个异步任务进行依次等待执行,可以按照下面代码实现:
function syncExec(taskList: (() => PromiseLike<void>)[]): PromiseLike<void> {
if (!taskList.length) return Promise.resolve(undefined);
let promise = taskList[0]();
for (let i = 1; i < taskList.length; i += 1) {
promise = promise.then(() => taskList[i]());
}
return promise;
}
模拟使用代码(使用时比Promise.all复杂,需要传入一个返回promise的函数):
(async () => {
const array = new Array(10).fill(1);
const taskList = array.map((_1, index) => {
return async () => {
const rnd = Math.random() * 100;
await new Promise((resolve) => setTimeout(resolve, rnd));
console.log(`执行任务${index}`);
};
});
await syncExec(taskList);
})();
输出的结果会按照顺序打印。
更多
如果你需要更加复杂的异步任务编排工具,可以尝试学习和使用gulp的undertaker和webpack的tapable。