当前位置: 首页 > news >正文

多线程事务

一、业务场景

        我们在工作中经常会到往数据库里插入大量数据的工作,但是既需要保证数据的一致性,又要保证程序执行的效率。因此需要在多线程中使用事务,这样既可以保证数据的一致性,又能保证程序的执行效率。但是spring自带的@Transactional注解无法满足多线程间的事务一致性,因为这几个事务执行的线程不同,无法保持数据的一致性。

二、解决方案

        我的解决方案参考分布式事务2PC(Two-phase commit protocol),各个线程需要等待所有的线程执行完成后才能进行下一步操作,在使用线程池执行任务时,如果线程池的最大线程数小于任务列表的数量,就会发生“死锁”,即获取到线程的任务阻塞等待没有获取线程的任务执行完成,而没有获取线程的任务会在阻塞队列中等待空闲线程的调用。这种情况需要使用一阶段的超时机制来“解开”,超时机制会发送回滚命令,线程池收到后进行回滚,但这种情况任务始终无法提交,再次提交结果依然是等到超时再回滚。再使用中需要结合具体业务来对线程池参数以及数据库连接池参数进行合理的设置。如果这里听的优点迷,可以先看下面具体代码实现再来结合这段文字思考。

 

1、工具类代码: 

import lombok.extern.slf4j.Slf4j;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.DefaultTransactionDefinition;import java.util.*;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;/*** @author poxiao* @create 2023-01-05 22:22* <p>* 多线程事务管理器* 基于分布式事务思想,采用2PC(Two-phase commit protocol)协议* 解决基于线程池的多线程事务一致性问题*/
@Slf4j
public class MultiThreadingTransactionManager {/*** 事务管理器*/private final PlatformTransactionManager transactionManager;/*** 超时时间*/private final long timeout;/*** 时间单位*/private final TimeUnit unit;/*** 一阶段门闩,(第一阶段的准备阶段),当所有子线程准备完成时(除“提交/回滚”操作以外的工作都完成),countDownLatch的值为0*/private CountDownLatch oneStageLatch = null;/*** 二阶段门闩,(第二阶段的执行执行),主线程将不再等待子线程执行,直接判定总的任务执行失败,执行第二阶段让等待确认的线程进行回滚*/private final CountDownLatch twoStageLatch = new CountDownLatch(1);/*** 是否提交事务,默认是true(当任一线程发生异常时,isSubmit会被设置为false,即回滚事务)*/private final AtomicBoolean isSubmit = new AtomicBoolean(true);/*** 构造方法* @param transactionManager 事务管理器* @param timeout 超时时间* @param unit 时间单位*/public MultiThreadingTransactionManager(PlatformTransactionManager transactionManager, long timeout, TimeUnit unit) {this.transactionManager = transactionManager;this.timeout = timeout;this.unit = unit;}/*** 线程池方式执行任务,可保证线程间的事务一致性* @param runnableList 任务列表* @param executor 线程池* @return*/public boolean execute(List<Runnable> runnableList, Executor executor) {// 排除null值runnableList.removeAll(Collections.singleton(null));// 属性初始化innit(runnableList.size());// 遍历任务列表并放入线程池for (Runnable runnable : runnableList) {// 创建线程Thread thread = new Thread() {@Overridepublic void run() {// 如果别的线程执行失败,则该任务就不需要再执行了if (!isSubmit.get()) {log.info("当前子线程执行中止,因为线程事务中有子线程执行失败");oneStageLatch.countDown();return;}// 开启事务TransactionStatus transactionStatus = transactionManager.getTransaction(new DefaultTransactionDefinition());try {// 执行业务逻辑runnable.run();} catch (Exception e) {// 执行体发生异常,设置回滚isSubmit.set(false);log.error("线程{}:业务发生异常,执行体:{}", Thread.currentThread().getName(), runnable);}// 计数器减一oneStageLatch.countDown();try {//等待所有线程任务完成,监控是否有异常,有则统一回滚twoStageLatch.await();// 根据isSubmit值判断事务是否提交,可能是子线程出现异常,也有可能是子线程执行超时if (isSubmit.get()) {// 提交transactionManager.commit(transactionStatus);log.info("线程{}:事务提交成功,执行体:{}", Thread.currentThread().getName(), runnable);} else {// 回滚transactionManager.rollback(transactionStatus);log.info("线程{}:事务回滚成功,执行体:{}", Thread.currentThread().getName(), runnable);}} catch (InterruptedException e) {e.printStackTrace();}}};executor.execute(thread);}/*** 主线程担任协调者,当第一阶段所有参与者准备完成,oneStageLatch的计数为0* 主线程发起第二阶段,执行阶段(提交或回滚),根据*/try {// 主线程等待所有线程执行完成,超时时间设置为五秒oneStageLatch.await(timeout, unit);long count = oneStageLatch.getCount();System.out.println("countDownLatch值:" + count);// 主线程等待超时,子线程可能发生长时间阻塞,死锁if (count > 0) {// 设置为回滚isSubmit.set(false);log.info("主线线程等待超时,任务即将全部回滚");}twoStageLatch.countDown();} catch (InterruptedException e) {e.printStackTrace();}// 返回结果,是否执行成功,事务提交即为执行成功,事务回滚即为执行失败return isSubmit.get();}/*** 初始化属性* @param size 任务数量*/private void innit(int size) {oneStageLatch = new CountDownLatch(size);}
}

2、业务代码:

(1)线程池参数

我这里采用自定义线程池,线程池参数如下:

@Configuration
public class ThreadPoolConfig {// 获取服务器的cpu个数private static final int CPU_COUNT = Runtime.getRuntime().availableProcessors();// 获取cpu个数private static final int COUR_SIZE = CPU_COUNT * 4;private static final int MAX_COUR_SIZE = CPU_COUNT * 8;// 接下来配置一个bean,配置线程池。@Beanpublic Executor threadPoolTaskExecutor() {ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();threadPoolTaskExecutor.setCorePoolSize(COUR_SIZE);// 设置核心线程数threadPoolTaskExecutor.setMaxPoolSize(MAX_COUR_SIZE);// 配置最大线程数threadPoolTaskExecutor.setQueueCapacity(MAX_COUR_SIZE * 4);// 配置队列容量(这里设置成最大线程数的四倍)threadPoolTaskExecutor.setThreadNamePrefix("thirdParty-thread");// 给线程池设置名称threadPoolTaskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());// 设置任务的拒绝策略return threadPoolTaskExecutor;}}

(2)任务业务正常,无异常抛出时正常提交事务情况

public Result<?> testTransaction() throws SQLException {List<User> users = new LinkedList<>();User user = new User();user.setName("1111");users.add(user);User user1 = new User();user1.setName("2222");users.add(user1);MultiThreadingTransactionManager multiThreadingTransactionManage = new MultiThreadingTransactionManager(transactionManager, 60, TimeUnit.SECONDS);List<Runnable> runnableList = new ArrayList<>();users.forEach((x) -> {runnableList.add(new Runnable() {@Overridepublic void run() {System.out.println("当前线程:" + Thread.currentThread().getName() + "插入数据:" + x);secondUserMapper.insertUser(x);}});});multiThreadingTransactionManage.execute(runnableList, threadPoolTaskExecutor);return Result.success(1);}

执行时的日志: 

执行成功后数据库多次了两条数据 

 (3)展示出现异常任务时回滚事务情况

 public Result<?> testTransaction() throws SQLException {List<User> users = new LinkedList<>();User user = new User();user.setName("1111");users.add(user);User user1 = new User();user1.setName("2222");users.add(user1);MultiThreadingTransactionManager multiThreadingTransactionManage = new MultiThreadingTransactionManager(transactionManager, 60, TimeUnit.SECONDS);List<Runnable> runnableList = new ArrayList<>();//模拟任务出现异常runnableList.add(() -> {int a = 10 / 0;});users.forEach((x) -> {runnableList.add(new Runnable() {@Overridepublic void run() {System.out.println("当前线程:" + Thread.currentThread().getName() + "插入数据:" + x);secondUserMapper.insertUser(x);}});});multiThreadingTransactionManage.execute(runnableList, threadPoolTaskExecutor);return Result.success(1);}

执行时的日志:  

数据库没有新增的数据 

 

参考文章: 

Spring多线程事务解决方案-CSDN博客

 两阶段VS三阶段提交协议_两阶段提交-CSDN博客

详解Spring多线程下如何保证事务的一致性-51CTO.COM 

多线程结合sprongboot事务(完善)_springboot多线程事务-CSDN博客 

相关文章:

多线程事务

一、业务场景 我们在工作中经常会到往数据库里插入大量数据的工作&#xff0c;但是既需要保证数据的一致性&#xff0c;又要保证程序执行的效率。因此需要在多线程中使用事务&#xff0c;这样既可以保证数据的一致性&#xff0c;又能保证程序的执行效率。但是spring自带的Trans…...

春秋云境CVE-2020-26048

简介 CuppaCMS是一套内容管理系统&#xff08;CMS&#xff09;。 CuppaCMS 2019-11-12之前版本存在安全漏洞&#xff0c;攻击者可利用该漏洞在图像扩展内上传恶意文件&#xff0c;通过使用文件管理器提供的重命名函数的自定义请求&#xff0c;可以将图像扩展修改为PHP&#xf…...

MySQL 带游标的存储过程(实验报告)

一、实验名称&#xff1a; 带游标的存储过程 二、实验日期&#xff1a; 2024 年 5月 25 日 三、实验目的&#xff1a; 掌握MySQL带游标的存储过程的创建及调用&#xff1b; 四、实验用的仪器和材料&#xff1a; 硬件&#xff1a;PC电脑一台&#xff1b; 配置&#xff1…...

结构体(位段)内存分配

结构体由多个数据类型的成员组成。那编译器分配的内存是不是所有成员的字节数总和呢&#xff1f; 首先&#xff0c;stu的内存大小并不为29个字节&#xff0c;即证明结构体内存不是所有成员的字节数和。   其次&#xff0c;stu成员中sex的内存位置不在21&#xff0c;即可推测…...

基于SSH的母婴用品销售管理系统带万字文档

文章目录 母婴商城系统一、项目演示二、项目介绍三、系统部分功能截图四、万字论文参考五、部分代码展示六、底部获取项目源码和万字论文参考&#xff08;9.9&#xffe5;带走&#xff09; 母婴商城系统 一、项目演示 母婴商城系统 二、项目介绍 基于SSH的母婴商城系统 系统…...

说些什么好呢

大一&#xff1a;提前学C和C。学完语法去洛谷或者Acwing二选一&#xff0c;刷300道左右题目。主要培养编程思维&#xff0c;让自己的逻辑能够通过代码实现出来。 现在对算法有点感兴趣但是没有天赋&#xff0c;打不了acm&#xff0c;为就业做准备咯。 大二(算法竞赛)&#xff1…...

1301-习题1-1高等数学

1. 求下列函数的自然定义域 自然定义域就是使函数有意义的定义域。 常见自然定义域&#xff1a; 开根号 x \sqrt x x ​&#xff1a; x ≥ 0 x \ge 0 x≥0自变量为分式的分母 1 x \frac{1}{x} x1​&#xff1a; x ≠ 0 x \ne 0 x0三角函数 tan ⁡ x cot ⁡ x \tan x \cot x …...

C语言之指针进阶(3),函数指针

目录 前言&#xff1a; 一、函数指针变量的概念 二、函数指针变量的创建 三、函数指针变量的使用 四、两段特殊代码的理解 五、typedef 六、函数指针数组 总结&#xff1a; 前言&#xff1a; 本文主要讲述C语言指针中的函数指针&#xff0c;包括函数指针变量的概念、创建…...

RabbitMQ安装及配套Laravel使用

MQ MQ 全称 Message Queue(消息队列),是在消息的传输过程中保存消息的容器。多用于系统之间的异步通信。 为什么需要mq: 解耦:MQ能够使各个系统或组件之间解耦,降低它们之间的耦合度,提高系统的灵活性和可维护性异步处理:通过MQ可以实现异步处理,提高系统响应速度和吞…...

java在类的定义中创建自己的对象?

当在main方法中新建自身所在类的对象&#xff0c;并调用main方法时&#xff0c;会不断循环调用main方法&#xff0c;直到栈溢出 package com.keywordStudy;public class mainTest {static int value 33;public static void main(String[] args) throws Exception{String[] sn…...

掌握C++回调:按值捕获、按引用捕获与弱引用

文章目录 一、按引用捕获和按值捕获1.1 原理1.2 案例 二、弱引用2.1 原理2.2 案例一2.3 案例二&#xff1a;使用base库的弱引用 三、总结 在C回调中&#xff0c;当使用Lambda表达式捕获外部变量时&#xff0c;有两种捕获方式&#xff1a;按值捕获和按引用捕获。 一、按引用捕获…...

抖音运营_如何做出优质的短视频

目录 一 短视频内容的构成 1 图像 2 字幕 3 声音 4 特效 5 描述 6 评论 二 短视频的热门类型 1 颜值圈粉类 2 知识教学类 3 幽默搞笑类 4 商品展示类 5 才艺技能类 6 评论解说类 三 热门短视频的特征 1 产生共鸣 2 正能量 3 紧跟热点话题 4 富有创意 四 短视…...

Day21:Leetcode513.找树左下角的值 +112. 路径总和 113.路径总和ii + 106.从中序与后序遍历序列构造二叉树

LeetCode&#xff1a;513.找树左下角的值 解决方案&#xff1a; 1.思路 在遍历一个节点时&#xff0c;需要先把它的非空右子节点放入队列&#xff0c;然后再把它的非空左子节点放入队列&#xff0c;这样才能保证从右到左遍历每一层的节点。广度优先搜索所遍历的最后一个节点…...

Java数据结构和算法(B树)

前言 B树又叫平衡的多路搜索树&#xff1b;平衡的意思是又满足平衡二叉树的一些性质&#xff0c;左树大于右树&#xff1b; 多路意思是&#xff0c;可以多个结点&#xff0c;不再是像二叉树只有两个结点&#xff1b; 实现原理 B树是一种自平衡的搜索树&#xff0c;通常用于实…...

成为程序员后我都明白了什么?从入行到弃坑?

作为一个入行近10年的php程序员&#xff0c;真心感觉一切都才刚开始&#xff0c;对计算机&#xff0c;编程语言的理解也好&#xff0c;程序员中年危机也罢&#xff0c;之前都是听别人说的&#xff0c;真的自己到了这个水平&#xff0c;这个年龄才深刻体会到这其中的种种。 我一…...

python --创建固定字符串长度,先进先出

a 123def concatenate_within_limit(b, new_string):# 计算新字符串与a的长度之和a btotal_length len(a) len(new_string)# 如果长度超过1024&#xff0c;从前面删除足够的字符if total_length > 5:diff total_length - 5a a[diff:] new_string # 删除前diff个字符…...

容器化部署

目录 docker容器化部署 怎样使用Docker Compose或Kubernetes等容器编排工具来管理和扩展联邦学习系统 使用Docker Compose...

国产数据库TiDB的常用方法

TiDB的常用方法主要涉及安装配置、数据操作、性能调优以及监控和维护等方面。以下是对这些常用方法的归纳和介绍&#xff1a; 1. 安装与配置 安装TiDB&#xff1a;根据官方文档的指引&#xff0c;用户可以按照步骤进行TiDB的安装。配置TiDB&#xff1a;安装完成后&#xff0c…...

基于DdddOcr通用验证码离线本地识别SDK搭建个人云打码接口Api

前言 最近介绍了一款免费的验证码识别网站,识别效率太低,考虑到ddddocr是开源的,决定搭建搭建一个,发现原作者sml2h3已经推出好久了,但是网上没有宝塔安装的教程,于是本次通过宝塔搭建属于自己的带带弟弟OCR通用验证码离线本地识别 原项目地址:https://github.com/sml2…...

2、xss-labs之level2

1、打开页面 2、传入xss代码 payload&#xff1a;<script>alert(xss)</script>&#xff0c;发现返回<script>alert(xss)</script> 3、分析原因 打开f12&#xff0c;没什么发现 看后端源码&#xff0c;在这form表单通过get获取keyword的值赋给$str&am…...

多模态2025:技术路线“神仙打架”,视频生成冲上云霄

文&#xff5c;魏琳华 编&#xff5c;王一粟 一场大会&#xff0c;聚集了中国多模态大模型的“半壁江山”。 智源大会2025为期两天的论坛中&#xff0c;汇集了学界、创业公司和大厂等三方的热门选手&#xff0c;关于多模态的集中讨论达到了前所未有的热度。其中&#xff0c;…...

进程地址空间(比特课总结)

一、进程地址空间 1. 环境变量 1 &#xff09;⽤户级环境变量与系统级环境变量 全局属性&#xff1a;环境变量具有全局属性&#xff0c;会被⼦进程继承。例如当bash启动⼦进程时&#xff0c;环 境变量会⾃动传递给⼦进程。 本地变量限制&#xff1a;本地变量只在当前进程(ba…...

Leetcode 3576. Transform Array to All Equal Elements

Leetcode 3576. Transform Array to All Equal Elements 1. 解题思路2. 代码实现 题目链接&#xff1a;3576. Transform Array to All Equal Elements 1. 解题思路 这一题思路上就是分别考察一下是否能将其转化为全1或者全-1数组即可。 至于每一种情况是否可以达到&#xf…...

Leetcode 3577. Count the Number of Computer Unlocking Permutations

Leetcode 3577. Count the Number of Computer Unlocking Permutations 1. 解题思路2. 代码实现 题目链接&#xff1a;3577. Count the Number of Computer Unlocking Permutations 1. 解题思路 这一题其实就是一个脑筋急转弯&#xff0c;要想要能够将所有的电脑解锁&#x…...

条件运算符

C中的三目运算符&#xff08;也称条件运算符&#xff0c;英文&#xff1a;ternary operator&#xff09;是一种简洁的条件选择语句&#xff0c;语法如下&#xff1a; 条件表达式 ? 表达式1 : 表达式2• 如果“条件表达式”为true&#xff0c;则整个表达式的结果为“表达式1”…...

【CSS position 属性】static、relative、fixed、absolute 、sticky详细介绍,多层嵌套定位示例

文章目录 ★ position 的五种类型及基本用法 ★ 一、position 属性概述 二、position 的五种类型详解(初学者版) 1. static(默认值) 2. relative(相对定位) 3. absolute(绝对定位) 4. fixed(固定定位) 5. sticky(粘性定位) 三、定位元素的层级关系(z-i…...

在四层代理中还原真实客户端ngx_stream_realip_module

一、模块原理与价值 PROXY Protocol 回溯 第三方负载均衡&#xff08;如 HAProxy、AWS NLB、阿里 SLB&#xff09;发起上游连接时&#xff0c;将真实客户端 IP/Port 写入 PROXY Protocol v1/v2 头。Stream 层接收到头部后&#xff0c;ngx_stream_realip_module 从中提取原始信息…...

Robots.txt 文件

什么是robots.txt&#xff1f; robots.txt 是一个位于网站根目录下的文本文件&#xff08;如&#xff1a;https://example.com/robots.txt&#xff09;&#xff0c;它用于指导网络爬虫&#xff08;如搜索引擎的蜘蛛程序&#xff09;如何抓取该网站的内容。这个文件遵循 Robots…...

【python异步多线程】异步多线程爬虫代码示例

claude生成的python多线程、异步代码示例&#xff0c;模拟20个网页的爬取&#xff0c;每个网页假设要0.5-2秒完成。 代码 Python多线程爬虫教程 核心概念 多线程&#xff1a;允许程序同时执行多个任务&#xff0c;提高IO密集型任务&#xff08;如网络请求&#xff09;的效率…...

Pinocchio 库详解及其在足式机器人上的应用

Pinocchio 库详解及其在足式机器人上的应用 Pinocchio (Pinocchio is not only a nose) 是一个开源的 C 库&#xff0c;专门用于快速计算机器人模型的正向运动学、逆向运动学、雅可比矩阵、动力学和动力学导数。它主要关注效率和准确性&#xff0c;并提供了一个通用的框架&…...