如何解决微服务下引起的 分布式事务问题
一、什么是分布式事务?
虽然叫分布式事务,但不是一定是分布式部署的服务之间才会产生分布式事务。不是在同一个服务或同一个数据库架构下,产生的事务,也就是分布式事务。
-
跨数据源的分布式事务
-
跨服务的分布式事务
二、解决方案
1、使用阿里开源的Seata框架解决分布式事务
1)seata的架构
Seata事务管理中有三个重要的角色:
-
TC (Transaction Coordinator) - **事务协调者:**维护全局和分支事务的状态,协调全局事务提交或回滚。
-
TM (Transaction Manager) - **事务管理器:**定义全局事务的范围、开始全局事务、提交或回滚全局事务。
-
RM (Resource Manager) - **资源管理器:**管理分支事务处理的资源,与TC交谈以注册分支事务和报告分支事务的状态,并驱动分支事务提交或回滚。

3)Seata的常用模式使用-
XA模式
在一阶段各个本地事务执行完成后,不提交,把执行状态给事务协调者TC,此时本地事务继续持有数据库锁
二阶段TC基于一阶段的报告来进行判断,如果一阶段均成功则通知所有的事务参与者,提交事务,如果一阶段任意一个参与者失败,则通知所有事务参与者回滚事务。
优点:能够实现强一致性,满足ACID原则;实现简单
缺点:性能较差;依赖数据库的事务
I. 在application.yml文件中开启XA模式(所有参与事务的服务都需要设置):
seata:data-source-proxy-mode: XAII. 在全局事务的入口方法添加@GlobalTransactional注解
@Override@GlobalTransactionalpublic Long create(Order order) {// 创建订单orderMapper.insert(order);try {// 扣用户余额accountClient.deduct(order.getUserId(), order.getMoney());// 扣库存storageClient.deduct(order.getCommodityCode(), order.getCount());} catch (FeignException e) {log.error("下单失败,原因:{}", e.contentUTF8(), e);throw new RuntimeException(e.contentUTF8(), e);}return order.getId();} -
AT模式
和xa模式一样也是二阶段提交,不同的是AT模式本地事务结束后,直接提交。但是,它会在本地事务进行数据库数据更新的时候记录一下更新前后的快照。
-
在二阶段需要回滚的时候,根据快照进行数据的恢复,如果二阶段全局事务提交,则把记录的快照删除。
优点:性能好;实现也较为简单
缺点: 存在中间状态,只能达到最终的一致性;快照功能会影响一些性能,但是相对于XA模式还是要好很多
I. 在application.yml文件中开启AT模式(所有参与事务的服务都需要设置):
seata:data-source-proxy-mode: AT # 默认就是AT
II. 创建相关数据库表
#在分支事务所在的库里创建记录快照的表undo_log
DROP TABLE IF EXISTS `undo_log`;
CREATE TABLE `undo_log` (`branch_id` bigint(20) NOT NULL COMMENT 'branch transaction id',`xid` varchar(100) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT 'global transaction id',`context` varchar(128) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT 'undo_log context,such as serialization',`rollback_info` longblob NOT NULL COMMENT 'rollback info',`log_status` int(11) NOT NULL COMMENT '0:normal status,1:defense status',`log_created` datetime(6) NOT NULL COMMENT 'create datetime',`log_modified` datetime(6) NOT NULL COMMENT 'modify datetime',UNIQUE INDEX `ux_undo_log`(`xid`, `branch_id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci COMMENT = 'AT transaction mode undo table' ROW_FORMAT = Compact;#在TC服务所使用的库里创建全局锁记录表lock_table
DROP TABLE IF EXISTS `lock_table`;
CREATE TABLE `lock_table` (`row_key` varchar(128) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL,`xid` varchar(96) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,`transaction_id` bigint(20) NULL DEFAULT NULL,`branch_id` bigint(20) NOT NULL,`resource_id` varchar(256) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,`table_name` varchar(32) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,`pk` varchar(36) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,`gmt_create` datetime NULL DEFAULT NULL,`gmt_modified` datetime NULL DEFAULT NULL,PRIMARY KEY (`row_key`) USING BTREE,INDEX `idx_branch_id`(`branch_id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Compact;
III. 在全局事务的入口方法添加@GlobalTransactional注解
@Override@GlobalTransactionalpublic Long create(Order order) {// 创建订单orderMapper.insert(order);try {// 扣用户余额accountClient.deduct(order.getUserId(), order.getMoney());// 扣库存storageClient.deduct(order.getCommodityCode(), order.getCount());} catch (FeignException e) {log.error("下单失败,原因:{}", e.contentUTF8(), e);throw new RuntimeException(e.contentUTF8(), e);}return order.getId();}
2、使用RocketMQ实现可靠消息最终一致性方案 (适用于不同项目的情况)
模拟转账 a银行向b银行转账
a银行业务代码:
减少金额,像mq发送事务消息
- 引入rocketmq依赖
<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq‐spring‐boot‐starter</artifactId><version>2.0.2</version>
</dependency>
2)配置rocketmq
rocketmq.producer.group = zhuoye #设置生产者组的名称
rocketmq.name‐server = 127.0.0.1:9876 #指定rocketmq的地址
3) 业务层代码
@Service
@Slf4j
public class UserAccountServiceImpl implements UserAccountService {@Autowiredprivate UserAccountMapper userAccountMapper;@Autowiredprivate RocketMQTemplate rocketMQTemplate;@Autowiredprivate TansactionalRecordMapper tansactionalRecordMapper;//向mq发送转账消息@Overridepublic void sendTransferAccountsMessagesToMq(AccountChangeEvent accountChangeEvent) {//将accountChangeEvent转成jsonJSONObject jsonObject =new JSONObject();jsonObject.put("transferAccountInfo",accountChangeEvent);String jsonString = jsonObject.toJSONString();//生成message类型Message<String> message = MessageBuilder.withPayload(jsonString).build();//发送一条事务消息/*** String txProducerGroup 生产组* String destination topic,* Message<?> message, 消息内容* Object arg 参数*/ rocketMQTemplate.sendMessageInTransaction("transferAccount_ABank","topic_transferAccount",message,null);}//更新账户,扣减金额@Override@Transactionalpublic void doUpdateAccountBalance(AccountChangeEvent accountChangeEvent) {//幂等判断if(tansactionalRecordMapper.isExist(accountChangeEvent.getTxNo())>0){return ;}//扣减金额userAccountMapper.updateAccountBalance(accountChangeEvent.getAccountNo(),accountChangeEvent.getAmount() * -1);//添加事务日志tansactionalRecordMapper.add(accountChangeEvent.getTxNo());}
4)编写RocketMQLocalTransactionListener接口实现类
@Component
@Slf4j
@RocketMQTransactionListener(txProducerGroup = "transferAccount_ABank")
public class ProducerTxmsgListener implements RocketMQLocalTransactionListener {@Autowiredprivate UserAccountService userAccountService;@Autowiredprivate UserAccountMapper userAccountMapper;@Autowiredprivate TansactionalRecordMapper tansactionalRecordMapper;//事务消息发送后的回调方法,当消息发送给mq成功,此方法被回调@Override@Transactionalpublic RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {try {//解析message,转成AccountChangeEventString messageString = new String((byte[]) message.getPayload());JSONObject jsonObject = JSONObject.parseObject(messageString);String accountChangeString = jsonObject.getString("transferAccountInfo");//将accountChange(json)转成AccountChangeEventAccountChangeEvent accountChangeEvent = JSONObject.parseObject(accountChangeString, AccountChangeEvent.class);//执行本地事务,扣减金额userAccountService.doUpdateAccountBalance(accountChangeEvent);//当返回RocketMQLocalTransactionState.COMMIT,自动向mq发送commit消息,mq将消息的状态改为可消费return RocketMQLocalTransactionState.COMMIT;} catch (Exception e) {e.printStackTrace();return RocketMQLocalTransactionState.ROLLBACK;}}//事务状态回查,查询是否扣减金额@Overridepublic RocketMQLocalTransactionState checkLocalTransaction(Message message) {//解析message,转成AccountChangeEventString messageString = new String((byte[]) message.getPayload());JSONObject jsonObject = JSONObject.parseObject(messageString);String accountChangeString = jsonObject.getString("transferAccountInfo");//将accountChange(json)转成AccountChangeEventAccountChangeEvent accountChangeEvent = JSONObject.parseObject(accountChangeString, AccountChangeEvent.class);//事务idString txNo = accountChangeEvent.getTxNo();int isExist = tansactionalRecordMapper.isExist(txNo);if(isExist>0){return RocketMQLocalTransactionState.COMMIT;}else{return RocketMQLocalTransactionState.UNKNOWN;}}
}
b银行业务代码(前两步一样):
接收消息,增加金额
1)业务层代码
@Service
@Slf4j
public class UserAccountServiceImpl implements UserAccountService {@Autowiredprivate UserAccountMapper userAccountMapper;@Autowiredprivate TansactionalRecordMapper tansactionalRecordMapper;//更新账户,增加金额@Override@Transactionalpublic void addAccountInfoBalance(AccountChangeEvent accountChangeEvent) {//已更新if(tansactionalRecordMapper.isExist(accountChangeEvent.getTxNo())>0){return ;}//增加金额userAccountMapper.updateAccountBalance(accountChangeEvent.getAccountNo(),accountChangeEvent.getAmount());//添加事务记录,用于幂等tansactionalRecordMapper.add(accountChangeEvent.getTxNo());}
}
2)监听事务消息
@Component
@Slf4j
@RocketMQMessageListener(consumerGroup = "transferAccount_ABank",topic = "topic_transferAccount",maxReconsumeTimes = 3)
public class TxmsgConsumer implements RocketMQListener<String> {@AutowiredUserAccountService userAccountService;//接收消息@Overridepublic void onMessage(String message) {//解析消息JSONObject jsonObject = JSONObject.parseObject(message);String accountChangeString = jsonObject.getString("transferAccountInfo");//转成AccountChangeEventAccountChangeEvent accountChangeEvent = JSONObject.parseObject(accountChangeString, AccountChangeEvent.class);//更新本地账户,增加金额userAccountService.addAccountInfoBalance(accountChangeEvent);}
}相关文章:
如何解决微服务下引起的 分布式事务问题
一、什么是分布式事务? 虽然叫分布式事务,但不是一定是分布式部署的服务之间才会产生分布式事务。不是在同一个服务或同一个数据库架构下,产生的事务,也就是分布式事务。 跨数据源的分布式事务 跨服务的分布式事务 二、解决方…...
牛客周赛50轮+cf955+abc363
D-小红的因式分解_牛客周赛 Round 50 (nowcoder.com) 思路: 巨蠢的题目,ax^2bxca1*a2*x^2(b1*a2b2*a1)xb1*b2,即: aa1*a2,ba1*b2a2*b1,cb1*b2 数据范围很小,直接暴力枚举吧(注意条件) 代码…...
【MySQL】:对库和表的基本操作方法
数据库使用的介绍 什么是SQL 学习数据库的使用——>基于 SQL编程语言 来对数据库进行操作 重点表述的是“需求”,期望得到什么结果。(至于结果是如何得到的,并不关键,都是数据库服务器在背后做好了) 重点表述的是…...
Library not found for -lstdc++.6.0.9
解决方案一 由于项目已经很多年了,前段时间更新了Xcode发现编译报错lstdc这个库很早以前就被舍弃了,但是一个项目的维护都随着解决bug堆砌出来的,这也导致了我们的项目走上了这条路。 比如 Library not found for -lstdc.6.0.9 报的错&#x…...
防火墙之双机热备篇
为什么要在防火墙上配置双机热备技术呢? 相信大家都知道,为了提高可靠性,避免单点故障 肯定有聪明的小伙伴会想到那为什么不直接多配置两台防火墙,然后再将他们进行线路冗余,不就完成备份了吗? 答案是不…...
终端里面ifconfig命令无法运行
在 Ubuntu 以及基于 Debian 的系统中,ifconfig 命令可能不会默认安装,因为自 Ubuntu 17.10 版本开始,系统默认使用 ip 命令作为网络配置的主要工具,而 ifconfig 命令则来自 net-tools 包,该包不再作为标准工具被包含在…...
掌握Python中的文件序列化:Json和Pickle模块解析
Python 文件操作与管理:Open函数、Json与Pickle、Os模块 在Python中,文件是一个重要的数据处理对象。无论是读取数据、保存数据还是进行数据处理,文件操作都是Python编程中不可或缺的一部分。本文将详细介绍Python中文件操作的几种常用方法&…...
WordPress 6.6 “Dorsey多尔西”发布
WordPress 6.6 “Dorsey多尔西”已经发布,它以传奇的美国大乐队领袖 Tommy Dorsey 名字命名。Dorsey 以其音调流畅的长号和作品而闻名,他的音乐以其情感深度和充满活力的能量吸引了观众。 当您探索 WordPress 6.6 的新功能和增强功能时,让您的…...
核函数支持向量机(Kernel SVM)
核函数支持向量机(Kernel SVM)是一种非常强大的分类器,能够在非线性数据集上实现良好的分类效果。以下是关于核函数支持向量机的详细数学模型理论知识推导、实施步骤与参数解读,以及两个多维数据实例(一个未优化模型&a…...
二分查找(折半查找)
这次不排序了,对排好序的数组做个查找吧 介绍 二分查找排序英文名为BinarySort,是一种效率较高的查找方法要求线性表必须采用顺序存储结构 基本思路 通过不断地将搜索范围缩小一半来找到目标元素: 1、假定数组为arr,需要查找的…...
arcgis紧凑型切片缓存(解决大范围切片,文件数量大的问题)
ArcGIS 切片缓存的紧凑型存储格式是一种优化的存储方式,用于提高切片缓存的存储效率和访问速度。紧凑型存储格式将多个切片文件合并为一个单一的 .bundle 文件,从而减少文件系统的开销和切片的加载时间。这类格式已经应用很久了,我记得2013我…...
ESP32CAM人工智能教学15
ESP32CAM人工智能教学15 Flask服务器TCP连接 小智利用Flask在计算机中创建一个虚拟的网页服务器服务器,让ESP32Cam通过WiFi连接,把摄像头拍摄到的图片发送到电脑中,并在电脑中保存成图片文件。 Flask是用Python编写的网页服务程序WebServer。…...
Pandas 33个冷知识 0721
Pandas 33个冷知识 从Excel读取数据: 使用 pd.read_excel(file.xlsx) 来读取Excel文件。 写入Excel: 使用 df.to_excel(file.xlsx, indexFalse) 将DataFrame写入Excel文件。 创建日期索引: 使用 df.set_index(pd.to_datetime(df[date])) 创建日期索引。 向后填充缺失值: 使用…...
C++ map和set的使用
目录 0.前言 1.关联式容器 2.键值对 3.树形结构的关联式容器 3.1树形结构的特点 3.2树形结构在关联式容器中的应用 4.set 4.1概念与性质 4.2使用 5.multiset 5.1概念与性质 5.2使用 6.map 6.1概念与性质 6.2使用 7.multimap 7.1概念与性质 7.2使用 8.小结 &a…...
yarn的安装和配置以及更新总结,npm的对照使用差异
1. Yarn简介 Yarn 是一个由 Facebook 开发的现代 JavaScript 包管理器,旨在提供更快、更安全、更可靠的包管理体验。 1.1 什么是Yarn Yarn 是一个快速、可靠和安全的 JavaScript 包管理器,它通过并行化操作和智能缓存机制,显著提升了依赖安…...
【Git命令】git rebase之合并提交记录
使用场景 在本地提交了两个commit,但是发现根本没有没必要分为两次,需要想办法把两次提交合并成一个提交;这个时候可以使用如下命令启动交互式变基会话: git rebase -i HEAD~N这里 N 是你想要重新调整的最近的提交数。 如下在本地…...
为什么品牌需要做 IP 形象?
品牌做IP形象的原因有多方面,这些原因共同构成了IP形象在品牌建设中的重要性和价值,主要原因有以下几个方面: 增强品牌识别度与记忆点: IP形象作为品牌的视觉符号,具有独特性和辨识性,能够在消费者心中留…...
Kubernetes 1.24 版弃用 Dockershim 后如何迁移到 containerd 和 CRI-O
在本系列的上一篇文章中,我们讨论了什么是 CRI 和 OCI,Docker、containerd、CRI-O 之间的区别以及它们的架构等。最近,我们得知 Docker 即将从 kubernetes 中弃用!(查看 kubernetes 官方的这篇文章)那么让我…...
70. 爬楼梯【 力扣(LeetCode) 】
一、题目描述 假设你正在爬楼梯。需要 n 阶你才能到达楼顶。 每次你可以爬 1 或 2 个台阶。你有多少种不同的方法可以爬到楼顶呢? 二、测试用例 示例 1: 输入:n 2 输出:2 解释:有两种方法可以爬到楼顶。 1. 1 阶…...
R语言优雅的把数据基线表(表一)导出到word
基线表(Baseline Table)是医学研究中常用的一种数据表格,用于在研究开始时呈现参与者的初始特征和状态。这些特征通常包括人口统计学数据、健康状况和疾病史、临床指标、实验室检测、生活方式、社会经济等。 本人在既往文章《scitb包1.6版本发…...
深入浅出Asp.Net Core MVC应用开发系列-AspNetCore中的日志记录
ASP.NET Core 是一个跨平台的开源框架,用于在 Windows、macOS 或 Linux 上生成基于云的新式 Web 应用。 ASP.NET Core 中的日志记录 .NET 通过 ILogger API 支持高性能结构化日志记录,以帮助监视应用程序行为和诊断问题。 可以通过配置不同的记录提供程…...
云启出海,智联未来|阿里云网络「企业出海」系列客户沙龙上海站圆满落地
借阿里云中企出海大会的东风,以**「云启出海,智联未来|打造安全可靠的出海云网络引擎」为主题的阿里云企业出海客户沙龙云网络&安全专场于5.28日下午在上海顺利举办,现场吸引了来自携程、小红书、米哈游、哔哩哔哩、波克城市、…...
《基于Apache Flink的流处理》笔记
思维导图 1-3 章 4-7章 8-11 章 参考资料 源码: https://github.com/streaming-with-flink 博客 https://flink.apache.org/bloghttps://www.ververica.com/blog 聚会及会议 https://flink-forward.orghttps://www.meetup.com/topics/apache-flink https://n…...
Linux --进程控制
本文从以下五个方面来初步认识进程控制: 目录 进程创建 进程终止 进程等待 进程替换 模拟实现一个微型shell 进程创建 在Linux系统中我们可以在一个进程使用系统调用fork()来创建子进程,创建出来的进程就是子进程,原来的进程为父进程。…...
C++使用 new 来创建动态数组
问题: 不能使用变量定义数组大小 原因: 这是因为数组在内存中是连续存储的,编译器需要在编译阶段就确定数组的大小,以便正确地分配内存空间。如果允许使用变量来定义数组的大小,那么编译器就无法在编译时确定数组的大…...
【Android】Android 开发 ADB 常用指令
查看当前连接的设备 adb devices 连接设备 adb connect 设备IP 断开已连接的设备 adb disconnect 设备IP 安装应用 adb install 安装包的路径 卸载应用 adb uninstall 应用包名 查看已安装的应用包名 adb shell pm list packages 查看已安装的第三方应用包名 adb shell pm list…...
comfyui 工作流中 图生视频 如何增加视频的长度到5秒
comfyUI 工作流怎么可以生成更长的视频。除了硬件显存要求之外还有别的方法吗? 在ComfyUI中实现图生视频并延长到5秒,需要结合多个扩展和技巧。以下是完整解决方案: 核心工作流配置(24fps下5秒120帧) #mermaid-svg-yP…...
VisualXML全新升级 | 新增数据库编辑功能
VisualXML是一个功能强大的网络总线设计工具,专注于简化汽车电子系统中复杂的网络数据设计操作。它支持多种主流总线网络格式的数据编辑(如DBC、LDF、ARXML、HEX等),并能够基于Excel表格的方式生成和转换多种数据库文件。由此&…...
云原生安全实战:API网关Envoy的鉴权与限流详解
🔥「炎码工坊」技术弹药已装填! 点击关注 → 解锁工业级干货【工具实测|项目避坑|源码燃烧指南】 一、基础概念 1. API网关 作为微服务架构的统一入口,负责路由转发、安全控制、流量管理等核心功能。 2. Envoy 由Lyft开源的高性能云原生…...
Spring Boot 与 Kafka 的深度集成实践(二)
3. 生产者实现 3.1 生产者配置 在 Spring Boot 项目中,配置 Kafka 生产者主要是配置生产者工厂(ProducerFactory)和 KafkaTemplate 。生产者工厂负责创建 Kafka 生产者实例,而 KafkaTemplate 则是用于发送消息的核心组件&#x…...
