p-limit 使用+源码 、100个请求并发请求、分时操作

Posted by CodingWithAlice on March 7, 2025

p-limit 使用+源码 、100个请求并发请求、分时操作

记录一个有趣的面试题目(来源pingpong)

题目:100个请求,互相独立,同时发起请求

闪光点记录:Promise.race 的源码,闭包将index和请求关联 ,面试官推荐学习下 p-limit

1、p-limit 使用

功能:用于 限制并发异步操作数量 的轻量级 JS 库

import pLimit, { limitFunction } from 'p-limit';
// 第一种写法
const limit = pLimit(2); // ①创建一个限制器实例,设置并发限制为 2
const tasks1 = Array.from({ length: 5 }, (it, index) =>
    limit(() => new Promise((resolve) => setTimeout(resolve, index * 100))
); // ②创建任务数组:利用 limit 包裹每一个请求,组成数组
Promise.all(tasks1).then((results) => console.log('All:', results) ); // ③执行所有任务
// 第二种简写法
const tasks2 = Array.from({length:20}, (it, index) =>
    limitFunction(() => new Promise(resolve=>setTimeout(resolve,index)),{concurrency: 2}))
Promise.all(tasks2).then((results) => console.log('All:', results) );

2、p-limit 源码

核心逻辑主要围绕任务的并发控制展开,包括任务队列的管理、并发数量的限制以及任务的执行和调度

  • 任务队列管理 queue:用数组存储待执行的任务,当并发任务数量达到限制时,新的任务被添加到
  • 并发数量限制 concurrency:计数器 - 跟踪正在执行的任务数量;到限制时,新任务被暂停执行
  • 任务执行、调度:有任务完成时,计数器 减1;并从队列中取出一个任务执行,计数器再 +1
function validateConcurrency(limit) { // 校验是否为正整数 
    if (!((Number.isInteger(limit) || limit === Infinity) && limit > 0)) {
        throw new TypeError('Expected concurrency to be a number from 1 and up');
    }
}
function pLimit(concurrency) { // concurrency - 允许的最大并发任务数量
    // 检查 concurrency 是否为有效的正整数
    validateConcurrency(concurrency);

    const queue = []; // 任务队列,存储等待执行的任务
    let activeCount = 0; // 计数器,记录当前正在执行的任务数量

    const next = () => { // 执行队列中的下一个任务
        activeCount--; // 当一个任务完成,activeCount 减1
        if (queue.length > 0) { // 如果队列中有等待的任务
            queue.shift()(); // 从队列中取出一个任务并执行
        }
    };

    const run = async (fn, resolve, args) => { // 执行任务
        activeCount++; // 表示有一个新的任务开始执行
        try {
            const result = await fn(...args); // 执行传入的异步函数
            resolve(result); // 任务成功完成,调用 resolve 返回结果
        } catch (error) {
            resolve(Promise.reject(error)); // 任务失败,调用 resolve 抛出错误
        } 
        next(); // 任务完成,调用 next 函数执行队列中的下一个任务
    };

    const enqueue = (fn, resolve, args) => { // 入队函数,将任务添加到队列中
        if (activeCount < concurrency) { // 如果当前并发任务数量未达到限制,直接执行任务
            run(fn, resolve, args);
        } else {
            queue.push(run.bind(null, fn, resolve, args)); // 达到限制,将任务添加到队列中等待
        }
    };

    const generator = (fn, ...args) => new Promise((resolve) => { // 返回一个新的 Promise
        enqueue(fn, resolve, args); // 将任务添加到队列中
    });

    Object.defineProperties(generator, { // 返回一个可调用的函数
        activeCount: { get: () => activeCount },
        pendingCount: { get: () => queue.length },
        clearQueue: {
            value: () => { queue.length = 0 }
        }
    });
    return generator;
}
export function limitFunction(function_, option) {
    const { concurrency } = option;
    const limit = pLimit(concurrency);
    return (...arguments_) => limit(() => function_(...arguments_));
}

3、100个请求并发请求实现

const requests = Array.from({ length: 30 }, (v, index) => { // 创建 30 个模拟请求
    return new Promise((resolve, reject) => {
        setTimeout(resolve, 100 * index, `${index} resolved`);
    })
})
// 执行所有请求,返回所有响应
async function execute(questList, concurreny = 6) {
    const indexedRequests = questList.map((promise, index) => ({ // 闭包,关联 index
        index,
        promise: promise.then(value => ({ value, key: index }))
    }));
    // 初始化并发任务列表
    const activeTasks = indexedRequests.splice(0, concurrency);
    const results = [];
    // 持续处理任务,直到所有任务完成
    while (activeTasks.length > 0) {
        const { key, value } = await Promise.race(activeTasks.map(task => task.promise));
        const resolvedIndex = activeTasks.findIndex(task => task.index === key);
        activeTasks.splice(resolvedIndex, 1); // 移除已完成的任务
        results[key] = value; // 存储结果

        if (indexedRequests.length > 0) { // 如果还有待处理的任务,添加一个新任务到并发列表
            activeTasks.push(indexedRequests.shift());
        }
    }
    return results;
}
execute(requests).then(res => {
    console.log('res:', res);
})

——

// 比较复杂的实现 for 循环 + Promise.all
async function execute(questList) {
    questList = transList(questList);
    let initList = questList.splice(0, 6); // 初始化6个
    questList = questList.concat(Array(6)) // 补齐最后的空,方便执行完
    const result = [];
    for (let promise of initList) {
        const { key, value } = await Promise.race(initList.map(it => it.promise));
        const resolvedIndex = initList.findIndex(it => +it.index === +key);
        initList.splice(resolvedIndex, 1); // 移除已返回的 promise
        initList.push(promise); // 添加新的 promise
        result[key] = value; // 存储响应值
    }
    const rest = await Promise.all(initList.map(it => it.promise));
    return [...result, ...rest.map(it => it.value)];
}
execute(requests).then(res => {
    console.log('res:', res);
})

4、分时函数:将操作分批执行

功能:分批处理数组元素的工具函数

  • 适用于需要分批处理数组元素的场景,尤其是在处理大量数据时,可以避免一次性处理过多数据导致的性能问题

定义 : 在指定的时间间隔内,每次从数组中取出一定数量的元素,并通过回调函数 fn 进行处理。当数组中的所有元素都被处理完毕后,定时器会被清除

参数说明:

  • array 需要处理的数据
  • fun 处理数组元素的回调函数
  • count 每次处理数组元素的数量

返回值说明:

  • 返回的函数是一个闭包,它启动一个定时器,每隔 200 毫秒调用一次 start 函数

我觉得写得特别好的地方:Math.min(count || 1, arr.length) - 确保每次处理的元素数量不超过数组的剩余元素数量,且至少处理一个元素

function timeChunk(array, fun, count) {
    let timeId;
    const start = function() {
        for(let i = 0; i < Math.min(count || 1, array.length); i++) {
            fn(array.shift()); // 取出最多 count 个元素,并通过 fn 进行处理
        }
    }
    return () => {
        timeId = setInterval(() => {
            if(array.length === 0){ // 全部数据处理完毕
                return clearInterval(timeId)
            }
            start();
        }, 200) // 分批间隔
    }
}