work-stealing算法 ForkJoinPool
专栏系列文章地址:https://blog.csdn.net/qq_26437925/article/details/145290162
本文目标:
- 重点是通过例子程序理解work-stealing算法原理
目录
- work-stealing算法
- 算法原理和优缺点介绍
- 使用场景
- work-stealing例子代码
- ForkJoinPool
- new ForkJoinPool()
- ForkJoinTask
- 例子代码1
- 例子代码2
work-stealing算法
算法原理和优缺点介绍
work-stealing算法是一种用于多线程并行计算中的任务调度算法。该算法的核心思想是允许空闲的线程从其他忙碌线程的工作队列中“窃取”任务来执行,以提高整体的资源利用率和系统的吞吐量。
在work-stealing算法中,每个线程通常都维护一个自己的任务队列,用于存储需要执行的任务。当某个线程完成自己的任务队列中的所有任务后,它会尝试从其他线程的任务队列中窃取任务。为了防止多个线程同时窃取同一个线程的任务,通常需要使用一些同步机制,如锁或原子操作等。
work-stealing算法的优点在于它能够动态地平衡负载,使得各个线程之间的任务分配更加均匀,从而提高了系统的并行效率和资源利用率。此外,该算法还具有较好的可扩展性和适应性,能够随着任务量的增加或减少而自动调整线程的数量和工作负载。
然而,work-stealing算法也存在一些挑战和限制。例如,在窃取任务时需要进行同步操作,这可能会增加一定的开销。此外,如果任务之间存在数据依赖关系,那么窃取任务可能会破坏这种依赖关系,从而导致错误的结果。因此,在使用work-stealing算法时需要根据具体的应用场景和任务特点进行权衡和选择。
介绍部分来自ai回答。可以简单理解如下:
一个大任务分割为若干个互不依赖的子任务,为了减少线程间的竞争,把这些子任务分别放到不同的队列里,并为每个队列创建一个单独的线程来执行队列里的任务,线程和队列一一对应。
线程1 - 队列1(任务1,任务2,任务3,…)
线程2 - 队列2(任务1,任务2,任务3,…)
比如线程1早早的把队列中任务都处理完了有空闲,但是队列2执行任务较慢;这样队列2中任务可以让线程1帮忙执行(即窃取
线程1的任务)
使用场景
work-stealing算法主要应用于多线程并行计算场景,特别是在任务数量不确定、任务粒度不均匀或者负载容易波动的情况下。以下是一些具体的应用场景:
并行计算框架:work-stealing算法被广泛应用于各种并行计算框架中,如Intel TBB(Threading Building Blocks)、Cilk Plus以及Java的ForkJoinPool等。这些框架利用work-stealing算法来动态地平衡各个线程之间的负载,提高并行计算的效率。
大数据处理:在大数据处理领域,如Hadoop、Spark等分布式计算框架中,work-stealing算法可以用于优化任务调度。通过允许空闲节点窃取其他忙碌节点的任务,可以更加均衡地分配工作负载,提高整个集群的处理能力。
高性能计算:在高性能计算领域,work-stealing算法也被用于优化并行任务的调度。特别是在处理大规模科学计算和模拟仿真等任务时,work-stealing算法能够有效地平衡各个计算节点之间的负载,提高整体的计算效率。
实时系统:在实时系统中,任务的及时完成至关重要。work-stealing算法可以通过动态地调整任务分配,确保各个线程都能够及时完成任务,从而提高系统的实时性能。
云计算和虚拟化环境:在云计算和虚拟化环境中,资源的使用是动态的,并且负载容易波动。work-stealing算法可以用于优化虚拟机或容器之间的任务调度,确保资源的有效利用和负载均衡。
总之,work-stealing算法适用于各种需要高效利用多线程并行计算能力的场景。通过动态地平衡负载和提高资源利用率,它能够显著地提高系统的并行效率和整体性能。
work-stealing例子代码
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;class Task implements Runnable {private final int taskId;public Task(int taskId) {this.taskId = taskId;}@Overridepublic void run() {try{TimeUnit.MILLISECONDS.sleep(300);// 线程0执行的任务慢一点,可以被窃取if (taskId % 4 == 0) {TimeUnit.MILLISECONDS.sleep(800);}}catch (Exception e){}System.out.println("Executing task " + taskId + " by " + Thread.currentThread().getName());}public int getTaskId() {return taskId;}
}class WorkerThread extends Thread {private final BlockingQueue<Task> taskQueue;private final List<WorkerThread> allWorkers;private final AtomicInteger taskIdGenerator;private volatile boolean running = true;public WorkerThread(BlockingQueue<Task> taskQueue, List<WorkerThread> allWorkers, AtomicInteger taskIdGenerator) {this.taskQueue = taskQueue;this.allWorkers = allWorkers;this.taskIdGenerator = taskIdGenerator;}@Overridepublic void run() {while (running) {Task task = null;try {// Try to retrieve a task from this worker's queuetask = taskQueue.poll(100, TimeUnit.MILLISECONDS);} catch (InterruptedException e) {Thread.currentThread().interrupt();return;}if (task != null) {// Execute the retrieved tasktask.run();} else {// If no task is retrieved, try to steal a task from another workertask = stealTask();if (task != null) {System.out.println("task " + task.getTaskId() + " stolen by " + Thread.currentThread().getName());task.run();}}}}private Task stealTask() {List<WorkerThread> shuffledWorkers = new ArrayList<>(allWorkers);Collections.shuffle(shuffledWorkers);for (WorkerThread worker : shuffledWorkers) {if (worker != this && !worker.taskQueue.isEmpty()) {Task stolenTask = worker.taskQueue.poll();if (stolenTask != null) {return stolenTask;}}}return null;}public void addTask(Task task) {taskQueue.offer(task);}public void stopTask() {running = false;}public static void main(String[] args) {int numWorkers = 4;BlockingQueue<Task> sharedQueue = new LinkedBlockingQueue<>();List<WorkerThread> workers = new ArrayList<>();AtomicInteger taskIdGenerator = new AtomicInteger(0);// Create and start worker threadsfor (int i = 0; i < numWorkers; i++) {BlockingQueue<Task> taskQueue = new LinkedBlockingQueue<>();WorkerThread worker = new WorkerThread(taskQueue, workers, taskIdGenerator);workers.add(worker);worker.start();}// Add tasks to the shared queue (or directly to worker queues for simplicity in this example)for (int i = 0; i < 20; i++) {int taskId = taskIdGenerator.incrementAndGet();int selectWorkId = taskId % numWorkers;Task task = new Task(taskId);workers.get(selectWorkId).addTask(task); // Distribute tasks round-robin for simplicity}// Let the workers run for some time before stopping themtry {Thread.sleep(5000);} catch (InterruptedException e) {Thread.currentThread().interrupt();}// Stop all workersfor (WorkerThread worker : workers) {worker.stopTask();}// Wait for all workers to terminate (not strictly necessary in this example, but good practice)for (WorkerThread worker : workers) {try {worker.join();} catch (InterruptedException e) {Thread.currentThread().interrupt();}}}
}
注意:
- 在这个示例中,我们使用了BlockingQueue来实现任务队列,它支持线程安全的队列操作。
- WorkerThread类表示一个工作线程,它有自己的任务队列,并会尝试执行自己的任务或窃取其他线程的任务。
- Task类表示一个可执行的任务,它简单地打印出正在执行任务的线程名称和任务ID。
- 在main方法中,我们创建了指定数量的工作线程,并向它们分配了任务。然后,让工作线程运行一段时间后停止。
这个示例是一个简化的版本,实际的生产环境中可能需要更复杂的同步机制和错误处理。此外,为了简化示例,任务被直接添加到了工作线程的任务队列中,而不是使用一个共享的窃取队列。在实际实现中,可以考虑使用一个共享的窃取队列来优化任务窃取过程。
测试输出如下:
可以看到线程0的任务被其它线程窃取执行了。
ForkJoinPool
new ForkJoinPool()
public ForkJoinPool() {this(Math.min(MAX_CAP, Runtime.getRuntime().availableProcessors()),defaultForkJoinWorkerThreadFactory, null, false);}
/*** Creates a {@code ForkJoinPool} with the given parameters, without* any security checks or parameter validation. Invoked directly by* makeCommonPool.*/
private ForkJoinPool(int parallelism,ForkJoinWorkerThreadFactory factory,UncaughtExceptionHandler handler,int mode,String workerNamePrefix) {this.workerNamePrefix = workerNamePrefix;this.factory = factory;this.ueh = handler;this.config = (parallelism & SMASK) | mode;long np = (long)(-parallelism); // offset ctl countsthis.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK);
}
默认线程数量是:Runtime.getRuntime().availableProcessors()
ForkJoinTask
public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
Abstract base class for tasks that run within a ForkJoinPool
. A ForkJoinTask
is a thread-like entity that is much lighter weight than a normal thread. Huge numbers of tasks and subtasks may be hosted by a small number of actual threads in a ForkJoinPool, at the price of some usage limitations.
任务提交处理如下:
steal逻辑
例子代码1
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.RecursiveTask;
import java.util.concurrent.TimeUnit;public class ForkJoinTaskExample extends RecursiveTask<Integer> {public static final int threshold = 2;private int start;private int end;public ForkJoinTaskExample(int start, int end) {this.start = start;this.end = end;}@Overrideprotected Integer compute() {int sum = 0;boolean canCompute = (end - start) <= threshold;if (canCompute) {for (int i = start; i <= end; i++) {sum += i;}} else {// 如果任务大于阈值,就分裂成两个子任务计算int middle = (start + end) / 2;ForkJoinTaskExample leftTask = new ForkJoinTaskExample(start, middle);ForkJoinTaskExample rightTask = new ForkJoinTaskExample(middle + 1, end);// 执行子任务leftTask.fork();rightTask.fork();// invokeAll(leftTask, rightTask);// 等待任务执行结束合并其结果int leftResult = leftTask.join();int rightResult = rightTask.join();// 合并子任务sum = leftResult + rightResult;}return sum;}static void testForkJoinPool() throws Exception{ForkJoinPool forkjoinPool = new ForkJoinPool();int sta = 1;int end = 100;//生成一个计算任务,计算连续区间范围的和ForkJoinTaskExample task = new ForkJoinTaskExample(sta, end);//执行一个任务Future<Integer> result = forkjoinPool.submit(task);System.out.println("result:" + result.get());}public static void main(String[] args) throws Exception{testForkJoinPool();TimeUnit.SECONDS.sleep(1);}
}
例子代码2
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;class MyTask implements Runnable {private final int taskId;public MyTask(int taskId) {this.taskId = taskId;}@Overridepublic void run() {try{TimeUnit.MILLISECONDS.sleep(300);// 线程0执行的任务慢一点,可以被窃取if (taskId % 4 == 0) {TimeUnit.MILLISECONDS.sleep(800);}}catch (Exception e){}System.out.println("Executing task " + taskId + " by " + Thread.currentThread().getName());}public int getTaskId() {return taskId;}
}public class Main {public static void main(String[] args) throws Exception {AtomicInteger taskIdGenerator = new AtomicInteger(0);ForkJoinPool forkjoinPool = new ForkJoinPool();// Add tasks to the shared queue (or directly to worker queues for simplicity in this example)for (int i = 0; i < 20; i++) {int taskId = taskIdGenerator.incrementAndGet();MyTask task = new MyTask(taskId);forkjoinPool.submit(task);}while (true){}}}
可以看到有窃取行为
相关文章:

work-stealing算法 ForkJoinPool
专栏系列文章地址:https://blog.csdn.net/qq_26437925/article/details/145290162 本文目标: 重点是通过例子程序理解work-stealing算法原理 目录 work-stealing算法算法原理和优缺点介绍使用场景work-stealing例子代码 ForkJoinPoolnew ForkJoinPool(…...

DeepSeek Janus-Pro:多模态AI模型的突破与创新
近年来,人工智能领域取得了显著的进展,尤其是在多模态模型(Multimodal Models)方面。多模态模型能够同时处理和理解文本、图像等多种类型的数据,极大地扩展了AI的应用场景。DeepSeek(DeepSeek-V3 深度剖析:…...

STM32-时钟树
STM32-时钟树 时钟 时钟...

hot100_21. 合并两个有序链表
将两个升序链表合并为一个新的 升序 链表并返回。新链表是通过拼接给定的两个链表的所有节点组成的。 示例 1: 输入:l1 [1,2,4], l2 [1,3,4] 输出:[1,1,2,3,4,4] 示例 2: 输入:l1 [], l2 [] 输出:[…...

代码讲解系列-CV(一)——CV基础框架
文章目录 一、环境配置IDE选择一套完整复现安装自定义cuda算子 二、Linux基础文件和目录操作查看显卡状态压缩和解压 三、常用工具和pipeline远程文件工具版本管理代码辅助工具 随手记录下一个晚课 一、环境配置 pytorch是AI框架用的很多,或者 其他是国内的框架 an…...

C++ Primer 标准库类型string
欢迎阅读我的 【CPrimer】专栏 专栏简介:本专栏主要面向C初学者,解释C的一些基本概念和基础语言特性,涉及C标准库的用法,面向对象特性,泛型特性高级用法。通过使用标准库中定义的抽象设施,使你更加适应高级…...

计算机网络安全与运维的关键 —— 常用端口全解析
目录 前言 常见端口分类及用途 20 端口(FTP 数据传输) 21 端口(FTP 消息控制) 22 端口(SSH) 23 端口(Telnet) 25 端口(SMTP) 53 端口(DNS&…...
Vue.js 的介绍与组件开发初步
Vue.js 的介绍与组件开发初步 Vue.js 的介绍与组件开发初步引言第一部分:Vue.js 基础入门1.1 什么是 Vue.js?1.2 搭建 Vue.js 开发环境安装 Node.js 和 npm安装 Vue CLI创建新项目运行示例 1.3 第一个 Vue.js 示例 第二部分:Vue.js 组件开发基…...

【仿12306项目】通过加“锁”,解决高并发抢票的超卖问题
文章目录 一. 测试工具二. 超卖现象演示三. 原因分析四. 解决办法方法一:加synchronized锁1. 单个服务节点情况2. 增加服务器节点,分布式环境synchronized失效演示 方法二:使用Redis分布式锁锁解决超卖问题1. 添加Redis分布式锁2. 结果 方法三…...
wow-agent---task4 MetaGPT初体验
先说坑: 1.使用git clone模式安装metagpt 2.模型尽量使用在线模型或本地高参数模型。 这里使用python3.10.11调试成功 一,安装 安装 | MetaGPT,参考这里的以开发模型进行安装 git clone https://github.com/geekan/MetaGPT.git cd /you…...

MVANet——小范围内捕捉高分辨率细节而在大范围内不损失精度的强大的背景消除模型
一、概述 前景提取(背景去除)是现代计算机视觉的关键挑战之一,在各种应用中的重要性与日俱增。在图像编辑和视频制作中有效地去除背景不仅能提高美学价值,还能提高工作流程的效率。在要求精确度的领域,如医学图像分析…...

94,【2】buuctf web [安洵杯 2019]easy_serialize_php
进入靶场 可以查看源代码 <?php // 从 GET 请求中获取名为 f 的参数值,并赋值给变量 $function // 符号用于抑制可能出现的错误信息 $function $_GET[f];// 定义一个名为 filter 的函数,用于过滤字符串中的敏感词汇 function filter($img) {// 定义…...

LabVIEW如何有效地进行数据采集?
数据采集(DAQ)是许多工程项目中的核心环节,无论是测试、监控还是控制系统,准确、高效的数据采集都是至关重要的。LabVIEW作为一个图形化编程环境,提供了丰富的功能来实现数据采集,确保数据的实时性与可靠性…...

6 [新一代Github投毒针对网络安全人员钓鱼]
0x01 前言 在Github上APT组织“海莲花”发布存在后门的提权BOF,通过该项目针对网络安全从业人员进行钓鱼。不过其实早在几年前就已经有人对Visual Studio项目恶意利用进行过研究,所以投毒的手法也不算是新的技术。但这次国内有大量的安全从业者转发该钓…...

《Origin画百图》之脊线图
1.数据准备:将数据设置为y 2.选择绘图>统计图>脊线图 3.生成基础图形,并不好看,接下来对图形属性进行设置 4.双击图形>选择图案>颜色选择按点>Y值 5.这里发现颜色有色阶,过度并不平滑,需要对色阶进行更…...

linux 函数 sem_init () 信号量、sem_destroy()
(1) (2) 代码举例: #include <stdio.h> #include <stdlib.h> #include <pthread.h> #include <semaphore.h> #include <unistd.h>sem_t semaphore;void* thread_function(void* arg) …...
Kafka架构
引言 Kafka 凭借其独树一帜的分区架构,在消息中间件领域展现出了卓越的性能表现。其分区架构不仅赋予了 Kafka 强大的并行计算能力,使其能够高效处理海量数据,还显著提升了系统的容灾能力,确保在复杂的运行环境中始终保持稳定可靠…...
刷题记录 动态规划-2: 509. 斐波那契数
题目:509. 斐波那契数 难度:简单 斐波那契数 (通常用 F(n) 表示)形成的序列称为 斐波那契数列 。该数列由 0 和 1 开始,后面的每一项数字都是前面两项数字的和。也就是: F(0) 0,F(1) 1 F(n…...
RDP协议详解
以下内容包含对 RDP(Remote Desktop Protocol,远程桌面协议)及其开源实现 FreeRDP 的较为系统、深入的讲解,涵盖协议概要、历史沿革、核心原理、安全机制、安装与使用方法、扩展与未来发展趋势等方面, --- ## 一、引…...

设计模式的艺术-观察者模式
行为型模式的名称、定义、学习难度和使用频率如下表所示: 1.如何理解观察者模式 一个对象的状态或行为的变化将导致其他对象的状态或行为也发生改变,它们之间将产生联动,正所谓“触一而牵百发”。为了更好地描述对象之间存在的这种一对多&…...
JVM垃圾回收机制全解析
Java虚拟机(JVM)中的垃圾收集器(Garbage Collector,简称GC)是用于自动管理内存的机制。它负责识别和清除不再被程序使用的对象,从而释放内存空间,避免内存泄漏和内存溢出等问题。垃圾收集器在Ja…...
解决本地部署 SmolVLM2 大语言模型运行 flash-attn 报错
出现的问题 安装 flash-attn 会一直卡在 build 那一步或者运行报错 解决办法 是因为你安装的 flash-attn 版本没有对应上,所以报错,到 https://github.com/Dao-AILab/flash-attention/releases 下载对应版本,cu、torch、cp 的版本一定要对…...
Rust 异步编程
Rust 异步编程 引言 Rust 是一种系统编程语言,以其高性能、安全性以及零成本抽象而著称。在多核处理器成为主流的今天,异步编程成为了一种提高应用性能、优化资源利用的有效手段。本文将深入探讨 Rust 异步编程的核心概念、常用库以及最佳实践。 异步编程基础 什么是异步…...
Unit 1 深度强化学习简介
Deep RL Course ——Unit 1 Introduction 从理论和实践层面深入学习深度强化学习。学会使用知名的深度强化学习库,例如 Stable Baselines3、RL Baselines3 Zoo、Sample Factory 和 CleanRL。在独特的环境中训练智能体,比如 SnowballFight、Huggy the Do…...
Device Mapper 机制
Device Mapper 机制详解 Device Mapper(简称 DM)是 Linux 内核中的一套通用块设备映射框架,为 LVM、加密磁盘、RAID 等提供底层支持。本文将详细介绍 Device Mapper 的原理、实现、内核配置、常用工具、操作测试流程,并配以详细的…...
Linux C语言网络编程详细入门教程:如何一步步实现TCP服务端与客户端通信
文章目录 Linux C语言网络编程详细入门教程:如何一步步实现TCP服务端与客户端通信前言一、网络通信基础概念二、服务端与客户端的完整流程图解三、每一步的详细讲解和代码示例1. 创建Socket(服务端和客户端都要)2. 绑定本地地址和端口&#x…...
快刀集(1): 一刀斩断视频片头广告
一刀流:用一个简单脚本,秒杀视频片头广告,还你清爽观影体验。 1. 引子 作为一个爱生活、爱学习、爱收藏高清资源的老码农,平时写代码之余看看电影、补补片,是再正常不过的事。 电影嘛,要沉浸,…...

基于Springboot+Vue的办公管理系统
角色: 管理员、员工 技术: 后端: SpringBoot, Vue2, MySQL, Mybatis-Plus 前端: Vue2, Element-UI, Axios, Echarts, Vue-Router 核心功能: 该办公管理系统是一个综合性的企业内部管理平台,旨在提升企业运营效率和员工管理水…...

什么是VR全景技术
VR全景技术,全称为虚拟现实全景技术,是通过计算机图像模拟生成三维空间中的虚拟世界,使用户能够在该虚拟世界中进行全方位、无死角的观察和交互的技术。VR全景技术模拟人在真实空间中的视觉体验,结合图文、3D、音视频等多媒体元素…...
如何配置一个sql server使得其它用户可以通过excel odbc获取数据
要让其他用户通过 Excel 使用 ODBC 连接到 SQL Server 获取数据,你需要完成以下配置步骤: ✅ 一、在 SQL Server 端配置(服务器设置) 1. 启用 TCP/IP 协议 打开 “SQL Server 配置管理器”。导航到:SQL Server 网络配…...