当前位置: 首页 > news >正文

RocketMQ学习笔记:分布式事务

这是本人学习的总结,主要学习资料如下

  • 马士兵教育
  • rocketMq官方文档

目录

  • 1、分布式事务的难题
  • 2、解决方式
    • 2.1、半事务消息和事务回查
    • 2.2、代码样例
      • 2.2.1、TransactionListener
      • 2.2.2、TransactionMQProducer
      • 2.2.3、MessageListenerConcurrently
      • 2.2.4、流程图


1、分布式事务的难题

现有两个系统,A向B转钱。A系统扣钱和B系统加钱就应该属于同一个事务,任何一个失败都要回滚。两个系统之间唯一的通信方式就是RocketMQ
请添加图片描述

以最朴素的想法,现在就有两个实现分布式事务的方案。但这两个都有比较大的不可靠性。

  • A系统先扣钱再发送MQ:这样的弊端是无法确定消息有没有发送到MQ,或者消息有没有被MQ保存。总之这做法缺少一些回查的机制。
  • A系统先发送MQ再扣钱:这样的弊端是发送消息后,A系统可能出现错误回滚。而B收到了消息就正常消费,完全不知道A那边出了问题。

2、解决方式

2.1、半事务消息和事务回查

  • 半事务消息:半事务消息是指向RocketMQ发送一条消息,但这个消息只存放在CommitLog中,并不在ConsumeQueue展示。也就是说该消息被RocketMQ接收了,但是消费者却无法消费到这条消息。
  • 事务回查:在半事务消息发送成功后。A系统执行事务,如果成功则MQ将消息变成正常消息,失败则不发送消息。这里如果业务太复杂还不能确定事务是否完成的话,还可以发送UNKNOWN给MQ,这样MQ就会有定时器去检查事务是否完成。
    RocketMQ会向生产者询问是否可以把半事务变成正常的消息让消费者可以消费到。在这篇文章的例子就是询问A系统扣款有没有扣成功。如果成功了那就让B系统消费消息。

请添加图片描述

所以呢,通过半事务消息事务回查就能保证A系统和发送消息具有事务,即扣款失败则不发送消息,扣款成功则发送消息。所以半事务消息至少保证了生产者和MQ之间的原子性。MQ和消费者之间的原子性需要另外处理。

消费者需要保证幂等性,失败后重试,即使称为死信后也特殊处理等操作来保证事务。这个例子中B系统成功加钱的话那交易结束,如果尝试多次后还是失败,那就需要一个机制来通知A系统,让他把扣掉的钱加回去。

2.2、代码样例

2.2.1、TransactionListener

一个接口规范,我们需要实现这个接口来定义本地事务和事务回查。

就是本地事务具体执行,成功后怎么办,失败了怎么办。定时的事务回查如何检查事务有没有完成。这些东西都要定义在TransactionListener的实现中。


TransactionListener transactionListener = new TransactionListener() {@Overridepublic LocalTransactionState executeLocalTransaction(Message message, Object o) {// 执行本地事务,A扣100块// 如果成功// return LocalTransactionState.COMMIT_MESSAGE;// 如果失败// return LocalTransactionState.ROLLBACK_MESSAGE;//或者业务比较复杂,不想在这个阶段就关闭事务,可以返回Unknown,之后就需要MQ定时事务回查return LocalTransactionState.UNKNOW;}@Override// 事务回查,默认一分钟一次public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {System.out.println("事务回查, " + new SimpleDateFormat("yyyyMMdd, HH:mm:ss").format(new Date()));// 如果成功// return LocalTransactionState.COMMIT_MESSAGE;// 如果失败// return LocalTransactionState.ROLLBACK_MESSAGE;// 业务比较长,还不确定成功或失败,返回unknown,下次再查return LocalTransactionState.UNKNOW;}};

2.2.2、TransactionMQProducer

半事务消息的生产者,在DefaultMQProducer的基础上新增了一个重要的参数,类型是ExecutorService。这个线程池是用来生产线程去完成事务回查。

但是事务回查的逻辑不需要定义在线程的run()方法中,这一部分放在TransactionListener中。

 TransactionMQProducer producer = new TransactionMQProducer("transaction_producer");producer.setNamesrvAddr("localhost:9876");// build a thread pool used to for MQ to call back to check transactionExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.MINUTES, new ArrayBlockingQueue<>(10), (r) -> {Thread thread = new Thread(r);thread.setName("client-transaction-msg-check-thread");return thread;});producer.setExecutorService(executorService);producer.setTransactionListener(transactionListener);producer.start();try{Message msg = new Message("transaction_producer", null, "A give B 100 dollar".getBytes());SendResult sendResult = producer.sendMessageInTransaction(msg, null);}catch(Exception e) {// rollbackSystem.out.println("rollback");}producer.shutdown();

2.2.3、MessageListenerConcurrently

消费者部分就比较简单,只要listener是MessageListenerConcurrently就好。

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("transaction_consumer");
consumer.setNamesrvAddr("localhost:9876");
consumer.subscribe("TransactionalTopic", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext context) {try{for(MessageExt msg: list) {// simulate DB actionSystem.out.println("update B where transactionId" + msg.getTransactionId());System.out.println("Success consume msg: " + msg.getMsgId());}} catch (Exception e) {e.printStackTrace();System.out.println("Failed to consume meg, try more times");// means that failed to consume this msg. In next time will still consume this msg.return ConsumeConcurrentlyStatus.RECONSUME_LATER;}// means that success to consume this msg. In the next time will consume next msg.return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}
});consumer.start();
while(true){
}

2.2.4、流程图

请添加图片描述

相关文章:

RocketMQ学习笔记:分布式事务

这是本人学习的总结&#xff0c;主要学习资料如下 马士兵教育rocketMq官方文档 目录 1、分布式事务的难题2、解决方式2.1、半事务消息和事务回查2.2、代码样例2.2.1、TransactionListener2.2.2、TransactionMQProducer2.2.3、MessageListenerConcurrently2.2.4、流程图 1、分布…...

单臂路由和三层交换机

目录 一.单臂路由 1.单臂路由的工作原理 2.单臂路由的配置 2.1画出拓扑图 2.2配置PC 2.3配置交换机 2.4配置路由器 2.5测试 二.三层交换机 1.三层交换机的概述 2.三层交换机的配置 2.1画出拓扑图 2.2配置PC 2.3配置二层交换机 2.4配置三层交换机 2.5测试 3.拓展 三.总结 一.…...

红岩思维导图的制作软件,分享4款热门的!

红岩思维导图的制作软件&#xff0c;分享4款热门的&#xff01; 在当今信息爆炸的时代&#xff0c;思维导图作为一种有效的知识整理和思维拓展工具&#xff0c;受到了广大用户的青睐。红岩思维导图以其独特的风格和实用性&#xff0c;成为了许多人学习和工作中的得力助手。那么…...

es 集群开机自动启动

前面搭建了 es 集群&#xff0c;但是每次机器重启 都需要手动启动&#xff0c;很麻烦&#xff0c;所以这里介绍一下开机自动启动 首先使用 root 用户 es &#xff1a; 执行以下命令 vim /etc/init.d/elasticsearch 将以下内容 cv 进去 #!/bin/bash #chkconfig: 345 63 …...

使用JMeter从JSON响应的URL参数中提取特定值

在使用Apache JMeter进行API测试时&#xff0c;我们经常需要从JSON格式的响应中提取特定字段的值。这可以通过使用JMeter内置的JSON提取器和正则表达式提取器来完成。以下是一个具体的例子&#xff0c;展示了如何从一个JSON响应中提取rowId的值&#xff0c;同时处理字符串终止符…...

汽车电子行业知识:自动驾驶系统结构和各模块功能

文章目录 2.自动驾驶系统结构和各模块功能2.1.自动驾驶系统结构2.2.车载传感器2.2.1.激光雷达2.2.2.毫米波雷达2.2.3.超声波雷达2.2.4.摄像头2.2.5.GNSS2.2.6. IMU2.2.7.多传感器融合 2.3.各功能模块2.3.1.高精度地图2.3.2.定位2.3.3.感知2.3.4.决策2.3.5.规划2.3.6.控制2.3.7.…...

Oracle参数文件详解

1、参数文件的作用 参数文件用于存放实例所需要的初始化参数&#xff0c;因为多数初始化参数都具有默认值&#xff0c;所以参数文件实际存放了非默认的初始化参数。 2、参数文件类型 1&#xff09;服务端参数文件&#xff0c;又称为 spfile 二进制的文件&#xff0c;命名规则…...

鸿蒙(HarmonyOS)Navigation如何实现多场景UI适配?

场景介绍 应用在不同屏幕大小的设备上运行时&#xff0c;往往有不同的UI适配&#xff0c;以聊天应用举例&#xff1a; 在窄屏设备上&#xff0c;联系人和聊天区在多窗口中体现。在宽屏设备上&#xff0c;联系人和聊天区在同一窗口体现。 要做好适配&#xff0c;往往需要开发…...

PTGui图像拼接实验

1 PTGui图像拼接实验 1.1 概述 图像拼接技术就是将数张有重叠部分的图像&#xff08;可能是不同时间、不同视角或者不同传感器获得的&#xff09;拼成一幅无缝的全景图或高分辨率图像的技术 图像配准&#xff08;image alignment&#xff09;和图像融合是图像拼接的两个关键…...

C++|类封装、类的分文件编写练习:设计立方体类、点和圆的关系

文章目录 练习案例1&#xff1a;设计立方体类CPP代码 练习案例2:点和圆的关系CPP代码 代码总结类的分文件编写 练习案例1&#xff1a;设计立方体类 设计立方体类(Cube) 求出立方体的面积和体积 分别用全局函数和成员函数判断两个立方体是否相等。 CPP代码 class Cube { pub…...

大数据开发扩展shell--尚硅谷shell笔记

大数据开发扩展shell 学习目标 1 熟悉shell脚本的原理和使用 2 熟悉shell的编程语法 第一节 Shell概述 1&#xff09;Linux提供的Shell解析器有&#xff1a; 查看系统中可用的 shell [atguiguhadoop101 ~]$ cat /etc/shells /bin/sh/bin/bash/sbin/nologin/bin/dash/bin/t…...

考研数学|《1800》《1000》《880》《660》最佳搭配使用方法

直接说结论&#xff1a;基础不好先做1800、强化之前660&#xff0c;强化可选880/1000题。 首先&#xff0c;传统习题册存在的一个问题是题量较大&#xff0c;但难度波动较大。《汤家凤1800》和《张宇1000》题量庞大&#xff0c;但有些题目难度不够平衡&#xff0c;有些过于简单…...

【GameFramework框架内置模块】17、声音(Sound)

推荐阅读 CSDN主页GitHub开源地址Unity3D插件分享简书地址QQ群:398291828大家好,我是佛系工程师☆恬静的小魔龙☆,不定时更新Unity开发技巧,觉得有用记得一键三连哦。 一、前言 【GameFramework框架】系列教程目录: https://blog.csdn.net/q764424567/article/details/1…...

视频记录历史播放位置效果

简介 每次打开页面视频从上一次的播放位置开始播放 利用lodash库做节流 代码 <!DOCTYPE html> <html lang"en"><head><meta charset"UTF-8"><meta name"viewport" content"widthdevice-width, initial-sca…...

Request Response

简介 Request&#xff08;请求&#xff09; & Response&#xff08;响应&#xff09; 浏览器会向服务器发送请求数据&#xff0c;服务器也需要返回响应数据给浏览器&#xff0c;因此我们需要设置对应的类来代表请求数据和响应数据&#xff0c;且Servlet中的service方法就需…...

How to convert .py to .ipynb in Ubuntu 22.04

How to convert .py to .ipynb in Ubuntu 22.04 jupyter nbconvertp2j 最近看到大家在用jupyter notebook&#xff0c;我也试了一下&#xff0c;感觉还不错&#xff0c;不过&#xff0c;也遇到了一些问题&#xff0c;比方说&#xff0c;我有堆的.py文件&#xff0c;如果要一个一…...

【prometheus-operator】k8s监控集群外redis

1、部署exporter GitHub - oliver006/redis_exporter: Prometheus Exporter for Redis Metrics. Supports Redis 2.x, 3.x, 4.x, 5.x, 6.x, and 7.x redis_exporter-v1.57.0.linux-386.tar.gz # 解压 tar -zxvf redis_exporter-v1.57.0.linux-386.tar.gz # 启动 nohup ./redi…...

MySQL索引(图文并茂)

目录 一、索引的概念 二、索引的作用 三、创建索引的原则依据 四、索引的分类和创建 1、索引的分类 2、索引的创建 2.1 普通索引 2.1.1 直接创建索引 2.1.2 修改表方式创建 2.1.3 创建表的时候指定索引 2.2 唯一索引 2.2.1 直接创建唯一索引 2.2.2 修改表方式创建 …...

Redis 教程系列之Redis PHP 使用 Redis(十二)

PHP 使用 Redis 安装 开始在 PHP 中使用 Redis 前&#xff0c; 我们需要确保已经安装了 redis 服务及 PHP redis 驱动&#xff0c;且你的机器上能正常使用 PHP。 接下来让我们安装 PHP redis 驱动&#xff1a;下载地址为:https://github.com/phpredis/phpredis/releases。 P…...

JavaScript语法和数据类型

基础 JavaScript 借鉴了 Java 的大部分语法&#xff0c;但同时也受到 Awk、Perl 和 Python 的影响。 JavaScript 是区分大小写的&#xff0c;并使用 Unicode 字符集。举个例子&#xff0c;可以将单词 Frh&#xff08;在德语中意思是“早”&#xff09;用作变量名。 var Frh …...

三维GIS开发cesium智慧地铁教程(5)Cesium相机控制

一、环境搭建 <script src"../cesium1.99/Build/Cesium/Cesium.js"></script> <link rel"stylesheet" href"../cesium1.99/Build/Cesium/Widgets/widgets.css"> 关键配置点&#xff1a; 路径验证&#xff1a;确保相对路径.…...

FastAPI 教程:从入门到实践

FastAPI 是一个现代、快速&#xff08;高性能&#xff09;的 Web 框架&#xff0c;用于构建 API&#xff0c;支持 Python 3.6。它基于标准 Python 类型提示&#xff0c;易于学习且功能强大。以下是一个完整的 FastAPI 入门教程&#xff0c;涵盖从环境搭建到创建并运行一个简单的…...

基于Uniapp开发HarmonyOS 5.0旅游应用技术实践

一、技术选型背景 1.跨平台优势 Uniapp采用Vue.js框架&#xff0c;支持"一次开发&#xff0c;多端部署"&#xff0c;可同步生成HarmonyOS、iOS、Android等多平台应用。 2.鸿蒙特性融合 HarmonyOS 5.0的分布式能力与原子化服务&#xff0c;为旅游应用带来&#xf…...

江苏艾立泰跨国资源接力:废料变黄金的绿色供应链革命

在华东塑料包装行业面临限塑令深度调整的背景下&#xff0c;江苏艾立泰以一场跨国资源接力的创新实践&#xff0c;重新定义了绿色供应链的边界。 跨国回收网络&#xff1a;废料变黄金的全球棋局 艾立泰在欧洲、东南亚建立再生塑料回收点&#xff0c;将海外废弃包装箱通过标准…...

在Ubuntu中设置开机自动运行(sudo)指令的指南

在Ubuntu系统中&#xff0c;有时需要在系统启动时自动执行某些命令&#xff0c;特别是需要 sudo权限的指令。为了实现这一功能&#xff0c;可以使用多种方法&#xff0c;包括编写Systemd服务、配置 rc.local文件或使用 cron任务计划。本文将详细介绍这些方法&#xff0c;并提供…...

2025 后端自学UNIAPP【项目实战:旅游项目】6、我的收藏页面

代码框架视图 1、先添加一个获取收藏景点的列表请求 【在文件my_api.js文件中添加】 // 引入公共的请求封装 import http from ./my_http.js// 登录接口&#xff08;适配服务端返回 Token&#xff09; export const login async (code, avatar) > {const res await http…...

新能源汽车智慧充电桩管理方案:新能源充电桩散热问题及消防安全监管方案

随着新能源汽车的快速普及&#xff0c;充电桩作为核心配套设施&#xff0c;其安全性与可靠性备受关注。然而&#xff0c;在高温、高负荷运行环境下&#xff0c;充电桩的散热问题与消防安全隐患日益凸显&#xff0c;成为制约行业发展的关键瓶颈。 如何通过智慧化管理手段优化散…...

06 Deep learning神经网络编程基础 激活函数 --吴恩达

深度学习激活函数详解 一、核心作用 引入非线性:使神经网络可学习复杂模式控制输出范围:如Sigmoid将输出限制在(0,1)梯度传递:影响反向传播的稳定性二、常见类型及数学表达 Sigmoid σ ( x ) = 1 1 +...

html css js网页制作成品——HTML+CSS榴莲商城网页设计(4页)附源码

目录 一、&#x1f468;‍&#x1f393;网站题目 二、✍️网站描述 三、&#x1f4da;网站介绍 四、&#x1f310;网站效果 五、&#x1fa93; 代码实现 &#x1f9f1;HTML 六、&#x1f947; 如何让学习不再盲目 七、&#x1f381;更多干货 一、&#x1f468;‍&#x1f…...

人机融合智能 | “人智交互”跨学科新领域

本文系统地提出基于“以人为中心AI(HCAI)”理念的人-人工智能交互(人智交互)这一跨学科新领域及框架,定义人智交互领域的理念、基本理论和关键问题、方法、开发流程和参与团队等,阐述提出人智交互新领域的意义。然后,提出人智交互研究的三种新范式取向以及它们的意义。最后,总结…...