当前位置: 首页 > news >正文

java实现请求缓冲合并

业务背景:

我们对外提供了一个rest接口给第三方业务进行调用,但是由于第三方框架限制,导致会发送大量相似无效请求,例如:接口入参json包含两个字段,createBy和receiverList,完整的入参json示例如下:

{"createBy": "aa","receiverList": ["bb","cc"]
}

实际第三方业务会进行多次调用接口,每次传递的数据可能如下:

{"createBy": "aa","receiverList": ["bb"]
}
或者
{"createBy": "aa","receiverList": ["cc"]
}
或者
{"createBy": "bb","receiverList": ["cc"]
}
或者
{"createBy": "aa","receiverList": ["bb","cc"]
}

所有需要对第三方业务传递过来的数据进行缓冲合并处理,减轻真正的后台服务的压力。

代码实现

package com.demo;import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.concurrent.CustomizableThreadFactory;
import org.springframework.stereotype.Component;import javax.annotation.PreDestroy;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;/*** Description: 请求合并管理类*/
@Slf4j
@Component
public class RequestMerger {// 线程池核心线程数private final int corePoolSize = 200;// 任务执行超时时间,单位:毫秒private final int timeout = 5 * 60 * 1000;// 队列,队列长度为Integer.MAX_VALUEprivate final LinkedBlockingQueue<String> requestQueue = new LinkedBlockingQueue<>();// 定时器,所有任务共用线程池private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(corePoolSize,new CustomizableThreadFactory("schedule-executor-"));// 是否关闭标志private final AtomicBoolean isShutdown = new AtomicBoolean(false);/*** 构造函数,用于初始化请求合并器。** @param batchSize   每次合并的最大请求数量。* @param delayMillis 合并请求的周期间隔,单位为毫秒。*/public RequestMerger(int batchSize, long delayMillis) {// 启动定时器,定期合并请求,延迟delayMillis后开始,之后每隔delayMillis执行一次scheduler.scheduleAtFixedRate(() -> {if (!isShutdown.get()) {List<String> batch = new ArrayList<>(batchSize);int drainedCount = requestQueue.drainTo(batch, batchSize);log.info("==>scheduler,drainedCount:{},nowQueueCount:{}", drainedCount, requestQueue.size());if (!batch.isEmpty()) {// 异步执行任务,防止业务执行时间过长导致业务整体延迟过大scheduler.submit(() -> {sendRequestBatch(batch);});}}}, delayMillis, delayMillis, TimeUnit.MILLISECONDS);}/*** 发送请求批次的方法。** @param batch 请求批次。*/private void sendRequestBatch(List<String> batch) {Future<?> future = scheduler.submit(() -> {try {// 在这里实现你的请求发送逻辑// 可以使用HTTP客户端库(如Apache HttpClient或OkHttp)来发送请求// ...System.out.println("Sending batch of " + batch.size() + " requests");} catch (Exception e) {// 异常处理逻辑System.err.println("Error sending requests: " + e.getMessage());}});// 尝试获取任务结果,如果超过超时时间则抛出TimeoutException异常,进行取消任务try {// 超时时间,单位:毫秒future.get(timeout, TimeUnit.MILLISECONDS);} catch (TimeoutException | ExecutionException e) {// 超时或执行异常时取消任务future.cancel(true);} catch (Exception e) {log.error("==>任务执行异常", e);// 任务执行异常future.cancel(true);}}/*** 在对象销毁前执行的关闭操作。* 该方法从请求队列中拉取所有未处理的请求,并将它们批量发送。* 无参数和返回值。*/@PreDestroypublic void shutdown() {isShutdown.set(true);List<String> batch = new ArrayList<>();// 获取请求队列中的剩余所有请求int drainedCount = requestQueue.drainTo(batch);log.info("==>shutdown,drainedCount:{},nowQueueCount:{}", drainedCount, requestQueue.size());// 批量发送收集到的剩余请求sendRequestBatch(batch);// 关闭定时执行器scheduler.shutdown();try {if (!scheduler.awaitTermination(60, TimeUnit.SECONDS)) {log.error("Scheduler did not terminate gracefully within 60 seconds, force shutting down.");scheduler.shutdownNow();}} catch (InterruptedException e) {log.warn("Interrupted during scheduler termination, force shutting down.");scheduler.shutdownNow();Thread.currentThread().interrupt();}}/*** 向请求队列中添加一个请求。如果服务未关闭,则直接添加到请求队列中;* 如果服务已关闭,则将该请求作为一批请求发送。** @param request 要添加的请求字符串。*/public void addRequest(String request) throws InterruptedException {// 检查服务是否已关闭if (!isShutdown.get()) {// 未关闭,直接添加到请求队列requestQueue.put(request);} else {// 已关闭,将当前请求作为一批发送List<String> batch = new ArrayList<>();batch.add(request);sendRequestBatch(batch);}}
}

参考资料

https://gitee.com/huangjuncong/mumux-framework/tree/master/merge-request/src/main/java/com/mumux/concurrent

注意:此代码容易导致数据丢失。例如:调用add方法将10个元素放入队列,但是真正获取到9个元素。
造成原因:FlushThread#add()中使用offer方法将数据放入队列,如果此时队列已满,返回值为false,实际数据未进入队列,需要额外对数据进行处理。
修改建议:调大队列长度,并且将offer方法改为put方法,保证数据最终进入队列。

相关文章:

java实现请求缓冲合并

业务背景&#xff1a; 我们对外提供了一个rest接口给第三方业务进行调用&#xff0c;但是由于第三方框架限制&#xff0c;导致会发送大量相似无效请求&#xff0c;例如&#xff1a;接口入参json包含两个字段&#xff0c;createBy和receiverList&#xff0c;完整的入参json示例…...

分布式锁的原子性问题

4.6 分布式锁的原子性问题 更为极端的误删逻辑说明&#xff1a; 线程1现在持有锁之后&#xff0c;在执行业务逻辑过程中&#xff0c;他正准备删除锁&#xff0c;而且已经走到了条件判断的过程中&#xff0c;比如他已经拿到了当前这把锁确实是属于他自己的&#xff0c;正准备删…...

从零自制docker-8-【构建实现run命令的容器】

文章目录 log "github.com/sirupsen/logrus"args...go moduleimport第三方包失败package和 go import的导入go build . 和go runcli库log.SetFormatter(&log.JSONFormatter{})error和nil的关系cmd.Wait()和cmd.Start()arg……context.Args().Get(0)syscall.Exec和…...

2024.03.31 校招 实习 内推 面经

绿*泡*泡VX&#xff1a; neituijunsir 交流*裙 &#xff0c;内推/实习/校招汇总表格 1、自动驾驶一周资讯 -小米SU7上市24小时&#xff0c;大定达88898台&#xff1b;小鹏汽车正式进入德国市场&#xff1b;地平线递交港股上市申请 自动驾驶一周资讯 -小米SU7上市24小时&…...

邦芒职场:塑造职场人气王的秘诀

在职场中&#xff0c;有些人总能吸引众人的目光&#xff0c;成为团队的焦点&#xff1b;而有些人却常常默默无闻&#xff0c;难以融入。那么&#xff0c;如何在职场中脱颖而出&#xff0c;成为一个受欢迎的人呢&#xff1f;下面&#xff0c;让我们来探讨一下塑造职场人气王的秘…...

滤波器网络变压器的作用

网络变压器的作用主要包括以下几点&#xff1a; 1. 信号传输&#xff1a;网络变压器可以将PHY送出来的差分信号用差模耦合的线圈耦合滤波以增强信号&#xff0c;并且通过电磁场的转换耦合到不同电平的连接网线的另外一端以达到传输数据的目的。 2. 电气电压隔离&#xff1a…...

Python —— 简述

Houdini Python | 笔记合集 - 知乎 Houdini内置三大语言&#xff1a; 表达式&#xff0c;主要用于节点参数控制&#xff0c;可实现跨模块控制&#xff1b;vex&#xff0c;速度最快&#xff08;比表达式和Python快一个数量级&#xff09;&#xff0c;非常适合密集型计算环境&…...

使用Rust加速Python程序,让代码飞起来

作为一种解释型语言&#xff0c;Python在开发速度和灵活性方面具有明显的优势&#xff0c;但在性能方面却不如编译型语言如C或Rust。对于性能要求苛刻的应用程序&#xff0c;如果纯粹使用Python编写可能会运行缓慢&#xff0c;影响用户体验。因此&#xff0c;如何利用Rust来加速…...

【RISC-V 指令集】RISC-V 向量V扩展指令集介绍(八)- 向量整数算术指令

1. 引言 以下是《riscv-v-spec-1.0.pdf》文档的关键内容&#xff1a; 这是一份关于向量扩展的详细技术文档&#xff0c;内容覆盖了向量指令集的多个关键方面&#xff0c;如向量寄存器状态映射、向量指令格式、向量加载和存储操作、向量内存对齐约束、向量内存一致性模型、向量…...

Qt Designer在布局中调整控件垂直伸展或者水平伸展之后控件没有变化

QtDesigner设置垂直伸展 在Qt Designer中&#xff0c;要对网格布局中的每一个网格设置垂直伸展&#xff0c;可以按照以下步骤操作&#xff1a; 1.打开Qt Designer并打开你的UI文件。 2.确保你的布局是一个网格布局&#xff08;QGridLayout&#xff09;。 3.选中你想要设置垂直…...

微信公众号粉丝迁移费用是多少?

公众号迁移后原来内容还在么&#xff1f;通过公众号迁移&#xff0c;可以实现这些目的&#xff1a;主体变更、开通留言功能、多号合并、订阅号升级为服务号、服务号转为订阅号。公众号迁移流程&#xff1a;①申请公证&#xff1b;②提交迁移申请&#xff1b;③第三方审核&#…...

基于Vue3 中后台管理系统框架

基于Vue3 中后台管理系统框架 文章目录 基于Vue3 中后台管理系统框架一、特点二、源码下载地址 一款开箱即用的 Vue 中后台管理系统框架&#xff0c;支持多款 UI 组件库&#xff0c;兼容PC、移动端。vue-admin, vue-element-admin, vue后台, 后台系统, 后台框架, 管理后台, 管理…...

Agent调研--19类Agent框架对比

代理&#xff08;Agent&#xff09;指能自主感知环境并采取行动实现目标的智能体&#xff0c;即AI作为一个人或一个组织的代表&#xff0c;进行某种特定行为和交易&#xff0c;降低一个人或组织的工作复杂程度&#xff0c;减少工作量和沟通成本。 背景 目前&#xff0c;我们在探…...

蓝桥杯-求阶乘

问题描述 满足 N!的末尾恰好有 区 个o的最小的 N 是多少? 如果这样的 N 不存在输出 -1。 输入格式 一个整数 区。 输出格式 一个整数代表答案。 样例输入 样例输出 10 评测用例规模与约定 对于 30% 的数据,1<K<106 对于 100% 的数据,1<K<1018 运行限制 最大运行时…...

计算两个日期之间相差的天数的四种方法

计算两个日期之间相差的天数的四种方法 第一种&#xff1a;时间戳的方式&#xff0c;计算两个日期的时间戳的差&#xff0c;再除当天的毫秒数即可得到相差的天数。 public static void main(String[] args) {DateFormat dft new SimpleDateFormat("yyyy-MM-dd");t…...

【leetcode面试经典150题】42. 有效的字母异位词(C++)

【leetcode面试经典150题】专栏系列将为准备暑期实习生以及秋招的同学们提高在面试时的经典面试算法题的思路和想法。本专栏将以一题多解和精简算法思路为主&#xff0c;题解使用C语言。&#xff08;若有使用其他语言的同学也可了解题解思路&#xff0c;本质上语法内容一致&…...

Windows 2003 R2与Windows 2022建立域信任报错:本地安全机构无法跟域控制器获得RPC连接。请检查名称是否可以解析,服务器是否可用。

在Windows Server 2003 R2与Windows Server 2022之间建立域信任时遇到“本地安全机构无法与域控制器获得RPC连接”的错误&#xff0c;可能是由于以下几种原因&#xff1a; DNS 解析问题&#xff1a; 确保源域和目标域的DNS配置正确&#xff0c;能够互相解析对方的域名和IP地址。…...

UE5、CesiumForUnreal实现加载建筑轮廓GeoJson数据生成白模功能

1.实现目标 在UE5.3中,通过加载本地建筑边界轮廓面GeoJson数据,获取底面轮廓和楼高数据,拉伸生成白模,并支持点选高亮。为防止阻塞Game线程,使用了异步任务进行优化,GIF动图如下所示: 其中建筑数量:128871,顶点索引数量:6695748,三角面数量:2231916,顶点数量:165…...

JavaGUI编程

目录 GUI概念 Swing概念 组件 容器组件 窗口&#xff08;JFrame&#xff09; 代码 运行 面板&#xff08;JPanel&#xff09; 代码 运行 布局管理器 FlowLayout 代码 运行 BorderLayout 代码 运行 GridLayout 代码 运行 常用组件 标签(JLabel) 代码 运…...

Nginx 基础应用实战 03 基于反向代理的负载均衡、https配置

Nginx 基础应用实战 03 反向代理 proxy_pass http://baidu.com; location /mashibing {proxy_pass http://mashibing.com/;}基于反向代理的负载均衡 upstream httpd {server 192.168.43.152:80;server 192.168.43.153:80; }weight(权重) 指定轮询几率&#xff0c;weight和访…...

【位运算】消失的两个数字(hard)

消失的两个数字&#xff08;hard&#xff09; 题⽬描述&#xff1a;解法&#xff08;位运算&#xff09;&#xff1a;Java 算法代码&#xff1a;更简便代码 题⽬链接&#xff1a;⾯试题 17.19. 消失的两个数字 题⽬描述&#xff1a; 给定⼀个数组&#xff0c;包含从 1 到 N 所有…...

五年级数学知识边界总结思考-下册

目录 一、背景二、过程1.观察物体小学五年级下册“观察物体”知识点详解&#xff1a;由来、作用与意义**一、知识点核心内容****二、知识点的由来&#xff1a;从生活实践到数学抽象****三、知识的作用&#xff1a;解决实际问题的工具****四、学习的意义&#xff1a;培养核心素养…...

cf2117E

原题链接&#xff1a;https://codeforces.com/contest/2117/problem/E 题目背景&#xff1a; 给定两个数组a,b&#xff0c;可以执行多次以下操作&#xff1a;选择 i (1 < i < n - 1)&#xff0c;并设置 或&#xff0c;也可以在执行上述操作前执行一次删除任意 和 。求…...

ETLCloud可能遇到的问题有哪些?常见坑位解析

数据集成平台ETLCloud&#xff0c;主要用于支持数据的抽取&#xff08;Extract&#xff09;、转换&#xff08;Transform&#xff09;和加载&#xff08;Load&#xff09;过程。提供了一个简洁直观的界面&#xff0c;以便用户可以在不同的数据源之间轻松地进行数据迁移和转换。…...

TRS收益互换:跨境资本流动的金融创新工具与系统化解决方案

一、TRS收益互换的本质与业务逻辑 &#xff08;一&#xff09;概念解析 TRS&#xff08;Total Return Swap&#xff09;收益互换是一种金融衍生工具&#xff0c;指交易双方约定在未来一定期限内&#xff0c;基于特定资产或指数的表现进行现金流交换的协议。其核心特征包括&am…...

三体问题详解

从物理学角度&#xff0c;三体问题之所以不稳定&#xff0c;是因为三个天体在万有引力作用下相互作用&#xff0c;形成一个非线性耦合系统。我们可以从牛顿经典力学出发&#xff0c;列出具体的运动方程&#xff0c;并说明为何这个系统本质上是混沌的&#xff0c;无法得到一般解…...

【数据分析】R版IntelliGenes用于生物标志物发现的可解释机器学习

禁止商业或二改转载&#xff0c;仅供自学使用&#xff0c;侵权必究&#xff0c;如需截取部分内容请后台联系作者! 文章目录 介绍流程步骤1. 输入数据2. 特征选择3. 模型训练4. I-Genes 评分计算5. 输出结果 IntelliGenesR 安装包1. 特征选择2. 模型训练和评估3. I-Genes 评分计…...

SQL慢可能是触发了ring buffer

简介 最近在进行 postgresql 性能排查的时候,发现 PG 在某一个时间并行执行的 SQL 变得特别慢。最后通过监控监观察到并行发起得时间 buffers_alloc 就急速上升,且低水位伴随在整个慢 SQL,一直是 buferIO 的等待事件,此时也没有其他会话的争抢。SQL 虽然不是高效 SQL ,但…...

无人机侦测与反制技术的进展与应用

国家电网无人机侦测与反制技术的进展与应用 引言 随着无人机&#xff08;无人驾驶飞行器&#xff0c;UAV&#xff09;技术的快速发展&#xff0c;其在商业、娱乐和军事领域的广泛应用带来了新的安全挑战。特别是对于关键基础设施如电力系统&#xff0c;无人机的“黑飞”&…...

在Mathematica中实现Newton-Raphson迭代的收敛时间算法(一般三次多项式)

考察一般的三次多项式&#xff0c;以r为参数&#xff1a; p[z_, r_] : z^3 (r - 1) z - r; roots[r_] : z /. Solve[p[z, r] 0, z]&#xff1b; 此多项式的根为&#xff1a; 尽管看起来这个多项式是特殊的&#xff0c;其实一般的三次多项式都是可以通过线性变换化为这个形式…...