SpringBoot 整合 RabbitMQ 实现延迟消息
一、业务场景说明
用于解决用户下单以后,订单超时如何取消订单的问题。
- 用户进行下单操作(会有锁定商品库存、使用优惠券、积分一系列的操作);
- 生成订单,获取订单的id;
- 获取到设置的订单超时时间(假设设置的为60分钟不支付取消订单);
- 按订单超时时间发送一个延迟消息给RabbitMQ,让它在订单超时后触发取消订单的操作;
- 如果用户没有支付,进行取消订单操作(释放锁定商品库存、返还优惠券、返回积分一系列操作)。
相关概念:(死信队列与延迟队列)
死信,顾名思义就是无法被消费的消息,一般来说 Producer 将消息投递到 broker 或者直接丢到 queue 中,Consumer 从 Queue 中取出消息进行消费,但是某些时候由于特定的原因导致 Queue 中的某些消息无法被消费,这样的消息如果没有后续的处理就变成了死信,有死信自然就有了死信队列
死信队列有其特殊的应用场景,例如用户在商城下单成功并点击去支付的时候,如果在指定的时间内未支付,那么就可以将该下单消息投递到死信队列中,至于后续怎么处理死信队列需要结合具体的应用场景。

死信队列(Dead Letter Queue, DLQ)
目的: 处理无法成功消费的消息。
特点:
- 目的: 用于接收无法成功处理的消息。这些消息可能因为多种原因无法被正常消费,例如消息处理失败、消息超时、队列满等。
- 触发条件: 当消息在主队列中无法被正常消费,并且达到一定的失败次数或过期时间后,会被转发到死信队列。
- 用途: 通过分析死信队列中的消息,开发者可以排查和解决系统中的问题,也可以进行后续的审计和处理。
实现方式:
- 需要配置主队列和死信队列之间的关系。
- 配置消息的重试策略和死信转发条件(如重试次数、过期时间等)。
延迟队列(Delayed Queue)
目的: 控制消息的处理时间。
特点:
- 目的: 在特定的时间点或延迟一段时间后再将消息发送到主队列进行处理。用于在系统中实现消息的延迟处理。
- 触发条件: 当消息被发送到延迟队列时,它会根据设定的延迟时间在未来某个时刻被转发到主队列。
- 用途: 常用于实现任务的延迟执行、定时任务、超时重试等功能。
实现方式:
- 使用专门支持延迟队列的消息队列系统,或者通过设置消息的过期时间和重试机制来实现。
- 可以配置延迟时间,确保消息在设定的时间后进入主队列。
总结
- 死信队列: 处理无法消费的消息,通常用于异常处理和问题排查。
- 延迟队列: 控制消息的处理时间,允许消息在未来某个时间点才被处理。
二、整合RabbitMQ实现延迟消息
整合依赖及配置
●在pom.xml中添加AMQP相关依赖;
<!--Spring AMQP相关依赖-->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
- 修改
application.yml文件,在spring节点下添加RabbitMQ相关配置。
spring:rabbitmq:host: localhostport: 5672virtual-host: /mallusername: mallpassword: mallpublisher-returns: true #消息发送到队列确认publisher-confirm-type: simple #消息发送到交换器确认
实现延迟消息
- 添加消息队列的枚举配置类QueueEnum,用于延迟消息队列及处理取消订单消息队列的常量定义,包括交换机名称、队列名称、路由键名称;
/*** @description 消息队列枚举配置*/
@Getter
public enum QueueEnum {/*** 消息通知队列*/QUEUE_ORDER_CANCEL("mall.order.direct", "mall.order.cancel", "mall.order.cancel"),/*** 消息通知ttl队列*/QUEUE_TTL_ORDER_CANCEL("mall.order.direct.ttl", "mall.order.cancel.ttl", "mall.order.cancel.ttl");/*** 交换机名称*/private String exchange;/*** 队列名称*/private String name;/*** 路由键*/private String routeKey;QueueEnum(String exchange, String name, String routeKey) {this.exchange = exchange;this.name = name;this.routeKey = routeKey;}
}
- 添加RabbitMQ的Java配置,用于配置交换机、队列及队列与交换机的绑定关系;
/*** @description 消息队列配置*/
@Configuration
public class RabbitMqConfig {/*** 订单消息实际消费队列所绑定的交换机*/@BeanDirectExchange orderDirect() {return ExchangeBuilder.directExchange(QueueEnum.QUEUE_ORDER_CANCEL.getExchange()).durable(true).build();}/*** 订单延迟队列所绑定的交换机*/@BeanDirectExchange orderTtlDirect() {return ExchangeBuilder.directExchange(QueueEnum.QUEUE_TTL_ORDER_CANCEL.getExchange()).durable(true).build();}/*** 订单实际消费队列*/@Beanpublic Queue orderQueue() {return new Queue(QueueEnum.QUEUE_ORDER_CANCEL.getName());}/*** 订单延迟队列(死信队列)*/@Beanpublic Queue orderTtlQueue() {return QueueBuilder.durable(QueueEnum.QUEUE_TTL_ORDER_CANCEL.getName()).withArgument("x-dead-letter-exchange", QueueEnum.QUEUE_ORDER_CANCEL.getExchange())//到期后转发的交换机.withArgument("x-dead-letter-routing-key", QueueEnum.QUEUE_ORDER_CANCEL.getRouteKey())//到期后转发的路由键.build();}/*** 将订单队列绑定到交换机*/@BeanBinding orderBinding(DirectExchange orderDirect,Queue orderQueue){return BindingBuilder.bind(orderQueue).to(orderDirect).with(QueueEnum.QUEUE_ORDER_CANCEL.getRouteKey());}/*** 将订单延迟队列绑定到交换机*/@BeanBinding orderTtlBinding(DirectExchange orderTtlDirect,Queue orderTtlQueue){return BindingBuilder.bind(orderTtlQueue).to(orderTtlDirect).with(QueueEnum.QUEUE_TTL_ORDER_CANCEL.getRouteKey());}}
- 此时登录,在RabbitMQ管理页面可以看到以下交换机和队列;


- 交换机及队列说明如下:mall.order.direct(取消订单消息队列所绑定的交换机):绑定的队列为
mall.order.cancel,一旦有消息以mall.order.cancel为路由键发过来,会发送到此队列。mall.order.direct.ttl(订单延迟消息队列所绑定的交换机):绑定的队列为mall.order.cancel.ttl,一旦有消息以mall.order.cancel.ttl为路由键发送过来,会转发到此队列,并在此队列保存一定时间,等到超时后会自动将消息发送到mall.order.cancel(取消订单消息消费队列)。 - 添加延迟消息的发送者CancelOrderSender,用于向订单延迟消息队(mall.order.cancel.ttl)里发送消息;
/*** @description 取消订单消息的发出者*/
@Component
public class CancelOrderSender {private static Logger LOGGER =LoggerFactory.getLogger(CancelOrderSender.class);@Autowiredprivate AmqpTemplate amqpTemplate;public void sendMessage(Long orderId,final long delayTimes){//给延迟队列发送消息amqpTemplate.convertAndSend(QueueEnum.QUEUE_TTL_ORDER_CANCEL.getExchange(), QueueEnum.QUEUE_TTL_ORDER_CANCEL.getRouteKey(), orderId, new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {//给消息设置延迟毫秒值message.getMessageProperties().setExpiration(String.valueOf(delayTimes));return message;}});LOGGER.info("send delay message orderId:{}",orderId);}
}
- 添加取消订单消息的接收者CancelOrderReceiver,用于从取消订单的消息队列(mall.order.cancel)里接收消息;
/*** @description 取消订单消息的处理者*/
@Component
@RabbitListener(queues = "mall.order.cancel")
public class CancelOrderReceiver {private static Logger LOGGER =LoggerFactory.getLogger(CancelOrderReceiver.class);@Autowiredprivate OmsPortalOrderService portalOrderService;@RabbitHandlerpublic void handle(Long orderId){LOGGER.info("receive delay message orderId:{}",orderId);portalOrderService.cancelOrder(orderId);}
}
- 添加OmsPortalOrderService接口;
/*** @description 前台订单管理Service*/
public interface OmsPortalOrderService {/*** 根据提交信息生成订单*/@TransactionalCommonResult generateOrder(OrderParam orderParam);/*** 取消单个超时订单*/@Transactionalvoid cancelOrder(Long orderId);
}
- 添加OmsPortalOrderService的实现类OmsPortalOrderServiceImpl;
/*** @description 前台订单管理Service*/
@Service
public class OmsPortalOrderServiceImpl implements OmsPortalOrderService {private static Logger LOGGER = LoggerFactory.getLogger(OmsPortalOrderServiceImpl.class);@Autowiredprivate CancelOrderSender cancelOrderSender;@Overridepublic CommonResult generateOrder(OrderParam orderParam) {//todo 执行一系类下单操作,具体参考mall项目LOGGER.info("process generateOrder");//下单完成后开启一个延迟消息,用于当用户没有付款时取消订单(orderId应该在下单后生成)sendDelayMessageCancelOrder(11L);return CommonResult.success(null, "下单成功");}@Overridepublic void cancelOrder(Long orderId) {//todo 执行一系类取消订单操作,具体参考mall项目LOGGER.info("process cancelOrder orderId:{}",orderId);}private void sendDelayMessageCancelOrder(Long orderId) {//获取订单超时时间,假设为60分钟long delayTimes = 30 * 1000;//发送延迟消息cancelOrderSender.sendMessage(orderId, delayTimes);}}
- 添加OmsPortalOrderController定义接口;
/*** @description 订单管理Controller*/
@Controller
@Api(tags = "OmsPortalOrderController")
@Tag(name = "OmsPortalOrderController", description = "订单管理")
@RequestMapping("/order")
public class OmsPortalOrderController {@Autowiredprivate OmsPortalOrderService portalOrderService;@ApiOperation("根据购物车信息生成订单")@RequestMapping(value = "/generateOrder", method = RequestMethod.POST)@ResponseBodypublic Object generateOrder(@RequestBody OrderParam orderParam) {return portalOrderService.generateOrder(orderParam);}
}
延迟消息功能演示
- 运行项目,访问Swagger API文档,访问地址:http://localhost:8080/swagger-ui/

可以看到,经过了30秒后延迟消息发送出去了

相关文章:
SpringBoot 整合 RabbitMQ 实现延迟消息
一、业务场景说明 用于解决用户下单以后,订单超时如何取消订单的问题。 用户进行下单操作(会有锁定商品库存、使用优惠券、积分一系列的操作);生成订单,获取订单的id;获取到设置的订单超时时间࿰…...
Cilium:基于开源 eBPF 的网络、安全性和可观察性
基于 eBPF 的网络、安全性和可观察性 Cilium 是一种开源的云原生解决方案,它利用 Linux 内核中的 eBPF 技术来提供、保护和监控工作负载之间的网络连接。 什么是 eBPF? eBPF 是一项源自 Linux 内核的技术,允许沙盒程序在特权上下文&#x…...
Axios 详解与使用指南
Axios 详解与使用指南 1. Axios 简介 Axios 是一个基于 Promise 的 HTTP 客户端,能够在浏览器和 Node.js 环境中运行。它提供了一种简便的方式来执行 HTTP 请求,并支持多种请求方法,如 GET、POST、PUT、DELETE 等。Axios 的配置灵活&#x…...
深度学习 —— 个人学习笔记20(转置卷积、全卷积网络)
声明 本文章为个人学习使用,版面观感若有不适请谅解,文中知识仅代表个人观点,若出现错误,欢迎各位批评指正。 三十九、转置卷积 import torch from torch import nndef trans_conv(X, K):h, w K.shapeY torch.zeros((X.shape[…...
解决Mac系统Python3.12版本pip安装报错error: externally-managed-environment的问题
遇到的问题 在Mac安装了Python3.12.x版本(3.12.3、3.12.4)后,当尝试pip3 install xxx的时候,总是报错:error: externally-managed-environment error: externally-managed-environment This environment is external…...
lvm知识终结
、什么是 LVM LVM 是 Linux 下对磁盘分区进行管理的一种工具,适合管理大存储设备,并允许用户动态调整文件系统的大小 lvm 常用的命令 功能 PV 管理命令 VG 管理命令 LV 管理命令 scan 扫描 pvscan vgscan lvscan create 创建 pvcreate v…...
ESP32S3 IDF 对 16路输入输出芯片MCP23017做了个简单的测试
这次还是使用了idf老版本4.4.7,上次用了5.3,感觉不好用,官方的MCP23017芯片是英文版,真的很难读明白,可能是我英语水平不够吧。先看看每个寄存器的功能: IODIRA 和 IODIRB: 输入/输出方向寄存器 IPOLA 和 I…...
【技术前沿】Flux.1部署教程入门--Stable Diffusion团队最前沿、免费的开源AI图像生成器
项目简介 FLUX.1 是一种新的开源图像生成模型。它由 Stable Diffusion 背后的团队 Black Forest Labs 开发。 官网中有以下功能开源供大家参考: FLUX.1 擅长在图像中准确再现文字,因此非常适合需要清晰文字或短语的设计。无论是标牌、书籍封面还是品牌…...
Redis 的 STREAM 和 RocketMQ 是两种不同的消息队列和流处理解决方案,它们在设计理念、功能和用途上有显著区别。以下是它们的主要区别:
20240813 Redis 的 STREAM 和 RocketMQ 是两种不同的消息队列和流处理解决方案,它们在设计理念、功能和用途上有显著区别。以下是它们的主要区别:1. 使用 Redis 的 Sorted Set 数据结构连接到 Redis示例用法添加事件获取滑动窗口内的事件移除过期事件连接…...
Visual Studio Code安装与C/C++语言运行(上)
Visual Studio Code(VS Code)作为微软开发的一款轻量级但功能强大的源代码编辑器,广泛应用于各种编程语言的开发,包括C/C。以下将详细介绍VS Code的安装过程以及与C/C语言运行环境的配置。 一、Visual Studio Code的安装 1. 准备…...
探索数据可视化,数据看板在各行业中的应用
数据可视化是一种通过图形化手段将数据呈现出来的技术,它将复杂的数据和信息转化为易于理解的图表、地图、仪表盘等视觉元素,使得数据的模式、趋势和关系更加直观地展现出来。通过数据可视化,用户可以快速识别重要信息、发现潜在问题…...
haralyzer 半自动,一次性少量数据采集快捷方法
使用场景:半自动,一次性少量数据采集需求在工作中还是不少遇到的,无论使用模拟的方式,或者破解都不太划算。其实这种需求,使用半自动爬虫是最简单的。不需要考虑网站反爬虫的问题,因为你使用的就是真实的浏…...
mall-admin-web-master前端项目下载依赖失败解决
碰壁后的总结 pythone 环境 2.XX版本,切记不要3.0以上的。node 16.x不能太高 错误案例 npm ERR! code 1 npm ERR! path D:\workspace\springBootMall\mall-admin-web-master\node_modules\node-sass npm ERR! command failed npm ERR! command C:\windows\system…...
【07】JVM是怎么实现invokedynamic的
在Java中,方法调用会被编译为invokeStatic,invokeSpecial,invokVirtual以及invokeInterface四种指令。这些指令与包含目标方法类名、方法名以及方法描述符的符号引用捆绑,在实际运行之前,JVM根据这个符号引用链接到具体…...
使用API有效率地管理Dynadot域名,查看参与的拍卖列表
前言 Dynadot是通过ICANN认证的域名注册商,自2002年成立以来,服务于全球108个国家和地区的客户,为数以万计的客户提供简洁,优惠,安全的域名注册以及管理服务。 Dynadot平台操作教程索引(包括域名邮箱&…...
Linux 基本指令讲解
linux 基本指令 clear 清屏 Alt Enter 全屏/退出全屏 pwd 显示当前用户所处路径 cd 改变目录 cd /root/mikecd … 返回上级目录cd - 返回最近所处的路径cd ~ 直接返回当前用户自己的家目 roor 中:/root普通用户中:/home/mike mkdir 创建一个文件夹(d) …...
PRE_EMPHASIS
PRE_EMPASIS属性用于提高高频信号的信号完整性 其通过传输线遭受高频损耗。发射机 预加重(pre_EMPASIS)功能允许对某些信号驱动器进行预加重 I/O标准。 提示:发射机的预加重可以与接收机的均衡相结合,以提高 整体信号完整性。 理想…...
【QT常用技术讲解】多线程处理+全局变量处理异步事件并获取多个线程返回的结果
前言 QTableView加入勾选项后(参考【QT常用技术讲解】QTableView添加QCheckBox、QPushButton),如果支持右键菜单功能,此时就有统一执行多个异步事件,并且统一输出到界面的需求了,本篇结合多线程共享全局变量…...
数组列表中的最大距离
给定 m 个数组,每个数组都已经按照升序排好序了。现在你需要从两个不同的数组中选择两个整数(每个数组选一个)并且计算它们的距离。两个整数 a 和 b 之间的距离定义为它们差的绝对值 |a-b| 。你的任务就是去找到最大距离 示例 1:…...
C语言新手小白详细教程(7)指针和指针变量
希望文章能够给到初学的你一些启发~ 如果觉得文章对你有帮助的话,点赞 关注 收藏支持一下笔者吧~ 阅读指南: 开篇说明1、指针的定义接下来我们用图示的形式来解释一下 指针:2、申明指针变量3、取地址符 &4、为指针…...
7.4.分块查找
一.分块查找的算法思想: 1.实例: 以上述图片的顺序表为例, 该顺序表的数据元素从整体来看是乱序的,但如果把这些数据元素分成一块一块的小区间, 第一个区间[0,1]索引上的数据元素都是小于等于10的, 第二…...
在Ubuntu中设置开机自动运行(sudo)指令的指南
在Ubuntu系统中,有时需要在系统启动时自动执行某些命令,特别是需要 sudo权限的指令。为了实现这一功能,可以使用多种方法,包括编写Systemd服务、配置 rc.local文件或使用 cron任务计划。本文将详细介绍这些方法,并提供…...
vue3 定时器-定义全局方法 vue+ts
1.创建ts文件 路径:src/utils/timer.ts 完整代码: import { onUnmounted } from vuetype TimerCallback (...args: any[]) > voidexport function useGlobalTimer() {const timers: Map<number, NodeJS.Timeout> new Map()// 创建定时器con…...
Python如何给视频添加音频和字幕
在Python中,给视频添加音频和字幕可以使用电影文件处理库MoviePy和字幕处理库Subtitles。下面将详细介绍如何使用这些库来实现视频的音频和字幕添加,包括必要的代码示例和详细解释。 环境准备 在开始之前,需要安装以下Python库:…...
MySQL 8.0 OCP 英文题库解析(十三)
Oracle 为庆祝 MySQL 30 周年,截止到 2025.07.31 之前。所有人均可以免费考取原价245美元的MySQL OCP 认证。 从今天开始,将英文题库免费公布出来,并进行解析,帮助大家在一个月之内轻松通过OCP认证。 本期公布试题111~120 试题1…...
今日学习:Spring线程池|并发修改异常|链路丢失|登录续期|VIP过期策略|数值类缓存
文章目录 优雅版线程池ThreadPoolTaskExecutor和ThreadPoolTaskExecutor的装饰器并发修改异常并发修改异常简介实现机制设计原因及意义 使用线程池造成的链路丢失问题线程池导致的链路丢失问题发生原因 常见解决方法更好的解决方法设计精妙之处 登录续期登录续期常见实现方式特…...
深度学习习题2
1.如果增加神经网络的宽度,精确度会增加到一个特定阈值后,便开始降低。造成这一现象的可能原因是什么? A、即使增加卷积核的数量,只有少部分的核会被用作预测 B、当卷积核数量增加时,神经网络的预测能力会降低 C、当卷…...
IP如何挑?2025年海外专线IP如何购买?
你花了时间和预算买了IP,结果IP质量不佳,项目效率低下不说,还可能带来莫名的网络问题,是不是太闹心了?尤其是在面对海外专线IP时,到底怎么才能买到适合自己的呢?所以,挑IP绝对是个技…...
人机融合智能 | “人智交互”跨学科新领域
本文系统地提出基于“以人为中心AI(HCAI)”理念的人-人工智能交互(人智交互)这一跨学科新领域及框架,定义人智交互领域的理念、基本理论和关键问题、方法、开发流程和参与团队等,阐述提出人智交互新领域的意义。然后,提出人智交互研究的三种新范式取向以及它们的意义。最后,总结…...
Windows安装Miniconda
一、下载 https://www.anaconda.com/download/success 二、安装 三、配置镜像源 Anaconda/Miniconda pip 配置清华镜像源_anaconda配置清华源-CSDN博客 四、常用操作命令 Anaconda/Miniconda 基本操作命令_miniconda创建环境命令-CSDN博客...
