一次业务的批量数据任务的处理优化
文章目录
- 一次业务的批量数据任务的处理优化
- 业务背景
- 1.0版本 分批处理模式
- 2.0版本 平衡任务队列模式
- 3.0版本 优化调度平衡任务队列模式
- 总结
一次业务的批量数据任务的处理优化
业务背景
一个重新生成所有客户的财务业务指标数据的批量数据处理任务。
1.0版本 分批处理模式
根据要处理的客户数量,按照最大线程数切分成多个段,尽量保证每个线程处理相同的客户数量。
private void updateForRegenerateByCustomer(List<Integer> customerIdList,SystemUserCommonDTO user, LocalDateTime now) {List<CustomerBaseInfo> baseInfoList = CollectionUtils.isEmpty(customerIdList)?customerInfoService.listAll():customerInfoService.listByIdList(customerIdList);//先清理客户的数据updateForCleanByCustomerIdList(baseInfoList,user,now);int maxSize = baseInfoList.size();//计算当前任务数量int currentMaxPoolSize = maxPoolSize<maxSize?maxPoolSize:maxSize;CompletableFuture[] tasks = new CompletableFuture[currentMaxPoolSize];//计算每个任务分段的数量int size = maxSize / currentMaxPoolSize;for(int i=0;i<currentMaxPoolSize;i++){final int begin = i * size;final int end = i==currentMaxPoolSize-1?maxSize:(i+1)*size;//创建异步处理的分段任务tasks[i] = CompletableFuture.runAsync(()->updateForGenerateByCustomerIdList(baseInfoList,begin,end,user,now),executorService).whenCompleteAsync((k,v)-> log.info("重新生成财务业务指标客户的所有数据-线程【{}】完成",Thread.currentThread().getName()));}// 向线程池提交任务CompletableFuture.allOf(tasks).whenComplete((v, th) -> log.info("重新生成财务业务指标客户的所有数据-【{}】个子线程处理完成",tasks.length)).join();}/*** 生成指定客户列表的所有数据**/private void updateForGenerateByCustomerIdList(List<CustomerBaseInfo> baseInfoList,int begin,int end,SystemUserCommonDTO user, LocalDateTime now){//每个线程只处理自己的分段的数据for(int i=begin;i<end;i++){CustomerBaseInfo baseInfo = baseInfoList.get(i);//每个客户独立事务TransactionalUtils.runWithNewTransactional(()->updateForGenerateByCustomerId(baseInfo.getId(),user,now));}}/*** 生成指定客户的所有数据**/private void updateForGenerateByCustomerId(Integer customerId,SystemUserCommonDTO user, LocalDateTime now){//1、重新生成客户的所有业务类型的数据List<FinanceBiMaintainDto> maintainDtoList =financeBiBusinessTypeSupport.getMaintainListByCustomerId(customerId);if(CollectionUtils.isEmpty(maintainDtoList)){return ;}//生成每个指标的数据Map<BusinessIndicatorEnum,List<FinanceBiMaintainDto>> indicatorMaintainDtoMap = maintainDtoList.stream().collect(Collectors.groupingBy(FinanceBiMaintainDto::getIndicator));indicatorMaintainDtoMap.forEach((k,v)->{log.info("重新生成财务业务指标指定客户【{}】的【{}】支持处理开始",customerId,k);financeBiManager.updateForBiMaintain(k, v,user,now);});}
运行耗时:1420.145秒
2.0版本 平衡任务队列模式
1.0 版本 由于不同客户的数据量不同,导致生成数据的耗时不同,因此按照客户数量均分任务的的方式对于每个线程来说,任务量是不一样的,因此可能会导致部分线程太忙,部分线程太空的情况。因此调整为使用队列方式来解决任务分配的问题,每个线程自己取队列中取要处理的客户,直到所有队列中的客户都被处理完,所有的线程结束。这样就避免的线程任务量不平衡问题。
updateForGenerateByCustomerId 方法不需要改造,只需要调整任务分配的相关方法就可以。
private void updateForRegenerateByCustomer(List<Integer> customerIdList, SystemUserCommonDTO user,LocalDateTime now) {List<CustomerBaseInfo> baseInfoList = CollectionUtils.isEmpty(customerIdList) ? customerInfoService.listAll() :customerInfoService.listByIdList(customerIdList);//先清理客户的数据updateForCleanByCustomerIdList(baseInfoList, user, now);int maxSize = baseInfoList.size();int currentMaxPoolSize = Math.min(maxPoolSize, maxSize);//根据线程数,构建固定的任务数量CompletableFuture<?>[] tasks = new CompletableFuture<?>[currentMaxPoolSize];//构建待处理的客户队列,由于这里没有并发读写的情况,因此用ConcurrentLinkedQueue效率会更高一点。ConcurrentLinkedQueue<Integer> queue = new ConcurrentLinkedQueue<>(baseInfoList.stream().map(CustomerBaseInfo::getId).collect(Collectors.toList()));//创建多个线程去消耗客户队列for (int i = 0; i < currentMaxPoolSize; i++) {tasks[i] =CompletableFuture.runAsync(() -> updateForGenerateByCustomerIdList(queue, user, now), executorService).whenCompleteAsync((k, v) -> {if (v != null) {log.error(String.format("重新生成财务业务指标客户的所有数据-线程【%s】发生异常",Thread.currentThread().getName()), v);} else {log.info("重新生成财务业务指标客户的所有数据-线程【{}】完成",Thread.currentThread().getName());}});}// 向线程池提交任务CompletableFuture.allOf(tasks).whenComplete((v, th) -> log.info("重新生成财务业务指标客户的所有数据-【{}】个子线程处理完成", tasks.length)).join();}/*** 生成指定客户列表的所有数据**/private void updateForGenerateByCustomerIdList(ConcurrentLinkedQueue<Integer> queue, SystemUserCommonDTO user,LocalDateTime now) {Integer customerId = queue.poll();//循环从客户队列中取出待处理的客户,直到所有客户都处理完毕。while (customerId != null) {final Integer currentCustomerId = customerId;TransactionalUtils.runWithNewTransactional(() -> updateForGenerateByCustomerId(currentCustomerId, user, now));customerId = queue.poll();}}
优化后的耗时:1037.059秒
3.0版本 优化调度平衡任务队列模式
2.0版本虽然解决的了每个线程任务量不平衡的问题,但可能出现某个数据量很大的客户在队列的尾部,导致当其他线程都处理完所有的客户时,取到最大数据量的客户的线程仍在运行,任务整体的耗时被增加。因此需要优化调度,将耗时高的客户调度到队列头部,保证耗时最长的客户的优先处理,从而避免最后等待耗时长的线程。
updateForGenerateByCustomerIdList 方法不需要改造,只需要队列构造处理就可以。
private void updateForRegenerateByCustomer(List<Integer> customerIdList, SystemUserCommonDTO user,LocalDateTime now) {List<CustomerBaseInfo> baseInfoList = CollectionUtils.isEmpty(customerIdList) ? customerInfoService.listAll() :customerInfoService.listByIdList(customerIdList);//先清理客户的数据updateForCleanByCustomerIdList(baseInfoList, user, now);//获取客户的统计数据Map<Integer, CustomerStatisticsInfo> customerStatisticsInfoMap =customerStatisticsInfoService.listAll().stream().collect(Collectors.toMap(CustomerStatisticsInfo::getCustomerId, Function.identity()));int maxSize = baseInfoList.size();int currentMaxPoolSize = Math.min(maxPoolSize, maxSize);CompletableFuture<String>[] tasks = new CompletableFuture[currentMaxPoolSize];//根据客户的统计数据,构建待处理的客户队列ConcurrentLinkedQueue<Integer> queue =baseInfoList.stream().map(item -> customerStatisticsInfoMap.get(item.getId())).filter(Objects::nonNull)
//队列按照客户数据量倒序排列 .sorted(Comparator.comparing(CustomerStatisticsInfo::getNumberOfCheckedSatisfactoryActivitys,Comparator.reverseOrder())).map(CustomerStatisticsInfo::getCustomerId).collect(Collectors.toCollection(ConcurrentLinkedQueue::new));for (int i = 0; i < currentMaxPoolSize; i++) {tasks[i] = CompletableFuture.supplyAsync(() -> {updateForGenerateByCustomerIdList(queue, user, now);return Thread.currentThread().getName();}, executorService).whenCompleteAsync((k, ex) -> {if (ex != null) {log.error(String.format("重新生成财务业务指标客户的所有数据-线程【%s】发生异常", k), ex);} else {log.info("重新生成财务业务指标客户的所有数据-线程【{}】完成", k);}});}// 向线程池提交任务CompletableFuture.allOf(tasks).whenComplete((v, th) -> log.info("重新生成财务业务指标客户的所有数据-【{}】个子线程处理完成", tasks.length)).join();}
耗时:726.725秒
总结
最终的耗时从1400多秒 降低到700多秒。降低了一半左右。
相关文章:
一次业务的批量数据任务的处理优化
文章目录 一次业务的批量数据任务的处理优化业务背景1.0版本 分批处理模式2.0版本 平衡任务队列模式3.0版本 优化调度平衡任务队列模式总结 一次业务的批量数据任务的处理优化 业务背景 一个重新生成所有客户的财务业务指标数据的批量数据处理任务。 1.0版本 分批处理模式 …...
新能源汽车充电站远程监控系统S275钡铼技术无线RTU
新能源汽车充电站的远程监控系统在现代城市基础设施中扮演着至关重要的角色,而钡铼技术的S275无线RTU作为一款先进的物联网数据监测采集控制短信报警终端,为充电站的安全运行和高效管理提供了强大的技术支持。 技术特点和功能 钡铼S275采用了基于UCOSI…...
海外视频媒体发布/发稿:如何在国外媒体以视频的形式宣发
1. 背景介绍 在如今数字化时代,每个国家都拥有着各自的视频媒体平台,而主流媒体也都纷纷加入了视频发布的行列。视频媒体的宣发形式主要包括油管Youtube等视频分享平台,以及图文配合的发布方式。通过在视频中夹带链接,媒体可以以…...
HTML 【实用教程】(2024最新版)
核心思想 —— 语义化 【面试题】如何理解 HTML 语义化 ?仅通过标签便能判断内容的类型,特别是区分标题、段落、图片和表格 增加代码可读性,让人更容易读懂对SEO更加友好,让搜索引擎更容易读懂 html 文件的基本结构 html 文件的文件后缀为 …...
How to Describe Figures in a Research Article
How to Describe Figures in a Research Article DateAuthorVersionNote2024.07.10Dog TaoV1.0Finish the document. 文章目录 How to Describe Figures in a Research ArticleGeneral GuidelinesDetailed DescriptionsCommon Describing Phrases Effective communication of …...
昇思MindSpore学习入门-CELL与参数一
Cell作为神经网络构造的基础单元,与神经网络层(Layer)的概念相对应,对Tensor计算操作的抽象封装,能够更准确清晰地对神经网络结构进行表示。除了基础的Tensor计算流程定义外,神经网络层还包含了参数管理、状态管理等功能。而参数(…...
【k8s中安装rabbitmq】k8s中安装rabbitmq并搭建镜像集群-hostpath版
文章目录 简介一.条件及环境说明二.需求说明三.实现原理及说明四.详细步骤4.1.规划节点标签4.2.创建configmap配置4.3.创建三个statefulset和service headless配置4.4.创建service配置 五.安装完后的配置六.安装说明 简介 k8s集群中搭建rabbitmq集群服务一般都会用到pvc&#x…...
(5) 深入探索Python-Pandas库的核心数据结构:Series详解
目录 前言1. Series 简介2. Series的特点3. Series的创建3.1 使用列表创建Series3.2 使用字典创建Series3.3 使用列表和自定义索引创建Series3.4 指定数据类型和名称 4. Series的索引/切片4.1 下标索引:基于整数位置的索引4.2 基于标签的索引4.3 切片4.4 使用.loc[]…...
JAVA之开发神器——IntelliJ IDEA的下载与安装
一、IDEA是什么? IEAD是JetBrains公司开发的专用于java开发的一款集成开发环境。由于其功能强大且符合人体工程学(就是更懂你)的优点,深受java开发人员的喜爱。目前在java开发工具中占比3/4。如果你要走java开发方向,那…...
通过Umijs从0到1搭建一个React项目
有一阵时间没写react了,今天通过umi搭建一个demo项目复习一下react;umi是一个可扩展的企业级前端应用框架,在react市场中还是比较火的一个框架。 Umi官方文档:Umi 介绍 (umijs.org) 一、构建项目。 1、安装包管理工具。 官方推…...
Redis 数据过期及淘汰策略
Redis 数据过期及淘汰策略 过期策略 定时过期 在设置key的过期时间的同时,为该key创建一个定时器,让定时器在key的过期时间来临时,对key进行删除。到过期时间就会立即清除。该策略可以立即清除过期的数据,对内存很友好&a…...
vue vite+three在线编辑模型导入导出
文章目录 序一、1.0.0版本1.新增2.编辑3.导出4.导入 总结 序 要实现一个类似于数字孪生的场景 可以在线、新增、删除模型 、以及编辑模型的颜色、长宽高 然后还要实现 编辑完后 保存为json数据 记录模型数据 既可以导入也可以导出 一、1.0.0版本 1.新增 先拿建议的立方体来…...
去水印小程序源码修复版-前端后端内置接口+第三方接口
去水印小程序源码,前端后端,内置接口第三方接口, 修复数据库账号密码错误问题,内置接口支持替换第三方接口, 文件挺全的,可以添加流量主代码,搭建需要准备一台服务器,备案域名和http…...
机器学习:预测评估8类指标
机器学习:8类预测评估指标 R方值、平均值绝对误差值MAE、均方误差MSE、均方误差根EMSE、中位数绝对误差MAD、平均绝对百分误差MAPE、可解释方差分EVS、均方根对数误差MLSE。 一、R方值 1、说明: R方值,也称为确定系数或拟合优度ÿ…...
【深度学习基础】MAC pycharm 专业版安装与激活
文章目录 一、pycharm专业版安装二、激活 一、pycharm专业版安装 PyCharm是一款专为Python开发者设计的集成开发环境(IDE),旨在帮助用户在使用Python语言开发时提高效率。以下是对PyCharm软件的详细介绍,包括其作用和主要功能&…...
排序相关算法--1.插入排序+冒泡排序回顾
1.基本分类 2.插入排序 特点:有实践意义(例如后期快排的优化),适应性强,一般不会到时间复杂度最坏的情况。 将第一个元素视为已经排好序的序列。取出下一个元素,在已经排好序的序列中从后往前比较…...
变阻器的故障排除方法有哪些?
变阻器,特别是滑动变阻器,作为电子电路中的常见元件,其故障排除方法主要依据具体的故障现象来确定。以下是一些常见的故障现象及其排除方法: 一、接触不良 现象:电阻器不起作用或电压不稳定。 排除方法: …...
软考《信息系统运行管理员》-3.1信息系统设施运维的管理体系
3.1信息系统设施运维的管理体系 1 信息系统设施运维的对象 基础环境 主要包括信息系统运行环境(机房、设备间、配线室、基站、云计算中心 等)中的空调系统、供配电系统、通信应急设备系统、防护设备系统(如消防系统、安全系统) 等,能维持系统安全正常运转…...
Nginx重定向
Nginx重定向 location 匹配 location匹配的就是后面的URL /WordPress 192.168.118.10/wordpress location匹配的分类和优先级 1.精确匹配 location/对字符串进行完全匹配,必须完全符合2.正则匹配 ^~ 前缀匹配,以什么为开头~ 区分大小写的匹配~* 不区分大小写!~: 区分大小…...
私有化地图离线部署方案之高程检索服务
私有化地图离线部署整体解决方案,除硬件之外,一般主要由基础地图服务、查询定位服务、路径规划服务和高程检索服务构成。 我们已经分享过基础地图服务、查询定位服务和路径规划服务,现在再为你分享高程检索服务的方法。 私有化高程检索服务…...
KMS_VL_ALL_AIO架构解析:Windows与Office智能激活的技术方案
KMS_VL_ALL_AIO架构解析:Windows与Office智能激活的技术方案 【免费下载链接】KMS_VL_ALL_AIO Smart Activation Script 项目地址: https://gitcode.com/gh_mirrors/km/KMS_VL_ALL_AIO KMS_VL_ALL_AIO是一款基于KMS协议实现的Windows和Office智能激活解决方案…...
BetterRTX终极指南:三步免费提升Minecraft画质的完整方案
BetterRTX终极指南:三步免费提升Minecraft画质的完整方案 【免费下载链接】BetterRTX-Installer The Powershell Installer for BetterRTX! BetterRTX is a Ray-Tracing mod for Minecraft Bedrock. 项目地址: https://gitcode.com/gh_mirrors/be/BetterRTX-Insta…...
为Dify扩展AI图表与文档生成能力:微服务架构实战指南
1. 项目概述:为Dify打造专属的AI图表与文档生成工具箱如果你正在使用Dify构建自己的AI应用,并且希望让AI不仅能生成文字,还能直接输出流程图、思维导图、PPT甚至试卷,那么这个项目就是为你准备的。brightwang/dify-tool-service是…...
为什么92%的AI创作者不敢打印自己的Midjourney作品?揭秘树莓派印相避坑指南,含色彩管理ICC配置包(限免72小时)
更多请点击: https://intelliparadigm.com 第一章:为什么92%的AI创作者不敢打印自己的Midjourney作品? 当一张由 Midjourney 生成的「超写实森林神殿」在屏幕上熠熠生辉时,创作者往往兴奋地截图、转发、设为壁纸——却极少有人按…...
别再傻傻分不清了!VB、VBS、VBA到底该用哪个?从Excel自动化到网页脚本的实战选择指南
VB、VBS与VBA实战指南:从Excel自动化到系统脚本的精准选择 每次打开Excel准备处理数据时,你是否纠结过该用VBA还是VBS?当需要批量重命名文件时,是否犹豫过VB和VBS哪个更高效?这三种看似相似的"VB系"语言&am…...
别再只调API了!微信支付Native/JSAPI开发中,订单号生成与回调处理的5个实战避坑点
微信支付开发实战:订单与回调的五个关键陷阱与解决方案 在移动支付领域,微信支付作为主流平台之一,其开发文档看似详尽,但实际落地时仍存在诸多"暗坑"。许多开发者过度关注支付接口调用本身,却忽视了订单生成…...
ExDark低光照图像数据集技术架构:构建真实世界低光照计算机视觉解决方案
ExDark低光照图像数据集技术架构:构建真实世界低光照计算机视觉解决方案 【免费下载链接】Exclusively-Dark-Image-Dataset Exclusively Dark (ExDARK) dataset which to the best of our knowledge, is the largest collection of low-light images taken in very …...
玩转Proteus虚拟仪器与图表仿真:用示波器、逻辑分析仪调试数字电路的完整指南
玩转Proteus虚拟仪器与图表仿真:用示波器、逻辑分析仪调试数字电路的完整指南 在数字电路设计领域,仿真验证环节往往决定着项目的成败。传统面包板调试需要反复焊接元器件、连接示波器探头,而一个简单的接线错误就可能导致数小时的排查。Prot…...
排查华为USG防火墙上不了网?先检查这5个配置点(附真实配置案例)
华为USG防火墙上网故障排查实战指南 当内网用户突然无法访问互联网时,作为运维人员往往会面临巨大的压力。华为USG防火墙作为企业网络的核心安全设备,其配置的每一个细节都可能成为网络连通性的关键。本文将从一个真实的故障排查案例出发,带您…...
【DL】信息注入
在多模态生成(如文生图、3D生成)和视觉语言模型(VLM/VLA)的架构设计中,如何将外部条件(如文本、音频、时间步、控制信号)优雅且高效地“注入”到主干网络(Backbone)中,是决定模型性能的核心。 以下是深度整合了底层张量维度差异的 5 大类主流信息注入方法全景指南:…...
