使用p-limit 限制并发数源码解析
codeniu 人气:0前言
并发是指在同一时间内处理的任务数量。例如,在一台服务器上同时运行多个 Web 服务器线程,可以同时处理多个客户端的请求。有时为了程序的稳定运行,我们需要限制并发的数量,p-limit 就是一个用js实现的控制并发数的库。
源码地址:sindresorhus/p-limit
使用
下面是官方提供的使用示例:
import pLimit from 'p-limit'; const limit = pLimit(1); const input = [ limit(() => fetchSomething('foo')), limit(() => fetchSomething('bar')), limit(() => doSomething()) ]; // Only one promise is run at once const result = await Promise.all(input); console.log(result);
在代码的第一行,使用了 pLimit(1)
来创建一个 p-limit 实例,并将并发限制设为 1。这意味着,在任意时刻,只能有一个 Promise 在运行。
在第四行,使用了 limit(() => fetchSomething('foo'))
来包装一个异步函数,并返回一个 Promise。同样的方式,在第五、六行也包装了其他两个异步函数。
最后,使用 Promise.all
方法来等待所有 Promise 的完成,并在完成后将结果输出到控制台。由于 p-limit 的限制,这些 Promise 只会按顺序一个一个地运行,保证了并发的数量不会超过 1。
源码分析
import Queue from 'yocto-queue'; export default function pLimit(concurrency) { if (!((Number.isInteger(concurrency) || concurrency === Number.POSITIVE_INFINITY) && concurrency > 0)) { throw new TypeError('Expected `concurrency` to be a number from 1 and up'); } const queue = new Queue(); let activeCount = 0; }
yocto-queue 是一种允许高效存储和检索数据的数据结构。前边的章节分析过它的源码,详情参见: 源码共读|yocto-queue 队列 链表
pLimit 函数接受一个参数,并发数,首先函数判断参数是否是数组类型,或者是否能够转换成数字类型,如果不能,抛出一个错误。
之后定义了一个队列来存储待执行的函数,并使用一个计数器来记录当前正在运行的函数的数量。
const next = () => { activeCount--; if (queue.size > 0) { queue.dequeue()(); } }; const run = async (fn, resolve, args) => { activeCount++; const result = (async () => fn(...args))(); resolve(result); try { await result; } catch {} next(); };
在代码的 next
函数中,如果队列不为空,则从队列中取出一个函数并执行。这个函数的执行会导致计数器的值减 1。
在代码的 run
函数中,使用了 async/await
语法来执行传入的函数 fn
。它还使用了 resolve
函数将函数的返回值包装成一个 Promise,并将这个 Promise 返回给调用者。在函数执行完成后,调用 next
函数来执行下一个函数。
const enqueue = (fn, resolve, args) => { queue.enqueue(run.bind(undefined, fn, resolve, args)); (async () => { // This function needs to wait until the next microtask before comparing // `activeCount` to `concurrency`, because `activeCount` is updated asynchronously // when the run function is dequeued and called. The comparison in the if-statement // needs to happen asynchronously as well to get an up-to-date value for `activeCount`. await Promise.resolve(); if (activeCount < concurrency && queue.size > 0) { queue.dequeue()(); } })(); };
在代码的 enqueue
函数中,使用了 queue.enqueue
方法将传入的函数 fn
加入队列。然后,它使用了 async/await
语法来在下一个微任务中检查当前的并发数量是否小于设定的并发限制。如果是,则从队列中取出一个函数并执行。
const generator = (fn, ...args) => new Promise(resolve => { enqueue(fn, resolve, args); }); Object.defineProperties(generator, { activeCount: { get: () => activeCount, }, pendingCount: { get: () => queue.size, }, clearQueue: { value: () => { queue.clear(); }, }, }); return generator;
在代码的 generator
函数中,使用了 new Promise
语法来生成一个新的 Promise,并在其中调用了 enqueue
函数。这样,每次调用生成的函数时,都会生成一个新的 Promise,并将函数加入队列。
最后,使用了 Object.defineProperties
方法来为生成的函数添加属性。这些属性可以用来查询当前的并发数量和等待队列的大小,以及清空等待队列。
总结
控制并发的实现方式有很多种。例如,可以使用信号量或队列来控制并发请求的数量。也可以使用计数器或令牌桶算法来限制请求的频率。还可以使用拒绝服务(DoS)防护系统来检测异常请求流量并进行限制
加载全部内容