当前位置: 首页 > news >正文

通过Redisson构建延时队列并实现注解式消费

目录

  • 一、序言
  • 二、延迟队列实现
    • 1、Redisson延时消息监听注解和消息体
    • 2、Redisson延时消息发布器
    • 3、Redisson延时消息监听处理器
  • 三、测试用例
  • 四、结语

一、序言

两个月前接了一个4万的私活,做一个线上商城小程序,在交易过程中不可避免的一个问题就是用户下单后的订单自动取消。

目前成熟的方案有通过RabbitMQ+死信队列RabbitMQ+延迟消息插件RocketMQ定时消息推送Redisson延时队列来实现。

考虑到商城的定位和用户体量,以及系统维护成本,其实完全没有必要引入消息中间件,借助Redis其实就可以轻松实现这个需求。

加上Redisson客户端本身就已经实现了很多分布式集合工具类,借助阻塞队列和延时队列就可轻松搞定。

当然,为了使用方便以及团队协作,顺便模仿@RabbitListener封装了一套基于注解的消息消费,废话不多说,直接上代码。


二、延迟队列实现

1、Redisson延时消息监听注解和消息体

延迟消息监听器定义:

/*** Redisson延时队列监听器** @author Nick Liu* @date 2024/11/13*/
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface RedissonDelayedQueueListener {/*** 队列名称* @return*/String queueName();
}

消息体定义:

@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class RedisDelayedMsgDTO {/*** 消息内容*/private String msg;/*** 队列名称*/private String queueName;/*** 延时时间*/private long delayTime;private TimeUnit timeUnit;
}

2、Redisson延时消息发布器

@Slf4j
@Component
@RequiredArgsConstructor
public class RedissonDelayedMsgPublisher {private final RedissonClient redissonClient;/*** 发布延时信息* @param delayedMsgDTO*/public void publishDelayedMsg(RedisDelayedMsgDTO delayedMsgDTO) {log.info("开始发布延迟消息: {}", FastJsonUtils.toJsonString(delayedMsgDTO));RBlockingQueue<String> blockingQueue = redissonClient.getBlockingQueue(delayedMsgDTO.getQueueName());RDelayedQueue<String> delayedQueue = redissonClient.getDelayedQueue(blockingQueue);delayedQueue.offer(delayedMsgDTO.getMsg(), delayedMsgDTO.getDelayTime(), delayedMsgDTO.getTimeUnit());}
}

这里我们借助RBlockingQueueRDelayedQueue来实现,只有当延迟消息快到期时,消费者才能从阻塞队列拉取到消息,否则消费者将一直阻塞。

3、Redisson延时消息监听处理器

这里我们定义了一个BeanPostProcessor 的实现,目的就是为了扫描Spring容器中所有带RedissonDelayedQueueListener注解的Bean实例和方法。

/*** Redisson延迟队列Bean后处理器* @author Nick Liu* @date 2025/1/3*/
@Slf4j
@Component
@RequiredArgsConstructor
public class RedissonDelayedQueuePostProcessor implements BeanPostProcessor {private final RedissonClient redissonClient;@Overridepublic Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {// 获取最终的目标运行时对象Class<?> clazz = AopProxyUtils.ultimateTargetClass(bean);Method[] methods = clazz.getDeclaredMethods();for (Method m : methods) {if (!m.isAnnotationPresent(RedissonDelayedQueueListener.class)) {continue;}// 如果Bean上的方法有Redisson队列监听注解,则启动一个线程监听队列RedissonDelayedQueueListener annotation = m.getAnnotation(RedissonDelayedQueueListener.class);CompletableFuture.runAsync(() -> {log.info("开始监听Redisson延时队列[{}]消息", annotation.queueName());while (true) {RBlockingQueue<String> blockingQueue = redissonClient.getBlockingQueue(annotation.queueName());redissonClient.getDelayedQueue(blockingQueue);try {String msg = blockingQueue.take();MDC.put(CommonConst.X_REQUEST_ID, SerialNoUtils.generateSimpleUUID());log.info("监听到队列[{}]延时消息: {}", annotation.queueName(), msg);m.invoke(bean, msg);MDC.remove(CommonConst.X_REQUEST_ID);} catch (Exception e) {log.error(e.getMessage(), e);}}});}return bean;}}

这里我们扫描到指定Bean的方法后,会开启一个异步线程,并轮询拉取延时消息,如果消息没过期,异步线程将会一直阻塞等待。


三、测试用例

/*** @author Nick Liu* @date 2025/2/2*/
@Slf4j
@RestController
@RequiredArgsConstructor
public class RedissonDelayedMsgController {private static final String DELAYED_QUEUE = "redisson:delayed:queue";private final RedissonDelayedMsgPublisher redissonDelayedMsgPublisher;@GetMapping("/delayed/msg")public ResponseEntity<RedisDelayedMsgDTO> publishDelayedMsg() {RedisDelayedMsgDTO redisDelayedMsgDTO = new RedisDelayedMsgDTO();redisDelayedMsgDTO.setQueueName(DELAYED_QUEUE);redisDelayedMsgDTO.setMsg("This is a delayed msg");redisDelayedMsgDTO.setDelayTime(10);redisDelayedMsgDTO.setTimeUnit(TimeUnit.SECONDS);redissonDelayedMsgPublisher.publishDelayedMsg(redisDelayedMsgDTO);return ResponseEntity.ok(redisDelayedMsgDTO);}@RedissonDelayedQueueListener(queueName = DELAYED_QUEUE)public void handleDelayedMsg(String msg) {log.info("Received delayed msg: {}", msg);}
}

启动服务后,Bean后处理器会启动异步线程监听延时消息,如下:

2025-02-02 16:46:04.271 INFO  [] [ForkJoinPool.commonPool-worker-2] [c.xlyj.common.message.RedissonDelayedQueuePostProcessor.lambda$postProcessAfterInitialization$0():44] - 开始监听Redisson延时队列[redisson:delayed:queue]消息

浏览器直接输入http://localhost:8000/delayed/msg发布延时消息,10s后消费者进行处理,如下:

2025-02-02 16:43:11.107 INFO  [e810d175b0e24e71a4b9e517366b4aa6] [ForkJoinPool.commonPool-worker-2] [c.xlyj.common.message.RedissonDelayedQueuePostProcessor.lambda$postProcessAfterInitialization$0():51] - 监听到队列[redisson:delayed:queue]延时消息: This is a delayed msg
2025-02-02 16:43:11.108 INFO  [e810d175b0e24e71a4b9e517366b4aa6] [ForkJoinPool.commonPool-worker-2] [com.xlyj.contoller.RedissonDelayedMsgController.handleDelayedMsg():40] - Received delayed msg: This is a delayed msg

四、结语

虽说通过Redisson实现的延迟队列也能实现支付订单的自动取消,但是可用性相比专业的消息中间件还是尚有不足的。

比如消息生产者发送消息没有确认机制,消息消费也没有确认机制,这两个环节都有可能导致消息丢失。

当然我们可以通过其它保障机制去补偿,比如再加上定时任务扫表,把扫描时间可以设置长一点,保证最终的一致性。

在大型项目中还是优先推荐专业的消息中间件去实现延时消息消费。

相关文章:

通过Redisson构建延时队列并实现注解式消费

目录 一、序言二、延迟队列实现1、Redisson延时消息监听注解和消息体2、Redisson延时消息发布器3、Redisson延时消息监听处理器 三、测试用例四、结语 一、序言 两个月前接了一个4万的私活&#xff0c;做一个线上商城小程序&#xff0c;在交易过程中不可避免的一个问题就是用户…...

SQL Server配置管理器无法连接到 WMI 提供程序

目录 第一步第二部 第一步 发现没有资源管理器 ​​​​ 在文件夹找到管理器 打开发现报这个错误 配置管理器无法连接到 WMI 提供程序第二部 https://blog.csdn.net/thb369208315/article/details/126954074...

Linux内核源码:ext4 extent详解

在 Linux 系统的庞大体系中&#xff0c;文件系统就像是一个井然有序的图书馆&#xff0c;而 ext4 文件系统则是这座图书馆中极为重要的 “藏书室”&#xff0c;它负责高效管理和存储数据。在 ext4 众多的奥秘中&#xff0c;ext4 extent 犹如一颗璀璨的明珠&#xff0c;起着关键…...

Maven jar 包下载失败问题处理

Maven jar 包下载失败问题处理 1.配置好国内的Maven源2.重新下载3. 其他问题 1.配置好国内的Maven源 打开⾃⼰的 Idea 检测 Maven 的配置是否正确&#xff0c;正确的配置如下图所示&#xff1a; 检查项⼀共有两个&#xff1a; 确认右边的两个勾已经选中&#xff0c;如果没有请…...

自指学习:AGI的元认知突破

文章目录 引言:从模式识别到认知革命一、自指学习的理论框架1.1 自指系统的数学定义1.2 认知架构的三重反射1.3 与传统元学习的本质区别二、元认知突破的技术路径2.1 自指神经网络架构2.2 认知效能评价体系2.3 知识表示的革命三、实现突破的关键挑战3.1 认知闭环的稳定性3.2 计…...

排序算法--希尔排序

希尔排序是插入排序的改进版本&#xff0c;适合中等规模数据排序&#xff0c;性能优于简单插入排序。 // 希尔排序函数 void shellSort(int arr[], int n) {// 初始间隔&#xff08;gap&#xff09;为数组长度的一半&#xff0c;逐步缩小for (int gap n / 2; gap > 0; gap …...

Java 2024年面试总结(持续更新)

目录 最近趁着金三银四面了五六家公司吧&#xff0c;也整理了一些问题供大家参考一下&#xff08;适合经验三年左右的&#xff09;。 面试问题&#xff08;答案是我自己总结的&#xff0c;不一定正确&#xff09;&#xff1a; 总结&#xff1a; 最近趁着金三银四面了五六家公…...

TensorFlow是个啥玩意?

TensorFlow是一个开源的机器学习框架&#xff0c;由Google开发。它可以帮助开发者构建和训练各种机器学习模型&#xff0c;包括神经网络和深度学习模型。TensorFlow的设计理念是使用数据流图来表示计算过程&#xff0c;其中节点表示数学运算&#xff0c;边表示数据流动。 Tens…...

不可信的搜索路径(CWE-426)

漏洞描述&#xff1a;程序使用关键资源时&#xff08;如动态链接库、执行文件、配置文件等&#xff09;没有明确的指定资源的路径&#xff0c;而是依赖操作系统去搜索资源&#xff0c;这种行为可能被攻击者利用&#xff0c;通过在搜索优先级较高的目录放置不良资源&#xff0c;…...

Linux——基础命令

$&#xff1a;普通用户 #&#xff1a;超级用户 cd 切换目录 cd 目录 &#xff08;进入目录&#xff09; cd ../ &#xff08;返回上一级目录&#xff09; cd ~ &#xff08;切换到当前用户的家目录&#xff09; cd - &#xff08;返回上次目录&#xff09; pwd 输出当前目…...

利用TensorFlow.js实现浏览器端机器学习:一个全面指南

引言 随着深度学习技术的不断发展&#xff0c;机器学习已从传统的服务器端运算逐渐转向了前端技术。TensorFlow.js 是 Google 推出的一个用于在浏览器中进行机器学习的开源库&#xff0c;它允许开发者在浏览器中直接运行机器学习模型&#xff0c;而无需依赖后端服务器。Tensor…...

利用HTML和css技术编写学校官网页面

目录 一&#xff0c;图例展示 二&#xff0c;代码说明 1&#xff0c;html部分&#xff1a; 【第一张图片】 【第二张图片】 【第三张图片】 2&#xff0c;css部分&#xff1a; 【第一张图片】 【第二张图片】 【第三张图片】 三&#xff0c;程序代码 一&#xff0c;…...

SpringSecurity密码编码器:使用BCrypt算法加密、自定义密码编码器

1、Spring Security 密码编码器 Spring Security 作为一个功能完备的安全性框架,一方面提供用于完成加密操作的 PasswordEncoder 组件,另一方面提供一个可以在应用程序中独立使用的密码模块。 1.1 PasswordEncoder 抽象接口 在 Spring Security 中,PasswordEncoder 接口代…...

笔记:新能源汽车零部件功率级测试怎么进行?

摘要:本文旨在梳理主机厂对新能源汽车核心零部件功率级测试需求,通过试验室的主流设备仪器集成,快速实现试验方案搭建,并体现测试测量方案的时效性、便捷性优势。目标是通过提升实现设备的有效集成能力、实现多设备测试过程的有效协同、流程化测试,可快速采集、分析当前数…...

ES6中的map和原生的对象有什么区别?

在 ES6 中&#xff0c;Map 和原生的对象&#xff08;Object&#xff09;都是用来存储键值对数据的集合&#xff0c;但它们有显著的区别。以下是它们之间的主要区别&#xff1a; 1. 键的类型 Object: 只允许使用字符串或符号作为键。其他类型的键&#xff08;如数字或对象&…...

2502vim,vim文本对象中文文档

介绍 文本块用户(textobj-user)是一个可帮助你毫不费力地创建自己的文本对象的Vim插件. 因为有许多陷阱需要处理,很难创建文本对象.此插件隐藏了此类细节,并提供了声明式定义文本对象的方法. 你可用正则式来定义简单的文本对象,或使用函数来定义复杂的文本对象.如… 文本对…...

spring security与gateway结合进行网关鉴权和授权

在Spring Cloud Gateway中集成Spring Security 6以实现鉴权和认证工作&#xff0c;可以在网关代理层完成权限校验和认证。这种架构通常被称为“边缘安全”或“API网关安全”&#xff0c;它允许你在请求到达后端服务之前进行集中式的安全控制。 以下是如何配置Spring Cloud Gat…...

LabVIEW在电机自动化生产线中的实时数据采集与生产过程监控

在电机自动化生产线中&#xff0c;实时数据采集与生产过程监控是确保生产效率和产品质量的重要环节。LabVIEW作为一种强大的图形化编程平台&#xff0c;可以有效实现数据采集、实时监控和自动化控制。详细探讨如何利用LabVIEW实现这一目标&#xff0c;包括硬件选择、软件架构设…...

log4j2日志配置文件

log4j2配置文件每个项目都会用到,记录一个比较好用的配置文件,方便以后使用时调取,日志输出级别为debug,也可以修改 <?xml version"1.0" encoding"UTF-8"?> <Configuration monitorInterval"180" packages""><prope…...

用Deepseek做EXCLE文件对比

背景是我想对比两个PO系统里的一个消息映射&#xff0c;EDI接口的mapping有多复杂懂的都懂&#xff0c;它还不支持跨系统版本对比&#xff0c;所以我费半天劲装NWDS&#xff0c;导出MM到excle&#xff0c;然后问题来了&#xff0c;我需要对比两个excel文件里的内容&#xff0c;…...

MPNet:旋转机械轻量化故障诊断模型详解python代码复现

目录 一、问题背景与挑战 二、MPNet核心架构 2.1 多分支特征融合模块(MBFM) 2.2 残差注意力金字塔模块(RAPM) 2.2.1 空间金字塔注意力(SPA) 2.2.2 金字塔残差块(PRBlock) 2.3 分类器设计 三、关键技术突破 3.1 多尺度特征融合 3.2 轻量化设计策略 3.3 抗噪声…...

8k长序列建模,蛋白质语言模型Prot42仅利用目标蛋白序列即可生成高亲和力结合剂

蛋白质结合剂&#xff08;如抗体、抑制肽&#xff09;在疾病诊断、成像分析及靶向药物递送等关键场景中发挥着不可替代的作用。传统上&#xff0c;高特异性蛋白质结合剂的开发高度依赖噬菌体展示、定向进化等实验技术&#xff0c;但这类方法普遍面临资源消耗巨大、研发周期冗长…...

线程与协程

1. 线程与协程 1.1. “函数调用级别”的切换、上下文切换 1. 函数调用级别的切换 “函数调用级别的切换”是指&#xff1a;像函数调用/返回一样轻量地完成任务切换。 举例说明&#xff1a; 当你在程序中写一个函数调用&#xff1a; funcA() 然后 funcA 执行完后返回&…...

Frozen-Flask :将 Flask 应用“冻结”为静态文件

Frozen-Flask 是一个用于将 Flask 应用“冻结”为静态文件的 Python 扩展。它的核心用途是&#xff1a;将一个 Flask Web 应用生成成纯静态 HTML 文件&#xff0c;从而可以部署到静态网站托管服务上&#xff0c;如 GitHub Pages、Netlify 或任何支持静态文件的网站服务器。 &am…...

鸿蒙中用HarmonyOS SDK应用服务 HarmonyOS5开发一个医院查看报告小程序

一、开发环境准备 ​​工具安装​​&#xff1a; 下载安装DevEco Studio 4.0&#xff08;支持HarmonyOS 5&#xff09;配置HarmonyOS SDK 5.0确保Node.js版本≥14 ​​项目初始化​​&#xff1a; ohpm init harmony/hospital-report-app 二、核心功能模块实现 1. 报告列表…...

A2A JS SDK 完整教程:快速入门指南

目录 什么是 A2A JS SDK?A2A JS 安装与设置A2A JS 核心概念创建你的第一个 A2A JS 代理A2A JS 服务端开发A2A JS 客户端使用A2A JS 高级特性A2A JS 最佳实践A2A JS 故障排除 什么是 A2A JS SDK? A2A JS SDK 是一个专为 JavaScript/TypeScript 开发者设计的强大库&#xff…...

七、数据库的完整性

七、数据库的完整性 主要内容 7.1 数据库的完整性概述 7.2 实体完整性 7.3 参照完整性 7.4 用户定义的完整性 7.5 触发器 7.6 SQL Server中数据库完整性的实现 7.7 小结 7.1 数据库的完整性概述 数据库完整性的含义 正确性 指数据的合法性 有效性 指数据是否属于所定…...

JavaScript基础-API 和 Web API

在学习JavaScript的过程中&#xff0c;理解API&#xff08;应用程序接口&#xff09;和Web API的概念及其应用是非常重要的。这些工具极大地扩展了JavaScript的功能&#xff0c;使得开发者能够创建出功能丰富、交互性强的Web应用程序。本文将深入探讨JavaScript中的API与Web AP…...

【Redis】笔记|第8节|大厂高并发缓存架构实战与优化

缓存架构 代码结构 代码详情 功能点&#xff1a; 多级缓存&#xff0c;先查本地缓存&#xff0c;再查Redis&#xff0c;最后才查数据库热点数据重建逻辑使用分布式锁&#xff0c;二次查询更新缓存采用读写锁提升性能采用Redis的发布订阅机制通知所有实例更新本地缓存适用读多…...

基于IDIG-GAN的小样本电机轴承故障诊断

目录 🔍 核心问题 一、IDIG-GAN模型原理 1. 整体架构 2. 核心创新点 (1) ​梯度归一化(Gradient Normalization)​​ (2) ​判别器梯度间隙正则化(Discriminator Gradient Gap Regularization)​​ (3) ​自注意力机制(Self-Attention)​​ 3. 完整损失函数 二…...