Rust实现基于Tokio的限制内存占用的channel
Rust实现基于Tokio的限制内存占用的channel
简介
本文介绍如何基于tokio的channel实现一个限制内存占用的channel。
Tokio提供了多种协程间同步的接口,用于在不同的协程中同步数据。
常用的channel有两种:bounded
和unbounded
,其中ubbounded
的channel可以无限的发送数据,而bounded
的channel则有限的发送数据。两种channel都没有对自身的内存占用做出限制。
异步网络编程中常用一个channel
连接两个task,其中业务task
与业务交互:将要发送的数据发送到channel,而网络task
与操作系统交互:从channel中接收数据并写入socket。单有时候带宽有限或者对端接收速率过慢时,而网络task
从channel中接收的速度小于业务task
向channel中发送的速度时,会造成大量的数据阻塞在channel中,如果不对channel的占用内存做限制,则会造成内存占用过多甚至进程被OOM
。
实现
-
获取数据大小
要想限制channel总的内存占用,必须要直到每个数据的大小。比较常见的作法是所有需要发送到channel的内容都必须实现一个Trait,此Trait中定义了一个
get_size
方法,用于获取数据的大小。pub trait GetSize {/// get total sizefn get_size(&self) -> usize; }
要发送的内容必须实现
GetSize
的Trait,并实现get_size
方法。注意:get_size
方法获取到的大小需包括栈空间和堆空间,例如:struct MyData {data: Vec<u8>,}impl GetSize for MyData {fn get_size(&self) -> usize {return std::mem::size_of::<MyData>() + self.data.len();//stack size + heap size}}
-
创建
SizedSender
和SizedReceiver
SizedSender
和SizedReceiver
都可以基于tokio的UnboundedSender
和UnboundedReceiver
实现。在tokio的基础上,需要共享一个条件变量用于在sender和receiver之间同步当前是否还有可用空间。pub struct SizedSender<T: GetSize> {inner: mpsc::UnboundedSender<T>,size_semaphore: Arc<(Semaphore, usize)>, } pub struct SizedReceiver<T: GetSize> {inner: mpsc::UnboundedReceiver<T>,size_semaphore: Arc<(Semaphore, usize)>, }/// Limit space usage but not limit the number of messages, bytes_size must bigger than 0. pub fn sized_channel<T: GetSize>(bytes_size: usize) -> (SizedSender<T>, SizedReceiver<T>) {let (tx, rx) = mpsc::unbounded_channel::<T>();let semaphore = Arc::new((Semaphore::new(bytes_size), bytes_size));(SizedSender::new(tx, semaphore.clone()),SizedReceiver::new(rx, semaphore),) }
-
SizedSender
实现发送端发送时需要调用
get_size
方法获取数据的大小,然后调用Semaphore::available_permits
方法获取可用空间,如果可用空间大于数据大小,则发送成功,否则发送失败。impl<T: GetSize> SizedSender<T> {pub fn new(inner: mpsc::UnboundedSender<T>, size_semaphore: Arc<(Semaphore, usize)>) -> Self {Self {inner,size_semaphore,}}fn do_send(&self,message: T,permits: Option<SemaphorePermit<'_>>,) -> Result<(), SendError<T>> {match self.inner.send(message) {Ok(r) => {if let Some(permits) = permits {permits.forget();}Ok(r)}Err(e) => {log::debug!("send value error!");Err(e)}}}pub async fn send(&self, message: T) -> Result<(), SendError<T>> {let message_size = message.get_size();if message_size > self.size_semaphore.1 {return Err(SendError(message));}let size = match u32::try_from(message_size) {Ok(size) => size,Err(_) => {return Err(SendError(message));}};if self.size_semaphore.0.available_permits() < size as usize {// The buffer is about to be depleted, sending may be blocked.}let permits = match self.size_semaphore.0.acquire_many(size).await {Ok(perimits) => Some(perimits),Err(_) => {return Err(SendError(message));}};self.do_send(message, permits)}}
-
SizedReceiver
的实现接收端接收时需要调用
get_size
方法获取数据的大小,然后将相应大小的permits
还给信号量即可。impl<T: GetSize> SizedReceiver<T> { pub fn new(inner: mpsc::UnboundedReceiver<T>, size_semaphore: Arc<(Semaphore, usize)>) -> Self {Self {inner,size_semaphore,} }pub async fn recv(&mut self) -> Option<T> {self.inner.recv().await.map(|r| {let message_size = r.get_size();self.size_semaphore.0.add_permits(message_size);r}) } }
-
其他
在上述实现的基础上,还可以实现更多方法,比如
try_send
、try_recv
等。
相关文章:
Rust实现基于Tokio的限制内存占用的channel
Rust实现基于Tokio的限制内存占用的channel 简介 本文介绍如何基于tokio的channel实现一个限制内存占用的channel。 Tokio提供了多种协程间同步的接口,用于在不同的协程中同步数据。 常用的channel有两种:bounded和unbounded,其中ubbounded的channel可…...

【C++】C++入门(上)--命名空间 输入输出 缺省参数 函数重载
目录 一 命名空间 1 命名空间的定义 2 命名空间的使用 二 C输入和输出 1 输出 2 输入 三 缺省参数 1 缺省参数概念 2 缺省参数分类 (1) 全缺省参数 (2)半缺省参数 四 函数重载 1 函数重载概念 2 分类 1 参数类型不同 2 参数个数不同 3 参数类型顺序不同 3 C为什…...

设计模式:原型模式(C#、JAVA、JavaScript、C++、Python、Go、PHP)
上一篇《访问者模式》 下一篇《享元模式》 简介: 原型模式,它是一种创建型设计模式,它允许通过复制原型对象来创建新的对象,而无需知道创建的细节。其工作原…...

SpringMVC 资源状态转移RESTful
文章目录 1、RESTful简介a>资源b>资源的表述c>状态转移 2、RESTful的实现HiddenHttpMethodFilterRESTful案例 1、RESTful简介 REST:Representational State Transfer,表现层资源状态转移。 a>资源 资源是一种看待服务器的方式,…...
verilog vscode linux
安装 vscode 插件 插件:Verilog-HDL/SystemVerilog/Bluespec SystemVerilog 功能:.xdc .ucf .v 等代码高亮、代码格式化、语法检查(Linting)、光标放到变量上提示变量的信息等 关于其他语言的依赖工具等信息查看插件说明 代码对齐…...

Postman日常操作
一.Postman介绍 1.1第一个简单的demo 路特斯(英国汽车品牌)_百度百科 (baidu.com) 1.2 cookie 用postman测试需要登录权限的接口时,会被拦截,解决办法就是每次请求接口前,先执行登录,然后记住cookie或者to…...

10月份程序员书单推荐
新书书单 1、C程序设计教程(第9版) 1.广受认可的《C程序设计教程》系列的第9版(个别版本也译作《C语言大学教程》),秉承了该系列一贯的丰富而详细的风格。该系列一些版本因封面画有蚂蚁形象而被称为“C语言蚂蚁书”。…...

【ChatGPT系列】ChatGPT:创新工具还是失业威胁?
💝💝💝欢迎来到我的博客,很高兴能够在这里和您见面!希望您在这里可以感受到一份轻松愉快的氛围,不仅可以获得有趣的内容和知识,也可以畅所欲言、分享您的想法和见解。 推荐:kuan 的首页,持续学…...
C++ 实现定时器的两种方法(线程定时和时间轮算法修改版)
定时器要求在固定的时间异步执行一个操作,比如boost库中的boost::asio::deadline_timer,以及MFC中的定时器。也可以利用c11的thread, mutex, condition_variable 来实现一个定时器。 1、使用C11中的thread, mutex, condition_variable来实现一个定时器。…...

2023mathorcup大数据竞赛选题建议及思路
大家好呀,昨天6点2023年第四届MathorCup高校数学建模挑战赛——大数据竞赛开赛,在这里给大家带来初步的选题建议及思路。 注意,本文章只是比较简略的图文讲解,更加详细完整的视频讲解请移步: 2023mathorcup大数据数学…...
部署vuepress项目到githubPage
部署vuepress项目到githubPage 1. 项目文件夹下有两个分支(main和gh-page) 1.1 main分支存放项目代码 1.2 gh-page分支存放 npm run docs:build之后的dist里面的所有文件 2. 分别提交到github上 3. 你的项目/docs/.vuepress/config.js module.export…...
ORACLE表空间说明及操作
ORACLE 表空间作用 数据存储:表空间是数据库中存储数据的逻辑结构。它提供了用于存储表、索引、视图、存储过程等数据库对象的空间。通过划分数据和索引等对象的存储,可以更好地管理和组织数据库的物理存储结构。性能管理和优化:通过将不同类…...

vue使用Element-plus的Image预览时样式崩乱
🔥博客主页: 破浪前进 🔖系列专栏: Vue、React、PHP ❤️感谢大家点赞👍收藏⭐评论✍️ 问题: 在使用组件库的image时出现了点小问题,预览的图片层级反而没有表格的层级高 效果图:…...

安装使用vcpkg的简易教程
目录 1. 首先安装vcpkg2. 在vcpkg目录下运行bootstrap-vcpkg.bat 命令3. 接着vs进行集成4. 使用vcpkg搜索可用的包5.下载安装所需包6.下载安装完成 1. 首先安装vcpkg 使用git命令下载 git clone https://github.com/Microsoft/vcpkg.git如果下载失败可直接下载文件 (vcpkg-ma…...

制作一个简单的C语言词法分析程序
1.分析组成 C语言的程序中,有很单词多符号和保留字。一些单词符号还有对应的左线性文法。所以我们需要先做出一个单词字符表,给出对应的识别码,然后跟据对应的表格来写出程序 2.程序设计 程序主要有循环判断构成。不需推理即可产生的符号我…...
Java项目中将MySQL改为8.0以上
博主主页:猫头鹰源码 博主简介:Java领域优质创作者、CSDN博客专家、公司架构师、全网粉丝5万、专注Java技术领域和毕业设计项目实战 主要内容:毕业设计(Javaweb项目|小程序等)、简历模板、学习资料、面试题库、技术咨询 文末联系获取 maven依…...

软考高项-计算题(2)
题4 项目的总预算是包含管理储备的,所以总预算应该是:13238102*360 ETC(BAC-EV)/CPI BAC60 EV60*0.318 CPI18/200.9 ETC42/0.9 答案选择C A 题5 因为题目中提到了“按目前的状况继续发展”,那么是:ETC(BAC-EV)/CPI EV1230*0…...

Centos使用war文件部署jenkins
部署jenkins所需要的jdk环境如下: 这里下载官网最新的版本: 选择jenkins2.414.3版本,所以jdk环境最低得是java11 安装java11环境 这里直接安装open-jdk yum -y install java-11-openjdk.x86_64 java-11-openjdk-devel.x86_64下载jenkins最新…...

数据结构和算法——用C语言实现所有排序算法
文章目录 前言排序算法的基本概念内部排序插入排序直接插入排序折半插入排序希尔排序 交换排序冒泡排序快速排序 选择排序简单选择排序堆排序 归并排序基数排序 外部排序多路归并败者树置换——选择排序最佳归并树 前言 本文所有代码均在仓库中,这是一个完整的由纯…...

吃豆人C语言开发—Day2 需求分析 流程图 原型图
目录 需求分析 流程图 原型图 主菜单: 设置界面: 地图选择: 游戏界面: 收集完成提示: 游戏胜利界面: 游戏失败界面 死亡提示: 这个项目是我和朋友们一起开发的,在此声明一下…...
IGP(Interior Gateway Protocol,内部网关协议)
IGP(Interior Gateway Protocol,内部网关协议) 是一种用于在一个自治系统(AS)内部传递路由信息的路由协议,主要用于在一个组织或机构的内部网络中决定数据包的最佳路径。与用于自治系统之间通信的 EGP&…...
JVM垃圾回收机制全解析
Java虚拟机(JVM)中的垃圾收集器(Garbage Collector,简称GC)是用于自动管理内存的机制。它负责识别和清除不再被程序使用的对象,从而释放内存空间,避免内存泄漏和内存溢出等问题。垃圾收集器在Ja…...

JVM 内存结构 详解
内存结构 运行时数据区: Java虚拟机在运行Java程序过程中管理的内存区域。 程序计数器: 线程私有,程序控制流的指示器,分支、循环、跳转、异常处理、线程恢复等基础功能都依赖这个计数器完成。 每个线程都有一个程序计数…...
省略号和可变参数模板
本文主要介绍如何展开可变参数的参数包 1.C语言的va_list展开可变参数 #include <iostream> #include <cstdarg>void printNumbers(int count, ...) {// 声明va_list类型的变量va_list args;// 使用va_start将可变参数写入变量argsva_start(args, count);for (in…...

解析奥地利 XARION激光超声检测系统:无膜光学麦克风 + 无耦合剂的技术协同优势及多元应用
在工业制造领域,无损检测(NDT)的精度与效率直接影响产品质量与生产安全。奥地利 XARION开发的激光超声精密检测系统,以非接触式光学麦克风技术为核心,打破传统检测瓶颈,为半导体、航空航天、汽车制造等行业提供了高灵敏…...
Python实现简单音频数据压缩与解压算法
Python实现简单音频数据压缩与解压算法 引言 在音频数据处理中,压缩算法是降低存储成本和传输效率的关键技术。Python作为一门灵活且功能强大的编程语言,提供了丰富的库和工具来实现音频数据的压缩与解压。本文将通过一个简单的音频数据压缩与解压算法…...

【UE5 C++】通过文件对话框获取选择文件的路径
目录 效果 步骤 源码 效果 步骤 1. 在“xxx.Build.cs”中添加需要使用的模块 ,这里主要使用“DesktopPlatform”模块 2. 添加后闭UE编辑器,右键点击 .uproject 文件,选择 "Generate Visual Studio project files",重…...

GraphRAG优化新思路-开源的ROGRAG框架
目前的如微软开源的GraphRAG的工作流程都较为复杂,难以孤立地评估各个组件的贡献,传统的检索方法在处理复杂推理任务时可能不够有效,特别是在需要理解实体间关系或多跳知识的情况下。先说结论,看完后感觉这个框架性能上不会比Grap…...
k8s从入门到放弃之Pod的容器探针检测
k8s从入门到放弃之Pod的容器探针检测 在Kubernetes(简称K8s)中,容器探测是指kubelet对容器执行定期诊断的过程,以确保容器中的应用程序处于预期的状态。这些探测是保障应用健康和高可用性的重要机制。Kubernetes提供了两种种类型…...
Linux信号保存与处理机制详解
Linux信号的保存与处理涉及多个关键机制,以下是详细的总结: 1. 信号的保存 进程描述符(task_struct):每个进程的PCB中包含信号相关信息。 pending信号集:记录已到达但未处理的信号(未决信号&a…...