当前位置: 首页 > news >正文

源码角度看待线程池的执行流程

文章目录

  • 前言
  • 一、线程池的相关接口和实现类
    • 1.Executor接口
    • 2.ExecutorService接口
    • 3.AbstractExecutorService接口
    • 4.ThreadPoolExecutor 实现类
  • 二、ThreadPoolExecutor源码解析
    • 1.Worker内部类
    • 2.execute()方法
    • 3.addWorker()方法
  • 总结


前言

线程池内部维护了若干个线程,没有任务的时候,这些线程都处于等待空闲状态。如果有新的线程任务,就分配一个空闲线程执行。如果所有线程都处于忙碌状态,线程池会创建一个新线程进行处理或者放入队列(工作队列)中等待。
线程池在多线程编程中扮演着重要角色,它能够管理和复用线程,提高并发执行效率。

在之前的学习中,我们知道了线程池的基本流程如下,而今天我们则用这个流程配合着源码的来重新分析线程池的执行流程。
在这里插入图片描述


一、线程池的相关接口和实现类

1.Executor接口

public interface Executor {/*** Executes the given command at some time in the future.  The command* may execute in a new thread, in a pooled thread, or in the calling* thread, at the discretion of the {@code Executor} implementation.** @param command the runnable task* @throws RejectedExecutionException if this task cannot be* accepted for execution* @throws NullPointerException if command is null*/void execute(Runnable command);
}

Executor接口作为线程池技术中的顶层接口,它的作用是用来定义线程池中,用于提交并执行线程任务的核心方法:exuecte()方法。未来线程池中所有的线程任务,都将由exuecte()方法来执行。

2.ExecutorService接口

public interface ExecutorService extends Executor {//.....
}

ExecutorService接口继承了Executor接口,扩展了awaitTermination()、submit()、shutdown()等专门用于管理线程任务的方法。

3.AbstractExecutorService接口

public abstract class AbstractExecutorService implements ExecutorService {//....
}

ExecutorService接口的抽象实现类AbstractExecutorService,为不同的线程池实现类,提供submit()、invokeAll()等部分方法的公共实现。但是由于在不同线程池中的核心方法exuecte()执行策略不同,所以在AbstractExecutorService并未提供该方法的具体实现。

4.ThreadPoolExecutor 实现类

public class ThreadPoolExecutor extends AbstractExecutorService {//...
}

ThreadPoolExecutor实现类是AbstractExecutorService接口的的两个重要实现类之一,(ForkJoinPool是另一个),也是要掌握的关于线程池的重点区域;
ThreadPoolExecutor线程池通过Woker工作线程、BlockingQueue阻塞工作队列 以及 拒绝策略实现了一个标准的线程池;


二、ThreadPoolExecutor源码解析

在对源码进行解析之前,我们先看看官方给我们关于ThreadPoolExecutor的解析是什么:

The main pool control state, ctl, is an atomic integer packing
* two conceptual fields
* workerCount, indicating the effective number of threads
* runState, indicating whether running, shutting down etc
* …

这句话什么意思呢?
主池控制状态ctl是一个原子整数封装?

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

再结合ctl的实例化,我们这个发现这个ctl是一个具有原子性的整数,再往后就是告诉我们这个具有原子性的整数是由两个概念组成的:workerCount工作线程数和runState运行状态。
他是一个32位的整数,具体表示形式位:
在这里插入图片描述
所以说,它的作用就是通过位运算来存储线程池的状态和活动线程数信息

1.Worker内部类

每个Woker类的对象,都代表线程池中的一个工作线程。
Worker类是ThreadPoolExecutor类中定义的一个私有内部类,保存了每个Worker工作线程要执行的Runnable线程任务和Thread线程对象。

private final class Worker extends AbstractQueuedSynchronizer implements Runnable {	private static final long serialVersionUID = 6138294804551838833L;//用于存储当前工作线程的引用。final Thread thread;//用于存储该工作线程要执行的第一个任务。Runnable firstTask;//用于记录该工作线程已经完成的任务数量volatile long completedTasks;//初始化工作线程的状态,设置第一个任务并创建一个线程对象Worker(Runnable firstTask) {setState(-1); // inhibit interrupts until runWorkerthis.firstTask = firstTask;this.thread = getThreadFactory().newThread(this);}//Runnable 接口的 run 方法,调用了 runWorker(this),将工作线程自身作为参数传递给 runWorker方法public void run() {runWorker(this);}//因为继承了AbstractQueuedSynchronizer 类,一下方法都是基于线程安全的方法//.....       }

当ThreadPoolExecutor线程池,通过exeute()方法执行1个线程任务时,会调用addWorker()方法创建一个Woker工作线程对象。并且,创建好的Worker工作线程对象,会被添加到一个HashSet workders工作线程集合,统一由线程池进行管理。
当Worker工作线程,在第一次执行完成线程任务后,这个Worker工作线程并不会销毁,而是会以循环的方式,通过线程池的getTask()方法,获取阻塞工作队列中新的Runnable线程任务,并通过当前Worker工作线程中所绑定Thread线程,完成新线程任务的执行,从而实现了线程池的中Thread线程的重复使用

2.execute()方法

ThreadPoolExecutor线程池中,会通过execute(Runnable command)方法执行Runnable类型的线程任务。
在分析execute()方法之前,也来看看官方是怎么解释这个方法的:

    /** Proceed in 3 steps:** 1. If fewer than corePoolSize threads are running, try to* start a new thread with the given command as its first* task.  The call to addWorker atomically checks runState and* workerCount, and so prevents false alarms that would add* threads when it shouldn't, by returning false.** 2. If a task can be successfully queued, then we still need* to double-check whether we should have added a thread* (because existing ones died since last checking) or that* the pool shut down since entry into this method. So we* recheck state and if necessary roll back the enqueuing if* stopped, or start a new thread if there are none.** 3. If we cannot queue task, then we try to add a new* thread.  If it fails, we know we are shut down or saturated* and so reject the task.*/

简单来说就是将execute()方法分成三个步骤:
1.如果,工作线程的数量小于核心线程数,则通过addWorker()方法,创建新的Worker工作线程,并添加至workers工作线程集合;
2.如果一个任务可以成功排队(工作线程的数量大于核心线程数),并且线程池处于RUNNING状态,那么,线程池会将Runnable类型的线程任务,缓存至workQueue阻塞工作队列,等待某个空闲工作线程获取并执行该任务;
3.如果我们不能排队任务,那么我们尝试添加一个新的线程。如果它失败了,我们知道我们被关闭或饱和了,因此拒绝该任务。

源码:

/* *@param 要提交给线程池的任务
*/
public void execute(Runnable command) {//如果传入的任务为空,抛出空指针异常。if (command == null)throw new NullPointerException();//获取当前线程池的状态和活动线程数。int c = ctl.get();//如果当前线程数小于核心线程数if (workerCountOf(c) < corePoolSize) {//如果可以通过addWorker方法创建一个新的工作线程来执行任务//因为当前线程数小于核心线程数,所以第二个参数穿入true代表创建的是核心线程if (addWorker(command, true))return;//创建成功,直接返回。//创建失败,获取更新后的线程池状态c = ctl.get();}//如果线程池状态是运行中且工作队列能够接受新任务if (isRunning(c) && workQueue.offer(command)) {//任务进入工作队列//重新获取线程池的状态和工作线程数int recheck = ctl.get();//如果线程池不是运行状态,则删除任务if (! isRunning(recheck) && remove(command))//执行拒绝策略reject(command);// 如果工作线程数等于零,通过addWorker()方法检查线程池状态和工作队列else if (workerCountOf(recheck) == 0)addWorker(null, false);}//如果无法将任务添加到队列中,也无法创建新的工作线程,那么拒绝任务的执行else if (!addWorker(command, false))reject(command);}

3.addWorker()方法

在分析execute()方法中,发现execute()方法多次调用了addWorker()创建一个工作线程,用于执行当前线程任务。
addWorker()可以分为两个执行部分,检查线程池的状态和工作线程数量和创建并执行工作线程。
第1部分:检查线程池的状态和工作线程数量

//参数:1.传入的任务,2.是否创建核心线程
private boolean addWorker(Runnable firstTask, boolean core) {// 循环检查线程池的状态,直到符合创建工作线程的条件,通过retry标签break退出retry:for (;;) {//获取线程池运行状态int c = ctl.get();int rs = runStateOf(c);//如果线程池处于开始关闭的状态(获取线程任务为空,同时工作队列不等于空)if (rs >= SHUTDOWN &&! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty()))return false;//检查工作线程数量for (;;) {//获取当前工作线程数int wc = workerCountOf(c);//如果工作线程数量如果超出线程池的最大容量或者核心线程数(最大线程数)//三元运算符表示的是当前要的是核心线程还是非核心线程if (wc >= CAPACITY ||wc >= (core ? corePoolSize : maximumPoolSize))return false;//不再创建新的线程//通过ctl对象,将当前工作线程数量+1,并通过retry标签break退出外层循环if (compareAndIncrementWorkerCount(c))break retry;//再次获取线程池状态,检查是否发生变化c = ctl.get();  if (runStateOf(c) != rs)continue retry;//...}}}

第二部分:创建并执行线程工程

		//....//用于判断工作线程是否启动和保存boolean workerStarted = false;boolean workerAdded = false;Worker w = null;try {//创建新工作线程,并通过线程工厂创建Thread线程w = new Worker(firstTask);//获取新工作线程的Thread线程对象,用于启动真正的线程final Thread t = w.thread;   if (t != null) {//获取线程池的reentrantLock主锁,保证线程安全final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {//检查线程池运行状态int rs = runStateOf(ctl.get());//如果线程池状态小于关闭状态或者线程池状态为关闭且没有初始任务if (rs < SHUTDOWN ||(rs == SHUTDOWN && firstTask == null)) {//如果工作线程已经在运行(存活),抛出非法线程状态异常。if (t.isAlive()) // precheck that t is startablethrow new IllegalThreadStateException();//保存工作线程workers.add(w);//记录线程池的最大工作线程数int s = workers.size();if (s > largestPoolSize)largestPoolSize = s;workerAdded = true;}} finally {mainLock.unlock();}//正式启动线程if (workerAdded) {t.start();workerStarted = true;}}} finally {//如果工作线程没有成功启动,则调用添加失败的方法if (! workerStarted)addWorkerFailed(w);}//返回线程启动状态return workerStarted;

总结

execute()方法是ThreadPoolExecutor线程池执行的开始,它完整实现了Executor接口定义execute()方法,这个方法作用是执行一个Runnable类型的线程任务。整体的执行流程是:
在这里插入图片描述

相关文章:

源码角度看待线程池的执行流程

文章目录 前言一、线程池的相关接口和实现类1.Executor接口2.ExecutorService接口3.AbstractExecutorService接口4.ThreadPoolExecutor 实现类 二、ThreadPoolExecutor源码解析1.Worker内部类2.execute()方法3.addWorker()方法 总结 前言 线程池内部维护了若干个线程&#xff…...

我们的第一个 Qt 窗口程序

Qt 入门实战教程&#xff08;目录&#xff09; Windows Qt 5.12.10下载与安装 为何使用Qt Creator开发QT 本文介绍用Qt自带的集成开发工具Qt Creator创建Qt默认的窗口程序。 本文不需要你另外安装Visual Studio 2022这样的集成开发环境&#xff0c;也不需要你再在Visual St…...

Linux 8 下的容器引擎Podman概述

一、前言 最近在进行OS国产化交流中&#xff0c;了解到部分业务迁移到BClinux 8.2或Anolis 8.2时&#xff0c;原有docker业务需要迁移到新的容器平台&#xff1a;Podman&#xff0c;来完成容器的新的管理。Podman&#xff08;全称 Pod Manager&#xff09;是一款用于在 Linux 系…...

C++编辑修改PDF

PDFWriter是一个易于使用的C创建、修改PDF文档的库 1.创建一个PDF文件 #include #include “PDFWriter.h” int main() { std::cout << “Hello World!\n”; PDFWriter pdfWriter; int retpdfWriter.StartPDF(“D:\mytestwriterpdf.pdf”, ePDFVersion13); if (ret eS…...

数据倾斜优化

数据倾斜发生的原因有哪些&#xff1f; map输出数据按key Hash的分配到reduce中&#xff0c;由于key分布不均匀、业务数据本身的特性、建表时考虑不周等原因造成的reduce 上的数据量差异过大。 数据倾斜解决方式有哪些 group by 导致的数据倾斜 1.开启Map-Side聚合后&#x…...

Acwing796.子矩阵的和

理解二维前缀和&#xff1a; #include <iostream>using namespace std;const int N 1010;int a[N][N], s[N][N];int main() {int n, m, q;cin >> n >> m >> q;for (int i 1; i < n; i)for (int j 1; j < m; j) {scanf("%d", &a…...

【ELK日志收集系统】

目录 一、概述 1.作用 2.为什么使用&#xff1f; 二、组件 1.elasticsearch 1.1 作用 1.2 特点 2.logstash 2.1 作用 2.2 工作过程 2.3 INPUT 2.4 FILETER 2.5 OUTPUTS 3.kibana 三、架构类型 1.ELK 2.ELKK 3.ELFK 4.ELFKK 四、案例 - 构建ELK集群 1.环境…...

Java项目中实现信号的连续接收

系列文章目录 文章目录 系列文章目录前言一、监听信号二、信号处理逻辑三、停止信号监听总结 前言 在Java项目中&#xff0c;信号的连续接收是一项重要的任务&#xff0c;特别是在处理异步事件或者需要对外部事件做出响应时。本篇博客将介绍如何在Java项目中实现信号的连续接收…...

vue权限管理——按钮控制

1.按钮根据后端返回数据决定展示与否 根据right中的数据对应增删改查按钮 const menuList [{id: 1, path:/uploadSpec,authName: "上传spec", icon: User, children:[], rights:[view,add,edit,delete]},{id: 2, path:/showSpec, authName: "Spec预览",…...

jvm的内存区域

JVM 内存分为线程私有区和线程共享区&#xff0c;其中方法区和堆是线程共享区&#xff0c;虚拟机栈、本地方法栈和程序计数器是线程隔离的数据区。 1&#xff09;程序计数器 程序计数器&#xff08;Program Counter Register&#xff09;也被称为 PC 寄存器&#xff0c;是一块…...

即时通讯开发中的性能优化技巧

即时通讯开发在如今的数字化社会中扮演着重要角色&#xff0c;然而&#xff0c;随着用户对即时通讯应用的需求不断增长&#xff0c;开发者们面临着使其应用保持高性能和可靠性的挑战。本文将探讨即时通讯开发中关键的性能优化技巧&#xff0c;帮助开发者们提升应用的用户体验和…...

flinkcdc同步完全量数据就不同步增量数据了

flinkcdc同步完全量数据就不同步增量数据了 使用flinkcdc同步mysql数据&#xff0c;使用的是全量采集模型 startupOptions(StartupOptions.earliest()) 全量阶段同步完成之后&#xff0c;发现并不开始同步增量数据&#xff0c;原因有以下两个&#xff1a; 原因1&#xff1a; …...

VBA:Application.GetOpenFilename打开指定文件夹里的excel类型文件(xls、xlsx)

GetOpenFilename相当于Excel打开窗口&#xff0c;通过该窗口选择要打开的文件&#xff0c;并可以返回选择的文件完整路径和文件名。 Application.GetOpenFilename(“文件类型筛选规则(就是说明)”,“优先显示第几个类型的文件”,“标题”,“是否允许选择多个文件名”) 打开类型…...

利用R作圆环条形图

从理念上看&#xff0c;本质就是增加了圆环弧度的条形图。如上图2。 需要以下步骤&#xff1a; 数据处理&#xff0c;将EXCEL中的数据做成3*N的表格导入系统&#xff0c;代码如下&#xff1a;library(tidyverse) library(stringr)library(ggplot2)library(viridis) stuper &…...

JavaScript(笔记)

目录 Hello World JavaScript 的变量 JavaScript 动态类型 隐式类型转换 JavaScript 数组 JavaScript 函数 JavaScript 中变量的作用域 对象 DOM 选中页面元素 事件 获取 / 修改元素内容 获取 / 修改元素属性 获取 / 修改 表单元素属性 获取 / 修改样式属性 新…...

软件工程(九) UML顺序-活动-状态-通信图

顺序图和后面的一些图,要求没有用例图和类图那么高,但仍然是比较重要的,我们也需要按程度去了解。 1、顺序图 顺序图(sequence diagram, 顺序图),顺序图是一种交互图(interaction diagram),它强调的是对象之间消息发送的顺序,同时显示对象之间的交互。 下面以一个简…...

JVM 是怎么设计来保证new对象的线程安全

1、采用 CAS 分配重试的方式来保证更新操作的原子性 2、每个线程在 Java 堆中预先分配一小块内存&#xff0c;也就是本地线程分配缓冲&#xff08;Thread Local AllocationBuffer&#xff0c;TLAB&#xff09;&#xff0c;要分配内存的线程&#xff0c;先在本地缓冲区中分配&a…...

【JavaEE基础学习打卡00】该专栏知识大纲在这里!

目录 前言一、为什么有该教程二、教程内容介绍1.JavaEE2.JDBC3.JSP编程4.JavaBean5.Servlet6.综合案例7.拦截器、过滤器 三、学习前置要求四、课程服务总结 前言 &#x1f4dc; 本系列教程适用于 Java Web 初学者、爱好者&#xff0c;小白白。我们的天赋并不高&#xff0c;可贵…...

C# 跨线程访问窗体控件

在不加任何修饰的情况下&#xff0c;C# 默认不允许跨线程访问控件&#xff0c;实际在项目开发过程中&#xff0c;经常使用跨线程操作控件属性&#xff0c;需要设置相关属性才能正确使用&#xff0c;两种方法设置如下&#xff1a; 方法1&#xff1a;告诉编译器取消跨线程访问检…...

Ctenos7安装mysql-8.1.0/tomcat-9.0.80/LNMT部署

目录 一、实验拓扑 二、部署mysql 三、部署Tomcat 四、配置NGINX 五、 配置NGINX的双机热备提高可用性 一、实验拓扑 二、部署mysql 官网下载地址https://dev.mysql.com/downloads/mysql/ 1、移除mariadb&#xff0c;安装所需应用 mysql-8.1.0 社区版 安装说明官网下载地址…...

XCTF-web-easyupload

试了试php&#xff0c;php7&#xff0c;pht&#xff0c;phtml等&#xff0c;都没有用 尝试.user.ini 抓包修改将.user.ini修改为jpg图片 在上传一个123.jpg 用蚁剑连接&#xff0c;得到flag...

python打卡day49

知识点回顾&#xff1a; 通道注意力模块复习空间注意力模块CBAM的定义 作业&#xff1a;尝试对今天的模型检查参数数目&#xff0c;并用tensorboard查看训练过程 import torch import torch.nn as nn# 定义通道注意力 class ChannelAttention(nn.Module):def __init__(self,…...

ElasticSearch搜索引擎之倒排索引及其底层算法

文章目录 一、搜索引擎1、什么是搜索引擎?2、搜索引擎的分类3、常用的搜索引擎4、搜索引擎的特点二、倒排索引1、简介2、为什么倒排索引不用B+树1.创建时间长,文件大。2.其次,树深,IO次数可怕。3.索引可能会失效。4.精准度差。三. 倒排索引四、算法1、Term Index的算法2、 …...

Spring AI 入门:Java 开发者的生成式 AI 实践之路

一、Spring AI 简介 在人工智能技术快速迭代的今天&#xff0c;Spring AI 作为 Spring 生态系统的新生力量&#xff0c;正在成为 Java 开发者拥抱生成式 AI 的最佳选择。该框架通过模块化设计实现了与主流 AI 服务&#xff08;如 OpenAI、Anthropic&#xff09;的无缝对接&…...

MySQL 8.0 OCP 英文题库解析(十三)

Oracle 为庆祝 MySQL 30 周年&#xff0c;截止到 2025.07.31 之前。所有人均可以免费考取原价245美元的MySQL OCP 认证。 从今天开始&#xff0c;将英文题库免费公布出来&#xff0c;并进行解析&#xff0c;帮助大家在一个月之内轻松通过OCP认证。 本期公布试题111~120 试题1…...

.Net Framework 4/C# 关键字(非常用,持续更新...)

一、is 关键字 is 关键字用于检查对象是否于给定类型兼容,如果兼容将返回 true,如果不兼容则返回 false,在进行类型转换前,可以先使用 is 关键字判断对象是否与指定类型兼容,如果兼容才进行转换,这样的转换是安全的。 例如有:首先创建一个字符串对象,然后将字符串对象隐…...

html css js网页制作成品——HTML+CSS榴莲商城网页设计(4页)附源码

目录 一、&#x1f468;‍&#x1f393;网站题目 二、✍️网站描述 三、&#x1f4da;网站介绍 四、&#x1f310;网站效果 五、&#x1fa93; 代码实现 &#x1f9f1;HTML 六、&#x1f947; 如何让学习不再盲目 七、&#x1f381;更多干货 一、&#x1f468;‍&#x1f…...

Aspose.PDF 限制绕过方案:Java 字节码技术实战分享(仅供学习)

Aspose.PDF 限制绕过方案&#xff1a;Java 字节码技术实战分享&#xff08;仅供学习&#xff09; 一、Aspose.PDF 简介二、说明&#xff08;⚠️仅供学习与研究使用&#xff09;三、技术流程总览四、准备工作1. 下载 Jar 包2. Maven 项目依赖配置 五、字节码修改实现代码&#…...

论文笔记——相干体技术在裂缝预测中的应用研究

目录 相关地震知识补充地震数据的认识地震几何属性 相干体算法定义基本原理第一代相干体技术&#xff1a;基于互相关的相干体技术&#xff08;Correlation&#xff09;第二代相干体技术&#xff1a;基于相似的相干体技术&#xff08;Semblance&#xff09;基于多道相似的相干体…...

使用Spring AI和MCP协议构建图片搜索服务

目录 使用Spring AI和MCP协议构建图片搜索服务 引言 技术栈概览 项目架构设计 架构图 服务端开发 1. 创建Spring Boot项目 2. 实现图片搜索工具 3. 配置传输模式 Stdio模式&#xff08;本地调用&#xff09; SSE模式&#xff08;远程调用&#xff09; 4. 注册工具提…...