ThreadPoolExecutor源码阅读流程图
1.创建线程池
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue) {this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,Executors.defaultThreadFactory(), defaultHandler);
}
10–核心线程数
20–最大线程数
1 TimeUnit.MINUTES 非核心线程数存活时间 1分钟
new ArrayBlockingQueue(100) 阻塞队列类型 数组类型传入队列长度,需要无限长度可以使用链表类型LinkedBlockingDeque
线程工厂ThreadFactory和超出队列的策略ThreadPoolExecutor.AbortPolicy(抛出异常)暂时先按默认来。
ThreadPoolExecutor executor = new ThreadPoolExecutor
(10,20,1, TimeUnit.MINUTES,new ArrayBlockingQueue<Runnable>(100));
创建线程池的时候是不会启动线程的,需要在执行具体业务逻辑时候才会执行
2.ThreadPoolExecutor重要参数及方法介绍
//ctl Int原子操作类,32位,前三位代表线程池状态,后28位记录线程数
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int COUNT_MASK = (1 << COUNT_BITS) - 1;//线程池5种状态
//RUNNING状态【可提交新任务】和【可执行阻塞队列中的任务】
private static final int RUNNING = -1 << COUNT_BITS;
//SHUTDOWN状态【不可提交新任务】提交新任务会抛出异常和【可执行阻塞队列中的任务】
private static final int SHUTDOWN = 0 << COUNT_BITS;//执行shutDown()方法
//STOP状态【不可提交新任务】和【不可执行阻塞队列中的任务】
private static final int STOP = 1 << COUNT_BITS;//执行shutDownNow()方法
//TIDYING状态 所有任务都终止了,线程池中也没有线程了,这样线程池的状态就会转为TIDYING,一旦达到此状态,就会调用线程池的terminated()
private static final int TIDYING = 2 << COUNT_BITS;
//TERMINATED状态 terminated()执行完之后就会转变为TERMINATED
private static final int TERMINATED = 3 << COUNT_BITS;//获取线程池状态
private static int runStateOf(int c) { return c & ~COUNT_MASK; }
//获取当前工作线程数
private static int workerCountOf(int c) { return c & COUNT_MASK; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
2.1线程的五种状态
- RUNNING状态【可提交新任务】和【可执行阻塞队列中的任务】 11100000 00000000 00000000 00000000
- SHUTDOWN状态【不可提交新任务】提交新任务会抛出异常和【可执行阻塞队列中的任务】
00000000 00000000 00000000 00000000 - STOP状态【不可提交新任务】和【不可执行阻塞队列中的任务】 00100000 00000000 00000000 00000000
- TIDYING状态 所有任务都终止了,线程池中也没有线程了,这样线程池的状态就会转为TIDYING,一旦达到此状态,就会调用线程池的terminated()
00100000 00000000 00000000 00000000 - TERMINATED状态 terminated()执行完之后就会转变为TERMINATED 01100000 00000000 00000000 00000000
private static boolean runStateLessThan(int c, int s) {return c < s;
}private static boolean runStateAtLeast(int c, int s) {return c >= s;
}private static boolean isRunning(int c) {return c < SHUTDOWN;
}/*** Attempts to CAS-increment the workerCount field of ctl.* 通过CAS来对当前工作线程数增加*/
private boolean compareAndIncrementWorkerCount(int expect) {return ctl.compareAndSet(expect, expect + 1);
}/*** Attempts to CAS-decrement the workerCount field of ctl.* 通过CAS来对当前工作线程数减少*/
private boolean compareAndDecrementWorkerCount(int expect) {return ctl.compareAndSet(expect, expect - 1);
}
任务执行流程图
3.提交任务execute
executor.execute(new Runnable() {@Overridepublic void run() {//业务代码}
});
public void execute(Runnable command) {if (command == null)throw new NullPointerException();//获取ctl 初始值为ctlOf(RUNNING, 0) 运行状态,工作线程数0int c = ctl.get();//计算获取工作线程数<核心线程数if (workerCountOf(c) < corePoolSize) {//当前command增加为核心工作线程,添加失败下面会进行入队操作if (addWorker(command, true))return;c = ctl.get();}//判断线程池状态(判断是因为防止别的线程把状态进行修改)//workQueue.offer(command) 加入队列if (isRunning(c) && workQueue.offer(command)) {int recheck = ctl.get();//再对线程池状态二次检查,如果不是running则移除队列if (! isRunning(recheck) && remove(command))//拒绝策略,默认抛出异常reject(command);else if (workerCountOf(recheck) == 0)//这里就是执行队列中的任务,下面addWorker里面有体现和讲解addWorker(null, false);}//线程池达到最大了的maxPool,添加失败执行拒绝策略else if (!addWorker(command, false))reject(command);}
3.1 submit
Future<?> submit = executor.submit(new Runnable() {@Overridepublic void run() {//业务代码}
});
这个里面执行了execute,多了一个返回Future
public Future<?> submit(Runnable task) {if (task == null) throw new NullPointerException();RunnableFuture<Void> ftask = newTaskFor(task, null);execute(ftask);return ftask;}
4.addWorker
这里不同版本jdk有差异
private boolean addWorker(Runnable firstTask, boolean core) {//类似于gotoretry:for (int c = ctl.get();;) {// 线程池状态>=SHUTDOWN 并且 线程池状态>=STOP或者传入的任务!=null或者阻塞队列为空则返回if (runStateAtLeast(c, SHUTDOWN)&& (runStateAtLeast(c, STOP)|| firstTask != null|| workQueue.isEmpty()))return false;for (;;) {////判断工作的线程是否超过核心线程数或者最大线程数,addWork时候会传入coreif (workerCountOf(c)>= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))return false;//如果没有超过核心线程数或者最大线程数,这里通过cas对工作线程数量增加,多个竞争失败的话循环cas操作if (compareAndIncrementWorkerCount(c))break retry;//跳出外层循环c = ctl.get(); // Re-read ctl//如果线程池状态>=SHUTDOWN 跳到外层循环继续执行if (runStateAtLeast(c, SHUTDOWN))continue retry;}}boolean workerStarted = false;boolean workerAdded = false;Worker w = null;try {//新建任务Worker,会利用线程工厂去创建一个线程默认的是/*** Worker(Runnable firstTask) {* //这个状态有0 -1 1 创建时候为-1,运行时候改为1,运行结束改为0* // 正常应该是acquire时候+1 release时候-1 这里重写过方法* setState(-1); * this.firstTask = firstTask;* this.thread = getThreadFactory().newThread(this); }**/w = new Worker(firstTask);final Thread t = w.thread;if (t != null) {//这里的mainLock是对Workers进行操作的,防止出现并发问题//用锁是因为private final HashSet<Worker> workers = new HashSet<>(); 这个不是线程安全的final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {int c = ctl.get();//线程池如果是RUNNING状态// 或者状态<STOP并且传入的任务为空 这个是从阻塞队列里面拿任务执行if (isRunning(c) ||(runStateLessThan(c, STOP) && firstTask == null)) {if (t.isAlive()) // 如果线程已经在运行,就抛出异常throw new IllegalThreadStateException();//添加任务到工作线程的容器里workers.add(w);int s = workers.size();//largestPoolSize 这个是记录工作线程数,没看到具体作用,但既然有肯定是有用的if (s > largestPoolSize)largestPoolSize = s;workerAdded = true;}} finally {mainLock.unlock();}//这里才到线程运行if (workerAdded) {t.start();workerStarted = true;}}} finally {//这里类似于一个回滚操作,异常情况会对worker进行移除,修改ctlif (! workerStarted)addWorkerFailed(w);}return workerStarted;
}
5.Worker相关
5.1 构造器
Worker(Runnable firstTask) {//这个状态有0 -1 1 创建时候为-1,运行时候改为1,运行结束改为0// 正常应该是acquire时候+1 release时候-1 这里重写过方法setState(-1); this.firstTask = firstTask;this.thread = getThreadFactory().newThread(this); }
5.2 tryAcquire和tryRelease
重写过从+1,-1变成cas为1和设置为0,0代表执行完任务空闲,1代表在执行任务,里面有个
protected boolean tryAcquire(int unused) {if (compareAndSetState(0, 1)) {setExclusiveOwnerThread(Thread.currentThread());return true;}return false;
}
protected boolean tryRelease(int unused) {setExclusiveOwnerThread(null);setState(0);return true;
}
5.3 runWork
public void run() {runWorker(this);}final void runWorker(Worker w) {//获取当前工作线程Thread wt = Thread.currentThread();//获取需要执行的任务Runnable task = w.firstTask;w.firstTask = null;w.unlock(); boolean completedAbruptly = true;try {//执行任务不为空 或者 队列中获取到了需要执行的任务//如果没有获取到getTask是会阻塞的while (task != null || (task = getTask()) != null) {w.lock();//如果线程池状态>=STOP 并且当前线程没有被打断//线程池被打断并且线程池状态>=STOP 并且当前线程没有被打断//这里是对线程池状态作验证,如果状态发生了变更则要去尝试中断线程if ((runStateAtLeast(ctl.get(), STOP) ||(Thread.interrupted() &&runStateAtLeast(ctl.get(), STOP))) &&!wt.isInterrupted())wt.interrupt();try {//执行前切面 可以用来记录工作中线程和计算空闲线程,Tomcat线程池有这个行为beforeExecute(wt, task);try {task.run();//执行后或异常切面afterExecute(task, null);} catch (Throwable ex) {afterExecute(task, ex);throw ex;}} finally {task = null;//执行任务数w.completedTasks++;w.unlock();}}//正常执行才会为false,表示正常退出completedAbruptly = false;} finally {//执行失败completedAbruptly为trueprocessWorkerExit(w, completedAbruptly);}}
5.4 getTask()
private Runnable getTask() {boolean timedOut = false; // Did the last poll() time out?for (;;) {int c = ctl.get();// 线程池状态不为RUNNING,队列为空就不需要处理任务了,直接返回空,上层runWorker也会正常退出循环if (runStateAtLeast(c, SHUTDOWN)&& (runStateAtLeast(c, STOP) || workQueue.isEmpty())) {decrementWorkerCount();return null;}//工作中的线程数量int wc = workerCountOf(c);// 核心线程是否超时回收标志,可以通过executor.allowCoreThreadTimeOut(true);设置//工作线程数量>核心线程数量//用来判断是否是无限阻塞boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;//大于最大线程数或者超时 并且 工作线程数量>1或者队列为空 则ctl减少// && (wc > 1 || workQueue.isEmpty()) 这个判断就是要留下至少一个线程去处理队列中的任务if ((wc > maximumPoolSize || (timed && timedOut))&& (wc > 1 || workQueue.isEmpty())) {if (compareAndDecrementWorkerCount(c))return null;continue;}try {//超时阻塞和无限阻塞Runnable r = timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :workQueue.take();if (r != null)return r;//超时,循环时会去处理返回nulltimedOut = true;} catch (InterruptedException retry) {timedOut = false;}}}
6.shutdown()
shutdown会把线程池状态修改为SHUTDOWN,提交新任务会抛出异常,但会继续执行队列中的任务。
public void shutdown() {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {checkShutdownAccess();//修改状态为SHOUTDOWN,并修改ctladvanceRunState(SHUTDOWN);//这里会中断工作中的线程interruptIdleWorkers();onShutdown(); // 空方法} finally {mainLock.unlock();}tryTerminate();
}
//中间还有个方法,传入的onlyOne为false
private void interruptIdleWorkers(boolean onlyOne) {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {//遍历Workers,遍历前加锁for (Worker w : workers) {Thread t = w.thread;//把没有被打断并且没有在工作中的线程打断//获取到锁说明线程是空闲的,没有获取到锁说明在执行任务if (!t.isInterrupted() && w.tryLock()) {try {t.interrupt();} catch (SecurityException ignore) {} finally {w.unlock();}}if (onlyOne)break;}} finally {mainLock.unlock();}
}
7.shutdownNow()
shutdownNow会把线程池状态修改为STOP,提交新任务会抛出异常,也不执行队列中的任务。但会返回队列中的任务。
List<Runnable> runnables = executor.shutdownNow();
public List<Runnable> shutdownNow() {List<Runnable> tasks;final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {checkShutdownAccess();//修改状态为STOP,并修改ctladvanceRunState(STOP);//中断线程interruptWorkers();//返回队列中的任务tasks = drainQueue();} finally {mainLock.unlock();}//最后一个线程结束时候会把线程池状态改为TERMINATEDtryTerminate();return tasks;
}
private void interruptWorkers() {//中断所有工作线程for (Worker w : workers)w.interruptIfStarted();
}void interruptIfStarted() {Thread t;//getState() >= 0 代表空闲线程和正常执行中的线程,不为空并且没有被打断的就执行打断if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {try {t.interrupt();} catch (SecurityException ignore) {}}}
}
相关文章:

ThreadPoolExecutor源码阅读流程图
1.创建线程池 public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue) {this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,Executors.defaultThreadFactory(), def…...

如何通过筛选高质量爬虫IP提升爬虫效率?
前言 对于做数据抓取的技术员来说,如何稳定高效的爬取数据ip库池起到决定性作用,对于爬虫ip池的维护,可以从以下几个方面入手: 目录 一、验证爬虫ip的可用性二、更新爬虫ip池三、维护爬虫ip的质量四、监控爬虫ip的使用情况 一、验…...

C#中定义数组--字符串及数组操作
C#中定义数组–字符串及数组操作 以前用VB的时候经常使用数组,不过C#用习惯后数组基本上用的不多了。 像用List<>,ArrayList,Dirctionary<,>都比较好用。 一、一维: int[] numbers new int[]{1,2,3,4,5,6}; //不…...

嵌入式就业怎么样?
嵌入式就业怎么样? 现在的IT行业,嵌入式是大热门,下面也要来给大家介绍下学习嵌入式之后的发展以及就业怎么样。 首先是好找工作。嵌入式人才目前是处于供不应求的状态中,据权威统计机构统计在所有软件开发类人才的需求中,对嵌入式工程师的…...

用户订阅付费如何拆解分析?看这篇就够了
会员制的订阅付费在影音娱乐行业中已相当普及,近几年,不少游戏厂商也开始尝试订阅收费模式。在分析具体的用户订阅偏好以及订阅付费模式带来的增长效果时,我们常常会有这些疑问: 如何从用户的整体付费行为中具体拆解订阅付费事件…...

智能合约中如何调用其他智能合约
智能合约是区块链技术中的一项关键功能,它可以让开发者编写代码来自动执行一系列的操作,从而实现各种复杂的业务逻辑。在许多应用场景中,一个智能合约可能需要调用另一个智能合约来完成某些任务。本文将介绍智能合约如何调用其他智能合约&…...

python的多任务处理
在现代计算机系统中,多任务处理是一项重要的技术,可以大幅提高程序的运行效率。Python语言提供了多种多任务处理的方式,本文将介绍其中几种常见的方式,包括多进程、多线程和协程。 多进程 进程是计算机中运行程序的实例…...

Vue收集表单数据学习笔记
收集表单数据 v-model双向数据绑定,收集的是input框的value,单选按钮不存在value,就像代码中的男女选项,即使绑定性别v-model“sex”,控制台依然不能接收性别的值,因为没有value值,,…...

Linux搭建GitLab私有仓库,并内网穿透实现公网访问
文章目录 前言1. 下载Gitlab2. 安装Gitlab3. 启动Gitlab4. 安装cpolar5. 创建隧道配置访问地址6. 固定GitLab访问地址6.1 保留二级子域名6.2 配置二级子域名 7. 测试访问二级子域名 转载自远控源码文章:Linux搭建GitLab私有仓库,并内网穿透实现公网访问 …...

SpringBoot项目防重复提交注解开发
背景 在实际开发过程中,防重复提交的操作很常见。有细分配置针对某一些路径进行拦截,也有基于注解去实现的指定方法拦截的。 分析 实现原理 实现防重复提交,我们很容易想到就是用过滤器或者拦截器来实现。 使用拦截器就是继承HandlerInt…...

从软件哲学角度谈 Amazon SageMaker
如果你喜欢哲学并且你是一个 IT 从业者,那么你很可能对软件哲学感兴趣,你能发现存在于软件领域的哲学之美。本文我们就从软件哲学的角度来了解一下亚马逊云科技的拳头级产品 Amazon SageMaker,有两个出发点:一是 SageMaker 本身设…...

C++内联函数
目录 一、常规函数和内联函数的对比 二、如何使用 三、内联函数的特性 四、内联函数与宏 五、如何查看内联函数 六、【面试题】 前言-----内联函数是C中为程序运行速度所做的一项该进。常规函数和内联函数之间的主要区别不在于编写方式,而在于C编译器如何将他…...

JAVA大师的秘籍:轻松掌握高质量代码之道
如果你想写出高质量的代码,那掌握编写技巧可是必不可少哦!这不仅能让你的代码变得更加易读易维护,还可以让你的应用程序性能更强、稳定性更高!所以,别怕麻烦,多花些时间和心思在代码上,相信你一定能成为优秀的JAVA开发者! 要想让代码易读易维护、性能稳定,得拿出耐心和…...

OpenGL入门教程之 变换
引言 这是一个闪耀的时刻,因为我们即将能生产出令人惊叹的3D效果! 变换 向量和矩阵变换包括太多内容,但由于学过线性代数和GAMES101,因此不在此做过多阐述。仅阐述包括代码的GLM内容。 GLM的使用 (1)GLM…...

ASPICE详细介绍-4.车载项目为什么要符合ASPICE标准?
目录 车载项目为什么要符合ASPICE标准?ASPICE与功能安全的关系、区别?各大车厂对软件体系的要求 车载项目为什么要符合ASPICE标准? ASPICE(Automotive Software Process Improvement and Capability Determination)最…...

一文彻底理解Java 17中的新特性密封类
密封类的作用 在面向对象语言中,我们可以通过继承(extend)来实现类的能力复用、扩展与增强。但有的时候,有些能力我们不希望被继承了去做一些不可预知的扩展。所以,我们需要对继承关系有一些限制的控制手段。而密封类…...

【Git 入门教程】第四节、Git冲突:如何解决版本控制的矛盾
Git是目前最流行的版本控制系统之一,它为团队协作开发提供了方便和高效的方式。然而,在多人同时修改同一个文件时,可能会出现代码冲突(conflict),导致代码无法正确合并。那么,如何解决Git冲突呢…...

c++验证用户输入合法性的示例代码
c验证用户输入合法性的示例代码 本文介绍c验证用户输入合法性,用于检测限定用户输入值。包括:1、限定用户输入为整数(正负整数);2、限定用户输入为正整数;3、限定用户输入为正数(可以含有小数&…...

ctfshow web入门phpcve web311-315
1.web311 通过抓包发现php版本时为PHP/7.1.33dev 漏洞cve2019-11043 远程代码执行漏洞 利用条件: nginx配置了fastcgi_split_path_info 受影响系统: PHP 5.6-7.x,Nginx>0.7.31 下载工具进行利用 需要安装go环境 yum install golang -y …...

gpt.4.0-gpt 国内版
gpt 使用 GPT(Generative Pre-trained Transformer)是一种预训练的语言模型,可用于多种自然语言处理任务,如情感分析、文本分类、文本生成等。下面是使用GPT的一些步骤和建议: 确定任务和数据集:首先&…...

放弃手动测试,快来了解JMeter压测神器的安装和使用吧~~
目录:导读 引言 jmeter的安装 JMeter是干什么的 JMeter都可以做那些测试 JMeter的使用和组件介绍 下面我们进行XML格式的实战练习 jmeter与postman的区别 JSON的插件 另附视频教程资源 引言 你是否曾经为手动测试而苦恼?是不是觉得手动测试太费…...

SQL函数
文章目录 一、SQL 函数二、SQL COUNT() 函数三、SQL FIRST() 函数四、SQL LAST() 函数五、SQL MAX() 函数总结 一、SQL 函数 SQL 拥有很多可用于计数和计算的内建函数。 SQL Aggregate 函数 SQL Aggregate 函数计算从列中取得的值,返回一个单一的值。 有用的 Aggre…...

苦熬10年,国产操作系统“归零”,新操作系统上新,跟Excel很像
苦熬10余年,国产操作系统自主研发 说到国内自主研发的操作系统,经验最丰富的品牌,当然是麒麟OS. 从诞生到发展,历经10多年的努力,麒麟os逐渐成为了国内自主研发操作系统领域中的一颗耀眼的明珠。麒麟OS不仅推出了许多…...

什么是shell脚本和简单shell脚本练习
文章目录 什么是shell脚本和简单shell脚本练习什么是shell脚本为什么要学习shell脚本第一个脚本编写与执行编写第一个脚本 简单的shell脚本练习简单案例交互式脚本:变量内容由用户决定随日期变化:利用date建立文件数值运算:简单的加减乘除数值…...

MySQL MyBatis
MySQL从表中随机查一条数据 SELECT * FROM address ORDER BY RAND() LIMIT 1MySQL查询表是否存在 select count(*) from information_schema.TABLES where table_name #{tableName}插入数据插入随机的uuid <insert id"insertComment" parameterType"com.…...

Leetcode力扣秋招刷题路-0802
从0开始的秋招刷题路,记录下所刷每道题的题解,帮助自己回顾总结 802. 找到最终的安全状态 有一个有 n 个节点的有向图,节点按 0 到 n - 1 编号。图由一个 索引从 0 开始 的 2D 整数数组 graph表示, graph[i]是与节点 i 相邻的节…...

编程中最难的就是命名?这几招教你快速上手
作者:陈立(勤仁) 你可不能像给狗狗取名字那样给类、方法、变量命名。仅仅因为它很可爱或者听上去不错。 在写代码的时候,你要经常想着,那个最终维护你代码的人可能将是一个有暴力倾向的疯子,并且他还知道你住在哪里。 01 为什么…...

NUXT规范及常见问题
props中不要使用Web环境才有的对象,服务端渲染的时候会失败 使用<Nuxt/>组件代替<router-view/>,使用<NuxtLink/>代替<router-link/>static目录下的资源是静态资源,不应该通过import或../static/img/logo.png等方式…...

2023年Q1天猫空调品牌销量排行榜
如今,空调的普及水平较高,空调行业进入存量换新为主的发展阶段。 根据鲸参谋数据分析平台的相关数据显示,2023年Q1在天猫平台上,空调的销量将近100万件,销售额将近30亿,同时,空调产品的产品均价…...

如何在比特币系统内创造人工生命
信息来源:coingeek.com 自2015年以来,关于比特币能否进行复杂计算以及比特币是否“图灵完备”的争论一直在持续。不幸的是,现在存在着一种流传甚广的谬论,有人说比特币并非图灵完备的,它不能像以太坊区块链那样进行复杂…...