当前位置: 首页 > article >正文

可靠消息投递demo

以下是一个基于 Spring Boot + RocketMQ 的完整分布式事务实战 Demo,包含事务消息本地事务自动重试死信队列(DLQ) 等核心机制。代码已充分注释,可直接运行。


一、项目结构

src/main/java
├── com.example.rocketmq
│   ├── controller
│   │   └── OrderController.java
│   ├── model
│   │   ├── Order.java
│   │   ├── OrderRequest.java
│   ├── repository
│   │   ├── OrderRepository.java
│   ├── service
│   │   ├── InventoryService.java
│   │   ├── OrderService.java
│   │   ├── PaymentService.java
│   ├── listener
│   │   └── OrderConsumer.java
│   └── RocketMQConfig.java
├── application.yml
└── pom.xml

在这里插入图片描述


二、依赖配置(pom.xml)

<dependencies><!-- Spring Boot Starter --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!-- RocketMQ Spring Boot Starter --><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.11.0</version></dependency><!-- MySQL Driver --><dependency><groupId>com.mysql</groupId><artifactId>mysql-connector-j</artifactId><scope>runtime</scope></dependency><!-- Spring Data JPA --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-jpa</artifactId></dependency>
</dependencies>

三、配置文件(application.yml)

# RocketMQ配置
rocketmq:producer:name-server: localhost:9876default-topic: order_topicconsumer:name-server: localhost:9876default-topic: order_topicconsumer-group: order_consumer_groupacknowledge-mode: AUTOmax-reconsume-times: 5 # 最大重试次数broker:role: SYNC_MASTER # 同步复制模式store-path-commit-log: /data/rocketmq/commitlogstore-path-consume-queue: /data/rocketmq/consumequeue# 数据库配置
spring:datasource:url: jdbc:mysql://localhost:3306/rocketmq_db?useSSL=false&serverTimezone=UTCusername: rootpassword: rootdriver-class-name: com.mysql.cj.jdbc.Driverhikari:maximum-pool-size: 20jpa:hibernate:ddl-auto: updateshow-sql: true

四、核心代码实现

1. 实体类(Order.java)
@Entity
@Table(name = "orders")
@Data
public class Order {@Id@GeneratedValue(strategy = GenerationType.IDENTITY)private Long id;private String userId;private BigDecimal amount;private String sku;private Integer status; // 0-待支付,1-已支付,2-已发货
}
2. 生产者代码(OrderService.java)
@Service
public class OrderService {@Autowiredprivate RocketMQTemplate rocketMQTemplate;@Autowiredprivate InventoryService inventoryService;@Autowiredprivate PaymentService paymentService;@Transactional // 本地事务public void createOrder(OrderRequest request) {// 1. 扣减库存(本地事务)inventoryService.deduct(request.getSku());// 2. 发送事务消息(与本地事务绑定)rocketMQTemplate.sendMessageInTransaction("order_topic", request, () -> { // 事务回滚回调System.out.println("本地事务回滚,消息未发送!");return null;});}
}
3. 消费者代码(OrderConsumer.java)
@Service
public class OrderConsumer {@Autowiredprivate OrderRepository orderRepository;@Autowiredprivate PaymentService paymentService;@Autowiredprivate InventoryService inventoryService;@RocketMQListener(topics = "order_topic",consumerGroup = "order_consumer_group",acknowledge-mode = AcknowledgeMode.AUTO)public void listen(OrderRequest request) {try {// 1. 生成订单记录Order order = new Order();order.setUserId(request.getUserId());order.setAmount(request.getAmount());order.setSku(request.getSku());orderRepository.save(order);// 2. 扣款(外部服务调用)paymentService.charge(request.getUserId(), request.getAmount());// 3. 发送物流通知(模拟成功)System.out.println("物流已通知,订单号: " + order.getId());} catch (Exception e) {// 4. 异常处理:触发重试或补偿System.out.println("处理失败,触发重试! 订单号: " + request.getOrderNo());throw new RuntimeException("订单处理失败", e);}}
}
4. 支付服务(PaymentService.java)
@Service
public class PaymentService {@Autowiredprivate PaymentRepository paymentRepository;public void charge(String userId, BigDecimal amount) {// 模拟支付失败(30%概率)if (Math.random() < 0.3) {throw new RuntimeException("支付失败,用户: " + userId);}Payment payment = new Payment();payment.setUserId(userId);payment.setAmount(amount);payment.setStatus("SUCCESS");paymentRepository.save(payment);}
}

五、数据库表设计

1. 订单表(orders)
CREATE TABLE orders (id BIGINT PRIMARY KEY AUTO_INCREMENT,user_id VARCHAR(50) NOT NULL,amount DECIMAL(10,2) NOT NULL,sku VARCHAR(50) NOT NULL,status TINYINT DEFAULT 0 COMMENT '0-待支付,1-已支付,2-已发货'
);
2. 支付表(payments)
CREATE TABLE payments (id BIGINT PRIMARY KEY AUTO_INCREMENT,user_id VARCHAR(50) NOT NULL,amount DECIMAL(10,2) NOT NULL,status ENUM('SUCCESS', 'FAILED') DEFAULT 'SUCCESS'
);

六、测试与验证

1. 正常流程

步骤

  1. 发送创建订单请求(扣减库存 + 发送事务消息)。
  2. 消费者处理消息(生成订单 + 扣款 + 物流通知)。
    预期结果
    • 库存减少,订单和支付记录生成,物流通知成功。
2. 异常流程(支付失败)

步骤

  1. 发送创建订单请求。
  2. 消费者处理时支付失败(抛出异常)。
  3. RocketMQ自动重试(默认3次)。
  4. 重试失败后消息转入DLQ。
    预期结果
    • 库存已恢复(通过本地事务回滚)。
    • 订单未生成,支付记录未插入。
3. DLQ处理

操作:手动消费DLQ中的消息,排查支付失败原因(如用户余额不足)。
代码示例

@RocketMQListener(topics = "order_topic_DLQ",consumerGroup = "order_consumer_group_dlq"
)
public void listenDLQ(OrderRequest request) {System.out.println("处理死信消息: " + request.getOrderNo());// 人工干预逻辑(如短信通知用户)
}

七、关键机制说明

1. 事务消息与本地事务绑定

代码示例sendMessageInTransaction 方法将消息发送与本地事务提交原子化。
流程
• 本地事务成功 → RocketMQ持久化消息。
• 本地事务失败 → RocketMQ丢弃消息。

2. 自动重试与死信队列

配置max-reconsume-times=5 表示最大重试5次。
DLQ Topic:默认死信队列名称为 order_topic_Retry,可通过 spring.rabbitmq.listener.defaultDLQ 配置。

3. ACK确认机制

自动ACK:消费者处理完消息后自动发送确认,RocketMQ删除消息。
手动ACK(可选):通过 AcknowledgeMode.MANUAL 控制。


八、生产环境优化建议

  1. 持久化配置
    • 确保 storePathCommitLogstorePathConsumeQueue 指向持久化磁盘路径。
  2. 多Broker集群
    • 部署多个Broker节点,配置 brokerRole=SYNC_MASTER 实现高可用。
  3. 监控与报警
    • 监控 ConsumerLagMessagesPending 指标,阈值报警。
  4. 日志记录
    • 启用RocketMQ日志(log4j2.xml),记录消息生产、消费详情。

九、总结

通过本Demo,你已掌握以下核心技能:

  1. 事务消息:结合本地事务实现强一致性。
  2. 自动重试:处理临时性故障(如网络抖动)。
  3. 死信队列:隔离无法处理的异常消息。
  4. 监控与运维:通过指标和日志保障系统稳定性。

下一步行动
• 将Demo部署到Docker容器,模拟高并发场景。
• 结合Seata框架实现更复杂的分布式事务(如订单-库存-支付三阶段)。

相关文章:

可靠消息投递demo

以下是一个基于 Spring Boot RocketMQ 的完整分布式事务实战 Demo&#xff0c;包含事务消息、本地事务、自动重试、死信队列&#xff08;DLQ&#xff09; 等核心机制。代码已充分注释&#xff0c;可直接运行。 一、项目结构 src/main/java ├── com.example.rocketmq │ …...

阻止 Mac 在运行任务时进入休眠状态

掌握Caffeinate命令&#xff1a;让您的 Mac 保持清醒以完成关键任务 开发人员经常发现自己在 Mac 上运行持续时间较长的进程。无论是大量文件上传、广泛的数据分析脚本&#xff0c;还是复杂的构建过程&#xff0c;我们最不希望的就是我们的机器在任务中途进入睡眠状态。输入 c…...

Copilot提示词库用法:调整自己想要的,记住常用的,分享该共用的

不论你是 Microsoft 365 Copilot 的新用户还是熟练运用的老鸟&#xff0c;不论你是使用copilot chat&#xff0c;还是在office365中使用copilot&#xff0c;copilot提示词库都将帮助你充分使用copilot这一划时代的产品。它不仅可以帮助你记住日常工作中常用的prompt提示词&…...

Python实战(3)-数据库操作

前面说过&#xff0c;可用的SQL数据库引擎有很多&#xff0c;它们都有相应的Python模块。这些数据库引擎大都作为服务器程序运行&#xff0c;连安装都需要有管理员权限。为降低Python DB API的使用门槛&#xff0c;我选择了一个名为SQLite的小型数据库引擎。它不需要作为独立的…...

LeetCode 160 Intersection Of Two Linked Lists 相交链表 Java

题目&#xff1a;找到两个相交列表的起始点&#xff0c;如图c1开始为A和B两个链表的相交点 举例1&#xff1a;8为两个链表的相交点。 注意&#xff1a;相交不止是数值上的相同。 举例2&#xff1a;2为相交点 举例3&#xff1a;没有相交点 解题思路&#xff1a; 相交证明最后一…...

AI Agent中的MCP详解

一、协议定义与核心价值 MCP(Model Context Protocol,模型上下文协议)是由Anthropic公司于2024年11月推出的开放标准协议,其核心目标是通过建立统一接口规范,解决AI模型与外部系统集成效率低下的行业痛点。该协议通过标准化通信机制,使大型语言模型(LLM)能够无缝对接数…...

win系统上自动化安装配置WSL linux和各种生信工具教程

windows系统上自动化安装配置WSL linux系统和各种生信工具教程 高通量测序原始数据的上游分析模块介绍 我开发的OmicsTools软件的这些分析测序原始数据的上游处理分析模块需要使用到linux和linux系统中的一些生信工具&#xff0c;在这里我开发了在windows系统中自动化安装WSL …...

统计可重复列表中的TOP N

文章目录 方案1&#xff1a;HashMap统计 全排序实现步骤&#xff1a;代码实现&#xff1a;优缺点&#xff1a; 方案2&#xff1a;HashMap统计 最小堆&#xff08;优先队列&#xff09;实现步骤&#xff1a;代码实现&#xff1a;优缺点&#xff1a; 方案3&#xff1a;Java Str…...

PowerBI纯小白如何驾驭DAX公式一键生成:copilot for fabric

在2025年2月份更新中&#xff0c;powerbi desktop里的copilot功能还新增了一个非常强大的功能&#xff1a;一键生成多个度量值&#xff0c;并直接加载到模型。 直接上示例展示&#xff1a; 打开DAX查询视图&#xff0c;在copilot窗格中直接输入想要生成多个度量值&#xff0c…...

Pytest的夹具

1、pytest的前置后置夹具 fixture 有些内容是在每个用例执行之前都要运行操作:-- 用例前置 接口:购物车模块先登录 --登录结果 【token鉴权】 UI: 每次用例 打开浏览器 --driver 有些内容在每个用例之后都要运行操作:–用例后置 接口: 数据清除 UI:关闭浏览器 叫做用例的…...

两市总的净流出和净流入来分析情况

为了排查数据干扰&#xff0c;只从两市总的净流出和净流入来分析情况。 净流出才对应资金抽离&#xff1a;若净流入为负&#xff08;即净流出&#xff09;&#xff0c;则意味着资金从股市中撤出&#xff0c;例如主动卖出的金额超过主动买入金额。净流入反映市场信心&#xff1…...

GitHub在push推送到远程仓库的时候显示Logon failed登录失败

具体问题描述 git.exe push --progress "origin" master:master Logon failed, use ctrlc to cancel basic credential prompt. remote: Support for password authentication was removed on August 13, 2021. 这是因为Git 推送失败的原因是 GitHub 已经不支持密码认…...

如何在SQL中高效使用聚合函数、日期函数和字符串函数:实用技巧与案例解析

文章目录 聚合函数group by子句的使用实战OJ日期函数字符串函数数学函数其它函数 聚合函数 函数说明COUNT([DISTINCT] expr)返回查询到的数据的 数量SUM([DISTINCT] expr)返回查询到的数据的 总和&#xff0c;不是数字没有意义AVG([DISTINCT] expr)返回查询到的数据的 平均值&…...

AutoGen :使用 Swarm 构建自治型多智能体团队

👉👉👉本人承接各类AI相关应用开发项目(包括但不限于大模型微调、RAG、AI智能体、NLP、机器学习算法、运筹优化算法、数据分析EDA等) !!!👉👉👉 有意愿请私信!!!AutoGen 的 AgentChat 模块提供了一种强大的方法来构建多智能体协作系统。 在之前的文章中,我们探讨了…...

RK3568平台设备树文件功能解析(鸿蒙系统篇)

鸿蒙设备树驱动修改时候发现目录下有很多的rk3568 的设备树,由于对这些设备树功能不太熟悉,所以索性就整理一下不同设备树的功能 rk3568-evb1-ddr4-v10.dts rk3568-evb4-lp3-v10.dts rk3568-evb6-ddr3-v10-rk628-rgb2hdmi.dts …...

k8s-coredns-CrashLoopBackOff 工作不正常

本文作者&#xff1a; slience_me 问题描述 # 问题描述 # rootk8s-node1:/home/slienceme# kubectl get pods --all-namespaces # NAMESPACE NAME READY STATUS RESTARTS AGE # kube-flannel kube-flannel-ds-66bcs …...

【Android性能】Systrace分析

1&#xff0c;分析工具 1&#xff0c;Systrace新UI网站 Perfetto UI 2&#xff0c;Systrace抓取 可通过android sdk中自带的systrace抓取&#xff0c;路径一般如下&#xff0c;..\AppData\Local\Android\Sdk\platform-tools&#xff0c; 另外需要安装python2.7&#xff0c;…...

Unity导出WebGL,无法显示中文

问题&#xff1a;中文无法显示 默认字体无法显示中文 在编辑器中设置了中文和英文的按钮&#xff0c;中文按钮无法显示 导出后无法显示中文 解决办法&#xff1a; 自己添加字体&#xff0c;导入项目&#xff0c;并引用 示例 下载一个字体文件&#xff0c;这里使用的阿里…...

oracle事务的组成

1)数据库事务由以下的部分组成: 一个或多个DML 语句 ; 一个 DDL(Data Definition Language – 数据定义语言) 语句&#xff1b; 一个 DCL(Data Control Language – 数据控制语言)语句&#xff1b; 2)事务的执行开始&#xff1a; 以第一个 DML 语句的执行作为开始 &#xff0c;…...

【如何在OpenWebUI中使用FLUX绘画:基于硅基流动免费API的完整指南】

如何在OpenWebUI中使用FLUX绘画&#xff1a;基于硅基流动免费API的完整指南 注册并获取硅基流动秘钥OpenWebUI中使用函数配置自定义模型-提示词配置效果验证 ) FLUX绘画是一种强大的AI绘图工具&#xff0c;本文将详细介绍如何在OpenWebUI中集成并使用FLUX绘画功能&#xff0c;…...

QT 磁盘文件 教程04-创建目录、删除目录、遍历目录

【1】新建目录 bool CreateDir(QString name){QString fileName name ;QDir dir(fileName);if (dir.isEmpty()) {dir.mkdir(fileName);return true;}else{qDebug()<<"文件夹已存在";return false;} } 【2】删除目录 bool DeleteDir(QString fileName){if (…...

Event driven agentic document workflows 笔记 - 2

代理文档工作流&#xff08;ADW&#xff09;- 课程笔记 Agentic Document Workflows (ADW) 1. 课程目标 介绍 代理文档工作流&#xff08;ADW&#xff09; 背后的核心概念&#xff0c;包括&#xff1a; RAG&#xff08;检索增强生成&#xff09;代理工作流 探讨如何利用 事件…...

Facebook 如何影响元宇宙的发展趋势

Facebook 如何影响元宇宙的发展趋势 引言 元宇宙&#xff08;Metaverse&#xff09;这个概念&#xff0c;曾经只存在于科幻小说中&#xff0c;如今正逐渐成为现实。它是一个由多个 3D 虚拟世界组成的网络&#xff0c;用户可以在其中进行社交、游戏、工作等活动。Facebook&…...

1.5.7 掌握Scala内建控制结构 - 变量作用域

本次实战深入理解了Scala中变量作用域的概念&#xff0c;通过两个任务演示了作用域的基本规则。在任务1中&#xff0c;我们创建了一个名为ScopeDemo01的对象&#xff0c;展示了内部作用域能够访问外部作用域的变量。通过在if语句块中访问在外部定义的message变量&#xff0c;我…...

RAID磁盘阵列管理

一. 什么是RAID RAID是英文Redundant Array of Independent Disks的缩写&#xff0c;中文翻译过来就是“独立冗余磁盘阵列”。简单的说&#xff0c;RAID是一种把多块独立的硬盘&#xff08;物理硬盘&#xff09;按不同的方式组合起来形成一个硬盘组&#xff08;逻辑硬盘&#…...

利用ffmpeg库实现音频AAC编解码

AAC‌&#xff08;Advanced Audio Coding&#xff09;是一种音频编码技术&#xff0c;出现于1997年&#xff0c;基于MPEG-2的音频编码技术。AAC具有高效的数据压缩能力和较高的音质&#xff0c;适用于各种音频应用场景。例如&#xff0c;在智能设备中&#xff0c;AAC技术被广泛…...

微博ip属地不发微博会不会变

随着社交媒体的普及&#xff0c;微博作为其中的佼佼者&#xff0c;一直备受关注。而且微博上线了显示用户IP属地的功能&#xff0c;这一功能旨在减少冒充热点事件当事人、恶意造谣、蹭流量等不良行为&#xff0c;确保传播内容的真实性和透明度。然而&#xff0c;这也引发了一些…...

appium之Toast元素识别

Appium之Toast元素识别教程与实例 一、Toast简介 Toast是Android系统中的轻量级消息提示框&#xff0c;以浮动形式短暂显示&#xff08;通常2-3秒&#xff09;&#xff0c;无法被点击且不会获取焦点。常见于登录失败、操作提示等场景&#xff0c;如“密码错误”或“网络异常”。…...

「JavaScript深入」WebSocket:高效的双向实时通信技术

WebSocket WebSocket 的特点1. 全双工通信2. 持久连接3. 低延迟4. 二进制和文本支持5. 连接管理6. 二进制数据传输 WebSocket 协议详解1. 握手过程2. 数据帧结构 WebSocket 的实现服务器端实现&#xff08;Node.js ws库&#xff09;1. 基础服务器2. 广播功能实现3. 心跳机制客…...

C#从入门到精通(1)

目录 第一章 C#与VS介绍 第二章 第一个C#程序 &#xff08;1&#xff09;C#程序基本组成 1.命名空间 2.类 3.Main方法 4.注释 5.语句 6.标识符及关键字 &#xff08;2&#xff09;程序编写规范 1.代码编写规则 2.程序命名方法 3.元素命名规范 第三章 变量 &…...