当前位置: 首页 > news >正文

通过Redisson构建延时队列并实现注解式消费

目录

  • 一、序言
  • 二、延迟队列实现
    • 1、Redisson延时消息监听注解和消息体
    • 2、Redisson延时消息发布器
    • 3、Redisson延时消息监听处理器
  • 三、测试用例
  • 四、结语

一、序言

两个月前接了一个4万的私活,做一个线上商城小程序,在交易过程中不可避免的一个问题就是用户下单后的订单自动取消。

目前成熟的方案有通过RabbitMQ+死信队列RabbitMQ+延迟消息插件RocketMQ定时消息推送Redisson延时队列来实现。

考虑到商城的定位和用户体量,以及系统维护成本,其实完全没有必要引入消息中间件,借助Redis其实就可以轻松实现这个需求。

加上Redisson客户端本身就已经实现了很多分布式集合工具类,借助阻塞队列和延时队列就可轻松搞定。

当然,为了使用方便以及团队协作,顺便模仿@RabbitListener封装了一套基于注解的消息消费,废话不多说,直接上代码。


二、延迟队列实现

1、Redisson延时消息监听注解和消息体

延迟消息监听器定义:

/*** Redisson延时队列监听器** @author Nick Liu* @date 2024/11/13*/
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface RedissonDelayedQueueListener {/*** 队列名称* @return*/String queueName();
}

消息体定义:

@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class RedisDelayedMsgDTO {/*** 消息内容*/private String msg;/*** 队列名称*/private String queueName;/*** 延时时间*/private long delayTime;private TimeUnit timeUnit;
}

2、Redisson延时消息发布器

@Slf4j
@Component
@RequiredArgsConstructor
public class RedissonDelayedMsgPublisher {private final RedissonClient redissonClient;/*** 发布延时信息* @param delayedMsgDTO*/public void publishDelayedMsg(RedisDelayedMsgDTO delayedMsgDTO) {log.info("开始发布延迟消息: {}", FastJsonUtils.toJsonString(delayedMsgDTO));RBlockingQueue<String> blockingQueue = redissonClient.getBlockingQueue(delayedMsgDTO.getQueueName());RDelayedQueue<String> delayedQueue = redissonClient.getDelayedQueue(blockingQueue);delayedQueue.offer(delayedMsgDTO.getMsg(), delayedMsgDTO.getDelayTime(), delayedMsgDTO.getTimeUnit());}
}

这里我们借助RBlockingQueueRDelayedQueue来实现,只有当延迟消息快到期时,消费者才能从阻塞队列拉取到消息,否则消费者将一直阻塞。

3、Redisson延时消息监听处理器

这里我们定义了一个BeanPostProcessor 的实现,目的就是为了扫描Spring容器中所有带RedissonDelayedQueueListener注解的Bean实例和方法。

/*** Redisson延迟队列Bean后处理器* @author Nick Liu* @date 2025/1/3*/
@Slf4j
@Component
@RequiredArgsConstructor
public class RedissonDelayedQueuePostProcessor implements BeanPostProcessor {private final RedissonClient redissonClient;@Overridepublic Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {// 获取最终的目标运行时对象Class<?> clazz = AopProxyUtils.ultimateTargetClass(bean);Method[] methods = clazz.getDeclaredMethods();for (Method m : methods) {if (!m.isAnnotationPresent(RedissonDelayedQueueListener.class)) {continue;}// 如果Bean上的方法有Redisson队列监听注解,则启动一个线程监听队列RedissonDelayedQueueListener annotation = m.getAnnotation(RedissonDelayedQueueListener.class);CompletableFuture.runAsync(() -> {log.info("开始监听Redisson延时队列[{}]消息", annotation.queueName());while (true) {RBlockingQueue<String> blockingQueue = redissonClient.getBlockingQueue(annotation.queueName());redissonClient.getDelayedQueue(blockingQueue);try {String msg = blockingQueue.take();MDC.put(CommonConst.X_REQUEST_ID, SerialNoUtils.generateSimpleUUID());log.info("监听到队列[{}]延时消息: {}", annotation.queueName(), msg);m.invoke(bean, msg);MDC.remove(CommonConst.X_REQUEST_ID);} catch (Exception e) {log.error(e.getMessage(), e);}}});}return bean;}}

这里我们扫描到指定Bean的方法后,会开启一个异步线程,并轮询拉取延时消息,如果消息没过期,异步线程将会一直阻塞等待。


三、测试用例

/*** @author Nick Liu* @date 2025/2/2*/
@Slf4j
@RestController
@RequiredArgsConstructor
public class RedissonDelayedMsgController {private static final String DELAYED_QUEUE = "redisson:delayed:queue";private final RedissonDelayedMsgPublisher redissonDelayedMsgPublisher;@GetMapping("/delayed/msg")public ResponseEntity<RedisDelayedMsgDTO> publishDelayedMsg() {RedisDelayedMsgDTO redisDelayedMsgDTO = new RedisDelayedMsgDTO();redisDelayedMsgDTO.setQueueName(DELAYED_QUEUE);redisDelayedMsgDTO.setMsg("This is a delayed msg");redisDelayedMsgDTO.setDelayTime(10);redisDelayedMsgDTO.setTimeUnit(TimeUnit.SECONDS);redissonDelayedMsgPublisher.publishDelayedMsg(redisDelayedMsgDTO);return ResponseEntity.ok(redisDelayedMsgDTO);}@RedissonDelayedQueueListener(queueName = DELAYED_QUEUE)public void handleDelayedMsg(String msg) {log.info("Received delayed msg: {}", msg);}
}

启动服务后,Bean后处理器会启动异步线程监听延时消息,如下:

2025-02-02 16:46:04.271 INFO  [] [ForkJoinPool.commonPool-worker-2] [c.xlyj.common.message.RedissonDelayedQueuePostProcessor.lambda$postProcessAfterInitialization$0():44] - 开始监听Redisson延时队列[redisson:delayed:queue]消息

浏览器直接输入http://localhost:8000/delayed/msg发布延时消息,10s后消费者进行处理,如下:

2025-02-02 16:43:11.107 INFO  [e810d175b0e24e71a4b9e517366b4aa6] [ForkJoinPool.commonPool-worker-2] [c.xlyj.common.message.RedissonDelayedQueuePostProcessor.lambda$postProcessAfterInitialization$0():51] - 监听到队列[redisson:delayed:queue]延时消息: This is a delayed msg
2025-02-02 16:43:11.108 INFO  [e810d175b0e24e71a4b9e517366b4aa6] [ForkJoinPool.commonPool-worker-2] [com.xlyj.contoller.RedissonDelayedMsgController.handleDelayedMsg():40] - Received delayed msg: This is a delayed msg

四、结语

虽说通过Redisson实现的延迟队列也能实现支付订单的自动取消,但是可用性相比专业的消息中间件还是尚有不足的。

比如消息生产者发送消息没有确认机制,消息消费也没有确认机制,这两个环节都有可能导致消息丢失。

当然我们可以通过其它保障机制去补偿,比如再加上定时任务扫表,把扫描时间可以设置长一点,保证最终的一致性。

在大型项目中还是优先推荐专业的消息中间件去实现延时消息消费。

相关文章:

通过Redisson构建延时队列并实现注解式消费

目录 一、序言二、延迟队列实现1、Redisson延时消息监听注解和消息体2、Redisson延时消息发布器3、Redisson延时消息监听处理器 三、测试用例四、结语 一、序言 两个月前接了一个4万的私活&#xff0c;做一个线上商城小程序&#xff0c;在交易过程中不可避免的一个问题就是用户…...

SQL Server配置管理器无法连接到 WMI 提供程序

目录 第一步第二部 第一步 发现没有资源管理器 ​​​​ 在文件夹找到管理器 打开发现报这个错误 配置管理器无法连接到 WMI 提供程序第二部 https://blog.csdn.net/thb369208315/article/details/126954074...

Linux内核源码:ext4 extent详解

在 Linux 系统的庞大体系中&#xff0c;文件系统就像是一个井然有序的图书馆&#xff0c;而 ext4 文件系统则是这座图书馆中极为重要的 “藏书室”&#xff0c;它负责高效管理和存储数据。在 ext4 众多的奥秘中&#xff0c;ext4 extent 犹如一颗璀璨的明珠&#xff0c;起着关键…...

Maven jar 包下载失败问题处理

Maven jar 包下载失败问题处理 1.配置好国内的Maven源2.重新下载3. 其他问题 1.配置好国内的Maven源 打开⾃⼰的 Idea 检测 Maven 的配置是否正确&#xff0c;正确的配置如下图所示&#xff1a; 检查项⼀共有两个&#xff1a; 确认右边的两个勾已经选中&#xff0c;如果没有请…...

自指学习:AGI的元认知突破

文章目录 引言:从模式识别到认知革命一、自指学习的理论框架1.1 自指系统的数学定义1.2 认知架构的三重反射1.3 与传统元学习的本质区别二、元认知突破的技术路径2.1 自指神经网络架构2.2 认知效能评价体系2.3 知识表示的革命三、实现突破的关键挑战3.1 认知闭环的稳定性3.2 计…...

排序算法--希尔排序

希尔排序是插入排序的改进版本&#xff0c;适合中等规模数据排序&#xff0c;性能优于简单插入排序。 // 希尔排序函数 void shellSort(int arr[], int n) {// 初始间隔&#xff08;gap&#xff09;为数组长度的一半&#xff0c;逐步缩小for (int gap n / 2; gap > 0; gap …...

Java 2024年面试总结(持续更新)

目录 最近趁着金三银四面了五六家公司吧&#xff0c;也整理了一些问题供大家参考一下&#xff08;适合经验三年左右的&#xff09;。 面试问题&#xff08;答案是我自己总结的&#xff0c;不一定正确&#xff09;&#xff1a; 总结&#xff1a; 最近趁着金三银四面了五六家公…...

TensorFlow是个啥玩意?

TensorFlow是一个开源的机器学习框架&#xff0c;由Google开发。它可以帮助开发者构建和训练各种机器学习模型&#xff0c;包括神经网络和深度学习模型。TensorFlow的设计理念是使用数据流图来表示计算过程&#xff0c;其中节点表示数学运算&#xff0c;边表示数据流动。 Tens…...

不可信的搜索路径(CWE-426)

漏洞描述&#xff1a;程序使用关键资源时&#xff08;如动态链接库、执行文件、配置文件等&#xff09;没有明确的指定资源的路径&#xff0c;而是依赖操作系统去搜索资源&#xff0c;这种行为可能被攻击者利用&#xff0c;通过在搜索优先级较高的目录放置不良资源&#xff0c;…...

Linux——基础命令

$&#xff1a;普通用户 #&#xff1a;超级用户 cd 切换目录 cd 目录 &#xff08;进入目录&#xff09; cd ../ &#xff08;返回上一级目录&#xff09; cd ~ &#xff08;切换到当前用户的家目录&#xff09; cd - &#xff08;返回上次目录&#xff09; pwd 输出当前目…...

利用TensorFlow.js实现浏览器端机器学习:一个全面指南

引言 随着深度学习技术的不断发展&#xff0c;机器学习已从传统的服务器端运算逐渐转向了前端技术。TensorFlow.js 是 Google 推出的一个用于在浏览器中进行机器学习的开源库&#xff0c;它允许开发者在浏览器中直接运行机器学习模型&#xff0c;而无需依赖后端服务器。Tensor…...

利用HTML和css技术编写学校官网页面

目录 一&#xff0c;图例展示 二&#xff0c;代码说明 1&#xff0c;html部分&#xff1a; 【第一张图片】 【第二张图片】 【第三张图片】 2&#xff0c;css部分&#xff1a; 【第一张图片】 【第二张图片】 【第三张图片】 三&#xff0c;程序代码 一&#xff0c;…...

SpringSecurity密码编码器:使用BCrypt算法加密、自定义密码编码器

1、Spring Security 密码编码器 Spring Security 作为一个功能完备的安全性框架,一方面提供用于完成加密操作的 PasswordEncoder 组件,另一方面提供一个可以在应用程序中独立使用的密码模块。 1.1 PasswordEncoder 抽象接口 在 Spring Security 中,PasswordEncoder 接口代…...

笔记:新能源汽车零部件功率级测试怎么进行?

摘要:本文旨在梳理主机厂对新能源汽车核心零部件功率级测试需求,通过试验室的主流设备仪器集成,快速实现试验方案搭建,并体现测试测量方案的时效性、便捷性优势。目标是通过提升实现设备的有效集成能力、实现多设备测试过程的有效协同、流程化测试,可快速采集、分析当前数…...

ES6中的map和原生的对象有什么区别?

在 ES6 中&#xff0c;Map 和原生的对象&#xff08;Object&#xff09;都是用来存储键值对数据的集合&#xff0c;但它们有显著的区别。以下是它们之间的主要区别&#xff1a; 1. 键的类型 Object: 只允许使用字符串或符号作为键。其他类型的键&#xff08;如数字或对象&…...

2502vim,vim文本对象中文文档

介绍 文本块用户(textobj-user)是一个可帮助你毫不费力地创建自己的文本对象的Vim插件. 因为有许多陷阱需要处理,很难创建文本对象.此插件隐藏了此类细节,并提供了声明式定义文本对象的方法. 你可用正则式来定义简单的文本对象,或使用函数来定义复杂的文本对象.如… 文本对…...

spring security与gateway结合进行网关鉴权和授权

在Spring Cloud Gateway中集成Spring Security 6以实现鉴权和认证工作&#xff0c;可以在网关代理层完成权限校验和认证。这种架构通常被称为“边缘安全”或“API网关安全”&#xff0c;它允许你在请求到达后端服务之前进行集中式的安全控制。 以下是如何配置Spring Cloud Gat…...

LabVIEW在电机自动化生产线中的实时数据采集与生产过程监控

在电机自动化生产线中&#xff0c;实时数据采集与生产过程监控是确保生产效率和产品质量的重要环节。LabVIEW作为一种强大的图形化编程平台&#xff0c;可以有效实现数据采集、实时监控和自动化控制。详细探讨如何利用LabVIEW实现这一目标&#xff0c;包括硬件选择、软件架构设…...

log4j2日志配置文件

log4j2配置文件每个项目都会用到,记录一个比较好用的配置文件,方便以后使用时调取,日志输出级别为debug,也可以修改 <?xml version"1.0" encoding"UTF-8"?> <Configuration monitorInterval"180" packages""><prope…...

用Deepseek做EXCLE文件对比

背景是我想对比两个PO系统里的一个消息映射&#xff0c;EDI接口的mapping有多复杂懂的都懂&#xff0c;它还不支持跨系统版本对比&#xff0c;所以我费半天劲装NWDS&#xff0c;导出MM到excle&#xff0c;然后问题来了&#xff0c;我需要对比两个excel文件里的内容&#xff0c;…...

OpenClaw自动化测试:nanobot驱动浏览器执行回归用例

OpenClaw自动化测试&#xff1a;nanobot驱动浏览器执行回归用例 1. 为什么选择OpenClaw进行自动化测试 去年接手一个老项目时&#xff0c;我遇到了一个典型的前端测试困境——每次发版前需要手动执行87个回归测试用例&#xff0c;整个过程耗时近4小时。尝试过Selenium和Playw…...

3分钟解决ROG笔记本色彩发白问题:G-Helper智能恢复指南

3分钟解决ROG笔记本色彩发白问题&#xff1a;G-Helper智能恢复指南 【免费下载链接】g-helper Lightweight Armoury Crate alternative for Asus laptops. Control tool for ROG Zephyrus G14, G15, G16, M16, Flow X13, Flow X16, TUF, Strix, Scar and other models 项目地…...

Ubuntu-24.04服务器磁盘扩容实战:从30GB到80GB的完整操作记录(附常见错误排查)

Ubuntu 24.04服务器磁盘扩容实战&#xff1a;从30GB到80GB的完整操作记录 最近在部署一套视频处理集群时&#xff0c;遇到了一个典型问题&#xff1a;某台运行Ubuntu 24.04 LTS的服务器在持续写入4K视频素材时&#xff0c;根分区突然爆满。这台当初只分配了30GB磁盘的服务器&am…...

智能家庭网络系统新选择:iStoreOS打造高效家庭网络与存储中心

智能家庭网络系统新选择&#xff1a;iStoreOS打造高效家庭网络与存储中心 【免费下载链接】istoreos 提供一个人人会用的的路由、NAS系统 &#xff08;目前活跃的分支是 istoreos-22.03&#xff09; 项目地址: https://gitcode.com/gh_mirrors/is/istoreos 家庭网络卡顿…...

从LDF文件看LIN调度:为什么说‘可预测性’是汽车低端总线的灵魂?

从LDF文件看LIN调度&#xff1a;为什么说‘可预测性’是汽车低端总线的灵魂&#xff1f; 当你按下车窗按钮时&#xff0c;那个瞬间发生的升降动作背后&#xff0c;隐藏着一套精密的通信协议在默默运作。不同于高端车载网络CAN总线的复杂仲裁机制&#xff0c;LIN总线以其独特的&…...

Aspen Plus模拟电解质水脱酸:一场化工模拟的奇妙之旅

Aspen Plus模拟电解质水脱酸Aspen 化工过程模拟→电解质水脱酸模拟在温度为 8C、压力为 1 atm、质量流量为 5000 kg/h 的条件下&#xff0c;含有 0.20 wt% CO2、0.15 wt% H2S 和 0.1 wt% NH3 的酸性水流将通过 1.1 atm、质量流量为 1500 kg/h 的干蒸汽进行处理。在化工领域&…...

前端面试高频考点总结(不仅有考点,还有对应解答)

2026年 AI面试 经验分享 前端面试核心要点 技术考察转向实际场景与新兴技术&#xff0c;重点包括&#xff1a; JavaScript/TypeScript核心机制与编码能力React/Vue3的高阶特性与原理工程化与性能优化体系网络/安全与综合性场景题 3-5年经验者需突出&#xff1a; 技术原理深度&a…...

Swin2SR进阶使用:通过HTTP链接实现远程增强

Swin2SR进阶使用&#xff1a;通过HTTP链接实现远程增强 1. 引言&#xff1a;从本地工具到远程服务 如果你用过Swin2SR这个AI图像超分工具&#xff0c;一定会被它“化腐朽为神奇”的能力震撼——一张模糊的小图&#xff0c;经过AI的“脑补”&#xff0c;瞬间变成细节丰富的高清…...

OpenLdap部署

背景 很多开源软件支持Ldap,比如Jenkins、Grafana、Gitlab、Jumpserver等。其中Ldap只保留数据库和密码。权限控制在各个应用里去控制。 常用运维命令 # 创建 ou=people 组织单元 ldapadd -x -D "cn=admin,dc=lf,dc=org" -w "123456" <<EOF dn: …...

2026年全国青少年信息素养大赛算法应用主题赛(C++赛项初赛模拟题)

2026年全国青少年信息素养大赛算法应用主题赛&#xff08;C赛项初赛模拟题&#xff09; 一、单项选择题&#xff08;共 15 题&#xff0c;每题 5 分&#xff09; 1. 数组下标与长征物资 题目内容 你需要记录红军某运输队一周&#xff08;7 天&#xff09;的粮食消耗量&#x…...