当前位置: 首页 > news >正文

Rust实现基于Tokio的限制内存占用的channel

Rust实现基于Tokio的限制内存占用的channel

简介

本文介绍如何基于tokio的channel实现一个限制内存占用的channel。

Tokio提供了多种协程间同步的接口,用于在不同的协程中同步数据。
常用的channel有两种:boundedunbounded,其中ubbounded的channel可以无限的发送数据,而bounded的channel则有限的发送数据。两种channel都没有对自身的内存占用做出限制。

异步网络编程中常用一个channel连接两个task,其中业务task与业务交互:将要发送的数据发送到channel,而网络task与操作系统交互:从channel中接收数据并写入socket。单有时候带宽有限或者对端接收速率过慢时,而网络task从channel中接收的速度小于业务task向channel中发送的速度时,会造成大量的数据阻塞在channel中,如果不对channel的占用内存做限制,则会造成内存占用过多甚至进程被OOM

实现

  1. 获取数据大小

    要想限制channel总的内存占用,必须要直到每个数据的大小。比较常见的作法是所有需要发送到channel的内容都必须实现一个Trait,此Trait中定义了一个get_size方法,用于获取数据的大小。

    pub trait GetSize {/// get total sizefn get_size(&self) -> usize;
    }
    

    要发送的内容必须实现GetSize的Trait,并实现get_size方法。注意:get_size方法获取到的大小需包括栈空间和堆空间,例如:

     struct MyData {data: Vec<u8>,}impl GetSize for MyData {fn get_size(&self) -> usize {return std::mem::size_of::<MyData>() + self.data.len();//stack size + heap size}}
    
  2. 创建SizedSenderSizedReceiver

    SizedSenderSizedReceiver都可以基于tokio的UnboundedSenderUnboundedReceiver实现。在tokio的基础上,需要共享一个条件变量用于在sender和receiver之间同步当前是否还有可用空间。

       
    pub struct SizedSender<T: GetSize> {inner: mpsc::UnboundedSender<T>,size_semaphore: Arc<(Semaphore, usize)>,
    }   pub struct SizedReceiver<T: GetSize> {inner: mpsc::UnboundedReceiver<T>,size_semaphore: Arc<(Semaphore, usize)>,
    }/// Limit space usage but not limit the number of messages, bytes_size must bigger than 0.
    pub fn sized_channel<T: GetSize>(bytes_size: usize) -> (SizedSender<T>, SizedReceiver<T>) {let (tx, rx) = mpsc::unbounded_channel::<T>();let semaphore = Arc::new((Semaphore::new(bytes_size), bytes_size));(SizedSender::new(tx, semaphore.clone()),SizedReceiver::new(rx, semaphore),)
    }          
  3. SizedSender实现

    发送端发送时需要调用get_size方法获取数据的大小,然后调用Semaphore::available_permits方法获取可用空间,如果可用空间大于数据大小,则发送成功,否则发送失败。

    impl<T: GetSize> SizedSender<T> {pub fn new(inner: mpsc::UnboundedSender<T>, size_semaphore: Arc<(Semaphore, usize)>) -> Self {Self {inner,size_semaphore,}}fn do_send(&self,message: T,permits: Option<SemaphorePermit<'_>>,) -> Result<(), SendError<T>> {match self.inner.send(message) {Ok(r) => {if let Some(permits) = permits {permits.forget();}Ok(r)}Err(e) => {log::debug!("send value error!");Err(e)}}}pub async fn send(&self, message: T) -> Result<(), SendError<T>> {let message_size = message.get_size();if message_size > self.size_semaphore.1 {return Err(SendError(message));}let size = match u32::try_from(message_size) {Ok(size) => size,Err(_) => {return Err(SendError(message));}};if self.size_semaphore.0.available_permits() < size as usize {// The buffer is about to be depleted, sending may be blocked.}let permits = match self.size_semaphore.0.acquire_many(size).await {Ok(perimits) => Some(perimits),Err(_) => {return Err(SendError(message));}};self.do_send(message, permits)}}
    
  4. SizedReceiver的实现

    接收端接收时需要调用get_size方法获取数据的大小,然后将相应大小的permits还给信号量即可。

    impl<T: GetSize> SizedReceiver<T> {
    pub fn new(inner: mpsc::UnboundedReceiver<T>, size_semaphore: Arc<(Semaphore, usize)>) -> Self {Self {inner,size_semaphore,}
    }pub async fn recv(&mut self) -> Option<T> {self.inner.recv().await.map(|r| {let message_size = r.get_size();self.size_semaphore.0.add_permits(message_size);r})
    }
    }
  5. 其他

    在上述实现的基础上,还可以实现更多方法,比如try_sendtry_recv等。

相关文章:

Rust实现基于Tokio的限制内存占用的channel

Rust实现基于Tokio的限制内存占用的channel 简介 本文介绍如何基于tokio的channel实现一个限制内存占用的channel。 Tokio提供了多种协程间同步的接口&#xff0c;用于在不同的协程中同步数据。 常用的channel有两种:bounded和unbounded&#xff0c;其中ubbounded的channel可…...

【C++】C++入门(上)--命名空间 输入输出 缺省参数 函数重载

目录 一 命名空间 1 命名空间的定义 2 命名空间的使用 二 C输入和输出 1 输出 2 输入 三 缺省参数 1 缺省参数概念 2 缺省参数分类 (1) 全缺省参数 (2)半缺省参数 四 函数重载 1 函数重载概念 2 分类 1 参数类型不同 2 参数个数不同 3 参数类型顺序不同 3 C为什…...

设计模式:原型模式(C#、JAVA、JavaScript、C++、Python、Go、PHP)

上一篇《访问者模式》 下一篇《享元模式》 简介&#xff1a; 原型模式&#xff0c;它是一种创建型设计模式&#xff0c;它允许通过复制原型对象来创建新的对象&#xff0c;而无需知道创建的细节。其工作原…...

SpringMVC 资源状态转移RESTful

文章目录 1、RESTful简介a>资源b>资源的表述c>状态转移 2、RESTful的实现HiddenHttpMethodFilterRESTful案例 1、RESTful简介 REST&#xff1a;Representational State Transfer&#xff0c;表现层资源状态转移。 a>资源 资源是一种看待服务器的方式&#xff0c…...

verilog vscode linux

安装 vscode 插件 插件&#xff1a;Verilog-HDL/SystemVerilog/Bluespec SystemVerilog 功能&#xff1a;.xdc .ucf .v 等代码高亮、代码格式化、语法检查&#xff08;Linting&#xff09;、光标放到变量上提示变量的信息等 关于其他语言的依赖工具等信息查看插件说明 代码对齐…...

Postman日常操作

一.Postman介绍 1.1第一个简单的demo 路特斯&#xff08;英国汽车品牌&#xff09;_百度百科 (baidu.com) 1.2 cookie 用postman测试需要登录权限的接口时&#xff0c;会被拦截&#xff0c;解决办法就是每次请求接口前&#xff0c;先执行登录&#xff0c;然后记住cookie或者to…...

10月份程序员书单推荐

新书书单 1、C程序设计教程&#xff08;第9版&#xff09; 1.广受认可的《C程序设计教程》系列的第9版&#xff08;个别版本也译作《C语言大学教程》&#xff09;&#xff0c;秉承了该系列一贯的丰富而详细的风格。该系列一些版本因封面画有蚂蚁形象而被称为“C语言蚂蚁书”。…...

【ChatGPT系列】ChatGPT:创新工具还是失业威胁?

&#x1f49d;&#x1f49d;&#x1f49d;欢迎来到我的博客&#xff0c;很高兴能够在这里和您见面&#xff01;希望您在这里可以感受到一份轻松愉快的氛围&#xff0c;不仅可以获得有趣的内容和知识&#xff0c;也可以畅所欲言、分享您的想法和见解。 推荐:kuan 的首页,持续学…...

C++ 实现定时器的两种方法(线程定时和时间轮算法修改版)

定时器要求在固定的时间异步执行一个操作&#xff0c;比如boost库中的boost::asio::deadline_timer&#xff0c;以及MFC中的定时器。也可以利用c11的thread, mutex, condition_variable 来实现一个定时器。 1、使用C11中的thread, mutex, condition_variable来实现一个定时器。…...

2023mathorcup大数据竞赛选题建议及思路

大家好呀&#xff0c;昨天6点2023年第四届MathorCup高校数学建模挑战赛——大数据竞赛开赛&#xff0c;在这里给大家带来初步的选题建议及思路。 注意&#xff0c;本文章只是比较简略的图文讲解&#xff0c;更加详细完整的视频讲解请移步&#xff1a; 2023mathorcup大数据数学…...

部署vuepress项目到githubPage

部署vuepress项目到githubPage 1. 项目文件夹下有两个分支&#xff08;main和gh-page&#xff09; 1.1 main分支存放项目代码 1.2 gh-page分支存放 npm run docs:build之后的dist里面的所有文件 2. 分别提交到github上 3. 你的项目/docs/.vuepress/config.js module.export…...

ORACLE表空间说明及操作

ORACLE 表空间作用 数据存储&#xff1a;表空间是数据库中存储数据的逻辑结构。它提供了用于存储表、索引、视图、存储过程等数据库对象的空间。通过划分数据和索引等对象的存储&#xff0c;可以更好地管理和组织数据库的物理存储结构。性能管理和优化&#xff1a;通过将不同类…...

vue使用Element-plus的Image预览时样式崩乱

&#x1f525;博客主页&#xff1a; 破浪前进 &#x1f516;系列专栏&#xff1a; Vue、React、PHP ❤️感谢大家点赞&#x1f44d;收藏⭐评论✍️ 问题&#xff1a; 在使用组件库的image时出现了点小问题&#xff0c;预览的图片层级反而没有表格的层级高 效果图&#xff1a;…...

安装使用vcpkg的简易教程

目录 1. 首先安装vcpkg2. 在vcpkg目录下运行bootstrap-vcpkg.bat 命令3. 接着vs进行集成4. 使用vcpkg搜索可用的包5.下载安装所需包6.下载安装完成 1. 首先安装vcpkg 使用git命令下载 git clone https://github.com/Microsoft/vcpkg.git如果下载失败可直接下载文件 (vcpkg-ma…...

制作一个简单的C语言词法分析程序

1.分析组成 C语言的程序中&#xff0c;有很单词多符号和保留字。一些单词符号还有对应的左线性文法。所以我们需要先做出一个单词字符表&#xff0c;给出对应的识别码&#xff0c;然后跟据对应的表格来写出程序 2.程序设计 程序主要有循环判断构成。不需推理即可产生的符号我…...

Java项目中将MySQL改为8.0以上

博主主页&#xff1a;猫头鹰源码 博主简介&#xff1a;Java领域优质创作者、CSDN博客专家、公司架构师、全网粉丝5万、专注Java技术领域和毕业设计项目实战 主要内容&#xff1a;毕业设计(Javaweb项目|小程序等)、简历模板、学习资料、面试题库、技术咨询 文末联系获取 maven依…...

软考高项-计算题(2)

题4 项目的总预算是包含管理储备的&#xff0c;所以总预算应该是&#xff1a;13238102*360 ETC(BAC-EV)/CPI BAC60 EV60*0.318 CPI18/200.9 ETC42/0.9 答案选择C A 题5 因为题目中提到了“按目前的状况继续发展”&#xff0c;那么是&#xff1a;ETC(BAC-EV)/CPI EV1230*0…...

Centos使用war文件部署jenkins

部署jenkins所需要的jdk环境如下&#xff1a; 这里下载官网最新的版本&#xff1a; 选择jenkins2.414.3版本&#xff0c;所以jdk环境最低得是java11 安装java11环境 这里直接安装open-jdk yum -y install java-11-openjdk.x86_64 java-11-openjdk-devel.x86_64下载jenkins最新…...

数据结构和算法——用C语言实现所有排序算法

文章目录 前言排序算法的基本概念内部排序插入排序直接插入排序折半插入排序希尔排序 交换排序冒泡排序快速排序 选择排序简单选择排序堆排序 归并排序基数排序 外部排序多路归并败者树置换——选择排序最佳归并树 前言 本文所有代码均在仓库中&#xff0c;这是一个完整的由纯…...

吃豆人C语言开发—Day2 需求分析 流程图 原型图

目录 需求分析 流程图 原型图 主菜单&#xff1a; 设置界面&#xff1a; 地图选择&#xff1a; 游戏界面&#xff1a; 收集完成提示&#xff1a; 游戏胜利界面&#xff1a; 游戏失败界面 死亡提示&#xff1a; 这个项目是我和朋友们一起开发的&#xff0c;在此声明一下…...

Nautilus Chain 联合香港数码港举办 BIG DEMO DAY活动,释放何信号?

在今年的 10 月 26 日 9:30-18:30 GMT8 期间&#xff0c;Nautilus Chain 联合香港数码港共同举办了 “BIG DEMO DAY” Web3 项目路演活动&#xff0c;包括Xwinner、Sleek、Tx、All weather、Coral Finance、DBOE、PARSIQ、Hookfi、Parallels、Fintestra 以及 dot.GAMING 等在内…...

手写RPC框架

文章目录 什么是RPC框架RPC框架中的关键点通信协议序列化协议动态代理和反射 目前已有的RPC框架手写RPC框架介绍项目框架项目执行流程项目启动 什么是RPC框架 RPC&#xff08;Remote Procedure Call&#xff0c;远程过程调用&#xff09;, 简单来说遵循RPC协议的就是RPC框架. …...

音视频常见问题(六):视频黑边或放大

摘要 本文介绍了视频黑边或放大的原因和解决方案。主要原因包括视频分辨率与显示视图尺寸不一致、摄像头采集、美颜滤镜格式兼容和分辨率。为了解决这些问题&#xff0c;开发者可以选择合适的渲染模式、动态调整分辨率、处理视频旋转和使用自定义视频渲染。 即构音视频SDK提供…...

Android笔记(八):基于CameraX库结合Compose和传统视图组件PreviewView实现照相机画面预览和照相功能

CameraX是JetPack库之一&#xff0c;通过CameraX可以向应用增加相机的功能。在下列内容中&#xff0c;将介绍一个结合CameraX实现一个简单的拍照应用。本应用必须采用Android SDK 34。并通过该简单示例&#xff0c;了解传统View层次组件的UI组件如何与Compose组件结合实现移动应…...

【每日一题Day361】LC2558从数量最多的堆取走礼物 | 大顶堆

从数量最多的堆取走礼物【LC2558】 给你一个整数数组 gifts &#xff0c;表示各堆礼物的数量。每一秒&#xff0c;你需要执行以下操作&#xff1a; 选择礼物数量最多的那一堆。如果不止一堆都符合礼物数量最多&#xff0c;从中选择任一堆即可。选中的那一堆留下平方根数量的礼物…...

【psychopy】【脑与认知科学】认知过程中的面孔识别加工

目录 实验描述 实验思路 python实现 实验描述 现有的文献认为&#xff0c;人们对倒置的面孔、模糊的面孔等可能会出现加工时长增加、准确率下降的问题&#xff0c;现请你设计一个相关实验&#xff0c;判断不同的面孔是否会出现上述现象。请按照认知科学要求&#xff0c;画…...

File类的常用API

判断文件类型 public boolean isDirectory() public boolean isFile() 获取文件信息 public boolean exists() public String getAbsolutePath() public String getPath() 返回创建文件对象时传入的抽象路径的字符串形式 public String getName() public long lastModi…...

02【Git分支的使用、Git回退、还原】

上一篇&#xff1a;01【Git的基本命令、底层命令、命令原理】 下一篇&#xff1a;03【Git的协同开发、TortoiseGit、IDEA的操作Git】 文章目录 02【Git分支的使用、Git回退、还原】一、分支1.1 分支概述1.1.1 Git分支简介1.1.2 Git分支原理 1.2 创建分支1.2.1 创建普通分支1.…...

Qt文件 I/O 操作

一.QFile 文件读取 QIODevice::ReadOnly QString filePath"/home/chenlang/RepUtils/1.txt"; QFile file(filePath); 1.逐行读取 if (file.open(QIODevice::ReadOnly | QIODevice::Text)) {QTextStream in(&file);while (!in.atEnd()) {QString line i…...

Springboot 使用JavaMailSender发送邮件 + Excel附件

目录 1.生成Excel表格 1.依赖设置 2.代码&#xff1a; 2.邮件发送 1.邮件发送功能实现-带附件 2.踩过的坑 1.附件名中文乱码问题 3.参考文章&#xff1a; 需求描述&#xff1a;项目审批完毕后&#xff0c;需要发送邮件通知相关人员&#xff0c;并且要附带数据库表生成的…...