work-stealing算法 ForkJoinPool
专栏系列文章地址:https://blog.csdn.net/qq_26437925/article/details/145290162
本文目标:
- 重点是通过例子程序理解work-stealing算法原理
目录
- work-stealing算法
- 算法原理和优缺点介绍
- 使用场景
- work-stealing例子代码
- ForkJoinPool
- new ForkJoinPool()
- ForkJoinTask
- 例子代码1
- 例子代码2
work-stealing算法
算法原理和优缺点介绍
work-stealing算法是一种用于多线程并行计算中的任务调度算法。该算法的核心思想是允许空闲的线程从其他忙碌线程的工作队列中“窃取”任务来执行,以提高整体的资源利用率和系统的吞吐量。
在work-stealing算法中,每个线程通常都维护一个自己的任务队列,用于存储需要执行的任务。当某个线程完成自己的任务队列中的所有任务后,它会尝试从其他线程的任务队列中窃取任务。为了防止多个线程同时窃取同一个线程的任务,通常需要使用一些同步机制,如锁或原子操作等。
work-stealing算法的优点在于它能够动态地平衡负载,使得各个线程之间的任务分配更加均匀,从而提高了系统的并行效率和资源利用率。此外,该算法还具有较好的可扩展性和适应性,能够随着任务量的增加或减少而自动调整线程的数量和工作负载。
然而,work-stealing算法也存在一些挑战和限制。例如,在窃取任务时需要进行同步操作,这可能会增加一定的开销。此外,如果任务之间存在数据依赖关系,那么窃取任务可能会破坏这种依赖关系,从而导致错误的结果。因此,在使用work-stealing算法时需要根据具体的应用场景和任务特点进行权衡和选择。
介绍部分来自ai回答。可以简单理解如下:
一个大任务分割为若干个互不依赖的子任务,为了减少线程间的竞争,把这些子任务分别放到不同的队列里,并为每个队列创建一个单独的线程来执行队列里的任务,线程和队列一一对应。
线程1 - 队列1(任务1,任务2,任务3,…)
线程2 - 队列2(任务1,任务2,任务3,…)
比如线程1早早的把队列中任务都处理完了有空闲,但是队列2执行任务较慢;这样队列2中任务可以让线程1帮忙执行(即窃取
线程1的任务)
使用场景
work-stealing算法主要应用于多线程并行计算场景,特别是在任务数量不确定、任务粒度不均匀或者负载容易波动的情况下。以下是一些具体的应用场景:
并行计算框架:work-stealing算法被广泛应用于各种并行计算框架中,如Intel TBB(Threading Building Blocks)、Cilk Plus以及Java的ForkJoinPool等。这些框架利用work-stealing算法来动态地平衡各个线程之间的负载,提高并行计算的效率。
大数据处理:在大数据处理领域,如Hadoop、Spark等分布式计算框架中,work-stealing算法可以用于优化任务调度。通过允许空闲节点窃取其他忙碌节点的任务,可以更加均衡地分配工作负载,提高整个集群的处理能力。
高性能计算:在高性能计算领域,work-stealing算法也被用于优化并行任务的调度。特别是在处理大规模科学计算和模拟仿真等任务时,work-stealing算法能够有效地平衡各个计算节点之间的负载,提高整体的计算效率。
实时系统:在实时系统中,任务的及时完成至关重要。work-stealing算法可以通过动态地调整任务分配,确保各个线程都能够及时完成任务,从而提高系统的实时性能。
云计算和虚拟化环境:在云计算和虚拟化环境中,资源的使用是动态的,并且负载容易波动。work-stealing算法可以用于优化虚拟机或容器之间的任务调度,确保资源的有效利用和负载均衡。
总之,work-stealing算法适用于各种需要高效利用多线程并行计算能力的场景。通过动态地平衡负载和提高资源利用率,它能够显著地提高系统的并行效率和整体性能。
work-stealing例子代码
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;class Task implements Runnable {private final int taskId;public Task(int taskId) {this.taskId = taskId;}@Overridepublic void run() {try{TimeUnit.MILLISECONDS.sleep(300);// 线程0执行的任务慢一点,可以被窃取if (taskId % 4 == 0) {TimeUnit.MILLISECONDS.sleep(800);}}catch (Exception e){}System.out.println("Executing task " + taskId + " by " + Thread.currentThread().getName());}public int getTaskId() {return taskId;}
}class WorkerThread extends Thread {private final BlockingQueue<Task> taskQueue;private final List<WorkerThread> allWorkers;private final AtomicInteger taskIdGenerator;private volatile boolean running = true;public WorkerThread(BlockingQueue<Task> taskQueue, List<WorkerThread> allWorkers, AtomicInteger taskIdGenerator) {this.taskQueue = taskQueue;this.allWorkers = allWorkers;this.taskIdGenerator = taskIdGenerator;}@Overridepublic void run() {while (running) {Task task = null;try {// Try to retrieve a task from this worker's queuetask = taskQueue.poll(100, TimeUnit.MILLISECONDS);} catch (InterruptedException e) {Thread.currentThread().interrupt();return;}if (task != null) {// Execute the retrieved tasktask.run();} else {// If no task is retrieved, try to steal a task from another workertask = stealTask();if (task != null) {System.out.println("task " + task.getTaskId() + " stolen by " + Thread.currentThread().getName());task.run();}}}}private Task stealTask() {List<WorkerThread> shuffledWorkers = new ArrayList<>(allWorkers);Collections.shuffle(shuffledWorkers);for (WorkerThread worker : shuffledWorkers) {if (worker != this && !worker.taskQueue.isEmpty()) {Task stolenTask = worker.taskQueue.poll();if (stolenTask != null) {return stolenTask;}}}return null;}public void addTask(Task task) {taskQueue.offer(task);}public void stopTask() {running = false;}public static void main(String[] args) {int numWorkers = 4;BlockingQueue<Task> sharedQueue = new LinkedBlockingQueue<>();List<WorkerThread> workers = new ArrayList<>();AtomicInteger taskIdGenerator = new AtomicInteger(0);// Create and start worker threadsfor (int i = 0; i < numWorkers; i++) {BlockingQueue<Task> taskQueue = new LinkedBlockingQueue<>();WorkerThread worker = new WorkerThread(taskQueue, workers, taskIdGenerator);workers.add(worker);worker.start();}// Add tasks to the shared queue (or directly to worker queues for simplicity in this example)for (int i = 0; i < 20; i++) {int taskId = taskIdGenerator.incrementAndGet();int selectWorkId = taskId % numWorkers;Task task = new Task(taskId);workers.get(selectWorkId).addTask(task); // Distribute tasks round-robin for simplicity}// Let the workers run for some time before stopping themtry {Thread.sleep(5000);} catch (InterruptedException e) {Thread.currentThread().interrupt();}// Stop all workersfor (WorkerThread worker : workers) {worker.stopTask();}// Wait for all workers to terminate (not strictly necessary in this example, but good practice)for (WorkerThread worker : workers) {try {worker.join();} catch (InterruptedException e) {Thread.currentThread().interrupt();}}}
}
注意:
- 在这个示例中,我们使用了BlockingQueue来实现任务队列,它支持线程安全的队列操作。
- WorkerThread类表示一个工作线程,它有自己的任务队列,并会尝试执行自己的任务或窃取其他线程的任务。
- Task类表示一个可执行的任务,它简单地打印出正在执行任务的线程名称和任务ID。
- 在main方法中,我们创建了指定数量的工作线程,并向它们分配了任务。然后,让工作线程运行一段时间后停止。
这个示例是一个简化的版本,实际的生产环境中可能需要更复杂的同步机制和错误处理。此外,为了简化示例,任务被直接添加到了工作线程的任务队列中,而不是使用一个共享的窃取队列。在实际实现中,可以考虑使用一个共享的窃取队列来优化任务窃取过程。
测试输出如下:
可以看到线程0的任务被其它线程窃取执行了。
ForkJoinPool
new ForkJoinPool()
public ForkJoinPool() {this(Math.min(MAX_CAP, Runtime.getRuntime().availableProcessors()),defaultForkJoinWorkerThreadFactory, null, false);}
/*** Creates a {@code ForkJoinPool} with the given parameters, without* any security checks or parameter validation. Invoked directly by* makeCommonPool.*/
private ForkJoinPool(int parallelism,ForkJoinWorkerThreadFactory factory,UncaughtExceptionHandler handler,int mode,String workerNamePrefix) {this.workerNamePrefix = workerNamePrefix;this.factory = factory;this.ueh = handler;this.config = (parallelism & SMASK) | mode;long np = (long)(-parallelism); // offset ctl countsthis.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK);
}
默认线程数量是:Runtime.getRuntime().availableProcessors()
ForkJoinTask
public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
Abstract base class for tasks that run within a ForkJoinPool
. A ForkJoinTask
is a thread-like entity that is much lighter weight than a normal thread. Huge numbers of tasks and subtasks may be hosted by a small number of actual threads in a ForkJoinPool, at the price of some usage limitations.
任务提交处理如下:
steal逻辑
例子代码1
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.RecursiveTask;
import java.util.concurrent.TimeUnit;public class ForkJoinTaskExample extends RecursiveTask<Integer> {public static final int threshold = 2;private int start;private int end;public ForkJoinTaskExample(int start, int end) {this.start = start;this.end = end;}@Overrideprotected Integer compute() {int sum = 0;boolean canCompute = (end - start) <= threshold;if (canCompute) {for (int i = start; i <= end; i++) {sum += i;}} else {// 如果任务大于阈值,就分裂成两个子任务计算int middle = (start + end) / 2;ForkJoinTaskExample leftTask = new ForkJoinTaskExample(start, middle);ForkJoinTaskExample rightTask = new ForkJoinTaskExample(middle + 1, end);// 执行子任务leftTask.fork();rightTask.fork();// invokeAll(leftTask, rightTask);// 等待任务执行结束合并其结果int leftResult = leftTask.join();int rightResult = rightTask.join();// 合并子任务sum = leftResult + rightResult;}return sum;}static void testForkJoinPool() throws Exception{ForkJoinPool forkjoinPool = new ForkJoinPool();int sta = 1;int end = 100;//生成一个计算任务,计算连续区间范围的和ForkJoinTaskExample task = new ForkJoinTaskExample(sta, end);//执行一个任务Future<Integer> result = forkjoinPool.submit(task);System.out.println("result:" + result.get());}public static void main(String[] args) throws Exception{testForkJoinPool();TimeUnit.SECONDS.sleep(1);}
}
例子代码2
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;class MyTask implements Runnable {private final int taskId;public MyTask(int taskId) {this.taskId = taskId;}@Overridepublic void run() {try{TimeUnit.MILLISECONDS.sleep(300);// 线程0执行的任务慢一点,可以被窃取if (taskId % 4 == 0) {TimeUnit.MILLISECONDS.sleep(800);}}catch (Exception e){}System.out.println("Executing task " + taskId + " by " + Thread.currentThread().getName());}public int getTaskId() {return taskId;}
}public class Main {public static void main(String[] args) throws Exception {AtomicInteger taskIdGenerator = new AtomicInteger(0);ForkJoinPool forkjoinPool = new ForkJoinPool();// Add tasks to the shared queue (or directly to worker queues for simplicity in this example)for (int i = 0; i < 20; i++) {int taskId = taskIdGenerator.incrementAndGet();MyTask task = new MyTask(taskId);forkjoinPool.submit(task);}while (true){}}}
可以看到有窃取行为
相关文章:

work-stealing算法 ForkJoinPool
专栏系列文章地址:https://blog.csdn.net/qq_26437925/article/details/145290162 本文目标: 重点是通过例子程序理解work-stealing算法原理 目录 work-stealing算法算法原理和优缺点介绍使用场景work-stealing例子代码 ForkJoinPoolnew ForkJoinPool(…...

DeepSeek Janus-Pro:多模态AI模型的突破与创新
近年来,人工智能领域取得了显著的进展,尤其是在多模态模型(Multimodal Models)方面。多模态模型能够同时处理和理解文本、图像等多种类型的数据,极大地扩展了AI的应用场景。DeepSeek(DeepSeek-V3 深度剖析:…...

STM32-时钟树
STM32-时钟树 时钟 时钟...

hot100_21. 合并两个有序链表
将两个升序链表合并为一个新的 升序 链表并返回。新链表是通过拼接给定的两个链表的所有节点组成的。 示例 1: 输入:l1 [1,2,4], l2 [1,3,4] 输出:[1,1,2,3,4,4] 示例 2: 输入:l1 [], l2 [] 输出:[…...

代码讲解系列-CV(一)——CV基础框架
文章目录 一、环境配置IDE选择一套完整复现安装自定义cuda算子 二、Linux基础文件和目录操作查看显卡状态压缩和解压 三、常用工具和pipeline远程文件工具版本管理代码辅助工具 随手记录下一个晚课 一、环境配置 pytorch是AI框架用的很多,或者 其他是国内的框架 an…...

C++ Primer 标准库类型string
欢迎阅读我的 【CPrimer】专栏 专栏简介:本专栏主要面向C初学者,解释C的一些基本概念和基础语言特性,涉及C标准库的用法,面向对象特性,泛型特性高级用法。通过使用标准库中定义的抽象设施,使你更加适应高级…...

计算机网络安全与运维的关键 —— 常用端口全解析
目录 前言 常见端口分类及用途 20 端口(FTP 数据传输) 21 端口(FTP 消息控制) 22 端口(SSH) 23 端口(Telnet) 25 端口(SMTP) 53 端口(DNS&…...
Vue.js 的介绍与组件开发初步
Vue.js 的介绍与组件开发初步 Vue.js 的介绍与组件开发初步引言第一部分:Vue.js 基础入门1.1 什么是 Vue.js?1.2 搭建 Vue.js 开发环境安装 Node.js 和 npm安装 Vue CLI创建新项目运行示例 1.3 第一个 Vue.js 示例 第二部分:Vue.js 组件开发基…...

【仿12306项目】通过加“锁”,解决高并发抢票的超卖问题
文章目录 一. 测试工具二. 超卖现象演示三. 原因分析四. 解决办法方法一:加synchronized锁1. 单个服务节点情况2. 增加服务器节点,分布式环境synchronized失效演示 方法二:使用Redis分布式锁锁解决超卖问题1. 添加Redis分布式锁2. 结果 方法三…...
wow-agent---task4 MetaGPT初体验
先说坑: 1.使用git clone模式安装metagpt 2.模型尽量使用在线模型或本地高参数模型。 这里使用python3.10.11调试成功 一,安装 安装 | MetaGPT,参考这里的以开发模型进行安装 git clone https://github.com/geekan/MetaGPT.git cd /you…...

MVANet——小范围内捕捉高分辨率细节而在大范围内不损失精度的强大的背景消除模型
一、概述 前景提取(背景去除)是现代计算机视觉的关键挑战之一,在各种应用中的重要性与日俱增。在图像编辑和视频制作中有效地去除背景不仅能提高美学价值,还能提高工作流程的效率。在要求精确度的领域,如医学图像分析…...

94,【2】buuctf web [安洵杯 2019]easy_serialize_php
进入靶场 可以查看源代码 <?php // 从 GET 请求中获取名为 f 的参数值,并赋值给变量 $function // 符号用于抑制可能出现的错误信息 $function $_GET[f];// 定义一个名为 filter 的函数,用于过滤字符串中的敏感词汇 function filter($img) {// 定义…...

LabVIEW如何有效地进行数据采集?
数据采集(DAQ)是许多工程项目中的核心环节,无论是测试、监控还是控制系统,准确、高效的数据采集都是至关重要的。LabVIEW作为一个图形化编程环境,提供了丰富的功能来实现数据采集,确保数据的实时性与可靠性…...

6 [新一代Github投毒针对网络安全人员钓鱼]
0x01 前言 在Github上APT组织“海莲花”发布存在后门的提权BOF,通过该项目针对网络安全从业人员进行钓鱼。不过其实早在几年前就已经有人对Visual Studio项目恶意利用进行过研究,所以投毒的手法也不算是新的技术。但这次国内有大量的安全从业者转发该钓…...

《Origin画百图》之脊线图
1.数据准备:将数据设置为y 2.选择绘图>统计图>脊线图 3.生成基础图形,并不好看,接下来对图形属性进行设置 4.双击图形>选择图案>颜色选择按点>Y值 5.这里发现颜色有色阶,过度并不平滑,需要对色阶进行更…...

linux 函数 sem_init () 信号量、sem_destroy()
(1) (2) 代码举例: #include <stdio.h> #include <stdlib.h> #include <pthread.h> #include <semaphore.h> #include <unistd.h>sem_t semaphore;void* thread_function(void* arg) …...
Kafka架构
引言 Kafka 凭借其独树一帜的分区架构,在消息中间件领域展现出了卓越的性能表现。其分区架构不仅赋予了 Kafka 强大的并行计算能力,使其能够高效处理海量数据,还显著提升了系统的容灾能力,确保在复杂的运行环境中始终保持稳定可靠…...
刷题记录 动态规划-2: 509. 斐波那契数
题目:509. 斐波那契数 难度:简单 斐波那契数 (通常用 F(n) 表示)形成的序列称为 斐波那契数列 。该数列由 0 和 1 开始,后面的每一项数字都是前面两项数字的和。也就是: F(0) 0,F(1) 1 F(n…...
RDP协议详解
以下内容包含对 RDP(Remote Desktop Protocol,远程桌面协议)及其开源实现 FreeRDP 的较为系统、深入的讲解,涵盖协议概要、历史沿革、核心原理、安全机制、安装与使用方法、扩展与未来发展趋势等方面, --- ## 一、引…...

设计模式的艺术-观察者模式
行为型模式的名称、定义、学习难度和使用频率如下表所示: 1.如何理解观察者模式 一个对象的状态或行为的变化将导致其他对象的状态或行为也发生改变,它们之间将产生联动,正所谓“触一而牵百发”。为了更好地描述对象之间存在的这种一对多&…...
Vim 调用外部命令学习笔记
Vim 外部命令集成完全指南 文章目录 Vim 外部命令集成完全指南核心概念理解命令语法解析语法对比 常用外部命令详解文本排序与去重文本筛选与搜索高级 grep 搜索技巧文本替换与编辑字符处理高级文本处理编程语言处理其他实用命令 范围操作示例指定行范围处理复合命令示例 实用技…...

使用docker在3台服务器上搭建基于redis 6.x的一主两从三台均是哨兵模式
一、环境及版本说明 如果服务器已经安装了docker,则忽略此步骤,如果没有安装,则可以按照一下方式安装: 1. 在线安装(有互联网环境): 请看我这篇文章 传送阵>> 点我查看 2. 离线安装(内网环境):请看我这篇文章 传送阵>> 点我查看 说明:假设每台服务器已…...
质量体系的重要
质量体系是为确保产品、服务或过程质量满足规定要求,由相互关联的要素构成的有机整体。其核心内容可归纳为以下五个方面: 🏛️ 一、组织架构与职责 质量体系明确组织内各部门、岗位的职责与权限,形成层级清晰的管理网络…...
postgresql|数据库|只读用户的创建和删除(备忘)
CREATE USER read_only WITH PASSWORD 密码 -- 连接到xxx数据库 \c xxx -- 授予对xxx数据库的只读权限 GRANT CONNECT ON DATABASE xxx TO read_only; GRANT USAGE ON SCHEMA public TO read_only; GRANT SELECT ON ALL TABLES IN SCHEMA public TO read_only; GRANT EXECUTE O…...

cf2117E
原题链接:https://codeforces.com/contest/2117/problem/E 题目背景: 给定两个数组a,b,可以执行多次以下操作:选择 i (1 < i < n - 1),并设置 或,也可以在执行上述操作前执行一次删除任意 和 。求…...

《通信之道——从微积分到 5G》读书总结
第1章 绪 论 1.1 这是一本什么样的书 通信技术,说到底就是数学。 那些最基础、最本质的部分。 1.2 什么是通信 通信 发送方 接收方 承载信息的信号 解调出其中承载的信息 信息在发送方那里被加工成信号(调制) 把信息从信号中抽取出来&am…...

DIY|Mac 搭建 ESP-IDF 开发环境及编译小智 AI
前一阵子在百度 AI 开发者大会上,看到基于小智 AI DIY 玩具的演示,感觉有点意思,想着自己也来试试。 如果只是想烧录现成的固件,乐鑫官方除了提供了 Windows 版本的 Flash 下载工具 之外,还提供了基于网页版的 ESP LA…...
鸿蒙中用HarmonyOS SDK应用服务 HarmonyOS5开发一个生活电费的缴纳和查询小程序
一、项目初始化与配置 1. 创建项目 ohpm init harmony/utility-payment-app 2. 配置权限 // module.json5 {"requestPermissions": [{"name": "ohos.permission.INTERNET"},{"name": "ohos.permission.GET_NETWORK_INFO"…...

【p2p、分布式,区块链笔记 MESH】Bluetooth蓝牙通信 BLE Mesh协议的拓扑结构 定向转发机制
目录 节点的功能承载层(GATT/Adv)局限性: 拓扑关系定向转发机制定向转发意义 CG 节点的功能 节点的功能由节点支持的特性和功能决定。所有节点都能够发送和接收网格消息。节点还可以选择支持一个或多个附加功能,如 Configuration …...
LCTF液晶可调谐滤波器在多光谱相机捕捉无人机目标检测中的作用
中达瑞和自2005年成立以来,一直在光谱成像领域深度钻研和发展,始终致力于研发高性能、高可靠性的光谱成像相机,为科研院校提供更优的产品和服务。在《低空背景下无人机目标的光谱特征研究及目标检测应用》这篇论文中提到中达瑞和 LCTF 作为多…...