SpringBoot 整合 RabbitMQ 实现延迟消息
一、业务场景说明
用于解决用户下单以后,订单超时如何取消订单的问题。
- 用户进行下单操作(会有锁定商品库存、使用优惠券、积分一系列的操作);
- 生成订单,获取订单的id;
- 获取到设置的订单超时时间(假设设置的为60分钟不支付取消订单);
- 按订单超时时间发送一个延迟消息给RabbitMQ,让它在订单超时后触发取消订单的操作;
- 如果用户没有支付,进行取消订单操作(释放锁定商品库存、返还优惠券、返回积分一系列操作)。
相关概念:(死信队列与延迟队列)
死信,顾名思义就是无法被消费的消息,一般来说 Producer 将消息投递到 broker 或者直接丢到 queue 中,Consumer 从 Queue 中取出消息进行消费,但是某些时候由于特定的原因导致 Queue 中的某些消息无法被消费,这样的消息如果没有后续的处理就变成了死信,有死信自然就有了死信队列
死信队列有其特殊的应用场景,例如用户在商城下单成功并点击去支付的时候,如果在指定的时间内未支付,那么就可以将该下单消息投递到死信队列中,至于后续怎么处理死信队列需要结合具体的应用场景。

死信队列(Dead Letter Queue, DLQ)
目的: 处理无法成功消费的消息。
特点:
- 目的: 用于接收无法成功处理的消息。这些消息可能因为多种原因无法被正常消费,例如消息处理失败、消息超时、队列满等。
- 触发条件: 当消息在主队列中无法被正常消费,并且达到一定的失败次数或过期时间后,会被转发到死信队列。
- 用途: 通过分析死信队列中的消息,开发者可以排查和解决系统中的问题,也可以进行后续的审计和处理。
实现方式:
- 需要配置主队列和死信队列之间的关系。
- 配置消息的重试策略和死信转发条件(如重试次数、过期时间等)。
延迟队列(Delayed Queue)
目的: 控制消息的处理时间。
特点:
- 目的: 在特定的时间点或延迟一段时间后再将消息发送到主队列进行处理。用于在系统中实现消息的延迟处理。
- 触发条件: 当消息被发送到延迟队列时,它会根据设定的延迟时间在未来某个时刻被转发到主队列。
- 用途: 常用于实现任务的延迟执行、定时任务、超时重试等功能。
实现方式:
- 使用专门支持延迟队列的消息队列系统,或者通过设置消息的过期时间和重试机制来实现。
- 可以配置延迟时间,确保消息在设定的时间后进入主队列。
总结
- 死信队列: 处理无法消费的消息,通常用于异常处理和问题排查。
- 延迟队列: 控制消息的处理时间,允许消息在未来某个时间点才被处理。
二、整合RabbitMQ实现延迟消息
整合依赖及配置
●在pom.xml中添加AMQP相关依赖;
<!--Spring AMQP相关依赖-->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
- 修改
application.yml文件,在spring节点下添加RabbitMQ相关配置。
spring:rabbitmq:host: localhostport: 5672virtual-host: /mallusername: mallpassword: mallpublisher-returns: true #消息发送到队列确认publisher-confirm-type: simple #消息发送到交换器确认
实现延迟消息
- 添加消息队列的枚举配置类QueueEnum,用于延迟消息队列及处理取消订单消息队列的常量定义,包括交换机名称、队列名称、路由键名称;
/*** @description 消息队列枚举配置*/
@Getter
public enum QueueEnum {/*** 消息通知队列*/QUEUE_ORDER_CANCEL("mall.order.direct", "mall.order.cancel", "mall.order.cancel"),/*** 消息通知ttl队列*/QUEUE_TTL_ORDER_CANCEL("mall.order.direct.ttl", "mall.order.cancel.ttl", "mall.order.cancel.ttl");/*** 交换机名称*/private String exchange;/*** 队列名称*/private String name;/*** 路由键*/private String routeKey;QueueEnum(String exchange, String name, String routeKey) {this.exchange = exchange;this.name = name;this.routeKey = routeKey;}
}
- 添加RabbitMQ的Java配置,用于配置交换机、队列及队列与交换机的绑定关系;
/*** @description 消息队列配置*/
@Configuration
public class RabbitMqConfig {/*** 订单消息实际消费队列所绑定的交换机*/@BeanDirectExchange orderDirect() {return ExchangeBuilder.directExchange(QueueEnum.QUEUE_ORDER_CANCEL.getExchange()).durable(true).build();}/*** 订单延迟队列所绑定的交换机*/@BeanDirectExchange orderTtlDirect() {return ExchangeBuilder.directExchange(QueueEnum.QUEUE_TTL_ORDER_CANCEL.getExchange()).durable(true).build();}/*** 订单实际消费队列*/@Beanpublic Queue orderQueue() {return new Queue(QueueEnum.QUEUE_ORDER_CANCEL.getName());}/*** 订单延迟队列(死信队列)*/@Beanpublic Queue orderTtlQueue() {return QueueBuilder.durable(QueueEnum.QUEUE_TTL_ORDER_CANCEL.getName()).withArgument("x-dead-letter-exchange", QueueEnum.QUEUE_ORDER_CANCEL.getExchange())//到期后转发的交换机.withArgument("x-dead-letter-routing-key", QueueEnum.QUEUE_ORDER_CANCEL.getRouteKey())//到期后转发的路由键.build();}/*** 将订单队列绑定到交换机*/@BeanBinding orderBinding(DirectExchange orderDirect,Queue orderQueue){return BindingBuilder.bind(orderQueue).to(orderDirect).with(QueueEnum.QUEUE_ORDER_CANCEL.getRouteKey());}/*** 将订单延迟队列绑定到交换机*/@BeanBinding orderTtlBinding(DirectExchange orderTtlDirect,Queue orderTtlQueue){return BindingBuilder.bind(orderTtlQueue).to(orderTtlDirect).with(QueueEnum.QUEUE_TTL_ORDER_CANCEL.getRouteKey());}}
- 此时登录,在RabbitMQ管理页面可以看到以下交换机和队列;


- 交换机及队列说明如下:mall.order.direct(取消订单消息队列所绑定的交换机):绑定的队列为
mall.order.cancel,一旦有消息以mall.order.cancel为路由键发过来,会发送到此队列。mall.order.direct.ttl(订单延迟消息队列所绑定的交换机):绑定的队列为mall.order.cancel.ttl,一旦有消息以mall.order.cancel.ttl为路由键发送过来,会转发到此队列,并在此队列保存一定时间,等到超时后会自动将消息发送到mall.order.cancel(取消订单消息消费队列)。 - 添加延迟消息的发送者CancelOrderSender,用于向订单延迟消息队(mall.order.cancel.ttl)里发送消息;
/*** @description 取消订单消息的发出者*/
@Component
public class CancelOrderSender {private static Logger LOGGER =LoggerFactory.getLogger(CancelOrderSender.class);@Autowiredprivate AmqpTemplate amqpTemplate;public void sendMessage(Long orderId,final long delayTimes){//给延迟队列发送消息amqpTemplate.convertAndSend(QueueEnum.QUEUE_TTL_ORDER_CANCEL.getExchange(), QueueEnum.QUEUE_TTL_ORDER_CANCEL.getRouteKey(), orderId, new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {//给消息设置延迟毫秒值message.getMessageProperties().setExpiration(String.valueOf(delayTimes));return message;}});LOGGER.info("send delay message orderId:{}",orderId);}
}
- 添加取消订单消息的接收者CancelOrderReceiver,用于从取消订单的消息队列(mall.order.cancel)里接收消息;
/*** @description 取消订单消息的处理者*/
@Component
@RabbitListener(queues = "mall.order.cancel")
public class CancelOrderReceiver {private static Logger LOGGER =LoggerFactory.getLogger(CancelOrderReceiver.class);@Autowiredprivate OmsPortalOrderService portalOrderService;@RabbitHandlerpublic void handle(Long orderId){LOGGER.info("receive delay message orderId:{}",orderId);portalOrderService.cancelOrder(orderId);}
}
- 添加OmsPortalOrderService接口;
/*** @description 前台订单管理Service*/
public interface OmsPortalOrderService {/*** 根据提交信息生成订单*/@TransactionalCommonResult generateOrder(OrderParam orderParam);/*** 取消单个超时订单*/@Transactionalvoid cancelOrder(Long orderId);
}
- 添加OmsPortalOrderService的实现类OmsPortalOrderServiceImpl;
/*** @description 前台订单管理Service*/
@Service
public class OmsPortalOrderServiceImpl implements OmsPortalOrderService {private static Logger LOGGER = LoggerFactory.getLogger(OmsPortalOrderServiceImpl.class);@Autowiredprivate CancelOrderSender cancelOrderSender;@Overridepublic CommonResult generateOrder(OrderParam orderParam) {//todo 执行一系类下单操作,具体参考mall项目LOGGER.info("process generateOrder");//下单完成后开启一个延迟消息,用于当用户没有付款时取消订单(orderId应该在下单后生成)sendDelayMessageCancelOrder(11L);return CommonResult.success(null, "下单成功");}@Overridepublic void cancelOrder(Long orderId) {//todo 执行一系类取消订单操作,具体参考mall项目LOGGER.info("process cancelOrder orderId:{}",orderId);}private void sendDelayMessageCancelOrder(Long orderId) {//获取订单超时时间,假设为60分钟long delayTimes = 30 * 1000;//发送延迟消息cancelOrderSender.sendMessage(orderId, delayTimes);}}
- 添加OmsPortalOrderController定义接口;
/*** @description 订单管理Controller*/
@Controller
@Api(tags = "OmsPortalOrderController")
@Tag(name = "OmsPortalOrderController", description = "订单管理")
@RequestMapping("/order")
public class OmsPortalOrderController {@Autowiredprivate OmsPortalOrderService portalOrderService;@ApiOperation("根据购物车信息生成订单")@RequestMapping(value = "/generateOrder", method = RequestMethod.POST)@ResponseBodypublic Object generateOrder(@RequestBody OrderParam orderParam) {return portalOrderService.generateOrder(orderParam);}
}
延迟消息功能演示
- 运行项目,访问Swagger API文档,访问地址:http://localhost:8080/swagger-ui/

可以看到,经过了30秒后延迟消息发送出去了

相关文章:
SpringBoot 整合 RabbitMQ 实现延迟消息
一、业务场景说明 用于解决用户下单以后,订单超时如何取消订单的问题。 用户进行下单操作(会有锁定商品库存、使用优惠券、积分一系列的操作);生成订单,获取订单的id;获取到设置的订单超时时间࿰…...
Cilium:基于开源 eBPF 的网络、安全性和可观察性
基于 eBPF 的网络、安全性和可观察性 Cilium 是一种开源的云原生解决方案,它利用 Linux 内核中的 eBPF 技术来提供、保护和监控工作负载之间的网络连接。 什么是 eBPF? eBPF 是一项源自 Linux 内核的技术,允许沙盒程序在特权上下文&#x…...
Axios 详解与使用指南
Axios 详解与使用指南 1. Axios 简介 Axios 是一个基于 Promise 的 HTTP 客户端,能够在浏览器和 Node.js 环境中运行。它提供了一种简便的方式来执行 HTTP 请求,并支持多种请求方法,如 GET、POST、PUT、DELETE 等。Axios 的配置灵活&#x…...
深度学习 —— 个人学习笔记20(转置卷积、全卷积网络)
声明 本文章为个人学习使用,版面观感若有不适请谅解,文中知识仅代表个人观点,若出现错误,欢迎各位批评指正。 三十九、转置卷积 import torch from torch import nndef trans_conv(X, K):h, w K.shapeY torch.zeros((X.shape[…...
解决Mac系统Python3.12版本pip安装报错error: externally-managed-environment的问题
遇到的问题 在Mac安装了Python3.12.x版本(3.12.3、3.12.4)后,当尝试pip3 install xxx的时候,总是报错:error: externally-managed-environment error: externally-managed-environment This environment is external…...
lvm知识终结
、什么是 LVM LVM 是 Linux 下对磁盘分区进行管理的一种工具,适合管理大存储设备,并允许用户动态调整文件系统的大小 lvm 常用的命令 功能 PV 管理命令 VG 管理命令 LV 管理命令 scan 扫描 pvscan vgscan lvscan create 创建 pvcreate v…...
ESP32S3 IDF 对 16路输入输出芯片MCP23017做了个简单的测试
这次还是使用了idf老版本4.4.7,上次用了5.3,感觉不好用,官方的MCP23017芯片是英文版,真的很难读明白,可能是我英语水平不够吧。先看看每个寄存器的功能: IODIRA 和 IODIRB: 输入/输出方向寄存器 IPOLA 和 I…...
【技术前沿】Flux.1部署教程入门--Stable Diffusion团队最前沿、免费的开源AI图像生成器
项目简介 FLUX.1 是一种新的开源图像生成模型。它由 Stable Diffusion 背后的团队 Black Forest Labs 开发。 官网中有以下功能开源供大家参考: FLUX.1 擅长在图像中准确再现文字,因此非常适合需要清晰文字或短语的设计。无论是标牌、书籍封面还是品牌…...
Redis 的 STREAM 和 RocketMQ 是两种不同的消息队列和流处理解决方案,它们在设计理念、功能和用途上有显著区别。以下是它们的主要区别:
20240813 Redis 的 STREAM 和 RocketMQ 是两种不同的消息队列和流处理解决方案,它们在设计理念、功能和用途上有显著区别。以下是它们的主要区别:1. 使用 Redis 的 Sorted Set 数据结构连接到 Redis示例用法添加事件获取滑动窗口内的事件移除过期事件连接…...
Visual Studio Code安装与C/C++语言运行(上)
Visual Studio Code(VS Code)作为微软开发的一款轻量级但功能强大的源代码编辑器,广泛应用于各种编程语言的开发,包括C/C。以下将详细介绍VS Code的安装过程以及与C/C语言运行环境的配置。 一、Visual Studio Code的安装 1. 准备…...
探索数据可视化,数据看板在各行业中的应用
数据可视化是一种通过图形化手段将数据呈现出来的技术,它将复杂的数据和信息转化为易于理解的图表、地图、仪表盘等视觉元素,使得数据的模式、趋势和关系更加直观地展现出来。通过数据可视化,用户可以快速识别重要信息、发现潜在问题…...
haralyzer 半自动,一次性少量数据采集快捷方法
使用场景:半自动,一次性少量数据采集需求在工作中还是不少遇到的,无论使用模拟的方式,或者破解都不太划算。其实这种需求,使用半自动爬虫是最简单的。不需要考虑网站反爬虫的问题,因为你使用的就是真实的浏…...
mall-admin-web-master前端项目下载依赖失败解决
碰壁后的总结 pythone 环境 2.XX版本,切记不要3.0以上的。node 16.x不能太高 错误案例 npm ERR! code 1 npm ERR! path D:\workspace\springBootMall\mall-admin-web-master\node_modules\node-sass npm ERR! command failed npm ERR! command C:\windows\system…...
【07】JVM是怎么实现invokedynamic的
在Java中,方法调用会被编译为invokeStatic,invokeSpecial,invokVirtual以及invokeInterface四种指令。这些指令与包含目标方法类名、方法名以及方法描述符的符号引用捆绑,在实际运行之前,JVM根据这个符号引用链接到具体…...
使用API有效率地管理Dynadot域名,查看参与的拍卖列表
前言 Dynadot是通过ICANN认证的域名注册商,自2002年成立以来,服务于全球108个国家和地区的客户,为数以万计的客户提供简洁,优惠,安全的域名注册以及管理服务。 Dynadot平台操作教程索引(包括域名邮箱&…...
Linux 基本指令讲解
linux 基本指令 clear 清屏 Alt Enter 全屏/退出全屏 pwd 显示当前用户所处路径 cd 改变目录 cd /root/mikecd … 返回上级目录cd - 返回最近所处的路径cd ~ 直接返回当前用户自己的家目 roor 中:/root普通用户中:/home/mike mkdir 创建一个文件夹(d) …...
PRE_EMPHASIS
PRE_EMPASIS属性用于提高高频信号的信号完整性 其通过传输线遭受高频损耗。发射机 预加重(pre_EMPASIS)功能允许对某些信号驱动器进行预加重 I/O标准。 提示:发射机的预加重可以与接收机的均衡相结合,以提高 整体信号完整性。 理想…...
【QT常用技术讲解】多线程处理+全局变量处理异步事件并获取多个线程返回的结果
前言 QTableView加入勾选项后(参考【QT常用技术讲解】QTableView添加QCheckBox、QPushButton),如果支持右键菜单功能,此时就有统一执行多个异步事件,并且统一输出到界面的需求了,本篇结合多线程共享全局变量…...
数组列表中的最大距离
给定 m 个数组,每个数组都已经按照升序排好序了。现在你需要从两个不同的数组中选择两个整数(每个数组选一个)并且计算它们的距离。两个整数 a 和 b 之间的距离定义为它们差的绝对值 |a-b| 。你的任务就是去找到最大距离 示例 1:…...
C语言新手小白详细教程(7)指针和指针变量
希望文章能够给到初学的你一些启发~ 如果觉得文章对你有帮助的话,点赞 关注 收藏支持一下笔者吧~ 阅读指南: 开篇说明1、指针的定义接下来我们用图示的形式来解释一下 指针:2、申明指针变量3、取地址符 &4、为指针…...
内存分配函数malloc kmalloc vmalloc
内存分配函数malloc kmalloc vmalloc malloc实现步骤: 1)请求大小调整:首先,malloc 需要调整用户请求的大小,以适应内部数据结构(例如,可能需要存储额外的元数据)。通常,这包括对齐调整,确保分配的内存地址满足特定硬件要求(如对齐到8字节或16字节边界)。 2)空闲…...
css实现圆环展示百分比,根据值动态展示所占比例
代码如下 <view class""><view class"circle-chart"><view v-if"!!num" class"pie-item" :style"{background: conic-gradient(var(--one-color) 0%,#E9E6F1 ${num}%),}"></view><view v-else …...
vscode(仍待补充)
写于2025 6.9 主包将加入vscode这个更权威的圈子 vscode的基本使用 侧边栏 vscode还能连接ssh? debug时使用的launch文件 1.task.json {"tasks": [{"type": "cppbuild","label": "C/C: gcc.exe 生成活动文件"…...
django filter 统计数量 按属性去重
在Django中,如果你想要根据某个属性对查询集进行去重并统计数量,你可以使用values()方法配合annotate()方法来实现。这里有两种常见的方法来完成这个需求: 方法1:使用annotate()和Count 假设你有一个模型Item,并且你想…...
什么是Ansible Jinja2
理解 Ansible Jinja2 模板 Ansible 是一款功能强大的开源自动化工具,可让您无缝地管理和配置系统。Ansible 的一大亮点是它使用 Jinja2 模板,允许您根据变量数据动态生成文件、配置设置和脚本。本文将向您介绍 Ansible 中的 Jinja2 模板,并通…...
关键领域软件测试的突围之路:如何破解安全与效率的平衡难题
在数字化浪潮席卷全球的今天,软件系统已成为国家关键领域的核心战斗力。不同于普通商业软件,这些承载着国家安全使命的软件系统面临着前所未有的质量挑战——如何在确保绝对安全的前提下,实现高效测试与快速迭代?这一命题正考验着…...
初探Service服务发现机制
1.Service简介 Service是将运行在一组Pod上的应用程序发布为网络服务的抽象方法。 主要功能:服务发现和负载均衡。 Service类型的包括ClusterIP类型、NodePort类型、LoadBalancer类型、ExternalName类型 2.Endpoints简介 Endpoints是一种Kubernetes资源…...
【Go语言基础【12】】指针:声明、取地址、解引用
文章目录 零、概述:指针 vs. 引用(类比其他语言)一、指针基础概念二、指针声明与初始化三、指针操作符1. &:取地址(拿到内存地址)2. *:解引用(拿到值) 四、空指针&am…...
PH热榜 | 2025-06-08
1. Thiings 标语:一套超过1900个免费AI生成的3D图标集合 介绍:Thiings是一个不断扩展的免费AI生成3D图标库,目前已有超过1900个图标。你可以按照主题浏览,生成自己的图标,或者下载整个图标集。所有图标都可以在个人或…...
CppCon 2015 学习:REFLECTION TECHNIQUES IN C++
关于 Reflection(反射) 这个概念,总结一下: Reflection(反射)是什么? 反射是对类型的自我检查能力(Introspection) 可以查看类的成员变量、成员函数等信息。反射允许枚…...
