当前位置: 首页 > news >正文

Redis延迟队列详解

以下是对 Redis 延迟队列的详细解释:

一、什么是 Redis 延迟队列

Redis 延迟队列是一种使用 Redis 实现的消息队列,其中的消息在被消费之前会等待一段时间,这段时间就是延迟时间。延迟队列常用于一些需要延迟处理的任务场景,例如订单超时未支付取消、定时提醒等。

二、实现原理

  1. 使用 ZSET(有序集合)存储消息

    • 在 Redis 中,可以使用 ZSET 存储延迟消息。ZSET 的成员是消息的唯一标识,分数(score)是消息的到期时间戳。这样,消息会根据到期时间戳自动排序。
    • 例如,我们可以使用以下 Redis 命令添加一条延迟消息:
     

    收起

    redis

    ZADD delay_queue <timestamp> <message_id>
    
     

    其中 <timestamp> 是消息到期的时间戳,<message_id> 是消息的唯一标识。

  2. 消费者轮询 ZSET

    • 消费者会不断轮询 ZSET,使用 ZRANGEBYSCORE 命令查找分数小于或等于当前时间戳的元素。
    • 例如:
     

    redis

    ZRANGEBYSCORE delay_queue 0 <current_timestamp>
    
     

    这里的 0 表示最小分数,<current_timestamp> 是当前时间戳,这个命令会返回所有到期的消息。

  3. 处理到期消息

    • 当消费者找到到期消息后,会将消息从 ZSET 中移除并进行处理。可以使用 ZREM 命令移除消息:

    redis

    ZREM delay_queue <message_id>
    
     

    然后将消息发送到实际的消息处理程序中。

三、Java 代码示例

以下是一个使用 Jedis(Redis 的 Java 客户端)实现 Redis 延迟队列的简单示例:

java

import redis.clients.jedis.Jedis;
import java.util.Set;public class RedisDelayQueue {private Jedis jedis;public RedisDelayQueue() {jedis = new Jedis("localhost", 6379);}// 生产者添加延迟消息public void addDelayMessage(String messageId, long delayMillis) {long score = System.currentTimeMillis() + delayMillis;jedis.zadd("delay_queue", score, messageId);}// 消费者轮询并处理消息public void consume() {while (true) {// 查找到期的消息Set<String> messages = jedis.zrangeByScore("delay_queue", 0, System.currentTimeMillis(), 0, 1);if (messages.isEmpty()) {try {// 没有消息,等待一段时间再轮询Thread.sleep(100);} catch (InterruptedException e) {Thread.currentThread().interrupt();}continue;}String messageId = messages.iterator().next();// 移除消息Long removed = jedis.zrem("delay_queue", messageId);if (removed > 0) {// 消息成功移除,进行处理System.out.println("Processing message: " + messageId);// 在这里添加实际的消息处理逻辑}}}public static void main(String[] args) {RedisDelayQueue delayQueue = new RedisDelayQueue();// 生产者添加消息,延迟 5 秒delayQueue.addDelayMessage("message_1", 5000);// 启动消费者delayQueue.consume();}
}

代码解释

  • RedisDelayQueue 类封装了延迟队列的基本操作。
  • addDelayMessage 方法:
    • 计算消息的到期时间戳,将消息添加到 delay_queue ZSET 中,使用 jedis.zadd 命令。
  • consume 方法:
    • 不断轮询 delay_queue ZSET,使用 jedis.zrangeByScore 查找到期消息。
    • 如果没有消息,线程休眠 100 毫秒后继续轮询。
    • 若找到消息,使用 jedis.zrem 移除消息,如果移除成功,说明该消息被此消费者处理,进行后续处理。

四、注意事项

  1. 并发处理

    • 多个消费者同时轮询 ZSET 时,可能会出现竞争条件,需要注意消息的重复处理问题。可以使用 Redis 的事务(MULTIEXEC)或 Lua 脚本保证原子性。
    • 例如,可以使用 Lua 脚本将查找和移除操作合并为一个原子操作:

    lua

    local message = redis.call('ZRANGEBYSCORE', 'delay_queue', 0, ARGV[1], 'LIMIT', 0, 1)
    if #message > 0 thenif redis.call('ZREM', 'delay_queue', message[1]) == 1 thenreturn message[1]end
    end
    return nil
    
     

    然后在 Java 中调用这个脚本:

    java

    String script = "local message = redis.call('ZRANGEBYSCORE', 'delay_queue', 0, ARGV[1], 'LIMIT', 0, 1)\n" +"if #message > 0 then\n" +"    if redis.call('ZREM', 'delay_queue', message[1]) == 1 then\n" +"        return message[1]\n" +"    end\n" +"end\n" +"return nil";
    while (true) {String messageId = (String) jedis.eval(script, 0, String.valueOf(System.currentTimeMillis()));if (messageId!= null) {System.out.println("Processing message: " + messageId);// 在这里添加实际的消息处理逻辑} else {try {Thread.sleep(100);} catch (InterruptedException e) {Thread.currentThread().interrupt();}}
    }
    
  2. 消息持久化

    • Redis 是内存数据库,需要考虑消息的持久化问题,确保在 Redis 重启后不会丢失重要消息。可以使用 Redis 的 RDB 或 AOF 持久化机制,但要注意性能和数据安全的平衡。

五、使用 Redis 模块

除了上述基本实现,还可以使用 Redis 的一些第三方模块,如 Redis 的 Redisson 库,它提供了更高级的延迟队列实现,使用更加方便和可靠:

java

import org.redisson.Redisson;
import org.redisson.api.RBlockingQueue;
import org.redisson.api.RDelayedQueue;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import java.util.concurrent.TimeUnit;public class RedissonDelayQueueExample {public static void main(String[] args) {Config config = new Config();config.useSingleServer().setAddress("redis://127.0.0.1:6379");RedissonClient redisson = Redisson.create(config);RBlockingQueue<String> blockingQueue = redisson.getBlockingQueue("myQueue");RDelayedQueue<String> delayedQueue = redisson.getDelayedQueue(blockingQueue);// 生产者添加延迟消息delayedQueue.offer("message_1", 5, TimeUnit.SECONDS);// 消费者new Thread(() -> {while (true) {try {String message = blockingQueue.take();System.out.println("Processing message: " + message);// 在这里添加实际的消息处理逻辑} catch (InterruptedException e) {Thread.currentThread().interrupt();}}}).start();}
}

代码解释

  • Redisson 是一个功能强大的 Redis 客户端库。
  • RBlockingQueue 是阻塞队列,RDelayedQueue 是延迟队列。
  • 使用 delayedQueue.offer("message_1", 5, TimeUnit.SECONDS) 添加延迟消息。
  • 消费者通过 blockingQueue.take() 阻塞等待消息,当消息到期时,会自动从延迟队列转移到阻塞队列并被消费者接收。

通过上述几种方法,可以使用 Redis 实现延迟队列,满足不同场景下的延迟任务处理需求。根据具体情况,可以选择简单的 ZSET 实现或使用更高级的第三方库,同时要注意并发处理和消息持久化等问题,以确保延迟队列的稳定性和可靠性。

总之,Redis 延迟队列是一种高效且灵活的实现延迟任务的方式,在分布式系统中具有广泛的应用,利用 Redis 的特性可以轻松处理延迟消息,减少系统的复杂性和开发成本。

相关文章:

Redis延迟队列详解

以下是对 Redis 延迟队列的详细解释&#xff1a; 一、什么是 Redis 延迟队列 Redis 延迟队列是一种使用 Redis 实现的消息队列&#xff0c;其中的消息在被消费之前会等待一段时间&#xff0c;这段时间就是延迟时间。延迟队列常用于一些需要延迟处理的任务场景&#xff0c;例如订…...

一文大白话讲清楚webpack基本使用——2——css相关loader的配置和使用

一文大白话讲清楚webpack基本使用——2——css相关loader的配置和使用 1. 建议按文章顺序从头看是看 第一篇&#xff1a;一文大白话讲清楚啥是个webpack第二篇&#xff1a;一文大白话讲清楚webpack基本使用——1——完成webpack的初步构建然后看本篇&#xff0c;Loader的配置…...

第二代增强-采购申请屏幕增强

这篇文章可能有点乱&#xff0c;但是都是学习的一个过程&#xff0c;问题就是在ME52N修改采购申请增强字段之后&#xff0c;点击保存之后无法更新到数据库&#xff0c;困扰了很久&#xff0c;终于解决了&#xff0c;特此记录 文章目录 需求创建增强结构测试屏幕ZXM02TOP创建屏幕…...

图论DFS:黑红树

我的个人主页 {\large \mathsf{{\color{Red} 我的个人主页} } } 我的个人主页 往 {\color{Red} {\Huge 往} } 往 期 {\color{Green} {\Huge 期} } 期 文 {\color{Blue} {\Huge 文} } 文 章 {\color{Orange} {\Huge 章}} 章 DFS 算法&#xff1a;记忆化搜索DFS 算法&#xf…...

零基础一篇打通Vue极速通关教程

文章目录 写给零基础看的Vue极速掌握教程第1章 Vue简介1.1 Vue 概述1.2 MVVM 模式1.3 WebStorm开发工具1.3.1 WebStorm简介1.3.2 集成Vue开发调试工具 第2章 Vue的事件绑定2.1 Vue基本使用2.1.1 插值表达式2.1.2 注意事项 2.2 Vue事件绑定2.1.1 点击事件2.2.2 键盘事件2.2.3 移…...

商城系统中的常见 BUG

以下是商城系统中一些常见的 BUG&#xff1a; 功能与操作类 支付问题&#xff1a;如无法成功完成支付&#xff0c;支付过程中出现延迟、错误或订单重复支付等&#xff0c;还可能因网络问题导致支付失败或数据不一致。 登录 / 注册问题&#xff1a;用户在注册或登录时可能遇到…...

下定决心不去读研了。。。

大家好&#xff0c;我是苍何。 之前发表过一篇文章&#xff0c;表达了自己读研的困惑和纠结&#xff0c;得到了大家很多的建议&#xff0c;也引起了很多人的共鸣&#xff0c;在留言区分享了自己的故事&#xff0c;看着这些故事&#xff0c;我觉得都够苍何写一部小说了。 可惜苍…...

100个网络基础知识

1)什么是链接? 链接是指两个设备之间的连接。它包括用于一个设备能够与另一个设备通信的电缆类型和协议。 2)OSI 参考模型的层次是什么? 有 7 个 OSI 层&#xff1a;物理层&#xff0c;数据链路层&#xff0c;网络层&#xff0c;传输层&#xff0c;会话层&#xff0c;表示…...

庄小焱——2024年博文总结与展望

摘要 大家好&#xff0c;我是庄小焱。岁末回首&#xff0c;2024 年是我在个人成长、博客创作以及生活平衡方面收获颇丰的一年。这一年的经历如同璀璨星辰&#xff0c;照亮了我前行的道路&#xff0c;也为未来的发展奠定了坚实基础。 1. 个人成长与突破 在 2024 年&#xff0c…...

高通8255 Android STR 启动失败要因分析调查

目录 背景&#xff1a; 调查过程&#xff1a; 步骤1&#xff1a; slog2info | grep vmm_service 步骤2&#xff1a; slog2info | grep qvm 总结&#xff1a; 解决方案 背景&#xff1a; 调试高通8255 STR的STR过程中发现Android和QNX进入STR状态后&#xff0c;脱出STR时…...

Qt QML专栏目录结构

第1章 走进Qt Quick的世界... 4 ★1.4 Qt Quick应用... 4 ★1.5 Qt Quick UI项目(qmlproject工程) 4 第2章 QML语法... 4 ★2.2 import导入语句... 4 ★2.3 QML类型系统... 5 ★2.4 对象特性&#xff08;Attributes&#xff09;... 6 三个等于号JavaScript语…...

“深入浅出”系列之FFmpeg:(3)音视频开发的学习路线和必备知识

一、岗位要求 音视频开发属于我自己想要学习的板块&#xff0c;我想知道公司招聘音视频开发工程师所需要的条件&#xff0c;于是我就从招聘网站上找来了几个有关音视频开发的岗位需求&#xff0c;内容仅供参考&#xff1a; &#xff08;1&#xff09;算法工程师-视频编解码 …...

Webpack简述

一、为什么要构建工具 人类喜欢书写的代码以及开发方式计算机不喜欢&#xff0c;构建工具的作用就是让人类舒舒服服写自己喜欢的代码&#xff0c;然后一打包生成计算机喜欢的代码 第一个webpack自身仅仅是将我们引入的模块打包成一个文件&#xff08;编译import&#xff09;&am…...

解决 Error: Invalid or corrupt jarfile day04_studentManager.jar 报错问题

在 Java 开发过程中&#xff0c;我们可能会遇到这样的报错信息&#xff1a;Error: Invalid or corrupt jarfile day04_studentManager.jar。这个错误通常表示 day04_studentManager.jar 文件可能已损坏或无效&#xff0c;下面将为大家详细介绍如何解决这个问题。 一、错误点分…...

ACL基础理论

ACL ——访问控制列表 ACL属于策略的一种 ACL访问控制列表的作用&#xff1a; 访问控制&#xff1a;在路由器流量流入或流出的接口上&#xff0c;匹配流量&#xff0c;然后执行设定好的动作&#xff1a;permit&#xff08;允许&#xff09;、deny&#xff08;拒绝&#xff…...

庄周梦蝶1

和尚大概的意思如下&#xff1a;人的每一个梦境都是一个世界&#xff0c;这些世界统称三千世界。每一个世界当中所谓时间的跨度不同&#xff0c;发展程度不同&#xff0c;但是里面都有一个你。这些世界是同时存在的&#xff0c;所以不存在未来过去和现在&#xff0c;因为你就存…...

使用SIPP发起媒体流性能测试详解

使用SIPP发起媒体流性能测试详解 一、SIPP工具简介二、测试前的准备三、编写测试脚本四、运行测试五、分析测试结果六、总结SIPP(SIP Performance Protocol)是一个开源工具,专门用于SIP(Session Initiation Protocol)协议的性能测试和基准测试。SIP是一种用于控制多媒体通…...

瑞利衰落信道机理的详解

瑞利衰落信道&#xff08;Rayleigh fading channel&#xff09;是一种无线电信号传播环境的统计模型&#xff0c;用于描述信号在无线信道中的传播特性。这种模型假设信号通过无线信道后&#xff0c;其信号幅度是随机的&#xff0c;即“衰落”&#xff0c;并且其包络服从瑞利分布…...

PyTorch使用教程(2)-torch包

1、简介 torch包是PyTorch框架最外层的包&#xff0c;主要是包含了张量的创建和基本操作、随机数生成器、序列化、局部梯度操作的上下文管理器等等&#xff0c;内容很多。我们基础学习的时候&#xff0c;只有关注张量的创建、序列化&#xff0c;随机数、张量的数学数学计算等常…...

Bash语言的函数实现

Bash语言的函数实现 Bash&#xff08;Bourne Again SHell&#xff09;是一种流行的命令行解释器&#xff0c;用于Unix和类Unix操作系统。它不仅支持命令行操作&#xff0c;还能通过脚本语言进行编程。函数是Bash脚本编程中的一个重要概念&#xff0c;可以帮助我们组织代码、提…...

Vim 调用外部命令学习笔记

Vim 外部命令集成完全指南 文章目录 Vim 外部命令集成完全指南核心概念理解命令语法解析语法对比 常用外部命令详解文本排序与去重文本筛选与搜索高级 grep 搜索技巧文本替换与编辑字符处理高级文本处理编程语言处理其他实用命令 范围操作示例指定行范围处理复合命令示例 实用技…...

stm32G473的flash模式是单bank还是双bank?

今天突然有人stm32G473的flash模式是单bank还是双bank&#xff1f;由于时间太久&#xff0c;我真忘记了。搜搜发现&#xff0c;还真有人和我一样。见下面的链接&#xff1a;https://shequ.stmicroelectronics.cn/forum.php?modviewthread&tid644563 根据STM32G4系列参考手…...

【OSG学习笔记】Day 18: 碰撞检测与物理交互

物理引擎&#xff08;Physics Engine&#xff09; 物理引擎 是一种通过计算机模拟物理规律&#xff08;如力学、碰撞、重力、流体动力学等&#xff09;的软件工具或库。 它的核心目标是在虚拟环境中逼真地模拟物体的运动和交互&#xff0c;广泛应用于 游戏开发、动画制作、虚…...

前端导出带有合并单元格的列表

// 导出async function exportExcel(fileName "共识调整.xlsx") {// 所有数据const exportData await getAllMainData();// 表头内容let fitstTitleList [];const secondTitleList [];allColumns.value.forEach(column > {if (!column.children) {fitstTitleL…...

(二)原型模式

原型的功能是将一个已经存在的对象作为源目标,其余对象都是通过这个源目标创建。发挥复制的作用就是原型模式的核心思想。 一、源型模式的定义 原型模式是指第二次创建对象可以通过复制已经存在的原型对象来实现,忽略对象创建过程中的其它细节。 📌 核心特点: 避免重复初…...

VTK如何让部分单位不可见

最近遇到一个需求&#xff0c;需要让一个vtkDataSet中的部分单元不可见&#xff0c;查阅了一些资料大概有以下几种方式 1.通过颜色映射表来进行&#xff0c;是最正规的做法 vtkNew<vtkLookupTable> lut; //值为0不显示&#xff0c;主要是最后一个参数&#xff0c;透明度…...

OpenLayers 分屏对比(地图联动)

注&#xff1a;当前使用的是 ol 5.3.0 版本&#xff0c;天地图使用的key请到天地图官网申请&#xff0c;并替换为自己的key 地图分屏对比在WebGIS开发中是很常见的功能&#xff0c;和卷帘图层不一样的是&#xff0c;分屏对比是在各个地图中添加相同或者不同的图层进行对比查看。…...

第 86 场周赛:矩阵中的幻方、钥匙和房间、将数组拆分成斐波那契序列、猜猜这个单词

Q1、[中等] 矩阵中的幻方 1、题目描述 3 x 3 的幻方是一个填充有 从 1 到 9 的不同数字的 3 x 3 矩阵&#xff0c;其中每行&#xff0c;每列以及两条对角线上的各数之和都相等。 给定一个由整数组成的row x col 的 grid&#xff0c;其中有多少个 3 3 的 “幻方” 子矩阵&am…...

图表类系列各种样式PPT模版分享

图标图表系列PPT模版&#xff0c;柱状图PPT模版&#xff0c;线状图PPT模版&#xff0c;折线图PPT模版&#xff0c;饼状图PPT模版&#xff0c;雷达图PPT模版&#xff0c;树状图PPT模版 图表类系列各种样式PPT模版分享&#xff1a;图表系列PPT模板https://pan.quark.cn/s/20d40aa…...

企业如何增强终端安全?

在数字化转型加速的今天&#xff0c;企业的业务运行越来越依赖于终端设备。从员工的笔记本电脑、智能手机&#xff0c;到工厂里的物联网设备、智能传感器&#xff0c;这些终端构成了企业与外部世界连接的 “神经末梢”。然而&#xff0c;随着远程办公的常态化和设备接入的爆炸式…...