源码角度看待线程池的执行流程
文章目录
- 前言
- 一、线程池的相关接口和实现类
- 1.Executor接口
- 2.ExecutorService接口
- 3.AbstractExecutorService接口
- 4.ThreadPoolExecutor 实现类
- 二、ThreadPoolExecutor源码解析
- 1.Worker内部类
- 2.execute()方法
- 3.addWorker()方法
- 总结
前言
线程池内部维护了若干个线程,没有任务的时候,这些线程都处于等待空闲状态。如果有新的线程任务,就分配一个空闲线程执行。如果所有线程都处于忙碌状态,线程池会创建一个新线程进行处理或者放入队列(工作队列)中等待。
线程池在多线程编程中扮演着重要角色,它能够管理和复用线程,提高并发执行效率。
在之前的学习中,我们知道了线程池的基本流程如下,而今天我们则用这个流程配合着源码的来重新分析线程池的执行流程。
一、线程池的相关接口和实现类
1.Executor接口
public interface Executor {/*** Executes the given command at some time in the future. The command* may execute in a new thread, in a pooled thread, or in the calling* thread, at the discretion of the {@code Executor} implementation.** @param command the runnable task* @throws RejectedExecutionException if this task cannot be* accepted for execution* @throws NullPointerException if command is null*/void execute(Runnable command);
}
Executor接口作为线程池技术中的顶层接口,它的作用是用来定义线程池中,用于提交并执行线程任务的核心方法:exuecte()方法。未来线程池中所有的线程任务,都将由exuecte()方法来执行。
2.ExecutorService接口
public interface ExecutorService extends Executor {//.....
}
ExecutorService接口继承了Executor接口,扩展了awaitTermination()、submit()、shutdown()等专门用于管理线程任务的方法。
3.AbstractExecutorService接口
public abstract class AbstractExecutorService implements ExecutorService {//....
}
ExecutorService接口的抽象实现类AbstractExecutorService,为不同的线程池实现类,提供submit()、invokeAll()等部分方法的公共实现。但是由于在不同线程池中的核心方法exuecte()执行策略不同,所以在AbstractExecutorService并未提供该方法的具体实现。
4.ThreadPoolExecutor 实现类
public class ThreadPoolExecutor extends AbstractExecutorService {//...
}
ThreadPoolExecutor实现类是AbstractExecutorService接口的的两个重要实现类之一,(ForkJoinPool是另一个),也是要掌握的关于线程池的重点区域;
ThreadPoolExecutor线程池通过Woker工作线程、BlockingQueue阻塞工作队列 以及 拒绝策略实现了一个标准的线程池;
二、ThreadPoolExecutor源码解析
在对源码进行解析之前,我们先看看官方给我们关于ThreadPoolExecutor的解析是什么:
The main pool control state, ctl, is an atomic integer packing
* two conceptual fields
* workerCount, indicating the effective number of threads
* runState, indicating whether running, shutting down etc
* …
这句话什么意思呢?
主池控制状态ctl是一个原子整数封装?
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
再结合ctl的实例化,我们这个发现这个ctl是一个具有原子性的整数,再往后就是告诉我们这个具有原子性的整数是由两个概念组成的:workerCount工作线程数和runState运行状态。
他是一个32位的整数,具体表示形式位:
所以说,它的作用就是通过位运算来存储线程池的状态和活动线程数信息。
1.Worker内部类
每个Woker类的对象,都代表线程池中的一个工作线程。
Worker类是ThreadPoolExecutor类中定义的一个私有内部类,保存了每个Worker工作线程要执行的Runnable线程任务和Thread线程对象。
private final class Worker extends AbstractQueuedSynchronizer implements Runnable { private static final long serialVersionUID = 6138294804551838833L;//用于存储当前工作线程的引用。final Thread thread;//用于存储该工作线程要执行的第一个任务。Runnable firstTask;//用于记录该工作线程已经完成的任务数量volatile long completedTasks;//初始化工作线程的状态,设置第一个任务并创建一个线程对象Worker(Runnable firstTask) {setState(-1); // inhibit interrupts until runWorkerthis.firstTask = firstTask;this.thread = getThreadFactory().newThread(this);}//Runnable 接口的 run 方法,调用了 runWorker(this),将工作线程自身作为参数传递给 runWorker方法public void run() {runWorker(this);}//因为继承了AbstractQueuedSynchronizer 类,一下方法都是基于线程安全的方法//..... }
当ThreadPoolExecutor线程池,通过exeute()方法执行1个线程任务时,会调用addWorker()方法创建一个Woker工作线程对象。并且,创建好的Worker工作线程对象,会被添加到一个HashSet workders工作线程集合,统一由线程池进行管理。
当Worker工作线程,在第一次执行完成线程任务后,这个Worker工作线程并不会销毁,而是会以循环的方式,通过线程池的getTask()方法,获取阻塞工作队列中新的Runnable线程任务,并通过当前Worker工作线程中所绑定Thread线程,完成新线程任务的执行,从而实现了线程池的中Thread线程的重复使用。
2.execute()方法
ThreadPoolExecutor线程池中,会通过execute(Runnable command)方法执行Runnable类型的线程任务。
在分析execute()方法之前,也来看看官方是怎么解释这个方法的:
/** Proceed in 3 steps:** 1. If fewer than corePoolSize threads are running, try to* start a new thread with the given command as its first* task. The call to addWorker atomically checks runState and* workerCount, and so prevents false alarms that would add* threads when it shouldn't, by returning false.** 2. If a task can be successfully queued, then we still need* to double-check whether we should have added a thread* (because existing ones died since last checking) or that* the pool shut down since entry into this method. So we* recheck state and if necessary roll back the enqueuing if* stopped, or start a new thread if there are none.** 3. If we cannot queue task, then we try to add a new* thread. If it fails, we know we are shut down or saturated* and so reject the task.*/
简单来说就是将execute()方法分成三个步骤:
1.如果,工作线程的数量小于核心线程数,则通过addWorker()方法,创建新的Worker工作线程,并添加至workers工作线程集合;
2.如果一个任务可以成功排队(工作线程的数量大于核心线程数),并且线程池处于RUNNING状态,那么,线程池会将Runnable类型的线程任务,缓存至workQueue阻塞工作队列,等待某个空闲工作线程获取并执行该任务;
3.如果我们不能排队任务,那么我们尝试添加一个新的线程。如果它失败了,我们知道我们被关闭或饱和了,因此拒绝该任务。
源码:
/* *@param 要提交给线程池的任务
*/
public void execute(Runnable command) {//如果传入的任务为空,抛出空指针异常。if (command == null)throw new NullPointerException();//获取当前线程池的状态和活动线程数。int c = ctl.get();//如果当前线程数小于核心线程数if (workerCountOf(c) < corePoolSize) {//如果可以通过addWorker方法创建一个新的工作线程来执行任务//因为当前线程数小于核心线程数,所以第二个参数穿入true代表创建的是核心线程if (addWorker(command, true))return;//创建成功,直接返回。//创建失败,获取更新后的线程池状态c = ctl.get();}//如果线程池状态是运行中且工作队列能够接受新任务if (isRunning(c) && workQueue.offer(command)) {//任务进入工作队列//重新获取线程池的状态和工作线程数int recheck = ctl.get();//如果线程池不是运行状态,则删除任务if (! isRunning(recheck) && remove(command))、//执行拒绝策略reject(command);// 如果工作线程数等于零,通过addWorker()方法检查线程池状态和工作队列else if (workerCountOf(recheck) == 0)addWorker(null, false);}//如果无法将任务添加到队列中,也无法创建新的工作线程,那么拒绝任务的执行else if (!addWorker(command, false))reject(command);}
3.addWorker()方法
在分析execute()方法中,发现execute()方法多次调用了addWorker()创建一个工作线程,用于执行当前线程任务。
addWorker()可以分为两个执行部分,检查线程池的状态和工作线程数量和创建并执行工作线程。
第1部分:检查线程池的状态和工作线程数量
//参数:1.传入的任务,2.是否创建核心线程
private boolean addWorker(Runnable firstTask, boolean core) {// 循环检查线程池的状态,直到符合创建工作线程的条件,通过retry标签break退出retry:for (;;) {//获取线程池运行状态int c = ctl.get();int rs = runStateOf(c);//如果线程池处于开始关闭的状态(获取线程任务为空,同时工作队列不等于空)if (rs >= SHUTDOWN &&! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty()))return false;//检查工作线程数量for (;;) {//获取当前工作线程数int wc = workerCountOf(c);//如果工作线程数量如果超出线程池的最大容量或者核心线程数(最大线程数)//三元运算符表示的是当前要的是核心线程还是非核心线程if (wc >= CAPACITY ||wc >= (core ? corePoolSize : maximumPoolSize))return false;//不再创建新的线程//通过ctl对象,将当前工作线程数量+1,并通过retry标签break退出外层循环if (compareAndIncrementWorkerCount(c))break retry;//再次获取线程池状态,检查是否发生变化c = ctl.get(); if (runStateOf(c) != rs)continue retry;//...}}}
第二部分:创建并执行线程工程
//....//用于判断工作线程是否启动和保存boolean workerStarted = false;boolean workerAdded = false;Worker w = null;try {//创建新工作线程,并通过线程工厂创建Thread线程w = new Worker(firstTask);//获取新工作线程的Thread线程对象,用于启动真正的线程final Thread t = w.thread; if (t != null) {//获取线程池的reentrantLock主锁,保证线程安全final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {//检查线程池运行状态int rs = runStateOf(ctl.get());//如果线程池状态小于关闭状态或者线程池状态为关闭且没有初始任务if (rs < SHUTDOWN ||(rs == SHUTDOWN && firstTask == null)) {//如果工作线程已经在运行(存活),抛出非法线程状态异常。if (t.isAlive()) // precheck that t is startablethrow new IllegalThreadStateException();//保存工作线程workers.add(w);//记录线程池的最大工作线程数int s = workers.size();if (s > largestPoolSize)largestPoolSize = s;workerAdded = true;}} finally {mainLock.unlock();}//正式启动线程if (workerAdded) {t.start();workerStarted = true;}}} finally {//如果工作线程没有成功启动,则调用添加失败的方法if (! workerStarted)addWorkerFailed(w);}//返回线程启动状态return workerStarted;
总结
execute()方法是ThreadPoolExecutor线程池执行的开始,它完整实现了Executor接口定义execute()方法,这个方法作用是执行一个Runnable类型的线程任务。整体的执行流程是:
相关文章:

源码角度看待线程池的执行流程
文章目录 前言一、线程池的相关接口和实现类1.Executor接口2.ExecutorService接口3.AbstractExecutorService接口4.ThreadPoolExecutor 实现类 二、ThreadPoolExecutor源码解析1.Worker内部类2.execute()方法3.addWorker()方法 总结 前言 线程池内部维护了若干个线程ÿ…...

我们的第一个 Qt 窗口程序
Qt 入门实战教程(目录) Windows Qt 5.12.10下载与安装 为何使用Qt Creator开发QT 本文介绍用Qt自带的集成开发工具Qt Creator创建Qt默认的窗口程序。 本文不需要你另外安装Visual Studio 2022这样的集成开发环境,也不需要你再在Visual St…...

Linux 8 下的容器引擎Podman概述
一、前言 最近在进行OS国产化交流中,了解到部分业务迁移到BClinux 8.2或Anolis 8.2时,原有docker业务需要迁移到新的容器平台:Podman,来完成容器的新的管理。Podman(全称 Pod Manager)是一款用于在 Linux 系…...

C++编辑修改PDF
PDFWriter是一个易于使用的C创建、修改PDF文档的库 1.创建一个PDF文件 #include #include “PDFWriter.h” int main() { std::cout << “Hello World!\n”; PDFWriter pdfWriter; int retpdfWriter.StartPDF(“D:\mytestwriterpdf.pdf”, ePDFVersion13); if (ret eS…...

数据倾斜优化
数据倾斜发生的原因有哪些? map输出数据按key Hash的分配到reduce中,由于key分布不均匀、业务数据本身的特性、建表时考虑不周等原因造成的reduce 上的数据量差异过大。 数据倾斜解决方式有哪些 group by 导致的数据倾斜 1.开启Map-Side聚合后&#x…...

Acwing796.子矩阵的和
理解二维前缀和: #include <iostream>using namespace std;const int N 1010;int a[N][N], s[N][N];int main() {int n, m, q;cin >> n >> m >> q;for (int i 1; i < n; i)for (int j 1; j < m; j) {scanf("%d", &a…...

【ELK日志收集系统】
目录 一、概述 1.作用 2.为什么使用? 二、组件 1.elasticsearch 1.1 作用 1.2 特点 2.logstash 2.1 作用 2.2 工作过程 2.3 INPUT 2.4 FILETER 2.5 OUTPUTS 3.kibana 三、架构类型 1.ELK 2.ELKK 3.ELFK 4.ELFKK 四、案例 - 构建ELK集群 1.环境…...
Java项目中实现信号的连续接收
系列文章目录 文章目录 系列文章目录前言一、监听信号二、信号处理逻辑三、停止信号监听总结 前言 在Java项目中,信号的连续接收是一项重要的任务,特别是在处理异步事件或者需要对外部事件做出响应时。本篇博客将介绍如何在Java项目中实现信号的连续接收…...
vue权限管理——按钮控制
1.按钮根据后端返回数据决定展示与否 根据right中的数据对应增删改查按钮 const menuList [{id: 1, path:/uploadSpec,authName: "上传spec", icon: User, children:[], rights:[view,add,edit,delete]},{id: 2, path:/showSpec, authName: "Spec预览",…...

jvm的内存区域
JVM 内存分为线程私有区和线程共享区,其中方法区和堆是线程共享区,虚拟机栈、本地方法栈和程序计数器是线程隔离的数据区。 1)程序计数器 程序计数器(Program Counter Register)也被称为 PC 寄存器,是一块…...

即时通讯开发中的性能优化技巧
即时通讯开发在如今的数字化社会中扮演着重要角色,然而,随着用户对即时通讯应用的需求不断增长,开发者们面临着使其应用保持高性能和可靠性的挑战。本文将探讨即时通讯开发中关键的性能优化技巧,帮助开发者们提升应用的用户体验和…...
flinkcdc同步完全量数据就不同步增量数据了
flinkcdc同步完全量数据就不同步增量数据了 使用flinkcdc同步mysql数据,使用的是全量采集模型 startupOptions(StartupOptions.earliest()) 全量阶段同步完成之后,发现并不开始同步增量数据,原因有以下两个: 原因1: …...
VBA:Application.GetOpenFilename打开指定文件夹里的excel类型文件(xls、xlsx)
GetOpenFilename相当于Excel打开窗口,通过该窗口选择要打开的文件,并可以返回选择的文件完整路径和文件名。 Application.GetOpenFilename(“文件类型筛选规则(就是说明)”,“优先显示第几个类型的文件”,“标题”,“是否允许选择多个文件名”) 打开类型…...

利用R作圆环条形图
从理念上看,本质就是增加了圆环弧度的条形图。如上图2。 需要以下步骤: 数据处理,将EXCEL中的数据做成3*N的表格导入系统,代码如下:library(tidyverse) library(stringr)library(ggplot2)library(viridis) stuper &…...

JavaScript(笔记)
目录 Hello World JavaScript 的变量 JavaScript 动态类型 隐式类型转换 JavaScript 数组 JavaScript 函数 JavaScript 中变量的作用域 对象 DOM 选中页面元素 事件 获取 / 修改元素内容 获取 / 修改元素属性 获取 / 修改 表单元素属性 获取 / 修改样式属性 新…...

软件工程(九) UML顺序-活动-状态-通信图
顺序图和后面的一些图,要求没有用例图和类图那么高,但仍然是比较重要的,我们也需要按程度去了解。 1、顺序图 顺序图(sequence diagram, 顺序图),顺序图是一种交互图(interaction diagram),它强调的是对象之间消息发送的顺序,同时显示对象之间的交互。 下面以一个简…...

JVM 是怎么设计来保证new对象的线程安全
1、采用 CAS 分配重试的方式来保证更新操作的原子性 2、每个线程在 Java 堆中预先分配一小块内存,也就是本地线程分配缓冲(Thread Local AllocationBuffer,TLAB),要分配内存的线程,先在本地缓冲区中分配&a…...

【JavaEE基础学习打卡00】该专栏知识大纲在这里!
目录 前言一、为什么有该教程二、教程内容介绍1.JavaEE2.JDBC3.JSP编程4.JavaBean5.Servlet6.综合案例7.拦截器、过滤器 三、学习前置要求四、课程服务总结 前言 📜 本系列教程适用于 Java Web 初学者、爱好者,小白白。我们的天赋并不高,可贵…...

C# 跨线程访问窗体控件
在不加任何修饰的情况下,C# 默认不允许跨线程访问控件,实际在项目开发过程中,经常使用跨线程操作控件属性,需要设置相关属性才能正确使用,两种方法设置如下: 方法1:告诉编译器取消跨线程访问检…...

Ctenos7安装mysql-8.1.0/tomcat-9.0.80/LNMT部署
目录 一、实验拓扑 二、部署mysql 三、部署Tomcat 四、配置NGINX 五、 配置NGINX的双机热备提高可用性 一、实验拓扑 二、部署mysql 官网下载地址https://dev.mysql.com/downloads/mysql/ 1、移除mariadb,安装所需应用 mysql-8.1.0 社区版 安装说明官网下载地址…...

超短脉冲激光自聚焦效应
前言与目录 强激光引起自聚焦效应机理 超短脉冲激光在脆性材料内部加工时引起的自聚焦效应,这是一种非线性光学现象,主要涉及光学克尔效应和材料的非线性光学特性。 自聚焦效应可以产生局部的强光场,对材料产生非线性响应,可能…...

Docker 运行 Kafka 带 SASL 认证教程
Docker 运行 Kafka 带 SASL 认证教程 Docker 运行 Kafka 带 SASL 认证教程一、说明二、环境准备三、编写 Docker Compose 和 jaas文件docker-compose.yml代码说明:server_jaas.conf 四、启动服务五、验证服务六、连接kafka服务七、总结 Docker 运行 Kafka 带 SASL 认…...

《从零掌握MIPI CSI-2: 协议精解与FPGA摄像头开发实战》-- CSI-2 协议详细解析 (一)
CSI-2 协议详细解析 (一) 1. CSI-2层定义(CSI-2 Layer Definitions) 分层结构 :CSI-2协议分为6层: 物理层(PHY Layer) : 定义电气特性、时钟机制和传输介质(导线&#…...

基于当前项目通过npm包形式暴露公共组件
1.package.sjon文件配置 其中xh-flowable就是暴露出去的npm包名 2.创建tpyes文件夹,并新增内容 3.创建package文件夹...
代码随想录刷题day30
1、零钱兑换II 给你一个整数数组 coins 表示不同面额的硬币,另给一个整数 amount 表示总金额。 请你计算并返回可以凑成总金额的硬币组合数。如果任何硬币组合都无法凑出总金额,返回 0 。 假设每一种面额的硬币有无限个。 题目数据保证结果符合 32 位带…...

【分享】推荐一些办公小工具
1、PDF 在线转换 https://smallpdf.com/cn/pdf-tools 推荐理由:大部分的转换软件需要收费,要么功能不齐全,而开会员又用不了几次浪费钱,借用别人的又不安全。 这个网站它不需要登录或下载安装。而且提供的免费功能就能满足日常…...

Scrapy-Redis分布式爬虫架构的可扩展性与容错性增强:基于微服务与容器化的解决方案
在大数据时代,海量数据的采集与处理成为企业和研究机构获取信息的关键环节。Scrapy-Redis作为一种经典的分布式爬虫架构,在处理大规模数据抓取任务时展现出强大的能力。然而,随着业务规模的不断扩大和数据抓取需求的日益复杂,传统…...

wpf在image控件上快速显示内存图像
wpf在image控件上快速显示内存图像https://www.cnblogs.com/haodafeng/p/10431387.html 如果你在寻找能够快速在image控件刷新大图像(比如分辨率3000*3000的图像)的办法,尤其是想把内存中的裸数据(只有图像的数据,不包…...

Chrome 浏览器前端与客户端双向通信实战
Chrome 前端(即页面 JS / Web UI)与客户端(C 后端)的交互机制,是 Chromium 架构中非常核心的一环。下面我将按常见场景,从通道、流程、技术栈几个角度做一套完整的分析,特别适合你这种在分析和改…...
pycharm 设置环境出错
pycharm 设置环境出错 pycharm 新建项目,设置虚拟环境,出错 pycharm 出错 Cannot open Local Failed to start [powershell.exe, -NoExit, -ExecutionPolicy, Bypass, -File, C:\Program Files\JetBrains\PyCharm 2024.1.3\plugins\terminal\shell-int…...