多线程事务回滚方法
多线程事务回滚方法
- 介绍
- 案例演示
- 线程池配置
- 异常类
- 实体类
- 控制层
- 业务层
- mapper
- 工具类
- 验证
- 解决方案
- 使用sqlSession控制手动提交事务
- SqlSessionTemplate注入容器中
- 改造业务层
- 验证
- 成功操作示例业务层改造
介绍
1.最近有一个大数据量插入的操作入库的业务场景,需要先做一些其他修改操作,然后在执行插入操作,由于插入数据可能会很多,用到多线程去拆分数据并行处理来提高响应时间,如果有一个线程执行失败,则全部回滚。
2.在spring中可以使用@Transactional注解去控制事务,使出现异常时会进行回滚,在多线程中,这个注解则不会生效,如果主线程需要先执行一些修改数据库的操作,当子线程在进行处理出现异常时,主线程修改的数据则不会回滚,导致数据错误。
案例演示
下面是事务不成功案例演示。
线程池配置
package com.mry.rollback.config;import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;/*** 线程池配置*/
public class ExecutorConfig {private static int maxPoolSize = Runtime.getRuntime().availableProcessors();private volatile static ExecutorService executorService;public static ExecutorService getThreadPool() {if (executorService == null){synchronized (ExecutorConfig.class){if (executorService == null){executorService = newThreadPool();}}}return executorService;}private static ExecutorService newThreadPool(){int queueSize = 500;int corePool = Math.min(5, maxPoolSize);return new ThreadPoolExecutor(corePool, maxPoolSize, 10000L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<>(queueSize),new ThreadPoolExecutor.AbortPolicy());}private ExecutorConfig(){}}
异常类
package com.mry.rollback.exception;import lombok.Data;/*** 异常类*/
@Data
public class ServiceException extends RuntimeException {private static final long serialVersionUID = 1L;private String msg;private int code = 500;public ServiceException(String msg) {super(msg);this.msg = msg;}public ServiceException(String msg, Throwable e) {super(msg, e);this.msg = msg;}public ServiceException(String msg, int code) {super(msg);this.msg = msg;this.code = code;}public ServiceException(String msg, int code, Throwable e) {super(msg, e);this.msg = msg;this.code = code;}}
实体类
package com.mry.rollback.entity;import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;
import lombok.ToString;
import java.util.Date;@ToString
@Data
@TableName("employee")
public class Employee {private Integer employeeId;private Integer age;private String employeeName;private Date birthDate;private Integer gender;private String idNumber;private Date createTime;private Date updateTime;private Integer status;
}
控制层
package com.mry.rollback.controller;import com.mry.rollback.entity.Employee;
import com.mry.rollback.service.EmployeeService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.List;@Slf4j
@RestController
@RequestMapping("/sys")
public class EmployeeController {@AutowiredEmployeeService employeeService;@GetMapping("/add")public String batchAddEmployee(){int size = 10;List<Employee> employeeDOList = new ArrayList<>(size);for (int i = 0; i<size;i++){Employee employee = new Employee();employee.setEmployeeName("lol"+i);employee.setAge(18);employee.setGender(1);employee.setBirthDate(Calendar.getInstance().getTime());employee.setIdNumber(i+"XX");employee.setStatus(1);employee.setCreateTime(Calendar.getInstance().getTime());employee.setUpdateTime(Calendar.getInstance().getTime());employeeDOList.add(employee);}try {employeeService.saveThread(employeeDOList);System.out.println("添加成功");}catch (Exception e){e.printStackTrace();}return "添加成功";}}
业务层
package com.mry.rollback.service;import com.mry.rollback.entity.Employee;
import java.util.List;public interface EmployeeService {public void saveThread(List<Employee> employeeList);
}
package com.mry.rollback.service.impl;import com.mry.rollback.config.ExecutorConfig;
import com.mry.rollback.entity.Employee;
import com.mry.rollback.exception.ServiceException;
import com.mry.rollback.mapper.EmployeeMapper;
import com.mry.rollback.service.EmployeeService;
import com.mry.rollback.util.ThreadUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;@Slf4j
@Service("employeeService")
public class EmployeeServiceImpl implements EmployeeService {@AutowiredEmployeeMapper employeeMapper;@Override@Transactionalpublic void saveThread(List<Employee> employeeList) {try {//先做删除操作,如果子线程出现异常,此操作不会回滚employeeMapper.delete(null);//获取线程池ExecutorService service = ExecutorConfig.getThreadPool();//拆分数据,拆分5份List<List<Employee>> lists = ThreadUtil.averageAssign(employeeList, 5);//执行的线程Thread []threadArray = new Thread[lists.size()];//监控子线程执行完毕,再执行主线程,要不然会导致主线程关闭,子线程也会随着关闭CountDownLatch countDownLatch = new CountDownLatch(lists.size());AtomicBoolean atomicBoolean = new AtomicBoolean(true);for (int i =0;i<lists.size();i++){if (i==lists.size()-1){atomicBoolean.set(false);}List<Employee> list = lists.get(i);threadArray[i] = new Thread(() -> {try {//最后一个线程抛出异常if (!atomicBoolean.get()){throw new ServiceException("出现异常",001);}//批量添加,mybatisPlus中自带的batch方法employeeMapper.saveBatchEmployee(list);}finally {countDownLatch.countDown();}});}for (int i = 0; i <lists.size(); i++){service.execute(threadArray[i]);}//当子线程执行完毕时,主线程再往下执行countDownLatch.await();System.out.println("添加完毕");}catch (Exception e){log.info("error",e);throw new ServiceException("出现异常",002);}finally {//connection.close();}}
}
mapper
package com.mry.rollback.mapper;import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.mry.rollback.entity.Employee;
import org.apache.ibatis.annotations.Insert;
import org.apache.ibatis.annotations.Param;
import org.springframework.stereotype.Repository;
import java.util.List;@Repository
public interface EmployeeMapper extends BaseMapper<Employee> {@Insert("<script>" +"insert into employee (age, employee_name, birth_date, gender, id_number, create_time, update_time, status) " +"values " +"<foreach collection='employeeList' item='employee' index='index' separator=','>" +"(#{employee.age}, #{employee.employeeName}, #{employee.birthDate}, #{employee.gender}, #{employee.idNumber}, #{employee.createTime}, #{employee.updateTime}, #{employee.status})" +"</foreach>" +"</script>")public void saveBatchEmployee(@Param("employeeList") List<Employee> employeeList);}
工具类
package com.mry.rollback.util;import java.util.ArrayList;
import java.util.List;public class ThreadUtil {/*** 平均拆分list方法.* @param source* @param n* @param <T>* @return*/public static <T> List<List<T>> averageAssign(List<T> source,int n){List<List<T>> result=new ArrayList<List<T>>();int remaider=source.size()%n;int number=source.size()/n;int offset=0;//偏移量for(int i=0;i<n;i++){List<T> value=null;if(remaider>0){value=source.subList(i*number+offset, (i+1)*number+offset+1);remaider--;offset++;}else{value=source.subList(i*number+offset, (i+1)*number+offset);}result.add(value);}return result;}}
验证
1.数据库中存在一条数据:

2.请求接口:http://127.0.0.1:8866/sys/add
3.控制信息:

4.数据库信息:

注意:可以发现子线程组执行时,有一个线程执行失败,其他线程也会抛出异常,但是主线程中执行的删除操作,没有回滚,@Transactional注解没有生效。
解决方案
使用sqlSession控制手动提交事务
SqlSessionTemplate注入容器中
package com.mry.rollback.config;import org.apache.ibatis.session.SqlSession;
import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.SqlSessionTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;/*** 获取sqlSession*/
@Component
public class SqlContext {@Resourceprivate SqlSessionTemplate sqlSessionTemplate;public SqlSession getSqlSession(){SqlSessionFactory sqlSessionFactory = sqlSessionTemplate.getSqlSessionFactory();return sqlSessionFactory.openSession();}}
改造业务层
package com.mry.rollback.service.impl;import com.mry.rollback.config.ExecutorConfig;
import com.mry.rollback.config.SqlContext;
import com.mry.rollback.entity.Employee;
import com.mry.rollback.exception.ServiceException;
import com.mry.rollback.mapper.EmployeeMapper;
import com.mry.rollback.service.EmployeeService;
import com.mry.rollback.util.ThreadUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.ibatis.session.SqlSession;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;@Slf4j
@Service("employeeService")
public class EmployeeServiceImpl implements EmployeeService {@AutowiredEmployeeMapper employeeMapper;@ResourceSqlContext sqlContext;@Overridepublic void saveThread(List<Employee> employeeList) throws SQLException {// 获取数据库连接,获取会话(内部自有事务)SqlSession sqlSession = sqlContext.getSqlSession();Connection connection = sqlSession.getConnection();try {// 设置手动提交connection.setAutoCommit(false);//获取mapperEmployeeMapper employeeMapper = sqlSession.getMapper(EmployeeMapper.class);//先做删除操作employeeMapper.delete(null);//获取执行器ExecutorService service = ExecutorConfig.getThreadPool();List<Callable<Integer>> callableList = new ArrayList<>();//拆分listList<List<Employee>> lists= ThreadUtil.averageAssign(employeeList, 5);AtomicBoolean atomicBoolean = new AtomicBoolean(true);for (int i =0;i<lists.size();i++){if (i==lists.size()-1){atomicBoolean.set(false);}List<Employee> list = lists.get(i);//使用返回结果的callable去执行,Callable<Integer> callable = () -> {//让最后一个线程抛出异常if (!atomicBoolean.get()){throw new ServiceException("出现异常",001);}return employeeMapper.saveBatchEmployee(list);};callableList.add(callable);}//执行子线程List<Future<Integer>> futures = service.invokeAll(callableList);for (Future<Integer> future:futures) {//如果有一个执行不成功,则全部回滚if (future.get()<=0){connection.rollback();return;}}connection.commit();System.out.println("添加完毕");}catch (Exception e){connection.rollback();log.info("error",e);throw new ServiceException("出现异常",002);}finally {connection.close();}}
}
验证
1.数据库中存在一条数据:

2.请求接口:http://127.0.0.1:8877/sys/add
3.控制信息:

4.数据库信息:

注意:删除操作的数据回滚了,数据库中的数据依旧存在,说明事务成功了。
成功操作示例业务层改造
package com.mry.rollback.service.impl;import com.mry.rollback.config.ExecutorConfig;
import com.mry.rollback.config.SqlContext;
import com.mry.rollback.entity.Employee;
import com.mry.rollback.exception.ServiceException;
import com.mry.rollback.mapper.EmployeeMapper;
import com.mry.rollback.service.EmployeeService;
import com.mry.rollback.util.ThreadUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.ibatis.session.SqlSession;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;@Slf4j
@Service("employeeService")
public class EmployeeServiceImpl implements EmployeeService {@AutowiredEmployeeMapper employeeMapper;@ResourceSqlContext sqlContext;@Overridepublic void saveThread(List<Employee> employeeList) throws SQLException {// 获取数据库连接,获取会话(内部自有事务)SqlSession sqlSession = sqlContext.getSqlSession();Connection connection = sqlSession.getConnection();try {// 设置手动提交connection.setAutoCommit(false);EmployeeMapper employeeMapper = sqlSession.getMapper(EmployeeMapper.class);//先做删除操作employeeMapper.delete(null);ExecutorService service = ExecutorConfig.getThreadPool();List<Callable<Integer>> callableList = new ArrayList<>();List<List<Employee>> lists=ThreadUtil.averageAssign(employeeList, 5);for (int i =0;i<lists.size();i++){List<Employee> list = lists.get(i);Callable<Integer> callable = () -> employeeMapper.saveBatchEmployee(list);callableList.add(callable);}//执行子线程List<Future<Integer>> futures = service.invokeAll(callableList);for (Future<Integer> future:futures) {if (future.get()<=0){connection.rollback();return;}}connection.commit();System.out.println("添加完毕");}catch (Exception e){connection.rollback();log.info("error",e);throw new ServiceException("出现异常",002);// throw new ServiceException(ExceptionCodeEnum.EMPLOYEE_SAVE_OR_UPDATE_ERROR);}}}
控制台日志输出:

数据库中数据:

注意:删除的删除了,添加的添加成功了,测试成功。
相关文章:
多线程事务回滚方法
多线程事务回滚方法 介绍案例演示线程池配置异常类实体类控制层业务层mapper工具类验证 解决方案使用sqlSession控制手动提交事务SqlSessionTemplate注入容器中改造业务层验证成功操作示例业务层改造 介绍 1.最近有一个大数据量插入的操作入库的业务场景,需要先做一…...
java单元测试( Hamcrest 断言)
java单元测试( Hamcrest 断言) 单元测试特征: 1 范围狭窄 2 限于单一类或方法 3 体积小 为什么要编写单元测试? 为了防止错误(很明显!) 而且还可以提高开发人员的生产力,因为单元测试: (1) 帮助实施——在…...
讨论和总结 树模型 的三种序列化 方式的区别(模型存储大小、序列化所用内存、序列化速度)...
一、前言 本文总结常用树模型: rf,xgboost,catboost和lightgbm等模型的保存和加载(序列化和反序列化)的多种方式,并对多种方式从运行内存的使用和存储大小做对比 二、模型 2.1 安装环境 pip install xgboos…...
Halcon中的一些3D算子
一、记录一些Halcon里的关于3D的算子 1.read_object_model_3d 从文件读取一个3d模型 如下图,读的一个ply文件出来是个3d点云模型 2.visualize_object_model_3d 交互式展示3d模型 即上个算子读出来后,通过这个算子可以把3d模型显示出来旋转、平移&am…...
Android:Selector + Layer-lists 实现 AppCompatCheckBox
最近做项目涉及到一些UI相关的东东,虽然比较简单,但是也很有趣,写两篇简短的博客记录一下。 一."Selector 两张图片"实现 AppCompatCheckBox AppCompatCheckBox 是 androidx的一个widget:androidx.appcompat.widget.…...
TreeMap类型添加数据
package com.test.Test11;import java.util.*;public class Test02 {public static void main(String[] args) {/** 增加:put(K key,V value)* 删除:clear() remove(Object key)* 修改:* 查看:entrySet() get(Object key) keySet(…...
iOS 16 UI 设计系统免费在线使用方法
1、iOS 16 UI 设计系统中有什么? iOS 16 UI 设计系统通常包含以下组件和元素: 1. 按钮:包括操作按钮、图标按钮、导航按钮、滚动按钮、切换按钮、单选按钮、复选框按钮、呼叫按钮等各种类型的按钮。 2. 窗口和 UI 控件:包括标签…...
【接口测试】JMeter接口关联测试
1 前言 我们来学习接口管理测试,这就要使用到JMeter提供的JSON提取器和正则表达式提取器了,下面我们来看看是如何使用的吧。 2 JSON提取器 1、添加JSON提取器 在线程组右键 > 添加 > 后置处理器 > JSON提取器 2、JSON提取器参数说明 N…...
腾讯云服务器ping不通解决方法(公网IP/安全组/系统多维度)
腾讯云服务器ping不通什么原因?ping不通公网IP地址还是域名?新手站长从云服务器公网IP、安全组、Linux系统和Windows操作系统多方面来详细说明腾讯云服务器ping不通的解决方法: 目录 腾讯云服务器ping不通原因分析及解决方法 安全组ICMP协…...
【C++/嵌入式笔试面试八股】一、32.封装
封装 08.C++中struct和class的区别🍊 相同点 两者都拥有成员函数、公有和私有部分任何可以使用class完成的工作,同样可以使用struct完成不同点 两者中如果不对成员不指定公私有,struct默认是公有的,class则默认是私有的class默认是private继承, 而struct默认是public继…...
【算法】Transform to Chessboard 变为棋盘
文章目录 Transform to Chessboard 变为棋盘问题描述:分析代码 Transform to Chessboard 变为棋盘 问题描述: 一个 n x n 的二维网络 board 仅由 0 和 1 组成 。每次移动,你能任意交换两列或是两行的位置。 返回 将这个矩阵变为 棋盘 所需…...
vue通过封装$on定义全局事件
我们先在vue项目的src跟目录下创建一个文件夹 叫 utils 下面创建一个js文件夹 叫 bus.js 参考代码如下 import Vue from "vue"; export default new Vue();然后 我们就可以来用了 在需要定义事件的组件中编写 <template><div><h1>Hello world!&…...
资产管理规范
生产系统资产管理规范 1. 引言 生产系统的资产管理是确保生产系统正常运行和提高生产效率的关键因素之一。本文档旨在制定一套规范,以确保生产系统中的资产,包括服务器和软件等,得到有效管理和保护。 2. 资产分类 生产系统资产可根据其性质…...
已解决:如何从别人的仓库那里克隆到自己的仓库,并修改代码并提交。
一、场景 拉取项目代码后,如果要共同开发一个项目的自动化代码,此时需要把自己写的代码部分提交到代码仓库。 可以用pycharm把修改的代码push到代码仓库 二、操作方法 1.从别人的仓库那里点击fork,将仓库克隆到自己的仓库。 2.在pychar…...
剑指 Offer 18. 删除链表的节点
🚀 作者简介:一名在后端领域学习,并渴望能够学有所成的追梦人。 🚁 个人主页:不 良 🔥 系列专栏:🛸剑指 Offer 📕 学习格言:博观而约取,厚积而薄…...
WiFi 6 vs WiFi 5
在现代无线通信领域,WiFi已经成为人们日常生活中不可或缺的一部分。随着技术的不断发展,WiFi标准也在不断更新和演进。WiFi 6(802.11ax)和WiFi 5(802.11ac)是当前两个主要的WiFi标准。 本文将详细介绍WiFi …...
PHP语言基础
一.标记风格 标记风格分为四类(推荐XML) 1.XML风格 <?php echo这是xml风格‘; ?> 注意:结束标识符必须单独另起一行,并且不能有空格。在标识符前后有其他符号或者字符也会发生错误。 2.脚本风格 <script languagephp> …...
怎么用Excel VBA写一个excel批量合并的程序?
您可以按照以下VBA代码来实现把同一路径上的所有工作簿合并到同一个工作簿中: VBA Option Explicit Sub MergeWorkbooks() Dim path As String, fileName As String, sheet As Worksheet Dim targetWorkbook As Workbook, sourceWorkbook As Workbook Dim workshe…...
WuThreat身份安全云-TVD每日漏洞情报-2023-05-22
漏洞名称:Apple WebKit 任意代码执行漏洞 漏洞级别:中危 漏洞编号:CVE-2023-32373 相关涉及:Apple iOS和iPadOS 16.4.1 漏洞状态:在野 参考链接:https://tvd.wuthreat.com/#/listDetail?TVD_IDTVD-2023-12579 漏洞名称:海康威视部分iVMS系统存在文件上传漏洞 漏洞级别:未定义…...
Eclipse教程 Ⅵ
今天分享Eclipse Java 构建路径、Eclipse 运行配置(Run Configuration)和Eclipse 运行程序 Eclipse Java 构建路径 设置 Java 构建路径 Java构建路径用于在编译Java项目时找到依赖的类,包括以下几项: 源码包项目相关的 jar 包及类文件项目引用的的类…...
linux之kylin系统nginx的安装
一、nginx的作用 1.可做高性能的web服务器 直接处理静态资源(HTML/CSS/图片等),响应速度远超传统服务器类似apache支持高并发连接 2.反向代理服务器 隐藏后端服务器IP地址,提高安全性 3.负载均衡服务器 支持多种策略分发流量…...
SkyWalking 10.2.0 SWCK 配置过程
SkyWalking 10.2.0 & SWCK 配置过程 skywalking oap-server & ui 使用Docker安装在K8S集群以外,K8S集群中的微服务使用initContainer按命名空间将skywalking-java-agent注入到业务容器中。 SWCK有整套的解决方案,全安装在K8S群集中。 具体可参…...
DAY 47
三、通道注意力 3.1 通道注意力的定义 # 新增:通道注意力模块(SE模块) class ChannelAttention(nn.Module):"""通道注意力模块(Squeeze-and-Excitation)"""def __init__(self, in_channels, reduction_rat…...
Psychopy音频的使用
Psychopy音频的使用 本文主要解决以下问题: 指定音频引擎与设备;播放音频文件 本文所使用的环境: Python3.10 numpy2.2.6 psychopy2025.1.1 psychtoolbox3.0.19.14 一、音频配置 Psychopy文档链接为Sound - for audio playback — Psy…...
OD 算法题 B卷【正整数到Excel编号之间的转换】
文章目录 正整数到Excel编号之间的转换 正整数到Excel编号之间的转换 excel的列编号是这样的:a b c … z aa ab ac… az ba bb bc…yz za zb zc …zz aaa aab aac…; 分别代表以下的编号1 2 3 … 26 27 28 29… 52 53 54 55… 676 677 678 679 … 702 703 704 705;…...
【HarmonyOS 5】鸿蒙中Stage模型与FA模型详解
一、前言 在HarmonyOS 5的应用开发模型中,featureAbility是旧版FA模型(Feature Ability)的用法,Stage模型已采用全新的应用架构,推荐使用组件化的上下文获取方式,而非依赖featureAbility。 FA大概是API7之…...
Selenium 查找页面元素的方式
Selenium 查找页面元素的方式 Selenium 提供了多种方法来查找网页中的元素,以下是主要的定位方式: 基本定位方式 通过ID定位 driver.find_element(By.ID, "element_id")通过Name定位 driver.find_element(By.NAME, "element_name"…...
Linux系统:进程间通信-匿名与命名管道
本节重点 匿名管道的概念与原理匿名管道的创建命名管道的概念与原理命名管道的创建两者的差异与联系命名管道实现EchoServer 一、管道 管道(Pipe)是一种进程间通信(IPC, Inter-Process Communication)机制,用于在不…...
智能问数Text2SQL Vanna windows场景验证
架构 Vanna 是一个开源 Python RAG(检索增强生成)框架,用于 SQL 生成和相关功能。 机制 Vanna 的工作过程分为两个简单步骤 - 在您的数据上训练 RAG“模型”,然后提出问题,这些问题将返回 SQL 查询,这些查…...
Linux——TCP和UDP
一、TCP协议 1.特点 TCP提供的是面向连接、可靠的、字节流服务。 2.编程流程 (1)服务器端的编程流程 ①socket() 方法创建套接字 ②bind()方法指定套接字使用的IP地址和端口。 ③listen()方法用来创建监听队列。 ④accept()方法处理客户端的连接…...
