当前位置: 首页 > news >正文

源码阅读:p-limit

源码阅读:p-limit

  • 源码阅读:p-limit
    • 简介
    • 源码解读
    • 学习与收获

源码阅读:p-limit

简介

p-limit是一个用于限制并发操作的包,它可以控制同时执行的异步操作数量。它提供了一种简单的方式来管理并发操作,以避免系统资源过度占用和性能下降。

p-limit的工作原理是使用一个计数器来跟踪当前正在执行的操作数量。当有新的操作需要执行时,它会检查当前的计数器值,如果小于设定的并发限制数,则立即执行操作并将计数器加一。如果计数器已达到并发限制数,则将操作加入等待队列,直到有空闲的位置。

p-limit提供了以下功能和特点:

  1. 简单易用:使用p-limit非常简单,只需创建一个限制对象,并将需要执行的异步操作包装成一个函数进行调用即可。
  2. 并发限制:通过设置并发限制数,可以控制同时执行的操作数量,以避免过度占用系统资源。
  3. 异步操作支持:p-limit适用于异步操作,可以是Promise对象、回调函数或任何需要异步执行的操作。
  4. 队列管理:当并发限制数已满时,p-limit会自动将操作加入等待队列,并在有空闲位置时按照先进先出的顺序执行等待的操作。
  5. 可以清空队列:p-limit还提供了清空队列的方法,可以在需要时立即取消所有等待执行的操作。

使用p-limit可以很方便地控制并发操作的数量,特别适用于需要限制资源消耗或避免性能问题的场景,例如网络请求、文件操作、数据库查询等。

使用p-limit非常简单,以下是p-limit的基本用法:

创建一个限制对象,指定并发限制数:

const limit = pLimit(2); // 限制同时执行的操作数量为3

将需要执行的异步操作包装成一个函数,并调用限制对象的函数来执行操作:

const asyncTask = (id) => {return limit(() => {return new Promise((resolve) => {console.log(`Start task ${id}`);setTimeout(() => {console.log(`End task ${id}`);resolve();}, 1000);});});
};asyncTask(1);
asyncTask(2);
asyncTask(3);

在上面的示例中,我们创建了一个 p-limit 实例,并将并发限制设置为 2。然后,我们定义了一个异步任务 asyncTask,通过 limit 方法包装这个任务,确保最多同时执行 2 个任务。最后,我们按顺序调用 asyncTask,可以看到只有同时运行的任务数不超过 2

此外,p-limit还提供了其他一些功能和方法,例如:

  • 获取当前正在执行的操作数量:limit.activeCount可以获取当前正在执行的操作数量。
  • 获取等待执行的操作数量:limit.pendingCount可以获取等待执行的操作数量。
  • 清空等待队列:limit.clearQueue()可以清空等待队列,取消所有等待执行的操作。

这些功能可以帮助我们更好地管理并发操作的执行和控制。

源码解读

import Queue from 'yocto-queue';export default function pLimit(concurrency) {// 检查并发数concurrency是否为正整数或正无穷大,并且大于0if (!((Number.isInteger(concurrency) || concurrency === Number.POSITIVE_INFINITY) && concurrency > 0)) {throw new TypeError('Expected `concurrency` to be a number from 1 and up');}// 创建一个队列实例const queue = new Queue();// 当前正在执行的任务数量let activeCount = 0;return generator;
}

这段代码实现了一个 pLimit 函数,用于限制并发执行异步操作的数量。首先,代码导入了一个名为 yocto-queue 的链表队列库(详细请看:源码阅读:yocto-queue),并将其命名为 Queue,然后导出了一个名为 pLimit 的函数。pLimit 函数接受一个参数 concurrency,表示允许同时执行的异步操作数量。接下来,代码进行了一系列的校验。它判断 concurrency 是否为一个大于零的整数,或者是否为正无穷大。如果不满足这些条件,将抛出一个类型错误。然后,代码创建了一个队列实例 queue,用于存储需要执行的异步操作。同时,定义了一个变量 activeCount,用于记录当前正在执行的异步操作数量。最后,返回了 generator 函数作为 pLimit 函数的结果。

接下来我们来看看 generator 函数:

// 生成器函数,用于创建一个Promise,将任务入队
const generator = (fn, ...args) => new Promise(resolve => {enqueue(fn, resolve, args);
});

生成器函数 generator用于创建一个新的 Promise,并在其内部调用 enqueue 函数,并将异步操作的结果通过 resolve 函数传递出去。

紧接着我们来看一下 enqueue 函数:

// 入队任务的函数
const enqueue = (fn, resolve, args) => {// 将任务函数fn和resolve函数作为参数传递给run函数,并将run函数入队queue.enqueue(run.bind(undefined, fn, resolve, args));// 异步执行以下代码块(async () => {// 这个函数需要在下一个微任务中执行,以便在比较`activeCount`和`concurrency`之前等待// `activeCount`在任务函数出队并调用时会异步更新。if语句中的比较也需要异步执行,以获取`activeCount`的最新值。await Promise.resolve();// 如果当前正在执行的任务数量小于并发数并且队列中还有任务,则出队并执行if (activeCount < concurrency && queue.size > 0) {queue.dequeue()();}})();
};

函数 enqueue用于将异步函数包装成一个新的异步函数,并将其加入队列中。在加入队列之后,通过一个立即执行的异步函数,检查是否可以从队列中取出并执行下一个操作。这里需要通过 Promise.resolve() 来等待下一个微任务,以确保 activeCountconcurrency 的比较是在队列中的异步操作被执行之后进行的。

接下来我们来看一下入队的 run 函数:

// 执行任务的函数
const run = async (fn, resolve, args) => {activeCount++;// 执行任务函数fn,并将结果保存到result中const result = (async () => fn(...args))();// 将结果resolve出去resolve(result);try {await result;} catch {}// 执行完任务后调用next函数,继续执行下一个任务next();
};

异步函数 run用于执行传入的异步函数,并在异步操作完成后执行 next 函数。在执行异步函数之前,将 activeCount 加一,并将异步函数的返回结果通过 resolve 函数传递出去。在 try...catch 语句中,捕获异步函数可能抛出的错误,并在最后调用 next 函数。

接下来看看 next 函数:

// 下一个任务的处理函数
const next = () => {activeCount--;// 如果队列中还有任务,则出队并执行if (queue.size > 0) {queue.dequeue()();}
};

函数 next,用于执行队列中的下一个异步操作。它将 activeCount 减一,并检查队列中是否还有待执行的操作。如果有,就从队列中取出一个操作并执行。

// 使用Object.defineProperties定义generator对象的属性
Object.defineProperties(generator, {activeCount: {// activeCount属性的getter函数,返回当前正在执行的任务数量get: () => activeCount,},pendingCount: {// pendingCount属性的getter函数,返回队列中等待执行的任务数量get: () => queue.size,},clearQueue: {// clearQueue方法,用于清空队列value: () => {queue.clear();},},
});

最后,通过 Object.defineProperties 方法,给 generator 函数添加了一些属性。activeCount 属性返回当前正在执行的异步操作数量,pendingCount 属性返回队列中待执行的操作数量,clearQueue 方法用于清空队列。

该函数的核心思想是使用一个队列来管理并发执行的异步操作,通过控制队列中的操作数和异步操作的完成情况,实现了对并发数量的限制。

/* eslint-disable @typescript-eslint/member-ordering */export interface LimitFunction {/**The number of promises that are currently running.*/readonly activeCount: number;/**The number of promises that are waiting to run (i.e. their internal `fn` was not called yet).*/readonly pendingCount: number;/**Discard pending promises that are waiting to run.This might be useful if you want to teardown the queue at the end of your program's lifecycle or discard any function calls referencing an intermediary state of your app.Note: This does not cancel promises that are already running.*/clearQueue: () => void;/**@param fn - Promise-returning/async function.@param arguments - Any arguments to pass through to `fn`. Support for passing arguments on to the `fn` is provided in order to be able to avoid creating unnecessary closures. You probably don't need this optimization unless you're pushing a lot of functions.@returns The promise returned by calling `fn(...arguments)`.*/<Arguments extends unknown[], ReturnType>(fn: (...arguments: Arguments) => PromiseLike<ReturnType> | ReturnType,...arguments: Arguments): Promise<ReturnType>;
}/**
Run multiple promise-returning & async functions with limited concurrency.@param concurrency - Concurrency limit. Minimum: `1`.
@returns A `limit` function.
*/
export default function pLimit(concurrency: number): LimitFunction;

export interface LimitFunction 是一个接口定义,它描述了一个具有特定属性和方法的对象。

  • activeCount 是一个只读属性,表示当前正在运行的Promise的数量。
  • pendingCount 是一个只读属性,表示等待运行的Promise的数量(其内部的fn尚未被调用的数量)。
  • clearQueue 是一个方法,用于丢弃等待运行的Promise。这在程序生命周期的末尾或丢弃引用应用程序的中间状态的函数调用时可能会有用。注意:这不会取消已经在运行的Promise。
  • pLimit 是一个默认导出的函数,它接受一个表示并发限制的concurrency参数,并返回一个实现了LimitFunction接口的对象。
<Arguments extends unknown[], ReturnType>(fn: (...arguments: Arguments) => PromiseLike<ReturnType> | ReturnType,...arguments: Arguments
): Promise<ReturnType>;

这个类型声明描述了一个函数声明,该函数接受一个函数作为第一个参数,该函数接受一个参数数组并返回一个PromisePromiseLike对象,或者直接返回一个值。函数的剩余参数是一个参数数组,最后该函数返回一个PromisePromiseresolved值的类型与ReturnType相同。

  • <Arguments extends unknown[], ReturnType>:这是一个泛型参数声明,其中Arguments表示参数数组的类型,ReturnType表示返回值的类型。
  • (fn: (...arguments: Arguments) => PromiseLike<ReturnType> | ReturnType, ...arguments: Arguments) => Promise<ReturnType>:这是一个函数声明,它接受两个参数。第一个参数fn是一个函数,它接受Arguments类型的参数数组并返回一个PromisePromiseLike对象,或者直接返回ReturnType类型的值。第二个参数arguments是一个剩余参数,它接受Arguments类型的参数数组。
  • : Promise<ReturnType>:这是函数的返回类型,表示该函数返回一个Promise对象,并且Promiseresolved值的类型为ReturnType。也就是说,该函数执行后将返回一个PromisePromiseresolved值的类型将与ReturnType相同。

完整源码:

import Queue from 'yocto-queue';export default function pLimit(concurrency) {// 检查并发数concurrency是否为正整数或正无穷大,并且大于0if (!((Number.isInteger(concurrency) || concurrency === Number.POSITIVE_INFINITY) && concurrency > 0)) {throw new TypeError('Expected `concurrency` to be a number from 1 and up');}// 创建一个队列实例const queue = new Queue();// 当前正在执行的任务数量let activeCount = 0;// 下一个任务的处理函数const next = () => {activeCount--;// 如果队列中还有任务,则出队并执行if (queue.size > 0) {queue.dequeue()();}};// 执行任务的函数const run = async (fn, resolve, args) => {activeCount++;// 执行任务函数fn,并将结果保存到result中const result = (async () => fn(...args))();// 将结果resolve出去resolve(result);try {await result;} catch {}// 执行完任务后调用next函数,继续执行下一个任务next();};// 入队任务的函数const enqueue = (fn, resolve, args) => {// 将任务函数fn和resolve函数作为参数传递给run函数,并将run函数入队queue.enqueue(run.bind(undefined, fn, resolve, args));// 异步执行以下代码块(async () => {// 这个函数需要在下一个微任务中执行,以便在比较`activeCount`和`concurrency`之前等待// `activeCount`在任务函数出队并调用时会异步更新。if语句中的比较也需要异步执行,以获取`activeCount`的最新值。await Promise.resolve();// 如果当前正在执行的任务数量小于并发数并且队列中还有任务,则出队并执行if (activeCount < concurrency && queue.size > 0) {queue.dequeue()();}})();};// 生成器函数,用于创建一个Promise,将任务入队const generator = (fn, ...args) => new Promise(resolve => {enqueue(fn, resolve, args);});// 使用Object.defineProperties定义generator对象的属性Object.defineProperties(generator, {activeCount: {// activeCount属性的getter函数,返回当前正在执行的任务数量get: () => activeCount,},pendingCount: {// pendingCount属性的getter函数,返回队列中等待执行的任务数量get: () => queue.size,},clearQueue: {// clearQueue方法,用于清空队列value: () => {queue.clear();},},});return generator;
}

学习与收获

从这段代码中,我们可以学到以下几点:

  1. 使用第三方库 yocto-queue 来管理队列。该库提供了队列的常用操作方法,如入队、出队等。
  2. 使用 Number.isInteger() 方法来检查一个值是否为整数。
  3. 使用 Number.POSITIVE_INFINITY 来表示正无穷大。
  4. 抛出类型错误 TypeError,并提供错误消息。
  5. 使用 async/await 来处理异步操作。在这里,run 函数中的异步函数会被执行,并且在异步操作完成后调用 next 函数。
  6. 使用 await Promise.resolve() 来等待获取下一个微任务,并在微任务中进行异步操作。
  7. 使用 Object.defineProperties() 方法为一个对象添加多个属性并劫持。

总的来说,这段代码展示了如何使用队列和异步操作来限制并发数量,并提供了一些属性和方法来管理队列的状态。

相关文章:

源码阅读:p-limit

源码阅读&#xff1a;p-limit 源码阅读&#xff1a;p-limit简介源码解读学习与收获 源码阅读&#xff1a;p-limit 简介 p-limit是一个用于限制并发操作的包&#xff0c;它可以控制同时执行的异步操作数量。它提供了一种简单的方式来管理并发操作&#xff0c;以避免系统资源过…...

目标检测-击穿黑夜的PE-YOLO

前言 当前的目标检测模型在许多基准数据集上取得了良好的结果&#xff0c;但在暗光条件下检测目标仍然是一个巨大的挑战。为了解决这个问题&#xff0c;作者提出了金字塔增强网络&#xff08;PENet&#xff09;并将其与YOLOv3结合&#xff0c;构建了一个名为PE-YOLO的暗光目标检…...

优化性能压力测试的关键策略和技巧

在现代软件开发中&#xff0c;性能压力测试是不可或缺的一环。它可以帮助开发团队评估系统在负载压力下的性能表现&#xff0c;识别潜在的性能瓶颈&#xff0c;并采取适当的措施进行优化。然而&#xff0c;仅仅进行性能压力测试是不够的&#xff0c;关键的在于如何优化测试的过…...

VMware Linux 可视化增加磁盘

1、VMware 增加磁盘 2、disks挂载磁盘 此处我挂载的是20G磁盘&#xff0c;截图只是用5G的做过程演示例子。 3、验证挂载磁盘...

从 axios 源码学习设计模式

文章目录 一、源码分析1.1 axios 为什么可以多种方式调用1.2 拦截器实现注册使用&#xff1a;promise链式调用 二、从 axios 看设计模式axios 的精髓在哪2.1 抽象工厂axios.create -- 创建新实例的工厂 2.2 微内核设计2.3 适配器思想2.4 责任链模式2.5 桥接模式举例&#xff1a…...

输出不同程序执行的时间

简单的测试工具代码&#xff0c;它可以输出不同程序执行的时间。我们可以使用Python的time模块来实现这个功能。 import timedef test_function(func, *args, **kwargs):"""测试函数执行时间的工具函数:param func: 待测试的函数:param *args: 函数的位置参数:…...

HDU 6391 组合数学 + DP

题意 传送门 HDU 6391 Lord Li’s problem 题解 仅考虑 S i ≠ T i S_i\neq T_i Si​Ti​ 的数量 m m m&#xff0c;最后答案除以 ( n m ) \binom{n}{m} (mn​) 即可。考虑 X X X 的排列&#xff0c;最后答案除以 k ! k! k! 即可。 d p [ i 1 ] [ j ] dp[i1][j] dp[…...

StopWatch与ThreadLocal

目录 1、StopWatch 1、1作用&#xff1a; 1、2方法&#xff1a; 1、3使用方法 2、ThreadLocal 2、1什么是ThreadLocal 2、2简单例子 2、3使用ThreadLocal带来的四个好处 2、4主要方法 2、5ThreadLocal内存泄漏问题 1、StopWatch 1、1作用&#xff1a; 统计代码块耗时时…...

20. 有效的括号

给定一个只包括 (&#xff0c;)&#xff0c;{&#xff0c;}&#xff0c;[&#xff0c;] 的字符串 s &#xff0c;判断字符串是否有效。 有效字符串需满足&#xff1a; 左括号必须用相同类型的右括号闭合。 左括号必须以正确的顺序闭合。 每个右括号都有一个对应的相同类型的左括…...

微信小程序原生写法传递参数

微信小程序原生写法传递参数 data-xxx 自定义参数名 &#xff0c;接收参数&#xff1a;方法&#xff08;变量名&#xff09; checkVip:function(event) {let that thisconsole.log(event,event)console.log(event.currentTarget.dataset.idx,index)let index Number(eve…...

JavaWeb+jsp+Tomcat的教务查询系统

点击以下链接获取源码&#xff1a; https://download.csdn.net/download/qq_64505944/88134601?spm1001.2014.3001.5503 jsp/tomcat7.05/MySQL5.7或8版本/ssm框架/spring/ Web框架&#xff1a;SpringBoot/ORM框架&#xff1a;Mybatis/安全框架&#xff1a;Shiro/分页插件&am…...

C# FTP下载 采用Ssh.Net方式

不要再用FTPClient了 nuget下载Ssh.Net 然后代码如下&#xff1a; /// <summary>/// SFTP操作类/// </summary>public class SFTPHelper{#region 字段或属性private SftpClient sftp;/// <summary>/// SFTP连接状态/// </summary>public bool Conne…...

【C++】做一个飞机空战小游戏(三)——模块化程序设计

[导读]本系列博文内容链接如下&#xff1a; 【C】做一个飞机空战小游戏(一)——使用getch()函数获得键盘码值 【C】做一个飞机空战小游戏(二)——利用getch()函数实现键盘控制单个字符移动【C】做一个飞机空战小游戏(三)——模块化程设设计 在前两讲当中&#xff0c;介绍了利用…...

Django使用WebSocket

1、websocket 相关 实现一个系统&#xff0c;20 个用户同时打开网站&#xff0c;呈现出来一个群聊界面 解决方案 轮询&#xff1a;让浏览器每隔2s向后台发送一次请求&#xff0c;缺点&#xff1a;延迟&#xff0c;请求太多网站压力大 长轮询&#xff1a;客户端向服务端发送请…...

看完这篇 教你玩转渗透测试靶机Vulnhub——HarryPotter:Nagini

Vulnhub靶机HarryPotter:Nagini渗透测试详解 Vulnhub靶机介绍&#xff1a;Vulnhub靶机下载&#xff1a;Vulnhub靶机安装&#xff1a;Vulnhub靶机漏洞详解&#xff1a;①&#xff1a;信息收集&#xff1a;②&#xff1a;漏洞发现&#xff1a;③&#xff1a;SSRF漏洞利用&#xf…...

IPO要收紧?业内人士未予以完全确认

“IPO全面收紧、吃穿住等行业标的基本劝退&#xff08;除非行业龙头&#xff09;、科创板第五套标准暂停受理……”在上周末&#xff0c;一篇关于IPO收紧的“小作文”在投行圈内疯狂转发。 距离全面注册制正式实施已过去了5个半月&#xff0c;IPO节奏是否在发生较大变化&#…...

stable difussion Pytorch实现与测试

引言: Stable Diffusion是目前最火的AI绘画工具之一,它是一个免费开源的项目,可以被任何人免费部署和使用。通过Stable Diffusion,可以很轻松的通过文字描述,生成对应的图片。由于它是一个开源项目,开源社区(如:GitHub)中有很多插件和训练好的模型,我们可以直接使用。…...

Redis简述

Redis是什么Redis数据类型Redis应用场景缓存计数器分布式会话排行榜最新列表分布式锁消息队列 Redis出现的问题穿透击穿雪崩 Redis为什么速度快 Redis是什么 redis是一种高速缓存数据库 Redis数据类型 string hash list set zset Redis应用场景 缓存 Redis作为缓存层&…...

Redis 操作List

【分布式】Redis 分布式之List_redissonclient.getlist_比嗨皮兔的博客-CSDN博客 说明 配置文件参考&#xff1a;https://blog.csdn.net/qq_38428623/article/details/123217001?utm_sourceapp&app_version5.1.1&codeapp_1562916241&uLinkIdusr1mkqgl919blen ——…...

多个List 合并变成一个List+一个List 根据某个字段相等的另一个字段相加,并排序变成新的List

List<CurveTimeAndValueDomain> curves new ArrayList<>();for (int i 0; i < columnNames.size(); i){if (columnNames.get(i).equals(PlantConstant.TENDOWNFX) || columnNames.get(i).equals(PlantConstant.TENDOWNQP)) {//10千伏以下 数据 进行缓慢处理cu…...

华为流程体系:流程架构「OES方法」

目录 内容简介 OES方法 端到端的流程 专栏列表 CSDN学院 作者简介 内容简介 今天继续来谈谈华为流程体系中的流程架构。 在前期的内容已经介绍过 POS 流程架构的方法。 这里就先回顾一下 POS 方法的相关内容&#xff1a; 关于 POS&#xff0c;大家可以参看上面的这张图…...

c# 创建一个未定义类的临时对象列表

使用场景&#xff1a;要使用的数据太多&#xff0c;列表/字典无法满足需求&#xff0c;需要传入对象&#xff0c;但是又不想创建模型 new[] 是一种用于创建匿名类型数组的写法。它是 C# 中的一种语法糖&#xff0c;用于简化数组的初始化过程。 在下面代码示例中&#xff0c;ne…...

el-button增加下载功能

vue3和element-plus <el-uploadv-model:file-list"fileList"action"/api/upload"multiple:limit"1":headers"headers" ><el-button type"primary">选择文件</el-button><template #file"{ file …...

prometheus和cAdvisor组合

文章目录 docker内部署PromethuesPrometheuscAdvisorPrometheus和cAdvisor关系配置 docker内部署Promethues Prometheus Prometheus是一个开源的系统监控和报警工具&#xff0c;由SoundCloud开发并在2012年捐赠给了Cloud Native Computing Foundation (CNCF)。它被广泛用于监…...

计算机网络(2) --- 网络套接字UDP

计算机网络&#xff08;1&#xff09; --- 网络介绍_哈里沃克的博客-CSDN博客https://blog.csdn.net/m0_63488627/article/details/131967378?spm1001.2014.3001.5501 目录 1.端口号 2.TCP与UDP协议 1.TCP协议介绍 1.TCP协议 2.UDP协议 3.理解 2.网络字节序 发送逻辑…...

Idea 结合docker-compose 发布项目

Idea 结合docker-compose 发布项目 这里写目录标题 Idea 结合docker-compose 发布项目Docker 开启远程访问功能 添加相应端口配置IDEA 链接Docker配置项目 docker-compose.yml本地还需要安装 dockerwin11 安装本地Docker 可能存在问题 Linux内核不是最新 Docker 开启远程访问功…...

django

django学习 初识Django1.安装django2.创建项目2.1 在终端2.2 Pycharm 3. 创建app4.快速上手4.1 再写一个页面4.2 templates模板4.3 静态文件4.3.1 static目录4.3.2 引用静态文件 5.模板语法案例&#xff1a;伪联通新闻中心6.请求和响应案例&#xff1a;用户登录7.数据库操作7.1…...

c++游戏框架

游戏类 class Sprite { public:Sprite(int x, int y, int w, int h, const char* imagePath);~Sprite();void render(SDL_Renderer* renderer);void move(int x, int y); private:SDL_Texture* texture_;SDL_Rect rect_; }; 物理引擎类 class PhysicsEngine { public:Physi…...

v-model绑定checkbox无法动态更新视图

在vue2中使用v-model绑定checkbox <input type"checkbox" v-model"isChecked" :valueisChecked change"handleCheckboxChange" />监听change事件&#xff0c;并在change事件中做一些特殊处理&#xff0c;比如用户在登录时有没有阅读过隐私…...

原生html—摆脱ps、excel 在线绘制财务表格加水印(html绘制表格js加水印)

文章目录 ⭐前言⭐html标签&#x1f496;table表格的属性&#x1f496;实现财务报表 ⭐结束 ⭐前言 大家好&#xff0c;我是yma16&#xff0c;本文分享原生html——绘制表格报表加水印。 背景&#xff1a;解决没有ps的情况下使用前端html制作表格报表。 html介绍 HTML&#xf…...