SpringBoot 整合 RabbitMQ 实现延迟消息
一、业务场景说明
用于解决用户下单以后,订单超时如何取消订单的问题。
- 用户进行下单操作(会有锁定商品库存、使用优惠券、积分一系列的操作);
- 生成订单,获取订单的id;
- 获取到设置的订单超时时间(假设设置的为60分钟不支付取消订单);
- 按订单超时时间发送一个延迟消息给RabbitMQ,让它在订单超时后触发取消订单的操作;
- 如果用户没有支付,进行取消订单操作(释放锁定商品库存、返还优惠券、返回积分一系列操作)。
相关概念:(死信队列与延迟队列)
死信,顾名思义就是无法被消费的消息,一般来说 Producer 将消息投递到 broker 或者直接丢到 queue 中,Consumer 从 Queue 中取出消息进行消费,但是某些时候由于特定的原因导致 Queue 中的某些消息无法被消费,这样的消息如果没有后续的处理就变成了死信,有死信自然就有了死信队列
死信队列有其特殊的应用场景,例如用户在商城下单成功并点击去支付的时候,如果在指定的时间内未支付,那么就可以将该下单消息投递到死信队列中,至于后续怎么处理死信队列需要结合具体的应用场景。
死信队列(Dead Letter Queue, DLQ)
目的: 处理无法成功消费的消息。
特点:
- 目的: 用于接收无法成功处理的消息。这些消息可能因为多种原因无法被正常消费,例如消息处理失败、消息超时、队列满等。
- 触发条件: 当消息在主队列中无法被正常消费,并且达到一定的失败次数或过期时间后,会被转发到死信队列。
- 用途: 通过分析死信队列中的消息,开发者可以排查和解决系统中的问题,也可以进行后续的审计和处理。
实现方式:
- 需要配置主队列和死信队列之间的关系。
- 配置消息的重试策略和死信转发条件(如重试次数、过期时间等)。
延迟队列(Delayed Queue)
目的: 控制消息的处理时间。
特点:
- 目的: 在特定的时间点或延迟一段时间后再将消息发送到主队列进行处理。用于在系统中实现消息的延迟处理。
- 触发条件: 当消息被发送到延迟队列时,它会根据设定的延迟时间在未来某个时刻被转发到主队列。
- 用途: 常用于实现任务的延迟执行、定时任务、超时重试等功能。
实现方式:
- 使用专门支持延迟队列的消息队列系统,或者通过设置消息的过期时间和重试机制来实现。
- 可以配置延迟时间,确保消息在设定的时间后进入主队列。
总结
- 死信队列: 处理无法消费的消息,通常用于异常处理和问题排查。
- 延迟队列: 控制消息的处理时间,允许消息在未来某个时间点才被处理。
二、整合RabbitMQ实现延迟消息
整合依赖及配置
●在pom.xml中添加AMQP相关依赖;
<!--Spring AMQP相关依赖-->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
- 修改
application.yml
文件,在spring节点下添加RabbitMQ相关配置。
spring:rabbitmq:host: localhostport: 5672virtual-host: /mallusername: mallpassword: mallpublisher-returns: true #消息发送到队列确认publisher-confirm-type: simple #消息发送到交换器确认
实现延迟消息
- 添加消息队列的枚举配置类QueueEnum,用于延迟消息队列及处理取消订单消息队列的常量定义,包括交换机名称、队列名称、路由键名称;
/*** @description 消息队列枚举配置*/
@Getter
public enum QueueEnum {/*** 消息通知队列*/QUEUE_ORDER_CANCEL("mall.order.direct", "mall.order.cancel", "mall.order.cancel"),/*** 消息通知ttl队列*/QUEUE_TTL_ORDER_CANCEL("mall.order.direct.ttl", "mall.order.cancel.ttl", "mall.order.cancel.ttl");/*** 交换机名称*/private String exchange;/*** 队列名称*/private String name;/*** 路由键*/private String routeKey;QueueEnum(String exchange, String name, String routeKey) {this.exchange = exchange;this.name = name;this.routeKey = routeKey;}
}
- 添加RabbitMQ的Java配置,用于配置交换机、队列及队列与交换机的绑定关系;
/*** @description 消息队列配置*/
@Configuration
public class RabbitMqConfig {/*** 订单消息实际消费队列所绑定的交换机*/@BeanDirectExchange orderDirect() {return ExchangeBuilder.directExchange(QueueEnum.QUEUE_ORDER_CANCEL.getExchange()).durable(true).build();}/*** 订单延迟队列所绑定的交换机*/@BeanDirectExchange orderTtlDirect() {return ExchangeBuilder.directExchange(QueueEnum.QUEUE_TTL_ORDER_CANCEL.getExchange()).durable(true).build();}/*** 订单实际消费队列*/@Beanpublic Queue orderQueue() {return new Queue(QueueEnum.QUEUE_ORDER_CANCEL.getName());}/*** 订单延迟队列(死信队列)*/@Beanpublic Queue orderTtlQueue() {return QueueBuilder.durable(QueueEnum.QUEUE_TTL_ORDER_CANCEL.getName()).withArgument("x-dead-letter-exchange", QueueEnum.QUEUE_ORDER_CANCEL.getExchange())//到期后转发的交换机.withArgument("x-dead-letter-routing-key", QueueEnum.QUEUE_ORDER_CANCEL.getRouteKey())//到期后转发的路由键.build();}/*** 将订单队列绑定到交换机*/@BeanBinding orderBinding(DirectExchange orderDirect,Queue orderQueue){return BindingBuilder.bind(orderQueue).to(orderDirect).with(QueueEnum.QUEUE_ORDER_CANCEL.getRouteKey());}/*** 将订单延迟队列绑定到交换机*/@BeanBinding orderTtlBinding(DirectExchange orderTtlDirect,Queue orderTtlQueue){return BindingBuilder.bind(orderTtlQueue).to(orderTtlDirect).with(QueueEnum.QUEUE_TTL_ORDER_CANCEL.getRouteKey());}}
- 此时登录,在RabbitMQ管理页面可以看到以下交换机和队列;
- 交换机及队列说明如下:mall.order.direct(取消订单消息队列所绑定的交换机):绑定的队列为
mall.order.cancel
,一旦有消息以mall.order.cancel
为路由键发过来,会发送到此队列。mall.order.direct.ttl(订单延迟消息队列所绑定的交换机):绑定的队列为mall.order.cancel.ttl
,一旦有消息以mall.order.cancel.ttl
为路由键发送过来,会转发到此队列,并在此队列保存一定时间,等到超时后会自动将消息发送到mall.order.cancel
(取消订单消息消费队列)。 - 添加延迟消息的发送者CancelOrderSender,用于向订单延迟消息队(mall.order.cancel.ttl)里发送消息;
/*** @description 取消订单消息的发出者*/
@Component
public class CancelOrderSender {private static Logger LOGGER =LoggerFactory.getLogger(CancelOrderSender.class);@Autowiredprivate AmqpTemplate amqpTemplate;public void sendMessage(Long orderId,final long delayTimes){//给延迟队列发送消息amqpTemplate.convertAndSend(QueueEnum.QUEUE_TTL_ORDER_CANCEL.getExchange(), QueueEnum.QUEUE_TTL_ORDER_CANCEL.getRouteKey(), orderId, new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {//给消息设置延迟毫秒值message.getMessageProperties().setExpiration(String.valueOf(delayTimes));return message;}});LOGGER.info("send delay message orderId:{}",orderId);}
}
- 添加取消订单消息的接收者CancelOrderReceiver,用于从取消订单的消息队列(mall.order.cancel)里接收消息;
/*** @description 取消订单消息的处理者*/
@Component
@RabbitListener(queues = "mall.order.cancel")
public class CancelOrderReceiver {private static Logger LOGGER =LoggerFactory.getLogger(CancelOrderReceiver.class);@Autowiredprivate OmsPortalOrderService portalOrderService;@RabbitHandlerpublic void handle(Long orderId){LOGGER.info("receive delay message orderId:{}",orderId);portalOrderService.cancelOrder(orderId);}
}
- 添加OmsPortalOrderService接口;
/*** @description 前台订单管理Service*/
public interface OmsPortalOrderService {/*** 根据提交信息生成订单*/@TransactionalCommonResult generateOrder(OrderParam orderParam);/*** 取消单个超时订单*/@Transactionalvoid cancelOrder(Long orderId);
}
- 添加OmsPortalOrderService的实现类OmsPortalOrderServiceImpl;
/*** @description 前台订单管理Service*/
@Service
public class OmsPortalOrderServiceImpl implements OmsPortalOrderService {private static Logger LOGGER = LoggerFactory.getLogger(OmsPortalOrderServiceImpl.class);@Autowiredprivate CancelOrderSender cancelOrderSender;@Overridepublic CommonResult generateOrder(OrderParam orderParam) {//todo 执行一系类下单操作,具体参考mall项目LOGGER.info("process generateOrder");//下单完成后开启一个延迟消息,用于当用户没有付款时取消订单(orderId应该在下单后生成)sendDelayMessageCancelOrder(11L);return CommonResult.success(null, "下单成功");}@Overridepublic void cancelOrder(Long orderId) {//todo 执行一系类取消订单操作,具体参考mall项目LOGGER.info("process cancelOrder orderId:{}",orderId);}private void sendDelayMessageCancelOrder(Long orderId) {//获取订单超时时间,假设为60分钟long delayTimes = 30 * 1000;//发送延迟消息cancelOrderSender.sendMessage(orderId, delayTimes);}}
- 添加OmsPortalOrderController定义接口;
/*** @description 订单管理Controller*/
@Controller
@Api(tags = "OmsPortalOrderController")
@Tag(name = "OmsPortalOrderController", description = "订单管理")
@RequestMapping("/order")
public class OmsPortalOrderController {@Autowiredprivate OmsPortalOrderService portalOrderService;@ApiOperation("根据购物车信息生成订单")@RequestMapping(value = "/generateOrder", method = RequestMethod.POST)@ResponseBodypublic Object generateOrder(@RequestBody OrderParam orderParam) {return portalOrderService.generateOrder(orderParam);}
}
延迟消息功能演示
- 运行项目,访问Swagger API文档,访问地址:http://localhost:8080/swagger-ui/
可以看到,经过了30秒后延迟消息发送出去了
相关文章:

SpringBoot 整合 RabbitMQ 实现延迟消息
一、业务场景说明 用于解决用户下单以后,订单超时如何取消订单的问题。 用户进行下单操作(会有锁定商品库存、使用优惠券、积分一系列的操作);生成订单,获取订单的id;获取到设置的订单超时时间࿰…...

Cilium:基于开源 eBPF 的网络、安全性和可观察性
基于 eBPF 的网络、安全性和可观察性 Cilium 是一种开源的云原生解决方案,它利用 Linux 内核中的 eBPF 技术来提供、保护和监控工作负载之间的网络连接。 什么是 eBPF? eBPF 是一项源自 Linux 内核的技术,允许沙盒程序在特权上下文&#x…...
Axios 详解与使用指南
Axios 详解与使用指南 1. Axios 简介 Axios 是一个基于 Promise 的 HTTP 客户端,能够在浏览器和 Node.js 环境中运行。它提供了一种简便的方式来执行 HTTP 请求,并支持多种请求方法,如 GET、POST、PUT、DELETE 等。Axios 的配置灵活&#x…...

深度学习 —— 个人学习笔记20(转置卷积、全卷积网络)
声明 本文章为个人学习使用,版面观感若有不适请谅解,文中知识仅代表个人观点,若出现错误,欢迎各位批评指正。 三十九、转置卷积 import torch from torch import nndef trans_conv(X, K):h, w K.shapeY torch.zeros((X.shape[…...
解决Mac系统Python3.12版本pip安装报错error: externally-managed-environment的问题
遇到的问题 在Mac安装了Python3.12.x版本(3.12.3、3.12.4)后,当尝试pip3 install xxx的时候,总是报错:error: externally-managed-environment error: externally-managed-environment This environment is external…...

lvm知识终结
、什么是 LVM LVM 是 Linux 下对磁盘分区进行管理的一种工具,适合管理大存储设备,并允许用户动态调整文件系统的大小 lvm 常用的命令 功能 PV 管理命令 VG 管理命令 LV 管理命令 scan 扫描 pvscan vgscan lvscan create 创建 pvcreate v…...

ESP32S3 IDF 对 16路输入输出芯片MCP23017做了个简单的测试
这次还是使用了idf老版本4.4.7,上次用了5.3,感觉不好用,官方的MCP23017芯片是英文版,真的很难读明白,可能是我英语水平不够吧。先看看每个寄存器的功能: IODIRA 和 IODIRB: 输入/输出方向寄存器 IPOLA 和 I…...
【技术前沿】Flux.1部署教程入门--Stable Diffusion团队最前沿、免费的开源AI图像生成器
项目简介 FLUX.1 是一种新的开源图像生成模型。它由 Stable Diffusion 背后的团队 Black Forest Labs 开发。 官网中有以下功能开源供大家参考: FLUX.1 擅长在图像中准确再现文字,因此非常适合需要清晰文字或短语的设计。无论是标牌、书籍封面还是品牌…...
Redis 的 STREAM 和 RocketMQ 是两种不同的消息队列和流处理解决方案,它们在设计理念、功能和用途上有显著区别。以下是它们的主要区别:
20240813 Redis 的 STREAM 和 RocketMQ 是两种不同的消息队列和流处理解决方案,它们在设计理念、功能和用途上有显著区别。以下是它们的主要区别:1. 使用 Redis 的 Sorted Set 数据结构连接到 Redis示例用法添加事件获取滑动窗口内的事件移除过期事件连接…...
Visual Studio Code安装与C/C++语言运行(上)
Visual Studio Code(VS Code)作为微软开发的一款轻量级但功能强大的源代码编辑器,广泛应用于各种编程语言的开发,包括C/C。以下将详细介绍VS Code的安装过程以及与C/C语言运行环境的配置。 一、Visual Studio Code的安装 1. 准备…...

探索数据可视化,数据看板在各行业中的应用
数据可视化是一种通过图形化手段将数据呈现出来的技术,它将复杂的数据和信息转化为易于理解的图表、地图、仪表盘等视觉元素,使得数据的模式、趋势和关系更加直观地展现出来。通过数据可视化,用户可以快速识别重要信息、发现潜在问题…...

haralyzer 半自动,一次性少量数据采集快捷方法
使用场景:半自动,一次性少量数据采集需求在工作中还是不少遇到的,无论使用模拟的方式,或者破解都不太划算。其实这种需求,使用半自动爬虫是最简单的。不需要考虑网站反爬虫的问题,因为你使用的就是真实的浏…...
mall-admin-web-master前端项目下载依赖失败解决
碰壁后的总结 pythone 环境 2.XX版本,切记不要3.0以上的。node 16.x不能太高 错误案例 npm ERR! code 1 npm ERR! path D:\workspace\springBootMall\mall-admin-web-master\node_modules\node-sass npm ERR! command failed npm ERR! command C:\windows\system…...
【07】JVM是怎么实现invokedynamic的
在Java中,方法调用会被编译为invokeStatic,invokeSpecial,invokVirtual以及invokeInterface四种指令。这些指令与包含目标方法类名、方法名以及方法描述符的符号引用捆绑,在实际运行之前,JVM根据这个符号引用链接到具体…...

使用API有效率地管理Dynadot域名,查看参与的拍卖列表
前言 Dynadot是通过ICANN认证的域名注册商,自2002年成立以来,服务于全球108个国家和地区的客户,为数以万计的客户提供简洁,优惠,安全的域名注册以及管理服务。 Dynadot平台操作教程索引(包括域名邮箱&…...

Linux 基本指令讲解
linux 基本指令 clear 清屏 Alt Enter 全屏/退出全屏 pwd 显示当前用户所处路径 cd 改变目录 cd /root/mikecd … 返回上级目录cd - 返回最近所处的路径cd ~ 直接返回当前用户自己的家目 roor 中:/root普通用户中:/home/mike mkdir 创建一个文件夹(d) …...
PRE_EMPHASIS
PRE_EMPASIS属性用于提高高频信号的信号完整性 其通过传输线遭受高频损耗。发射机 预加重(pre_EMPASIS)功能允许对某些信号驱动器进行预加重 I/O标准。 提示:发射机的预加重可以与接收机的均衡相结合,以提高 整体信号完整性。 理想…...
【QT常用技术讲解】多线程处理+全局变量处理异步事件并获取多个线程返回的结果
前言 QTableView加入勾选项后(参考【QT常用技术讲解】QTableView添加QCheckBox、QPushButton),如果支持右键菜单功能,此时就有统一执行多个异步事件,并且统一输出到界面的需求了,本篇结合多线程共享全局变量…...
数组列表中的最大距离
给定 m 个数组,每个数组都已经按照升序排好序了。现在你需要从两个不同的数组中选择两个整数(每个数组选一个)并且计算它们的距离。两个整数 a 和 b 之间的距离定义为它们差的绝对值 |a-b| 。你的任务就是去找到最大距离 示例 1:…...

C语言新手小白详细教程(7)指针和指针变量
希望文章能够给到初学的你一些启发~ 如果觉得文章对你有帮助的话,点赞 关注 收藏支持一下笔者吧~ 阅读指南: 开篇说明1、指针的定义接下来我们用图示的形式来解释一下 指针:2、申明指针变量3、取地址符 &4、为指针…...
利用ngx_stream_return_module构建简易 TCP/UDP 响应网关
一、模块概述 ngx_stream_return_module 提供了一个极简的指令: return <value>;在收到客户端连接后,立即将 <value> 写回并关闭连接。<value> 支持内嵌文本和内置变量(如 $time_iso8601、$remote_addr 等)&a…...

对WWDC 2025 Keynote 内容的预测
借助我们以往对苹果公司发展路径的深入研究经验,以及大语言模型的分析能力,我们系统梳理了多年来苹果 WWDC 主题演讲的规律。在 WWDC 2025 即将揭幕之际,我们让 ChatGPT 对今年的 Keynote 内容进行了一个初步预测,聊作存档。等到明…...
Java多线程实现之Callable接口深度解析
Java多线程实现之Callable接口深度解析 一、Callable接口概述1.1 接口定义1.2 与Runnable接口的对比1.3 Future接口与FutureTask类 二、Callable接口的基本使用方法2.1 传统方式实现Callable接口2.2 使用Lambda表达式简化Callable实现2.3 使用FutureTask类执行Callable任务 三、…...
代理篇12|深入理解 Vite中的Proxy接口代理配置
在前端开发中,常常会遇到 跨域请求接口 的情况。为了解决这个问题,Vite 和 Webpack 都提供了 proxy 代理功能,用于将本地开发请求转发到后端服务器。 什么是代理(proxy)? 代理是在开发过程中,前端项目通过开发服务器,将指定的请求“转发”到真实的后端服务器,从而绕…...

Chrome 浏览器前端与客户端双向通信实战
Chrome 前端(即页面 JS / Web UI)与客户端(C 后端)的交互机制,是 Chromium 架构中非常核心的一环。下面我将按常见场景,从通道、流程、技术栈几个角度做一套完整的分析,特别适合你这种在分析和改…...

【堆垛策略】设计方法
堆垛策略的设计是积木堆叠系统的核心,直接影响堆叠的稳定性、效率和容错能力。以下是分层次的堆垛策略设计方法,涵盖基础规则、优化算法和容错机制: 1. 基础堆垛规则 (1) 物理稳定性优先 重心原则: 大尺寸/重量积木在下…...

MySQL体系架构解析(三):MySQL目录与启动配置全解析
MySQL中的目录和文件 bin目录 在 MySQL 的安装目录下有一个特别重要的 bin 目录,这个目录下存放着许多可执行文件。与其他系统的可执行文件类似,这些可执行文件都是与服务器和客户端程序相关的。 启动MySQL服务器程序 在 UNIX 系统中,用…...

边缘计算网关提升水产养殖尾水处理的远程运维效率
一、项目背景 随着水产养殖行业的快速发展,养殖尾水的处理成为了一个亟待解决的环保问题。传统的尾水处理方式不仅效率低下,而且难以实现精准监控和管理。为了提升尾水处理的效果和效率,同时降低人力成本,某大型水产养殖企业决定…...
拟合问题处理
在机器学习中,核心任务通常围绕模型训练和性能提升展开,但你提到的 “优化训练数据解决过拟合” 和 “提升泛化性能解决欠拟合” 需要结合更准确的概念进行梳理。以下是对机器学习核心任务的系统复习和修正: 一、机器学习的核心任务框架 机…...
SQL进阶之旅 Day 22:批处理与游标优化
【SQL进阶之旅 Day 22】批处理与游标优化 文章简述(300字左右) 在数据库开发中,面对大量数据的处理任务时,单条SQL语句往往无法满足性能需求。本篇文章聚焦“批处理与游标优化”,深入探讨如何通过批量操作和游标技术提…...