当前位置: 首页 > news >正文

RocketMQ教程-(5)-功能特性-事务消息

事务消息为 Apache RocketMQ 中的高级特性消息,本文为您介绍事务消息的应用场景、功能原理、使用限制、使用方法和使用建议。

事务消息为 Apache RocketMQ 中的高级特性消息,本文为您介绍事务消息的应用场景、功能原理、使用限制、使用方法和使用建议。

以电商交易场景为例,用户支付订单这一核心操作的同时会涉及到下游物流发货、积分变更、购物车状态清空等多个子系统的变更。当前业务的处理分支包括:

  • 主分支订单系统状态更新:由未支付变更为支付成功。

  • 物流系统状态新增:新增待发货物流记录,创建订单物流记录。

  • 积分系统状态变更:变更用户积分,更新用户积分表。

  • 购物车系统状态变更:清空购物车,更新用户购物车记录。

传统XA事务方案:性能不足

为了保证上述四个分支的执行结果一致性,典型方案是基于XA协议的分布式事务系统来实现。将四个调用分支封装成包含四个独立事务分支的大事务。基于XA分布式事务的方案可以满足业务处理结果的正确性,但最大的缺点是多分支环境下资源锁定范围大,并发度低,随着下游分支的增加,系统性能会越来越差。

基于普通消息方案:一致性保障困难

将上述基于XA事务的方案进行简化,将订单系统变更作为本地事务,剩下的系统变更作为普通消息的下游来执行,事务分支简化成普通消息+订单表事务,充分利用消息异步化的能力缩短链路,提高并发度。

普通消息方案

该方案中消息下游分支和订单系统变更的主分支很容易出现不一致的现象,例如:

  • 消息发送成功,订单没有执行成功,需要回滚整个事务。

  • 订单执行成功,消息没有发送成功,需要额外补偿才能发现不一致。

  • 消息发送超时未知,此时无法判断需要回滚订单还是提交订单变更。

                                         基于分布式事务消息:支持最终一致性

上述普通消息方案中,普通消息和订单事务无法保证一致的原因,本质上是由于普通消息无法像单机数据库事务一样,具备提交、回滚和统一协调的能力。

而基于Apache RocketMQ实现的分布式事务消息功能,在普通消息基础上,支持二阶段的提交能力。将二阶段提交和本地事务绑定,实现全局提交结果的一致性。

Apache RocketMQ事务消息的方案,具备高性能、可扩展、业务开发简单的优势。具体事务消息的原理和流程,请参见下文的功能原理。

功能原理​

什么是事务消息

事务消息是 Apache RocketMQ 提供的一种高级消息类型,支持在分布式场景下保障消息生产和本地事务的最终一致性。

事务消息处理流程

事务消息交互流程如下图所示。

事务消息

  1. 生产者将消息发送至Apache RocketMQ服务端。

  2. Apache RocketMQ服务端将消息持久化成功之后,向生产者返回Ack确认消息已经发送成功,此时消息被标记为"暂不能投递",这种状态下的消息即为半事务消息。

  3. 生产者开始执行本地事务逻辑。

  4. 生产者根据本地事务执行结果向服务端提交二次确认结果(Commit或是Rollback),服务端收到确认结果后处理逻辑如下:

    • 二次确认结果为Commit:服务端将半事务消息标记为可投递,并投递给消费者。

    • 二次确认结果为Rollback:服务端将回滚事务,不会将半事务消息投递给消费者。

  5. 在断网或者是生产者应用重启的特殊情况下,若服务端未收到发送者提交的二次确认结果,或服务端收到的二次确认结果为Unknown未知状态,经过固定时间后,服务端将对消息生产者即生产者集群中任一生产者实例发起消息回查。 说明 服务端回查的间隔时间和最大回查次数,请参见参数限制。

  6. 生产者收到消息回查后,需要检查对应消息的本地事务执行的最终结果。

  7. 生产者根据检查到的本地事务的最终状态再次提交二次确认,服务端仍按照步骤4对半事务消息进行处理。

事务消息生命周期

事务消息

  • 初始化:半事务消息被生产者构建并完成初始化,待发送到服务端的状态。

  • 事务待提交:半事务消息被发送到服务端,和普通消息不同,并不会直接被服务端持久化,而是会被单独存储到事务存储系统中,等待第二阶段本地事务返回执行结果后再提交。此时消息对下游消费者不可见。

  • 消息回滚:第二阶段如果事务执行结果明确为回滚,服务端会将半事务消息回滚,该事务消息流程终止。

  • 提交待消费:第二阶段如果事务执行结果明确为提交,服务端会将半事务消息重新存储到普通存储系统中,此时消息对下游消费者可见,等待被消费者获取并消费。

  • 消费中:消息被消费者获取,并按照消费者本地的业务逻辑进行处理的过程。 此时服务端会等待消费者完成消费并提交消费结果,如果一定时间后没有收到消费者的响应,Apache RocketMQ会对消息进行重试处理。具体信息,请参见消费重试。

  • 消费提交:消费者完成消费处理,并向服务端提交消费结果,服务端标记当前消息已经被处理(包括消费成功和失败)。 Apache RocketMQ默认支持保留所有消息,此时消息数据并不会立即被删除,只是逻辑标记已消费。消息在保存时间到期或存储空间不足被删除前,消费者仍然可以回溯消息重新消费。

  • 消息删除:Apache RocketMQ按照消息保存机制滚动清理最早的消息数据,将消息从物理文件中删除。更多信息,请参见消息存储和清理机制。

使用限制​

消息类型一致性

事务消息仅支持在 MessageType 为 Transaction 的主题内使用,即事务消息只能发送至类型为事务消息的主题中,发送的消息的类型必须和主题的类型一致。

消费事务性

Apache RocketMQ 事务消息保证本地主分支事务和下游消息发送事务的一致性,但不保证消息消费结果和上游事务的一致性。因此需要下游业务分支自行保证消息正确处理,建议消费端做好消费重试,如果有短暂失败可以利用重试机制保证最终处理成功。

中间状态可见性

Apache RocketMQ 事务消息为最终一致性,即在消息提交到下游消费端处理完成之前,下游分支和上游事务之间的状态会不一致。因此,事务消息仅适合接受异步执行的事务场景。

事务超时机制

Apache RocketMQ 事务消息的命周期存在超时机制,即半事务消息被生产者发送服务端后,如果在指定时间内服务端无法确认提交或者回滚状态,则消息默认会被回滚。事务超时时间,请参见参数限制。

使用示例​

创建主题

Apache RocketMQ 5.0版本下创建主题操作,推荐使用mqadmin工具,需要注意的是,对于消息类型需要通过属性参数添加。示例如下:

sh mqadmin updateTopic -n <nameserver_address> -t <topic_name> -c <cluster_name> -a +message.type=Transaction

发送消息

事务消息相比普通消息发送时需要修改以下几点:

  • 发送事务消息前,需要开启事务并关联本地的事务执行。

  • 为保证事务一致性,在构建生产者时,必须设置事务检查器和预绑定事务消息发送的主题列表,客户端内置的事务检查器会对绑定的事务主题做异常状态恢复。

创建事务主题

NORMAL类型Topic不支持TRANSACTION类型消息,生产消息会报错。

./bin/mqadmin updatetopic -n localhost:9876 -t TestTopic -c DefaultCluster -a +message.type=TRANSACTION

  • -c 集群名称
  • -t Topic名称
  • -n nameserver地址
  • -a 额外属性,本例给主题添加了message.typeTRANSACTION的属性用来支持事务消息

 以Java语言为例,使用事务消息示例参考如下:

    //演示demo,模拟订单表查询服务,用来确认订单事务是否提交成功。private static boolean checkOrderById(String orderId) {return true;}//演示demo,模拟本地事务的执行结果。private static boolean doLocalTransaction() {return true;}public static void main(String[] args) throws ClientException {ClientServiceProvider provider = new ClientServiceProvider();MessageBuilder messageBuilder = new MessageBuilder();//构造事务生产者:事务消息需要生产者构建一个事务检查器,用于检查确认异常半事务的中间状态。Producer producer = provider.newProducerBuilder().setTransactionChecker(messageView -> {/*** 事务检查器一般是根据业务的ID去检查本地事务是否正确提交还是回滚,此处以订单ID属性为例。* 在订单表找到了这个订单,说明本地事务插入订单的操作已经正确提交;如果订单表没有订单,说明本地事务已经回滚。*/final String orderId = messageView.getProperties().get("OrderId");if (Strings.isNullOrEmpty(orderId)) {// 错误的消息,直接返回Rollback。return TransactionResolution.ROLLBACK;}return checkOrderById(orderId) ? TransactionResolution.COMMIT : TransactionResolution.ROLLBACK;}).build();//开启事务分支。final Transaction transaction;try {transaction = producer.beginTransaction();} catch (ClientException e) {e.printStackTrace();//事务分支开启失败,直接退出。return;}Message message = messageBuilder.setTopic("topic")//设置消息索引键,可根据关键字精确查找某条消息。.setKeys("messageKey")//设置消息Tag,用于消费端根据指定Tag过滤消息。.setTag("messageTag")//一般事务消息都会设置一个本地事务关联的唯一ID,用来做本地事务回查的校验。.addProperty("OrderId", "xxx")//消息体。.setBody("messageBody".getBytes()).build();//发送半事务消息final SendReceipt sendReceipt;try {sendReceipt = producer.send(message, transaction);} catch (ClientException e) {//半事务消息发送失败,事务可以直接退出并回滚。return;}/*** 执行本地事务,并确定本地事务结果。* 1. 如果本地事务提交成功,则提交消息事务。* 2. 如果本地事务提交失败,则回滚消息事务。* 3. 如果本地事务未知异常,则不处理,等待事务消息回查。**/boolean localTransactionOk = doLocalTransaction();if (localTransactionOk) {try {transaction.commit();} catch (ClientException e) {// 业务可以自身对实时性的要求选择是否重试,如果放弃重试,可以依赖事务消息回查机制进行事务状态的提交。e.printStackTrace();}} else {try {transaction.rollback();} catch (ClientException e) {// 建议记录异常信息,回滚异常时可以无需重试,依赖事务消息回查机制进行事务状态的提交。e.printStackTrace();}}}

使用建议​

避免大量未决事务导致超时

Apache RocketMQ支持在事务提交阶段异常的情况下发起事务回查,保证事务一致性。但生产者应该尽量避免本地事务返回未知结果。大量的事务检查会导致系统性能受损,容易导致事务处理延迟。

正确处理"进行中"的事务

消息回查时,对于正在进行中的事务不要返回Rollback或Commit结果,应继续保持Unknown的状态。 一般出现消息回查时事务正在处理的原因为:事务执行较慢,消息回查太快。解决方案如下:

  • 将第一次事务回查时间设置较大一些,但可能导致依赖回查的事务提交延迟较大。

  • 程序能正确识别正在进行中的事务。

相关文章:

RocketMQ教程-(5)-功能特性-事务消息

事务消息为 Apache RocketMQ 中的高级特性消息&#xff0c;本文为您介绍事务消息的应用场景、功能原理、使用限制、使用方法和使用建议。 事务消息为 Apache RocketMQ 中的高级特性消息&#xff0c;本文为您介绍事务消息的应用场景、功能原理、使用限制、使用方法和使用建议。…...

HANA学习笔记

1、安装 准备安装介质&#xff0c;我这儿用的是HANA2.00.059.00&#xff0c;注意会用到三个lib包和saptune&#xff0c;提前准备好。 执行./hdblcm开启数据库安装&#xff0c;过程中会涉及到需要用户设置一些参数&#xff0c;按照自己需求设置即可。 安装完成会生成一个安装日…...

VMware虚拟机无法上网的解决办法

&#xff08;1&#xff09;1、在虚拟机右下角的网络适配器上面观察该图标是否是有绿色的灯在闪烁&#xff0c;如果网络适配器是灰色的证明虚拟机的网络没有打开&#xff0c;而是被禁用了&#xff0c;在适配器上点击鼠标右键&#xff0c;打开【设置】&#xff0c;在【已连接】、…...

PLC-Recorder的高速采集有多快?0.5ms算快吗?看控制器能力了!

大家知道&#xff0c;PLC-Recorder有一个高速采集的功能&#xff0c;基于TCP连接或UDP报文&#xff0c;速度取决于发送端的能力。对于西门子PLC&#xff0c;能做到1-2ms的采集速度&#xff0c;但是&#xff0c;我在前面的文章里提到了0.5ms的高速采集&#xff0c;哪个控制器能这…...

KMP算法总结

KMP算法总结 BF算法引导BF算法步骤&#xff08;图片演示&#xff09;代码演示 KMP算法推next数组代码演示 BF算法引导 BF算法是一个暴力的字符串匹配算法&#xff0c;时间复杂度是o&#xff08;m*n&#xff09; 假设主串和子串分别为 我们想要找到子串在主串的位置 BF算法核…...

消息中间件ActiveMQ介绍

一、消息中间件的介绍 介绍 ​ 消息队列 是指利用 高效可靠 的 消息传递机制 进行与平台无关的 数据交流&#xff0c;并基于 数据通信 来进行分布式系统的集成。 特点(作用) 应用解耦 异步通信 流量削峰 (海量)日志处理 消息通讯 …... 应用场景 根据消息队列的特点&a…...

【100天精通python】Day9:数据结构_字典、集合

目录 目录 1 字典 1.1 字典的基本操作示例 1.2 字典推导式 2 集合 2.1 集合的常用操作示例 3 列表、元组、字典、集合的区别 1 字典 在Python中&#xff0c;字典&#xff08;Dictionary&#xff09;是一种无序的数据结构&#xff0c;用于存储键值对的集合。每个…...

上海VR全景展示,快速了解VR全景拍摄

导语&#xff1a; 随着科技的不断进步&#xff0c;虚拟现实技术的应用日益广泛。在这其中&#xff0c;VR全景图片作为一种数字化助力的全景拍摄方式&#xff0c;正逐渐成为人们关注的焦点。通过数字化技术&#xff0c;VR全景图片能够以360度全方位的视角呈现真实的场景&#x…...

VScode远程不用再输入密码操作

安装插件remote development 1.先检查自己电脑上有没有生成一对公钥和私钥。&#xff08;一般会在这个目录&#xff09; 如果没有的话就自己生成一下。 打开命令行输入以下命令 ssh-keygen -t rsa2.在虚拟机中先看一下有没有公钥和私钥。如果没有的话就自己生成一下。 打开…...

MyBatis基本用法-@TableId

TableId 注解是 MyBatis Plus 框架中用于标识实体类中的主键字段的注解&#xff0c;它有一些可选的配置项。下面是详细说明&#xff1a; 首先&#xff0c;需要在项目中添加 MyBatis Plus 的依赖。可以在项目的 pom.xml 文件中添加以下代码&#xff1a; <dependency><…...

React AntDesign写一个导出数据的提示语 上面有跳转的路径,或者点击知道了,关闭该弹层

效果如下&#xff1a; 代码如下&#xff1a; ForwardDataCenterModal(_blank);export const ForwardDataCenterModal (target?: string) > {let contentBefore React.createElement(span, null, 数据正在处理中&#xff0c;请稍后前往);let contentAfter React.creat…...

小红书课程发光社群知识库,点亮哥专为超级个体设计解决方案

小红书课程点亮哥知识库 开创了学习小红书教育培训先河 针对超级个体轻创业的学习需求场景 创新推出了“知识库全新学习方式”。 一个人如何做好小红书? 超级个体轻创业,如何做好小红书? 通过打造个人IP、或者塑造老板个人品牌,来实现互联网变现,如何做好小红书? 就像挑…...

基于SpringBoot+Vue的摄影跟拍预定管理系统设计与实现(源码+lw+部署文档等)

博主介绍&#xff1a; 大家好&#xff0c;我是一名在Java圈混迹十余年的程序员&#xff0c;精通Java编程语言&#xff0c;同时也熟练掌握微信小程序、Python和Android等技术&#xff0c;能够为大家提供全方位的技术支持和交流。 我擅长在JavaWeb、SSH、SSM、SpringBoot等框架…...

HCIA 第二课总结

配置网络设备的明文密钥实验组网 实验拓扑 将一个路由器使用配置口进行连接 sys #进入系统视图模式 sysname RTA #给设备命名 user-interface console 0 #进入用户接口配置界面 authentication-mode password #配置认证模式为密钥认证 set authentication password ciphe…...

linux-------联网下载文件和配置

1.Wget Wget是一个十分常用命令行下载工具&#xff0c;多数Linux发行版本都默认包含这个工具。如果没有安装可在http://www.gnu.org/software/wget/wget.html下载最新版本&#xff0c;并使用如下命令编译安装&#xff1a; 1.#tar zxvf wget-1.9.1.tar.gz #cd wget-1.9.1 #./c…...

字典树Trie

Trie树又称字典树&#xff0c;前缀树。是一种可以高效查询前缀字符串的树&#xff0c;典型应用是用于统计&#xff0c;排序和保存大量的字符串&#xff08;但不仅限于字符串&#xff09;&#xff0c;所以经常被搜索引擎系统用于文本词频统计。 它的优点是&#xff1a;利用字符串…...

算法之桶排序算法

桶排序的基本思想是&#xff1a; 把数组 arr 划分为 n 个大小相同子区间&#xff08;桶&#xff09;&#xff0c;每个子区间各自排序&#xff0c;最 后合并 。计数排序是桶排序的一种特殊情况&#xff0c;可以把计数排序当成每个桶里只有一个元素的情况。 1.找出待排序数组中的…...

读kafka生产端源码,窥kafka设计之道(下)

背景 在上一篇文章《读kafka生产端源码&#xff0c;窥kafka设计之道&#xff08;上&#xff09;》 留下了kafka设计上比较优秀的一个点&#xff1b;内存的循环使用。本篇文章准备盘盘它。 好奇 为什么 kafka减少发送消息时向JVM频繁申请内存&#xff0c;就可以降低JVM GC的执…...

Pytorch个人学习记录总结 06

目录 神经网络-卷积层 torch.nn.Conv2d 神经网络-最大池化的使用 torch.nn.MaxPool2d 神经网络-卷积层 torch.nn.Conv2d torch.nn.Conv2d的官方文档地址 CLASS torch.nn.Conv2d(in_channels, out_channels, kernel_size, stride1, padding0, dilation1, groups1, biasTrue,…...

Rust之泛型、特性和生命期(四):验证有生存期的引用

开发环境 Windows 10Rust 1.71.0 VS Code 1.80.1 项目工程 这里继续沿用上次工程rust-demo 验证具有生存期的引用 生存期是我们已经在使用的另一种泛型。生存期不是确保一个类型具有我们想要的行为&#xff0c;而是确保引用在我们需要时有效。 我们在第4章“引用和借用”一…...

(二)TensorRT-LLM | 模型导出(v0.20.0rc3)

0. 概述 上一节 对安装和使用有个基本介绍。根据这个 issue 的描述&#xff0c;后续 TensorRT-LLM 团队可能更专注于更新和维护 pytorch backend。但 tensorrt backend 作为先前一直开发的工作&#xff0c;其中包含了大量可以学习的地方。本文主要看看它导出模型的部分&#x…...

Linux简单的操作

ls ls 查看当前目录 ll 查看详细内容 ls -a 查看所有的内容 ls --help 查看方法文档 pwd pwd 查看当前路径 cd cd 转路径 cd .. 转上一级路径 cd 名 转换路径 …...

【机器视觉】单目测距——运动结构恢复

ps&#xff1a;图是随便找的&#xff0c;为了凑个封面 前言 在前面对光流法进行进一步改进&#xff0c;希望将2D光流推广至3D场景流时&#xff0c;发现2D转3D过程中存在尺度歧义问题&#xff0c;需要补全摄像头拍摄图像中缺失的深度信息&#xff0c;否则解空间不收敛&#xf…...

华为OD机试-食堂供餐-二分法

import java.util.Arrays; import java.util.Scanner;public class DemoTest3 {public static void main(String[] args) {Scanner in new Scanner(System.in);// 注意 hasNext 和 hasNextLine 的区别while (in.hasNextLine()) { // 注意 while 处理多个 caseint a in.nextIn…...

【服务器压力测试】本地PC电脑作为服务器运行时出现卡顿和资源紧张(Windows/Linux)

要让本地PC电脑作为服务器运行时出现卡顿和资源紧张的情况&#xff0c;可以通过以下几种方式模拟或触发&#xff1a; 1. 增加CPU负载 运行大量计算密集型任务&#xff0c;例如&#xff1a; 使用多线程循环执行复杂计算&#xff08;如数学运算、加密解密等&#xff09;。运行图…...

SpringCloudGateway 自定义局部过滤器

场景&#xff1a; 将所有请求转化为同一路径请求&#xff08;方便穿网配置&#xff09;在请求头内标识原来路径&#xff0c;然后在将请求分发给不同服务 AllToOneGatewayFilterFactory import lombok.Getter; import lombok.Setter; import lombok.extern.slf4j.Slf4j; impor…...

【数据分析】R版IntelliGenes用于生物标志物发现的可解释机器学习

禁止商业或二改转载&#xff0c;仅供自学使用&#xff0c;侵权必究&#xff0c;如需截取部分内容请后台联系作者! 文章目录 介绍流程步骤1. 输入数据2. 特征选择3. 模型训练4. I-Genes 评分计算5. 输出结果 IntelliGenesR 安装包1. 特征选择2. 模型训练和评估3. I-Genes 评分计…...

#Uniapp篇:chrome调试unapp适配

chrome调试设备----使用Android模拟机开发调试移动端页面 Chrome://inspect/#devices MuMu模拟器Edge浏览器&#xff1a;Android原生APP嵌入的H5页面元素定位 chrome://inspect/#devices uniapp单位适配 根路径下 postcss.config.js 需要装这些插件 “postcss”: “^8.5.…...

[大语言模型]在个人电脑上部署ollama 并进行管理,最后配置AI程序开发助手.

ollama官网: 下载 https://ollama.com/ 安装 查看可以使用的模型 https://ollama.com/search 例如 https://ollama.com/library/deepseek-r1/tags # deepseek-r1:7bollama pull deepseek-r1:7b改token数量为409622 16384 ollama命令说明 ollama serve #&#xff1a…...

Qt 事件处理中 return 的深入解析

Qt 事件处理中 return 的深入解析 在 Qt 事件处理中&#xff0c;return 语句的使用是另一个关键概念&#xff0c;它与 event->accept()/event->ignore() 密切相关但作用不同。让我们详细分析一下它们之间的关系和工作原理。 核心区别&#xff1a;不同层级的事件处理 方…...