ThreadPoolExecutor源码阅读流程图
1.创建线程池
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue) {this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,Executors.defaultThreadFactory(), defaultHandler);
}
10–核心线程数
20–最大线程数
1 TimeUnit.MINUTES 非核心线程数存活时间 1分钟
new ArrayBlockingQueue(100) 阻塞队列类型 数组类型传入队列长度,需要无限长度可以使用链表类型LinkedBlockingDeque
线程工厂ThreadFactory和超出队列的策略ThreadPoolExecutor.AbortPolicy(抛出异常)暂时先按默认来。
ThreadPoolExecutor executor = new ThreadPoolExecutor
(10,20,1, TimeUnit.MINUTES,new ArrayBlockingQueue<Runnable>(100));
创建线程池的时候是不会启动线程的,需要在执行具体业务逻辑时候才会执行
2.ThreadPoolExecutor重要参数及方法介绍
//ctl Int原子操作类,32位,前三位代表线程池状态,后28位记录线程数
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int COUNT_MASK = (1 << COUNT_BITS) - 1;//线程池5种状态
//RUNNING状态【可提交新任务】和【可执行阻塞队列中的任务】
private static final int RUNNING = -1 << COUNT_BITS;
//SHUTDOWN状态【不可提交新任务】提交新任务会抛出异常和【可执行阻塞队列中的任务】
private static final int SHUTDOWN = 0 << COUNT_BITS;//执行shutDown()方法
//STOP状态【不可提交新任务】和【不可执行阻塞队列中的任务】
private static final int STOP = 1 << COUNT_BITS;//执行shutDownNow()方法
//TIDYING状态 所有任务都终止了,线程池中也没有线程了,这样线程池的状态就会转为TIDYING,一旦达到此状态,就会调用线程池的terminated()
private static final int TIDYING = 2 << COUNT_BITS;
//TERMINATED状态 terminated()执行完之后就会转变为TERMINATED
private static final int TERMINATED = 3 << COUNT_BITS;//获取线程池状态
private static int runStateOf(int c) { return c & ~COUNT_MASK; }
//获取当前工作线程数
private static int workerCountOf(int c) { return c & COUNT_MASK; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
2.1线程的五种状态
- RUNNING状态【可提交新任务】和【可执行阻塞队列中的任务】 11100000 00000000 00000000 00000000
- SHUTDOWN状态【不可提交新任务】提交新任务会抛出异常和【可执行阻塞队列中的任务】
00000000 00000000 00000000 00000000 - STOP状态【不可提交新任务】和【不可执行阻塞队列中的任务】 00100000 00000000 00000000 00000000
- TIDYING状态 所有任务都终止了,线程池中也没有线程了,这样线程池的状态就会转为TIDYING,一旦达到此状态,就会调用线程池的terminated()
00100000 00000000 00000000 00000000 - TERMINATED状态 terminated()执行完之后就会转变为TERMINATED 01100000 00000000 00000000 00000000
private static boolean runStateLessThan(int c, int s) {return c < s;
}private static boolean runStateAtLeast(int c, int s) {return c >= s;
}private static boolean isRunning(int c) {return c < SHUTDOWN;
}/*** Attempts to CAS-increment the workerCount field of ctl.* 通过CAS来对当前工作线程数增加*/
private boolean compareAndIncrementWorkerCount(int expect) {return ctl.compareAndSet(expect, expect + 1);
}/*** Attempts to CAS-decrement the workerCount field of ctl.* 通过CAS来对当前工作线程数减少*/
private boolean compareAndDecrementWorkerCount(int expect) {return ctl.compareAndSet(expect, expect - 1);
}
任务执行流程图

3.提交任务execute
executor.execute(new Runnable() {@Overridepublic void run() {//业务代码}
});
public void execute(Runnable command) {if (command == null)throw new NullPointerException();//获取ctl 初始值为ctlOf(RUNNING, 0) 运行状态,工作线程数0int c = ctl.get();//计算获取工作线程数<核心线程数if (workerCountOf(c) < corePoolSize) {//当前command增加为核心工作线程,添加失败下面会进行入队操作if (addWorker(command, true))return;c = ctl.get();}//判断线程池状态(判断是因为防止别的线程把状态进行修改)//workQueue.offer(command) 加入队列if (isRunning(c) && workQueue.offer(command)) {int recheck = ctl.get();//再对线程池状态二次检查,如果不是running则移除队列if (! isRunning(recheck) && remove(command))//拒绝策略,默认抛出异常reject(command);else if (workerCountOf(recheck) == 0)//这里就是执行队列中的任务,下面addWorker里面有体现和讲解addWorker(null, false);}//线程池达到最大了的maxPool,添加失败执行拒绝策略else if (!addWorker(command, false))reject(command);}
3.1 submit
Future<?> submit = executor.submit(new Runnable() {@Overridepublic void run() {//业务代码}
});
这个里面执行了execute,多了一个返回Future
public Future<?> submit(Runnable task) {if (task == null) throw new NullPointerException();RunnableFuture<Void> ftask = newTaskFor(task, null);execute(ftask);return ftask;}
4.addWorker
这里不同版本jdk有差异
private boolean addWorker(Runnable firstTask, boolean core) {//类似于gotoretry:for (int c = ctl.get();;) {// 线程池状态>=SHUTDOWN 并且 线程池状态>=STOP或者传入的任务!=null或者阻塞队列为空则返回if (runStateAtLeast(c, SHUTDOWN)&& (runStateAtLeast(c, STOP)|| firstTask != null|| workQueue.isEmpty()))return false;for (;;) {////判断工作的线程是否超过核心线程数或者最大线程数,addWork时候会传入coreif (workerCountOf(c)>= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))return false;//如果没有超过核心线程数或者最大线程数,这里通过cas对工作线程数量增加,多个竞争失败的话循环cas操作if (compareAndIncrementWorkerCount(c))break retry;//跳出外层循环c = ctl.get(); // Re-read ctl//如果线程池状态>=SHUTDOWN 跳到外层循环继续执行if (runStateAtLeast(c, SHUTDOWN))continue retry;}}boolean workerStarted = false;boolean workerAdded = false;Worker w = null;try {//新建任务Worker,会利用线程工厂去创建一个线程默认的是/*** Worker(Runnable firstTask) {* //这个状态有0 -1 1 创建时候为-1,运行时候改为1,运行结束改为0* // 正常应该是acquire时候+1 release时候-1 这里重写过方法* setState(-1); * this.firstTask = firstTask;* this.thread = getThreadFactory().newThread(this); }**/w = new Worker(firstTask);final Thread t = w.thread;if (t != null) {//这里的mainLock是对Workers进行操作的,防止出现并发问题//用锁是因为private final HashSet<Worker> workers = new HashSet<>(); 这个不是线程安全的final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {int c = ctl.get();//线程池如果是RUNNING状态// 或者状态<STOP并且传入的任务为空 这个是从阻塞队列里面拿任务执行if (isRunning(c) ||(runStateLessThan(c, STOP) && firstTask == null)) {if (t.isAlive()) // 如果线程已经在运行,就抛出异常throw new IllegalThreadStateException();//添加任务到工作线程的容器里workers.add(w);int s = workers.size();//largestPoolSize 这个是记录工作线程数,没看到具体作用,但既然有肯定是有用的if (s > largestPoolSize)largestPoolSize = s;workerAdded = true;}} finally {mainLock.unlock();}//这里才到线程运行if (workerAdded) {t.start();workerStarted = true;}}} finally {//这里类似于一个回滚操作,异常情况会对worker进行移除,修改ctlif (! workerStarted)addWorkerFailed(w);}return workerStarted;
}
5.Worker相关

5.1 构造器
Worker(Runnable firstTask) {//这个状态有0 -1 1 创建时候为-1,运行时候改为1,运行结束改为0// 正常应该是acquire时候+1 release时候-1 这里重写过方法setState(-1); this.firstTask = firstTask;this.thread = getThreadFactory().newThread(this); }
5.2 tryAcquire和tryRelease
重写过从+1,-1变成cas为1和设置为0,0代表执行完任务空闲,1代表在执行任务,里面有个
protected boolean tryAcquire(int unused) {if (compareAndSetState(0, 1)) {setExclusiveOwnerThread(Thread.currentThread());return true;}return false;
}
protected boolean tryRelease(int unused) {setExclusiveOwnerThread(null);setState(0);return true;
}
5.3 runWork
public void run() {runWorker(this);}final void runWorker(Worker w) {//获取当前工作线程Thread wt = Thread.currentThread();//获取需要执行的任务Runnable task = w.firstTask;w.firstTask = null;w.unlock(); boolean completedAbruptly = true;try {//执行任务不为空 或者 队列中获取到了需要执行的任务//如果没有获取到getTask是会阻塞的while (task != null || (task = getTask()) != null) {w.lock();//如果线程池状态>=STOP 并且当前线程没有被打断//线程池被打断并且线程池状态>=STOP 并且当前线程没有被打断//这里是对线程池状态作验证,如果状态发生了变更则要去尝试中断线程if ((runStateAtLeast(ctl.get(), STOP) ||(Thread.interrupted() &&runStateAtLeast(ctl.get(), STOP))) &&!wt.isInterrupted())wt.interrupt();try {//执行前切面 可以用来记录工作中线程和计算空闲线程,Tomcat线程池有这个行为beforeExecute(wt, task);try {task.run();//执行后或异常切面afterExecute(task, null);} catch (Throwable ex) {afterExecute(task, ex);throw ex;}} finally {task = null;//执行任务数w.completedTasks++;w.unlock();}}//正常执行才会为false,表示正常退出completedAbruptly = false;} finally {//执行失败completedAbruptly为trueprocessWorkerExit(w, completedAbruptly);}}
5.4 getTask()
private Runnable getTask() {boolean timedOut = false; // Did the last poll() time out?for (;;) {int c = ctl.get();// 线程池状态不为RUNNING,队列为空就不需要处理任务了,直接返回空,上层runWorker也会正常退出循环if (runStateAtLeast(c, SHUTDOWN)&& (runStateAtLeast(c, STOP) || workQueue.isEmpty())) {decrementWorkerCount();return null;}//工作中的线程数量int wc = workerCountOf(c);// 核心线程是否超时回收标志,可以通过executor.allowCoreThreadTimeOut(true);设置//工作线程数量>核心线程数量//用来判断是否是无限阻塞boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;//大于最大线程数或者超时 并且 工作线程数量>1或者队列为空 则ctl减少// && (wc > 1 || workQueue.isEmpty()) 这个判断就是要留下至少一个线程去处理队列中的任务if ((wc > maximumPoolSize || (timed && timedOut))&& (wc > 1 || workQueue.isEmpty())) {if (compareAndDecrementWorkerCount(c))return null;continue;}try {//超时阻塞和无限阻塞Runnable r = timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :workQueue.take();if (r != null)return r;//超时,循环时会去处理返回nulltimedOut = true;} catch (InterruptedException retry) {timedOut = false;}}}
6.shutdown()
shutdown会把线程池状态修改为SHUTDOWN,提交新任务会抛出异常,但会继续执行队列中的任务。
public void shutdown() {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {checkShutdownAccess();//修改状态为SHOUTDOWN,并修改ctladvanceRunState(SHUTDOWN);//这里会中断工作中的线程interruptIdleWorkers();onShutdown(); // 空方法} finally {mainLock.unlock();}tryTerminate();
}
//中间还有个方法,传入的onlyOne为false
private void interruptIdleWorkers(boolean onlyOne) {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {//遍历Workers,遍历前加锁for (Worker w : workers) {Thread t = w.thread;//把没有被打断并且没有在工作中的线程打断//获取到锁说明线程是空闲的,没有获取到锁说明在执行任务if (!t.isInterrupted() && w.tryLock()) {try {t.interrupt();} catch (SecurityException ignore) {} finally {w.unlock();}}if (onlyOne)break;}} finally {mainLock.unlock();}
}
7.shutdownNow()
shutdownNow会把线程池状态修改为STOP,提交新任务会抛出异常,也不执行队列中的任务。但会返回队列中的任务。
List<Runnable> runnables = executor.shutdownNow();
public List<Runnable> shutdownNow() {List<Runnable> tasks;final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {checkShutdownAccess();//修改状态为STOP,并修改ctladvanceRunState(STOP);//中断线程interruptWorkers();//返回队列中的任务tasks = drainQueue();} finally {mainLock.unlock();}//最后一个线程结束时候会把线程池状态改为TERMINATEDtryTerminate();return tasks;
}
private void interruptWorkers() {//中断所有工作线程for (Worker w : workers)w.interruptIfStarted();
}void interruptIfStarted() {Thread t;//getState() >= 0 代表空闲线程和正常执行中的线程,不为空并且没有被打断的就执行打断if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {try {t.interrupt();} catch (SecurityException ignore) {}}}
}
相关文章:
ThreadPoolExecutor源码阅读流程图
1.创建线程池 public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue) {this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,Executors.defaultThreadFactory(), def…...
如何通过筛选高质量爬虫IP提升爬虫效率?
前言 对于做数据抓取的技术员来说,如何稳定高效的爬取数据ip库池起到决定性作用,对于爬虫ip池的维护,可以从以下几个方面入手: 目录 一、验证爬虫ip的可用性二、更新爬虫ip池三、维护爬虫ip的质量四、监控爬虫ip的使用情况 一、验…...
C#中定义数组--字符串及数组操作
C#中定义数组–字符串及数组操作 以前用VB的时候经常使用数组,不过C#用习惯后数组基本上用的不多了。 像用List<>,ArrayList,Dirctionary<,>都比较好用。 一、一维: int[] numbers new int[]{1,2,3,4,5,6}; //不…...
嵌入式就业怎么样?
嵌入式就业怎么样? 现在的IT行业,嵌入式是大热门,下面也要来给大家介绍下学习嵌入式之后的发展以及就业怎么样。 首先是好找工作。嵌入式人才目前是处于供不应求的状态中,据权威统计机构统计在所有软件开发类人才的需求中,对嵌入式工程师的…...
用户订阅付费如何拆解分析?看这篇就够了
会员制的订阅付费在影音娱乐行业中已相当普及,近几年,不少游戏厂商也开始尝试订阅收费模式。在分析具体的用户订阅偏好以及订阅付费模式带来的增长效果时,我们常常会有这些疑问: 如何从用户的整体付费行为中具体拆解订阅付费事件…...
智能合约中如何调用其他智能合约
智能合约是区块链技术中的一项关键功能,它可以让开发者编写代码来自动执行一系列的操作,从而实现各种复杂的业务逻辑。在许多应用场景中,一个智能合约可能需要调用另一个智能合约来完成某些任务。本文将介绍智能合约如何调用其他智能合约&…...
python的多任务处理
在现代计算机系统中,多任务处理是一项重要的技术,可以大幅提高程序的运行效率。Python语言提供了多种多任务处理的方式,本文将介绍其中几种常见的方式,包括多进程、多线程和协程。 多进程 进程是计算机中运行程序的实例…...
Vue收集表单数据学习笔记
收集表单数据 v-model双向数据绑定,收集的是input框的value,单选按钮不存在value,就像代码中的男女选项,即使绑定性别v-model“sex”,控制台依然不能接收性别的值,因为没有value值,,…...
Linux搭建GitLab私有仓库,并内网穿透实现公网访问
文章目录 前言1. 下载Gitlab2. 安装Gitlab3. 启动Gitlab4. 安装cpolar5. 创建隧道配置访问地址6. 固定GitLab访问地址6.1 保留二级子域名6.2 配置二级子域名 7. 测试访问二级子域名 转载自远控源码文章:Linux搭建GitLab私有仓库,并内网穿透实现公网访问 …...
SpringBoot项目防重复提交注解开发
背景 在实际开发过程中,防重复提交的操作很常见。有细分配置针对某一些路径进行拦截,也有基于注解去实现的指定方法拦截的。 分析 实现原理 实现防重复提交,我们很容易想到就是用过滤器或者拦截器来实现。 使用拦截器就是继承HandlerInt…...
从软件哲学角度谈 Amazon SageMaker
如果你喜欢哲学并且你是一个 IT 从业者,那么你很可能对软件哲学感兴趣,你能发现存在于软件领域的哲学之美。本文我们就从软件哲学的角度来了解一下亚马逊云科技的拳头级产品 Amazon SageMaker,有两个出发点:一是 SageMaker 本身设…...
C++内联函数
目录 一、常规函数和内联函数的对比 二、如何使用 三、内联函数的特性 四、内联函数与宏 五、如何查看内联函数 六、【面试题】 前言-----内联函数是C中为程序运行速度所做的一项该进。常规函数和内联函数之间的主要区别不在于编写方式,而在于C编译器如何将他…...
JAVA大师的秘籍:轻松掌握高质量代码之道
如果你想写出高质量的代码,那掌握编写技巧可是必不可少哦!这不仅能让你的代码变得更加易读易维护,还可以让你的应用程序性能更强、稳定性更高!所以,别怕麻烦,多花些时间和心思在代码上,相信你一定能成为优秀的JAVA开发者! 要想让代码易读易维护、性能稳定,得拿出耐心和…...
OpenGL入门教程之 变换
引言 这是一个闪耀的时刻,因为我们即将能生产出令人惊叹的3D效果! 变换 向量和矩阵变换包括太多内容,但由于学过线性代数和GAMES101,因此不在此做过多阐述。仅阐述包括代码的GLM内容。 GLM的使用 (1)GLM…...
ASPICE详细介绍-4.车载项目为什么要符合ASPICE标准?
目录 车载项目为什么要符合ASPICE标准?ASPICE与功能安全的关系、区别?各大车厂对软件体系的要求 车载项目为什么要符合ASPICE标准? ASPICE(Automotive Software Process Improvement and Capability Determination)最…...
一文彻底理解Java 17中的新特性密封类
密封类的作用 在面向对象语言中,我们可以通过继承(extend)来实现类的能力复用、扩展与增强。但有的时候,有些能力我们不希望被继承了去做一些不可预知的扩展。所以,我们需要对继承关系有一些限制的控制手段。而密封类…...
【Git 入门教程】第四节、Git冲突:如何解决版本控制的矛盾
Git是目前最流行的版本控制系统之一,它为团队协作开发提供了方便和高效的方式。然而,在多人同时修改同一个文件时,可能会出现代码冲突(conflict),导致代码无法正确合并。那么,如何解决Git冲突呢…...
c++验证用户输入合法性的示例代码
c验证用户输入合法性的示例代码 本文介绍c验证用户输入合法性,用于检测限定用户输入值。包括:1、限定用户输入为整数(正负整数);2、限定用户输入为正整数;3、限定用户输入为正数(可以含有小数&…...
ctfshow web入门phpcve web311-315
1.web311 通过抓包发现php版本时为PHP/7.1.33dev 漏洞cve2019-11043 远程代码执行漏洞 利用条件: nginx配置了fastcgi_split_path_info 受影响系统: PHP 5.6-7.x,Nginx>0.7.31 下载工具进行利用 需要安装go环境 yum install golang -y …...
gpt.4.0-gpt 国内版
gpt 使用 GPT(Generative Pre-trained Transformer)是一种预训练的语言模型,可用于多种自然语言处理任务,如情感分析、文本分类、文本生成等。下面是使用GPT的一些步骤和建议: 确定任务和数据集:首先&…...
SpringBoot-17-MyBatis动态SQL标签之常用标签
文章目录 1 代码1.1 实体User.java1.2 接口UserMapper.java1.3 映射UserMapper.xml1.3.1 标签if1.3.2 标签if和where1.3.3 标签choose和when和otherwise1.4 UserController.java2 常用动态SQL标签2.1 标签set2.1.1 UserMapper.java2.1.2 UserMapper.xml2.1.3 UserController.ja…...
ssc377d修改flash分区大小
1、flash的分区默认分配16M、 / # df -h Filesystem Size Used Available Use% Mounted on /dev/root 1.9M 1.9M 0 100% / /dev/mtdblock4 3.0M...
条件运算符
C中的三目运算符(也称条件运算符,英文:ternary operator)是一种简洁的条件选择语句,语法如下: 条件表达式 ? 表达式1 : 表达式2• 如果“条件表达式”为true,则整个表达式的结果为“表达式1”…...
dedecms 织梦自定义表单留言增加ajax验证码功能
增加ajax功能模块,用户不点击提交按钮,只要输入框失去焦点,就会提前提示验证码是否正确。 一,模板上增加验证码 <input name"vdcode"id"vdcode" placeholder"请输入验证码" type"text&quo…...
2021-03-15 iview一些问题
1.iview 在使用tree组件时,发现没有set类的方法,只有get,那么要改变tree值,只能遍历treeData,递归修改treeData的checked,发现无法更改,原因在于check模式下,子元素的勾选状态跟父节…...
使用van-uploader 的UI组件,结合vue2如何实现图片上传组件的封装
以下是基于 vant-ui(适配 Vue2 版本 )实现截图中照片上传预览、删除功能,并封装成可复用组件的完整代码,包含样式和逻辑实现,可直接在 Vue2 项目中使用: 1. 封装的图片上传组件 ImageUploader.vue <te…...
用docker来安装部署freeswitch记录
今天刚才测试一个callcenter的项目,所以尝试安装freeswitch 1、使用轩辕镜像 - 中国开发者首选的专业 Docker 镜像加速服务平台 编辑下面/etc/docker/daemon.json文件为 {"registry-mirrors": ["https://docker.xuanyuan.me"] }同时可以进入轩…...
全志A40i android7.1 调试信息打印串口由uart0改为uart3
一,概述 1. 目的 将调试信息打印串口由uart0改为uart3。 2. 版本信息 Uboot版本:2014.07; Kernel版本:Linux-3.10; 二,Uboot 1. sys_config.fex改动 使能uart3(TX:PH00 RX:PH01),并让boo…...
Mac下Android Studio扫描根目录卡死问题记录
环境信息 操作系统: macOS 15.5 (Apple M2芯片)Android Studio版本: Meerkat Feature Drop | 2024.3.2 Patch 1 (Build #AI-243.26053.27.2432.13536105, 2025年5月22日构建) 问题现象 在项目开发过程中,提示一个依赖外部头文件的cpp源文件需要同步,点…...
回溯算法学习
一、电话号码的字母组合 import java.util.ArrayList; import java.util.List;import javax.management.loading.PrivateClassLoader;public class letterCombinations {private static final String[] KEYPAD {"", //0"", //1"abc", //2"…...
