Fork/Join框架
是什么
Fork/Join框架是Java 7提供的一个用于并行执行任务
的框架,是一个把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架。
Fork: 把一个大任务切分为若干子任务并行的执行
Join: 合并这些子任务的执行结果,最后得到这个大任务的结果。
运行流程图
工作窃取算法
工作窃取(work-stealing)算法是指某个线程从其他队列里窃取任务来执行
。
做一个比较大的任务,可以把这个任务分割为若干互不依赖的子任务,为了减少线程间的竞争,把这些子任务分别放到不同的队列里
,并为每个队列创建一个单独的线程
来执行队列里的任务,线程和队列一一对应
。有的线程会先把自己队列里的任务干完
,而其他线程对应的队列里还有任务等待处理。干完活的线程与其等着,不如去帮其他线程干活,于是它就去其他线程的队列里窃取一个任务来执行
。
为了减少窃取任务线程和被窃取任务线程之间的竞争
,通常会使用双端队列,被窃取任务线程
永远从双端队列的头部拿任务执行
,而窃取任务的线程
永远从双端队列的尾部拿任务执行
。
工作窃取流程
工作窃取算法的优缺点:
优点:
充分利用线程进行并行计算,减少了线程间的竞争。
缺点:
在某些情况下还是存在竞争,比如双端队列里只有一个任务时。并且该算法会消耗了更多的系统资源,比如创建多个线程和多个双端队列。
Fork/Join框架的设计
步骤1 分割任务。
需要有一个fork类来把大任务分割成子任务,有可能子任务还是很大,所以还需要不停地分割,直到分割出的子任务足够小。
步骤2 执行任务并合并结果。
分割的子任务分别放在双端队列里,然后启动几个线程分别从双端队列里获取任务执行。子任务执行完的结果都统一放在一个队列里,启动一个线程从队列里拿数据,然后合并这些数据。
Fork/Join使用两个类来完成以上两件事情。
ForkJoinTask:
要使用ForkJoin框架,必须首先创建一个ForkJoin任务。它提供在任务中执行fork()和join()操作的机制。不需要直接继承ForkJoinTask类,只需要继承它的子类,Fork/Join框架提供了以下两个子类。- RecursiveAction:用于没有返回结果的任务。
- RecursiveTask:用于有返回结果的任务。
ForkJoinPool:
ForkJoinTask需要通过ForkJoinPool来执行。
任务分割出的子任务
会添加
到当前工作线程所维护的双端队列
中,进入队列的头部
。当一个工作线程的队列里暂时没有任务
时,它会随机
从其他工作线程的队列的尾部获取一个任务
。
Fork/Join框架基本使用
需求:计算1+2+3+4的结果。
使用Fork/Join框架首先要考虑到的是如何分割任务
,上述需求希望每个子任务最多执行两个数相加,因此,分割阈值设置为2。
有返回结果的任务,所以必须继承RecursiveTask
示例代码如下:
public class CountTask extends RecursiveTask<Integer> {/*** 阈值*/private static final int THRESHOLD = 2;private int start;private int end;public CountTask(int start, int end) {this.start = start;this.end = end;}@Overrideprotected Integer compute() {int sum = 0;// 如果任务足够小就计算任务boolean canCompute = (end - start) <= THRESHOLD;if (canCompute) {for (int i = start; i <= end; i++) {sum += i;}} else {// 如果任务大于阈值,就分割成两个子任务计算int middle = (start + end) / 2;CountTask leftTask = new CountTask(start, middle);CountTask rightTask = new CountTask(middle + 1, end);// 执行子任务leftTask.fork();rightTask.fork();// 等待子任务执行完,并得到其结果int leftResult = leftTask.join();int rightResult = rightTask.join();// 合并子任务sum = leftResult + rightResult;}return sum;}public static void main(String[] args) {ForkJoinPool forkJoinPool = new ForkJoinPool();// 生成一个计算任务,负责计算1+2+3+4CountTask task = new CountTask(1, 4);// 执行一个任务Future<Integer> result = forkJoinPool.submit(task);try {System.out.println(result.get());} catch (InterruptedException | ExecutionException e) {throw new RuntimeException();}}
}
ForkJoinTask
与一般任务的主要区别在于它需要实现compute方法
,需要判断任务是否足够小(小于阈值)
,如果足够小就直接执行任务。如果不足够小,就必须分割成两个子任务,每个子任务在调用fork方法时,又会进入compute方法
,看看当前子任务是否需要继续分割成子任务,如果不需要继续分割,则执行当前子任务并返回结果。 使用join方法会等待子任务执行完并得到其结果
。
Fork/Join框架的异常处理
ForkJoinTask在执行的时候可能会抛出异常,但是没办法在主线程里直接捕获异常
,所以ForkJoinTask提供了isCompletedAbnormally()方法来检查任务是否已经抛出异常或已经被取消了
,并且可以通过ForkJoinTask的getException方法获取异常
。
if(task.isCompletedAbnormally()){System.out.println(task.getException());}
getException方法返回Throwable对象,如果任务被取消了则返回CancellationException。如果任务没有完成或者没有抛出异常则返回null。
Fork/Join框架的实现原理
ForkJoinPool由ForkJoinTask数组
和ForkJoinWorkerThread数组
组成,ForkJoinTask数组负责将存放程序提交给ForkJoinPool的任务
,而ForkJoinWorkerThread数组负责执行这些任务
。
工作队列:
static final class WorkQueue {static final int INITIAL_QUEUE_CAPACITY = 1 << 13;static final int MAXIMUM_QUEUE_CAPACITY = 1 << 26; // 64M// Instance fieldsvolatile int scanState; // versioned, <0: inactive; odd:scanningint stackPred; // pool stack (ctl) predecessorint nsteals; // number of stealsint hint; // randomization and stealer index hintint config; // pool index and modevolatile int qlock; // 1: locked, < 0: terminate; else 0volatile int base; // index of next slot for pollint top; // index of next slot for pushForkJoinTask<?>[] array; // the elements (initially unallocated)final ForkJoinPool pool; // the containing pool (may be null)final ForkJoinWorkerThread owner; // owning thread or null if sharedvolatile Thread parker; // == owner during call to park; else nullvolatile ForkJoinTask<?> currentJoin; // task being joined in awaitJoinvolatile ForkJoinTask<?> currentSteal; // mainly used by helpStealer// ...
}
(1)ForkJoinTask的fork方法实现原理
调用ForkJoinTask的fork方法时,程序会将任务丢到任务队列里,然后立即返回结果。
public final ForkJoinTask<V> fork() {Thread t;if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)((ForkJoinWorkerThread)t).workQueue.push(this);elseForkJoinPool.common.externalPush(this);return this;}
static final ForkJoinPool common;
ForkJoinPool.common.externalPush(this);
final void externalPush(ForkJoinTask<?> task) {WorkQueue[] ws; WorkQueue q; int m;int r = ThreadLocalRandom.getProbe();int rs = runState;if ((ws = workQueues) != null && (m = (ws.length - 1)) >= 0 &&(q = ws[m & r & SQMASK]) != null && r != 0 && rs > 0 &&U.compareAndSwapInt(q, QLOCK, 0, 1)) {ForkJoinTask<?>[] a; int am, n, s;if ((a = q.array) != null &&(am = a.length - 1) > (n = (s = q.top) - q.base)) {int j = ((am & s) << ASHIFT) + ABASE;U.putOrderedObject(a, j, task);U.putOrderedInt(q, QTOP, s + 1);U.putIntVolatile(q, QLOCK, 0);if (n <= 1)signalWork(ws, q);return;}U.compareAndSwapInt(q, QLOCK, 1, 0);}externalSubmit(task);}
上述代码:把当前任务存放在ForkJoinTask数组队列里。然后再调用ForkJoinPool的signalWork()方法唤醒或创建一个工作线程来执行任务。
(2)ForkJoinTask的join方法实现原理
Join方法的主要作用是阻塞当前线程并等待获取结果。代码如下:
public final V join() {int s;if ((s = doJoin() & DONE_MASK) != NORMAL)reportException(s);return getRawResult();}private void reportException(int s) {if (s == CANCELLED)throw new CancellationException();if (s == EXCEPTIONAL)rethrow(getThrowableException());}
调用了doJoin()方法,通过doJoin()方法得到当前任务的状态与运算来判断返回什么结果,任务状态有4种:已完成(NORMAL)、被取消(CANCELLED)、信号(SIGNAL)和出现异常(EXCEPTIONAL)。
- 如果任务状态是已完成,则直接返回任务结果。
- 如果任务状态是被取消,则直接抛出CancellationException。
- 如果任务状态是抛出异常,则直接抛出对应的异常。
doJoin()
方法的实现代码。
private int doJoin() {int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;return (s = status) < 0 ? s :((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?(w = (wt = (ForkJoinWorkerThread)t).workQueue).tryUnpush(this) && (s = doExec()) < 0 ? s :wt.pool.awaitJoin(w, this, 0L) :externalAwaitDone();}final int doExec() {int s; boolean completed;if ((s = status) >= 0) {try {completed = exec();} catch (Throwable rex) {return setExceptionalCompletion(rex);}if (completed)s = setCompletion(NORMAL);}return s;}
首先通过查看任务的状态,看任务是否已经执行完成,如果执行完成,则直接返回任务状态;
如果没有执行完,则从任务数组里取出任务并执行。如果任务顺利执行完成,则设置任务状态为NORMAL,如果出现异常,则记录异常,并将任务状态设置为EXCEPTIONAL。
相关文章:

Fork/Join框架
是什么 Fork/Join框架是Java 7提供的一个用于并行执行任务的框架,是一个把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架。 Fork: 把一个大任务切分为若干子任务并行的执行 Join: 合并这些子任务的执行结果,最后…...
LeetCode_字符串_中等_468.验证 IP 地址
目录 1.题目2.思路3.代码实现(Java) 1.题目 给定一个字符串 queryIP。如果是有效的 IPv4 地址,返回 “IPv4” ;如果是有效的 IPv6 地址,返回 “IPv6” ;如果不是上述类型的 IP 地址,返回 “Nei…...

ABAP Der Open SQL command is too big.
ABAP Der Open SQL command is too big. DBSQL_STMNT_TOO_LARGE CX_SY_OPEN_SQL_DB 应该是选择条件中 维护的条件值条数太多了...

QChart类用来 管理 图表的:数据序列(series)、图例(legend)和坐标轴(axis)
QChart类用来 管理 图表的:数据序列(series)、图例(legend)和坐标轴(axis) 1、数据序列类 继承关系 2、坐标轴类 的继承关系 3、图例类 什么是图例? 图例:是集中于地图…...
Servlet+JDBC实战开发书店项目讲解第10篇:在线客服功能实现
在线客服功能实现 实现思路 要实现在线客服功能,您可以考虑以下步骤: 创建一个用于存储客户消息和回复的数据库表。您可以使用JDBC连接到数据库,并使用SQL语句创建表格。 在您的Servlet中,创建一个用于处理客户消息和回复的POS…...

CVE-2023-21292 AMS框架层高危漏洞分析
文章目录 前言漏洞细节故事起源漏洞利用漏洞修复 总结 前言 本周在分析 Google 官方发布的 Android 2023 年8 月安全公告 涉及的漏洞补丁的时候,遇到一个有意思的漏洞:CVE-2023-21292。 之所以说它有意思是因为这个漏洞早在去年年底就在某平台上被国外…...
cuda、cuDNN、深度学习框架、pytorch、tentsorflow、keras这些概念之间的关系
当讨论CUDA、cuDNN、深度学习框架、pytorch、tensorflow、keras这些概念的时候,我们讨论的是与GPU加速深度学习相关的技术和工具。 CUDA(Compute Unified Device Architecture): CUDA是由NVIDIA开发的一种并行计算平台和编程模型&…...

第二讲:BeanFactory的实现
BeanFactory的实现 1. 环境准备2. 初始化DefaultListableBeanFactory3. 手动注册BeanDefinition4. 手动添加后置处理器5. 获取被依赖注入的Bean对象6. 让所有的单例bean初始化时加载7. 总结 Spring 的发展历史较为悠久,因此很多资料还在讲解它较旧的实现,…...

vue2+Spring Boot2.7 大文件分片上传
之前我们文章 手把手带大家实现 vue2Spring Boot2.7 文件上传功能 将了上传文件 但如果文件很大 就不太好处理了 按正常情况甚至因为超量而报错 这里 我弄了个足够大的文件 我们先搭建 Spring Boot2.7 环境 首先 application.yml 代码编写如下 server:port: 80 upload:path:…...

Vite更新依赖缓存失败,强制更新依赖缓存
使用vitets开发一段时间了,感觉并不是想象中的好用,特别是出现些稀奇古怪的问题不好解决,比如下面这个问题 上午9:50:08 [vite] error while updating dependencies: Error: ENOENT: no such file or directory, open E:/workspace-dir/node…...

Linux命令200例:tail用来显示文件的末尾内容(常用)
🏆作者简介,黑夜开发者,全栈领域新星创作者✌。CSDN专家博主,阿里云社区专家博主,2023年6月csdn上海赛道top4。 🏆数年电商行业从业经验,历任核心研发工程师,项目技术负责人。 &…...

【Unity每日一记】进行发射,位置相关的方法总结
👨💻个人主页:元宇宙-秩沅 👨💻 hallo 欢迎 点赞👍 收藏⭐ 留言📝 加关注✅! 👨💻 本文由 秩沅 原创 👨💻 收录于专栏:uni…...

MISRA 2012学习笔记(3)-Rules 8.4-8.7
文章目录 Rules8.4 字符集和词汇约定(Character sets and lexical conventions)Rule 4.1 八进制和十六进制转译序列应有明确的终止识别标识Rule 4.2 禁止使用三字母词(trigraphs) 8.5 标识符(Identifiers)Rule 5.1 外部标识符不得重名Rule 5.2 同范围和命名空间内的标识符不得重…...
centos7组件搭建
Linux(包括centos) 如何查看服务器内存、CPU su - root 切换用户 centos 密码 空格 https://blog.csdn.net/weixin_45277161/article/details/131524555 CentOS 7 安装 Docker 的详细步骤 https://blog.csdn.net/qq_39997939/article/details/13100…...
webpack5和webpack4的一些区别
自动清除打包目录 webpack4 // bash npm i clean-webpack-plugin -D //webpack.config.js const {CleanWebpackPlugin} require(clean-webpack-plugin); module.exports {plugins: [new CleanWebpackPlugin()} } webpack5 module.exports {output: {clean: true} } topLevel…...

攻防世界-fileclude
原题 解题思路 直接展示源码了,flag.php应该存放了flag,在file1与file2都不为空且file2是“hello ctf”时file1将被导入。接下来做法很明显,让file为flag.php,file2为“hello ctf”。“?file1php://filter/readconvert.base64-en…...

深度学习的“前世今生”
1、“感知机”的诞生 20世纪50年代,人工智能派生出了这样两个学派,分别是“符号学派”及“连接学派”。前者的领军学者有Marvin Minsky及John McCarthy,后者则是由Frank Rosenblatt所领导。 “符号学派”的人相信对机器从头编程,…...
第一百一十九回 如何通过蓝牙设备读写数据
文章目录 概念介绍实现方法示例代码经验总结我们在上一章回中介绍了如何获取蓝牙状态相关的内容,本章回中将介绍 如何通过蓝牙设备读写数据。闲话休提,让我们一起Talk Flutter吧。 概念介绍 通过蓝牙设备读写数据有两种方法: 一种是读写Characteristics;一种是读写Descri…...

linux:Temporary failure in name resolutionCouldn’t resolve host
所有域名无法正常解析。 ping www.baidu.com 等域名提示 Temporary failure in name resolution错误。 rootlocalhost:~# ping www.baidu.com ping: www.baidu.com: Temporary failure in name resolution rootlocalhost:~# 一、ubuntu/debian(emporary failure i…...
C 语言的 sprintf() 函数
<stdio.h> 原型: int sprintf(char *str, const char *format, …) 发送格式化输出到 str 所指向的字符串。 参数 str – 这是指向一个字符数组的指针,该数组存储了 C 字符串。 format – 这是字符串,包含了要被写入到字符串 str 的文本。它…...

深入浅出Asp.Net Core MVC应用开发系列-AspNetCore中的日志记录
ASP.NET Core 是一个跨平台的开源框架,用于在 Windows、macOS 或 Linux 上生成基于云的新式 Web 应用。 ASP.NET Core 中的日志记录 .NET 通过 ILogger API 支持高性能结构化日志记录,以帮助监视应用程序行为和诊断问题。 可以通过配置不同的记录提供程…...
云计算——弹性云计算器(ECS)
弹性云服务器:ECS 概述 云计算重构了ICT系统,云计算平台厂商推出使得厂家能够主要关注应用管理而非平台管理的云平台,包含如下主要概念。 ECS(Elastic Cloud Server):即弹性云服务器,是云计算…...

[ICLR 2022]How Much Can CLIP Benefit Vision-and-Language Tasks?
论文网址:pdf 英文是纯手打的!论文原文的summarizing and paraphrasing。可能会出现难以避免的拼写错误和语法错误,若有发现欢迎评论指正!文章偏向于笔记,谨慎食用 目录 1. 心得 2. 论文逐段精读 2.1. Abstract 2…...

srs linux
下载编译运行 git clone https:///ossrs/srs.git ./configure --h265on make 编译完成后即可启动SRS # 启动 ./objs/srs -c conf/srs.conf # 查看日志 tail -n 30 -f ./objs/srs.log 开放端口 默认RTMP接收推流端口是1935,SRS管理页面端口是8080,可…...

Reasoning over Uncertain Text by Generative Large Language Models
https://ojs.aaai.org/index.php/AAAI/article/view/34674/36829https://ojs.aaai.org/index.php/AAAI/article/view/34674/36829 1. 概述 文本中的不确定性在许多语境中传达,从日常对话到特定领域的文档(例如医学文档)(Heritage 2013;Landmark、Gulbrandsen 和 Svenevei…...
基于Java Swing的电子通讯录设计与实现:附系统托盘功能代码详解
JAVASQL电子通讯录带系统托盘 一、系统概述 本电子通讯录系统采用Java Swing开发桌面应用,结合SQLite数据库实现联系人管理功能,并集成系统托盘功能提升用户体验。系统支持联系人的增删改查、分组管理、搜索过滤等功能,同时可以最小化到系统…...

Linux 中如何提取压缩文件 ?
Linux 是一种流行的开源操作系统,它提供了许多工具来管理、压缩和解压缩文件。压缩文件有助于节省存储空间,使数据传输更快。本指南将向您展示如何在 Linux 中提取不同类型的压缩文件。 1. Unpacking ZIP Files ZIP 文件是非常常见的,要在 …...
Python Einops库:深度学习中的张量操作革命
Einops(爱因斯坦操作库)就像给张量操作戴上了一副"语义眼镜"——让你用人类能理解的方式告诉计算机如何操作多维数组。这个基于爱因斯坦求和约定的库,用类似自然语言的表达式替代了晦涩的API调用,彻底改变了深度学习工程…...

STM32---外部32.768K晶振(LSE)无法起振问题
晶振是否起振主要就检查两个1、晶振与MCU是否兼容;2、晶振的负载电容是否匹配 目录 一、判断晶振与MCU是否兼容 二、判断负载电容是否匹配 1. 晶振负载电容(CL)与匹配电容(CL1、CL2)的关系 2. 如何选择 CL1 和 CL…...

Rust 开发环境搭建
环境搭建 1、开发工具RustRover 或者vs code 2、Cygwin64 安装 https://cygwin.com/install.html 在工具终端执行: rustup toolchain install stable-x86_64-pc-windows-gnu rustup default stable-x86_64-pc-windows-gnu 2、Hello World fn main() { println…...