Promise的并发控制 - 从普通并发池到动态并发池
一、场景
给你一个有200个URL的数组,通过这些URL来发送请求,要求并发请求数不能超过五个。
这是一道很常考的面试题,接下来让我们来学习一下Promise并发控制
二、普通并发池的实现
主要思路就是,判断当前队列是否满,满则等待,有空闲则补齐。
利用 Promise.race 方法,可以判断一个Promise数组中 “谁最先完成” ,从而让等待中的函数开始运行。
/**Promise并发池,当有大量promise并发时,可以通过这个来限制并发数量* @param taskList 任务列表* @param max 最大并发数量* @param oneFinishCallback 每个完成的回调,参数是当前完成的个数和执行结果,可以用来制作进度条* @retrun 返回每个promise的结果,顺序和任务列表相同。 目前是成功和失败都会放入该结果*/
export const promisePool = <T>(taskList: task<T>[], limit: number) => {return new Promise<T[]>(async (resolve, reject) => {try {const length = taskList.length/**当前并发池 */const pool: Promise<T>[] = []/**结果数组 */const res = new Array<T>(length)/**完成的数量 */let count = 0for (let i = 0; i < length; i++) {const task = taskList[i]();//promise结束的回调const handler = (info: T) => {pool.splice(pool.indexOf(task), 1) //任务执行完就删除res[i] = info //不能使用res.push,否则不能保证结果顺序count++if (count === length) {resolve(res)}}task.then((data) => {handler(data)console.log(`第${i}个任务完成,结果为`, data);}, (err) => {handler(err)console.log(`第${i}个任务失败,原因为`, err);})pool.push(task)//如果到达了并发限制,就等到池子中任意一个结束if (pool.length >= limit) {await Promise.race(pool)}}} catch (error) {console.error('并发池出错', error);reject(error)}})
}
测试用例:
/**创造一个1s后得到结果的Promise */const getTask = () => {return async () => {await new Promise((resolve) => setTimeout(resolve, 1000))return new Date()}}//测试用例:
const testIt = async () => {const list = new Array(20).fill(0).map(() => getTask())const res = await promisePool(list, 5)console.log('res', res);
}
testIt()
打印结果:(观察控制台,可以发现是五个五个出现的)
三、让并发池可中断
好,现在来了个新要求,用户点击了取消按钮后,你需要中断继续往并发池添加任务。 (常见场景:分片上传时,用户点击取消上传按钮)
问题的关键核心就是,如何从外部 让内部的循环终止。 其实也很简单,设置一个变量,初始为false,当用户点击取消按钮时,变量变为true。在for循环中检测这个变量的值,为true就退出循环。
但是我们不能将这个变量设置为全局变量!否则如果有多处需要使用这个并发池,一处中断,全部遭殃。 在这里,我们就可以利用面向对象的思想,把这个变量作为对象内部的值,每个实例之间独立。“你终止你的,关我什么事? ”
/**Promise并发池 - 可终止 - 每次都创建一个实例,避免另一个池子的取消导致这个池子的取消 */
export class PromisePoolStatic<T, Err>{/**是否取消。在循环中若发现这个变成了true,就会中断 */private isStop = false/**运行静态Promise并发池,当有大量promise并发时,可以通过这个来限制并发数量* @param taskList 任务列表* @param max 最大并发数量* @retrun 返回每个promise的结果,顺序和任务列表相同。 目前是成功和失败都会放入该结果*/run = async (taskList: task<T>[], max: number) => {return new Promise<Array<T | Err>>(async (resolve, reject) => {type resType = T | Errtry {this.isStop = false //开始的时候设为falseconst length = taskList.lengthconst pool: Promise<resType>[] = []//并发池 let count = 0//当前结束了几个const res = new Array<resType>(length)for (let i = 0; i < length; i++) {let task = taskList[i]();if (this.isStop) return reject('并发池终止')//成功和失败都要执行的函数const handler = (_res: resType) => {pool.splice(pool.indexOf(task), 1) //每当并发池跑完一个任务,从并发池删除个任务res[i] = _res //放入结果数组count++if (count === length) {return resolve(res)}}task.then((data) => {handler(data)console.log(`第${i}个任务完成,结果为`, data);}, (err) => {handler(err)console.log(`第${i}个任务失败,原因为`, err);})pool.push(task);if (pool.length === max) {//利用Promise.race方法来获得并发池中某任务完成的信号,当有任务完成才让程序继续执行,让循环把并发池塞满await Promise.race(pool)}}} catch (error) {console.error('promise并发池出错', error);reject(error)}})}/**停止并发池运行 */stop = () => {this.isStop = true}
}
测试用例:
/**可终止的并发池测试用例 */
const promisePoolStaticTest = () => {const list = new Array(18).fill(0).map(() => getTask())const pool = new PromisePoolStatic()pool.run(list, 3).catch((err) => {console.log('可终止的并发池测试用例出错 -- ', err)})//18个任务,每个花费1s完成,并发数量为3,共需要6s完成//我们在第三秒的时候中断setTimeout(() => pool.stop(), 3000)
}
promisePoolStaticTest()
结果如下:
可以看到第九个任务结束之后,并发池没有进入新的任务了。 但是为什么已经终止了,还有Promise完成的回调打印出来? 因为执行终止函数时,并发池内仍有三个函数在运行,而正在运行的Promise无法终止,所以只能阻止新任务进入并发池。 (虽然无法终止Promise,但是可以终止Promise完成后的操作,这里不阐述)
四、动态并发池
现在前面完成的操作,都是已经确定好了任务列表,才进行并发控制。如果我们需要动态添加任务的效果,如果队列没满就运行,队满则挂起等待,应该怎么做呢? (常见场景:全局axios请求并发控制)
主要思路: 队未满则直接运行,队满则加入等待队列。任务完成后,检查等待队列是否有任务。
type resolve<T> = (value?: T | PromiseLike<T>) => void
type reject = (reason?: any) => void
/**装着任务和它的resolve与reject函数 */
type taskWithCallbacks<T> = { task: task<T>; resolve: resolve<T>; reject: reject }/**动态并发池 */
export class PromisePoolDynamic<T> {/**最大并发数量 */private limit: number;/**当前正在跑的数量 */private runningCount: number;/**等待队列 */private queue: Array<taskWithCallbacks<T>>;/**动态并发池 - 构造函数* @param maxConcurrency 最大并发数量*/constructor(maxConcurrency: number) {this.limit = maxConcurrency;this.runningCount = 0;this.queue = [];}/**添加任务* @param task 任务,() => Promise<T>* @returns 结果*/addTask(task: task<T>) {//返回一个新的Promise实例,在任务完成前,会一直是pending状态return new Promise<T>((resolve, reject) => {const taskWithCallbacks = { task, resolve, reject } as taskWithCallbacks<T>;if (this.runningCount < this.limit) {//并发数量没满则运行console.log('任务添加:当前并发数', this.runningCount, '并发数量未满,直接运行');this.runTask(taskWithCallbacks);} else {//并发数量满则加入等待队列console.log('任务添加:当前并发数', this.runningCount, '并发数量满,挂起等待');this.queue.push(taskWithCallbacks);}});}/**运行任务* @param taskWithCallback 带有resolve和reject的任务*/private runTask(taskWithCallback: taskWithCallbacks<T>) {this.runningCount++;//当前并发数++taskWithCallback.task()//从对象中取出任务执行.then(result => {this.runningCount--;taskWithCallback.resolve(result);console.log('任务完成', result, '当前并发数', this.runningCount);this.checkQueue();}).catch(error => {this.runningCount--;taskWithCallback.reject(error);this.checkQueue();});}/**运行完成后,检查队列,看看是否有在等待的,有就取出第一个来运行 */private checkQueue() {if (this.queue.length > 0 && this.runningCount < this.limit) {const nextTask = this.queue.shift()!;console.log('并发池出现空位,取出等待队列的任务', nextTask);this.runTask(nextTask);}}
}
测试用例:
/**动态并发池的测试用例 */
const promisePoolDynamicTest = () => {const promisePoolDynamic = new PromisePoolDynamic(3) //一个最大并发3的动态并发池//最大并发3,我一次性添加7个任务promisePoolDynamic.addTask(getTask())promisePoolDynamic.addTask(getTask())promisePoolDynamic.addTask(getTask())promisePoolDynamic.addTask(getTask())promisePoolDynamic.addTask(getTask())promisePoolDynamic.addTask(getTask())promisePoolDynamic.addTask(getTask())
}
promisePoolDynamicTest()
测试结果:
五、结语
关于并发池就到这里了。除了利用Promise.race,其实还可以递归等方式,不过Promise.race是最简单也是最容易理解的。
如果代码中有哪里出现的不对,欢迎指出
相关文章:

Promise的并发控制 - 从普通并发池到动态并发池
一、场景 给你一个有200个URL的数组,通过这些URL来发送请求,要求并发请求数不能超过五个。 这是一道很常考的面试题,接下来让我们来学习一下Promise并发控制 二、普通并发池的实现 主要思路就是,判断当前队列是否满,…...

Java类加载机制(类加载器,双亲委派模型,热部署示例)
Java类加载机制 类加载器类加载器的执行流程类加载器的种类加载器之间的关系ClassLoader 的主要方法Class.forName()与ClassLoader.loadClass()区别 双亲委派模型双亲委派 类加载流程优缺点 热部署简单示例 类加载器 类加载器的执行流程 类加载器的种类 AppClassLoader 应用类…...

【C语言初学者周冲刺计划】3.2将一个数组中的值逆序重新存放
目录 1解题思路: 2代码 3运行代码如图: 4总结: 1解题思路: 首先学会如何利用循环输入位数和输入数值,然后再利用循环逆序即可 2代码 #define _CRT_SECURE_NO_WARNINGS #include<stdio.h> int main() { int…...

【C++心愿便利店】No.11---C++之string语法指南
文章目录 前言一、 为什么学习string类二、标准库中的string类 前言 👧个人主页:小沈YO. 😚小编介绍:欢迎来到我的乱七八糟小星球🌝 📋专栏:C 心愿便利店 🔑本章内容:str…...

OpenCV检测圆(Python版本)
文章目录 示例代码示例结果调参 示例代码 import cv2 import numpy as np# 加载图像 image_path DistanceComparison/test_image/1.png image cv2.imread(image_path, cv2.IMREAD_COLOR)# 将图像转换为灰度 gray cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)# 使用高斯模糊消除…...

轻量封装WebGPU渲染系统示例<15>- DrawInstance批量绘制(源码)
当前示例源码github地址: https://github.com/vilyLei/voxwebgpu/blob/main/src/voxgpu/sample/DrawInstanceTest.ts 此示例渲染系统实现的特性: 1. 用户态与系统态隔离。 细节请见:引擎系统设计思路 - 用户态与系统态隔离-CSDN博客 2. 高频调用与低频调用隔离。…...
E: 仓库 “http://cn.archive.ubuntu.com/ubuntu kinetic Release” 没有 Release 文件。
sudo apt-get update时报以下错误: E: 仓库 “http://cn.archive.ubuntu.com/ubuntu kinetic Release” 没有 Release 文件。 N: 无法安全地用该源进行更新,所以默认禁用该源。 N: 参见 apt-secure(8) 手册以了解仓库创建和用户配置方面的细节。 E: 仓库…...

【VR开发】【Unity】【VRTK】3-VR项目设置
任何VR避不开的步骤 如何设置VR项目,无论是PC VR还是安卓VR,我在不同的系列教程中都说过了,不过作为任何一个VR开发教程都难以避免的一环,本篇作为VRTK的开发教程还是对VR项目设置交代一下。 准备好你的硬件 头盔必须是6DoF的,推荐Oculus Quest系列,Rift系列,HTC和Pi…...
git log 用法
git log --format"%s" -n 1在 Git 中,您可以使用 git log 命令来查看提交历史,其中包含每个提交的详细信息,包括提交消息。如果您只想提取提交信息而不是完整的 git log 输出,可以使用 git log 命令的 --format 选项来指…...
Linux学习---有关监控系统zabbix的感悟
监控系统 监控系统就像咱们日常生活中小区监控(Monitor),用于及时发现问题(PROBLEM),根据相应的规则可以触发警告(Media),在后台显示屏(Dashboard)上以某种方面显示出来,高级的报警系统也许还能实现电话通知等功能,目的是为及时发…...

apollo云实验:定速巡航场景仿真调试
定速巡航场景仿真调试 概述启动仿真环境仿真系统修改默认巡航速度 实验目的福利活动 主页传送门:📀 传送 概述 自动驾驶汽车在实现落地应用前,需要经历大量的道路测试来验证算法的可行性和系统的稳定性,但道路测试存在成本高昂、…...

基于RK3568的新能源储能能量管理系统ems
新能源储能能量管理系统(EMS)是一种基于现代化技术的系统,旨在管理并优化新能源储能设备的能量使用。 该系统通过监测、调度和控制新能源储能设备来确保能源的高效利用和可持续发展。 本文将从不同的角度介绍新能源储能能量管理系统的原理、…...

dockerfile避坑笔记(VMWare下使用Ubuntu在Ubuntu20.04基础镜像下docker打包多个go项目)
一、docker简介 docker是一种方便跨平台迁移应用的程序,通过docker可以实现在同一类操作系统中,如Ubuntu和RedHat两个linux操作系统中,实现程序的跨平台部署。比如我在Ubuntu中打包了一个go项目的docker镜像(镜像为二进制文件&am…...

Qt 使用QtXlsx操作Excel表
1.环境搭建 QtXlsx是一个用于读写Microsoft Excel文件(.xlsx)的Qt库。它提供了一组简单易用的API,可以方便地处理电子表格数据。 Github下载:GitHub - dbzhang800/QtXlsxWriter: .xlsx file reader and writer for Qt5 官方文档…...

canal+es+kibana+springboot
1、环境准备 服务器:Centos7 Jdk版本:1.8 Mysql版本:5.7.44 Canal版本:1.17 Es版本:7.12.1 kibana版本:7.12.1 软件包下载地址:链接:https://pan.baidu.com/s/1jRpCJP0-hr9aI…...
【力扣】面试经典150题——双指针
文章目录 125. 验证回文串392. 判断子序列167. 两数之和 II - 输入有序数组11. 盛最多水的容器15. 三数之和 125. 验证回文串 如果在将所有大写字符转换为小写字符、并移除所有非字母数字字符之后,短语正着读和反着读都一样。则可以认为该短语是一个 回文串 。 字…...

6-8 最宽层次结点数 分数 10
文章目录 1.题目描述2.本题ac答案2.1法一: 代码复用2.2法二: 顺序队列实现层序遍历 3.C层序遍历求最大宽度3.1层序遍历代码3.2求最大宽度 1.题目描述 2.本题ac答案 2.1法一: 代码复用 //二叉树第i层结点个数 int LevelNodeCount(BiTree T, int i) {if (T NULL || i < 1)re…...

Linux学习第28天:Platform设备驱动开发(二): 专注与分散
Linux版本号4.1.15 芯片I.MX6ULL 大叔学Linux 品人间百味 思文短情长 三、硬件原理图分析 四、驱动开发 1、platform设备与驱动程序开发 53 /* 54 * 设备资源信息,也就是 LED0 所使用的所有寄存器 55 */ 56 static str…...
postgresql数组重叠(有共同元素)查询
直接上最终代码: select distinct id from a where string_to_array(in_area,,) && (select ARRAY_AGG( code) from areas where code like 11% or code 100000)::TEXT[] pg语法: 表 9.48显示了可用于数组类型的运算符。 表 9.48。数组运算符…...
ubuntu系统 生成RSA密钥对
在Ubuntu系统上生成密钥对通常指的是生成SSH密钥对,它常用于安全的远程登录、数据通信和其他安全网络操作。以下是如何在Ubuntu系统上生成SSH密钥对的步骤: 打开终端:你可以使用快捷键 Ctrl Alt T 在Ubuntu上打开一个终端窗口。 运行ssh-k…...

Linux应用开发之网络套接字编程(实例篇)
服务端与客户端单连接 服务端代码 #include <sys/socket.h> #include <sys/types.h> #include <netinet/in.h> #include <stdio.h> #include <stdlib.h> #include <string.h> #include <arpa/inet.h> #include <pthread.h> …...
谷歌浏览器插件
项目中有时候会用到插件 sync-cookie-extension1.0.0:开发环境同步测试 cookie 至 localhost,便于本地请求服务携带 cookie 参考地址:https://juejin.cn/post/7139354571712757767 里面有源码下载下来,加在到扩展即可使用FeHelp…...

C++初阶-list的底层
目录 1.std::list实现的所有代码 2.list的简单介绍 2.1实现list的类 2.2_list_iterator的实现 2.2.1_list_iterator实现的原因和好处 2.2.2_list_iterator实现 2.3_list_node的实现 2.3.1. 避免递归的模板依赖 2.3.2. 内存布局一致性 2.3.3. 类型安全的替代方案 2.3.…...

IoT/HCIP实验-3/LiteOS操作系统内核实验(任务、内存、信号量、CMSIS..)
文章目录 概述HelloWorld 工程C/C配置编译器主配置Makefile脚本烧录器主配置运行结果程序调用栈 任务管理实验实验结果osal 系统适配层osal_task_create 其他实验实验源码内存管理实验互斥锁实验信号量实验 CMISIS接口实验还是得JlINKCMSIS 简介LiteOS->CMSIS任务间消息交互…...

多模态大语言模型arxiv论文略读(108)
CROME: Cross-Modal Adapters for Efficient Multimodal LLM ➡️ 论文标题:CROME: Cross-Modal Adapters for Efficient Multimodal LLM ➡️ 论文作者:Sayna Ebrahimi, Sercan O. Arik, Tejas Nama, Tomas Pfister ➡️ 研究机构: Google Cloud AI Re…...
C++.OpenGL (14/64)多光源(Multiple Lights)
多光源(Multiple Lights) 多光源渲染技术概览 #mermaid-svg-3L5e5gGn76TNh7Lq {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-3L5e5gGn76TNh7Lq .error-icon{fill:#552222;}#mermaid-svg-3L5e5gGn76TNh7Lq .erro…...
python爬虫——气象数据爬取
一、导入库与全局配置 python 运行 import json import datetime import time import requests from sqlalchemy import create_engine import csv import pandas as pd作用: 引入数据解析、网络请求、时间处理、数据库操作等所需库。requests:发送 …...
AWS vs 阿里云:功能、服务与性能对比指南
在云计算领域,Amazon Web Services (AWS) 和阿里云 (Alibaba Cloud) 是全球领先的提供商,各自在功能范围、服务生态系统、性能表现和适用场景上具有独特优势。基于提供的引用[1]-[5],我将从功能、服务和性能三个方面进行结构化对比分析&#…...
调试快捷键 pycharm vscode
目录 调试快捷键 pycharm vscode 修改快捷键 方法 1:通过菜单打开 方法 2:用快捷键打开 调试快捷键 pycharm Resume Program F9 Step Over F8 两个离的比较近,比较方便,比vscode的好。 vscode Continue F5 改为F9 S…...

Continue 开源 AI 编程助手框架深度分析
Continue 开源 AI 编程助手框架深度分析 一、项目简介 Continue 是一个模块化、可配置、跨平台的开源 AI 编程助手框架,目标是让开发者能在本地或云端环境中,快速集成和使用自定义的 LLM 编程辅助工具。它通过支持 VS Code 与 JetBrains 等主流 IDE 插件…...