并发编程 | CompletionService - 如何优雅地处理批量异步任务
引言
上一篇文章中,我们详细地介绍了 CompletableFuture,它是一种强大的并发工具,能帮助我们以声明式的方式处理异步任务。虽然 CompletableFuture 很强大,但它并不总是最适合所有场景的解决方案。
在这篇文章中,我们将介绍 Java 的 CompletionService,这是一种能处理批量异步任务并在完成时获取结果的并发工具。
CompletionService
与 CompletableFuture
在很多方面都相似。它们都用于处理异步任务,并且都提供了获取任务完成结果的机制。然而,CompletionService 采用了更传统并发模型,它将生产者和消费者的角色更明确地分离开来。
回顾我们在上一篇文章:并发编程 | 从Future到CompletableFuture 中讨论的需求,我们需要查找并计算一系列旅行套餐的价格。我们使用 CompletableFuture 实现了这个需求,并且代码看起来很简洁明了。然而,事情都有两面性。有些人并不习惯这种写法,觉得CompletableFuture 的实现中存在大量的嵌套,会让代码难以阅读和理解。另外,我们的代码中有大量的函数式编程,这在一定程度上增加了对代码阅读的门槛,如果你不熟悉这种编程范式,代码可能会看起来很混乱。
有没有一种方法,既简洁的同时,又不回到Future的回调地狱陷阱中去?有,CompletionService 。来看下CompletionService 是怎么解决问题。
使用CompletionService 解决问题
如果我们用 CompletionService 来实现这个需求,会是什么样呢?我们来看下代码:
public List<TravelPackage> searchTravelPackages(SearchCondition searchCondition) throws InterruptedException, ExecutionException {ExecutorService executorService = Executors.newFixedThreadPool(10);CompletionService<List<TravelPackage>> completionService = new ExecutorCompletionService<>(executorService);List<Flight> flights = searchFlights(searchCondition);for (Flight flight : flights) {// 提交所有的任务completionService.submit(() -> {List<TravelPackage> travelPackagesForFlight = new ArrayList<>();List<Hotel> hotels = searchHotels(flight);for (Hotel hotel : hotels) {TravelPackage travelPackage = calculatePrice(flight, hotel);travelPackagesForFlight.add(travelPackage);}return travelPackagesForFlight;});}List<TravelPackage> allTravelPackages = new ArrayList<>();for (int i = 0; i < flights.size(); i++) {// 等待它们的完成Future<List<TravelPackage>> future = completionService.take();// 如果没完成,这里会阻塞List<TravelPackage> travelPackagesForFlight = future.get();allTravelPackages.addAll(travelPackagesForFlight);}executorService.shutdown();allTravelPackages.sort(Comparator.comparing(TravelPackage::getPrice));return allTravelPackages;
}
通过上面的代码,我们可以看到 CompletionService 提供了一个更传统的并发模型来处理异步任务。相比CompletableFuture 而言,我们的代码中没有复杂的嵌套,代码更加直观。
对初学者来说,这个模型会更容易理解,特别是对于那些不熟悉函数式编程的读者来说。
当然,作为老手的你(假如你弄懂了上篇文章,并实践完),如果你在使用CompletableFuture 过程中发现它嵌套太深太复杂,CompletionService 可能也是个不错的选择。
基于上述代码抽取CompletionService
我们把关键代码抽取出来并简化,就可以得到下面这段代码:
ExecutorService executor = Executors.newFixedThreadPool(4);
CompletionService<String> completionService = new ExecutorCompletionService<>(executor);long start = System.currentTimeMillis();
// 提交3个任务
completionService.submit(() -> {// 业务返回的实践可能不一样,模拟不一样的任务执行时间Thread.sleep(5000);return "任务1完成";
});
completionService.submit(() -> {// 业务返回的实践可能不一样,模拟不一样的任务执行时间Thread.sleep(3000);return "任务2完成";
});
completionService.submit(() -> {// 业务返回的实践可能不一样,模拟不一样的任务执行时间Thread.sleep(500);return "任务3完成";
});
completionService.submit(() -> {// 业务返回的实践可能不一样,模拟不一样的任务执行时间Thread.sleep(500);return "任务4完成";
});// 获取结果
for (int i = 0; i < 4; i++) {try {Future<String> future = completionService.take();// 如果没完成,这里会阻塞System.out.println(future.get());} catch (InterruptedException | ExecutionException e) {e.printStackTrace();}
}
executor.shutdown();
long end = System.currentTimeMillis();
System.out.println("任务花费时间: " + (end - start) + " ms");
结合文中代码注释,我把它总结为一句口诀:批量提交,快速获取。
批量我知道啊,就是遍历呗,但是提交到那里去?快速获取是什么意思?别急,我们接着往下看。
使用ExecutorService 实现需求
在回答这个问题之前,我们先来看一下代码。我们先sumbit()一下…然后get()拿到数据…
嗯?这不是和之前ExecutorService 差不多吗?好像可以用它实现啊,你看代码:
public List<TravelPackage> searchTravelPackages(SearchCondition searchCondition) throws InterruptedException, ExecutionException {ExecutorService executorService = Executors.newFixedThreadPool(10);List<Flight> flights = searchFlights(searchCondition);List<Future<List<TravelPackage>>> futureList = new ArrayList<>();for (Flight flight : flights) {Future<List<TravelPackage>> future = executorService.submit(() -> {List<TravelPackage> travelPackagesForFlight = new ArrayList<>();List<Hotel> hotels = searchHotels(flight);for (Hotel hotel : hotels) {TravelPackage travelPackage = calculatePrice(flight, hotel);travelPackagesForFlight.add(travelPackage);}return travelPackagesForFlight;});futureList.add(future);}List<TravelPackage> allTravelPackages = new ArrayList<>();for (Future<List<TravelPackage>> future : futureList) {List<TravelPackage> travelPackagesForFlight = future.get();allTravelPackages.addAll(travelPackagesForFlight);}executorService.shutdown();allTravelPackages.sort(Comparator.comparing(TravelPackage::getPrice));return allTravelPackages;
}
看,是不是可以实现了。那CompletionService这玩意存在的意义是啥?我们继续往下看。
提交先后顺序 VS 任务完成快慢顺序
我们先把上面抽取出来的代码执行,结果如下:
任务3完成
任务4完成
任务2完成
任务1完成
任务花费时间: 5012 ms
Disconnected from the target VM, address: '127.0.0.1:10373', transport: 'socket'Process finished with exit code 0
然后,我们换成ExecutorService 执行,抽取的ExecutorService 代码如下:
ExecutorService executor = Executors.newFixedThreadPool(3);
ArrayList<Future<String>> futures = new ArrayList<>();
long start = System.currentTimeMillis();
CountDownLatch latch = new CountDownLatch(4);futures.add(executor.submit(() -> {// 业务返回的实践可能不一样,模拟不一样的任务执行时间Thread.sleep(5000);latch.countDown();return "任务1完成";
}));
futures.add(executor.submit(() -> {// 业务返回的实践可能不一样,模拟不一样的任务执行时间Thread.sleep(3000);latch.countDown();return "任务2完成";
}));
futures.add(executor.submit(() -> {// 业务返回的实践可能不一样,模拟不一样的任务执行时间Thread.sleep(500);latch.countDown();return "任务3完成";
}));
futures.add(executor.submit(() -> {// 业务返回的实践可能不一样,模拟不一样的任务执行时间Thread.sleep(500);latch.countDown();return "任务4完成";
}));for (Future<String> future : futures) {try {// 如果没完成,这里会阻塞System.out.println(future.get());} catch (InterruptedException | ExecutionException e) {e.printStackTrace();}
}
latch.await();
executor.shutdown();
long end = System.currentTimeMillis();
System.out.println("任务花费时间: " + (end - start) + " ms");
执行结果如下:
任务1完成
任务2完成
任务3完成
任务4完成
任务花费时间: 5007 ms
Disconnected from the target VM, address: '127.0.0.1:14882', transport: 'socket'Process finished with exit code 0
细心的你肯定可以看到它们执行结果上的差异。CompletionService 是按照任务时间的顺序消费的。好,搞懂了这个,我们就可以回答上面其中一个问题:
快速获取是什么?
CompletionService是按照任务的快慢,谁先执行完谁就先返回。可以看到上面示例代码的结果,任务3只需要500ms,所以任务3先返回。
CompletionService 的适用场景
既然CompletionService 可以按照任务快慢顺序来返回,我们来看下它适合哪些场景:
执行一组任务并处理结果
上面就是很好的例子,我们可以在任何任务完成后立即获取并处理其结果,以实现快速响应。提高程序的吞吐量(先执行完任务,就有多的线程空闲,可以响应更多任务)。
生产者-消费者模式
我们在最早的开篇说过,CompletionService可以天然地实现生产者-消费者模式。这个模式中,生产者线程负责批量提交任务,消费者线程负责获取并处理任务的结果,而且它也可以安全地在多个线程之间共享。
新的问题又出现了,为什么又可以在多个线程之间共享?提交到那里去?快速获取是怎么做到的?以问题为导向,我们来分析下源码。
CompletionService源码分析
提交到那里去?为什么可以在多线程之间共享?
我们先看下构造函数中做了什么:
public ExecutorCompletionService(Executor executor) {if (executor == null)throw new NullPointerException();this.executor = executor;this.aes = (executor instanceof AbstractExecutorService) ?(AbstractExecutorService) executor : null;this.completionQueue = new LinkedBlockingQueue<Future<V>>();
}
ExecutorCompletionService
使用了一个BlockingQueue
来存储已完成的任务。因为,任务的提交Executor
和BlockingQueue
都是线程安全的。所以多线程共享的数据竞争问题已经在内部解决了。
快速获取是怎么做到的?
我们可以看下submit()
方法是怎么实现的。当你提交一个任务时,这个任务被封装在一个QueueingFuture
对象中:
public Future<V> submit(Callable<V> task) {if (task == null) throw new NullPointerException();RunnableFuture<V> f = newTaskFor(task);executor.execute(new QueueingFuture(f));return f;
}
QueueingFuture
重写了done()
方法。当任务完成时,done()
方法会被调用,QueueingFuture
会将自己添加到completionQueue
中:
private class QueueingFuture extends FutureTask<Void> {QueueingFuture(RunnableFuture<V> task) {super(task, null);this.task = task;}protected void done() { completionQueue.add(task); } //当任务完成时,将任务添加到队列中private final Future<V> task;
}
这样似乎就可以解释,快速获取的机制。完成的任务优先被放入BlockingQueue
中按照完成顺序排队。
现在,我换一种表述,你看下是否正确:快的任务在消费的时候就会被排在队列前面先被消费,这样就形成一个任务完成快慢
的顺序,第一个被消费到的任务一定是最快的。
第一个被消费到的任务一定是最快的吗?
从上面的代码测试示例结果来看, 确实如此。但是,我很遗憾的告诉你,这句话是错误的。
这句话的正确性是建立在任务数等于线程数的前提下。这就显得很鸡肋了,在在生产中很难达到这个效果,因为资源是稀缺的。当然,我们还是拿代码说话:
ExecutorService executor = Executors.newFixedThreadPool(3);CompletionService<String> completionService = new ExecutorCompletionService<>(executor);long start = System.currentTimeMillis();completionService.submit(() -> {// 业务返回的实践可能不一样,模拟不一样的任务执行时间Thread.sleep(5000);return "任务1完成";});completionService.submit(() -> {// 业务返回的实践可能不一样,模拟不一样的任务执行时间Thread.sleep(3000);return "任务2完成";});completionService.submit(() -> {// 业务返回的实践可能不一样,模拟不一样的任务执行时间Thread.sleep(6000);return "任务3完成";});completionService.submit(() -> {// 业务返回的实践可能不一样,模拟不一样的任务执行时间Thread.sleep(500);return "任务4完成";});for (int i = 0; i < 4; i++) {try {System.out.println(completionService.take().get());} catch (InterruptedException | ExecutionException e) {e.printStackTrace();}}executor.shutdown();long end = System.currentTimeMillis();System.out.println("任务花费时间: " + (end - start) + " ms");
假如遵循执行快慢顺序,理想的状态应该是:4 -> 2 -> 1 -> 3;而结果却是:
Connected to the target VM, address: '127.0.0.1:5068', transport: 'socket'
任务2完成
任务4完成
任务1完成
任务3完成
任务花费时间: 6020 ms
Disconnected from the target VM, address: '127.0.0.1:5068', transport: 'socket'
这个结果也是意料之外,但在情理之中。因为线程总共只有3个,在1,2,3之间排序,任务顺序应该是2,1,3;然后当2执行完之后,1和3依然未执行完;这个时候4正好执行完。于是就插队到任务中。最终得到2,4,1,3的结果。
因此,我们可以说:在生产环境中,这个顺序是不可控的,除非你把线程设置为1;
CompletionService相关面试题
如何使用CompletionService处理一组任务并获取结果?
比较ExecutorService和CompletionService,它们有什么相同之处和不同之处?
在何种情况下,你会选择使用CompletionService而不是ExecutorService?
解释CompletionService是如何保证按任务完成顺序获取结果的
当一个任务被提交到CompletionService后,它的生命周期是怎样的?在任务执行过程中,CompletionService内部都发生了什么?
在使用CompletionService处理任务时,如果某个任务执行异常,应该如何处理?
如果我想取消CompletionService中的所有任务,应该如何做?
谈谈你对Java中的Executor,ExecutorService,CompletionService和Future之间关系的理解
看完上面的文章,你可以试着来回答了吗?
参考文献
- Java并发编程小册
总结
让我们一起回顾今天所学。首先,我引导你使用了CompletionService和ExecutorService来实现了先前复杂的需求。相较于CompletableFuture,它们可能显得更为传统,但也更易理解。然后,我们一起探索了CompletionService的存在意义。我们试图解答,既然ExecutorService已经足够应对需求,为什么还要有CompletionService这样的设计。为了揭示这个疑惑,我们深入到源码中,同时也纠正了一个错误观点,以帮助你对CompletionService有更深刻的理解。最后,我们通过面试题形式,来巩固和复习我们所学的知识。
相关文章:

并发编程 | CompletionService - 如何优雅地处理批量异步任务
引言 上一篇文章中,我们详细地介绍了 CompletableFuture,它是一种强大的并发工具,能帮助我们以声明式的方式处理异步任务。虽然 CompletableFuture 很强大,但它并不总是最适合所有场景的解决方案。 在这篇文章中,我们…...

医学案例|ROC曲线之面积对比
一、案例介绍 为评价CT和CT增强对肝癌的诊断效果,共检查了32例患者,每例患者分别用两种方法检查,由医生盲态按4个等级诊断,最后经手术病理检查确诊其中有16例患有肝癌,评价CT个CT增强对肝癌是有有诊断效果并且试着比较…...

Kotlin线程的基本用法
线程的基本用法 新建一个类继承自Thread,然后重写父类的run()方法 class MyThread : Thread() {override fun run() {// 编写具体的逻辑} }// 使用 MyThread().start()实现Runnable接口 class MyThread : Runnable {override fun run() {// 编写具体的逻辑} }// …...

2.03 PageHelper分页工具
步骤1:在application.yml中添加分页配置 # 分页插件配置 pagehelper:helperDialect: mysqlsupportMethodsArguments: true步骤2:在顶级工程pom文件下引入分页插件依赖 <!--5.PageHelper --> <dependency><groupId>com.github.pagehe…...

VUE中使用ElementUI组件的单选按钮el-radio-button实现第二点击时取消选择的功能
页面样式为: html 代码为: 日志等级: <el-radio-group v-model"logLevel"><el-radio-button label"DEBUG" click.native.prevent"changeLogLevel(DEBUG)">DEBUG</el-radio-button><el-r…...

瓴羊Quick BI:可视化大屏界面设计满足企业个性需求
大数据技术成为现阶段企业缩短与竞争对手之间差距的重要抓手,依托以瓴羊Quick BI为代表的工具开展内部数据处理分析工作,也成为诸多企业持续获取竞争优势的必由之路。早年间国内企业倾向于使用进口BI工具,但随着瓴羊Quick BI等一众国内数据处…...

617. 合并二叉树
题目 题解一:递归 /*** 递归* param root1* param root2* return*/public TreeNode mergeTrees(TreeNode root1, TreeNode root2) {//结束条件if (root1 null) {return root2;} //结束条件if (root2 null) {return root1;}//两节点数值相加TreeNode me…...

【T1】存货成本异常、数量为零金额不为零的处理方法。
【问题描述】 使用T1飞跃专业版的过程中, 由于业务问题或者是操作问题, 经常会遇到某个商品成本异常不准确, 或者是遇到数量为0金额不为0的情况,需要将其成本调为0。 但是T1软件没有出入库调整单,并且结账无法针对数量…...

EtherNet IP转PROFINET网关连接西门子与欧姆龙方法
本文主要介绍了捷米特JM-PN-EIP(EtherNet/IP转PROFINET)网关西门子200智能PLC(PROFINET)和欧姆龙系统EtherNet/IP通信的配置过程。 1, 将 EDS 文件复制到欧姆龙软件的对应文件夹下 2, 首先添加捷米特JM-PN-EIP网关的全局变量&…...

低代码开发重要工具:jvs-flow(流程引擎)审批功能配置说明
流程引擎场景介绍 流程引擎基于一组节点与执行界面,通过人机交互的形式自动地执行和协调各个任务和活动。它可以实现任务的分配、协作、路由和跟踪。通过流程引擎,组织能够实现业务流程的优化、标准化和自动化,提高工作效率和质量。 在企业…...

[SQL挖掘机] - GROUP BY语句
介绍: group by 是 sql 中用于对结果集进行分组的关键字。通过使用 group by,可以根据一个或多个列的值将结果集中的行分组,并对每个分组应用某种聚合函数(如 count、sum、avg 等)以生成汇总信息。这样可以方便地对数据进行分类、…...

【ubuntu|内核】ubuntu 22.04修改内核为指定版本
every blog every motto: You can do more than you think. https://blog.csdn.net/weixin_39190382?typeblog 0. 前言 ubuntu 22.04 安装指定内核 1. 正文 查看已安装的内核镜像 dpkg --get-selections | grep linux-image1.1 安装指定版本的内核 安装镜像 sudo apt-g…...

Carla教程一:动力学模型到LQR
Carla教程一、动力学模型到LQR 从运动学模型和动力学模型到LQR 模型就是可以描述车辆运动规律的模型。车辆建模都是基于自行车模型的设定,也就是将四个轮子抽象为自行车一样的两个轮子来建模。 1、运动学模型 运动学模型是基于几何关系分析出来的,一般适用于低俗情况下,…...

IDE/mingw下动态库(.dll和.a文件)的生成和部署使用(对比MSVC下.dll和.lib)
文章目录 概述问题的产生基于mingw的DLL动态库基于mingw的EXE可执行程序Makefile文件中使用Qt库的\*.a文件mingw下的*.a 文件 和 *.dll 到底谁起作用小插曲 mingw 生成的 \*.a文件到底是什么为啥mingw的dll可用以编译链接过程转换为lib引导文件 概述 本文介绍了 QtCreator mi…...

点击加号添加新的输入框
实现如上图的效果 html部分: <el-form-item class"forminput" v-for"(item,index) in formdata.description" :key"index" :label"描述(index1)" prop"description"><el-input v-model"formdata…...

SQL AND OR 运算符
AND & OR 运算符用于基于一个以上的条件对记录进行过滤。 如果第一个条件和第二个条件都成立,则 AND 运算符显示一条记录。 如果第一个条件和第二个条件中只要有一个成立,则 OR 运算符显示一条记录。 下面是选自 "students" 表的数据&a…...

6、C++内存模型
原文: https://my.oschina.net/u/2516597/blog/805489 背景 C11开始支持多线程,其中提供了原子类型atomic, 和atomic关系比较密切的是memory_order,所有的内存模型都是指atomic类型 enum memory_order {memory_order_relaxed,memory_order…...

上海市青少年算法2023年1月月赛(丙组)
上海市青少年算法2023年1月月赛(丙组)T1 实验日志 题目描述 小爱正在完成一个物理实验,为期n天,其中第i天,小爱会记录ai条实验数据在实验日志中。 已知小爱的实验日志每一页最多纪录m条数据,每天做完实验后他都会将日志合上,第二天,他便从第一页开始依次翻页,直到找到…...

移动开发之Wifi列表获取功能
一、场景 业务需要通过App给设备配置无线网络连接,所以需要App获取附近的WiFi列表,并进行网络连接验证。 二、安卓端实现 1、阅读谷歌官网文档,关于Wifi 接口使用 https://developer.android.com/guide/topics/connectivity/wifi-scan?hl…...

MyBatisPlus - 实体类 的 常用注解
TableName(“表名”) 假设 表名是 book,实体类类名是 Book MyBatisPlus会进行自动映射 但如果 表名是 tab_book,实体类类名是 Book 那么MyBatisPlus就无法进行自动映射,需要我们使用 TableName注解 去指定实体类对应的表 如下 TableNa…...

vue3+ts+elementui-plus二次封装树形表格实现不同层级展开收起的功能
一、TableTreeLevel组件 <template><div classmain><div class"btns"><el-button type"primary" click"expandLevel(1)">展开一级</el-button><el-button type"primary" click"expandLevel(2…...

Qt之切换语言的方法(传统数组法与Qt语言家)
http://t.csdn.cn/BVigB 传统数组法: 定义一个字符串二维数组, QString weekStr[2][7] {"星期一","星期二","星期三","星期四","星期五","星期六","星期日",\ "Monday&…...

qt root start faild
深入解析chown -r root:root命令_笔记大全_设计学院 ffmpeg第五弹:QtSDLffmpeg视频播放演示_txp玩Linux的博客-CSDN博客...

数据结构—串
4.1串 4.1.1串的定义 串(String)——零个或多个任意字符组成的有限序列 S"a1 a2...an"串的定义——几个术语 子串:串中任意个连续字符组成的子序列称为该串的子串 例如,“abcde”的子串有: “ ”、“a”、…...

hive 全量表、增量表、快照表、切片表和拉链表
全量表:记录每天的所有的最新状态的数据,增量表:记录每天的新增数据,增量数据是上次导出之后的新数据。快照表:按日分区,记录截止数据日期的全量数据切片表:切片表根据基础表,往往只…...

数据结构07:查找[C++][B树Btree]
图源:文心一言 考研对于B树的要求重点在推理手算的部分,只参考王道论坛咸鱼老师的视频就可以了;若时间非常充裕的小伙伴,也可以往下滑了解一下代码~🥝🥝 备注: 这次的代码是从这里复制的&…...

在CSDN学Golang云原生(Kubernetes集群管理)
一,Node的隔离与恢复 在 Kubernetes 集群中,Node 的隔离与恢复通常可以通过以下方式实现: 使用 Taints 和 Tolerations 实现隔离 Taints 和 Tolerations 是 Kubernetes 中用于节点调度的机制。通过给节点添加 taints(污点&…...

WPF实战学习笔记18-优化设计TodoView
文章目录 优化设计TodoView修复新增项目无法编辑问题增加了对完成状态的区分增加了选项卡删除功能更新删除请求URI添加删除命令并初始化UI添加删除按钮更改控制器 增加查询结果为空的图片增加转换器修改UI添加资源、命名空间 添加相关元素 增加了根据状态查询的功能Mytodo.Serv…...

Python版day59
503. 下一个更大元素 II 给定一个循环数组 nums ( nums[nums.length - 1] 的下一个元素是 nums[0] ),返回 nums 中每个元素的 下一个更大元素 。 数字 x 的 下一个更大的元素 是按数组遍历顺序,这个数字之后的第一个比它更大的数&…...

[SQL挖掘机] - 算术运算符
在 sql 中,算术运算符主要用于执行数值计算操作,并且在查询语句中具有重要的地位。下面是算术运算符在 sql 中的一些作用和地位: 进行数值计算:算术运算符可以对数值类型的数据进行加减乘除等数值计算操作。例如,可以…...