在日常开发过程中,你可能会遇到并发控制的场景,比如控制请求并发数。那么在 JavaScript 中如何实现并发控制呢?在回答这个问题之前,我们来简单介绍一下并发控制。
假设有 6 个待办任务要执行,而我们希望限制同时执行的任务个数,即最多只有 2 个任务能同时执行。当 正在执行任务列表 中的任何 1 个任务完成后,程序会自动从 待办任务列表 中获取新的待办任务并把该任务添加到 正在执行任务列表 中。为了让大家能够更直观地理解上述的过程,阿宝哥特意画了以下 3 张图:
好的,介绍完并发控制之后,阿宝哥将以 Github 上 async-pool 这个库来介绍一下异步任务并发控制的具体实现。
async-pool 这个库提供了 ES7 和 ES6 两种不同版本的实现,在分析其具体实现之前,我们来看一下它如何使用。
const timeout = i => new Promise(resolve => setTimeout(() => resolve(i), i));
await asyncPool(2, [1000, 5000, 3000, 2000], timeout);
在以上代码中,我们使用 async-pool 这个库提供的 asyncPool 函数来实现异步任务的并发控制。 asyncPool 函数的签名如下所示:
function asyncPool(poolLimit, array, iteratorFn){ ... }
该函数接收 3 个参数:
poolLimit(数字类型):表示限制的并发数;
array(数组类型):表示任务数组;
iteratorFn(函数类型):表示迭代函数,用于实现对每个任务项进行处理,该函数会返回一个 Promise 对象或异步函数。
对于以上示例来说,在使用了 asyncPool 函数之后,对应的执行过程如下所示:
const timeout = i => new Promise(resolve => setTimeout(() => resolve(i), i));
await asyncPool(2, [1000, 5000, 3000, 2000], timeout);
// Call iterator (i = 1000)
// Call iterator (i = 5000)
// Pool limit of 2 reached, wait for the quicker one to complete...
// 1000 finishes
// Call iterator (i = 3000)
// Pool limit of 2 reached, wait for the quicker one to complete...
// 3000 finishes
// Call iterator (i = 2000)
// Itaration is complete, wait until running ones complete...
// 5000 finishes
// 2000 finishes
// Resolves, results are passed in given array order `[1000, 5000, 3000, 2000]`.
通过观察以上的注释信息,我们可以大致地了解 asyncPool 函数内部的控制流程。下面我们先来分析 asyncPool 函数的 ES7 实现。
关注「全栈修仙之路」阅读阿宝哥原创的 4 本免费电子书(累计下载 3万+)及 50 几篇 TS 系列教程。
async function asyncPool(poolLimit, array, iteratorFn) {
const ret = []; // 存储所有的异步任务
const executing = []; // 存储正在执行的异步任务
for (const item of array) {
// 调用iteratorFn函数创建异步任务
const p = Promise.resolve().then(() => iteratorFn(item, array));
ret.push(p); // 保存新的异步任务
// 当poolLimit值小于或等于总任务个数时,进行并发控制
if (poolLimit <= array.length) {
// 当任务完成后,从正在执行的任务数组中移除已完成的任务
const e = p.then(() => executing.splice(executing.indexOf(e), 1));
executing.push(e); // 保存正在执行的异步任务
if (executing.length >= poolLimit) {
await Promise.race(executing); // 等待较快的任务执行完成
}
}
}
return Promise.all(ret);
}
在以上代码中,充分利用了 Promise.all 和 Promise.race 函数特点,再结合 ES7 中提供的 async await 特性,最终实现了并发控制的功能。利用 await Promise.race(executing); 这行语句,我们会等待 正在执行任务列表 中较快的任务执行完成之后,才会继续执行下一次循环。
asyncPool ES7 实现相对比较简单,接下来我们来看一下不使用 async await 特性要如何实现同样的功能。
function asyncPool(poolLimit, array, iteratorFn) {
let i = 0;
const ret = []; // 存储所有的异步任务
const executing = []; // 存储正在执行的异步任务
const enqueue = function () {
if (i === array.length) {
return Promise.resolve();
}
const item = array[i++]; // 获取新的任务项
const p = Promise.resolve().then(() => iteratorFn(item, array));
ret.push(p);
let r = Promise.resolve();
// 当poolLimit值小于或等于总任务个数时,进行并发控制
if (poolLimit <= array.length) {
// 当任务完成后,从正在执行的任务数组中移除已完成的任务
const e = p.then(() => executing.splice(executing.indexOf(e), 1));
executing.push(e);
if (executing.length >= poolLimit) {
r = Promise.race(executing);
}
}
// 正在执行任务列表 中较快的任务执行完成之后,才会从array数组中获取新的待办任务
return r.then(() => enqueue());
};
return enqueue().then(() => Promise.all(ret));
}
在 ES6 的实现版本中,通过内部封装的 enqueue 函数来实现核心的控制逻辑。当 Promise.race(executing) 返回的 Promise 对象变成已完成状态时,才会调用 enqueue 函数,从 array 数组中获取新的待办任务。
在 asyncPool 这个库的 ES7 和 ES6 的具体实现中,我们都使用到了 Promise.all 和 Promise.race 函数。其中手写 Promise.all 是一道常见的面试题。刚好趁着这个机会,阿宝哥跟大家一起来手写简易版的 Promise.all 和 Promise.race 函数。
Promise.all(iterable) 方法会返回一个 promise 对象,当输入的所有 promise 对象的状态都变成 resolved 时,返回的 promise 对象就会以数组的形式,返回每个 promise 对象 resolve 后的结果。当输入的任何一个 promise 对象状态变成 rejected 时,则返回的 promise 对象会 reject 对应的错误信息。
Promise.all = function (iterators) {
return new Promise((resolve, reject) => {
if (!iterators || iterators.length === 0) {
resolve([]);
} else {
let count = 0; // 计数器,用于判断所有任务是否执行完成
let result = []; // 结果数组
for (let i = 0; i < iterators.length; i++) {
// 考虑到iterators[i]可能是普通对象,则统一包装为Promise对象
Promise.resolve(iterators[i]).then(
(data) => {
result[i] = data; // 按顺序保存对应的结果
// 当所有任务都执行完成后,再统一返回结果
if (++count === iterators.length) {
resolve(result);
}
},
(err) => {
reject(err); // 任何一个Promise对象执行失败,则调用reject()方法
return;
}
);
}
}
});
};
需要注意的是对于 Promise.all 的标准实现来说,它的参数是一个可迭代对象,比如 Array、String 或 Set 等。
Promise.race(iterable) 方法会返回一个 promise 对象,一旦迭代器中的某个 promise 对象 resolved 或 rejected,返回的 promise 对象就会 resolve 或 reject 相应的值。
Promise.race = function (iterators) {
return new Promise((resolve, reject) => {
for (const iter of iterators) {
Promise.resolve(iter)
.then((res) => {
resolve(res);
})
.catch((e) => {
reject(e);
});
}
});
};
本文阿宝哥带大家详细分析了 async-pool 异步任务并发控制的具体实现,同时为了让大家能够更好地理解 async-pool 的核心代码。最后阿宝哥还带大家一起手写简易版的 Promise.all 和 Promise.race 函数。其实除了 Promise.all 函数之外,还存在另一个函数 —— Promise.allSettled,该函数用于解决 Promise.all 存在的问题,感兴趣的小伙伴可以自行研究一下。
作者:阿宝哥
链接:https://juejin.cn/post/6976028030770610213
来源:掘金
抢购、秒杀是平常很常见的场景,面试的时候面试官也经常会问到,比如问你淘宝中的抢购秒杀是怎么实现的等等。抢购、秒杀实现很简单,但是有些问题需要解决,主要针对两个问题:
最近在看Elasticsearch时看到了并发控制,由此看到了新的并发控制方式。不得不说Elasticsearch相较于关系型数据库就是两种理论建立的数据存储体系,当然它们在并发控制上也相差甚远,各有千秋。
在互联网时代,并发,高并发通常是指并发访问。也就是在某个时间点,有多少个访问同时到来。 高并发架构相关概念QPS (每秒查询率) : 每秒钟请求或者查询的数量,在互联网领域,指每秒响应请求数
今天看见有人聊目前系统有2亿的PV,该如何优化?当我看到这个话题的时候,突然在想自己工作中也遇到了不少高并发的场景了,所以即兴发挥,在这里简单总结和分享下,欢迎指正和补充。
很多网站有并发连接数的限制,所以当请求发送太快的时候会导致返回值为空或报错。 安装依赖 express superagent cheerio eventproxy。新建app.js 抓取所有的url
在秒杀,抢购等并发场景下,可能会出现超卖的现象,在PHP语言中并没有原生提供并发的解决方案,因此就需要借助其他方式来实现并发控制。列出常见的解决方案有:
并发编程三要素:原子性: 一个不可再被分割的颗粒。原子性指的是一个或多个操作要么全部执行成功要么全部执行失败。有序性: 程序执行的顺序按照代码的先后顺序执行。(处理器可能会对指令进行重排序)
异步是 js 一个非常重要的特性,但很多时候,我们不仅仅想让一系列任务并行执行,还想要控制同时执行的并发数,尤其是在针对操作有限资源的异步任务,比如文件句柄,网络端口等等。
Node可以在不新增额外线程的情况下,依然可以对任务进行并发处理 —— Node.js是单线程的。它通过事件循环(event loop)来实现并发操作,对此,我们应该要充分利用这一点 —— 尽可能的避免阻塞操作
在开发过程中,有时会遇到需要控制任务并发执行数量的需求。例如一个爬虫程序,可以通过限制其并发任务数量来降低请求频率,从而避免由于请求过于频繁被封禁问题的发生。
内容以共享、参考、研究为目的,不存在任何商业目的。其版权属原作者所有,如有侵权或违规,请与小编联系!情况属实本人将予以删除!