模拟实战-用CompletableFuture优化远程RPC调用
实战场景
这是广州某500-900人互联网厂的面试原题
手写并发优化解决思路
我们要调用对方的RPC接口,我们的RPC接口每调用一次对方都会阻塞50ms
但是我们的业务要批量调用RPC,例如我们要批量调用1k次,我们不可能在for循环里面写1k次远程调用,因为我们1次就会阻塞50ms,我们for循环弄1k次那么就要等待1k×50ms
我们还要保证返回的结果是按照我们的请求顺序的
场景介绍:我们这边是C端的,我们不可能修改对方的代码,所以我们只能尽可能优化我们自己的代码提高接口效率
解决思路
1.通过Hash算法来分批运算,最后把结果存到map<Integer,String>里面然后来取,因为我们的顺序由id从低到高,所以我们可以通过id在map里面根据顺序取出然后放到我们的List里面
2.我们for循环,然后每一次循环都开启一个异步线程将结果存到Map里面,然后我们最终存到List。但我一开始有个问题,就是我没等全部执行完就存到我们的Map里面了,因为我不会写那个全局等待的代码......破防了
我最终的解决思路是2
package com.kira.scaffoldmvc.appender;import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import java.util.stream.IntStream;public class RpcBatchRequestTest {static RpcService rpcService = new RpcService();public static void main(String[] args) throws ExecutionException, InterruptedException {// rpc 请求参数List<Integer> requestIds = IntStream.range(0, 1000).boxed().collect(Collectors.toList());// rpc 调用List<String> results = batchGetDetails(requestIds);// 输出for (String result : results) {System.out.println(result);}// 预期输出// details 0// details 1// details 2// .......// details 999}/*** 某个 rpc service 的接口只提供单个调用* 此处需要做一个封装,多次请求后返回** 要求按照顺序返回** @param ids* @return*/public static List<String> batchGetDetails(List<Integer> ids) throws ExecutionException, InterruptedException {
// 单次调用
// RpcService rpcService = new RpcService();
// String rpcResult = rpcService.rpcGetDetailsById(1);List<String> list=new ArrayList<>();HashMap<Integer,String> map=new HashMap<>();List<CompletableFuture<Void>> futures = new ArrayList<>();//for循环里面的每一个都开启一个forfor(int i=0;i<ids.size();i++){int finalI = i;CompletableFuture future=CompletableFuture.supplyAsync(() -> {String s = rpcService.rpcGetDetailsById(ids.get(finalI));map.put(finalI, s);return s;});
futures.add(future);}//futures.toArray(new CompletableFuture[0])) 将future数组转成CompletableFuture数组//如果你传入 new CompletableFuture[0],Java 会动态调整数组大小,以适应 futures 中的元素数//addOf()等待所有Completable异步线程都执行完CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();// TODO 在此处实现批量调用for(int i=0;i<ids.size();i++){list.add(map.get(i));}return list;}
}class RpcService {public String rpcGetDetailsById(int id) {// 模拟 rpc service 耗时try {Thread.sleep(50L);} catch (InterruptedException e) {throw new RuntimeException(e);}return "details " + id;}
}
分批推送的解决思路
每批为500份
package com.kira.scaffoldmvc.appender;import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import java.util.stream.IntStream;public class RpcBatchRequestTest2 {static RpcService rpcService = new RpcService();public static void main(String[] args) throws ExecutionException, InterruptedException {// rpc 请求参数List<Integer> requestIds = IntStream.range(0, 1000).boxed().collect(Collectors.toList());// rpc 调用List<String> results = batchGetDetails(requestIds);// 输出for (String result : results) {System.out.println(result);}}/*** 按批次异步调用 RPC 接口,并确保按顺序返回** @param ids 请求 ID 列表* @return 按顺序返回的结果列表*/public static List<String> batchGetDetails(List<Integer> ids) throws ExecutionException, InterruptedException {int batchSize = 500; // 每批大小List<CompletableFuture<List<String>>> batchFutures = new ArrayList<>();// 按批次切分数据for (int i = 0; i < ids.size(); i += batchSize) {int start = i;int end = Math.min(i + batchSize, ids.size());List<Integer> batch = ids.subList(start, end);// 异步处理每个批次CompletableFuture<List<String>> batchFuture = CompletableFuture.supplyAsync(() -> batch.stream().map(rpcService::rpcGetDetailsById) // 调用 RPC 方法.collect(Collectors.toList()));batchFutures.add(batchFuture);}// 等待所有批次完成并收集结果List<String> results = new ArrayList<>();CompletableFuture.allOf(batchFutures.toArray(new CompletableFuture[0])).join();for (CompletableFuture<List<String>> future : batchFutures) {results.addAll(future.get());}return results;}
}class RpcService2 {public String rpcGetDetailsById(int id) {// 模拟 rpc service 耗时try {Thread.sleep(50L);} catch (InterruptedException e) {throw new RuntimeException(e);}return "details " + id;}
}
相关文章:
模拟实战-用CompletableFuture优化远程RPC调用
实战场景 这是广州某500-900人互联网厂的面试原题 手写并发优化解决思路 我们要调用对方的RPC接口,我们的RPC接口每调用一次对方都会阻塞50ms 但是我们的业务要批量调用RPC,例如我们要批量调用1k次,我们不可能在for循环里面写1k次远程调用…...
深入解析:Jsoup 库的多功能应用场景
Jsoup 是一个强大的 Java 库,主要用于解析和操作 HTML 文档。它不仅广泛应用于网络爬虫和数据抓取,还在网页内容分析、数据清洗与处理、自动化测试等多个领域有着广泛的应用。本文将详细介绍 Jsoup 库的多种用途,并提供具体的代码示例。 一、…...
Polardb三节点集群部署安装--附虚拟机
1. 架构 PolarDB-X 采用 Shared-nothing 与存储计算分离架构进行设计,系统由4个核心组件组成。 计算节点(CN, Compute Node) 计算节点是系统的入口,采用无状态设计,包括 SQL 解析器、优化器、执行器等模块。负责数据…...
Redis - 全局ID生成器 RedisIdWorker
文章目录 Redis - 全局ID生成器 RedisIdWorker一、引言二、实现原理三、代码实现代码说明 四、使用示例示例说明 五、总结 Redis - 全局ID生成器 RedisIdWorker 一、引言 在分布式系统中,生成全局唯一ID是一个常见的需求。传统的自增ID生成方式在分布式环境下容易出…...
【Vitest】单元测试
文章目录 测试:Vitest一、安装二、断言三、回调测试四、对象方法五、模拟第三库 测试:Vitest 一、安装 npm install vitest创建文件:example.test.ts 运行测试: npx vitest example二、断言 import { expect, test } from vi…...
达梦数据库从单主模式转换为主备模式
目录标题 达梦数据库单主转主备配置笔记前期准备服务器环境数据库安装磁盘空间 流程流程图说明基于脱机备份方式的单实例转主备流程图详细步骤说明 详细步骤1. 检查主库归档模式2. 配置主库配置文件dm.ini 文件dmmal.ini 文件dmarch.ini 文件 3. 备份主库数据库4. 备库配置新建…...
【Elasticsearch】nested聚合
在 Elasticsearch 中,嵌套聚合(nestedaggregation)的语法形式用于对嵌套字段(nestedfields)进行聚合操作。嵌套字段是 Elasticsearch 中的一种特殊字段类型,用于存储数组中的对象,这些对象需要独…...
虹科波形小课堂 | 三分钟掌握车辆相对压缩测试!不拆发动机、不测缸压就能判断故障缸!
不拆发动机、不测缸压,只测个电流也能知道哪个缸压缩有问题?没错!做个相对压缩测试,测下起动电流就行,简单又实用!今天,从原理到方法,几分钟教会你! 我们都知道…...
【玩转全栈】--创建一个自己的vue项目
目录 vue介绍 创建vue项目 vue页面介绍 element-plus组件库 启动项目 vue介绍 Vue.js 是一款轻量级、易于上手的前端 JavaScript 框架,旨在简化用户界面的开发。它采用了响应式数据绑定和组件化的设计理念,使得开发者可以通过声明式的方式轻松管理数据和…...
基于 Spring Cloud + Spring AI + VUE 的知识助理平台介绍以及问题
前言(一些废话) 在看这篇文章的各位大佬,感谢你们留出几分钟时间,来看这个产品介绍,其实重点说实话,不是这个产品怎么样。而是在最后有一个郁结在心里的几个问题,希望大佬们能给出一些建议。万…...
< 自用文儿 > 下载 MaxMind GeoIP Databases 对攻击的 IP 做 地理分析
起因 两个 VPM/VPS,安装了 fail2ban 去拦截密码穷举攻击。每天的记录都在增长,以前复制屏幕输出就行,一屏的内容还容易粘贴出来的。昨天已经过 500 条,好奇 fail2ban 是如何存储这些内容的?就发现它在使用 SQLite3 数…...
前端知识速记:重绘和回流
前端知识速记:重绘和回流 一、什么是重绘与回流 1. 重绘(Repaint) 重绘是指当元素的外观发生变化时,浏览器需要重新绘制这些元素。由于这些操作不会改变元素占据的空间,因此不需要进行回流。常见的重绘操作包括&…...
webrtc peerconnection_client peerconnection_server 连接失败问题解决 win10 win11
0 常见问题 (1) webrtc peerconnection_client 连接 peerconnection_server 无连接列表 (2)连接导致崩溃debug状态下因为这个断言 RTC_DCHECK_RUN_ON(&capture_checker_); 1 在 peerconnection\client\main.cc 当中 定义类 class CustomSock…...
【C++】STL——list的使用与底层实现
目录 💕1.带头双向链表List 💕2.list用法介绍 💕3.list的初始化 💕4.size函数与resize函数 💕5.empty函数 💕6.front函数与back函数 💕7.push_front,push_back,pop_front,pop_back函数…...
iOS 音频录制、播放与格式转换
iOS 音频录制、播放与格式转换:基于 AVFoundation 和 FFmpegKit 的实现 在 iOS 开发中,音频处理是一个非常常见的需求,比如录音、播放音频、音频格式转换等。本文将详细解读一段基于 AVFoundation 和 FFmpegKit 的代码,展示如何实现音频录制、播放以及 PCM 和 AAC 格式之间…...
【PyTorch】解决Boolean value of Tensor with more than one value is ambiguous报错
理解并避免 PyTorch 中的 “Boolean value of Tensor with more than one value is ambiguous” 错误 在深度学习和数据科学领域,PyTorch 是一个强大的工具,它允许我们以直观和灵活的方式处理张量(Tensor)。然而,即使…...
Jsoup库具体怎么用?
Jsoup 是一个非常强大的 Java 库,用于解析和操作 HTML 文档。它提供了丰富的功能,包括发送 HTTP 请求、解析 HTML 内容、提取数据、修改 HTML 元素等。以下将详细介绍 Jsoup 的基本用法和一些高级功能,帮助你更好地使用 Jsoup 进行网络爬虫开…...
python:如何播放 .spx 声音文件
.spx 是 Speex音频编解码器的文件扩展名,它是一种开源的、免费的音频编解码器,主要用于语音压缩和语音通信领域。spx 文件通常用于语音记录、VoIP应用、语音信箱等场景。 .mp3 是一种广泛使用的音频格式,它采用了有损压缩算法,可…...
HTML学习笔记(6)
利用dom操作实现,对一个表格的增删改查 代码如下: todolist.html <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><meta name"viewport" content"widthdevice-width, …...
走向基于大语言模型的新一代推荐系统:综述与展望
HightLight 论文题目:Towards Next-Generation LLM-based Recommender Systems: A Survey and Beyond作者机构:吉林大学、香港理工大学、悉尼科技大学、Meta AI论文地址: https://arxiv.org/abs/2410.1974 基于大语言模型的下一代推荐系统&…...
【杂谈】-递归进化:人工智能的自我改进与监管挑战
递归进化:人工智能的自我改进与监管挑战 文章目录 递归进化:人工智能的自我改进与监管挑战1、自我改进型人工智能的崛起2、人工智能如何挑战人类监管?3、确保人工智能受控的策略4、人类在人工智能发展中的角色5、平衡自主性与控制力6、总结与…...
在 Nginx Stream 层“改写”MQTT ngx_stream_mqtt_filter_module
1、为什么要修改 CONNECT 报文? 多租户隔离:自动为接入设备追加租户前缀,后端按 ClientID 拆分队列。零代码鉴权:将入站用户名替换为 OAuth Access-Token,后端 Broker 统一校验。灰度发布:根据 IP/地理位写…...
鸿蒙中用HarmonyOS SDK应用服务 HarmonyOS5开发一个医院查看报告小程序
一、开发环境准备 工具安装: 下载安装DevEco Studio 4.0(支持HarmonyOS 5)配置HarmonyOS SDK 5.0确保Node.js版本≥14 项目初始化: ohpm init harmony/hospital-report-app 二、核心功能模块实现 1. 报告列表…...
视频字幕质量评估的大规模细粒度基准
大家读完觉得有帮助记得关注和点赞!!! 摘要 视频字幕在文本到视频生成任务中起着至关重要的作用,因为它们的质量直接影响所生成视频的语义连贯性和视觉保真度。尽管大型视觉-语言模型(VLMs)在字幕生成方面…...
今日学习:Spring线程池|并发修改异常|链路丢失|登录续期|VIP过期策略|数值类缓存
文章目录 优雅版线程池ThreadPoolTaskExecutor和ThreadPoolTaskExecutor的装饰器并发修改异常并发修改异常简介实现机制设计原因及意义 使用线程池造成的链路丢失问题线程池导致的链路丢失问题发生原因 常见解决方法更好的解决方法设计精妙之处 登录续期登录续期常见实现方式特…...
比较数据迁移后MySQL数据库和OceanBase数据仓库中的表
设计一个MySQL数据库和OceanBase数据仓库的表数据比较的详细程序流程,两张表是相同的结构,都有整型主键id字段,需要每次从数据库分批取得2000条数据,用于比较,比较操作的同时可以再取2000条数据,等上一次比较完成之后,开始比较,直到比较完所有的数据。比较操作需要比较…...
Vite中定义@软链接
在webpack中可以直接通过符号表示src路径,但是vite中默认不可以。 如何实现: vite中提供了resolve.alias:通过别名在指向一个具体的路径 在vite.config.js中 import { join } from pathexport default defineConfig({plugins: [vue()],//…...
Chrome 浏览器前端与客户端双向通信实战
Chrome 前端(即页面 JS / Web UI)与客户端(C 后端)的交互机制,是 Chromium 架构中非常核心的一环。下面我将按常见场景,从通道、流程、技术栈几个角度做一套完整的分析,特别适合你这种在分析和改…...
Modbus RTU与Modbus TCP详解指南
目录 1. Modbus协议基础 1.1 什么是Modbus? 1.2 Modbus协议历史 1.3 Modbus协议族 1.4 Modbus通信模型 🎭 主从架构 🔄 请求响应模式 2. Modbus RTU详解 2.1 RTU是什么? 2.2 RTU物理层 🔌 连接方式 ⚡ 通信参数 2.3 RTU数据帧格式 📦 帧结构详解 🔍…...
spring Security对RBAC及其ABAC的支持使用
RBAC (基于角色的访问控制) RBAC (Role-Based Access Control) 是 Spring Security 中最常用的权限模型,它将权限分配给角色,再将角色分配给用户。 RBAC 核心实现 1. 数据库设计 users roles permissions ------- ------…...
