Redis延迟队列详解
以下是对 Redis 延迟队列的详细解释:
一、什么是 Redis 延迟队列
Redis 延迟队列是一种使用 Redis 实现的消息队列,其中的消息在被消费之前会等待一段时间,这段时间就是延迟时间。延迟队列常用于一些需要延迟处理的任务场景,例如订单超时未支付取消、定时提醒等。
二、实现原理
-
使用 ZSET(有序集合)存储消息:
- 在 Redis 中,可以使用 ZSET 存储延迟消息。ZSET 的成员是消息的唯一标识,分数(score)是消息的到期时间戳。这样,消息会根据到期时间戳自动排序。
- 例如,我们可以使用以下 Redis 命令添加一条延迟消息:
收起
redis
ZADD delay_queue <timestamp> <message_id>其中
<timestamp>是消息到期的时间戳,<message_id>是消息的唯一标识。 -
消费者轮询 ZSET:
- 消费者会不断轮询 ZSET,使用
ZRANGEBYSCORE命令查找分数小于或等于当前时间戳的元素。 - 例如:
redis
ZRANGEBYSCORE delay_queue 0 <current_timestamp>这里的
0表示最小分数,<current_timestamp>是当前时间戳,这个命令会返回所有到期的消息。 - 消费者会不断轮询 ZSET,使用
-
处理到期消息:
- 当消费者找到到期消息后,会将消息从 ZSET 中移除并进行处理。可以使用
ZREM命令移除消息:
redis
ZREM delay_queue <message_id>然后将消息发送到实际的消息处理程序中。
- 当消费者找到到期消息后,会将消息从 ZSET 中移除并进行处理。可以使用
三、Java 代码示例
以下是一个使用 Jedis(Redis 的 Java 客户端)实现 Redis 延迟队列的简单示例:
java
import redis.clients.jedis.Jedis;
import java.util.Set;public class RedisDelayQueue {private Jedis jedis;public RedisDelayQueue() {jedis = new Jedis("localhost", 6379);}// 生产者添加延迟消息public void addDelayMessage(String messageId, long delayMillis) {long score = System.currentTimeMillis() + delayMillis;jedis.zadd("delay_queue", score, messageId);}// 消费者轮询并处理消息public void consume() {while (true) {// 查找到期的消息Set<String> messages = jedis.zrangeByScore("delay_queue", 0, System.currentTimeMillis(), 0, 1);if (messages.isEmpty()) {try {// 没有消息,等待一段时间再轮询Thread.sleep(100);} catch (InterruptedException e) {Thread.currentThread().interrupt();}continue;}String messageId = messages.iterator().next();// 移除消息Long removed = jedis.zrem("delay_queue", messageId);if (removed > 0) {// 消息成功移除,进行处理System.out.println("Processing message: " + messageId);// 在这里添加实际的消息处理逻辑}}}public static void main(String[] args) {RedisDelayQueue delayQueue = new RedisDelayQueue();// 生产者添加消息,延迟 5 秒delayQueue.addDelayMessage("message_1", 5000);// 启动消费者delayQueue.consume();}
}
代码解释:
RedisDelayQueue类封装了延迟队列的基本操作。addDelayMessage方法:- 计算消息的到期时间戳,将消息添加到
delay_queueZSET 中,使用jedis.zadd命令。
- 计算消息的到期时间戳,将消息添加到
consume方法:- 不断轮询
delay_queueZSET,使用jedis.zrangeByScore查找到期消息。 - 如果没有消息,线程休眠 100 毫秒后继续轮询。
- 若找到消息,使用
jedis.zrem移除消息,如果移除成功,说明该消息被此消费者处理,进行后续处理。
- 不断轮询
四、注意事项
-
并发处理:
- 多个消费者同时轮询 ZSET 时,可能会出现竞争条件,需要注意消息的重复处理问题。可以使用 Redis 的事务(
MULTI、EXEC)或 Lua 脚本保证原子性。 - 例如,可以使用 Lua 脚本将查找和移除操作合并为一个原子操作:
lua
local message = redis.call('ZRANGEBYSCORE', 'delay_queue', 0, ARGV[1], 'LIMIT', 0, 1) if #message > 0 thenif redis.call('ZREM', 'delay_queue', message[1]) == 1 thenreturn message[1]end end return nil然后在 Java 中调用这个脚本:
java
String script = "local message = redis.call('ZRANGEBYSCORE', 'delay_queue', 0, ARGV[1], 'LIMIT', 0, 1)\n" +"if #message > 0 then\n" +" if redis.call('ZREM', 'delay_queue', message[1]) == 1 then\n" +" return message[1]\n" +" end\n" +"end\n" +"return nil"; while (true) {String messageId = (String) jedis.eval(script, 0, String.valueOf(System.currentTimeMillis()));if (messageId!= null) {System.out.println("Processing message: " + messageId);// 在这里添加实际的消息处理逻辑} else {try {Thread.sleep(100);} catch (InterruptedException e) {Thread.currentThread().interrupt();}} } - 多个消费者同时轮询 ZSET 时,可能会出现竞争条件,需要注意消息的重复处理问题。可以使用 Redis 的事务(
-
消息持久化:
- Redis 是内存数据库,需要考虑消息的持久化问题,确保在 Redis 重启后不会丢失重要消息。可以使用 Redis 的 RDB 或 AOF 持久化机制,但要注意性能和数据安全的平衡。
五、使用 Redis 模块
除了上述基本实现,还可以使用 Redis 的一些第三方模块,如 Redis 的 Redisson 库,它提供了更高级的延迟队列实现,使用更加方便和可靠:
java
import org.redisson.Redisson;
import org.redisson.api.RBlockingQueue;
import org.redisson.api.RDelayedQueue;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import java.util.concurrent.TimeUnit;public class RedissonDelayQueueExample {public static void main(String[] args) {Config config = new Config();config.useSingleServer().setAddress("redis://127.0.0.1:6379");RedissonClient redisson = Redisson.create(config);RBlockingQueue<String> blockingQueue = redisson.getBlockingQueue("myQueue");RDelayedQueue<String> delayedQueue = redisson.getDelayedQueue(blockingQueue);// 生产者添加延迟消息delayedQueue.offer("message_1", 5, TimeUnit.SECONDS);// 消费者new Thread(() -> {while (true) {try {String message = blockingQueue.take();System.out.println("Processing message: " + message);// 在这里添加实际的消息处理逻辑} catch (InterruptedException e) {Thread.currentThread().interrupt();}}}).start();}
}
代码解释:
Redisson是一个功能强大的 Redis 客户端库。RBlockingQueue是阻塞队列,RDelayedQueue是延迟队列。- 使用
delayedQueue.offer("message_1", 5, TimeUnit.SECONDS)添加延迟消息。 - 消费者通过
blockingQueue.take()阻塞等待消息,当消息到期时,会自动从延迟队列转移到阻塞队列并被消费者接收。
通过上述几种方法,可以使用 Redis 实现延迟队列,满足不同场景下的延迟任务处理需求。根据具体情况,可以选择简单的 ZSET 实现或使用更高级的第三方库,同时要注意并发处理和消息持久化等问题,以确保延迟队列的稳定性和可靠性。
总之,Redis 延迟队列是一种高效且灵活的实现延迟任务的方式,在分布式系统中具有广泛的应用,利用 Redis 的特性可以轻松处理延迟消息,减少系统的复杂性和开发成本。
相关文章:
Redis延迟队列详解
以下是对 Redis 延迟队列的详细解释: 一、什么是 Redis 延迟队列 Redis 延迟队列是一种使用 Redis 实现的消息队列,其中的消息在被消费之前会等待一段时间,这段时间就是延迟时间。延迟队列常用于一些需要延迟处理的任务场景,例如订…...
一文大白话讲清楚webpack基本使用——2——css相关loader的配置和使用
一文大白话讲清楚webpack基本使用——2——css相关loader的配置和使用 1. 建议按文章顺序从头看是看 第一篇:一文大白话讲清楚啥是个webpack第二篇:一文大白话讲清楚webpack基本使用——1——完成webpack的初步构建然后看本篇,Loader的配置…...
第二代增强-采购申请屏幕增强
这篇文章可能有点乱,但是都是学习的一个过程,问题就是在ME52N修改采购申请增强字段之后,点击保存之后无法更新到数据库,困扰了很久,终于解决了,特此记录 文章目录 需求创建增强结构测试屏幕ZXM02TOP创建屏幕…...
图论DFS:黑红树
我的个人主页 {\large \mathsf{{\color{Red} 我的个人主页} } } 我的个人主页 往 {\color{Red} {\Huge 往} } 往 期 {\color{Green} {\Huge 期} } 期 文 {\color{Blue} {\Huge 文} } 文 章 {\color{Orange} {\Huge 章}} 章 DFS 算法:记忆化搜索DFS 算法…...
零基础一篇打通Vue极速通关教程
文章目录 写给零基础看的Vue极速掌握教程第1章 Vue简介1.1 Vue 概述1.2 MVVM 模式1.3 WebStorm开发工具1.3.1 WebStorm简介1.3.2 集成Vue开发调试工具 第2章 Vue的事件绑定2.1 Vue基本使用2.1.1 插值表达式2.1.2 注意事项 2.2 Vue事件绑定2.1.1 点击事件2.2.2 键盘事件2.2.3 移…...
商城系统中的常见 BUG
以下是商城系统中一些常见的 BUG: 功能与操作类 支付问题:如无法成功完成支付,支付过程中出现延迟、错误或订单重复支付等,还可能因网络问题导致支付失败或数据不一致。 登录 / 注册问题:用户在注册或登录时可能遇到…...
下定决心不去读研了。。。
大家好,我是苍何。 之前发表过一篇文章,表达了自己读研的困惑和纠结,得到了大家很多的建议,也引起了很多人的共鸣,在留言区分享了自己的故事,看着这些故事,我觉得都够苍何写一部小说了。 可惜苍…...
100个网络基础知识
1)什么是链接? 链接是指两个设备之间的连接。它包括用于一个设备能够与另一个设备通信的电缆类型和协议。 2)OSI 参考模型的层次是什么? 有 7 个 OSI 层:物理层,数据链路层,网络层,传输层,会话层,表示…...
庄小焱——2024年博文总结与展望
摘要 大家好,我是庄小焱。岁末回首,2024 年是我在个人成长、博客创作以及生活平衡方面收获颇丰的一年。这一年的经历如同璀璨星辰,照亮了我前行的道路,也为未来的发展奠定了坚实基础。 1. 个人成长与突破 在 2024 年,…...
高通8255 Android STR 启动失败要因分析调查
目录 背景: 调查过程: 步骤1: slog2info | grep vmm_service 步骤2: slog2info | grep qvm 总结: 解决方案 背景: 调试高通8255 STR的STR过程中发现Android和QNX进入STR状态后,脱出STR时…...
Qt QML专栏目录结构
第1章 走进Qt Quick的世界... 4 ★1.4 Qt Quick应用... 4 ★1.5 Qt Quick UI项目(qmlproject工程) 4 第2章 QML语法... 4 ★2.2 import导入语句... 4 ★2.3 QML类型系统... 5 ★2.4 对象特性(Attributes)... 6 三个等于号JavaScript语…...
“深入浅出”系列之FFmpeg:(3)音视频开发的学习路线和必备知识
一、岗位要求 音视频开发属于我自己想要学习的板块,我想知道公司招聘音视频开发工程师所需要的条件,于是我就从招聘网站上找来了几个有关音视频开发的岗位需求,内容仅供参考: (1)算法工程师-视频编解码 …...
Webpack简述
一、为什么要构建工具 人类喜欢书写的代码以及开发方式计算机不喜欢,构建工具的作用就是让人类舒舒服服写自己喜欢的代码,然后一打包生成计算机喜欢的代码 第一个webpack自身仅仅是将我们引入的模块打包成一个文件(编译import)&am…...
解决 Error: Invalid or corrupt jarfile day04_studentManager.jar 报错问题
在 Java 开发过程中,我们可能会遇到这样的报错信息:Error: Invalid or corrupt jarfile day04_studentManager.jar。这个错误通常表示 day04_studentManager.jar 文件可能已损坏或无效,下面将为大家详细介绍如何解决这个问题。 一、错误点分…...
ACL基础理论
ACL ——访问控制列表 ACL属于策略的一种 ACL访问控制列表的作用: 访问控制:在路由器流量流入或流出的接口上,匹配流量,然后执行设定好的动作:permit(允许)、deny(拒绝ÿ…...
庄周梦蝶1
和尚大概的意思如下:人的每一个梦境都是一个世界,这些世界统称三千世界。每一个世界当中所谓时间的跨度不同,发展程度不同,但是里面都有一个你。这些世界是同时存在的,所以不存在未来过去和现在,因为你就存…...
使用SIPP发起媒体流性能测试详解
使用SIPP发起媒体流性能测试详解 一、SIPP工具简介二、测试前的准备三、编写测试脚本四、运行测试五、分析测试结果六、总结SIPP(SIP Performance Protocol)是一个开源工具,专门用于SIP(Session Initiation Protocol)协议的性能测试和基准测试。SIP是一种用于控制多媒体通…...
瑞利衰落信道机理的详解
瑞利衰落信道(Rayleigh fading channel)是一种无线电信号传播环境的统计模型,用于描述信号在无线信道中的传播特性。这种模型假设信号通过无线信道后,其信号幅度是随机的,即“衰落”,并且其包络服从瑞利分布…...
PyTorch使用教程(2)-torch包
1、简介 torch包是PyTorch框架最外层的包,主要是包含了张量的创建和基本操作、随机数生成器、序列化、局部梯度操作的上下文管理器等等,内容很多。我们基础学习的时候,只有关注张量的创建、序列化,随机数、张量的数学数学计算等常…...
Bash语言的函数实现
Bash语言的函数实现 Bash(Bourne Again SHell)是一种流行的命令行解释器,用于Unix和类Unix操作系统。它不仅支持命令行操作,还能通过脚本语言进行编程。函数是Bash脚本编程中的一个重要概念,可以帮助我们组织代码、提…...
挑战杯推荐项目
“人工智能”创意赛 - 智能艺术创作助手:借助大模型技术,开发能根据用户输入的主题、风格等要求,生成绘画、音乐、文学作品等多种形式艺术创作灵感或初稿的应用,帮助艺术家和创意爱好者激发创意、提高创作效率。 - 个性化梦境…...
深入浅出Asp.Net Core MVC应用开发系列-AspNetCore中的日志记录
ASP.NET Core 是一个跨平台的开源框架,用于在 Windows、macOS 或 Linux 上生成基于云的新式 Web 应用。 ASP.NET Core 中的日志记录 .NET 通过 ILogger API 支持高性能结构化日志记录,以帮助监视应用程序行为和诊断问题。 可以通过配置不同的记录提供程…...
基于距离变化能量开销动态调整的WSN低功耗拓扑控制开销算法matlab仿真
目录 1.程序功能描述 2.测试软件版本以及运行结果展示 3.核心程序 4.算法仿真参数 5.算法理论概述 6.参考文献 7.完整程序 1.程序功能描述 通过动态调整节点通信的能量开销,平衡网络负载,延长WSN生命周期。具体通过建立基于距离的能量消耗模型&am…...
VB.net复制Ntag213卡写入UID
本示例使用的发卡器:https://item.taobao.com/item.htm?ftt&id615391857885 一、读取旧Ntag卡的UID和数据 Private Sub Button15_Click(sender As Object, e As EventArgs) Handles Button15.Click轻松读卡技术支持:网站:Dim i, j As IntegerDim cardidhex, …...
AtCoder 第409场初级竞赛 A~E题解
A Conflict 【题目链接】 原题链接:A - Conflict 【考点】 枚举 【题目大意】 找到是否有两人都想要的物品。 【解析】 遍历两端字符串,只有在同时为 o 时输出 Yes 并结束程序,否则输出 No。 【难度】 GESP三级 【代码参考】 #i…...
Objective-C常用命名规范总结
【OC】常用命名规范总结 文章目录 【OC】常用命名规范总结1.类名(Class Name)2.协议名(Protocol Name)3.方法名(Method Name)4.属性名(Property Name)5.局部变量/实例变量(Local / Instance Variables&…...
OkHttp 中实现断点续传 demo
在 OkHttp 中实现断点续传主要通过以下步骤完成,核心是利用 HTTP 协议的 Range 请求头指定下载范围: 实现原理 Range 请求头:向服务器请求文件的特定字节范围(如 Range: bytes1024-) 本地文件记录:保存已…...
在Ubuntu中设置开机自动运行(sudo)指令的指南
在Ubuntu系统中,有时需要在系统启动时自动执行某些命令,特别是需要 sudo权限的指令。为了实现这一功能,可以使用多种方法,包括编写Systemd服务、配置 rc.local文件或使用 cron任务计划。本文将详细介绍这些方法,并提供…...
JDK 17 新特性
#JDK 17 新特性 /**************** 文本块 *****************/ python/scala中早就支持,不稀奇 String json “”" { “name”: “Java”, “version”: 17 } “”"; /**************** Switch 语句 -> 表达式 *****************/ 挺好的ÿ…...
Spring AI与Spring Modulith核心技术解析
Spring AI核心架构解析 Spring AI(https://spring.io/projects/spring-ai)作为Spring生态中的AI集成框架,其核心设计理念是通过模块化架构降低AI应用的开发复杂度。与Python生态中的LangChain/LlamaIndex等工具类似,但特别为多语…...
