RocketMQ事务消息原理
一、RocketMQ事务消息原理:
RocketMQ 在 4.3 版本之后实现了完整的事务消息,基于MQ的分布式事务方案,本质上是对本地消息表的一个封装,整体流程与本地消息表一致,唯一不同的就是将本地消息表存在了MQ内部,而不是业务数据库,事务消息解决的是生产端的消息发送与本地事务执行的原子性问题,这里的界限一定要清楚,是确保 MQ 生产端正确无误地将消息发送出来,没有多发,也不会漏发,至于发送后消费端有没有正常的消费消息,这种异常场景将由 MQ 消息消费失败重试机制来保证。
RocketMQ 设计中的 broker 与 producer 端的双向通信能力,使得 broker 天生可以作为一个事务协调者;而 RocketMQ 本身提供的存储机制则为事务消息提供了持久化能力;RocketMQ 的高可用机制以及可靠消息设计则为事务消息在系统发生异常时依然能够保证达成事务的最终一致性。
1、RocketMQ 实现事务一致性的原理:
备注:本地事务的回滚依赖于本地DB的ACID特性,订阅方的成功消费由 MQ Server 的失败重试机制进行保证。
(1)正常情况:在事务主动方服务正常,没有发生故障的情况下,发消息流程如下:
步骤①:MQ 发送方向 MQ Server 发送 half 消息,MQ Server 标记消息状态为 prepared,此时该消息 MQ 订阅方是无法消费到的
步骤②:MQ Server 将消息持久化成功之后,向发送方 ACK 确认消息已经成功接收
步骤③:发送方开始执行本地事务逻辑
步骤④:发送方根据本地事务执行结果向 MQ Server 提交二次确认,commit 或 rollback
最终步骤:MQ Server 如果收到的是 commit 操作,则将半消息标记为可投递,MQ订阅方最终将收到该消息;若收到的是 rollback 操作则删除 half 半消息,订阅方将不会接受该消息;如果本地事务执行结果没响应或者超时,则 MQ Server 回查事务状态,具体见步骤(2)的异常情况说明。
(2)异常情况:在断网或者应用重启等异常情况下,图中的步骤④提交的二次确认超时未到达 MQ Server,此时的处理逻辑如下:
步骤⑤:MQ Server 对该消息进行消息回查
步骤⑥:发送方收到消息回查后,检查该消息的本地事务执行结果
步骤⑦:发送方根据检查得到的本地事务的最终状态再次提交二次确认。
最终步骤:MQ Server基于 commit/rollback 对消息进行投递或者删除
2、RocketMQ事务消息的实现流程:
以 RocketMQ 4.5.2 版本为例,事务消息有专门的一个队列 RMQ_SYS_TRANS_HALF_TOPIC,所有的 prepare 消息都先往这里放,当消息收到 Commit 请求后,就将消息转移到真实的 Topic 队列里,供 Consumer 消费,同时向 RMQ_SYS_TRANS_OP_HALF_TOPIC 塞一条消息。简易流程图如下:
当应用模块的事务因为中断或者其他的网络原因导致无法立即响应的,RocketMQ 会当做 UNKNOW 处理,对此 RocketMQ 事务消息提供了一个补救方案:定时回查事务消息的事务执行状态,简易流程图如下:
二、Springboot 整合 RocketMQ 实现事务消息:
该部分将从 "下订单 + 扣减库存"的案例来介绍 SpringBoot 如何整合 RocketMQ 并使用事务消息保证最终一致性。核心思路是订单服务(生产端)向 RocketMQ 发送库存扣减消息,再执行本地订单生成逻辑,最后交由 RocketMQ 通知 库存服务扣减库存并保证库存扣减消息被正常消费。
案例中使用到的服务分为两个,订单服务和库存服务;涉及到的数据库表主要有三个,订单表、存储表,本地事务状态表。由于这几个表都比较简单,这里就不将对应的建表语句粘贴出来了,同样对应的 Pojo对象、Dao层、Service层 代码也不粘贴出来了,下面只展示核心逻辑的代码。
1、启动 RocketMQ 服务端:
RocketMQ的安装与部署请参考这篇文章:https://blog.csdn.net/a745233700/article/details/122531859
2、在父pom文件中引入依赖:
<!-- rocketmq 事务消息 -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.1</version>
</dependency>
3、生产端代码:
生产端的核心逻辑就是向 RocketMQ 投递事务消息,并执行本地事务,最后将本地事务的执行结果通知到 RocketMQ
(1)RocketMQ相关配置:
在 application.properties 配置文件中添加以下配置:
rocketmq.name-server=172.28.190.101:9876
rocketmq.producer.group=order_shop
(2)创建一个监听类:
实现 TransactionListener 接口,在实现的数据库事务提交方法executeLocalTransaction() 和回查事务状态方法checkLocalTransaction() 中模拟结果
/**
* rocketmq 事务消息回调类
*/
@Slf4j
@Component
public class OrderTransactionListener implements TransactionListener
{
@Resource
private ShopOrderMapper shopOrderMapper;
/**
* half消息发送成功后回调此方法,执行本地事务
*
* @param message 回传的消息,利用transactionId即可获取到该消息的唯一Id
* @param arg 调用send方法时传递的参数,当send时候若有额外的参数可以传递到send方法中,这里能获取到
* @return 返回事务状态,COMMIT:提交 ROLLBACK:回滚 UNKNOW:回调
*/
@Override
@Transactional
public LocalTransactionState executeLocalTransaction(Message message, Object arg)
{
log.info("开始执行本地事务:订单信息:" + new String(message.getBody()));
String msgKey = new String(message.getBody());
ShopOrderPojo shopOrder = JSONObject.parseObject(msgKey, ShopOrderPojo.class);
int saveResult;
LocalTransactionState state;
try
{
//修改为true时,模拟本地事务异常
boolean imitateException = true;
if(imitateException)
{
throw new RuntimeException("更新本地事务时抛出异常");
}
// 生成订单,本地事务的回滚依赖于DB的ACID特性,所以需要添加Transactional注解。当本地事务提交失败时,返回ROLLBACK_MESSAGE,则会回滚rocketMQ中的half message,保证分布式事务的一致性。
saveResult = shopOrderMapper.insert(shopOrder);
state = saveResult == 1 ? LocalTransactionState.COMMIT_MESSAGE : LocalTransactionState.ROLLBACK_MESSAGE;
// 更新本地事务并将事务号持久化,为后续的幂等做准备
// TransactionDao.add(transactionId)
}
catch (Exception e)
{
log.error("本地事务执行异常,异常信息:", e);
state = LocalTransactionState.ROLLBACK_MESSAGE;
}
//修改为true时,模拟本地事务超时,对于超时的消息,rocketmq会调用checkLocalTransaction方法回查本地事务执行状况
boolean imitateTimeout = false;
if(imitateTimeout)
{
state = LocalTransactionState.UNKNOW;
}
log.info("本地事务执行结果:msgKey=" + msgKey + ",execute state:" + state);
return state;
}
/**
* 回查本地事务接口
*
* @param messageExt 通过获取transactionId来判断这条消息的本地事务执行状态
* @return 返回事务状态,COMMIT:提交 ROLLBACK:回滚 UNKNOW:回调
*/
@Override
public LocalTransactionState checkLocalTransaction(MessageExt messageExt)
{
log.info("调用回查本地事务接口:msgKey=" + new String(messageExt.getBody()));
String msgKey = new String(messageExt.getBody());
ShopOrderPojo shopOrder = JSONObject.parseObject(msgKey, ShopOrderPojo.class);
// 备注:此处应使用唯一ID查询本地事务是否执行成功,唯一ID可以使用事务的transactionId。但为了验证方便,只查询DB的订单表是否存在对应的记录
// TransactionDao.isExistTx(transactionId)
List<ShopOrderPojo> list = shopOrderMapper.selectList(new QueryWrapper<ShopOrderPojo>()
.eq("shop_id", shopOrder.getShopId())
.eq("user_id", shopOrder.getUserId()));
LocalTransactionState state = list.size() > 0 ? LocalTransactionState.COMMIT_MESSAGE : LocalTransactionState.ROLLBACK_MESSAGE;
log.info("调用回查本地事务接口的执行结果:" + state);
return state;
}
}
为了方便验证,上面 Demo 使用了两个 boolean 变量 imitateException、imitateTimeout 分别模拟了事务执行异常和超时的情况,只需要将布尔值设置为 true 即可。
(3)投递事务消息:
import com.alibaba.fastjson.JSONObject;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.client.producer.TransactionSendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
@Slf4j
@Service
public class ShopOrderServiceImpl extends ServiceImpl<ShopOrderMapper, ShopOrderPojo> implements ShopOrderService
{
@Resource
private RocketMQTemplate rocketMQTemplate;
@Autowired
private OrderTransactionListener orderTransactionListener;
/**
* 发送事务消息
*/
@Override
public boolean sendOrderRocketMqMsg(ShopOrderPojo shopOrderPojo)
{
String topic = "storage";
String tag = "reduce";
// 设置监听器,此处如果使用MQ其他版本,可能导致强转异常
((TransactionMQProducer) rocketMQTemplate.getProducer()).setTransactionListener(orderTransactionListener);
//构建消息体
String msg = JSONObject.toJSONString(shopOrderPojo);
org.springframework.messaging.Message<String> message = MessageBuilder.withPayload(msg).build();
//发送事务消息,由消费者进行进行减少库存
TransactionSendResult sendResult = rocketMQTemplate.sendMessageInTransaction(topic + ":" + tag , message, null);
log.info("Send transaction msg result: " + sendResult);
return sendResult.getSendStatus() == SendStatus.SEND_OK;
}
}
4、消费端代码:
消费端的核心逻辑就是监听 MQ,接收消息;接收到消息之后扣减库存
(1)RocketMQ相关配置:
在 application.properties 配置文件中添加以下配置:
rocketmq.name-server=172.28.190.101:9876
rocketmq.consumer.group=order_shop
(2)消费监听类:
import com.alibaba.fastjson.JSONObject;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
/**
* 库存管理消费者类
**/
@Component
@RocketMQMessageListener (consumerGroup = "order_storage", topic = "storage")
public class StorageConsumerListener implements RocketMQListener<String>
{
@Resource
private TStorageService tStorageService;
/**
* rocketMQ消费者
*/
@Override
public void onMessage(String message)
{
System.out.println("消费者开始消费:从MQ中获取的消息是:" + message);
ShopOrderPojo shopOrder = JSONObject.parseObject(message, ShopOrderPojo.class);
// 1、幂等校验,防止消息重复消费--此处省略相关的代码逻辑:
// TransactionDao.isExistTx(transactionId)
// 2、执行消息消费操作--减少商品库存:
TStoragePojo shop = tStorageService.getById(shopOrder.getShopId());
shop.setNum(shop.getNum() - 1);
boolean updateResult = tStorageService.updateById(shop);
// 3、添加事务操作记录--此次省略代码:
// TransactionDao.add(transactionId)
System.out.println("消费者完成消费:操作结果:" + updateResult);
}
}
至此,一个完整的基于 RocketMQ 事务消息实现的分布式事务的最终一致性就完成了。
相关阅读:
常见的服务器架构入门:从单体架构、EAI 到 SOA 再到微服务和 ServiceMesh
常见分布式理论(CAP、BASE)和一致性协议(Gosssip协议、Raft一致性算法)
一致性哈希算法原理详解
Nacos注册中心的部署与用法详细介绍
Nacos配置中心用法详细介绍
SpringCloud OpenFeign 远程HTTP服务调用用法与原理
什么是RPC?RPC框架dubbo的核心流程
服务容错设计:流量控制、服务熔断、服务降级
sentinel 限流熔断神器详细介绍
Sentinel 规则持久化到 apollo 配置中心
Sentinel-Dashboard 与 apollo 规则的相互同步
Spring Cloud Gateway 服务网关的部署与使用详细介绍
Spring Cloud Gateway 整合 sentinel 实现流控熔断
Spring Cloud Gateway 整合 knife4j 聚合接口文档
常见分布式事务详解(2PC、3PC、TCC、Saga、本地事务表、MQ事务消息、最大努力通知)
分布式事务Seata原理
RocketMQ事务消息原理
参考文章:https://www.cnblogs.com/huangying2124/p/11702761.html
————————————————
版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
原文链接:https://blog.csdn.net/a745233700/article/details/122656108
相关文章:

RocketMQ事务消息原理
一、RocketMQ事务消息原理: RocketMQ 在 4.3 版本之后实现了完整的事务消息,基于MQ的分布式事务方案,本质上是对本地消息表的一个封装,整体流程与本地消息表一致,唯一不同的就是将本地消息表存在了MQ内部&…...

【Java】IntelliJ IDEA开发环境安装
一、下载 官方地址:https://www.jetbrains.com/idea/ 点击Download直接下载 二、安装 双击安装包,点击Next 选择安装路径,点击Next 勾选安装内容 安装完成。 三、创建项目 打开IDEA,填写项目名称,选择项目安装路径…...

Go语言中的通道 (Channel) 实践:Goroutine之间的通信
1. 引言 在Go语言中,并发编程是其核心优势之一。与其他编程语言不同,Go语言推荐使用通道 (Channel) 来进行多线程或并发任务的协调与通信,而非使用锁机制。本文将介绍如何通过通道在多个goroutine之间进行通信,避免竞争条件和复杂…...

常用类(二)--String类的简单总结
文章目录 1.基本介绍1.1创建对象1.2找到对应下标的字符1.3找到对应字符的下标1.4指定位置开始遍历1.5反向进行遍历1.6大小写之间的转换1.7字符串转换为数组1.8元素的替换1.9字符串的分割1.10字符串的截取 2.StringBuilder和StringBuffer2.1 StringBuilder的引入2.2面试题目 1.基…...

Spring Boot开发:从入门到精通
Spring Boot开发:从入门到精通 当你在开发一个新的Java应用时,是否曾经感到苦恼于繁琐的配置和重复的代码?Spring Boot就像一位友好的助手,向你伸出援手,让开发变得轻松愉快。从这一单一框架中,你可以快速…...

《数据结构》--队列【各种实现,算法推荐】
一、认识队列 队列是一种常见的数据结构,按照先进先出(FIFO,First In First Out)的原则排列数据。也就是说,最早进入队列的元素最先被移除。队列主要支持两种基本操作: 入队(enqueue࿰…...

面试八股文对校招的用处有多大?--GDB篇
前言 1.本系列面试八股文的题目及答案均来自于网络平台的内容整理,对其进行了归类整理,在格式和内容上或许会存在一定错误,大家自行理解。内容涵盖部分若有侵权部分,请后台联系,及时删除。 2.本系列发布内容分为12篇…...

Unity用VS打开FGUI脚本变成杂项怎么处理?
在Unity中使用Visual Studio(VS)打开FGUI脚本时,如果脚本显示为杂项文件,这通常意味着VS没有正确识别或关联这些脚本文件。以下是一些解决此问题的步骤: 对惹,这里有一个游戏开发交流小组,大家…...

交叉熵损失函数(Cross-Entropy Loss Function)解释说明
公式 8-11 的内容如下: L ( y , a ) − [ y log a ( 1 − y ) log ( 1 − a ) ] L(y, a) -[y \log a (1 - y) \log (1 - a)] L(y,a)−[yloga(1−y)log(1−a)] 这个公式表示的是交叉熵损失函数(Cross-Entropy Loss Function)&#…...

和外部机构API交互如何防止外部机构服务不可用拖垮调用服务
引言 在现代的分布式系统和微服务架构中,服务之间的通信往往通过API进行,尤其是在与外部机构或第三方服务进行交互时,更需要通过API实现功能的集成。然而,由于外部服务的可控性较差,其服务的不可用性(如响…...

自动猫砂盆真的有必要吗?买自动猫砂盆不看这四点小心害死猫。
现在越来越多铲屎官选择购买自动猫砂盆来代替自己给猫咪铲屎,可是自动猫砂盆真的有必要吗?要知道,在现在忙碌的生活中,有很多人因为工作上的忙碌而不小心忽视了猫咪,猫咪的猫砂盆堆满粪便,要知道猫砂盆一天…...

国外解压视频素材哪里找?五个海外解压视频素材网站推荐
国外解压视频素材哪里找?五个海外解压视频素材网站推荐 如果你正在寻找国外的解压视频素材,那么今天这篇文章一定能帮助你。无论是修牛蹄、洗地毯,还是切肥皂、玩解压游戏等,下面分享的几个网站都是你找到高质量海外解压视频素材…...

Android一个APP里面最少有几个线程
Android一个APP里面最少有几个线程 参考 https://www.jianshu.com/p/92bff8d6282f https://www.jianshu.com/p/8a820d93c6aa 线程查看 Android一个进程里面最少包含5个线程,分别为: main线程(主线程)FinalizerDaemon线程 终结者守护线程…...

位操作解决数组的花样遍历
文章目录 题目 一、思路: 二、代码 总结 题目 leetcodeT289 https://leetcode.cn/problems/game-of-life/description/ 一、思路: 这题思路很简单,对每个位置按照题目所给规则进行遍历,判断周围网格的活细胞数即可。但是题目要求…...

【面试宝典】深入Python高级:直戳痛点的题目演示(下)
目录 🍔 Python下多线程的限制以及多进程中传递参数的⽅式 🍔 Python是如何进⾏内存管理的? 🍔 Python⾥⾯如何拷⻉⼀个对象? 🍔 Python⾥⾯search()和match()的区别? 🍔 lambd…...

Hive数仓操作(十七)
一、Hive的存储 一、Hive 四种存储格式 在 Hive 中,支持四种主要的数据存储格式,每种格式有其特点和适用场景,不过一般只会使用Text 和 ORC : 1. Text 说明:Hive 的默认存储格式。存储方式:行存储。优点…...

工业和自动化领域常见的通信协议
在工业和自动化领域,有多种常见的通信协议,主要用于设备间的通信、数据传输和控制。 Modbus: 类型:串行通信协议用途:广泛用于工业自动化设备间的通信,如PLC、传感器和执行器。优点:简单、开放且…...

连夜爆肝收藏各大云服务新老用户优惠活动入口地址(内含免费试用1个月的地址),适用于小白,大学生,开发者,小企业老板....
具体请前往:云服务器优惠活动入口大全--收藏各主流云厂商的云服务器等系列产品的优惠活动入口,免费试用1个月活动入口,让新老用户都能根据使用场景和身份快速锁定优惠权益 经济下滑,被优化增多,大学生就业难࿰…...

SpringBoot+Redis+RabbitMQ完成增删改查
各部分分工职责 RabbitMQ负责添加、修改、删除的异步操作 Redis负责数据的缓存 RabbitMQ里面角色职责简单描述 RabbitMQ里面有几个角色要先分清以及他们的对应关系: 交换机、队列、路由键 交换机和队列是一对多 队列和路由键是多对多 然后就是消息的发送者&…...

【系统集成中级】线上直播平台开发项目质量管理案例分析
【系统集成中级】线上直播平台开发项目质量管理案例分析 一、案例二、小林在项目质量管理中存在的问题(一)计划阶段缺失(二)测试用例编制与执行问题(三)质量管理流程问题(四)质量保证…...

浪潮信息领航边缘计算,推动AI与各行业深度融合
在9月20日于安徽盛大召开的浪潮信息边缘计算合作伙伴大会上,浪潮信息指出,未来的计算领域将全面融入AI技术,特别是在企业边缘侧,智能应用特别是生成式人工智能应用正在迅速普及,这一趋势正引领边缘计算向边缘智算的方向…...

Koa2项目实战3 (koa-body,用于处理 HTTP 请求中的请求体)
以用户注册接口为例,需要在请求里携带2个参数:用户名(user_name)和密码(password)。 开发者需要在接口端,解析出user_name 、password。 在使用Koa开发的接口中,如何解析出请求携带…...

复盘20241012
1、 classpath "com.android.tools.build:gradle:8.5.1" 的版本 与distributionUrlhttps\://services.gradle.org/distributions/gradle-8.9-bin.zip的对应规则: Execution failed for task :app:compileDebugKotlin. 解决方案 切换 setting --> ot…...

泊松流负载均衡控制
目录 泊松流负载均衡控制 一、到达率λ 二、服务率μ 三、泊松流负载均衡控制 泊松流负载均衡控制 在探讨泊松流负载均衡控制时,我们主要关注的是到达率λ和服务率μ这两个核心参数。以下是对这两个参数及其在泊松流负载均衡控制中作用的详细解释: 一、到达率λ 定义:…...

3D打印矫形器市场报告:未来几年年复合增长率CAGR为10.8%
3D 打印矫形器是指使用 3D 打印技术制作的定制外部支撑装置。它们有助于稳定、引导、缓解或纠正肌肉骨骼状况,并根据个体患者的解剖结构进行设计,通常使用 3D 扫描和建模技术。3D 打印在矫形器方面的主要优势是能够生产精确适合患者解剖结构的定制装置&a…...

Richtek立锜科技线性稳压器 (LDO) 选型
一、什么是LDO? LDO也可称为低压差线性稳压器,适合从较高的输入电压转换成较低输出电压的应用,这种应用的功率消耗通常不是很大,尤其适用于要求低杂讯、低电流和输入、输出电压差很小的应用环境。 二、LDO的特性 LDO透过控制线性区调整管…...

Leetcode 前 k 个高频元素
使用最小堆算法来解决这道题目:相当于有一个容量固定为K的教室,只能容纳 K 个人,学生们逐个逐个进入该教室,当教室容量达到K人之后,每次进入一个新的学生后,我们将分数最低的学生(类似本题中的频率最低元素…...

[LeetCode] 面试题01.02 判定是否互为字符重拍
题目描述: 给定两个由小写字母组成的字符串 s1 和 s2,请编写一个程序,确定其中一个字符串的字符重新排列后,能否变成另一个字符串。 示例 1: 输入: s1 "abc", s2 "bca" 输出: true 示例 2&am…...

数据结构-4.5.KMP算法(旧版上)-朴素模式匹配算法的优化
朴素模式匹配算法最坏的情况: 一.实例: 第一轮匹配失败,开始下一轮的匹配: 不断的操作,最终匹配成功: 如上述图片所述,朴素模式匹配算法会导致时间开销增加, 优化思路:主…...

STM32 QSPI接口驱动GD/W25Qxx配置简要
STM32 QSPI接口GD/W25Qxx配置简要 📝本篇会具体涉及介绍Winbond(华邦)和GD(兆易创新) NOR flash相关型号指令差异。由于网络上可以搜索到很多相关QSPI相关知识内容,不对QSPI通讯协议做深度解析。 🔖首先确保所使用的ST…...