当前位置: 首页 > news >正文

如何解决微服务下引起的 分布式事务问题

一、什么是分布式事务?

虽然叫分布式事务,但不是一定是分布式部署的服务之间才会产生分布式事务。不是在同一个服务或同一个数据库架构下,产生的事务,也就是分布式事务。

  • 跨数据源的分布式事务

  • 跨服务的分布式事务

二、解决方案
1、使用阿里开源的Seata框架解决分布式事务

​ 1)seata的架构

​ Seata事务管理中有三个重要的角色:

  • TC (Transaction Coordinator) - **事务协调者:**维护全局和分支事务的状态,协调全局事务提交或回滚。

  • TM (Transaction Manager) - **事务管理器:**定义全局事务的范围、开始全局事务、提交或回滚全局事务。

  • RM (Resource Manager) - **资源管理器:**管理分支事务处理的资源,与TC交谈以注册分支事务和报告分支事务的状态,并驱动分支事务提交或回滚。
    在这里插入图片描述
    3)Seata的常用模式使用

    • XA模式

      在一阶段各个本地事务执行完成后,不提交,把执行状态给事务协调者TC,此时本地事务继续持有数据库锁

      二阶段TC基于一阶段的报告来进行判断,如果一阶段均成功则通知所有的事务参与者,提交事务,如果一阶段任意一个参与者失败,则通知所有事务参与者回滚事务。

      优点:能够实现强一致性,满足ACID原则;实现简单

      缺点:性能较差;依赖数据库的事务

      I. 在application.yml文件中开启XA模式(所有参与事务的服务都需要设置):

      seata:data-source-proxy-mode: XA  
      

      II. 在全局事务的入口方法添加@GlobalTransactional注解

      @Override@GlobalTransactionalpublic Long create(Order order) {// 创建订单orderMapper.insert(order);try {// 扣用户余额accountClient.deduct(order.getUserId(), order.getMoney());// 扣库存storageClient.deduct(order.getCommodityCode(), order.getCount());} catch (FeignException e) {log.error("下单失败,原因:{}", e.contentUTF8(), e);throw new RuntimeException(e.contentUTF8(), e);}return order.getId();}
      
    • AT模式

      和xa模式一样也是二阶段提交,不同的是AT模式本地事务结束后,直接提交。但是,它会在本地事务进行数据库数据更新的时候记录一下更新前后的快照。

​ 在二阶段需要回滚的时候,根据快照进行数据的恢复,如果二阶段全局事务提交,则把记录的快照删除。

​ 优点:性能好;实现也较为简单

​ 缺点: 存在中间状态,只能达到最终的一致性;快照功能会影响一些性能,但是相对于XA模式还是要好很多

I. 在application.yml文件中开启AT模式(所有参与事务的服务都需要设置):

seata:data-source-proxy-mode: AT # 默认就是AT

II. 创建相关数据库表

#在分支事务所在的库里创建记录快照的表undo_log
DROP TABLE IF EXISTS `undo_log`;
CREATE TABLE `undo_log`  (`branch_id` bigint(20) NOT NULL COMMENT 'branch transaction id',`xid` varchar(100) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT 'global transaction id',`context` varchar(128) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT 'undo_log context,such as serialization',`rollback_info` longblob NOT NULL COMMENT 'rollback info',`log_status` int(11) NOT NULL COMMENT '0:normal status,1:defense status',`log_created` datetime(6) NOT NULL COMMENT 'create datetime',`log_modified` datetime(6) NOT NULL COMMENT 'modify datetime',UNIQUE INDEX `ux_undo_log`(`xid`, `branch_id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci COMMENT = 'AT transaction mode undo table' ROW_FORMAT = Compact;#在TC服务所使用的库里创建全局锁记录表lock_table
DROP TABLE IF EXISTS `lock_table`;
CREATE TABLE `lock_table`  (`row_key` varchar(128) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL,`xid` varchar(96) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,`transaction_id` bigint(20) NULL DEFAULT NULL,`branch_id` bigint(20) NOT NULL,`resource_id` varchar(256) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,`table_name` varchar(32) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,`pk` varchar(36) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,`gmt_create` datetime NULL DEFAULT NULL,`gmt_modified` datetime NULL DEFAULT NULL,PRIMARY KEY (`row_key`) USING BTREE,INDEX `idx_branch_id`(`branch_id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Compact;

III. 在全局事务的入口方法添加@GlobalTransactional注解

    @Override@GlobalTransactionalpublic Long create(Order order) {// 创建订单orderMapper.insert(order);try {// 扣用户余额accountClient.deduct(order.getUserId(), order.getMoney());// 扣库存storageClient.deduct(order.getCommodityCode(), order.getCount());} catch (FeignException e) {log.error("下单失败,原因:{}", e.contentUTF8(), e);throw new RuntimeException(e.contentUTF8(), e);}return order.getId();}
2、使用RocketMQ实现可靠消息最终一致性方案 (适用于不同项目的情况)在这里插入图片描述

模拟转账 a银行向b银行转账

a银行业务代码:

减少金额,像mq发送事务消息

  1. 引入rocketmq依赖
<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq‐spring‐boot‐starter</artifactId><version>2.0.2</version>
</dependency>

2)配置rocketmq

rocketmq.producer.group = zhuoye #设置生产者组的名称
rocketmq.name‐server = 127.0.0.1:9876  #指定rocketmq的地址

3) 业务层代码

@Service
@Slf4j
public class UserAccountServiceImpl implements UserAccountService {@Autowiredprivate UserAccountMapper userAccountMapper;@Autowiredprivate  RocketMQTemplate rocketMQTemplate;@Autowiredprivate TansactionalRecordMapper tansactionalRecordMapper;//向mq发送转账消息@Overridepublic void sendTransferAccountsMessagesToMq(AccountChangeEvent accountChangeEvent) {//将accountChangeEvent转成jsonJSONObject jsonObject =new JSONObject();jsonObject.put("transferAccountInfo",accountChangeEvent);String jsonString = jsonObject.toJSONString();//生成message类型Message<String> message = MessageBuilder.withPayload(jsonString).build();//发送一条事务消息/*** String txProducerGroup 生产组* String destination topic,* Message<?> message, 消息内容* Object arg 参数*/ rocketMQTemplate.sendMessageInTransaction("transferAccount_ABank","topic_transferAccount",message,null);}//更新账户,扣减金额@Override@Transactionalpublic void doUpdateAccountBalance(AccountChangeEvent accountChangeEvent) {//幂等判断if(tansactionalRecordMapper.isExist(accountChangeEvent.getTxNo())>0){return ;}//扣减金额userAccountMapper.updateAccountBalance(accountChangeEvent.getAccountNo(),accountChangeEvent.getAmount() * -1);//添加事务日志tansactionalRecordMapper.add(accountChangeEvent.getTxNo());}

4)编写RocketMQLocalTransactionListener接口实现类

@Component
@Slf4j
@RocketMQTransactionListener(txProducerGroup = "transferAccount_ABank")
public class ProducerTxmsgListener implements RocketMQLocalTransactionListener {@Autowiredprivate UserAccountService userAccountService;@Autowiredprivate UserAccountMapper userAccountMapper;@Autowiredprivate TansactionalRecordMapper tansactionalRecordMapper;//事务消息发送后的回调方法,当消息发送给mq成功,此方法被回调@Override@Transactionalpublic RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {try {//解析message,转成AccountChangeEventString messageString = new String((byte[]) message.getPayload());JSONObject jsonObject = JSONObject.parseObject(messageString);String accountChangeString = jsonObject.getString("transferAccountInfo");//将accountChange(json)转成AccountChangeEventAccountChangeEvent accountChangeEvent = JSONObject.parseObject(accountChangeString, AccountChangeEvent.class);//执行本地事务,扣减金额userAccountService.doUpdateAccountBalance(accountChangeEvent);//当返回RocketMQLocalTransactionState.COMMIT,自动向mq发送commit消息,mq将消息的状态改为可消费return RocketMQLocalTransactionState.COMMIT;} catch (Exception e) {e.printStackTrace();return RocketMQLocalTransactionState.ROLLBACK;}}//事务状态回查,查询是否扣减金额@Overridepublic RocketMQLocalTransactionState checkLocalTransaction(Message message) {//解析message,转成AccountChangeEventString messageString = new String((byte[]) message.getPayload());JSONObject jsonObject = JSONObject.parseObject(messageString);String accountChangeString = jsonObject.getString("transferAccountInfo");//将accountChange(json)转成AccountChangeEventAccountChangeEvent accountChangeEvent = JSONObject.parseObject(accountChangeString, AccountChangeEvent.class);//事务idString txNo = accountChangeEvent.getTxNo();int isExist = tansactionalRecordMapper.isExist(txNo);if(isExist>0){return RocketMQLocalTransactionState.COMMIT;}else{return RocketMQLocalTransactionState.UNKNOWN;}}
}

b银行业务代码(前两步一样):

接收消息,增加金额

1)业务层代码

@Service
@Slf4j
public class UserAccountServiceImpl implements UserAccountService {@Autowiredprivate UserAccountMapper userAccountMapper;@Autowiredprivate TansactionalRecordMapper tansactionalRecordMapper;//更新账户,增加金额@Override@Transactionalpublic void addAccountInfoBalance(AccountChangeEvent accountChangeEvent) {//已更新if(tansactionalRecordMapper.isExist(accountChangeEvent.getTxNo())>0){return ;}//增加金额userAccountMapper.updateAccountBalance(accountChangeEvent.getAccountNo(),accountChangeEvent.getAmount());//添加事务记录,用于幂等tansactionalRecordMapper.add(accountChangeEvent.getTxNo());}
}

2)监听事务消息

@Component
@Slf4j
@RocketMQMessageListener(consumerGroup = "transferAccount_ABank",topic = "topic_transferAccount",maxReconsumeTimes = 3)
public class TxmsgConsumer implements RocketMQListener<String> {@AutowiredUserAccountService userAccountService;//接收消息@Overridepublic void onMessage(String message) {//解析消息JSONObject jsonObject = JSONObject.parseObject(message);String accountChangeString = jsonObject.getString("transferAccountInfo");//转成AccountChangeEventAccountChangeEvent accountChangeEvent = JSONObject.parseObject(accountChangeString, AccountChangeEvent.class);//更新本地账户,增加金额userAccountService.addAccountInfoBalance(accountChangeEvent);}
}

相关文章:

如何解决微服务下引起的 分布式事务问题

一、什么是分布式事务&#xff1f; 虽然叫分布式事务&#xff0c;但不是一定是分布式部署的服务之间才会产生分布式事务。不是在同一个服务或同一个数据库架构下&#xff0c;产生的事务&#xff0c;也就是分布式事务。 跨数据源的分布式事务 跨服务的分布式事务 二、解决方…...

牛客周赛50轮+cf955+abc363

D-小红的因式分解_牛客周赛 Round 50 (nowcoder.com) 思路&#xff1a; 巨蠢的题目&#xff0c;ax^2bxca1*a2*x^2(b1*a2b2*a1)xb1*b2&#xff0c;即&#xff1a; aa1*a2,ba1*b2a2*b1,cb1*b2 数据范围很小&#xff0c;直接暴力枚举吧&#xff08;注意条件&#xff09; 代码…...

【MySQL】:对库和表的基本操作方法

数据库使用的介绍 什么是SQL 学习数据库的使用——>基于 SQL编程语言 来对数据库进行操作 重点表述的是“需求”&#xff0c;期望得到什么结果。&#xff08;至于结果是如何得到的&#xff0c;并不关键&#xff0c;都是数据库服务器在背后做好了&#xff09; 重点表述的是…...

Library not found for -lstdc++.6.0.9

解决方案一 由于项目已经很多年了&#xff0c;前段时间更新了Xcode发现编译报错lstdc这个库很早以前就被舍弃了&#xff0c;但是一个项目的维护都随着解决bug堆砌出来的&#xff0c;这也导致了我们的项目走上了这条路。 比如 Library not found for -lstdc.6.0.9 报的错&#x…...

防火墙之双机热备篇

为什么要在防火墙上配置双机热备技术呢&#xff1f; 相信大家都知道&#xff0c;为了提高可靠性&#xff0c;避免单点故障 肯定有聪明的小伙伴会想到那为什么不直接多配置两台防火墙&#xff0c;然后再将他们进行线路冗余&#xff0c;不就完成备份了吗&#xff1f; 答案是不…...

终端里面ifconfig命令无法运行

在 Ubuntu 以及基于 Debian 的系统中&#xff0c;ifconfig 命令可能不会默认安装&#xff0c;因为自 Ubuntu 17.10 版本开始&#xff0c;系统默认使用 ip 命令作为网络配置的主要工具&#xff0c;而 ifconfig 命令则来自 net-tools 包&#xff0c;该包不再作为标准工具被包含在…...

掌握Python中的文件序列化:Json和Pickle模块解析

Python 文件操作与管理&#xff1a;Open函数、Json与Pickle、Os模块 在Python中&#xff0c;文件是一个重要的数据处理对象。无论是读取数据、保存数据还是进行数据处理&#xff0c;文件操作都是Python编程中不可或缺的一部分。本文将详细介绍Python中文件操作的几种常用方法&…...

WordPress 6.6 “Dorsey多尔西”发布

WordPress 6.6 “Dorsey多尔西”已经发布&#xff0c;它以传奇的美国大乐队领袖 Tommy Dorsey 名字命名。Dorsey 以其音调流畅的长号和作品而闻名&#xff0c;他的音乐以其情感深度和充满活力的能量吸引了观众。 当您探索 WordPress 6.6 的新功能和增强功能时&#xff0c;让您的…...

核函数支持向量机(Kernel SVM)

核函数支持向量机&#xff08;Kernel SVM&#xff09;是一种非常强大的分类器&#xff0c;能够在非线性数据集上实现良好的分类效果。以下是关于核函数支持向量机的详细数学模型理论知识推导、实施步骤与参数解读&#xff0c;以及两个多维数据实例&#xff08;一个未优化模型&a…...

二分查找(折半查找)

这次不排序了&#xff0c;对排好序的数组做个查找吧 介绍 二分查找排序英文名为BinarySort&#xff0c;是一种效率较高的查找方法要求线性表必须采用顺序存储结构 基本思路 通过不断地将搜索范围缩小一半来找到目标元素&#xff1a; 1、假定数组为arr&#xff0c;需要查找的…...

arcgis紧凑型切片缓存(解决大范围切片,文件数量大的问题)

ArcGIS 切片缓存的紧凑型存储格式是一种优化的存储方式&#xff0c;用于提高切片缓存的存储效率和访问速度。紧凑型存储格式将多个切片文件合并为一个单一的 .bundle 文件&#xff0c;从而减少文件系统的开销和切片的加载时间。这类格式已经应用很久了&#xff0c;我记得2013我…...

ESP32CAM人工智能教学15

ESP32CAM人工智能教学15 Flask服务器TCP连接 小智利用Flask在计算机中创建一个虚拟的网页服务器服务器&#xff0c;让ESP32Cam通过WiFi连接&#xff0c;把摄像头拍摄到的图片发送到电脑中&#xff0c;并在电脑中保存成图片文件。 Flask是用Python编写的网页服务程序WebServer。…...

Pandas 33个冷知识 0721

Pandas 33个冷知识 从Excel读取数据: 使用 pd.read_excel(file.xlsx) 来读取Excel文件。 写入Excel: 使用 df.to_excel(file.xlsx, indexFalse) 将DataFrame写入Excel文件。 创建日期索引: 使用 df.set_index(pd.to_datetime(df[date])) 创建日期索引。 向后填充缺失值: 使用…...

C++ map和set的使用

目录 0.前言 1.关联式容器 2.键值对 3.树形结构的关联式容器 3.1树形结构的特点 3.2树形结构在关联式容器中的应用 4.set 4.1概念与性质 4.2使用 5.multiset 5.1概念与性质 5.2使用 6.map 6.1概念与性质 6.2使用 7.multimap 7.1概念与性质 7.2使用 8.小结 &a…...

yarn的安装和配置以及更新总结,npm的对照使用差异

1. Yarn简介 Yarn 是一个由 Facebook 开发的现代 JavaScript 包管理器&#xff0c;旨在提供更快、更安全、更可靠的包管理体验。 1.1 什么是Yarn Yarn 是一个快速、可靠和安全的 JavaScript 包管理器&#xff0c;它通过并行化操作和智能缓存机制&#xff0c;显著提升了依赖安…...

【Git命令】git rebase之合并提交记录

使用场景 在本地提交了两个commit&#xff0c;但是发现根本没有没必要分为两次&#xff0c;需要想办法把两次提交合并成一个提交&#xff1b;这个时候可以使用如下命令启动交互式变基会话&#xff1a; git rebase -i HEAD~N这里 N 是你想要重新调整的最近的提交数。 如下在本地…...

为什么品牌需要做 IP 形象?

品牌做IP形象的原因有多方面&#xff0c;这些原因共同构成了IP形象在品牌建设中的重要性和价值&#xff0c;主要原因有以下几个方面&#xff1a; 增强品牌识别度与记忆点&#xff1a; IP形象作为品牌的视觉符号&#xff0c;具有独特性和辨识性&#xff0c;能够在消费者心中留…...

Kubernetes 1.24 版弃用 Dockershim 后如何迁移到 containerd 和 CRI-O

在本系列的上一篇文章中&#xff0c;我们讨论了什么是 CRI 和 OCI&#xff0c;Docker、containerd、CRI-O 之间的区别以及它们的架构等。最近&#xff0c;我们得知 Docker 即将从 kubernetes 中弃用&#xff01;&#xff08;查看 kubernetes 官方的这篇文章&#xff09;那么让我…...

70. 爬楼梯【 力扣(LeetCode) 】

一、题目描述 假设你正在爬楼梯。需要 n 阶你才能到达楼顶。 每次你可以爬 1 或 2 个台阶。你有多少种不同的方法可以爬到楼顶呢&#xff1f; 二、测试用例 示例 1&#xff1a; 输入&#xff1a;n 2 输出&#xff1a;2 解释&#xff1a;有两种方法可以爬到楼顶。 1. 1 阶…...

R语言优雅的把数据基线表(表一)导出到word

基线表&#xff08;Baseline Table&#xff09;是医学研究中常用的一种数据表格&#xff0c;用于在研究开始时呈现参与者的初始特征和状态。这些特征通常包括人口统计学数据、健康状况和疾病史、临床指标、实验室检测、生活方式、社会经济等。 本人在既往文章《scitb包1.6版本发…...

龙虎榜——20250610

上证指数放量收阴线&#xff0c;个股多数下跌&#xff0c;盘中受消息影响大幅波动。 深证指数放量收阴线形成顶分型&#xff0c;指数短线有调整的需求&#xff0c;大概需要一两天。 2025年6月10日龙虎榜行业方向分析 1. 金融科技 代表标的&#xff1a;御银股份、雄帝科技 驱动…...

铭豹扩展坞 USB转网口 突然无法识别解决方法

当 USB 转网口扩展坞在一台笔记本上无法识别,但在其他电脑上正常工作时,问题通常出在笔记本自身或其与扩展坞的兼容性上。以下是系统化的定位思路和排查步骤,帮助你快速找到故障原因: 背景: 一个M-pard(铭豹)扩展坞的网卡突然无法识别了,扩展出来的三个USB接口正常。…...

突破不可导策略的训练难题:零阶优化与强化学习的深度嵌合

强化学习&#xff08;Reinforcement Learning, RL&#xff09;是工业领域智能控制的重要方法。它的基本原理是将最优控制问题建模为马尔可夫决策过程&#xff0c;然后使用强化学习的Actor-Critic机制&#xff08;中文译作“知行互动”机制&#xff09;&#xff0c;逐步迭代求解…...

2025年能源电力系统与流体力学国际会议 (EPSFD 2025)

2025年能源电力系统与流体力学国际会议&#xff08;EPSFD 2025&#xff09;将于本年度在美丽的杭州盛大召开。作为全球能源、电力系统以及流体力学领域的顶级盛会&#xff0c;EPSFD 2025旨在为来自世界各地的科学家、工程师和研究人员提供一个展示最新研究成果、分享实践经验及…...

理解 MCP 工作流:使用 Ollama 和 LangChain 构建本地 MCP 客户端

&#x1f31f; 什么是 MCP&#xff1f; 模型控制协议 (MCP) 是一种创新的协议&#xff0c;旨在无缝连接 AI 模型与应用程序。 MCP 是一个开源协议&#xff0c;它标准化了我们的 LLM 应用程序连接所需工具和数据源并与之协作的方式。 可以把它想象成你的 AI 模型 和想要使用它…...

将对透视变换后的图像使用Otsu进行阈值化,来分离黑色和白色像素。这句话中的Otsu是什么意思?

Otsu 是一种自动阈值化方法&#xff0c;用于将图像分割为前景和背景。它通过最小化图像的类内方差或等价地最大化类间方差来选择最佳阈值。这种方法特别适用于图像的二值化处理&#xff0c;能够自动确定一个阈值&#xff0c;将图像中的像素分为黑色和白色两类。 Otsu 方法的原…...

新能源汽车智慧充电桩管理方案:新能源充电桩散热问题及消防安全监管方案

随着新能源汽车的快速普及&#xff0c;充电桩作为核心配套设施&#xff0c;其安全性与可靠性备受关注。然而&#xff0c;在高温、高负荷运行环境下&#xff0c;充电桩的散热问题与消防安全隐患日益凸显&#xff0c;成为制约行业发展的关键瓶颈。 如何通过智慧化管理手段优化散…...

【AI学习】三、AI算法中的向量

在人工智能&#xff08;AI&#xff09;算法中&#xff0c;向量&#xff08;Vector&#xff09;是一种将现实世界中的数据&#xff08;如图像、文本、音频等&#xff09;转化为计算机可处理的数值型特征表示的工具。它是连接人类认知&#xff08;如语义、视觉特征&#xff09;与…...

【数据分析】R版IntelliGenes用于生物标志物发现的可解释机器学习

禁止商业或二改转载&#xff0c;仅供自学使用&#xff0c;侵权必究&#xff0c;如需截取部分内容请后台联系作者! 文章目录 介绍流程步骤1. 输入数据2. 特征选择3. 模型训练4. I-Genes 评分计算5. 输出结果 IntelliGenesR 安装包1. 特征选择2. 模型训练和评估3. I-Genes 评分计…...

关键领域软件测试的突围之路:如何破解安全与效率的平衡难题

在数字化浪潮席卷全球的今天&#xff0c;软件系统已成为国家关键领域的核心战斗力。不同于普通商业软件&#xff0c;这些承载着国家安全使命的软件系统面临着前所未有的质量挑战——如何在确保绝对安全的前提下&#xff0c;实现高效测试与快速迭代&#xff1f;这一命题正考验着…...