当前位置: 首页 > news >正文

源码阅读:p-limit

源码阅读:p-limit

  • 源码阅读:p-limit
    • 简介
    • 源码解读
    • 学习与收获

源码阅读:p-limit

简介

p-limit是一个用于限制并发操作的包,它可以控制同时执行的异步操作数量。它提供了一种简单的方式来管理并发操作,以避免系统资源过度占用和性能下降。

p-limit的工作原理是使用一个计数器来跟踪当前正在执行的操作数量。当有新的操作需要执行时,它会检查当前的计数器值,如果小于设定的并发限制数,则立即执行操作并将计数器加一。如果计数器已达到并发限制数,则将操作加入等待队列,直到有空闲的位置。

p-limit提供了以下功能和特点:

  1. 简单易用:使用p-limit非常简单,只需创建一个限制对象,并将需要执行的异步操作包装成一个函数进行调用即可。
  2. 并发限制:通过设置并发限制数,可以控制同时执行的操作数量,以避免过度占用系统资源。
  3. 异步操作支持:p-limit适用于异步操作,可以是Promise对象、回调函数或任何需要异步执行的操作。
  4. 队列管理:当并发限制数已满时,p-limit会自动将操作加入等待队列,并在有空闲位置时按照先进先出的顺序执行等待的操作。
  5. 可以清空队列:p-limit还提供了清空队列的方法,可以在需要时立即取消所有等待执行的操作。

使用p-limit可以很方便地控制并发操作的数量,特别适用于需要限制资源消耗或避免性能问题的场景,例如网络请求、文件操作、数据库查询等。

使用p-limit非常简单,以下是p-limit的基本用法:

创建一个限制对象,指定并发限制数:

const limit = pLimit(2); // 限制同时执行的操作数量为3

将需要执行的异步操作包装成一个函数,并调用限制对象的函数来执行操作:

const asyncTask = (id) => {return limit(() => {return new Promise((resolve) => {console.log(`Start task ${id}`);setTimeout(() => {console.log(`End task ${id}`);resolve();}, 1000);});});
};asyncTask(1);
asyncTask(2);
asyncTask(3);

在上面的示例中,我们创建了一个 p-limit 实例,并将并发限制设置为 2。然后,我们定义了一个异步任务 asyncTask,通过 limit 方法包装这个任务,确保最多同时执行 2 个任务。最后,我们按顺序调用 asyncTask,可以看到只有同时运行的任务数不超过 2

此外,p-limit还提供了其他一些功能和方法,例如:

  • 获取当前正在执行的操作数量:limit.activeCount可以获取当前正在执行的操作数量。
  • 获取等待执行的操作数量:limit.pendingCount可以获取等待执行的操作数量。
  • 清空等待队列:limit.clearQueue()可以清空等待队列,取消所有等待执行的操作。

这些功能可以帮助我们更好地管理并发操作的执行和控制。

源码解读

import Queue from 'yocto-queue';export default function pLimit(concurrency) {// 检查并发数concurrency是否为正整数或正无穷大,并且大于0if (!((Number.isInteger(concurrency) || concurrency === Number.POSITIVE_INFINITY) && concurrency > 0)) {throw new TypeError('Expected `concurrency` to be a number from 1 and up');}// 创建一个队列实例const queue = new Queue();// 当前正在执行的任务数量let activeCount = 0;return generator;
}

这段代码实现了一个 pLimit 函数,用于限制并发执行异步操作的数量。首先,代码导入了一个名为 yocto-queue 的链表队列库(详细请看:源码阅读:yocto-queue),并将其命名为 Queue,然后导出了一个名为 pLimit 的函数。pLimit 函数接受一个参数 concurrency,表示允许同时执行的异步操作数量。接下来,代码进行了一系列的校验。它判断 concurrency 是否为一个大于零的整数,或者是否为正无穷大。如果不满足这些条件,将抛出一个类型错误。然后,代码创建了一个队列实例 queue,用于存储需要执行的异步操作。同时,定义了一个变量 activeCount,用于记录当前正在执行的异步操作数量。最后,返回了 generator 函数作为 pLimit 函数的结果。

接下来我们来看看 generator 函数:

// 生成器函数,用于创建一个Promise,将任务入队
const generator = (fn, ...args) => new Promise(resolve => {enqueue(fn, resolve, args);
});

生成器函数 generator用于创建一个新的 Promise,并在其内部调用 enqueue 函数,并将异步操作的结果通过 resolve 函数传递出去。

紧接着我们来看一下 enqueue 函数:

// 入队任务的函数
const enqueue = (fn, resolve, args) => {// 将任务函数fn和resolve函数作为参数传递给run函数,并将run函数入队queue.enqueue(run.bind(undefined, fn, resolve, args));// 异步执行以下代码块(async () => {// 这个函数需要在下一个微任务中执行,以便在比较`activeCount`和`concurrency`之前等待// `activeCount`在任务函数出队并调用时会异步更新。if语句中的比较也需要异步执行,以获取`activeCount`的最新值。await Promise.resolve();// 如果当前正在执行的任务数量小于并发数并且队列中还有任务,则出队并执行if (activeCount < concurrency && queue.size > 0) {queue.dequeue()();}})();
};

函数 enqueue用于将异步函数包装成一个新的异步函数,并将其加入队列中。在加入队列之后,通过一个立即执行的异步函数,检查是否可以从队列中取出并执行下一个操作。这里需要通过 Promise.resolve() 来等待下一个微任务,以确保 activeCountconcurrency 的比较是在队列中的异步操作被执行之后进行的。

接下来我们来看一下入队的 run 函数:

// 执行任务的函数
const run = async (fn, resolve, args) => {activeCount++;// 执行任务函数fn,并将结果保存到result中const result = (async () => fn(...args))();// 将结果resolve出去resolve(result);try {await result;} catch {}// 执行完任务后调用next函数,继续执行下一个任务next();
};

异步函数 run用于执行传入的异步函数,并在异步操作完成后执行 next 函数。在执行异步函数之前,将 activeCount 加一,并将异步函数的返回结果通过 resolve 函数传递出去。在 try...catch 语句中,捕获异步函数可能抛出的错误,并在最后调用 next 函数。

接下来看看 next 函数:

// 下一个任务的处理函数
const next = () => {activeCount--;// 如果队列中还有任务,则出队并执行if (queue.size > 0) {queue.dequeue()();}
};

函数 next,用于执行队列中的下一个异步操作。它将 activeCount 减一,并检查队列中是否还有待执行的操作。如果有,就从队列中取出一个操作并执行。

// 使用Object.defineProperties定义generator对象的属性
Object.defineProperties(generator, {activeCount: {// activeCount属性的getter函数,返回当前正在执行的任务数量get: () => activeCount,},pendingCount: {// pendingCount属性的getter函数,返回队列中等待执行的任务数量get: () => queue.size,},clearQueue: {// clearQueue方法,用于清空队列value: () => {queue.clear();},},
});

最后,通过 Object.defineProperties 方法,给 generator 函数添加了一些属性。activeCount 属性返回当前正在执行的异步操作数量,pendingCount 属性返回队列中待执行的操作数量,clearQueue 方法用于清空队列。

该函数的核心思想是使用一个队列来管理并发执行的异步操作,通过控制队列中的操作数和异步操作的完成情况,实现了对并发数量的限制。

/* eslint-disable @typescript-eslint/member-ordering */export interface LimitFunction {/**The number of promises that are currently running.*/readonly activeCount: number;/**The number of promises that are waiting to run (i.e. their internal `fn` was not called yet).*/readonly pendingCount: number;/**Discard pending promises that are waiting to run.This might be useful if you want to teardown the queue at the end of your program's lifecycle or discard any function calls referencing an intermediary state of your app.Note: This does not cancel promises that are already running.*/clearQueue: () => void;/**@param fn - Promise-returning/async function.@param arguments - Any arguments to pass through to `fn`. Support for passing arguments on to the `fn` is provided in order to be able to avoid creating unnecessary closures. You probably don't need this optimization unless you're pushing a lot of functions.@returns The promise returned by calling `fn(...arguments)`.*/<Arguments extends unknown[], ReturnType>(fn: (...arguments: Arguments) => PromiseLike<ReturnType> | ReturnType,...arguments: Arguments): Promise<ReturnType>;
}/**
Run multiple promise-returning & async functions with limited concurrency.@param concurrency - Concurrency limit. Minimum: `1`.
@returns A `limit` function.
*/
export default function pLimit(concurrency: number): LimitFunction;

export interface LimitFunction 是一个接口定义,它描述了一个具有特定属性和方法的对象。

  • activeCount 是一个只读属性,表示当前正在运行的Promise的数量。
  • pendingCount 是一个只读属性,表示等待运行的Promise的数量(其内部的fn尚未被调用的数量)。
  • clearQueue 是一个方法,用于丢弃等待运行的Promise。这在程序生命周期的末尾或丢弃引用应用程序的中间状态的函数调用时可能会有用。注意:这不会取消已经在运行的Promise。
  • pLimit 是一个默认导出的函数,它接受一个表示并发限制的concurrency参数,并返回一个实现了LimitFunction接口的对象。
<Arguments extends unknown[], ReturnType>(fn: (...arguments: Arguments) => PromiseLike<ReturnType> | ReturnType,...arguments: Arguments
): Promise<ReturnType>;

这个类型声明描述了一个函数声明,该函数接受一个函数作为第一个参数,该函数接受一个参数数组并返回一个PromisePromiseLike对象,或者直接返回一个值。函数的剩余参数是一个参数数组,最后该函数返回一个PromisePromiseresolved值的类型与ReturnType相同。

  • <Arguments extends unknown[], ReturnType>:这是一个泛型参数声明,其中Arguments表示参数数组的类型,ReturnType表示返回值的类型。
  • (fn: (...arguments: Arguments) => PromiseLike<ReturnType> | ReturnType, ...arguments: Arguments) => Promise<ReturnType>:这是一个函数声明,它接受两个参数。第一个参数fn是一个函数,它接受Arguments类型的参数数组并返回一个PromisePromiseLike对象,或者直接返回ReturnType类型的值。第二个参数arguments是一个剩余参数,它接受Arguments类型的参数数组。
  • : Promise<ReturnType>:这是函数的返回类型,表示该函数返回一个Promise对象,并且Promiseresolved值的类型为ReturnType。也就是说,该函数执行后将返回一个PromisePromiseresolved值的类型将与ReturnType相同。

完整源码:

import Queue from 'yocto-queue';export default function pLimit(concurrency) {// 检查并发数concurrency是否为正整数或正无穷大,并且大于0if (!((Number.isInteger(concurrency) || concurrency === Number.POSITIVE_INFINITY) && concurrency > 0)) {throw new TypeError('Expected `concurrency` to be a number from 1 and up');}// 创建一个队列实例const queue = new Queue();// 当前正在执行的任务数量let activeCount = 0;// 下一个任务的处理函数const next = () => {activeCount--;// 如果队列中还有任务,则出队并执行if (queue.size > 0) {queue.dequeue()();}};// 执行任务的函数const run = async (fn, resolve, args) => {activeCount++;// 执行任务函数fn,并将结果保存到result中const result = (async () => fn(...args))();// 将结果resolve出去resolve(result);try {await result;} catch {}// 执行完任务后调用next函数,继续执行下一个任务next();};// 入队任务的函数const enqueue = (fn, resolve, args) => {// 将任务函数fn和resolve函数作为参数传递给run函数,并将run函数入队queue.enqueue(run.bind(undefined, fn, resolve, args));// 异步执行以下代码块(async () => {// 这个函数需要在下一个微任务中执行,以便在比较`activeCount`和`concurrency`之前等待// `activeCount`在任务函数出队并调用时会异步更新。if语句中的比较也需要异步执行,以获取`activeCount`的最新值。await Promise.resolve();// 如果当前正在执行的任务数量小于并发数并且队列中还有任务,则出队并执行if (activeCount < concurrency && queue.size > 0) {queue.dequeue()();}})();};// 生成器函数,用于创建一个Promise,将任务入队const generator = (fn, ...args) => new Promise(resolve => {enqueue(fn, resolve, args);});// 使用Object.defineProperties定义generator对象的属性Object.defineProperties(generator, {activeCount: {// activeCount属性的getter函数,返回当前正在执行的任务数量get: () => activeCount,},pendingCount: {// pendingCount属性的getter函数,返回队列中等待执行的任务数量get: () => queue.size,},clearQueue: {// clearQueue方法,用于清空队列value: () => {queue.clear();},},});return generator;
}

学习与收获

从这段代码中,我们可以学到以下几点:

  1. 使用第三方库 yocto-queue 来管理队列。该库提供了队列的常用操作方法,如入队、出队等。
  2. 使用 Number.isInteger() 方法来检查一个值是否为整数。
  3. 使用 Number.POSITIVE_INFINITY 来表示正无穷大。
  4. 抛出类型错误 TypeError,并提供错误消息。
  5. 使用 async/await 来处理异步操作。在这里,run 函数中的异步函数会被执行,并且在异步操作完成后调用 next 函数。
  6. 使用 await Promise.resolve() 来等待获取下一个微任务,并在微任务中进行异步操作。
  7. 使用 Object.defineProperties() 方法为一个对象添加多个属性并劫持。

总的来说,这段代码展示了如何使用队列和异步操作来限制并发数量,并提供了一些属性和方法来管理队列的状态。

相关文章:

源码阅读:p-limit

源码阅读&#xff1a;p-limit 源码阅读&#xff1a;p-limit简介源码解读学习与收获 源码阅读&#xff1a;p-limit 简介 p-limit是一个用于限制并发操作的包&#xff0c;它可以控制同时执行的异步操作数量。它提供了一种简单的方式来管理并发操作&#xff0c;以避免系统资源过…...

目标检测-击穿黑夜的PE-YOLO

前言 当前的目标检测模型在许多基准数据集上取得了良好的结果&#xff0c;但在暗光条件下检测目标仍然是一个巨大的挑战。为了解决这个问题&#xff0c;作者提出了金字塔增强网络&#xff08;PENet&#xff09;并将其与YOLOv3结合&#xff0c;构建了一个名为PE-YOLO的暗光目标检…...

优化性能压力测试的关键策略和技巧

在现代软件开发中&#xff0c;性能压力测试是不可或缺的一环。它可以帮助开发团队评估系统在负载压力下的性能表现&#xff0c;识别潜在的性能瓶颈&#xff0c;并采取适当的措施进行优化。然而&#xff0c;仅仅进行性能压力测试是不够的&#xff0c;关键的在于如何优化测试的过…...

VMware Linux 可视化增加磁盘

1、VMware 增加磁盘 2、disks挂载磁盘 此处我挂载的是20G磁盘&#xff0c;截图只是用5G的做过程演示例子。 3、验证挂载磁盘...

从 axios 源码学习设计模式

文章目录 一、源码分析1.1 axios 为什么可以多种方式调用1.2 拦截器实现注册使用&#xff1a;promise链式调用 二、从 axios 看设计模式axios 的精髓在哪2.1 抽象工厂axios.create -- 创建新实例的工厂 2.2 微内核设计2.3 适配器思想2.4 责任链模式2.5 桥接模式举例&#xff1a…...

输出不同程序执行的时间

简单的测试工具代码&#xff0c;它可以输出不同程序执行的时间。我们可以使用Python的time模块来实现这个功能。 import timedef test_function(func, *args, **kwargs):"""测试函数执行时间的工具函数:param func: 待测试的函数:param *args: 函数的位置参数:…...

HDU 6391 组合数学 + DP

题意 传送门 HDU 6391 Lord Li’s problem 题解 仅考虑 S i ≠ T i S_i\neq T_i Si​Ti​ 的数量 m m m&#xff0c;最后答案除以 ( n m ) \binom{n}{m} (mn​) 即可。考虑 X X X 的排列&#xff0c;最后答案除以 k ! k! k! 即可。 d p [ i 1 ] [ j ] dp[i1][j] dp[…...

StopWatch与ThreadLocal

目录 1、StopWatch 1、1作用&#xff1a; 1、2方法&#xff1a; 1、3使用方法 2、ThreadLocal 2、1什么是ThreadLocal 2、2简单例子 2、3使用ThreadLocal带来的四个好处 2、4主要方法 2、5ThreadLocal内存泄漏问题 1、StopWatch 1、1作用&#xff1a; 统计代码块耗时时…...

20. 有效的括号

给定一个只包括 (&#xff0c;)&#xff0c;{&#xff0c;}&#xff0c;[&#xff0c;] 的字符串 s &#xff0c;判断字符串是否有效。 有效字符串需满足&#xff1a; 左括号必须用相同类型的右括号闭合。 左括号必须以正确的顺序闭合。 每个右括号都有一个对应的相同类型的左括…...

微信小程序原生写法传递参数

微信小程序原生写法传递参数 data-xxx 自定义参数名 &#xff0c;接收参数&#xff1a;方法&#xff08;变量名&#xff09; checkVip:function(event) {let that thisconsole.log(event,event)console.log(event.currentTarget.dataset.idx,index)let index Number(eve…...

JavaWeb+jsp+Tomcat的教务查询系统

点击以下链接获取源码&#xff1a; https://download.csdn.net/download/qq_64505944/88134601?spm1001.2014.3001.5503 jsp/tomcat7.05/MySQL5.7或8版本/ssm框架/spring/ Web框架&#xff1a;SpringBoot/ORM框架&#xff1a;Mybatis/安全框架&#xff1a;Shiro/分页插件&am…...

C# FTP下载 采用Ssh.Net方式

不要再用FTPClient了 nuget下载Ssh.Net 然后代码如下&#xff1a; /// <summary>/// SFTP操作类/// </summary>public class SFTPHelper{#region 字段或属性private SftpClient sftp;/// <summary>/// SFTP连接状态/// </summary>public bool Conne…...

【C++】做一个飞机空战小游戏(三)——模块化程序设计

[导读]本系列博文内容链接如下&#xff1a; 【C】做一个飞机空战小游戏(一)——使用getch()函数获得键盘码值 【C】做一个飞机空战小游戏(二)——利用getch()函数实现键盘控制单个字符移动【C】做一个飞机空战小游戏(三)——模块化程设设计 在前两讲当中&#xff0c;介绍了利用…...

Django使用WebSocket

1、websocket 相关 实现一个系统&#xff0c;20 个用户同时打开网站&#xff0c;呈现出来一个群聊界面 解决方案 轮询&#xff1a;让浏览器每隔2s向后台发送一次请求&#xff0c;缺点&#xff1a;延迟&#xff0c;请求太多网站压力大 长轮询&#xff1a;客户端向服务端发送请…...

看完这篇 教你玩转渗透测试靶机Vulnhub——HarryPotter:Nagini

Vulnhub靶机HarryPotter:Nagini渗透测试详解 Vulnhub靶机介绍&#xff1a;Vulnhub靶机下载&#xff1a;Vulnhub靶机安装&#xff1a;Vulnhub靶机漏洞详解&#xff1a;①&#xff1a;信息收集&#xff1a;②&#xff1a;漏洞发现&#xff1a;③&#xff1a;SSRF漏洞利用&#xf…...

IPO要收紧?业内人士未予以完全确认

“IPO全面收紧、吃穿住等行业标的基本劝退&#xff08;除非行业龙头&#xff09;、科创板第五套标准暂停受理……”在上周末&#xff0c;一篇关于IPO收紧的“小作文”在投行圈内疯狂转发。 距离全面注册制正式实施已过去了5个半月&#xff0c;IPO节奏是否在发生较大变化&#…...

stable difussion Pytorch实现与测试

引言: Stable Diffusion是目前最火的AI绘画工具之一,它是一个免费开源的项目,可以被任何人免费部署和使用。通过Stable Diffusion,可以很轻松的通过文字描述,生成对应的图片。由于它是一个开源项目,开源社区(如:GitHub)中有很多插件和训练好的模型,我们可以直接使用。…...

Redis简述

Redis是什么Redis数据类型Redis应用场景缓存计数器分布式会话排行榜最新列表分布式锁消息队列 Redis出现的问题穿透击穿雪崩 Redis为什么速度快 Redis是什么 redis是一种高速缓存数据库 Redis数据类型 string hash list set zset Redis应用场景 缓存 Redis作为缓存层&…...

Redis 操作List

【分布式】Redis 分布式之List_redissonclient.getlist_比嗨皮兔的博客-CSDN博客 说明 配置文件参考&#xff1a;https://blog.csdn.net/qq_38428623/article/details/123217001?utm_sourceapp&app_version5.1.1&codeapp_1562916241&uLinkIdusr1mkqgl919blen ——…...

多个List 合并变成一个List+一个List 根据某个字段相等的另一个字段相加,并排序变成新的List

List<CurveTimeAndValueDomain> curves new ArrayList<>();for (int i 0; i < columnNames.size(); i){if (columnNames.get(i).equals(PlantConstant.TENDOWNFX) || columnNames.get(i).equals(PlantConstant.TENDOWNQP)) {//10千伏以下 数据 进行缓慢处理cu…...

国防科技大学计算机基础课程笔记02信息编码

1.机内码和国标码 国标码就是我们非常熟悉的这个GB2312,但是因为都是16进制&#xff0c;因此这个了16进制的数据既可以翻译成为这个机器码&#xff0c;也可以翻译成为这个国标码&#xff0c;所以这个时候很容易会出现这个歧义的情况&#xff1b; 因此&#xff0c;我们的这个国…...

中南大学无人机智能体的全面评估!BEDI:用于评估无人机上具身智能体的综合性基准测试

作者&#xff1a;Mingning Guo, Mengwei Wu, Jiarun He, Shaoxian Li, Haifeng Li, Chao Tao单位&#xff1a;中南大学地球科学与信息物理学院论文标题&#xff1a;BEDI: A Comprehensive Benchmark for Evaluating Embodied Agents on UAVs论文链接&#xff1a;https://arxiv.…...

【Java学习笔记】Arrays类

Arrays 类 1. 导入包&#xff1a;import java.util.Arrays 2. 常用方法一览表 方法描述Arrays.toString()返回数组的字符串形式Arrays.sort()排序&#xff08;自然排序和定制排序&#xff09;Arrays.binarySearch()通过二分搜索法进行查找&#xff08;前提&#xff1a;数组是…...

微信小程序 - 手机震动

一、界面 <button type"primary" bindtap"shortVibrate">短震动</button> <button type"primary" bindtap"longVibrate">长震动</button> 二、js逻辑代码 注&#xff1a;文档 https://developers.weixin.qq…...

如何将联系人从 iPhone 转移到 Android

从 iPhone 换到 Android 手机时&#xff0c;你可能需要保留重要的数据&#xff0c;例如通讯录。好在&#xff0c;将通讯录从 iPhone 转移到 Android 手机非常简单&#xff0c;你可以从本文中学习 6 种可靠的方法&#xff0c;确保随时保持连接&#xff0c;不错过任何信息。 第 1…...

pikachu靶场通关笔记22-1 SQL注入05-1-insert注入(报错法)

目录 一、SQL注入 二、insert注入 三、报错型注入 四、updatexml函数 五、源码审计 六、insert渗透实战 1、渗透准备 2、获取数据库名database 3、获取表名table 4、获取列名column 5、获取字段 本系列为通过《pikachu靶场通关笔记》的SQL注入关卡(共10关&#xff0…...

Spring Cloud Gateway 中自定义验证码接口返回 404 的排查与解决

Spring Cloud Gateway 中自定义验证码接口返回 404 的排查与解决 问题背景 在一个基于 Spring Cloud Gateway WebFlux 构建的微服务项目中&#xff0c;新增了一个本地验证码接口 /code&#xff0c;使用函数式路由&#xff08;RouterFunction&#xff09;和 Hutool 的 Circle…...

虚拟电厂发展三大趋势:市场化、技术主导、车网互联

市场化&#xff1a;从政策驱动到多元盈利 政策全面赋能 2025年4月&#xff0c;国家发改委、能源局发布《关于加快推进虚拟电厂发展的指导意见》&#xff0c;首次明确虚拟电厂为“独立市场主体”&#xff0c;提出硬性目标&#xff1a;2027年全国调节能力≥2000万千瓦&#xff0…...

解读《网络安全法》最新修订,把握网络安全新趋势

《网络安全法》自2017年施行以来&#xff0c;在维护网络空间安全方面发挥了重要作用。但随着网络环境的日益复杂&#xff0c;网络攻击、数据泄露等事件频发&#xff0c;现行法律已难以完全适应新的风险挑战。 2025年3月28日&#xff0c;国家网信办会同相关部门起草了《网络安全…...

【Linux系统】Linux环境变量:系统配置的隐形指挥官

。# Linux系列 文章目录 前言一、环境变量的概念二、常见的环境变量三、环境变量特点及其相关指令3.1 环境变量的全局性3.2、环境变量的生命周期 四、环境变量的组织方式五、C语言对环境变量的操作5.1 设置环境变量&#xff1a;setenv5.2 删除环境变量:unsetenv5.3 遍历所有环境…...