当前位置: 首页 > news >正文

RocketMQ 事务消息发送

目录

事务消息介绍

应用场景

功能原理

使用限制

使用示例

使用建议​


事务消息介绍

在一些对数据一致性有强需求的场景,可以用 RocketMQ 事务消息来解决,从而保证上下游数据的一致性。

应用场景

分布式事务的诉求

分布式系统调用的特点为一个核心业务逻辑的执行,同时需要调用多个下游业务进行处理。因此,如何保证核心业务和多个下游业务的执行结果完全一致,是分布式事务需要解决的主要问题。

以电商交易场景为例,用户支付订单这一核心操作的同时会涉及到下游物流发货、积分变更、购物车状态清空等多个子系统的变更。当前业务的处理分支包括:

  • 主分支订单系统状态更新:由未支付变更为支付成功。

  • 物流系统状态新增:新增待发货物流记录,创建订单物流记录。

  • 积分系统状态变更:变更用户积分,更新用户积分表。

  • 购物车系统状态变更:清空购物车,更新用户购物车记录。

传统XA事务方案:性能不足

为了保证上述四个分支的执行结果一致性,典型方案是基于XA协议的分布式事务系统来实现。将四个调用分支封装成包含四个独立事务分支的大事务。基于XA分布式事务的方案可以满足业务处理结果的正确性,但最大的缺点是多分支环境下资源锁定范围大,并发度低,随着下游分支的增加,系统性能会越来越差。

基于普通消息方案:一致性保障困难

将上述基于XA事务的方案进行简化,将订单系统变更作为本地事务,剩下的系统变更作为普通消息的下游来执行,事务分支简化成普通消息+订单表事务,充分利用消息异步化的能力缩短链路,提高并发度。

该方案中消息下游分支和订单系统变更的主分支很容易出现不一致的现象,例如:

  • 消息发送成功,订单没有执行成功,需要回滚整个事务。

  • 订单执行成功,消息没有发送成功,需要额外补偿才能发现不一致。

  • 消息发送超时未知,此时无法判断需要回滚订单还是提交订单变更。

基于RocketMQ分布式事务消息:支持最终一致性

上述普通消息方案中,普通消息和订单事务无法保证一致的原因,本质上是由于普通消息无法像单机数据库事务一样,具备提交、回滚和统一协调的能力。

而基于Apache RocketMQ实现的分布式事务消息功能,在普通消息基础上,支持二阶段的提交能力。将二阶段提交和本地事务绑定,实现全局提交结果的一致性。

Apache RocketMQ事务消息的方案,具备高性能、可扩展、业务开发简单的优势。具体事务消息的原理和流程,请参见下文的功能原理。

功能原理

什么是事务消息

事务消息是 RocketMQ 提供的一种高级消息类型,支持在分布式场景下保障消息生产和本地事务的最终一致性。

事务消息处理流程

事务消息交互流程如下图所示。

  1. 生产者将消息发送至RocketMQ服务端。

  2. RocketMQ服务端将消息持久化成功之后,向生产者返回Ack确认消息已经发送成功,此时消息被标记为"暂不能投递",这种状态下的消息即为半事务消息。

  3. 生产者开始执行本地事务逻辑。

  4. 生产者根据本地事务执行结果向服务端提交二次确认结果(Commit或是Rollback),服务端收到确认结果后处理逻辑如下:

    • 二次确认结果为Commit:服务端将半事务消息标记为可投递,并投递给消费者。

    • 二次确认结果为Rollback:服务端将回滚事务,不会将半事务消息投递给消费者。

  5. 在断网或者是生产者应用重启的特殊情况下,若服务端未收到发送者提交的二次确认结果,或服务端收到的二次确认结果为Unknown未知状态,经过固定时间后,服务端将对消息生产者即生产者集群中任一生产者实例发起消息回查。 说明 服务端回查的间隔时间和最大回查次数,请参见参数限制。

  6. 生产者收到消息回查后,需要检查对应消息的本地事务执行的最终结果。

  7. 生产者根据检查到的本地事务的最终状态再次提交二次确认,服务端仍按照步骤4对半事务消息进行处理。

 

  • 初始化:半事务消息被生产者构建并完成初始化,待发送到服务端的状态。

  • 事务待提交:半事务消息被发送到服务端,和普通消息不同,并不会直接被服务端持久化,而是会被单独存储到事务存储系统中,等待第二阶段本地事务返回执行结果后再提交。此时消息对下游消费者不可见。

  • 消息回滚:第二阶段如果事务执行结果明确为回滚,服务端会将半事务消息回滚,该事务消息流程终止。

  • 提交待消费:第二阶段如果事务执行结果明确为提交,服务端会将半事务消息重新存储到普通存储系统中,此时消息对下游消费者可见,等待被消费者获取并消费。

  • 消费中:消息被消费者获取,并按照消费者本地的业务逻辑进行处理的过程。 此时服务端会等待消费者完成消费并提交消费结果,如果一定时间后没有收到消费者的响应,Apache RocketMQ会对消息进行重试处理。具体信息,请参见消费重试。

  • 消费提交:消费者完成消费处理,并向服务端提交消费结果,服务端标记当前消息已经被处理(包括消费成功和失败)。 Apache RocketMQ默认支持保留所有消息,此时消息数据并不会立即被删除,只是逻辑标记已消费。消息在保存时间到期或存储空间不足被删除前,消费者仍然可以回溯消息重新消费。

  • 消息删除:Apache RocketMQ按照消息保存机制滚动清理最早的消息数据,将消息从物理文件中删除。更多信息,请参见消息存储和清理机制。

使用限制

消息类型一致性

事务消息仅支持在 MessageType 为 Transaction 的主题内使用,即事务消息只能发送至类型为事务消息的主题中,发送的消息的类型必须和主题的类型一致。

消费事务性

RocketMQ 事务消息保证本地主分支事务和下游消息发送事务的一致性,但不保证消息消费结果和上游事务的一致性。因此需要下游业务分支自行保证消息正确处理,建议消费端做好消费重试,如果有短暂失败可以利用重试机制保证最终处理成功。

中间状态可见性

RocketMQ 事务消息为最终一致性,即在消息提交到下游消费端处理完成之前,下游分支和上游事务之间的状态会不一致。因此,事务消息仅适合接受异步执行的事务场景。

事务超时机制

RocketMQ 事务消息的命周期存在超时机制,即半事务消息被生产者发送服务端后,如果在指定时间内服务端无法确认提交或者回滚状态,则消息默认会被回滚。事务超时时间,请参见参数限制。

使用示例

创建主题

RocketMQ 5.0版本下创建主题操作,推荐使用mqadmin工具,需要注意的是,对于消息类型需要通过属性参数添加。示例如下:

sh mqadmin updateTopic -n <nameserver_address> -t <topic_name> -c <cluster_name> -a +message.type=TRANSACTION

发送消息

事务消息相比普通消息发送时需要修改以下几点:

  • 发送事务消息前,需要开启事务并关联本地的事务执行。

  • 为保证事务一致性,在构建生产者时,必须设置事务检查器和预绑定事务消息发送的主题列表,客户端内置的事务检查器会对绑定的事务主题做异常状态恢复。

创建事务主题

NORMAL类型Topic不支持TRANSACTION类型消息,生产消息会报错。

./bin/mqadmin updatetopic -n localhost:9876 -t TestTopic -c DefaultCluster -a +message.type=TRANSACTION

  • -c 集群名称
  • -t Topic名称
  • -n nameserver地址
  • -a 额外属性,本例给主题添加了message.typeTRANSACTION的属性用来支持事务消息

以Java语言为例,使用事务消息示例参考如下:

    //演示demo,模拟订单表查询服务,用来确认订单事务是否提交成功。private static boolean checkOrderById(String orderId) {return true;}//演示demo,模拟本地事务的执行结果。private static boolean doLocalTransaction() {return true;}public static void main(String[] args) throws ClientException {ClientServiceProvider provider = new ClientServiceProvider();MessageBuilder messageBuilder = new MessageBuilderImpl();//构造事务生产者:事务消息需要生产者构建一个事务检查器,用于检查确认异常半事务的中间状态。Producer producer = provider.newProducerBuilder().setTransactionChecker(messageView -> {/*** 事务检查器一般是根据业务的ID去检查本地事务是否正确提交还是回滚,此处以订单ID属性为例。* 在订单表找到了这个订单,说明本地事务插入订单的操作已经正确提交;如果订单表没有订单,说明本地事务已经回滚。*/final String orderId = messageView.getProperties().get("OrderId");if (Strings.isNullOrEmpty(orderId)) {// 错误的消息,直接返回Rollback。return TransactionResolution.ROLLBACK;}return checkOrderById(orderId) ? TransactionResolution.COMMIT : TransactionResolution.ROLLBACK;}).build();//开启事务分支。final Transaction transaction;try {transaction = producer.beginTransaction();} catch (ClientException e) {e.printStackTrace();//事务分支开启失败,直接退出。return;}Message message = messageBuilder.setTopic("topic")//设置消息索引键,可根据关键字精确查找某条消息。.setKeys("messageKey")//设置消息Tag,用于消费端根据指定Tag过滤消息。.setTag("messageTag")//一般事务消息都会设置一个本地事务关联的唯一ID,用来做本地事务回查的校验。.addProperty("OrderId", "xxx")//消息体。.setBody("messageBody".getBytes()).build();//发送半事务消息final SendReceipt sendReceipt;try {sendReceipt = producer.send(message, transaction);} catch (ClientException e) {//半事务消息发送失败,事务可以直接退出并回滚。return;}/*** 执行本地事务,并确定本地事务结果。* 1. 如果本地事务提交成功,则提交消息事务。* 2. 如果本地事务提交失败,则回滚消息事务。* 3. 如果本地事务未知异常,则不处理,等待事务消息回查。**/boolean localTransactionOk = doLocalTransaction();if (localTransactionOk) {try {transaction.commit();} catch (ClientException e) {// 业务可以自身对实时性的要求选择是否重试,如果放弃重试,可以依赖事务消息回查机制进行事务状态的提交。e.printStackTrace();}} else {try {transaction.rollback();} catch (ClientException e) {// 建议记录异常信息,回滚异常时可以无需重试,依赖事务消息回查机制进行事务状态的提交。e.printStackTrace();}}}

使用建议​

避免大量未决事务导致超时

Apache RocketMQ支持在事务提交阶段异常的情况下发起事务回查,保证事务一致性。但生产者应该尽量避免本地事务返回未知结果。大量的事务检查会导致系统性能受损,容易导致事务处理延迟。

正确处理"进行中"的事务

消息回查时,对于正在进行中的事务不要返回Rollback或Commit结果,应继续保持Unknown的状态。 一般出现消息回查时事务正在处理的原因为:事务执行较慢,消息回查太快。解决方案如下:

  • 将第一次事务回查时间设置较大一些,但可能导致依赖回查的事务提交延迟较大。

  • 程序能正确识别正在进行中的事务。

相关文章:

RocketMQ 事务消息发送

目录 事务消息介绍 应用场景 功能原理 使用限制 使用示例 使用建议​ 事务消息介绍 在一些对数据一致性有强需求的场景&#xff0c;可以用 RocketMQ 事务消息来解决&#xff0c;从而保证上下游数据的一致性。 应用场景 分布式事务的诉求 分布式系统调用的特点为一个核…...

后端-POST请求中只需要两个参数,后端不想创建对象时

问题&#xff1a; 在前后端对接中&#xff0c;很多前端的规范是POST&#xff0c;参数放body里面&#xff0c;媒体类型是json&#xff0c;后端就需要用RequestBody去接收&#xff0c;但是后端只用接收两个对象&#xff0c;这时候后端不想创建对象&#xff0c;使用RequestParm&a…...

UG\NX二次开发 通过点云生成曲面 UF_MODL_create_surf_from_cloud

文章作者:里海 来源网站:《里海NX二次开发3000例专栏》 感谢粉丝订阅 感谢 Rlgun 订阅本专栏,非常感谢。 简介 有网友想做一个通过点云生成曲面的程序,我们也试一下 效果 代码 #include "me.hpp" /*HEAD CREATE_SURF_FROM_CLOUD CCC UFUN */...

Linux常用指令(二)

目录 一、 删除空目录&#xff08;rmdir&#xff09; 二、ln 硬链接与软链接 三、新建空文件或更新文件的时间戳&#xff08;touch&#xff09; 四、比较文件内容的差异&#xff08;diff&#xff09; 五、显示当前时间或设置系统时间&#xff08;date&#xff09; 六、显…...

【HUAWEI】单臂路由

目录 ​ &#x1f96e;写在前面 &#x1f96e;2.1、拓扑图 &#x1f96e;2.2、操作思路 &#x1f96e;2.3、配置操作 &#x1f363;2.3.1、LSW4配置 &#x1f363;2.3.2、R2配置 &#x1f363;2.3.3、测试网络 &#x1f990;博客主页&#xff1a;大虾好吃吗的博客 &…...

安全学习_开发相关_Java第三方组件Log4jFastJSON及相关安全问题简介

文章目录 JNDI&#xff1a;(见图) Java-三方组件-Log4J&JNDILog4J&#xff1a;Log4j-组件安全复现使用Log4j Java-三方组件-FastJsonFastJson&#xff1a;Fastjson-组件安全复现对象转Json(带类型)Json转对象Fastjson漏洞复现&#xff08;大佬文章 JNDI&#xff1a;(见图) …...

零代码编程:用ChatGPT批量自动下载archive.org上的音频书

http://archive.org 是一个神奇的网站&#xff0c;可以下载各种古旧的软件、书籍、音频、视频&#xff0c;还可以搜索各个网站的历史网页。 比如说&#xff0c;一些儿童故事音频就可以在http://archive.org下载到&#xff0c;可以用来做英语听力启蒙用。 举个例子&#xff0c…...

力扣用队列实现栈

自己写的栈&#xff0c;再让其他函数去调用自己写的栈 typedef int QDataType; typedef struct QueueNode {struct QueueNode* next;//单链表QDataType data;//放数据 }QNode;typedef struct Queue {QNode* phead;//头节点QNode* ptail;//尾节点QDataType size; //统计有多少节…...

一朵华为云,如何做好百模千态?

点击关注 文丨刘雨琦、郝鑫 2005年华为提出网络时代的“All IP”&#xff0c;2011年提出数字化时代的“All Cloud”&#xff0c;2023年提出智能时代的“All Intelligence”。 截至目前&#xff0c;华为的战略升级经历了三个阶段。 步入智能化&#xff0c;需要迎接的困难依然…...

华为云云耀云服务器L实例评测 | 实例使用教学之软件安装:华为云云耀云服务器环境下安装 Docker

华为云云耀云服务器L实例评测 &#xff5c; 实例使用教学之软件安装&#xff1a;华为云云耀云服务器环境下安装 Docker 介绍华为云云耀云服务器 华为云云耀云服务器 &#xff08;目前已经全新升级为 华为云云耀云服务器L实例&#xff09; 华为云云耀云服务器是什么华为云云耀云…...

小程序编译器性能优化之路

作者 | 马可 导读 小程序编译器是百度开发者工具中的编译构建模块&#xff0c;用来将小程序代码转换成运行时代码。旧版编译器由于业务发展&#xff0c;存在编译慢、内存占用高的问题&#xff0c;我们对编译器做了一次大规模的重构&#xff0c;采用自研架构&#xff0c;做了多线…...

FFmpeg 命令:从入门到精通 | ffmpeg 命令分类查询

FFmpeg 命令&#xff1a;从入门到精通 | ffmpeg 命令分类查询 FFmpeg 命令&#xff1a;从入门到精通 | ffmpeg 命令分类查询ffmpeg -versionffmpeg -buildconfffmpeg -formatsffmpeg -muxersffmpeg -demuxersffmpeg -codecsffmpeg -decodersffmpeg -encodersffmpeg -bsfsffmpeg…...

Linux学习记录——삼십일 socket编程---TCP套接字

文章目录 TCP套接字简单通信1、服务端1、基本框架2、获取连接 2、客户端3、多进程4、多线程5、线程池6、简单的日志系统7、守护进程8、其它 TCP套接字简单通信 本篇gitee 学习完udp套接字通信后&#xff0c;再来看TCP套接字。 四个文件tcp_server.hpp&#xff0c; tcp_serve…...

【学习笔记】深度学习分布式系统

深度学习分布式系统 前言1. 数据并行&#xff1a;参数服务器2. 流水线并行&#xff1a;GPipe3. 张量并行&#xff1a;Megatron LM4. 切片并行&#xff1a;ZeRO5. 异步分布式&#xff1a;PATHWAYS总结参考链接 前言 最近跟着李沐老师的视频学习了深度学习分布式系统的发展。这里…...

【数据结构】树、二叉树的概念和二叉树的顺序结构及实现

目录 前言&#xff1a;一、树的概念及结构1.树的概念2.树的相关概念3.树的存储4.树在实际中的运用 二、二叉树概念及结构1.概念2.特殊的二叉树&#xff08;1&#xff09;满二叉树&#xff08;2&#xff09;完全二叉树 3.二叉树的性质4.二叉树的存储(1)顺序存储(2)链式存储 三、…...

rust学习-string

介绍 A UTF-8–encoded, growable string(可增长字符串). 拥有string内容的所有权 A String is made up of three components: a pointer to some bytes, a length, and a capacity. The length is the number of bytes currently stored in the buffer pub fn as_bytes(&…...

No167.精选前端面试题,享受每天的挑战和学习

🤍 前端开发工程师(主业)、技术博主(副业)、已过CET6 🍨 阿珊和她的猫_CSDN个人主页 🕠 牛客高级专题作者、在牛客打造高质量专栏《前端面试必备》 🍚 蓝桥云课签约作者、已在蓝桥云课上架的前后端实战课程《Vue.js 和 Egg.js 开发企业级健康管理项目》、《带你从入…...

【python】pycharm导入anaconda环境

参考 Pycharm导入anaconda环境的教程图解 - 知乎 (zhihu.com)...

【数据结构】逻辑结构与物理结构

&#x1f984;个人主页:修修修也 &#x1f38f;所属专栏:数据结构 ⚙️操作环境:Visual Studio 2022 目录 &#x1f333;逻辑结构 1.集合结构 2.线性结构 3.树形结构 4.图形结构或网状结构 &#x1f333;物理结构 1.顺序存储结构 2.链式存储结构 结语 根据视点的不同,我…...

HTML5高级部分

目录 一、拖拽API1.1 拖拽元素1.2 监听事件1.3 dataTransfer传递数据 二、媒体API2.1 常用监听事件2.2 常用API 三、画布API3.1 canvas 标签3.2 创建canvas对象3.3 常用API 四、地理API4.1 方法 一、拖拽API 1.1 拖拽元素 页面中设置了draggable"true"的元素可以进…...

CMake基础:构建流程详解

目录 1.CMake构建过程的基本流程 2.CMake构建的具体步骤 2.1.创建构建目录 2.2.使用 CMake 生成构建文件 2.3.编译和构建 2.4.清理构建文件 2.5.重新配置和构建 3.跨平台构建示例 4.工具链与交叉编译 5.CMake构建后的项目结构解析 5.1.CMake构建后的目录结构 5.2.构…...

【Redis技术进阶之路】「原理分析系列开篇」分析客户端和服务端网络诵信交互实现(服务端执行命令请求的过程 - 初始化服务器)

服务端执行命令请求的过程 【专栏简介】【技术大纲】【专栏目标】【目标人群】1. Redis爱好者与社区成员2. 后端开发和系统架构师3. 计算机专业的本科生及研究生 初始化服务器1. 初始化服务器状态结构初始化RedisServer变量 2. 加载相关系统配置和用户配置参数定制化配置参数案…...

基于当前项目通过npm包形式暴露公共组件

1.package.sjon文件配置 其中xh-flowable就是暴露出去的npm包名 2.创建tpyes文件夹&#xff0c;并新增内容 3.创建package文件夹...

spring:实例工厂方法获取bean

spring处理使用静态工厂方法获取bean实例&#xff0c;也可以通过实例工厂方法获取bean实例。 实例工厂方法步骤如下&#xff1a; 定义实例工厂类&#xff08;Java代码&#xff09;&#xff0c;定义实例工厂&#xff08;xml&#xff09;&#xff0c;定义调用实例工厂&#xff…...

CocosCreator 之 JavaScript/TypeScript和Java的相互交互

引擎版本&#xff1a; 3.8.1 语言&#xff1a; JavaScript/TypeScript、C、Java 环境&#xff1a;Window 参考&#xff1a;Java原生反射机制 您好&#xff0c;我是鹤九日&#xff01; 回顾 在上篇文章中&#xff1a;CocosCreator Android项目接入UnityAds 广告SDK。 我们简单讲…...

Android Bitmap治理全解析:从加载优化到泄漏防控的全生命周期管理

引言 Bitmap&#xff08;位图&#xff09;是Android应用内存占用的“头号杀手”。一张1080P&#xff08;1920x1080&#xff09;的图片以ARGB_8888格式加载时&#xff0c;内存占用高达8MB&#xff08;192010804字节&#xff09;。据统计&#xff0c;超过60%的应用OOM崩溃与Bitm…...

Spring AI与Spring Modulith核心技术解析

Spring AI核心架构解析 Spring AI&#xff08;https://spring.io/projects/spring-ai&#xff09;作为Spring生态中的AI集成框架&#xff0c;其核心设计理念是通过模块化架构降低AI应用的开发复杂度。与Python生态中的LangChain/LlamaIndex等工具类似&#xff0c;但特别为多语…...

推荐 github 项目:GeminiImageApp(图片生成方向,可以做一定的素材)

推荐 github 项目:GeminiImageApp(图片生成方向&#xff0c;可以做一定的素材) 这个项目能干嘛? 使用 gemini 2.0 的 api 和 google 其他的 api 来做衍生处理 简化和优化了文生图和图生图的行为(我的最主要) 并且有一些目标检测和切割(我用不到) 视频和 imagefx 因为没 a…...

MySQL 知识小结(一)

一、my.cnf配置详解 我们知道安装MySQL有两种方式来安装咱们的MySQL数据库&#xff0c;分别是二进制安装编译数据库或者使用三方yum来进行安装,第三方yum的安装相对于二进制压缩包的安装更快捷&#xff0c;但是文件存放起来数据比较冗余&#xff0c;用二进制能够更好管理咱们M…...

什么是VR全景技术

VR全景技术&#xff0c;全称为虚拟现实全景技术&#xff0c;是通过计算机图像模拟生成三维空间中的虚拟世界&#xff0c;使用户能够在该虚拟世界中进行全方位、无死角的观察和交互的技术。VR全景技术模拟人在真实空间中的视觉体验&#xff0c;结合图文、3D、音视频等多媒体元素…...