从0开始学习 RocketMQ:分布式事务消息的实现
消息队列中的事务,主要是解决消息生产者和消息消费者数据一致性的问题。
应用场景
比如订单系统创建订单后,会发消息给购物车系统,将已下单的商品从购物车中删除。
由于购物车删除商品这一步骤并不是用户下单支付这个主流程中的核心步骤,所以使用消息队列来异步清理购物车是更合理的设计。

对于订单系统来说,它做了两件事情
- 在订单库中插入了一条订单数据,创建了订单;
- 给 MQ 发送了一条订单消息。
对于购物车系统来说,它做了一件事情
- 接收订单消息,删除购物车库中的商品,清理购物车。
在分布式系统中,上面的这几个步骤,都有可能失败,如果失败了不做处理的话,就会造成订单数据和购物车数据不一致的情况。
比如:
- 创建了订单,没有清理购物车;
- 购物车中的商品清掉了,订单没有创建成功。
所以,我们需要做的就是,要保证在任何步骤失败的情况下,订单数据和购物车数据的一致性。
对于购物车系统,失败的处理比较简单,只有成功删除商品后再提交消费确认,如果发生失败,因为没有提交消费确认,消息队列会重试。
所以,问题的重点在于,怎么保证订单系统创建订单和发送消息的步骤,要么都成功,要么都失败,不能一个成功一个失败。
分布式事务
消息队列是如何实现分布式事务的?就要用到事务消息了。
事务消息需要消息队列提供相应的功能才能实现,Kafka 和 RocketMQ 都提供了事务相关功能。
半消息和普通消息的唯一区别是,在事务提交之前,对于消费者来说,这个消息是不可见的。
在上面的步骤中,如果第 4 步提交事务消息失败了(比如网络异常),怎么办?
对于这个问题,Kafka 和 RocketMQ 给出了 2 种不同的解决方案。
- Kafka :简单粗暴,直接抛出异常,让用户自行处理。可以在业务代码中反复重试提交,直到提交成功,或者删除之前创建的订单进行补偿;
- RocketMQ:事务反查机制。
RocketMQ方案
在 RocketMQ 的分布式事务实现中,增加了事务反查机制来解决事务消息提交失败的问题。
如果订单系统在第 4 步提交或回滚事务消息失败(如网络异常),Broker 迟迟没有收到提交或回滚的消息,Broker 会定期去订单系统上反查这个事务对应的本地事务的状态,然后根据反查结果决定提交或者回滚这个事务。
所以,订单系统需要提供一个反查本地事务状态的接口,即根据消息中的订单ID,在订单库中查询这个订单是否存在即可,如果订单存在则返回成功,否则返回失败。RocketMQ 会自动根据事务反查的结果提交或者回滚事务消息。

使用限制
消息类型一致性
事务消息仅支持在 MessageType 为 Transaction 的主题内使用,即事务消息只能发送至类型为事务消息的主题中,发送的消息的类型必须和主题的类型一致。
消费事务性
RocketMQ 事务消息保证本地主分支事务和下游消息发送事务的一致性,但不保证消息消费结果和上游事务的一致性。因此需要下游业务分支自行保证消息正确处理,建议消费端做好消费重试,如果有短暂失败可以利用重试机制保证最终处理成功。
中间状态可见性
RocketMQ 事务消息为最终一致性,即在消息提交到下游消费端处理完成之前,下游分支和上游事务之间的状态会不一致。因此,事务消息仅适合接受异步执行的事务场景。
事务超时机制
RocketMQ 事务消息的生命周期存在超时机制,即半事务消息被生产者发送服务端后,如果在指定时间内服务端无法确认提交或者回滚状态,则消息默认会被回滚。
使用建议
避免大量未决事务导致超时
RocketMQ支持在事务提交阶段异常的情况下发起事务回查,保证事务一致性。但生产者应该尽量避免本地事务返回未知结果。大量的事务检查会导致系统性能受损,容易导致事务处理延迟。
正确处理"进行中"的事务
消息回查时,对于正在进行中的事务不要返回Rollback或Commit结果,应继续保持Unknown的状态。 一般出现消息回查时事务正在处理的原因为:事务执行较慢,消息回查太快。解决方案如下:
- 将第一次事务回查时间设置较大一些,但可能导致依赖回查的事务提交延迟较大。
- 程序能正确识别正在进行中的事务。
使用示例
创建事务主题
sh bin/mqadmin updatetopic -n localhost:9876 -t TransactionTopic -c DefaultCluster -a +message.type=TRANSACTION
生产者代码
模拟正常流程,本地事务成功提交
public class ProducerTransactionExample {public static void main(String[] args) throws Exception {String endpoint = "182.92.198.60:8080";String topic = "TransactionTopic";ClientServiceProvider provider = ClientServiceProvider.loadService();ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(endpoint);builder.setRequestTimeout(Duration.ofSeconds(20));ClientConfiguration configuration = builder.build();// 初始化Producer时需要设置通信配置以及预绑定的Topic。Producer producer = provider.newProducerBuilder().setTopics(topic).setClientConfiguration(configuration).setTransactionChecker(messageView -> {System.out.println("5.broker回查事务状态");String orderId = messageView.getProperties().get("orderId");if (Strings.isNullOrEmpty(orderId)) {return TransactionResolution.ROLLBACK;}if (checkOrderById(orderId)) {System.out.println("7.本地事务状态成功,提交消息");return TransactionResolution.COMMIT;} else {System.out.println("7.本地事务状态失败,回滚消息");return TransactionResolution.ROLLBACK;}}).build();//开启事务分支。final Transaction transaction;try {transaction = producer.beginTransaction();System.out.println("1.开启事务");} catch (ClientException e) {e.printStackTrace();//事务分支开启失败,直接退出。System.out.println("1.事务开启失败");return;}// 普通消息发送。Message message = provider.newMessageBuilder().setTopic(topic)// 设置消息Tag,用于消费端根据指定Tag过滤消息。.setTag("transaction").addProperty("orderId", "o10086")// 消息体。.setBody(("测试事务消息,订单号o10086").getBytes()).build();//发送半事务消息final SendReceipt sendReceipt;try {sendReceipt = producer.send(message, transaction);System.out.println("2.半消息发送成功,messageId:" + sendReceipt.getMessageId());} catch (ClientException e) {//半事务消息发送失败,事务可以直接退出并回滚。System.out.println("2.半消息发送失败");return;}boolean localTransactionOk = doLocalTransaction();if (localTransactionOk) {try {transaction.commit();System.out.println("4.commit事务消息");} catch (ClientException e) {// 业务可以自身对实时性的要求选择是否重试,如果放弃重试,可以依赖事务消息回查机制进行事务状态的提交。e.printStackTrace();System.out.println("4.commit事务消息失败");}} else {try {transaction.rollback();System.out.println("4.rollback事务消息");} catch (ClientException e) {// 建议记录异常信息,回滚异常时可以无需重试,依赖事务消息回查机制进行事务状态的提交。e.printStackTrace();System.out.println("4.rollback事务消息失败");}}}/*** 模拟本地事务的执行结果** @return*/private static boolean doLocalTransaction() {System.out.println("3.执行本地事务,处理中");try {TimeUnit.SECONDS.sleep(20);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("3.执行本地事务成功,提交事务");return true;}/*** 模拟本地事务反查** @param orderId* @return*/private static boolean checkOrderById(String orderId) {System.out.println("6.反查本地事务状态,订单号:" + orderId + "能查到");return true;}
}

消费端在第4步后可以消费到消息。

模拟异常流程,将第4步提交/回滚的代码注释掉

消费端在第7步后可以消费到消息。

设置第一次事务回查时间
CHECK_IMMUNITY_TIME_IN_SECONDS 属性定义了从事务消息发送到 Broker 后,Broker 在多长时间内不会对这条消息发起回查。这个时间窗口为生产者提供了一个缓冲期,以确保即使在网络延迟或短暂的服务中断情况下,事务消息也不会被过早地回查。
Message message = provider.newMessageBuilder().setTopic(topic)// 设置消息Tag,用于消费端根据指定Tag过滤消息。.setTag("transaction").addProperty("orderId", "o10086").addProperty("CHECK_IMMUNITY_TIME_IN_SECONDS", "300")// 消息体。.setBody(("测试事务消息,订单号o10086").getBytes()).build();

消费端消费

相关文章:
从0开始学习 RocketMQ:分布式事务消息的实现
消息队列中的事务,主要是解决消息生产者和消息消费者数据一致性的问题。 应用场景 比如订单系统创建订单后,会发消息给购物车系统,将已下单的商品从购物车中删除。 由于购物车删除商品这一步骤并不是用户下单支付这个主流程中的核心步骤&a…...
MySQL 查询数据库的数据总量
需求:查看MySQL数据库的数据总量,以MB为单位展示数据库占用的磁盘空间 实践: 登录到MySQL数据库服务器。 选择你想要查看数据总量的数据库: USE shield;运行查询以获取数据库的总大小: SELECT table_schema AS Datab…...
[C++]——vector
🌇个人主页:_麦麦_ 📚今日小句:快乐的方式有很多种,第一种便是见到你。 目录 一、前言 二、vector的介绍及使用 2.1 vector的介绍 2.2 vector的使用 2.2.1 vector的定义(构造函数) 2.2.2…...
自动驾驶:LQR、ILQR和DDP原理、公式推导以及代码演示(七、CILQR约束条件下的ILQR求解)
(七)CILQR约束条件下的ILQR求解 CILQR((Constrained Iterative Linear Quadratic Regulator)) 是为了在 iLQR 基础上扩展处理控制输入和状态约束的问题。在这种情况下,系统不仅要优化控制输入以最小化代价函数&#x…...
随想录笔记-二叉树练习题
合并二叉树 617. 合并二叉树 - 力扣(LeetCode) dfs递归 class Solution {public TreeNode mergeTrees(TreeNode root1, TreeNode root2) {if(root1null||root2null){return root1null?root2:root1;}return dfs(root1,root2);}public TreeNode dfs(Tre…...
华雁智科前端面试题
1. var 变量的提升 题目: var a 1 function fun() {console.log(b)var b 2 } fun() console.log(a) 正确输出结果:undefined、1答错了,给一个大嘴巴子,错误答案输出结果为:2,1 此题主要考察 var 定义的变量&…...
【iOS】单例模式
【iOS】单例模式 什么是单例模式? 定义 单例模式,简单地说就是一个类只对应一个对象,每次使用这个类时,都只能获取到那一个对象。它的详细定义如下: 如果一个类始终只能创建一个实例,则这个类被称为单例…...
Linux | 探索 Linux 信号机制:信号的产生和自定义捕捉
信号是 Linux 操作系统中非常重要的进程控制机制,用来异步通知进程发生某种事件。理解信号的产生、阻塞、递达、捕捉等概念,可以帮助开发者更好地编写健壮的应用程序,避免由于未处理的信号导致程序异常退出。本文将带你从基础概念开始&#x…...
递归的时间复杂度分析
确定回溯算法的时间复杂度通常比较复杂,因为它取决于搜索空间的大小以及你的剪枝效率。对于生成从1到n的所有长度为k的组合。分析这类算法的时间复杂度时,我们通常需要考虑递归树的所有可能路径。 组合数 生成的组合数量是从n个元素中选择k个的组合数&…...
C++: 二叉树进阶面试题
做每件事之前都心存诚意, 就会事半功倍. 目录 前言1. 根据二叉树创建字符串2. 二叉树的层序遍历Ⅰ3. 二叉树的层序遍历Ⅱ4. 二叉树的最近公共祖先5. 二叉搜索树与双向链表6. 根据一棵树的前序遍历与中序遍历构造二叉树7. 根据一棵树的中序遍历与后序遍历构造二叉树8. 二叉树的…...
【HarmonyOS NEXT】实现网络图片保存到手机相册
【问题描述】 给定一个网络图片的地址,实现将图片保存到手机相册 【API】 phAccessHelper.showAssetsCreationDialog【官方文档】 https://developer.huawei.com/consumer/cn/doc/harmonyos-references-V5/js-apis-photoaccesshelper-V5#showassetscreationdialog…...
Pytorch详解-数据模块
Pytorch详解-数据模块 torch.utils.data.Dataset数据交互模块—Dataset的功能示例系列APIsconcatSubsetrandom_splitsampler unsqueeze DataLoaderDataLoader功能支持两种形式数据集读取自定义采样策略自动组装成批数据多进程数据加载自动实现锁页内存(Pinning Memo…...
浅谈openresty
熟悉了nginx后再来看openresty,不得不说openresty是比较优秀的。 对nginx和openresty的历史等在这此就不介绍了。 首先对标nginx,自然有优劣 一、开发难度 nginx: 毫无疑问nginx的开发难度比较高,需要扎实的c/c基础ÿ…...
【学习笔记】2024最新版SpringCloud教程
2024最新版SpringCloud教程 0 前言闲聊开篇简介 1 SpringBoot和SpringCloud版本选型 2 SpringCloud是什么能干吗 3 SpringCloud各组件的停更升级替换说明 4 项目实战之需求说明 5 项目实战之Maven父工程聚合说明和mysql驱动选择 6 项目实战之Mapper4一键生成Dao层代码 …...
Proxyless Service Mesh:下一代微服务架构体系
一、项目背景及意义 在当今的微服务架构中,应用程序通常被拆分成多个独立的服务,这些服务通过网络进行通信。这种架构的优势在于可以提高系统的可扩展性和灵活性,但也带来了新的挑战,比如: 服务间通信的复杂性&#…...
大数据Flink(一百一十八):SQL水印操作(Watermark)
文章目录 SQL水印操作(Watermark) 一、为什么要有WaterMark 二、Watermark解决的问题 三、代码演示 SQL水印操作(Watermark) 一、为什么要…...
【QGC】把QGroundControl地面站添加到Ubuntu侧边菜单栏启动
把QGroundControl地面站添加到Ubuntu侧边菜单栏启动 简介准备工作步骤 1: 创建 Desktop Entry 文件步骤 2: 编辑 Desktop Entry 文件步骤 3: 刷新应用程序菜单步骤 4: 将 QGroundControl 固定到侧边栏 环境: Ubuntu :20.04 LTS 简介 QGroundControl 是…...
PostgreSQL配置主从同步
PostgreSQL配置主从同步 1 主、备库安装postgresql软件 su - pg12 cd /home/pg12/resource tar -zxvf postgresql-12.9.tar.gz cd postgresql-12.9/ ./configure --prefix/home/pg12/soft/ make -j 16 && make install2 主、备库配置环境变量 vi ~/.bash_profile…...
基于python+django+vue的鲜花商城系统
作者:计算机学姐 开发技术:SpringBoot、SSM、Vue、MySQL、JSP、ElementUI、Python、小程序等,“文末源码”。 专栏推荐:前后端分离项目源码、SpringBoot项目源码、SSM项目源码 系统展示 【2025最新】基于pythondjangovueMySQL的线…...
李飞飞任CEO,空间智能公司World Labs亮相,全明星阵容曝光
人工智能的下个大方向已经出现,标志性学者决定下场创业。 本周五,一个重磅消息引爆了 AI 圈:斯坦福大学计算机科学家李飞飞正式宣布创办 AI 初创公司 ——World Labs,旨在向人工智能系统传授有关物理现实的深入知识。 李飞飞说道&…...
利用AI改写工具,五个策略帮助论文查重率快速降至合规标准
嘿,大家好!我是AI菌。今天咱们来聊聊一个让无数学生头疼的问题:论文重复率飙到30%以上怎么办?别慌,我这就分享5个实用降重技巧,帮你一次搞定,轻松压到合格线以下。这些方法都是我亲身试验过的&a…...
C#实战:5分钟搞定Modbus RTU通讯(基于NModbus4库)
C#实战:5分钟搞定Modbus RTU通讯(基于NModbus4库) 工业自动化领域的数据采集离不开设备通讯协议的支持,而Modbus RTU作为最广泛应用的串行通信协议之一,几乎成为工控开发者的必修课。今天我们就用C#和NModbus4库&#…...
解决ModelScope与datasets版本兼容性问题的最佳实践
1. 为什么ModelScope和datasets版本兼容性这么重要? 第一次用ModelScope加载数据集时,我就被报错整懵了。明明按照官方文档安装了最新版,却提示"ImportError: cannot import name _FEATURE_TYPES from datasets"。后来才发现是Mode…...
导师推荐 2026 最新!降AI率软件测评与好用工具推荐
2026年真正好用的AI论文降重与改写工具,核心看降重效果、去AI味、格式保留、学术适配四大指标。综合实测,千笔AI、ThouPen、豆包、DeepSeek、Grammarly 是当前最值得推荐的梯队,覆盖从免费到付费、从中文到英文、从文科到理工的全场景需求。 …...
个人时间管理神器:OpenClaw+百川2-13B自动分析日历与待办
个人时间管理神器:OpenClaw百川2-13B自动分析日历与待办 1. 为什么需要AI助手管理时间? 作为一个长期被多线程工作困扰的技术从业者,我一直在寻找能够真正理解时间管理需求的智能工具。传统的日历应用只能被动记录日程,而待办清…...
Wii Nunchuk嵌入式驱动库:I²C协议解析与跨平台适配
1. WiiChuck库概述:面向嵌入式系统的Wii Nunchuk通用适配框架WiiChuck是一个专为嵌入式平台设计的Wii Nunchuk(任天堂Wiimote扩展手柄)通用驱动库,其核心定位是提供跨平台、可裁剪、高可靠性的IC通信接口抽象层。该库并非简单封装…...
别再只盯着RTK了!聊聊GNSS/INS组合导航里,紧耦合如何用1颗卫星‘吊住’你的定位
1颗卫星的逆袭:紧耦合技术如何在极端环境下守护你的定位精度 想象一下,你正驾驶一辆L4级自动驾驶汽车穿越曼哈顿的钢铁森林。高楼间的"城市峡谷"让GPS信号时断时续,传统导航系统已经开始报警——"卫星信号丢失"。但你的车…...
从零开始:Windows与Ubuntu20.04双系统安装全指南
1. 为什么需要双系统? 对于很多刚接触Linux的朋友来说,直接在物理机上安装Ubuntu可能会有点担心。毕竟Windows用习惯了,万一Ubuntu用不顺手怎么办?这时候双系统就是最好的解决方案。我自己的第一台开发机就是WindowsUbuntu双系统&…...
解决k8s集群中containerd运行时拉取HTTP私有Harbor镜像的配置难题
1. 为什么需要配置HTTP私有Harbor镜像拉取 最近在帮客户部署Kubernetes集群时,遇到了一个典型问题:使用containerd作为容器运行时,无法从内网HTTP协议的Harbor私有仓库拉取镜像。这个问题其实很常见,特别是很多企业内网环境中&…...
从立创EDA到Cadence Allegro:封装转换的完整指南
1. 为什么需要封装转换? 最近在帮朋友做一个硬件项目,发现他用立创EDA设计的电路板需要转到Cadence Allegro平台生产。这就像两个说不同语言的人要合作,必须找个翻译——封装转换就是这个翻译过程。立创EDA和Allegro虽然都是PCB设计工具&…...
