当前位置: 首页 > news >正文

并发包工具之 批量处理任务 CompletionService(异步)、CompletableFuture(回调)

文章目录

      • 一、处理异步任务并获取返回值——CompletionService
      • 二、线程池
      • 三、Callable 与 Future
      • 四、通过回调方式处理可组合编排任务——CompletableFuture

一、处理异步任务并获取返回值——CompletionService

特点描述:
        对于比较复杂的计算,把任务进行提交,并发执行,哪个任务先执行完,get()方法就会获取到相应的任务结果。

范式:
1、
        假设有一组针对某个问题的任务solvers(需要实现Callable接口,任务的具体逻辑就在其call方法里),每个任务都返回一个类型为Result的值,并且想要并发地运行它们,处理每个返回一个非空值的结果,在某些方法使用:

void solve(Executor e,Collection<Callable<Result>> solvers)throws InterruptedException, ExecutionException {CompletionService<Result> ecs= new ExecutorCompletionService<Result>(e);for (Callable<Result> s : solvers)ecs.submit(s);int n = solvers.size();for (int i = 0; i < n; ++i) {Result r = ecs.take().get();if (r != null)use(r);}}

2、
        假设想使用任务集的第一个非空结果,忽略任何遇到异常的任务,并在第一个任务准备好时取消所有其他任务(比如:多仓库文件/镜像下载,从最近的服务中心下载后,终止其他下载过程)

void solve(Executor e,Collection<Callable<Result>> solvers)throws InterruptedException {CompletionService<Result> ecs= new ExecutorCompletionService<Result>(e);int n = solvers.size();List<Future<Result>> futures= new ArrayList<Future<Result>>(n);Result result = null;try {for (Callable<Result> s : solvers)futures.add(ecs.submit(s));for (int i = 0; i < n; ++i) {try {Result r = ecs.take().get();if (r != null) {result = r;break;}} catch (ExecutionException ignore) {}}}finally {for (Future<Result> f : futures)// 注意这里的参数给的是 true,详解同样在前序 Future 源码分析文章中f.cancel(true);}if (result != null)use(result);}

总得来说分两步:
1、提交异步任务 submit方法(submit最终会委托给内部的 executor 去执行任务)
2、从队列中拿取并移除元素 take(如果队列为空,那么调用 take() 方法的线程会被阻塞)/poll(…不会被阻塞,返回null)/poll带超时参数(获取并移除阻塞队列中的第一个元素,如果超时时间到而队列还是空,该方法返回null) 方法

实现原理:
        将异步任务的生产、任务完成结果的消费进行解耦,类似mq,哪个任务先执行完,就把结果放到队列中。

唯一实现类:
        ExecutorCompletionService;阻塞队列默认是 LinkedBlockingQueue

二、线程池

为什么要用线程池
        ∵ 手动创建线程的缺点:
1、不受控,系统资源有限,每个人如果都创建的话,标准不一样,线程疯狂抢占资源.,混乱…
2、开销大,创建一个线程需要调用操作系统内核API,然后操作系统要为线程分配一系列资源,创建个线程啥也不干大概需要1M左右大小。

        线程池可以统一管理、控制最大并发数并实现拒绝策略、隔离线程环境;当执行大量异步任务时,线程池里的线程能复用,不用频繁创建和销毁,能够提供好的性能。
       
Java并发包里的线程池——ThreadPoolExecutor; (接口是ExecutorService)
Spring对线程池的封装——ThreadPoolTaskExecutor
       
关于线程池核心线程数的设置:
        CPU是时间片轮转机制来让线程占用的,也就是说程序表面上是同时进行的,实际上是切换执行的,CPU每个时刻只能由一个线程占用,比如 4核CPU,只能同时跑4个线程。
       
对于CPU密集型程序(如运算、逻辑判断等,I/O操作可以在短时间完成,但CPU运算比较多)
——最佳线程数量=CPU核数+1,这个1可以理解为替补,如果某个线程因为发生错误或其他原因暂停了,这个线程可以继续工作。
       
对于I/O密集型(如涉及网络、磁盘、内存等)
——最佳线程数=CPU核心数 * (1/CPU利用率)=CPU核心数 * (1 + (I/O耗时/CPU耗时)),如果几乎都是I/O耗时,可取2N+1(1为替补)
       
(p.s.线程数不是越多越好,线程上下文切换开销不小)
       

三、Callable 与 Future

        Runnable接口的方法没有返回值;Callable 是泛型接口,可以返回指定类型的结果。
        当提交一个Callable 任务后,会同时获得一个Future对象,然后,在主线程某个时刻调用Future对象的get() 方法,就可以获得异步执行的结果。在调用get()时,如果异步任务已经完成,就直接获得结果。如果异步任务还没有完成,那么get()会阻塞,直到任务完成后才返回结果。
       

四、通过回调方式处理可组合编排任务——CompletableFuture

特点描述:
        CompletableFuture是由 Java 8 引入的,在 Java 8之 前一般通过 Future 实现异步,CompletableFuture 对 Future 进行了扩展,可以通过设置回调的方式处理计算结果,同时也支持组合操作,比如步骤1、2、3存在依赖关系,支持对步骤进一步的编排,降低依赖之间的阻塞。

使用:
在这里插入图片描述

        如上图所示,这里描绘的是一个业务接口的流程,其中包括 CF1\CF2\CF3\CF4\CF5 共5个步骤,并描绘了这些步骤之间的依赖关系,每个步骤可以是一次 RPC 调用、一次数据库操作或者是一次本地方法调用等,在使用 CompletableFuture 进行异步化编程时,图中的每个步骤都会产生一个 CompletableFuture 对象,最终结果也会用一个 CompletableFuture 来进行表示。(只看第一层的话 好像跟 CompletionService 效果差不多… 都可以异步执行批量任务并拿到结果…)

1、零依赖,CompletableFuture 的创建
比如图中所示的 CF1、CF2,可以有以下方式:

     // 1、使用 runAsync 或 supplyAsync 发起异步调用// 线程池ExecutorService executorService = Executors.newFixedThreadPool(5);CompletableFuture<String> CF1 = CompletableFuture.supplyAsync(() -> {return "CF1 result";}, executorService);// 2、CompletableFuture.completedFuture() 直接创建一个已完成状态的 CompletableFuture<CompletableFuture<String> CF2 = CompletableFuture.completedFuture("CF2 result");// 3、先初始化一个未完成的 CompletableFutureCompletableFuture<String> CF3 = new CompletableFuture<>();// 然后通过complete()、completeExceptionally(),完成该CompletableFutureCF3.complete("CF3result");

2、一元依赖,依赖一个 CompletableFuture
比如图中所示的 CF3、CF5,可以用 thenApply、thenAccept、thenCompose 等方法来实现:

// result为CF1的结果CompletableFuture<String> CF3=CF1.thenApply(result->{return "CF3result";});

3、二元依赖:依赖两个 CompletableFuture
比如图中所示的 CF4,这种二元依赖可以通过 thenCombine 等回调来实现:

   // result1、result2分别为CF1、CF2的结果CompletableFuture<String> CF4 = CF1.thenCombine(CF2, (result1, result2) -> {return "CF4result";});

4、多元依赖,依赖多个 CompletableFuture
比如图中所示的 CF6,依赖于三个步骤CF3、CF4、CF5,这种多元依赖可以通过 allOf 或 anyOf 方法来实现,区别是当需要多个依赖全部完成时使用allOf,当多个依赖中的任意一个完成即可时使用anyOf:

    CompletableFuture<Void> CF6 = CompletableFuture.allOf(CF3, CF4, CF5);CompletableFuture<String> result = CF6.thenApply(v ->{// 这里的 join是完成任务后用来获取结果的,并不会阻塞// 因为传给 thenApply 的函数都是在 CF3、CF4、CF5 全都完成时才会执行String result3 = CF3.join();String result4 = CF4.join();String result5 = CF5.join();// 根据 result3、result4、result5组装最终 resultreturn result3 + result4 + result5;});

        如果只用一层的话,异步执行批量任务并拿到总的结果,参考api里 allOf:
在这里插入图片描述

代码示例:

 // 任务入参集合ArrayList<String> paramList = new ArrayList<>();// 用于汇总所有结果ArrayList<String> resultList = new ArrayList<>();CompletableFuture.allOf(paramList.stream().map(string ->CompletableFuture.supplyAsync(() ->// 这里返回了本身,实际上也可以是具体的方法string,asyncServiceExecutor)// thenApply是对结果做简单映射,类似于Stream.map,list->list就是原样往下传递,这里不使用thenApply也行.thenApply(list -> list).whenComplete((result, e) -> {// 对异常结果的处理if (e != null) System.out.println("exception");// 汇总结果resultList.add(result);})).toArray(CompletableFuture[]::new)// 完成后返回结果值,如果异常完成则抛出(未经检查)异常,相当于一个等待任务完成的动作).join(); 

参考文档:
https://dayarch.top/p/how-many-threads-should-be-created.html
https://segmentfault.com/a/1190000023129592?utm_source=sf-similar-article
https://segmentfault.com/a/1190000023587881
https://tech.meituan.com/2022/05/12/principles-and-practices-of-completablefuture.html

相关文章:

并发包工具之 批量处理任务 CompletionService(异步)、CompletableFuture(回调)

文章目录一、处理异步任务并获取返回值——CompletionService二、线程池三、Callable 与 Future四、通过回调方式处理可组合编排任务——CompletableFuture一、处理异步任务并获取返回值——CompletionService 特点描述&#xff1a; 对于比较复杂的计算&#xff0c;把…...

验收测试分类

α测试 Alpha 是内测版本&#xff0c;即现在所说的CB。 此版本表示该软件仅仅是一个初步完成品, 通常只在软件开发者内部交流, 也有很少一部分发布给专业测试人员。 一般而言, 该版本软件的bug 较多, 普通用户最好不要安装。 β测试 Beta是公测版本&#xff0c;是对所有用户…...

因新硬件支持内核问题Ubuntu 22.04.2推迟发布

导读Ubuntu 22.04.2 LTS 原定于 2 月 9 日发布。但 Canonical 宣布该版本因各种问题不得不推迟两周&#xff0c;定于 2 月 23 日发布。 Ubuntu 22.04.2 LTS 原定于 2 月 9 日发布。但 Canonical 宣布该版本因各种问题不得不推迟两周&#xff0c;定于 2 月 23 日发布。 Canonica…...

agent扩展-自定义外部加载路径

自定义classLoader实现加载外部jar, 以skywalking agent 类加载器为例子 整体思路 扩展findClass &#xff0c;解决loadClass可以查找到扩展findResource&#xff0c;解决getResources可以获取到资源 基本原理 ClassLoader loadClass的加载顺序 findLoadedClass 加载本地已经…...

Elasticsearch使用篇 - 指标聚合

指标聚合 指标聚合从聚合文档中提取出指标进行计算。可以从文档的字段或者使用脚本方式进行提取。 聚合统计可以同时返回明细数据&#xff0c;可以分页查询&#xff0c;可以返回总数量。 可以结合查询条件&#xff0c;限制数据范围&#xff0c;结合倒排索引列式存储。 指标…...

Python生命周期及内存管理

文章目录 一、Python的生命周期 1、概念2、如何监听生命周期二、内存管理 1.存储2.垃圾回收3.引用计数一、生命周期&#xff1a; 1、概念&#xff1a;一个对象从创建到消亡的过程 当一个对象呗创建是&#xff0c;会在内存中分配响应的内存空间进行存储 当这个对象不再使…...

Elasticsearch7.8.0版本进阶——数据写流程

目录一、数据写流程概述二、数据写流程步骤2.1、数据写流程图2.2、数据写流程步骤&#xff08;新建索引和删除文档所需要的步骤顺序&#xff09;2.3、数据写流程的请求参数一、数据写流程概述 新建、删除索引和新建、删除文档的请求都是写操作&#xff0c; 必须在主分片上面完…...

化学试剂Glutaric Acid-PEG-Glutaric Acid,GA-PEG-GA,戊二酸-聚乙二醇-戊二酸

一&#xff1a;产品描述 1、名称 英文&#xff1a;Glutaric Acid-PEG-Glutaric Acid&#xff0c;GA-PEG-GA 中文&#xff1a;戊二酸-聚乙二醇-戊二酸 2、CAS编号&#xff1a;N/A 3、所属分类&#xff1a;Carboxylic acid PEG 4、分子量&#xff1a;可定制&#xff0c; 戊…...

知识图谱业务落地技术推荐之国内知识图谱平台汇总(竞品)[阿里、腾讯、华为等】

各位可以参考国内知识图谱平台产品进行对技术链路搭建和产品参考提供借鉴。...

ABC 289 G - Shopping in AtCoder store 数学推导+凸包

大意&#xff1a; n个顾客&#xff0c;每个人有一个购买的欲望bi,m件物品&#xff0c;每一件物品有一个价值ci,每一个顾客会买商品当且仅当bici>定价. 现在要求对每一个商品定价&#xff0c;求出它的最大销售值&#xff08;数量*定价&#xff09; n,m<2e5 思路&#x…...

ARM Linux 如何在sysfs用户态命令行中控制 GPIO 引脚?

ARM Linux 如何在sysfs用户态命令行中控制 GPIO 引脚&#xff1f;我们在开发工作中&#xff0c;经常需要确定内核gpio驱动&#xff0c;是否有异常&#xff0c;或者在没有应用的情况下&#xff0c;像控制某个外设&#xff0c;这时我们就可以在控制台命令行中&#xff0c;用命令导…...

【Linux】生产者消费者模型 - 详解

目录 一.生产者消费者模型概念 1.为何要使用生产者消费者模型 2.生产者消费者之间的关系 3.生产者消费者模型的优点 二.基于阻塞队列的生产消费模型 1.在阻塞队列中的三种关系 2.BlockingQueue.hpp - 阻塞队列类 3.LockGurad.hpp - RAII互斥锁类 4.Task.hpp - 在阻塞队…...

源码深度解析Spring Bean的加载

在应用spring 的过程中&#xff0c;就会涉及到bean的加载&#xff0c;bean的加载经历一个相当复杂的过程&#xff0c;bean的加载入口如下&#xff1a; 使用getBean&#xff08;&#xff09;方法进行加载Bean&#xff0c;最终调用的是AbstractBeanFactory.doGetBean() 进行Bean的…...

STL——priority_queue

一、priority_queue介绍及使用 1.priority_queue文档介绍 &#xff08;1&#xff09;优先队列是一种容器适配器&#xff0c;根据严格的弱排序标准&#xff0c;它的第一个元素总是它所包含的元素中最大的。 &#xff08;2&#xff09;此上下文类似与堆&#xff0c;在堆中可以…...

Springboot集成工作流Activity

介绍 官网&#xff1a;https://www.activiti.org/ 一 、工作流介绍 1.工作流&#xff08;workflow&#xff09; 就是通过计算机对业务流程自动化执行管理&#xff0c;它主要解决的是“使在多个参与这之间按照某种预定义规则自动化进行传递文档、信息或任务的过程&#xff0c…...

2023软件测试工程师涨薪攻略,3年如何达到月薪30K?

1.软件测试如何实现涨薪 首先涨薪并不是从8000涨到9000这种涨薪&#xff0c;而是从8000涨到15K加到25K的涨薪。基本上三年之内就可以实现。 如果我们只是普通的有应届毕业生或者是普通本科那我们就只能从小公司开始慢慢往上走。 有些同学想去做测试&#xff0c;是希望能够日…...

Java面试——Spring Bean相关知识

目录 1.Bean的定义 2.Bean的生命周期 3.BeanFactory及Factory Bean 4.Bean的作用域 5.Bean的线程安全问题 1.Bean的定义 JavaBean是描述Java的软件组件模型。在Java模型中&#xff0c;通过JavaBean可以无限扩充Java程序的功能&#xff0c;通过JavaBean的组合可以快速的生…...

上班在群里摸鱼,逮到一个字节8年测试开发,聊过之后羞愧难当...

老话说的好&#xff0c;这人呐&#xff0c;一旦在某个领域鲜有敌手了&#xff0c;就会闲得某疼。前几天我在上班摸鱼刷群的时候认识了一位字节测试开发大佬&#xff0c;在字节工作了8年&#xff0c;因为本人天赋比较高&#xff0c;平时工作也兢兢业业&#xff0c;现在企业内有一…...

HTTP、WebSocket和Socket.IO

一、HTTP协议 HTTP协议是Hyper Text Transfer Protocol&#xff08;超文本传输协议&#xff09;。HTTP 协议和 TCP/IP 协议族内的其他众多的协议相同&#xff0c; 用于客户端和服务器之间的通信。请求访问文本或图像等资源的一端称为客户端&#xff0c; 而提供资源响应的一端称…...

Fluent Python 笔记 第 11 章 接口:从协议到抽象基类

本章讨论的话题是接口:从鸭子类型的代表特征动态协议&#xff0c;到使接口更明确、能验证实现是否符合规定的抽象基类(Abstract Base Class&#xff0c;ABC)。 11.1 Python 文化中的接口和协议 对 Python 程序员来说&#xff0c;“X 类对象”“X 协 议”和“X 接口”都是一个…...

Vue记事本应用实现教程

文章目录 1. 项目介绍2. 开发环境准备3. 设计应用界面4. 创建Vue实例和数据模型5. 实现记事本功能5.1 添加新记事项5.2 删除记事项5.3 清空所有记事 6. 添加样式7. 功能扩展&#xff1a;显示创建时间8. 功能扩展&#xff1a;记事项搜索9. 完整代码10. Vue知识点解析10.1 数据绑…...

Java入门学习详细版(一)

大家好&#xff0c;Java 学习是一个系统学习的过程&#xff0c;核心原则就是“理论 实践 坚持”&#xff0c;并且需循序渐进&#xff0c;不可过于着急&#xff0c;本篇文章推出的这份详细入门学习资料将带大家从零基础开始&#xff0c;逐步掌握 Java 的核心概念和编程技能。 …...

Maven 概述、安装、配置、仓库、私服详解

目录 1、Maven 概述 1.1 Maven 的定义 1.2 Maven 解决的问题 1.3 Maven 的核心特性与优势 2、Maven 安装 2.1 下载 Maven 2.2 安装配置 Maven 2.3 测试安装 2.4 修改 Maven 本地仓库的默认路径 3、Maven 配置 3.1 配置本地仓库 3.2 配置 JDK 3.3 IDEA 配置本地 Ma…...

基于matlab策略迭代和值迭代法的动态规划

经典的基于策略迭代和值迭代法的动态规划matlab代码&#xff0c;实现机器人的最优运输 Dynamic-Programming-master/Environment.pdf , 104724 Dynamic-Programming-master/README.md , 506 Dynamic-Programming-master/generalizedPolicyIteration.m , 1970 Dynamic-Programm…...

Spring Cloud Gateway 中自定义验证码接口返回 404 的排查与解决

Spring Cloud Gateway 中自定义验证码接口返回 404 的排查与解决 问题背景 在一个基于 Spring Cloud Gateway WebFlux 构建的微服务项目中&#xff0c;新增了一个本地验证码接口 /code&#xff0c;使用函数式路由&#xff08;RouterFunction&#xff09;和 Hutool 的 Circle…...

在Ubuntu24上采用Wine打开SourceInsight

1. 安装wine sudo apt install wine 2. 安装32位库支持,SourceInsight是32位程序 sudo dpkg --add-architecture i386 sudo apt update sudo apt install wine32:i386 3. 验证安装 wine --version 4. 安装必要的字体和库(解决显示问题) sudo apt install fonts-wqy…...

浪潮交换机配置track检测实现高速公路收费网络主备切换NQA

浪潮交换机track配置 项目背景高速网络拓扑网络情况分析通信线路收费网络路由 收费汇聚交换机相应配置收费汇聚track配置 项目背景 在实施省内一条高速公路时遇到的需求&#xff0c;本次涉及的主要是收费汇聚交换机的配置&#xff0c;浪潮网络设备在高速项目很少&#xff0c;通…...

AGain DB和倍数增益的关系

我在设置一款索尼CMOS芯片时&#xff0c;Again增益0db变化为6DB&#xff0c;画面的变化只有2倍DN的增益&#xff0c;比如10变为20。 这与dB和线性增益的关系以及传感器处理流程有关。以下是具体原因分析&#xff1a; 1. dB与线性增益的换算关系 6dB对应的理论线性增益应为&…...

使用Spring AI和MCP协议构建图片搜索服务

目录 使用Spring AI和MCP协议构建图片搜索服务 引言 技术栈概览 项目架构设计 架构图 服务端开发 1. 创建Spring Boot项目 2. 实现图片搜索工具 3. 配置传输模式 Stdio模式&#xff08;本地调用&#xff09; SSE模式&#xff08;远程调用&#xff09; 4. 注册工具提…...

RabbitMQ入门4.1.0版本(基于java、SpringBoot操作)

RabbitMQ 一、RabbitMQ概述 RabbitMQ RabbitMQ最初由LShift和CohesiveFT于2007年开发&#xff0c;后来由Pivotal Software Inc.&#xff08;现为VMware子公司&#xff09;接管。RabbitMQ 是一个开源的消息代理和队列服务器&#xff0c;用 Erlang 语言编写。广泛应用于各种分布…...