RocketMQ 事务消息 原理及使用方法解析
🍊 Java学习:Java从入门到精通总结
🍊 深入浅出RocketMQ设计思想:深入浅出RocketMQ设计思想
🍊 绝对不一样的职场干货:大厂最佳实践经验指南
📆 最近更新:2023年3月24日
🍊 个人简介:通信工程本硕 for NJU💪、Java程序员🌕。做过科研paper,发过专利,优秀的程序员不应该只是CRUD
🍊 点赞 👍 收藏 ⭐留言 📝 都是我最大的动力!
文章目录
- RocketMQ事务消息
- 适用场景举例
- 使用示例
- 发送事务消息
- 事务回查
- 事务执行
RocketMQ事务消息
RocketMQ针对事务消息扩展了两个相关的概念:
1. 半消息
半消息(Half Message)是一种特殊的消息类型,处于这个状态的消息暂时不能被Consumer消费。
当一条事务消息被成功投递到Broker上,但Broker没有收到Producer的二次确认时,该事务消息就处于暂时不可消费的状态,这种消息就是半消息。
2. 消息状态回查
由于网络抖动、系统宕机等等原因,可能导致Producer向Broker发送的二次确认信息没有送达。如果Broker检测到某条事务消息长时间处于半消息状态,则会主动向Producer端发起回查操作,查询该事务消息在Producer端的事务状态。
这个机制主要是用来解决分布式事务中的超时问题。

上图是RocketMQ官网提供的事务消息流程图,执行步骤如下:
Producer向Broker端发送半消息Broker发送ACK确认,表示半消息发送成功Producer执行本地事务- 本地事务完毕,根据事务的状态,
Producer向Broker发送二次确认消息,确认该半消息的Commit或Rollback状态。Broker收到二次确认消息之后:如果是Commit状态,则直接将消息发送到Consumer端执行消费逻辑;如果是Rollback状态,则会直接将其标记为失败,不会发送给Consumer - 针对超时情况,
Broker主动向Producer发起消息回查 Producer处理回查消息,返回对应的本地事务执行结果Broker针对消息回查的结果,执行【步骤4】的操作
适用场景举例
以转账系统为例,假设A要向B转账100元,执行本地事务和发送异步消息的过程应该同时保持成功或失败,即A的账户扣款成功后,就一定要发消息发送出去,最直观的思路可能有两个:
1. 先发消息
这种策略的流程如下:

存在的问题是: 如果消息发送成功,但后续A扣款失败了,消费端仍然会消费这条消息,进而向B账户里打钱,数据就出现不一致的情况了。
2. 后发消息

存在的问题是: 如果扣款成功,但是发送消息失败,就会出现A已经扣钱了,但B账户里没有入账的情况,同样也是无法接受的。
出现上述情况的根本原因是本地事务和发送消息这两个操作并不是原子的,因此也就无法做到同时失败或同时成功,所以数据一致性难以保障。
解决上述问题的方法就是上面提到的半消息:

如上图所示,执行本地事务之前先发送一个半消息,此时还不能被消费者消费,只有当本地事务执行完毕并发送二次确认消息之后,半消息才能被Consumer消费。
如此以来就保证了多个系统数据的数据一致性,前提是系统不需要保证数据的强一致性。
使用示例
发送事务消息
RocketMQ发送事务消息设计到消息发送、消息回查、消息二次确认等过程,因此这个过程可能会“稍显复杂”。
发送事务消息使用的是TransactionMQProducer,一个简单的demo如下:
public class TransactionProducer {public static void main(String[] args) throws MQClientException {TransactionCheckListener transactionCheckListener = new TransactionCheckListener() {@Overridepublic LocalTransactionState checkLocalTransactionState(MessageExt msg) {return null;}};TransactionMQProducer producer = new TransactionMQProducer("GROUP A");producer.setCheckThreadPoolMinSize(2);producer.setCheckThreadPoolMaxSize(2);producer.setCheckRequestHoldMax(2000);producer.setTransactionCheckListener(transactionCheckListener);producer.start();String[] tags = new String[]{"TAG A", "TAG B", "TAG C"};LocalTransactionExecuter transactionExecuter = new LocalTransactionExecuter() {@Overridepublic LocalTransactionState executeLocalTransactionBranch(Message msg, Object arg) {return null;}};for (int i = 0; i < 10; i++) {Message msg = new Message("TEST", tags[i % tags.length], "KEY " + i, ("HELLO, ROCKETMQ" + i).getBytes());SendResult result = producer.sendMessageInTransaction(msg, transactionExecuter, null);try {Thread.sleep(100);} catch (InterruptedException e) {e.printStackTrace();}}producer.shutdown();}
}
事务回查
checkLocalTransaction是事务消息回查监听方法,可以获取本地事务状态,根据事务的状态来确定是否要发送二次确认消息,或者进行事务回滚操作。
消息回查事务的状态由以下几种情况:
LocalTransactionState.ROLLBACK_MESSAGE:事务回滚LocalTransactionState.COMMIT_MESSAGE:事务提交LocalTransactionState.UNKNOW:未知状态,此时Broker会定时重新查询Producer消息的状态,直到出现前面两种情况。
public interface TransactionListener {LocalTransactionState checkLocalTransaction(final MessageExt msg);
}
事务执行
executeLocalTransaction方法用于执行本地事务,如果本地事务执行成功则进行事务提交,否则进行事务回滚,如果是UNKNOW状态的话,Broker就会定时回查Producer的消息状态,直到彻底成功或失败。
public interface TransactionListener {LocalTransactionState executeLocalTransaction(final Message msg, final Object arg);
}
相关文章:
RocketMQ 事务消息 原理及使用方法解析
🍊 Java学习:Java从入门到精通总结 🍊 深入浅出RocketMQ设计思想:深入浅出RocketMQ设计思想 🍊 绝对不一样的职场干货:大厂最佳实践经验指南 📆 最近更新:2023年3月24日 &#x…...
为什么 ChatGPT 输出时经常会中断,需要输入“继续” 才可以继续输出?
作者:明明如月学长, CSDN 博客专家,蚂蚁集团高级 Java 工程师,《性能优化方法论》作者、《解锁大厂思维:剖析《阿里巴巴Java开发手册》》、《再学经典:《EffectiveJava》独家解析》专栏作者。 热门文章推荐…...
PyTorch 之 基于经典网络架构训练图像分类模型
文章目录一、 模块简单介绍1. 数据预处理部分2. 网络模块设置3. 网络模型保存与测试二、数据读取与预处理操作1. 制作数据源2. 读取标签对应的实际名字3. 展示数据三、模型构建与实现1. 加载 models 中提供的模型,并且直接用训练的好权重当做初始化参数2. 参考 pyto…...
Scrapy的callback进入不了回调方法
一、前言 有的时候,Scrapy的callback方法直接被略过了,不去执行其中的回调方法,可能排查好久都排查不出来,我来教大家集中解决方法。 yield Request(urlurl, callbackself.parse_detail, cb_kwargs{item: item})二、解决方法 1…...
第二十一天 数据库开发-MySQL
目录 数据库开发-MySQL 前言 1. MySQL概述 1.1 安装 1.2 数据模型 1.3 SQL介绍 1.4 项目开发流程 2. 数据库设计-DDL 2.1 数据库操作 2.2 图形化工具 2.3 表操作 3. 数据库操作-DML 3.1 增加(insert) 3.2 修改(update) 3.3 删除(delete) 数据库开发-MySQL 前言 …...
蓝桥杯每日一真题—— [蓝桥杯 2021 省 AB2] 完全平方数(数论,质因数分解)
文章目录[蓝桥杯 2021 省 AB2] 完全平方数题目描述输入格式输出格式样例 #1样例输入 #1样例输出 #1样例 #2样例输入 #2样例输出 #2提示思路:理论补充:完全平方数的一个性质:完全平方数的质因子的指数一定为偶数最终思路:小插曲&am…...
Linux编辑器-vim
一、vim简述1)vi/vim2)检查vim是否安装2)如何用vim打开文件3)vim的几种模式命令模式插入模式末行模式可视化模式二、vim的基本操作1)进入vim(命令行模式)2)[命令行模式]切换至[插入模式]3)[插入模式]切换至[命令行模式]4)[命令行模…...
5G将在五方面彻底改变制造业
想象一下这样一个未来,智能机器人通过在工厂车间重新配置自己,从多条生产线上组装产品。安全无人机处理着从监视入侵者到确认员工停车等繁琐的任务。自动驾驶汽车不仅可以在建筑物之间运输零部件,还可以在全国各地运输。工厂检查可以在千里之…...
http和https的区别?
http和https的区别?HTTPHTTPSHTTP与HTTPS区别HTTPS相比于HTTP协议的优点和缺点HTTP http是超文本传输协议 HTTP协议是基于传输层的TCP协议进行通信,通用无状态的协议。80端口 HTTPS https—安全的超文本传输协议 是以安全为目标的HTTP通道,…...
【Spring Cloud Alibaba】4.创建服务消费者
文章目录简介开始搭建创建项目修改POM文件添加启动类添加配置项添加Controller添加配置文件启动项目测试访问Nacos访问接口查看端点检查简介 接下来我们创建一个服务消费者,本操作先要完成之前的步骤,详情请参照【Spring Cloud Alibaba】Spring Cloud A…...
C语言——动态内存管理 malloc、calloc、realloc、free的使用
目录 一、为什么存在动态内存分配 二、动态内存函数的介绍 2.1malloc和free 2.2calloc 2.3realloc 三、常见的动态内存错误 3.1对NULL指针的解引用操作 3.2对动态开辟空间的越界访问 3.3对非动态开辟的内存使用free释放 3.4使用free释放一块动态开辟内存的一部分 3.5…...
技术分享——Java8新特性
技术分享——Java8新特性1.背景2. 新特性主要内容3. Lambda表达式4. 四大内置核心函数式接口4.1 Consumer<T>消费型接口4.2 Supplier<T>供给型接口4.3 Function<T,R>函数型接口4.4 Predicate<T> 断定型接口5. Stream流操作5.1 什么是流以及流的类型5.2…...
vue基础知识大全
1,指令作用 以v-开头,由vue提供的attribute,为渲染DOM应用提供特殊的响应式行为,也即是在表达式的值发生变化的时候响应式的更新DOM。其内容为可以被求值的js代码,可以写在return后面被返回的表达式。 指令的简写指令简…...
第2篇|文献研读|nature climate change|减缓气候变化和促进热带生物多样性的碳储量走廊
研究背景 从 2000 年到 2012 年,潮湿和干燥热带地区的森林总损失超过 90,000 平方公里 yr-1,这主要是由农业扩张驱动的。热带森林砍伐向大气中排放 0:95 Pg C yr-1 并导致广泛的生物多样性丧失。保护区的生物多样性取决于与保护区所在的更广泛景观的生态…...
从暴力递归到动态规划(2)小乖,你也在为转移方程而烦恼吗?
前引:继上篇我们讲到暴力递归的过程,这一篇blog我们将继续对从暴力递归到动态规划的实现过程,与上篇类似,我们依然采用题目的方式对其转化过程进行论述。上篇博客:https://blog.csdn.net/m0_65431718/article/details/…...
Leetcode.1638 统计只差一个字符的子串数目
题目链接 Leetcode.1638 统计只差一个字符的子串数目 Rating : 1745 题目描述 给你两个字符串 s和 t,请你找出 s中的非空子串的数目,这些子串满足替换 一个不同字符 以后,是 t串的子串。换言之,请你找到 s和 t串中 恰…...
KoTime:v2.3.9新增线程管理(线程统计、状态查询等)
功能概览 KoTime的开源版本已经迭代到了V2.3.9,目前功能如下: 实时监听方法,统计运行时长web展示方法调用链路,瓶颈可视化追踪追踪系统异常,精确定位到方法接口超时邮件通知,无需实时查看线上热更新&…...
直面风口,未来不仅是中文版ChatGPT,还有AGI大时代在等着我们
说到标题的AI2.0这个概念的研究早在2015年就研究起步了,其实大家早已知道,人工智能技术必然是未来科技发展战略中的重要一环,今天我们就从AI2.0入手,以GPT-4及文心一言的发布为切入角度,来谈一谈即将降临的AGI时代。 关…...
若依微服务(ruoyi-cloud)保姆版容器编排运行
一、简介 项目gitee地址:https://gitee.com/y_project/RuoYi-Cloud 由于该项目运行有很多坑,大家可以在git克隆拷贝到本地后,执行下面的命令使master版本回退到本篇博客的版本: git reset --hard 05ca78e82fb4e074760156359d09a…...
vue2图片预览插件
学习:vue插件开发实例-图片预览插件 vue2-pre-img-plugin的gitee代码 准备工作 准备图片与基础的样式 将iconfont下载的字体图标资源放在src/assets/iconfont目录下将准备预览的图片放到src/static/images目录下 PrevImg.vue 在plugins/PrevImg目录下ÿ…...
如何快速配置FanControl风扇控制:从安装到优化的完整指南
如何快速配置FanControl风扇控制:从安装到优化的完整指南 【免费下载链接】FanControl.Releases This is the release repository for Fan Control, a highly customizable fan controlling software for Windows. 项目地址: https://gitcode.com/GitHub_Trending…...
论文重复率过高,应该怎么办?
重复率过高,先别急着全文重写。大多数情况,不是整篇都有问题,而是少数几个章节把总重复率拉爆了。很多人第一反应是“从头改到尾”。这个最累,而且效率最低。正确顺序是这样的。第一步:先看是哪一部分高,不…...
ESXi 9.0.0 HPE原厂定制版深度解析|专属硬件适配+零报错部署指南,HPE服务器运维最优解
随着vSphere 9.0虚拟化架构全面普及,企业HPE慧与服务器的底层虚拟化部署迎来全新升级需求。普通通用版ESXi镜像在HPE ProLiant、Apollo系列服务器中,常出现网卡不认、RAID驱动缺失、iLO管理异常、硬件兼容报错等问题,严重影响生产部署效率与系…...
5G通信实战:手把手教你用Vivado LDPC IP核配置编码参数(附避坑指南)
5G通信实战:FPGA开发中的LDPC编解码参数配置全解析 在5G通信系统的开发过程中,LDPC(低密度奇偶校验)码作为物理层的关键技术之一,其实现质量直接影响着系统的传输性能和可靠性。对于使用Xilinx FPGA进行5G基带开发的工…...
如何查询Flexy 4G扩展卡GSM信号强度
GSM信号强度查询与历史记录趋势图一、硬件准备1.1 硬件安装与确认1. 安装GSM扩展卡:将支持GSM功能的扩展卡插入Flexy 205的扩展卡插槽(Slot1或Slot2),确保硬件连接牢固。2. 插入SIM卡:确保SIM卡无欠费、信号覆盖正常。…...
保姆级教程:在Ubuntu 22.04上从源码编译RISC-V SPIKE模拟器(含libboost报错解决)
从零构建RISC-V开发环境:Ubuntu 22.04下SPIKE模拟器深度编译指南 当第一次接触RISC-V生态时,搭建可靠的开发环境往往成为新手面临的第一个挑战。作为RISC-V官方推荐的指令集模拟器,SPIKE以其轻量级和准确性成为学习RISC-V架构的理想工具。本文…...
编程入门必存 100 个经典代码 自学提升一站式合集
前言 我记得刚开始接触编程的时候,觉得太难了。 也很好奇,写代码的那些人也太厉害了吧?全是英文的,他们的英文水平一定很好吧? 他们是怎么记住这么多代码格式的?而且错了一个标点符号,整个程…...
LabVIEW状态机设计:从顺序流程到事件驱动的架构升级
1. 项目概述:从“顺序流程”到“状态驱动”的思维跃迁如果你用过LabVIEW,画过流程图,写过一些简单的数据采集或仪器控制程序,那你大概率经历过这样的场景:程序一开始跑得挺好,几个步骤按顺序执行࿰…...
常用shell命令总结(Linux命令)
当前目录 .上一级目录 …根目录,或者是目录拼接符 /管道符(左侧输出作为右侧输入) |上一个命令的返回码 $?或 ||且 &&cat 查看文档 cat XX.txt加权限 chmod x 文件 chmod 777 文件改变文件的所有者 chown newowner file.txt改变文件…...
Armv8/v9架构系统寄存器解析:SCXTNUM与SMCR深度剖析
1. AArch64系统寄存器概述 在Armv8/v9架构中,系统寄存器是处理器状态和控制的核心枢纽。与通用寄存器不同,系统寄存器专门用于配置处理器功能、监控运行状态以及实现安全隔离。AArch64架构通过精心设计的寄存器命名规范,使得寄存器的功能和访…...
