【并发编程】自研数据同步工具优化:创建线程池多线程异步去分页调用其他服务接口获取海量数据
文章目录
- 场景:
- 解决方案
场景:
前段时间在做一个数据同步工具,其中一个服务的任务是调用A服务的接口,将数据库中指定数据请求过来,交给kafka去判断哪些数据是需要新增,哪些数据是需要修改的。
刚开始的设计思路是,,我创建多个服务同时去请求A服务的接口,每个服务都请求到全量数据,由于这些服务都注册在xxl-job上,而且采用的是分片广播的路由策略,那么每个服务就可以只处理请求到的所有数据中id%服务总数==分片索引的部分数据,然后交给kafka,由kafka决定这条数据应该放到哪个分区上。
解决方案
最近学了线程池后,回过头来思考,认为之前的方案还有很大的优化空间。
- 1.当数据量很大时,一次性查询所有数据会导致数据库的负载过大,而使用分页查询,每次只查询部分数据,可以减轻数据库的负担,从而提高数据库的性能和响应速度,所以请求数据方每次分页查询少量数据,这样可以整体降低请求数据的时间。
- 第一次优化.之前是每个服务都要把全量数据请求过来,假设全量数据1000w条,一个服务请求数据需要100s,我开5个服务,那请求数据的总时长就是500s。现在把1000w条数据均分给5个服务,那1个服务就只需要请求200w条数据,耗时20s,那所有服务的请求总时长就是100s。总体耗时缩小了5倍。上面说的分页查询就可以实现:页面大小假设10w(也就是将1000w/10w,逻辑上分成了100页),每个服务自己的分片索引作为页号,每次请求完,都给索引加上分片总数(例如:当前注册了五个服务,那分片总数=5,对于分片索引为1的服务来说,请求的页号为1,6,11,16,21。。。,对于分片索引为2的服务来说,请求的页号为2,7,12,17。。。,对于分片索引为3的服务来说,请求的页号为3,8,13,18,。。。。,对于分片索引为4的服务来说,请求的页号为4,9,14,19。。。。,对于分片索引为5的服务来说,请求的页号为5,10,15,20.。。)这样1000w条数据就均分到每个服务上了。对于每个服务都是单线程去请求数据,就可以将请求操作以及(页号+总服务数)的操作写在一个while循环里,一直请求数据,直到请求的数据为空时(也就是页号超过100了),退出while。
//单线程情况下while(true){String body = HttpUtil.get(remoteURL+"?pageSize=100000&pageNum="+shardIndex);
// logger.info("body:{}",body);//2.获取返回结果的messageJSONObject jsonObject = new JSONObject();
// if (StrUtil.isNotBlank(body)) {jsonObject = JSONUtil.parseObj(body);
// logger.info("name:{}",Thread.currentThread().getName());
// }
// logger.info("jsonObject:{}",jsonObject);//3.从body中获取dataList<TestPO> tests = JSONUtil.toList(jsonObject.getJSONArray("data"), TestPO.class);if(CollectionUtil.isEmpty(tests)){break;}shardIndex+=shardTotal;}
- 第二次优化: 了解了线程池后,还可以再优化。之前是一个服务单线程循环请求需要20s(假设),每次请求10w条,需要请求200w/10w,也就是20次,那一次请求就需要1s。如果使用线程池的话,那么耗时还会更小,因为当你将任务都交给线程池去执行时,多个线程会同时(并行)去请求各自页的数据,假如你只设置了4个线程,那这4个线程会同时发起请求获取数据,1s会完成4次请求,那分给服务的200w,5s就请求完了。那5个服务从总耗时500s,降到了总耗时5s*5=25s。
这次优化,第一版代码(只展示了请求数据的代码,其他业务代码没有展示)
一直向线程池里扔请求数据的任务,当某个任务请求到的数据是空的时候,意味着要请求的数据已经没了,那就结束循环,不再扔请求数据的任务。
//线程共享变量static volatile boolean flag = true;@XxlJob(value = "fenpian")public void fenpian() {int shardIndex = XxlJobHelper.getShardIndex();
// int shardTotal = XxlJobHelper.getShardTotal();//分片总数int shardTotal = 4;AtomicInteger pageNum = new AtomicInteger(shardIndex);//多线程情况下
// List<CompletableFuture>completableFutureList=new ArrayList<>();while (flag){CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {String body = HttpUtil.get(remoteURL + "?pageSize=1000&pageNum=" + pageNum.getAndAdd(shardTotal));JSONObject jsonObject = new JSONObject();jsonObject = JSONUtil.parseObj(body);List<TestPO> tests = JSONUtil.toList(jsonObject.getJSONArray("data"), TestPO.class);logger.info("tests的size:{}",tests.size());if(CollectionUtil.isEmpty(tests)){flag=false;}},executorService);completableFutureList.add(future);}CompletableFuture[] completableFutures = completableFutureList.toArray(new CompletableFuture[completableFutureList.size()]);CompletableFuture.allOf(completableFutures).join();logger.info("任务结束");executorService.shutdown();
上面代码会有一个问题,就是while循环往线程池里扔任务,所有线程在执行时,会在请求数据那里”停留“一段时间,“停留期间”还会一直循环向线程池扔任务,当线程执行完某次请求得到空数据结束循环时,等待队列中还排着大堆任务等着去请求数据。
为了解决这个问题,我改用了for循环提交任务,提前根据请求数据总量、每次读取的条数,以及服务总数得到每个服务需要执行的任务数。
第二版代码
@XxlJob(value = "fenpian")public void fenpian() {int shardIndex = XxlJobHelper.getShardIndex()+1;int shardTotal = XxlJobHelper.getShardTotal();//分片总数
// int shardTotal = 4;AtomicInteger pageNum = new AtomicInteger(shardIndex);//多线程情况下List<CompletableFuture>completableFutureList=new ArrayList<>();//总条数double total = 10000000;//读取的条数double pageSize=1000;double tasks = Math.ceil( total / (double) shardTotal / pageSize);logger.info("任务数{}",tasks);for(double i=0;i<tasks;i++){CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {String url = remoteURL + "?pageSize=1000&pageNum=" + pageNum.getAndAdd(shardTotal);logger.info("url:{},threadName:{}",url,Thread.currentThread().getName());String body = HttpUtil.get(url);JSONObject jsonObject = new JSONObject();jsonObject = JSONUtil.parseObj(body);List<TestPO> tests = JSONUtil.toList(jsonObject.getJSONArray("data"), TestPO.class);logger.info("tests的size:{}",tests.size());},executorService);completableFutureList.add(future);}CompletableFuture[] completableFutures = completableFutureList.toArray(new CompletableFuture[completableFutureList.size()]);CompletableFuture.allOf(completableFutures).join();logger.info("任务结束");
如有问题,请求指正(^^ゞ
相关文章:
【并发编程】自研数据同步工具优化:创建线程池多线程异步去分页调用其他服务接口获取海量数据
文章目录 场景:解决方案 场景: 前段时间在做一个数据同步工具,其中一个服务的任务是调用A服务的接口,将数据库中指定数据请求过来,交给kafka去判断哪些数据是需要新增,哪些数据是需要修改的。 刚开始的设…...
七、dokcer-compose部署springboot的jar
1、准备 打包后包名为 ruoyi-admin.jar 增加接口 httpL//{ip}:{port}/common/test/han #环境变量预application.yml 中REDIS_HOSTt的值,去环境变量去找;如果找不到REDIS_HOST就用myredis 1、Dockerfile FROM hlw/java:8-jreRUN ln -sf /usr/share/z…...
k8s 使用 containerd 运行时配置 http 私服
简介 Kubernetes 从 v1.20 开始弃用 Docker,并推荐用户切换到基于容器运行时接口(CRI)的容器引擎,如 containerd、cri-o 等。 目前使用的环境中使用了 Kubernetes v1.22.3,containerd 1.4.3,containerd 在…...
【新品发布】ChatWork企业知识库系统源码
系统简介 基于前后端分离架构以及Vue3、uni-app、ThinkPHP6.x、PostgreSQL、pgvector技术栈开发,包含PC端、H5端。 ChatWork支持问答式和文档式知识库,能够导入txt、doc、docx、pdf、md等多种格式文档。 导入数据完成向量化训练后,用户提问…...
疫情打卡 vue+springboot疾病防控管理系统java jsp源代码
本项目为前几天收费帮学妹做的一个项目,Java EE JSP项目,在工作环境中基本使用不到,但是很多学校把这个当作编程入门的项目来做,故分享出本项目供初学者参考。 一、项目描述 疫情打卡 vuespringboot 系统有1权限:管理…...
python --连接websocket
如果只是模拟js端发送接收的话,已经有了websocket server的话,只有client就好了 pip install websocket-client#-*- encoding:utf-8 -*-import sys sys.path.append("..") from socket import * import json, time, threading from websocket…...
数据库内日期类型数据大于小于条件查找注意事项
只传date格式的日期取查datetime的字段的话默认是 00:00:00 日期类型字符串需要使用 ’ ’ 单引号括住 使用大于小于条件查询某一天的日期数据 前后判断条件不能是同一天 一个例子 数据库内数据: 查询2023-08-14之后的数据: select * from tetst…...
网易有道押宝大模型,打响智能硬件突围战
本文转载自产业科技 自今年开年以来,AI大模型这场火势能不减,如今已燃到教育领域。 7月26日,网易有道举办了“powered by子曰”教育大模型应用成果发布会,推出国内首个教育领域垂直大模型“子曰”,并一口气发布了基于…...
KAFKA第二课之生产者(面试重点)
生产者学习 1.1 生产者消息发送流程 在消息发送的过程中,涉及到了两个线程——main线程和Sender线程。在main线程中创建了一个双端队列RecordAccumulator。main线程将消息发送给RecordAccumulator,Sender线程不断从RecordAccumulator中拉取消息发送到K…...
Mybatis 源码 ∞ :杂七杂八
文章目录 一、前言二、TypeHandler三、KeyGenerator四、Plugin1 Interceptor2 org.apache.ibatis.plugin.Plugin3. 调用场景 五、Mybatis 嵌套映射 BUG1. 示例2. 原因3. 解决方案 六、discriminator 标签七、其他1. RowBounds2. ResultHandler3. MapKey 一、前言 Mybatis 官网…...
堆的实现以及应用
💓博主个人主页:不是笨小孩👀 ⏩专栏分类:数据结构与算法👀 刷题专栏👀 C语言👀 🚚代码仓库:笨小孩的代码库👀 ⏩社区:不是笨小孩👀 🌹欢迎大家三连关注&…...
MySql011——检索数据:过滤数据(使用正则表达式)
前提:使用《MySql006——检索数据:基础select语句》中创建的products表 一、正则表达式介绍 关于正则表达式的介绍大家可以看我的这一篇博客《Java038——正则表达式》,这里就不再累赘。 二、使用MySQL正则表达式 2.1、基本字符匹配 检索…...
数据结构与算法-栈(LIFO)(经典面试题)
一:面试经典 1. 如何设计一个括号匹配的功能?比如给你一串括号让你判断是否符合我们的括号原则, 栈 力扣 2. 如何设计一个浏览器的前进和后退功能? 思想:两个栈,一个栈存放前进栈&…...
NSI45030AT1G LED驱动器方案为汽车外部及内部照明恒流稳流器(CCR)方案
关于线性恒流调节器(CCR):是一种用于控制电流的稳定输出。它通常由一个功率晶体管和一个参考电流源组成。CCR的工作原理是通过不断调节功率晶体管的导通时间来维持输出电流的恒定。当输出电流超过设定值时,CCR会减少功率晶体管的导…...
uni-app中使用pinia
目录 Pinia 是什么? uni-app 使用Pinia main.js 中引用pinia 创建和注册模块 定义pinia方式 选项options方式 定义pinia 页面中使用 pinia选项options方式 函数方式 定义pinia 页面中使用 函数方式 定义的pinia Pinia 是什么? Pinia࿰…...
Spring之事务管理
文章目录 前言一、事务及其参数含义1.事务的四个特性2.事务的传播行为(propagation)3.事务隔离性4.事务的隔离级别(ioslation)5.timeout(超时)6.readOnly(是否只读)7.rollbackFor&am…...
linux常见的mysql问题
当涉及到MySQL在Linux系统上的常见问题时,以下是10个经常遇到的问题及其解答: 无法连接到MySQL服务器。 确保MySQL服务器正在运行:可以使用systemctl status mysql或service mysql status命令检查MySQL服务状态。确保MySQL服务器网络设置正确…...
常见分辨率时序信息
分辨率列表 分辨率一:640x480(逐行) 分辨率二:800x600(逐行) 分辨率三:1024x768(逐行) 分辨率四:大名鼎鼎720P(逐行) 注:选择720P@30帧的,需拉长HOR TOTAL TIME 分辨率五:1280x800(逐行) 分辨率六:1280x960(逐行...
机器人CPP编程基础-05完结The End
非常不可思议……之前四篇博文竟然有超过100的阅读量…… 此文此部分终结,没有继续写下去的必要了。 插入一个分享: 编程基础不重要了,只要明确需求,借助AI工具就能完成一个项目。 当然也不是一次成功,工具使用也需要…...
数据库应用系统DBAS功能设计与实施(三级数据库)
目录 一、了解软件体系结构及设计过程 1、软件体系结构与设计过程 2、软件设计过程 二、了解DBAS总体设计 1、DBAS体系结构设计 2、软件体系结构设计 3、软硬件选型与配置设计 4、业务规则初步设计 三、了解DBAS功能概要设计 1、表示层概要设计 2、业务逻辑层概要设计…...
idea大量爆红问题解决
问题描述 在学习和工作中,idea是程序员不可缺少的一个工具,但是突然在有些时候就会出现大量爆红的问题,发现无法跳转,无论是关机重启或者是替换root都无法解决 就是如上所展示的问题,但是程序依然可以启动。 问题解决…...
Flask RESTful 示例
目录 1. 环境准备2. 安装依赖3. 修改main.py4. 运行应用5. API使用示例获取所有任务获取单个任务创建新任务更新任务删除任务 中文乱码问题: 下面创建一个简单的Flask RESTful API示例。首先,我们需要创建环境,安装必要的依赖,然后…...
反向工程与模型迁移:打造未来商品详情API的可持续创新体系
在电商行业蓬勃发展的当下,商品详情API作为连接电商平台与开发者、商家及用户的关键纽带,其重要性日益凸显。传统商品详情API主要聚焦于商品基本信息(如名称、价格、库存等)的获取与展示,已难以满足市场对个性化、智能…...
三维GIS开发cesium智慧地铁教程(5)Cesium相机控制
一、环境搭建 <script src"../cesium1.99/Build/Cesium/Cesium.js"></script> <link rel"stylesheet" href"../cesium1.99/Build/Cesium/Widgets/widgets.css"> 关键配置点: 路径验证:确保相对路径.…...
Swift 协议扩展精进之路:解决 CoreData 托管实体子类的类型不匹配问题(下)
概述 在 Swift 开发语言中,各位秃头小码农们可以充分利用语法本身所带来的便利去劈荆斩棘。我们还可以恣意利用泛型、协议关联类型和协议扩展来进一步简化和优化我们复杂的代码需求。 不过,在涉及到多个子类派生于基类进行多态模拟的场景下,…...
(二)TensorRT-LLM | 模型导出(v0.20.0rc3)
0. 概述 上一节 对安装和使用有个基本介绍。根据这个 issue 的描述,后续 TensorRT-LLM 团队可能更专注于更新和维护 pytorch backend。但 tensorrt backend 作为先前一直开发的工作,其中包含了大量可以学习的地方。本文主要看看它导出模型的部分&#x…...
质量体系的重要
质量体系是为确保产品、服务或过程质量满足规定要求,由相互关联的要素构成的有机整体。其核心内容可归纳为以下五个方面: 🏛️ 一、组织架构与职责 质量体系明确组织内各部门、岗位的职责与权限,形成层级清晰的管理网络…...
AI,如何重构理解、匹配与决策?
AI 时代,我们如何理解消费? 作者|王彬 封面|Unplash 人们通过信息理解世界。 曾几何时,PC 与移动互联网重塑了人们的购物路径:信息变得唾手可得,商品决策变得高度依赖内容。 但 AI 时代的来…...
Fabric V2.5 通用溯源系统——增加图片上传与下载功能
fabric-trace项目在发布一年后,部署量已突破1000次,为支持更多场景,现新增支持图片信息上链,本文对图片上传、下载功能代码进行梳理,包含智能合约、后端、前端部分。 一、智能合约修改 为了增加图片信息上链溯源,需要对底层数据结构进行修改,在此对智能合约中的农产品数…...
JavaScript基础-API 和 Web API
在学习JavaScript的过程中,理解API(应用程序接口)和Web API的概念及其应用是非常重要的。这些工具极大地扩展了JavaScript的功能,使得开发者能够创建出功能丰富、交互性强的Web应用程序。本文将深入探讨JavaScript中的API与Web AP…...
