多线程事务回滚方法
多线程事务回滚方法
- 介绍
- 案例演示
- 线程池配置
- 异常类
- 实体类
- 控制层
- 业务层
- mapper
- 工具类
- 验证
- 解决方案
- 使用sqlSession控制手动提交事务
- SqlSessionTemplate注入容器中
- 改造业务层
- 验证
- 成功操作示例业务层改造
介绍
1.最近有一个大数据量插入的操作入库的业务场景,需要先做一些其他修改操作,然后在执行插入操作,由于插入数据可能会很多,用到多线程去拆分数据并行处理来提高响应时间,如果有一个线程执行失败,则全部回滚。
2.在spring中可以使用@Transactional注解去控制事务,使出现异常时会进行回滚,在多线程中,这个注解则不会生效,如果主线程需要先执行一些修改数据库的操作,当子线程在进行处理出现异常时,主线程修改的数据则不会回滚,导致数据错误。
案例演示
下面是事务不成功案例演示。
线程池配置
package com.mry.rollback.config;import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;/*** 线程池配置*/
public class ExecutorConfig {private static int maxPoolSize = Runtime.getRuntime().availableProcessors();private volatile static ExecutorService executorService;public static ExecutorService getThreadPool() {if (executorService == null){synchronized (ExecutorConfig.class){if (executorService == null){executorService = newThreadPool();}}}return executorService;}private static ExecutorService newThreadPool(){int queueSize = 500;int corePool = Math.min(5, maxPoolSize);return new ThreadPoolExecutor(corePool, maxPoolSize, 10000L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<>(queueSize),new ThreadPoolExecutor.AbortPolicy());}private ExecutorConfig(){}}
异常类
package com.mry.rollback.exception;import lombok.Data;/*** 异常类*/
@Data
public class ServiceException extends RuntimeException {private static final long serialVersionUID = 1L;private String msg;private int code = 500;public ServiceException(String msg) {super(msg);this.msg = msg;}public ServiceException(String msg, Throwable e) {super(msg, e);this.msg = msg;}public ServiceException(String msg, int code) {super(msg);this.msg = msg;this.code = code;}public ServiceException(String msg, int code, Throwable e) {super(msg, e);this.msg = msg;this.code = code;}}
实体类
package com.mry.rollback.entity;import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;
import lombok.ToString;
import java.util.Date;@ToString
@Data
@TableName("employee")
public class Employee {private Integer employeeId;private Integer age;private String employeeName;private Date birthDate;private Integer gender;private String idNumber;private Date createTime;private Date updateTime;private Integer status;
}
控制层
package com.mry.rollback.controller;import com.mry.rollback.entity.Employee;
import com.mry.rollback.service.EmployeeService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.List;@Slf4j
@RestController
@RequestMapping("/sys")
public class EmployeeController {@AutowiredEmployeeService employeeService;@GetMapping("/add")public String batchAddEmployee(){int size = 10;List<Employee> employeeDOList = new ArrayList<>(size);for (int i = 0; i<size;i++){Employee employee = new Employee();employee.setEmployeeName("lol"+i);employee.setAge(18);employee.setGender(1);employee.setBirthDate(Calendar.getInstance().getTime());employee.setIdNumber(i+"XX");employee.setStatus(1);employee.setCreateTime(Calendar.getInstance().getTime());employee.setUpdateTime(Calendar.getInstance().getTime());employeeDOList.add(employee);}try {employeeService.saveThread(employeeDOList);System.out.println("添加成功");}catch (Exception e){e.printStackTrace();}return "添加成功";}}
业务层
package com.mry.rollback.service;import com.mry.rollback.entity.Employee;
import java.util.List;public interface EmployeeService {public void saveThread(List<Employee> employeeList);
}
package com.mry.rollback.service.impl;import com.mry.rollback.config.ExecutorConfig;
import com.mry.rollback.entity.Employee;
import com.mry.rollback.exception.ServiceException;
import com.mry.rollback.mapper.EmployeeMapper;
import com.mry.rollback.service.EmployeeService;
import com.mry.rollback.util.ThreadUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;@Slf4j
@Service("employeeService")
public class EmployeeServiceImpl implements EmployeeService {@AutowiredEmployeeMapper employeeMapper;@Override@Transactionalpublic void saveThread(List<Employee> employeeList) {try {//先做删除操作,如果子线程出现异常,此操作不会回滚employeeMapper.delete(null);//获取线程池ExecutorService service = ExecutorConfig.getThreadPool();//拆分数据,拆分5份List<List<Employee>> lists = ThreadUtil.averageAssign(employeeList, 5);//执行的线程Thread []threadArray = new Thread[lists.size()];//监控子线程执行完毕,再执行主线程,要不然会导致主线程关闭,子线程也会随着关闭CountDownLatch countDownLatch = new CountDownLatch(lists.size());AtomicBoolean atomicBoolean = new AtomicBoolean(true);for (int i =0;i<lists.size();i++){if (i==lists.size()-1){atomicBoolean.set(false);}List<Employee> list = lists.get(i);threadArray[i] = new Thread(() -> {try {//最后一个线程抛出异常if (!atomicBoolean.get()){throw new ServiceException("出现异常",001);}//批量添加,mybatisPlus中自带的batch方法employeeMapper.saveBatchEmployee(list);}finally {countDownLatch.countDown();}});}for (int i = 0; i <lists.size(); i++){service.execute(threadArray[i]);}//当子线程执行完毕时,主线程再往下执行countDownLatch.await();System.out.println("添加完毕");}catch (Exception e){log.info("error",e);throw new ServiceException("出现异常",002);}finally {//connection.close();}}
}
mapper
package com.mry.rollback.mapper;import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.mry.rollback.entity.Employee;
import org.apache.ibatis.annotations.Insert;
import org.apache.ibatis.annotations.Param;
import org.springframework.stereotype.Repository;
import java.util.List;@Repository
public interface EmployeeMapper extends BaseMapper<Employee> {@Insert("<script>" +"insert into employee (age, employee_name, birth_date, gender, id_number, create_time, update_time, status) " +"values " +"<foreach collection='employeeList' item='employee' index='index' separator=','>" +"(#{employee.age}, #{employee.employeeName}, #{employee.birthDate}, #{employee.gender}, #{employee.idNumber}, #{employee.createTime}, #{employee.updateTime}, #{employee.status})" +"</foreach>" +"</script>")public void saveBatchEmployee(@Param("employeeList") List<Employee> employeeList);}
工具类
package com.mry.rollback.util;import java.util.ArrayList;
import java.util.List;public class ThreadUtil {/*** 平均拆分list方法.* @param source* @param n* @param <T>* @return*/public static <T> List<List<T>> averageAssign(List<T> source,int n){List<List<T>> result=new ArrayList<List<T>>();int remaider=source.size()%n;int number=source.size()/n;int offset=0;//偏移量for(int i=0;i<n;i++){List<T> value=null;if(remaider>0){value=source.subList(i*number+offset, (i+1)*number+offset+1);remaider--;offset++;}else{value=source.subList(i*number+offset, (i+1)*number+offset);}result.add(value);}return result;}}
验证
1.数据库中存在一条数据:
2.请求接口:http://127.0.0.1:8866/sys/add
3.控制信息:
4.数据库信息:
注意:可以发现子线程组执行时,有一个线程执行失败,其他线程也会抛出异常,但是主线程中执行的删除操作,没有回滚,@Transactional注解没有生效。
解决方案
使用sqlSession控制手动提交事务
SqlSessionTemplate注入容器中
package com.mry.rollback.config;import org.apache.ibatis.session.SqlSession;
import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.SqlSessionTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;/*** 获取sqlSession*/
@Component
public class SqlContext {@Resourceprivate SqlSessionTemplate sqlSessionTemplate;public SqlSession getSqlSession(){SqlSessionFactory sqlSessionFactory = sqlSessionTemplate.getSqlSessionFactory();return sqlSessionFactory.openSession();}}
改造业务层
package com.mry.rollback.service.impl;import com.mry.rollback.config.ExecutorConfig;
import com.mry.rollback.config.SqlContext;
import com.mry.rollback.entity.Employee;
import com.mry.rollback.exception.ServiceException;
import com.mry.rollback.mapper.EmployeeMapper;
import com.mry.rollback.service.EmployeeService;
import com.mry.rollback.util.ThreadUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.ibatis.session.SqlSession;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;@Slf4j
@Service("employeeService")
public class EmployeeServiceImpl implements EmployeeService {@AutowiredEmployeeMapper employeeMapper;@ResourceSqlContext sqlContext;@Overridepublic void saveThread(List<Employee> employeeList) throws SQLException {// 获取数据库连接,获取会话(内部自有事务)SqlSession sqlSession = sqlContext.getSqlSession();Connection connection = sqlSession.getConnection();try {// 设置手动提交connection.setAutoCommit(false);//获取mapperEmployeeMapper employeeMapper = sqlSession.getMapper(EmployeeMapper.class);//先做删除操作employeeMapper.delete(null);//获取执行器ExecutorService service = ExecutorConfig.getThreadPool();List<Callable<Integer>> callableList = new ArrayList<>();//拆分listList<List<Employee>> lists= ThreadUtil.averageAssign(employeeList, 5);AtomicBoolean atomicBoolean = new AtomicBoolean(true);for (int i =0;i<lists.size();i++){if (i==lists.size()-1){atomicBoolean.set(false);}List<Employee> list = lists.get(i);//使用返回结果的callable去执行,Callable<Integer> callable = () -> {//让最后一个线程抛出异常if (!atomicBoolean.get()){throw new ServiceException("出现异常",001);}return employeeMapper.saveBatchEmployee(list);};callableList.add(callable);}//执行子线程List<Future<Integer>> futures = service.invokeAll(callableList);for (Future<Integer> future:futures) {//如果有一个执行不成功,则全部回滚if (future.get()<=0){connection.rollback();return;}}connection.commit();System.out.println("添加完毕");}catch (Exception e){connection.rollback();log.info("error",e);throw new ServiceException("出现异常",002);}finally {connection.close();}}
}
验证
1.数据库中存在一条数据:
2.请求接口:http://127.0.0.1:8877/sys/add
3.控制信息:
4.数据库信息:
注意:删除操作的数据回滚了,数据库中的数据依旧存在,说明事务成功了。
成功操作示例业务层改造
package com.mry.rollback.service.impl;import com.mry.rollback.config.ExecutorConfig;
import com.mry.rollback.config.SqlContext;
import com.mry.rollback.entity.Employee;
import com.mry.rollback.exception.ServiceException;
import com.mry.rollback.mapper.EmployeeMapper;
import com.mry.rollback.service.EmployeeService;
import com.mry.rollback.util.ThreadUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.ibatis.session.SqlSession;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;@Slf4j
@Service("employeeService")
public class EmployeeServiceImpl implements EmployeeService {@AutowiredEmployeeMapper employeeMapper;@ResourceSqlContext sqlContext;@Overridepublic void saveThread(List<Employee> employeeList) throws SQLException {// 获取数据库连接,获取会话(内部自有事务)SqlSession sqlSession = sqlContext.getSqlSession();Connection connection = sqlSession.getConnection();try {// 设置手动提交connection.setAutoCommit(false);EmployeeMapper employeeMapper = sqlSession.getMapper(EmployeeMapper.class);//先做删除操作employeeMapper.delete(null);ExecutorService service = ExecutorConfig.getThreadPool();List<Callable<Integer>> callableList = new ArrayList<>();List<List<Employee>> lists=ThreadUtil.averageAssign(employeeList, 5);for (int i =0;i<lists.size();i++){List<Employee> list = lists.get(i);Callable<Integer> callable = () -> employeeMapper.saveBatchEmployee(list);callableList.add(callable);}//执行子线程List<Future<Integer>> futures = service.invokeAll(callableList);for (Future<Integer> future:futures) {if (future.get()<=0){connection.rollback();return;}}connection.commit();System.out.println("添加完毕");}catch (Exception e){connection.rollback();log.info("error",e);throw new ServiceException("出现异常",002);// throw new ServiceException(ExceptionCodeEnum.EMPLOYEE_SAVE_OR_UPDATE_ERROR);}}}
控制台日志输出:
数据库中数据:
注意:删除的删除了,添加的添加成功了,测试成功。
相关文章:

多线程事务回滚方法
多线程事务回滚方法 介绍案例演示线程池配置异常类实体类控制层业务层mapper工具类验证 解决方案使用sqlSession控制手动提交事务SqlSessionTemplate注入容器中改造业务层验证成功操作示例业务层改造 介绍 1.最近有一个大数据量插入的操作入库的业务场景,需要先做一…...

java单元测试( Hamcrest 断言)
java单元测试( Hamcrest 断言) 单元测试特征: 1 范围狭窄 2 限于单一类或方法 3 体积小 为什么要编写单元测试? 为了防止错误(很明显!) 而且还可以提高开发人员的生产力,因为单元测试: (1) 帮助实施——在…...

讨论和总结 树模型 的三种序列化 方式的区别(模型存储大小、序列化所用内存、序列化速度)...
一、前言 本文总结常用树模型: rf,xgboost,catboost和lightgbm等模型的保存和加载(序列化和反序列化)的多种方式,并对多种方式从运行内存的使用和存储大小做对比 二、模型 2.1 安装环境 pip install xgboos…...

Halcon中的一些3D算子
一、记录一些Halcon里的关于3D的算子 1.read_object_model_3d 从文件读取一个3d模型 如下图,读的一个ply文件出来是个3d点云模型 2.visualize_object_model_3d 交互式展示3d模型 即上个算子读出来后,通过这个算子可以把3d模型显示出来旋转、平移&am…...

Android:Selector + Layer-lists 实现 AppCompatCheckBox
最近做项目涉及到一些UI相关的东东,虽然比较简单,但是也很有趣,写两篇简短的博客记录一下。 一."Selector 两张图片"实现 AppCompatCheckBox AppCompatCheckBox 是 androidx的一个widget:androidx.appcompat.widget.…...
TreeMap类型添加数据
package com.test.Test11;import java.util.*;public class Test02 {public static void main(String[] args) {/** 增加:put(K key,V value)* 删除:clear() remove(Object key)* 修改:* 查看:entrySet() get(Object key) keySet(…...

iOS 16 UI 设计系统免费在线使用方法
1、iOS 16 UI 设计系统中有什么? iOS 16 UI 设计系统通常包含以下组件和元素: 1. 按钮:包括操作按钮、图标按钮、导航按钮、滚动按钮、切换按钮、单选按钮、复选框按钮、呼叫按钮等各种类型的按钮。 2. 窗口和 UI 控件:包括标签…...

【接口测试】JMeter接口关联测试
1 前言 我们来学习接口管理测试,这就要使用到JMeter提供的JSON提取器和正则表达式提取器了,下面我们来看看是如何使用的吧。 2 JSON提取器 1、添加JSON提取器 在线程组右键 > 添加 > 后置处理器 > JSON提取器 2、JSON提取器参数说明 N…...

腾讯云服务器ping不通解决方法(公网IP/安全组/系统多维度)
腾讯云服务器ping不通什么原因?ping不通公网IP地址还是域名?新手站长从云服务器公网IP、安全组、Linux系统和Windows操作系统多方面来详细说明腾讯云服务器ping不通的解决方法: 目录 腾讯云服务器ping不通原因分析及解决方法 安全组ICMP协…...
【C++/嵌入式笔试面试八股】一、32.封装
封装 08.C++中struct和class的区别🍊 相同点 两者都拥有成员函数、公有和私有部分任何可以使用class完成的工作,同样可以使用struct完成不同点 两者中如果不对成员不指定公私有,struct默认是公有的,class则默认是私有的class默认是private继承, 而struct默认是public继…...
【算法】Transform to Chessboard 变为棋盘
文章目录 Transform to Chessboard 变为棋盘问题描述:分析代码 Transform to Chessboard 变为棋盘 问题描述: 一个 n x n 的二维网络 board 仅由 0 和 1 组成 。每次移动,你能任意交换两列或是两行的位置。 返回 将这个矩阵变为 棋盘 所需…...
vue通过封装$on定义全局事件
我们先在vue项目的src跟目录下创建一个文件夹 叫 utils 下面创建一个js文件夹 叫 bus.js 参考代码如下 import Vue from "vue"; export default new Vue();然后 我们就可以来用了 在需要定义事件的组件中编写 <template><div><h1>Hello world!&…...
资产管理规范
生产系统资产管理规范 1. 引言 生产系统的资产管理是确保生产系统正常运行和提高生产效率的关键因素之一。本文档旨在制定一套规范,以确保生产系统中的资产,包括服务器和软件等,得到有效管理和保护。 2. 资产分类 生产系统资产可根据其性质…...

已解决:如何从别人的仓库那里克隆到自己的仓库,并修改代码并提交。
一、场景 拉取项目代码后,如果要共同开发一个项目的自动化代码,此时需要把自己写的代码部分提交到代码仓库。 可以用pycharm把修改的代码push到代码仓库 二、操作方法 1.从别人的仓库那里点击fork,将仓库克隆到自己的仓库。 2.在pychar…...

剑指 Offer 18. 删除链表的节点
🚀 作者简介:一名在后端领域学习,并渴望能够学有所成的追梦人。 🚁 个人主页:不 良 🔥 系列专栏:🛸剑指 Offer 📕 学习格言:博观而约取,厚积而薄…...

WiFi 6 vs WiFi 5
在现代无线通信领域,WiFi已经成为人们日常生活中不可或缺的一部分。随着技术的不断发展,WiFi标准也在不断更新和演进。WiFi 6(802.11ax)和WiFi 5(802.11ac)是当前两个主要的WiFi标准。 本文将详细介绍WiFi …...
PHP语言基础
一.标记风格 标记风格分为四类(推荐XML) 1.XML风格 <?php echo这是xml风格‘; ?> 注意:结束标识符必须单独另起一行,并且不能有空格。在标识符前后有其他符号或者字符也会发生错误。 2.脚本风格 <script languagephp> …...

怎么用Excel VBA写一个excel批量合并的程序?
您可以按照以下VBA代码来实现把同一路径上的所有工作簿合并到同一个工作簿中: VBA Option Explicit Sub MergeWorkbooks() Dim path As String, fileName As String, sheet As Worksheet Dim targetWorkbook As Workbook, sourceWorkbook As Workbook Dim workshe…...
WuThreat身份安全云-TVD每日漏洞情报-2023-05-22
漏洞名称:Apple WebKit 任意代码执行漏洞 漏洞级别:中危 漏洞编号:CVE-2023-32373 相关涉及:Apple iOS和iPadOS 16.4.1 漏洞状态:在野 参考链接:https://tvd.wuthreat.com/#/listDetail?TVD_IDTVD-2023-12579 漏洞名称:海康威视部分iVMS系统存在文件上传漏洞 漏洞级别:未定义…...

Eclipse教程 Ⅵ
今天分享Eclipse Java 构建路径、Eclipse 运行配置(Run Configuration)和Eclipse 运行程序 Eclipse Java 构建路径 设置 Java 构建路径 Java构建路径用于在编译Java项目时找到依赖的类,包括以下几项: 源码包项目相关的 jar 包及类文件项目引用的的类…...
大语言模型如何处理长文本?常用文本分割技术详解
为什么需要文本分割? 引言:为什么需要文本分割?一、基础文本分割方法1. 按段落分割(Paragraph Splitting)2. 按句子分割(Sentence Splitting)二、高级文本分割策略3. 重叠分割(Sliding Window)4. 递归分割(Recursive Splitting)三、生产级工具推荐5. 使用LangChain的…...

1.3 VSCode安装与环境配置
进入网址Visual Studio Code - Code Editing. Redefined下载.deb文件,然后打开终端,进入下载文件夹,键入命令 sudo dpkg -i code_1.100.3-1748872405_amd64.deb 在终端键入命令code即启动vscode 需要安装插件列表 1.Chinese简化 2.ros …...
JDK 17 新特性
#JDK 17 新特性 /**************** 文本块 *****************/ python/scala中早就支持,不稀奇 String json “”" { “name”: “Java”, “version”: 17 } “”"; /**************** Switch 语句 -> 表达式 *****************/ 挺好的ÿ…...
全面解析各类VPN技术:GRE、IPsec、L2TP、SSL与MPLS VPN对比
目录 引言 VPN技术概述 GRE VPN 3.1 GRE封装结构 3.2 GRE的应用场景 GRE over IPsec 4.1 GRE over IPsec封装结构 4.2 为什么使用GRE over IPsec? IPsec VPN 5.1 IPsec传输模式(Transport Mode) 5.2 IPsec隧道模式(Tunne…...
Linux C语言网络编程详细入门教程:如何一步步实现TCP服务端与客户端通信
文章目录 Linux C语言网络编程详细入门教程:如何一步步实现TCP服务端与客户端通信前言一、网络通信基础概念二、服务端与客户端的完整流程图解三、每一步的详细讲解和代码示例1. 创建Socket(服务端和客户端都要)2. 绑定本地地址和端口&#x…...

Python Ovito统计金刚石结构数量
大家好,我是小马老师。 本文介绍python ovito方法统计金刚石结构的方法。 Ovito Identify diamond structure命令可以识别和统计金刚石结构,但是无法直接输出结构的变化情况。 本文使用python调用ovito包的方法,可以持续统计各步的金刚石结构,具体代码如下: from ovito…...

毫米波雷达基础理论(3D+4D)
3D、4D毫米波雷达基础知识及厂商选型 PreView : https://mp.weixin.qq.com/s/bQkju4r6med7I3TBGJI_bQ 1. FMCW毫米波雷达基础知识 主要参考博文: 一文入门汽车毫米波雷达基本原理 :https://mp.weixin.qq.com/s/_EN7A5lKcz2Eh8dLnjE19w 毫米波雷达基础…...
es6+和css3新增的特性有哪些
一:ECMAScript 新特性(ES6) ES6 (2015) - 革命性更新 1,记住的方法,从一个方法里面用到了哪些技术 1,let /const块级作用域声明2,**默认参数**:函数参数可以设置默认值。3&#x…...

Unity VR/MR开发-VR开发与传统3D开发的差异
视频讲解链接:【XR马斯维】VR/MR开发与传统3D开发的差异【UnityVR/MR开发教程--入门】_哔哩哔哩_bilibili...

热门Chrome扩展程序存在明文传输风险,用户隐私安全受威胁
赛门铁克威胁猎手团队最新报告披露,数款拥有数百万活跃用户的Chrome扩展程序正在通过未加密的HTTP连接静默泄露用户敏感数据,严重威胁用户隐私安全。 知名扩展程序存在明文传输风险 尽管宣称提供安全浏览、数据分析或便捷界面等功能,但SEMR…...