java实现请求缓冲合并
业务背景:
我们对外提供了一个rest接口给第三方业务进行调用,但是由于第三方框架限制,导致会发送大量相似无效请求,例如:接口入参json包含两个字段,createBy和receiverList,完整的入参json示例如下:
{"createBy": "aa","receiverList": ["bb","cc"]
}
实际第三方业务会进行多次调用接口,每次传递的数据可能如下:
{"createBy": "aa","receiverList": ["bb"]
}
或者
{"createBy": "aa","receiverList": ["cc"]
}
或者
{"createBy": "bb","receiverList": ["cc"]
}
或者
{"createBy": "aa","receiverList": ["bb","cc"]
}
所有需要对第三方业务传递过来的数据进行缓冲合并处理,减轻真正的后台服务的压力。
代码实现
package com.demo;import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.concurrent.CustomizableThreadFactory;
import org.springframework.stereotype.Component;import javax.annotation.PreDestroy;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;/*** Description: 请求合并管理类*/
@Slf4j
@Component
public class RequestMerger {// 线程池核心线程数private final int corePoolSize = 200;// 任务执行超时时间,单位:毫秒private final int timeout = 5 * 60 * 1000;// 队列,队列长度为Integer.MAX_VALUEprivate final LinkedBlockingQueue<String> requestQueue = new LinkedBlockingQueue<>();// 定时器,所有任务共用线程池private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(corePoolSize,new CustomizableThreadFactory("schedule-executor-"));// 是否关闭标志private final AtomicBoolean isShutdown = new AtomicBoolean(false);/*** 构造函数,用于初始化请求合并器。** @param batchSize 每次合并的最大请求数量。* @param delayMillis 合并请求的周期间隔,单位为毫秒。*/public RequestMerger(int batchSize, long delayMillis) {// 启动定时器,定期合并请求,延迟delayMillis后开始,之后每隔delayMillis执行一次scheduler.scheduleAtFixedRate(() -> {if (!isShutdown.get()) {List<String> batch = new ArrayList<>(batchSize);int drainedCount = requestQueue.drainTo(batch, batchSize);log.info("==>scheduler,drainedCount:{},nowQueueCount:{}", drainedCount, requestQueue.size());if (!batch.isEmpty()) {// 异步执行任务,防止业务执行时间过长导致业务整体延迟过大scheduler.submit(() -> {sendRequestBatch(batch);});}}}, delayMillis, delayMillis, TimeUnit.MILLISECONDS);}/*** 发送请求批次的方法。** @param batch 请求批次。*/private void sendRequestBatch(List<String> batch) {Future<?> future = scheduler.submit(() -> {try {// 在这里实现你的请求发送逻辑// 可以使用HTTP客户端库(如Apache HttpClient或OkHttp)来发送请求// ...System.out.println("Sending batch of " + batch.size() + " requests");} catch (Exception e) {// 异常处理逻辑System.err.println("Error sending requests: " + e.getMessage());}});// 尝试获取任务结果,如果超过超时时间则抛出TimeoutException异常,进行取消任务try {// 超时时间,单位:毫秒future.get(timeout, TimeUnit.MILLISECONDS);} catch (TimeoutException | ExecutionException e) {// 超时或执行异常时取消任务future.cancel(true);} catch (Exception e) {log.error("==>任务执行异常", e);// 任务执行异常future.cancel(true);}}/*** 在对象销毁前执行的关闭操作。* 该方法从请求队列中拉取所有未处理的请求,并将它们批量发送。* 无参数和返回值。*/@PreDestroypublic void shutdown() {isShutdown.set(true);List<String> batch = new ArrayList<>();// 获取请求队列中的剩余所有请求int drainedCount = requestQueue.drainTo(batch);log.info("==>shutdown,drainedCount:{},nowQueueCount:{}", drainedCount, requestQueue.size());// 批量发送收集到的剩余请求sendRequestBatch(batch);// 关闭定时执行器scheduler.shutdown();try {if (!scheduler.awaitTermination(60, TimeUnit.SECONDS)) {log.error("Scheduler did not terminate gracefully within 60 seconds, force shutting down.");scheduler.shutdownNow();}} catch (InterruptedException e) {log.warn("Interrupted during scheduler termination, force shutting down.");scheduler.shutdownNow();Thread.currentThread().interrupt();}}/*** 向请求队列中添加一个请求。如果服务未关闭,则直接添加到请求队列中;* 如果服务已关闭,则将该请求作为一批请求发送。** @param request 要添加的请求字符串。*/public void addRequest(String request) throws InterruptedException {// 检查服务是否已关闭if (!isShutdown.get()) {// 未关闭,直接添加到请求队列requestQueue.put(request);} else {// 已关闭,将当前请求作为一批发送List<String> batch = new ArrayList<>();batch.add(request);sendRequestBatch(batch);}}
}
参考资料
https://gitee.com/huangjuncong/mumux-framework/tree/master/merge-request/src/main/java/com/mumux/concurrent
注意:此代码容易导致数据丢失。例如:调用add方法将10个元素放入队列,但是真正获取到9个元素。
造成原因:FlushThread#add()中使用offer方法将数据放入队列,如果此时队列已满,返回值为false,实际数据未进入队列,需要额外对数据进行处理。
修改建议:调大队列长度,并且将offer方法改为put方法,保证数据最终进入队列。
相关文章:
java实现请求缓冲合并
业务背景: 我们对外提供了一个rest接口给第三方业务进行调用,但是由于第三方框架限制,导致会发送大量相似无效请求,例如:接口入参json包含两个字段,createBy和receiverList,完整的入参json示例…...

分布式锁的原子性问题
4.6 分布式锁的原子性问题 更为极端的误删逻辑说明: 线程1现在持有锁之后,在执行业务逻辑过程中,他正准备删除锁,而且已经走到了条件判断的过程中,比如他已经拿到了当前这把锁确实是属于他自己的,正准备删…...

从零自制docker-8-【构建实现run命令的容器】
文章目录 log "github.com/sirupsen/logrus"args...go moduleimport第三方包失败package和 go import的导入go build . 和go runcli库log.SetFormatter(&log.JSONFormatter{})error和nil的关系cmd.Wait()和cmd.Start()arg……context.Args().Get(0)syscall.Exec和…...
2024.03.31 校招 实习 内推 面经
绿*泡*泡VX: neituijunsir 交流*裙 ,内推/实习/校招汇总表格 1、自动驾驶一周资讯 -小米SU7上市24小时,大定达88898台;小鹏汽车正式进入德国市场;地平线递交港股上市申请 自动驾驶一周资讯 -小米SU7上市24小时&…...
邦芒职场:塑造职场人气王的秘诀
在职场中,有些人总能吸引众人的目光,成为团队的焦点;而有些人却常常默默无闻,难以融入。那么,如何在职场中脱颖而出,成为一个受欢迎的人呢?下面,让我们来探讨一下塑造职场人气王的秘…...

滤波器网络变压器的作用
网络变压器的作用主要包括以下几点: 1. 信号传输:网络变压器可以将PHY送出来的差分信号用差模耦合的线圈耦合滤波以增强信号,并且通过电磁场的转换耦合到不同电平的连接网线的另外一端以达到传输数据的目的。 2. 电气电压隔离:…...

Python —— 简述
Houdini Python | 笔记合集 - 知乎 Houdini内置三大语言: 表达式,主要用于节点参数控制,可实现跨模块控制;vex,速度最快(比表达式和Python快一个数量级),非常适合密集型计算环境&…...

使用Rust加速Python程序,让代码飞起来
作为一种解释型语言,Python在开发速度和灵活性方面具有明显的优势,但在性能方面却不如编译型语言如C或Rust。对于性能要求苛刻的应用程序,如果纯粹使用Python编写可能会运行缓慢,影响用户体验。因此,如何利用Rust来加速…...
【RISC-V 指令集】RISC-V 向量V扩展指令集介绍(八)- 向量整数算术指令
1. 引言 以下是《riscv-v-spec-1.0.pdf》文档的关键内容: 这是一份关于向量扩展的详细技术文档,内容覆盖了向量指令集的多个关键方面,如向量寄存器状态映射、向量指令格式、向量加载和存储操作、向量内存对齐约束、向量内存一致性模型、向量…...
Qt Designer在布局中调整控件垂直伸展或者水平伸展之后控件没有变化
QtDesigner设置垂直伸展 在Qt Designer中,要对网格布局中的每一个网格设置垂直伸展,可以按照以下步骤操作: 1.打开Qt Designer并打开你的UI文件。 2.确保你的布局是一个网格布局(QGridLayout)。 3.选中你想要设置垂直…...

微信公众号粉丝迁移费用是多少?
公众号迁移后原来内容还在么?通过公众号迁移,可以实现这些目的:主体变更、开通留言功能、多号合并、订阅号升级为服务号、服务号转为订阅号。公众号迁移流程:①申请公证;②提交迁移申请;③第三方审核&#…...

基于Vue3 中后台管理系统框架
基于Vue3 中后台管理系统框架 文章目录 基于Vue3 中后台管理系统框架一、特点二、源码下载地址 一款开箱即用的 Vue 中后台管理系统框架,支持多款 UI 组件库,兼容PC、移动端。vue-admin, vue-element-admin, vue后台, 后台系统, 后台框架, 管理后台, 管理…...

Agent调研--19类Agent框架对比
代理(Agent)指能自主感知环境并采取行动实现目标的智能体,即AI作为一个人或一个组织的代表,进行某种特定行为和交易,降低一个人或组织的工作复杂程度,减少工作量和沟通成本。 背景 目前,我们在探…...

蓝桥杯-求阶乘
问题描述 满足 N!的末尾恰好有 区 个o的最小的 N 是多少? 如果这样的 N 不存在输出 -1。 输入格式 一个整数 区。 输出格式 一个整数代表答案。 样例输入 样例输出 10 评测用例规模与约定 对于 30% 的数据,1<K<106 对于 100% 的数据,1<K<1018 运行限制 最大运行时…...
计算两个日期之间相差的天数的四种方法
计算两个日期之间相差的天数的四种方法 第一种:时间戳的方式,计算两个日期的时间戳的差,再除当天的毫秒数即可得到相差的天数。 public static void main(String[] args) {DateFormat dft new SimpleDateFormat("yyyy-MM-dd");t…...
【leetcode面试经典150题】42. 有效的字母异位词(C++)
【leetcode面试经典150题】专栏系列将为准备暑期实习生以及秋招的同学们提高在面试时的经典面试算法题的思路和想法。本专栏将以一题多解和精简算法思路为主,题解使用C语言。(若有使用其他语言的同学也可了解题解思路,本质上语法内容一致&…...

Windows 2003 R2与Windows 2022建立域信任报错:本地安全机构无法跟域控制器获得RPC连接。请检查名称是否可以解析,服务器是否可用。
在Windows Server 2003 R2与Windows Server 2022之间建立域信任时遇到“本地安全机构无法与域控制器获得RPC连接”的错误,可能是由于以下几种原因: DNS 解析问题: 确保源域和目标域的DNS配置正确,能够互相解析对方的域名和IP地址。…...

UE5、CesiumForUnreal实现加载建筑轮廓GeoJson数据生成白模功能
1.实现目标 在UE5.3中,通过加载本地建筑边界轮廓面GeoJson数据,获取底面轮廓和楼高数据,拉伸生成白模,并支持点选高亮。为防止阻塞Game线程,使用了异步任务进行优化,GIF动图如下所示: 其中建筑数量:128871,顶点索引数量:6695748,三角面数量:2231916,顶点数量:165…...

JavaGUI编程
目录 GUI概念 Swing概念 组件 容器组件 窗口(JFrame) 代码 运行 面板(JPanel) 代码 运行 布局管理器 FlowLayout 代码 运行 BorderLayout 代码 运行 GridLayout 代码 运行 常用组件 标签(JLabel) 代码 运…...
Nginx 基础应用实战 03 基于反向代理的负载均衡、https配置
Nginx 基础应用实战 03 反向代理 proxy_pass http://baidu.com; location /mashibing {proxy_pass http://mashibing.com/;}基于反向代理的负载均衡 upstream httpd {server 192.168.43.152:80;server 192.168.43.153:80; }weight(权重) 指定轮询几率,weight和访…...

多云管理“拦路虎”:深入解析网络互联、身份同步与成本可视化的技术复杂度
一、引言:多云环境的技术复杂性本质 企业采用多云策略已从技术选型升维至生存刚需。当业务系统分散部署在多个云平台时,基础设施的技术债呈现指数级积累。网络连接、身份认证、成本管理这三大核心挑战相互嵌套:跨云网络构建数据…...

调用支付宝接口响应40004 SYSTEM_ERROR问题排查
在对接支付宝API的时候,遇到了一些问题,记录一下排查过程。 Body:{"datadigital_fincloud_generalsaas_face_certify_initialize_response":{"msg":"Business Failed","code":"40004","sub_msg…...

(十)学生端搭建
本次旨在将之前的已完成的部分功能进行拼装到学生端,同时完善学生端的构建。本次工作主要包括: 1.学生端整体界面布局 2.模拟考场与部分个人画像流程的串联 3.整体学生端逻辑 一、学生端 在主界面可以选择自己的用户角色 选择学生则进入学生登录界面…...
k8s从入门到放弃之Ingress七层负载
k8s从入门到放弃之Ingress七层负载 在Kubernetes(简称K8s)中,Ingress是一个API对象,它允许你定义如何从集群外部访问集群内部的服务。Ingress可以提供负载均衡、SSL终结和基于名称的虚拟主机等功能。通过Ingress,你可…...

PPT|230页| 制造集团企业供应链端到端的数字化解决方案:从需求到结算的全链路业务闭环构建
制造业采购供应链管理是企业运营的核心环节,供应链协同管理在供应链上下游企业之间建立紧密的合作关系,通过信息共享、资源整合、业务协同等方式,实现供应链的全面管理和优化,提高供应链的效率和透明度,降低供应链的成…...
条件运算符
C中的三目运算符(也称条件运算符,英文:ternary operator)是一种简洁的条件选择语句,语法如下: 条件表达式 ? 表达式1 : 表达式2• 如果“条件表达式”为true,则整个表达式的结果为“表达式1”…...

如何在看板中有效管理突发紧急任务
在看板中有效管理突发紧急任务需要:设立专门的紧急任务通道、重新调整任务优先级、保持适度的WIP(Work-in-Progress)弹性、优化任务处理流程、提高团队应对突发情况的敏捷性。其中,设立专门的紧急任务通道尤为重要,这能…...
基于数字孪生的水厂可视化平台建设:架构与实践
分享大纲: 1、数字孪生水厂可视化平台建设背景 2、数字孪生水厂可视化平台建设架构 3、数字孪生水厂可视化平台建设成效 近几年,数字孪生水厂的建设开展的如火如荼。作为提升水厂管理效率、优化资源的调度手段,基于数字孪生的水厂可视化平台的…...

React19源码系列之 事件插件系统
事件类别 事件类型 定义 文档 Event Event 接口表示在 EventTarget 上出现的事件。 Event - Web API | MDN UIEvent UIEvent 接口表示简单的用户界面事件。 UIEvent - Web API | MDN KeyboardEvent KeyboardEvent 对象描述了用户与键盘的交互。 KeyboardEvent - Web…...
【AI学习】三、AI算法中的向量
在人工智能(AI)算法中,向量(Vector)是一种将现实世界中的数据(如图像、文本、音频等)转化为计算机可处理的数值型特征表示的工具。它是连接人类认知(如语义、视觉特征)与…...