DelayQueue、ScheduledThreadPoolExecutor 和 PriorityBlockingQueue :怎么利用堆实现定时任务
DelayQueue
DelayQueue 的最大亮点:
- 并不是简单全局锁的“单调队列”实现,而是用Leader-Follower 模式极大减少了线程唤醒的开销。
- 插入与唤醒、等待与 leader 变更,都通过巧妙的锁和条件变量组合完成。
如果只关注“线程安全的优先队列+全局锁”,那就没什么意思;但 DelayQueue 对并发高效和唤醒机制的优化,是其核心看点。
DelayQueue 的核心结构和实现细节如下:
数据结构
- 内部主要用一个
PriorityQueue<E>
实现,队列元素要求实现Delayed
接口(这个接口说明可以排序,可以获得延迟时间。本质是按过期时间从小到大排序的优先队列)。 - 用
ReentrantLock
实现线程安全,所有与队列相关的操作都加锁。 - 用
Condition
(available
) 实现线程间等待与唤醒,以支持阻塞操作。 - 维护一个
leader
线程变量,用于 Leader-Follower 模式优化等待。
private final transient ReentrantLock lock = new ReentrantLock();
private final PriorityQueue<E> q = new PriorityQueue<E>();
private Thread leader;
private final Condition available = lock.newCondition();
Leader-Follower 等待优化(有趣实现)
目的:减少线程唤醒的“惊群”现象,提高效率。
- 只有一个线程(leader)会以定时方式等待队头元素的到期时间,其他线程无限期等待。
- 队头元素变化(特别是有更早到期的元素插入时)会唤醒一个等待线程,使其成为新的 leader。
- 当 leader 线程被唤醒或完成任务后,要重置 leader 并适当唤醒其他线程。
- 一个有意思的点是,通过finally 唤醒线程 和 释放锁,会在try中的return之前执行,简化了代码。
关键代码片段(take 方法):
public E take() throws InterruptedException {final ReentrantLock lock = this.lock;lock.lockInterruptibly();try {for (;;) {E first = q.peek();if (first == null)available.await();else {long delay = first.getDelay(NANOSECONDS);if (delay <= 0L)return q.poll();first = null;if (leader != null)available.await();else {Thread thisThread = Thread.currentThread();leader = thisThread;try {available.awaitNanos(delay);} finally {if (leader == thisThread)leader = null;}}}}} finally {if (leader == null && q.peek() != null)available.signal();lock.unlock();}
}
解释:
- 当队头元素未过期时,只有 leader 线程定时等待,其他线程无限期 await。
- 新元素插入(如果成为队头)会
leader = null; available.signal();
,唤醒(或更换)leader。 - leader 线程在等待超时或被唤醒后要检查自己是否还是 leader,并适时清理。
Leader 模式的核心设计思想
Leader 模式是一种经典的并发设计模式,在 DelayQueue.take()
中用来避免惊群效应和减少CPU浪费。
假设没有 Leader 机制,多个线程同时调用 take()
:
- 队列头元素还有 5 秒过期
- 所有线程都会
awaitNanos(5秒)
- 5 秒后所有线程同时被唤醒
- 只有一个线程能获取元素,其他线程白白消耗了CPU
Leader 模式解决方案
if (leader != null)available.await(); // 非Leader线程无限期等待
else {Thread thisThread = Thread.currentThread();leader = thisThread; // 成为Leadertry {available.awaitNanos(delay); // 只有Leader定时等待} finally {if (leader == thisThread)leader = null;}
}
Leader 线程职责
- 精确等待:只等待到队列头元素过期的时间点
- 独占等待:同一时刻只有一个Leader线程在定时等待
- 负责唤醒:获取元素后通过
finally
块唤醒其他线程
Follower 线程职责
- 无限等待:调用
available.await()
直到被唤醒 - 竞争Leader:被唤醒后重新循环,有机会成为新的Leader
关于条件变量唤醒顺序
条件变量无法控制先唤醒谁
Condition.signal()
的唤醒顺序是不确定的,JVM 规范没有保证FIFO(虽然 hotspot JVM 的实现大都FIFO)。但这不影响Leader模式的正确性:
finally {if (leader == null && q.peek() != null)available.signal(); // 唤醒一个等待的线程lock.unlock();
}
无论唤醒哪个线程,它都会:
- 重新检查队列状态
- 如果元素未过期且无Leader,就成为新Leader
- 如果已有Leader,就继续等待
剩余
offer/put/add 插入逻辑
- 插入新元素时,如果它成为新的队头(即最早过期),则重置 leader 并 signal 唤醒等待线程。
代码片段:
public boolean offer(E e) {final ReentrantLock lock = this.lock;lock.lock();try {q.offer(e);if (q.peek() == e) {leader = null;available.signal();}return true;} finally {lock.unlock();}
}
解释:
- 新元素成为队头时唤醒等待线程,确保及时处理最新到期元素。
poll 和 poll(timeout) 的实现
poll()
只取已经过期(getDelay <= 0)的队头元素,否则返回 null。poll(timeout)
支持有超时等待(内部逻辑和 take 类似)。
drainTo
- 一次性取出所有已过期元素,提高批量处理效率。
for (E first;n < maxElements&& (first = q.peek()) != null&& first.getDelay(NANOSECONDS) <= 0;) {c.add(first);q.poll();++n;
}
ScheduledThreadPoolExecutor 结构分析
ScheduledThreadPoolExecutor
继承自 ThreadPoolExecutor
并实现 ScheduledExecutorService
,专门用于执行延迟任务和周期性任务。
public class ScheduledThreadPoolExecutorextends ThreadPoolExecutorimplements ScheduledExecutorService
关键字段设计
// 控制关闭后周期性任务的执行策略
private volatile boolean continueExistingPeriodicTasksAfterShutdown;
private volatile boolean executeExistingDelayedTasksAfterShutdown = true;// 任务取消时是否立即从队列移除
volatile boolean removeOnCancel;// 全局序列号生成器,保证FIFO顺序
private static final AtomicLong sequencer = new AtomicLong();
ScheduledFutureTask - 智能任务包装器
这是最核心的内部类,实现了延迟和周期性执行的逻辑:
private class ScheduledFutureTask<V> extends FutureTask<V> implements RunnableScheduledFuture<V> {private final long sequenceNumber; // FIFO排序保证private volatile long time; // 纳秒级触发时间private final long period; // 周期值:正数=固定频率,负数=固定延迟,0=一次性RunnableScheduledFuture<V> outerTask = this; // 支持装饰器模式int heapIndex; // 堆中的索引位置,O(1)删除的关键
}
核心算法 - 周期性任务的重新调度:
public void run() {if (!canRunInCurrentRunState(this))cancel(false);else if (!isPeriodic())super.run(); // 一次性任务直接执行else if (super.runAndReset()) { // 周期性任务,执行但不设置结果setNextRunTime(); // 计算下次执行时间reExecutePeriodic(outerTask); // 重新加入队列}
}private void setNextRunTime() {long p = period;if (p > 0)time += p; // 固定频率:基于上次开始时间elsetime = triggerTime(-p); // 固定延迟:基于当前时间
}
DelayedWorkQueue - 高性能延迟队列
基于最小堆的无界延迟队列,这是一个节点维护数组索引的堆,支持直接删除非堆顶元素:
核心数据结构
private RunnableScheduledFuture<?>[] queue = new RunnableScheduledFuture<?>[INITIAL_CAPACITY];
private Thread leader; // Leader-Follower模式的领导线程
private final Condition available = lock.newCondition();
O(log n) 插入和维护堆性质
private void siftUp(int k, RunnableScheduledFuture<?> key) {while (k > 0) {int parent = (k - 1) >>> 1; // 父节点索引RunnableScheduledFuture<?> e = queue[parent];if (key.compareTo(e) >= 0) // 已满足堆性质break;queue[k] = e;setIndex(e, k); // 更新任务的堆索引k = parent;}queue[k] = key;setIndex(key, k);
}
O(1) 任务取消 - 利用heapIndex
public boolean remove(Object x) {final ReentrantLock lock = this.lock;lock.lock();try {int i = indexOf(x);if (i < 0) return false;setIndex(queue[i], -1); // 标记为已删除int s = --size;RunnableScheduledFuture<?> replacement = queue[s];queue[s] = null;if (s != i) {siftDown(i, replacement); // 向下调整if (queue[i] == replacement)siftUp(i, replacement); // 可能需要向上调整}return true;} finally {lock.unlock();}
}
Leader-Follower模式的take()
这是解决"惊群效应"的经典实现:
public RunnableScheduledFuture<?> take() throws InterruptedException {final ReentrantLock lock = this.lock;lock.lockInterruptibly();try {for (;;) {RunnableScheduledFuture<?> first = queue[0];if (first == null)available.await();else {long delay = first.getDelay(NANOSECONDS);if (delay <= 0L)return finishPoll(first);first = null; // 避免内存泄漏if (leader != null)available.await(); // 非leader线程无限等待else {Thread thisThread = Thread.currentThread();leader = thisThread; // 成为leadertry {available.awaitNanos(delay); // 只等待必要时间} finally {if (leader == thisThread)leader = null;}}}}} finally {if (leader == null && queue[0] != null)available.signal(); // 唤醒新的leaderlock.unlock();}
}
延迟执行的核心逻辑
private void delayedExecute(RunnableScheduledFuture<?> task) {if (isShutdown())reject(task);else {super.getQueue().add(task); // 加入延迟队列// 双重检查:加入后可能状态改变if (!canRunInCurrentRunState(task) && remove(task))task.cancel(false);elseensurePrestart(); // 确保有线程处理任务}
}
触发时间计算的边界处理
long triggerTime(long delay) {return System.nanoTime() + Math.min(delay, MAX_NANOS);
}private static final long MAX_NANOS = (Long.MAX_VALUE >>> 1) - 1;
设计总结
- heapIndex优化:每个任务记录在堆中的位置,实现O(1)删除
- Leader-Follower模式:避免多线程等待时的惊群效应
- 双重状态检查:任务加入队列后再次检查执行状态
- period符号语义:用正负号区分固定频率和固定延迟
- 序列号保证FIFO:相同时间的任务按提交顺序执行
这些实现细节使得 ScheduledThreadPoolExecutor
在处理大量延迟和周期性任务时具有优秀的性能表现。
PriorityBlockingQueue 结构分析
PriorityBlockingQueue
是一个基于数组实现的二叉堆结构的无界阻塞优先队列,其核心特色在于动态扩容机制和堆维护算法的线程安全实现。
主要成员变量
private transient Object[] queue; // 存储元素的数组(二叉堆)
private transient int size; // 当前元素数量
private transient Comparator<? super E> comparator; // 比较器
private final ReentrantLock lock; // 主锁
private final Condition notEmpty; // 等待条件
private transient volatile int allocationSpinLock; // 扩容自旋锁
🔥 双锁机制的动态扩容
最有趣的设计是tryGrow()
方法中的双锁策略:
private void tryGrow(Object[] array, int oldCap) {lock.unlock(); // 先释放主锁!Object[] newArray = null;if (allocationSpinLock == 0 &&ALLOCATIONSPINLOCK.compareAndSet(this, 0, 1)) {try {int growth = (oldCap < 64) ? (oldCap + 2) : (oldCap >> 1);int newCap = ArraysSupport.newLength(oldCap, 1, growth);if (queue == array)newArray = new Object[newCap];} finally {allocationSpinLock = 0;}}if (newArray == null) Thread.yield(); // 让出CPU给其他扩容线程lock.lock(); // 重新获取主锁if (newArray != null && queue == array) {queue = newArray;System.arraycopy(array, 0, newArray, 0, oldCap);}
}
算法亮点:
- 主锁释放:扩容时释放主锁,允许消费者继续
take()
操作 - CAS自旋锁:用
allocationSpinLock
控制扩容竞争,避免多线程重复扩容 - 容量策略:小数组快速增长(
oldCap + 2
),大数组按50%增长(oldCap >> 1
)
🔥 智能的元素移除算法
removeAt(i)
方法展现了 根据下标进行移除 双向堆调整的精妙:
private void removeAt(int i) {final Object[] es = queue;final int n = size - 1;if (n == i) // 移除最后一个元素es[i] = null;else {E moved = (E) es[n]; // 用最后一个元素填补es[n] = null;// 先向下调整siftDownComparable(i, moved, es, n);// 如果元素没动,说明需要向上调整if (es[i] == moved) {siftUpComparable(i, moved, es);}}size = n;
}
算法思路:
- 用最后一个元素替换被删除的元素
- 先尝试向下调整(
siftDown
) - 如果元素位置未变,再向上调整(
siftUp
) - 这样确保堆性质得到恢复
🔥 高效的堆调整算法
向下调整(siftDownComparable
)实现了标准的堆化:
private static <T> void siftDownComparable(int k, T x, Object[] es, int n) {Comparable<? super T> key = (Comparable<? super T>)x;int half = n >>> 1; // 只需要检查到叶子节点的父节点while (k < half) {int child = (k << 1) + 1; // 左子节点Object c = es[child];int right = child + 1;// 选择较小的子节点if (right < n && ((Comparable<? super T>) c).compareTo((T) es[right]) > 0)c = es[child = right];if (key.compareTo((T) c) <= 0)break;es[k] = c;k = child;}es[k] = key;
}
关键优化:
half = n >>> 1
:只遍历到非叶子节点- 左右子节点比较:
(k << 1) + 1
和(k << 1) + 2
- 向上冒泡较小元素,维持最小堆性质
架构特色
线程安全策略
- 单一主锁:所有公共操作都通过
ReentrantLock
串行化 - 扩容优化:扩容时释放主锁 + CAS自旋锁,提高并发性能
- 条件等待:空队列时
take()
操作通过notEmpty.await()
阻塞
性能设计
- 无界队列:
remainingCapacity()
始终返回Integer.MAX_VALUE
- 批量操作:
drainTo()
直接调用dequeue()
,避免重复加锁 - 快照迭代器:
iterator()
基于数组副本,弱一致性遍历
这种设计使得PriorityBlockingQueue
在保证线程安全的同时,通过巧妙的双锁机制和堆算法优化,实现了较高的并发性能。
相关文章:
DelayQueue、ScheduledThreadPoolExecutor 和 PriorityBlockingQueue :怎么利用堆实现定时任务
DelayQueue DelayQueue 的最大亮点: 并不是简单全局锁的“单调队列”实现,而是用Leader-Follower 模式极大减少了线程唤醒的开销。插入与唤醒、等待与 leader 变更,都通过巧妙的锁和条件变量组合完成。 如果只关注“线程安全的优先队列全局…...
Kafka 消息模式实战:从简单队列到流处理(二)
四、Kafka 流处理实战 4.1 Kafka Streams 简介 Kafka Streams 是 Kafka 提供的流处理库,它为开发者提供了一套简洁而强大的 API,用于构建实时流处理应用程序。Kafka Streams 基于 Kafka 的高吞吐量、分布式和容错特性,能够处理大规模的实时…...
大数据(2) 大数据处理架构Hadoop
一、Hadoop简介 1.定义 Hadoop 是一个开源的分布式计算框架,由 Apache 基金会开发,用于处理海量数据,具备高可靠性、高扩展性和高容错性。它主要由两个核心模块组成: HDFS(Hadoop Distributed File System)…...
【Kotlin】注解反射扩展
文章目录 注解用法反射类引用 扩展扩展函数的作用域成员方法优先级总高于扩展函数 被滥用的扩展函数扩展属性静态扩展 标准库中的扩展函数 使用 T.also 函数交换两个变量sNullOrEmpty | isNullOrBlankwith函数repeat函数 调度方式对扩展函数的影响静态与动态调度扩展函数始终静…...

固定ip和非固定ip的区别是什么?如何固定ip地址
在互联网中,我们常会接触到固定IP和非固定IP的概念。它们究竟有何不同?如何固定IP地址?让我们一起来探究这个问题。 一、固定IP和非固定IP的区别是什么 固定IP(静态IP)和非固定IP(动态IP)是两种…...
升级centos 7.9内核到 5.4.x
前面是指南,后面是工作日志。 wget http://mirrors.coreix.net/elrepo-archive-archive/kernel/el7/x86_64/RPMS/kernel-lt-devel-5.4.225-1.el7.elrepo.x86_64.rpm wget http://mirrors.coreix.net/elrepo-archive-archive/kernel/el7/x86_64/RPMS/kernel-lt-5.4.2…...
Nginx 安全设置配置
1、增加header公共文件 文件地址:/etc/nginx/conf.d/security_headers.conf # XSS防护配置add_header X-XSS-Protection "1; modeblock" always; # 其他安全配置add_header X-Content-Type-Options "nosniff";add_header X-Frame-Options &qu…...
协程的常用阻塞函数
以下是一些常见的阻塞函数示例: 1. **Thread.sleep()** 阻塞当前线程一段时间。 kotlin Thread.sleep(1000) // 阻塞线程 1 秒 2. **InputStream.read()** 从输入流中读取数据时会阻塞,直到有数据可用或流结束。 kotlin val inputStream FileInputStre…...
探索NoSQL注入的奥秘:如何消除MongoDB查询中的前置与后置条件
随着互联网技术的飞速发展,数据库作为信息存储与管理的核心,其安全性问题日益凸显。近年来,NoSQL数据库因其灵活性和高性能逐渐成为许多企业的首选,其中MongoDB以其文档存储和JSON-like查询语言在开发社区中广受欢迎。然而&#x…...

使用矩阵乘法+线段树解决区间历史和问题的一种通用解法
文章目录 前言P8868 [NOIP2022] 比赛CF1824DP9990/2020 ICPC EcFinal G 前言 一般解决普通的区间历史和,只需要定义辅助 c h s − t ⋅ a chs-t\cdot a chs−t⋅a, h s hs hs是历史和, a a a是区间和, t t t是时间戳,…...
React Navive初识
文章目录 搭建开发环境安装 Node、homebrew、Watchman安装 Node安装 homebrew安装 watchman 安装 React Native 的命令行工具(react-native-cli)创建新项目编译并运行 React Native 应用在 ios 模拟器上运行 调试访问 App 内的开发菜单 搭建开发环境 在…...
scss(sass)中 的使用说明
在 SCSS(Sass)中,& 符号是一个父选择器引用,它代表当前嵌套规则的外层选择器。主要用途如下: 1. 连接伪类/伪元素 scss 复制 下载 .button {background: blue;&:hover { // 相当于 .button:hoverbackgrou…...

如何从浏览器中导出网站证书
以导出 GitHub 证书为例,点击 小锁 点击 导出 注意:这里需要根据你想要证书格式手动加上后缀名,我的是加 .crt 双击文件打开...

低功耗MQTT物联网架构Java实现揭秘
文章目录 一、引言二、相关技术概述2.1 物联网概述2.2 MQTT协议java三、基于MQTT的Iot物联网架构设计3.1 架构总体设计3.2 MQTT代理服务器选择3.3 物联网设备设计3.4 应用服务器设计四、基于MQTT的Iot物联网架构的Java实现4.1 开发环境搭建4.2 MQTT客户端实现4.3 应用服务器实现…...
总结HTML中的文本标签
总结HTML中的文本标签 文章目录 总结HTML中的文本标签引言一、标题标签(h1 - h6)语法示例使用建议 二、段落标签(p)语法示例使用建议 三、文本节点标签(span)语法示例使用建议 四、粗体标签(b&a…...
python版若依框架开发:前端开发规范
python版若依框架开发 从0起步,扬帆起航。 python版若依部署代码生成指南,迅速落地CURD!项目结构解析前端开发规范文章目录 python版若依框架开发新增 view新增 api新增组件新增样式引⼊依赖新增 view 在 @/views文件下 创建对应的文件夹,一般性一个路由对应⼀个文件, 该…...
AI推理服务的高可用架构设计
AI推理服务的高可用架构设计 在传统业务系统中,高可用架构主要关注服务冗余、数据库容灾、限流熔断等通用能力。而在AI系统中,尤其是大模型推理服务场景下,高可用架构面临更加复杂的挑战,如推理延迟敏感性、GPU资源稀缺性、模型版本切换频繁等问题。本节将专门探讨如何构建…...
GPU集群故障分析:大型AI训练中的硬件问题与影响
GPU集群故障分析:大型AI训练中的硬件问题与影响 核心问题 在大型AI计算集群(如使用上千块GPU卡训练大模型)中: GPU硬件会出哪些毛病?这些问题发生的频率、严重程度如何?最终对AI训练任务有什么影响&#…...

ideal2022.3.1版本编译项目报java: OutOfMemoryError: insufficient memory
最近换了新电脑,用新电脑拉项目配置后,启动时报错,错误描述 idea 启动Springboot项目在编译阶段报错:java: OutOfMemoryError: insufficient memory 2. 处理方案 修改VM参数,分配更多内存 ❌ 刚刚开始以为时JVM内存设置…...

centos7编译安装LNMP架构
一、LNMP概念 LNMP架构是一种常见的网站服务器架构,由Linux操作系统、Nginx Web服务器、MySQL数据库和PHP后端脚本语言组成。 1 用户请求:用户通过浏览器输入网址,请求发送到Nginx Web服务器。 2 Nginx处理:Nginx接收请求后&…...
接口限频算法:漏桶算法、令牌桶算法、滑动窗口算法
文章目录 限频三大算法对比与选型建议一、漏桶算法(Leaky Bucket Algorithm)1.核心原理2.实现3.为什么要限制漏桶容量4.优缺点分析 二、令牌桶算法(Token Bucket Algorithm)1.核心原理2.实现(1)单机实现&am…...

Spring Boot 3.3 + MyBatis 基础教程:从入门到实践
Spring Boot 3.3 MyBatis 基础教程:从入门到实践 在当今的Java开发领域,Spring Boot和MyBatis是构建高效、可维护的后端应用的两个强大工具。Spring Boot简化了Spring应用的初始搭建和开发过程,而MyBatis则提供了一种灵活的ORM(…...

征文投稿:如何写一份实用的技术文档?——以软件配置为例
📝 征文投稿:如何写一份实用的技术文档?——以软件配置为例 目录 [TOC](目录)🧭 技术文档是通往成功的“说明书”💡 一、明确目标读者:他们需要什么?📋 二、结构清晰:让读…...
【后端】RPC
不定期更新。 定义 RPC 是 Remote Procedure Call 的缩写,中文通常翻译为远程过程调用。作用 简化分布式系统开发。实现微服务架构,便于模块化、复用。提高系统性能和可伸缩性。提供高性能通信、负载均衡、容错重试机制。 在现代分布式系统、微服务架构…...
详细讲解Flutter GetX的使用
Flutter GetX 框架详解:状态管理、路由与依赖注入 GetX 是 Flutter 生态中一款强大且轻量级的全功能框架,集成了状态管理、路由管理和依赖注入三大核心功能。其设计理念是简洁高效,通过最小的代码实现最大的功能,特别适合快速开发…...
ReLU 新生:从死亡困境到强势回归
背景 在深度学习领域,激活函数的探索已成为独立研究课题。诸如 GELU、SELU 和 SiLU 等新型激活函数,因具备平滑梯度与出色的收敛特性,正备受关注。经典 ReLU 凭借简洁性、固有稀疏性及其独特优势拓扑特性,依旧受青睐。然而&#…...

tensorflow image_dataset_from_directory 训练数据集构建
以数据集 https://www.kaggle.com/datasets/vipoooool/new-plant-diseases-dataset 为例 目录结构 训练图像数据集要求: 主目录下包含多个子目录,每个子目录代表一个类别。每个子目录中存储属于该类别的图像文件。 例如 main_directory/ ...cat/ ...…...
QuickJS 如何发送一封邮件 ?
参阅:bellard.org : QuickJS 如何使用 qjs 执行 js 脚本 在 QuickJS 中发送邮件需要依赖外部库或调用系统命令,因为 QuickJS 本身不包含 SMTP 功能。以下是两种实现方法: 方法 1:调用系统命令(推荐) 使…...
clickhouse 和 influxdb 选型
以下是 ClickHouse、InfluxDB 和 HBase 在体系架构、存储引擎、数据类型、性能及场景的详细对比分析: 🏗️ 一、体系架构对比 维度ClickHouseInfluxDBHBase设计目标大规模OLAP分析,高吞吐复杂查询 时序数据采集与监控,优化时间线管理高吞吐随机…...

GOOUUU ESP32-S3-CAM 果云科技开发板开发指南(一)(超详细!)Vscode+espidf 通过摄像头拍摄照片并存取到SD卡中,文末附源码
看到最近好玩的开源项目比较多,就想要学习一下esp32的开发,目前使用比较多的ide基本上是arduino、esp-idf和platformio,前者编译比较慢,后两者看到开源大佬的项目做的比较多,所以主要学习后两者。 本次使用的硬件是GO…...