当前位置: 首页 > news >正文

Java实现一个延时队列

文章目录

  • 前言
  • 正文
    • 一、基本概念
      • 1.1 延时队列的特点
      • 1.2 常见的实现方式
    • 二、Java原生的内存型延时队列
      • 2.1 定义延时元素DelayedElement
      • 2.2 定义延时队列管理器DelayedQueueManager
      • 2.3 消费元素
      • 2.4 调试
      • 2.5 调试结果
      • 2.6 精髓之 DelayQueue.poll()
    • 三、基于Redisson的延时队列
      • 3.1 定义延时队列管理器
      • 3.2 调试
      • 3.3 调试结果

前言

业务中经常会出现各种涉及到定时,延迟执行的需求任务。

有一种队列专门处理这种情况。那就是延时队列。

本文提供两种实现方式:

  1. java原生的内存型延时队列;
  2. redisson 的内置延时队列;

正文

一、基本概念

延时队列(Delay Queue)是一种特殊的消息队列,用于处理需要在将来某个时间点执行的任务。

与普通的队列不同,延时队列中的消息在指定的时间之前是不可见的,只有当消息的延时时间到达后,消息才会被消费。

1.1 延时队列的特点

  • 延时性:消息在进入队列后并不会立即被消费,而是需要等待一段时间后才能被消费。
  • 有序性:消息按照延时时间的先后顺序被消费。
  • 可靠性:通常需要保证消息不丢失,即使在系统故障的情况下也能恢复。(这里对于内存型的延时队列不太适合,一旦内存释放就会丢失消息)

1.2 常见的实现方式

  • 数据库:使用数据库的定时任务或触发器。
  • 消息队列:使用支持延时消息的消息队列,如 RabbitMQ、Kafka、RocketMQ 等。
  • 内存队列:使用内存中的数据结构,如 Java 中的 DelayQueue。
  • Redis:使用 Redis 的 sorted set 或 Redisson 的 RDelayedQueue。

二、Java原生的内存型延时队列

使用 Java 的 DelayQueue

  • 生产者:将任务封装成 Delayed 接口的实现类,添加到 DelayQueue 中。
  • 消费者:使用 take 或 poll 方法从 DelayQueue 中取出任务进行处理。

2.1 定义延时元素DelayedElement

package com.pine.common.util.delayqueue;import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;/*** 延迟元素** @author fengjinsong*/
public class DelayedElement implements Delayed {/*** 延迟时间(单位:毫秒)*/private final AtomicLong delayTime;/*** 到期时间*/private final AtomicLong expire;/*** 任务数据*/private final Object data;/*** 执行次数*/private final AtomicInteger executionFrequency;public DelayedElement(long delayTime, Object data) {this.delayTime = new AtomicLong(delayTime);this.expire = new AtomicLong(System.currentTimeMillis() + delayTime);this.data = data;this.executionFrequency = new AtomicInteger(0);}public Object getData() {return this.data;}public AtomicInteger getExecutionFrequency() {return executionFrequency;}public void setExecutionFrequency() {this.executionFrequency.incrementAndGet();}@Overridepublic long getDelay(TimeUnit unit) {return unit.convert(this.expire.longValue() - System.currentTimeMillis(), TimeUnit.MILLISECONDS);}@Overridepublic int compareTo(Delayed o) {return (int) (this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS));}/*** 重置延迟时间*/public void resetDelay(long delayTime) {this.delayTime.set(delayTime);this.expire.set(System.currentTimeMillis() + this.delayTime.longValue());}/*** 重置延迟时间*/public void resetDelay() {resetDelay(this.delayTime.longValue());}@Overridepublic String toString() {return "DelayedElement{" +"delayTime=" + delayTime +", expire=" + expire +", data=" + data +", executionFrequency=" + executionFrequency +'}';}
}

2.2 定义延时队列管理器DelayedQueueManager

package com.pine.common.util.delayqueue;import java.util.List;
import java.util.concurrent.DelayQueue;/*** 延时队列管理器** @author fengjinsong*/
public class DelayedQueueManager {private DelayedQueueManager() {}/*** 延时队列*/private static final DelayQueue<DelayedElement> DELAY_QUEUE = new DelayQueue<>();/*** 添加元素** @param element 元素*/public static void addElement(DelayedElement element) {DELAY_QUEUE.add(element);}public static void addElement(List<DelayedElement> elements) {DELAY_QUEUE.addAll(elements);}/*** 获取元素,并从队列中移除该元素** @return 元素*/public static DelayedElement pollElement() {return DELAY_QUEUE.poll();}
}

2.3 消费元素

package com.pine.common.util.delayqueue;import java.time.LocalDateTime;public class DelayedElementConsumer implements Runnable {private final static int[] FREQUENCY_SEQUENCE = new int[]{1, 2, 3, 6, 12, 24, 48, 96, 192, 384, 768};@Overridepublic void run() {boolean hasDelayedElement = true;while (hasDelayedElement) {// 获取元素DelayedElement element = DelayedQueueManager.pollElement();try {if (element != null) {System.out.println(LocalDateTime.now() + "消费了延迟元素:" + element);if (element.getData().toString().contains("3")) {throw new RuntimeException("模拟报错");}} else {hasDelayedElement = false;}} catch (Exception e) {retry(element);}}}private void retry(DelayedElement element) {element.setExecutionFrequency();System.out.println("执行出错:" + element);//出错3次后,不再重试if (element.getExecutionFrequency().intValue() > 3) {System.out.println("出错3次后,不再重试");} else {element.resetDelay(FREQUENCY_SEQUENCE[element.getExecutionFrequency().intValue() + 3] * 1000);// 重试DelayedQueueManager.addElement(element);}}}

2.4 调试

package com.pine.common.redis.delayqueue;import java.time.LocalDateTime;
import java.util.concurrent.TimeUnit;public class Client {public static void main(String[] args) {// 模拟生产数据RedissonDelayedQueueManager.offer("hello22", 3000);RedissonDelayedQueueManager.offer("hello33", 5000);// 模拟消费数据System.out.println(LocalDateTime.now() + "开始消费数据");while (true) {Object object = RedissonDelayedQueueManager.poll(10, TimeUnit.SECONDS);if (object != null) {System.out.println("-----------------------" + LocalDateTime.now() + ":" + object);}}}
}

2.5 调试结果

2024-11-06T16:57:39.342358开始消费数据
-----------------------2024-11-06T16:57:42.285383:hello22
-----------------------2024-11-06T16:57:44.378298:hello33

可以观察到 hello22 延时了3秒;hello33延时了5秒;

2.6 精髓之 DelayQueue.poll()

检索并删除此队列的头部,如果此队列没有延迟过期的元素,则返回null。

public E poll() {final ReentrantLock lock = this.lock;lock.lock();try {E first = q.peek();return (first == null || first.getDelay(NANOSECONDS) > 0)? null: q.poll();} finally {lock.unlock();}}

三、基于Redisson的延时队列

使用 Redisson 的 RDelayedQueue

  • 生产者:使用 RDelayedQueue 的 offer 方法将任务添加到队列中,指定延时时间。
  • 消费者:使用 RQueue 的 poll 方法从队列中取出任务进行处理。

3.1 定义延时队列管理器

package com.pine.common.redis.delayqueue;import org.redisson.Redisson;
import org.redisson.api.RBlockingQueue;
import org.redisson.api.RDelayedQueue;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;import java.io.IOException;
import java.util.concurrent.TimeUnit;public class RedissonDelayedQueueManager {private static final String QUEUE_NAME = "delay_queue";private static final RedissonClient REDISSON_CLIENT;static {try {String content = """singleServerConfig:address: "redis://10.189.64.136:8379"""";Config config = Config.fromYAML(content);REDISSON_CLIENT = Redisson.create(config);} catch (IOException e) {throw new RuntimeException(e);}}/*** 获取延迟队列* <p>* 本方法通过Redisson客户端创建一个阻塞队列,并基于该阻塞队列创建一个延迟队列* 延迟队列用于处理需要延迟执行的任务,例如任务重试机制、任务调度等场景** @param <T> 队列中元素的类型* @return 返回一个延迟队列实例,用于后续的操作和管理*/private static <T> RDelayedQueue<T> getDelayedQueue() {// 创建一个阻塞队列,这是后续创建延迟队列的基础RBlockingQueue<T> queue = REDISSON_CLIENT.getBlockingQueue(QUEUE_NAME);// 基于阻塞队列创建延迟队列并返回return REDISSON_CLIENT.getDelayedQueue(queue);}/*** 向延迟队列中添加元素,并设置延迟时间** @param task     要添加的元素* @param delayTime 延迟时间,单位为毫秒* @param <T>      元素类型*/public static <T> void offer(T task, long delayTime) {RDelayedQueue<T> delayedQueue = getDelayedQueue();delayedQueue.offer(task, delayTime, TimeUnit.MILLISECONDS);}/*** 从延迟队列中获取元素,并设置超时时间** @param timeout 超时时间,单位为毫秒* @param unit    超时时间单位* @param <T>     元素类型* @return 返回获取到的元素,如果没有获取到元素则返回null*/public static <T> T poll(long timeout, TimeUnit unit) {RBlockingQueue<T> queue = REDISSON_CLIENT.getBlockingQueue(QUEUE_NAME);try {return queue.poll(timeout, unit);} catch (InterruptedException e) {throw new RuntimeException(e);}}
}

3.2 调试

package com.pine.common.redis.delayqueue;import java.time.LocalDateTime;
import java.util.concurrent.TimeUnit;public class Client {public static void main(String[] args) {// 模拟生产数据RedissonDelayedQueueManager.offer("hello22", 3000);RedissonDelayedQueueManager.offer("hello33", 5000);// 模拟消费数据System.out.println(LocalDateTime.now() + "开始消费数据");while (true) {Object object = RedissonDelayedQueueManager.poll(10, TimeUnit.SECONDS);if (object != null) {System.out.println("-----------------------" + LocalDateTime.now() + ":" + object);}}}
}

3.3 调试结果

2024-11-06T17:05:31.630768开始消费数据
-----------------------2024-11-06T17:05:34.548032:hello22
-----------------------2024-11-06T17:05:36.732607:hello33

可以观察到 hello22 延时了3秒;hello33延时了5秒;

相关文章:

Java实现一个延时队列

文章目录 前言正文一、基本概念1.1 延时队列的特点1.2 常见的实现方式 二、Java原生的内存型延时队列2.1 定义延时元素DelayedElement2.2 定义延时队列管理器DelayedQueueManager2.3 消费元素2.4 调试2.5 调试结果2.6 精髓之 DelayQueue.poll() 三、基于Redisson的延时队列3.1 …...

为什么说vue是双向数据流

Vue.js 被称为 双向数据绑定&#xff08;two-way data binding&#xff09;&#xff0c;是因为它支持数据在 视图&#xff08;View&#xff09; 和 模型&#xff08;Model&#xff09; 之间双向流动。这意味着&#xff0c;当 数据变化 时&#xff0c;视图会自动更新&#xff1b…...

创造属于你的 Claude Prompt 和个性化 SVG 卡片|对李继刚老师提示词的浅浅解析与总结

❤️ 如果你也关注大模型与 AI 的发展现状&#xff0c;且对大模型应用开发非常感兴趣&#xff0c;我会快速跟你分享最新的感兴趣的 AI 应用和热点信息&#xff0c;也会不定期分享自己的想法和开源实例&#xff0c;欢迎关注我哦&#xff01; &#x1f966; 微信公众号&#xff…...

redis与本地缓存

本地缓存是将数据存储在应用程序所在的本地内存中的缓存方式。既然&#xff0c;已经有了 Redis 可以实现分布式缓存了&#xff0c;为什么还需要本地缓存呢&#xff1f;接下来&#xff0c;我们一起来看。 为什么需要本地缓存&#xff1f; 尽管已经有 Redis 缓存了&#xff0c;但…...

git撤销commit和add

撤销commit git reset --soft HEAD^撤销add git reset .查看状态 git status...

【361】基于springboot的招生宣传管理系统

摘 要 使用旧方法对招生宣传管理系统的信息进行系统化管理已经不再让人们信赖了&#xff0c;把现在的网络信息技术运用在招生宣传管理系统的管理上面可以解决许多信息管理上面的难题&#xff0c;比如处理数据时间很长&#xff0c;数据存在错误不能及时纠正等问题。这次开发的招…...

【一些关于Python的信息和帮助】

Python是一种广泛使用的高级编程语言&#xff0c;它的设计哲学强调代码的可读性和简洁的语法&#xff08;尤其是使用空格缩进划分代码块&#xff0c;而不是使用大括号或关键字&#xff09;。Python支持多种编程范式&#xff0c;包括面向对象、命令式、函数式和过程式编程。 以…...

creo toolkit二次开发学习之程序集(ProAsmcomp)和装配体组件路径对象(ProAsmcomppath)

程序集ProAsmcomp可以理解为装配体组件对象。 对象ProAssembly是ProSolid的一个实例&#xff0c;并共享相同的声明。因此&#xff0c;ProAssembly对象可以作为适用于装配体的任何ProSolid和ProMdl函数的输入。特别是&#xff0c;因为你可以使用函数ProSolidFeatVisit()来遍历特…...

深入浅出 Spring Boot 与 Shiro:构建安全认证与权限管理框架

一、Shiro框架概念 &#xff08;一&#xff09;Shiro框架概念 1.概念&#xff1a; Shiro是apache旗下一个开源安全框架&#xff0c;它对软件系统中的安全认证相关功能进行了封装&#xff0c;实现了用户身份认证&#xff0c;权限授权、加密、会话管理等功能&#xff0c;组成一…...

外包干了三年,精神严重内耗...

前段时间我同事&#xff08;做测试的一个妹子&#xff09;跟我讲&#xff0c;感觉早上起来十分的疲惫&#xff0c;不想上班&#xff0c;问我们这是什么样的现象&#xff0c;其实有时候我也有这种感觉&#xff0c;虽然我卷&#xff0c;但我也是肉体凡胎啊&#xff01;不是机器人…...

ruoyi-vue集成tianai-captcha验证码

后端代码 官方使用demo文档&#xff1a;http://doc.captcha.tianai.cloud/#%E4%BD%BF%E7%94%A8demo 我的完整代码&#xff1a;https://gitee.com/Min-Duck/RuoYi-Vue.git 主pom.xml 加入依赖 <!-- 滑块验证码 --><dependency><groupId>cloud.tianai.captc…...

Django安装

在终端创建django项目 1.查看自己的python版本 输入对应自己本机python的版本&#xff0c;列如我的是3.11.8 先再全局安装django依赖包 2.在控制窗口输入安装命令&#xff1a; pip3.11 install django 看到Successflully 说明我们就安装成功了 python的Scripts文件用于存…...

Ubuntu 20.04 安装 QGC v4.3 开发环境

Ubuntu 20.04 安装 QGC开发环境 1. 准备安装 Qt 5.15.2安装依赖获取源码 2. 编译参考 前言 QGC ( QGroundControl) 是一个开源地面站&#xff0c;基于QT开发的&#xff0c;有跨平台的功能。可以在Windows&#xff0c;Android&#xff0c;MacOS或Linux上运行。它可以将PX4固件加…...

WPF+MVVM案例实战(二十一)- 制作一个侧边弹窗栏(AB类)

文章目录 1、案例效果1、侧边栏分类2、AB类侧边弹窗实现1.文件创建2、样式代码与功能代码实现3、功能代码实现 3 运行效果4、源代码获取 1、案例效果 1、侧边栏分类 A类 &#xff1a;左侧弹出侧边栏B类 &#xff1a;右侧弹出侧边栏C类 &#xff1a;顶部弹出侧边栏D类 &#xf…...

linux中怎样登录mysql数据库

在Linux中登录MySQL数据库&#xff0c;可以使用以下命令&#xff1a; mysql -u username -p 其中&#xff0c;username是你的MySQL用户名。运行该命令后&#xff0c;系统会提示你输入密码。 如果MySQL服务器不在本地主机或者你需要指定不同的端口&#xff0c;可以使用以下命…...

深入理解 Linux 内存管理:free 命令详解

在 Linux 系统中&#xff0c;内存是关键的资源之一&#xff0c;管理和监控内存的使用情况对系统的稳定性和性能至关重要。free 命令是 Linux 中用于查看内存使用情况的重要工具&#xff0c;它可以让我们快速了解系统中物理内存和交换分区&#xff08;Swap&#xff09;的使用状态…...

指针万字超级最强i解析与总结!!!!!

文章目录 1.内存和地址1.1内存1.2究竟该如何理解编址 2.指针变量和地址2.1 取地址操作符&#xff08;&&#xff09;2.2指针变量和解引用操作符&#xff08;*&#xff09;2.2.1指针变量2.2.2如何拆解指针类型2.2.3解引用操作符 2.3 指针变量的大小 3.指针变量类型的意义3.1指…...

告别生硬电子音,这款TTS软件让语音转换更自然动听

Balabolka是一款革新性的文本语音转换工具&#xff0c;为用户提供了极其灵活和个性化的阅读体验。这款软件不仅仅是简单的文字朗读器&#xff0c;更是一个智能的语音助手&#xff0c;能够将各类文本瞬间转化为生动自然的语音输出。 软件的核心优势在于其卓越的文件兼容性和多样…...

CORS(跨域资源共享)和SOP(同源策略)

CORS&#xff08;跨域资源共享&#xff09;和SOP&#xff08;同源策略&#xff09;不是同一个东西&#xff0c;但它们紧密相关&#xff0c;并且常常一起讨论&#xff0c;因为 CORS 是为了解决同源策略带来的跨域问题而引入的。 同源策略&#xff08;Same-Origin Policy&#x…...

【系统设计】数据库压缩技术详解:从基础到实践(附Redis内存优化实战案例)

概述 在现代数据库系统中&#xff0c;压缩技术对于提高存储效率和加速查询性能至关重要。特别是在处理大规模数据时&#xff0c;压缩能够极大地减少存储空间&#xff0c;并优化查询性能。本文将总结几种常见的压缩方式&#xff0c;并通过详细的解释和示例清晰地展示每种压缩方…...

从信号处理到AI:卷积的含参积分本质,如何帮你理解PyTorch中的Conv1d层?

从信号处理到AI&#xff1a;卷积的含参积分本质&#xff0c;如何帮你理解PyTorch中的Conv1d层&#xff1f; 在信号处理领域&#xff0c;卷积操作早已是工程师们耳熟能详的工具。但当我们踏入深度学习的殿堂&#xff0c;面对PyTorch中的nn.Conv1d层时&#xff0c;是否曾疑惑过&a…...

解放双手!用STAR-CCM+的3D-CAD模块快速清理与简化仿真几何(保姆级教程)

解放双手&#xff01;用STAR-CCM的3D-CAD模块快速清理与简化仿真几何&#xff08;保姆级教程&#xff09; 在CAE仿真领域&#xff0c;几何模型的质量往往直接决定仿真效率与结果可靠性。许多工程师都有过这样的经历&#xff1a;从设计部门拿到一个细节完美的CAD模型&#xff0c…...

电力系统时序一致性保障:elec-ops-prediction的长时序稳定性约束实现

电力系统时序一致性保障&#xff1a;elec-ops-prediction的长时序稳定性约束实现 【免费下载链接】elec-ops-prediction elec-ops-prediction 是 CANN 社区 Electrical Engineering SIG&#xff08;电力行业兴趣小组&#xff09;旗下的电力负荷预测算子库&#xff0c; 聚焦于电…...

hot100 11盛最多水的容器

题目描述 给定一个长度为 n 的整数数组 height 。有 n 条垂线&#xff0c;第 i 条线的两个端点是 (i, 0) 和 (i, height[i]) 。 找出其中的两条线&#xff0c;使得它们与 x 轴共同构成的容器可以容纳最多的水。 返回容器可以储存的最大水量。 说明&#xff1a;你不能倾斜容…...

Midjourney镜头类型选择终极决策树(附可下载PDF流程图):输入拍摄意图→自动匹配最优镜头词+推荐--stylize值+规避AI视觉歧义

更多请点击&#xff1a; https://kaifayun.com 第一章&#xff1a;Midjourney镜头类型选择终极决策树概览 在 Midjourney V6 中&#xff0c;镜头类型&#xff08;Lens Type&#xff09;并非独立参数&#xff0c;而是通过组合 --style raw、 --s 750 及语义化摄影术语提示词协…...

auditd:Linux 系统审计日志,记录谁动了你的服务器

auditd&#xff1a;Linux 系统审计日志&#xff0c;记录谁动了你的服务器 服务器被入侵后&#xff0c;管理员面临的第一个问题往往不是"怎么修复"&#xff0c;而是"到底发生了什么"——攻击者登录了哪个账号&#xff1f;修改了哪些文件&#xff1f;执行了什…...

聚合物半导体薄膜:柔性电子皮肤如何实现无感健康监测

1. 项目概述&#xff1a;从“硬核”到“柔韧”的健康监测革命如果你还在用那些又厚又硬、贴着皮肤半天就发痒发红的老式健康监测电极&#xff0c;是时候了解一下这个正在改变游戏规则的新玩意儿了——聚合物半导体薄膜。这可不是什么实验室里的遥远概念&#xff0c;它正从顶尖期…...

A型流感病毒广谱中和抗体与广谱通用疫苗研究进展

摘要流感作为全球性的公共卫生问题&#xff0c;对人类健康构成严重威胁。接种流感疫苗是预防和控制流感流行的关键手段&#xff0c;但当前通用流感疫苗的研究尚处于初级阶段。本文聚焦于A型流感病毒&#xff0c;综述了广谱中和抗体的研究进展以及其在广谱通用疫苗研发中的潜在应…...

深入解析Keil MDK编译流程:从C代码到单片机运行的完整过程

1. 项目概述&#xff1a;从源码到芯片运行的旅程作为一名在嵌入式领域摸爬滚打了十多年的老工程师&#xff0c;我经常被问到这样一个问题&#xff1a;“我写的C代码&#xff0c;点一下MDK的‘Build’按钮&#xff0c;怎么就变成能在单片机里跑的程序了&#xff1f;” 这背后&am…...

Linux内核平台设备深度盘点:从原理到实战的全面解析

1. 项目概述&#xff1a;一次对Linux内核“家底”的深度盘点在Linux内核开发的日常工作中&#xff0c;无论是为一块新的开发板适配驱动&#xff0c;还是排查一个诡异的硬件初始化问题&#xff0c;我们常常会面临一个基础却又关键的问题&#xff1a;当前系统里到底有哪些“平台设…...