一次业务的批量数据任务的处理优化
文章目录
- 一次业务的批量数据任务的处理优化
- 业务背景
- 1.0版本 分批处理模式
- 2.0版本 平衡任务队列模式
- 3.0版本 优化调度平衡任务队列模式
- 总结
一次业务的批量数据任务的处理优化
业务背景
一个重新生成所有客户的财务业务指标数据的批量数据处理任务。
1.0版本 分批处理模式
根据要处理的客户数量,按照最大线程数切分成多个段,尽量保证每个线程处理相同的客户数量。
private void updateForRegenerateByCustomer(List<Integer> customerIdList,SystemUserCommonDTO user, LocalDateTime now) {List<CustomerBaseInfo> baseInfoList = CollectionUtils.isEmpty(customerIdList)?customerInfoService.listAll():customerInfoService.listByIdList(customerIdList);//先清理客户的数据updateForCleanByCustomerIdList(baseInfoList,user,now);int maxSize = baseInfoList.size();//计算当前任务数量int currentMaxPoolSize = maxPoolSize<maxSize?maxPoolSize:maxSize;CompletableFuture[] tasks = new CompletableFuture[currentMaxPoolSize];//计算每个任务分段的数量int size = maxSize / currentMaxPoolSize;for(int i=0;i<currentMaxPoolSize;i++){final int begin = i * size;final int end = i==currentMaxPoolSize-1?maxSize:(i+1)*size;//创建异步处理的分段任务tasks[i] = CompletableFuture.runAsync(()->updateForGenerateByCustomerIdList(baseInfoList,begin,end,user,now),executorService).whenCompleteAsync((k,v)-> log.info("重新生成财务业务指标客户的所有数据-线程【{}】完成",Thread.currentThread().getName()));}// 向线程池提交任务CompletableFuture.allOf(tasks).whenComplete((v, th) -> log.info("重新生成财务业务指标客户的所有数据-【{}】个子线程处理完成",tasks.length)).join();}/*** 生成指定客户列表的所有数据**/private void updateForGenerateByCustomerIdList(List<CustomerBaseInfo> baseInfoList,int begin,int end,SystemUserCommonDTO user, LocalDateTime now){//每个线程只处理自己的分段的数据for(int i=begin;i<end;i++){CustomerBaseInfo baseInfo = baseInfoList.get(i);//每个客户独立事务TransactionalUtils.runWithNewTransactional(()->updateForGenerateByCustomerId(baseInfo.getId(),user,now));}}/*** 生成指定客户的所有数据**/private void updateForGenerateByCustomerId(Integer customerId,SystemUserCommonDTO user, LocalDateTime now){//1、重新生成客户的所有业务类型的数据List<FinanceBiMaintainDto> maintainDtoList =financeBiBusinessTypeSupport.getMaintainListByCustomerId(customerId);if(CollectionUtils.isEmpty(maintainDtoList)){return ;}//生成每个指标的数据Map<BusinessIndicatorEnum,List<FinanceBiMaintainDto>> indicatorMaintainDtoMap = maintainDtoList.stream().collect(Collectors.groupingBy(FinanceBiMaintainDto::getIndicator));indicatorMaintainDtoMap.forEach((k,v)->{log.info("重新生成财务业务指标指定客户【{}】的【{}】支持处理开始",customerId,k);financeBiManager.updateForBiMaintain(k, v,user,now);});}
运行耗时:1420.145秒
2.0版本 平衡任务队列模式
1.0 版本 由于不同客户的数据量不同,导致生成数据的耗时不同,因此按照客户数量均分任务的的方式对于每个线程来说,任务量是不一样的,因此可能会导致部分线程太忙,部分线程太空的情况。因此调整为使用队列方式来解决任务分配的问题,每个线程自己取队列中取要处理的客户,直到所有队列中的客户都被处理完,所有的线程结束。这样就避免的线程任务量不平衡问题。
updateForGenerateByCustomerId 方法不需要改造,只需要调整任务分配的相关方法就可以。
private void updateForRegenerateByCustomer(List<Integer> customerIdList, SystemUserCommonDTO user,LocalDateTime now) {List<CustomerBaseInfo> baseInfoList = CollectionUtils.isEmpty(customerIdList) ? customerInfoService.listAll() :customerInfoService.listByIdList(customerIdList);//先清理客户的数据updateForCleanByCustomerIdList(baseInfoList, user, now);int maxSize = baseInfoList.size();int currentMaxPoolSize = Math.min(maxPoolSize, maxSize);//根据线程数,构建固定的任务数量CompletableFuture<?>[] tasks = new CompletableFuture<?>[currentMaxPoolSize];//构建待处理的客户队列,由于这里没有并发读写的情况,因此用ConcurrentLinkedQueue效率会更高一点。ConcurrentLinkedQueue<Integer> queue = new ConcurrentLinkedQueue<>(baseInfoList.stream().map(CustomerBaseInfo::getId).collect(Collectors.toList()));//创建多个线程去消耗客户队列for (int i = 0; i < currentMaxPoolSize; i++) {tasks[i] =CompletableFuture.runAsync(() -> updateForGenerateByCustomerIdList(queue, user, now), executorService).whenCompleteAsync((k, v) -> {if (v != null) {log.error(String.format("重新生成财务业务指标客户的所有数据-线程【%s】发生异常",Thread.currentThread().getName()), v);} else {log.info("重新生成财务业务指标客户的所有数据-线程【{}】完成",Thread.currentThread().getName());}});}// 向线程池提交任务CompletableFuture.allOf(tasks).whenComplete((v, th) -> log.info("重新生成财务业务指标客户的所有数据-【{}】个子线程处理完成", tasks.length)).join();}/*** 生成指定客户列表的所有数据**/private void updateForGenerateByCustomerIdList(ConcurrentLinkedQueue<Integer> queue, SystemUserCommonDTO user,LocalDateTime now) {Integer customerId = queue.poll();//循环从客户队列中取出待处理的客户,直到所有客户都处理完毕。while (customerId != null) {final Integer currentCustomerId = customerId;TransactionalUtils.runWithNewTransactional(() -> updateForGenerateByCustomerId(currentCustomerId, user, now));customerId = queue.poll();}}
优化后的耗时:1037.059秒
3.0版本 优化调度平衡任务队列模式
2.0版本虽然解决的了每个线程任务量不平衡的问题,但可能出现某个数据量很大的客户在队列的尾部,导致当其他线程都处理完所有的客户时,取到最大数据量的客户的线程仍在运行,任务整体的耗时被增加。因此需要优化调度,将耗时高的客户调度到队列头部,保证耗时最长的客户的优先处理,从而避免最后等待耗时长的线程。
updateForGenerateByCustomerIdList 方法不需要改造,只需要队列构造处理就可以。
private void updateForRegenerateByCustomer(List<Integer> customerIdList, SystemUserCommonDTO user,LocalDateTime now) {List<CustomerBaseInfo> baseInfoList = CollectionUtils.isEmpty(customerIdList) ? customerInfoService.listAll() :customerInfoService.listByIdList(customerIdList);//先清理客户的数据updateForCleanByCustomerIdList(baseInfoList, user, now);//获取客户的统计数据Map<Integer, CustomerStatisticsInfo> customerStatisticsInfoMap =customerStatisticsInfoService.listAll().stream().collect(Collectors.toMap(CustomerStatisticsInfo::getCustomerId, Function.identity()));int maxSize = baseInfoList.size();int currentMaxPoolSize = Math.min(maxPoolSize, maxSize);CompletableFuture<String>[] tasks = new CompletableFuture[currentMaxPoolSize];//根据客户的统计数据,构建待处理的客户队列ConcurrentLinkedQueue<Integer> queue =baseInfoList.stream().map(item -> customerStatisticsInfoMap.get(item.getId())).filter(Objects::nonNull)
//队列按照客户数据量倒序排列 .sorted(Comparator.comparing(CustomerStatisticsInfo::getNumberOfCheckedSatisfactoryActivitys,Comparator.reverseOrder())).map(CustomerStatisticsInfo::getCustomerId).collect(Collectors.toCollection(ConcurrentLinkedQueue::new));for (int i = 0; i < currentMaxPoolSize; i++) {tasks[i] = CompletableFuture.supplyAsync(() -> {updateForGenerateByCustomerIdList(queue, user, now);return Thread.currentThread().getName();}, executorService).whenCompleteAsync((k, ex) -> {if (ex != null) {log.error(String.format("重新生成财务业务指标客户的所有数据-线程【%s】发生异常", k), ex);} else {log.info("重新生成财务业务指标客户的所有数据-线程【{}】完成", k);}});}// 向线程池提交任务CompletableFuture.allOf(tasks).whenComplete((v, th) -> log.info("重新生成财务业务指标客户的所有数据-【{}】个子线程处理完成", tasks.length)).join();}
耗时:726.725秒
总结
最终的耗时从1400多秒 降低到700多秒。降低了一半左右。
相关文章:
一次业务的批量数据任务的处理优化
文章目录 一次业务的批量数据任务的处理优化业务背景1.0版本 分批处理模式2.0版本 平衡任务队列模式3.0版本 优化调度平衡任务队列模式总结 一次业务的批量数据任务的处理优化 业务背景 一个重新生成所有客户的财务业务指标数据的批量数据处理任务。 1.0版本 分批处理模式 …...
新能源汽车充电站远程监控系统S275钡铼技术无线RTU
新能源汽车充电站的远程监控系统在现代城市基础设施中扮演着至关重要的角色,而钡铼技术的S275无线RTU作为一款先进的物联网数据监测采集控制短信报警终端,为充电站的安全运行和高效管理提供了强大的技术支持。 技术特点和功能 钡铼S275采用了基于UCOSI…...
海外视频媒体发布/发稿:如何在国外媒体以视频的形式宣发
1. 背景介绍 在如今数字化时代,每个国家都拥有着各自的视频媒体平台,而主流媒体也都纷纷加入了视频发布的行列。视频媒体的宣发形式主要包括油管Youtube等视频分享平台,以及图文配合的发布方式。通过在视频中夹带链接,媒体可以以…...
HTML 【实用教程】(2024最新版)
核心思想 —— 语义化 【面试题】如何理解 HTML 语义化 ?仅通过标签便能判断内容的类型,特别是区分标题、段落、图片和表格 增加代码可读性,让人更容易读懂对SEO更加友好,让搜索引擎更容易读懂 html 文件的基本结构 html 文件的文件后缀为 …...
How to Describe Figures in a Research Article
How to Describe Figures in a Research Article DateAuthorVersionNote2024.07.10Dog TaoV1.0Finish the document. 文章目录 How to Describe Figures in a Research ArticleGeneral GuidelinesDetailed DescriptionsCommon Describing Phrases Effective communication of …...
昇思MindSpore学习入门-CELL与参数一
Cell作为神经网络构造的基础单元,与神经网络层(Layer)的概念相对应,对Tensor计算操作的抽象封装,能够更准确清晰地对神经网络结构进行表示。除了基础的Tensor计算流程定义外,神经网络层还包含了参数管理、状态管理等功能。而参数(…...
【k8s中安装rabbitmq】k8s中安装rabbitmq并搭建镜像集群-hostpath版
文章目录 简介一.条件及环境说明二.需求说明三.实现原理及说明四.详细步骤4.1.规划节点标签4.2.创建configmap配置4.3.创建三个statefulset和service headless配置4.4.创建service配置 五.安装完后的配置六.安装说明 简介 k8s集群中搭建rabbitmq集群服务一般都会用到pvc&#x…...
(5) 深入探索Python-Pandas库的核心数据结构:Series详解
目录 前言1. Series 简介2. Series的特点3. Series的创建3.1 使用列表创建Series3.2 使用字典创建Series3.3 使用列表和自定义索引创建Series3.4 指定数据类型和名称 4. Series的索引/切片4.1 下标索引:基于整数位置的索引4.2 基于标签的索引4.3 切片4.4 使用.loc[]…...
JAVA之开发神器——IntelliJ IDEA的下载与安装
一、IDEA是什么? IEAD是JetBrains公司开发的专用于java开发的一款集成开发环境。由于其功能强大且符合人体工程学(就是更懂你)的优点,深受java开发人员的喜爱。目前在java开发工具中占比3/4。如果你要走java开发方向,那…...
通过Umijs从0到1搭建一个React项目
有一阵时间没写react了,今天通过umi搭建一个demo项目复习一下react;umi是一个可扩展的企业级前端应用框架,在react市场中还是比较火的一个框架。 Umi官方文档:Umi 介绍 (umijs.org) 一、构建项目。 1、安装包管理工具。 官方推…...
Redis 数据过期及淘汰策略
Redis 数据过期及淘汰策略 过期策略 定时过期 在设置key的过期时间的同时,为该key创建一个定时器,让定时器在key的过期时间来临时,对key进行删除。到过期时间就会立即清除。该策略可以立即清除过期的数据,对内存很友好&a…...
vue vite+three在线编辑模型导入导出
文章目录 序一、1.0.0版本1.新增2.编辑3.导出4.导入 总结 序 要实现一个类似于数字孪生的场景 可以在线、新增、删除模型 、以及编辑模型的颜色、长宽高 然后还要实现 编辑完后 保存为json数据 记录模型数据 既可以导入也可以导出 一、1.0.0版本 1.新增 先拿建议的立方体来…...
去水印小程序源码修复版-前端后端内置接口+第三方接口
去水印小程序源码,前端后端,内置接口第三方接口, 修复数据库账号密码错误问题,内置接口支持替换第三方接口, 文件挺全的,可以添加流量主代码,搭建需要准备一台服务器,备案域名和http…...
机器学习:预测评估8类指标
机器学习:8类预测评估指标 R方值、平均值绝对误差值MAE、均方误差MSE、均方误差根EMSE、中位数绝对误差MAD、平均绝对百分误差MAPE、可解释方差分EVS、均方根对数误差MLSE。 一、R方值 1、说明: R方值,也称为确定系数或拟合优度ÿ…...
【深度学习基础】MAC pycharm 专业版安装与激活
文章目录 一、pycharm专业版安装二、激活 一、pycharm专业版安装 PyCharm是一款专为Python开发者设计的集成开发环境(IDE),旨在帮助用户在使用Python语言开发时提高效率。以下是对PyCharm软件的详细介绍,包括其作用和主要功能&…...
排序相关算法--1.插入排序+冒泡排序回顾
1.基本分类 2.插入排序 特点:有实践意义(例如后期快排的优化),适应性强,一般不会到时间复杂度最坏的情况。 将第一个元素视为已经排好序的序列。取出下一个元素,在已经排好序的序列中从后往前比较…...
变阻器的故障排除方法有哪些?
变阻器,特别是滑动变阻器,作为电子电路中的常见元件,其故障排除方法主要依据具体的故障现象来确定。以下是一些常见的故障现象及其排除方法: 一、接触不良 现象:电阻器不起作用或电压不稳定。 排除方法: …...
软考《信息系统运行管理员》-3.1信息系统设施运维的管理体系
3.1信息系统设施运维的管理体系 1 信息系统设施运维的对象 基础环境 主要包括信息系统运行环境(机房、设备间、配线室、基站、云计算中心 等)中的空调系统、供配电系统、通信应急设备系统、防护设备系统(如消防系统、安全系统) 等,能维持系统安全正常运转…...
Nginx重定向
Nginx重定向 location 匹配 location匹配的就是后面的URL /WordPress 192.168.118.10/wordpress location匹配的分类和优先级 1.精确匹配 location/对字符串进行完全匹配,必须完全符合2.正则匹配 ^~ 前缀匹配,以什么为开头~ 区分大小写的匹配~* 不区分大小写!~: 区分大小…...
私有化地图离线部署方案之高程检索服务
私有化地图离线部署整体解决方案,除硬件之外,一般主要由基础地图服务、查询定位服务、路径规划服务和高程检索服务构成。 我们已经分享过基础地图服务、查询定位服务和路径规划服务,现在再为你分享高程检索服务的方法。 私有化高程检索服务…...
《Playwright:微软的自动化测试工具详解》
Playwright 简介:声明内容来自网络,将内容拼接整理出来的文档 Playwright 是微软开发的自动化测试工具,支持 Chrome、Firefox、Safari 等主流浏览器,提供多语言 API(Python、JavaScript、Java、.NET)。它的特点包括&a…...
服务器硬防的应用场景都有哪些?
服务器硬防是指一种通过硬件设备层面的安全措施来防御服务器系统受到网络攻击的方式,避免服务器受到各种恶意攻击和网络威胁,那么,服务器硬防通常都会应用在哪些场景当中呢? 硬防服务器中一般会配备入侵检测系统和预防系统&#x…...
【RockeMQ】第2节|RocketMQ快速实战以及核⼼概念详解(二)
升级Dledger高可用集群 一、主从架构的不足与Dledger的定位 主从架构缺陷 数据备份依赖Slave节点,但无自动故障转移能力,Master宕机后需人工切换,期间消息可能无法读取。Slave仅存储数据,无法主动升级为Master响应请求ÿ…...
在web-view 加载的本地及远程HTML中调用uniapp的API及网页和vue页面是如何通讯的?
uni-app 中 Web-view 与 Vue 页面的通讯机制详解 一、Web-view 简介 Web-view 是 uni-app 提供的一个重要组件,用于在原生应用中加载 HTML 页面: 支持加载本地 HTML 文件支持加载远程 HTML 页面实现 Web 与原生的双向通讯可用于嵌入第三方网页或 H5 应…...
Android第十三次面试总结(四大 组件基础)
Activity生命周期和四大启动模式详解 一、Activity 生命周期 Activity 的生命周期由一系列回调方法组成,用于管理其创建、可见性、焦点和销毁过程。以下是核心方法及其调用时机: onCreate() 调用时机:Activity 首次创建时调用。…...
Spring是如何解决Bean的循环依赖:三级缓存机制
1、什么是 Bean 的循环依赖 在 Spring框架中,Bean 的循环依赖是指多个 Bean 之间互相持有对方引用,形成闭环依赖关系的现象。 多个 Bean 的依赖关系构成环形链路,例如: 双向依赖:Bean A 依赖 Bean B,同时 Bean B 也依赖 Bean A(A↔B)。链条循环: Bean A → Bean…...
android RelativeLayout布局
<?xml version"1.0" encoding"utf-8"?> <RelativeLayout xmlns:android"http://schemas.android.com/apk/res/android"android:layout_width"match_parent"android:layout_height"match_parent"android:gravity&…...
Vue 模板语句的数据来源
🧩 Vue 模板语句的数据来源:全方位解析 Vue 模板(<template> 部分)中的表达式、指令绑定(如 v-bind, v-on)和插值({{ }})都在一个特定的作用域内求值。这个作用域由当前 组件…...
在golang中如何将已安装的依赖降级处理,比如:将 go-ansible/v2@v2.2.0 更换为 go-ansible/@v1.1.7
在 Go 项目中降级 go-ansible 从 v2.2.0 到 v1.1.7 具体步骤: 第一步: 修改 go.mod 文件 // 原 v2 版本声明 require github.com/apenella/go-ansible/v2 v2.2.0 替换为: // 改为 v…...
Python 高级应用10:在python 大型项目中 FastAPI 和 Django 的相互配合
无论是python,或者java 的大型项目中,都会涉及到 自身平台微服务之间的相互调用,以及和第三发平台的 接口对接,那在python 中是怎么实现的呢? 在 Python Web 开发中,FastAPI 和 Django 是两个重要但定位不…...
