第十六章 RabbitMQ延迟消息之延迟插件优化
目录
一、引言
二、优化方案
三、核心代码实现
3.1. 生产者代码
3.2. 消息处理器
3.3. 自定义多延迟消息封装类
3.4. 订单实体类
3.5. 消费者代码
四、运行效果
一、引言
上一章节我们提到,直接使用延迟插件,创建一个延迟指定时间的消息(如10分钟),并不是最好的解决方案,因为假如我们的订单是在5分钟支付的,那么剩余的5分钟时间,RabbitMQ中延迟消息时钟还是一直占用着资源。如果有大量的延迟消息,那么对于服务来说压力是很大的,同时会耗费庞大昂贵的资源。因此,本章节我们就来近一步对延迟插件的消息进行优化。
我们通过下面的流程图来做近一步分析:
1. 用户下单完成后,发送15分钟延迟消息,在15分钟后接收消息,检查支付状态:
2. 已支付:更新订单状态为已支付
3. 未支付:更新订单状态为关闭订单,恢复商品库存
常规延迟插件消息使用的弊端总结:
1. 设置30分钟后检测订单支付状态实现起来非常简单,但是存在两个问题:
2. 如果并发较高,30分钟可能堆积消息过多,对MQ压力很大
3. 大多数订单在下单后1分钟内就会支付,但是却需要在MQ内等待30分钟,浪费资源
二、优化方案
如下图所示,我们可以将10分钟甚至30分钟拆分成多份零散的较短的时间。
消息初次发送的延迟时间设定为10s,10s过后如果订单还是未支付状态,我们判断延迟时间数组里还有没有剩余延迟时间,如果有则继续发送延迟消息,时间设定为数组中的第二个时间10s,直到订单支付成功终止循环,或是最后一份时间消耗完依然未支付,我们取消订单。
三、核心代码实现
3.1. 生产者代码
package com.example.publisher;import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.test.context.SpringBootTest;/*** 生产者*/
@Slf4j
@SpringBootTest
class PublisherApplicationTests {@Resourceprivate RabbitTemplate rabbitTemplate;@Testvoid test() {Order order = Order.builder().orderId(1L).content("生活不易,所以保持足够的努力,对自己要有信心,积极地去面对工作生活的挑战!").build();MultiDelayMessage<Order> msg = MultiDelayMessage.of(order, 1000L, 5000L, 2000L, 10000L);rabbitTemplate.convertAndSend("delay.direct", "delay", msg, new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {message.getMessageProperties().setDelayLong(msg.removeNextDelay());return message;}});}
}
3.2. 消息处理器
package com.example.publisher;import lombok.AllArgsConstructor;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;/*** 消息请求处理器*/
@AllArgsConstructor
public class DelayMessageProcessor implements MessagePostProcessor {private final Long delay;@Overridepublic Message postProcessMessage(Message message) throws AmqpException {message.getMessageProperties().setDelayLong(delay);return message;}
}
3.3. 自定义多延迟消息封装类
package com.example.publisher;import lombok.Data;
import lombok.NoArgsConstructor;
import org.springframework.util.CollectionUtils;import java.io.Serializable;
import java.util.List;/*** 自定义的多延时消息封装类* @param <T>*/
@Data
@NoArgsConstructor
public class MultiDelayMessage<T> implements Serializable {/*** 消息体*/private T data;/*** 记录延迟时间的集合*/private List<Long> delayMillis;public MultiDelayMessage(T data, List<Long> delayMillis) {this.data = data;this.delayMillis = delayMillis;}public static <T> MultiDelayMessage<T> of(T data, Long...delayMillis) {return new MultiDelayMessage<>(data, (List<Long>) CollectionUtils.arrayToList(delayMillis));}/*** 获取并移除下一个延迟时间* @return 队列中的第一个延迟时间*/public Long removeNextDelay() {return delayMillis.remove(0);}/*** 是否还有下一个延迟时间* @return*/public boolean hasNextDelay() {return !delayMillis.isEmpty();}
}
3.4. 订单实体类
package com.example.publisher;import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;/*** 订单类 * 此处为了演示,将真实业务中的订单类做了简化* 只包含一个订单ID和自定义消息内容*/
@Data
@AllArgsConstructor
@NoArgsConstructor
@Builder
public class Order {private Long orderId;private String content;
}
3.5. 消费者代码
package com.example.consumer;import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;/*** 消费者* 因为作为演示,所以商城支付、订单、及扣减库存的业务代码已注释* 注释中保留了整个商城下单支付扣减库存的流程步骤*/
@Slf4j
@Component
public class SimpleListener {@Resourceprivate RabbitTemplate rabbitTemplate;@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "delay.queue", durable = "true"),exchange = @Exchange(name = "delay.direct", delayed = "true"),key = "delay"))public void listener(MultiDelayMessage<Order> msg) throws Exception {System.out.println(((Order)msg.getData()).getContent());// 1. 查询订单状态// Order order = orderService.getById(msg.getData())// 2. 判断是否已支付
// if (Order == null || order.status == 2) {
// 订单不存在或者已处理则直接返回
// return;
// }// 主动去支付服务查询真正的支付状态
// PayOrder payOrder = payService.getById(order.getId());// 2.1. 已支付,则标记订单为已支付
// if (payOrder.isPay()) {
// orderService.markOrderPaySuccess(order.getId());
// return;
// }// 2.2. 未支付,获取下次订单延迟时间// 3. 判断是否存在延迟时间if (msg.hasNextDelay()) {// 3.1 存在,重发延迟消息Long nextDelay = msg.removeNextDelay();rabbitTemplate.convertAndSend("delay.direct", "delay", msg, new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {message.getMessageProperties().setDelayLong(nextDelay);return message;}});return;}// 3.2 不存在,取消订单
// orderService.lambdaUpdate()
// .set(Order::getStatus, 5);
// .set(Order::getCloseTime, LocalDateTime.now());
// .eq(Order::getId, order.getId())
// .update();// 4. 恢复库存}
}
四、运行效果
最终我们会看到每间隔一段时间消费者就会消费一条消息,这个间隔时间就是我们设定的分段时间数组,这么做就能极大地减少资源消耗和服务的压力:
相关文章:

第十六章 RabbitMQ延迟消息之延迟插件优化
目录 一、引言 二、优化方案 三、核心代码实现 3.1. 生产者代码 3.2. 消息处理器 3.3. 自定义多延迟消息封装类 3.4. 订单实体类 3.5. 消费者代码 四、运行效果 一、引言 上一章节我们提到,直接使用延迟插件,创建一个延迟指定时间的消息&…...

[单master节点k8s部署]32.ceph分布式存储(三)
基于ceph rbd生成pv 在集群中认证ceph 用下面代码生成ceph的secret .创建 ceph 的 secret,在 k8s 的控制节点操作: 回到 ceph 管理节点创建 pool 池: [rootmaster1-admin ~]# ceph osd pool create k8stest 56 pool k8stest created [rootm…...
git 相关问题解决一一记录
文章目录 gitssh.github.com: Permission denied (publickey)1. 检查 SSH 密钥生成新的 SSH 密钥添加 SSH 密钥到 GitHub 2. 配置 SSH 代理启动 SSH 代理添加私钥到 SSH 代理 3. 检查 SSH 配置文件4. 测试 SSH 连接5. 检查防火墙和网络设置6. 检查 GitHub 账户设置详细步骤 更新…...

UE4 材质学习笔记04(着色器性能优化)
一.着色器性能优化 1.衡量着色器的性能 衡量着色器性能的主要方法有三个 第一个:可以使用场景的视图模式的优化视图模式的着色器复杂度 下面的滑条代表了着色器指令的复杂度 如果场景大部分是绿色的,说明着色器耗能低,反之白色则是很糟糕…...

3、Redis Stack扩展功能
文章目录 一、了解Redis产品二、申请RedisCloud实例三、Redis Stack体验1、RedisStack有哪些扩展?2、Redis JSON1、Redis JSON是什么2、Redis JSON有什么用3、Redis JSON的优势 3、Search And Query1、传统Scan搜索2、Search And Query搜索 4、Bloom Filter1、布隆过…...

Flythings学习(二)控件相关
文章目录 1 前言2 通用属性2.1 控件ID值2.2 控件位置2.3 背景色2.4 背景图2.5 显示与隐藏2.6 控件状态2.7 蜂鸣器控制 3 文本类TextView4 按键类 Button4.1 系统按键4.2 处理按钮长按事件4.3 处理按键触摸事件 5 复选框CheckBox6 单选组 RadioGroup7 进度条,滑块7.1…...
关于multiprocessing使用freeze_support()方法
提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档 文章目录 前言一、freeze_support()?二、使用方法总结 **注意下面** 如果有车载讨论需要的小伙伴,可以私信加我微信,拉你进群,…...

基于rk356x u-boot版本功能分析及编译相关(一)
🎏技术驱动源于热爱,祝各位学有所成。 文章目录 uboot的分支是next-dev历史版本v2017-09uboot支持DM框架uboot前级pre-loader支持及引导下级uboot分区支持uboot支持固件格式secure bootuboot编译脚本位置build.shuboot/make.shrkbin仓库uboot的分支是next-dev历史版本v2017-…...

Jenkins---01
什么是敏捷开发 敏捷开发以用户的需求进化为核心,采用迭代、循序渐进的方法进行软件开发。在敏捷开 发中,软件项目在构建初期被切分成多个子项目,各个子项目的成果都经过测试,具备可视、 可集成和可运行使用的特征。换言之&…...

第十五届蓝桥杯C++B组省赛
文章目录 1.握手问题解题思路1(组合数学)解题思路2(暴力枚举) 2.小球反弹做题思路 3.好数算法思路(暴力解法)---不会超时 4.R格式算法思路 5.宝石组合算法思路---唯一分解定理 6.数字接龙算法思路----DFS 7…...
线程 vs 虚拟线程:深入理解及区别
Java 提供了两种线程机制:普通线程(平台线程)和 虚拟线程。普通线程是 Java 中经典的并发处理方式,而虚拟线程是随着 Java 21 引入的新特性,旨在提升并发性能和开发体验。本文将详细探讨它们的区别,并帮助你…...

【WEB应用安全测试指南–蓝队安全测试2】--超详细-可直接进行实战!!!亲测-可进行安全及渗透测试
安全基础理论入门知识参考上一篇《WEB应用安全测试指南蓝队安全测试1》 WEB应用安全测试指南2 一、文件 I/O 类1.1、任意文件上传1.2、任意文件下载1.3、文件包含 二、接口安全类2.1、短信炸弹2.2、邮件炸弹2.3、短信内容可控2.4、邮件内容可控 三、逻辑流程类3.1、越权3.2、未…...

使用HTML、CSS和JavaScript创建滚动弹幕效果
使用HTML、CSS和JavaScript创建滚动弹幕效果 在现代网页设计中,滚动文本是一种常见的动态效果,可以吸引用户的注意力并增强交互体验。在这篇博客文章中,我们将详细介绍如何使用HTML、CSS和JavaScript实现滚动文本效果。 效果 步骤1…...

【C语言】--数组
😊个人主页: 起名字真南 😋个人专栏:【数据结构初阶】 【C语言】 【C】 目录 1 数组的概念2 一维数组的创建和初始化2.2 数组的初始化2.3 数组类型 3 一维数组的使用3.1 数组下标3.2 数组的输入 4 一维数组在内存中的存储5 sizeof计算数组中的元素6 二维…...
面向B2B市场的Spring Boot医疗病历系统开发
第1章绪论 计算机已经从科研院所,大中型企业,走进了平常百姓家,Internet遍及世界各地,在网上能够用计算机进行文字草拟、修改、打印清样、文件登陆、检索、综合统计、分类、数据库管理等,用科学的方法将无序的信息进行…...

闭着眼学机器学习——支持向量机分类
引言: 在正文开始之前,首先给大家介绍一个不错的人工智能学习教程:https://www.captainbed.cn/bbs。其中包含了机器学习、深度学习、强化学习等系列教程,感兴趣的读者可以自行查阅。 1. 算法介绍 支持向量机(Support Vector Mach…...

今日指数项目day8实战权限管理器(上)
3.权限管理器 3.1 权限列表展示功能 1)原型效果 2)接口说明 功能描述: 查询所有权限集合 服务路径: /api/permissions 服务方法:Get 请求参数:无响应数据格式: {"code": 1,"data":…...

《机器学习与数据挖掘综合实践》实训课程教学解决方案
一、引言 随着信息技术的飞速发展,人工智能已成为推动社会进步的重要力量。作为人工智能的核心技术之一,机器学习与数据挖掘在各行各业的应用日益广泛。本方案旨在通过系统的理论教学、丰富的实践案例和先进的实训平台,帮助学生掌握机器学习…...
linux中软连接和硬链接的区别
定义与概念 硬链接(Hard Link):硬链接是文件系统中的一个概念,它直接指向文件系统中的物理数据块。可以把硬链接看作是原始文件的一个别名,它们共享相同的inode(索引节点)编号。在Linux文件系统…...
#Swift 对比 Static 在Swift 和 OC中的用法
在 Objective-C 和 Swift 中,static 关键字都用于定义类型级别的成员,但它们的用法和行为在两个语言中有所不同。让我们来详细对比一下 Objective-C 和 Swift 中 static 的使用方式和特性。 1. Objective-C 中的 static 在 Objective-C 中,…...

JavaScript 中的 ES|QL:利用 Apache Arrow 工具
作者:来自 Elastic Jeffrey Rengifo 学习如何将 ES|QL 与 JavaScript 的 Apache Arrow 客户端工具一起使用。 想获得 Elastic 认证吗?了解下一期 Elasticsearch Engineer 培训的时间吧! Elasticsearch 拥有众多新功能,助你为自己…...
mongodb源码分析session执行handleRequest命令find过程
mongo/transport/service_state_machine.cpp已经分析startSession创建ASIOSession过程,并且验证connection是否超过限制ASIOSession和connection是循环接受客户端命令,把数据流转换成Message,状态转变流程是:State::Created 》 St…...
数据链路层的主要功能是什么
数据链路层(OSI模型第2层)的核心功能是在相邻网络节点(如交换机、主机)间提供可靠的数据帧传输服务,主要职责包括: 🔑 核心功能详解: 帧封装与解封装 封装: 将网络层下发…...
鱼香ros docker配置镜像报错:https://registry-1.docker.io/v2/
使用鱼香ros一件安装docker时的https://registry-1.docker.io/v2/问题 一键安装指令 wget http://fishros.com/install -O fishros && . fishros出现问题:docker pull 失败 网络不同,需要使用镜像源 按照如下步骤操作 sudo vi /etc/docker/dae…...

HarmonyOS运动开发:如何用mpchart绘制运动配速图表
##鸿蒙核心技术##运动开发##Sensor Service Kit(传感器服务)# 前言 在运动类应用中,运动数据的可视化是提升用户体验的重要环节。通过直观的图表展示运动过程中的关键数据,如配速、距离、卡路里消耗等,用户可以更清晰…...

安宝特案例丨Vuzix AR智能眼镜集成专业软件,助力卢森堡医院药房转型,赢得辉瑞创新奖
在Vuzix M400 AR智能眼镜的助力下,卢森堡罗伯特舒曼医院(the Robert Schuman Hospitals, HRS)凭借在无菌制剂生产流程中引入增强现实技术(AR)创新项目,荣获了2024年6月7日由卢森堡医院药剂师协会࿰…...
NPOI Excel用OLE对象的形式插入文件附件以及插入图片
static void Main(string[] args) {XlsWithObjData();Console.WriteLine("输出完成"); }static void XlsWithObjData() {// 创建工作簿和单元格,只有HSSFWorkbook,XSSFWorkbook不可以HSSFWorkbook workbook new HSSFWorkbook();HSSFSheet sheet (HSSFSheet)workboo…...

Cilium动手实验室: 精通之旅---13.Cilium LoadBalancer IPAM and L2 Service Announcement
Cilium动手实验室: 精通之旅---13.Cilium LoadBalancer IPAM and L2 Service Announcement 1. LAB环境2. L2公告策略2.1 部署Death Star2.2 访问服务2.3 部署L2公告策略2.4 服务宣告 3. 可视化 ARP 流量3.1 部署新服务3.2 准备可视化3.3 再次请求 4. 自动IPAM4.1 IPAM Pool4.2 …...
Java求职者面试指南:Spring、Spring Boot、Spring MVC与MyBatis技术解析
Java求职者面试指南:Spring、Spring Boot、Spring MVC与MyBatis技术解析 一、第一轮基础概念问题 1. Spring框架的核心容器是什么?它的作用是什么? Spring框架的核心容器是IoC(控制反转)容器。它的主要作用是管理对…...

【深度学习新浪潮】什么是credit assignment problem?
Credit Assignment Problem(信用分配问题) 是机器学习,尤其是强化学习(RL)中的核心挑战之一,指的是如何将最终的奖励或惩罚准确地分配给导致该结果的各个中间动作或决策。在序列决策任务中,智能体执行一系列动作后获得一个最终奖励,但每个动作对最终结果的贡献程度往往…...