当前位置: 首页 > news >正文

Redis实现延迟队列

最近用到一个延迟消息的功能,第一时间想到使用MQ或者MQ的插件,因为数据量不大,所以尝试使用Redis来实现了,毕竟Redis也天生支持类似MQ的队列消费,所以,在这里总结了一下Redis实现延迟消息队列的方式。

一、监听key过期时间

处理流程:当redis的一个key过期时,redis会生成一个事件,通知订阅了该事件的客户端(KeyExpirationEventMessageListener),然后在客户端的回调方法中处理逻辑。
1)新建SpringBoot项目,maven依赖及yml如下
maven依赖:

<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency></dependencies>

yml文件

server:port: 8000spring:redis:database: 0host: xxxxport: 6379password: xxxxxxlettuce:pool:#最大连接数max-active: 8#最大阻塞等待时间max-wait: -1#最大空闲max-idle: 8#最小空闲min-idle: 0#连接超时时间timeout: 5000

2)修改redis.conf文件开启事件通知配置
默认的配置:notify-keyspace-events “”
修改为:notify-keyspace-events Ex,该配置表示监听key的过期事件

3)设置Redis监听配置,注入Bean RedisMessageListenerContaine

@Configuration
public class RedisTimeoutConfiguration {@Autowiredprivate RedisConnectionFactory redisConnectionFactory;@Beanpublic RedisMessageListenerContainer redisMessageListenerContainer() {RedisMessageListenerContainer redisMessageListenerContainer = new RedisMessageListenerContainer();redisMessageListenerContainer.setConnectionFactory(redisConnectionFactory);return redisMessageListenerContainer;}@Beanpublic KeyExpiredListener keyExpiredListener() {return new KeyExpiredListener(this.redisMessageListenerContainer());}
}

4)创建监听器类,重写key过期回调方法onMessage

@Slf4j
public class KeyExpiredListener extends KeyExpirationEventMessageListener {@Autowiredpublic RedisTemplate<String, String> redisTemplate;public KeyExpiredListener(RedisMessageListenerContainer listenerContainer) {super(listenerContainer);}@Overridepublic void onMessage(Message message, byte[] bytes) {String channel = new String(message.getChannel(), StandardCharsets.UTF_8);//过期的keyString key = new String(message.getBody(), StandardCharsets.UTF_8);log.info("redis key 过期:bytes={},channel={},key={}", new String(bytes), channel, key);}
}

5)编写测试接口:写入一个带过期时间的key

@RestController
@RequestMapping("/demo")
public class BasicController {@Autowiredpublic RedisTemplate<String, String> redisTemplate;@GetMapping(value = "/test")public void redisTest() {redisTemplate.opsForValue().set("test", "5s后过期", 5, TimeUnit.SECONDS);}
}

执行后,onMessage监听方法打印结果:

 redis key 过期:bytes=__keyevent@*__:expired,channel=__keyevent@0__:expired,key=test

该方案缺点:可靠性问题,Redis 是一个内存数据库,尽管它提供了数据持久化选项(如 RDB 和 AOF),但在某些情况下(如意外崩溃或重启),可能会丢失一些未处理的过期事件。

二、zset + score

基本思路是将消息按需发送的时间作为分数存储在有序集合zset中,然后定期检查并处理到期的消息。代码例子如下:
1)创建 DelayedMessageService 类

@Slf4j
@Service
public class DelayedMessageService {private static final String DELAYED_MESSAGES_ZSET = "delayed:messages";@Autowiredprivate RedisTemplate<String, String> redisTemplate;public void addMessage(String message, long delayMillis) {long score = System.currentTimeMillis() + delayMillis;redisTemplate.opsForZSet().add(DELAYED_MESSAGES_ZSET, message, score);}@Scheduled(fixedRate = 1000)public void processMessages() {long now = System.currentTimeMillis();Set<ZSetOperations.TypedTuple<String>> messages = redisTemplate.opsForZSet().rangeByScoreWithScores(DELAYED_MESSAGES_ZSET, 0, now);if (messages != null && !messages.isEmpty()) {for (ZSetOperations.TypedTuple<String> message : messages) {String msg = message.getValue();long score = message.getScore().longValue();if (score <= now) {// Process the messageSystem.out.println("Processing message: " + msg);// Remove the message from the zsetredisTemplate.opsForZSet().remove(DELAYED_MESSAGES_ZSET, msg);}}}else{log.info("定时任务执行~");}}}

2)编写Controller接口测试,初始化zset内容

@RestController
@RequestMapping("/demo")
public class BasicController {@Autowiredprivate DelayedMessageService delayedMessageService;@GetMapping(value = "/test2")public void redisZsetTest() {// Add some messages with delaysdelayedMessageService.addMessage("Message 1", 5000); // 5 seconds delaydelayedMessageService.addMessage("Message 2", 10000); // 10 seconds delaydelayedMessageService.addMessage("Message 3", 15000); // 15 seconds delay}
}

说明:

  • redisZsetTest接口通过调用DelayedMessageServiceaddMessage方法,将消息及其到期时间添加到 Redis 的 zset 中
  • 开启一个定时任务,定期检查和处理到期的消息。使用 @Scheduled 注解定期执行,每秒检查一次,注意这里使用@Scheduled,不要忘了启动类上添加@EnableScheduling注解,否则定时任务不会生效。fixedRate 属性表示以固定的频率(毫秒为单位)执行方法。即方法执行完成后,会立即等待指定的毫秒数,然后再次执行。
  • 通过 redisTemplate.opsForZSet().rangeByScoreWithScores 方法按时间范围获取到期的消息,消息处理完成后,从zset 中移除处理过的消息

三、Redisson框架

利用 Redisson 提供的数据结构RDelayedQueueRBlockingDeque,可以自动处理过期的任务并将它们移动到阻塞队列中,这样我们就可以从阻塞队列中获取任务并进行消费处理。例子如下:
1)添加依赖

<!-- Redisson 依赖项 -->
<dependency><groupId>org.redisson</groupId><artifactId>redisson-spring-boot-starter</artifactId><version>2.15.1</version>
</dependency>

2)创建DelayedMessageService

@Slf4j
@Service
public class DelayedMessageService {@Autowiredprivate RedissonClient redissonClient;private RBlockingDeque<String> blockingDeque;private RDelayedQueue<String> delayedQueue;@PostConstructpublic void init() {this.blockingDeque = redissonClient.getBlockingDeque("delayedQueue");this.delayedQueue = redissonClient.getDelayedQueue(blockingDeque);Executors.newSingleThreadExecutor().submit(this::processMessages);}public void addMessage(String message, long delayMillis) {delayedQueue.offer(message, delayMillis, TimeUnit.MILLISECONDS);}public void processMessages() {try {while (true) {String message = blockingDeque.take();// Process the messagelog.info("消息被处理: " + message);// ..业务逻辑处理}} catch (InterruptedException e) {Thread.currentThread().interrupt();log.error("中断异常",e);}}}

3)测试接口

@GetMapping(value = "/test3")public void redisQueueTest() {// Add some messages with delaysdelayedMessageService.addMessage("Message 1", 5000); // 5 seconds delaydelayedMessageService.addMessage("Message 2", 10000); // 10 seconds delaydelayedMessageService.addMessage("Message 3", 15000); // 15 seconds delay}

说明:

  1. RDelayedQueue 是 Redisson 提供的延迟队列,它将消息存储在指定的队列中,直到消息到期才会被转移到该队列。它的主要作用包括:
    • 延迟消息管理:我们可以使用 RDelayedQueueoffer 方法将消息添加到延迟队列,并指定延迟时间,消息在延迟时间到期前一直保留在 RDelayedQueue 中。
    • 消息转移:一旦消息到期,RDelayedQueue 会自动将消息转移到指定的RBlockingDeque 中。
  2. RBlockingQueue是 Redisson 提供的阻塞队列,它支持阻塞操作。主要作用包括:
    • 阻塞操作:支持阻塞的 take 操作,如果队列中没有元素,会一直阻塞直到有元素可供消费。

总结
个人推荐使用Redisson 的RDelayedQueue 方式,感觉更加可靠和简单一些,当然zset+score也可以是个不错选择,毕竟更加灵活,延迟消息还有其他不同的方案,比如rocketmq、rabbitmq插件等,假如项目中用了redis,又不想引入更多的中间件,可以尝试使用redis来实现,为了测试,这里例子都比较简单,在实际使用过程中,还要考虑补偿机制、幂等性等问题。

参考:
1.https://blog.csdn.net/qq_34826261/article/details/120598731

2.https://github.com/redisson/redisson/wiki/7.-%E5%88%86%E5%B8%83%E5%BC%8F%E9%9B%86%E5%90%88#715-%E5%BB%B6%E8%BF%9F%E9%98%9F%E5%88%97delayed-queue

相关文章:

Redis实现延迟队列

最近用到一个延迟消息的功能&#xff0c;第一时间想到使用MQ或者MQ的插件&#xff0c;因为数据量不大&#xff0c;所以尝试使用Redis来实现了&#xff0c;毕竟Redis也天生支持类似MQ的队列消费&#xff0c;所以&#xff0c;在这里总结了一下Redis实现延迟消息队列的方式。 一、…...

如何准确查找论文数据库?

在学术研究过程中&#xff0c;查找相关论文是获取最新研究成果、支持自己研究的重要途径。准确查找论文数据库不仅可以节省时间&#xff0c;还能确保找到高质量的学术资源。本文将介绍一些有效的方法和策略&#xff0c;帮助您准确查找论文数据库。 1. 选择合适的数据库 不同的…...

翻译《The Old New Thing》- What a drag: Dragging a virtual file (IStream edition)

What a drag: Dragging a virtual file (IStream edition) - The Old New Thing (microsoft.com)https://devblogs.microsoft.com/oldnewthing/20080319-00/?p23073 Raymond Chen 2008年03月19日 拖拽虚拟文件&#xff08;IStream 版本&#xff09; 上一次&#xff0c;我们看…...

【FPGA】Verilog语言从零到精通

接触fpga一段时间&#xff0c;也能写点跑点吧……试试系统地康康呢~这个需要耐心但是回报巨大的工作。正原子&&小梅哥 15_语法篇&#xff1a;Verilog高级知识点_哔哩哔哩_bilibili 1Verilog基础 Verilog程序框架&#xff1a;模块的结构 类比&#xff1a;c语言的基础…...

unity打包的WebGL部署到IIS问题

部署之后会出错&#xff0c;我遇到的有以下几种&#xff1b; 进度条卡住不动 明明已经部署到了IIS上&#xff0c;为什么浏览网页的时候还是过不去或者直接报错。 进度条卡住不动的问题其实就是wasm和data的错误。 此时在浏览器上按F12进入开发者模式查看错误&#xff08;下图…...

GPT-4o:人工智能的新里程碑

GPT-4o&#xff0c;作为OpenAI最新推出的人工智能技术&#xff0c;无疑在人工智能领域掀起了新一轮的浪潮。这款新型的语言模型不仅继承了GPT系列的核心优势&#xff0c;更在多个方面实现了突破性的进展。以下&#xff0c;我们将从版本间的对比分析、GPT-4o的技术能力以及个人整…...

发现一个ai工具网站

网址 https://17yongai.com/ 大概看了下&#xff0c;这个网站收集的数据还挺有用的&#xff0c;有很多实用的ai教程。 懂ai工具的可以在这上面找找灵感。...

第二十五章新增H5基础(以及视频~兼容)

1.HTML5中新增布局标签 HTML5新增了页眉&#xff0c;页脚&#xff0c;内容块等文档结构相关标签&#xff0c;可以使文档结构更加清晰明了。 1.新增的结构标签 1、<header>标签 定义文档或者文档中内容块的页眉。通常可以包含整个页面或一个内容区域的标题&#xff0c…...

[英语单词] production quality

Our goal is to implement a production quality switch platform that supports standard management interfaces and opens the forwarding functions to programmatic extension and control. 说在openswitch的文档里有说这两词&#xff0c;含义是产品质量。是production修…...

windows安装nodeJs,以及常用操作

1. 官网(Node.js — Run JavaScript Everywhere (nodejs.org))下载想要安装的node版本 的安装包完成安装 2.环境变量设置&#xff1a; 系统变量&#xff1a; Path新增&#xff1a;D:\Program Files\nodejs (node安装目录) 3.设置淘宝源&#xff1a; npm config set registr…...

MySql part1 安装和介绍

MySql part1 安装和介绍 数据 介绍 什么是数据库&#xff0c;数据很好理解&#xff0c;一般来说数据通常是我们所认识的 描述事物的符号记录&#xff0c; 可以是数字、 文字、图形、图像、声音、语言等&#xff0c;数据有多种形式&#xff0c;它们都以经过数字化后存入计算机…...

SpringBoot打war包并配置外部Tomcat运行

简介 由于其他原因&#xff0c;我们需要使用SpringBoot打成war包放在外部的Tomcat中运行,本文就以一个案例来说明从SpringBoot打war包到Tomcat配置并运行的全流程经过 环境 SpringBoot 2.6.15 Tomcat 8.5.100 JDK 1.8.0_281 Windows 正文 一、SpringBoot配置打war包 第一步&a…...

2024.5.31每日一题

LeetCode 找出缺失的重复数字 题目链接&#xff1a;2965. 找出缺失和重复的数字 - 力扣&#xff08;LeetCode&#xff09; 题目描述 给你一个下标从 0 开始的二维整数矩阵 grid&#xff0c;大小为 n * n &#xff0c;其中的值在 [1, n2] 范围内。除了 a 出现 两次&#xff…...

Oracle 数据库 varchar2 从 4000 扩展到 32k

Oracle 数据库 varchar2 从 4000 扩展到 32k 0. 引言1. 扩展 varchar2 支持长度2. 测试 0. 引言 今天来个项目需求&#xff0c;有1个字段的存储内容大概1万字。 当然其中1个方法是将这个字段的内容切分成几个字段&#xff0c;还有1个方法就是将 varchar2 默认支持 4000 的能力…...

postgressql——事务提交会通过delayChkpt阻塞checkpoint(9)

事务提交会通过delayChkpt阻塞checkpoint Postgresql事务在事务提交时&#xff08;执行commit的最后阶段&#xff09;会通过加锁阻塞checkpoint的执行&#xff0c;尽管时间非常短&#xff0c;分析为什么需要这样做&#xff1a; 首先看提交堆栈 #1 0x0000000000539175 in Co…...

开发者工具-sources(源代码选项)

一、概要说明 源代码面板从视觉效果上分为三个区域&#xff1a;菜单区、内容区、监听区。 菜单区里面有5个子分类&#xff1a; 网页(Page)&#xff1a;指页面源&#xff0c;包含了该页面中所有的文件&#xff0c;即使多个域名下的文件也都会展示出来&#xff0c;包括iframe…...

没有 rr 头的 kamailio 路由脚本

分享下笔者最近编写的 kamailio 路由脚本 不用 rr 模块&#xff0c;因为有些 sip 协议栈不支持 rr 头处理 sip 注册直接回 200 OK&#xff0c;这部分目前不是重点更换 contact 头&#xff0c;换成 kamailio 自己目前只支持 sip transport 为 udp&#xff0c;以后可能支持 tcp&…...

mysql 分区

目标 给一个表&#xff08;半年有800万&#xff09;增加分区以增加查询速度 约束 分区不能有外键否则会报错 https://blog.csdn.net/yabingshi_tech/article/details/52241034 主键 按照时间列进行分区 https://blog.csdn.net/winerpro/article/details/135736454 参看以…...

在龙芯安装docker compose

安装过程报错&#xff1a;pynacl无法安装 原因&#xff1a;未知 解决尝试&#xff1a;单独安装pynacl 执行&#xff1a;pip install pynacl 报错&#xff1a; 再次执行dockerscompose撒谎啥 少了头文件 dev&#xff0c;表示c编译器有问题 python是c编写的 喵的 搞了半天是我…...

纯C++做多项式拟合

一、多项式拟合用途 当前有一组对应的x、y数据&#xff0c;希望通过这些数据点做出近似的多项式曲线&#xff1a;YnX^2mXc 其中多项式最高次数可调&#xff0c;返回各个参数及曲线的拟合度R^2 二、函数实现 参数中的order为设置的多项式最高次次数&#xff0c;coefficients为…...

web vue 项目 Docker化部署

Web 项目 Docker 化部署详细教程 目录 Web 项目 Docker 化部署概述Dockerfile 详解 构建阶段生产阶段 构建和运行 Docker 镜像 1. Web 项目 Docker 化部署概述 Docker 化部署的主要步骤分为以下几个阶段&#xff1a; 构建阶段&#xff08;Build Stage&#xff09;&#xff1a…...

深入剖析AI大模型:大模型时代的 Prompt 工程全解析

今天聊的内容&#xff0c;我认为是AI开发里面非常重要的内容。它在AI开发里无处不在&#xff0c;当你对 AI 助手说 "用李白的风格写一首关于人工智能的诗"&#xff0c;或者让翻译模型 "将这段合同翻译成商务日语" 时&#xff0c;输入的这句话就是 Prompt。…...

Linux链表操作全解析

Linux C语言链表深度解析与实战技巧 一、链表基础概念与内核链表优势1.1 为什么使用链表&#xff1f;1.2 Linux 内核链表与用户态链表的区别 二、内核链表结构与宏解析常用宏/函数 三、内核链表的优点四、用户态链表示例五、双向循环链表在内核中的实现优势5.1 插入效率5.2 安全…...

IGP(Interior Gateway Protocol,内部网关协议)

IGP&#xff08;Interior Gateway Protocol&#xff0c;内部网关协议&#xff09; 是一种用于在一个自治系统&#xff08;AS&#xff09;内部传递路由信息的路由协议&#xff0c;主要用于在一个组织或机构的内部网络中决定数据包的最佳路径。与用于自治系统之间通信的 EGP&…...

unix/linux,sudo,其发展历程详细时间线、由来、历史背景

sudo 的诞生和演化,本身就是一部 Unix/Linux 系统管理哲学变迁的微缩史。来,让我们拨开时间的迷雾,一同探寻 sudo 那波澜壮阔(也颇为实用主义)的发展历程。 历史背景:su的时代与困境 ( 20 世纪 70 年代 - 80 年代初) 在 sudo 出现之前,Unix 系统管理员和需要特权操作的…...

OpenPrompt 和直接对提示词的嵌入向量进行训练有什么区别

OpenPrompt 和直接对提示词的嵌入向量进行训练有什么区别 直接训练提示词嵌入向量的核心区别 您提到的代码: prompt_embedding = initial_embedding.clone().requires_grad_(True) optimizer = torch.optim.Adam([prompt_embedding...

html css js网页制作成品——HTML+CSS榴莲商城网页设计(4页)附源码

目录 一、&#x1f468;‍&#x1f393;网站题目 二、✍️网站描述 三、&#x1f4da;网站介绍 四、&#x1f310;网站效果 五、&#x1fa93; 代码实现 &#x1f9f1;HTML 六、&#x1f947; 如何让学习不再盲目 七、&#x1f381;更多干货 一、&#x1f468;‍&#x1f…...

人工智能(大型语言模型 LLMs)对不同学科的影响以及由此产生的新学习方式

今天是关于AI如何在教学中增强学生的学习体验&#xff0c;我把重要信息标红了。人文学科的价值被低估了 ⬇️ 转型与必要性 人工智能正在深刻地改变教育&#xff0c;这并非炒作&#xff0c;而是已经发生的巨大变革。教育机构和教育者不能忽视它&#xff0c;试图简单地禁止学生使…...

深度学习水论文:mamba+图像增强

&#x1f9c0;当前视觉领域对高效长序列建模需求激增&#xff0c;对Mamba图像增强这方向的研究自然也逐渐火热。原因在于其高效长程建模&#xff0c;以及动态计算优势&#xff0c;在图像质量提升和细节恢复方面有难以替代的作用。 &#x1f9c0;因此短时间内&#xff0c;就有不…...

2025年渗透测试面试题总结-腾讯[实习]科恩实验室-安全工程师(题目+回答)

安全领域各种资源&#xff0c;学习文档&#xff0c;以及工具分享、前沿信息分享、POC、EXP分享。不定期分享各种好玩的项目及好用的工具&#xff0c;欢迎关注。 目录 腾讯[实习]科恩实验室-安全工程师 一、网络与协议 1. TCP三次握手 2. SYN扫描原理 3. HTTPS证书机制 二…...