redis — 基于Spring Boot实现redis延迟队列
1. 业务场景
延时队列场景在我们日常业务开发中经常遇到,它是一种特殊类型的消息队列,它允许把消息发送到队列中,但不立即投递给消费者,而是在一定时间后再将消息投递给消费者。延迟队列的常见使用场景有以下几种:
- 在各种购物平台上下单,订单超过30分钟未支付,自动关闭。
- 订单完成后, 如果用户一直未评价, 5天后自动好评。
- 会员到期前15天, 到期前3天分别发送短信提醒。
- 当订单一直处于未支付状态时,如何及时的关闭订单,并退还库存?
- 如何定期检查处于退款状态的订单是否已经退款成功?
2. Redis延迟队列实现原理
目前延迟队列的类型主要实现有:
- 基于消息的延迟:指为每条消息设置不同的延迟时间,那么每当队列中有新消息进入的时候就会重新根据延迟时间排序,或者定义时间轮,新消息落在指定位置;
- 基于队列的延迟: 设置不同延迟级别的队列,比如5s、1min、30mins、1h等,每个队列中消息的延迟时间都是相同的。
基于第一种不少组件都有实现方案,比如redis的sortset间接实现,kafka内部时间轮,rabbitMQ可安装插件实现。第一种实时性高,不过主观看会比较依赖组件本身,但自己实现就得考虑持久化、高可用等问题,建议直接使用组件本身;第二种方案可以基于组件去实现,通用性会高点,不过实时性不高,更适合用于重试业务场景。当然Redis本身并不支持延迟队列,所以我们只是实现一个比较简单的延迟队列,而且Redis不太适合大量消息堆积,所以只适合比较简单的场景,然假如我们对消息的实时性以及可靠性要求非常高,可能就需要使用MQ或kafka来实现了。
消息延迟流程图如下:

Redis延迟队列可以通过 zset 来实现,因为 zset 中有一个 score,我们可以把时间作为 score,将 value 存到 redis 中,然后通过轮询的方式,去不断的读取消息出来,整体思路为:
- 消息体设置有效期,设置好score,然后放入zset中
- 通过排名拉取消息
- 有效期到了,就把当前消息从zset中移除
zadd命令
使用方式:ZADD key score member [[score member][score member] …]
将一个或多个 member 元素及其 score 值加入到有序集 key 当中。如果 key 不存在,则创建一个空的有序集并执行 ZADD 操作。如果某个 member 已经是有序集的成员,那么更新这个 member 的 score 值,并通过重新插入这个 member 元素,来保证该 member 在正确的位置上。score 值可以是整数值或双精度浮点数。
ZRANGEBYSCORE命令
使用方式:ZRANGEBYSCORE key min max [WITHSCORES] [LIMIT offset count]
- 返回有序集 key 中,所有 score 值介于 min 和 max 之间(包括等于 min 或 max )的成员。有序集成员按 score 值递增(从小到大)次序排列。
- 具有相同 score 值的成员按字典序来排列
- 可选的 LIMIT 参数指定返回结果的数量及区间(就像SQL中的 SELECT LIMIT offset, count ),注意当 offset 很大时,定位 offset 的操作可能需要遍历整个有序集,此过程最坏复杂度为 O(N) 时间。
- 可选的 WITHSCORES 参数决定结果集是单单返回有序集的成员,还是将有序集成员及其 score 值一起返回。
ZREM命令
使用方式:ZREM key member [member …]
移除有序集 key 中的一个或多个成员,不存在的成员将被忽略。
当 key 存在但不是有序集类型时,返回一个错误。
3. 基于springboot实现redis延迟队列
3.1 引入依赖
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId><version>${version}</version>
</dependency>
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId><version>${version}</version>
</dependency>
3.2 redis基础方法
定义RedisService基础服务方法,本次案例只涉及到以下三个基础方法:
/*** 添加 ZSet 元素** @param key* @param value* @param score*/@Overridepublic boolean add(String key, Object value, double score) {return redisTemplate.opsForZSet().add(key, value, score);}/*** 返回 分数范围内 指定 count 数量的元素集合, 并且从 offset 下标开始(从小到大,不带分数的集合)** @param key* @param min* @param max* @param offset 从指定下标开始* @param count 输出指定元素数量* @return*/@Overridepublic Set<Object> rangeByScore(String key, double min, double max, long offset, long count) {return redisTemplate.opsForZSet().rangeByScore(key, min, max, offset, count);}/*** Zset 删除一个或多个元素** @param key* @param values* @return*/@Overridepublic Long removeZset(String key, Object... values) {return redisTemplate.opsForZSet().remove(key, values);}
3.3 定义Spring消息事件推送
@Getter
@ToString
public class DelayMsg extends ApplicationEvent {private String msg;private String topic;public DelayMsg(Object source, String msg, String topic) {super(source);this.msg = msg;this.topic = topic;}
}
3.4 消息获取
定义redis获取延迟队列消息方法:
/*** 从zset中取出score小于当前时间戳的数据** @param key* @return*/
public String getDelayOne(String key) {//先查后删,一次拿3个做备选,这样抢占到的概率就会高一些Set<Object> sets = redisService.rangeByScore(key, 0, System.currentTimeMillis(), 0, 3);if (CollectionUtils.isEmpty(sets)) {return null;}for (Object val : sets) {if (1L.equals(redisService.removeZset(key, val))) {// 删除成功,表示抢占到return val.toString();}}return null;
}
这里每次查询时取了三个数据,然后遍历获取到的数据,依次尝试去删除,若删除成功,则表示当前实例抢占到了这个消息
- 为什么这样设计? 这里有两个点,先解释第一个,为啥先查后删
如果我们按照正常的实现流程,每次从zset中取一个,但是无法保证这个时候就只有我一个人拿到了这个数据,在多实例的场景下,可能存在多个实例同时拿到了它,那么如何才能表示只有一个实例抢占到呢?
借助redis的单线程机制,只可能有一个实例会删除成功,所以拿到并删除成功的那个小伙伴,就是最终的幸运儿;
因此实现细节就是先查,后删,若删除成功,表示获取成功;否则表示被其他的实例捷足先登。
- 接下来再看第二个,为啥一次拿三个
从上面的分析可以看出,如果我一次只拿一个,那么我抢占到的几率并不太大,特别是当实例比较多时,可能会做多次的无效操作;为了减少这个可能性,所以我一次多拿几个做备选,这样抢占到的概率就会高一些,至于为什么是3,这个就看实际的实例与定时任务的执行间隔了。
上面定义了如何获取延迟队列中已到期的消息,接下来需要定时轮训获取消息:
/*** 每5s定时轮训消息*/
@Scheduled(fixedRate = 5000)
public void schedule() {for (String specialTopic : topic) {String msg = redisDelayQueue.getDelayOne(specialTopic);logger.info("开始轮训获取消息 {}", msg);if (StringUtil.isNotEmpty(msg)) {//使用Spring推送事件处理applicationContext.publishEvent(new DelayMsg(this, msg, specialTopic));}}
}
上面的定时任务,直接借助Spring的@Schedule来实现,遍历所有的topic,捞出数据之后,通过spring的 event/listener事件机制来实现消息处理的解耦
3.5 定义消费者注解和切面处理
@Target({ElementType.METHOD, ElementType.ANNOTATION_TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@EventListener
public @interface Consumer {String topic();
}
注意这个注解上面还有 @EventListener,表明它可以监听的spring的事件
3.6 定义延时业务的切面处理
@Aspect
@Component
public class ConsumerAspect {@Around("@annotation(consumer)")public Object around(ProceedingJoinPoint joinPoint, Consumer consumer) throws Throwable {Object[] args = joinPoint.getArgs();boolean check = false;for (Object obj : args) {if (obj instanceof DelayMsg) {check = consumer.topic().equals(((DelayMsg) obj).getTopic());}}if (!check) {// 不满足条件,直接忽略return null;}// topic匹配成功,执行return joinPoint.proceed();}
}
3.7 消息监听
//使用自定义的consumer注解监听topic延迟队列@Consumer(topic = RedisKeyConstant.DELAY_QUEUE)public void consumer(DelayMsg delayMsg) {logger.info("预约单延时确认: " + delayMsg.getMsg() + " at:" + System.currentTimeMillis());//延迟业务具体实现//...//...}
3.8 业务facade层调用延迟处理
经过以上的延迟队列封装处理,在facade层,也就是我们的业务中就可以直接调用:
@Autowired
private DelayListWrapper delayListWrapper;
...
delayListWrapper.publish(RedisKeyConstant.DELAY_QUEUE, xxxId, xxx);
4 总结
本文以redis的zset来实现延时队列,并基于SpringBoot实现了延迟队列的推送和消费。
相关文章:
redis — 基于Spring Boot实现redis延迟队列
1. 业务场景 延时队列场景在我们日常业务开发中经常遇到,它是一种特殊类型的消息队列,它允许把消息发送到队列中,但不立即投递给消费者,而是在一定时间后再将消息投递给消费者。延迟队列的常见使用场景有以下几种: 在…...
【日常积累】Linux之init系统学习
init系统简介: Linux 操作系统的启动首先从 BIOS 开始,接下来进入 boot loader,由 bootloader 载入内核,进行内核初始化。内核初始化的最后一步就是启动 pid 为 1 的 init 进程,这个进程是系统的第一个进程,它负责产生…...
Python功能制作之3D方块
介绍 用python写一个黑窗口,窗口里面有一个白色的3D方块,左键按下后移动可以旋转以各个视角来看方块。 当然有需要的话,可以自己在代码中去更改颜色,直接通过RBG的参数进行更改即可。 做了两个函数:init[初始化]和d…...
【0基础入门Python笔记】二、python 之逻辑运算和制流程语句
二、python 之逻辑运算和制流程语句 逻辑运算控制流程语句条件语句(if语句)循环结构(for循环、while循环)continue、break和pass关键字控制流程语句的嵌套以及elif 逻辑运算 Python提供基本的逻辑运算:不仅包括布尔运…...
python中的svm:介绍和基本使用方法
python中的svm:介绍和基本使用方法 支持向量机(Support Vector Machine,简称SVM)是一种常用的分类算法,可以用于解决分类和回归问题。SVM通过构建一个超平面,将不同类别的数据分隔开,使得正负样…...
typedef
t y p e d e f typedef typedef 声明,简称typedef,是创建现有类型的新名字。 比如: #include <bits/stdc.h> using namespace std; typedef long long ll; int main() {ll n;scanf("%lld",&n);printf("%lld"…...
校园跑腿市场行情分析
随着社会的发展和人们生活节奏的加快,校园跑腿市场逐渐兴起并呈现出蓬勃发展的态势。在这个快节奏的时代,越来越多的学生需要在繁忙的学业之外完成各种任务,而校园跑腿服务正是应运而生,为他们提供了便利和时效。本文将从需求方面…...
微服务相关面试题
👏作者简介:大家好,我是爱写博客的嗯哼,爱好Java的小菜坤 🔥如果感觉博主的文章还不错的话,请👍三连支持👍一下博主哦 📝社区论坛:希望大家能加入社区共同进步…...
前端-ES6
let 和 const 为了解决var的作用域的问题,而且var 有变量提升,会出现全局污染的问题 let 块状作用域,并且不能重复声明const 一般用于声明常量,一旦被声明无法修改,但是const 可以声明一个对象,对象内部的…...
169. 多数元素(摩尔投票法) 题解
题目描述:169. 多数元素 - 力扣(LeetCode) 给定一个大小为 n 的数组 nums ,返回其中的多数元素。多数元素是指在数组中出现次数 大于 ⌊ n/2 ⌋ 的元素。 你可以假设数组是非空的,并且给定的数组总是存在多数元素。 示…...
python中的cnn:介绍和基本使用方法
python中的cnn:介绍和基本使用方法 卷积神经网络(Convolutional Neural Networks,简称CNN)是一种在图像识别、语音识别、自然语言处理等许多领域取得显著成功的深度学习模型。CNN的设计灵感来源于生物的视觉系统,由多…...
Dockerfile概念、镜像原理、制作及案例讲解
1.Docker镜像原理 Linux文件操作系统讲解 2.镜像如何制作 3.Dockerfile概念 Docker网址:https://hub.docker.com 3.1 Dockerfile关键字 4.案例...
07-微信小程序-注册页面-模块化
07-微信小程序-注册页面 文章目录 注册页面使用 Page 构造器注册页面参数Object初始数据案例代码 生命周期回调函数组件事件处理函数setData()案例代码 生命周期模块化 注册页面 对于小程序中的每个页面,都需要在页面对应的 js 文件中进行注册,指定页面…...
考研算法第46天: 字符串转换整数 【字符串,模拟】
题目前置知识 c中的string判空 string Count; Count.empty(); //正确 Count ! null; //错误c中最大最小宏 #include <limits.h>INT_MAX INT_MIN 字符串使用发运算将字符加到字符串末尾 string Count; string str "liuda"; Count str[i]; 题目概况 AC代码…...
Cesium for unity 1.5.0使用注意事项
Cesium for Unity Quickstart – Cesium 1.Unity版本仅支持Unity2021.3.2f1以后版 2.仅支持 3D (URP)和3D (HDRP)渲染管线 3.如果Package Manager中不出现My Registries选项,请在 Edit > Project Settings...>Package Manager中重命名或删除重新添加Packag…...
初阶C语言-结构体
🌞 “少年有梦不至于心动,更要付诸行动。” 今天我们一起学习一下结构体的相关内容! 结构体 🎈1.结构体的声明1.1结构的基础知识1.2结构的声明1.3结构成员的类型1.4结构体变量的定义和初始化 🎈2.结构体成员的访问2.1结…...
Android Studio实现解析HTML获取图片URL,将URL存到list,进行瀑布流展示
目录 效果展示build.gradle(app)添加的依赖(用不上的可以不加)AndroidManifest.xml错误代码activity_main.xmlitem_image.xmlMainActivityImage适配器ImageModel 接收图片URL效果展示 build.gradle(app)添加的依赖(用不上的可以不加) dependencies {implementation co…...
java学习004
常用数据结构对应 php中常用的数据结构是Array数组,相对的在java开发中常用的数据结构是ArrayList和HashMap,它们可以看成是array的拆分,一种简单的对应关系为 PHPJAVAArray: array(1,2,3)ArrayListlArray: array(“name” > “jack”,“…...
Linux网络编程:网络基础
文章目录: 1.协议 2.锁 3.网络层次模型 4.以太网帧和ARP协议 5.IP协议 6.UDP协议 7.TCP协议 8.BS模式和CS模式 9.网络套接字(socket) 10.网络字节序 11.IP地址转换函数 12.sockaddr地址结构 学习Linux的网络编程原则上基于:Linux的系统编程…...
3D沉浸式旅游网站开发案例复盘【Three.js】
Plongez dans Lyon网站终于上线了。 我们与 Danka 团队和 Nico Icecream 共同努力,打造了一个令我们特别自豪的流畅的沉浸式网站。 这个网站是专为 ONLYON Tourism 和会议而建,旨在展示里昂最具标志性的活动场所。观看简短的介绍视频后,用户…...
React 第五十五节 Router 中 useAsyncError的使用详解
前言 useAsyncError 是 React Router v6.4 引入的一个钩子,用于处理异步操作(如数据加载)中的错误。下面我将详细解释其用途并提供代码示例。 一、useAsyncError 用途 处理异步错误:捕获在 loader 或 action 中发生的异步错误替…...
大话软工笔记—需求分析概述
需求分析,就是要对需求调研收集到的资料信息逐个地进行拆分、研究,从大量的不确定“需求”中确定出哪些需求最终要转换为确定的“功能需求”。 需求分析的作用非常重要,后续设计的依据主要来自于需求分析的成果,包括: 项目的目的…...
Debian系统简介
目录 Debian系统介绍 Debian版本介绍 Debian软件源介绍 软件包管理工具dpkg dpkg核心指令详解 安装软件包 卸载软件包 查询软件包状态 验证软件包完整性 手动处理依赖关系 dpkg vs apt Debian系统介绍 Debian 和 Ubuntu 都是基于 Debian内核 的 Linux 发行版ÿ…...
前端导出带有合并单元格的列表
// 导出async function exportExcel(fileName "共识调整.xlsx") {// 所有数据const exportData await getAllMainData();// 表头内容let fitstTitleList [];const secondTitleList [];allColumns.value.forEach(column > {if (!column.children) {fitstTitleL…...
STM32标准库-DMA直接存储器存取
文章目录 一、DMA1.1简介1.2存储器映像1.3DMA框图1.4DMA基本结构1.5DMA请求1.6数据宽度与对齐1.7数据转运DMA1.8ADC扫描模式DMA 二、数据转运DMA2.1接线图2.2代码2.3相关API 一、DMA 1.1简介 DMA(Direct Memory Access)直接存储器存取 DMA可以提供外设…...
如何在看板中有效管理突发紧急任务
在看板中有效管理突发紧急任务需要:设立专门的紧急任务通道、重新调整任务优先级、保持适度的WIP(Work-in-Progress)弹性、优化任务处理流程、提高团队应对突发情况的敏捷性。其中,设立专门的紧急任务通道尤为重要,这能…...
屋顶变身“发电站” ,中天合创屋面分布式光伏发电项目顺利并网!
5月28日,中天合创屋面分布式光伏发电项目顺利并网发电,该项目位于内蒙古自治区鄂尔多斯市乌审旗,项目利用中天合创聚乙烯、聚丙烯仓库屋面作为场地建设光伏电站,总装机容量为9.96MWp。 项目投运后,每年可节约标煤3670…...
优选算法第十二讲:队列 + 宽搜 优先级队列
优选算法第十二讲:队列 宽搜 && 优先级队列 1.N叉树的层序遍历2.二叉树的锯齿型层序遍历3.二叉树最大宽度4.在每个树行中找最大值5.优先级队列 -- 最后一块石头的重量6.数据流中的第K大元素7.前K个高频单词8.数据流的中位数 1.N叉树的层序遍历 2.二叉树的锯…...
Java求职者面试指南:Spring、Spring Boot、MyBatis框架与计算机基础问题解析
Java求职者面试指南:Spring、Spring Boot、MyBatis框架与计算机基础问题解析 一、第一轮提问(基础概念问题) 1. 请解释Spring框架的核心容器是什么?它在Spring中起到什么作用? Spring框架的核心容器是IoC容器&#…...
Netty从入门到进阶(二)
二、Netty入门 1. 概述 1.1 Netty是什么 Netty is an asynchronous event-driven network application framework for rapid development of maintainable high performance protocol servers & clients. Netty是一个异步的、基于事件驱动的网络应用框架,用于…...
