p-limit 使用+源码 、100个请求并发请求、分时操作
记录一个有趣的面试题目(来源pingpong)
题目:100个请求,互相独立,同时发起请求
闪光点记录:Promise.race 的源码,闭包将index和请求关联 ,面试官推荐学习下 p-limit
1、p-limit 使用
功能:用于 限制并发异步操作数量 的轻量级 JS 库
import pLimit, { limitFunction } from 'p-limit';
// 第一种写法
const limit = pLimit(2); // ①创建一个限制器实例,设置并发限制为 2
const tasks1 = Array.from({ length: 5 }, (it, index) =>
limit(() => new Promise((resolve) => setTimeout(resolve, index * 100))
); // ②创建任务数组:利用 limit 包裹每一个请求,组成数组
Promise.all(tasks1).then((results) => console.log('All:', results) ); // ③执行所有任务
// 第二种简写法
const tasks2 = Array.from({length:20}, (it, index) =>
limitFunction(() => new Promise(resolve=>setTimeout(resolve,index)),{concurrency: 2}))
Promise.all(tasks2).then((results) => console.log('All:', results) );
2、p-limit 源码
核心逻辑主要围绕任务的并发控制展开,包括任务队列的管理、并发数量的限制以及任务的执行和调度
- 任务队列管理 queue:用数组存储待执行的任务,当并发任务数量达到限制时,新的任务被添加到
- 并发数量限制 concurrency:计数器 - 跟踪正在执行的任务数量;到限制时,新任务被暂停执行
- 任务执行、调度:有任务完成时,计数器 减1;并从队列中取出一个任务执行,计数器再 +1
function validateConcurrency(limit) { // 校验是否为正整数
if (!((Number.isInteger(limit) || limit === Infinity) && limit > 0)) {
throw new TypeError('Expected concurrency to be a number from 1 and up');
}
}
function pLimit(concurrency) { // concurrency - 允许的最大并发任务数量
// 检查 concurrency 是否为有效的正整数
validateConcurrency(concurrency);
const queue = []; // 任务队列,存储等待执行的任务
let activeCount = 0; // 计数器,记录当前正在执行的任务数量
const next = () => { // 执行队列中的下一个任务
activeCount--; // 当一个任务完成,activeCount 减1
if (queue.length > 0) { // 如果队列中有等待的任务
queue.shift()(); // 从队列中取出一个任务并执行
}
};
const run = async (fn, resolve, args) => { // 执行任务
activeCount++; // 表示有一个新的任务开始执行
try {
const result = await fn(...args); // 执行传入的异步函数
resolve(result); // 任务成功完成,调用 resolve 返回结果
} catch (error) {
resolve(Promise.reject(error)); // 任务失败,调用 resolve 抛出错误
}
next(); // 任务完成,调用 next 函数执行队列中的下一个任务
};
const enqueue = (fn, resolve, args) => { // 入队函数,将任务添加到队列中
if (activeCount < concurrency) { // 如果当前并发任务数量未达到限制,直接执行任务
run(fn, resolve, args);
} else {
queue.push(run.bind(null, fn, resolve, args)); // 达到限制,将任务添加到队列中等待
}
};
const generator = (fn, ...args) => new Promise((resolve) => { // 返回一个新的 Promise
enqueue(fn, resolve, args); // 将任务添加到队列中
});
Object.defineProperties(generator, { // 返回一个可调用的函数
activeCount: { get: () => activeCount },
pendingCount: { get: () => queue.length },
clearQueue: {
value: () => { queue.length = 0 }
}
});
return generator;
}
export function limitFunction(function_, option) {
const { concurrency } = option;
const limit = pLimit(concurrency);
return (...arguments_) => limit(() => function_(...arguments_));
}
3、100个请求并发请求实现
const requests = Array.from({ length: 30 }, (v, index) => { // 创建 30 个模拟请求
return new Promise((resolve, reject) => {
setTimeout(resolve, 100 * index, `${index} resolved`);
})
})
// 执行所有请求,返回所有响应
async function execute(questList, concurreny = 6) {
const indexedRequests = questList.map((promise, index) => ({ // 闭包,关联 index
index,
promise: promise.then(value => ({ value, key: index }))
}));
// 初始化并发任务列表
const activeTasks = indexedRequests.splice(0, concurrency);
const results = [];
// 持续处理任务,直到所有任务完成
while (activeTasks.length > 0) {
const { key, value } = await Promise.race(activeTasks.map(task => task.promise));
const resolvedIndex = activeTasks.findIndex(task => task.index === key);
activeTasks.splice(resolvedIndex, 1); // 移除已完成的任务
results[key] = value; // 存储结果
if (indexedRequests.length > 0) { // 如果还有待处理的任务,添加一个新任务到并发列表
activeTasks.push(indexedRequests.shift());
}
}
return results;
}
execute(requests).then(res => {
console.log('res:', res);
})
——
// 比较复杂的实现 for 循环 + Promise.all
async function execute(questList) {
questList = transList(questList);
let initList = questList.splice(0, 6); // 初始化6个
questList = questList.concat(Array(6)) // 补齐最后的空,方便执行完
const result = [];
for (let promise of initList) {
const { key, value } = await Promise.race(initList.map(it => it.promise));
const resolvedIndex = initList.findIndex(it => +it.index === +key);
initList.splice(resolvedIndex, 1); // 移除已返回的 promise
initList.push(promise); // 添加新的 promise
result[key] = value; // 存储响应值
}
const rest = await Promise.all(initList.map(it => it.promise));
return [...result, ...rest.map(it => it.value)];
}
execute(requests).then(res => {
console.log('res:', res);
})
4、分时函数:将操作分批执行
功能:分批处理数组元素的工具函数
- 适用于需要分批处理数组元素的场景,尤其是在处理大量数据时,可以避免一次性处理过多数据导致的性能问题
定义 : 在指定的时间间隔内,每次从数组中取出一定数量的元素,并通过回调函数 fn
进行处理。当数组中的所有元素都被处理完毕后,定时器会被清除
参数说明:
- array 需要处理的数据
- fun 处理数组元素的回调函数
- count 每次处理数组元素的数量
返回值说明:
- 返回的函数是一个闭包,它启动一个定时器,每隔 200 毫秒调用一次
start
函数
我觉得写得特别好的地方:Math.min(count || 1, arr.length)
- 确保每次处理的元素数量不超过数组的剩余元素数量,且至少处理一个元素
function timeChunk(array, fun, count) {
let timeId;
const start = function() {
for(let i = 0; i < Math.min(count || 1, array.length); i++) {
fn(array.shift()); // 取出最多 count 个元素,并通过 fn 进行处理
}
}
return () => {
timeId = setInterval(() => {
if(array.length === 0){ // 全部数据处理完毕
return clearInterval(timeId)
}
start();
}, 200) // 分批间隔
}
}