当前位置: 首页 > news >正文

第十六章 RabbitMQ延迟消息之延迟插件优化

目录

一、引言

二、优化方案 

三、核心代码实现

3.1. 生产者代码

3.2. 消息处理器 

3.3. 自定义多延迟消息封装类 

3.4. 订单实体类 

3.5. 消费者代码 

四、运行效果


一、引言

上一章节我们提到,直接使用延迟插件,创建一个延迟指定时间的消息(如10分钟),并不是最好的解决方案,因为假如我们的订单是在5分钟支付的,那么剩余的5分钟时间,RabbitMQ中延迟消息时钟还是一直占用着资源。如果有大量的延迟消息,那么对于服务来说压力是很大的,同时会耗费庞大昂贵的资源。因此,本章节我们就来近一步对延迟插件的消息进行优化。

我们通过下面的流程图来做近一步分析:

1. 用户下单完成后,发送15分钟延迟消息,在15分钟后接收消息,检查支付状态:

2. 已支付:更新订单状态为已支付

3. 未支付:更新订单状态为关闭订单,恢复商品库存

常规延迟插件消息使用的弊端总结:

1. 设置30分钟后检测订单支付状态实现起来非常简单,但是存在两个问题:

2. 如果并发较高,30分钟可能堆积消息过多,对MQ压力很大

3. 大多数订单在下单后1分钟内就会支付,但是却需要在MQ内等待30分钟,浪费资源

二、优化方案 

如下图所示,我们可以将10分钟甚至30分钟拆分成多份零散的较短的时间。

消息初次发送的延迟时间设定为10s,10s过后如果订单还是未支付状态,我们判断延迟时间数组里还有没有剩余延迟时间,如果有则继续发送延迟消息,时间设定为数组中的第二个时间10s,直到订单支付成功终止循环,或是最后一份时间消耗完依然未支付,我们取消订单。

三、核心代码实现

3.1. 生产者代码

package com.example.publisher;import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.test.context.SpringBootTest;/*** 生产者*/
@Slf4j
@SpringBootTest
class PublisherApplicationTests {@Resourceprivate RabbitTemplate rabbitTemplate;@Testvoid test() {Order order = Order.builder().orderId(1L).content("生活不易,所以保持足够的努力,对自己要有信心,积极地去面对工作生活的挑战!").build();MultiDelayMessage<Order> msg = MultiDelayMessage.of(order, 1000L, 5000L, 2000L, 10000L);rabbitTemplate.convertAndSend("delay.direct", "delay", msg, new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {message.getMessageProperties().setDelayLong(msg.removeNextDelay());return message;}});}
}

3.2. 消息处理器 

package com.example.publisher;import lombok.AllArgsConstructor;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;/*** 消息请求处理器*/
@AllArgsConstructor
public class DelayMessageProcessor implements MessagePostProcessor {private final Long delay;@Overridepublic Message postProcessMessage(Message message) throws AmqpException {message.getMessageProperties().setDelayLong(delay);return message;}
}

3.3. 自定义多延迟消息封装类 

package com.example.publisher;import lombok.Data;
import lombok.NoArgsConstructor;
import org.springframework.util.CollectionUtils;import java.io.Serializable;
import java.util.List;/*** 自定义的多延时消息封装类* @param <T>*/
@Data
@NoArgsConstructor
public class MultiDelayMessage<T> implements Serializable {/*** 消息体*/private T data;/*** 记录延迟时间的集合*/private List<Long> delayMillis;public MultiDelayMessage(T data, List<Long> delayMillis) {this.data = data;this.delayMillis = delayMillis;}public static <T> MultiDelayMessage<T> of(T data, Long...delayMillis) {return new MultiDelayMessage<>(data, (List<Long>) CollectionUtils.arrayToList(delayMillis));}/*** 获取并移除下一个延迟时间* @return 队列中的第一个延迟时间*/public Long removeNextDelay() {return delayMillis.remove(0);}/*** 是否还有下一个延迟时间* @return*/public boolean hasNextDelay() {return !delayMillis.isEmpty();}
}

3.4. 订单实体类 

package com.example.publisher;import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;/*** 订单类 * 此处为了演示,将真实业务中的订单类做了简化* 只包含一个订单ID和自定义消息内容*/
@Data
@AllArgsConstructor
@NoArgsConstructor
@Builder
public class Order {private Long orderId;private String content;
}

3.5. 消费者代码 

package com.example.consumer;import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;/*** 消费者* 因为作为演示,所以商城支付、订单、及扣减库存的业务代码已注释* 注释中保留了整个商城下单支付扣减库存的流程步骤*/
@Slf4j
@Component
public class SimpleListener {@Resourceprivate RabbitTemplate rabbitTemplate;@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "delay.queue", durable = "true"),exchange = @Exchange(name = "delay.direct", delayed = "true"),key = "delay"))public void listener(MultiDelayMessage<Order> msg) throws Exception {System.out.println(((Order)msg.getData()).getContent());// 1. 查询订单状态// Order order = orderService.getById(msg.getData())// 2. 判断是否已支付
//        if (Order == null || order.status == 2) {
//            订单不存在或者已处理则直接返回
//            return;
//        }// 主动去支付服务查询真正的支付状态
//        PayOrder payOrder = payService.getById(order.getId());// 2.1. 已支付,则标记订单为已支付
//        if (payOrder.isPay()) {
//            orderService.markOrderPaySuccess(order.getId());
//            return;
//        }// 2.2. 未支付,获取下次订单延迟时间// 3. 判断是否存在延迟时间if (msg.hasNextDelay()) {// 3.1 存在,重发延迟消息Long nextDelay = msg.removeNextDelay();rabbitTemplate.convertAndSend("delay.direct", "delay", msg, new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {message.getMessageProperties().setDelayLong(nextDelay);return message;}});return;}// 3.2 不存在,取消订单
//        orderService.lambdaUpdate()
//                .set(Order::getStatus, 5);
//                .set(Order::getCloseTime, LocalDateTime.now());
//                .eq(Order::getId, order.getId())
//                .update();// 4. 恢复库存}
}

四、运行效果

最终我们会看到每间隔一段时间消费者就会消费一条消息,这个间隔时间就是我们设定的分段时间数组,这么做就能极大地减少资源消耗和服务的压力:

相关文章:

第十六章 RabbitMQ延迟消息之延迟插件优化

目录 一、引言 二、优化方案 三、核心代码实现 3.1. 生产者代码 3.2. 消息处理器 3.3. 自定义多延迟消息封装类 3.4. 订单实体类 3.5. 消费者代码 四、运行效果 一、引言 上一章节我们提到&#xff0c;直接使用延迟插件&#xff0c;创建一个延迟指定时间的消息&…...

[单master节点k8s部署]32.ceph分布式存储(三)

基于ceph rbd生成pv 在集群中认证ceph 用下面代码生成ceph的secret .创建 ceph 的 secret&#xff0c;在 k8s 的控制节点操作&#xff1a; 回到 ceph 管理节点创建 pool 池&#xff1a; [rootmaster1-admin ~]# ceph osd pool create k8stest 56 pool k8stest created [rootm…...

git 相关问题解决一一记录

文章目录 gitssh.github.com: Permission denied (publickey)1. 检查 SSH 密钥生成新的 SSH 密钥添加 SSH 密钥到 GitHub 2. 配置 SSH 代理启动 SSH 代理添加私钥到 SSH 代理 3. 检查 SSH 配置文件4. 测试 SSH 连接5. 检查防火墙和网络设置6. 检查 GitHub 账户设置详细步骤 更新…...

UE4 材质学习笔记04(着色器性能优化)

一.着色器性能优化 1.衡量着色器的性能 衡量着色器性能的主要方法有三个 第一个&#xff1a;可以使用场景的视图模式的优化视图模式的着色器复杂度 下面的滑条代表了着色器指令的复杂度 如果场景大部分是绿色的&#xff0c;说明着色器耗能低&#xff0c;反之白色则是很糟糕…...

3、Redis Stack扩展功能

文章目录 一、了解Redis产品二、申请RedisCloud实例三、Redis Stack体验1、RedisStack有哪些扩展&#xff1f;2、Redis JSON1、Redis JSON是什么2、Redis JSON有什么用3、Redis JSON的优势 3、Search And Query1、传统Scan搜索2、Search And Query搜索 4、Bloom Filter1、布隆过…...

Flythings学习(二)控件相关

文章目录 1 前言2 通用属性2.1 控件ID值2.2 控件位置2.3 背景色2.4 背景图2.5 显示与隐藏2.6 控件状态2.7 蜂鸣器控制 3 文本类TextView4 按键类 Button4.1 系统按键4.2 处理按钮长按事件4.3 处理按键触摸事件 5 复选框CheckBox6 单选组 RadioGroup7 进度条&#xff0c;滑块7.1…...

关于multiprocessing使用freeze_support()方法

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录 前言一、freeze_support()&#xff1f;二、使用方法总结 **注意下面** 如果有车载讨论需要的小伙伴&#xff0c;可以私信加我微信&#xff0c;拉你进群&#xff0c;…...

基于rk356x u-boot版本功能分析及编译相关(一)

🎏技术驱动源于热爱,祝各位学有所成。 文章目录 uboot的分支是next-dev历史版本v2017-09uboot支持DM框架uboot前级pre-loader支持及引导下级uboot分区支持uboot支持固件格式secure bootuboot编译脚本位置build.shuboot/make.shrkbin仓库uboot的分支是next-dev历史版本v2017-…...

Jenkins---01

什么是敏捷开发 敏捷开发以用户的需求进化为核心&#xff0c;采用迭代、循序渐进的方法进行软件开发。在敏捷开 发中&#xff0c;软件项目在构建初期被切分成多个子项目&#xff0c;各个子项目的成果都经过测试&#xff0c;具备可视、 可集成和可运行使用的特征。换言之&…...

第十五届蓝桥杯C++B组省赛

文章目录 1.握手问题解题思路1&#xff08;组合数学&#xff09;解题思路2&#xff08;暴力枚举&#xff09; 2.小球反弹做题思路 3.好数算法思路&#xff08;暴力解法&#xff09;---不会超时 4.R格式算法思路 5.宝石组合算法思路---唯一分解定理 6.数字接龙算法思路----DFS 7…...

线程 vs 虚拟线程:深入理解及区别

Java 提供了两种线程机制&#xff1a;普通线程&#xff08;平台线程&#xff09;和 虚拟线程。普通线程是 Java 中经典的并发处理方式&#xff0c;而虚拟线程是随着 Java 21 引入的新特性&#xff0c;旨在提升并发性能和开发体验。本文将详细探讨它们的区别&#xff0c;并帮助你…...

【WEB应用安全测试指南–蓝队安全测试2】--超详细-可直接进行实战!!!亲测-可进行安全及渗透测试

安全基础理论入门知识参考上一篇《WEB应用安全测试指南蓝队安全测试1》 WEB应用安全测试指南2 一、文件 I/O 类1.1、任意文件上传1.2、任意文件下载1.3、文件包含 二、接口安全类2.1、短信炸弹2.2、邮件炸弹2.3、短信内容可控2.4、邮件内容可控 三、逻辑流程类3.1、越权3.2、未…...

使用HTML、CSS和JavaScript创建滚动弹幕效果

使用HTML、CSS和JavaScript创建滚动弹幕效果 在现代网页设计中&#xff0c;滚动文本是一种常见的动态效果&#xff0c;可以吸引用户的注意力并增强交互体验。在这篇博客文章中&#xff0c;我们将详细介绍如何使用HTML、CSS和JavaScript实现滚动文本效果。 效果 步骤1&#xf…...

【C语言】--数组

&#x1f60a;个人主页: 起名字真南 &#x1f60b;个人专栏:【数据结构初阶】 【C语言】 【C】 目录 1 数组的概念2 一维数组的创建和初始化2.2 数组的初始化2.3 数组类型 3 一维数组的使用3.1 数组下标3.2 数组的输入 4 一维数组在内存中的存储5 sizeof计算数组中的元素6 二维…...

面向B2B市场的Spring Boot医疗病历系统开发

第1章绪论 计算机已经从科研院所&#xff0c;大中型企业&#xff0c;走进了平常百姓家&#xff0c;Internet遍及世界各地&#xff0c;在网上能够用计算机进行文字草拟、修改、打印清样、文件登陆、检索、综合统计、分类、数据库管理等&#xff0c;用科学的方法将无序的信息进行…...

闭着眼学机器学习——支持向量机分类

引言&#xff1a; 在正文开始之前&#xff0c;首先给大家介绍一个不错的人工智能学习教程&#xff1a;https://www.captainbed.cn/bbs。其中包含了机器学习、深度学习、强化学习等系列教程&#xff0c;感兴趣的读者可以自行查阅。 1. 算法介绍 支持向量机(Support Vector Mach…...

今日指数项目day8实战权限管理器(上)

3.权限管理器 3.1 权限列表展示功能 1&#xff09;原型效果 2&#xff09;接口说明 功能描述&#xff1a; 查询所有权限集合 服务路径&#xff1a; /api/permissions 服务方法&#xff1a;Get 请求参数&#xff1a;无响应数据格式: {"code": 1,"data":…...

《机器学习与数据挖掘综合实践》实训课程教学解决方案

一、引言 随着信息技术的飞速发展&#xff0c;人工智能已成为推动社会进步的重要力量。作为人工智能的核心技术之一&#xff0c;机器学习与数据挖掘在各行各业的应用日益广泛。本方案旨在通过系统的理论教学、丰富的实践案例和先进的实训平台&#xff0c;帮助学生掌握机器学习…...

linux中软连接和硬链接的区别

定义与概念 硬链接&#xff08;Hard Link&#xff09;&#xff1a;硬链接是文件系统中的一个概念&#xff0c;它直接指向文件系统中的物理数据块。可以把硬链接看作是原始文件的一个别名&#xff0c;它们共享相同的inode&#xff08;索引节点&#xff09;编号。在Linux文件系统…...

#Swift 对比 Static 在Swift 和 OC中的用法

在 Objective-C 和 Swift 中&#xff0c;static 关键字都用于定义类型级别的成员&#xff0c;但它们的用法和行为在两个语言中有所不同。让我们来详细对比一下 Objective-C 和 Swift 中 static 的使用方式和特性。 1. Objective-C 中的 static 在 Objective-C 中&#xff0c;…...

yakit使用教程(三,端口探测和指纹扫描)

本文仅作为学习参考使用&#xff0c;本文作者对任何使用本文进行渗透攻击破坏不负任何责任。 前言&#xff1a; 前文链接&#xff1a;yakit下载安装教程。 1.端口扫描的作用。 对目标端口进行扫描可以知道目标服务器开启了什么服务&#xff0c;以便于针对其所存在的服务展开…...

一维数组的引用

#define SIZE 5 int main(void) { int i 0; int arr[SIZE] { 86,85,85,896,45 };//同理五个数据只是偶然&#xff0c;可能会更多 //输入 for (i 0;i < SIZE;i) { printf("请输入你的第%d个值&#xff1a;",i1); scanf_s(&…...

Vue3 watch 监视属性

作用&#xff1a;监视数据的变化&#xff08;和Vue2中的watch作用一致&#xff09;特点&#xff1a;Vue3中的watch只能监视以下四种数据&#xff1a; ref定义的数据。reactive定义的数据。函数返回一个值&#xff08;getter函数&#xff09;。一个包含上述内容的数组。 我们在V…...

大数据-158 Apache Kylin 安装配置详解 集群模式启动

点一下关注吧&#xff01;&#xff01;&#xff01;非常感谢&#xff01;&#xff01;持续更新&#xff01;&#xff01;&#xff01; 目前已经更新到了&#xff1a; Hadoop&#xff08;已更完&#xff09;HDFS&#xff08;已更完&#xff09;MapReduce&#xff08;已更完&am…...

PHP商会招商项目系统一站式服务助力企业腾飞

商会招商项目系统——一站式服务&#xff0c;助力企业腾飞 &#x1f680;&#x1f4bc; &#x1f680; 开篇&#xff1a;企业成长的加速器&#xff0c;商会招商项目系统来袭 在竞争激烈的市场环境中&#xff0c;企业如何快速找到适合自己的发展路径&#xff0c;实现腾飞&…...

pnpm 和 npm

pnpm 和 npm 是 JavaScript 生态系统中常用的包管理工具&#xff0c;它们各自有不同的特性和优缺点。下面是这两者的详细比较&#xff1a; 1. 基本概念 npm (Node Package Manager)&#xff1a; 是 Node.js 的默认包管理器&#xff0c;提供安装、更新、卸载 JavaScript 包的功…...

笔试算法总结

文章目录 题目1题目2题目3题目4 题目1 使用 StringBuilder 模拟栈的行为&#xff0c;通过判断相邻2个字符是否相同&#xff0c;如果相同就进行删除 public class Main {public static String fun(String s) {if (s null || s.length() < 1) return s;StringBuilder builde…...

mybatisPlus对于pgSQL中UUID和UUID[]类型的交互

在PGSQL中&#xff0c;有的类型是UUID和UUID[]这种类型&#xff0c;在mybatis和这些类型交互的时候需要手动设置类型处理器才可以&#xff0c;这里记录一下类型处理器的设置 /*** UUID类型处理器*/ public class UUIDTypeHandler extends BaseTypeHandler<UUID> {/*** 获…...

vue3 高德地图标注(飞线,呼吸点)效果

装下这两个 npm 忘了具体命令了&#xff0c;百度一下就行 “loca”: “^1.0.1”, “amap/amap-jsapi-loader”: “^1.0.1”, <template><div id"map" style"width: 100%;height: 100%;"></div> </template><script setup> …...

程序员成长秘籍:是迈向管理巅峰,还是深耕技术架构?

专业在线打字练习平台-巧手打字通&#xff0c;只输出有价值的知识。 一 管理和架构 做技术的同学一般有两条职业发展路径&#xff0c;横向的管理路线和纵向的技术路线。管理路线对应的是管理岗&#xff0c;讲究的是排兵布阵&#xff0c;通过各种资源的优化配置发挥价值。技术路…...