研发必会-异步编程利器之CompletableFuture(含源码 中)
近期热推文章:
1、springBoot对接kafka,批量、并发、异步获取消息,并动态、批量插入库表;
2、SpringBoot用线程池ThreadPoolTaskExecutor异步处理百万级数据;
3、基于Redis的Geo实现附近商铺搜索(含源码)
4、基于Redis实现关注、取关、共同关注及消息推送(含源码)
5、SpringBoot整合多数据源,并支持动态新增与切换(详细教程)
6、基于Redis实现点赞及排行榜功能
一、多任务组合回调
备注:源码获取方式在文底。
1.1、AND组合关系
thenCombine / thenAcceptBoth / runAfterBoth都表示:将两个CompletableFuture组合起来,只有这两个都正常执行完了,才会执行某个任务。也即:当任务一和任务二都完成再执行任务三(异步任务)。
区别在于:
1、runAfterBoth:不会把执行结果当做方法入参,且没有返回值。
2、thenAcceptBoth:会将两个任务的执行结果作为方法入参,传递到指定方法中,且无返回值。
3、thenCombine:会将两个任务的执行结果作为方法入参,传递到指定方法中,且有返回值。
代码案例:
/**
* 功能描述:多任务组合回调:AND组合关系
* @MethodName: testCompleteAnd
* @MethodParam: []
* @Return: void
* @Author: yyalin
* @CreateDate: 2023/10/11 17:30
*/
public void testCompleteAnd() throws ExecutionException, InterruptedException {
//创建线程池
ExecutorService executorService = Executors.newFixedThreadPool(10);
long startTime = System.currentTimeMillis();
//1、使用自定义线程池,开启异步任务01
CompletableFuture<Integer> supplyAsyncRes01=CompletableFuture.supplyAsync(()->{
int res=1;
try {
//执行任务1 开始执行任务01,当前线程为:12
log.info("开始执行任务01,当前线程为:"+Thread.currentThread().getId());
//执行具体的事务
Thread.sleep(600);
res+=1; //模拟加1
} catch (InterruptedException e) {
e.printStackTrace();
}
//返回结果
return res;
},executorService);
//2、使用自定义线程池,开启异步任务02
CompletableFuture<Integer> supplyAsyncRes02=CompletableFuture.supplyAsync(()->{
int res=1;
try {
//执行任务02 开始执行任务02,当前线程为:13
log.info("开始执行任务02,当前线程为:"+Thread.currentThread().getId());
//执行具体的事务
Thread.sleep(600);
res+=2; //模拟加2
} catch (InterruptedException e) {
e.printStackTrace();
}
//返回结果
return res;
});
//3、任务02:将任务1与任务2开始任务组合
CompletableFuture<Integer> thenCombineAsyncRes=supplyAsyncRes01.thenCombineAsync(supplyAsyncRes02,(res01, res02)->{
//始执行任务03,当前线程为:14
log.info("开始执行任务03,当前线程为:"+Thread.currentThread().getId());
log.info("任务01返回值:"+res01);
log.info("任务02返回值:"+res02);
//任务组合返回值 可以拿到任务01和任务02的返回结果进行相关操作,然后统一返回结果
return res01+res02;
},executorService);
//4、最终返回结果
log.info("最终返回结果为:"+thenCombineAsyncRes.get());
log.info("总共用时" + (System.currentTimeMillis() - startTime) + "ms");
}
运行结果:
1.2、OR组合关系
将两个CompletableFuture组合起来,只要其中一个执行完了,就会执行某个任务。(两个任务,只要有一个任务完成,就执行任务三)
区别在于:
1、runAfterEither:不会把执行结果当做方法入参,且没有返回值。
2、acceptEither: 会将已经执行完成的任务,作为方法入参,传递到指定方法中,且无返回值。
3、applyToEither:会将已经执行完成的任务,作为方法入参,传递到指定方法中,且有返回值。(个人推荐)
参考代码:
/**
* 功能描述:OR组合关系
* @MethodName: testCompleteOr
* @MethodParam: []
* @Return: void
* @Author: yyalin
* @CreateDate: 2023/10/11 18:14
*/
public void testCompleteOr(){
//创建线程池
ExecutorService executorService = Executors.newFixedThreadPool(10);
long startTime = System.currentTimeMillis();
//1、使用自定义线程池,开启异步任务01
CompletableFuture<Integer> supplyAsyncRes01=CompletableFuture.supplyAsync(()->{
int res=1;
try {
//执行任务1 开始执行任务01,当前线程为:12
log.info("开始执行任务01,当前线程为:"+Thread.currentThread().getId());
//执行具体的事务
Thread.sleep(600);
res+=2; //模拟加1
} catch (InterruptedException e) {
e.printStackTrace();
}
//返回结果
return res;
},executorService);
//2、使用自定义线程池,开启异步任务02
CompletableFuture<Integer> supplyAsyncRes02=CompletableFuture.supplyAsync(()->{
int res=1;
try {
//执行任务02 开始执行任务02,当前线程为:13
log.info("开始执行任务02,当前线程为:"+Thread.currentThread().getId());
//执行具体的事务
Thread.sleep(600);
res+=3; //模拟加2
} catch (InterruptedException e) {
e.printStackTrace();
}
//返回结果
return res;
},executorService);
//3、任务组合or
supplyAsyncRes01.acceptEitherAsync(supplyAsyncRes02,(res)->{
try {
log.info("开始执行任务03,当前线程为:"+Thread.currentThread().getId());
//执行具体的事务
Thread.sleep(600);
log.info("上一个任务返回值:"+res);
log.info("总共用时" + (System.currentTimeMillis() - startTime) + "ms");
} catch (InterruptedException e) {
e.printStackTrace();
}
},executorService);
}
返回结果:
若将异步任务02中的Thread.sleep(600)改为300,将输出的结果为:
从结果中不难对比发现,任务03的参数是任务01和任务02中执行最快的返回结果。
注意:若把核心线程数量改为1,会是什么样的呢?
ExecutorService executorService = Executors.newFixedThreadPool(1);
运行结果:
从上面看出,改为1就变成单线程执行了。
1.3、多任务组合(allOf\anyOf)
1.allOf:等待所有任务都执行完成后,才会执行 allOf 返回的CompletableFuture。如果任意一个任务异常,allOf的CompletableFuture,执行get方法,会抛出异常。(等待所有任务完成才会执行)
2.anyOf:任意一个任务执行完,就执行anyOf返回的CompletableFuture。如果执行的任务异常,anyOf的CompletableFuture,执行get方法,会抛出异常。(只要有一个任务完成)
参考案例:
public void testAllOfOrAnyOf() throws ExecutionException, InterruptedException {
//创建线程池
ExecutorService executorService = Executors.newFixedThreadPool(10);
long startTime = System.currentTimeMillis();
//1、使用自定义线程池,开启异步任务01
CompletableFuture<Integer> supplyAsyncRes01=CompletableFuture.supplyAsync(()->{
int res=1;
try {
//执行任务1 开始执行任务01,当前线程为:12
log.info("开始执行任务01,当前线程为:"+Thread.currentThread().getId());
//执行具体的事务
Thread.sleep(600);
res+=3; //模拟加1
} catch (InterruptedException e) {
e.printStackTrace();
}
//返回结果
return res;
},executorService);
//2、使用自定义线程池,开启异步任务02
CompletableFuture<Integer> supplyAsyncRes02=CompletableFuture.supplyAsync(()->{
int res=1;
try {
//执行任务02 开始执行任务02,当前线程为:13
log.info("开始执行任务02,当前线程为:"+Thread.currentThread().getId());
//执行具体的事务
Thread.sleep(600);
res+=4; //模拟加2
} catch (InterruptedException e) {
e.printStackTrace();
}
//返回结果
return res;
},executorService);
//3、使用自定义线程池,开启异步任务03
CompletableFuture<Integer> supplyAsyncRes03=CompletableFuture.supplyAsync(()->{
int res=1;
try {
//执行任务02 开始执行任务02,当前线程为:13
log.info("开始执行任务03,当前线程为:"+Thread.currentThread().getId());
//执行具体的事务
Thread.sleep(600);
res+=5; //模拟加2
} catch (InterruptedException e) {
e.printStackTrace();
}
//返回结果
return res;
},executorService);
//4、开始任务组合
CompletableFuture<Void> allOfRes=CompletableFuture.allOf(supplyAsyncRes01,supplyAsyncRes02,supplyAsyncRes03);
//等待所有任务完成
log.info("所有任务执行完成,组合后返回结果为:"+allOfRes.get());
//获取所有任务的返回结果
log.info("任务01返回值:"+supplyAsyncRes01.get());
log.info("任务02返回值:"+supplyAsyncRes02.get());
log.info("任务03返回值:"+supplyAsyncRes03.get());
log.info("总共用时" + (System.currentTimeMillis() - startTime) + "ms");
}
结果返回:
从结果中看出:等待所有任务都执行完成后,才会执行 allOf 返回的CompletableFuture。
同理anyOf,只需要调整代码:
CompletableFuture<Object> allOfRes=CompletableFuture.anyOf(supplyAsyncRes01,supplyAsyncRes02,supplyAsyncRes03);
运行结果:
1.4、thenCompose
thenCompose方法会在某个任务执行完成后,将该任务的执行结果,作为方法入参,去执行指定的方法。该方法会返回一个新的CompletableFuture实例。
1、如果该CompletableFuture实例的result不为null,则返回一个基于该result新的CompletableFuture实例;
2、如果该CompletableFuture实例为null,然后就执行这个新任务。
代码案例:
/**
* 功能描述:thenCompose
* @MethodName: testThenCompose
* @MethodParam: []
* @Return: void
* @Author: yyalin
* @CreateDate: 2023/10/12 9:38
*/
public void testThenCompose() throws ExecutionException, InterruptedException {
CompletableFuture<String> res01=CompletableFuture.completedFuture("任务01");
ExecutorService executor = Executors.newSingleThreadExecutor();
//第二个任务 在某个任务执行完成后,将该任务的执行结果,作为方法入参,去执行指定的方法,
// 该方法会返回一个新的CompletableFuture实例。
CompletableFuture<String> futureRes =CompletableFuture.supplyAsync(()-> "第二个任务02"
,executor).thenComposeAsync(data->{
log.info("data数据为:"+data);
return res01;
},executor);
log.info("最终返回:"+futureRes.get());
executor.shutdown();
}
结果:
二、使用注意点
CompletableFuture 使异步编程更加便利的、代码更加优雅的同时,也要关注使用的一些注意点。
2.1、Future需要获取返回值,才能获取异常信息
代码案例:
/**
* 功能描述:使用注意点
* @MethodName: testFuture
* @MethodParam: []
* @Return: void
* @Author: yyalin
* @CreateDate: 2023/10/12 9:54
*/
public void testFuture() throws ExecutionException, InterruptedException {
//自定义线程池
ExecutorService executorService = new ThreadPoolExecutor(
5,
10,
5L,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(10));
//创建任务
CompletableFuture<Void> res01=CompletableFuture.supplyAsync(()->{
int sum=1/0;
return "分母不能为0";
},executorService).thenAccept((res)->{ //3、异常捕获
log.info("系统出现异常,需要处理:"+res);
});
log.info("返回结果:"+res01.get());
}
输出结果:
Future需要获取返回值(res01.get()),才能获取到异常信息。如果不加 get()/join()方法,看不到异常信息。使用的时候,注意一下,考虑是否加try…catch…或者使用exceptionally方法。
若改成exceptionally方法,无需get或join也可以捕获异常信息:
CompletableFuture<String> res01=CompletableFuture.supplyAsync(()->{
int sum=1/0;
return "分母不能为0";
},executorService).exceptionally((throwable)->{ //3、异常捕获
log.info("系统出现异常,需要处理:"+throwable.getMessage());
return "00";
});
// log.info("返回结果:"+res01.get());
结果:
2.2、CompletableFuture的get()方法是阻塞的
CompletableFuture的get()方法是阻塞的,如果使用它来获取异步调用的返回值,需要添加超时时间。
推荐使用:
log.info("返回结果:"+res01.get(5,TimeUnit.SECONDS));
2.3、建议使用自定义线程池,不要使用默认的
CompletableFuture代码中使用了默认的线程池,处理的线程个数是电脑CPU核数-1。在大量请求过来的时候,处理逻辑复杂的话,响应会很慢。一般建议使用自定义线程池,优化线程池配置参数。
参考案例:
//自定义线程池
ExecutorService executorService = new ThreadPoolExecutor(
5,
10,
5L,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(10));
但是如果线程池拒绝策略是DiscardPolicy或者DiscardOldestPolicy,当线程池饱和时,会直接丢弃任务,不会抛弃异常。因此建议,CompletableFuture线程池策略最好使用AbortPolicy,然后耗时的异步线程,做好线程池隔离。
/**
* 参数信息:
* int corePoolSize 核心线程大小
* int maximumPoolSize 线程池最大容量大小
* long keepAliveTime 线程空闲时,线程存活的时间
* TimeUnit unit 时间单位
* BlockingQueue<Runnable> workQueue 任务队列。一个阻塞队列
* AbortPolicy(默认):直接抛弃
*/
ThreadPoolExecutor pool = new ThreadPoolExecutor(4,
4,
0L,
TimeUnit.MILLISECONDS,
new LinkedBlockingDeque<>(10),
new ThreadPoolExecutor.AbortPolicy());
说明:
AbortPolicy(默认):直接抛弃
CallerRunsPolicy:用调用者的线程执行任务
DiscardOldestPolicy:抛弃队列中最久的任务
DiscardPolicy:抛弃当前任务。
三、源码获取方式
更多优秀文章,请关注个人微信公众号或搜索“程序猿小杨”查阅。然后回复:源码,可以获取对应的源码,开箱即可使用。
如果大家对相关文章感兴趣,可以关注微信公众号"程序猿小杨",会持续更新优秀文章!欢迎大家 分享、收藏、点赞、在看,您的支持就是我坚持下去的最大动力!谢谢!
参考网站:
https://blog.csdn.net/ThinkWon/article/details/123390393
https://mp.weixin.qq.com/s/shjANruBk6VL492JaWLTEg
相关文章:

研发必会-异步编程利器之CompletableFuture(含源码 中)
近期热推文章: 1、springBoot对接kafka,批量、并发、异步获取消息,并动态、批量插入库表; 2、SpringBoot用线程池ThreadPoolTaskExecutor异步处理百万级数据; 3、基于Redis的Geo实现附近商铺搜索(含源码) 4、基于Redis实现关注、取关、共同关注及消息推送(含源码) 5…...

上海亚商投顾:沪指高开高走 锂电等新能源赛道大反攻
上海亚商投顾前言:无惧大盘涨跌,解密龙虎榜资金,跟踪一线游资和机构资金动向,识别短期热点和强势个股。 一.市场情绪 沪指昨日高开后强势震荡,创业板指盘中一度翻绿,随后探底回升再度走高。碳酸锂期货合约…...

力扣第235题 二又搜索树的最近公共祖先 c++
题目 235. 二叉搜索树的最近公共祖先 中等 (简单) 相关标签 树 深度优先搜索 二叉搜索树 二叉树 给定一个二叉搜索树, 找到该树中两个指定节点的最近公共祖先。 百度百科中最近公共祖先的定义为:“对于有根树 T 的两个结点 p、q&…...

时代风口中的Web3.0基建平台,重新定义Web3.0!
近年来,Web3.0概念的广泛兴起,给加密行业带来了崭新的叙事方式,同时也为加密行业提供了更加具有想象力的应用场景与商业空间,并让越来越多的行业从业者们意识到只有更大众化的市场共性需求才能推动加密市场的持续繁荣。当前围绕这…...

React学习笔记 001
什么是React 1.发送请求获取数据 处理数据(过滤、整理格式等) 3.操作DOM呈现页面 react 主要是负责第三部 操作dom 处理页面 数据渲染为HTML视图的开源js库。 好处 避免dom繁琐 组件化 提升复用率 特点 声明式编程: 简单 组件化编程…...

2023 | github无法访问或速度慢的问题解决方案
github无法访问或速度慢的问题解决方案 前言: 最近经常遇到github无法访问, 或者访问特别慢的问题, 在搜索了一圈解决方案后, 有些不再有效了, 但是其中有几个还特别好用, 总结一下. 首选方案 直接在github.com的域名上加一个fast > githubfast.com, 访问的是与github完全相…...

unity各种插件集合(自用)
2D Animation——2D序列帧/骨骼动画相关 2D PSD Importer——psb骨骼动画(unity官方建议使用psb而非psd) (Advanced —show preview package 勾选)出现 2D IK——反向动力学IK Universal RP——升级项目到Urp(通用渲…...

内网收集哈希传递
1.内网收集的前提 获得一个主机权限 补丁提权 可以使用 systeminfo 然后使用python脚本找到缺少的补丁 下载下来 让后使用exp提权 收集信息 路由信息 补丁接口 dns域看一看是不是域控 扫描别的端口 看看有没有内在的web网站 哈希传递 哈希是啥 哈希…...
前端目录笔记
HTML HTML 笔记:初识 HTML(HTML文本标签、文本列表、嵌入图片、背景色、网页链接)-CSDN博客html 笔记:CSS_UQI-LIUWJ的博客-CSDN博客HTML 笔记 表格_UQI-LIUWJ的博客-CSDN博客 javascript JavaScript 笔记 初识JavaScript&…...

Sui主网升级至V1.11.2版本
Sui主网现已升级至V1.11.2版本,同时Sui协议升级至27版本。其他升级要点如下: 对于一些更高级别的交易,更改了一些gas费设置,使其gas费消耗的更快。这些更改不影响以前在网络上运行的任何交易,只是为了确保在开始大量使…...
Mysql-数据库和数据表的基本操作
Mysql数据库和数据表的基本操作 一.数据库 1.创建数据库 创建数据库就是在数据库系统中划分一块空间存储数据 (1)语法 create database 数据库名称;(2)查看数据库 show create database 数据库名;(3)…...

拓扑排序求最长路
P1807 最长路 - 洛谷 | 计算机科学教育新生态 (luogu.com.cn) 题目要求我们求出第1号到第n号节点之间最长的距离。 我们想到使用拓扑排序来求最长路。 正常来讲,我们应该把1号节点入队列,再出队列,把一号节点能到达的所有的点的入度减一&a…...

sqli-lab靶场通关
文章目录 less-1less-2less-3less-4less-5less-6less-7less-8less-9less-10 less-1 1、提示输入参数id,且值为数字; 2、判断是否存在注入点 id1报错,说明存在 SQL注入漏洞。 3、判断字符型还是数字型 id1 and 11 --id1 and 12 --id1&quo…...

使用 Apache Camel 和 Quarkus 的微服务(五)
【squids.cn】 全网zui低价RDS,免费的迁移工具DBMotion、数据库备份工具DBTwin、SQL开发工具等 在本系列的第三部分中,我们了解了如何在 Minikube 中部署基于 Quarkus/Camel 的微服务,这是最常用的 Kubernetes 本地实现之一。虽然这样的本地…...

Ubuntu磁盘满了,导致黑屏
前言 (1)最近要玩Milk-V Duo,配置环境过程中,发现磁盘小了。于是退出虚拟机,扩大Ubuntu大小,重新开机,发现无法进入Ubuntu界面。 (2)查了很久,后面发现是磁盘…...
安装sklearn包错误解决以及 scikit-learn简介
安装sklearn包错误解决以及 scikit-learn简介 利用 pip install sklearn时出现错误 pip install sklearn Looking in indexes: https://mirrors.aliyun.com/pypi/simple/ Collecting sklearnUsing cached https://mirrors.aliyun.com/pypi/packages/b9/0e/b2a4cfaa9e12b9ca4…...

CSS点击切换或隐藏盒子的卷起、展开效果
<template><div class"main"><el-button click"onCllick">切换</el-button><transition name"slideDown"><div class"info" v-if"isShow">1111</div></transition></di…...
关于信息安全软考的一些记录1
1、网络信息安全的基本属性 机密性:网络信息不泄露给非授权的用户完整性:未经授权必能改的特性可用性:可以及时获取网络信息和服务的特性可控性:责任主体对网络信息系统具有管理、支配的能力【可管理、可支配】扛抵赖性ÿ…...

如何选择UMLChina服务
服务口号:聚焦最后一公里 斐力庇第斯从马拉松跑回雅典报信,虽然已是满身血迹、精疲力尽,但他知道:没有出现在雅典人民面前,前面的路程都是白费。 学到的知识如果不能最终【用】于您自己的项目之中,也同样是…...
关于信息安全软考的记录3
1、网络安全体系的特征 网络安全体系:网络安全保障系统的最高层概念抽象 特征内容整体性网络安全单元按照一定的规则,相互依赖、相互作用而形成人机物一体化的网络安全保护方式协同性通过各种安全机制的相互协作,构建系统性的网络安全保护方…...

Linux应用开发之网络套接字编程(实例篇)
服务端与客户端单连接 服务端代码 #include <sys/socket.h> #include <sys/types.h> #include <netinet/in.h> #include <stdio.h> #include <stdlib.h> #include <string.h> #include <arpa/inet.h> #include <pthread.h> …...
在软件开发中正确使用MySQL日期时间类型的深度解析
在日常软件开发场景中,时间信息的存储是底层且核心的需求。从金融交易的精确记账时间、用户操作的行为日志,到供应链系统的物流节点时间戳,时间数据的准确性直接决定业务逻辑的可靠性。MySQL作为主流关系型数据库,其日期时间类型的…...
树莓派超全系列教程文档--(62)使用rpicam-app通过网络流式传输视频
使用rpicam-app通过网络流式传输视频 使用 rpicam-app 通过网络流式传输视频UDPTCPRTSPlibavGStreamerRTPlibcamerasrc GStreamer 元素 文章来源: http://raspberry.dns8844.cn/documentation 原文网址 使用 rpicam-app 通过网络流式传输视频 本节介绍来自 rpica…...

基于距离变化能量开销动态调整的WSN低功耗拓扑控制开销算法matlab仿真
目录 1.程序功能描述 2.测试软件版本以及运行结果展示 3.核心程序 4.算法仿真参数 5.算法理论概述 6.参考文献 7.完整程序 1.程序功能描述 通过动态调整节点通信的能量开销,平衡网络负载,延长WSN生命周期。具体通过建立基于距离的能量消耗模型&am…...
FFmpeg 低延迟同屏方案
引言 在实时互动需求激增的当下,无论是在线教育中的师生同屏演示、远程办公的屏幕共享协作,还是游戏直播的画面实时传输,低延迟同屏已成为保障用户体验的核心指标。FFmpeg 作为一款功能强大的多媒体框架,凭借其灵活的编解码、数据…...

【入坑系列】TiDB 强制索引在不同库下不生效问题
文章目录 背景SQL 优化情况线上SQL运行情况分析怀疑1:执行计划绑定问题?尝试:SHOW WARNINGS 查看警告探索 TiDB 的 USE_INDEX 写法Hint 不生效问题排查解决参考背景 项目中使用 TiDB 数据库,并对 SQL 进行优化了,添加了强制索引。 UAT 环境已经生效,但 PROD 环境强制索…...
STM32+rt-thread判断是否联网
一、根据NETDEV_FLAG_INTERNET_UP位判断 static bool is_conncected(void) {struct netdev *dev RT_NULL;dev netdev_get_first_by_flags(NETDEV_FLAG_INTERNET_UP);if (dev RT_NULL){printf("wait netdev internet up...");return false;}else{printf("loc…...

汽车生产虚拟实训中的技能提升与生产优化
在制造业蓬勃发展的大背景下,虚拟教学实训宛如一颗璀璨的新星,正发挥着不可或缺且日益凸显的关键作用,源源不断地为企业的稳健前行与创新发展注入磅礴强大的动力。就以汽车制造企业这一极具代表性的行业主体为例,汽车生产线上各类…...

HBuilderX安装(uni-app和小程序开发)
下载HBuilderX 访问官方网站:https://www.dcloud.io/hbuilderx.html 根据您的操作系统选择合适版本: Windows版(推荐下载标准版) Windows系统安装步骤 运行安装程序: 双击下载的.exe安装文件 如果出现安全提示&…...

DBAPI如何优雅的获取单条数据
API如何优雅的获取单条数据 案例一 对于查询类API,查询的是单条数据,比如根据主键ID查询用户信息,sql如下: select id, name, age from user where id #{id}API默认返回的数据格式是多条的,如下: {&qu…...