研发必会-异步编程利器之CompletableFuture(含源码 中)
近期热推文章:
1、springBoot对接kafka,批量、并发、异步获取消息,并动态、批量插入库表;
2、SpringBoot用线程池ThreadPoolTaskExecutor异步处理百万级数据;
3、基于Redis的Geo实现附近商铺搜索(含源码)
4、基于Redis实现关注、取关、共同关注及消息推送(含源码)
5、SpringBoot整合多数据源,并支持动态新增与切换(详细教程)
6、基于Redis实现点赞及排行榜功能
一、多任务组合回调
备注:源码获取方式在文底。
1.1、AND组合关系
thenCombine / thenAcceptBoth / runAfterBoth都表示:将两个CompletableFuture组合起来,只有这两个都正常执行完了,才会执行某个任务。也即:当任务一和任务二都完成再执行任务三(异步任务)。
区别在于:
1、runAfterBoth:不会把执行结果当做方法入参,且没有返回值。
2、thenAcceptBoth:会将两个任务的执行结果作为方法入参,传递到指定方法中,且无返回值。
3、thenCombine:会将两个任务的执行结果作为方法入参,传递到指定方法中,且有返回值。
代码案例:
/**
* 功能描述:多任务组合回调:AND组合关系
* @MethodName: testCompleteAnd
* @MethodParam: []
* @Return: void
* @Author: yyalin
* @CreateDate: 2023/10/11 17:30
*/
public void testCompleteAnd() throws ExecutionException, InterruptedException {
//创建线程池
ExecutorService executorService = Executors.newFixedThreadPool(10);
long startTime = System.currentTimeMillis();
//1、使用自定义线程池,开启异步任务01
CompletableFuture<Integer> supplyAsyncRes01=CompletableFuture.supplyAsync(()->{
int res=1;
try {
//执行任务1 开始执行任务01,当前线程为:12
log.info("开始执行任务01,当前线程为:"+Thread.currentThread().getId());
//执行具体的事务
Thread.sleep(600);
res+=1; //模拟加1
} catch (InterruptedException e) {
e.printStackTrace();
}
//返回结果
return res;
},executorService);
//2、使用自定义线程池,开启异步任务02
CompletableFuture<Integer> supplyAsyncRes02=CompletableFuture.supplyAsync(()->{
int res=1;
try {
//执行任务02 开始执行任务02,当前线程为:13
log.info("开始执行任务02,当前线程为:"+Thread.currentThread().getId());
//执行具体的事务
Thread.sleep(600);
res+=2; //模拟加2
} catch (InterruptedException e) {
e.printStackTrace();
}
//返回结果
return res;
});
//3、任务02:将任务1与任务2开始任务组合
CompletableFuture<Integer> thenCombineAsyncRes=supplyAsyncRes01.thenCombineAsync(supplyAsyncRes02,(res01, res02)->{
//始执行任务03,当前线程为:14
log.info("开始执行任务03,当前线程为:"+Thread.currentThread().getId());
log.info("任务01返回值:"+res01);
log.info("任务02返回值:"+res02);
//任务组合返回值 可以拿到任务01和任务02的返回结果进行相关操作,然后统一返回结果
return res01+res02;
},executorService);
//4、最终返回结果
log.info("最终返回结果为:"+thenCombineAsyncRes.get());
log.info("总共用时" + (System.currentTimeMillis() - startTime) + "ms");
}
运行结果:
1.2、OR组合关系
将两个CompletableFuture组合起来,只要其中一个执行完了,就会执行某个任务。(两个任务,只要有一个任务完成,就执行任务三)
区别在于:
1、runAfterEither:不会把执行结果当做方法入参,且没有返回值。
2、acceptEither: 会将已经执行完成的任务,作为方法入参,传递到指定方法中,且无返回值。
3、applyToEither:会将已经执行完成的任务,作为方法入参,传递到指定方法中,且有返回值。(个人推荐)
参考代码:
/**
* 功能描述:OR组合关系
* @MethodName: testCompleteOr
* @MethodParam: []
* @Return: void
* @Author: yyalin
* @CreateDate: 2023/10/11 18:14
*/
public void testCompleteOr(){
//创建线程池
ExecutorService executorService = Executors.newFixedThreadPool(10);
long startTime = System.currentTimeMillis();
//1、使用自定义线程池,开启异步任务01
CompletableFuture<Integer> supplyAsyncRes01=CompletableFuture.supplyAsync(()->{
int res=1;
try {
//执行任务1 开始执行任务01,当前线程为:12
log.info("开始执行任务01,当前线程为:"+Thread.currentThread().getId());
//执行具体的事务
Thread.sleep(600);
res+=2; //模拟加1
} catch (InterruptedException e) {
e.printStackTrace();
}
//返回结果
return res;
},executorService);
//2、使用自定义线程池,开启异步任务02
CompletableFuture<Integer> supplyAsyncRes02=CompletableFuture.supplyAsync(()->{
int res=1;
try {
//执行任务02 开始执行任务02,当前线程为:13
log.info("开始执行任务02,当前线程为:"+Thread.currentThread().getId());
//执行具体的事务
Thread.sleep(600);
res+=3; //模拟加2
} catch (InterruptedException e) {
e.printStackTrace();
}
//返回结果
return res;
},executorService);
//3、任务组合or
supplyAsyncRes01.acceptEitherAsync(supplyAsyncRes02,(res)->{
try {
log.info("开始执行任务03,当前线程为:"+Thread.currentThread().getId());
//执行具体的事务
Thread.sleep(600);
log.info("上一个任务返回值:"+res);
log.info("总共用时" + (System.currentTimeMillis() - startTime) + "ms");
} catch (InterruptedException e) {
e.printStackTrace();
}
},executorService);
}
返回结果:
若将异步任务02中的Thread.sleep(600)改为300,将输出的结果为:
从结果中不难对比发现,任务03的参数是任务01和任务02中执行最快的返回结果。
注意:若把核心线程数量改为1,会是什么样的呢?
ExecutorService executorService = Executors.newFixedThreadPool(1);
运行结果:
从上面看出,改为1就变成单线程执行了。
1.3、多任务组合(allOf\anyOf)
1.allOf:等待所有任务都执行完成后,才会执行 allOf 返回的CompletableFuture。如果任意一个任务异常,allOf的CompletableFuture,执行get方法,会抛出异常。(等待所有任务完成才会执行)
2.anyOf:任意一个任务执行完,就执行anyOf返回的CompletableFuture。如果执行的任务异常,anyOf的CompletableFuture,执行get方法,会抛出异常。(只要有一个任务完成)
参考案例:
public void testAllOfOrAnyOf() throws ExecutionException, InterruptedException {
//创建线程池
ExecutorService executorService = Executors.newFixedThreadPool(10);
long startTime = System.currentTimeMillis();
//1、使用自定义线程池,开启异步任务01
CompletableFuture<Integer> supplyAsyncRes01=CompletableFuture.supplyAsync(()->{
int res=1;
try {
//执行任务1 开始执行任务01,当前线程为:12
log.info("开始执行任务01,当前线程为:"+Thread.currentThread().getId());
//执行具体的事务
Thread.sleep(600);
res+=3; //模拟加1
} catch (InterruptedException e) {
e.printStackTrace();
}
//返回结果
return res;
},executorService);
//2、使用自定义线程池,开启异步任务02
CompletableFuture<Integer> supplyAsyncRes02=CompletableFuture.supplyAsync(()->{
int res=1;
try {
//执行任务02 开始执行任务02,当前线程为:13
log.info("开始执行任务02,当前线程为:"+Thread.currentThread().getId());
//执行具体的事务
Thread.sleep(600);
res+=4; //模拟加2
} catch (InterruptedException e) {
e.printStackTrace();
}
//返回结果
return res;
},executorService);
//3、使用自定义线程池,开启异步任务03
CompletableFuture<Integer> supplyAsyncRes03=CompletableFuture.supplyAsync(()->{
int res=1;
try {
//执行任务02 开始执行任务02,当前线程为:13
log.info("开始执行任务03,当前线程为:"+Thread.currentThread().getId());
//执行具体的事务
Thread.sleep(600);
res+=5; //模拟加2
} catch (InterruptedException e) {
e.printStackTrace();
}
//返回结果
return res;
},executorService);
//4、开始任务组合
CompletableFuture<Void> allOfRes=CompletableFuture.allOf(supplyAsyncRes01,supplyAsyncRes02,supplyAsyncRes03);
//等待所有任务完成
log.info("所有任务执行完成,组合后返回结果为:"+allOfRes.get());
//获取所有任务的返回结果
log.info("任务01返回值:"+supplyAsyncRes01.get());
log.info("任务02返回值:"+supplyAsyncRes02.get());
log.info("任务03返回值:"+supplyAsyncRes03.get());
log.info("总共用时" + (System.currentTimeMillis() - startTime) + "ms");
}
结果返回:
从结果中看出:等待所有任务都执行完成后,才会执行 allOf 返回的CompletableFuture。
同理anyOf,只需要调整代码:
CompletableFuture<Object> allOfRes=CompletableFuture.anyOf(supplyAsyncRes01,supplyAsyncRes02,supplyAsyncRes03);
运行结果:
1.4、thenCompose
thenCompose方法会在某个任务执行完成后,将该任务的执行结果,作为方法入参,去执行指定的方法。该方法会返回一个新的CompletableFuture实例。
1、如果该CompletableFuture实例的result不为null,则返回一个基于该result新的CompletableFuture实例;
2、如果该CompletableFuture实例为null,然后就执行这个新任务。
代码案例:
/**
* 功能描述:thenCompose
* @MethodName: testThenCompose
* @MethodParam: []
* @Return: void
* @Author: yyalin
* @CreateDate: 2023/10/12 9:38
*/
public void testThenCompose() throws ExecutionException, InterruptedException {
CompletableFuture<String> res01=CompletableFuture.completedFuture("任务01");
ExecutorService executor = Executors.newSingleThreadExecutor();
//第二个任务 在某个任务执行完成后,将该任务的执行结果,作为方法入参,去执行指定的方法,
// 该方法会返回一个新的CompletableFuture实例。
CompletableFuture<String> futureRes =CompletableFuture.supplyAsync(()-> "第二个任务02"
,executor).thenComposeAsync(data->{
log.info("data数据为:"+data);
return res01;
},executor);
log.info("最终返回:"+futureRes.get());
executor.shutdown();
}
结果:
二、使用注意点
CompletableFuture 使异步编程更加便利的、代码更加优雅的同时,也要关注使用的一些注意点。
2.1、Future需要获取返回值,才能获取异常信息
代码案例:
/**
* 功能描述:使用注意点
* @MethodName: testFuture
* @MethodParam: []
* @Return: void
* @Author: yyalin
* @CreateDate: 2023/10/12 9:54
*/
public void testFuture() throws ExecutionException, InterruptedException {
//自定义线程池
ExecutorService executorService = new ThreadPoolExecutor(
5,
10,
5L,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(10));
//创建任务
CompletableFuture<Void> res01=CompletableFuture.supplyAsync(()->{
int sum=1/0;
return "分母不能为0";
},executorService).thenAccept((res)->{ //3、异常捕获
log.info("系统出现异常,需要处理:"+res);
});
log.info("返回结果:"+res01.get());
}
输出结果:
Future需要获取返回值(res01.get()),才能获取到异常信息。如果不加 get()/join()方法,看不到异常信息。使用的时候,注意一下,考虑是否加try…catch…或者使用exceptionally方法。
若改成exceptionally方法,无需get或join也可以捕获异常信息:
CompletableFuture<String> res01=CompletableFuture.supplyAsync(()->{
int sum=1/0;
return "分母不能为0";
},executorService).exceptionally((throwable)->{ //3、异常捕获
log.info("系统出现异常,需要处理:"+throwable.getMessage());
return "00";
});
// log.info("返回结果:"+res01.get());
结果:
2.2、CompletableFuture的get()方法是阻塞的
CompletableFuture的get()方法是阻塞的,如果使用它来获取异步调用的返回值,需要添加超时时间。
推荐使用:
log.info("返回结果:"+res01.get(5,TimeUnit.SECONDS));
2.3、建议使用自定义线程池,不要使用默认的
CompletableFuture代码中使用了默认的线程池,处理的线程个数是电脑CPU核数-1。在大量请求过来的时候,处理逻辑复杂的话,响应会很慢。一般建议使用自定义线程池,优化线程池配置参数。
参考案例:
//自定义线程池
ExecutorService executorService = new ThreadPoolExecutor(
5,
10,
5L,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(10));
但是如果线程池拒绝策略是DiscardPolicy或者DiscardOldestPolicy,当线程池饱和时,会直接丢弃任务,不会抛弃异常。因此建议,CompletableFuture线程池策略最好使用AbortPolicy,然后耗时的异步线程,做好线程池隔离。
/**
* 参数信息:
* int corePoolSize 核心线程大小
* int maximumPoolSize 线程池最大容量大小
* long keepAliveTime 线程空闲时,线程存活的时间
* TimeUnit unit 时间单位
* BlockingQueue<Runnable> workQueue 任务队列。一个阻塞队列
* AbortPolicy(默认):直接抛弃
*/
ThreadPoolExecutor pool = new ThreadPoolExecutor(4,
4,
0L,
TimeUnit.MILLISECONDS,
new LinkedBlockingDeque<>(10),
new ThreadPoolExecutor.AbortPolicy());
说明:
AbortPolicy(默认):直接抛弃
CallerRunsPolicy:用调用者的线程执行任务
DiscardOldestPolicy:抛弃队列中最久的任务
DiscardPolicy:抛弃当前任务。
三、源码获取方式
更多优秀文章,请关注个人微信公众号或搜索“程序猿小杨”查阅。然后回复:源码,可以获取对应的源码,开箱即可使用。
如果大家对相关文章感兴趣,可以关注微信公众号"程序猿小杨",会持续更新优秀文章!欢迎大家 分享、收藏、点赞、在看,您的支持就是我坚持下去的最大动力!谢谢!
参考网站:
https://blog.csdn.net/ThinkWon/article/details/123390393
https://mp.weixin.qq.com/s/shjANruBk6VL492JaWLTEg
相关文章:

研发必会-异步编程利器之CompletableFuture(含源码 中)
近期热推文章: 1、springBoot对接kafka,批量、并发、异步获取消息,并动态、批量插入库表; 2、SpringBoot用线程池ThreadPoolTaskExecutor异步处理百万级数据; 3、基于Redis的Geo实现附近商铺搜索(含源码) 4、基于Redis实现关注、取关、共同关注及消息推送(含源码) 5…...

上海亚商投顾:沪指高开高走 锂电等新能源赛道大反攻
上海亚商投顾前言:无惧大盘涨跌,解密龙虎榜资金,跟踪一线游资和机构资金动向,识别短期热点和强势个股。 一.市场情绪 沪指昨日高开后强势震荡,创业板指盘中一度翻绿,随后探底回升再度走高。碳酸锂期货合约…...

力扣第235题 二又搜索树的最近公共祖先 c++
题目 235. 二叉搜索树的最近公共祖先 中等 (简单) 相关标签 树 深度优先搜索 二叉搜索树 二叉树 给定一个二叉搜索树, 找到该树中两个指定节点的最近公共祖先。 百度百科中最近公共祖先的定义为:“对于有根树 T 的两个结点 p、q&…...

时代风口中的Web3.0基建平台,重新定义Web3.0!
近年来,Web3.0概念的广泛兴起,给加密行业带来了崭新的叙事方式,同时也为加密行业提供了更加具有想象力的应用场景与商业空间,并让越来越多的行业从业者们意识到只有更大众化的市场共性需求才能推动加密市场的持续繁荣。当前围绕这…...

React学习笔记 001
什么是React 1.发送请求获取数据 处理数据(过滤、整理格式等) 3.操作DOM呈现页面 react 主要是负责第三部 操作dom 处理页面 数据渲染为HTML视图的开源js库。 好处 避免dom繁琐 组件化 提升复用率 特点 声明式编程: 简单 组件化编程…...

2023 | github无法访问或速度慢的问题解决方案
github无法访问或速度慢的问题解决方案 前言: 最近经常遇到github无法访问, 或者访问特别慢的问题, 在搜索了一圈解决方案后, 有些不再有效了, 但是其中有几个还特别好用, 总结一下. 首选方案 直接在github.com的域名上加一个fast > githubfast.com, 访问的是与github完全相…...

unity各种插件集合(自用)
2D Animation——2D序列帧/骨骼动画相关 2D PSD Importer——psb骨骼动画(unity官方建议使用psb而非psd) (Advanced —show preview package 勾选)出现 2D IK——反向动力学IK Universal RP——升级项目到Urp(通用渲…...

内网收集哈希传递
1.内网收集的前提 获得一个主机权限 补丁提权 可以使用 systeminfo 然后使用python脚本找到缺少的补丁 下载下来 让后使用exp提权 收集信息 路由信息 补丁接口 dns域看一看是不是域控 扫描别的端口 看看有没有内在的web网站 哈希传递 哈希是啥 哈希…...
前端目录笔记
HTML HTML 笔记:初识 HTML(HTML文本标签、文本列表、嵌入图片、背景色、网页链接)-CSDN博客html 笔记:CSS_UQI-LIUWJ的博客-CSDN博客HTML 笔记 表格_UQI-LIUWJ的博客-CSDN博客 javascript JavaScript 笔记 初识JavaScript&…...

Sui主网升级至V1.11.2版本
Sui主网现已升级至V1.11.2版本,同时Sui协议升级至27版本。其他升级要点如下: 对于一些更高级别的交易,更改了一些gas费设置,使其gas费消耗的更快。这些更改不影响以前在网络上运行的任何交易,只是为了确保在开始大量使…...
Mysql-数据库和数据表的基本操作
Mysql数据库和数据表的基本操作 一.数据库 1.创建数据库 创建数据库就是在数据库系统中划分一块空间存储数据 (1)语法 create database 数据库名称;(2)查看数据库 show create database 数据库名;(3)…...

拓扑排序求最长路
P1807 最长路 - 洛谷 | 计算机科学教育新生态 (luogu.com.cn) 题目要求我们求出第1号到第n号节点之间最长的距离。 我们想到使用拓扑排序来求最长路。 正常来讲,我们应该把1号节点入队列,再出队列,把一号节点能到达的所有的点的入度减一&a…...

sqli-lab靶场通关
文章目录 less-1less-2less-3less-4less-5less-6less-7less-8less-9less-10 less-1 1、提示输入参数id,且值为数字; 2、判断是否存在注入点 id1报错,说明存在 SQL注入漏洞。 3、判断字符型还是数字型 id1 and 11 --id1 and 12 --id1&quo…...

使用 Apache Camel 和 Quarkus 的微服务(五)
【squids.cn】 全网zui低价RDS,免费的迁移工具DBMotion、数据库备份工具DBTwin、SQL开发工具等 在本系列的第三部分中,我们了解了如何在 Minikube 中部署基于 Quarkus/Camel 的微服务,这是最常用的 Kubernetes 本地实现之一。虽然这样的本地…...

Ubuntu磁盘满了,导致黑屏
前言 (1)最近要玩Milk-V Duo,配置环境过程中,发现磁盘小了。于是退出虚拟机,扩大Ubuntu大小,重新开机,发现无法进入Ubuntu界面。 (2)查了很久,后面发现是磁盘…...
安装sklearn包错误解决以及 scikit-learn简介
安装sklearn包错误解决以及 scikit-learn简介 利用 pip install sklearn时出现错误 pip install sklearn Looking in indexes: https://mirrors.aliyun.com/pypi/simple/ Collecting sklearnUsing cached https://mirrors.aliyun.com/pypi/packages/b9/0e/b2a4cfaa9e12b9ca4…...

CSS点击切换或隐藏盒子的卷起、展开效果
<template><div class"main"><el-button click"onCllick">切换</el-button><transition name"slideDown"><div class"info" v-if"isShow">1111</div></transition></di…...
关于信息安全软考的一些记录1
1、网络信息安全的基本属性 机密性:网络信息不泄露给非授权的用户完整性:未经授权必能改的特性可用性:可以及时获取网络信息和服务的特性可控性:责任主体对网络信息系统具有管理、支配的能力【可管理、可支配】扛抵赖性ÿ…...

如何选择UMLChina服务
服务口号:聚焦最后一公里 斐力庇第斯从马拉松跑回雅典报信,虽然已是满身血迹、精疲力尽,但他知道:没有出现在雅典人民面前,前面的路程都是白费。 学到的知识如果不能最终【用】于您自己的项目之中,也同样是…...
关于信息安全软考的记录3
1、网络安全体系的特征 网络安全体系:网络安全保障系统的最高层概念抽象 特征内容整体性网络安全单元按照一定的规则,相互依赖、相互作用而形成人机物一体化的网络安全保护方式协同性通过各种安全机制的相互协作,构建系统性的网络安全保护方…...

突破不可导策略的训练难题:零阶优化与强化学习的深度嵌合
强化学习(Reinforcement Learning, RL)是工业领域智能控制的重要方法。它的基本原理是将最优控制问题建模为马尔可夫决策过程,然后使用强化学习的Actor-Critic机制(中文译作“知行互动”机制),逐步迭代求解…...
Python如何给视频添加音频和字幕
在Python中,给视频添加音频和字幕可以使用电影文件处理库MoviePy和字幕处理库Subtitles。下面将详细介绍如何使用这些库来实现视频的音频和字幕添加,包括必要的代码示例和详细解释。 环境准备 在开始之前,需要安装以下Python库:…...
return this;返回的是谁
一个审批系统的示例来演示责任链模式的实现。假设公司需要处理不同金额的采购申请,不同级别的经理有不同的审批权限: // 抽象处理者:审批者 abstract class Approver {protected Approver successor; // 下一个处理者// 设置下一个处理者pub…...

【电力电子】基于STM32F103C8T6单片机双极性SPWM逆变(硬件篇)
本项目是基于 STM32F103C8T6 微控制器的 SPWM(正弦脉宽调制)电源模块,能够生成可调频率和幅值的正弦波交流电源输出。该项目适用于逆变器、UPS电源、变频器等应用场景。 供电电源 输入电压采集 上图为本设计的电源电路,图中 D1 为二极管, 其目的是防止正负极电源反接, …...

群晖NAS如何在虚拟机创建飞牛NAS
套件中心下载安装Virtual Machine Manager 创建虚拟机 配置虚拟机 飞牛官网下载 https://iso.liveupdate.fnnas.com/x86_64/trim/fnos-0.9.2-863.iso 群晖NAS如何在虚拟机创建飞牛NAS - 个人信息分享...

TSN交换机正在重构工业网络,PROFINET和EtherCAT会被取代吗?
在工业自动化持续演进的今天,通信网络的角色正变得愈发关键。 2025年6月6日,为期三天的华南国际工业博览会在深圳国际会展中心(宝安)圆满落幕。作为国内工业通信领域的技术型企业,光路科技(Fiberroad&…...
Qt 事件处理中 return 的深入解析
Qt 事件处理中 return 的深入解析 在 Qt 事件处理中,return 语句的使用是另一个关键概念,它与 event->accept()/event->ignore() 密切相关但作用不同。让我们详细分析一下它们之间的关系和工作原理。 核心区别:不同层级的事件处理 方…...

C++_哈希表
本篇文章是对C学习的哈希表部分的学习分享 相信一定会对你有所帮助~ 那咱们废话不多说,直接开始吧! 一、基础概念 1. 哈希核心思想: 哈希函数的作用:通过此函数建立一个Key与存储位置之间的映射关系。理想目标:实现…...

【技巧】dify前端源代码修改第一弹-增加tab页
回到目录 【技巧】dify前端源代码修改第一弹-增加tab页 尝试修改dify的前端源代码,在知识库增加一个tab页"HELLO WORLD",完成后的效果如下 [gif01] 1. 前端代码进入调试模式 参考 【部署】win10的wsl环境下启动dify的web前端服务 启动调试…...

初探用uniapp写微信小程序遇到的问题及解决(vue3+ts)
零、关于开发思路 (一)拿到工作任务,先理清楚需求 1.逻辑部分 不放过原型里说的每一句话,有疑惑的部分该问产品/测试/之前的开发就问 2.页面部分(含国际化) 整体看过需要开发页面的原型后,分类一下哪些组件/样式可以复用,直接提取出来使用 (时间充分的前提下,不…...