当前位置: 首页 > news >正文

如何解决微服务下引起的 分布式事务问题

一、什么是分布式事务?

虽然叫分布式事务,但不是一定是分布式部署的服务之间才会产生分布式事务。不是在同一个服务或同一个数据库架构下,产生的事务,也就是分布式事务。

  • 跨数据源的分布式事务

  • 跨服务的分布式事务

二、解决方案
1、使用阿里开源的Seata框架解决分布式事务

​ 1)seata的架构

​ Seata事务管理中有三个重要的角色:

  • TC (Transaction Coordinator) - **事务协调者:**维护全局和分支事务的状态,协调全局事务提交或回滚。

  • TM (Transaction Manager) - **事务管理器:**定义全局事务的范围、开始全局事务、提交或回滚全局事务。

  • RM (Resource Manager) - **资源管理器:**管理分支事务处理的资源,与TC交谈以注册分支事务和报告分支事务的状态,并驱动分支事务提交或回滚。
    在这里插入图片描述
    3)Seata的常用模式使用

    • XA模式

      在一阶段各个本地事务执行完成后,不提交,把执行状态给事务协调者TC,此时本地事务继续持有数据库锁

      二阶段TC基于一阶段的报告来进行判断,如果一阶段均成功则通知所有的事务参与者,提交事务,如果一阶段任意一个参与者失败,则通知所有事务参与者回滚事务。

      优点:能够实现强一致性,满足ACID原则;实现简单

      缺点:性能较差;依赖数据库的事务

      I. 在application.yml文件中开启XA模式(所有参与事务的服务都需要设置):

      seata:data-source-proxy-mode: XA  
      

      II. 在全局事务的入口方法添加@GlobalTransactional注解

      @Override@GlobalTransactionalpublic Long create(Order order) {// 创建订单orderMapper.insert(order);try {// 扣用户余额accountClient.deduct(order.getUserId(), order.getMoney());// 扣库存storageClient.deduct(order.getCommodityCode(), order.getCount());} catch (FeignException e) {log.error("下单失败,原因:{}", e.contentUTF8(), e);throw new RuntimeException(e.contentUTF8(), e);}return order.getId();}
      
    • AT模式

      和xa模式一样也是二阶段提交,不同的是AT模式本地事务结束后,直接提交。但是,它会在本地事务进行数据库数据更新的时候记录一下更新前后的快照。

​ 在二阶段需要回滚的时候,根据快照进行数据的恢复,如果二阶段全局事务提交,则把记录的快照删除。

​ 优点:性能好;实现也较为简单

​ 缺点: 存在中间状态,只能达到最终的一致性;快照功能会影响一些性能,但是相对于XA模式还是要好很多

I. 在application.yml文件中开启AT模式(所有参与事务的服务都需要设置):

seata:data-source-proxy-mode: AT # 默认就是AT

II. 创建相关数据库表

#在分支事务所在的库里创建记录快照的表undo_log
DROP TABLE IF EXISTS `undo_log`;
CREATE TABLE `undo_log`  (`branch_id` bigint(20) NOT NULL COMMENT 'branch transaction id',`xid` varchar(100) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT 'global transaction id',`context` varchar(128) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT 'undo_log context,such as serialization',`rollback_info` longblob NOT NULL COMMENT 'rollback info',`log_status` int(11) NOT NULL COMMENT '0:normal status,1:defense status',`log_created` datetime(6) NOT NULL COMMENT 'create datetime',`log_modified` datetime(6) NOT NULL COMMENT 'modify datetime',UNIQUE INDEX `ux_undo_log`(`xid`, `branch_id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci COMMENT = 'AT transaction mode undo table' ROW_FORMAT = Compact;#在TC服务所使用的库里创建全局锁记录表lock_table
DROP TABLE IF EXISTS `lock_table`;
CREATE TABLE `lock_table`  (`row_key` varchar(128) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL,`xid` varchar(96) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,`transaction_id` bigint(20) NULL DEFAULT NULL,`branch_id` bigint(20) NOT NULL,`resource_id` varchar(256) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,`table_name` varchar(32) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,`pk` varchar(36) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,`gmt_create` datetime NULL DEFAULT NULL,`gmt_modified` datetime NULL DEFAULT NULL,PRIMARY KEY (`row_key`) USING BTREE,INDEX `idx_branch_id`(`branch_id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Compact;

III. 在全局事务的入口方法添加@GlobalTransactional注解

    @Override@GlobalTransactionalpublic Long create(Order order) {// 创建订单orderMapper.insert(order);try {// 扣用户余额accountClient.deduct(order.getUserId(), order.getMoney());// 扣库存storageClient.deduct(order.getCommodityCode(), order.getCount());} catch (FeignException e) {log.error("下单失败,原因:{}", e.contentUTF8(), e);throw new RuntimeException(e.contentUTF8(), e);}return order.getId();}
2、使用RocketMQ实现可靠消息最终一致性方案 (适用于不同项目的情况)在这里插入图片描述

模拟转账 a银行向b银行转账

a银行业务代码:

减少金额,像mq发送事务消息

  1. 引入rocketmq依赖
<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq‐spring‐boot‐starter</artifactId><version>2.0.2</version>
</dependency>

2)配置rocketmq

rocketmq.producer.group = zhuoye #设置生产者组的名称
rocketmq.name‐server = 127.0.0.1:9876  #指定rocketmq的地址

3) 业务层代码

@Service
@Slf4j
public class UserAccountServiceImpl implements UserAccountService {@Autowiredprivate UserAccountMapper userAccountMapper;@Autowiredprivate  RocketMQTemplate rocketMQTemplate;@Autowiredprivate TansactionalRecordMapper tansactionalRecordMapper;//向mq发送转账消息@Overridepublic void sendTransferAccountsMessagesToMq(AccountChangeEvent accountChangeEvent) {//将accountChangeEvent转成jsonJSONObject jsonObject =new JSONObject();jsonObject.put("transferAccountInfo",accountChangeEvent);String jsonString = jsonObject.toJSONString();//生成message类型Message<String> message = MessageBuilder.withPayload(jsonString).build();//发送一条事务消息/*** String txProducerGroup 生产组* String destination topic,* Message<?> message, 消息内容* Object arg 参数*/ rocketMQTemplate.sendMessageInTransaction("transferAccount_ABank","topic_transferAccount",message,null);}//更新账户,扣减金额@Override@Transactionalpublic void doUpdateAccountBalance(AccountChangeEvent accountChangeEvent) {//幂等判断if(tansactionalRecordMapper.isExist(accountChangeEvent.getTxNo())>0){return ;}//扣减金额userAccountMapper.updateAccountBalance(accountChangeEvent.getAccountNo(),accountChangeEvent.getAmount() * -1);//添加事务日志tansactionalRecordMapper.add(accountChangeEvent.getTxNo());}

4)编写RocketMQLocalTransactionListener接口实现类

@Component
@Slf4j
@RocketMQTransactionListener(txProducerGroup = "transferAccount_ABank")
public class ProducerTxmsgListener implements RocketMQLocalTransactionListener {@Autowiredprivate UserAccountService userAccountService;@Autowiredprivate UserAccountMapper userAccountMapper;@Autowiredprivate TansactionalRecordMapper tansactionalRecordMapper;//事务消息发送后的回调方法,当消息发送给mq成功,此方法被回调@Override@Transactionalpublic RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {try {//解析message,转成AccountChangeEventString messageString = new String((byte[]) message.getPayload());JSONObject jsonObject = JSONObject.parseObject(messageString);String accountChangeString = jsonObject.getString("transferAccountInfo");//将accountChange(json)转成AccountChangeEventAccountChangeEvent accountChangeEvent = JSONObject.parseObject(accountChangeString, AccountChangeEvent.class);//执行本地事务,扣减金额userAccountService.doUpdateAccountBalance(accountChangeEvent);//当返回RocketMQLocalTransactionState.COMMIT,自动向mq发送commit消息,mq将消息的状态改为可消费return RocketMQLocalTransactionState.COMMIT;} catch (Exception e) {e.printStackTrace();return RocketMQLocalTransactionState.ROLLBACK;}}//事务状态回查,查询是否扣减金额@Overridepublic RocketMQLocalTransactionState checkLocalTransaction(Message message) {//解析message,转成AccountChangeEventString messageString = new String((byte[]) message.getPayload());JSONObject jsonObject = JSONObject.parseObject(messageString);String accountChangeString = jsonObject.getString("transferAccountInfo");//将accountChange(json)转成AccountChangeEventAccountChangeEvent accountChangeEvent = JSONObject.parseObject(accountChangeString, AccountChangeEvent.class);//事务idString txNo = accountChangeEvent.getTxNo();int isExist = tansactionalRecordMapper.isExist(txNo);if(isExist>0){return RocketMQLocalTransactionState.COMMIT;}else{return RocketMQLocalTransactionState.UNKNOWN;}}
}

b银行业务代码(前两步一样):

接收消息,增加金额

1)业务层代码

@Service
@Slf4j
public class UserAccountServiceImpl implements UserAccountService {@Autowiredprivate UserAccountMapper userAccountMapper;@Autowiredprivate TansactionalRecordMapper tansactionalRecordMapper;//更新账户,增加金额@Override@Transactionalpublic void addAccountInfoBalance(AccountChangeEvent accountChangeEvent) {//已更新if(tansactionalRecordMapper.isExist(accountChangeEvent.getTxNo())>0){return ;}//增加金额userAccountMapper.updateAccountBalance(accountChangeEvent.getAccountNo(),accountChangeEvent.getAmount());//添加事务记录,用于幂等tansactionalRecordMapper.add(accountChangeEvent.getTxNo());}
}

2)监听事务消息

@Component
@Slf4j
@RocketMQMessageListener(consumerGroup = "transferAccount_ABank",topic = "topic_transferAccount",maxReconsumeTimes = 3)
public class TxmsgConsumer implements RocketMQListener<String> {@AutowiredUserAccountService userAccountService;//接收消息@Overridepublic void onMessage(String message) {//解析消息JSONObject jsonObject = JSONObject.parseObject(message);String accountChangeString = jsonObject.getString("transferAccountInfo");//转成AccountChangeEventAccountChangeEvent accountChangeEvent = JSONObject.parseObject(accountChangeString, AccountChangeEvent.class);//更新本地账户,增加金额userAccountService.addAccountInfoBalance(accountChangeEvent);}
}

相关文章:

如何解决微服务下引起的 分布式事务问题

一、什么是分布式事务&#xff1f; 虽然叫分布式事务&#xff0c;但不是一定是分布式部署的服务之间才会产生分布式事务。不是在同一个服务或同一个数据库架构下&#xff0c;产生的事务&#xff0c;也就是分布式事务。 跨数据源的分布式事务 跨服务的分布式事务 二、解决方…...

牛客周赛50轮+cf955+abc363

D-小红的因式分解_牛客周赛 Round 50 (nowcoder.com) 思路&#xff1a; 巨蠢的题目&#xff0c;ax^2bxca1*a2*x^2(b1*a2b2*a1)xb1*b2&#xff0c;即&#xff1a; aa1*a2,ba1*b2a2*b1,cb1*b2 数据范围很小&#xff0c;直接暴力枚举吧&#xff08;注意条件&#xff09; 代码…...

【MySQL】:对库和表的基本操作方法

数据库使用的介绍 什么是SQL 学习数据库的使用——>基于 SQL编程语言 来对数据库进行操作 重点表述的是“需求”&#xff0c;期望得到什么结果。&#xff08;至于结果是如何得到的&#xff0c;并不关键&#xff0c;都是数据库服务器在背后做好了&#xff09; 重点表述的是…...

Library not found for -lstdc++.6.0.9

解决方案一 由于项目已经很多年了&#xff0c;前段时间更新了Xcode发现编译报错lstdc这个库很早以前就被舍弃了&#xff0c;但是一个项目的维护都随着解决bug堆砌出来的&#xff0c;这也导致了我们的项目走上了这条路。 比如 Library not found for -lstdc.6.0.9 报的错&#x…...

防火墙之双机热备篇

为什么要在防火墙上配置双机热备技术呢&#xff1f; 相信大家都知道&#xff0c;为了提高可靠性&#xff0c;避免单点故障 肯定有聪明的小伙伴会想到那为什么不直接多配置两台防火墙&#xff0c;然后再将他们进行线路冗余&#xff0c;不就完成备份了吗&#xff1f; 答案是不…...

终端里面ifconfig命令无法运行

在 Ubuntu 以及基于 Debian 的系统中&#xff0c;ifconfig 命令可能不会默认安装&#xff0c;因为自 Ubuntu 17.10 版本开始&#xff0c;系统默认使用 ip 命令作为网络配置的主要工具&#xff0c;而 ifconfig 命令则来自 net-tools 包&#xff0c;该包不再作为标准工具被包含在…...

掌握Python中的文件序列化:Json和Pickle模块解析

Python 文件操作与管理&#xff1a;Open函数、Json与Pickle、Os模块 在Python中&#xff0c;文件是一个重要的数据处理对象。无论是读取数据、保存数据还是进行数据处理&#xff0c;文件操作都是Python编程中不可或缺的一部分。本文将详细介绍Python中文件操作的几种常用方法&…...

WordPress 6.6 “Dorsey多尔西”发布

WordPress 6.6 “Dorsey多尔西”已经发布&#xff0c;它以传奇的美国大乐队领袖 Tommy Dorsey 名字命名。Dorsey 以其音调流畅的长号和作品而闻名&#xff0c;他的音乐以其情感深度和充满活力的能量吸引了观众。 当您探索 WordPress 6.6 的新功能和增强功能时&#xff0c;让您的…...

核函数支持向量机(Kernel SVM)

核函数支持向量机&#xff08;Kernel SVM&#xff09;是一种非常强大的分类器&#xff0c;能够在非线性数据集上实现良好的分类效果。以下是关于核函数支持向量机的详细数学模型理论知识推导、实施步骤与参数解读&#xff0c;以及两个多维数据实例&#xff08;一个未优化模型&a…...

二分查找(折半查找)

这次不排序了&#xff0c;对排好序的数组做个查找吧 介绍 二分查找排序英文名为BinarySort&#xff0c;是一种效率较高的查找方法要求线性表必须采用顺序存储结构 基本思路 通过不断地将搜索范围缩小一半来找到目标元素&#xff1a; 1、假定数组为arr&#xff0c;需要查找的…...

arcgis紧凑型切片缓存(解决大范围切片,文件数量大的问题)

ArcGIS 切片缓存的紧凑型存储格式是一种优化的存储方式&#xff0c;用于提高切片缓存的存储效率和访问速度。紧凑型存储格式将多个切片文件合并为一个单一的 .bundle 文件&#xff0c;从而减少文件系统的开销和切片的加载时间。这类格式已经应用很久了&#xff0c;我记得2013我…...

ESP32CAM人工智能教学15

ESP32CAM人工智能教学15 Flask服务器TCP连接 小智利用Flask在计算机中创建一个虚拟的网页服务器服务器&#xff0c;让ESP32Cam通过WiFi连接&#xff0c;把摄像头拍摄到的图片发送到电脑中&#xff0c;并在电脑中保存成图片文件。 Flask是用Python编写的网页服务程序WebServer。…...

Pandas 33个冷知识 0721

Pandas 33个冷知识 从Excel读取数据: 使用 pd.read_excel(file.xlsx) 来读取Excel文件。 写入Excel: 使用 df.to_excel(file.xlsx, indexFalse) 将DataFrame写入Excel文件。 创建日期索引: 使用 df.set_index(pd.to_datetime(df[date])) 创建日期索引。 向后填充缺失值: 使用…...

C++ map和set的使用

目录 0.前言 1.关联式容器 2.键值对 3.树形结构的关联式容器 3.1树形结构的特点 3.2树形结构在关联式容器中的应用 4.set 4.1概念与性质 4.2使用 5.multiset 5.1概念与性质 5.2使用 6.map 6.1概念与性质 6.2使用 7.multimap 7.1概念与性质 7.2使用 8.小结 &a…...

yarn的安装和配置以及更新总结,npm的对照使用差异

1. Yarn简介 Yarn 是一个由 Facebook 开发的现代 JavaScript 包管理器&#xff0c;旨在提供更快、更安全、更可靠的包管理体验。 1.1 什么是Yarn Yarn 是一个快速、可靠和安全的 JavaScript 包管理器&#xff0c;它通过并行化操作和智能缓存机制&#xff0c;显著提升了依赖安…...

【Git命令】git rebase之合并提交记录

使用场景 在本地提交了两个commit&#xff0c;但是发现根本没有没必要分为两次&#xff0c;需要想办法把两次提交合并成一个提交&#xff1b;这个时候可以使用如下命令启动交互式变基会话&#xff1a; git rebase -i HEAD~N这里 N 是你想要重新调整的最近的提交数。 如下在本地…...

为什么品牌需要做 IP 形象?

品牌做IP形象的原因有多方面&#xff0c;这些原因共同构成了IP形象在品牌建设中的重要性和价值&#xff0c;主要原因有以下几个方面&#xff1a; 增强品牌识别度与记忆点&#xff1a; IP形象作为品牌的视觉符号&#xff0c;具有独特性和辨识性&#xff0c;能够在消费者心中留…...

Kubernetes 1.24 版弃用 Dockershim 后如何迁移到 containerd 和 CRI-O

在本系列的上一篇文章中&#xff0c;我们讨论了什么是 CRI 和 OCI&#xff0c;Docker、containerd、CRI-O 之间的区别以及它们的架构等。最近&#xff0c;我们得知 Docker 即将从 kubernetes 中弃用&#xff01;&#xff08;查看 kubernetes 官方的这篇文章&#xff09;那么让我…...

70. 爬楼梯【 力扣(LeetCode) 】

一、题目描述 假设你正在爬楼梯。需要 n 阶你才能到达楼顶。 每次你可以爬 1 或 2 个台阶。你有多少种不同的方法可以爬到楼顶呢&#xff1f; 二、测试用例 示例 1&#xff1a; 输入&#xff1a;n 2 输出&#xff1a;2 解释&#xff1a;有两种方法可以爬到楼顶。 1. 1 阶…...

R语言优雅的把数据基线表(表一)导出到word

基线表&#xff08;Baseline Table&#xff09;是医学研究中常用的一种数据表格&#xff0c;用于在研究开始时呈现参与者的初始特征和状态。这些特征通常包括人口统计学数据、健康状况和疾病史、临床指标、实验室检测、生活方式、社会经济等。 本人在既往文章《scitb包1.6版本发…...

KubeSphere 容器平台高可用:环境搭建与可视化操作指南

Linux_k8s篇 欢迎来到Linux的世界&#xff0c;看笔记好好学多敲多打&#xff0c;每个人都是大神&#xff01; 题目&#xff1a;KubeSphere 容器平台高可用&#xff1a;环境搭建与可视化操作指南 版本号: 1.0,0 作者: 老王要学习 日期: 2025.06.05 适用环境: Ubuntu22 文档说…...

C++:std::is_convertible

C++标志库中提供is_convertible,可以测试一种类型是否可以转换为另一只类型: template <class From, class To> struct is_convertible; 使用举例: #include <iostream> #include <string>using namespace std;struct A { }; struct B : A { };int main…...

使用Matplotlib创建炫酷的3D散点图:数据可视化的新维度

文章目录 基础实现代码代码解析进阶技巧1. 自定义点的大小和颜色2. 添加图例和样式美化3. 真实数据应用示例实用技巧与注意事项完整示例(带样式)应用场景在数据科学和可视化领域,三维图形能为我们提供更丰富的数据洞察。本文将手把手教你如何使用Python的Matplotlib库创建引…...

【C++进阶篇】智能指针

C内存管理终极指南&#xff1a;智能指针从入门到源码剖析 一. 智能指针1.1 auto_ptr1.2 unique_ptr1.3 shared_ptr1.4 make_shared 二. 原理三. shared_ptr循环引用问题三. 线程安全问题四. 内存泄漏4.1 什么是内存泄漏4.2 危害4.3 避免内存泄漏 五. 最后 一. 智能指针 智能指…...

Python 实现 Web 静态服务器(HTTP 协议)

目录 一、在本地启动 HTTP 服务器1. Windows 下安装 node.js1&#xff09;下载安装包2&#xff09;配置环境变量3&#xff09;安装镜像4&#xff09;node.js 的常用命令 2. 安装 http-server 服务3. 使用 http-server 开启服务1&#xff09;使用 http-server2&#xff09;详解 …...

深入理解Optional:处理空指针异常

1. 使用Optional处理可能为空的集合 在Java开发中&#xff0c;集合判空是一个常见但容易出错的场景。传统方式虽然可行&#xff0c;但存在一些潜在问题&#xff1a; // 传统判空方式 if (!CollectionUtils.isEmpty(userInfoList)) {for (UserInfo userInfo : userInfoList) {…...

Ubuntu系统多网卡多相机IP设置方法

目录 1、硬件情况 2、如何设置网卡和相机IP 2.1 万兆网卡连接交换机&#xff0c;交换机再连相机 2.1.1 网卡设置 2.1.2 相机设置 2.3 万兆网卡直连相机 1、硬件情况 2个网卡n个相机 电脑系统信息&#xff0c;系统版本&#xff1a;Ubuntu22.04.5 LTS&#xff1b;内核版本…...

嵌入式学习之系统编程(九)OSI模型、TCP/IP模型、UDP协议网络相关编程(6.3)

目录 一、网络编程--OSI模型 二、网络编程--TCP/IP模型 三、网络接口 四、UDP网络相关编程及主要函数 ​编辑​编辑 UDP的特征 socke函数 bind函数 recvfrom函数&#xff08;接收函数&#xff09; sendto函数&#xff08;发送函数&#xff09; 五、网络编程之 UDP 用…...

基于单片机的宠物屋智能系统设计与实现(论文+源码)

本设计基于单片机的宠物屋智能系统核心是实现对宠物生活环境及状态的智能管理。系统以单片机为中枢&#xff0c;连接红外测温传感器&#xff0c;可实时精准捕捉宠物体温变化&#xff0c;以便及时发现健康异常&#xff1b;水位检测传感器时刻监测饮用水余量&#xff0c;防止宠物…...

2.2.2 ASPICE的需求分析

ASPICE的需求分析是汽车软件开发过程中至关重要的一环&#xff0c;它涉及到对需求进行详细分析、验证和确认&#xff0c;以确保软件产品能够满足客户和用户的需求。在ASPICE中&#xff0c;需求分析的关键步骤包括&#xff1a; 需求细化&#xff1a;将从需求收集阶段获得的高层需…...