当前位置: 首页 > news >正文

通过Redisson构建延时队列并实现注解式消费

目录

  • 一、序言
  • 二、延迟队列实现
    • 1、Redisson延时消息监听注解和消息体
    • 2、Redisson延时消息发布器
    • 3、Redisson延时消息监听处理器
  • 三、测试用例
  • 四、结语

一、序言

两个月前接了一个4万的私活,做一个线上商城小程序,在交易过程中不可避免的一个问题就是用户下单后的订单自动取消。

目前成熟的方案有通过RabbitMQ+死信队列RabbitMQ+延迟消息插件RocketMQ定时消息推送Redisson延时队列来实现。

考虑到商城的定位和用户体量,以及系统维护成本,其实完全没有必要引入消息中间件,借助Redis其实就可以轻松实现这个需求。

加上Redisson客户端本身就已经实现了很多分布式集合工具类,借助阻塞队列和延时队列就可轻松搞定。

当然,为了使用方便以及团队协作,顺便模仿@RabbitListener封装了一套基于注解的消息消费,废话不多说,直接上代码。


二、延迟队列实现

1、Redisson延时消息监听注解和消息体

延迟消息监听器定义:

/*** Redisson延时队列监听器** @author Nick Liu* @date 2024/11/13*/
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface RedissonDelayedQueueListener {/*** 队列名称* @return*/String queueName();
}

消息体定义:

@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class RedisDelayedMsgDTO {/*** 消息内容*/private String msg;/*** 队列名称*/private String queueName;/*** 延时时间*/private long delayTime;private TimeUnit timeUnit;
}

2、Redisson延时消息发布器

@Slf4j
@Component
@RequiredArgsConstructor
public class RedissonDelayedMsgPublisher {private final RedissonClient redissonClient;/*** 发布延时信息* @param delayedMsgDTO*/public void publishDelayedMsg(RedisDelayedMsgDTO delayedMsgDTO) {log.info("开始发布延迟消息: {}", FastJsonUtils.toJsonString(delayedMsgDTO));RBlockingQueue<String> blockingQueue = redissonClient.getBlockingQueue(delayedMsgDTO.getQueueName());RDelayedQueue<String> delayedQueue = redissonClient.getDelayedQueue(blockingQueue);delayedQueue.offer(delayedMsgDTO.getMsg(), delayedMsgDTO.getDelayTime(), delayedMsgDTO.getTimeUnit());}
}

这里我们借助RBlockingQueueRDelayedQueue来实现,只有当延迟消息快到期时,消费者才能从阻塞队列拉取到消息,否则消费者将一直阻塞。

3、Redisson延时消息监听处理器

这里我们定义了一个BeanPostProcessor 的实现,目的就是为了扫描Spring容器中所有带RedissonDelayedQueueListener注解的Bean实例和方法。

/*** Redisson延迟队列Bean后处理器* @author Nick Liu* @date 2025/1/3*/
@Slf4j
@Component
@RequiredArgsConstructor
public class RedissonDelayedQueuePostProcessor implements BeanPostProcessor {private final RedissonClient redissonClient;@Overridepublic Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {// 获取最终的目标运行时对象Class<?> clazz = AopProxyUtils.ultimateTargetClass(bean);Method[] methods = clazz.getDeclaredMethods();for (Method m : methods) {if (!m.isAnnotationPresent(RedissonDelayedQueueListener.class)) {continue;}// 如果Bean上的方法有Redisson队列监听注解,则启动一个线程监听队列RedissonDelayedQueueListener annotation = m.getAnnotation(RedissonDelayedQueueListener.class);CompletableFuture.runAsync(() -> {log.info("开始监听Redisson延时队列[{}]消息", annotation.queueName());while (true) {RBlockingQueue<String> blockingQueue = redissonClient.getBlockingQueue(annotation.queueName());redissonClient.getDelayedQueue(blockingQueue);try {String msg = blockingQueue.take();MDC.put(CommonConst.X_REQUEST_ID, SerialNoUtils.generateSimpleUUID());log.info("监听到队列[{}]延时消息: {}", annotation.queueName(), msg);m.invoke(bean, msg);MDC.remove(CommonConst.X_REQUEST_ID);} catch (Exception e) {log.error(e.getMessage(), e);}}});}return bean;}}

这里我们扫描到指定Bean的方法后,会开启一个异步线程,并轮询拉取延时消息,如果消息没过期,异步线程将会一直阻塞等待。


三、测试用例

/*** @author Nick Liu* @date 2025/2/2*/
@Slf4j
@RestController
@RequiredArgsConstructor
public class RedissonDelayedMsgController {private static final String DELAYED_QUEUE = "redisson:delayed:queue";private final RedissonDelayedMsgPublisher redissonDelayedMsgPublisher;@GetMapping("/delayed/msg")public ResponseEntity<RedisDelayedMsgDTO> publishDelayedMsg() {RedisDelayedMsgDTO redisDelayedMsgDTO = new RedisDelayedMsgDTO();redisDelayedMsgDTO.setQueueName(DELAYED_QUEUE);redisDelayedMsgDTO.setMsg("This is a delayed msg");redisDelayedMsgDTO.setDelayTime(10);redisDelayedMsgDTO.setTimeUnit(TimeUnit.SECONDS);redissonDelayedMsgPublisher.publishDelayedMsg(redisDelayedMsgDTO);return ResponseEntity.ok(redisDelayedMsgDTO);}@RedissonDelayedQueueListener(queueName = DELAYED_QUEUE)public void handleDelayedMsg(String msg) {log.info("Received delayed msg: {}", msg);}
}

启动服务后,Bean后处理器会启动异步线程监听延时消息,如下:

2025-02-02 16:46:04.271 INFO  [] [ForkJoinPool.commonPool-worker-2] [c.xlyj.common.message.RedissonDelayedQueuePostProcessor.lambda$postProcessAfterInitialization$0():44] - 开始监听Redisson延时队列[redisson:delayed:queue]消息

浏览器直接输入http://localhost:8000/delayed/msg发布延时消息,10s后消费者进行处理,如下:

2025-02-02 16:43:11.107 INFO  [e810d175b0e24e71a4b9e517366b4aa6] [ForkJoinPool.commonPool-worker-2] [c.xlyj.common.message.RedissonDelayedQueuePostProcessor.lambda$postProcessAfterInitialization$0():51] - 监听到队列[redisson:delayed:queue]延时消息: This is a delayed msg
2025-02-02 16:43:11.108 INFO  [e810d175b0e24e71a4b9e517366b4aa6] [ForkJoinPool.commonPool-worker-2] [com.xlyj.contoller.RedissonDelayedMsgController.handleDelayedMsg():40] - Received delayed msg: This is a delayed msg

四、结语

虽说通过Redisson实现的延迟队列也能实现支付订单的自动取消,但是可用性相比专业的消息中间件还是尚有不足的。

比如消息生产者发送消息没有确认机制,消息消费也没有确认机制,这两个环节都有可能导致消息丢失。

当然我们可以通过其它保障机制去补偿,比如再加上定时任务扫表,把扫描时间可以设置长一点,保证最终的一致性。

在大型项目中还是优先推荐专业的消息中间件去实现延时消息消费。

相关文章:

通过Redisson构建延时队列并实现注解式消费

目录 一、序言二、延迟队列实现1、Redisson延时消息监听注解和消息体2、Redisson延时消息发布器3、Redisson延时消息监听处理器 三、测试用例四、结语 一、序言 两个月前接了一个4万的私活&#xff0c;做一个线上商城小程序&#xff0c;在交易过程中不可避免的一个问题就是用户…...

SQL Server配置管理器无法连接到 WMI 提供程序

目录 第一步第二部 第一步 发现没有资源管理器 ​​​​ 在文件夹找到管理器 打开发现报这个错误 配置管理器无法连接到 WMI 提供程序第二部 https://blog.csdn.net/thb369208315/article/details/126954074...

Linux内核源码:ext4 extent详解

在 Linux 系统的庞大体系中&#xff0c;文件系统就像是一个井然有序的图书馆&#xff0c;而 ext4 文件系统则是这座图书馆中极为重要的 “藏书室”&#xff0c;它负责高效管理和存储数据。在 ext4 众多的奥秘中&#xff0c;ext4 extent 犹如一颗璀璨的明珠&#xff0c;起着关键…...

Maven jar 包下载失败问题处理

Maven jar 包下载失败问题处理 1.配置好国内的Maven源2.重新下载3. 其他问题 1.配置好国内的Maven源 打开⾃⼰的 Idea 检测 Maven 的配置是否正确&#xff0c;正确的配置如下图所示&#xff1a; 检查项⼀共有两个&#xff1a; 确认右边的两个勾已经选中&#xff0c;如果没有请…...

自指学习:AGI的元认知突破

文章目录 引言:从模式识别到认知革命一、自指学习的理论框架1.1 自指系统的数学定义1.2 认知架构的三重反射1.3 与传统元学习的本质区别二、元认知突破的技术路径2.1 自指神经网络架构2.2 认知效能评价体系2.3 知识表示的革命三、实现突破的关键挑战3.1 认知闭环的稳定性3.2 计…...

排序算法--希尔排序

希尔排序是插入排序的改进版本&#xff0c;适合中等规模数据排序&#xff0c;性能优于简单插入排序。 // 希尔排序函数 void shellSort(int arr[], int n) {// 初始间隔&#xff08;gap&#xff09;为数组长度的一半&#xff0c;逐步缩小for (int gap n / 2; gap > 0; gap …...

Java 2024年面试总结(持续更新)

目录 最近趁着金三银四面了五六家公司吧&#xff0c;也整理了一些问题供大家参考一下&#xff08;适合经验三年左右的&#xff09;。 面试问题&#xff08;答案是我自己总结的&#xff0c;不一定正确&#xff09;&#xff1a; 总结&#xff1a; 最近趁着金三银四面了五六家公…...

TensorFlow是个啥玩意?

TensorFlow是一个开源的机器学习框架&#xff0c;由Google开发。它可以帮助开发者构建和训练各种机器学习模型&#xff0c;包括神经网络和深度学习模型。TensorFlow的设计理念是使用数据流图来表示计算过程&#xff0c;其中节点表示数学运算&#xff0c;边表示数据流动。 Tens…...

不可信的搜索路径(CWE-426)

漏洞描述&#xff1a;程序使用关键资源时&#xff08;如动态链接库、执行文件、配置文件等&#xff09;没有明确的指定资源的路径&#xff0c;而是依赖操作系统去搜索资源&#xff0c;这种行为可能被攻击者利用&#xff0c;通过在搜索优先级较高的目录放置不良资源&#xff0c;…...

Linux——基础命令

$&#xff1a;普通用户 #&#xff1a;超级用户 cd 切换目录 cd 目录 &#xff08;进入目录&#xff09; cd ../ &#xff08;返回上一级目录&#xff09; cd ~ &#xff08;切换到当前用户的家目录&#xff09; cd - &#xff08;返回上次目录&#xff09; pwd 输出当前目…...

利用TensorFlow.js实现浏览器端机器学习:一个全面指南

引言 随着深度学习技术的不断发展&#xff0c;机器学习已从传统的服务器端运算逐渐转向了前端技术。TensorFlow.js 是 Google 推出的一个用于在浏览器中进行机器学习的开源库&#xff0c;它允许开发者在浏览器中直接运行机器学习模型&#xff0c;而无需依赖后端服务器。Tensor…...

利用HTML和css技术编写学校官网页面

目录 一&#xff0c;图例展示 二&#xff0c;代码说明 1&#xff0c;html部分&#xff1a; 【第一张图片】 【第二张图片】 【第三张图片】 2&#xff0c;css部分&#xff1a; 【第一张图片】 【第二张图片】 【第三张图片】 三&#xff0c;程序代码 一&#xff0c;…...

SpringSecurity密码编码器:使用BCrypt算法加密、自定义密码编码器

1、Spring Security 密码编码器 Spring Security 作为一个功能完备的安全性框架,一方面提供用于完成加密操作的 PasswordEncoder 组件,另一方面提供一个可以在应用程序中独立使用的密码模块。 1.1 PasswordEncoder 抽象接口 在 Spring Security 中,PasswordEncoder 接口代…...

笔记:新能源汽车零部件功率级测试怎么进行?

摘要:本文旨在梳理主机厂对新能源汽车核心零部件功率级测试需求,通过试验室的主流设备仪器集成,快速实现试验方案搭建,并体现测试测量方案的时效性、便捷性优势。目标是通过提升实现设备的有效集成能力、实现多设备测试过程的有效协同、流程化测试,可快速采集、分析当前数…...

ES6中的map和原生的对象有什么区别?

在 ES6 中&#xff0c;Map 和原生的对象&#xff08;Object&#xff09;都是用来存储键值对数据的集合&#xff0c;但它们有显著的区别。以下是它们之间的主要区别&#xff1a; 1. 键的类型 Object: 只允许使用字符串或符号作为键。其他类型的键&#xff08;如数字或对象&…...

2502vim,vim文本对象中文文档

介绍 文本块用户(textobj-user)是一个可帮助你毫不费力地创建自己的文本对象的Vim插件. 因为有许多陷阱需要处理,很难创建文本对象.此插件隐藏了此类细节,并提供了声明式定义文本对象的方法. 你可用正则式来定义简单的文本对象,或使用函数来定义复杂的文本对象.如… 文本对…...

spring security与gateway结合进行网关鉴权和授权

在Spring Cloud Gateway中集成Spring Security 6以实现鉴权和认证工作&#xff0c;可以在网关代理层完成权限校验和认证。这种架构通常被称为“边缘安全”或“API网关安全”&#xff0c;它允许你在请求到达后端服务之前进行集中式的安全控制。 以下是如何配置Spring Cloud Gat…...

LabVIEW在电机自动化生产线中的实时数据采集与生产过程监控

在电机自动化生产线中&#xff0c;实时数据采集与生产过程监控是确保生产效率和产品质量的重要环节。LabVIEW作为一种强大的图形化编程平台&#xff0c;可以有效实现数据采集、实时监控和自动化控制。详细探讨如何利用LabVIEW实现这一目标&#xff0c;包括硬件选择、软件架构设…...

log4j2日志配置文件

log4j2配置文件每个项目都会用到,记录一个比较好用的配置文件,方便以后使用时调取,日志输出级别为debug,也可以修改 <?xml version"1.0" encoding"UTF-8"?> <Configuration monitorInterval"180" packages""><prope…...

用Deepseek做EXCLE文件对比

背景是我想对比两个PO系统里的一个消息映射&#xff0c;EDI接口的mapping有多复杂懂的都懂&#xff0c;它还不支持跨系统版本对比&#xff0c;所以我费半天劲装NWDS&#xff0c;导出MM到excle&#xff0c;然后问题来了&#xff0c;我需要对比两个excel文件里的内容&#xff0c;…...

【数据分析】R版IntelliGenes用于生物标志物发现的可解释机器学习

禁止商业或二改转载&#xff0c;仅供自学使用&#xff0c;侵权必究&#xff0c;如需截取部分内容请后台联系作者! 文章目录 介绍流程步骤1. 输入数据2. 特征选择3. 模型训练4. I-Genes 评分计算5. 输出结果 IntelliGenesR 安装包1. 特征选择2. 模型训练和评估3. I-Genes 评分计…...

华硕a豆14 Air香氛版,美学与科技的馨香融合

在快节奏的现代生活中&#xff0c;我们渴望一个能激发创想、愉悦感官的工作与生活伙伴&#xff0c;它不仅是冰冷的科技工具&#xff0c;更能触动我们内心深处的细腻情感。正是在这样的期许下&#xff0c;华硕a豆14 Air香氛版翩然而至&#xff0c;它以一种前所未有的方式&#x…...

【分享】推荐一些办公小工具

1、PDF 在线转换 https://smallpdf.com/cn/pdf-tools 推荐理由&#xff1a;大部分的转换软件需要收费&#xff0c;要么功能不齐全&#xff0c;而开会员又用不了几次浪费钱&#xff0c;借用别人的又不安全。 这个网站它不需要登录或下载安装。而且提供的免费功能就能满足日常…...

深度学习之模型压缩三驾马车:模型剪枝、模型量化、知识蒸馏

一、引言 在深度学习中&#xff0c;我们训练出的神经网络往往非常庞大&#xff08;比如像 ResNet、YOLOv8、Vision Transformer&#xff09;&#xff0c;虽然精度很高&#xff0c;但“太重”了&#xff0c;运行起来很慢&#xff0c;占用内存大&#xff0c;不适合部署到手机、摄…...

Axure 下拉框联动

实现选省、选完省之后选对应省份下的市区...

rknn toolkit2搭建和推理

安装Miniconda Miniconda - Anaconda Miniconda 选择一个 新的 版本 &#xff0c;不用和RKNN的python版本保持一致 使用 ./xxx.sh进行安装 下面配置一下载源 # 清华大学源&#xff08;最常用&#xff09; conda config --add channels https://mirrors.tuna.tsinghua.edu.cn…...

智能职业发展系统:AI驱动的职业规划平台技术解析

智能职业发展系统&#xff1a;AI驱动的职业规划平台技术解析 引言&#xff1a;数字时代的职业革命 在当今瞬息万变的就业市场中&#xff0c;传统的职业规划方法已无法满足个人和企业的需求。据统计&#xff0c;全球每年有超过2亿人面临职业转型困境&#xff0c;而企业也因此遭…...

leetcode_69.x的平方根

题目如下 &#xff1a; 看到题 &#xff0c;我们最原始的想法就是暴力解决: for(long long i 0;i<INT_MAX;i){if(i*ix){return i;}else if((i*i>x)&&((i-1)*(i-1)<x)){return i-1;}}我们直接开始遍历&#xff0c;我们是整数的平方根&#xff0c;所以我们分两…...

2025-05-08-deepseek本地化部署

title: 2025-05-08-deepseek 本地化部署 tags: 深度学习 程序开发 2025-05-08-deepseek 本地化部署 参考博客 本地部署 DeepSeek&#xff1a;小白也能轻松搞定&#xff01; 如何给本地部署的 DeepSeek 投喂数据&#xff0c;让他更懂你 [实验目的]&#xff1a;理解系统架构与原…...

【Ftrace 专栏】Ftrace 参考博文

ftrace、perf、bcc、bpftrace、ply、simple_perf的使用Ftrace 基本用法Linux 利用 ftrace 分析内核调用如何利用ftrace精确跟踪特定进程调度信息使用 ftrace 进行追踪延迟Linux-培训笔记-ftracehttps://www.kernel.org/doc/html/v4.18/trace/events.htmlhttps://blog.csdn.net/…...