多线程事务
一、业务场景
我们在工作中经常会到往数据库里插入大量数据的工作,但是既需要保证数据的一致性,又要保证程序执行的效率。因此需要在多线程中使用事务,这样既可以保证数据的一致性,又能保证程序的执行效率。但是spring自带的@Transactional注解无法满足多线程间的事务一致性,因为这几个事务执行的线程不同,无法保持数据的一致性。
二、解决方案
我的解决方案参考分布式事务2PC(Two-phase commit protocol),各个线程需要等待所有的线程执行完成后才能进行下一步操作,在使用线程池执行任务时,如果线程池的最大线程数小于任务列表的数量,就会发生“死锁”,即获取到线程的任务阻塞等待没有获取线程的任务执行完成,而没有获取线程的任务会在阻塞队列中等待空闲线程的调用。这种情况需要使用一阶段的超时机制来“解开”,超时机制会发送回滚命令,线程池收到后进行回滚,但这种情况任务始终无法提交,再次提交结果依然是等到超时再回滚。再使用中需要结合具体业务来对线程池参数以及数据库连接池参数进行合理的设置。如果这里听的优点迷,可以先看下面具体代码实现再来结合这段文字思考。
1、工具类代码:
import lombok.extern.slf4j.Slf4j;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.DefaultTransactionDefinition;import java.util.*;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;/*** @author poxiao* @create 2023-01-05 22:22* <p>* 多线程事务管理器* 基于分布式事务思想,采用2PC(Two-phase commit protocol)协议* 解决基于线程池的多线程事务一致性问题*/
@Slf4j
public class MultiThreadingTransactionManager {/*** 事务管理器*/private final PlatformTransactionManager transactionManager;/*** 超时时间*/private final long timeout;/*** 时间单位*/private final TimeUnit unit;/*** 一阶段门闩,(第一阶段的准备阶段),当所有子线程准备完成时(除“提交/回滚”操作以外的工作都完成),countDownLatch的值为0*/private CountDownLatch oneStageLatch = null;/*** 二阶段门闩,(第二阶段的执行执行),主线程将不再等待子线程执行,直接判定总的任务执行失败,执行第二阶段让等待确认的线程进行回滚*/private final CountDownLatch twoStageLatch = new CountDownLatch(1);/*** 是否提交事务,默认是true(当任一线程发生异常时,isSubmit会被设置为false,即回滚事务)*/private final AtomicBoolean isSubmit = new AtomicBoolean(true);/*** 构造方法* @param transactionManager 事务管理器* @param timeout 超时时间* @param unit 时间单位*/public MultiThreadingTransactionManager(PlatformTransactionManager transactionManager, long timeout, TimeUnit unit) {this.transactionManager = transactionManager;this.timeout = timeout;this.unit = unit;}/*** 线程池方式执行任务,可保证线程间的事务一致性* @param runnableList 任务列表* @param executor 线程池* @return*/public boolean execute(List<Runnable> runnableList, Executor executor) {// 排除null值runnableList.removeAll(Collections.singleton(null));// 属性初始化innit(runnableList.size());// 遍历任务列表并放入线程池for (Runnable runnable : runnableList) {// 创建线程Thread thread = new Thread() {@Overridepublic void run() {// 如果别的线程执行失败,则该任务就不需要再执行了if (!isSubmit.get()) {log.info("当前子线程执行中止,因为线程事务中有子线程执行失败");oneStageLatch.countDown();return;}// 开启事务TransactionStatus transactionStatus = transactionManager.getTransaction(new DefaultTransactionDefinition());try {// 执行业务逻辑runnable.run();} catch (Exception e) {// 执行体发生异常,设置回滚isSubmit.set(false);log.error("线程{}:业务发生异常,执行体:{}", Thread.currentThread().getName(), runnable);}// 计数器减一oneStageLatch.countDown();try {//等待所有线程任务完成,监控是否有异常,有则统一回滚twoStageLatch.await();// 根据isSubmit值判断事务是否提交,可能是子线程出现异常,也有可能是子线程执行超时if (isSubmit.get()) {// 提交transactionManager.commit(transactionStatus);log.info("线程{}:事务提交成功,执行体:{}", Thread.currentThread().getName(), runnable);} else {// 回滚transactionManager.rollback(transactionStatus);log.info("线程{}:事务回滚成功,执行体:{}", Thread.currentThread().getName(), runnable);}} catch (InterruptedException e) {e.printStackTrace();}}};executor.execute(thread);}/*** 主线程担任协调者,当第一阶段所有参与者准备完成,oneStageLatch的计数为0* 主线程发起第二阶段,执行阶段(提交或回滚),根据*/try {// 主线程等待所有线程执行完成,超时时间设置为五秒oneStageLatch.await(timeout, unit);long count = oneStageLatch.getCount();System.out.println("countDownLatch值:" + count);// 主线程等待超时,子线程可能发生长时间阻塞,死锁if (count > 0) {// 设置为回滚isSubmit.set(false);log.info("主线线程等待超时,任务即将全部回滚");}twoStageLatch.countDown();} catch (InterruptedException e) {e.printStackTrace();}// 返回结果,是否执行成功,事务提交即为执行成功,事务回滚即为执行失败return isSubmit.get();}/*** 初始化属性* @param size 任务数量*/private void innit(int size) {oneStageLatch = new CountDownLatch(size);}
}
2、业务代码:
(1)线程池参数
我这里采用自定义线程池,线程池参数如下:
@Configuration
public class ThreadPoolConfig {// 获取服务器的cpu个数private static final int CPU_COUNT = Runtime.getRuntime().availableProcessors();// 获取cpu个数private static final int COUR_SIZE = CPU_COUNT * 4;private static final int MAX_COUR_SIZE = CPU_COUNT * 8;// 接下来配置一个bean,配置线程池。@Beanpublic Executor threadPoolTaskExecutor() {ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();threadPoolTaskExecutor.setCorePoolSize(COUR_SIZE);// 设置核心线程数threadPoolTaskExecutor.setMaxPoolSize(MAX_COUR_SIZE);// 配置最大线程数threadPoolTaskExecutor.setQueueCapacity(MAX_COUR_SIZE * 4);// 配置队列容量(这里设置成最大线程数的四倍)threadPoolTaskExecutor.setThreadNamePrefix("thirdParty-thread");// 给线程池设置名称threadPoolTaskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());// 设置任务的拒绝策略return threadPoolTaskExecutor;}}
(2)任务业务正常,无异常抛出时正常提交事务情况
public Result<?> testTransaction() throws SQLException {List<User> users = new LinkedList<>();User user = new User();user.setName("1111");users.add(user);User user1 = new User();user1.setName("2222");users.add(user1);MultiThreadingTransactionManager multiThreadingTransactionManage = new MultiThreadingTransactionManager(transactionManager, 60, TimeUnit.SECONDS);List<Runnable> runnableList = new ArrayList<>();users.forEach((x) -> {runnableList.add(new Runnable() {@Overridepublic void run() {System.out.println("当前线程:" + Thread.currentThread().getName() + "插入数据:" + x);secondUserMapper.insertUser(x);}});});multiThreadingTransactionManage.execute(runnableList, threadPoolTaskExecutor);return Result.success(1);}
执行时的日志:
执行成功后数据库多次了两条数据
(3)展示出现异常任务时回滚事务情况
public Result<?> testTransaction() throws SQLException {List<User> users = new LinkedList<>();User user = new User();user.setName("1111");users.add(user);User user1 = new User();user1.setName("2222");users.add(user1);MultiThreadingTransactionManager multiThreadingTransactionManage = new MultiThreadingTransactionManager(transactionManager, 60, TimeUnit.SECONDS);List<Runnable> runnableList = new ArrayList<>();//模拟任务出现异常runnableList.add(() -> {int a = 10 / 0;});users.forEach((x) -> {runnableList.add(new Runnable() {@Overridepublic void run() {System.out.println("当前线程:" + Thread.currentThread().getName() + "插入数据:" + x);secondUserMapper.insertUser(x);}});});multiThreadingTransactionManage.execute(runnableList, threadPoolTaskExecutor);return Result.success(1);}
执行时的日志:
数据库没有新增的数据
参考文章:
Spring多线程事务解决方案-CSDN博客
两阶段VS三阶段提交协议_两阶段提交-CSDN博客
详解Spring多线程下如何保证事务的一致性-51CTO.COM
多线程结合sprongboot事务(完善)_springboot多线程事务-CSDN博客
相关文章:

多线程事务
一、业务场景 我们在工作中经常会到往数据库里插入大量数据的工作,但是既需要保证数据的一致性,又要保证程序执行的效率。因此需要在多线程中使用事务,这样既可以保证数据的一致性,又能保证程序的执行效率。但是spring自带的Trans…...

春秋云境CVE-2020-26048
简介 CuppaCMS是一套内容管理系统(CMS)。 CuppaCMS 2019-11-12之前版本存在安全漏洞,攻击者可利用该漏洞在图像扩展内上传恶意文件,通过使用文件管理器提供的重命名函数的自定义请求,可以将图像扩展修改为PHP…...

MySQL 带游标的存储过程(实验报告)
一、实验名称: 带游标的存储过程 二、实验日期: 2024 年 5月 25 日 三、实验目的: 掌握MySQL带游标的存储过程的创建及调用; 四、实验用的仪器和材料: 硬件:PC电脑一台; 配置࿱…...

结构体(位段)内存分配
结构体由多个数据类型的成员组成。那编译器分配的内存是不是所有成员的字节数总和呢? 首先,stu的内存大小并不为29个字节,即证明结构体内存不是所有成员的字节数和。 其次,stu成员中sex的内存位置不在21,即可推测…...

基于SSH的母婴用品销售管理系统带万字文档
文章目录 母婴商城系统一、项目演示二、项目介绍三、系统部分功能截图四、万字论文参考五、部分代码展示六、底部获取项目源码和万字论文参考(9.9¥带走) 母婴商城系统 一、项目演示 母婴商城系统 二、项目介绍 基于SSH的母婴商城系统 系统…...

说些什么好呢
大一:提前学C和C。学完语法去洛谷或者Acwing二选一,刷300道左右题目。主要培养编程思维,让自己的逻辑能够通过代码实现出来。 现在对算法有点感兴趣但是没有天赋,打不了acm,为就业做准备咯。 大二(算法竞赛)࿱…...

1301-习题1-1高等数学
1. 求下列函数的自然定义域 自然定义域就是使函数有意义的定义域。 常见自然定义域: 开根号 x \sqrt x x : x ≥ 0 x \ge 0 x≥0自变量为分式的分母 1 x \frac{1}{x} x1: x ≠ 0 x \ne 0 x0三角函数 tan x cot x \tan x \cot x …...

C语言之指针进阶(3),函数指针
目录 前言: 一、函数指针变量的概念 二、函数指针变量的创建 三、函数指针变量的使用 四、两段特殊代码的理解 五、typedef 六、函数指针数组 总结: 前言: 本文主要讲述C语言指针中的函数指针,包括函数指针变量的概念、创建…...

RabbitMQ安装及配套Laravel使用
MQ MQ 全称 Message Queue(消息队列),是在消息的传输过程中保存消息的容器。多用于系统之间的异步通信。 为什么需要mq: 解耦:MQ能够使各个系统或组件之间解耦,降低它们之间的耦合度,提高系统的灵活性和可维护性异步处理:通过MQ可以实现异步处理,提高系统响应速度和吞…...

java在类的定义中创建自己的对象?
当在main方法中新建自身所在类的对象,并调用main方法时,会不断循环调用main方法,直到栈溢出 package com.keywordStudy;public class mainTest {static int value 33;public static void main(String[] args) throws Exception{String[] sn…...

掌握C++回调:按值捕获、按引用捕获与弱引用
文章目录 一、按引用捕获和按值捕获1.1 原理1.2 案例 二、弱引用2.1 原理2.2 案例一2.3 案例二:使用base库的弱引用 三、总结 在C回调中,当使用Lambda表达式捕获外部变量时,有两种捕获方式:按值捕获和按引用捕获。 一、按引用捕获…...

抖音运营_如何做出优质的短视频
目录 一 短视频内容的构成 1 图像 2 字幕 3 声音 4 特效 5 描述 6 评论 二 短视频的热门类型 1 颜值圈粉类 2 知识教学类 3 幽默搞笑类 4 商品展示类 5 才艺技能类 6 评论解说类 三 热门短视频的特征 1 产生共鸣 2 正能量 3 紧跟热点话题 4 富有创意 四 短视…...

Day21:Leetcode513.找树左下角的值 +112. 路径总和 113.路径总和ii + 106.从中序与后序遍历序列构造二叉树
LeetCode:513.找树左下角的值 解决方案: 1.思路 在遍历一个节点时,需要先把它的非空右子节点放入队列,然后再把它的非空左子节点放入队列,这样才能保证从右到左遍历每一层的节点。广度优先搜索所遍历的最后一个节点…...

Java数据结构和算法(B树)
前言 B树又叫平衡的多路搜索树;平衡的意思是又满足平衡二叉树的一些性质,左树大于右树; 多路意思是,可以多个结点,不再是像二叉树只有两个结点; 实现原理 B树是一种自平衡的搜索树,通常用于实…...

成为程序员后我都明白了什么?从入行到弃坑?
作为一个入行近10年的php程序员,真心感觉一切都才刚开始,对计算机,编程语言的理解也好,程序员中年危机也罢,之前都是听别人说的,真的自己到了这个水平,这个年龄才深刻体会到这其中的种种。 我一…...

python --创建固定字符串长度,先进先出
a 123def concatenate_within_limit(b, new_string):# 计算新字符串与a的长度之和a btotal_length len(a) len(new_string)# 如果长度超过1024,从前面删除足够的字符if total_length > 5:diff total_length - 5a a[diff:] new_string # 删除前diff个字符…...

容器化部署
目录 docker容器化部署 怎样使用Docker Compose或Kubernetes等容器编排工具来管理和扩展联邦学习系统 使用Docker Compose...

国产数据库TiDB的常用方法
TiDB的常用方法主要涉及安装配置、数据操作、性能调优以及监控和维护等方面。以下是对这些常用方法的归纳和介绍: 1. 安装与配置 安装TiDB:根据官方文档的指引,用户可以按照步骤进行TiDB的安装。配置TiDB:安装完成后,…...

基于DdddOcr通用验证码离线本地识别SDK搭建个人云打码接口Api
前言 最近介绍了一款免费的验证码识别网站,识别效率太低,考虑到ddddocr是开源的,决定搭建搭建一个,发现原作者sml2h3已经推出好久了,但是网上没有宝塔安装的教程,于是本次通过宝塔搭建属于自己的带带弟弟OCR通用验证码离线本地识别 原项目地址:https://github.com/sml2…...

2、xss-labs之level2
1、打开页面 2、传入xss代码 payload:<script>alert(xss)</script>,发现返回<script>alert(xss)</script> 3、分析原因 打开f12,没什么发现 看后端源码,在这form表单通过get获取keyword的值赋给$str&am…...

人才测评的应用:人才选拔,岗位晋升,面试招聘测评
人才测评自诞生以来,就被广泛应用在各大方面,不仅是我们熟悉的招聘上,还有其他考核和晋升,都会需要用到人才测评。不知道怎么招聘?或者不懂得如何实现人才晋升?都可以参考人才测评,利用它帮我们…...

前端面试题日常练-day33 【面试题】
题目 希望这些选择题能够帮助您进行前端面试的准备,答案在文末。 在jQuery中,以下哪个选项用于在元素上绑定一个点击事件? a) click() b) bind() c) on() d) trigger() jQuery中,以下哪个选项用于获取元素的属性值? …...

非整数倍数据位宽转换24to128
描述 实现数据位宽转换电路,实现24bit数据输入转换为128bit数据输出。其中,先到的数据应置于输出的高bit位。 电路的接口如下图所示。valid_in用来指示数据输入data_in的有效性,valid_out用来指示数据输出data_out的有效性;clk是时…...

html通过数据改变,图片跟着改变
改变前 改变后 通过数据来控制样式展示 <template><div>通过num控制图标是否更改{{num}}<div class"box"><!-- 如果num大于1则是另一种,样式,如果小时1,则是另一种样式 --><div class"item&qu…...

centos7.9 安装SqlServer
1、导入Microsoft SQL Server CentOS存储库: sudo curl -o /etc/yum.repos.d/mssql-server.repo https://packages.microsoft.com/config/rhel/7/mssql-server-2019.repo2、安装SQL Server: sudo yum install -y mssql-server假如机器内存不足2G 需要对…...

Idea中flume的Interceptor的编写教程
1.新建-项目-新建项目 注意位置是将来打包文件存放的位置,即我们打包好的文件在这/export/data个目录下寻找 2. 在maven项目中导入依赖 Pom.xml文件中写入 <dependencies> <dependency> <groupId>org.apache.flume</groupId> <artifa…...

java单元测试:JUnit测试运行器
JUnit测试运行器(Test Runner)决定了JUnit如何执行测试。JUnit有多个测试运行器,每个运行器都有特定的功能和用途。 1. 默认运行器 当没有显式指定运行器时,JUnit会使用默认运行器,这在JUnit 4和JUnit 5之间有所不同…...

网络模型—BIO、NIO、IO多路复用、信号驱动IO、异步IO
一、用户空间和内核空间 以Linux系统为例,ubuntu和CentOS是Linux的两种比较常见的发行版,任何Linux发行版,其系统内核都是Linux。我们在发行版上操作应用,如Redis、Mysql等其实是无法直接执行访问计算机硬件(如cpu,内存…...

智能语义识别电影机器人的rasa实现
文章目录 0.前言1.项目整体框架2.rasa训练数据结构4.rasa启动命令及用到的API 0.前言 最近做了一个智能电影机器人的项目,我主要负责用户语义意图识别,用的框架是rasa,对应的版本为 3.6.15,对应的安装命令为: pip3 install rasa…...

C# 实现腾讯云 IM 常用 REST API 之会话管理
目录 关于腾讯 IM REST API 开发前准备 范例运行环境 常用会话管理API 查询账号会话总未读数 查询单聊会话消息记录 下载最近会话记录 小结 关于腾讯 IM REST API REST API 是腾讯即时通信 IM 提供给服务端的一组 HTTP 后台管理接口,如消息管理、群组管理…...