【并发编程】自研数据同步工具优化:创建线程池多线程异步去分页调用其他服务接口获取海量数据
文章目录
- 场景:
- 解决方案
场景:
前段时间在做一个数据同步工具,其中一个服务的任务是调用A服务的接口,将数据库中指定数据请求过来,交给kafka去判断哪些数据是需要新增,哪些数据是需要修改的。
刚开始的设计思路是,,我创建多个服务同时去请求A服务的接口,每个服务都请求到全量数据,由于这些服务都注册在xxl-job上,而且采用的是分片广播的路由策略,那么每个服务就可以只处理请求到的所有数据中id%服务总数==分片索引的部分数据,然后交给kafka,由kafka决定这条数据应该放到哪个分区上。
解决方案
最近学了线程池后,回过头来思考,认为之前的方案还有很大的优化空间。
- 1.当数据量很大时,一次性查询所有数据会导致数据库的负载过大,而使用分页查询,每次只查询部分数据,可以减轻数据库的负担,从而提高数据库的性能和响应速度,所以请求数据方每次分页查询少量数据,这样可以整体降低请求数据的时间。
- 第一次优化.之前是每个服务都要把全量数据请求过来,假设全量数据1000w条,一个服务请求数据需要100s,我开5个服务,那请求数据的总时长就是500s。现在把1000w条数据均分给5个服务,那1个服务就只需要请求200w条数据,耗时20s,那所有服务的请求总时长就是100s。总体耗时缩小了5倍。上面说的分页查询就可以实现:页面大小假设10w(也就是将1000w/10w,逻辑上分成了100页),每个服务自己的分片索引作为页号,每次请求完,都给索引加上分片总数(例如:当前注册了五个服务,那分片总数=5,对于分片索引为1的服务来说,请求的页号为1,6,11,16,21。。。,对于分片索引为2的服务来说,请求的页号为2,7,12,17。。。,对于分片索引为3的服务来说,请求的页号为3,8,13,18,。。。。,对于分片索引为4的服务来说,请求的页号为4,9,14,19。。。。,对于分片索引为5的服务来说,请求的页号为5,10,15,20.。。)这样1000w条数据就均分到每个服务上了。对于每个服务都是单线程去请求数据,就可以将请求操作以及(页号+总服务数)的操作写在一个while循环里,一直请求数据,直到请求的数据为空时(也就是页号超过100了),退出while。
//单线程情况下while(true){String body = HttpUtil.get(remoteURL+"?pageSize=100000&pageNum="+shardIndex);
// logger.info("body:{}",body);//2.获取返回结果的messageJSONObject jsonObject = new JSONObject();
// if (StrUtil.isNotBlank(body)) {jsonObject = JSONUtil.parseObj(body);
// logger.info("name:{}",Thread.currentThread().getName());
// }
// logger.info("jsonObject:{}",jsonObject);//3.从body中获取dataList<TestPO> tests = JSONUtil.toList(jsonObject.getJSONArray("data"), TestPO.class);if(CollectionUtil.isEmpty(tests)){break;}shardIndex+=shardTotal;}
- 第二次优化: 了解了线程池后,还可以再优化。之前是一个服务单线程循环请求需要20s(假设),每次请求10w条,需要请求200w/10w,也就是20次,那一次请求就需要1s。如果使用线程池的话,那么耗时还会更小,因为当你将任务都交给线程池去执行时,多个线程会同时(并行)去请求各自页的数据,假如你只设置了4个线程,那这4个线程会同时发起请求获取数据,1s会完成4次请求,那分给服务的200w,5s就请求完了。那5个服务从总耗时500s,降到了总耗时5s*5=25s。
这次优化,第一版代码(只展示了请求数据的代码,其他业务代码没有展示)
一直向线程池里扔请求数据的任务,当某个任务请求到的数据是空的时候,意味着要请求的数据已经没了,那就结束循环,不再扔请求数据的任务。
//线程共享变量static volatile boolean flag = true;@XxlJob(value = "fenpian")public void fenpian() {int shardIndex = XxlJobHelper.getShardIndex();
// int shardTotal = XxlJobHelper.getShardTotal();//分片总数int shardTotal = 4;AtomicInteger pageNum = new AtomicInteger(shardIndex);//多线程情况下
// List<CompletableFuture>completableFutureList=new ArrayList<>();while (flag){CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {String body = HttpUtil.get(remoteURL + "?pageSize=1000&pageNum=" + pageNum.getAndAdd(shardTotal));JSONObject jsonObject = new JSONObject();jsonObject = JSONUtil.parseObj(body);List<TestPO> tests = JSONUtil.toList(jsonObject.getJSONArray("data"), TestPO.class);logger.info("tests的size:{}",tests.size());if(CollectionUtil.isEmpty(tests)){flag=false;}},executorService);completableFutureList.add(future);}CompletableFuture[] completableFutures = completableFutureList.toArray(new CompletableFuture[completableFutureList.size()]);CompletableFuture.allOf(completableFutures).join();logger.info("任务结束");executorService.shutdown();
上面代码会有一个问题,就是while循环往线程池里扔任务,所有线程在执行时,会在请求数据那里”停留“一段时间,“停留期间”还会一直循环向线程池扔任务,当线程执行完某次请求得到空数据结束循环时,等待队列中还排着大堆任务等着去请求数据。
为了解决这个问题,我改用了for循环提交任务,提前根据请求数据总量、每次读取的条数,以及服务总数得到每个服务需要执行的任务数。
第二版代码
@XxlJob(value = "fenpian")public void fenpian() {int shardIndex = XxlJobHelper.getShardIndex()+1;int shardTotal = XxlJobHelper.getShardTotal();//分片总数
// int shardTotal = 4;AtomicInteger pageNum = new AtomicInteger(shardIndex);//多线程情况下List<CompletableFuture>completableFutureList=new ArrayList<>();//总条数double total = 10000000;//读取的条数double pageSize=1000;double tasks = Math.ceil( total / (double) shardTotal / pageSize);logger.info("任务数{}",tasks);for(double i=0;i<tasks;i++){CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {String url = remoteURL + "?pageSize=1000&pageNum=" + pageNum.getAndAdd(shardTotal);logger.info("url:{},threadName:{}",url,Thread.currentThread().getName());String body = HttpUtil.get(url);JSONObject jsonObject = new JSONObject();jsonObject = JSONUtil.parseObj(body);List<TestPO> tests = JSONUtil.toList(jsonObject.getJSONArray("data"), TestPO.class);logger.info("tests的size:{}",tests.size());},executorService);completableFutureList.add(future);}CompletableFuture[] completableFutures = completableFutureList.toArray(new CompletableFuture[completableFutureList.size()]);CompletableFuture.allOf(completableFutures).join();logger.info("任务结束");
如有问题,请求指正(^^ゞ
相关文章:
【并发编程】自研数据同步工具优化:创建线程池多线程异步去分页调用其他服务接口获取海量数据
文章目录 场景:解决方案 场景: 前段时间在做一个数据同步工具,其中一个服务的任务是调用A服务的接口,将数据库中指定数据请求过来,交给kafka去判断哪些数据是需要新增,哪些数据是需要修改的。 刚开始的设…...
七、dokcer-compose部署springboot的jar
1、准备 打包后包名为 ruoyi-admin.jar 增加接口 httpL//{ip}:{port}/common/test/han #环境变量预application.yml 中REDIS_HOSTt的值,去环境变量去找;如果找不到REDIS_HOST就用myredis 1、Dockerfile FROM hlw/java:8-jreRUN ln -sf /usr/share/z…...
k8s 使用 containerd 运行时配置 http 私服
简介 Kubernetes 从 v1.20 开始弃用 Docker,并推荐用户切换到基于容器运行时接口(CRI)的容器引擎,如 containerd、cri-o 等。 目前使用的环境中使用了 Kubernetes v1.22.3,containerd 1.4.3,containerd 在…...
【新品发布】ChatWork企业知识库系统源码
系统简介 基于前后端分离架构以及Vue3、uni-app、ThinkPHP6.x、PostgreSQL、pgvector技术栈开发,包含PC端、H5端。 ChatWork支持问答式和文档式知识库,能够导入txt、doc、docx、pdf、md等多种格式文档。 导入数据完成向量化训练后,用户提问…...
疫情打卡 vue+springboot疾病防控管理系统java jsp源代码
本项目为前几天收费帮学妹做的一个项目,Java EE JSP项目,在工作环境中基本使用不到,但是很多学校把这个当作编程入门的项目来做,故分享出本项目供初学者参考。 一、项目描述 疫情打卡 vuespringboot 系统有1权限:管理…...
python --连接websocket
如果只是模拟js端发送接收的话,已经有了websocket server的话,只有client就好了 pip install websocket-client#-*- encoding:utf-8 -*-import sys sys.path.append("..") from socket import * import json, time, threading from websocket…...
数据库内日期类型数据大于小于条件查找注意事项
只传date格式的日期取查datetime的字段的话默认是 00:00:00 日期类型字符串需要使用 ’ ’ 单引号括住 使用大于小于条件查询某一天的日期数据 前后判断条件不能是同一天 一个例子 数据库内数据: 查询2023-08-14之后的数据: select * from tetst…...
网易有道押宝大模型,打响智能硬件突围战
本文转载自产业科技 自今年开年以来,AI大模型这场火势能不减,如今已燃到教育领域。 7月26日,网易有道举办了“powered by子曰”教育大模型应用成果发布会,推出国内首个教育领域垂直大模型“子曰”,并一口气发布了基于…...
KAFKA第二课之生产者(面试重点)
生产者学习 1.1 生产者消息发送流程 在消息发送的过程中,涉及到了两个线程——main线程和Sender线程。在main线程中创建了一个双端队列RecordAccumulator。main线程将消息发送给RecordAccumulator,Sender线程不断从RecordAccumulator中拉取消息发送到K…...
Mybatis 源码 ∞ :杂七杂八
文章目录 一、前言二、TypeHandler三、KeyGenerator四、Plugin1 Interceptor2 org.apache.ibatis.plugin.Plugin3. 调用场景 五、Mybatis 嵌套映射 BUG1. 示例2. 原因3. 解决方案 六、discriminator 标签七、其他1. RowBounds2. ResultHandler3. MapKey 一、前言 Mybatis 官网…...
堆的实现以及应用
💓博主个人主页:不是笨小孩👀 ⏩专栏分类:数据结构与算法👀 刷题专栏👀 C语言👀 🚚代码仓库:笨小孩的代码库👀 ⏩社区:不是笨小孩👀 🌹欢迎大家三连关注&…...
MySql011——检索数据:过滤数据(使用正则表达式)
前提:使用《MySql006——检索数据:基础select语句》中创建的products表 一、正则表达式介绍 关于正则表达式的介绍大家可以看我的这一篇博客《Java038——正则表达式》,这里就不再累赘。 二、使用MySQL正则表达式 2.1、基本字符匹配 检索…...
数据结构与算法-栈(LIFO)(经典面试题)
一:面试经典 1. 如何设计一个括号匹配的功能?比如给你一串括号让你判断是否符合我们的括号原则, 栈 力扣 2. 如何设计一个浏览器的前进和后退功能? 思想:两个栈,一个栈存放前进栈&…...
NSI45030AT1G LED驱动器方案为汽车外部及内部照明恒流稳流器(CCR)方案
关于线性恒流调节器(CCR):是一种用于控制电流的稳定输出。它通常由一个功率晶体管和一个参考电流源组成。CCR的工作原理是通过不断调节功率晶体管的导通时间来维持输出电流的恒定。当输出电流超过设定值时,CCR会减少功率晶体管的导…...
uni-app中使用pinia
目录 Pinia 是什么? uni-app 使用Pinia main.js 中引用pinia 创建和注册模块 定义pinia方式 选项options方式 定义pinia 页面中使用 pinia选项options方式 函数方式 定义pinia 页面中使用 函数方式 定义的pinia Pinia 是什么? Pinia࿰…...
Spring之事务管理
文章目录 前言一、事务及其参数含义1.事务的四个特性2.事务的传播行为(propagation)3.事务隔离性4.事务的隔离级别(ioslation)5.timeout(超时)6.readOnly(是否只读)7.rollbackFor&am…...
linux常见的mysql问题
当涉及到MySQL在Linux系统上的常见问题时,以下是10个经常遇到的问题及其解答: 无法连接到MySQL服务器。 确保MySQL服务器正在运行:可以使用systemctl status mysql或service mysql status命令检查MySQL服务状态。确保MySQL服务器网络设置正确…...
常见分辨率时序信息
分辨率列表 分辨率一:640x480(逐行) 分辨率二:800x600(逐行) 分辨率三:1024x768(逐行) 分辨率四:大名鼎鼎720P(逐行) 注:选择720P@30帧的,需拉长HOR TOTAL TIME 分辨率五:1280x800(逐行) 分辨率六:1280x960(逐行...
机器人CPP编程基础-05完结The End
非常不可思议……之前四篇博文竟然有超过100的阅读量…… 此文此部分终结,没有继续写下去的必要了。 插入一个分享: 编程基础不重要了,只要明确需求,借助AI工具就能完成一个项目。 当然也不是一次成功,工具使用也需要…...
数据库应用系统DBAS功能设计与实施(三级数据库)
目录 一、了解软件体系结构及设计过程 1、软件体系结构与设计过程 2、软件设计过程 二、了解DBAS总体设计 1、DBAS体系结构设计 2、软件体系结构设计 3、软硬件选型与配置设计 4、业务规则初步设计 三、了解DBAS功能概要设计 1、表示层概要设计 2、业务逻辑层概要设计…...
React第五十七节 Router中RouterProvider使用详解及注意事项
前言 在 React Router v6.4 中,RouterProvider 是一个核心组件,用于提供基于数据路由(data routers)的新型路由方案。 它替代了传统的 <BrowserRouter>,支持更强大的数据加载和操作功能(如 loader 和…...
【力扣数据库知识手册笔记】索引
索引 索引的优缺点 优点1. 通过创建唯一性索引,可以保证数据库表中每一行数据的唯一性。2. 可以加快数据的检索速度(创建索引的主要原因)。3. 可以加速表和表之间的连接,实现数据的参考完整性。4. 可以在查询过程中,…...
SCAU期末笔记 - 数据分析与数据挖掘题库解析
这门怎么题库答案不全啊日 来简单学一下子来 一、选择题(可多选) 将原始数据进行集成、变换、维度规约、数值规约是在以下哪个步骤的任务?(C) A. 频繁模式挖掘 B.分类和预测 C.数据预处理 D.数据流挖掘 A. 频繁模式挖掘:专注于发现数据中…...
微信小程序 - 手机震动
一、界面 <button type"primary" bindtap"shortVibrate">短震动</button> <button type"primary" bindtap"longVibrate">长震动</button> 二、js逻辑代码 注:文档 https://developers.weixin.qq…...
[免费]微信小程序问卷调查系统(SpringBoot后端+Vue管理端)【论文+源码+SQL脚本】
大家好,我是java1234_小锋老师,看到一个不错的微信小程序问卷调查系统(SpringBoot后端Vue管理端)【论文源码SQL脚本】,分享下哈。 项目视频演示 【免费】微信小程序问卷调查系统(SpringBoot后端Vue管理端) Java毕业设计_哔哩哔哩_bilibili 项…...
[大语言模型]在个人电脑上部署ollama 并进行管理,最后配置AI程序开发助手.
ollama官网: 下载 https://ollama.com/ 安装 查看可以使用的模型 https://ollama.com/search 例如 https://ollama.com/library/deepseek-r1/tags # deepseek-r1:7bollama pull deepseek-r1:7b改token数量为409622 16384 ollama命令说明 ollama serve #:…...
毫米波雷达基础理论(3D+4D)
3D、4D毫米波雷达基础知识及厂商选型 PreView : https://mp.weixin.qq.com/s/bQkju4r6med7I3TBGJI_bQ 1. FMCW毫米波雷达基础知识 主要参考博文: 一文入门汽车毫米波雷达基本原理 :https://mp.weixin.qq.com/s/_EN7A5lKcz2Eh8dLnjE19w 毫米波雷达基础…...
云原生周刊:k0s 成为 CNCF 沙箱项目
开源项目推荐 HAMi HAMi(原名 k8s‑vGPU‑scheduler)是一款 CNCF Sandbox 级别的开源 K8s 中间件,通过虚拟化 GPU/NPU 等异构设备并支持内存、计算核心时间片隔离及共享调度,为容器提供统一接口,实现细粒度资源配额…...
Sklearn 机器学习 缺失值处理 获取填充失值的统计值
💖亲爱的技术爱好者们,热烈欢迎来到 Kant2048 的博客!我是 Thomas Kant,很开心能在CSDN上与你们相遇~💖 本博客的精华专栏: 【自动化测试】 【测试经验】 【人工智能】 【Python】 使用 Scikit-learn 处理缺失值并提取填充统计信息的完整指南 在机器学习项目中,数据清…...
面试高频问题
文章目录 🚀 消息队列核心技术揭秘:从入门到秒杀面试官1️⃣ Kafka为何能"吞云吐雾"?性能背后的秘密1.1 顺序写入与零拷贝:性能的双引擎1.2 分区并行:数据的"八车道高速公路"1.3 页缓存与批量处理…...
