当前位置: 首页 > news >正文

Java多线程篇(3)——线程池

文章目录

  • 线程池
  • ThreadPoolExecutor源码分析
    • 1、如何提交任务
    • 2、如何执行任务
    • 3、如何停止过期的非核心线程
    • 4、如何使用拒绝策略
  • ScheduledThreadPoolExecutor源码分析

线程池

快速过一遍基础知识
7大参数
corePoolSize : 核心线程数
maximumPoolSize: 最大线程数
keepAliveTime: 空闲线程存活时间
TimeUnit: 时间单位
BlockingQueue:任务队列
ThreadFactory: 创建线程的工厂
RejectedExecutionHandler:拒绝策略

拒绝策略
AbortPolicy:中止策略,线程池会抛出异常并中止执行此任务;
CallerRunsPolicy:把任务交给添加此任务的(main)线程来执行;
DiscardPolicy:忽略此任务,忽略最新的一个任务;
DiscardOldestPolicy:忽略最早的任务,最先加入队列的任务。

内置的线程池
SingleThreadExecutor(单线程):1 - 1 - Interge.MAX(核心线程-最大线程-队列长度)
FixedThreadPool(固定大小):N - N - Interge.MAX
CachedThreadPool(缓存):0 - Integer.MAX - 0
ScheduledThreadPool(定时):线程池的另一个关于定时的分支

为什么不推荐使用内置的线程池?
SingleThreadExecutor和FixedThreadPool无法控制队列长度可能导致OOM ,而CachedThreadPool无法控制线程数量可能导致大量的线程创建。


ThreadPoolExecutor源码分析

先不考虑ScheduledThreadPool,后面再单独说明定时线程池。

1、如何提交任务

ThreadPoolExecutor#execute

 public void execute(Runnable command) {if (command == null)throw new NullPointerException();int c = ctl.get();//新建核心线程if (workerCountOf(c) < corePoolSize) {if (addWorker(command, true))return;c = ctl.get();}//加入阻塞队列if (isRunning(c) && workQueue.offer(command)) {//双重检测int recheck = ctl.get();if (! isRunning(recheck) && remove(command))reject(command);//如果当前没有正在运行的线程,则新增一个非核心线程(任务为null,表示线程的任务将会从阻塞队列中获取)else if (workerCountOf(recheck) == 0)addWorker(null, false);}//新建非核心线程else if (!addWorker(command, false))reject(command);}

也就是
在这里插入图片描述
submit和execute的区别
在这里插入图片描述
其实没啥太大的区别,submit最后也是调用的execute,只不过在调用之前封装了task为FutureTask,表示有返回值的任务,最后将返回值返回
不过有一点需要注意的是。FutureTask,不仅会返回结果,还会把原本runnable中的异常吃了。所以submit提交的任务如果抛异常了,外部是无法感知的
FutureTask#run
在这里插入图片描述
测试结果
在这里插入图片描述

2、如何执行任务

ThreadPoolExecutor#addWorker

private boolean addWorker(Runnable firstTask, boolean core) {retry:for (int c = ctl.get();;) {if (runStateAtLeast(c, SHUTDOWN) && (runStateAtLeast(c, STOP) || firstTask != null || workQueue.isEmpty()))return false;for (;;) {//COUNT_MASK掩码,舍去前3位(因为前3位是状态位,后面的才是任务数)if (workerCountOf(c) >= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))return false;if (compareAndIncrementWorkerCount(c))break retry;c = ctl.get();if (runStateAtLeast(c, SHUTDOWN))continue retry;}}//上面主要是ctl++,其他很多都是检测boolean workerStarted = false;boolean workerAdded = false;Worker w = null;try {//新建一个worker,封装了firstTask//(worker也实现了Runnable,相当于对firstTask封装了一层)w = new Worker(firstTask);//这里线程的runable实现是worker而不是firstTaskfinal Thread t = w.thread;if (t != null) {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {int c = ctl.get();//一些检测if (isRunning(c) || (runStateLessThan(c, STOP) && firstTask == null)) {if (t.isAlive())throw new IllegalThreadStateException();workers.add(w);int s = workers.size();if (s > largestPoolSize)largestPoolSize = s;workerAdded = true;}} finally {mainLock.unlock();}if (workerAdded) {//Thread.start()->runnable.run()也就是worker.run()->runWorker(worker)t.start();workerStarted = true;}}} finally {if (! workerStarted)addWorkerFailed(w);}return workerStarted;}

addWorker新建worker对象,封装了新建的线程对象和原始task。线程的执行调用如下:
thread.start()->runnable.run()也就是worker.run()->runWorker(worker)
在这里插入图片描述

ThreadPoolExecutor#runWorker

final void runWorker(Worker w) {Thread wt = Thread.currentThread();Runnable task = w.firstTask;w.firstTask = null;w.unlock();boolean completedAbruptly = true;try {//worker的task为null(addWorker传入的参数)则从阻塞队列中获取一个taskwhile (task != null || (task = getTask()) != null) {w.lock();//检测是否需要中止线程if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted())wt.interrupt();try {//执行前回调beforeExecute(wt, task);try {//执行任务task.run();//执行后回调afterExecute(task, null);} catch (Throwable ex) {afterExecute(task, ex);throw ex;}} finally {task = null;w.completedTasks++;w.unlock();}}completedAbruptly = false;} finally {// finally 调用processWorkerExit(w, completedAbruptly);}}

所以runWorker就是如果worker手上有task,就先把手头上的task执行了,然后再(循环)去阻塞队列获取task执行。如果没有就直接去阻塞队列获取task执行。

那么 finally 那里的 processWorkerExit 是干嘛用的?

执行到processWorkerExit要么就是异常情况跳出循环(completedAbruptly=true),要么就是worker手上和阻塞队列均没有task跳出循环(completedAbruptly=false)。

private void processWorkerExit(Worker w, boolean completedAbruptly) {//如果是异常退出的,此时workerCount还没调整,所以需要工作线程数减1if (completedAbruptly)decrementWorkerCount();final ReentrantLock mainLock = this.mainLock;mainLock.lock();//更新 完成任务数,以及移除workertry {completedTaskCount += w.completedTasks;workers.remove(w);} finally {mainLock.unlock();}//尝试终止线程tryTerminate();int c = ctl.get();//如果不是异常退出,则根据配置计算需要的最小工作线程数//如果是异常退出,或者当前工作线程小于上面根据配置计算的最小工作线程//则都用一个新worker来替换原来的workerif (runStateLessThan(c, STOP)) {if (!completedAbruptly) {int min = allowCoreThreadTimeOut ? 0 : corePoolSize;if (min == 0 && ! workQueue.isEmpty())min = 1;if (workerCountOf(c) >= min)return;}//启动一个worker替换原来的workeraddWorker(null, false);}}

总之这段代码的主要作用是在工作线程退出时,更新线程池的状态、计数,以及根据配置来决定是否需要新的worker替代退出的工作线程,以保持线程池的正常运行。

3、如何停止过期的非核心线程

答案在getTask()。

private Runnable getTask() {boolean timedOut = false; // Did the last poll() time out?for (;;) {int c = ctl.get();// 一些退出的状态就直接返回if (runStateAtLeast(c, SHUTDOWN)&& (runStateAtLeast(c, STOP) || workQueue.isEmpty())) {decrementWorkerCount();return null;}int wc = workerCountOf(c);//是否需要超时淘汰boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;//在确保当workQueue不为空时至少有一个工作线程的前提下//来淘汰超出 maximumPoolSize 或者超时的线程if ((wc > maximumPoolSize || (timed && timedOut))&& (wc > 1 || workQueue.isEmpty())) {if (compareAndDecrementWorkerCount(c))return null;continue;}try {//阻塞获取任务Runnable r = timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :workQueue.take();if (r != null)return r;//标记超时timedOut = true;} catch (InterruptedException retry) {timedOut = false;}}}

其实线程池并没有标记谁是核心线程,谁是非核心线程,只关心核心线程和非核心线程的数量。也就是说无论是哪个线程在获取任务时都有可能被标记为timeOut,并且每次获取任务都会根据核心线程数,最大线程数,当前线程数,timeout标记等判断是否需要当前worker,如果不需要就返回null,跳出runWorker的循环,进而结束线程。

4、如何使用拒绝策略

在提交任务的时候,如果addWorker失败就会进入拒绝策略的逻辑。

 public void execute(Runnable command) {//...//加入阻塞队列if (isRunning(c) && workQueue.offer(command)) {//...if (! isRunning(recheck) && remove(command))//双重检测失败进入拒绝策略reject(command);//...               }//新建非核心线程else if (!addWorker(command, false))//非核心线程添加失败,进入拒绝策略reject(command);
}final void reject(Runnable command) {handler.rejectedExecution(command, this);
}

ScheduledThreadPoolExecutor源码分析

.schedule():延迟执行,只执行一次。
.scheduleAtFixedRate():固定频率执行,按照固定的时间间隔来调度任务。
.scheduleWithFixedDelay():固定延迟执行,在上一次任务完成后的固定延迟之后再次执行任务。

无论是哪种都会先将task封装成 ScheduledFutureTask,然后调用 delayedExecute
scheduleAtFixedRate为例:

public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit) {if (command == null || unit == null)throw new NullPointerException();if (period <= 0L)throw new IllegalArgumentException();ScheduledFutureTask<Void> sft =new ScheduledFutureTask<Void>(command,null,triggerTime(initialDelay, unit),//scheduleWithFixedDelay与scheduleAtFixedRate的区别就只在这里//scheduleWithFixedDelay 传的是 -unit.toNanos(period)//后续会根据这个值的正负来判断是固定频率还是固定延迟unit.toNanos(period),sequencer.getAndIncrement());//封装成 ScheduledFutureTask RunnableScheduledFuture<Void> t = decorateTask(command, sft);sft.outerTask = t;//调用 delayedExecutedelayedExecute(t);return t;}

delayedExecute

private void delayedExecute(RunnableScheduledFuture<?> task) {if (isShutdown())reject(task);else {//task添加到队列//这同样也是自己实现的一个延迟队列,大概的逻辑就是:先按时间排,如果时间一样就按插入的顺序排。super.getQueue().add(task);//一些检测if (!canRunInCurrentRunState(task) && remove(task))task.cancel(false);else//保证有足够的woker正在工作ensurePrestart();}}void ensurePrestart() {int wc = workerCountOf(ctl.get());if (wc < corePoolSize)//addWorker跟就上面的是一样的了addWorker(null, true);else if (wc == 0)addWorker(null, false);}

那么凭什么将Worker的task封装成 ScheduledFutureTask 能起到持续调用的效果,来看看他的 run 方法。
ScheduledFutureTask#run

        public void run() {//一些检测if (!canRunInCurrentRunState(this))cancel(false);//如果不是周期性任务就只调用一次(period不为0则表示不是周期性任务)else if (!isPeriodic())super.run();//如果是周期性任务就在调用完之后//设置下次调用时间并将任务放回队列且保证有足够的woker正在工作else if (super.runAndReset()) {setNextRunTime();reExecutePeriodic(outerTask);}}

ScheduledFutureTask#setNextRunTime

        private void setNextRunTime() {long p = period;//根据period的正负来区分是固定频率还是固定延迟if (p > 0)time += p;elsetime = triggerTime(-p);}

ScheduledThreadPoolExecutor#reExecutePeriodic

    void reExecutePeriodic(RunnableScheduledFuture<?> task) {if (canRunInCurrentRunState(task)) {//放回队列super.getQueue().add(task);if (canRunInCurrentRunState(task) || !remove(task)) {//保证有足够的woker正在工作ensurePrestart();return;}}task.cancel(false);}

所以ScheduledThreadPoolExecutor的总体框架设计和上面的ThreadPoolExecutor是一样的(毕竟是他的子类)。
最主要的区别在于ScheduledThreadPoolExecutor里worker使用的task是自己内部实现的 ScheduledFutureTask 类,而该类的run方法在执行完后会设置下一次的执行时间并将任务放回队列中等待执行。

相关文章:

Java多线程篇(3)——线程池

文章目录 线程池ThreadPoolExecutor源码分析1、如何提交任务2、如何执行任务3、如何停止过期的非核心线程4、如何使用拒绝策略 ScheduledThreadPoolExecutor源码分析 线程池 快速过一遍基础知识 7大参数 corePoolSize &#xff1a; 核心线程数 maximumPoolSize&#xff1a; 最…...

那些年我们遇到过的关于excel的操作

本文为直接从百度上搜索的关于excel的函数使用&#xff0c;方便以后用&#xff0c;希望会持续补充 excel中筛选出两列重复的数据【场景&#xff1a;A、B两列数据个数不同且无序&#xff0c;想找出A列中的数据在B列中不存在的&#xff0c;通过比较后单元格为空的代表该行不存在的…...

Angular变更检测机制

前段时间遇到这样一个 bug&#xff0c;通过一个 click 事件跳转到一个新页面&#xff0c;新页面迟迟不加载&#xff1b; 经过多次测试发现&#xff0c;将鼠标移入某个 tab &#xff0c;页面就加载出来了。 举个例子&#xff0c;页面内容无法加载&#xff0c;但是将鼠标移入下图…...

Redis之String类型

文章目录 Redis之String类型1. 赋值/获取值2. 同时设置/获取多个键值3. 数值增减4. 获取字符串长度5. 向尾部追加值6. 分布式锁7.应用场景 Redis之String类型 Redis命令不区分大小写 1. 赋值/获取值 赋值&#xff1a;set key value 取值&#xff1a;get key (当键不存在时候&…...

使用redis中的zset实现滑动窗口限流

使用redis和zset实现滑动窗口限流 文章目录 使用redis和zset实现滑动窗口限流Zset**初始化一个ZSet**&#xff1a;其中包含所有用户的ID和时间戳。**添加元素到ZSet**&#xff1a;当用户发起请求时&#xff0c;将当前时间戳和用户ID作为元素添加到ZSet中。**删除过期的元素**&a…...

Linux下C语言使用 netlink sockets与内核模块通信

netlink简介 Netlink套接字是用以实现用户进程与内核进程通信的一种特殊的进程间通信(IPC) ,也是网络应用程序与内核通信的最常用的接口。在Linux标准内核中&#xff0c;系统默认集成了很多netlink实例&#xff0c;比如日志上报、路由系统等&#xff0c;netlink消息是双向的&a…...

excel中的引用与查找函数篇3

1、INDEX(array,row_num,[col_num])&#xff1a;获取指定范围中指定行号和列号对应的数据 index(查询范围,行号,列号) 行号和列号是相对选中查询范围来写的&#xff1a;分别把第二行第三列的数据和第四行第二列的数据查找出来。 数据是单行或单列&#xff0c;后面只需要给一个参…...

【Linux学习笔记】 - 常用指令学习及其验证(下)

前言&#xff1a;本文延续上一篇文章【Linux学习笔记】 - 常用指令学习及其验证&#xff08;上&#xff09;对常用的指令进行介绍和验证。 一、mv指令 &#xff08;1&#xff09;功能&#xff1a;用来移动文件或者将文件改名 &#xff08;2&#xff09;语法及验证&#xff1a…...

面试官:请说说flex布局_番茄出品.md

面试官&#xff1a;请说说flex布局_番茄出品.md start 依然记得当初学习 flex 布局时&#xff0c;用 flex 布局&#xff1a;画麻将。一筒到九筒&#xff0c;应有尽有。但是光和面试官说&#xff0c;我用 flex 布局画过麻将&#xff0c;并没有什么用。面试官问你一个语法&…...

ChatGLM DeepSpeed/P-Tuning v2 调参

之前尝试了基于ChatGLM-6B使用LoRA进行参数高效微调,本文给大家分享使用DeepSpeed和P-Tuning v2对ChatGLM-6B进行微调,相关代码放置在GitHub上面:llm-action。 ChatGLM-6B简介 ChatGLM-6B相关的简介请查看之前的文章,这里不再赘述。 P-Tuning v2简介 P-Tuning是一种较新…...

Leetcode每日一题:打家劫舍系列Ⅰ、Ⅱ、Ⅲ、Ⅳ(2023.9.16~2023.9.19 C++)

由于之前写过打家劫舍系列&#xff0c;这里直接弄个合集&#xff0c;后面应该还有个iv。 目录 198. 打家劫舍 213. 打家劫舍 II 337. 打家劫舍 III 2560. 打家劫舍 IV 198. 打家劫舍 题目描述&#xff1a; 你是一个专业的小偷&#xff0c;计划偷窃沿街的房屋。每间房内都…...

容易对一个异性产生依赖感怎么办?

歌词&#xff1a;爱总让人伤心&#xff0c;但你要学会去明白~ &#x1f442; Photograph - Ed Sheeran - 单曲 - 网易云音乐 目录 &#x1f33c;前言 &#x1f61f;一、对另一个人的依赖感&#xff0c;本质是什么&#xff1f; &#x1f60a;二、如何减少对伴侣的依赖感&am…...

Windows10/11无线网卡WIFI驱动详细下载安装教程

官网下载WIFI驱动 《intel官网》 找到下载Windows 10 and Windows 11* WiFi package drivers 查看详细信息 下载对应操作系统的WIFI驱动 安装驱动&#xff0c;然后重启电脑即可。...

面向面试知识--Lottery项目

面向面试知识–Lottery项目 1.设计模式 为什么需要设计模式&#xff1f; &#xff08;设计模式是什么&#xff1f;优点有哪些&#xff1f;&#xff09; 设计模式是一套经过验证的有效的软件开发指导思想/解决方案&#xff1b;提高代码的可重用性和可维护性&#xff1b;提高团…...

SpringBoot接口中如何直接返回图片数据

SpringBoot接口中如何直接返回图片数据 目录 接口直接返回图片数据 起因 类似这种 根据个人经验 优雅的实现图片返回 接口直接返回图片数据 起因 最近在做涉及到分享推广的业务&#xff0c;需要由业务员分享二维码进入推广页面&#xff0c;由于是新项目&#xff0c;前期…...

c语言进阶部分详解(指针进阶1)

大家好&#xff01;指针的初阶内容我已经写好&#xff0c;可移步至我的文章&#xff1a;c语言进阶部分详解&#xff08;指针初阶&#xff09;_总之就是非常唔姆的博客-CSDN博客 基本内容我便不再赘述&#xff0c;直接带大家进入进阶内容&#xff1a; 目录 一.字符指针 1.讲解…...

计算机竞赛 大数据商城人流数据分析与可视化 - python 大数据分析

0 前言 &#x1f525; 优质竞赛项目系列&#xff0c;今天要分享的是 &#x1f6a9; 基于大数据的基站数据分析与可视化 该项目较为新颖&#xff0c;适合作为竞赛课题方向&#xff0c;学长非常推荐&#xff01; &#x1f947;学长这里给一个题目综合评分(每项满分5分) 难度…...

各种电机驱动原理

步进电机 步进电机参考资料 野火官方文档 步进电机驱动原理 上面参考文档中有的内容就不写了&#xff0c;写一下我自己的总结吧。 说明&#xff1a; 电机驱动器输入信号有电机转动方向信号DIR&#xff0c;电机转速信号PWM&#xff0c;电机使能信号EN&#xff1b;电机驱动器…...

人脸图像数据增强

为什么要做数据增强 在计算机视觉相关任务中&#xff0c;数据增强&#xff08;Data Augmentation&#xff09;是一种常用的技术&#xff0c;用于扩展训练数据集的多样性。它包括对原始图像进行一系列随机或有规律的变换&#xff0c;以生成新的训练样本。数据增强的主要目的是增…...

Android 查看按键信息的常用命令详解

Android 查看按键信息的常用命令详解 文章目录 Android 查看按键信息的常用命令详解一、主要命令&#xff1a;二、命令详解1、getevent2、getevent -l3、dumsys input4、cat XXX.kl4、cat /dev/input/eventX5、getevent 其他命令6、input keyevent XX 三、简单示例修改四、总结…...

C++:std::is_convertible

C++标志库中提供is_convertible,可以测试一种类型是否可以转换为另一只类型: template <class From, class To> struct is_convertible; 使用举例: #include <iostream> #include <string>using namespace std;struct A { }; struct B : A { };int main…...

OPENCV形态学基础之二腐蚀

一.腐蚀的原理 (图1) 数学表达式&#xff1a;dst(x,y) erode(src(x,y)) min(x,y)src(xx,yy) 腐蚀也是图像形态学的基本功能之一&#xff0c;腐蚀跟膨胀属于反向操作&#xff0c;膨胀是把图像图像变大&#xff0c;而腐蚀就是把图像变小。腐蚀后的图像变小变暗淡。 腐蚀…...

浪潮交换机配置track检测实现高速公路收费网络主备切换NQA

浪潮交换机track配置 项目背景高速网络拓扑网络情况分析通信线路收费网络路由 收费汇聚交换机相应配置收费汇聚track配置 项目背景 在实施省内一条高速公路时遇到的需求&#xff0c;本次涉及的主要是收费汇聚交换机的配置&#xff0c;浪潮网络设备在高速项目很少&#xff0c;通…...

云原生安全实战:API网关Kong的鉴权与限流详解

&#x1f525;「炎码工坊」技术弹药已装填&#xff01; 点击关注 → 解锁工业级干货【工具实测|项目避坑|源码燃烧指南】 一、基础概念 1. API网关&#xff08;API Gateway&#xff09; API网关是微服务架构中的核心组件&#xff0c;负责统一管理所有API的流量入口。它像一座…...

uniapp 字符包含的相关方法

在uniapp中&#xff0c;如果你想检查一个字符串是否包含另一个子字符串&#xff0c;你可以使用JavaScript中的includes()方法或者indexOf()方法。这两种方法都可以达到目的&#xff0c;但它们在处理方式和返回值上有所不同。 使用includes()方法 includes()方法用于判断一个字…...

LangFlow技术架构分析

&#x1f527; LangFlow 的可视化技术栈 前端节点编辑器 底层框架&#xff1a;基于 &#xff08;一个现代化的 React 节点绘图库&#xff09; 功能&#xff1a; 拖拽式构建 LangGraph 状态机 实时连线定义节点依赖关系 可视化调试循环和分支逻辑 与 LangGraph 的深…...

Leetcode33( 搜索旋转排序数组)

题目表述 整数数组 nums 按升序排列&#xff0c;数组中的值 互不相同 。 在传递给函数之前&#xff0c;nums 在预先未知的某个下标 k&#xff08;0 < k < nums.length&#xff09;上进行了 旋转&#xff0c;使数组变为 [nums[k], nums[k1], …, nums[n-1], nums[0], nu…...

如何配置一个sql server使得其它用户可以通过excel odbc获取数据

要让其他用户通过 Excel 使用 ODBC 连接到 SQL Server 获取数据&#xff0c;你需要完成以下配置步骤&#xff1a; ✅ 一、在 SQL Server 端配置&#xff08;服务器设置&#xff09; 1. 启用 TCP/IP 协议 打开 “SQL Server 配置管理器”。导航到&#xff1a;SQL Server 网络配…...

《信号与系统》第 6 章 信号与系统的时域和频域特性

目录 6.0 引言 6.1 傅里叶变换的模和相位表示 6.2 线性时不变系统频率响应的模和相位表示 6.2.1 线性与非线性相位 6.2.2 群时延 6.2.3 对数模和相位图 6.3 理想频率选择性滤波器的时域特性 6.4 非理想滤波器的时域和频域特性讨论 6.5 一阶与二阶连续时间系统 6.5.1 …...

Spring AOP代理对象生成原理

代理对象生成的关键类是【AnnotationAwareAspectJAutoProxyCreator】&#xff0c;这个类继承了【BeanPostProcessor】是一个后置处理器 在bean对象生命周期中初始化时执行【org.springframework.beans.factory.config.BeanPostProcessor#postProcessAfterInitialization】方法时…...