当前位置: 首页 > news >正文

多线程事务

一、业务场景

        我们在工作中经常会到往数据库里插入大量数据的工作,但是既需要保证数据的一致性,又要保证程序执行的效率。因此需要在多线程中使用事务,这样既可以保证数据的一致性,又能保证程序的执行效率。但是spring自带的@Transactional注解无法满足多线程间的事务一致性,因为这几个事务执行的线程不同,无法保持数据的一致性。

二、解决方案

        我的解决方案参考分布式事务2PC(Two-phase commit protocol),各个线程需要等待所有的线程执行完成后才能进行下一步操作,在使用线程池执行任务时,如果线程池的最大线程数小于任务列表的数量,就会发生“死锁”,即获取到线程的任务阻塞等待没有获取线程的任务执行完成,而没有获取线程的任务会在阻塞队列中等待空闲线程的调用。这种情况需要使用一阶段的超时机制来“解开”,超时机制会发送回滚命令,线程池收到后进行回滚,但这种情况任务始终无法提交,再次提交结果依然是等到超时再回滚。再使用中需要结合具体业务来对线程池参数以及数据库连接池参数进行合理的设置。如果这里听的优点迷,可以先看下面具体代码实现再来结合这段文字思考。

 

1、工具类代码: 

import lombok.extern.slf4j.Slf4j;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.DefaultTransactionDefinition;import java.util.*;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;/*** @author poxiao* @create 2023-01-05 22:22* <p>* 多线程事务管理器* 基于分布式事务思想,采用2PC(Two-phase commit protocol)协议* 解决基于线程池的多线程事务一致性问题*/
@Slf4j
public class MultiThreadingTransactionManager {/*** 事务管理器*/private final PlatformTransactionManager transactionManager;/*** 超时时间*/private final long timeout;/*** 时间单位*/private final TimeUnit unit;/*** 一阶段门闩,(第一阶段的准备阶段),当所有子线程准备完成时(除“提交/回滚”操作以外的工作都完成),countDownLatch的值为0*/private CountDownLatch oneStageLatch = null;/*** 二阶段门闩,(第二阶段的执行执行),主线程将不再等待子线程执行,直接判定总的任务执行失败,执行第二阶段让等待确认的线程进行回滚*/private final CountDownLatch twoStageLatch = new CountDownLatch(1);/*** 是否提交事务,默认是true(当任一线程发生异常时,isSubmit会被设置为false,即回滚事务)*/private final AtomicBoolean isSubmit = new AtomicBoolean(true);/*** 构造方法* @param transactionManager 事务管理器* @param timeout 超时时间* @param unit 时间单位*/public MultiThreadingTransactionManager(PlatformTransactionManager transactionManager, long timeout, TimeUnit unit) {this.transactionManager = transactionManager;this.timeout = timeout;this.unit = unit;}/*** 线程池方式执行任务,可保证线程间的事务一致性* @param runnableList 任务列表* @param executor 线程池* @return*/public boolean execute(List<Runnable> runnableList, Executor executor) {// 排除null值runnableList.removeAll(Collections.singleton(null));// 属性初始化innit(runnableList.size());// 遍历任务列表并放入线程池for (Runnable runnable : runnableList) {// 创建线程Thread thread = new Thread() {@Overridepublic void run() {// 如果别的线程执行失败,则该任务就不需要再执行了if (!isSubmit.get()) {log.info("当前子线程执行中止,因为线程事务中有子线程执行失败");oneStageLatch.countDown();return;}// 开启事务TransactionStatus transactionStatus = transactionManager.getTransaction(new DefaultTransactionDefinition());try {// 执行业务逻辑runnable.run();} catch (Exception e) {// 执行体发生异常,设置回滚isSubmit.set(false);log.error("线程{}:业务发生异常,执行体:{}", Thread.currentThread().getName(), runnable);}// 计数器减一oneStageLatch.countDown();try {//等待所有线程任务完成,监控是否有异常,有则统一回滚twoStageLatch.await();// 根据isSubmit值判断事务是否提交,可能是子线程出现异常,也有可能是子线程执行超时if (isSubmit.get()) {// 提交transactionManager.commit(transactionStatus);log.info("线程{}:事务提交成功,执行体:{}", Thread.currentThread().getName(), runnable);} else {// 回滚transactionManager.rollback(transactionStatus);log.info("线程{}:事务回滚成功,执行体:{}", Thread.currentThread().getName(), runnable);}} catch (InterruptedException e) {e.printStackTrace();}}};executor.execute(thread);}/*** 主线程担任协调者,当第一阶段所有参与者准备完成,oneStageLatch的计数为0* 主线程发起第二阶段,执行阶段(提交或回滚),根据*/try {// 主线程等待所有线程执行完成,超时时间设置为五秒oneStageLatch.await(timeout, unit);long count = oneStageLatch.getCount();System.out.println("countDownLatch值:" + count);// 主线程等待超时,子线程可能发生长时间阻塞,死锁if (count > 0) {// 设置为回滚isSubmit.set(false);log.info("主线线程等待超时,任务即将全部回滚");}twoStageLatch.countDown();} catch (InterruptedException e) {e.printStackTrace();}// 返回结果,是否执行成功,事务提交即为执行成功,事务回滚即为执行失败return isSubmit.get();}/*** 初始化属性* @param size 任务数量*/private void innit(int size) {oneStageLatch = new CountDownLatch(size);}
}

2、业务代码:

(1)线程池参数

我这里采用自定义线程池,线程池参数如下:

@Configuration
public class ThreadPoolConfig {// 获取服务器的cpu个数private static final int CPU_COUNT = Runtime.getRuntime().availableProcessors();// 获取cpu个数private static final int COUR_SIZE = CPU_COUNT * 4;private static final int MAX_COUR_SIZE = CPU_COUNT * 8;// 接下来配置一个bean,配置线程池。@Beanpublic Executor threadPoolTaskExecutor() {ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();threadPoolTaskExecutor.setCorePoolSize(COUR_SIZE);// 设置核心线程数threadPoolTaskExecutor.setMaxPoolSize(MAX_COUR_SIZE);// 配置最大线程数threadPoolTaskExecutor.setQueueCapacity(MAX_COUR_SIZE * 4);// 配置队列容量(这里设置成最大线程数的四倍)threadPoolTaskExecutor.setThreadNamePrefix("thirdParty-thread");// 给线程池设置名称threadPoolTaskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());// 设置任务的拒绝策略return threadPoolTaskExecutor;}}

(2)任务业务正常,无异常抛出时正常提交事务情况

public Result<?> testTransaction() throws SQLException {List<User> users = new LinkedList<>();User user = new User();user.setName("1111");users.add(user);User user1 = new User();user1.setName("2222");users.add(user1);MultiThreadingTransactionManager multiThreadingTransactionManage = new MultiThreadingTransactionManager(transactionManager, 60, TimeUnit.SECONDS);List<Runnable> runnableList = new ArrayList<>();users.forEach((x) -> {runnableList.add(new Runnable() {@Overridepublic void run() {System.out.println("当前线程:" + Thread.currentThread().getName() + "插入数据:" + x);secondUserMapper.insertUser(x);}});});multiThreadingTransactionManage.execute(runnableList, threadPoolTaskExecutor);return Result.success(1);}

执行时的日志: 

执行成功后数据库多次了两条数据 

 (3)展示出现异常任务时回滚事务情况

 public Result<?> testTransaction() throws SQLException {List<User> users = new LinkedList<>();User user = new User();user.setName("1111");users.add(user);User user1 = new User();user1.setName("2222");users.add(user1);MultiThreadingTransactionManager multiThreadingTransactionManage = new MultiThreadingTransactionManager(transactionManager, 60, TimeUnit.SECONDS);List<Runnable> runnableList = new ArrayList<>();//模拟任务出现异常runnableList.add(() -> {int a = 10 / 0;});users.forEach((x) -> {runnableList.add(new Runnable() {@Overridepublic void run() {System.out.println("当前线程:" + Thread.currentThread().getName() + "插入数据:" + x);secondUserMapper.insertUser(x);}});});multiThreadingTransactionManage.execute(runnableList, threadPoolTaskExecutor);return Result.success(1);}

执行时的日志:  

数据库没有新增的数据 

 

参考文章: 

Spring多线程事务解决方案-CSDN博客

 两阶段VS三阶段提交协议_两阶段提交-CSDN博客

详解Spring多线程下如何保证事务的一致性-51CTO.COM 

多线程结合sprongboot事务(完善)_springboot多线程事务-CSDN博客 

相关文章:

多线程事务

一、业务场景 我们在工作中经常会到往数据库里插入大量数据的工作&#xff0c;但是既需要保证数据的一致性&#xff0c;又要保证程序执行的效率。因此需要在多线程中使用事务&#xff0c;这样既可以保证数据的一致性&#xff0c;又能保证程序的执行效率。但是spring自带的Trans…...

春秋云境CVE-2020-26048

简介 CuppaCMS是一套内容管理系统&#xff08;CMS&#xff09;。 CuppaCMS 2019-11-12之前版本存在安全漏洞&#xff0c;攻击者可利用该漏洞在图像扩展内上传恶意文件&#xff0c;通过使用文件管理器提供的重命名函数的自定义请求&#xff0c;可以将图像扩展修改为PHP&#xf…...

MySQL 带游标的存储过程(实验报告)

一、实验名称&#xff1a; 带游标的存储过程 二、实验日期&#xff1a; 2024 年 5月 25 日 三、实验目的&#xff1a; 掌握MySQL带游标的存储过程的创建及调用&#xff1b; 四、实验用的仪器和材料&#xff1a; 硬件&#xff1a;PC电脑一台&#xff1b; 配置&#xff1…...

结构体(位段)内存分配

结构体由多个数据类型的成员组成。那编译器分配的内存是不是所有成员的字节数总和呢&#xff1f; 首先&#xff0c;stu的内存大小并不为29个字节&#xff0c;即证明结构体内存不是所有成员的字节数和。   其次&#xff0c;stu成员中sex的内存位置不在21&#xff0c;即可推测…...

基于SSH的母婴用品销售管理系统带万字文档

文章目录 母婴商城系统一、项目演示二、项目介绍三、系统部分功能截图四、万字论文参考五、部分代码展示六、底部获取项目源码和万字论文参考&#xff08;9.9&#xffe5;带走&#xff09; 母婴商城系统 一、项目演示 母婴商城系统 二、项目介绍 基于SSH的母婴商城系统 系统…...

说些什么好呢

大一&#xff1a;提前学C和C。学完语法去洛谷或者Acwing二选一&#xff0c;刷300道左右题目。主要培养编程思维&#xff0c;让自己的逻辑能够通过代码实现出来。 现在对算法有点感兴趣但是没有天赋&#xff0c;打不了acm&#xff0c;为就业做准备咯。 大二(算法竞赛)&#xff1…...

1301-习题1-1高等数学

1. 求下列函数的自然定义域 自然定义域就是使函数有意义的定义域。 常见自然定义域&#xff1a; 开根号 x \sqrt x x ​&#xff1a; x ≥ 0 x \ge 0 x≥0自变量为分式的分母 1 x \frac{1}{x} x1​&#xff1a; x ≠ 0 x \ne 0 x0三角函数 tan ⁡ x cot ⁡ x \tan x \cot x …...

C语言之指针进阶(3),函数指针

目录 前言&#xff1a; 一、函数指针变量的概念 二、函数指针变量的创建 三、函数指针变量的使用 四、两段特殊代码的理解 五、typedef 六、函数指针数组 总结&#xff1a; 前言&#xff1a; 本文主要讲述C语言指针中的函数指针&#xff0c;包括函数指针变量的概念、创建…...

RabbitMQ安装及配套Laravel使用

MQ MQ 全称 Message Queue(消息队列),是在消息的传输过程中保存消息的容器。多用于系统之间的异步通信。 为什么需要mq: 解耦:MQ能够使各个系统或组件之间解耦,降低它们之间的耦合度,提高系统的灵活性和可维护性异步处理:通过MQ可以实现异步处理,提高系统响应速度和吞…...

java在类的定义中创建自己的对象?

当在main方法中新建自身所在类的对象&#xff0c;并调用main方法时&#xff0c;会不断循环调用main方法&#xff0c;直到栈溢出 package com.keywordStudy;public class mainTest {static int value 33;public static void main(String[] args) throws Exception{String[] sn…...

掌握C++回调:按值捕获、按引用捕获与弱引用

文章目录 一、按引用捕获和按值捕获1.1 原理1.2 案例 二、弱引用2.1 原理2.2 案例一2.3 案例二&#xff1a;使用base库的弱引用 三、总结 在C回调中&#xff0c;当使用Lambda表达式捕获外部变量时&#xff0c;有两种捕获方式&#xff1a;按值捕获和按引用捕获。 一、按引用捕获…...

抖音运营_如何做出优质的短视频

目录 一 短视频内容的构成 1 图像 2 字幕 3 声音 4 特效 5 描述 6 评论 二 短视频的热门类型 1 颜值圈粉类 2 知识教学类 3 幽默搞笑类 4 商品展示类 5 才艺技能类 6 评论解说类 三 热门短视频的特征 1 产生共鸣 2 正能量 3 紧跟热点话题 4 富有创意 四 短视…...

Day21:Leetcode513.找树左下角的值 +112. 路径总和 113.路径总和ii + 106.从中序与后序遍历序列构造二叉树

LeetCode&#xff1a;513.找树左下角的值 解决方案&#xff1a; 1.思路 在遍历一个节点时&#xff0c;需要先把它的非空右子节点放入队列&#xff0c;然后再把它的非空左子节点放入队列&#xff0c;这样才能保证从右到左遍历每一层的节点。广度优先搜索所遍历的最后一个节点…...

Java数据结构和算法(B树)

前言 B树又叫平衡的多路搜索树&#xff1b;平衡的意思是又满足平衡二叉树的一些性质&#xff0c;左树大于右树&#xff1b; 多路意思是&#xff0c;可以多个结点&#xff0c;不再是像二叉树只有两个结点&#xff1b; 实现原理 B树是一种自平衡的搜索树&#xff0c;通常用于实…...

成为程序员后我都明白了什么?从入行到弃坑?

作为一个入行近10年的php程序员&#xff0c;真心感觉一切都才刚开始&#xff0c;对计算机&#xff0c;编程语言的理解也好&#xff0c;程序员中年危机也罢&#xff0c;之前都是听别人说的&#xff0c;真的自己到了这个水平&#xff0c;这个年龄才深刻体会到这其中的种种。 我一…...

python --创建固定字符串长度,先进先出

a 123def concatenate_within_limit(b, new_string):# 计算新字符串与a的长度之和a btotal_length len(a) len(new_string)# 如果长度超过1024&#xff0c;从前面删除足够的字符if total_length > 5:diff total_length - 5a a[diff:] new_string # 删除前diff个字符…...

容器化部署

目录 docker容器化部署 怎样使用Docker Compose或Kubernetes等容器编排工具来管理和扩展联邦学习系统 使用Docker Compose...

国产数据库TiDB的常用方法

TiDB的常用方法主要涉及安装配置、数据操作、性能调优以及监控和维护等方面。以下是对这些常用方法的归纳和介绍&#xff1a; 1. 安装与配置 安装TiDB&#xff1a;根据官方文档的指引&#xff0c;用户可以按照步骤进行TiDB的安装。配置TiDB&#xff1a;安装完成后&#xff0c…...

基于DdddOcr通用验证码离线本地识别SDK搭建个人云打码接口Api

前言 最近介绍了一款免费的验证码识别网站,识别效率太低,考虑到ddddocr是开源的,决定搭建搭建一个,发现原作者sml2h3已经推出好久了,但是网上没有宝塔安装的教程,于是本次通过宝塔搭建属于自己的带带弟弟OCR通用验证码离线本地识别 原项目地址:https://github.com/sml2…...

2、xss-labs之level2

1、打开页面 2、传入xss代码 payload&#xff1a;<script>alert(xss)</script>&#xff0c;发现返回<script>alert(xss)</script> 3、分析原因 打开f12&#xff0c;没什么发现 看后端源码&#xff0c;在这form表单通过get获取keyword的值赋给$str&am…...

基于ASP.NET+ SQL Server实现(Web)医院信息管理系统

医院信息管理系统 1. 课程设计内容 在 visual studio 2017 平台上&#xff0c;开发一个“医院信息管理系统”Web 程序。 2. 课程设计目的 综合运用 c#.net 知识&#xff0c;在 vs 2017 平台上&#xff0c;进行 ASP.NET 应用程序和简易网站的开发&#xff1b;初步熟悉开发一…...

蓝桥杯 2024 15届国赛 A组 儿童节快乐

P10576 [蓝桥杯 2024 国 A] 儿童节快乐 题目描述 五彩斑斓的气球在蓝天下悠然飘荡&#xff0c;轻快的音乐在耳边持续回荡&#xff0c;小朋友们手牵着手一同畅快欢笑。在这样一片安乐祥和的氛围下&#xff0c;六一来了。 今天是六一儿童节&#xff0c;小蓝老师为了让大家在节…...

【磁盘】每天掌握一个Linux命令 - iostat

目录 【磁盘】每天掌握一个Linux命令 - iostat工具概述安装方式核心功能基础用法进阶操作实战案例面试题场景生产场景 注意事项 【磁盘】每天掌握一个Linux命令 - iostat 工具概述 iostat&#xff08;I/O Statistics&#xff09;是Linux系统下用于监视系统输入输出设备和CPU使…...

土地利用/土地覆盖遥感解译与基于CLUE模型未来变化情景预测;从基础到高级,涵盖ArcGIS数据处理、ENVI遥感解译与CLUE模型情景模拟等

&#x1f50d; 土地利用/土地覆盖数据是生态、环境和气象等诸多领域模型的关键输入参数。通过遥感影像解译技术&#xff0c;可以精准获取历史或当前任何一个区域的土地利用/土地覆盖情况。这些数据不仅能够用于评估区域生态环境的变化趋势&#xff0c;还能有效评价重大生态工程…...

BCS 2025|百度副总裁陈洋:智能体在安全领域的应用实践

6月5日&#xff0c;2025全球数字经济大会数字安全主论坛暨北京网络安全大会在国家会议中心隆重开幕。百度副总裁陈洋受邀出席&#xff0c;并作《智能体在安全领域的应用实践》主题演讲&#xff0c;分享了在智能体在安全领域的突破性实践。他指出&#xff0c;百度通过将安全能力…...

用docker来安装部署freeswitch记录

今天刚才测试一个callcenter的项目&#xff0c;所以尝试安装freeswitch 1、使用轩辕镜像 - 中国开发者首选的专业 Docker 镜像加速服务平台 编辑下面/etc/docker/daemon.json文件为 {"registry-mirrors": ["https://docker.xuanyuan.me"] }同时可以进入轩…...

C# 求圆面积的程序(Program to find area of a circle)

给定半径r&#xff0c;求圆的面积。圆的面积应精确到小数点后5位。 例子&#xff1a; 输入&#xff1a;r 5 输出&#xff1a;78.53982 解释&#xff1a;由于面积 PI * r * r 3.14159265358979323846 * 5 * 5 78.53982&#xff0c;因为我们只保留小数点后 5 位数字。 输…...

初学 pytest 记录

安装 pip install pytest用例可以是函数也可以是类中的方法 def test_func():print()class TestAdd: # def __init__(self): 在 pytest 中不可以使用__init__方法 # self.cc 12345 pytest.mark.api def test_str(self):res add(1, 2)assert res 12def test_int(self):r…...

高效线程安全的单例模式:Python 中的懒加载与自定义初始化参数

高效线程安全的单例模式:Python 中的懒加载与自定义初始化参数 在软件开发中,单例模式(Singleton Pattern)是一种常见的设计模式,确保一个类仅有一个实例,并提供一个全局访问点。在多线程环境下,实现单例模式时需要注意线程安全问题,以防止多个线程同时创建实例,导致…...

Linux离线(zip方式)安装docker

目录 基础信息操作系统信息docker信息 安装实例安装步骤示例 遇到的问题问题1&#xff1a;修改默认工作路径启动失败问题2 找不到对应组 基础信息 操作系统信息 OS版本&#xff1a;CentOS 7 64位 内核版本&#xff1a;3.10.0 相关命令&#xff1a; uname -rcat /etc/os-rele…...