分布式系统事务一致性解决方案(基于事务消息)
参考:https://rocketmq.apache.org/zh/docs/featureBehavior/04transactionmessage/
文章目录
- 概要
- 错误的方案
- 方案一:业务方自己实现
- 方案二:RocketMQ 事务消息
- 什么是事务消息
- 事务消息处理流程
- 事务消息生命周期
- 使用限制
- 使用示例
- 使用建议
概要
说起分布式事务,就会谈到 Bob 给 Smith 账户转账的问题:2 个账号, 分别处于 A、B 两个 DB,或者说 2 个不同的子系统中,Bob 的账户要扣钱,Smith 的账户要加钱,如何保证原子性?
一般思路是通过消息中间件来实现 “最终一致性”:A系统扣钱,然后发送消息给 MQ,B系统接收此消息,进行加钱。
但这里有个问题:是先update DB后发消息?还是先发消息后update DB呢?
- 假设
先update DB成功,发送消息时网络问题,重发又失败,怎么办? - 假设
先发送消息成功,update DB失败。消息已经发出去了,又不能撤回,怎么办?
所以,只要发送消息和update DB这2个操作不是原子的,无论谁先谁后,都会有问题。
错误的方案
有人可能会想到,可以把 “发送消息” 这个网络调用和 update DB 放在同一个事务中,如果发送消息失败,update DB 自动回滚。这样不就保证原子性了吗?
这个方案看似正确,但其实是错误的,原因:
- 把网络调用放在 DB 事务中,可能会因为网络的延时,导致 DB 长事务。严重的还会 block 整个 DB,这个风险很大。
- 如果发送消息失败,发送方并不知道是 MQ 没收到消息,还是已收到消息只是返回 response 的时候失败了。如果接收方已经收到消息了,而发送方认为没有收到,执行 update DB 回滚的操作,会导致 A 账户钱没有扣,B 账户的钱却增加了。
方案一:业务方自己实现
假设消息中间件没提供 “事务消息” 功能,比如 Kafka。那该如何解决这个问题?
结局方案如下:
- Producer 准备 1 个消息表,把 update DB 和 send message 这 2 个操作,放在一个DB事务中。
- 准备一个后台程序,不停地把消息表中的 message 传送给消息中间件。发送失败则不断重试,直到成功。允许消息重复,但消息不会丢。
- Consumer 准备 1 个判重表。处理过的消息记录在里面,实现业务的幂等性
通过以上 3 步,就基本解决了原子性问题
但此方案缺点是:需要设计 DB 消息表,同时还需要 1 个后台任务,不断扫描本地消息。导致消息的处理和业务逻辑耦合,额外增加业务方的负担。
方案二:RocketMQ 事务消息
目前各大知名电商平台和互联网公司,几乎都是使用 “事务消息” 这种方案来实现 “最终一致性” 的。这种方式适用的业务场景广泛,且比较靠谱。
什么是事务消息
事务消息是 Apache RocketMQ 提供的一种高级消息类型,支持在分布式场景下保障消息生产和本地事务的最终一致性。
事务消息处理流程
事务消息交互流程如下图所示。

-
生产者将消息发送至Apache RocketMQ服务端。
-
Apache RocketMQ服务端将消息持久化成功之后,向生产者返回Ack确认消息已经发送成功,此时消息被标记为"暂不能投递",这种状态下的消息即为半事务消息。
-
生产者开始执行本地事务逻辑。
-
生产者根据本地事务执行结果向服务端提交二次确认结果(Commit或是Rollback),服务端收到确认结果后处理逻辑如下:
-
二次确认结果为Commit:服务端将半事务消息标记为可投递,并投递给消费者。
-
二次确认结果为Rollback:服务端将回滚事务,不会将半事务消息投递给消费者。
-
-
在断网或者是生产者应用重启的特殊情况下,若服务端未收到发送者提交的二次确认结果,或服务端收到的二次确认结果为Unknown未知状态,经过固定时间后,服务端将对消息生产者即生产者集群中任一生产者实例发起消息回查(服务端回查的间隔时间和最大回查次数,请参见参数限制)。
-
生产者收到消息回查后,需要检查对应消息的本地事务执行的最终结果。
-
生产者根据检查到的本地事务的最终状态再次提交二次确认,服务端仍按照步骤4对半事务消息进行处理。
事务消息生命周期

-
初始化:半事务消息被生产者构建并完成初始化,待发送到服务端的状态。
-
事务待提交:半事务消息被发送到服务端,和普通消息不同,并不会直接被服务端持久化,而是会被单独存储到事务存储系统中,等待第二阶段本地事务返回执行结果后再提交。此时消息对下游消费者不可见。
-
消息回滚:第二阶段如果事务执行结果明确为回滚,服务端会将半事务消息回滚,该事务消息流程终止。
-
提交待消费:第二阶段如果事务执行结果明确为提交,服务端会将半事务消息重新存储到普通存储系统中,此时消息对下游消费者可见,等待被消费者获取并消费。
-
消费中:消息被消费者获取,并按照消费者本地的业务逻辑进行处理的过程。 此时服务端会等待消费者完成消费并提交消费结果,如果一定时间后没有收到消费者的响应,Apache RocketMQ会对消息进行重试处理。具体信息,请参见消费重试。
-
消费提交:消费者完成消费处理,并向服务端提交消费结果,服务端标记当前消息已经被处理(包括消费成功和失败)。 Apache RocketMQ默认支持保留所有消息,此时消息数据并不会立即被删除,只是逻辑标记已消费。消息在保存时间到期或存储空间不足被删除前,消费者仍然可以回溯消息重新消费。
-
消息删除:Apache RocketMQ按照消息保存机制滚动清理最早的消息数据,将消息从物理文件中删除。更多信息,请参见消息存储和清理机制。
使用限制
-
消息类型一致性
事务消息仅支持在 MessageType 为 Transaction 的主题内使用,即事务消息只能发送至类型为事务消息的主题中,发送的消息的类型必须和主题的类型一致。 -
消费事务性
Apache RocketMQ 事务消息保证本地主分支事务和下游消息发送事务的一致性,但不保证消息消费结果和上游事务的一致性。因此需要下游业务分支自行保证消息正确处理,建议消费端做好消费重试,如果有短暂失败可以利用重试机制保证最终处理成功。 -
中间状态可见性
Apache RocketMQ 事务消息为最终一致性,即在消息提交到下游消费端处理完成之前,下游分支和上游事务之间的状态会不一致。因此,事务消息仅适合接受异步执行的事务场景。 -
事务超时机制
Apache RocketMQ 事务消息的生命周期存在超时机制,即半事务消息被生产者发送服务端后,如果在指定时间内服务端无法确认提交或者回滚状态,则消息默认会被回滚。事务超时时间,请参见参数限制。
使用示例
创建主题
Apache RocketMQ 5.0版本下创建主题操作,推荐使用mqadmin工具,需要注意的是,对于消息类型需要通过属性参数添加。示例如下:
sh mqadmin updateTopic -n <nameserver_address> -t <topic_name> -c <cluster_name> -a +message.type=TRANSACTION
发送消息
事务消息相比普通消息发送时需要修改以下几点:
-
发送事务消息前,需要开启事务并关联本地的事务执行。
-
为保证事务一致性,在构建生产者时,必须设置事务检查器和预绑定事务消息发送的主题列表,客户端内置的事务检查器会对绑定的事务主题做异常状态恢复。
创建事务主题
NORMAL类型Topic不支持TRANSACTION类型消息,生产消息会报错。
./bin/mqadmin updatetopic -n localhost:9876 -t TestTopic -c DefaultCluster -a +message.type=TRANSACTION
- -c 集群名称
- -t Topic名称
- -n nameserver地址
- -a 额外属性,本例给主题添加了message.type为TRANSACTION的属性用来支持事务消息
以Java语言为例,使用事务消息示例参考如下:
//演示demo,模拟订单表查询服务,用来确认订单事务是否提交成功。private static boolean checkOrderById(String orderId) {return true;}//演示demo,模拟本地事务的执行结果。private static boolean doLocalTransaction() {return true;}public static void main(String[] args) throws ClientException {ClientServiceProvider provider = new ClientServiceProvider();MessageBuilder messageBuilder = new MessageBuilderImpl();//构造事务生产者:事务消息需要生产者构建一个事务检查器,用于检查确认异常半事务的中间状态。Producer producer = provider.newProducerBuilder().setTransactionChecker(messageView -> {/*** 事务检查器一般是根据业务的ID去检查本地事务是否正确提交还是回滚,此处以订单ID属性为例。* 在订单表找到了这个订单,说明本地事务插入订单的操作已经正确提交;如果订单表没有订单,说明本地事务已经回滚。*/final String orderId = messageView.getProperties().get("OrderId");if (Strings.isNullOrEmpty(orderId)) {// 错误的消息,直接返回Rollback。return TransactionResolution.ROLLBACK;}return checkOrderById(orderId) ? TransactionResolution.COMMIT : TransactionResolution.ROLLBACK;}).build();//开启事务分支。final Transaction transaction;try {transaction = producer.beginTransaction();} catch (ClientException e) {e.printStackTrace();//事务分支开启失败,直接退出。return;}Message message = messageBuilder.setTopic("topic")//设置消息索引键,可根据关键字精确查找某条消息。.setKeys("messageKey")//设置消息Tag,用于消费端根据指定Tag过滤消息。.setTag("messageTag")//一般事务消息都会设置一个本地事务关联的唯一ID,用来做本地事务回查的校验。.addProperty("OrderId", "xxx")//消息体。.setBody("messageBody".getBytes()).build();//发送半事务消息final SendReceipt sendReceipt;try {sendReceipt = producer.send(message, transaction);} catch (ClientException e) {//半事务消息发送失败,事务可以直接退出并回滚。return;}/*** 执行本地事务,并确定本地事务结果。* 1. 如果本地事务提交成功,则提交消息事务。* 2. 如果本地事务提交失败,则回滚消息事务。* 3. 如果本地事务未知异常,则不处理,等待事务消息回查。*/boolean localTransactionOk = doLocalTransaction();if (localTransactionOk) {try {transaction.commit();} catch (ClientException e) {// 业务可以自身对实时性的要求选择是否重试,如果放弃重试,可以依赖事务消息回查机制进行事务状态的提交。e.printStackTrace();}} else {try {transaction.rollback();} catch (ClientException e) {// 建议记录异常信息,回滚异常时可以无需重试,依赖事务消息回查机制进行事务状态的提交。e.printStackTrace();}}}
使用建议
-
避免大量未决事务导致超时
Apache RocketMQ支持在事务提交阶段异常的情况下发起事务回查,保证事务一致性。但生产者应该尽量避免本地事务返回未知结果。大量的事务检查会导致系统性能受损,容易导致事务处理延迟。
-
正确处理"进行中"的事务
消息回查时,对于正在进行中的事务不要返回Rollback或Commit结果,应继续保持Unknown的状态。 一般出现消息回查时事务正在处理的原因为:事务执行较慢,消息回查太快。解决方案如下:
-
将第一次事务回查时间设置较大一些,但可能导致依赖回查的事务提交延迟较大。
-
程序能正确识别正在进行中的事务。
-
相关文章:
分布式系统事务一致性解决方案(基于事务消息)
参考:https://rocketmq.apache.org/zh/docs/featureBehavior/04transactionmessage/ 文章目录 概要错误的方案方案一:业务方自己实现方案二:RocketMQ 事务消息什么是事务消息事务消息处理流程事务消息生命周期使用限制使用示例使用建议 概要 …...
Unity Animation--动画剪辑
Unity Animation--动画剪辑 动画剪辑 动画剪辑是Unity动画系统的核心元素之一。Unity支持从外部来源导入动画,并提供创建动画剪辑的能力使用“动画”窗口在编辑器中从头开始。 外部来源的动画 从外部来源导入的动画剪辑可能包括: 人形动画 运动捕捉…...
如何将 redis 快速部署为 docker 容器?
部署 Redis 作为 Docker 容器是一种快速、灵活且可重复使用的方式,特别适合开发、测试和部署环境。本文将详细介绍如何将 Redis 部署为 Docker 容器,包括 Docker 安装、Redis 容器配置、数据持久化、网络设置等方面。 步骤 1:安装 Docker 首…...
iOS - Undefined symbols: 解决方法
Undefined symbols: 是让人苦恼的报错,如何知道是 哪个 symbols 不对呢? 今天探索到下面的方法: 1、点击导航上方 最右侧的按钮,查看历史报错 2、选中报错信息,右键选择 Expand All Transcripts 在出现的详细信息面…...
优化理论复习——(三)
本篇介绍无约束优化的问题,通过四种算法来进行求解的过程和思路,也是最优化方法中的最重要的一类问题。 无约束优化问题主要是通过迭代搜索算法来切结,比线性规划的计算量都小一点。 目录 无约束优化问题最优性条件最速下降法牛顿法共轭梯度…...
RK3568笔记二十四:基于Flask的网页监控系统
若该文为原创文章,转载请注明原文出处。 此实验参考 《鲁班猫监控检测》,原代码有点BUG,已经下载不了。2. 鲁班猫监控检测 — [野火]嵌入式AI应用开发实战指南—基于LubanCat-RK系列板卡 文档 (embedfire.com) 一、简介 记录简单的摄像头监…...
[Django 0-1] Core.Serializers 模块
Core.Serializers 模块 Django 序列化模块 模块结构 . ├── __init__.py ├── base.py ├── json.py ├── jsonl.py ├── python.py ├── pyyaml.py └── xml_serializer.py1 directory, 7 files自定义序列化器 通过继承django.core.serializers.base.Serial…...
鸿蒙内核源码分析(用栈方式篇) | 程序运行场地谁提供的
精读内核源码就绕不过汇编语言,鸿蒙内核有6个汇编文件,读不懂它们就真的很难理解以下问题. 1.系统调用是如何实现的? 2.CPU是如何切换任务和进程上下文的? 3.硬件中断是如何处理的? 4.main函数到底是怎么来的? 5.开机最开始发生了什么? 6.关机…...
Linux 进程间通信之匿名管道
💓博主CSDN主页:麻辣韭菜💓 ⏩专栏分类:Linux知识分享⏪ 🚚代码仓库:Linux代码练习🚚 🌹关注我🫵带你学习更多Linux知识 🔝 目录 前言 一. 进程间通信介绍 1.进程间通…...
数据结构与算法学习笔记六--数组和广义表(C语言)
目录 前言 1.数组 1.定义 2.初始化 3.销毁 4.取值 5.设置值 6.完整代码 前言 这篇博客主要介绍数据结构中的数组和广义表的用法。 1.数组 在数据结构中,数组是一种线性数据结构,它由一组连续的相同类型的元素组成,每个元素都有一个唯…...
图搜索算法详解
图搜索算法详解 摘要: 图搜索算法是解决路径规划和网络分析问题的关键技术。本文将详细介绍图搜索算法的基本概念、分类以及常见的算法,如广度优先搜索(BFS)、深度优先搜索(DFS)、A*搜索等。同时ÿ…...
安卓中常见的UI控件
TextView(文本视图)EditText(编辑文本)Button(按钮)ImageView(图像视图)ImageButton(图像按钮)CheckBox(复选框)RadioButtonÿ…...
基于Labelme的背部穴位关键点制作
一、穴位定位方法 穴位定位,自春秋时期以来,通过各代医学实践的继承与发展,形成了一套较为科学的定位体系。这套体系基于经络理论,采用“寸”作为测量单位,按照人体比例来进行精确的穴位定位,主要有依据体…...
go-mysql-transfer 同步数据到es
同步数据需要注意的事项 前提条件 1 要同步的mysql 表必须包含主键 2 mysql binlog 必须是row 模式 3 不支持程序运行过程中修改表结构 4 要赋予连接mysql 账号的权限 reload, replication super 权限 如果是root 权限则不需要 安装 go-mysql-transfer git clone…...
外包干了3天,技术就明显退步了。。。。。
先说一下自己的情况,本科生,19年通过校招进入广州某软件公司,干了接近4年的功能测试,今年年初,感觉自己不能够在这样下去了,长时间呆在一个舒适的环境会让一个人堕落!而我已经在一个企业干了四年的功能测试…...
将要上市的自动驾驶新书《自动驾驶系统开发》中摘录各章片段 1
以下摘录一些章节片段: 1. 概论 自动驾驶系统的认知中有一些模糊的地方,比如自动驾驶系统如何定义的问题,自动驾驶的研发为什么会有那么多的子模块,怎么才算自动驾驶落地等等。本章想先给读者一个概括介绍,了解自动驾…...
String、StringBuilder、StringBuffer之间的区别是什么?
在Java中,String、StringBuilder 和 StringBuffer 是处理字符串的三个类,其中 String 是不可变对象,而 StringBuilder 和 StringBuffer 是可变对象。这些类在字符串操作方面具有不同的特性和用途。 String String 类表示不可变的字符序列&a…...
docker系列8:容器卷挂载(上)
目录 传送门 从安装redis说起 什么是容器卷挂载 操作系统的挂载 日志文件一般是"首恶元凶" 挂载命令 容器卷挂载 卷挂载命令 启动时挂载 查看挂载卷信息 容器卷管理 查看卷列表 创建容器卷 具名挂载与匿名挂载 具名挂载 传送门 docker系列1ÿ…...
痉挛性斜颈患者自己做哪些运动对脖子好?
痉挛性斜颈(Dystonia)是一种罕见的神经系统疾病,其特点是颈部肌肉痉挛,导致头部姿势异常倾斜或扭曲。而在治疗痉挛性斜颈中,运动疗法是非常重要的一部分。下面将介绍一些痉挛性斜颈患者可以自己进行的运动,…...
数据结构——二叉树链式结构的实现(上)
二叉树概念 再看二叉树基本操作前,再回顾下二叉树的概念, 二叉树是: 1. 空树 2. 非空:根节点,根节点的左子树、根节点的右子树组成的。 从概念中可以看出,二叉树定义是递归式的 二叉树构成࿱…...
利用最小二乘法找圆心和半径
#include <iostream> #include <vector> #include <cmath> #include <Eigen/Dense> // 需安装Eigen库用于矩阵运算 // 定义点结构 struct Point { double x, y; Point(double x_, double y_) : x(x_), y(y_) {} }; // 最小二乘法求圆心和半径 …...
vscode里如何用git
打开vs终端执行如下: 1 初始化 Git 仓库(如果尚未初始化) git init 2 添加文件到 Git 仓库 git add . 3 使用 git commit 命令来提交你的更改。确保在提交时加上一个有用的消息。 git commit -m "备注信息" 4 …...
23-Oracle 23 ai 区块链表(Blockchain Table)
小伙伴有没有在金融强合规的领域中遇见,必须要保持数据不可变,管理员都无法修改和留痕的要求。比如医疗的电子病历中,影像检查检验结果不可篡改行的,药品追溯过程中数据只可插入无法删除的特性需求;登录日志、修改日志…...
3.3.1_1 检错编码(奇偶校验码)
从这节课开始,我们会探讨数据链路层的差错控制功能,差错控制功能的主要目标是要发现并且解决一个帧内部的位错误,我们需要使用特殊的编码技术去发现帧内部的位错误,当我们发现位错误之后,通常来说有两种解决方案。第一…...
vscode(仍待补充)
写于2025 6.9 主包将加入vscode这个更权威的圈子 vscode的基本使用 侧边栏 vscode还能连接ssh? debug时使用的launch文件 1.task.json {"tasks": [{"type": "cppbuild","label": "C/C: gcc.exe 生成活动文件"…...
电脑插入多块移动硬盘后经常出现卡顿和蓝屏
当电脑在插入多块移动硬盘后频繁出现卡顿和蓝屏问题时,可能涉及硬件资源冲突、驱动兼容性、供电不足或系统设置等多方面原因。以下是逐步排查和解决方案: 1. 检查电源供电问题 问题原因:多块移动硬盘同时运行可能导致USB接口供电不足&#x…...
MMaDA: Multimodal Large Diffusion Language Models
CODE : https://github.com/Gen-Verse/MMaDA Abstract 我们介绍了一种新型的多模态扩散基础模型MMaDA,它被设计用于在文本推理、多模态理解和文本到图像生成等不同领域实现卓越的性能。该方法的特点是三个关键创新:(i) MMaDA采用统一的扩散架构…...
OkHttp 中实现断点续传 demo
在 OkHttp 中实现断点续传主要通过以下步骤完成,核心是利用 HTTP 协议的 Range 请求头指定下载范围: 实现原理 Range 请求头:向服务器请求文件的特定字节范围(如 Range: bytes1024-) 本地文件记录:保存已…...
2025 后端自学UNIAPP【项目实战:旅游项目】6、我的收藏页面
代码框架视图 1、先添加一个获取收藏景点的列表请求 【在文件my_api.js文件中添加】 // 引入公共的请求封装 import http from ./my_http.js// 登录接口(适配服务端返回 Token) export const login async (code, avatar) > {const res await http…...
【HTTP三个基础问题】
面试官您好!HTTP是超文本传输协议,是互联网上客户端和服务器之间传输超文本数据(比如文字、图片、音频、视频等)的核心协议,当前互联网应用最广泛的版本是HTTP1.1,它基于经典的C/S模型,也就是客…...
