java实现请求缓冲合并
业务背景:
我们对外提供了一个rest接口给第三方业务进行调用,但是由于第三方框架限制,导致会发送大量相似无效请求,例如:接口入参json包含两个字段,createBy和receiverList,完整的入参json示例如下:
{"createBy": "aa","receiverList": ["bb","cc"]
}
实际第三方业务会进行多次调用接口,每次传递的数据可能如下:
{"createBy": "aa","receiverList": ["bb"]
}
或者
{"createBy": "aa","receiverList": ["cc"]
}
或者
{"createBy": "bb","receiverList": ["cc"]
}
或者
{"createBy": "aa","receiverList": ["bb","cc"]
}
所有需要对第三方业务传递过来的数据进行缓冲合并处理,减轻真正的后台服务的压力。
代码实现
package com.demo;import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.concurrent.CustomizableThreadFactory;
import org.springframework.stereotype.Component;import javax.annotation.PreDestroy;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;/*** Description: 请求合并管理类*/
@Slf4j
@Component
public class RequestMerger {// 线程池核心线程数private final int corePoolSize = 200;// 任务执行超时时间,单位:毫秒private final int timeout = 5 * 60 * 1000;// 队列,队列长度为Integer.MAX_VALUEprivate final LinkedBlockingQueue<String> requestQueue = new LinkedBlockingQueue<>();// 定时器,所有任务共用线程池private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(corePoolSize,new CustomizableThreadFactory("schedule-executor-"));// 是否关闭标志private final AtomicBoolean isShutdown = new AtomicBoolean(false);/*** 构造函数,用于初始化请求合并器。** @param batchSize 每次合并的最大请求数量。* @param delayMillis 合并请求的周期间隔,单位为毫秒。*/public RequestMerger(int batchSize, long delayMillis) {// 启动定时器,定期合并请求,延迟delayMillis后开始,之后每隔delayMillis执行一次scheduler.scheduleAtFixedRate(() -> {if (!isShutdown.get()) {List<String> batch = new ArrayList<>(batchSize);int drainedCount = requestQueue.drainTo(batch, batchSize);log.info("==>scheduler,drainedCount:{},nowQueueCount:{}", drainedCount, requestQueue.size());if (!batch.isEmpty()) {// 异步执行任务,防止业务执行时间过长导致业务整体延迟过大scheduler.submit(() -> {sendRequestBatch(batch);});}}}, delayMillis, delayMillis, TimeUnit.MILLISECONDS);}/*** 发送请求批次的方法。** @param batch 请求批次。*/private void sendRequestBatch(List<String> batch) {Future<?> future = scheduler.submit(() -> {try {// 在这里实现你的请求发送逻辑// 可以使用HTTP客户端库(如Apache HttpClient或OkHttp)来发送请求// ...System.out.println("Sending batch of " + batch.size() + " requests");} catch (Exception e) {// 异常处理逻辑System.err.println("Error sending requests: " + e.getMessage());}});// 尝试获取任务结果,如果超过超时时间则抛出TimeoutException异常,进行取消任务try {// 超时时间,单位:毫秒future.get(timeout, TimeUnit.MILLISECONDS);} catch (TimeoutException | ExecutionException e) {// 超时或执行异常时取消任务future.cancel(true);} catch (Exception e) {log.error("==>任务执行异常", e);// 任务执行异常future.cancel(true);}}/*** 在对象销毁前执行的关闭操作。* 该方法从请求队列中拉取所有未处理的请求,并将它们批量发送。* 无参数和返回值。*/@PreDestroypublic void shutdown() {isShutdown.set(true);List<String> batch = new ArrayList<>();// 获取请求队列中的剩余所有请求int drainedCount = requestQueue.drainTo(batch);log.info("==>shutdown,drainedCount:{},nowQueueCount:{}", drainedCount, requestQueue.size());// 批量发送收集到的剩余请求sendRequestBatch(batch);// 关闭定时执行器scheduler.shutdown();try {if (!scheduler.awaitTermination(60, TimeUnit.SECONDS)) {log.error("Scheduler did not terminate gracefully within 60 seconds, force shutting down.");scheduler.shutdownNow();}} catch (InterruptedException e) {log.warn("Interrupted during scheduler termination, force shutting down.");scheduler.shutdownNow();Thread.currentThread().interrupt();}}/*** 向请求队列中添加一个请求。如果服务未关闭,则直接添加到请求队列中;* 如果服务已关闭,则将该请求作为一批请求发送。** @param request 要添加的请求字符串。*/public void addRequest(String request) throws InterruptedException {// 检查服务是否已关闭if (!isShutdown.get()) {// 未关闭,直接添加到请求队列requestQueue.put(request);} else {// 已关闭,将当前请求作为一批发送List<String> batch = new ArrayList<>();batch.add(request);sendRequestBatch(batch);}}
}
参考资料
https://gitee.com/huangjuncong/mumux-framework/tree/master/merge-request/src/main/java/com/mumux/concurrent
注意:此代码容易导致数据丢失。例如:调用add方法将10个元素放入队列,但是真正获取到9个元素。
造成原因:FlushThread#add()中使用offer方法将数据放入队列,如果此时队列已满,返回值为false,实际数据未进入队列,需要额外对数据进行处理。
修改建议:调大队列长度,并且将offer方法改为put方法,保证数据最终进入队列。
相关文章:
java实现请求缓冲合并
业务背景: 我们对外提供了一个rest接口给第三方业务进行调用,但是由于第三方框架限制,导致会发送大量相似无效请求,例如:接口入参json包含两个字段,createBy和receiverList,完整的入参json示例…...

分布式锁的原子性问题
4.6 分布式锁的原子性问题 更为极端的误删逻辑说明: 线程1现在持有锁之后,在执行业务逻辑过程中,他正准备删除锁,而且已经走到了条件判断的过程中,比如他已经拿到了当前这把锁确实是属于他自己的,正准备删…...

从零自制docker-8-【构建实现run命令的容器】
文章目录 log "github.com/sirupsen/logrus"args...go moduleimport第三方包失败package和 go import的导入go build . 和go runcli库log.SetFormatter(&log.JSONFormatter{})error和nil的关系cmd.Wait()和cmd.Start()arg……context.Args().Get(0)syscall.Exec和…...
2024.03.31 校招 实习 内推 面经
绿*泡*泡VX: neituijunsir 交流*裙 ,内推/实习/校招汇总表格 1、自动驾驶一周资讯 -小米SU7上市24小时,大定达88898台;小鹏汽车正式进入德国市场;地平线递交港股上市申请 自动驾驶一周资讯 -小米SU7上市24小时&…...
邦芒职场:塑造职场人气王的秘诀
在职场中,有些人总能吸引众人的目光,成为团队的焦点;而有些人却常常默默无闻,难以融入。那么,如何在职场中脱颖而出,成为一个受欢迎的人呢?下面,让我们来探讨一下塑造职场人气王的秘…...

滤波器网络变压器的作用
网络变压器的作用主要包括以下几点: 1. 信号传输:网络变压器可以将PHY送出来的差分信号用差模耦合的线圈耦合滤波以增强信号,并且通过电磁场的转换耦合到不同电平的连接网线的另外一端以达到传输数据的目的。 2. 电气电压隔离:…...

Python —— 简述
Houdini Python | 笔记合集 - 知乎 Houdini内置三大语言: 表达式,主要用于节点参数控制,可实现跨模块控制;vex,速度最快(比表达式和Python快一个数量级),非常适合密集型计算环境&…...

使用Rust加速Python程序,让代码飞起来
作为一种解释型语言,Python在开发速度和灵活性方面具有明显的优势,但在性能方面却不如编译型语言如C或Rust。对于性能要求苛刻的应用程序,如果纯粹使用Python编写可能会运行缓慢,影响用户体验。因此,如何利用Rust来加速…...
【RISC-V 指令集】RISC-V 向量V扩展指令集介绍(八)- 向量整数算术指令
1. 引言 以下是《riscv-v-spec-1.0.pdf》文档的关键内容: 这是一份关于向量扩展的详细技术文档,内容覆盖了向量指令集的多个关键方面,如向量寄存器状态映射、向量指令格式、向量加载和存储操作、向量内存对齐约束、向量内存一致性模型、向量…...
Qt Designer在布局中调整控件垂直伸展或者水平伸展之后控件没有变化
QtDesigner设置垂直伸展 在Qt Designer中,要对网格布局中的每一个网格设置垂直伸展,可以按照以下步骤操作: 1.打开Qt Designer并打开你的UI文件。 2.确保你的布局是一个网格布局(QGridLayout)。 3.选中你想要设置垂直…...

微信公众号粉丝迁移费用是多少?
公众号迁移后原来内容还在么?通过公众号迁移,可以实现这些目的:主体变更、开通留言功能、多号合并、订阅号升级为服务号、服务号转为订阅号。公众号迁移流程:①申请公证;②提交迁移申请;③第三方审核&#…...

基于Vue3 中后台管理系统框架
基于Vue3 中后台管理系统框架 文章目录 基于Vue3 中后台管理系统框架一、特点二、源码下载地址 一款开箱即用的 Vue 中后台管理系统框架,支持多款 UI 组件库,兼容PC、移动端。vue-admin, vue-element-admin, vue后台, 后台系统, 后台框架, 管理后台, 管理…...

Agent调研--19类Agent框架对比
代理(Agent)指能自主感知环境并采取行动实现目标的智能体,即AI作为一个人或一个组织的代表,进行某种特定行为和交易,降低一个人或组织的工作复杂程度,减少工作量和沟通成本。 背景 目前,我们在探…...

蓝桥杯-求阶乘
问题描述 满足 N!的末尾恰好有 区 个o的最小的 N 是多少? 如果这样的 N 不存在输出 -1。 输入格式 一个整数 区。 输出格式 一个整数代表答案。 样例输入 样例输出 10 评测用例规模与约定 对于 30% 的数据,1<K<106 对于 100% 的数据,1<K<1018 运行限制 最大运行时…...
计算两个日期之间相差的天数的四种方法
计算两个日期之间相差的天数的四种方法 第一种:时间戳的方式,计算两个日期的时间戳的差,再除当天的毫秒数即可得到相差的天数。 public static void main(String[] args) {DateFormat dft new SimpleDateFormat("yyyy-MM-dd");t…...
【leetcode面试经典150题】42. 有效的字母异位词(C++)
【leetcode面试经典150题】专栏系列将为准备暑期实习生以及秋招的同学们提高在面试时的经典面试算法题的思路和想法。本专栏将以一题多解和精简算法思路为主,题解使用C语言。(若有使用其他语言的同学也可了解题解思路,本质上语法内容一致&…...

Windows 2003 R2与Windows 2022建立域信任报错:本地安全机构无法跟域控制器获得RPC连接。请检查名称是否可以解析,服务器是否可用。
在Windows Server 2003 R2与Windows Server 2022之间建立域信任时遇到“本地安全机构无法与域控制器获得RPC连接”的错误,可能是由于以下几种原因: DNS 解析问题: 确保源域和目标域的DNS配置正确,能够互相解析对方的域名和IP地址。…...

UE5、CesiumForUnreal实现加载建筑轮廓GeoJson数据生成白模功能
1.实现目标 在UE5.3中,通过加载本地建筑边界轮廓面GeoJson数据,获取底面轮廓和楼高数据,拉伸生成白模,并支持点选高亮。为防止阻塞Game线程,使用了异步任务进行优化,GIF动图如下所示: 其中建筑数量:128871,顶点索引数量:6695748,三角面数量:2231916,顶点数量:165…...

JavaGUI编程
目录 GUI概念 Swing概念 组件 容器组件 窗口(JFrame) 代码 运行 面板(JPanel) 代码 运行 布局管理器 FlowLayout 代码 运行 BorderLayout 代码 运行 GridLayout 代码 运行 常用组件 标签(JLabel) 代码 运…...
Nginx 基础应用实战 03 基于反向代理的负载均衡、https配置
Nginx 基础应用实战 03 反向代理 proxy_pass http://baidu.com; location /mashibing {proxy_pass http://mashibing.com/;}基于反向代理的负载均衡 upstream httpd {server 192.168.43.152:80;server 192.168.43.153:80; }weight(权重) 指定轮询几率,weight和访…...

CTF show Web 红包题第六弹
提示 1.不是SQL注入 2.需要找关键源码 思路 进入页面发现是一个登录框,很难让人不联想到SQL注入,但提示都说了不是SQL注入,所以就不往这方面想了 先查看一下网页源码,发现一段JavaScript代码,有一个关键类ctfs…...

基于ASP.NET+ SQL Server实现(Web)医院信息管理系统
医院信息管理系统 1. 课程设计内容 在 visual studio 2017 平台上,开发一个“医院信息管理系统”Web 程序。 2. 课程设计目的 综合运用 c#.net 知识,在 vs 2017 平台上,进行 ASP.NET 应用程序和简易网站的开发;初步熟悉开发一…...

基于uniapp+WebSocket实现聊天对话、消息监听、消息推送、聊天室等功能,多端兼容
基于 UniApp + WebSocket实现多端兼容的实时通讯系统,涵盖WebSocket连接建立、消息收发机制、多端兼容性配置、消息实时监听等功能,适配微信小程序、H5、Android、iOS等终端 目录 技术选型分析WebSocket协议优势UniApp跨平台特性WebSocket 基础实现连接管理消息收发连接…...

SCAU期末笔记 - 数据分析与数据挖掘题库解析
这门怎么题库答案不全啊日 来简单学一下子来 一、选择题(可多选) 将原始数据进行集成、变换、维度规约、数值规约是在以下哪个步骤的任务?(C) A. 频繁模式挖掘 B.分类和预测 C.数据预处理 D.数据流挖掘 A. 频繁模式挖掘:专注于发现数据中…...
【算法训练营Day07】字符串part1
文章目录 反转字符串反转字符串II替换数字 反转字符串 题目链接:344. 反转字符串 双指针法,两个指针的元素直接调转即可 class Solution {public void reverseString(char[] s) {int head 0;int end s.length - 1;while(head < end) {char temp …...
【HTTP三个基础问题】
面试官您好!HTTP是超文本传输协议,是互联网上客户端和服务器之间传输超文本数据(比如文字、图片、音频、视频等)的核心协议,当前互联网应用最广泛的版本是HTTP1.1,它基于经典的C/S模型,也就是客…...

智能仓储的未来:自动化、AI与数据分析如何重塑物流中心
当仓库学会“思考”,物流的终极形态正在诞生 想象这样的场景: 凌晨3点,某物流中心灯火通明却空无一人。AGV机器人集群根据实时订单动态规划路径;AI视觉系统在0.1秒内扫描包裹信息;数字孪生平台正模拟次日峰值流量压力…...

Java面试专项一-准备篇
一、企业简历筛选规则 一般企业的简历筛选流程:首先由HR先筛选一部分简历后,在将简历给到对应的项目负责人后再进行下一步的操作。 HR如何筛选简历 例如:Boss直聘(招聘方平台) 直接按照条件进行筛选 例如:…...
C++八股 —— 单例模式
文章目录 1. 基本概念2. 设计要点3. 实现方式4. 详解懒汉模式 1. 基本概念 线程安全(Thread Safety) 线程安全是指在多线程环境下,某个函数、类或代码片段能够被多个线程同时调用时,仍能保证数据的一致性和逻辑的正确性…...

C++ Visual Studio 2017厂商给的源码没有.sln文件 易兆微芯片下载工具加开机动画下载。
1.先用Visual Studio 2017打开Yichip YC31xx loader.vcxproj,再用Visual Studio 2022打开。再保侟就有.sln文件了。 易兆微芯片下载工具加开机动画下载 ExtraDownloadFile1Info.\logo.bin|0|0|10D2000|0 MFC应用兼容CMD 在BOOL CYichipYC31xxloaderDlg::OnIni…...