Rust实现基于Tokio的限制内存占用的channel
Rust实现基于Tokio的限制内存占用的channel
简介
本文介绍如何基于tokio的channel实现一个限制内存占用的channel。
Tokio提供了多种协程间同步的接口,用于在不同的协程中同步数据。
常用的channel有两种:bounded和unbounded,其中ubbounded的channel可以无限的发送数据,而bounded的channel则有限的发送数据。两种channel都没有对自身的内存占用做出限制。
异步网络编程中常用一个channel连接两个task,其中业务task与业务交互:将要发送的数据发送到channel,而网络task与操作系统交互:从channel中接收数据并写入socket。单有时候带宽有限或者对端接收速率过慢时,而网络task从channel中接收的速度小于业务task向channel中发送的速度时,会造成大量的数据阻塞在channel中,如果不对channel的占用内存做限制,则会造成内存占用过多甚至进程被OOM。
实现
-
获取数据大小
要想限制channel总的内存占用,必须要直到每个数据的大小。比较常见的作法是所有需要发送到channel的内容都必须实现一个Trait,此Trait中定义了一个
get_size方法,用于获取数据的大小。pub trait GetSize {/// get total sizefn get_size(&self) -> usize; }要发送的内容必须实现
GetSize的Trait,并实现get_size方法。注意:get_size方法获取到的大小需包括栈空间和堆空间,例如:struct MyData {data: Vec<u8>,}impl GetSize for MyData {fn get_size(&self) -> usize {return std::mem::size_of::<MyData>() + self.data.len();//stack size + heap size}} -
创建
SizedSender和SizedReceiverSizedSender和SizedReceiver都可以基于tokio的UnboundedSender和UnboundedReceiver实现。在tokio的基础上,需要共享一个条件变量用于在sender和receiver之间同步当前是否还有可用空间。pub struct SizedSender<T: GetSize> {inner: mpsc::UnboundedSender<T>,size_semaphore: Arc<(Semaphore, usize)>, } pub struct SizedReceiver<T: GetSize> {inner: mpsc::UnboundedReceiver<T>,size_semaphore: Arc<(Semaphore, usize)>, }/// Limit space usage but not limit the number of messages, bytes_size must bigger than 0. pub fn sized_channel<T: GetSize>(bytes_size: usize) -> (SizedSender<T>, SizedReceiver<T>) {let (tx, rx) = mpsc::unbounded_channel::<T>();let semaphore = Arc::new((Semaphore::new(bytes_size), bytes_size));(SizedSender::new(tx, semaphore.clone()),SizedReceiver::new(rx, semaphore),) } -
SizedSender实现发送端发送时需要调用
get_size方法获取数据的大小,然后调用Semaphore::available_permits方法获取可用空间,如果可用空间大于数据大小,则发送成功,否则发送失败。impl<T: GetSize> SizedSender<T> {pub fn new(inner: mpsc::UnboundedSender<T>, size_semaphore: Arc<(Semaphore, usize)>) -> Self {Self {inner,size_semaphore,}}fn do_send(&self,message: T,permits: Option<SemaphorePermit<'_>>,) -> Result<(), SendError<T>> {match self.inner.send(message) {Ok(r) => {if let Some(permits) = permits {permits.forget();}Ok(r)}Err(e) => {log::debug!("send value error!");Err(e)}}}pub async fn send(&self, message: T) -> Result<(), SendError<T>> {let message_size = message.get_size();if message_size > self.size_semaphore.1 {return Err(SendError(message));}let size = match u32::try_from(message_size) {Ok(size) => size,Err(_) => {return Err(SendError(message));}};if self.size_semaphore.0.available_permits() < size as usize {// The buffer is about to be depleted, sending may be blocked.}let permits = match self.size_semaphore.0.acquire_many(size).await {Ok(perimits) => Some(perimits),Err(_) => {return Err(SendError(message));}};self.do_send(message, permits)}} -
SizedReceiver的实现接收端接收时需要调用
get_size方法获取数据的大小,然后将相应大小的permits还给信号量即可。impl<T: GetSize> SizedReceiver<T> { pub fn new(inner: mpsc::UnboundedReceiver<T>, size_semaphore: Arc<(Semaphore, usize)>) -> Self {Self {inner,size_semaphore,} }pub async fn recv(&mut self) -> Option<T> {self.inner.recv().await.map(|r| {let message_size = r.get_size();self.size_semaphore.0.add_permits(message_size);r}) } } -
其他
在上述实现的基础上,还可以实现更多方法,比如
try_send、try_recv等。
相关文章:
Rust实现基于Tokio的限制内存占用的channel
Rust实现基于Tokio的限制内存占用的channel 简介 本文介绍如何基于tokio的channel实现一个限制内存占用的channel。 Tokio提供了多种协程间同步的接口,用于在不同的协程中同步数据。 常用的channel有两种:bounded和unbounded,其中ubbounded的channel可…...
【C++】C++入门(上)--命名空间 输入输出 缺省参数 函数重载
目录 一 命名空间 1 命名空间的定义 2 命名空间的使用 二 C输入和输出 1 输出 2 输入 三 缺省参数 1 缺省参数概念 2 缺省参数分类 (1) 全缺省参数 (2)半缺省参数 四 函数重载 1 函数重载概念 2 分类 1 参数类型不同 2 参数个数不同 3 参数类型顺序不同 3 C为什…...
设计模式:原型模式(C#、JAVA、JavaScript、C++、Python、Go、PHP)
上一篇《访问者模式》 下一篇《享元模式》 简介: 原型模式,它是一种创建型设计模式,它允许通过复制原型对象来创建新的对象,而无需知道创建的细节。其工作原…...
SpringMVC 资源状态转移RESTful
文章目录 1、RESTful简介a>资源b>资源的表述c>状态转移 2、RESTful的实现HiddenHttpMethodFilterRESTful案例 1、RESTful简介 REST:Representational State Transfer,表现层资源状态转移。 a>资源 资源是一种看待服务器的方式,…...
verilog vscode linux
安装 vscode 插件 插件:Verilog-HDL/SystemVerilog/Bluespec SystemVerilog 功能:.xdc .ucf .v 等代码高亮、代码格式化、语法检查(Linting)、光标放到变量上提示变量的信息等 关于其他语言的依赖工具等信息查看插件说明 代码对齐…...
Postman日常操作
一.Postman介绍 1.1第一个简单的demo 路特斯(英国汽车品牌)_百度百科 (baidu.com) 1.2 cookie 用postman测试需要登录权限的接口时,会被拦截,解决办法就是每次请求接口前,先执行登录,然后记住cookie或者to…...
10月份程序员书单推荐
新书书单 1、C程序设计教程(第9版) 1.广受认可的《C程序设计教程》系列的第9版(个别版本也译作《C语言大学教程》),秉承了该系列一贯的丰富而详细的风格。该系列一些版本因封面画有蚂蚁形象而被称为“C语言蚂蚁书”。…...
【ChatGPT系列】ChatGPT:创新工具还是失业威胁?
💝💝💝欢迎来到我的博客,很高兴能够在这里和您见面!希望您在这里可以感受到一份轻松愉快的氛围,不仅可以获得有趣的内容和知识,也可以畅所欲言、分享您的想法和见解。 推荐:kuan 的首页,持续学…...
C++ 实现定时器的两种方法(线程定时和时间轮算法修改版)
定时器要求在固定的时间异步执行一个操作,比如boost库中的boost::asio::deadline_timer,以及MFC中的定时器。也可以利用c11的thread, mutex, condition_variable 来实现一个定时器。 1、使用C11中的thread, mutex, condition_variable来实现一个定时器。…...
2023mathorcup大数据竞赛选题建议及思路
大家好呀,昨天6点2023年第四届MathorCup高校数学建模挑战赛——大数据竞赛开赛,在这里给大家带来初步的选题建议及思路。 注意,本文章只是比较简略的图文讲解,更加详细完整的视频讲解请移步: 2023mathorcup大数据数学…...
部署vuepress项目到githubPage
部署vuepress项目到githubPage 1. 项目文件夹下有两个分支(main和gh-page) 1.1 main分支存放项目代码 1.2 gh-page分支存放 npm run docs:build之后的dist里面的所有文件 2. 分别提交到github上 3. 你的项目/docs/.vuepress/config.js module.export…...
ORACLE表空间说明及操作
ORACLE 表空间作用 数据存储:表空间是数据库中存储数据的逻辑结构。它提供了用于存储表、索引、视图、存储过程等数据库对象的空间。通过划分数据和索引等对象的存储,可以更好地管理和组织数据库的物理存储结构。性能管理和优化:通过将不同类…...
vue使用Element-plus的Image预览时样式崩乱
🔥博客主页: 破浪前进 🔖系列专栏: Vue、React、PHP ❤️感谢大家点赞👍收藏⭐评论✍️ 问题: 在使用组件库的image时出现了点小问题,预览的图片层级反而没有表格的层级高 效果图:…...
安装使用vcpkg的简易教程
目录 1. 首先安装vcpkg2. 在vcpkg目录下运行bootstrap-vcpkg.bat 命令3. 接着vs进行集成4. 使用vcpkg搜索可用的包5.下载安装所需包6.下载安装完成 1. 首先安装vcpkg 使用git命令下载 git clone https://github.com/Microsoft/vcpkg.git如果下载失败可直接下载文件 (vcpkg-ma…...
制作一个简单的C语言词法分析程序
1.分析组成 C语言的程序中,有很单词多符号和保留字。一些单词符号还有对应的左线性文法。所以我们需要先做出一个单词字符表,给出对应的识别码,然后跟据对应的表格来写出程序 2.程序设计 程序主要有循环判断构成。不需推理即可产生的符号我…...
Java项目中将MySQL改为8.0以上
博主主页:猫头鹰源码 博主简介:Java领域优质创作者、CSDN博客专家、公司架构师、全网粉丝5万、专注Java技术领域和毕业设计项目实战 主要内容:毕业设计(Javaweb项目|小程序等)、简历模板、学习资料、面试题库、技术咨询 文末联系获取 maven依…...
软考高项-计算题(2)
题4 项目的总预算是包含管理储备的,所以总预算应该是:13238102*360 ETC(BAC-EV)/CPI BAC60 EV60*0.318 CPI18/200.9 ETC42/0.9 答案选择C A 题5 因为题目中提到了“按目前的状况继续发展”,那么是:ETC(BAC-EV)/CPI EV1230*0…...
Centos使用war文件部署jenkins
部署jenkins所需要的jdk环境如下: 这里下载官网最新的版本: 选择jenkins2.414.3版本,所以jdk环境最低得是java11 安装java11环境 这里直接安装open-jdk yum -y install java-11-openjdk.x86_64 java-11-openjdk-devel.x86_64下载jenkins最新…...
数据结构和算法——用C语言实现所有排序算法
文章目录 前言排序算法的基本概念内部排序插入排序直接插入排序折半插入排序希尔排序 交换排序冒泡排序快速排序 选择排序简单选择排序堆排序 归并排序基数排序 外部排序多路归并败者树置换——选择排序最佳归并树 前言 本文所有代码均在仓库中,这是一个完整的由纯…...
吃豆人C语言开发—Day2 需求分析 流程图 原型图
目录 需求分析 流程图 原型图 主菜单: 设置界面: 地图选择: 游戏界面: 收集完成提示: 游戏胜利界面: 游戏失败界面 死亡提示: 这个项目是我和朋友们一起开发的,在此声明一下…...
图自编码器在金融风控中的拓扑模式检测实践
1. 项目概述:当图机器学习遇上金融风控在金融科技领域摸爬滚打了十几年,我见过太多风控系统从“规则为王”到“数据驱动”的变迁。早期的反洗钱(AML)和反欺诈系统,本质上是一套复杂的“如果-那么”规则库:如…...
OpenCL图像格式兼容性与性能优化指南
1. OpenCL图像格式兼容性解析在OpenCL编程中,图像处理是一个核心功能模块。图像格式的正确配置直接影响着内核程序的执行效率和结果的准确性。CL_UNSIGNED_INT8和CL_RGB作为OpenCL中常见的图像格式参数,它们的兼容性问题值得深入探讨。1.1 图像格式描述符…...
Unity游戏本地化实战:XUnity.AutoTranslator核心机制与真机调试
1. 这不是“加个插件就完事”的翻译方案,而是游戏本地化工程的起点在Unity项目里点开Asset Store搜“translation”,你会看到一堆标着“一键汉化”“自动翻译”的插件,图标闪亮,描述诱人。我去年接手一个海外发行的休闲游戏时也这…...
S32K144FTM定时器中断
目录 FTM定时器概念定义 定时器运用常用概念 S32DS添加FTM库 S32DSFTM外设配置 S32DS添加库冲突概念理解 FTM_DRV_Init函数定义 FTM_DRV_InitCounter外设函数 FTM_DRV_InitCounter外设函数 FTM_DRV_CounterStart外设函数 INT_SYS_InstallHandler外设函数 INT_SYS_Ins…...
DeepSeek-R1模型压缩到<380MB还能保持98.7%对话准确率?——边缘设备量化微调四步法首次公开
更多请点击: https://intelliparadigm.com 第一章:DeepSeek边缘设备部署 DeepSeek系列大模型在边缘侧的轻量化部署,正成为工业质检、智能安防与车载语音等低延迟场景的关键技术路径。其核心挑战在于平衡模型精度、推理吞吐与硬件资源约束——…...
基于减法优化算法(SABO)优化CNN-BiGUR-Attention风电功率预测研究附Matlab代码
✅作者简介:热爱科研的Matlab仿真开发者,擅长毕业设计辅导、数学建模、数据处理、程序设计科研仿真。🍎完整代码获取 定制创新 论文复现点击:Matlab科研工作室👇 关注我领取海量matlab电子书和数学建模资料 dz…...
【信息科学与工程学】计算机科学与自动化 ——第六十五篇 虚拟化/MIG 系列02
编号 类型 领域 虚拟化/MIG模式 算法名称 算法逐步推理思考的数学方程式及参数/常量/向量/常数/数字/数值列表 算法的时序数学方程式 关联知识 401 性能优化 GPU虚拟化+容器 MIG+容器 基于GPU内存带宽隔离的容器化AI训练任务调度算法 1. 带宽模型:每个MIG实例带宽…...
ComfyUI-Impact-Pack V8进阶实战:掌握AI图像智能修复的3大核心场景与性能优化
ComfyUI-Impact-Pack V8进阶实战:掌握AI图像智能修复的3大核心场景与性能优化 【免费下载链接】ComfyUI-Impact-Pack Custom nodes pack for ComfyUI This custom node helps to conveniently enhance images through Detector, Detailer, Upscaler, Pipe, and more…...
告别笔记本续航焦虑:手把手教你用NVMe电源管理给SSD“降频省电”
告别笔记本续航焦虑:手把手教你用NVMe电源管理给SSD“降频省电”每次带着笔记本出差,最担心的就是电量撑不过一场会议。你可能已经关闭了背光键盘、调低了屏幕亮度,甚至忍痛停用了独显,但续航依然捉襟见肘。其实,有一个…...
5分钟掌握WebPShop:Photoshop终极WebP插件完全指南
5分钟掌握WebPShop:Photoshop终极WebP插件完全指南 【免费下载链接】WebPShop Photoshop plug-in for opening and saving WebP images 项目地址: https://gitcode.com/gh_mirrors/we/WebPShop 还在为Photoshop无法原生处理WebP格式而烦恼吗?WebP…...
