当前位置: 首页 > news >正文

012 rocketmq事务消息

文章目录

  • 事务消息
    • 概念介绍
    • 交互流程
    • 事务消息原理
    • TransactionListener接⼝
    • TransactionProducer.java
    • TransactionConsumer.java

事务消息

内置topic中的消息对消费者不可见
本地事务+mq消息=事务消息

消息队列 RocketMQ 版提供的分布式事务消息适⽤于所有对数据最终⼀致性有强需求的场景。

概念介绍

事务消息:消息队列 RocketMQ 版提供类似 X/Open XA 的分布式事务功能,通过消息队列RocketMQ 事务消息能达到分布式事务的最终⼀致。

半事务消息:暂不能投递的消息,发送⽅已经成功地将消息发送到了消息队列 RocketMQ 版服务端,但是服务端未收到⽣产者对该消息的⼆次确认,此时该消息被标记成“暂不能投递”状态,处于该种状态下的消息即半事务消息。

消息回查:由于⽹络闪断、⽣产者应⽤重启等原因,导致某条事务消息的⼆次确认丢失,消息队列RocketMQ 版服务端通过扫描发现某条消息⻓期处于“半事务消息”时,需要主动向消息⽣产者询问该消息的最终状态(Commit 或是 Rollback),该询问过程即消息回查。

交互流程

事务消息交互流程如下图所示。
事务消息
事务消息发送步骤如下:

  1. 发送⽅将半事务消息发送⾄消息队列 RocketMQ 版服务端。
  2. 消息队列 RocketMQ 版服务端将消息持久化成功之后,向发送⽅返回 Ack 确认消息已经发送成功,此时消息为半事务消息。
  3. 发送⽅开始执⾏本地事务逻辑。
  4. 发送⽅根据本地事务执⾏结果向服务端提交⼆次确认(Commit 或是 Rollback),服务端收到Commit 状态则将半事务消息标记为可投递,订阅⽅最终将收到该消息;服务端收到 Rollback 状态则删除半事务消息,订阅⽅将不会接受该消息。

事务消息回查步骤如下:

  1. 在断⽹或者是应⽤重启的特殊情况下,上述步骤 4 提交的⼆次确认最终未到达服务端,经过固定时间后服务端将对该消息发起消息回查。
  2. 发送⽅收到消息回查后,需要检查对应消息的本地事务执⾏的最终结果。
  3. 发送⽅根据检查得到的本地事务的最终状态再次提交⼆次确认,服务端仍按照步骤 4 对半事务消息进⾏操作。

注意事项

  1. 事务消息不⽀持延时消息和批量消息。
  2. 为了避免单个消息被检查太多次⽽导致半队列消息累积,我们默认将单个消息的检查次数限制为15 次,但是⽤户可以通过 Broker 配置⽂件的 transactionCheckMax 参数来修改此限制。如果已经检查某条消息超过 N 次的话( N = transactionCheckMax ) 则 Broker 将丢弃此消息,并在默认情况下同时打印错误⽇志。⽤户可以通过重写AbstractTransactionalMessageCheckListener 类来修改这个⾏为。
  3. 事务消息将在 Broker 配置⽂件中的参数 transactionTimeout 这样的特定时间⻓度之后被检查。当发送事务消息时,⽤户还可以通过设置⽤户属性 CHECK_IMMUNITY_TIME_IN_SECONDS 来改变这个限制,该参数优先于 transactionTimeout 参数。
  4. 事务性消息可能不⽌⼀次被检查或消费。做好幂等性的检查
  5. 提交给⽤户的⽬标主题消息可能会失败,⽬前这依⽇志的记录⽽定。它的⾼可⽤性通过 RocketMQ本身的⾼可⽤性机制来保证,如果希望确保事务消息不丢失、并且事务完整性得到保证,建议使⽤同步的双重写⼊机制。
  6. 事务消息的⽣产者 ID 不能与其他类型消息的⽣产者 ID 共享。与其他类型的消息不同,事务消息允许反向查询、MQ服务器能通过它们的⽣产者 ID 查询到消费者。

事务消息原理

HALF消息:RMQ_SYS_TRANS_HALF_TOPIC(临时存放消息信息)
事务消息替换主题,保存原主题和队列信息
半消息对Consumer不可⻅,不会被投递
OP消息: RMQ_SYS_TRANS_OP_HALF_TOPIC(记录⼆阶段操作)
Rollback:只做记录
Commit:根据备份信息重新构造消息并投递
回查:
对⽐HALF消息和OP消息进⾏回查

TransactionListener接⼝

要使⽤RocketMQ的事务消息,要实现⼀个TransactionListener的接⼝,这个接⼝中有两个⽅法,如下:

import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;public interface TransactionListener {/*** When send transactional prepare(half) message succeed, this method will* be invoked to execute local transaction.* 执⾏本地事务** @param msg Half(prepare) message* @param arg Custom business parameter* @return Transaction state*/LocalTransactionState executeLocalTransaction(final Message msg, finalObject arg);/*** When no response to prepare(half) message. broker will send check* message to check the transaction status, and this* method will be invoked to get local transaction status.* 消息回查后,需要检查对应消息的本地事务执⾏的最终结果** @param msg Check message* @return Transaction state*/LocalTransactionState checkLocalTransaction(final MessageExt msg);
}

RocketMQ的事务消息是基于两阶段提交实现的,也就是说消息有两个状态,prepared和commited。当消息执⾏完send⽅法后,进⼊的prepared状态,进⼊prepared状态以后,就要执⾏executeLocalTransaction⽅法,这个⽅法的返回值有3个,也决定着这个消息的命运,1.LocalTransactionState.COMMIT_MESSAGE:提交消息,这个消息由prepared状态进⼊到commited状态,消费者可以消费这个消息;
2.LocalTransactionState.ROLLBACK_MESSAGE:回滚,这个消息将被删除,消费者不能消费这个消息;
3.LocalTransactionState.UNKNOW:未知,这个状态有点意思,如果返回这个状态,这个消息既不提交,也不回滚,还是保持prepared状态,⽽最终决定这个消息命运的,是checkLocalTransaction这个⽅法。
当executeLocalTransaction⽅法返回UNKNOW以后,RocketMQ会每隔⼀段时间调⽤⼀次checkLocalTransaction,这个⽅法的返回值决定着这个消息的最终归宿。那么checkLocalTransaction这个⽅法多⻓时间调⽤⼀次呢?我们在BrokerConfig类中可以找到,

/*** Transaction message check interval.*/@ImportantFieldprivate long transactionCheckInterval = 60 * 1000;

这个值是在brokder.conf中配置的,默认值是60*1000,也就是1分钟。那么会检查多少次呢?如果每次都返回UNKNOW,也不能⽆休⽌的检查吧,我们在BrokerConfig类中可以找到

/*** The maximum number of times the message was checked, if exceed this
value, this message will be discarded.*/@ImportantFieldprivate int transactionCheckMax = 15;

这个是检查的最⼤次数,超过这个次数,如果还返回UNKNOW,这个消息将被删除。

TransactionProducer.java

package com.example.rocketmq.demo.transaction;import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.producer.*;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.common.RemotingHelper;import java.util.concurrent.TimeUnit;public class TransactionProducer {public static void main(String[] args) throws Exception {//1.创建消息生产者producer,并制定生产者组名TransactionMQProducer producer = new TransactionMQProducer("GroupTransaction");//2.指定nameserver地址producer.setNamesrvAddr("localhost:9876");//3.添加事务监听器producer.setTransactionListener(new TransactionListener() {/*** 在该方法执行本地事务* @param msg* @param arg* @return*/@Overridepublic LocalTransactionState executeLocalTransaction(Message msg, Object arg) {//db ,本地事务 + mq消息 = 事务消息System.out.println("executeLocal:"+msg.getTags());if(StringUtils.equals("TAGA",msg.getTags())){return LocalTransactionState.COMMIT_MESSAGE;}else if(StringUtils.equals("TAGB",msg.getTags())){//B消息本地事务返回rollbackreturn LocalTransactionState.ROLLBACK_MESSAGE;}else if(StringUtils.equals("TAGC",msg.getTags())){return LocalTransactionState.UNKNOW;}return LocalTransactionState.UNKNOW;}/*** 该方法用于MQ进行消息的回查* @param msg* @return*/@Overridepublic LocalTransactionState checkLocalTransaction(MessageExt msg) {System.out.println("checkLocalTransaction:"+msg.getTags());return LocalTransactionState.COMMIT_MESSAGE;}});//4.启动producerproducer.start();String[] tags = {"TAGA","TAGB","TAGC"};//发送三条消息for (int i = 0; i < 3; i++) {// 发送A、B、C三条Message msg = new Message("TransactionTopic", tags[i],("Hello RocketMQ: " + tags[i]).getBytes(RemotingHelper.DEFAULT_CHARSET));//6.发送消息SendResult sendResult = producer.sendMessageInTransaction(msg,null);//7.获取发送状态SendStatus sendStatus = sendResult.getSendStatus();System.out.printf("发送结果:%s%n", sendStatus);TimeUnit.SECONDS.sleep(1);}//8.关闭生产者producer
//        producer.shutdown();}
}

TransactionConsumer.java

package com.example.rocketmq.demo.transaction;import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;import java.util.List;public class TransactionConsumer {public static void main(String[] args) throws InterruptedException, MQClientException {// 1.创建消费者consumer、制定消费者组名DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("GroupTransaction");// 2.指定nameserver地址consumer.setNamesrvAddr("localhost:9876");// 3.订阅主题Topic和Tagconsumer.subscribe("TransactionTopic", "*");// 4.设置回调函数,处理消息consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,ConsumeConcurrentlyContext context) {for(MessageExt msg:msgs){String key = msg.getKeys();System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), new String(msg.getBody()));}
//                return ConsumeConcurrentlyStatus.RECONSUME_LATER;return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});//5.启动消费者consumer.start();System.out.printf("Consumer Started.%n");}
}

相关文章:

012 rocketmq事务消息

文章目录 事务消息概念介绍交互流程事务消息原理TransactionListener接⼝TransactionProducer.javaTransactionConsumer.java 事务消息 内置topic中的消息对消费者不可见 本地事务mq消息事务消息 消息队列 RocketMQ 版提供的分布式事务消息适⽤于所有对数据最终⼀致性有强需求…...

ChatGPT与DeepSeek:开源与闭源的AI模型之争

目录 一、模型架构与技术原理 二、性能能力与应用场景 三、用户体验与部署灵活性 四、成本与商业模式 五、未来展望与市场影响 六、总结 随着人工智能技术的飞速发展&#xff0c;ChatGPT和DeepSeek作为两大领先的AI语言模型&#xff0c;成为了行业内外关注的焦点。它们在…...

Ollama的底层实现原理分析

一、背景 Ollama我们可以很方便的对DeepSeek等开源大模型进行部署&#xff0c;几条命令便能部署一个本地大模型服务&#xff0c;降低了非专业大模型开发者的门槛。 我们从中可以看到类似Docker的影子&#xff0c;ollama run 、ollama list等等&#xff0c;拉取对应大模型镜像&a…...

nginx 动态计算拦截非法访问ip

需求&#xff1a;在Nginx上实现一个动态拦截IP的方法&#xff0c;具体是当某个IP在1分钟内访问超过60次时&#xff0c;将其加入Redis并拦截&#xff0c;拦截时间默认1天。 技术选型&#xff1a;使用NginxLuaRedis的方法。这种方案通过Lua脚本在Nginx处理请求时检查Redis中的黑…...

商业秘密维权有哪些成本开支?

企业商业秘密百问百答之六十三&#xff1a;商业秘密维权费用项目有哪些&#xff1f; 在商业秘密维权过程中&#xff0c;原告可能需要支付多种费用&#xff0c;一般费用项目包括&#xff1a; 1、诉讼费。诉讼费是向法院支付的费用&#xff0c;包括起诉费、案件受理费等。这些费…...

使用UA-SPEECH和TORGO数据库验证自动构音障碍语音分类方法

使用UA-SPEECH和TORGO数据库验证自动构音障碍语音分类方法 引言 原文:On using the UA-Speech and TORGO databases to validate automatic dysarthric speech classification approaches 构音障碍简介 构音障碍是一种由于脑损伤或神经疾病(如脑瘫、肌萎缩侧索硬化症、帕金森…...

WebSocketHandler 是 Spring Framework 中用于处理 WebSocket 通信的接口

WebSocketHandler 是 Spring Framework 中用于处理 WebSocket 通信的接口&#xff0c;其主要作用是定义了如何处理 WebSocket 的各种事件和消息。以下是 WebSocketHandler 的主要作用和功能&#xff1a; ### 1. 处理 WebSocket 生命周期事件 WebSocketHandler 定义了多个方法来…...

Pikachu

一、网站搭建 同样的&#xff0c;先下载安装好phpstudy 然后启动Apache和Mysql 然后下载pikachu&#xff0c;解压到phpstudy文件夹下的www文件 然后用vscode打开pikachu中www文件夹下inc中的config.inc.php 将账户和密码改为和phpstudy中的一致&#xff08;默认都是root&…...

如何使用 Jenkins 实现 CI/CD 流水线:从零开始搭建自动化部署流程

如何使用 Jenkins 实现 CI/CD 流水线:从零开始搭建自动化部署流程 在软件开发过程中,持续集成(CI)和持续交付(CD)已经成为现代开发和运维的标准实践。随着代码的迭代越来越频繁,传统的手动部署方式不仅低效,而且容易出错。为了提高开发效率和代码质量,Jenkins作为一款…...

Vue.js 学习笔记

文章目录 前言一、Vue.js 基础概念1.1 Vue.js 简介1.2 Vue.js 的特点1.3 Vue.js 基础示例 二、Vue.js 常用指令2.1 双向数据绑定&#xff08;v-model&#xff09;2.2 条件渲染&#xff08;v-if 和 v-show&#xff09;2.3 列表渲染&#xff08;v-for&#xff09;2.4 事件处理&am…...

数据存储:一文掌握RabbitMQ的详细使用

文章目录 一、RabbitMQ简介二、RabbitMQ的概述2.1 基本概念2.2 实际应用场景三、RabbitMQ的安装与配置3.1 安装RabbitMQ3.2 启用管理插件四、使用Python操作RabbitMQ4.1 安装Pika库4.2 生产者示例4.3 消费者示例4.4 发布/订阅模式示例五、RabbitMQ的高级特性5.1 消息持久化5.2 …...

辛格迪客户案例 | 祐儿医药科技GMP培训管理(TMS)项目

01 项目背景&#xff1a;顺应行业趋势&#xff0c;弥补管理短板 随着医药科技行业的快速发展&#xff0c;相关法规和标准不断更新&#xff0c;对企业的质量管理和人员培训提出了更高要求。祐儿医药科技有限公司&#xff08;以下简称“祐儿医药”&#xff09;作为一家专注于创新…...

FreeRtos实时系统: 十六.tickless低功耗模式

FreeRtos实时系统: 十六.tickless低功耗模式 一.tickless低功耗模式简介二.tickless模式详解三.tickless模式相关配置项四.tickless低功耗模式实验五.课堂总结 一.tickless低功耗模式简介 STM32低功耗模式&#xff1a; 二.tickless模式详解 为了可以降低功耗&#xff0c;又不…...

CSDN博客:Markdown编辑语法教程总结教程(上)

❤个人主页&#xff1a;折枝寄北的博客 Markdown编辑语法教程总结 前言1. CSDN Markdown编辑器功能简介1.1 基础操作界面1.2 创作助手和语法说明 2. Markdown编辑器语法2.1 目录2.2 标题2.2.1 标题级别设置2.2.2 标题居中 3. 文本样式3.1 强调文本&#xff08;斜体&#xff09…...

多个pdf合并成一个pdf的方法

将多个PDF文件合并优点&#xff1a; 能更容易地对其进行归档和备份.打印时可以选择双面打印&#xff0c;减少纸张的浪费。比如把住宿发票以及滴滴发票、行程单等生成一个pdf&#xff0c;双面打印或者无纸化办公情况下直接发送给财务进行存档。 方法: 利用PDF24 Tools网站 …...

Spark基础篇 RDD、DataFrame与DataSet的关系、适用场景与演进趋势

一、核心概念与演进背景 1.1 RDD(弹性分布式数据集) 定义:RDD 是 Spark 最早的核心抽象(1.0版本引入),代表不可变、分区的分布式对象集合,支持函数式编程和容错机制。特点: 无结构化信息:仅存储对象本身,无法自动感知数据内部结构(如字段名、类型)。编译时类型安全…...

odoo初始化数据库

在 Odoo 中&#xff0c;初始化数据库的命令会因使用的环境和启动方式而有所不同&#xff0c;下面为你详细介绍几种常见的初始化数据库的方式。 1. 使用命令行工具初始化 在命令行中&#xff0c;你可以使用 Odoo 的启动脚本并结合相关参数来初始化数据库。以下是基本的命令格式…...

大模型WebUI:Gradio全解12——LangChain原理、架构和组件(2)

大模型WebUI:Gradio全解12——LangChain原理、架构和组件(2) 前言12. LangChain原理及agents构建Gradio UI12.2 学习资料12.2.1 学习文档12.2.2 用途示例12.2.3 OpenAI和DeepSeek例程1. OpenAI示例2. DeepSeek例程参考文献前言 本系列文章主要介绍WEB界面工具Gradio。Gradi…...

1. 搭建前端+后端开发框架

1. 说明 本篇博客主要介绍网页开发中&#xff0c;搭建前端和后端开发框架的具体步骤&#xff0c;框架中所使用的技术栈如下&#xff1a; 前端&#xff1a;VUE Javascript 后端&#xff1a;Python Flask Mysql 其中MySQL主要用来存储需要的数据&#xff0c;在本文中搭建基本…...

初会学习记录

目录 务实&#xff1a; 第一章 (1)会计概念&#xff0c;职能和目标&#xff1a; (2)会计假设&#xff1a; (3)会计核算基础&#xff1a; (4)会计信息质量要求&#xff1a; (5)会计人员职业道德规范 (6)会计准则制度体系概述&#xff1a; (7)会计要素与会计等式&#x…...

uniapp 对接腾讯云IM群组成员管理(增删改查)

UniApp 实战&#xff1a;腾讯云IM群组成员管理&#xff08;增删改查&#xff09; 一、前言 在社交类App开发中&#xff0c;群组成员管理是核心功能之一。本文将基于UniApp框架&#xff0c;结合腾讯云IM SDK&#xff0c;详细讲解如何实现群组成员的增删改查全流程。 权限校验…...

基于FPGA的PID算法学习———实现PID比例控制算法

基于FPGA的PID算法学习 前言一、PID算法分析二、PID仿真分析1. PID代码2.PI代码3.P代码4.顶层5.测试文件6.仿真波形 总结 前言 学习内容&#xff1a;参考网站&#xff1a; PID算法控制 PID即&#xff1a;Proportional&#xff08;比例&#xff09;、Integral&#xff08;积分&…...

Java 8 Stream API 入门到实践详解

一、告别 for 循环&#xff01; 传统痛点&#xff1a; Java 8 之前&#xff0c;集合操作离不开冗长的 for 循环和匿名类。例如&#xff0c;过滤列表中的偶数&#xff1a; List<Integer> list Arrays.asList(1, 2, 3, 4, 5); List<Integer> evens new ArrayList…...

1688商品列表API与其他数据源的对接思路

将1688商品列表API与其他数据源对接时&#xff0c;需结合业务场景设计数据流转链路&#xff0c;重点关注数据格式兼容性、接口调用频率控制及数据一致性维护。以下是具体对接思路及关键技术点&#xff1a; 一、核心对接场景与目标 商品数据同步 场景&#xff1a;将1688商品信息…...

Module Federation 和 Native Federation 的比较

前言 Module Federation 是 Webpack 5 引入的微前端架构方案&#xff0c;允许不同独立构建的应用在运行时动态共享模块。 Native Federation 是 Angular 官方基于 Module Federation 理念实现的专为 Angular 优化的微前端方案。 概念解析 Module Federation (模块联邦) Modul…...

在鸿蒙HarmonyOS 5中使用DevEco Studio实现录音机应用

1. 项目配置与权限设置 1.1 配置module.json5 {"module": {"requestPermissions": [{"name": "ohos.permission.MICROPHONE","reason": "录音需要麦克风权限"},{"name": "ohos.permission.WRITE…...

云原生玩法三问:构建自定义开发环境

云原生玩法三问&#xff1a;构建自定义开发环境 引言 临时运维一个古董项目&#xff0c;无文档&#xff0c;无环境&#xff0c;无交接人&#xff0c;俗称三无。 运行设备的环境老&#xff0c;本地环境版本高&#xff0c;ssh不过去。正好最近对 腾讯出品的云原生 cnb 感兴趣&…...

论文阅读笔记——Muffin: Testing Deep Learning Libraries via Neural Architecture Fuzzing

Muffin 论文 现有方法 CRADLE 和 LEMON&#xff0c;依赖模型推理阶段输出进行差分测试&#xff0c;但在训练阶段是不可行的&#xff0c;因为训练阶段直到最后才有固定输出&#xff0c;中间过程是不断变化的。API 库覆盖低&#xff0c;因为各个 API 都是在各种具体场景下使用。…...

Vue ③-生命周期 || 脚手架

生命周期 思考&#xff1a;什么时候可以发送初始化渲染请求&#xff1f;&#xff08;越早越好&#xff09; 什么时候可以开始操作dom&#xff1f;&#xff08;至少dom得渲染出来&#xff09; Vue生命周期&#xff1a; 一个Vue实例从 创建 到 销毁 的整个过程。 生命周期四个…...

HTML前端开发:JavaScript 获取元素方法详解

作为前端开发者&#xff0c;高效获取 DOM 元素是必备技能。以下是 JS 中核心的获取元素方法&#xff0c;分为两大系列&#xff1a; 一、getElementBy... 系列 传统方法&#xff0c;直接通过 DOM 接口访问&#xff0c;返回动态集合&#xff08;元素变化会实时更新&#xff09;。…...