当前位置: 首页 > news >正文

【Java异步编程】CompletableFuture基础(1):创建不同线程的子任务、子任务链式调用与异常处理

文章目录

    • 1. 三种实现接口
    • 2. 链式调用:保证链的顺序性与异步性
    • 3. CompletableFuture创建CompletionStage子任务
    • 4. 处理异常
      • a. 创建回调钩子
      • b. 调用handle()方法统一处理异常和结果
    • 5. 如何选择线程池:不同的业务选择不同的线程池

CompletableFuture是JDK 1.8引入的实现类,该类实现了Future和CompletionStage两个接口。该类的实例作为一个异步任务,可以在自己异步执行完成之后触发一些其他的异步任务,从而达到异步回调的效果。

CompletionStage代表异步计算过程中的某一个阶段,一个阶段完成以后可能会进入另一个阶段。一个阶段可以理解为一个子任务,每一个子任务会包装一个Java函数式接口实例,表示该子任务所要执行的操作。

 

1. 三种实现接口

每个CompletionStage子任务所包装的可以是一个Function、Consumer或者Runnable函数式接口实例。

这三个常用的函数式接口的特点如下:

被包装接口功能描述
FunctionFunction接口的特点是:有输入、有输出。包装了Function实例的CompletionStage子任务需要一个输入参数,并会产生一个输出结果到下一步
RunnableRunnable接口的特点是:无输入、无输出。包装了Runnable实例的CompletionStage子任务既不需要任何输入参数,又不会产生任何输出。
ConsumerConsumer接口的特点是:有输入、无输出。包装了Consumer实例的CompletionStage子任务需要一个输入参数,但不会产生任何输出。

 

2. 链式调用:保证链的顺序性与异步性

多个CompletionStage构成了一条任务流水线,一个环节执行完成了可以将结果移交给下一个环节(子任务)​。多个CompletionStage子任务之间可以使用链式调用。

下面是一个顺序调用的例子:

使用 CompletionStage 及其方法构建了一个异步任务链,thenApply 用于对前一个阶段的结果进行计算并传递结果,thenAccept 用于消费前一个阶段的结果并执行操作,thenRun 用于执行无输入输出的操作。

oneStage//被thenApply包装CompletionStage子任务,由输入输出.thenApply(x -> square(x))  //消耗上游输出,但是没有输出.thenAccept(y -> System.out.println(y)) //不消耗上一个子任务的输出又不产生结果.thenRun(() -> System.out.println()) 

这种链式操作可以方便地将多个异步操作连接起来,同时保证了操作的顺序性和异步性,提高了代码的可维护性和并发性能。

 

接下来是一个异步调用的例子:

在这个例子中,task2 和 task3 都依赖于 task1 完成后执行,但它们可能并行执行,也就是说,task2 和 task3 的执行顺序是不确定的,它们不一定会按照 thenRunAsync 的顺序执行。

CompletableFuture<Void> task1 = CompletableFuture.runAsync(() -> {System.out.println("Task 1");
});CompletableFuture<Void> task2 = task1.thenRunAsync(() -> {System.out.println("Task 2");
});CompletableFuture<Void> task3 = task1.thenRunAsync(() -> {System.out.println("Task 3");
});

 

3. CompletableFuture创建CompletionStage子任务

CompletableFuture定义了一组方法用于创建CompletionStage子任务(或者阶段性任务)​,基础的方法如下:

//子任务包装一个Runnable实例,并调用ForkJoinPool.commonPool()线程池来执行
public static CompletableFuture<Void> runAsync(Runnable runnable)//子任务包装一个Runnable实例,并调用指定的executor线程池来执行
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)//子任务包装一个Supplier实例,并调用ForkJoinPool.commonPool()线程池来执行
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)//子任务包装一个Supplier实例,并使用指定的executor线程池来执行
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)

其中主要注意的信息是:

  1. Supplier 表示一个无参数但有返回值的函数,Runnable表示无惨无返回值的函数
  2. 在使用CompletableFuture创建CompletionStage子任务时,如果没有指定Executor线程池,在默认情况下CompletionStage会使用公共的ForkJoinPool线程池。
  3. 它们都会交给线程池执行,get方法会堵塞主线程等待执行结果。

给一个例子:

//无返回值异步调用  
@Test  
public void runAsyncDemo() throws Exception {  CompletableFuture<Void> future = CompletableFuture.runAsync(() ->  {  sleepSeconds(1);//模拟执行1秒  Print.tco("run end ...");  });  //等待异步任务执行完成,最多等待2秒  future.get(2, TimeUnit.SECONDS);  
}  //有返回值异步调用  
@Test  
public void supplyAsyncDemo() throws Exception {  CompletableFuture<Long> future = CompletableFuture.supplyAsync(() ->  {  long start = System.currentTimeMillis();  sleepSeconds(1);//模拟执行1秒  Print.tco("run end ...");  return System.currentTimeMillis() - start;  });  //等待异步任务执行完成,现时等待2秒  long time = future.get(2, TimeUnit.SECONDS);  Print.tco("异步执行耗时(秒) = " + time / 1000);  
}

 

4. 处理异常

a. 创建回调钩子

可以为CompletionStage子任务设置特定的回调钩子,当计算结果完成或者抛出异常的时候,执行这些特定的回调钩子。

//设置子任务完成时的回调钩子
public CompletableFuture<T> whenComplete(BiConsumer<? super T,? super Throwable> action)//设置子任务完成时的回调钩子,可能不在同一线程执行
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action)//设置子任务完成时的回调钩子,提交给线程池executor执行
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action,Executor executor)//设置异常处理的回调钩子
public CompletableFuture<T> exceptionally(Function<Throwable,? extends T> fn)
@Test  
public void whenCompleteDemo() throws Exception {  CompletableFuture<Void> future = CompletableFuture.runAsync(() ->  {  sleepSeconds(1);//模拟执行1秒  Print.tco("抛出异常!");  throw new RuntimeException("发生异常");  //Print.tco("run end ...");  });  //设置执行完成后的回调钩子  future.whenComplete(new BiConsumer<Void, Throwable>() {  @Override  public void accept(Void t, Throwable action) {  Print.tco("执行完成!");  }  });  //设置发生异常后的回调钩子  future.exceptionally(new Function<Throwable, Void>() {  @Override  public Void apply(Throwable t) {  Print.tco("执行失败!" + t.getMessage());  return null;  }  });  future.get();  
}[ForkJoinPool.commonPool-worker-9]:抛出异常!
[main]:执行完成!
[ForkJoinPool.commonPool-worker-9]:执行失败!java.lang.RuntimeException: 发生异常
java.util.concurrent.ExecutionException: java.lang.RuntimeException: 发生异常

有如下几个注意点:

  1. 调用cancel()方法取消CompletableFuture时,任务被视为异常完成,completeExceptionally()方法所设置的异常回调钩子也会被执行。
  2. 如果没有设置异常回调钩子,发生内部异常时会有两种情况发生:
    • 在调用get()时,如果遇到内部异常,get()方法就会抛出ExecutionException(执行异常)​。
    • 在调用join()和getNow(T)启动任务时,如果遇到内部异常,join()和getNow(T)方法就会抛出CompletionException。

 

b. 调用handle()方法统一处理异常和结果

//在执行任务的同一个线程中处理异常和结果
public <U> CompletionStage<U> handle(BiFunction<? super T, Throwable, ? extends U> fn);//可能不在执行任务的**同一个线程**中处理异常和结果
public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn);//在指定线程池executor中处理异常和结果
public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn,Executor executor);
@Test  
public void handleDemo() throws Exception {  CompletableFuture<Void> future = CompletableFuture.runAsync(() ->  {  sleepSeconds(1);//模拟执行1秒  Print.tco("抛出异常!");  throw new RuntimeException("发生异常");  //Print.tco("run end ...");  });  //设置执行完成后的回调钩子  future.handle(new BiFunction<Void, Throwable, Void>() {  @Override  public Void apply(Void input, Throwable throwable) {  if (throwable == null) {  Print.tcfo("没有发生异常!");  } else {  Print.tcfo("sorry,发生了异常!");  }  return null;  }  });  future.get();  
}//
//[ForkJoinPool.commonPool-worker-1]:抛出异常! 
//[ForkJoinPool.commonPool-worker-1|CompletableFutureDemo$3.apply]: sorry,发生了异常!

 

5. 如何选择线程池:不同的业务选择不同的线程池

默认情况下,通过静态方法runAsync()、supplyAsync()创建的CompletableFuture任务会使用公共的ForkJoinPool线程池,默认的线程数是CPU的核数。当然,它的线程数可以通过以下JVM参数设置

     option:-Djava.util.concurrent.ForkJoinPool.common.parallelism

 

如果所有CompletableFuture共享一个线程池,那么一旦有任务执行一些很慢的IO操作,就会导致线程池中的所有线程都阻塞在IO操作上,造成线程饥饿,进而影响整个系统的性能。所以,强烈建议大家根据不同的业务类型创建不同的线程池,以避免互相干扰。

相关文章:

【Java异步编程】CompletableFuture基础(1):创建不同线程的子任务、子任务链式调用与异常处理

文章目录 1. 三种实现接口2. 链式调用&#xff1a;保证链的顺序性与异步性3. CompletableFuture创建CompletionStage子任务4. 处理异常a. 创建回调钩子b. 调用handle()方法统一处理异常和结果 5. 如何选择线程池&#xff1a;不同的业务选择不同的线程池 CompletableFuture是JDK…...

ESXI虚拟机中部署docker会降低服务器性能

在 8 核 16GB 的 ESXi 虚拟机中部署 Docker 的性能影响分析 在 ESXi 虚拟机中运行 Docker 容器时&#xff0c;性能影响主要来自以下几个方面&#xff1a; 虚拟化开销&#xff1a;ESXi 虚拟化层和 Docker 容器化层的叠加。资源竞争&#xff1a;虚拟机与容器之间对 CPU、内存、…...

ASP.NET Core与配置系统的集成

目录 配置系统 默认添加的配置提供者 加载命令行中的配置。 运行环境 读取方法 User Secrets 注意事项 Zack.AnyDBConfigProvider 案例 配置系统 默认添加的配置提供者 加载现有的IConfiguration。加载项目根目录下的appsettings.json。加载项目根目录下的appsettin…...

中间件的概念及基本使用

什么是中间件 中间件是ASP.NET Core的核心组件&#xff0c;MVC框架、响应缓存、身份验证、CORS、Swagger等都是内置中间件。 广义上来讲&#xff1a;Tomcat、WebLogic、Redis、IIS&#xff1b;狭义上来讲&#xff0c;ASP.NET Core中的中间件指ASP.NET Core中的一个组件。中间件…...

SpringBoot 整合 Mybatis:注解版

第一章&#xff1a;注解版 导入配置&#xff1a; <groupId>org.mybatis.spring.boot</groupId><artifactId>mybatis-spring-boot-starter</artifactId><version>1.3.1</version> </dependency> 步骤&#xff1a; 配置数据源见 Druid…...

18.[前端开发]Day18-王者荣耀项目实战(一)

01-06 项目实战 1 代码规范 2 CSS编写顺序 3 组件化开发思想 组件化开发思路 项目整体思路 – 各个击破 07_(掌握)王者荣耀-top-整体布局完成 完整代码 01_page_top1.html <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8…...

Kafka 使用说明(kafka官方文档中文)

文章来源:kafka -- 南京筱麦软件有限公司 第 1 步:获取 KAFKA 下载最新的 Kafka 版本并提取它: $ tar -xzf kafka_{{scalaVersion}}-{{fullDotVersion}}.tgz $ cd kafka_{{scalaVersion}}-{{fullDotVersion}} 第 2 步:启动 KAFKA 环境 注意:您的本地环境必须安装 Java 8+。…...

基于多智能体强化学习的医疗AI中RAG系统程序架构优化研究

一、引言 1.1 研究背景与意义 在数智化医疗飞速发展的当下,医疗人工智能(AI)已成为提升医疗服务质量、优化医疗流程以及推动医学研究进步的关键力量。医疗 AI 借助机器学习、深度学习等先进技术,能够处理和分析海量的医疗数据,从而辅助医生进行疾病诊断、制定治疗方案以…...

Airflow:深入理解Apache Airflow Task

Apache Airflow是一个开源工作流管理平台&#xff0c;支持以编程方式编写、调度和监控工作流。由于其灵活性、可扩展性和强大的社区支持&#xff0c;它已迅速成为编排复杂数据管道的首选工具。在这篇博文中&#xff0c;我们将深入研究Apache Airflow 中的任务概念&#xff0c;探…...

multisim入门学习设计电路

文章目录 1.软件的安装2.电路基本设计2.1二极管的简介2.2最终的设计效果2.3设计流程介绍 3.如何测试电路 1.软件的安装 我是参考的下面的这个文章&#xff0c;文章的链接放在下面&#xff0c;亲测是有效的&#xff0c;如果是小白的话&#xff0c;可以参考一下&#xff1a; 【…...

【算法精练】二分查找算法总结

目录 前言 1. 二分查找&#xff08;基础版&#xff09; 2. 寻找左右端点 循环判断条件 求中间点 总结 前言 说起二分查找&#xff0c;也是一种十分常见的算法&#xff0c;最常听说的就是&#xff1a;二分查找只能在数组有序的场景下使用&#xff1b;其实也未必&#xff0c;…...

从零开始实现一个双向循环链表:C语言实战

文章目录 1链表的再次介绍2为什么选择双向循环链表&#xff1f;3代码实现&#xff1a;从初始化到销毁1. 定义链表节点2. 初始化链表3. 插入和删除节点4. 链表的其他操作5. 打印链表和判断链表是否为空6. 销毁链表 4测试代码5链表种类介绍6链表与顺序表的区别7存储金字塔L0: 寄存…...

MYSQL面试题总结(题目来源JavaGuide)

MYSQL基础架构 问题1&#xff1a;一条 SQL语句在MySQL中的执行过程 1. 解析阶段 (Parsing) 查询分析&#xff1a;当用户提交一个 SQL 语句时&#xff0c;MySQL 首先会对语句进行解析。这个过程会检查语法是否正确&#xff0c;确保 SQL 语句符合 MySQL 的语法规则。如果发现…...

visual studio安装

一、下载Visual Studio 访问Visual Studio官方网站。下载 Visual Studio Tools - 免费安装 Windows、Mac、Linux 在主页上找到并点击“下载 Visual Studio”按钮。 选择适合需求的版本&#xff0c;例如“Visual Studio Community”&#xff08;免费版本&#xff09;&#x…...

JVM执行引擎

一、执行引擎的概述: 执行引擎是]ava虚拟机核心的组成部分之一; “虚拟机”是一个相对于“物理机”的概念&#xff0c;这两种机器都有代码执行能力&#xff0c;其区别是物理机的执行引擎是直接建立在处理器、缓存、指令集和操作系统层面上的&#xff0c;而虚拟机的执行引擎则…...

C# 9.0记录类型:解锁开发效率的魔法密码

一、引言&#xff1a;记录类型的神奇登场 在 C# 的编程世界中&#xff0c;数据结构就像是构建软件大厦的基石&#xff0c;其重要性不言而喻。然而&#xff0c;传统的数据结构定义方式&#xff0c;尤其是在处理简单的数据承载对象时&#xff0c;常常显得繁琐复杂。例如&#xf…...

搭建自己的专属AI——使用Ollama+AnythingLLM+Python实现DeepSeek本地部署

前言 最近DeepSeek模型非常火&#xff0c;其通过对大模型的蒸馏得到的小模型可以较轻松地在个人电脑上运行&#xff0c;这也使得我们有机会在本地构建一个专属于自己的AI&#xff0c;进而把AI“调教”为我们希望的样子。本篇文章中我将介绍如何使用OllamaAnythingLLMPython实现…...

『 C++ 』中理解回调类型在 C++ 中的使用方式。

文章目录 案例 1&#xff1a;图形绘制库中的回调使用场景说明代码实现代码解释 案例 2&#xff1a;网络服务器中的连接和消息处理回调场景说明代码实现代码解释 案例 3&#xff1a;定时器中的回调使用场景说明代码实现代码解释 以下将通过不同场景给出几个使用回调类型的具体案…...

git多人协作

目录 一、项目克隆 二、 1、进入克隆仓库设置 2、协作处理 3、冲突处理 4、多人协作分支的推送拉取删除 1、分支推送&#xff08;2种&#xff09; 2、远程分支拉取&#xff08;2种&#xff09; 3、远程分支删除 一、项目克隆 git clone 画船听雨眠/test1 (自定义的名…...

CTFSHOW-WEB入门-命令执行71-77

题目&#xff1a;web 71 题目&#xff1a;解题思路&#xff1a;分析可知highlight_file() 函数被禁了&#xff0c;先想办法看看根目录&#xff1a;cvar_export(scandir(dirname(‘/’))); 尝试一下发现很惊奇&#xff1a;&#xff08;全是&#xff1f;&#xff09;这种情况我也…...

从“共和国之辉”到AI原生应用:一个关于“哥布林”诞生的技术启示录

从“共和国之辉”到AI原生应用&#xff1a;一个关于“哥布林”诞生的技术启示录 2025年7月&#xff0c;一篇名为《Where the goblins came from》的文章在Hacker News上引发了超过710票的热议。当大多数技术评论者将目光聚焦于AI模型的最新突破时&#xff0c;这篇来自OpenAI的文…...

AI智能体开发工具栈全解析:从框架、可观测性到部署实战指南

1. 项目概述与核心价值如果你正在构建AI智能体应用&#xff0c;并且已经厌倦了在GitHub、Twitter和各种技术论坛里大海捞针般地寻找合适的开发工具&#xff0c;那么你很可能已经遇到了一个共同的痛点&#xff1a;生态碎片化。从让大语言模型&#xff08;LLM&#xff09;具备“记…...

Cloudflare + PlanetScale:在边缘运行全栈应用,数据库也不例外

全栈开发者面对的一道老难题 Cloudflare Workers 解决了计算层的全球分发问题——你的代码跑在 Cloudflare 遍布全球的 300 多个数据中心里&#xff0c;离用户近&#xff0c;启动快&#xff0c;不需要管理任何服务器。 但数据不一样。 数据库天然是"有状态的"&#x…...

GDScript Mod Loader:为Godot游戏打造专业模组生态的完整指南

1. 项目概述&#xff1a;为你的Godot游戏注入社区活力如果你是一名使用Godot引擎的独立游戏开发者&#xff0c;或者是一位热衷于为喜爱的游戏创造新内容的玩家&#xff0c;那么“模组”这个概念你一定不陌生。模组&#xff0c;或者说Mod&#xff0c;是游戏社区生命力的重要源泉…...

Linux 基础篇 -- Linux介绍(怎么读、是什么、创始人、吉祥物、发版本、目前存在的操作系统) Linux和Unix的关系 linux和Windows比较

Linux 基础篇 – Linux介绍&#xff08;怎么读、是什么、创始人、吉祥物、发版本、目前存在的操作系统&#xff09; & Linux和Unix的关系 & linux和Windows比较 文章目录 1. Linux介绍 1.1 Linux怎么读:1.2 Linux是什么&#xff1a;1.3 Linux创始人:1.4 Linux 的吉祥…...

Cadence 17.4 保姆级教程:从DRC检查到Gerber输出的完整避坑指南

Cadence 17.4 终极避坑指南&#xff1a;从DRC检查到Gerber输出的全流程实战 第一次使用Cadence Allegro 17.4导出Gerber文件时&#xff0c;那种如履薄冰的感觉至今记忆犹新。记得去年为TMC2300电机驱动模块导出生产文件时&#xff0c;因为一个简单的单位设置错误&#xff0c;导…...

Prometheus 自定义指标监控:Python Exporter 编写与业务指标告警配置

前言 Prometheus 监控系统指标&#xff08;CPU、内存、磁盘&#xff09;这件事很多人熟悉&#xff0c;但不少开发者有个共同疑问&#xff1a;业务特有的指标——比如队列积压数、订单待处理量、API 调用成功率——Prometheus 能监控吗&#xff1f; 答案是&#xff1a;完全可以…...

AI Token中转副业火爆!小白也能快速上手?3小时建站+真实盈利模式全解析!

很多观望的小白最纠结两个核心问题&#xff1a;普通人搭建一个Token中转站到底要多久&#xff1f;建好之后真的能赚钱吗&#xff0c;真实赚钱逻辑是什么&#xff1f; 今天不讲噱头、不吹月入几万&#xff0c;结合行业真实现状、新手实操经验&#xff0c;一次性讲透搭建耗时、成…...

FPGA调试实录:我的SPI Master模块为什么读不到数据?常见问题排查指南

FPGA调试实录&#xff1a;SPI Master模块数据读取失败的深度排查指南 当你的SPI Master模块在调试过程中突然"罢工"&#xff0c;示波器上的波形看似正常却始终无法读取数据时&#xff0c;那种挫败感每个硬件工程师都深有体会。本文将从实战角度出发&#xff0c;分享一…...

降AI提示词大全!10个prompt让AI输出人类味+嘎嘎降AI兜底!

降AI提示词大全&#xff01;10个prompt让AI输出人类味嘎嘎降AI兜底&#xff01; 用 ChatGPT、DeepSeek、Kimi、豆包写论文最大的痛是&#xff1a;写得快但被检测判 AI、改起来比自己写还累。其实在写作环节就能预防一部分 AI 痕迹&#xff0c;靠的是会写降 AI 提示词。 这篇先给…...