线程池源码解析项目中如何配置线程池
目录
基础回顾
线程池执行任务流程
简单使用
构造函数
execute方法
execute中出现的ctl属性
execute中出现的addWorker方法
addWorker中出现的addWorkerFailed方法
addWorker中出现的Worker类
Worker类中run方法出现的runWorker方法
runWorker中出现的getTask
runWorker中出现的processWorkerExit
项目中如何配置使用线程池
参考
基础回顾
线程池基础不好的还是要先了解线程池大体知识,不要眼高手低❌
ThreadPoolExecutor线程池有关_明天一定.的博客-CSDN博客
我都忘记了我要写线程池源码相关文章了,填个多年前的坑➿
线程池执行任务流程
线程池新增线程过程
简单使用
看源码,当然要先会简单使用:
public static void main(String[] args) {ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>(1),new ThreadPoolExecutor.CallerRunsPolicy());executor.execute(() -> {System.out.println("线程池执行");});}
我们创建了一个ThreadPoolExecutor对象,然后调用execute方法
构造函数
源码中共有4中构造方法
最终都是调用最后一个构造方法,其他构造都是给出了默认配置,比如默认线程工厂,默认拒绝策略。所以我们只看参数最长的构造
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler) {if (corePoolSize < 0 ||maximumPoolSize <= 0 ||maximumPoolSize < corePoolSize ||keepAliveTime < 0)throw new IllegalArgumentException();if (workQueue == null || threadFactory == null || handler == null)throw new NullPointerException();this.acc = System.getSecurityManager() == null ?null :AccessController.getContext();this.corePoolSize = corePoolSize;this.maximumPoolSize = maximumPoolSize;this.workQueue = workQueue;this.keepAliveTime = unit.toNanos(keepAliveTime);this.threadFactory = threadFactory;this.handler = handler;}
可以看出来有两步,第一步判断参数是否合理,第二步给属性赋值。重要字段具体含义不再赘述,不懂的看文章开头那篇文章。比较生疏的是acc这个属性,他是一个三元表达式去赋值,判断检查操作是否有权限执行,如果有权限则拿到权限控制的上下文,源码中属性注释也说了,该属性用来执行finalizer。
execute方法
这个方法分三步:
- 如果运行线程比核心线程数少
- 如果任务可以成功的放入队列
- 如果任务放入队列失败,我们就新增线程,如果失败则执行拒绝策略或是因为线程池关闭了
public void execute(Runnable command) {if (command == null)throw new NullPointerException();int c = ctl.get();// 获取当前工作线程数和线程池运行状态if (workerCountOf(c) < corePoolSize) { // 判断工作线程是否比核心线程少if (addWorker(command, true)) // 新增workerreturn;c = ctl.get();}if (isRunning(c) && workQueue.offer(command)) { // 线程池状态是否是running,是的话往阻塞队列加任务int recheck = ctl.get(); // 双重检查,因为从上次检查到进入此方法,线程池可能改变状态if (! isRunning(recheck) && remove(command)) // 如果当前线程池状态不是RUNNING则从队列删除任务reject(command); // 拒绝策略触发else if (workerCountOf(recheck) == 0) // wc如果是0时,则新增worker执行queue的任务addWorker(null, false);}else if (!addWorker(command, false))// 阻塞队列已满才会走的逻辑reject(command);}
execute中出现的ctl属性
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static int ctlOf(int rs, int wc) { return rs | wc; }
private static int runStateOf(int c) { return c & ~CAPACITY; }
private static int workerCountOf(int c) { return c & CAPACITY; }
private static final int RUNNING = -1 << COUNT_BITS;private static final int SHUTDOWN = 0 << COUNT_BITS;private static final int STOP = 1 << COUNT_BITS;private static final int TIDYING = 2 << COUNT_BITS;private static final int TERMINATED = 3 << COUNT_BITS;
private static final int COUNT_BITS = Integer.SIZE - 3;private static final int CAPACITY = (1 << COUNT_BITS) - 1;
可以看出来ctl的类型是原子整数,初始值是状态或上工作线程数。cti共同记录了运行状态和工作线程数。ctl的组成前三位是状态,后29位是表示线程数。
善用位运算可以节省空间(复合值),而在这里我认为是想可以保证状态和数量的统一变化。
顺便补一下线程池的状态和生命周期的转换:
execute中出现的addWorker方法
该方法主要分为两步:
- cas自旋新增线程
- 创建线程实例并且执行任务
private boolean addWorker(Runnable firstTask, boolean core) {retry:for (;;) {int c = ctl.get();int rs = runStateOf(c);// 线程池状态不是running;线程池状态为SHUTDOWN,且要执行的任务不为空;线程池状态为SHUTDOWN,且任务队列为空;都返回失败if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty()))return false;for (;;) {int wc = workerCountOf(c);// 工作线程数>=线程池容量 || 工作线程数>=(核心线程数||最大线程数)if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize))return false;if (compareAndIncrementWorkerCount(c)) // 线程新增自旋成功break retry; // 退出外层循环c = ctl.get(); // Re-read ctl// 重新获取状态和之前状态对比,若一样则内层循坏,否则外循环if (runStateOf(c) != rs)continue retry;// else CAS failed due to workerCount change; retry inner loop}}boolean workerStarted = false; // 工作线程调用start()方法标志boolean workerAdded = false; // 工作线程被添加标志Worker w = null;try {w = new Worker(firstTask); // 创建工作线程实例final Thread t = w.thread; // 获取工作线程持有的线程实例if (t != null) {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {int rs = runStateOf(ctl.get());// 线程池状态为RUNNING或者(线程池状态为SHUTDOWN并且没有新任务时)if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) {if (t.isAlive()) // 预检查该线程状态throw new IllegalThreadStateException();workers.add(w); // 线程加入到存放工作线程的HashSet容器,workers全局唯一并被mainLock持有。方便索引出线程int s = workers.size();if (s > largestPoolSize)largestPoolSize = s;workerAdded = true;// 工作线程被添加标志置为true}} finally {mainLock.unlock();}if (workerAdded) {t.start(); // 调用该线程执行任务 workerStarted = true; // 工作线程调用start()方法标志置为true}}} finally {if (! workerStarted) // 如果线程启动失败addWorkerFailed(w);}return workerStarted;}
流程辅助查看
addWorker中出现的addWorkerFailed方法
回滚工作线程的创建
private void addWorkerFailed(Worker w) {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {if (w != null)workers.remove(w);// 从hash表移除decrementWorkerCount(); // ctl减1tryTerminate(); // 尝试把线程池状态变为Terminate} finally {mainLock.unlock();}}
addWorker中出现的Worker类
它是ThreadPoolExecutor的内部类,继承了AQS实现了Runnable接口,我们主要关注它的run方法
public void run() {runWorker(this); }
Worker类中run方法出现的runWorker方法
真正执行任务的方法,通过getTask从队列拿任务
final void runWorker(Worker w) {Thread wt = Thread.currentThread();Runnable task = w.firstTask; // 获取工作线程中用来执行任务的线程实例w.firstTask = null;w.unlock(); // 允许中断boolean completedAbruptly = true; // 线程意外终止标志try {// 如果当前任务不为空,则直接执行;否则调用getTask()从任务队列中取出一个任务执行while (task != null || (task = getTask()) != null) {w.lock();// 如果状态值大于等于STOP且当前线程还没有被中断,则主动中断线程if ((runStateAtLeast(ctl.get(), STOP) ||(Thread.interrupted() &&runStateAtLeast(ctl.get(), STOP))) &&!wt.isInterrupted())wt.interrupt();try {beforeExecute(wt, task); // 任务执行前的回调,空实现,可以在子类中自定义Throwable thrown = null;try {task.run(); // 执行线程任务} catch (RuntimeException x) {thrown = x; throw x;} catch (Error x) {thrown = x; throw x;} catch (Throwable x) {thrown = x; throw new Error(x);} finally {afterExecute(task, thrown); // 任务执行后的回调,空实现,可以在子类中自定义}} finally {task = null; // 将循环变量task设置为null,表示已处理完成w.completedTasks++; // 当前已完成的任务数+1w.unlock();}}completedAbruptly = false;} finally {processWorkerExit(w, completedAbruptly); // 工作线程退出}}
runWorker流程图如下
runWorker中出现的getTask
根据配置,对任务进行阻塞或超时等待。
private Runnable getTask() {boolean timedOut = false; // // 通过timeOut变量表示拿出的线程是否超时了for (;;) {int c = ctl.get();int rs = runStateOf(c);// 检查线程池状态以及阻塞队列的大小if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {decrementWorkerCount();return null;}int wc = workerCountOf(c);// 当前线程是否允许超时销毁的标志// 允许超时销毁:当线程池允许核心线程超时 或 工作线程数>核心线程数boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;// 如果(当前线程数大于最大线程数 或 (允许超时销毁 且 当前发生了空闲时间超时))// 且(当前线程数大于1 或 阻塞队列为空)// 则减少worker计数并返回nullif ((wc > maximumPoolSize || (timed && timedOut))&& (wc > 1 || workQueue.isEmpty())) {if (compareAndDecrementWorkerCount(c))return null;continue;}try {// 根据线程是否允许超时判断用poll还是take(会阻塞)方法从任务队列头部取出一个任务Runnable r = timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :workQueue.take();if (r != null)return r;timedOut = true;} catch (InterruptedException retry) {timedOut = false;}}}
runWorker中出现的processWorkerExit
private void processWorkerExit(Worker w, boolean completedAbruptly) {if (completedAbruptly) // 如果中断,调整减少worker的数量decrementWorkerCount();final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {completedTaskCount += w.completedTasks;workers.remove(w); // 从工作线程集合中移除该工作线程} finally {mainLock.unlock();}tryTerminate(); // 尝试把线程状态变为terminateint c = ctl.get();// 如果是RUNNING 或 SHUTDOWN状态if (runStateLessThan(c, STOP)) {if (!completedAbruptly) {// 如果允许核心线程超时则最小线程数是0,否则最小线程数等于核心线程数int min = allowCoreThreadTimeOut ? 0 : corePoolSize;// 如果阻塞队列非空,则至少要有一个线程继续执行剩下的任务if (min == 0 && ! workQueue.isEmpty())min = 1;if (workerCountOf(c) >= min)return; // replacement not needed}addWorker(null, false); // 重新创建一个worker来代替被销毁的线程}}
项目中如何配置使用线程池
用过线程池的都知道,那些核心参数是真的不好确定,需要做大量修改、发布、验证这套工作。市面上常说的是分io密集型和cpu密集型,但是具体参数不是那么好确定的,比如线程池设置为 2*CPU 核心数,有点像是把任务都当做 IO 密集型去处理了。而且一个项目里面一般来说不止一个自定义线程池吧?比如有专门处理数据上送的线程池,有专门处理查询请求的线程池,这样去做一个简单的线程隔离。但是如果都用这样的参数配置的话,显然是不合理的。
所以我们需要一个可动态配置的线程池,可以自己写一个模块,也可以用已经开源的dynamic-tp或者hippo4j。具体实现还是利用ThreadPoolExecutor中的一些set方法
关于队列的动态调整:美团他们有一个名字为 ResizableCapacityLinkedBlockIngQueue 的队列:很明显,这是一个自定义队列了。我们也可以按照这个思路自定义一个队列,让其可以对 Capacity 参数进行修改即可。把 LinkedBlockingQueue 粘贴一份出来,修改个名字,然后把 Capacity 参数的 final 修饰符去掉,并提供其对应的 get/set 方法。然后在程序里面把原来的队列换掉即可。
不过比较好的是开源项目自带监控告警和配置文件配置会比较全面一点。具体配置还是得做大量修改、发布、验证。不过也可以从网上常说的理论值入手去尝试。
参考
【超详细】Java线程池源码解析 - 掘金 (juejin.cn)
Java线程池实现原理及其在美团业务中的实践 - 美团技术团队 (meituan.com)
相关文章:

线程池源码解析项目中如何配置线程池
目录 基础回顾 线程池执行任务流程 简单使用 构造函数 execute方法 execute中出现的ctl属性 execute中出现的addWorker方法 addWorker中出现的addWorkerFailed方法 addWorker中出现的Worker类 Worker类中run方法出现的runWorker方法 runWorker中出现的getTask runWo…...

Echarts 更改K线度颜色,解释K线图4个数字意义
第019个点击查看专栏目录本示例修改K线度的颜色,方法参考源代码。 这里面讲一下K线图的四个数字,如[20, 34, 10, 38], 第一位:20代表开盘价格, 第二位:34代表闭盘价格, 第三位:10代表最低价&…...
JavaScript和Java两种方法实现百度地图和高德、腾讯地图的相互转换
目录一、常见的经纬度标准二、百度地图和高德、腾讯地图经纬度的转换1、前端JavaScript转换2、后端Java实现转换一、常见的经纬度标准 高德、腾讯(使用GCJ02) GCJ-02坐标系,也称火星坐标系,由中国国家测绘局在02年发布࿰…...
Vue中常见的几种组件间通信方法
1.props(父传子) 父组件Parent.vue <template><child :msg"message"></child> </template>父组件通过:val"value"的形式定义要传给子组件的值value绑定到val上 子组件Child.vue export default {//写法一…...

Outcome VS. Output:研发效能提升中,谁会更胜一筹?
2007 年,网景通信公司(Netscape)的联合创始人 Marc Andreessen 在博客 The Pmarca Guide to Startups 中提出 「Product/Market Fit」 ,他写道, 「这意味着在一个良好的市场中,拥有能够满足该市场的产品。」…...
ptp4l与phc2sys进行系统时钟同步
linuxptp用于时钟同步。安装采用apt install linuxptp主要包含2个程序,ptp4l 进行时钟同步,实时网卡时钟与远端的时钟同步,支持1588 和 802.1AS 两种协议phc2sys 将网卡上的时钟同步到操作系统,或者反之命令demo:某主机P通过eth2连…...
使用注解JSON序列化
JsonSerialize(using ToStringSerializer.class) 将返回数据转成String序列化 JsonFormat(pattern "yyyy-MM-dd hh:mm",timezone"GMT8") 将日期数据转换成特定格式 使用JsonSerialize自定义注解接口 定义接口 import java.lang.annotation.ElementTyp…...

kubernetes教程 --Pod生命周期
Pod生命周期 pod创建过程运行初始化容器(init container)过程运行主容器(main container)过程 容器启动后钩子(post start)、容器终止前钩子(pre stop)容器的存活性探测(…...

高校房产管理系统用到了哪些技术?
数图互通高校房产管理系统是基于公司自主研发的FMCenterV5.0平通过在中国100多所高校的成功实施和迭代,形成了一套成熟、完善、全生命周期的房屋资源管理解决方案。台,是针对中国高校房产的管理特点和管理要求,研发的一套标准产品;…...
【Python学习笔记】37.Python3 MySQL - mysql-connector 驱动(2)
前言 本章继续介绍MySQL - mysql-connector 驱动。 where 条件语句 如果我们要读取指定条件的数据,可以使用 where 语句: demo_mysql_test.py 读取 name 字段为 CSDN 的记录: import mysql.connectormydb mysql.connector.connect(host…...
【高级Java】高级Java实验
一、反射与动态代理1、(4分)请通过反射技术,为附件中的Person.class生成相应的.java代码,java代码中的方法的方法体为空,即方法内部代码不用生成。请注意生成的java代码的格式。2、(3分)请为第1…...

SYN480R 解码
目录1.空载情况下2.当有按键被按下3.数据帧分析4.同步码5.数据码6.对24位数据帧分析1.空载情况下 在空载情况下,syn480r 输出引脚,输出的是杂乱无序的波形 2.当有按键被按下 按下按键,会连续输出相同的脉冲波形,放大分析 3.数据…...

ASP .NET(基于.NET 6.0)源码解读
这几天一直在琢磨在我现有技术认知基础上,未来如何做技术提升。 日思夜想,我整理出了我自己的一套学习规划方案,并希望在实施过程中能够不断调整学习方案与方式,以接近自我提升的效率最大化。 从以下几个大的方面来得到提升&…...

阿里工作7年,一个30岁女软件测试工程师的心路历程
简单的先说一下,坐标杭州,14届本科毕业,算上年前在阿里巴巴的面试,一共有面试了有6家公司(因为不想请假,因此只是每个晚上去其他公司面试,所以面试的公司比较少) 其中成功的有4家&am…...

学生党必备的 Keychron 无线机械键盘
学生党必备的 Keychron 无线机械键盘 由于专业需要,之间的键盘使用起来不太舒服,于是准备重新买一个适合工作学习的键盘,于是通过朋友介绍了解到了keychron k3pro,当时也看到网上一些资料说道这款键盘比较到位,今天就来带大家了解…...

FPGA MAX 10 10M50系列10M50DAF484C8G/10M50DAF484C7G/10M50DCF484C7G规格
介绍MAX 10器件是单芯片、非易失性低成本可编程逻辑器件(pld),用于集成最优的系统组件集。MAX 10设备的亮点包括:内部存储双配置闪存用户闪存即时支持集成模数转换器(adc)支持Nios II单芯片软核处理器MAX 10设备是系统管理、I/O扩展、通信控制平面、工业、汽车和消费…...
【codequ】Java学习路线整理(韩顺平)
文章目录Java学习路线一、Java基础1.建立编程思想Java概述变量运算符控制结构数据、排序和查找面向对象编程(基础)面向对象编程(中级)项目&学以致用2.提升编程能力3.分析需求,代码实现能力Java8新特性二、Java高级…...

服务器容器配置日志(Linux+x86_64+Ubuntu18.04+CUDA11.0+python3.7)
一、创建并进入容器 (平台使用教学详细,这部分略写) 登上服务器后,打开终端输入如下进入自己建的容器 ssh -p XXXXX root10.XXX.XXX.XXX //按自己的宿主机端口写二、安装Conda(miniconda3) (…...
2023年美赛赛题思路分析
2023年的赛题A-F题的整体难度不算太难,难度在于数据的收集上。整体难度上来看,难度上F题难度最小,建议直接上手。本次先给大家分享一些数据网站,在对各题做简单的思路分析。1、美国国家海洋和大气管理局Homepage | National Ocean…...
[C++]服务器与客户端建立连接与检测断开的demo
该程序在IP127.0.0.1以及端口5000环境下测试 有一段时间没有在Windows下用C进行网络编程了,这段日子都在做QT的网络编程和OpenCV的图像识别。 今天重新写个Windows下C的,基于TCP的双端连接建立与断开检测的demo,巩固下自己Windows下的网络编程…...
挑战杯推荐项目
“人工智能”创意赛 - 智能艺术创作助手:借助大模型技术,开发能根据用户输入的主题、风格等要求,生成绘画、音乐、文学作品等多种形式艺术创作灵感或初稿的应用,帮助艺术家和创意爱好者激发创意、提高创作效率。 - 个性化梦境…...

JavaSec-RCE
简介 RCE(Remote Code Execution),可以分为:命令注入(Command Injection)、代码注入(Code Injection) 代码注入 1.漏洞场景:Groovy代码注入 Groovy是一种基于JVM的动态语言,语法简洁,支持闭包、动态类型和Java互操作性,…...
React Native 导航系统实战(React Navigation)
导航系统实战(React Navigation) React Navigation 是 React Native 应用中最常用的导航库之一,它提供了多种导航模式,如堆栈导航(Stack Navigator)、标签导航(Tab Navigator)和抽屉…...

【人工智能】神经网络的优化器optimizer(二):Adagrad自适应学习率优化器
一.自适应梯度算法Adagrad概述 Adagrad(Adaptive Gradient Algorithm)是一种自适应学习率的优化算法,由Duchi等人在2011年提出。其核心思想是针对不同参数自动调整学习率,适合处理稀疏数据和不同参数梯度差异较大的场景。Adagrad通…...
Python爬虫实战:研究feedparser库相关技术
1. 引言 1.1 研究背景与意义 在当今信息爆炸的时代,互联网上存在着海量的信息资源。RSS(Really Simple Syndication)作为一种标准化的信息聚合技术,被广泛用于网站内容的发布和订阅。通过 RSS,用户可以方便地获取网站更新的内容,而无需频繁访问各个网站。 然而,互联网…...
电脑插入多块移动硬盘后经常出现卡顿和蓝屏
当电脑在插入多块移动硬盘后频繁出现卡顿和蓝屏问题时,可能涉及硬件资源冲突、驱动兼容性、供电不足或系统设置等多方面原因。以下是逐步排查和解决方案: 1. 检查电源供电问题 问题原因:多块移动硬盘同时运行可能导致USB接口供电不足&#x…...
vue3 字体颜色设置的多种方式
在Vue 3中设置字体颜色可以通过多种方式实现,这取决于你是想在组件内部直接设置,还是在CSS/SCSS/LESS等样式文件中定义。以下是几种常见的方法: 1. 内联样式 你可以直接在模板中使用style绑定来设置字体颜色。 <template><div :s…...
基础测试工具使用经验
背景 vtune,perf, nsight system等基础测试工具,都是用过的,但是没有记录,都逐渐忘了。所以写这篇博客总结记录一下,只要以后发现新的用法,就记得来编辑补充一下 perf 比较基础的用法: 先改这…...

HBuilderX安装(uni-app和小程序开发)
下载HBuilderX 访问官方网站:https://www.dcloud.io/hbuilderx.html 根据您的操作系统选择合适版本: Windows版(推荐下载标准版) Windows系统安装步骤 运行安装程序: 双击下载的.exe安装文件 如果出现安全提示&…...
汇编常见指令
汇编常见指令 一、数据传送指令 指令功能示例说明MOV数据传送MOV EAX, 10将立即数 10 送入 EAXMOV [EBX], EAX将 EAX 值存入 EBX 指向的内存LEA加载有效地址LEA EAX, [EBX4]将 EBX4 的地址存入 EAX(不访问内存)XCHG交换数据XCHG EAX, EBX交换 EAX 和 EB…...