多线程-定时任务线程池源码
定时任务线程池
ScheduledThreadPoolExecutor,可以执行定时任务的线程池。这里学习它的基本原理。
定时任务线程池,和普通线程池不同的地方在于,它使用一个延迟队列,延迟队列使用最小堆作为它的数据结构,它会按照任务的执行顺序,把最先执行的任务放到第一个,线程会获取第一个任务的延迟时长,然后阻塞指定时长,阻塞完成后,去执行任务。对于周期性执行的任务,执行完成后,会计算下一次启动时间,然后把任务重新提交到延迟队列。
源码分析
定时任务线程池的继承体系
定时任务线程池继承了ThreadPoolExecutor,同时实现了ScheduledExecutorService,这个接口定义了定时调度相关的功能
public class ScheduledThreadPoolExecutorextends ThreadPoolExecutorimplements ScheduledExecutorService {
ScheduledExecutorService:定义了定时调度的功能
public interface ScheduledExecutorService extends ExecutorService {// 定时调度1次的任务public ScheduledFuture<?> schedule(Runnable command,long delay, TimeUnit unit);// 定时调度1次的任务,有返回值public <V> ScheduledFuture<V> schedule(Callable<V> callable,long delay, TimeUnit unit);// 以固定频率调度的任务public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit);// 以固定延迟调度的任务public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,long initialDelay,long delay,TimeUnit unit);
}
描述定时任务的类
描述定时任务的类:ScheduledThreadPoolExecutor的内部类ScheduledFutureTask
// ScheduledFutureTask:是定时任务线程池的内部类,封装了任务的启动时间、周期时间(隔多长时间执行一次),
// 任务在延迟队列中的索引、任务序号
private class ScheduledFutureTask<V>extends FutureTask<V> implements RunnableScheduledFuture<V> {// 任务的启动时间,单位是纳秒private long time;// 任务的执行周期,单位是纳秒private final long period;// 任务在队列中的索引int heapIndex;// 任务序号,通过原子类生成private final long sequenceNumber;// 持有自己的实例RunnableScheduledFuture<V> outerTask = this;// 构造方法,参数1 异步任务,参数2 结果,参数3 任务的启动时间,参数4 任务的周期时间ScheduledFutureTask(Runnable r, V result, long ns, long period) {super(r, result);this.time = ns;this.period = period;this.sequenceNumber = sequencer.getAndIncrement();}
}// ScheduledFutureTask实现了RunnableScheduledFuture,它代表一个可调度的异步任务的结果
public interface RunnableScheduledFuture<V> extends RunnableFuture<V>, ScheduledFuture<V> {// 定时任务是否是周期性的boolean isPeriodic();
}// RunnableScheduledFuture继承了ScheduledFuture
public interface ScheduledFuture<V> extends Delayed, Future<V> {
}// ScheduledFuture继承了Delayed,它封装了任务的延迟时间,表示任务延迟多久启动,继承
// Comparable接口,因为在循环队列中排序时需要用到
public interface Delayed extends Comparable<Delayed> {// 返回对象相关的延迟时长long getDelay(TimeUnit unit);
}
ScheduledFutureTask的继承体系上:
- 继承了FutureTask,代表一个异步任务。 // FutureTask在之前学习Callable接口的时候已经接触到了。
- 实现了RunnableScheduledFuture,它代表一个可调度的异步任务的结果,同时间接实现了Delayed接口,用于排序
定时任务主要有两个参数来描述任务的执行时间:
- time:任务的启动时间,这是一个绝对时间,描述到了某个时间点,任务应该启动执行
- period:任务的周期,描述两个任务之间间隔多长时间
延迟队列
定时任务线程池和普通线程池不一样的地方,在于它使用延迟队列,定时任务中封装好了任务的执行时间,任务的调度工作,是由延迟队列来执行的。
延迟队列的结构:
static class DelayedWorkQueue extends AbstractQueue<Runnable>implements BlockingQueue<Runnable> {// 队列内部使用的数组private static final int INITIAL_CAPACITY = 16;private RunnableScheduledFuture<?>[] queue =new RunnableScheduledFuture<?>[INITIAL_CAPACITY];private int size = 0;// 等待队列中第一个任务的线程private Thread leader = null;
}
这里的结构很简单,主要是它的计算比较复杂,任务之间需要排序,组成一个最小堆,最先执行的任务放到前面,以及元素出队的方法、元素入队的方法。
工作机制
这里以scheduleAtFixedRate为例, 固定频率的定时任务,讲解定时任务的执行流程,其它类型的定时任务也类似。
提交定时任务
通过scheduleAtFixedRate方法,创建定时任务:
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit) {if (command == null || unit == null)throw new NullPointerException();if (period <= 0)throw new IllegalArgumentException();ScheduledFutureTask<Void> sft =// 构建ScheduledFutureTask实例new ScheduledFutureTask<Void>(command,null,triggerTime(initialDelay, unit), // 任务的触发时间unit.toNanos(period)); // 任务的执行周期RunnableScheduledFuture<Void> t = decorateTask(command, sft);sft.outerTask = t;delayedExecute(t);return t;
}
第一步:创建ScheduledFutureTask的实例
ScheduledFutureTask(Runnable r, V result, long ns, long period) {super(r, result); // 调用父类FutureTask的构造方法this.time = ns; // 任务的触发时间,这是一个绝对时间this.period = period; // 任务的执行周期,表示两个任务之间间隔多长时间this.sequenceNumber = sequencer.getAndIncrement(); // 当前任务的序列号,通过原子类生成
}// 父类FutureTask的构造方法
public FutureTask(Runnable runnable, V result) {this.callable = Executors.callable(runnable, result);this.state = NEW; // ensure visibility of callable
}
第二步:添加任务到延迟队列,并且新建线程执行任务
private void delayedExecute(RunnableScheduledFuture<?> task) {if (isShutdown())reject(task);else {super.getQueue().add(task); // 任务添加到延迟队列if (isShutdown() && // 判断线程池是否关闭,如果关闭,移除任务!canRunInCurrentRunState(task.isPeriodic()) &&remove(task))task.cancel(false);elseensurePrestart(); // 向线程池中添加线程,确保有线程执行任务}
}// 添加任务到延迟队列的方法
public boolean offer(Runnable x) {if (x == null)throw new NullPointerException();RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x; // 异步任务实例final ReentrantLock lock = this.lock;lock.lock();try {int i = size;if (i >= queue.length)grow();size = i + 1;if (i == 0) {queue[0] = e;setIndex(e, 0);} else {siftUp(i, e);}if (queue[0] == e) { // 如果当前任务是第一个任务,要唤醒在条件变量上阻塞的线程leader = null;available.signal();}} finally {lock.unlock();}return true;
}// 向线程池中添加线程
void ensurePrestart() {int wc = workerCountOf(ctl.get());if (wc < corePoolSize)// 注意,这里Worker实例的第一个参数 firstTask,值为null,表示Worker只可以从队列中获取任务addWorker(null, true);else if (wc == 0)addWorker(null, false);
}
从阻塞队列中获取任务
新线程启动后,会执行Worker类的run方法 (参考ThreadPoolExecutor的执行原理),在run方法中,会从阻塞队列中获取异步任务,定时任务使用的阻塞队列是DelayedWorkQueue。
从阻塞队列中获取任务的方法:
// take方法没有指定超时时长,类似的,还有指定了超时时长的poll方法
public RunnableScheduledFuture<?> take() throws InterruptedException {final ReentrantLock lock = this.lock;lock.lockInterruptibly(); // 获取锁try {for (;;) {RunnableScheduledFuture<?> first = queue[0]; // 队列中的第一个元素,if (first == null)available.await(); // 如果队列为空,阻塞else {// 获取第一个任务的延迟时间,表示延迟指定时长后,开始执行任务long delay = first.getDelay(NANOSECONDS);if (delay <= 0)return finishPoll(first); // 如果延迟时长小于等于0,证明可以开始执行任务了first = null; // don't retain ref while waitingif (leader != null)available.await();else {Thread thisThread = Thread.currentThread();leader = thisThread; // 设置当前线程为leadertry {available.awaitNanos(delay); // 如果延迟时长大于0,那么线程进入阻塞状态并且指定时长} finally {if (leader == thisThread)leader = null;}}}}} finally {if (leader == null && queue[0] != null)available.signal();lock.unlock();}
}// 获取任务的延迟时间
public long getDelay(TimeUnit unit) {return unit.convert(time - now(), NANOSECONDS); // time是绝对时间,它减去now(),就是相对时间,也就是延迟时间
}
阻塞队列负责按照任务的执行时间,对任务进行排序,最先执行的任务放在队列的第一位,这里没有展示排序的逻辑,排序是按照最小堆的逻辑来排序的。线程从阻塞队列中获取任务,会计算第一个任务的延迟时长,然后等待指定时长,在执行任务,这就是定时任务可以在指定时长后启动的逻辑,如果延迟队列中没有任务,线程会一直等待,同时,向延迟队列中添加任务时,如果发现当前任务是第一个任务,会唤醒正在等待的线程。
执行定时任务
从延迟队列中获取到任务后,线程会执行ScheduledFutureTask的run方法,因为ScheduledFutureTask间接继承了Runnable接口
// ScheduledFutureTask的run方法
public void run() {boolean periodic = isPeriodic(); // 任务是否是周期性的if (!canRunInCurrentRunState(periodic)) // 判断线程池是否还在运行cancel(false);else if (!periodic) // 如果不是周期性的任务,直接执行,这里执行的是FutureTask中的run方法ScheduledFutureTask.super.run();// 如果是周期性的任务,执行完之后计算下次执行时间,然后重新提交任务实例到阻塞队列else if (ScheduledFutureTask.super.runAndReset()) { setNextRunTime(); // 计算下次任务的执行时间,这个方法会更新任务的time属性reExecutePeriodic(outerTask); // 再次向线程池中提交任务实例}
}// 判断任务是否是周期性的
public boolean isPeriodic() {// 参考之前ScheduledFutureTask的实例的创建过程,period代表任务的执行周期,// 这个值不为0,证明是周期性的任务return period != 0;
}
1、为什么执行任务时会执行FutureTask中的run方法?因为在FutureTask的run方法中,会调用用户编写的run方法,也就是异步任务,ScheduledFutureTask中的run方法负责整体流程。
2、如果是周期性的任务,执行FutureTask中的runAndReset方法,它和run方法有什么不同?它执行完任务后,不会设置返回值,同时会把任务设置为初始状态,这个方法是为了执行多次的异步任务而设计的。
3、周期性的任务,执行完任务后,如何计算下次任务的执行时间?
// 计算任务下次执行时间的方法:
private void setNextRunTime() {long p = period; // 任务的执行周期if (p > 0)time += p; // 当前时间加上周期,固定频率(scheduleAtFixedRate)的定时任务走这段逻辑elsetime = triggerTime(-p); // 更新任务的执行时间,固定延迟(scheduleWithFixedDelay)的定时任务走这段逻辑
}
这里需要解释一下,time属性是任务的执行时间,是一个绝对时间,表示到了某个点,例如 2020-01-01 00:00:00 这个固定的点,启动定时任务,period,是两个任务之间的间隔时长,例如,每隔10分钟,执行一次定时任务。对于固定频率的定时任务和固定延迟的定时任务,它们在创建任务实例的过程中稍有不同:
// 创建固定频率的定时任务
new ScheduledFutureTask<Void>(command,null,triggerTime(initialDelay, unit), // 计算time的值unit.toNanos(period)); // 计算period的值,注意,period是正数
// 创建固定延迟的定时任务
new ScheduledFutureTask<Void>(command,null,triggerTime(initialDelay, unit), // 计算time的值unit.toNanos(-delay)); // 计算period的值,注意,period是负数
一个执行周期是正数,一个执行周期时负数,所以在计算任务下次执行时间的方法中,它们会走向不同的链路,把该方法重新粘贴到下面,重新再看:
// 计算任务下次执行时间的方法:
private void setNextRunTime() {long p = period; // 任务的执行周期if (p > 0)time += p; // 固定频率,上次任务执行时间加上执行周期,就是下次执行时间elsetime = triggerTime(-p); // 固定延迟,当前时间加上执行周期,就是下次执行时间, // triggerTime方法在下面
}long triggerTime(long delay) {return now() +((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
}
所以,固定频率执行的任务,如果上一次的任务执行超时,直到下一次任务该启动时还没有执行完成,一旦上一次任务执行完成,下一次任务立刻启动,因为上一次任务执行完成后,计算下一次任务的执行时间,发现执行时间在当前时间之前,所以线程获取任务时不会阻塞,会立刻取出任务,然后执行。固定延迟的任务,是根据上次任务结束时间来计算下次任务开始时间的,所以它是固定延迟。
总结
定时任务的执行过程:
- 第一步:向线程池提交定时任务(schedule方法)
- 第二步:创建定时任务实例(ScheduleFutureTask实例)
- 第三步:把定时任务添加到延迟队列,延迟队列会对任务进行排序,最先执行的定时任务放到开头
- 第四步:新建线程,从延迟队列中获取定时任务,线程会获取第一个任务的延迟时长,然后阻塞指定时长,阻塞结束后,执行定时任务
- 第五步:执行完成后,计算下一次任务的执行时间,然后重新向线程池中提交任务实例
Q&A
只执行一次的定时任务和周期性的定时任务,分别是如何执行的?
周期性的定时任务,在执行完一次后,会计算下次任务的启动时间,然后再次向阻塞队列中提交任务实例,只执行一次的定时任务则不会
线程是如何在指定时间启动定时任务的?
阻塞队列会把需要最先执行的定时任务放在队列的开头,线程会获取第一个任务的延迟时间,然后根据延迟时间休眠指定时长,休眠结束后,执行定时任务。
按照固定频率执行的定时任务和按照固定延迟执行的定时任务,分别是如何执行的?
按照固定频率执行的定时任务,下次任务的执行时间 = 上次任务的启动时间 + 周期
按照固定延迟执行的定时任务,下次任务的执行时间 = 上次任务的结束时间 + 周期
依据不同的计算方式,计算出下次任务的执行时间,然后提交任务实例到队列中
相关文章:
多线程-定时任务线程池源码
定时任务线程池 ScheduledThreadPoolExecutor,可以执行定时任务的线程池。这里学习它的基本原理。 定时任务线程池,和普通线程池不同的地方在于,它使用一个延迟队列,延迟队列使用最小堆作为它的数据结构,它会按照任务…...
初次使用 IDE 搭配 Lombok 注解的配置
前言 在 Java 开发的漫漫征程中,我们总会遇到各种提升效率的工具。Lombok 便是其中一款能让代码编写变得更加简洁高效的神奇库。它通过注解的方式,巧妙地在编译阶段为我们生成那些繁琐的样板代码,比如 getter、setter、构造函数等。然而&…...
云服数据存储接口:CloudSever
云服数据存储接口:CloudSever 迷你世界 更新时间: 2024-04-28 19:09:10 具体函数名及描述如下: 序号 函数名 函数描述 1 setOrderDataBykey(...) 设置排行榜中指定键的数值 2 removeOrderDataByKey(...) 删除排行榜中指定键的数值 …...
关于 QPalette设置按钮背景未显示出来 的解决方法
若该文为原创文章,转载请注明原文出处 本文章博客地址:https://hpzwl.blog.csdn.net/article/details/146047054 长沙红胖子Qt(长沙创微智科)博文大全:开发技术集合(包含Qt实用技术、树莓派、三维、OpenCV…...
上传文件到对象存储是选择前端还是后端
对于云上对象存储的上传方式选择(前端直传或后端代理上传),需综合考虑安全性、性能、成本、业务需求等因素。 1. 推荐前端直传的场景 适用条件: 大文件上传(如视频、大型数据集)高并发场景(如…...
mysql下载与安装
一、mysql下载: MySQL获取: 官网:www.mysql.com 也可以从Oracle官方进入:https://www.oracle.com/ 下载地址:https://downloads.mysql.com/archives/community/ 选择对应的版本和对应的操作系统ÿ…...
Python练习(握手问题,进制转换,日期问题,位运算,求和)
一. 握手问题 代码实现 ans0for i in range(1,51):for j in range(i1,51):if i<7 and j<7:continueelse:ans 1print(ans) 这道题可以看成是50个人都握了手减去7个人没握手的次数 答案:1204 二.将十进制整数拆解 2.1门牌制作 代码实现 ans0for i in ra…...
小程序分类页面
1创建cate分支 2.cate滑动界面布局 获取滑动界面高度 3.获取并渲染一级分类的列表数据 4.渲染二级和三级分类列表 获取二级列表的数据 5.渲染二级分类列表的UI结构 6.动态渲染三级分类列表...
HTML + CSS 题目
1.说说你对盒子模型的理解? 一、是什么 对一个文档进行布局的时候,浏览器渲染引擎会根据标准之一的css基础盒模型,将所有元素表示为一个个矩形的盒子。 一个盒子由四个部分组成: content,padding,border,margin 下…...
计算机视觉|ViT详解:打破视觉与语言界限
一、ViT 的诞生背景 在计算机视觉领域的发展中,卷积神经网络(CNN)一直占据重要地位。自 2012 年 AlexNet 在 ImageNet 大赛中取得优异成绩后,CNN 在图像分类任务中显示出强大能力。随后,VGG、ResNet 等深度网络架构不…...
Node JS 调用模型Xenova_all-MiniLM-L6-v2实战
本篇通过将句子数组转换为句子的向量表示,并通过平均池化和归一化处理,生成适合机器学习或深度学习任务使用的特征向量为例,演示通过NodeJS 的方式调用Xenova/all-MiniLM-L6-v2 的过程。 关于 all-MiniLM-L6-v2 的介绍,可以参照上…...
React + TypeScript 实战指南:用类型守护你的组件
TypeScript 为 React 开发带来了强大的类型安全保障,这里解析常见的一些TS写法: 一、组件基础类型 1. 函数组件定义 // 显式声明 Props 类型并标注返回值 interface WelcomeProps {name: string;age?: number; // 可选属性 }const Welcome: React.FC…...
ASP.NET Core JWT认证与授权
1.JWT结构 JSON Web Token(JWT)是一种用于在网络应用之间安全传输声明的开放标准(RFC 7519)。它通常由三部分组成,以紧凑的字符串形式表示,在身份验证、信息交换等场景中广泛应用。 2.JWT权限认证 2.1添…...
【车规芯片】如何引导时钟树生长方向
12nm车规DFTAPR项目中,我们可以看到,绝大部分的sink都受控于xxxx_tessent_occ_clk_cpu_inst/tessent_persistent_cell_clock_out_mux/C10_ctmi_1这个mux,这是我们DFT设计结果: 这里我们重新打开place的数据 Anchor,也就…...
突破传统:用Polars解锁ICU医疗数据分析新范式
一、ICU数据革命的临界点 在重症监护室(ICU),每秒都在产生关乎生死的关键数据:从持续监测的生命体征到高频更新的实验室指标,从呼吸机参数到血管活性药物剂量,现代ICU每天产生的数据量级已突破TB级别。传统…...
《深度学习实战》第11集:AI大模型压缩与加速
深度学习实战 | 第11集:AI大模型压缩与加速 在深度学习领域,随着模型规模的不断增大,模型的推理速度和部署效率成为实际应用中的关键挑战。本篇博客将带你深入了解模型压缩与加速的核心技术,并通过一个实战项目展示如何使用知识蒸…...
golang进阶知识专项-理解值传递
在 Go 语言中,所有函数的参数传递都是值传递(Pass by Value)。当你将一个变量作为参数传递给函数时,实际上传递的是该变量的副本,而不是变量本身。理解这一点对于避免常见的编程错误至关重要。根据不同的类型ÿ…...
OCPP与ISO 15118集成:实现即插即充与车网互动(V2G)- 慧知开源充电桩平台
OCPP与ISO 15118集成:实现即插即充与车网互动(V2G) 引言 随着电动汽车(EV)与电网双向能量交互(V2G)技术的成熟,OCPP协议与ISO 15118标准的协同成为智能充电基础设施的核心挑战。本文…...
大语言模型中温度参数(Temperature)的核心原理
大语言模型中温度参数(Temperature)的核心原理是通过调整模型输出的概率分布,控制生成结果的随机性和多样性。以下是其原理的详细说明: 一、定义与核心作用 温度参数是生成式模型(如GPT系列)中的一个超参数…...
K8s控制器Deployment详解
回顾 ReplicaSet 控制器,该控制器是用来维护集群中运行的 Pod 数量的,但是往往在实际操作的时候,我们反而不会去直接使用 RS,而是会使用更上层的控制器,比如说 Deployment。 Deployment 一个非常重要的功能就是实现了 Pod 的滚动…...
鸿蒙HarmonyOS评论功能小demo
评论页面小demo 效果展示 1.拆解组件,分层搭建 我们将整个评论页面拆解为三个组件,分别是头部导航,评论项,回复三个部分,然后统一在index界面导入 2.头部导航界面搭建 Preview Component struct HmNavBar {// 属性&a…...
基于PyTorch的深度学习3——基于autograd的反向传播
反向传播,可以理解为函数关系的反向传播。...
日期格式与字符串不匹配bug
异常特征:java.lang.IllegalArgumentException: invalid comparison: java.time.LocalDateTime and java.lang.String ### Error updating database. Cause: java.lang.IllegalArgumentException: invalid comparison: java.time.LocalDateTime and java.lang.Str…...
打印三角形及Debug
打印三角形及Debug package struct; public class TestDemo01 {public static void main(String[] args) {//打印三角形 五行 for (int i 1; i < 5; i) {for (int j 5 ; j >i; j--) {System.out.print(" ");}for (int k1;k<i;k) {System.out.print(&…...
大语言模型揭秘:从诞生到智能
引言 在人工智能飞速发展的今天,大语言模型(Large Language Models, LLMs)无疑是技术领域最耀眼的明星之一。它们不仅能够理解人类的自然语言,还能生成流畅的文本,甚至在对话、翻译、创作等任务中表现出接近人类的智能…...
Collab-Overcooked:专注于多智能体协作的语言模型基准测试平台
2025-02-27,由北京邮电大学和理想汽车公司联合创建。该平台基于《Overcooked-AI》游戏环境,设计了更具挑战性和实用性的交互任务,目的通过自然语言沟通促进多智能体协作。 一、研究背景 近年来,基于大型语言模型的智能体系统在复…...
SpringBoot接入DeepSeek(硅基流动版)+ 前端页面调试(WebSocket连接模式)
文章目录 前言正文一、项目环境二、项目代码2.1 pom.xml2.2 DeepSeekController.java2.3 启动类2.4 logback-spring.xml2.5 application.yaml2.6 WebsocketConfig.java2.7 AiChatWebSocketHandler.java2.8 SaveChatSessionParamRequest.java2.9 index.html 三、页面调试3.1 主页…...
LINUX网络基础 [一] - 初识网络,理解网络协议
目录 前言 一. 计算机网络背景 1.1 发展历程 1.1.1 独立模式 1.1.2 网络互联 1.1.3 局域网LAN 1.1.4 广域网WAN 1.2 总结 二. "协议" 2.1 什么是协议 2.2 网络协议的理解 2.3 网络协议的分层结构 三. OSI七层模型(理论标准) …...
由麻省理工学院计算机科学与人工智能实验室等机构创建低成本、高效率的物理驱动数据生成框架,助力接触丰富的机器人操作任务
2025-02-28,由麻省理工学院计算机科学与人工智能实验室(CSAIL)和机器人与人工智能研究所的研究团队创建了一种低成本的数据生成框架,通过结合物理模拟、人类演示和基于模型的规划,高效生成大规模、高质量的接触丰富型机…...
【RAG从入门到精通系列】【RAG From Scratch 系列教程2:Query Transformations】
目录 前言一、概述1-1、RAG概念1-2、前置知识1-2-1、ModelScopeEmbeddings 词嵌入模型1-2-2、FAISS介绍&安装 (向量相似性搜索)1-2-3、Tiktoken 分词工具 二、Rag From Scratch:Query Transformations2-1、前置环境安装2-2、多查询检索器2-2-1、加载网页内容2-2…...
