Fork/Join框架
是什么
Fork/Join框架是Java 7提供的一个用于并行执行任务的框架,是一个把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架。
Fork: 把一个大任务切分为若干子任务并行的执行
Join: 合并这些子任务的执行结果,最后得到这个大任务的结果。

运行流程图
工作窃取算法
工作窃取(work-stealing)算法是指某个线程从其他队列里窃取任务来执行。
做一个比较大的任务,可以把这个任务分割为若干互不依赖的子任务,为了减少线程间的竞争,把这些子任务分别放到不同的队列里,并为每个队列创建一个单独的线程来执行队列里的任务,线程和队列一一对应。有的线程会先把自己队列里的任务干完,而其他线程对应的队列里还有任务等待处理。干完活的线程与其等着,不如去帮其他线程干活,于是它就去其他线程的队列里窃取一个任务来执行。
为了减少窃取任务线程和被窃取任务线程之间的竞争,通常会使用双端队列,被窃取任务线程永远从双端队列的头部拿任务执行,而窃取任务的线程永远从双端队列的尾部拿任务执行。

工作窃取流程
工作窃取算法的优缺点:
优点:充分利用线程进行并行计算,减少了线程间的竞争。
缺点:在某些情况下还是存在竞争,比如双端队列里只有一个任务时。并且该算法会消耗了更多的系统资源,比如创建多个线程和多个双端队列。
Fork/Join框架的设计
步骤1 分割任务。需要有一个fork类来把大任务分割成子任务,有可能子任务还是很大,所以还需要不停地分割,直到分割出的子任务足够小。
步骤2 执行任务并合并结果。分割的子任务分别放在双端队列里,然后启动几个线程分别从双端队列里获取任务执行。子任务执行完的结果都统一放在一个队列里,启动一个线程从队列里拿数据,然后合并这些数据。
Fork/Join使用两个类来完成以上两件事情。
ForkJoinTask:要使用ForkJoin框架,必须首先创建一个ForkJoin任务。它提供在任务中执行fork()和join()操作的机制。不需要直接继承ForkJoinTask类,只需要继承它的子类,Fork/Join框架提供了以下两个子类。- RecursiveAction:用于没有返回结果的任务。
- RecursiveTask:用于有返回结果的任务。
ForkJoinPool:ForkJoinTask需要通过ForkJoinPool来执行。
任务分割出的子任务会添加到当前工作线程所维护的双端队列中,进入队列的头部。当一个工作线程的队列里暂时没有任务时,它会随机从其他工作线程的队列的尾部获取一个任务。
Fork/Join框架基本使用
需求:计算1+2+3+4的结果。
使用Fork/Join框架首先要考虑到的是如何分割任务,上述需求希望每个子任务最多执行两个数相加,因此,分割阈值设置为2。
有返回结果的任务,所以必须继承RecursiveTask
示例代码如下:
public class CountTask extends RecursiveTask<Integer> {/*** 阈值*/private static final int THRESHOLD = 2;private int start;private int end;public CountTask(int start, int end) {this.start = start;this.end = end;}@Overrideprotected Integer compute() {int sum = 0;// 如果任务足够小就计算任务boolean canCompute = (end - start) <= THRESHOLD;if (canCompute) {for (int i = start; i <= end; i++) {sum += i;}} else {// 如果任务大于阈值,就分割成两个子任务计算int middle = (start + end) / 2;CountTask leftTask = new CountTask(start, middle);CountTask rightTask = new CountTask(middle + 1, end);// 执行子任务leftTask.fork();rightTask.fork();// 等待子任务执行完,并得到其结果int leftResult = leftTask.join();int rightResult = rightTask.join();// 合并子任务sum = leftResult + rightResult;}return sum;}public static void main(String[] args) {ForkJoinPool forkJoinPool = new ForkJoinPool();// 生成一个计算任务,负责计算1+2+3+4CountTask task = new CountTask(1, 4);// 执行一个任务Future<Integer> result = forkJoinPool.submit(task);try {System.out.println(result.get());} catch (InterruptedException | ExecutionException e) {throw new RuntimeException();}}
}
ForkJoinTask与一般任务的主要区别在于它需要实现compute方法,需要判断任务是否足够小(小于阈值),如果足够小就直接执行任务。如果不足够小,就必须分割成两个子任务,每个子任务在调用fork方法时,又会进入compute方法,看看当前子任务是否需要继续分割成子任务,如果不需要继续分割,则执行当前子任务并返回结果。 使用join方法会等待子任务执行完并得到其结果。
Fork/Join框架的异常处理
ForkJoinTask在执行的时候可能会抛出异常,但是没办法在主线程里直接捕获异常,所以ForkJoinTask提供了isCompletedAbnormally()方法来检查任务是否已经抛出异常或已经被取消了,并且可以通过ForkJoinTask的getException方法获取异常。
if(task.isCompletedAbnormally()){System.out.println(task.getException());}
getException方法返回Throwable对象,如果任务被取消了则返回CancellationException。如果任务没有完成或者没有抛出异常则返回null。
Fork/Join框架的实现原理
ForkJoinPool由ForkJoinTask数组和ForkJoinWorkerThread数组组成,ForkJoinTask数组负责将存放程序提交给ForkJoinPool的任务,而ForkJoinWorkerThread数组负责执行这些任务。
工作队列:
static final class WorkQueue {static final int INITIAL_QUEUE_CAPACITY = 1 << 13;static final int MAXIMUM_QUEUE_CAPACITY = 1 << 26; // 64M// Instance fieldsvolatile int scanState; // versioned, <0: inactive; odd:scanningint stackPred; // pool stack (ctl) predecessorint nsteals; // number of stealsint hint; // randomization and stealer index hintint config; // pool index and modevolatile int qlock; // 1: locked, < 0: terminate; else 0volatile int base; // index of next slot for pollint top; // index of next slot for pushForkJoinTask<?>[] array; // the elements (initially unallocated)final ForkJoinPool pool; // the containing pool (may be null)final ForkJoinWorkerThread owner; // owning thread or null if sharedvolatile Thread parker; // == owner during call to park; else nullvolatile ForkJoinTask<?> currentJoin; // task being joined in awaitJoinvolatile ForkJoinTask<?> currentSteal; // mainly used by helpStealer// ...
}
(1)ForkJoinTask的fork方法实现原理
调用ForkJoinTask的fork方法时,程序会将任务丢到任务队列里,然后立即返回结果。
public final ForkJoinTask<V> fork() {Thread t;if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)((ForkJoinWorkerThread)t).workQueue.push(this);elseForkJoinPool.common.externalPush(this);return this;}
static final ForkJoinPool common;
ForkJoinPool.common.externalPush(this);
final void externalPush(ForkJoinTask<?> task) {WorkQueue[] ws; WorkQueue q; int m;int r = ThreadLocalRandom.getProbe();int rs = runState;if ((ws = workQueues) != null && (m = (ws.length - 1)) >= 0 &&(q = ws[m & r & SQMASK]) != null && r != 0 && rs > 0 &&U.compareAndSwapInt(q, QLOCK, 0, 1)) {ForkJoinTask<?>[] a; int am, n, s;if ((a = q.array) != null &&(am = a.length - 1) > (n = (s = q.top) - q.base)) {int j = ((am & s) << ASHIFT) + ABASE;U.putOrderedObject(a, j, task);U.putOrderedInt(q, QTOP, s + 1);U.putIntVolatile(q, QLOCK, 0);if (n <= 1)signalWork(ws, q);return;}U.compareAndSwapInt(q, QLOCK, 1, 0);}externalSubmit(task);}
上述代码:把当前任务存放在ForkJoinTask数组队列里。然后再调用ForkJoinPool的signalWork()方法唤醒或创建一个工作线程来执行任务。
(2)ForkJoinTask的join方法实现原理
Join方法的主要作用是阻塞当前线程并等待获取结果。代码如下:
public final V join() {int s;if ((s = doJoin() & DONE_MASK) != NORMAL)reportException(s);return getRawResult();}private void reportException(int s) {if (s == CANCELLED)throw new CancellationException();if (s == EXCEPTIONAL)rethrow(getThrowableException());}
调用了doJoin()方法,通过doJoin()方法得到当前任务的状态与运算来判断返回什么结果,任务状态有4种:已完成(NORMAL)、被取消(CANCELLED)、信号(SIGNAL)和出现异常(EXCEPTIONAL)。
- 如果任务状态是已完成,则直接返回任务结果。
- 如果任务状态是被取消,则直接抛出CancellationException。
- 如果任务状态是抛出异常,则直接抛出对应的异常。
doJoin()方法的实现代码。
private int doJoin() {int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;return (s = status) < 0 ? s :((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?(w = (wt = (ForkJoinWorkerThread)t).workQueue).tryUnpush(this) && (s = doExec()) < 0 ? s :wt.pool.awaitJoin(w, this, 0L) :externalAwaitDone();}final int doExec() {int s; boolean completed;if ((s = status) >= 0) {try {completed = exec();} catch (Throwable rex) {return setExceptionalCompletion(rex);}if (completed)s = setCompletion(NORMAL);}return s;}
首先通过查看任务的状态,看任务是否已经执行完成,如果执行完成,则直接返回任务状态;
如果没有执行完,则从任务数组里取出任务并执行。如果任务顺利执行完成,则设置任务状态为NORMAL,如果出现异常,则记录异常,并将任务状态设置为EXCEPTIONAL。
相关文章:
Fork/Join框架
是什么 Fork/Join框架是Java 7提供的一个用于并行执行任务的框架,是一个把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架。 Fork: 把一个大任务切分为若干子任务并行的执行 Join: 合并这些子任务的执行结果,最后…...
LeetCode_字符串_中等_468.验证 IP 地址
目录 1.题目2.思路3.代码实现(Java) 1.题目 给定一个字符串 queryIP。如果是有效的 IPv4 地址,返回 “IPv4” ;如果是有效的 IPv6 地址,返回 “IPv6” ;如果不是上述类型的 IP 地址,返回 “Nei…...
ABAP Der Open SQL command is too big.
ABAP Der Open SQL command is too big. DBSQL_STMNT_TOO_LARGE CX_SY_OPEN_SQL_DB 应该是选择条件中 维护的条件值条数太多了...
QChart类用来 管理 图表的:数据序列(series)、图例(legend)和坐标轴(axis)
QChart类用来 管理 图表的:数据序列(series)、图例(legend)和坐标轴(axis) 1、数据序列类 继承关系 2、坐标轴类 的继承关系 3、图例类 什么是图例? 图例:是集中于地图…...
Servlet+JDBC实战开发书店项目讲解第10篇:在线客服功能实现
在线客服功能实现 实现思路 要实现在线客服功能,您可以考虑以下步骤: 创建一个用于存储客户消息和回复的数据库表。您可以使用JDBC连接到数据库,并使用SQL语句创建表格。 在您的Servlet中,创建一个用于处理客户消息和回复的POS…...
CVE-2023-21292 AMS框架层高危漏洞分析
文章目录 前言漏洞细节故事起源漏洞利用漏洞修复 总结 前言 本周在分析 Google 官方发布的 Android 2023 年8 月安全公告 涉及的漏洞补丁的时候,遇到一个有意思的漏洞:CVE-2023-21292。 之所以说它有意思是因为这个漏洞早在去年年底就在某平台上被国外…...
cuda、cuDNN、深度学习框架、pytorch、tentsorflow、keras这些概念之间的关系
当讨论CUDA、cuDNN、深度学习框架、pytorch、tensorflow、keras这些概念的时候,我们讨论的是与GPU加速深度学习相关的技术和工具。 CUDA(Compute Unified Device Architecture): CUDA是由NVIDIA开发的一种并行计算平台和编程模型&…...
第二讲:BeanFactory的实现
BeanFactory的实现 1. 环境准备2. 初始化DefaultListableBeanFactory3. 手动注册BeanDefinition4. 手动添加后置处理器5. 获取被依赖注入的Bean对象6. 让所有的单例bean初始化时加载7. 总结 Spring 的发展历史较为悠久,因此很多资料还在讲解它较旧的实现,…...
vue2+Spring Boot2.7 大文件分片上传
之前我们文章 手把手带大家实现 vue2Spring Boot2.7 文件上传功能 将了上传文件 但如果文件很大 就不太好处理了 按正常情况甚至因为超量而报错 这里 我弄了个足够大的文件 我们先搭建 Spring Boot2.7 环境 首先 application.yml 代码编写如下 server:port: 80 upload:path:…...
Vite更新依赖缓存失败,强制更新依赖缓存
使用vitets开发一段时间了,感觉并不是想象中的好用,特别是出现些稀奇古怪的问题不好解决,比如下面这个问题 上午9:50:08 [vite] error while updating dependencies: Error: ENOENT: no such file or directory, open E:/workspace-dir/node…...
Linux命令200例:tail用来显示文件的末尾内容(常用)
🏆作者简介,黑夜开发者,全栈领域新星创作者✌。CSDN专家博主,阿里云社区专家博主,2023年6月csdn上海赛道top4。 🏆数年电商行业从业经验,历任核心研发工程师,项目技术负责人。 &…...
【Unity每日一记】进行发射,位置相关的方法总结
👨💻个人主页:元宇宙-秩沅 👨💻 hallo 欢迎 点赞👍 收藏⭐ 留言📝 加关注✅! 👨💻 本文由 秩沅 原创 👨💻 收录于专栏:uni…...
MISRA 2012学习笔记(3)-Rules 8.4-8.7
文章目录 Rules8.4 字符集和词汇约定(Character sets and lexical conventions)Rule 4.1 八进制和十六进制转译序列应有明确的终止识别标识Rule 4.2 禁止使用三字母词(trigraphs) 8.5 标识符(Identifiers)Rule 5.1 外部标识符不得重名Rule 5.2 同范围和命名空间内的标识符不得重…...
centos7组件搭建
Linux(包括centos) 如何查看服务器内存、CPU su - root 切换用户 centos 密码 空格 https://blog.csdn.net/weixin_45277161/article/details/131524555 CentOS 7 安装 Docker 的详细步骤 https://blog.csdn.net/qq_39997939/article/details/13100…...
webpack5和webpack4的一些区别
自动清除打包目录 webpack4 // bash npm i clean-webpack-plugin -D //webpack.config.js const {CleanWebpackPlugin} require(clean-webpack-plugin); module.exports {plugins: [new CleanWebpackPlugin()} } webpack5 module.exports {output: {clean: true} } topLevel…...
攻防世界-fileclude
原题 解题思路 直接展示源码了,flag.php应该存放了flag,在file1与file2都不为空且file2是“hello ctf”时file1将被导入。接下来做法很明显,让file为flag.php,file2为“hello ctf”。“?file1php://filter/readconvert.base64-en…...
深度学习的“前世今生”
1、“感知机”的诞生 20世纪50年代,人工智能派生出了这样两个学派,分别是“符号学派”及“连接学派”。前者的领军学者有Marvin Minsky及John McCarthy,后者则是由Frank Rosenblatt所领导。 “符号学派”的人相信对机器从头编程,…...
第一百一十九回 如何通过蓝牙设备读写数据
文章目录 概念介绍实现方法示例代码经验总结我们在上一章回中介绍了如何获取蓝牙状态相关的内容,本章回中将介绍 如何通过蓝牙设备读写数据。闲话休提,让我们一起Talk Flutter吧。 概念介绍 通过蓝牙设备读写数据有两种方法: 一种是读写Characteristics;一种是读写Descri…...
linux:Temporary failure in name resolutionCouldn’t resolve host
所有域名无法正常解析。 ping www.baidu.com 等域名提示 Temporary failure in name resolution错误。 rootlocalhost:~# ping www.baidu.com ping: www.baidu.com: Temporary failure in name resolution rootlocalhost:~# 一、ubuntu/debian(emporary failure i…...
C 语言的 sprintf() 函数
<stdio.h> 原型: int sprintf(char *str, const char *format, …) 发送格式化输出到 str 所指向的字符串。 参数 str – 这是指向一个字符数组的指针,该数组存储了 C 字符串。 format – 这是字符串,包含了要被写入到字符串 str 的文本。它…...
生成xcframework
打包 XCFramework 的方法 XCFramework 是苹果推出的一种多平台二进制分发格式,可以包含多个架构和平台的代码。打包 XCFramework 通常用于分发库或框架。 使用 Xcode 命令行工具打包 通过 xcodebuild 命令可以打包 XCFramework。确保项目已经配置好需要支持的平台…...
中南大学无人机智能体的全面评估!BEDI:用于评估无人机上具身智能体的综合性基准测试
作者:Mingning Guo, Mengwei Wu, Jiarun He, Shaoxian Li, Haifeng Li, Chao Tao单位:中南大学地球科学与信息物理学院论文标题:BEDI: A Comprehensive Benchmark for Evaluating Embodied Agents on UAVs论文链接:https://arxiv.…...
从深圳崛起的“机器之眼”:赴港乐动机器人的万亿赛道赶考路
进入2025年以来,尽管围绕人形机器人、具身智能等机器人赛道的质疑声不断,但全球市场热度依然高涨,入局者持续增加。 以国内市场为例,天眼查专业版数据显示,截至5月底,我国现存在业、存续状态的机器人相关企…...
OPENCV形态学基础之二腐蚀
一.腐蚀的原理 (图1) 数学表达式:dst(x,y) erode(src(x,y)) min(x,y)src(xx,yy) 腐蚀也是图像形态学的基本功能之一,腐蚀跟膨胀属于反向操作,膨胀是把图像图像变大,而腐蚀就是把图像变小。腐蚀后的图像变小变暗淡。 腐蚀…...
C++.OpenGL (14/64)多光源(Multiple Lights)
多光源(Multiple Lights) 多光源渲染技术概览 #mermaid-svg-3L5e5gGn76TNh7Lq {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-3L5e5gGn76TNh7Lq .error-icon{fill:#552222;}#mermaid-svg-3L5e5gGn76TNh7Lq .erro…...
Mysql8 忘记密码重置,以及问题解决
1.使用免密登录 找到配置MySQL文件,我的文件路径是/etc/mysql/my.cnf,有的人的是/etc/mysql/mysql.cnf 在里最后加入 skip-grant-tables重启MySQL服务 service mysql restartShutting down MySQL… SUCCESS! Starting MySQL… SUCCESS! 重启成功 2.登…...
力扣热题100 k个一组反转链表题解
题目: 代码: func reverseKGroup(head *ListNode, k int) *ListNode {cur : headfor i : 0; i < k; i {if cur nil {return head}cur cur.Next}newHead : reverse(head, cur)head.Next reverseKGroup(cur, k)return newHead }func reverse(start, end *ListNode) *ListN…...
【Android】Android 开发 ADB 常用指令
查看当前连接的设备 adb devices 连接设备 adb connect 设备IP 断开已连接的设备 adb disconnect 设备IP 安装应用 adb install 安装包的路径 卸载应用 adb uninstall 应用包名 查看已安装的应用包名 adb shell pm list packages 查看已安装的第三方应用包名 adb shell pm list…...
给网站添加live2d看板娘
给网站添加live2d看板娘 参考文献: stevenjoezhang/live2d-widget: 把萌萌哒的看板娘抱回家 (ノ≧∇≦)ノ | Live2D widget for web platformEikanya/Live2d-model: Live2d model collectionzenghongtu/live2d-model-assets 前言 网站环境如下,文章也主…...
AI语音助手的Python实现
引言 语音助手(如小爱同学、Siri)通过语音识别、自然语言处理(NLP)和语音合成技术,为用户提供直观、高效的交互体验。随着人工智能的普及,Python开发者可以利用开源库和AI模型,快速构建自定义语音助手。本文由浅入深,详细介绍如何使用Python开发AI语音助手,涵盖基础功…...
