当前位置: 首页 > news >正文

redis — 基于Spring Boot实现redis延迟队列

1. 业务场景

延时队列场景在我们日常业务开发中经常遇到,它是一种特殊类型的消息队列,它允许把消息发送到队列中,但不立即投递给消费者,而是在一定时间后再将消息投递给消费者。延迟队列的常见使用场景有以下几种:

  • 在各种购物平台上下单,订单超过30分钟未支付,自动关闭。
  • 订单完成后, 如果用户一直未评价, 5天后自动好评。
  • 会员到期前15天, 到期前3天分别发送短信提醒。
  • 当订单一直处于未支付状态时,如何及时的关闭订单,并退还库存?
  • 如何定期检查处于退款状态的订单是否已经退款成功?

2. Redis延迟队列实现原理

目前延迟队列的类型主要实现有:

  • 基于消息的延迟:指为每条消息设置不同的延迟时间,那么每当队列中有新消息进入的时候就会重新根据延迟时间排序,或者定义时间轮,新消息落在指定位置;
  • 基于队列的延迟: 设置不同延迟级别的队列,比如5s、1min、30mins、1h等,每个队列中消息的延迟时间都是相同的。

基于第一种不少组件都有实现方案,比如redis的sortset间接实现,kafka内部时间轮,rabbitMQ可安装插件实现。第一种实时性高,不过主观看会比较依赖组件本身,但自己实现就得考虑持久化、高可用等问题,建议直接使用组件本身;第二种方案可以基于组件去实现,通用性会高点,不过实时性不高,更适合用于重试业务场景。当然Redis本身并不支持延迟队列,所以我们只是实现一个比较简单的延迟队列,而且Redis不太适合大量消息堆积,所以只适合比较简单的场景,然假如我们对消息的实时性以及可靠性要求非常高,可能就需要使用MQ或kafka来实现了。

消息延迟流程图如下:
在这里插入图片描述
Redis延迟队列可以通过 zset 来实现,因为 zset 中有一个 score,我们可以把时间作为 score,将 value 存到 redis 中,然后通过轮询的方式,去不断的读取消息出来,整体思路为:

  1. 消息体设置有效期,设置好score,然后放入zset中
  2. 通过排名拉取消息
  3. 有效期到了,就把当前消息从zset中移除

zadd命令

使用方式:ZADD key score member [[score member][score member] …]
将一个或多个 member 元素及其 score 值加入到有序集 key 当中。如果 key 不存在,则创建一个空的有序集并执行 ZADD 操作。如果某个 member 已经是有序集的成员,那么更新这个 member 的 score 值,并通过重新插入这个 member 元素,来保证该 member 在正确的位置上。score 值可以是整数值或双精度浮点数。

ZRANGEBYSCORE命令

使用方式:ZRANGEBYSCORE key min max [WITHSCORES] [LIMIT offset count]

  1. 返回有序集 key 中,所有 score 值介于 min 和 max 之间(包括等于 min 或 max )的成员。有序集成员按 score 值递增(从小到大)次序排列。
  2. 具有相同 score 值的成员按字典序来排列
  3. 可选的 LIMIT 参数指定返回结果的数量及区间(就像SQL中的 SELECT LIMIT offset, count ),注意当 offset 很大时,定位 offset 的操作可能需要遍历整个有序集,此过程最坏复杂度为 O(N) 时间。
  4. 可选的 WITHSCORES 参数决定结果集是单单返回有序集的成员,还是将有序集成员及其 score 值一起返回。

ZREM命令

使用方式:ZREM key member [member …]
移除有序集 key 中的一个或多个成员,不存在的成员将被忽略。
当 key 存在但不是有序集类型时,返回一个错误。

3. 基于springboot实现redis延迟队列

3.1 引入依赖

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId><version>${version}</version>
</dependency>
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId><version>${version}</version>
</dependency>

3.2 redis基础方法

定义RedisService基础服务方法,本次案例只涉及到以下三个基础方法:

    /*** 添加 ZSet 元素** @param key* @param value* @param score*/@Overridepublic boolean add(String key, Object value, double score) {return redisTemplate.opsForZSet().add(key, value, score);}/*** 返回 分数范围内 指定 count 数量的元素集合, 并且从 offset 下标开始(从小到大,不带分数的集合)** @param key* @param min* @param max* @param offset 从指定下标开始* @param count  输出指定元素数量* @return*/@Overridepublic Set<Object> rangeByScore(String key, double min, double max, long offset, long count) {return redisTemplate.opsForZSet().rangeByScore(key, min, max, offset, count);}/*** Zset 删除一个或多个元素** @param key* @param values* @return*/@Overridepublic Long removeZset(String key, Object... values) {return redisTemplate.opsForZSet().remove(key, values);}

3.3 定义Spring消息事件推送

@Getter
@ToString
public class DelayMsg extends ApplicationEvent {private String msg;private String topic;public DelayMsg(Object source, String msg, String topic) {super(source);this.msg = msg;this.topic = topic;}
}

3.4 消息获取

定义redis获取延迟队列消息方法:

/*** 从zset中取出score小于当前时间戳的数据** @param key* @return*/
public String getDelayOne(String key) {//先查后删,一次拿3个做备选,这样抢占到的概率就会高一些Set<Object> sets = redisService.rangeByScore(key, 0, System.currentTimeMillis(), 0, 3);if (CollectionUtils.isEmpty(sets)) {return null;}for (Object val : sets) {if (1L.equals(redisService.removeZset(key, val))) {// 删除成功,表示抢占到return val.toString();}}return null;
}

这里每次查询时取了三个数据,然后遍历获取到的数据,依次尝试去删除,若删除成功,则表示当前实例抢占到了这个消息

  1. 为什么这样设计? 这里有两个点,先解释第一个,为啥先查后删

如果我们按照正常的实现流程,每次从zset中取一个,但是无法保证这个时候就只有我一个人拿到了这个数据,在多实例的场景下,可能存在多个实例同时拿到了它,那么如何才能表示只有一个实例抢占到呢?

借助redis的单线程机制,只可能有一个实例会删除成功,所以拿到并删除成功的那个小伙伴,就是最终的幸运儿;

因此实现细节就是先查,后删,若删除成功,表示获取成功;否则表示被其他的实例捷足先登。

  1. 接下来再看第二个,为啥一次拿三个

从上面的分析可以看出,如果我一次只拿一个,那么我抢占到的几率并不太大,特别是当实例比较多时,可能会做多次的无效操作;为了减少这个可能性,所以我一次多拿几个做备选,这样抢占到的概率就会高一些,至于为什么是3,这个就看实际的实例与定时任务的执行间隔了。

上面定义了如何获取延迟队列中已到期的消息,接下来需要定时轮训获取消息:

/*** 每5s定时轮训消息*/
@Scheduled(fixedRate = 5000)
public void schedule() {for (String specialTopic : topic) {String msg = redisDelayQueue.getDelayOne(specialTopic);logger.info("开始轮训获取消息 {}", msg);if (StringUtil.isNotEmpty(msg)) {//使用Spring推送事件处理applicationContext.publishEvent(new DelayMsg(this, msg, specialTopic));}}
}

上面的定时任务,直接借助Spring的@Schedule来实现,遍历所有的topic,捞出数据之后,通过spring的 event/listener事件机制来实现消息处理的解耦

3.5 定义消费者注解和切面处理

@Target({ElementType.METHOD, ElementType.ANNOTATION_TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@EventListener
public @interface Consumer {String topic();
}

注意这个注解上面还有 @EventListener,表明它可以监听的spring的事件

3.6 定义延时业务的切面处理

@Aspect
@Component
public class ConsumerAspect {@Around("@annotation(consumer)")public Object around(ProceedingJoinPoint joinPoint, Consumer consumer) throws Throwable {Object[] args = joinPoint.getArgs();boolean check = false;for (Object obj : args) {if (obj instanceof DelayMsg) {check = consumer.topic().equals(((DelayMsg) obj).getTopic());}}if (!check) {// 不满足条件,直接忽略return null;}// topic匹配成功,执行return joinPoint.proceed();}
}

3.7 消息监听

	//使用自定义的consumer注解监听topic延迟队列@Consumer(topic = RedisKeyConstant.DELAY_QUEUE)public void consumer(DelayMsg delayMsg) {logger.info("预约单延时确认: " + delayMsg.getMsg() + " at:" + System.currentTimeMillis());//延迟业务具体实现//...//...}

3.8 业务facade层调用延迟处理

经过以上的延迟队列封装处理,在facade层,也就是我们的业务中就可以直接调用:

@Autowired
private DelayListWrapper delayListWrapper;
...
delayListWrapper.publish(RedisKeyConstant.DELAY_QUEUE, xxxId, xxx);

4 总结

本文以redis的zset来实现延时队列,并基于SpringBoot实现了延迟队列的推送和消费。

相关文章:

redis — 基于Spring Boot实现redis延迟队列

1. 业务场景 延时队列场景在我们日常业务开发中经常遇到&#xff0c;它是一种特殊类型的消息队列&#xff0c;它允许把消息发送到队列中&#xff0c;但不立即投递给消费者&#xff0c;而是在一定时间后再将消息投递给消费者。延迟队列的常见使用场景有以下几种&#xff1a; 在…...

【日常积累】Linux之init系统学习

init系统简介: Linux 操作系统的启动首先从 BIOS 开始&#xff0c;接下来进入 boot loader&#xff0c;由 bootloader 载入内核&#xff0c;进行内核初始化。内核初始化的最后一步就是启动 pid 为 1 的 init 进程&#xff0c;这个进程是系统的第一个进程&#xff0c;它负责产生…...

Python功能制作之3D方块

介绍 用python写一个黑窗口&#xff0c;窗口里面有一个白色的3D方块&#xff0c;左键按下后移动可以旋转以各个视角来看方块。 当然有需要的话&#xff0c;可以自己在代码中去更改颜色&#xff0c;直接通过RBG的参数进行更改即可。 做了两个函数&#xff1a;init[初始化]和d…...

【0基础入门Python笔记】二、python 之逻辑运算和制流程语句

二、python 之逻辑运算和制流程语句 逻辑运算控制流程语句条件语句&#xff08;if语句&#xff09;循环结构&#xff08;for循环、while循环&#xff09;continue、break和pass关键字控制流程语句的嵌套以及elif 逻辑运算 Python提供基本的逻辑运算&#xff1a;不仅包括布尔运…...

python中的svm:介绍和基本使用方法

python中的svm&#xff1a;介绍和基本使用方法 支持向量机&#xff08;Support Vector Machine&#xff0c;简称SVM&#xff09;是一种常用的分类算法&#xff0c;可以用于解决分类和回归问题。SVM通过构建一个超平面&#xff0c;将不同类别的数据分隔开&#xff0c;使得正负样…...

typedef

t y p e d e f typedef typedef 声明&#xff0c;简称typedef&#xff0c;是创建现有类型的新名字。 比如&#xff1a; #include <bits/stdc.h> using namespace std; typedef long long ll; int main() {ll n;scanf("%lld",&n);printf("%lld"…...

校园跑腿市场行情分析

随着社会的发展和人们生活节奏的加快&#xff0c;校园跑腿市场逐渐兴起并呈现出蓬勃发展的态势。在这个快节奏的时代&#xff0c;越来越多的学生需要在繁忙的学业之外完成各种任务&#xff0c;而校园跑腿服务正是应运而生&#xff0c;为他们提供了便利和时效。本文将从需求方面…...

微服务相关面试题

&#x1f44f;作者简介&#xff1a;大家好&#xff0c;我是爱写博客的嗯哼&#xff0c;爱好Java的小菜坤 &#x1f525;如果感觉博主的文章还不错的话&#xff0c;请&#x1f44d;三连支持&#x1f44d;一下博主哦 &#x1f4dd;社区论坛&#xff1a;希望大家能加入社区共同进步…...

前端-ES6

let 和 const 为了解决var的作用域的问题&#xff0c;而且var 有变量提升&#xff0c;会出现全局污染的问题 let 块状作用域&#xff0c;并且不能重复声明const 一般用于声明常量&#xff0c;一旦被声明无法修改&#xff0c;但是const 可以声明一个对象&#xff0c;对象内部的…...

169. 多数元素(摩尔投票法) 题解

题目描述&#xff1a;169. 多数元素 - 力扣&#xff08;LeetCode&#xff09; 给定一个大小为 n 的数组 nums &#xff0c;返回其中的多数元素。多数元素是指在数组中出现次数 大于 ⌊ n/2 ⌋ 的元素。 你可以假设数组是非空的&#xff0c;并且给定的数组总是存在多数元素。 示…...

python中的cnn:介绍和基本使用方法

python中的cnn&#xff1a;介绍和基本使用方法 卷积神经网络&#xff08;Convolutional Neural Networks&#xff0c;简称CNN&#xff09;是一种在图像识别、语音识别、自然语言处理等许多领域取得显著成功的深度学习模型。CNN的设计灵感来源于生物的视觉系统&#xff0c;由多…...

Dockerfile概念、镜像原理、制作及案例讲解

1.Docker镜像原理 Linux文件操作系统讲解 2.镜像如何制作 3.Dockerfile概念 Docker网址&#xff1a;https://hub.docker.com 3.1 Dockerfile关键字 4.案例...

07-微信小程序-注册页面-模块化

07-微信小程序-注册页面 文章目录 注册页面使用 Page 构造器注册页面参数Object初始数据案例代码 生命周期回调函数组件事件处理函数setData()案例代码 生命周期模块化 注册页面 对于小程序中的每个页面&#xff0c;都需要在页面对应的 js 文件中进行注册&#xff0c;指定页面…...

考研算法第46天: 字符串转换整数 【字符串,模拟】

题目前置知识 c中的string判空 string Count; Count.empty(); //正确 Count ! null; //错误c中最大最小宏 #include <limits.h>INT_MAX INT_MIN 字符串使用发运算将字符加到字符串末尾 string Count; string str "liuda"; Count str[i]; 题目概况 AC代码…...

Cesium for unity 1.5.0使用注意事项

Cesium for Unity Quickstart – Cesium 1.Unity版本仅支持Unity2021.3.2f1以后版 2.仅支持 3D (URP)和3D (HDRP)渲染管线 3.如果Package Manager中不出现My Registries选项&#xff0c;请在 Edit > Project Settings...>Package Manager中重命名或删除重新添加Packag…...

初阶C语言-结构体

&#x1f31e; “少年有梦不至于心动&#xff0c;更要付诸行动。” 今天我们一起学习一下结构体的相关内容&#xff01; 结构体 &#x1f388;1.结构体的声明1.1结构的基础知识1.2结构的声明1.3结构成员的类型1.4结构体变量的定义和初始化 &#x1f388;2.结构体成员的访问2.1结…...

Android Studio实现解析HTML获取图片URL,将URL存到list,进行瀑布流展示

目录 效果展示build.gradle(app)添加的依赖(用不上的可以不加)AndroidManifest.xml错误代码activity_main.xmlitem_image.xmlMainActivityImage适配器ImageModel 接收图片URL效果展示 build.gradle(app)添加的依赖(用不上的可以不加) dependencies {implementation co…...

java学习004

常用数据结构对应 php中常用的数据结构是Array数组&#xff0c;相对的在java开发中常用的数据结构是ArrayList和HashMap&#xff0c;它们可以看成是array的拆分&#xff0c;一种简单的对应关系为 PHPJAVAArray: array(1,2,3)ArrayListlArray: array(“name” > “jack”,“…...

Linux网络编程:网络基础

文章目录&#xff1a; 1.协议 2.锁 3.网络层次模型 4.以太网帧和ARP协议 5.IP协议 6.UDP协议 7.TCP协议 8.BS模式和CS模式 9.网络套接字(socket) 10.网络字节序 11.IP地址转换函数 12.sockaddr地址结构 学习Linux的网络编程原则上基于&#xff1a;Linux的系统编程…...

3D沉浸式旅游网站开发案例复盘【Three.js】

Plongez dans Lyon网站终于上线了。 我们与 Danka 团队和 Nico Icecream 共同努力&#xff0c;打造了一个令我们特别自豪的流畅的沉浸式网站。 这个网站是专为 ONLYON Tourism 和会议而建&#xff0c;旨在展示里昂最具标志性的活动场所。观看简短的介绍视频后&#xff0c;用户…...

IO的几个模型

I/O模型名词介绍 说到I/O模型&#xff0c;都会牵扯到同步、异步、阻塞、非阻塞这几个词&#xff0c;以下讲解这几个词的概念。 阻塞和非阻塞 阻塞和非阻塞指的是一直等还是可以去做其他事。 阻塞&#xff08;blocking&#xff09;&#xff1a;调用结果返回之前&#xff0c;…...

中路对线发现正在攻防演练中投毒的红队大佬

背景 2023年8月14日晚&#xff0c;墨菲安全实验室发布《首起针对国内金融企业的开源组件投毒攻击事件》NPM投毒事件分析文章&#xff0c;紧接着我们在8月17日监控到一个新的npm投毒组件包 hreport-preview&#xff0c;该投毒组件用来下载木马文件的域名地址竟然是 img.murphys…...

【LINUX相关】生成随机数(srand、/dev/random 和 /dev/urandom )

目录 一、问题背景二、修改方法2.1 修改种子2.2 使用linux中的 /dev/urandom 生成随机数 三、/dev/random 和 /dev/urandom 的原理3.1 参考连接3.2 重难点总结3.2.1 生成随机数的原理3.2.2 随机数生成器的结构3.2.3 二者的区别和选择 四、在代码的使用方法 一、问题背景 在一个…...

spark使用心得

spark入门 启停spark sbin/start-all.shsbin/stop-all.shspark-shell 进入spark/bin目录&#xff0c;执行&#xff1a; ./spark-shell 输出中有这么一行&#xff1a; Spark context Web UI available at http://xx.xx.xx.188:4040意味着我们可以从web页面查看spark的运行情…...

什么是边车

名词和概念定义 Sidecar&#xff1a;边车。微服务中数据平面的进程&#xff0c;负责转发应用、服务请求&#xff0c;并支持限流、熔断、负载均衡等特性。 Control-plane: 控制平面。微服务的配置中心&#xff0c;负责配置下发、数据搜集、服务发现等功能。 应用: 应用是指服务…...

vue项目打包成exe文件

1. 获取electron-quick-start demo git clone https://github.com/electron/electron-quick-start2. 安装依赖包 npm install 或 npm i // 安装依赖时可能会遇到node版本的问题&#xff0c;需要切换node版本的可以先看下nvm&#xff0c;简单易操作3. 打包项目&#xff08;需要…...

基于MFCC特征提取和GMM训练的语音信号识别matlab仿真

目录 1.算法运行效果图预览 2.算法运行软件版本 3.部分核心程序 4.算法理论概述 4.1 MFCC特征提取 4.2 Gaussian Mixture Model&#xff08;GMM&#xff09; 4.3. 实现过程 4.4 应用领域 5.算法完整程序工程 1.算法运行效果图预览 2.算法运行软件版本 matlab2022a 3…...

client-go实战之十二:选主(leader-election)

欢迎访问我的GitHub 这里分类和汇总了欣宸的全部原创(含配套源码)&#xff1a;https://github.com/zq2599/blog_demos 本篇概览 本文是《client-go实战》系列的第十二篇&#xff0c;又有一个精彩的知识点在本章呈现&#xff1a;选主(leader-election)在解释什么是选主之前&…...

2023年即将推出的CSS特性对你影响大不大?

Google开发者大会每年都会提出有关于 Web UI 和 CSS 方面的新特性&#xff0c;今年又上新了许多新功能&#xff0c;今天就从中找出了影响最大的几个功能给大家介绍一下 :has :has() 可以通过检查父元素是否包含特定子元素或这些子元素是否处于特定状态来改变样式&#xff0c;也…...

opencv实战项目-停车位计数

手势识别系列文章目录 手势识别是一种人机交互技术&#xff0c;通过识别人的手势动作&#xff0c;从而实现对计算机、智能手机、智能电视等设备的操作和控制。 1. opencv实现手部追踪&#xff08;定位手部关键点&#xff09; 2.opencv实战项目 实现手势跟踪并返回位置信息&a…...