Springboot+mybatis-plus+dynamic-datasource+Druid 多数据源 分布式事务
Springboot+mybatis-plus+dynamic-datasource+Druid 多数据源事务,分布式事务
文章目录
- Springboot+mybatis-plus+dynamic-datasource+Druid 多数据源事务,分布式事务
- 0.前言
- 1. 基础介绍
- ConnectionFactory
- AbstractRoutingDataSource 动态路由数据源的抽象类
- DynamicLocalTransactionInterceptor 动态的本地事务拦截器
- 3. 使用步骤示例
- 4. 官方源码分析
- 5. 参考资料
0.前言
背景
处理多数据源事务一直是一个复杂而棘手的问题,通常我们有两种主流的解决方法。
第一种是通过Atomikos手动创建多数据源事务,这种方法更适合数据源数量较少,参数配置不复杂,对性能要求不高的项目。然而,这种方法的最大困难在于需要手动配置大量设置,这可能会消耗大量时间。
第二种是通过使用Seata等分布式事务解决方案。这种方法的难点在于需要建立并维护像Seata-server这样的统一管理中心。
今天我们使用Springboot+mybatis-plus+dynamic-datasource+Druid 多数据源 实现分布式事务和本地多数据源事务。
每种解决方案都有其适用的场景,然而在实际操作中,我经常接到如下的问题:
“我为什么在添加了事务注解之后,数据源切换还是失败了?”
“我了解到这涉及到分布式事务,但我并不想使用Seata。我的场景比较简单,有没有不需要依赖第三方的解决方案?”
这些问题突显出在现实工作中,我们可能需要更灵活、更简便的解决方案来处理多数据源事务问题。


1. 基础介绍
自从3.3.0开始,由seata的核心贡献者https://github.com/a364176773 贡献了基于connection代理的方案。
完整代码 https://github.com/baomidou/dynamic-datasource-spring-boot-starter/commit/f0cbad193528296eeb64faa76c79743afbdd811d
建议从3.4.0版本开始使用,其修复了一个功能,老版本不加@DS只加@DSTransactional会报错。

核心的几处代码
@Role(value = BeanDefinition.ROLE_INFRASTRUCTURE)@ConditionalOnProperty(prefix = DynamicDataSourceProperties.PREFIX, name = "seata", havingValue = "false",matchIfMissing = true)@Beanpublic Advisor localTransactionAdvisor() {AspectJExpressionPointcut pointcut = new AspectJExpressionPointcut();pointcut.setExpression("@annotation(com.baomidou.dynamic.datasource.annotation.DSTransactional)");return new DefaultPointcutAdvisor(pointcut, new DynamicTransactionAdvisor());}
我们可以看到通过spring.datasource.dynamic.seata=true配置来启用条件注解。这个是dynamic-datasource支持seata事务的开发和入口。
ConnectionFactory
ConnectionFactory 是一个工厂类,主要的作用是管理数据库连接,并提供获取和存储数据库连接的功能。
-
存储每个线程独立的数据库连接:
ConnectionFactory使用ThreadLocal为每个线程提供其自己的数据库连接池,这样可以防止在多线程环境中数据库连接的混乱。 -
提供获取数据库连接的方法:
ConnectionFactory提供getConnection方法,使得在同一个线程中的多个模块可以共享同一个数据库连接。 -
提供存储数据库连接的方法:
ConnectionFactory提供putConnection方法,可以存储新的数据库连接到当前线程的数据库连接池中。 -
提供通知数据库连接的方法:
ConnectionFactory提供notify方法,可以对当前线程的所有数据库连接进行统一的操作,比如提交或者回滚事务。
通过这些功能,ConnectionFactory 实现了数据库连接的有效管理,保证了在同一线程中对多个数据库进行操作时,可以共享同一连接,实现事务管理。核心代码如下。大家可以借鉴
package com.baomidou.dynamic.datasource.tx;import java.sql.SQLException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;/*** @author funkye*/
public class ConnectionFactory {// 使用ThreadLocal来保存与当前线程相关的数据库连接信息,以Map形式存储,Map中的key为数据源名称,value为对应的数据库连接代理类private static final ThreadLocal<Map<String, ConnectionProxy>> CONNECTION_HOLDER =new ThreadLocal<Map<String, ConnectionProxy>>() {@Overrideprotected Map<String, ConnectionProxy> initialValue() {return new ConcurrentHashMap<>(8);}};// 存储数据库连接到当前线程的连接池中,如果当前线程的连接池中没有该数据源的连接,则新建一个并放入public static void putConnection(String ds, ConnectionProxy connection) {Map<String, ConnectionProxy> concurrentHashMap = CONNECTION_HOLDER.get();if (!concurrentHashMap.containsKey(ds)) {try {connection.setAutoCommit(false);} catch (SQLException e) {e.printStackTrace();}concurrentHashMap.put(ds, connection);}}// 从当前线程的连接池中获取指定数据源的数据库连接public static ConnectionProxy getConnection(String ds) {return CONNECTION_HOLDER.get().get(ds);}// 对当前线程的所有数据库连接执行通知操作,根据参数state决定是提交还是回滚,如果在执行过程中发生错误,则在所有连接处理完后抛出public static void notify(Boolean state) throws Exception {Exception exception = null;try {Map<String, ConnectionProxy> concurrentHashMap = CONNECTION_HOLDER.get();for (ConnectionProxy connectionProxy : concurrentHashMap.values()) {try {connectionProxy.notify(state);} catch (SQLException e) {exception = e;}}} finally {CONNECTION_HOLDER.remove(); //清除当前线程的连接池if (exception != null) {throw exception;}}}}
AbstractRoutingDataSource 动态路由数据源的抽象类
动态路由数据源的抽象类,用于根据不同的业务需要,动态地选择需要使用的数据源。关键的方法是getConnection()和getConnection(String username, String password),这两个方法会根据当前是否存在全局事务来动态地选择获取原始的数据库连接还是数据库连接代理。
public abstract class AbstractRoutingDataSource extends AbstractDataSource {// 抽象方法,子类需要实现该方法以确定数据源protected abstract DataSource determineDataSource();// 抽象方法,子类需要实现该方法以确定默认的数据源名称protected abstract String getPrimary();// 获取数据库连接,根据事务上下文中是否有XID来判断是否需要获取代理连接@Overridepublic Connection getConnection() throws SQLException {String xid = TransactionContext.getXID();if (StringUtils.isEmpty(xid)) {// 如果没有XID,说明当前不处于全局事务中,直接获取原始连接return determineDataSource().getConnection();} else {// 如果有XID,说明当前处于全局事务中,需要获取代理连接String ds = DynamicDataSourceContextHolder.peek();ds = StringUtils.isEmpty(ds) ? getPrimary() : ds;ConnectionProxy connection = ConnectionFactory.getConnection(ds);return connection == null ? getConnectionProxy(ds, determineDataSource().getConnection()) : connection;}}// 与上面的方法类似,只不过这个方法可以传入用户名和密码来获取数据库连接@Overridepublic Connection getConnection(String username, String password) throws SQLException {String xid = TransactionContext.getXID();if (StringUtils.isEmpty(xid)) {return determineDataSource().getConnection(username, password);} else {String ds = DynamicDataSourceContextHolder.peek();ds = StringUtils.isEmpty(ds) ? getPrimary() : ds;ConnectionProxy connection = ConnectionFactory.getConnection(ds);return connection == null ? getConnectionProxy(ds, determineDataSource().getConnection(username, password)): connection;}}// 创建数据库连接代理,并将代理连接放入连接工厂private Connection getConnectionProxy(String ds, Connection connection) {ConnectionProxy connectionProxy = new ConnectionProxy(connection, ds);ConnectionFactory.putConnection(ds, connectionProxy);return connectionProxy;}// 获取指定类型的代理对象@Override@SuppressWarnings("unchecked")public <T> T unwrap(Class<T> iface) throws SQLException {if (iface.isInstance(this)) {return (T) this;}return determineDataSource().unwrap(iface);}// 判断是否是指定类型的代理对象@Overridepublic boolean isWrapperFor(Class<?> iface) throws SQLException {return (iface.isInstance(this) || determineDataSource().isWrapperFor(iface));}
}
DynamicLocalTransactionInterceptor 动态的本地事务拦截器
动态的本地事务拦截器。基本思想是在方法调用前后添加事务处理的逻辑。当这个拦截器被应用到某个方法时,那么在调用这个方法时,会首先检查当前是否已经存在事务,如果存在则直接调用原始方法。如果不存在,则会先开启一个新的事务,然后调用原始方法,方法结束后根据方法执行的结果来提交或回滚事务。入口在这,看一眼就懂了。

// 实现MethodInterceptor接口定义拦截器
public class DynamicLocalTransactionInterceptor implements MethodInterceptor {@Override// invoke方法会在原方法执行前后进行拦截public Object invoke(MethodInvocation methodInvocation) throws Throwable {// 如果当前上下文中已存在事务,则直接调用原方法,不进行拦截处理if (!StringUtils.isEmpty(TransactionContext.getXID())) {return methodInvocation.proceed();}// 定义一个状态标志,标记事务是否执行成功boolean state = true;Object o;// 开启一个新的事务LocalTxUtil.startTransaction();try {// 调用原始方法o = methodInvocation.proceed();} catch (Exception e) {// 如果原方法执行抛出异常,则标记事务执行失败state = false;throw e;} finally {// 根据事务执行状态,提交或回滚事务if (state) {LocalTxUtil.commit();} else {LocalTxUtil.rollback();}}// 返回原方法的执行结果return o;}
}
3. 使用步骤示例
官方示例:https://github.com/dynamic-datasource/dynamic-datasource-samples/tree/master/tx-samples/tx-local-sample
完整示例项目 数据库都已准备好,可以直接运行测试。http://localhost:8080/doc.html
示例项目A,B,C分别对应OrderService,ProductService,AccountService。分别是独立的数据库。
用户下单分别调用产品库扣库存,账户库扣余额。
如果库存不足,或用户余额不足都抛出RuntimeException,触发整体回滚。
@Slf4j
@Service
@AllArgsConstructor
public class OrderService {private final OrderMapper orderMapper;private final AccountService accountService;private final ProductService productService;//@DS("order") 这里不需要,因为order是默认库,如果开启事务的不是默认库则必须加@DSTransactional //注意这里开启事务public void placeOrder(PlaceOrderRequest request) {log.info("=============ORDER START=================");Long userId = request.getUserId();Long productId = request.getProductId();Integer amount = request.getAmount();log.info("收到下单请求,用户:{}, 商品:{},数量:{}", userId, productId, amount);log.info("当前 XID: {}", TransactionContext.getXID());Order order = Order.builder().userId(userId).productId(productId).status(OrderStatus.INIT).amount(amount).build();orderMapper.insert(order);log.info("订单一阶段生成,等待扣库存付款中");// 扣减库存并计算总价Double totalPrice = productService.reduceStock(productId, amount);// 扣减余额accountService.reduceBalance(userId, totalPrice);order.setStatus(OrderStatus.SUCCESS);order.setTotalPrice(totalPrice);orderMapper.updateById(order);log.info("订单已成功下单");log.info("=============ORDER END=================");}
}
@Slf4j
@Service
@RequiredArgsConstructor
public class ProductService {private final ProductMapper productMapper;@DS("product")public Double reduceStock(Long productId, Integer amount) {log.info("=============PRODUCT START=================");log.info("当前 XID: {}", TransactionContext.getXID());// 检查库存Product product = productMapper.selectById(productId);Assert.notNull(product, "商品不存在");Integer stock = product.getStock();log.info("商品编号为 {} 的库存为{},订单商品数量为{}", productId, stock, amount);if (stock < amount) {log.warn("商品编号为{} 库存不足,当前库存:{}", productId, stock);throw new RuntimeException("库存不足");}log.info("开始扣减商品编号为 {} 库存,单价商品价格为{}", productId, product.getPrice());// 扣减库存int currentStock = stock - amount;product.setStock(currentStock);productMapper.updateById(product);double totalPrice = product.getPrice() * amount;log.info("扣减商品编号为 {} 库存成功,扣减后库存为{}, {} 件商品总价为 {} ", productId, currentStock, amount, totalPrice);log.info("=============PRODUCT END=================");return totalPrice;}
}
@Slf4j
@Service
@RequiredArgsConstructor
public class AccountService {private final AccountMapper accountMapper;@DS("account")public void reduceBalance(Long userId, Double price) {log.info("=============ACCOUNT START=================");log.info("当前 XID: {}", TransactionContext.getXID());Account account = accountMapper.selectById(userId);Assert.notNull(account, "用户不存在");Double balance = account.getBalance();log.info("下单用户{}余额为 {},商品总价为{}", userId, balance, price);if (balance < price) {log.warn("用户 {} 余额不足,当前余额:{}", userId, balance);throw new RuntimeException("余额不足");}log.info("开始扣减用户 {} 余额", userId);double currentBalance = account.getBalance() - price;account.setBalance(currentBalance);accountMapper.updateById(account);log.info("扣减用户 {} 余额成功,扣减后用户账户余额为{}", userId, currentBalance);log.info("=============ACCOUNT END=================");}
}
4. 官方源码分析
5. 参考资料
- dynamic-datasource GitHub 仓库 ↗:dynamic-datasource 的官方 GitHub 仓库,包含源代码、文档和示例等资源。
相关文章:
Springboot+mybatis-plus+dynamic-datasource+Druid 多数据源 分布式事务
Springbootmybatis-plusdynamic-datasourceDruid 多数据源事务,分布式事务 文章目录 Springbootmybatis-plusdynamic-datasourceDruid 多数据源事务,分布式事务0.前言1. 基础介绍ConnectionFactoryAbstractRoutingDataSource 动态路由数据源的抽象类 Dyn…...
673. 最长递增子序列的个数
673. 最长递增子序列的个数 原题链接:完成情况:解题思路:方法一:动态规划方法二:贪心 前缀和 二分查找 参考代码:__673最长递增子序列的个数__动态规划__673最长递增子序列的个数__贪心_前缀和_二分查找…...
Android12之ABuffer数据处理(三十四)
简介: CSDN博客专家,专注Android/Linux系统,分享多mic语音方案、音视频、编解码等技术,与大家一起成长! 优质专栏:Audio工程师进阶系列【原创干货持续更新中……】🚀 人生格言: 人生从来没有捷径,只有行动才是治疗恐惧和懒惰的唯一良药. 更多原创,欢迎关注:Android…...
whisper 语音识别项目部署
1.安装anaconda软件 在如下网盘免费获取软件: 链接:https://pan.baidu.com/s/1zOZCQOeiDhx6ebHh5zNasA 提取码:hfnd 2.使用conda命令创建python3.8环境 conda create -n whisper python3.83.进入whisper虚拟环境 conda activate whisper4.…...
实例044 在关闭窗口前加入确认对话框
实例说明 用户对程序进行操作时,难免会有错误操作的情况,例如不小心关闭程序,如果尚有许多资料没有保存,那么损失将非常严重,所以最好使程序具有灵活的交互性。人机交互过程一般都是通过对话框来实现的,对话…...
子查询和事务隔离以及用户管理
一、子查询 子查询是另一个语句中的select语句嵌套在另一个select中。注意子查询语法上必须使用()包起来。 嵌套的那个语句返回的结果有可能是: 一个字段,一行记录,一个列或一个表。嵌套的位置 where / having语句里面作为条件使用在from语…...
uniapp 滚动到指定元素的位置(锚点)
需求:在页面中,不管位于何处,点击按钮页面滚动到对应的标题位置。 最简单有效的方式(直接复制改数据就行) 使用 scroll-view 标签的属性:scroll-top(距离值 num) 或 scroll-into-view(子元素的id,不能以…...
Spring AOP 的 afterReturing 返回值是否能修改问题
文章目录 结论举例子原因外传 结论 最近要搞脱敏信息,所以,想了几种方案,最后使用全局的接口拦截,但是,又不能用注解的方式,毕竟是几年的老产品,有很多限制。 中间尝试过使用Spring AOP 的 aft…...
MyBatis分页插件PageHelper的使用及特殊字符的处理
目录 一、PageHelper简介 1.什么是分页 2.PageHelper是什么 3.使用PageHelper的优点 二、PageHelper插件的使用 原生limit查询 1. 导入pom依赖 2. Mybatis.cfg.xml 配置拦截器 3. 使用PageHelper进行分页 三、特殊字符的处理 1.SQL注入: 2.XML转义&#…...
[语音识别] 基于Python构建简易的音频录制与语音识别应用
语音识别技术的快速发展为实现更多智能化应用提供了无限可能。本文旨在介绍一个基于Python实现的简易音频录制与语音识别应用。文章简要介绍相关技术的应用,重点放在音频录制方面,而语音识别则关注于调用相关的语音识别库。本文将首先概述一些音频基础概…...
Matlab彩色图像转索引图像
索引图像 索引图像是一种把像素值直接作为RGB调色板下标的图像。索引图像包括一个数据矩阵X,一个调色板矩阵map,也称为颜色映像矩阵。其中,数据矩阵X可以是8位无符号整型、16位无符号整型或双精度类型。调色板矩阵map是一个m3的数据阵列&…...
测试框架pytest教程(11)-pytestAPI
常量 pytest.__version__ #输出pytest版本 pytest.version_tuple #输出版本的元组形式 功能 pytest.approx pytest.approx 是一个用于进行数值近似比较的 pytest 断言工具。 在测试中,有时候需要对浮点数或其他具有小数部分的数值进行比较。然而,由于…...
Docker自学:利用FastAPI建立一个简单的web app
环境配置:下载Docker Desktop 文件一:main.py from typing import Unionfrom fastapi import FastAPIimport uvicornapp FastAPI()app.get("/") def read_root():return {"Hello": "World"}app.get("/items/{item…...
微调bert做学术论文分类(以科大讯飞学术论文分类挑战赛为例)
代码 12-How to Fine-Tune BERT for Text Classification:链接:https://pan.baidu.com/s/1EKggbyC4ZW-ufnDW45eKzA 提取码:k3b2 baseline 链接:https://pan.baidu.com/s/12hkZNJjQ__FGAHiF3fifvQ 提取码:88tb 数据…...
Springboot中sharding-jdbc的API模式并使用自定义算法
Springboot中sharding-jdbc的API模式并使用自定义算法 可配合AbstractRoutingData使用切换数据源 程序用到了AbstractRoutingData来切换数据源(数据源是自定义的格式编写并没有用springboot的自动装配的格式写),但是又用到sharding-jdbc进行…...
MySQL回表是什么?哪些情况下会回表
🏆作者简介,黑夜开发者,全栈领域新星创作者✌,CSDN博客专家,阿里云社区专家博主,2023年6月CSDN上海赛道top4。 🏆数年电商行业从业经验,历任核心研发工程师,项目技术负责…...
VR、AR、MR 傻傻分不清楚?区别的底层逻辑?
VR是一种能够制作虚拟物体并与人互动的基础技术。它与操作者所处的环境无关。AR可以让在特定位置出现或消失。MR可以让虚拟物体与真实物体进行互动。 AR和MR的大部分应用场景都是随机的,所以硬件基本都采用手机和眼镜。提升了便携性。牺牲了性能。这就导致了AR与MR…...
VScode运行C语言出现的调试问题 lauch:program does not exist 解决方法
"lauch:program does not exist"错误通常表示编译器或调试器无法找到指定的可执行文件。这可能是由于几个原因引起的。首先,确保你的源代码文件夹路径不包含中文字符,因为这可能导致编译器无法识别文件。其次,检查你的launch.json文…...
云原生安全:保护现代化应用的新一代安全策略
随着云计算和容器技术的快速发展,云原生应用已成为现代化软件开发和部署的主流趋势。然而,随之而来的安全挑战也变得更加复杂和严峻。本文将深入探讨云原生安全的概念、原则和最佳实践,帮助您理解如何有效保护云原生应用和敏感数据。 第一部…...
mysql操作
1、字符转Decimal CAST(column AS DECIMAL(9,2)) 2、将计算结果取两位小数: round(column, 2) 3、查询非空 select * from table_XX where id is not null; 4、连表update更新 update a inner join (select yy from b) c on a.id c.id set a.xx c.yy...
Python|GIF 解析与构建(5):手搓截屏和帧率控制
目录 Python|GIF 解析与构建(5):手搓截屏和帧率控制 一、引言 二、技术实现:手搓截屏模块 2.1 核心原理 2.2 代码解析:ScreenshotData类 2.2.1 截图函数:capture_screen 三、技术实现&…...
【Linux】shell脚本忽略错误继续执行
在 shell 脚本中,可以使用 set -e 命令来设置脚本在遇到错误时退出执行。如果你希望脚本忽略错误并继续执行,可以在脚本开头添加 set e 命令来取消该设置。 举例1 #!/bin/bash# 取消 set -e 的设置 set e# 执行命令,并忽略错误 rm somefile…...
ESP32读取DHT11温湿度数据
芯片:ESP32 环境:Arduino 一、安装DHT11传感器库 红框的库,别安装错了 二、代码 注意,DATA口要连接在D15上 #include "DHT.h" // 包含DHT库#define DHTPIN 15 // 定义DHT11数据引脚连接到ESP32的GPIO15 #define D…...
dedecms 织梦自定义表单留言增加ajax验证码功能
增加ajax功能模块,用户不点击提交按钮,只要输入框失去焦点,就会提前提示验证码是否正确。 一,模板上增加验证码 <input name"vdcode"id"vdcode" placeholder"请输入验证码" type"text&quo…...
剑指offer20_链表中环的入口节点
链表中环的入口节点 给定一个链表,若其中包含环,则输出环的入口节点。 若其中不包含环,则输出null。 数据范围 节点 val 值取值范围 [ 1 , 1000 ] [1,1000] [1,1000]。 节点 val 值各不相同。 链表长度 [ 0 , 500 ] [0,500] [0,500]。 …...
Nuxt.js 中的路由配置详解
Nuxt.js 通过其内置的路由系统简化了应用的路由配置,使得开发者可以轻松地管理页面导航和 URL 结构。路由配置主要涉及页面组件的组织、动态路由的设置以及路由元信息的配置。 自动路由生成 Nuxt.js 会根据 pages 目录下的文件结构自动生成路由配置。每个文件都会对…...
Java面试专项一-准备篇
一、企业简历筛选规则 一般企业的简历筛选流程:首先由HR先筛选一部分简历后,在将简历给到对应的项目负责人后再进行下一步的操作。 HR如何筛选简历 例如:Boss直聘(招聘方平台) 直接按照条件进行筛选 例如:…...
Java + Spring Boot + Mybatis 实现批量插入
在 Java 中使用 Spring Boot 和 MyBatis 实现批量插入可以通过以下步骤完成。这里提供两种常用方法:使用 MyBatis 的 <foreach> 标签和批处理模式(ExecutorType.BATCH)。 方法一:使用 XML 的 <foreach> 标签ÿ…...
springboot整合VUE之在线教育管理系统简介
可以学习到的技能 学会常用技术栈的使用 独立开发项目 学会前端的开发流程 学会后端的开发流程 学会数据库的设计 学会前后端接口调用方式 学会多模块之间的关联 学会数据的处理 适用人群 在校学生,小白用户,想学习知识的 有点基础,想要通过项…...
使用Spring AI和MCP协议构建图片搜索服务
目录 使用Spring AI和MCP协议构建图片搜索服务 引言 技术栈概览 项目架构设计 架构图 服务端开发 1. 创建Spring Boot项目 2. 实现图片搜索工具 3. 配置传输模式 Stdio模式(本地调用) SSE模式(远程调用) 4. 注册工具提…...
