【分布式】如何使用RocketMQ实现下单-库存-支付这个场景的分布式事务问题
在 下单-库存-支付 场景中,通过消息队列实现最终一致性,需保证三个微服务的操作最终一致,且在支付失败或库存不足时触发回滚补偿。以下是具体实现方案:
1. 整体流程设计
正常流程(成功场景)
- 订单服务 创建订单(状态为
待支付),发送 订单创建成功事件。 - 库存服务 消费事件,扣减库存,发送 库存扣减成功事件。
- 支付服务 消费事件,执行支付,发送 支付成功事件。
- 订单服务 消费支付成功事件,更新订单状态为
已完成。
异常流程(失败场景)
• 库存不足:库存服务直接发送 库存不足事件,订单服务取消订单。
• 支付失败:支付服务发送 支付失败事件,触发库存回滚和订单取消。
2. 核心组件与消息设计
消息队列(以RocketMQ为例)
| 事件类型 | Topic | 生产者 | 消费者 | 说明 |
|---|---|---|---|---|
| 订单创建成功事件 | order_created | 订单服务 | 库存服务 | 触发库存扣减 |
| 库存扣减成功事件 | stock_reduced | 库存服务 | 支付服务 | 触发支付 |
| 支付成功事件 | payment_done | 支付服务 | 订单服务 | 完成订单 |
| 库存不足事件 | stock_failed | 库存服务 | 订单服务 | 取消订单 |
| 支付失败事件 | payment_failed | 支付服务 | 订单/库存服务 | 触发库存回滚和订单取消 |
数据表设计(关键字段)
• 订单表(Order)
CREATE TABLE orders (id VARCHAR(64) PRIMARY KEY, -- 订单ID(全局唯一)user_id BIGINT,amount DECIMAL(10,2),status ENUM('pending', 'completed', 'canceled') -- 订单状态
);
• 库存表(Inventory)
CREATE TABLE inventory (product_id BIGINT PRIMARY KEY,stock INT CHECK(stock >= 0) -- 库存不可为负
);
3. 实现步骤
(1) 订单服务:创建订单并发送事件
// 订单服务(OrderService.java)
@Transactional
public void createOrder(Order order) {// 1. 检查参数合法性(如用户存在性)// 2. 插入订单记录(状态为pending)orderDao.insert(order);// 3. 发送订单创建成功事件(事务消息)Message msg = new Message("order_created", JSON.toJSONBytes(order));TransactionSendResult result = rocketMQTemplate.sendMessageInTransaction("order_group", msg, order.getId() // 事务ID(用订单ID标识));// 4. 若消息发送失败,抛出异常触发事务回滚if (result.getLocalTransactionState() != LocalTransactionState.COMMIT_MESSAGE) {throw new RuntimeException("订单创建事件发送失败");}
}
(2) 库存服务:扣减库存
// 库存服务(InventoryService.java)
@RocketMQMessageListener(topic = "order_created", consumerGroup = "inventory_group")
public class OrderCreatedListener implements RocketMQListener<MessageExt> {@Override@Transactionalpublic void onMessage(MessageExt message) {Order order = JSON.parseObject(message.getBody(), Order.class);// 1. 检查库存是否充足Inventory inventory = inventoryDao.selectByProductId(order.getProductId());if (inventory.getStock() < order.getQuantity()) {// 发送库存不足事件rocketMQTemplate.sendOneWay("stock_failed", JSON.toJSONBytes(order));return;}// 2. 扣减库存(乐观锁防止超卖)int updated = inventoryDao.reduceStock(order.getProductId(), order.getQuantity(), inventory.getVersion());if (updated == 0) {throw new RetryException("库存扣减冲突,稍后重试"); // 触发消息重试}// 3. 发送库存扣减成功事件rocketMQTemplate.sendOneWay("stock_reduced", JSON.toJSONBytes(order));}
}
(3) 支付服务:执行支付
// 支付服务(PaymentService.java)
@RocketMQMessageListener(topic = "stock_reduced", consumerGroup = "payment_group")
public class StockReducedListener implements RocketMQListener<MessageExt> {@Override@Transactionalpublic void onMessage(MessageExt message) {Order order = JSON.parseObject(message.getBody(), Order.class);// 1. 调用第三方支付接口boolean success = paymentClient.pay(order.getUserId(), order.getAmount());if (!success) {// 发送支付失败事件rocketMQTemplate.sendOneWay("payment_failed", JSON.toJSONBytes(order));return;}// 2. 发送支付成功事件rocketMQTemplate.sendOneWay("payment_done", JSON.toJSONBytes(order));}
}
(4) 订单服务:处理成功/失败事件
// 订单服务(OrderService.java)
@RocketMQMessageListener(topic = "payment_done", consumerGroup = "order_complete_group")
public class PaymentDoneListener implements RocketMQListener<MessageExt> {@Override@Transactionalpublic void onMessage(MessageExt message) {Order order = JSON.parseObject(message.getBody(), Order.class);orderDao.updateStatus(order.getId(), "completed");}
}@RocketMQMessageListener(topic = {"stock_failed", "payment_failed"}, consumerGroup = "order_cancel_group")
public class OrderCancelListener implements RocketMQListener<MessageExt> {@Override@Transactionalpublic void onMessage(MessageExt message) {Order order = JSON.parseObject(message.getBody(), Order.class);// 1. 取消订单(状态置为canceled)orderDao.updateStatus(order.getId(), "canceled");// 2. 若事件来自支付失败,需触发库存回滚(发送库存回滚事件)if (message.getTopic().equals("payment_failed")) {rocketMQTemplate.sendOneWay("stock_rollback", JSON.toJSONBytes(order));}}
}
(5) 库存服务:处理回滚事件
// 库存服务(InventoryService.java)
@RocketMQMessageListener(topic = "stock_rollback", consumerGroup = "inventory_rollback_group")
public class StockRollbackListener implements RocketMQListener<MessageExt> {@Override@Transactionalpublic void onMessage(MessageExt message) {Order order = JSON.parseObject(message.getBody(), Order.class);inventoryDao.addStock(order.getProductId(), order.getQuantity());}
}
4. 容错与补偿机制
(1) 消息可靠性
• 生产者端:
使用 RocketMQ 事务消息,确保本地事务与消息发送的原子性。若消息发送失败,订单创建事务回滚。
• 消费者端:
开启消费者重试(默认重试16次),若最终消费失败,消息进入死信队列(需人工干预)。
(2) 幂等性设计
• 订单服务:
通过订单ID唯一标识,updateStatus 操作天然幂等。
• 库存服务:
使用乐观锁(version字段)避免重复扣减。
(3) 最终一致性保障
• 支付失败/库存不足:
通过监听失败事件触发补偿动作(订单取消 + 库存回滚)。
• 消息顺序性:
同一订单的消息发送到同一队列(MessageQueue),保证顺序消费。
5. 方案优缺点
优点
• 低侵入性:业务代码仅需处理消息发送/监听,无分布式事务框架依赖。
• 高可用性:依赖消息队列的可靠性和重试机制,天然支持服务宕机容错。
• 性能友好:异步解耦,避免同步阻塞。
缺点
• 最终一致性延迟:依赖消息消费速度,不适用于实时性要求高的场景。
• 补偿逻辑需完备:需覆盖所有异常分支(如消息丢失、服务宕机)。
总结
通过消息队列实现下单-库存-支付的最终一致性,核心在于:
- 事件驱动:每个服务通过发布/订阅事件推进流程。
- 可靠消息:使用事务消息保证本地操作与消息发送的原子性。
- 补偿机制:监听失败事件,触发反向操作回滚状态。
此方案适合容忍短暂不一致性的场景(如电商交易),若需强一致性,可结合 Seata AT模式 或 TCC模式。
相关文章:
【分布式】如何使用RocketMQ实现下单-库存-支付这个场景的分布式事务问题
在 下单-库存-支付 场景中,通过消息队列实现最终一致性,需保证三个微服务的操作最终一致,且在支付失败或库存不足时触发回滚补偿。以下是具体实现方案: 1. 整体流程设计 正常流程(成功场景) 订单服务 创建…...
手写一些常见算法
手写一些常见算法 快速排序归并排序Dijkstra自定义排序交替打印0和1冒泡排序插入排序堆排序 快速排序 public class Main {public static void main(String[] args) {int nums[] {1,3,2,5,4,6,8,7,9};quickSort(nums,0,nums.length - 1);}private static void quickSort(int[…...
使用DeepSeek完成一个简单嵌入式开发
开启DeepSeek对话 请帮我使用Altium Designer设计原理图、PCB,使用keil完成代码编写;要求:使用stm32F103RCT6为主控芯片,控制3个流水灯的原理图 这里需要注意,每次DeepSeek的回答都不太一样。 DeepSeek回答 以下是使…...
每日一题之储存晶体
问题描述 威慑纪元 2230 年,人类联邦在与三体文明的对抗中,为了强化飞船的能源储备,决定收集能量晶体。飞船的储存空间呈矩形,边长分别为 a 和 b。对于一个能量晶体,只有当它的长度小于或等于存储空间的对角线长度时&…...
关于我和快速幂的事()
我之前只会这样的(dfs): 不懂下面这种写法的具体逻辑: 看完下面的推理,再转转我聪明的小老戴: 法一中:把2^11看成(2^5)^2 法二中:把2^11看成(2^2)^5...
【鸿蒙开发】Hi3861学习笔记- GPIO之直流电机
00. 目录 文章目录 00. 目录01. GPIO概述02. 直流电机概述03. ULN2003模块概述04. 硬件设计05. 软件设计06. 实验现象07. 附录 01. GPIO概述 GPIO(General-purpose input/output)即通用型输入输出。通常,GPIO控制器通过分组的方式管理所有GP…...
mapbox高阶,结合threejs(threebox)添加extrusion挤出几何体,并添加侧面窗户贴图和楼顶贴图,同时添加真实光照投影
👨⚕️ 主页: gis分享者 👨⚕️ 感谢各位大佬 点赞👍 收藏⭐ 留言📝 加关注✅! 👨⚕️ 收录于专栏:mapbox 从入门到精通 文章目录 一、🍀前言1.1 ☘️mapboxgl.Map 地图对象1.2 ☘️mapboxgl.Map style属性1.3 ☘️threebox extrusion挤出几何体1.3 ☘️…...
【蓝桥杯速成】| 2.逆向思维
题目一:青蛙跳台阶 题目描述 一只青蛙一次可以跳上1级台阶,也可以跳上2级台阶。 求该青蛙跳上一个 n 级的台阶总共有多少种跳法。 解题步骤 选用递归的方法解决该问题! 使用递归只需要考虑清楚边界条件/终止条件,再写清楚单层…...
halcon机器人视觉(四)calibrate_hand_eye_stationary_3d_sensor
目录 一、准备数据和模型二、按照表面匹配的的结果进行手眼标定三、根据标定结果计算CalObjInCamPose一、准备数据和模型 1、读3D模型:read_object_model_3d 2、创建表面匹配模板:create_surface_model 3、创建一个HALCON校准数据模型:create_calib_data read_object_mode…...
python-leetcode-叶子相似的树
872. 叶子相似的树 - 力扣(LeetCode) 下面是一个完整的 Python 函数,接收两个二叉树的根节点 root1 和 root2,返回它们是否叶相似。 代码实现 class TreeNode:def __init__(self, val0, leftNone, rightNone):self.val valself…...
<03.13>八股文补充知识
import java.lang.reflect.*; public class Main {public static void main(String[] args) throws Exception {// 获取 Class 对象//1. 通过类字面量Class<?> clazz Person.class;//2 通过对象实例化String str "Hello";Class<?> clazz_str str.ge…...
GraphRAG 融合 RAG:双剑合璧,精度更上一层楼
检索增强生成 (Retrieval-Augmented Generation, RAG) 已成为构建知识密集型 NLP 应用的标准范式。RAG 通过结合大型语言模型 (LLM) 的生成能力和外部知识库的检索能力,显著提升了生成结果的质量。然而,在某些场景下,仅依靠传统的 RAG 或 GraphRAG 可能无法达到最佳效果。本…...
ffmpeg + opencv 打静态库编译到可执行文件中
下载ffmpeg ,我下载的为6.0 版本,解压后执行: ./configure --enable-static --disable-shared --pkg-config-flags=“–static” --extra-cflags=“-fPIC” --extra-cxxflags=“-fPIC” --prefix=/usr/local2.等待配置完成,执行 make && make install 进行编译安装…...
2025探索短剧行业新可能报告40+份汇总解读|附PDF下载
原文链接:https://tecdat.cn/?p41043 近年来,短剧以其紧凑的剧情、碎片化的观看体验,迅速吸引了大量用户。百度作为互联网巨头,在短剧领域积极布局。从早期建立行业专属模型冷启动,到如今构建完整的商业生态…...
前端面试:如何实现预览 PDF 文件?
在前端开发中,实现 PDF 文件的预览是一个常见需求,尤其是在应用程序中需要用户查看文档时。以下是几种常见的方法,可以用来实现在网页中预览 PDF 文件: 方法一:使用 <iframe> 标签 1. 基本实现 最简单的方式是…...
STM32 内置的通讯协议
数据是以帧为单位发的 USART和UART的区别就是有没有同步功能 同步是两端设备有时钟连接,异步是没时钟连接,靠约定号的频率(波特率)接收发送数据 RTS和CTS是用来给外界发送已“可接收”或“可发送”信号的,一般用不到…...
一个简单的PHP框架
原文地址:一个简单的PHP框架 更多内容请关注:智想天开 框架概述 一个基本的 PHP 框架通常包含以下几个部分: 前端控制器(Front Controller):处理所有的 HTTP 请求,统一入口。 路由器…...
什么是SpringCloud?为何要选择SpringCloud?
什么是 Spring Cloud? Spring Cloud 是一套基于 Spring Boot 构建的 微服务架构解决方案,提供了一整套微服务开发所需的组件,如服务注册与发现、配置管理、负载均衡、熔断机制、网关等。它基于 Spring 生态系统,简化了分布式系统…...
信息安全访问控制、抗攻击技术、安全体系和评估(高软42)
系列文章目录 信息安全访问控制、抗攻击技术、安全体系和评估 文章目录 系列文章目录前言一、信息安全技术1.访问控制2.抗攻击技术 二、欺骗技术1.ARP欺骗2.DNS欺骗3.IP欺骗 三、抗攻击技术1.端口扫描2.强化TCP/IP堆栈 四、保证体系和评估1.保证体系2.安全风险管理 五、真题在…...
晋升系列4:学习方法
每一个成功的人,都是从底层开始打怪,不断的总结经验,一步一步打上来的。在这个过程中需要坚持、总结方法论。 对一件事情长久坚持的人其实比较少,在坚持的人中,不断的总结优化的更少,所以最终达到高级别的…...
脑电波控制设备:基于典型相关分析(CCA)的脑机接口频率精准解码方法
文章目录 前言一、CCA的用途二、频率求解思路三、输入数据结构四、判断方法五、matlab实践1.数据集获取及处理2.matlab代码3.运行及结果 六、参考文献 前言 在脑机接口(BCI)领域,有SSVEP方向,中文叫做稳态视觉诱发电位,当人观看闪烁的视觉刺激…...
Android Spinner总结
文章目录 Android Spinner总结概述简单使用自定义布局自定义Adapter添加分割线源码下载 Android Spinner总结 概述 在 Android 中,Spinner 是一个下拉选择框。 简单使用 xml布局: <Spinnerandroid:id"id/spinner1"android:layout_width&…...
element-ui layout 组件源码分享
layout 布局组件源码分享,主要从以下两个方面: 1、row 组件属性。 2、col 组件属性。 一、row 组件属性。 1.1 gutter 栅栏间隔,类型为 number,默认 0。 1.2 type 布局模式,可选 flex,现代浏览器下有效…...
OBJ文件生成PCD文件(python 实现)
代码实现 将 .obj 文件转换为 .pcd(点云数据) 代码文件。 import open3d as o3d# 加载 .obj 文件 mesh o3d.io.read_triangle_mesh("bunny.obj")# 检查是否成功加载 if not mesh.has_vertices():print("无法加载 .obj 文件,…...
LinPEAS 使用最佳实践指南
在渗透测试和权限提升评估中,LinPEAS(Linux Privilege Escalation Awesome Script)是⼀个⽤来搜索类unix主机上可能的提权路径的⾃动化脚本。本文将介绍使用 LinPEAS 的最佳实践方案,并针对不同环境(如无 curl 的情况&…...
c++介绍智能指针 十二(1)
普通指针:指向内存区域的地址变量。使用普通指针容易出现一些程序错误。 如果一个指针所指向的内存区域是动态分配的,那么这个指针变量离开了所在的作用域,这块内存也不会自动销毁。动态内存不进行释放就会导致内存泄露。如果一个指针指向已…...
Vue的scoped原理是什么?
scoped的工作原理 当在 <style> 标签上使用 scoped 属性时,Vue 会为当前组件的每个元素添加一个唯一的 data-v-xxxxxx 属性,并将样式规则中的选择器修改为包含该属性的形式。 编译阶段: 在编译 .vue 文件时,Vue 的编译器…...
大白话解释 React 中高阶组件(HOC)的概念和应用场景,并实现一个简单的 HOC。
高阶组件(HOC)的概念 在 React 里,高阶组件(Higher-Order Component,简称 HOC)就像是一个“超级工厂函数”。它本身是一个函数,而且这个函数接收一个组件作为参数,然后返回一个新的…...
深入浅出C++ STL:统领STL全局
深入浅出C STL:统领STL全局 深入浅出C STL:统领STL全局github主页地址前言一、STL的前世今生1.1 什么是STL?1.2 STL版本演进 二、STL六大核心组件详解2.1 容器(Containers)容器性能对照表 2.2 算法(Algorit…...
k8s面试题总结(十五)
1.如何使用Kubernetes进行多环境部署(如开发,测试和生产环境)? 使用命名空间(namespaces): 命名空间是用于逻辑隔离和资源分组的一种方式,可以为每个环境创建单独的命名空间。 2.使…...
