当前位置: 首页 > news >正文

Promise的并发控制 - 从普通并发池到动态并发池

一、场景

        给你一个有200个URL的数组,通过这些URL来发送请求,要求并发请求数不能超过五个。

        这是一道很常考的面试题,接下来让我们来学习一下Promise并发控制 

二、普通并发池的实现

        主要思路就是,判断当前队列是否满,满则等待,有空闲则补齐。

        利用 Promise.race 方法,可以判断一个Promise数组中 “谁最先完成” ,从而让等待中的函数开始运行。

/**Promise并发池,当有大量promise并发时,可以通过这个来限制并发数量* @param taskList 任务列表* @param max 最大并发数量* @param oneFinishCallback 每个完成的回调,参数是当前完成的个数和执行结果,可以用来制作进度条* @retrun 返回每个promise的结果,顺序和任务列表相同。 目前是成功和失败都会放入该结果*/
export const promisePool = <T>(taskList: task<T>[], limit: number) => {return new Promise<T[]>(async (resolve, reject) => {try {const length = taskList.length/**当前并发池 */const pool: Promise<T>[] = []/**结果数组 */const res = new Array<T>(length)/**完成的数量 */let count = 0for (let i = 0; i < length; i++) {const task = taskList[i]();//promise结束的回调const handler = (info: T) => {pool.splice(pool.indexOf(task), 1) //任务执行完就删除res[i] = info //不能使用res.push,否则不能保证结果顺序count++if (count === length) {resolve(res)}}task.then((data) => {handler(data)console.log(`第${i}个任务完成,结果为`, data);}, (err) => {handler(err)console.log(`第${i}个任务失败,原因为`, err);})pool.push(task)//如果到达了并发限制,就等到池子中任意一个结束if (pool.length >= limit) {await Promise.race(pool)}}} catch (error) {console.error('并发池出错', error);reject(error)}})
}

测试用例:

/**创造一个1s后得到结果的Promise */const getTask = () => {return async () => {await new Promise((resolve) => setTimeout(resolve, 1000))return new Date()}}//测试用例:
const testIt = async () => {const list = new Array(20).fill(0).map(() => getTask())const res = await promisePool(list, 5)console.log('res', res);
}
testIt()

打印结果:(观察控制台,可以发现是五个五个出现的)

三、让并发池可中断

        好,现在来了个新要求,用户点击了取消按钮后,你需要中断继续往并发池添加任务。 (常见场景:分片上传时,用户点击取消上传按钮)

        问题的关键核心就是,如何从外部 让内部的循环终止。 其实也很简单,设置一个变量,初始为false,当用户点击取消按钮时,变量变为true。在for循环中检测这个变量的值,为true就退出循环

        但是我们不能将这个变量设置为全局变量!否则如果有多处需要使用这个并发池,一处中断,全部遭殃。 在这里,我们就可以利用面向对象的思想,把这个变量作为对象内部的值,每个实例之间独立。“你终止你的,关我什么事? ” 

/**Promise并发池 - 可终止 - 每次都创建一个实例,避免另一个池子的取消导致这个池子的取消 */
export class PromisePoolStatic<T, Err>{/**是否取消。在循环中若发现这个变成了true,就会中断 */private isStop = false/**运行静态Promise并发池,当有大量promise并发时,可以通过这个来限制并发数量* @param taskList 任务列表* @param max 最大并发数量* @retrun 返回每个promise的结果,顺序和任务列表相同。 目前是成功和失败都会放入该结果*/run = async (taskList: task<T>[], max: number) => {return new Promise<Array<T | Err>>(async (resolve, reject) => {type resType = T | Errtry {this.isStop = false //开始的时候设为falseconst length = taskList.lengthconst pool: Promise<resType>[] = []//并发池 let count = 0//当前结束了几个const res = new Array<resType>(length)for (let i = 0; i < length; i++) {let task = taskList[i]();if (this.isStop) return reject('并发池终止')//成功和失败都要执行的函数const handler = (_res: resType) => {pool.splice(pool.indexOf(task), 1) //每当并发池跑完一个任务,从并发池删除个任务res[i] = _res //放入结果数组count++if (count === length) {return resolve(res)}}task.then((data) => {handler(data)console.log(`第${i}个任务完成,结果为`, data);}, (err) => {handler(err)console.log(`第${i}个任务失败,原因为`, err);})pool.push(task);if (pool.length === max) {//利用Promise.race方法来获得并发池中某任务完成的信号,当有任务完成才让程序继续执行,让循环把并发池塞满await Promise.race(pool)}}} catch (error) {console.error('promise并发池出错', error);reject(error)}})}/**停止并发池运行 */stop = () => {this.isStop = true}
}

测试用例:

/**可终止的并发池测试用例 */
const promisePoolStaticTest = () => {const list = new Array(18).fill(0).map(() => getTask())const pool = new PromisePoolStatic()pool.run(list, 3).catch((err) => {console.log('可终止的并发池测试用例出错 -- ', err)})//18个任务,每个花费1s完成,并发数量为3,共需要6s完成//我们在第三秒的时候中断setTimeout(() => pool.stop(), 3000)
}
promisePoolStaticTest()

结果如下:

        可以看到第九个任务结束之后,并发池没有进入新的任务了。 但是为什么已经终止了,还有Promise完成的回调打印出来? 因为执行终止函数时,并发池内仍有三个函数在运行,而正在运行的Promise无法终止,所以只能阻止新任务进入并发池  (虽然无法终止Promise,但是可以终止Promise完成后的操作,这里不阐述)

四、动态并发池

        现在前面完成的操作,都是已经确定好了任务列表,才进行并发控制。如果我们需要动态添加任务的效果,如果队列没满就运行,队满则挂起等待,应该怎么做呢? (常见场景:全局axios请求并发控制

        主要思路: 队未满则直接运行,队满则加入等待队列。任务完成后,检查等待队列是否有任务


type resolve<T> = (value?: T | PromiseLike<T>) => void
type reject = (reason?: any) => void
/**装着任务和它的resolve与reject函数 */
type taskWithCallbacks<T> = { task: task<T>; resolve: resolve<T>; reject: reject }/**动态并发池 */
export class PromisePoolDynamic<T> {/**最大并发数量 */private limit: number;/**当前正在跑的数量 */private runningCount: number;/**等待队列 */private queue: Array<taskWithCallbacks<T>>;/**动态并发池 - 构造函数* @param maxConcurrency 最大并发数量*/constructor(maxConcurrency: number) {this.limit = maxConcurrency;this.runningCount = 0;this.queue = [];}/**添加任务* @param task 任务,() => Promise<T>* @returns 结果*/addTask(task: task<T>) {//返回一个新的Promise实例,在任务完成前,会一直是pending状态return new Promise<T>((resolve, reject) => {const taskWithCallbacks = { task, resolve, reject } as taskWithCallbacks<T>;if (this.runningCount < this.limit) {//并发数量没满则运行console.log('任务添加:当前并发数', this.runningCount, '并发数量未满,直接运行');this.runTask(taskWithCallbacks);} else {//并发数量满则加入等待队列console.log('任务添加:当前并发数', this.runningCount, '并发数量满,挂起等待');this.queue.push(taskWithCallbacks);}});}/**运行任务* @param taskWithCallback 带有resolve和reject的任务*/private runTask(taskWithCallback: taskWithCallbacks<T>) {this.runningCount++;//当前并发数++taskWithCallback.task()//从对象中取出任务执行.then(result => {this.runningCount--;taskWithCallback.resolve(result);console.log('任务完成', result, '当前并发数', this.runningCount);this.checkQueue();}).catch(error => {this.runningCount--;taskWithCallback.reject(error);this.checkQueue();});}/**运行完成后,检查队列,看看是否有在等待的,有就取出第一个来运行 */private checkQueue() {if (this.queue.length > 0 && this.runningCount < this.limit) {const nextTask = this.queue.shift()!;console.log('并发池出现空位,取出等待队列的任务', nextTask);this.runTask(nextTask);}}
}

测试用例:

/**动态并发池的测试用例 */
const promisePoolDynamicTest = () => {const promisePoolDynamic = new PromisePoolDynamic(3) //一个最大并发3的动态并发池//最大并发3,我一次性添加7个任务promisePoolDynamic.addTask(getTask())promisePoolDynamic.addTask(getTask())promisePoolDynamic.addTask(getTask())promisePoolDynamic.addTask(getTask())promisePoolDynamic.addTask(getTask())promisePoolDynamic.addTask(getTask())promisePoolDynamic.addTask(getTask())
}
promisePoolDynamicTest()

测试结果:

五、结语

        关于并发池就到这里了。除了利用Promise.race,其实还可以递归等方式,不过Promise.race是最简单也是最容易理解的。

        如果代码中有哪里出现的不对,欢迎指出

相关文章:

Promise的并发控制 - 从普通并发池到动态并发池

一、场景 给你一个有200个URL的数组&#xff0c;通过这些URL来发送请求&#xff0c;要求并发请求数不能超过五个。 这是一道很常考的面试题&#xff0c;接下来让我们来学习一下Promise并发控制 二、普通并发池的实现 主要思路就是&#xff0c;判断当前队列是否满&#xff0c;…...

Java类加载机制(类加载器,双亲委派模型,热部署示例)

Java类加载机制 类加载器类加载器的执行流程类加载器的种类加载器之间的关系ClassLoader 的主要方法Class.forName()与ClassLoader.loadClass()区别 双亲委派模型双亲委派 类加载流程优缺点 热部署简单示例 类加载器 类加载器的执行流程 类加载器的种类 AppClassLoader 应用类…...

【C语言初学者周冲刺计划】3.2将一个数组中的值逆序重新存放

目录 1解题思路&#xff1a; 2代码 3运行代码如图&#xff1a; 4总结&#xff1a; 1解题思路&#xff1a; 首先学会如何利用循环输入位数和输入数值&#xff0c;然后再利用循环逆序即可 2代码 #define _CRT_SECURE_NO_WARNINGS #include<stdio.h> int main() { int…...

【C++心愿便利店】No.11---C++之string语法指南

文章目录 前言一、 为什么学习string类二、标准库中的string类 前言 &#x1f467;个人主页&#xff1a;小沈YO. &#x1f61a;小编介绍&#xff1a;欢迎来到我的乱七八糟小星球&#x1f31d; &#x1f4cb;专栏&#xff1a;C 心愿便利店 &#x1f511;本章内容&#xff1a;str…...

OpenCV检测圆(Python版本)

文章目录 示例代码示例结果调参 示例代码 import cv2 import numpy as np# 加载图像 image_path DistanceComparison/test_image/1.png image cv2.imread(image_path, cv2.IMREAD_COLOR)# 将图像转换为灰度 gray cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)# 使用高斯模糊消除…...

轻量封装WebGPU渲染系统示例<15>- DrawInstance批量绘制(源码)

当前示例源码github地址: https://github.com/vilyLei/voxwebgpu/blob/main/src/voxgpu/sample/DrawInstanceTest.ts 此示例渲染系统实现的特性: 1. 用户态与系统态隔离。 细节请见&#xff1a;引擎系统设计思路 - 用户态与系统态隔离-CSDN博客 2. 高频调用与低频调用隔离。…...

E: 仓库 “http://cn.archive.ubuntu.com/ubuntu kinetic Release” 没有 Release 文件。

sudo apt-get update时报以下错误&#xff1a; E: 仓库 “http://cn.archive.ubuntu.com/ubuntu kinetic Release” 没有 Release 文件。 N: 无法安全地用该源进行更新&#xff0c;所以默认禁用该源。 N: 参见 apt-secure(8) 手册以了解仓库创建和用户配置方面的细节。 E: 仓库…...

【VR开发】【Unity】【VRTK】3-VR项目设置

任何VR避不开的步骤 如何设置VR项目,无论是PC VR还是安卓VR,我在不同的系列教程中都说过了,不过作为任何一个VR开发教程都难以避免的一环,本篇作为VRTK的开发教程还是对VR项目设置交代一下。 准备好你的硬件 头盔必须是6DoF的,推荐Oculus Quest系列,Rift系列,HTC和Pi…...

git log 用法

git log --format"%s" -n 1在 Git 中&#xff0c;您可以使用 git log 命令来查看提交历史&#xff0c;其中包含每个提交的详细信息&#xff0c;包括提交消息。如果您只想提取提交信息而不是完整的 git log 输出&#xff0c;可以使用 git log 命令的 --format 选项来指…...

Linux学习---有关监控系统zabbix的感悟

监控系统 监控系统就像咱们日常生活中小区监控(Monitor)&#xff0c;用于及时发现问题(PROBLEM)&#xff0c;根据相应的规则可以触发警告(Media)&#xff0c;在后台显示屏(Dashboard)上以某种方面显示出来,高级的报警系统也许还能实现电话通知等功能&#xff0c;目的是为及时发…...

apollo云实验:定速巡航场景仿真调试

定速巡航场景仿真调试 概述启动仿真环境仿真系统修改默认巡航速度 实验目的福利活动 主页传送门&#xff1a;&#x1f4c0; 传送 概述 自动驾驶汽车在实现落地应用前&#xff0c;需要经历大量的道路测试来验证算法的可行性和系统的稳定性&#xff0c;但道路测试存在成本高昂、…...

基于RK3568的新能源储能能量管理系统ems

新能源储能能量管理系统&#xff08;EMS&#xff09;是一种基于现代化技术的系统&#xff0c;旨在管理并优化新能源储能设备的能量使用。 该系统通过监测、调度和控制新能源储能设备来确保能源的高效利用和可持续发展。 本文将从不同的角度介绍新能源储能能量管理系统的原理、…...

dockerfile避坑笔记(VMWare下使用Ubuntu在Ubuntu20.04基础镜像下docker打包多个go项目)

一、docker简介 docker是一种方便跨平台迁移应用的程序&#xff0c;通过docker可以实现在同一类操作系统中&#xff0c;如Ubuntu和RedHat两个linux操作系统中&#xff0c;实现程序的跨平台部署。比如我在Ubuntu中打包了一个go项目的docker镜像&#xff08;镜像为二进制文件&am…...

Qt 使用QtXlsx操作Excel表

1.环境搭建 QtXlsx是一个用于读写Microsoft Excel文件&#xff08;.xlsx&#xff09;的Qt库。它提供了一组简单易用的API&#xff0c;可以方便地处理电子表格数据。 Github下载&#xff1a;GitHub - dbzhang800/QtXlsxWriter: .xlsx file reader and writer for Qt5 官方文档…...

canal+es+kibana+springboot

1、环境准备 服务器&#xff1a;Centos7 Jdk版本&#xff1a;1.8 Mysql版本&#xff1a;5.7.44 Canal版本&#xff1a;1.17 Es版本&#xff1a;7.12.1 kibana版本&#xff1a;7.12.1 软件包下载地址&#xff1a;链接&#xff1a;https://pan.baidu.com/s/1jRpCJP0-hr9aI…...

【力扣】面试经典150题——双指针

文章目录 125. 验证回文串392. 判断子序列167. 两数之和 II - 输入有序数组11. 盛最多水的容器15. 三数之和 125. 验证回文串 如果在将所有大写字符转换为小写字符、并移除所有非字母数字字符之后&#xff0c;短语正着读和反着读都一样。则可以认为该短语是一个 回文串 。 字…...

6-8 最宽层次结点数 分数 10

文章目录 1.题目描述2.本题ac答案2.1法一: 代码复用2.2法二: 顺序队列实现层序遍历 3.C层序遍历求最大宽度3.1层序遍历代码3.2求最大宽度 1.题目描述 2.本题ac答案 2.1法一: 代码复用 //二叉树第i层结点个数 int LevelNodeCount(BiTree T, int i) {if (T NULL || i < 1)re…...

Linux学习第28天:Platform设备驱动开发(二): 专注与分散

Linux版本号4.1.15 芯片I.MX6ULL 大叔学Linux 品人间百味 思文短情长 三、硬件原理图分析 四、驱动开发 1、platform设备与驱动程序开发 53 /* 54 * 设备资源信息&#xff0c;也就是 LED0 所使用的所有寄存器 55 */ 56 static str…...

postgresql数组重叠(有共同元素)查询

直接上最终代码&#xff1a; select distinct id from a where string_to_array(in_area,,) && (select ARRAY_AGG( code) from areas where code like 11% or code 100000)::TEXT[] pg语法&#xff1a; 表 9.48显示了可用于数组类型的运算符。 表 9.48。数组运算符…...

ubuntu系统 生成RSA密钥对

在Ubuntu系统上生成密钥对通常指的是生成SSH密钥对&#xff0c;它常用于安全的远程登录、数据通信和其他安全网络操作。以下是如何在Ubuntu系统上生成SSH密钥对的步骤&#xff1a; 打开终端&#xff1a;你可以使用快捷键 Ctrl Alt T 在Ubuntu上打开一个终端窗口。 运行ssh-k…...

【RtpSeqNumOnlyRefFinder】webrtc m98: ManageFrameInternal 的帧决策过程分析

Jitterbuffer(FrameBuffer)需要组帧以后GOP内的参考关系 JeffreyLau 大神分析 了组帧原理而参考关系(RtpFrameReferenceFinder)的生成伴随了帧决策 FrameDecisionFrameDecision 影响力 帧的缓存。调用 OnAssembledFrame 传递已经拿到的RtpFrameObject 那么,RtpFrameObject…...

centos系统源码编译安装nginx,并编写服务脚本

1.安装编译所需的依赖项&#xff1a; yum install -y gcc pcre-devel openssl-devel zlib-devel2.下载 Nginx 源代码&#xff1a; wget http://nginx.org/download/nginx-1.21.3.tar.gz tar -xf nginx-1.21.3.tar.gz cd nginx-1.21.33.配置编译选项并进行编译和安装&#xff…...

2023下半年软考高项答题技巧!

2023下半年软考倒计时最后一天&#xff0c;一些软考高项答题技巧分享&#xff01; 高项答题技巧 1、综合知识 &#xff08;1&#xff09;首先是分析试题的技巧 –先看清楚问题&#xff0c;再看选项&#xff1b; –判断题目到底考察的是什么知识点&#xff0c;排除干扰项。…...

windows server 2016调优

1. 增加TCP连接的最大数量&#xff1a; 在您当前的注册表路径&#xff08;HKEY_LOCAL_MACHINE\SYSTEM\CurrentControlSet\Services\Tcpip\Parameters&#xff09;中的右侧窗格&#xff0c;右击空白处&#xff0c;选择“新建” -> “DWORD (32位) 值”。为新的值命名为TcpNu…...

Qt 插件开发详解

1.简介 Qt插件是一种扩展机制&#xff0c;用于将应用程序的功能模块化&#xff0c;并且可以在运行时动态加载和卸载。Qt框架为插件提供了一套标准的接口和管理机制&#xff0c;使得插件的使用和集成变得简单和灵活&#xff0c;通过插件机制&#xff0c;可以将应用程序的功能划…...

vue需求:实现签章/签字在页面上自由定位的功能(本质:元素在页面上的拖拽)

目录 第一章 效果展示 第二章 了解工具 2.1 draggable 2.1.1 了解draggable 2.1.2 draggable方法 2.1.3 利用例子理解方法 第三章 效果实现 3.1 实现思路 3.2 代码实现 3.2.1 涉及到的点 3.2.2 源代 第一章 效果展示 效果描述&#xff1a;通过点击左边栏的签名和…...

【深度学习基础】Pytorch框架CV开发(1)基础铺垫

&#x1f4e2;&#xff1a;如果你也对机器人、人工智能感兴趣&#xff0c;看来我们志同道合✨ &#x1f4e2;&#xff1a;不妨浏览一下我的博客主页【https://blog.csdn.net/weixin_51244852】 &#x1f4e2;&#xff1a;文章若有幸对你有帮助&#xff0c;可点赞 &#x1f44d;…...

uniapp原生插件之安卓热敏打印机打印插件

插件介绍 安卓热敏打印机打印插件&#xff0c;自动授权&#xff0c;打印机连接监听&#xff0c;打印文本&#xff0c;条形码&#xff0c;二维码&#xff0c;切纸&#xff0c;打印机状态&#xff0c;打印结果查询等 插件地址 安卓热敏打印机打印插件 - DCloud 插件市场 超级…...

巴菲特:卖比亚迪有助于资金配置

巴菲特表示&#xff0c;未来可能会有更多银行倒闭&#xff0c;但储户不必担心&#xff0c;他警告说&#xff0c;陷入困境的银行股不是价值投资&#xff0c;因为即使政府采取行动保护储户&#xff0c;股东的权益也会受到损失。他称&#xff0c;将加大对日本综合商社的投资&#…...

香港服务器有哪些特点

香港服务器具有以下特点&#xff1a; 速度快&#xff1a;香港服务器地理位置优越&#xff0c;与内地服务器相比&#xff0c;网络延迟更低&#xff0c;访问速度更快。 稳定性高&#xff1a;香港服务器位于全球重要的金融中心&#xff0c;网络环境稳定&#xff0c;服务器稳定性高…...