当前位置: 首页 > news >正文

012 rocketmq事务消息

文章目录

  • 事务消息
    • 概念介绍
    • 交互流程
    • 事务消息原理
    • TransactionListener接⼝
    • TransactionProducer.java
    • TransactionConsumer.java

事务消息

内置topic中的消息对消费者不可见
本地事务+mq消息=事务消息

消息队列 RocketMQ 版提供的分布式事务消息适⽤于所有对数据最终⼀致性有强需求的场景。

概念介绍

事务消息:消息队列 RocketMQ 版提供类似 X/Open XA 的分布式事务功能,通过消息队列RocketMQ 事务消息能达到分布式事务的最终⼀致。

半事务消息:暂不能投递的消息,发送⽅已经成功地将消息发送到了消息队列 RocketMQ 版服务端,但是服务端未收到⽣产者对该消息的⼆次确认,此时该消息被标记成“暂不能投递”状态,处于该种状态下的消息即半事务消息。

消息回查:由于⽹络闪断、⽣产者应⽤重启等原因,导致某条事务消息的⼆次确认丢失,消息队列RocketMQ 版服务端通过扫描发现某条消息⻓期处于“半事务消息”时,需要主动向消息⽣产者询问该消息的最终状态(Commit 或是 Rollback),该询问过程即消息回查。

交互流程

事务消息交互流程如下图所示。
事务消息
事务消息发送步骤如下:

  1. 发送⽅将半事务消息发送⾄消息队列 RocketMQ 版服务端。
  2. 消息队列 RocketMQ 版服务端将消息持久化成功之后,向发送⽅返回 Ack 确认消息已经发送成功,此时消息为半事务消息。
  3. 发送⽅开始执⾏本地事务逻辑。
  4. 发送⽅根据本地事务执⾏结果向服务端提交⼆次确认(Commit 或是 Rollback),服务端收到Commit 状态则将半事务消息标记为可投递,订阅⽅最终将收到该消息;服务端收到 Rollback 状态则删除半事务消息,订阅⽅将不会接受该消息。

事务消息回查步骤如下:

  1. 在断⽹或者是应⽤重启的特殊情况下,上述步骤 4 提交的⼆次确认最终未到达服务端,经过固定时间后服务端将对该消息发起消息回查。
  2. 发送⽅收到消息回查后,需要检查对应消息的本地事务执⾏的最终结果。
  3. 发送⽅根据检查得到的本地事务的最终状态再次提交⼆次确认,服务端仍按照步骤 4 对半事务消息进⾏操作。

注意事项

  1. 事务消息不⽀持延时消息和批量消息。
  2. 为了避免单个消息被检查太多次⽽导致半队列消息累积,我们默认将单个消息的检查次数限制为15 次,但是⽤户可以通过 Broker 配置⽂件的 transactionCheckMax 参数来修改此限制。如果已经检查某条消息超过 N 次的话( N = transactionCheckMax ) 则 Broker 将丢弃此消息,并在默认情况下同时打印错误⽇志。⽤户可以通过重写AbstractTransactionalMessageCheckListener 类来修改这个⾏为。
  3. 事务消息将在 Broker 配置⽂件中的参数 transactionTimeout 这样的特定时间⻓度之后被检查。当发送事务消息时,⽤户还可以通过设置⽤户属性 CHECK_IMMUNITY_TIME_IN_SECONDS 来改变这个限制,该参数优先于 transactionTimeout 参数。
  4. 事务性消息可能不⽌⼀次被检查或消费。做好幂等性的检查
  5. 提交给⽤户的⽬标主题消息可能会失败,⽬前这依⽇志的记录⽽定。它的⾼可⽤性通过 RocketMQ本身的⾼可⽤性机制来保证,如果希望确保事务消息不丢失、并且事务完整性得到保证,建议使⽤同步的双重写⼊机制。
  6. 事务消息的⽣产者 ID 不能与其他类型消息的⽣产者 ID 共享。与其他类型的消息不同,事务消息允许反向查询、MQ服务器能通过它们的⽣产者 ID 查询到消费者。

事务消息原理

HALF消息:RMQ_SYS_TRANS_HALF_TOPIC(临时存放消息信息)
事务消息替换主题,保存原主题和队列信息
半消息对Consumer不可⻅,不会被投递
OP消息: RMQ_SYS_TRANS_OP_HALF_TOPIC(记录⼆阶段操作)
Rollback:只做记录
Commit:根据备份信息重新构造消息并投递
回查:
对⽐HALF消息和OP消息进⾏回查

TransactionListener接⼝

要使⽤RocketMQ的事务消息,要实现⼀个TransactionListener的接⼝,这个接⼝中有两个⽅法,如下:

import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;public interface TransactionListener {/*** When send transactional prepare(half) message succeed, this method will* be invoked to execute local transaction.* 执⾏本地事务** @param msg Half(prepare) message* @param arg Custom business parameter* @return Transaction state*/LocalTransactionState executeLocalTransaction(final Message msg, finalObject arg);/*** When no response to prepare(half) message. broker will send check* message to check the transaction status, and this* method will be invoked to get local transaction status.* 消息回查后,需要检查对应消息的本地事务执⾏的最终结果** @param msg Check message* @return Transaction state*/LocalTransactionState checkLocalTransaction(final MessageExt msg);
}

RocketMQ的事务消息是基于两阶段提交实现的,也就是说消息有两个状态,prepared和commited。当消息执⾏完send⽅法后,进⼊的prepared状态,进⼊prepared状态以后,就要执⾏executeLocalTransaction⽅法,这个⽅法的返回值有3个,也决定着这个消息的命运,1.LocalTransactionState.COMMIT_MESSAGE:提交消息,这个消息由prepared状态进⼊到commited状态,消费者可以消费这个消息;
2.LocalTransactionState.ROLLBACK_MESSAGE:回滚,这个消息将被删除,消费者不能消费这个消息;
3.LocalTransactionState.UNKNOW:未知,这个状态有点意思,如果返回这个状态,这个消息既不提交,也不回滚,还是保持prepared状态,⽽最终决定这个消息命运的,是checkLocalTransaction这个⽅法。
当executeLocalTransaction⽅法返回UNKNOW以后,RocketMQ会每隔⼀段时间调⽤⼀次checkLocalTransaction,这个⽅法的返回值决定着这个消息的最终归宿。那么checkLocalTransaction这个⽅法多⻓时间调⽤⼀次呢?我们在BrokerConfig类中可以找到,

/*** Transaction message check interval.*/@ImportantFieldprivate long transactionCheckInterval = 60 * 1000;

这个值是在brokder.conf中配置的,默认值是60*1000,也就是1分钟。那么会检查多少次呢?如果每次都返回UNKNOW,也不能⽆休⽌的检查吧,我们在BrokerConfig类中可以找到

/*** The maximum number of times the message was checked, if exceed this
value, this message will be discarded.*/@ImportantFieldprivate int transactionCheckMax = 15;

这个是检查的最⼤次数,超过这个次数,如果还返回UNKNOW,这个消息将被删除。

TransactionProducer.java

package com.example.rocketmq.demo.transaction;import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.producer.*;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.common.RemotingHelper;import java.util.concurrent.TimeUnit;public class TransactionProducer {public static void main(String[] args) throws Exception {//1.创建消息生产者producer,并制定生产者组名TransactionMQProducer producer = new TransactionMQProducer("GroupTransaction");//2.指定nameserver地址producer.setNamesrvAddr("localhost:9876");//3.添加事务监听器producer.setTransactionListener(new TransactionListener() {/*** 在该方法执行本地事务* @param msg* @param arg* @return*/@Overridepublic LocalTransactionState executeLocalTransaction(Message msg, Object arg) {//db ,本地事务 + mq消息 = 事务消息System.out.println("executeLocal:"+msg.getTags());if(StringUtils.equals("TAGA",msg.getTags())){return LocalTransactionState.COMMIT_MESSAGE;}else if(StringUtils.equals("TAGB",msg.getTags())){//B消息本地事务返回rollbackreturn LocalTransactionState.ROLLBACK_MESSAGE;}else if(StringUtils.equals("TAGC",msg.getTags())){return LocalTransactionState.UNKNOW;}return LocalTransactionState.UNKNOW;}/*** 该方法用于MQ进行消息的回查* @param msg* @return*/@Overridepublic LocalTransactionState checkLocalTransaction(MessageExt msg) {System.out.println("checkLocalTransaction:"+msg.getTags());return LocalTransactionState.COMMIT_MESSAGE;}});//4.启动producerproducer.start();String[] tags = {"TAGA","TAGB","TAGC"};//发送三条消息for (int i = 0; i < 3; i++) {// 发送A、B、C三条Message msg = new Message("TransactionTopic", tags[i],("Hello RocketMQ: " + tags[i]).getBytes(RemotingHelper.DEFAULT_CHARSET));//6.发送消息SendResult sendResult = producer.sendMessageInTransaction(msg,null);//7.获取发送状态SendStatus sendStatus = sendResult.getSendStatus();System.out.printf("发送结果:%s%n", sendStatus);TimeUnit.SECONDS.sleep(1);}//8.关闭生产者producer
//        producer.shutdown();}
}

TransactionConsumer.java

package com.example.rocketmq.demo.transaction;import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;import java.util.List;public class TransactionConsumer {public static void main(String[] args) throws InterruptedException, MQClientException {// 1.创建消费者consumer、制定消费者组名DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("GroupTransaction");// 2.指定nameserver地址consumer.setNamesrvAddr("localhost:9876");// 3.订阅主题Topic和Tagconsumer.subscribe("TransactionTopic", "*");// 4.设置回调函数,处理消息consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,ConsumeConcurrentlyContext context) {for(MessageExt msg:msgs){String key = msg.getKeys();System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), new String(msg.getBody()));}
//                return ConsumeConcurrentlyStatus.RECONSUME_LATER;return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});//5.启动消费者consumer.start();System.out.printf("Consumer Started.%n");}
}

相关文章:

012 rocketmq事务消息

文章目录 事务消息概念介绍交互流程事务消息原理TransactionListener接⼝TransactionProducer.javaTransactionConsumer.java 事务消息 内置topic中的消息对消费者不可见 本地事务mq消息事务消息 消息队列 RocketMQ 版提供的分布式事务消息适⽤于所有对数据最终⼀致性有强需求…...

ChatGPT与DeepSeek:开源与闭源的AI模型之争

目录 一、模型架构与技术原理 二、性能能力与应用场景 三、用户体验与部署灵活性 四、成本与商业模式 五、未来展望与市场影响 六、总结 随着人工智能技术的飞速发展&#xff0c;ChatGPT和DeepSeek作为两大领先的AI语言模型&#xff0c;成为了行业内外关注的焦点。它们在…...

Ollama的底层实现原理分析

一、背景 Ollama我们可以很方便的对DeepSeek等开源大模型进行部署&#xff0c;几条命令便能部署一个本地大模型服务&#xff0c;降低了非专业大模型开发者的门槛。 我们从中可以看到类似Docker的影子&#xff0c;ollama run 、ollama list等等&#xff0c;拉取对应大模型镜像&a…...

nginx 动态计算拦截非法访问ip

需求&#xff1a;在Nginx上实现一个动态拦截IP的方法&#xff0c;具体是当某个IP在1分钟内访问超过60次时&#xff0c;将其加入Redis并拦截&#xff0c;拦截时间默认1天。 技术选型&#xff1a;使用NginxLuaRedis的方法。这种方案通过Lua脚本在Nginx处理请求时检查Redis中的黑…...

商业秘密维权有哪些成本开支?

企业商业秘密百问百答之六十三&#xff1a;商业秘密维权费用项目有哪些&#xff1f; 在商业秘密维权过程中&#xff0c;原告可能需要支付多种费用&#xff0c;一般费用项目包括&#xff1a; 1、诉讼费。诉讼费是向法院支付的费用&#xff0c;包括起诉费、案件受理费等。这些费…...

使用UA-SPEECH和TORGO数据库验证自动构音障碍语音分类方法

使用UA-SPEECH和TORGO数据库验证自动构音障碍语音分类方法 引言 原文:On using the UA-Speech and TORGO databases to validate automatic dysarthric speech classification approaches 构音障碍简介 构音障碍是一种由于脑损伤或神经疾病(如脑瘫、肌萎缩侧索硬化症、帕金森…...

WebSocketHandler 是 Spring Framework 中用于处理 WebSocket 通信的接口

WebSocketHandler 是 Spring Framework 中用于处理 WebSocket 通信的接口&#xff0c;其主要作用是定义了如何处理 WebSocket 的各种事件和消息。以下是 WebSocketHandler 的主要作用和功能&#xff1a; ### 1. 处理 WebSocket 生命周期事件 WebSocketHandler 定义了多个方法来…...

Pikachu

一、网站搭建 同样的&#xff0c;先下载安装好phpstudy 然后启动Apache和Mysql 然后下载pikachu&#xff0c;解压到phpstudy文件夹下的www文件 然后用vscode打开pikachu中www文件夹下inc中的config.inc.php 将账户和密码改为和phpstudy中的一致&#xff08;默认都是root&…...

如何使用 Jenkins 实现 CI/CD 流水线:从零开始搭建自动化部署流程

如何使用 Jenkins 实现 CI/CD 流水线:从零开始搭建自动化部署流程 在软件开发过程中,持续集成(CI)和持续交付(CD)已经成为现代开发和运维的标准实践。随着代码的迭代越来越频繁,传统的手动部署方式不仅低效,而且容易出错。为了提高开发效率和代码质量,Jenkins作为一款…...

Vue.js 学习笔记

文章目录 前言一、Vue.js 基础概念1.1 Vue.js 简介1.2 Vue.js 的特点1.3 Vue.js 基础示例 二、Vue.js 常用指令2.1 双向数据绑定&#xff08;v-model&#xff09;2.2 条件渲染&#xff08;v-if 和 v-show&#xff09;2.3 列表渲染&#xff08;v-for&#xff09;2.4 事件处理&am…...

数据存储:一文掌握RabbitMQ的详细使用

文章目录 一、RabbitMQ简介二、RabbitMQ的概述2.1 基本概念2.2 实际应用场景三、RabbitMQ的安装与配置3.1 安装RabbitMQ3.2 启用管理插件四、使用Python操作RabbitMQ4.1 安装Pika库4.2 生产者示例4.3 消费者示例4.4 发布/订阅模式示例五、RabbitMQ的高级特性5.1 消息持久化5.2 …...

辛格迪客户案例 | 祐儿医药科技GMP培训管理(TMS)项目

01 项目背景&#xff1a;顺应行业趋势&#xff0c;弥补管理短板 随着医药科技行业的快速发展&#xff0c;相关法规和标准不断更新&#xff0c;对企业的质量管理和人员培训提出了更高要求。祐儿医药科技有限公司&#xff08;以下简称“祐儿医药”&#xff09;作为一家专注于创新…...

FreeRtos实时系统: 十六.tickless低功耗模式

FreeRtos实时系统: 十六.tickless低功耗模式 一.tickless低功耗模式简介二.tickless模式详解三.tickless模式相关配置项四.tickless低功耗模式实验五.课堂总结 一.tickless低功耗模式简介 STM32低功耗模式&#xff1a; 二.tickless模式详解 为了可以降低功耗&#xff0c;又不…...

CSDN博客:Markdown编辑语法教程总结教程(上)

❤个人主页&#xff1a;折枝寄北的博客 Markdown编辑语法教程总结 前言1. CSDN Markdown编辑器功能简介1.1 基础操作界面1.2 创作助手和语法说明 2. Markdown编辑器语法2.1 目录2.2 标题2.2.1 标题级别设置2.2.2 标题居中 3. 文本样式3.1 强调文本&#xff08;斜体&#xff09…...

多个pdf合并成一个pdf的方法

将多个PDF文件合并优点&#xff1a; 能更容易地对其进行归档和备份.打印时可以选择双面打印&#xff0c;减少纸张的浪费。比如把住宿发票以及滴滴发票、行程单等生成一个pdf&#xff0c;双面打印或者无纸化办公情况下直接发送给财务进行存档。 方法: 利用PDF24 Tools网站 …...

Spark基础篇 RDD、DataFrame与DataSet的关系、适用场景与演进趋势

一、核心概念与演进背景 1.1 RDD(弹性分布式数据集) 定义:RDD 是 Spark 最早的核心抽象(1.0版本引入),代表不可变、分区的分布式对象集合,支持函数式编程和容错机制。特点: 无结构化信息:仅存储对象本身,无法自动感知数据内部结构(如字段名、类型)。编译时类型安全…...

odoo初始化数据库

在 Odoo 中&#xff0c;初始化数据库的命令会因使用的环境和启动方式而有所不同&#xff0c;下面为你详细介绍几种常见的初始化数据库的方式。 1. 使用命令行工具初始化 在命令行中&#xff0c;你可以使用 Odoo 的启动脚本并结合相关参数来初始化数据库。以下是基本的命令格式…...

大模型WebUI:Gradio全解12——LangChain原理、架构和组件(2)

大模型WebUI:Gradio全解12——LangChain原理、架构和组件(2) 前言12. LangChain原理及agents构建Gradio UI12.2 学习资料12.2.1 学习文档12.2.2 用途示例12.2.3 OpenAI和DeepSeek例程1. OpenAI示例2. DeepSeek例程参考文献前言 本系列文章主要介绍WEB界面工具Gradio。Gradi…...

1. 搭建前端+后端开发框架

1. 说明 本篇博客主要介绍网页开发中&#xff0c;搭建前端和后端开发框架的具体步骤&#xff0c;框架中所使用的技术栈如下&#xff1a; 前端&#xff1a;VUE Javascript 后端&#xff1a;Python Flask Mysql 其中MySQL主要用来存储需要的数据&#xff0c;在本文中搭建基本…...

初会学习记录

目录 务实&#xff1a; 第一章 (1)会计概念&#xff0c;职能和目标&#xff1a; (2)会计假设&#xff1a; (3)会计核算基础&#xff1a; (4)会计信息质量要求&#xff1a; (5)会计人员职业道德规范 (6)会计准则制度体系概述&#xff1a; (7)会计要素与会计等式&#x…...

浅谈 React Hooks

React Hooks 是 React 16.8 引入的一组 API&#xff0c;用于在函数组件中使用 state 和其他 React 特性&#xff08;例如生命周期方法、context 等&#xff09;。Hooks 通过简洁的函数接口&#xff0c;解决了状态与 UI 的高度解耦&#xff0c;通过函数式编程范式实现更灵活 Rea…...

华为云AI开发平台ModelArts

华为云ModelArts&#xff1a;重塑AI开发流程的“智能引擎”与“创新加速器”&#xff01; 在人工智能浪潮席卷全球的2025年&#xff0c;企业拥抱AI的意愿空前高涨&#xff0c;但技术门槛高、流程复杂、资源投入巨大的现实&#xff0c;却让许多创新构想止步于实验室。数据科学家…...

rknn优化教程(二)

文章目录 1. 前述2. 三方库的封装2.1 xrepo中的库2.2 xrepo之外的库2.2.1 opencv2.2.2 rknnrt2.2.3 spdlog 3. rknn_engine库 1. 前述 OK&#xff0c;开始写第二篇的内容了。这篇博客主要能写一下&#xff1a; 如何给一些三方库按照xmake方式进行封装&#xff0c;供调用如何按…...

日语学习-日语知识点小记-构建基础-JLPT-N4阶段(33):にする

日语学习-日语知识点小记-构建基础-JLPT-N4阶段(33):にする 1、前言(1)情况说明(2)工程师的信仰2、知识点(1) にする1,接续:名词+にする2,接续:疑问词+にする3,(A)は(B)にする。(2)復習:(1)复习句子(2)ために & ように(3)そう(4)にする3、…...

ssc377d修改flash分区大小

1、flash的分区默认分配16M、 / # df -h Filesystem Size Used Available Use% Mounted on /dev/root 1.9M 1.9M 0 100% / /dev/mtdblock4 3.0M...

django filter 统计数量 按属性去重

在Django中&#xff0c;如果你想要根据某个属性对查询集进行去重并统计数量&#xff0c;你可以使用values()方法配合annotate()方法来实现。这里有两种常见的方法来完成这个需求&#xff1a; 方法1&#xff1a;使用annotate()和Count 假设你有一个模型Item&#xff0c;并且你想…...

DIY|Mac 搭建 ESP-IDF 开发环境及编译小智 AI

前一阵子在百度 AI 开发者大会上&#xff0c;看到基于小智 AI DIY 玩具的演示&#xff0c;感觉有点意思&#xff0c;想着自己也来试试。 如果只是想烧录现成的固件&#xff0c;乐鑫官方除了提供了 Windows 版本的 Flash 下载工具 之外&#xff0c;还提供了基于网页版的 ESP LA…...

【无标题】路径问题的革命性重构:基于二维拓扑收缩色动力学模型的零点隧穿理论

路径问题的革命性重构&#xff1a;基于二维拓扑收缩色动力学模型的零点隧穿理论 一、传统路径模型的根本缺陷 在经典正方形路径问题中&#xff08;图1&#xff09;&#xff1a; mermaid graph LR A((A)) --- B((B)) B --- C((C)) C --- D((D)) D --- A A -.- C[无直接路径] B -…...

《Offer来了:Java面试核心知识点精讲》大纲

文章目录 一、《Offer来了:Java面试核心知识点精讲》的典型大纲框架Java基础并发编程JVM原理数据库与缓存分布式架构系统设计二、《Offer来了:Java面试核心知识点精讲(原理篇)》技术文章大纲核心主题:Java基础原理与面试高频考点Java虚拟机(JVM)原理Java并发编程原理Jav…...

深入浅出WebGL:在浏览器中解锁3D世界的魔法钥匙

WebGL&#xff1a;在浏览器中解锁3D世界的魔法钥匙 引言&#xff1a;网页的边界正在消失 在数字化浪潮的推动下&#xff0c;网页早已不再是静态信息的展示窗口。如今&#xff0c;我们可以在浏览器中体验逼真的3D游戏、交互式数据可视化、虚拟实验室&#xff0c;甚至沉浸式的V…...