智能BI(后端)-- 系统异步化
文章目录
- 系统问题分析
- 什么是异步化?
- 业务流程分析
- 标准异步化的业务流程
- 系统业务流程
- 线程池
- 为什么需要线程池?
- 线程池两种实现方式
- 线程池的参数
- 线程池的开发
- 项目异步化改造
系统问题分析
问题场景:调用的服务能力有限,或者接口的处理(或返回)时长较长时,就应该考虑异步化了
什么是异步化?
不用等一件事做完,就可以做另外一件事,等第一件事完成时,可以收到一个通知
业务流程分析
标准异步化的业务流程
- 当用户要进行耗时很长的操作时,点击提交后,不需要在界面空等,而是应该把这个任务保存到数据库中记录下来
- 用户要执行新任务时:
a. 任务提交成功:
ⅰ. 若程序存在空闲线程,可以立即执行此任务
ⅱ. 若所有线程均繁忙,任务将入队列等待处理
b. 任务提交失败:比如所有线程都在忙碌且任务队列满了
ⅰ.选择拒绝此任务,不再执行
ⅱ.通过查阅数据库记录,发现提交失败的任务,并在程序空闲时将这些任务取出执行 - 程序(线程)从任务队列中取出任务依次执行,每完成一项任务,就更新任务状态。
- 用户可以查询任务的执行状态,或者在任务执行成功或失败时接收通知(例如:发邮件、系统消息提示或短信),从而优化体验
- 对于复杂且包含多个环节的任务,在每个小任务完成时,要在程序(数据库中))记录任务的执行状态(进度)。
系统业务流程
- 用户点击智能分析页提交按钮时,先把图表立刻保存到数据库中(作为一个任务)
- 用户可以在图表管理查看所有图表(已生成的,生成中的,生成失败的)的信息和状态
- 用户可以修改生成失败的图表信息,点击重新生成,以尝试再次创建图表

问题分析?
- 任务队列的最大容量应该设置为多少
- 程序怎么从任务队列中取出任务去执行?这个任务队列的流程怎么实现?怎么保证程序最多同时执行多少个任务?
线程池实现
线程池
为什么需要线程池?
- 线程的管理比较复杂
- 任务存取比较复杂
- 线程池可以帮你轻松管理线程,协调任务的执行过程
线程池两种实现方式
- Spring中,可以用ThreadPoolTaskExrcutor配合@Async注解来实现(不推荐)
- 在Java中,可以使用JUC并发编程包中的ThreadPoolExecutor来实现非常灵活地自定义线程池
线程池的参数
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler) {
现状:AI生成能力的并发是只允许4个任务同时去执行,AI能力允许20个任务排队
corePoolSize(核心线程数):正常情况下,我们的系统应该能同时工作的线程数
maximumPoolSize(最大线程数):极限情况下,我们的线程池所拥有的线程
keepAliveTime(空闲线程存活时间):非核心线程在没有任务的情况下,过多久要删除,从而释放无用的线程资源
unit(空闲线程存活时间的单位):分钟,秒
workQueue(工作队列):用于存放给线程执行的任务,存在一个队列的长度(一定要设置)
threadFactory(线程工厂):控制每个线程的生成,线程的属性
RejectedExecutionHandler(拒绝策略):任务队列满的时候,我们采取什么措施
线程池的开发
自定义线程池配置
package com.yupi.springbootinit.config;import org.jetbrains.annotations.NotNull;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;@Configuration
public class ThreadPoolExecutorConfig {@Beanpublic ThreadPoolExecutor threadPoolExecutor(){// 创建一个线程工厂ThreadFactory threadFactory = new ThreadFactory(){// 初始化线程数为 1private int count = 1;// 创建一个新的线程@Override// 每当线程池需要创建新线程时,就会调用newThread方法// @NotNull Runnable r 表示方法参数 r 应该永远不为null,public Thread newThread(@NotNull Runnable r) {Thread thread = new Thread(r);thread.setName("线程" + count ++);return thread;}};// 创建一个新的线程池,线程池核心大小为2,最大线程数为4,// 非核心线程空闲时间为100秒,任务队列为阻塞队列,长度为4,使用自定义的线程工厂创建线ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2,4,100, TimeUnit.SECONDS,new ArrayBlockingQueue<>(4),threadFactory);return threadPoolExecutor;}
}
测试controller层(注意线上环境不要暴露出去)
package com.yupi.springbootinit.controller;import cn.hutool.json.JSONUtil;
import io.netty.handler.codec.serialization.ObjectEncoder;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Profile;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import javax.annotation.Resource;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadPoolExecutor;/*** 队列测试controller*/
@RestController
@RequestMapping("/queue")
@Slf4j
@Profile({"dev","local"}) // 只在开发环境和本地环境生效
public class QueueController {@Resourceprivate ThreadPoolExecutor threadPoolExecutor;@GetMapping("/add")// 接收一个参数name,然后将任务添加到线程池中public void add(String name){// 使用CompletableFuture运行一个异步任务CompletableFuture.runAsync(()->{log.info("任务执行中:" + name + "执行人:" + Thread.currentThread().getName());try {// 让线程休眠10分钟,模拟长时间运行的任务Thread.sleep(600000);} catch (InterruptedException e) {throw new RuntimeException(e);} 异步任务在threadPoolExecutor中执行},threadPoolExecutor);}@GetMapping("/get")public String get(){Map<String, Object> map = new HashMap<>();int size = threadPoolExecutor.getQueue().size();map.put("队列长度",size);long taskCount = threadPoolExecutor.getTaskCount();map.put("任务总数",taskCount);long completedTaskCount = threadPoolExecutor.getCompletedTaskCount();map.put("已完成的任务总数",completedTaskCount);int activeCount = threadPoolExecutor.getActiveCount();map.put("正在工作的线程数",activeCount);return JSONUtil.toJsonStr(map);}
}
项目异步化改造
/*** 智能分析(异步)** @param multipartFile* @param genChartByAiRequest* @param request* @return*/
@PostMapping("/gen/async")
public BaseResponse<BiResponse> genChartByAiAsync(@RequestPart("file") MultipartFile multipartFile,GenChartByAiRequest genChartByAiRequest, HttpServletRequest request) {String name = genChartByAiRequest.getName();String goal = genChartByAiRequest.getGoal();String chartType = genChartByAiRequest.getChartType();// 校验ThrowUtils.throwIf(StringUtils.isBlank(goal), ErrorCode.PARAMS_ERROR, "目标为空");ThrowUtils.throwIf(StringUtils.isNotBlank(name) && name.length() > 100, ErrorCode.PARAMS_ERROR, "名称过长");// 校验文件long size = multipartFile.getSize();String originalFilename = multipartFile.getOriginalFilename();// 校验文件大小final long ONE_MB = 1024 * 1024L;ThrowUtils.throwIf(size > ONE_MB, ErrorCode.PARAMS_ERROR, "文件超过 1M");// 校验文件大小缀 aaa.pngString suffix = FileUtil.getSuffix(originalFilename);final List<String> validFileSuffixList = Arrays.asList("xlsx", "xls");ThrowUtils.throwIf(!validFileSuffixList.contains(suffix), ErrorCode.PARAMS_ERROR, "文件后缀非法");User loginUser = userService.getLoginUser(request);// 限流判断,每个用户一个限流器redisLimiterManager.doRateLimit("genChartByAi_" + loginUser.getId());// 指定一个模型id(把id写死,也可以定义成一个常量)long biModelId = 1659171950288818178L;// 分析需求:// 分析网站用户的增长情况// 原始数据:// 日期,用户数// 1号,10// 2号,20// 3号,30// 构造用户输入StringBuilder userInput = new StringBuilder();userInput.append("分析需求:").append("\n");// 拼接分析目标String userGoal = goal;if (StringUtils.isNotBlank(chartType)) {userGoal += ",请使用" + chartType;}userInput.append(userGoal).append("\n");userInput.append("原始数据:").append("\n");// 压缩后的数据String csvData = ExcelUtils.excelToCsv(multipartFile);userInput.append(csvData).append("\n");// 先把图表保存到数据库中Chart chart = new Chart();chart.setName(name);chart.setGoal(goal);chart.setChartData(csvData);chart.setChartType(chartType);// 插入数据库时,还没生成结束,把生成结果都去掉
// chart.setGenChart(genChart);
// chart.setGenResult(genResult);// 设置任务状态为排队中chart.setStatus("wait");chart.setUserId(loginUser.getId());boolean saveResult = chartService.save(chart);ThrowUtils.throwIf(!saveResult, ErrorCode.SYSTEM_ERROR, "图表保存失败");// 在最终的返回结果前提交一个任务// todo 建议处理任务队列满了后,抛异常的情况(因为提交任务报错了,前端会返回异常)CompletableFuture.runAsync(() -> {// 先修改图表任务状态为 “执行中”。等执行成功后,修改为 “已完成”、保存执行结果;执行失败后,状态修改为 “失败”,记录任务失败信息。(为了防止同一个任务被多次执行)Chart updateChart = new Chart();updateChart.setId(chart.getId());// 把任务状态改为执行中updateChart.setStatus("running");boolean b = chartService.updateById(updateChart);// 如果提交失败(一般情况下,更新失败可能意味着你的数据库出问题了)if (!b) {handleChartUpdateError(chart.getId(), "更新图表执行中状态失败");return;}// 调用 AIString result = aiManager.doChat(biModelId, userInput.toString());String[] splits = result.split("【【【【【");if (splits.length < 3) {handleChartUpdateError(chart.getId(), "AI 生成错误");return;}String genChart = splits[1].trim();String genResult = splits[2].trim();// 调用AI得到结果之后,再更新一次Chart updateChartResult = new Chart();updateChartResult.setId(chart.getId());updateChartResult.setGenChart(genChart);updateChartResult.setGenResult(genResult);updateChartResult.setStatus("succeed");boolean updateResult = chartService.updateById(updateChartResult);if (!updateResult) {handleChartUpdateError(chart.getId(), "更新图表成功状态失败");}},threadPoolExecutor);BiResponse biResponse = new BiResponse();
// biResponse.setGenChart(genChart);
// biResponse.setGenResult(genResult);biResponse.setChartId(chart.getId());return ResultUtils.success(biResponse);
}
// 上面的接口很多用到异常,直接定义一个工具类
private void handleChartUpdateError(long chartId, String execMessage) {Chart updateChartResult = new Chart();updateChartResult.setId(chartId);updateChartResult.setStatus("failed");updateChartResult.setExecMessage(execMessage);boolean updateResult = chartService.updateById(updateChartResult);if (!updateResult) {log.error("更新图表失败状态失败" + chartId + "," + execMessage);
}
相关文章:
智能BI(后端)-- 系统异步化
文章目录 系统问题分析什么是异步化?业务流程分析标准异步化的业务流程系统业务流程 线程池为什么需要线程池?线程池两种实现方式线程池的参数线程池的开发 项目异步化改造 系统问题分析 问题场景:调用的服务能力有限,或者接口的…...
AI绘画Stable Diffusion 插件篇:智能标签提示词插件sd-danbooru-tags-upsampler
大家好,我是向阳。 关于智能标签提示词插件,在很早之前就介绍过很多款了,今天再给大家介绍一款智能标签提示词插件sd-danbooru-tags-upsampler。该智能提示词插件是今年2月23号才发布的第一版V0.1.0,算是比较新的智能提示词插件。…...
Android OpenMAX(六)OMXStore
在前面两节的学习中我们知道了OMX Core是用来管理(查询/创建/销毁)Android平台上的硬件编解码组件的。这一节我们再向上一层,Android平台除了提供有硬件编解码组件支持,还内置了一些软件编解码组件,为了统一管理所有(软/硬)编解码组件,Android在OMX Core之上又抽象了一…...
Ubuntu下halcon软件的下载安装
由于工作需求,点云配准需要使用halcon进行实现,并且将该功能放入QT界面中 1.下载halcon 进入halcon官网进行下载 官网链接:https://www.mvtec.com/products/halcon/ 注意:要注册登陆之后才能进行下载 接着点击Downloads->H…...
『ZJUBCA Collaboration』WTF Academy 赞助支持
非常荣幸宣布,浙江大学区块链协会收到WTF Academy的赞助与支持,未来将共同开展更多深度合作。 WTF Academy是开发者的Web3开源大学,旨在通过开源教育让100,000名开发者进入到Web3。截止目前,WTF开源教程在GitHub收获超15,000 ⭐&a…...
Python开源工具库使用之运动姿势追踪库mediapipe
文章目录 前言一、姿势估计1.1 姿态关键点1.2 旧版 solution API1.3 新版 solution API1.4 俯卧撑计数 二、手部追踪2.1 手部姿态2.2 API 使用2.3 识别手势含义 参考 前言 Mediapipe 是谷歌出品的一种开源框架,旨在为开发者提供一种简单而强大的工具,用…...
【Android Studio】开启真机调试
1 打开手机的开发者模式 各种款式的手机进入开发者模式的情况不同,但大致是在 【关于手机】中多次点击系统版本即可进入。这里以小米8为例,记录下流程。 1.1 进入手机开发者模式 【设置】->【我的设备】->【全部参数】->【MIUI版本】连续点击3…...
CMakeLists.txt语法规则:部分常用命令说明四
一. 简介 前面几篇文章学习了CMakeLists.txt语法中前面几篇文章学习了CMakeLists.txt语法中部分常用命令。文章如下: CMakeLists.txt语法规则:部分常用命令说明一-CSDN博客 CMakeLists.txt语法规则:部分常用命令说明二-CSDN博客 CMakeLi…...
学习前端第三十二天(Rest 参数与 Spread 语法,变量作用域,闭包)
一、Rest 参数与 Spread 语法 1.rest参数 ...变量名:收集剩余的参数并存进指定数组中,需要放到最后; 2.arguments变量 // arguments,以参数在参数列表中的索引作为键,存储所有参数,以类数组对象的形式输出所有函数参数 // 箭头…...
mysql从入门到起飞+面试基础题
mysql基础 MySQL基础 企业面试题1 代码 select m.id,m.num from ( select t.id as id,count(1) num from ( select ra.requester_id as id from RequestAccepted raunion all select ra.accepter_id as id from RequestAccepted ra ) t group by t.id ) m group by id ord…...
设计模式:命令模式
文章目录 一、什么是命令模式二、命令模式结构三、命令模式实现步骤四、命令模式应用场景 一、什么是命令模式 它允许将请求封装为对象,一个请求对应于一个命令,将发出命令的责任和执行命令的责任分割开。每一个命令都是一个操作:请求的一方…...
setinterval和settimeout区别在于
setinterval和settimeout区别在于 执行次数和执行频率 setInterval和setTimeout的主要区别在于执行次数和执行频率。以下是详细介绍:12 setTimeout是一次性的定时器,它在设定的延迟时间之后执行一次函数,然后停止。setInterval是重复性的定…...
shell_结束进程脚本
结束进程的shell脚本如下: #!/bin/bash# kill all process ps aux|grep "local" | grep -v grep | awk {print $2} | while read line; do kill -9 $line; done 解析: ps aux 命令常用于查看当前系统中运行的进程,以及它们所占用…...
GDPU unity游戏开发 碰撞器与触发器
砰砰叫,谁动了她的奶酪让你的小鹿乱撞了。基于此,亦即碰撞与触发的过程。 碰撞器与触发器的区别 通俗点讲,碰撞器检测碰撞,触发器检测触发,讲了跟没讲似的。碰撞器是用来检测碰撞事件的,在unity中ÿ…...
IP地址定位技术在网络安全中的作用
在当今数字化时代,网络安全已经成为企业、政府和个人面临的重要挑战之一。随着互联网的普及和网络攻击的增加,保护个人隐私和防止网络犯罪变得尤为重要。在这一背景下,IP地址定位技术作为网络安全的重要组成部分之一,发挥着关键作…...
R语言中,查看经安装的包,查看已经加载的包,查看特定包是否已经安装,安装包,更新包,卸载包
创建于:2024.5.4 R语言中,查看经安装的包,查看已经加载的包,查看特定包是否已经安装,安装包,更新包,卸载包 文章目录 1. 查看经安装的包2. 查看已经加载的包3. 查看特定包是否已经安装4. 安装包…...
spring boot3单模块项目工程搭建-下(个人开发模板)
⛰️个人主页: 蒾酒 🔥系列专栏:《spring boot实战》 目录 写在前面 上文衔接 常用依赖介绍以及整合 web组件 测试组件 样板代码生成 数据库连接器 常用工具包 面向切面编程 ORM框架 数据连接池 接口测试、文档导出 缓存中间件 参数校…...
精准清理 MongoDB 数据:删除集合的正确姿势
在 MongoDB 数据库管理中,数据清理是维护数据库性能和保持数据一致性的关键步骤之一。而删除集合是实现数据清理的重要手段之一。在这个信息爆炸的时代,了解如何正确地执行集合删除操作至关重要。本文将深入探讨 MongoDB 中删除集合的常用方法、最佳实践…...
java 执行修改语句
你可以使用Java中的JDBC(Java Database Connectivity)来执行修改语句。以下是一个示例: import java.sql.Connection; import java.sql.DriverManager; import java.sql.SQLException; import java.sql.Statement;public class Main {public…...
【Linux系统化学习】网络套接字(编写简单的UDP服务端和客户端)
目录 理解源IP地址和目的IP地址 认识端口号 端口号和进程ID的区别 源端口号和目的端口号 认识TCP和UDP协议 TCP协议 UDP协议 网络字节序 socket编程接口 socket常见API sockaddr结构 简单的UDP网络程序 UDP服务端 创建套接字 填充本地网络信息 绑定 收取消息 …...
网络编程(Modbus进阶)
思维导图 Modbus RTU(先学一点理论) 概念 Modbus RTU 是工业自动化领域 最广泛应用的串行通信协议,由 Modicon 公司(现施耐德电气)于 1979 年推出。它以 高效率、强健性、易实现的特点成为工业控制系统的通信标准。 包…...
【Oracle APEX开发小技巧12】
有如下需求: 有一个问题反馈页面,要实现在apex页面展示能直观看到反馈时间超过7天未处理的数据,方便管理员及时处理反馈。 我的方法:直接将逻辑写在SQL中,这样可以直接在页面展示 完整代码: SELECTSF.FE…...
从WWDC看苹果产品发展的规律
WWDC 是苹果公司一年一度面向全球开发者的盛会,其主题演讲展现了苹果在产品设计、技术路线、用户体验和生态系统构建上的核心理念与演进脉络。我们借助 ChatGPT Deep Research 工具,对过去十年 WWDC 主题演讲内容进行了系统化分析,形成了这份…...
模型参数、模型存储精度、参数与显存
模型参数量衡量单位 M:百万(Million) B:十亿(Billion) 1 B 1000 M 1B 1000M 1B1000M 参数存储精度 模型参数是固定的,但是一个参数所表示多少字节不一定,需要看这个参数以什么…...
遍历 Map 类型集合的方法汇总
1 方法一 先用方法 keySet() 获取集合中的所有键。再通过 gey(key) 方法用对应键获取值 import java.util.HashMap; import java.util.Set;public class Test {public static void main(String[] args) {HashMap hashMap new HashMap();hashMap.put("语文",99);has…...
LeetCode - 394. 字符串解码
题目 394. 字符串解码 - 力扣(LeetCode) 思路 使用两个栈:一个存储重复次数,一个存储字符串 遍历输入字符串: 数字处理:遇到数字时,累积计算重复次数左括号处理:保存当前状态&a…...
【ROS】Nav2源码之nav2_behavior_tree-行为树节点列表
1、行为树节点分类 在 Nav2(Navigation2)的行为树框架中,行为树节点插件按照功能分为 Action(动作节点)、Condition(条件节点)、Control(控制节点) 和 Decorator(装饰节点) 四类。 1.1 动作节点 Action 执行具体的机器人操作或任务,直接与硬件、传感器或外部系统…...
css的定位(position)详解:相对定位 绝对定位 固定定位
在 CSS 中,元素的定位通过 position 属性控制,共有 5 种定位模式:static(静态定位)、relative(相对定位)、absolute(绝对定位)、fixed(固定定位)和…...
在WSL2的Ubuntu镜像中安装Docker
Docker官网链接: https://docs.docker.com/engine/install/ubuntu/ 1、运行以下命令卸载所有冲突的软件包: for pkg in docker.io docker-doc docker-compose docker-compose-v2 podman-docker containerd runc; do sudo apt-get remove $pkg; done2、设置Docker…...
在Mathematica中实现Newton-Raphson迭代的收敛时间算法(一般三次多项式)
考察一般的三次多项式,以r为参数: p[z_, r_] : z^3 (r - 1) z - r; roots[r_] : z /. Solve[p[z, r] 0, z]; 此多项式的根为: 尽管看起来这个多项式是特殊的,其实一般的三次多项式都是可以通过线性变换化为这个形式…...
