Java实现一个延时队列
文章目录
- 前言
- 正文
- 一、基本概念
- 1.1 延时队列的特点
- 1.2 常见的实现方式
- 二、Java原生的内存型延时队列
- 2.1 定义延时元素DelayedElement
- 2.2 定义延时队列管理器DelayedQueueManager
- 2.3 消费元素
- 2.4 调试
- 2.5 调试结果
- 2.6 精髓之 DelayQueue.poll()
- 三、基于Redisson的延时队列
- 3.1 定义延时队列管理器
- 3.2 调试
- 3.3 调试结果
前言
业务中经常会出现各种涉及到定时,延迟执行的需求任务。
有一种队列专门处理这种情况。那就是延时队列。
本文提供两种实现方式:
- java原生的内存型延时队列;
- redisson 的内置延时队列;
正文
一、基本概念
延时队列(Delay Queue)是一种特殊的消息队列,用于处理需要在将来某个时间点执行的任务。
与普通的队列不同,延时队列中的消息在指定的时间之前是不可见的,只有当消息的延时时间到达后,消息才会被消费。
1.1 延时队列的特点
- 延时性:消息在进入队列后并不会立即被消费,而是需要等待一段时间后才能被消费。
- 有序性:消息按照延时时间的先后顺序被消费。
- 可靠性:通常需要保证消息不丢失,即使在系统故障的情况下也能恢复。(这里对于内存型的延时队列不太适合,一旦内存释放就会丢失消息)
1.2 常见的实现方式
- 数据库:使用数据库的定时任务或触发器。
- 消息队列:使用支持延时消息的消息队列,如 RabbitMQ、Kafka、RocketMQ 等。
- 内存队列:使用内存中的数据结构,如 Java 中的 DelayQueue。
- Redis:使用 Redis 的 sorted set 或 Redisson 的 RDelayedQueue。
二、Java原生的内存型延时队列
使用 Java 的 DelayQueue
- 生产者:将任务封装成 Delayed 接口的实现类,添加到 DelayQueue 中。
- 消费者:使用 take 或 poll 方法从 DelayQueue 中取出任务进行处理。
2.1 定义延时元素DelayedElement
package com.pine.common.util.delayqueue;import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;/*** 延迟元素** @author fengjinsong*/
public class DelayedElement implements Delayed {/*** 延迟时间(单位:毫秒)*/private final AtomicLong delayTime;/*** 到期时间*/private final AtomicLong expire;/*** 任务数据*/private final Object data;/*** 执行次数*/private final AtomicInteger executionFrequency;public DelayedElement(long delayTime, Object data) {this.delayTime = new AtomicLong(delayTime);this.expire = new AtomicLong(System.currentTimeMillis() + delayTime);this.data = data;this.executionFrequency = new AtomicInteger(0);}public Object getData() {return this.data;}public AtomicInteger getExecutionFrequency() {return executionFrequency;}public void setExecutionFrequency() {this.executionFrequency.incrementAndGet();}@Overridepublic long getDelay(TimeUnit unit) {return unit.convert(this.expire.longValue() - System.currentTimeMillis(), TimeUnit.MILLISECONDS);}@Overridepublic int compareTo(Delayed o) {return (int) (this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS));}/*** 重置延迟时间*/public void resetDelay(long delayTime) {this.delayTime.set(delayTime);this.expire.set(System.currentTimeMillis() + this.delayTime.longValue());}/*** 重置延迟时间*/public void resetDelay() {resetDelay(this.delayTime.longValue());}@Overridepublic String toString() {return "DelayedElement{" +"delayTime=" + delayTime +", expire=" + expire +", data=" + data +", executionFrequency=" + executionFrequency +'}';}
}
2.2 定义延时队列管理器DelayedQueueManager
package com.pine.common.util.delayqueue;import java.util.List;
import java.util.concurrent.DelayQueue;/*** 延时队列管理器** @author fengjinsong*/
public class DelayedQueueManager {private DelayedQueueManager() {}/*** 延时队列*/private static final DelayQueue<DelayedElement> DELAY_QUEUE = new DelayQueue<>();/*** 添加元素** @param element 元素*/public static void addElement(DelayedElement element) {DELAY_QUEUE.add(element);}public static void addElement(List<DelayedElement> elements) {DELAY_QUEUE.addAll(elements);}/*** 获取元素,并从队列中移除该元素** @return 元素*/public static DelayedElement pollElement() {return DELAY_QUEUE.poll();}
}
2.3 消费元素
package com.pine.common.util.delayqueue;import java.time.LocalDateTime;public class DelayedElementConsumer implements Runnable {private final static int[] FREQUENCY_SEQUENCE = new int[]{1, 2, 3, 6, 12, 24, 48, 96, 192, 384, 768};@Overridepublic void run() {boolean hasDelayedElement = true;while (hasDelayedElement) {// 获取元素DelayedElement element = DelayedQueueManager.pollElement();try {if (element != null) {System.out.println(LocalDateTime.now() + "消费了延迟元素:" + element);if (element.getData().toString().contains("3")) {throw new RuntimeException("模拟报错");}} else {hasDelayedElement = false;}} catch (Exception e) {retry(element);}}}private void retry(DelayedElement element) {element.setExecutionFrequency();System.out.println("执行出错:" + element);//出错3次后,不再重试if (element.getExecutionFrequency().intValue() > 3) {System.out.println("出错3次后,不再重试");} else {element.resetDelay(FREQUENCY_SEQUENCE[element.getExecutionFrequency().intValue() + 3] * 1000);// 重试DelayedQueueManager.addElement(element);}}}
2.4 调试
package com.pine.common.redis.delayqueue;import java.time.LocalDateTime;
import java.util.concurrent.TimeUnit;public class Client {public static void main(String[] args) {// 模拟生产数据RedissonDelayedQueueManager.offer("hello22", 3000);RedissonDelayedQueueManager.offer("hello33", 5000);// 模拟消费数据System.out.println(LocalDateTime.now() + "开始消费数据");while (true) {Object object = RedissonDelayedQueueManager.poll(10, TimeUnit.SECONDS);if (object != null) {System.out.println("-----------------------" + LocalDateTime.now() + ":" + object);}}}
}
2.5 调试结果
2024-11-06T16:57:39.342358开始消费数据
-----------------------2024-11-06T16:57:42.285383:hello22
-----------------------2024-11-06T16:57:44.378298:hello33
可以观察到 hello22 延时了3秒;hello33延时了5秒;
2.6 精髓之 DelayQueue.poll()
检索并删除此队列的头部,如果此队列没有延迟过期的元素,则返回null。
public E poll() {final ReentrantLock lock = this.lock;lock.lock();try {E first = q.peek();return (first == null || first.getDelay(NANOSECONDS) > 0)? null: q.poll();} finally {lock.unlock();}}
三、基于Redisson的延时队列
使用 Redisson 的 RDelayedQueue
- 生产者:使用 RDelayedQueue 的 offer 方法将任务添加到队列中,指定延时时间。
- 消费者:使用 RQueue 的 poll 方法从队列中取出任务进行处理。
3.1 定义延时队列管理器
package com.pine.common.redis.delayqueue;import org.redisson.Redisson;
import org.redisson.api.RBlockingQueue;
import org.redisson.api.RDelayedQueue;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;import java.io.IOException;
import java.util.concurrent.TimeUnit;public class RedissonDelayedQueueManager {private static final String QUEUE_NAME = "delay_queue";private static final RedissonClient REDISSON_CLIENT;static {try {String content = """singleServerConfig:address: "redis://10.189.64.136:8379"""";Config config = Config.fromYAML(content);REDISSON_CLIENT = Redisson.create(config);} catch (IOException e) {throw new RuntimeException(e);}}/*** 获取延迟队列* <p>* 本方法通过Redisson客户端创建一个阻塞队列,并基于该阻塞队列创建一个延迟队列* 延迟队列用于处理需要延迟执行的任务,例如任务重试机制、任务调度等场景** @param <T> 队列中元素的类型* @return 返回一个延迟队列实例,用于后续的操作和管理*/private static <T> RDelayedQueue<T> getDelayedQueue() {// 创建一个阻塞队列,这是后续创建延迟队列的基础RBlockingQueue<T> queue = REDISSON_CLIENT.getBlockingQueue(QUEUE_NAME);// 基于阻塞队列创建延迟队列并返回return REDISSON_CLIENT.getDelayedQueue(queue);}/*** 向延迟队列中添加元素,并设置延迟时间** @param task 要添加的元素* @param delayTime 延迟时间,单位为毫秒* @param <T> 元素类型*/public static <T> void offer(T task, long delayTime) {RDelayedQueue<T> delayedQueue = getDelayedQueue();delayedQueue.offer(task, delayTime, TimeUnit.MILLISECONDS);}/*** 从延迟队列中获取元素,并设置超时时间** @param timeout 超时时间,单位为毫秒* @param unit 超时时间单位* @param <T> 元素类型* @return 返回获取到的元素,如果没有获取到元素则返回null*/public static <T> T poll(long timeout, TimeUnit unit) {RBlockingQueue<T> queue = REDISSON_CLIENT.getBlockingQueue(QUEUE_NAME);try {return queue.poll(timeout, unit);} catch (InterruptedException e) {throw new RuntimeException(e);}}
}
3.2 调试
package com.pine.common.redis.delayqueue;import java.time.LocalDateTime;
import java.util.concurrent.TimeUnit;public class Client {public static void main(String[] args) {// 模拟生产数据RedissonDelayedQueueManager.offer("hello22", 3000);RedissonDelayedQueueManager.offer("hello33", 5000);// 模拟消费数据System.out.println(LocalDateTime.now() + "开始消费数据");while (true) {Object object = RedissonDelayedQueueManager.poll(10, TimeUnit.SECONDS);if (object != null) {System.out.println("-----------------------" + LocalDateTime.now() + ":" + object);}}}
}
3.3 调试结果
2024-11-06T17:05:31.630768开始消费数据
-----------------------2024-11-06T17:05:34.548032:hello22
-----------------------2024-11-06T17:05:36.732607:hello33
可以观察到 hello22 延时了3秒;hello33延时了5秒;
相关文章:
Java实现一个延时队列
文章目录 前言正文一、基本概念1.1 延时队列的特点1.2 常见的实现方式 二、Java原生的内存型延时队列2.1 定义延时元素DelayedElement2.2 定义延时队列管理器DelayedQueueManager2.3 消费元素2.4 调试2.5 调试结果2.6 精髓之 DelayQueue.poll() 三、基于Redisson的延时队列3.1 …...
为什么说vue是双向数据流
Vue.js 被称为 双向数据绑定(two-way data binding),是因为它支持数据在 视图(View) 和 模型(Model) 之间双向流动。这意味着,当 数据变化 时,视图会自动更新;…...

创造属于你的 Claude Prompt 和个性化 SVG 卡片|对李继刚老师提示词的浅浅解析与总结
❤️ 如果你也关注大模型与 AI 的发展现状,且对大模型应用开发非常感兴趣,我会快速跟你分享最新的感兴趣的 AI 应用和热点信息,也会不定期分享自己的想法和开源实例,欢迎关注我哦! 🥦 微信公众号ÿ…...
redis与本地缓存
本地缓存是将数据存储在应用程序所在的本地内存中的缓存方式。既然,已经有了 Redis 可以实现分布式缓存了,为什么还需要本地缓存呢?接下来,我们一起来看。 为什么需要本地缓存? 尽管已经有 Redis 缓存了,但…...
git撤销commit和add
撤销commit git reset --soft HEAD^撤销add git reset .查看状态 git status...

【361】基于springboot的招生宣传管理系统
摘 要 使用旧方法对招生宣传管理系统的信息进行系统化管理已经不再让人们信赖了,把现在的网络信息技术运用在招生宣传管理系统的管理上面可以解决许多信息管理上面的难题,比如处理数据时间很长,数据存在错误不能及时纠正等问题。这次开发的招…...
【一些关于Python的信息和帮助】
Python是一种广泛使用的高级编程语言,它的设计哲学强调代码的可读性和简洁的语法(尤其是使用空格缩进划分代码块,而不是使用大括号或关键字)。Python支持多种编程范式,包括面向对象、命令式、函数式和过程式编程。 以…...

creo toolkit二次开发学习之程序集(ProAsmcomp)和装配体组件路径对象(ProAsmcomppath)
程序集ProAsmcomp可以理解为装配体组件对象。 对象ProAssembly是ProSolid的一个实例,并共享相同的声明。因此,ProAssembly对象可以作为适用于装配体的任何ProSolid和ProMdl函数的输入。特别是,因为你可以使用函数ProSolidFeatVisit()来遍历特…...

深入浅出 Spring Boot 与 Shiro:构建安全认证与权限管理框架
一、Shiro框架概念 (一)Shiro框架概念 1.概念: Shiro是apache旗下一个开源安全框架,它对软件系统中的安全认证相关功能进行了封装,实现了用户身份认证,权限授权、加密、会话管理等功能,组成一…...

外包干了三年,精神严重内耗...
前段时间我同事(做测试的一个妹子)跟我讲,感觉早上起来十分的疲惫,不想上班,问我们这是什么样的现象,其实有时候我也有这种感觉,虽然我卷,但我也是肉体凡胎啊!不是机器人…...

ruoyi-vue集成tianai-captcha验证码
后端代码 官方使用demo文档:http://doc.captcha.tianai.cloud/#%E4%BD%BF%E7%94%A8demo 我的完整代码:https://gitee.com/Min-Duck/RuoYi-Vue.git 主pom.xml 加入依赖 <!-- 滑块验证码 --><dependency><groupId>cloud.tianai.captc…...

Django安装
在终端创建django项目 1.查看自己的python版本 输入对应自己本机python的版本,列如我的是3.11.8 先再全局安装django依赖包 2.在控制窗口输入安装命令: pip3.11 install django 看到Successflully 说明我们就安装成功了 python的Scripts文件用于存…...

Ubuntu 20.04 安装 QGC v4.3 开发环境
Ubuntu 20.04 安装 QGC开发环境 1. 准备安装 Qt 5.15.2安装依赖获取源码 2. 编译参考 前言 QGC ( QGroundControl) 是一个开源地面站,基于QT开发的,有跨平台的功能。可以在Windows,Android,MacOS或Linux上运行。它可以将PX4固件加…...

WPF+MVVM案例实战(二十一)- 制作一个侧边弹窗栏(AB类)
文章目录 1、案例效果1、侧边栏分类2、AB类侧边弹窗实现1.文件创建2、样式代码与功能代码实现3、功能代码实现 3 运行效果4、源代码获取 1、案例效果 1、侧边栏分类 A类 :左侧弹出侧边栏B类 :右侧弹出侧边栏C类 :顶部弹出侧边栏D类 …...
linux中怎样登录mysql数据库
在Linux中登录MySQL数据库,可以使用以下命令: mysql -u username -p 其中,username是你的MySQL用户名。运行该命令后,系统会提示你输入密码。 如果MySQL服务器不在本地主机或者你需要指定不同的端口,可以使用以下命…...
深入理解 Linux 内存管理:free 命令详解
在 Linux 系统中,内存是关键的资源之一,管理和监控内存的使用情况对系统的稳定性和性能至关重要。free 命令是 Linux 中用于查看内存使用情况的重要工具,它可以让我们快速了解系统中物理内存和交换分区(Swap)的使用状态…...

指针万字超级最强i解析与总结!!!!!
文章目录 1.内存和地址1.1内存1.2究竟该如何理解编址 2.指针变量和地址2.1 取地址操作符(&)2.2指针变量和解引用操作符(*)2.2.1指针变量2.2.2如何拆解指针类型2.2.3解引用操作符 2.3 指针变量的大小 3.指针变量类型的意义3.1指…...

告别生硬电子音,这款TTS软件让语音转换更自然动听
Balabolka是一款革新性的文本语音转换工具,为用户提供了极其灵活和个性化的阅读体验。这款软件不仅仅是简单的文字朗读器,更是一个智能的语音助手,能够将各类文本瞬间转化为生动自然的语音输出。 软件的核心优势在于其卓越的文件兼容性和多样…...
CORS(跨域资源共享)和SOP(同源策略)
CORS(跨域资源共享)和SOP(同源策略)不是同一个东西,但它们紧密相关,并且常常一起讨论,因为 CORS 是为了解决同源策略带来的跨域问题而引入的。 同源策略(Same-Origin Policy&#x…...
【系统设计】数据库压缩技术详解:从基础到实践(附Redis内存优化实战案例)
概述 在现代数据库系统中,压缩技术对于提高存储效率和加速查询性能至关重要。特别是在处理大规模数据时,压缩能够极大地减少存储空间,并优化查询性能。本文将总结几种常见的压缩方式,并通过详细的解释和示例清晰地展示每种压缩方…...

阿里云ACP云计算备考笔记 (5)——弹性伸缩
目录 第一章 概述 第二章 弹性伸缩简介 1、弹性伸缩 2、垂直伸缩 3、优势 4、应用场景 ① 无规律的业务量波动 ② 有规律的业务量波动 ③ 无明显业务量波动 ④ 混合型业务 ⑤ 消息通知 ⑥ 生命周期挂钩 ⑦ 自定义方式 ⑧ 滚的升级 5、使用限制 第三章 主要定义 …...
如何为服务器生成TLS证书
TLS(Transport Layer Security)证书是确保网络通信安全的重要手段,它通过加密技术保护传输的数据不被窃听和篡改。在服务器上配置TLS证书,可以使用户通过HTTPS协议安全地访问您的网站。本文将详细介绍如何在服务器上生成一个TLS证…...
【C语言练习】080. 使用C语言实现简单的数据库操作
080. 使用C语言实现简单的数据库操作 080. 使用C语言实现简单的数据库操作使用原生APIODBC接口第三方库ORM框架文件模拟1. 安装SQLite2. 示例代码:使用SQLite创建数据库、表和插入数据3. 编译和运行4. 示例运行输出:5. 注意事项6. 总结080. 使用C语言实现简单的数据库操作 在…...

AI书签管理工具开发全记录(十九):嵌入资源处理
1.前言 📝 在上一篇文章中,我们完成了书签的导入导出功能。本篇文章我们研究如何处理嵌入资源,方便后续将资源打包到一个可执行文件中。 2.embed介绍 🎯 Go 1.16 引入了革命性的 embed 包,彻底改变了静态资源管理的…...
Java线上CPU飙高问题排查全指南
一、引言 在Java应用的线上运行环境中,CPU飙高是一个常见且棘手的性能问题。当系统出现CPU飙高时,通常会导致应用响应缓慢,甚至服务不可用,严重影响用户体验和业务运行。因此,掌握一套科学有效的CPU飙高问题排查方法&…...
Java编程之桥接模式
定义 桥接模式(Bridge Pattern)属于结构型设计模式,它的核心意图是将抽象部分与实现部分分离,使它们可以独立地变化。这种模式通过组合关系来替代继承关系,从而降低了抽象和实现这两个可变维度之间的耦合度。 用例子…...

人工智能(大型语言模型 LLMs)对不同学科的影响以及由此产生的新学习方式
今天是关于AI如何在教学中增强学生的学习体验,我把重要信息标红了。人文学科的价值被低估了 ⬇️ 转型与必要性 人工智能正在深刻地改变教育,这并非炒作,而是已经发生的巨大变革。教育机构和教育者不能忽视它,试图简单地禁止学生使…...
【Android】Android 开发 ADB 常用指令
查看当前连接的设备 adb devices 连接设备 adb connect 设备IP 断开已连接的设备 adb disconnect 设备IP 安装应用 adb install 安装包的路径 卸载应用 adb uninstall 应用包名 查看已安装的应用包名 adb shell pm list packages 查看已安装的第三方应用包名 adb shell pm list…...

解析奥地利 XARION激光超声检测系统:无膜光学麦克风 + 无耦合剂的技术协同优势及多元应用
在工业制造领域,无损检测(NDT)的精度与效率直接影响产品质量与生产安全。奥地利 XARION开发的激光超声精密检测系统,以非接触式光学麦克风技术为核心,打破传统检测瓶颈,为半导体、航空航天、汽车制造等行业提供了高灵敏…...
适应性Java用于现代 API:REST、GraphQL 和事件驱动
在快速发展的软件开发领域,REST、GraphQL 和事件驱动架构等新的 API 标准对于构建可扩展、高效的系统至关重要。Java 在现代 API 方面以其在企业应用中的稳定性而闻名,不断适应这些现代范式的需求。随着不断发展的生态系统,Java 在现代 API 方…...