当前位置: 首页 > news >正文

一种基于springboot、redis的分布式任务引擎的实现(一)

 总体思路是,主节点接收到任务请求,将根据任务情况拆分成多个任务块,将任务块标识的主键放入redis。发送redis消息,等待其他节点运行完毕,结束处理。接收到信息的节点注册本节点信息到redis、开启多线程、获取任务块、执行任务、结束处理。

1、主节点接收任务请求

    @Overridepublic void executeTaskInfo(PrepareDTO prepareDTO) {//异常标记String taskInfo = prepareDTO.getTaskId();//任务分组状态String taskStatus = "";try {log.info("数据准备任务并设定任务执行状态,{}", prepareDTO);this.dataPrepareBo.doStartGroupJobInfo(prepareDTO);//给redis集合中放计算对象log.info("开始放入计算任务:{}", prepareDTO);boolean getTaskFlag = this.dataPrepareBo.pushCalculationObject(prepareDTO);if (!getTaskFlag) {taskStatus = String.format("没有获取数据或计划已取消,%s", taskInfo);log.error(taskStatus);throw new Exception(taskStatus);}//发消息执行缓存中任务log.info("发消息执行任务:{}", prepareDTO);sendMessage(prepareDTO);//等待任务执行完毕log.info("等待任务执行结果");taskStatus = this.getGroupUpLoadTaskFinsh(prepareDTO);} catch (Exception e) {//捕获日志e.printStackTrace();taskStatus = "获取任务状态异常" + e;log.info(taskStatus);dataPrepareBo.putExceptionMsg2Cache(taskInfo, "数据准备分发计算任务线程异常:" + taskStatus);} finally {//做任务结束处理this.doGroupTaskFinshpPocess(prepareDTO, taskStatus);}}

2,发送消息

    @Overridepublic void sendMessage(String topic, String msg) {this.redisTemplate.convertAndSend(topic, msg);}

3,节点接收任务,并执行

    public void doUpLoadTask(String msg) throws Exception {log.info("开始执行明细任务{}" + msg);String taskId = this.getTaskId(msg);try {Object cancelFlag = this.redisTemplate.opsForValue().get(String.format(EngineConstant.JOB_CANCEL_FLAG, taskId));if(cancelFlag != null && "1".equals(cancelFlag.toString())){log.info("本次任务已取消");return;}//上传本机执行信息到redisthis.cacheBo.initGroupUpLoadTaskStats(taskId,ENGINE_DISTRIBUTION_RUNNING.getKey());//从缓存获取任务,获取任务后启线程执行任务。如果没获取到任务,则本节点任务执行完毕//循环获取任务this.groupTaskProcessBO.doGroupTaskProcess(taskId, null);//处理结束this.cacheBo.finishGroupUpLoadTaskStats(taskId,ENGINE_DISTRIBUTION_RUNNING.getKey());} catch (Exception e) {//记录日志taskUpldExeLogCDTO.setRunStas("-1");String exceptionInfo = this.taskLogUtils.getExceptionInfo(e) ;taskUpldExeLogCDTO.setAbnInfo(exceptionInfo);throw e;} finally {//记录日志taskUpldExeLogCDTO.setEndtime(DateUtil.getCurrentDate());if("-1".equals(taskUpldExeLogCDTO.getRunStas())){//异常结束this.taskLogUtils.sendLogInfo(taskUpldExeLogCDTO,"执行上传任务异常");} else {//正常结束taskUpldExeLogCDTO.setRunStas("1");this.taskLogUtils.sendLogInfo(taskUpldExeLogCDTO,"执行上传任务正常");}}}

4,开启线程执行任务

    @Overridepublic CalculationDTO doGroupTaskProcess(String taskId, TaskUpldExeLogCDTO taskUpldExeLogCDTO) throws Exception {List<Future> futureList = new ArrayList<>();//开始执行明细任务处理ThreadPoolTaskExecutor taskTransferExecutor = ToolUtil.getExecutor("engine-file-tasks-pool-", Math.min(parallelProcessNum,10), 8);ExecutorListHolder.putThreadPool(String.format(GroupConstant.PREPARE_ENGINE_POOL,taskId), taskTransferExecutor.getThreadPoolExecutor());for(int i = 0 ; i < parallelProcessNum ; i++) {DoGroupUpLoadTaskThread doGroupUpLoadTaskThread = new DoGroupUpLoadTaskThread(taskId, redisTemplate, calculationBo, null, null);Future<?> future = taskTransferExecutor.submit(doGroupUpLoadTaskThread);futureList.add(future);}if (!CollectionUtil.isEmpty(futureList)) {futureList.forEach(f -> {try {f.get(GroupTaskProcessBOImpl.maxTime, TimeUnit.SECONDS);} catch (Exception e) {e.printStackTrace();}});}log.info("本节点执行分组任务执行完毕{}", taskId + ":" + GroupConstant.IDENTITY);return null;}

5,线程执行明细

    @Overridepublic ResponseDTO call() throws Exception {//执行任务while(true) {FilterTableUniqueDTO filterTableUniqueDTO = (FilterTableUniqueDTO)this.redisTemplate.opsForList().leftPop(String.format(ENGINE_FILTERTABLEUNIQUE_QUEUE.getKey(), taskId));log.debug("取出任务:" + filterTableUniqueDTO);if(null == filterTableUniqueDTO) {break ;}long lastNum = this.redisTemplate.opsForList().size(String.format(ENGINE_FILTERTABLEUNIQUE_QUEUE.getKey(), taskId));log.info("生成文件剩余任务数量:" + lastNum);
//           处理任务calculationBo.GenerateFile(filterTableUniqueDTO, taskUpldDetlLogCDTO);}return null;}

以上是主要入口总体思路涉及代码,详细实现整理起来涉及内容比较繁多,将在第二部分分享。

相关文章:

一种基于springboot、redis的分布式任务引擎的实现(一)

总体思路是&#xff0c;主节点接收到任务请求&#xff0c;将根据任务情况拆分成多个任务块&#xff0c;将任务块标识的主键放入redis。发送redis消息&#xff0c;等待其他节点运行完毕&#xff0c;结束处理。接收到信息的节点注册本节点信息到redis、开启多线程、获取任务块、执…...

基于IDE Eval Resetter延长IntelliJ IDEA等软件试用期的方法(包含新版本软件的操作方法)

本文介绍基于IDE Eval Resetter插件&#xff0c;对集成开发环境IntelliJ IDEA等JetBrains公司下属的多个开发软件&#xff0c;加以试用期延长的方法。 我们这里就以IntelliJ IDEA为例&#xff0c;来介绍这一插件发挥作用的具体方式。不过&#xff0c;需要说明使用IDE Eval Rese…...

RocketMQ消费者可以手动消费但无法主动消费问题,或生成者发送超时

1.大多数是配置问题 修改rocketmq文件夹broker.conf 2.配置与集群IP或本地IPV4一样 重启 在RocketMQ独享实例中支持IPv4和IPv6双栈&#xff0c;主要是通过在网络层面上同时支持IPv4和IPv6协议栈来实现的。RocketMQ的Broker端、Namesrv端和客户端都需要支持IPv4和IPv6协议&…...

【数据库系统】--【2】DBMS架构

DBMS架构 01DBMS架构概述02 DBMS的物理架构03 DBMS的运行和数据架构DBMS的运行架构DBMS的数据架构PostgreSQL的体系结构RMDB的运行架构 04DBMS的逻辑和开发架构DBMS的层次结构DBMS的开发架构DBMS的代码架构 05小结 01DBMS架构概述 02 DBMS的物理架构 数据库系统的体系结构 数据…...

第三章 图论 No.13拓扑排序

文章目录 裸题&#xff1a;1191. 家谱树差分约束拓扑排序&#xff1a;1192. 奖金集合拓扑序&#xff1a;164. 可达性统计差分约束拓扑序&#xff1a;456. 车站分级 拓扑序和DAG有向无环图联系在一起&#xff0c;通常用于最短/长路的线性求解 裸题&#xff1a;1191. 家谱树 119…...

喜报 | 擎创再度入围IDC中国FinTech 50榜单

8月16日&#xff0c;2023年度“IDC中国FinTech 50”榜单正式揭晓&#xff0c;擎创科技继2022年入选该榜单后&#xff0c;再次以创新者姿态成功入选&#xff0c;并以技术赋能业务创新&#xff0c;成为中国金融科技领域创新与活力的重要贡献者。 “IDC中国FinTech 50”旨在评选出…...

【C++ 记忆站】引用

文章目录 一、引用概念二、引用特性1、引用在定义时必须初始化2、一个变量可以有多个引用3、引用一旦引用一个实体&#xff0c;再不能引用其他实体 三、常引用四、使用场景1、做参数1、输出型参数2、大对象传参 2、做返回值1、传值返回2、传引用返回 五、传值、传引用效率比较六…...

Hlang--用Python写个编程语言-变量的实现

文章目录 前言语法规则表示次幂实现变量实现优先级实现步骤解析关键字语法解析解释器总结前言 先前的话,我们终于是把我们整个架子搭起来了,这里重复一下我们的流程,那就是,首先,我们通过解析文本,然后呢遍历文本当中的我们定义的合法关键字,然后呢,把他们封装为一个T…...

多维时序 | MATLAB实现PSO-CNN-BiLSTM多变量时间序列预测

多维时序 | MATLAB实现PSO-CNN-BiLSTM多变量时间序列预测 目录 多维时序 | MATLAB实现PSO-CNN-BiLSTM多变量时间序列预测基本介绍模型特点程序设计参考资料 基本介绍 本次运行测试环境MATLAB2021b&#xff0c;MATLAB实现PSO-CNN-BiLSTM多变量时间序列预测。代码说明&#xff1a…...

实现Java异步调用的高效方法

文章目录 为什么需要异步调用&#xff1f;Java中的异步编程方式1. 使用多线程2. 使用Java异步框架 异步调用的关键细节结论 &#x1f389;欢迎来到Java学习路线专栏~实现Java异步调用的高效方法 ☆* o(≧▽≦)o *☆嗨~我是IT陈寒&#x1f379;✨博客主页&#xff1a;IT陈寒的博…...

批量提取文件名到excel,详细的提取步骤

如何批量提取文件名到excel&#xff1f;我们的电脑中可能存储着数量非常多的电子文件&#xff0c;现在需要快速将这些文件的名称全部提取到Excel中。虽然少量数据可以通过复制粘贴的方式轻松完成&#xff0c;但是对于上万个数据而言&#xff0c;复制粘贴都是行不通的&#xff0…...

C#中的泛型约束可以用在以下几个地方?

1.泛型类型参数&#xff1a; 在定义泛型类型或泛型方法时&#xff0c;可以使用泛型约束来限制泛型类型参数的类型。这可以确保类型参数满足特定的条件&#xff0c;从而在编译时捕获错误并提供更安全和可靠的代码。 public class MyClass<T> where T : IComparable<T&…...

Linux Vm上部署Docker

创建ubutu虚拟机并远程连接&#xff0c; 参考 https://blog.csdn.net/m0_48468018/article/details/132267096 在终端中切换到root用户&#xff0c;并安装docker服务 2.1 切换到root用户 sudo su2.2 安装docker服务 , 参考 https://docs.docker.com/engine/install/ubuntu/ …...

ubuntu bind dns服务配置

sudo apt-get install bind9 内网搭建DNS服务器&#xff0c;大多数是解析纯内网地址使用。但是偶尔也需要解析外网的地址&#xff0c;所以我们可以配置DNS没有添加A记录的URL时&#xff0c;forward到外网DNS服务器或者内网的其他DNS服务器解析。 打开配置文件&#xff1a; sud…...

安卓的代码加固和其他安全问题

文章目录 安卓加固apk文件结构dex加固过程 其它安全问题 安卓加固 从App的加固技术来看:主流分为dex加密和so加密,目前来看保护dex文件更为重要,因为dex反编译后的java代码可读性更强。 android-ndk: Native Development Kit 官网解释&#xff1a;这套工具使您能在 Android 应…...

关于Linux Docker springboot jar 日志时间不正确 问题解决

使用Springboot项目的jar&#xff0c;制作了一个Docker镜像&#xff0c;启动该镜像后发现容器和容器中的Springboot 项目的日志时间不正确。 解决 查看容器时间命令为&#xff1a; docker exec 容器id date 1. 容器与宿主机同步时间 在启动镜像时候把操作系统的时间通过&q…...

提高批量爬虫工作效率

大家好&#xff01;作为一名专业的爬虫程序员&#xff0c;我今天要和大家分享一些关于提高批量爬虫工作效率的实用技巧。无论你是要批量采集图片、文本还是视频数据&#xff0c;这些经验都能帮助你在大规模数据采集中事半功倍。废话不多说&#xff0c;让我们开始吧&#xff01;…...

E96系列电阻阻值和代码、乘数对照表

1、为什么要用代码表示&#xff1f; 0805封装还可以简单易懂写下四位丝印&#xff0c;比如10K的1002&#xff0c;但0603的封装上面再想写下四位丝印就没空间了&#xff0c;就算写了也不容易看不清。 2、E96系列电阻阻值和代码、乘数对照表 下面是E96系列的对照表&#xff0c;…...

基于CentOS7.9安装部署docker(简洁版)

安装部署 1基于官方脚本安装&#xff08;不推荐 不能自行选择版本&#xff09; 官方文档&#xff1a;https://docs.docker.com/engine/install/centos/ 2 使用yum安装 阿里云文档&#xff1a;docker-ce镜像_docker-ce下载地址_docker-ce安装教程-阿里巴巴开源镜像站 # ste…...

MySQL常用练手题目

数据库表名和字段设计 1.学生表 Student(s_id,s_name,s_birth,s_sex) 学生编号,学生姓名, 出生年月,学生性别 2.课程表 Course(c_id,c_name,t_id) 课程编号, 课程名称, 教师编号 3.教师表 Teacher(t_id,t_name) 教师编号,教师姓名 4.成绩表 Score (s_id,c_id,s_score) 学生编号…...

打卡信奥刷题(3025)用C++实现信奥题 P6393 隔离的日子

P6393 隔离的日子 题目背景 &#xff08;背景改编自 百度贴吧/南北组备用吧/呆萌南北日常/F9023&#xff0c;作者 落墨成白&#xff09; 数据已修复。 一天又一天&#xff0c;每到深夜房间里又只余下手机屏幕的亮光&#xff0c;洛天依总会有一种与世界割离的失落感。   闷…...

Codeforces Round 1082 (Div. 2)2202

Submission #368219050 - Codeforces A. Parkour Design 思路&#xff1a;第一个操作第三个操作两步第二个操作&#xff0c;所以实际上只需要考虑y坐标的变化&#xff0c;然后看一下x的差值是否能整除3就可以了 B. ABAB Construction 思路&#xff1a;奇数长度开头一定是a或者…...

GTE文本向量中文模型保姆级教程:从环境搭建到API调用全流程

GTE文本向量中文模型保姆级教程&#xff1a;从环境搭建到API调用全流程 1. 环境准备与快速部署 1.1 系统要求与依赖安装 在开始之前&#xff0c;确保你的系统满足以下基本要求&#xff1a; 操作系统&#xff1a;推荐使用Ubuntu 18.04或更高版本Python版本&#xff1a;Pytho…...

Windows右键菜单管理终极指南:3分钟打造高效桌面操作环境

Windows右键菜单管理终极指南&#xff1a;3分钟打造高效桌面操作环境 【免费下载链接】ContextMenuManager &#x1f5b1;️ 纯粹的Windows右键菜单管理程序 项目地址: https://gitcode.com/gh_mirrors/co/ContextMenuManager 你是否曾因Windows右键菜单过于臃肿而烦恼&…...

语音降噪效果主观评价:设计盲听测试与收集用户反馈

语音降噪效果主观评价&#xff1a;设计盲听测试与收集用户反馈 我们常说一个语音降噪算法好不好&#xff0c;看技术指标是一方面&#xff0c;但最终还得耳朵说了算。毕竟&#xff0c;声音是给人听的&#xff0c;处理后的音频听起来舒不舒服、清不清晰&#xff0c;这才是最直接…...

Windows 11三指拖拽功能完全配置指南:从驱动安装到手势优化

Windows 11三指拖拽功能完全配置指南&#xff1a;从驱动安装到手势优化 【免费下载链接】ThreeFingersDragOnWindows Enables macOS-style three-finger dragging functionality on Windows Precision touchpads. 项目地址: https://gitcode.com/gh_mirrors/th/ThreeFingersD…...

高可用(HA)架构的商业价值:从技术冗余到业务连续性的战略升级

在大型企业数字化转型进入深水区的今天&#xff0c;ERP、CRM、OA、BI工具等核心系统已成为业务运转的“生命线”&#xff0c;系统中断哪怕是分钟级&#xff0c;都可能引发业务停滞、数据泄露、合规违规等连锁风险&#xff0c;直接损害企业商业利益与品牌声誉。高可用&#xff0…...

从L298到自举H桥:深入聊聊直流电机驱动方案的演进与选型心得

从L298到自举H桥&#xff1a;直流电机驱动方案的技术演进与工程实践 在机器人底盘、自动化产线和智能硬件开发中&#xff0c;直流电机驱动电路的设计往往决定着整个系统的性能天花板。十年前我们可能还在用L298这类经典驱动芯片&#xff0c;如今工程师们的工具箱里已经出现了IR…...

SecGPT-14B部署教程:适配国产昇腾910B的vLLM分支编译与性能调优

SecGPT-14B部署教程&#xff1a;适配国产昇腾910B的vLLM分支编译与性能调优 1. SecGPT-14B简介 SecGPT是由云起无垠推出的开源大语言模型&#xff0c;专注于网络安全领域。该模型融合了自然语言理解、代码生成和安全知识推理等能力&#xff0c;旨在为安全专业人员提供智能辅助…...

Materials Studio8.0在CentOS7.9环境下的安装与配置指南

1. 环境准备与系统检查 在CentOS 7.9上安装Materials Studio 8.0之前&#xff0c;我们需要确保系统环境满足最低要求。我遇到过不少因为环境配置不当导致的安装失败案例&#xff0c;这里分享几个关键检查点&#xff1a; 首先检查主机名是否包含特殊字符。Materials Studio对主机…...