012 rocketmq事务消息
文章目录
- 事务消息
- 概念介绍
- 交互流程
- 事务消息原理
- TransactionListener接⼝
- TransactionProducer.java
- TransactionConsumer.java
事务消息
内置topic中的消息对消费者不可见
本地事务+mq消息=事务消息
消息队列 RocketMQ 版提供的分布式事务消息适⽤于所有对数据最终⼀致性有强需求的场景。
概念介绍
事务消息:消息队列 RocketMQ 版提供类似 X/Open XA 的分布式事务功能,通过消息队列RocketMQ 事务消息能达到分布式事务的最终⼀致。
半事务消息:暂不能投递的消息,发送⽅已经成功地将消息发送到了消息队列 RocketMQ 版服务端,但是服务端未收到⽣产者对该消息的⼆次确认,此时该消息被标记成“暂不能投递”状态,处于该种状态下的消息即半事务消息。
消息回查:由于⽹络闪断、⽣产者应⽤重启等原因,导致某条事务消息的⼆次确认丢失,消息队列RocketMQ 版服务端通过扫描发现某条消息⻓期处于“半事务消息”时,需要主动向消息⽣产者询问该消息的最终状态(Commit 或是 Rollback),该询问过程即消息回查。
交互流程
事务消息交互流程如下图所示。

事务消息发送步骤如下:
- 发送⽅将半事务消息发送⾄消息队列 RocketMQ 版服务端。
- 消息队列 RocketMQ 版服务端将消息持久化成功之后,向发送⽅返回 Ack 确认消息已经发送成功,此时消息为半事务消息。
- 发送⽅开始执⾏本地事务逻辑。
- 发送⽅根据本地事务执⾏结果向服务端提交⼆次确认(Commit 或是 Rollback),服务端收到Commit 状态则将半事务消息标记为可投递,订阅⽅最终将收到该消息;服务端收到 Rollback 状态则删除半事务消息,订阅⽅将不会接受该消息。
事务消息回查步骤如下:
- 在断⽹或者是应⽤重启的特殊情况下,上述步骤 4 提交的⼆次确认最终未到达服务端,经过固定时间后服务端将对该消息发起消息回查。
- 发送⽅收到消息回查后,需要检查对应消息的本地事务执⾏的最终结果。
- 发送⽅根据检查得到的本地事务的最终状态再次提交⼆次确认,服务端仍按照步骤 4 对半事务消息进⾏操作。
注意事项
- 事务消息不⽀持延时消息和批量消息。
- 为了避免单个消息被检查太多次⽽导致半队列消息累积,我们默认将单个消息的检查次数限制为15 次,但是⽤户可以通过 Broker 配置⽂件的 transactionCheckMax 参数来修改此限制。如果已经检查某条消息超过 N 次的话( N = transactionCheckMax ) 则 Broker 将丢弃此消息,并在默认情况下同时打印错误⽇志。⽤户可以通过重写AbstractTransactionalMessageCheckListener 类来修改这个⾏为。
- 事务消息将在 Broker 配置⽂件中的参数 transactionTimeout 这样的特定时间⻓度之后被检查。当发送事务消息时,⽤户还可以通过设置⽤户属性 CHECK_IMMUNITY_TIME_IN_SECONDS 来改变这个限制,该参数优先于 transactionTimeout 参数。
- 事务性消息可能不⽌⼀次被检查或消费。做好幂等性的检查
- 提交给⽤户的⽬标主题消息可能会失败,⽬前这依⽇志的记录⽽定。它的⾼可⽤性通过 RocketMQ本身的⾼可⽤性机制来保证,如果希望确保事务消息不丢失、并且事务完整性得到保证,建议使⽤同步的双重写⼊机制。
- 事务消息的⽣产者 ID 不能与其他类型消息的⽣产者 ID 共享。与其他类型的消息不同,事务消息允许反向查询、MQ服务器能通过它们的⽣产者 ID 查询到消费者。
事务消息原理
HALF消息:RMQ_SYS_TRANS_HALF_TOPIC(临时存放消息信息)
事务消息替换主题,保存原主题和队列信息
半消息对Consumer不可⻅,不会被投递
OP消息: RMQ_SYS_TRANS_OP_HALF_TOPIC(记录⼆阶段操作)
Rollback:只做记录
Commit:根据备份信息重新构造消息并投递
回查:
对⽐HALF消息和OP消息进⾏回查
TransactionListener接⼝
要使⽤RocketMQ的事务消息,要实现⼀个TransactionListener的接⼝,这个接⼝中有两个⽅法,如下:
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;public interface TransactionListener {/*** When send transactional prepare(half) message succeed, this method will* be invoked to execute local transaction.* 执⾏本地事务** @param msg Half(prepare) message* @param arg Custom business parameter* @return Transaction state*/LocalTransactionState executeLocalTransaction(final Message msg, finalObject arg);/*** When no response to prepare(half) message. broker will send check* message to check the transaction status, and this* method will be invoked to get local transaction status.* 消息回查后,需要检查对应消息的本地事务执⾏的最终结果** @param msg Check message* @return Transaction state*/LocalTransactionState checkLocalTransaction(final MessageExt msg);
}
RocketMQ的事务消息是基于两阶段提交实现的,也就是说消息有两个状态,prepared和commited。当消息执⾏完send⽅法后,进⼊的prepared状态,进⼊prepared状态以后,就要执⾏executeLocalTransaction⽅法,这个⽅法的返回值有3个,也决定着这个消息的命运,1.LocalTransactionState.COMMIT_MESSAGE:提交消息,这个消息由prepared状态进⼊到commited状态,消费者可以消费这个消息;
2.LocalTransactionState.ROLLBACK_MESSAGE:回滚,这个消息将被删除,消费者不能消费这个消息;
3.LocalTransactionState.UNKNOW:未知,这个状态有点意思,如果返回这个状态,这个消息既不提交,也不回滚,还是保持prepared状态,⽽最终决定这个消息命运的,是checkLocalTransaction这个⽅法。
当executeLocalTransaction⽅法返回UNKNOW以后,RocketMQ会每隔⼀段时间调⽤⼀次checkLocalTransaction,这个⽅法的返回值决定着这个消息的最终归宿。那么checkLocalTransaction这个⽅法多⻓时间调⽤⼀次呢?我们在BrokerConfig类中可以找到,
/*** Transaction message check interval.*/@ImportantFieldprivate long transactionCheckInterval = 60 * 1000;
这个值是在brokder.conf中配置的,默认值是60*1000,也就是1分钟。那么会检查多少次呢?如果每次都返回UNKNOW,也不能⽆休⽌的检查吧,我们在BrokerConfig类中可以找到
/*** The maximum number of times the message was checked, if exceed this
value, this message will be discarded.*/@ImportantFieldprivate int transactionCheckMax = 15;
这个是检查的最⼤次数,超过这个次数,如果还返回UNKNOW,这个消息将被删除。
TransactionProducer.java
package com.example.rocketmq.demo.transaction;import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.producer.*;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.common.RemotingHelper;import java.util.concurrent.TimeUnit;public class TransactionProducer {public static void main(String[] args) throws Exception {//1.创建消息生产者producer,并制定生产者组名TransactionMQProducer producer = new TransactionMQProducer("GroupTransaction");//2.指定nameserver地址producer.setNamesrvAddr("localhost:9876");//3.添加事务监听器producer.setTransactionListener(new TransactionListener() {/*** 在该方法执行本地事务* @param msg* @param arg* @return*/@Overridepublic LocalTransactionState executeLocalTransaction(Message msg, Object arg) {//db ,本地事务 + mq消息 = 事务消息System.out.println("executeLocal:"+msg.getTags());if(StringUtils.equals("TAGA",msg.getTags())){return LocalTransactionState.COMMIT_MESSAGE;}else if(StringUtils.equals("TAGB",msg.getTags())){//B消息本地事务返回rollbackreturn LocalTransactionState.ROLLBACK_MESSAGE;}else if(StringUtils.equals("TAGC",msg.getTags())){return LocalTransactionState.UNKNOW;}return LocalTransactionState.UNKNOW;}/*** 该方法用于MQ进行消息的回查* @param msg* @return*/@Overridepublic LocalTransactionState checkLocalTransaction(MessageExt msg) {System.out.println("checkLocalTransaction:"+msg.getTags());return LocalTransactionState.COMMIT_MESSAGE;}});//4.启动producerproducer.start();String[] tags = {"TAGA","TAGB","TAGC"};//发送三条消息for (int i = 0; i < 3; i++) {// 发送A、B、C三条Message msg = new Message("TransactionTopic", tags[i],("Hello RocketMQ: " + tags[i]).getBytes(RemotingHelper.DEFAULT_CHARSET));//6.发送消息SendResult sendResult = producer.sendMessageInTransaction(msg,null);//7.获取发送状态SendStatus sendStatus = sendResult.getSendStatus();System.out.printf("发送结果:%s%n", sendStatus);TimeUnit.SECONDS.sleep(1);}//8.关闭生产者producer
// producer.shutdown();}
}
TransactionConsumer.java
package com.example.rocketmq.demo.transaction;import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;import java.util.List;public class TransactionConsumer {public static void main(String[] args) throws InterruptedException, MQClientException {// 1.创建消费者consumer、制定消费者组名DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("GroupTransaction");// 2.指定nameserver地址consumer.setNamesrvAddr("localhost:9876");// 3.订阅主题Topic和Tagconsumer.subscribe("TransactionTopic", "*");// 4.设置回调函数,处理消息consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,ConsumeConcurrentlyContext context) {for(MessageExt msg:msgs){String key = msg.getKeys();System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), new String(msg.getBody()));}
// return ConsumeConcurrentlyStatus.RECONSUME_LATER;return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});//5.启动消费者consumer.start();System.out.printf("Consumer Started.%n");}
}相关文章:
012 rocketmq事务消息
文章目录 事务消息概念介绍交互流程事务消息原理TransactionListener接⼝TransactionProducer.javaTransactionConsumer.java 事务消息 内置topic中的消息对消费者不可见 本地事务mq消息事务消息 消息队列 RocketMQ 版提供的分布式事务消息适⽤于所有对数据最终⼀致性有强需求…...
ChatGPT与DeepSeek:开源与闭源的AI模型之争
目录 一、模型架构与技术原理 二、性能能力与应用场景 三、用户体验与部署灵活性 四、成本与商业模式 五、未来展望与市场影响 六、总结 随着人工智能技术的飞速发展,ChatGPT和DeepSeek作为两大领先的AI语言模型,成为了行业内外关注的焦点。它们在…...
Ollama的底层实现原理分析
一、背景 Ollama我们可以很方便的对DeepSeek等开源大模型进行部署,几条命令便能部署一个本地大模型服务,降低了非专业大模型开发者的门槛。 我们从中可以看到类似Docker的影子,ollama run 、ollama list等等,拉取对应大模型镜像&a…...
nginx 动态计算拦截非法访问ip
需求:在Nginx上实现一个动态拦截IP的方法,具体是当某个IP在1分钟内访问超过60次时,将其加入Redis并拦截,拦截时间默认1天。 技术选型:使用NginxLuaRedis的方法。这种方案通过Lua脚本在Nginx处理请求时检查Redis中的黑…...
商业秘密维权有哪些成本开支?
企业商业秘密百问百答之六十三:商业秘密维权费用项目有哪些? 在商业秘密维权过程中,原告可能需要支付多种费用,一般费用项目包括: 1、诉讼费。诉讼费是向法院支付的费用,包括起诉费、案件受理费等。这些费…...
使用UA-SPEECH和TORGO数据库验证自动构音障碍语音分类方法
使用UA-SPEECH和TORGO数据库验证自动构音障碍语音分类方法 引言 原文:On using the UA-Speech and TORGO databases to validate automatic dysarthric speech classification approaches 构音障碍简介 构音障碍是一种由于脑损伤或神经疾病(如脑瘫、肌萎缩侧索硬化症、帕金森…...
WebSocketHandler 是 Spring Framework 中用于处理 WebSocket 通信的接口
WebSocketHandler 是 Spring Framework 中用于处理 WebSocket 通信的接口,其主要作用是定义了如何处理 WebSocket 的各种事件和消息。以下是 WebSocketHandler 的主要作用和功能: ### 1. 处理 WebSocket 生命周期事件 WebSocketHandler 定义了多个方法来…...
Pikachu
一、网站搭建 同样的,先下载安装好phpstudy 然后启动Apache和Mysql 然后下载pikachu,解压到phpstudy文件夹下的www文件 然后用vscode打开pikachu中www文件夹下inc中的config.inc.php 将账户和密码改为和phpstudy中的一致(默认都是root&…...
如何使用 Jenkins 实现 CI/CD 流水线:从零开始搭建自动化部署流程
如何使用 Jenkins 实现 CI/CD 流水线:从零开始搭建自动化部署流程 在软件开发过程中,持续集成(CI)和持续交付(CD)已经成为现代开发和运维的标准实践。随着代码的迭代越来越频繁,传统的手动部署方式不仅低效,而且容易出错。为了提高开发效率和代码质量,Jenkins作为一款…...
Vue.js 学习笔记
文章目录 前言一、Vue.js 基础概念1.1 Vue.js 简介1.2 Vue.js 的特点1.3 Vue.js 基础示例 二、Vue.js 常用指令2.1 双向数据绑定(v-model)2.2 条件渲染(v-if 和 v-show)2.3 列表渲染(v-for)2.4 事件处理&am…...
数据存储:一文掌握RabbitMQ的详细使用
文章目录 一、RabbitMQ简介二、RabbitMQ的概述2.1 基本概念2.2 实际应用场景三、RabbitMQ的安装与配置3.1 安装RabbitMQ3.2 启用管理插件四、使用Python操作RabbitMQ4.1 安装Pika库4.2 生产者示例4.3 消费者示例4.4 发布/订阅模式示例五、RabbitMQ的高级特性5.1 消息持久化5.2 …...
辛格迪客户案例 | 祐儿医药科技GMP培训管理(TMS)项目
01 项目背景:顺应行业趋势,弥补管理短板 随着医药科技行业的快速发展,相关法规和标准不断更新,对企业的质量管理和人员培训提出了更高要求。祐儿医药科技有限公司(以下简称“祐儿医药”)作为一家专注于创新…...
FreeRtos实时系统: 十六.tickless低功耗模式
FreeRtos实时系统: 十六.tickless低功耗模式 一.tickless低功耗模式简介二.tickless模式详解三.tickless模式相关配置项四.tickless低功耗模式实验五.课堂总结 一.tickless低功耗模式简介 STM32低功耗模式: 二.tickless模式详解 为了可以降低功耗,又不…...
CSDN博客:Markdown编辑语法教程总结教程(上)
❤个人主页:折枝寄北的博客 Markdown编辑语法教程总结 前言1. CSDN Markdown编辑器功能简介1.1 基础操作界面1.2 创作助手和语法说明 2. Markdown编辑器语法2.1 目录2.2 标题2.2.1 标题级别设置2.2.2 标题居中 3. 文本样式3.1 强调文本(斜体)…...
多个pdf合并成一个pdf的方法
将多个PDF文件合并优点: 能更容易地对其进行归档和备份.打印时可以选择双面打印,减少纸张的浪费。比如把住宿发票以及滴滴发票、行程单等生成一个pdf,双面打印或者无纸化办公情况下直接发送给财务进行存档。 方法: 利用PDF24 Tools网站 …...
Spark基础篇 RDD、DataFrame与DataSet的关系、适用场景与演进趋势
一、核心概念与演进背景 1.1 RDD(弹性分布式数据集) 定义:RDD 是 Spark 最早的核心抽象(1.0版本引入),代表不可变、分区的分布式对象集合,支持函数式编程和容错机制。特点: 无结构化信息:仅存储对象本身,无法自动感知数据内部结构(如字段名、类型)。编译时类型安全…...
odoo初始化数据库
在 Odoo 中,初始化数据库的命令会因使用的环境和启动方式而有所不同,下面为你详细介绍几种常见的初始化数据库的方式。 1. 使用命令行工具初始化 在命令行中,你可以使用 Odoo 的启动脚本并结合相关参数来初始化数据库。以下是基本的命令格式…...
大模型WebUI:Gradio全解12——LangChain原理、架构和组件(2)
大模型WebUI:Gradio全解12——LangChain原理、架构和组件(2) 前言12. LangChain原理及agents构建Gradio UI12.2 学习资料12.2.1 学习文档12.2.2 用途示例12.2.3 OpenAI和DeepSeek例程1. OpenAI示例2. DeepSeek例程参考文献前言 本系列文章主要介绍WEB界面工具Gradio。Gradi…...
1. 搭建前端+后端开发框架
1. 说明 本篇博客主要介绍网页开发中,搭建前端和后端开发框架的具体步骤,框架中所使用的技术栈如下: 前端:VUE Javascript 后端:Python Flask Mysql 其中MySQL主要用来存储需要的数据,在本文中搭建基本…...
初会学习记录
目录 务实: 第一章 (1)会计概念,职能和目标: (2)会计假设: (3)会计核算基础: (4)会计信息质量要求: (5)会计人员职业道德规范 (6)会计准则制度体系概述: (7)会计要素与会计等式&#x…...
在软件开发中正确使用MySQL日期时间类型的深度解析
在日常软件开发场景中,时间信息的存储是底层且核心的需求。从金融交易的精确记账时间、用户操作的行为日志,到供应链系统的物流节点时间戳,时间数据的准确性直接决定业务逻辑的可靠性。MySQL作为主流关系型数据库,其日期时间类型的…...
ES6从入门到精通:前言
ES6简介 ES6(ECMAScript 2015)是JavaScript语言的重大更新,引入了许多新特性,包括语法糖、新数据类型、模块化支持等,显著提升了开发效率和代码可维护性。 核心知识点概览 变量声明 let 和 const 取代 var…...
CMake基础:构建流程详解
目录 1.CMake构建过程的基本流程 2.CMake构建的具体步骤 2.1.创建构建目录 2.2.使用 CMake 生成构建文件 2.3.编译和构建 2.4.清理构建文件 2.5.重新配置和构建 3.跨平台构建示例 4.工具链与交叉编译 5.CMake构建后的项目结构解析 5.1.CMake构建后的目录结构 5.2.构…...
JUC笔记(上)-复习 涉及死锁 volatile synchronized CAS 原子操作
一、上下文切换 即使单核CPU也可以进行多线程执行代码,CPU会给每个线程分配CPU时间片来实现这个机制。时间片非常短,所以CPU会不断地切换线程执行,从而让我们感觉多个线程是同时执行的。时间片一般是十几毫秒(ms)。通过时间片分配算法执行。…...
Caliper 配置文件解析:config.yaml
Caliper 是一个区块链性能基准测试工具,用于评估不同区块链平台的性能。下面我将详细解释你提供的 fisco-bcos.json 文件结构,并说明它与 config.yaml 文件的关系。 fisco-bcos.json 文件解析 这个文件是针对 FISCO-BCOS 区块链网络的 Caliper 配置文件,主要包含以下几个部…...
【Redis】笔记|第8节|大厂高并发缓存架构实战与优化
缓存架构 代码结构 代码详情 功能点: 多级缓存,先查本地缓存,再查Redis,最后才查数据库热点数据重建逻辑使用分布式锁,二次查询更新缓存采用读写锁提升性能采用Redis的发布订阅机制通知所有实例更新本地缓存适用读多…...
免费数学几何作图web平台
光锐软件免费数学工具,maths,数学制图,数学作图,几何作图,几何,AR开发,AR教育,增强现实,软件公司,XR,MR,VR,虚拟仿真,虚拟现实,混合现实,教育科技产品,职业模拟培训,高保真VR场景,结构互动课件,元宇宙http://xaglare.c…...
宇树科技,改名了!
提到国内具身智能和机器人领域的代表企业,那宇树科技(Unitree)必须名列其榜。 最近,宇树科技的一项新变动消息在业界引发了不少关注和讨论,即: 宇树向其合作伙伴发布了一封公司名称变更函称,因…...
全面解析数据库:从基础概念到前沿应用
在数字化时代,数据已成为企业和社会发展的核心资产,而数据库作为存储、管理和处理数据的关键工具,在各个领域发挥着举足轻重的作用。从电商平台的商品信息管理,到社交网络的用户数据存储,再到金融行业的交易记录处理&a…...
可视化预警系统:如何实现生产风险的实时监控?
在生产环境中,风险无处不在,而传统的监控方式往往只能事后补救,难以做到提前预警。但如今,可视化预警系统正在改变这一切!它能够实时收集和分析生产数据,通过直观的图表和警报,让管理者第一时间…...
