当前位置: 首页 > news >正文

一次业务的批量数据任务的处理优化

文章目录

  • 一次业务的批量数据任务的处理优化
    • 业务背景
    • 1.0版本 分批处理模式
    • 2.0版本 平衡任务队列模式
    • 3.0版本 优化调度平衡任务队列模式
    • 总结

一次业务的批量数据任务的处理优化

业务背景

一个重新生成所有客户的财务业务指标数据的批量数据处理任务。

1.0版本 分批处理模式

根据要处理的客户数量,按照最大线程数切分成多个段,尽量保证每个线程处理相同的客户数量。

    private void updateForRegenerateByCustomer(List<Integer> customerIdList,SystemUserCommonDTO user, LocalDateTime now) {List<CustomerBaseInfo> baseInfoList = CollectionUtils.isEmpty(customerIdList)?customerInfoService.listAll():customerInfoService.listByIdList(customerIdList);//先清理客户的数据updateForCleanByCustomerIdList(baseInfoList,user,now);int maxSize = baseInfoList.size();//计算当前任务数量int currentMaxPoolSize = maxPoolSize<maxSize?maxPoolSize:maxSize;CompletableFuture[] tasks = new CompletableFuture[currentMaxPoolSize];//计算每个任务分段的数量int size = maxSize / currentMaxPoolSize;for(int i=0;i<currentMaxPoolSize;i++){final int begin = i * size;final int end = i==currentMaxPoolSize-1?maxSize:(i+1)*size;//创建异步处理的分段任务tasks[i] = CompletableFuture.runAsync(()->updateForGenerateByCustomerIdList(baseInfoList,begin,end,user,now),executorService).whenCompleteAsync((k,v)-> log.info("重新生成财务业务指标客户的所有数据-线程【{}】完成",Thread.currentThread().getName()));}// 向线程池提交任务CompletableFuture.allOf(tasks).whenComplete((v, th) -> log.info("重新生成财务业务指标客户的所有数据-【{}】个子线程处理完成",tasks.length)).join();}/*** 生成指定客户列表的所有数据**/private void updateForGenerateByCustomerIdList(List<CustomerBaseInfo> baseInfoList,int begin,int end,SystemUserCommonDTO user, LocalDateTime now){//每个线程只处理自己的分段的数据for(int i=begin;i<end;i++){CustomerBaseInfo baseInfo = baseInfoList.get(i);//每个客户独立事务TransactionalUtils.runWithNewTransactional(()->updateForGenerateByCustomerId(baseInfo.getId(),user,now));}}/*** 生成指定客户的所有数据**/private void updateForGenerateByCustomerId(Integer customerId,SystemUserCommonDTO user, LocalDateTime now){//1、重新生成客户的所有业务类型的数据List<FinanceBiMaintainDto> maintainDtoList =financeBiBusinessTypeSupport.getMaintainListByCustomerId(customerId);if(CollectionUtils.isEmpty(maintainDtoList)){return ;}//生成每个指标的数据Map<BusinessIndicatorEnum,List<FinanceBiMaintainDto>> indicatorMaintainDtoMap = maintainDtoList.stream().collect(Collectors.groupingBy(FinanceBiMaintainDto::getIndicator));indicatorMaintainDtoMap.forEach((k,v)->{log.info("重新生成财务业务指标指定客户【{}】的【{}】支持处理开始",customerId,k);financeBiManager.updateForBiMaintain(k, v,user,now);});}

运行耗时:1420.145秒

2.0版本 平衡任务队列模式

1.0 版本 由于不同客户的数据量不同,导致生成数据的耗时不同,因此按照客户数量均分任务的的方式对于每个线程来说,任务量是不一样的,因此可能会导致部分线程太忙,部分线程太空的情况。因此调整为使用队列方式来解决任务分配的问题,每个线程自己取队列中取要处理的客户,直到所有队列中的客户都被处理完,所有的线程结束。这样就避免的线程任务量不平衡问题。

updateForGenerateByCustomerId 方法不需要改造,只需要调整任务分配的相关方法就可以。

private void updateForRegenerateByCustomer(List<Integer> customerIdList, SystemUserCommonDTO user,LocalDateTime now) {List<CustomerBaseInfo> baseInfoList = CollectionUtils.isEmpty(customerIdList) ? customerInfoService.listAll() :customerInfoService.listByIdList(customerIdList);//先清理客户的数据updateForCleanByCustomerIdList(baseInfoList, user, now);int maxSize = baseInfoList.size();int currentMaxPoolSize = Math.min(maxPoolSize, maxSize);//根据线程数,构建固定的任务数量CompletableFuture<?>[] tasks = new CompletableFuture<?>[currentMaxPoolSize];//构建待处理的客户队列,由于这里没有并发读写的情况,因此用ConcurrentLinkedQueue效率会更高一点。ConcurrentLinkedQueue<Integer> queue = new ConcurrentLinkedQueue<>(baseInfoList.stream().map(CustomerBaseInfo::getId).collect(Collectors.toList()));//创建多个线程去消耗客户队列for (int i = 0; i < currentMaxPoolSize; i++) {tasks[i] =CompletableFuture.runAsync(() -> updateForGenerateByCustomerIdList(queue, user, now), executorService).whenCompleteAsync((k, v) -> {if (v != null) {log.error(String.format("重新生成财务业务指标客户的所有数据-线程【%s】发生异常",Thread.currentThread().getName()), v);} else {log.info("重新生成财务业务指标客户的所有数据-线程【{}】完成",Thread.currentThread().getName());}});}// 向线程池提交任务CompletableFuture.allOf(tasks).whenComplete((v, th) -> log.info("重新生成财务业务指标客户的所有数据-【{}】个子线程处理完成", tasks.length)).join();}/*** 生成指定客户列表的所有数据**/private void updateForGenerateByCustomerIdList(ConcurrentLinkedQueue<Integer> queue, SystemUserCommonDTO user,LocalDateTime now) {Integer customerId = queue.poll();//循环从客户队列中取出待处理的客户,直到所有客户都处理完毕。while (customerId != null) {final Integer currentCustomerId = customerId;TransactionalUtils.runWithNewTransactional(() -> updateForGenerateByCustomerId(currentCustomerId, user, now));customerId = queue.poll();}}

优化后的耗时:1037.059秒

3.0版本 优化调度平衡任务队列模式

2.0版本虽然解决的了每个线程任务量不平衡的问题,但可能出现某个数据量很大的客户在队列的尾部,导致当其他线程都处理完所有的客户时,取到最大数据量的客户的线程仍在运行,任务整体的耗时被增加。因此需要优化调度,将耗时高的客户调度到队列头部,保证耗时最长的客户的优先处理,从而避免最后等待耗时长的线程。

updateForGenerateByCustomerIdList 方法不需要改造,只需要队列构造处理就可以。


private void updateForRegenerateByCustomer(List<Integer> customerIdList, SystemUserCommonDTO user,LocalDateTime now) {List<CustomerBaseInfo> baseInfoList = CollectionUtils.isEmpty(customerIdList) ? customerInfoService.listAll() :customerInfoService.listByIdList(customerIdList);//先清理客户的数据updateForCleanByCustomerIdList(baseInfoList, user, now);//获取客户的统计数据Map<Integer, CustomerStatisticsInfo> customerStatisticsInfoMap =customerStatisticsInfoService.listAll().stream().collect(Collectors.toMap(CustomerStatisticsInfo::getCustomerId, Function.identity()));int maxSize = baseInfoList.size();int currentMaxPoolSize = Math.min(maxPoolSize, maxSize);CompletableFuture<String>[] tasks = new CompletableFuture[currentMaxPoolSize];//根据客户的统计数据,构建待处理的客户队列ConcurrentLinkedQueue<Integer> queue =baseInfoList.stream().map(item -> customerStatisticsInfoMap.get(item.getId())).filter(Objects::nonNull)
//队列按照客户数据量倒序排列               .sorted(Comparator.comparing(CustomerStatisticsInfo::getNumberOfCheckedSatisfactoryActivitys,Comparator.reverseOrder())).map(CustomerStatisticsInfo::getCustomerId).collect(Collectors.toCollection(ConcurrentLinkedQueue::new));for (int i = 0; i < currentMaxPoolSize; i++) {tasks[i] = CompletableFuture.supplyAsync(() -> {updateForGenerateByCustomerIdList(queue, user, now);return Thread.currentThread().getName();}, executorService).whenCompleteAsync((k, ex) -> {if (ex != null) {log.error(String.format("重新生成财务业务指标客户的所有数据-线程【%s】发生异常", k), ex);} else {log.info("重新生成财务业务指标客户的所有数据-线程【{}】完成", k);}});}// 向线程池提交任务CompletableFuture.allOf(tasks).whenComplete((v, th) -> log.info("重新生成财务业务指标客户的所有数据-【{}】个子线程处理完成", tasks.length)).join();}

耗时:726.725秒

总结

最终的耗时从1400多秒 降低到700多秒。降低了一半左右。

相关文章:

一次业务的批量数据任务的处理优化

文章目录 一次业务的批量数据任务的处理优化业务背景1.0版本 分批处理模式2.0版本 平衡任务队列模式3.0版本 优化调度平衡任务队列模式总结 一次业务的批量数据任务的处理优化 业务背景 一个重新生成所有客户的财务业务指标数据的批量数据处理任务。 1.0版本 分批处理模式 …...

新能源汽车充电站远程监控系统S275钡铼技术无线RTU

新能源汽车充电站的远程监控系统在现代城市基础设施中扮演着至关重要的角色&#xff0c;而钡铼技术的S275无线RTU作为一款先进的物联网数据监测采集控制短信报警终端&#xff0c;为充电站的安全运行和高效管理提供了强大的技术支持。 技术特点和功能 钡铼S275采用了基于UCOSI…...

海外视频媒体发布/发稿:如何在国外媒体以视频的形式宣发

1. 背景介绍 在如今数字化时代&#xff0c;每个国家都拥有着各自的视频媒体平台&#xff0c;而主流媒体也都纷纷加入了视频发布的行列。视频媒体的宣发形式主要包括油管Youtube等视频分享平台&#xff0c;以及图文配合的发布方式。通过在视频中夹带链接&#xff0c;媒体可以以…...

HTML 【实用教程】(2024最新版)

核心思想 —— 语义化 【面试题】如何理解 HTML 语义化 ?仅通过标签便能判断内容的类型&#xff0c;特别是区分标题、段落、图片和表格 增加代码可读性&#xff0c;让人更容易读懂对SEO更加友好&#xff0c;让搜索引擎更容易读懂 html 文件的基本结构 html 文件的文件后缀为 …...

How to Describe Figures in a Research Article

How to Describe Figures in a Research Article DateAuthorVersionNote2024.07.10Dog TaoV1.0Finish the document. 文章目录 How to Describe Figures in a Research ArticleGeneral GuidelinesDetailed DescriptionsCommon Describing Phrases Effective communication of …...

昇思MindSpore学习入门-CELL与参数一

Cell作为神经网络构造的基础单元&#xff0c;与神经网络层(Layer)的概念相对应&#xff0c;对Tensor计算操作的抽象封装&#xff0c;能够更准确清晰地对神经网络结构进行表示。除了基础的Tensor计算流程定义外&#xff0c;神经网络层还包含了参数管理、状态管理等功能。而参数(…...

【k8s中安装rabbitmq】k8s中安装rabbitmq并搭建镜像集群-hostpath版

文章目录 简介一.条件及环境说明二.需求说明三.实现原理及说明四.详细步骤4.1.规划节点标签4.2.创建configmap配置4.3.创建三个statefulset和service headless配置4.4.创建service配置 五.安装完后的配置六.安装说明 简介 k8s集群中搭建rabbitmq集群服务一般都会用到pvc&#x…...

(5) 深入探索Python-Pandas库的核心数据结构:Series详解

目录 前言1. Series 简介2. Series的特点3. Series的创建3.1 使用列表创建Series3.2 使用字典创建Series3.3 使用列表和自定义索引创建Series3.4 指定数据类型和名称 4. Series的索引/切片4.1 下标索引&#xff1a;基于整数位置的索引4.2 基于标签的索引4.3 切片4.4 使用.loc[]…...

JAVA之开发神器——IntelliJ IDEA的下载与安装

一、IDEA是什么&#xff1f; IEAD是JetBrains公司开发的专用于java开发的一款集成开发环境。由于其功能强大且符合人体工程学&#xff08;就是更懂你&#xff09;的优点&#xff0c;深受java开发人员的喜爱。目前在java开发工具中占比3/4。如果你要走java开发方向&#xff0c;那…...

通过Umijs从0到1搭建一个React项目

有一阵时间没写react了&#xff0c;今天通过umi搭建一个demo项目复习一下react&#xff1b;umi是一个可扩展的企业级前端应用框架&#xff0c;在react市场中还是比较火的一个框架。 Umi官方文档&#xff1a;Umi 介绍 (umijs.org) 一、构建项目。 1、安装包管理工具。 官方推…...

Redis 数据过期及淘汰策略

Redis 数据过期及淘汰策略 过期策略 定时过期 在设置key​的过期时间的同时&#xff0c;为该key​创建一个定时器&#xff0c;让定时器在key​的过期时间来临时&#xff0c;对key进行删除。到过期时间就会立即清除。该策略可以立即清除过期的数据&#xff0c;对内存很友好&a…...

vue vite+three在线编辑模型导入导出

文章目录 序一、1.0.0版本1.新增2.编辑3.导出4.导入 总结 序 要实现一个类似于数字孪生的场景 可以在线、新增、删除模型 、以及编辑模型的颜色、长宽高 然后还要实现 编辑完后 保存为json数据 记录模型数据 既可以导入也可以导出 一、1.0.0版本 1.新增 先拿建议的立方体来…...

去水印小程序源码修复版-前端后端内置接口+第三方接口

去水印小程序源码&#xff0c;前端后端&#xff0c;内置接口第三方接口&#xff0c; 修复数据库账号密码错误问题&#xff0c;内置接口支持替换第三方接口&#xff0c; 文件挺全的&#xff0c;可以添加流量主代码&#xff0c;搭建需要准备一台服务器&#xff0c;备案域名和http…...

机器学习:预测评估8类指标

机器学习&#xff1a;8类预测评估指标 R方值、平均值绝对误差值MAE、均方误差MSE、均方误差根EMSE、中位数绝对误差MAD、平均绝对百分误差MAPE、可解释方差分EVS、均方根对数误差MLSE。 一、R方值 1、说明&#xff1a; R方值&#xff0c;也称为确定系数或拟合优度&#xff…...

【深度学习基础】MAC pycharm 专业版安装与激活

文章目录 一、pycharm专业版安装二、激活 一、pycharm专业版安装 PyCharm是一款专为Python开发者设计的集成开发环境&#xff08;IDE&#xff09;&#xff0c;旨在帮助用户在使用Python语言开发时提高效率。以下是对PyCharm软件的详细介绍&#xff0c;包括其作用和主要功能&…...

排序相关算法--1.插入排序+冒泡排序回顾

1.基本分类 2.插入排序 特点&#xff1a;有实践意义&#xff08;例如后期快排的优化&#xff09;&#xff0c;适应性强&#xff0c;一般不会到时间复杂度最坏的情况。 将第一个元素视为已经排好序的序列。取出下一个元素&#xff0c;在已经排好序的序列中从后往前比较&#xf…...

变阻器的故障排除方法有哪些?

变阻器&#xff0c;特别是滑动变阻器&#xff0c;作为电子电路中的常见元件&#xff0c;其故障排除方法主要依据具体的故障现象来确定。以下是一些常见的故障现象及其排除方法&#xff1a; 一、接触不良 现象&#xff1a;电阻器不起作用或电压不稳定。 排除方法&#xff1a; …...

软考《信息系统运行管理员》-3.1信息系统设施运维的管理体系

3.1信息系统设施运维的管理体系 1 信息系统设施运维的对象 基础环境 主要包括信息系统运行环境(机房、设备间、配线室、基站、云计算中心 等)中的空调系统、供配电系统、通信应急设备系统、防护设备系统(如消防系统、安全系统) 等&#xff0c;能维持系统安全正常运转&#xf…...

Nginx重定向

Nginx重定向 location 匹配 location匹配的就是后面的URL /WordPress 192.168.118.10/wordpress location匹配的分类和优先级 1.精确匹配 location/对字符串进行完全匹配,必须完全符合2.正则匹配 ^~ 前缀匹配,以什么为开头~ 区分大小写的匹配~* 不区分大小写!~: 区分大小…...

私有化地图离线部署方案之高程检索服务

私有化地图离线部署整体解决方案&#xff0c;除硬件之外&#xff0c;一般主要由基础地图服务、查询定位服务、路径规划服务和高程检索服务构成。 我们已经分享过基础地图服务、查询定位服务和路径规划服务&#xff0c;现在再为你分享高程检索服务的方法。 私有化高程检索服务…...

Vim 调用外部命令学习笔记

Vim 外部命令集成完全指南 文章目录 Vim 外部命令集成完全指南核心概念理解命令语法解析语法对比 常用外部命令详解文本排序与去重文本筛选与搜索高级 grep 搜索技巧文本替换与编辑字符处理高级文本处理编程语言处理其他实用命令 范围操作示例指定行范围处理复合命令示例 实用技…...

龙虎榜——20250610

上证指数放量收阴线&#xff0c;个股多数下跌&#xff0c;盘中受消息影响大幅波动。 深证指数放量收阴线形成顶分型&#xff0c;指数短线有调整的需求&#xff0c;大概需要一两天。 2025年6月10日龙虎榜行业方向分析 1. 金融科技 代表标的&#xff1a;御银股份、雄帝科技 驱动…...

超短脉冲激光自聚焦效应

前言与目录 强激光引起自聚焦效应机理 超短脉冲激光在脆性材料内部加工时引起的自聚焦效应&#xff0c;这是一种非线性光学现象&#xff0c;主要涉及光学克尔效应和材料的非线性光学特性。 自聚焦效应可以产生局部的强光场&#xff0c;对材料产生非线性响应&#xff0c;可能…...

大话软工笔记—需求分析概述

需求分析&#xff0c;就是要对需求调研收集到的资料信息逐个地进行拆分、研究&#xff0c;从大量的不确定“需求”中确定出哪些需求最终要转换为确定的“功能需求”。 需求分析的作用非常重要&#xff0c;后续设计的依据主要来自于需求分析的成果&#xff0c;包括: 项目的目的…...

Admin.Net中的消息通信SignalR解释

定义集线器接口 IOnlineUserHub public interface IOnlineUserHub {/// 在线用户列表Task OnlineUserList(OnlineUserList context);/// 强制下线Task ForceOffline(object context);/// 发布站内消息Task PublicNotice(SysNotice context);/// 接收消息Task ReceiveMessage(…...

关于nvm与node.js

1 安装nvm 安装过程中手动修改 nvm的安装路径&#xff0c; 以及修改 通过nvm安装node后正在使用的node的存放目录【这句话可能难以理解&#xff0c;但接着往下看你就了然了】 2 修改nvm中settings.txt文件配置 nvm安装成功后&#xff0c;通常在该文件中会出现以下配置&…...

在鸿蒙HarmonyOS 5中使用DevEco Studio实现录音机应用

1. 项目配置与权限设置 1.1 配置module.json5 {"module": {"requestPermissions": [{"name": "ohos.permission.MICROPHONE","reason": "录音需要麦克风权限"},{"name": "ohos.permission.WRITE…...

云原生玩法三问:构建自定义开发环境

云原生玩法三问&#xff1a;构建自定义开发环境 引言 临时运维一个古董项目&#xff0c;无文档&#xff0c;无环境&#xff0c;无交接人&#xff0c;俗称三无。 运行设备的环境老&#xff0c;本地环境版本高&#xff0c;ssh不过去。正好最近对 腾讯出品的云原生 cnb 感兴趣&…...

2025季度云服务器排行榜

在全球云服务器市场&#xff0c;各厂商的排名和地位并非一成不变&#xff0c;而是由其独特的优势、战略布局和市场适应性共同决定的。以下是根据2025年市场趋势&#xff0c;对主要云服务器厂商在排行榜中占据重要位置的原因和优势进行深度分析&#xff1a; 一、全球“三巨头”…...

08. C#入门系列【类的基本概念】:开启编程世界的奇妙冒险

C#入门系列【类的基本概念】&#xff1a;开启编程世界的奇妙冒险 嘿&#xff0c;各位编程小白探险家&#xff01;欢迎来到 C# 的奇幻大陆&#xff01;今天咱们要深入探索这片大陆上至关重要的 “建筑”—— 类&#xff01;别害怕&#xff0c;跟着我&#xff0c;保准让你轻松搞…...