Fork/Join框架
是什么
Fork/Join框架是Java 7提供的一个用于并行执行任务的框架,是一个把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架。
Fork: 把一个大任务切分为若干子任务并行的执行
Join: 合并这些子任务的执行结果,最后得到这个大任务的结果。

运行流程图
工作窃取算法
工作窃取(work-stealing)算法是指某个线程从其他队列里窃取任务来执行。
做一个比较大的任务,可以把这个任务分割为若干互不依赖的子任务,为了减少线程间的竞争,把这些子任务分别放到不同的队列里,并为每个队列创建一个单独的线程来执行队列里的任务,线程和队列一一对应。有的线程会先把自己队列里的任务干完,而其他线程对应的队列里还有任务等待处理。干完活的线程与其等着,不如去帮其他线程干活,于是它就去其他线程的队列里窃取一个任务来执行。
为了减少窃取任务线程和被窃取任务线程之间的竞争,通常会使用双端队列,被窃取任务线程永远从双端队列的头部拿任务执行,而窃取任务的线程永远从双端队列的尾部拿任务执行。

工作窃取流程
工作窃取算法的优缺点:
优点:充分利用线程进行并行计算,减少了线程间的竞争。
缺点:在某些情况下还是存在竞争,比如双端队列里只有一个任务时。并且该算法会消耗了更多的系统资源,比如创建多个线程和多个双端队列。
Fork/Join框架的设计
步骤1 分割任务。需要有一个fork类来把大任务分割成子任务,有可能子任务还是很大,所以还需要不停地分割,直到分割出的子任务足够小。
步骤2 执行任务并合并结果。分割的子任务分别放在双端队列里,然后启动几个线程分别从双端队列里获取任务执行。子任务执行完的结果都统一放在一个队列里,启动一个线程从队列里拿数据,然后合并这些数据。
Fork/Join使用两个类来完成以上两件事情。
ForkJoinTask:要使用ForkJoin框架,必须首先创建一个ForkJoin任务。它提供在任务中执行fork()和join()操作的机制。不需要直接继承ForkJoinTask类,只需要继承它的子类,Fork/Join框架提供了以下两个子类。- RecursiveAction:用于没有返回结果的任务。
- RecursiveTask:用于有返回结果的任务。
ForkJoinPool:ForkJoinTask需要通过ForkJoinPool来执行。
任务分割出的子任务会添加到当前工作线程所维护的双端队列中,进入队列的头部。当一个工作线程的队列里暂时没有任务时,它会随机从其他工作线程的队列的尾部获取一个任务。
Fork/Join框架基本使用
需求:计算1+2+3+4的结果。
使用Fork/Join框架首先要考虑到的是如何分割任务,上述需求希望每个子任务最多执行两个数相加,因此,分割阈值设置为2。
有返回结果的任务,所以必须继承RecursiveTask
示例代码如下:
public class CountTask extends RecursiveTask<Integer> {/*** 阈值*/private static final int THRESHOLD = 2;private int start;private int end;public CountTask(int start, int end) {this.start = start;this.end = end;}@Overrideprotected Integer compute() {int sum = 0;// 如果任务足够小就计算任务boolean canCompute = (end - start) <= THRESHOLD;if (canCompute) {for (int i = start; i <= end; i++) {sum += i;}} else {// 如果任务大于阈值,就分割成两个子任务计算int middle = (start + end) / 2;CountTask leftTask = new CountTask(start, middle);CountTask rightTask = new CountTask(middle + 1, end);// 执行子任务leftTask.fork();rightTask.fork();// 等待子任务执行完,并得到其结果int leftResult = leftTask.join();int rightResult = rightTask.join();// 合并子任务sum = leftResult + rightResult;}return sum;}public static void main(String[] args) {ForkJoinPool forkJoinPool = new ForkJoinPool();// 生成一个计算任务,负责计算1+2+3+4CountTask task = new CountTask(1, 4);// 执行一个任务Future<Integer> result = forkJoinPool.submit(task);try {System.out.println(result.get());} catch (InterruptedException | ExecutionException e) {throw new RuntimeException();}}
}
ForkJoinTask与一般任务的主要区别在于它需要实现compute方法,需要判断任务是否足够小(小于阈值),如果足够小就直接执行任务。如果不足够小,就必须分割成两个子任务,每个子任务在调用fork方法时,又会进入compute方法,看看当前子任务是否需要继续分割成子任务,如果不需要继续分割,则执行当前子任务并返回结果。 使用join方法会等待子任务执行完并得到其结果。
Fork/Join框架的异常处理
ForkJoinTask在执行的时候可能会抛出异常,但是没办法在主线程里直接捕获异常,所以ForkJoinTask提供了isCompletedAbnormally()方法来检查任务是否已经抛出异常或已经被取消了,并且可以通过ForkJoinTask的getException方法获取异常。
if(task.isCompletedAbnormally()){System.out.println(task.getException());}
getException方法返回Throwable对象,如果任务被取消了则返回CancellationException。如果任务没有完成或者没有抛出异常则返回null。
Fork/Join框架的实现原理
ForkJoinPool由ForkJoinTask数组和ForkJoinWorkerThread数组组成,ForkJoinTask数组负责将存放程序提交给ForkJoinPool的任务,而ForkJoinWorkerThread数组负责执行这些任务。
工作队列:
static final class WorkQueue {static final int INITIAL_QUEUE_CAPACITY = 1 << 13;static final int MAXIMUM_QUEUE_CAPACITY = 1 << 26; // 64M// Instance fieldsvolatile int scanState; // versioned, <0: inactive; odd:scanningint stackPred; // pool stack (ctl) predecessorint nsteals; // number of stealsint hint; // randomization and stealer index hintint config; // pool index and modevolatile int qlock; // 1: locked, < 0: terminate; else 0volatile int base; // index of next slot for pollint top; // index of next slot for pushForkJoinTask<?>[] array; // the elements (initially unallocated)final ForkJoinPool pool; // the containing pool (may be null)final ForkJoinWorkerThread owner; // owning thread or null if sharedvolatile Thread parker; // == owner during call to park; else nullvolatile ForkJoinTask<?> currentJoin; // task being joined in awaitJoinvolatile ForkJoinTask<?> currentSteal; // mainly used by helpStealer// ...
}
(1)ForkJoinTask的fork方法实现原理
调用ForkJoinTask的fork方法时,程序会将任务丢到任务队列里,然后立即返回结果。
public final ForkJoinTask<V> fork() {Thread t;if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)((ForkJoinWorkerThread)t).workQueue.push(this);elseForkJoinPool.common.externalPush(this);return this;}
static final ForkJoinPool common;
ForkJoinPool.common.externalPush(this);
final void externalPush(ForkJoinTask<?> task) {WorkQueue[] ws; WorkQueue q; int m;int r = ThreadLocalRandom.getProbe();int rs = runState;if ((ws = workQueues) != null && (m = (ws.length - 1)) >= 0 &&(q = ws[m & r & SQMASK]) != null && r != 0 && rs > 0 &&U.compareAndSwapInt(q, QLOCK, 0, 1)) {ForkJoinTask<?>[] a; int am, n, s;if ((a = q.array) != null &&(am = a.length - 1) > (n = (s = q.top) - q.base)) {int j = ((am & s) << ASHIFT) + ABASE;U.putOrderedObject(a, j, task);U.putOrderedInt(q, QTOP, s + 1);U.putIntVolatile(q, QLOCK, 0);if (n <= 1)signalWork(ws, q);return;}U.compareAndSwapInt(q, QLOCK, 1, 0);}externalSubmit(task);}
上述代码:把当前任务存放在ForkJoinTask数组队列里。然后再调用ForkJoinPool的signalWork()方法唤醒或创建一个工作线程来执行任务。
(2)ForkJoinTask的join方法实现原理
Join方法的主要作用是阻塞当前线程并等待获取结果。代码如下:
public final V join() {int s;if ((s = doJoin() & DONE_MASK) != NORMAL)reportException(s);return getRawResult();}private void reportException(int s) {if (s == CANCELLED)throw new CancellationException();if (s == EXCEPTIONAL)rethrow(getThrowableException());}
调用了doJoin()方法,通过doJoin()方法得到当前任务的状态与运算来判断返回什么结果,任务状态有4种:已完成(NORMAL)、被取消(CANCELLED)、信号(SIGNAL)和出现异常(EXCEPTIONAL)。
- 如果任务状态是已完成,则直接返回任务结果。
- 如果任务状态是被取消,则直接抛出CancellationException。
- 如果任务状态是抛出异常,则直接抛出对应的异常。
doJoin()方法的实现代码。
private int doJoin() {int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;return (s = status) < 0 ? s :((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?(w = (wt = (ForkJoinWorkerThread)t).workQueue).tryUnpush(this) && (s = doExec()) < 0 ? s :wt.pool.awaitJoin(w, this, 0L) :externalAwaitDone();}final int doExec() {int s; boolean completed;if ((s = status) >= 0) {try {completed = exec();} catch (Throwable rex) {return setExceptionalCompletion(rex);}if (completed)s = setCompletion(NORMAL);}return s;}
首先通过查看任务的状态,看任务是否已经执行完成,如果执行完成,则直接返回任务状态;
如果没有执行完,则从任务数组里取出任务并执行。如果任务顺利执行完成,则设置任务状态为NORMAL,如果出现异常,则记录异常,并将任务状态设置为EXCEPTIONAL。
相关文章:
Fork/Join框架
是什么 Fork/Join框架是Java 7提供的一个用于并行执行任务的框架,是一个把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架。 Fork: 把一个大任务切分为若干子任务并行的执行 Join: 合并这些子任务的执行结果,最后…...
LeetCode_字符串_中等_468.验证 IP 地址
目录 1.题目2.思路3.代码实现(Java) 1.题目 给定一个字符串 queryIP。如果是有效的 IPv4 地址,返回 “IPv4” ;如果是有效的 IPv6 地址,返回 “IPv6” ;如果不是上述类型的 IP 地址,返回 “Nei…...
ABAP Der Open SQL command is too big.
ABAP Der Open SQL command is too big. DBSQL_STMNT_TOO_LARGE CX_SY_OPEN_SQL_DB 应该是选择条件中 维护的条件值条数太多了...
QChart类用来 管理 图表的:数据序列(series)、图例(legend)和坐标轴(axis)
QChart类用来 管理 图表的:数据序列(series)、图例(legend)和坐标轴(axis) 1、数据序列类 继承关系 2、坐标轴类 的继承关系 3、图例类 什么是图例? 图例:是集中于地图…...
Servlet+JDBC实战开发书店项目讲解第10篇:在线客服功能实现
在线客服功能实现 实现思路 要实现在线客服功能,您可以考虑以下步骤: 创建一个用于存储客户消息和回复的数据库表。您可以使用JDBC连接到数据库,并使用SQL语句创建表格。 在您的Servlet中,创建一个用于处理客户消息和回复的POS…...
CVE-2023-21292 AMS框架层高危漏洞分析
文章目录 前言漏洞细节故事起源漏洞利用漏洞修复 总结 前言 本周在分析 Google 官方发布的 Android 2023 年8 月安全公告 涉及的漏洞补丁的时候,遇到一个有意思的漏洞:CVE-2023-21292。 之所以说它有意思是因为这个漏洞早在去年年底就在某平台上被国外…...
cuda、cuDNN、深度学习框架、pytorch、tentsorflow、keras这些概念之间的关系
当讨论CUDA、cuDNN、深度学习框架、pytorch、tensorflow、keras这些概念的时候,我们讨论的是与GPU加速深度学习相关的技术和工具。 CUDA(Compute Unified Device Architecture): CUDA是由NVIDIA开发的一种并行计算平台和编程模型&…...
第二讲:BeanFactory的实现
BeanFactory的实现 1. 环境准备2. 初始化DefaultListableBeanFactory3. 手动注册BeanDefinition4. 手动添加后置处理器5. 获取被依赖注入的Bean对象6. 让所有的单例bean初始化时加载7. 总结 Spring 的发展历史较为悠久,因此很多资料还在讲解它较旧的实现,…...
vue2+Spring Boot2.7 大文件分片上传
之前我们文章 手把手带大家实现 vue2Spring Boot2.7 文件上传功能 将了上传文件 但如果文件很大 就不太好处理了 按正常情况甚至因为超量而报错 这里 我弄了个足够大的文件 我们先搭建 Spring Boot2.7 环境 首先 application.yml 代码编写如下 server:port: 80 upload:path:…...
Vite更新依赖缓存失败,强制更新依赖缓存
使用vitets开发一段时间了,感觉并不是想象中的好用,特别是出现些稀奇古怪的问题不好解决,比如下面这个问题 上午9:50:08 [vite] error while updating dependencies: Error: ENOENT: no such file or directory, open E:/workspace-dir/node…...
Linux命令200例:tail用来显示文件的末尾内容(常用)
🏆作者简介,黑夜开发者,全栈领域新星创作者✌。CSDN专家博主,阿里云社区专家博主,2023年6月csdn上海赛道top4。 🏆数年电商行业从业经验,历任核心研发工程师,项目技术负责人。 &…...
【Unity每日一记】进行发射,位置相关的方法总结
👨💻个人主页:元宇宙-秩沅 👨💻 hallo 欢迎 点赞👍 收藏⭐ 留言📝 加关注✅! 👨💻 本文由 秩沅 原创 👨💻 收录于专栏:uni…...
MISRA 2012学习笔记(3)-Rules 8.4-8.7
文章目录 Rules8.4 字符集和词汇约定(Character sets and lexical conventions)Rule 4.1 八进制和十六进制转译序列应有明确的终止识别标识Rule 4.2 禁止使用三字母词(trigraphs) 8.5 标识符(Identifiers)Rule 5.1 外部标识符不得重名Rule 5.2 同范围和命名空间内的标识符不得重…...
centos7组件搭建
Linux(包括centos) 如何查看服务器内存、CPU su - root 切换用户 centos 密码 空格 https://blog.csdn.net/weixin_45277161/article/details/131524555 CentOS 7 安装 Docker 的详细步骤 https://blog.csdn.net/qq_39997939/article/details/13100…...
webpack5和webpack4的一些区别
自动清除打包目录 webpack4 // bash npm i clean-webpack-plugin -D //webpack.config.js const {CleanWebpackPlugin} require(clean-webpack-plugin); module.exports {plugins: [new CleanWebpackPlugin()} } webpack5 module.exports {output: {clean: true} } topLevel…...
攻防世界-fileclude
原题 解题思路 直接展示源码了,flag.php应该存放了flag,在file1与file2都不为空且file2是“hello ctf”时file1将被导入。接下来做法很明显,让file为flag.php,file2为“hello ctf”。“?file1php://filter/readconvert.base64-en…...
深度学习的“前世今生”
1、“感知机”的诞生 20世纪50年代,人工智能派生出了这样两个学派,分别是“符号学派”及“连接学派”。前者的领军学者有Marvin Minsky及John McCarthy,后者则是由Frank Rosenblatt所领导。 “符号学派”的人相信对机器从头编程,…...
第一百一十九回 如何通过蓝牙设备读写数据
文章目录 概念介绍实现方法示例代码经验总结我们在上一章回中介绍了如何获取蓝牙状态相关的内容,本章回中将介绍 如何通过蓝牙设备读写数据。闲话休提,让我们一起Talk Flutter吧。 概念介绍 通过蓝牙设备读写数据有两种方法: 一种是读写Characteristics;一种是读写Descri…...
linux:Temporary failure in name resolutionCouldn’t resolve host
所有域名无法正常解析。 ping www.baidu.com 等域名提示 Temporary failure in name resolution错误。 rootlocalhost:~# ping www.baidu.com ping: www.baidu.com: Temporary failure in name resolution rootlocalhost:~# 一、ubuntu/debian(emporary failure i…...
C 语言的 sprintf() 函数
<stdio.h> 原型: int sprintf(char *str, const char *format, …) 发送格式化输出到 str 所指向的字符串。 参数 str – 这是指向一个字符数组的指针,该数组存储了 C 字符串。 format – 这是字符串,包含了要被写入到字符串 str 的文本。它…...
多云管理“拦路虎”:深入解析网络互联、身份同步与成本可视化的技术复杂度
一、引言:多云环境的技术复杂性本质 企业采用多云策略已从技术选型升维至生存刚需。当业务系统分散部署在多个云平台时,基础设施的技术债呈现指数级积累。网络连接、身份认证、成本管理这三大核心挑战相互嵌套:跨云网络构建数据…...
在鸿蒙HarmonyOS 5中实现抖音风格的点赞功能
下面我将详细介绍如何使用HarmonyOS SDK在HarmonyOS 5中实现类似抖音的点赞功能,包括动画效果、数据同步和交互优化。 1. 基础点赞功能实现 1.1 创建数据模型 // VideoModel.ets export class VideoModel {id: string "";title: string ""…...
3.3.1_1 检错编码(奇偶校验码)
从这节课开始,我们会探讨数据链路层的差错控制功能,差错控制功能的主要目标是要发现并且解决一个帧内部的位错误,我们需要使用特殊的编码技术去发现帧内部的位错误,当我们发现位错误之后,通常来说有两种解决方案。第一…...
解决本地部署 SmolVLM2 大语言模型运行 flash-attn 报错
出现的问题 安装 flash-attn 会一直卡在 build 那一步或者运行报错 解决办法 是因为你安装的 flash-attn 版本没有对应上,所以报错,到 https://github.com/Dao-AILab/flash-attention/releases 下载对应版本,cu、torch、cp 的版本一定要对…...
WordPress插件:AI多语言写作与智能配图、免费AI模型、SEO文章生成
厌倦手动写WordPress文章?AI自动生成,效率提升10倍! 支持多语言、自动配图、定时发布,让内容创作更轻松! AI内容生成 → 不想每天写文章?AI一键生成高质量内容!多语言支持 → 跨境电商必备&am…...
【C++从零实现Json-Rpc框架】第六弹 —— 服务端模块划分
一、项目背景回顾 前五弹完成了Json-Rpc协议解析、请求处理、客户端调用等基础模块搭建。 本弹重点聚焦于服务端的模块划分与架构设计,提升代码结构的可维护性与扩展性。 二、服务端模块设计目标 高内聚低耦合:各模块职责清晰,便于独立开发…...
稳定币的深度剖析与展望
一、引言 在当今数字化浪潮席卷全球的时代,加密货币作为一种新兴的金融现象,正以前所未有的速度改变着我们对传统货币和金融体系的认知。然而,加密货币市场的高度波动性却成为了其广泛应用和普及的一大障碍。在这样的背景下,稳定…...
云原生玩法三问:构建自定义开发环境
云原生玩法三问:构建自定义开发环境 引言 临时运维一个古董项目,无文档,无环境,无交接人,俗称三无。 运行设备的环境老,本地环境版本高,ssh不过去。正好最近对 腾讯出品的云原生 cnb 感兴趣&…...
Netty从入门到进阶(二)
二、Netty入门 1. 概述 1.1 Netty是什么 Netty is an asynchronous event-driven network application framework for rapid development of maintainable high performance protocol servers & clients. Netty是一个异步的、基于事件驱动的网络应用框架,用于…...
push [特殊字符] present
push 🆚 present 前言present和dismiss特点代码演示 push和pop特点代码演示 前言 在 iOS 开发中,push 和 present 是两种不同的视图控制器切换方式,它们有着显著的区别。 present和dismiss 特点 在当前控制器上方新建视图层级需要手动调用…...
