Rust实现基于Tokio的限制内存占用的channel
Rust实现基于Tokio的限制内存占用的channel
简介
本文介绍如何基于tokio的channel实现一个限制内存占用的channel。
Tokio提供了多种协程间同步的接口,用于在不同的协程中同步数据。
常用的channel有两种:bounded和unbounded,其中ubbounded的channel可以无限的发送数据,而bounded的channel则有限的发送数据。两种channel都没有对自身的内存占用做出限制。
异步网络编程中常用一个channel连接两个task,其中业务task与业务交互:将要发送的数据发送到channel,而网络task与操作系统交互:从channel中接收数据并写入socket。单有时候带宽有限或者对端接收速率过慢时,而网络task从channel中接收的速度小于业务task向channel中发送的速度时,会造成大量的数据阻塞在channel中,如果不对channel的占用内存做限制,则会造成内存占用过多甚至进程被OOM。
实现
-
获取数据大小
要想限制channel总的内存占用,必须要直到每个数据的大小。比较常见的作法是所有需要发送到channel的内容都必须实现一个Trait,此Trait中定义了一个
get_size方法,用于获取数据的大小。pub trait GetSize {/// get total sizefn get_size(&self) -> usize; }要发送的内容必须实现
GetSize的Trait,并实现get_size方法。注意:get_size方法获取到的大小需包括栈空间和堆空间,例如:struct MyData {data: Vec<u8>,}impl GetSize for MyData {fn get_size(&self) -> usize {return std::mem::size_of::<MyData>() + self.data.len();//stack size + heap size}} -
创建
SizedSender和SizedReceiverSizedSender和SizedReceiver都可以基于tokio的UnboundedSender和UnboundedReceiver实现。在tokio的基础上,需要共享一个条件变量用于在sender和receiver之间同步当前是否还有可用空间。pub struct SizedSender<T: GetSize> {inner: mpsc::UnboundedSender<T>,size_semaphore: Arc<(Semaphore, usize)>, } pub struct SizedReceiver<T: GetSize> {inner: mpsc::UnboundedReceiver<T>,size_semaphore: Arc<(Semaphore, usize)>, }/// Limit space usage but not limit the number of messages, bytes_size must bigger than 0. pub fn sized_channel<T: GetSize>(bytes_size: usize) -> (SizedSender<T>, SizedReceiver<T>) {let (tx, rx) = mpsc::unbounded_channel::<T>();let semaphore = Arc::new((Semaphore::new(bytes_size), bytes_size));(SizedSender::new(tx, semaphore.clone()),SizedReceiver::new(rx, semaphore),) } -
SizedSender实现发送端发送时需要调用
get_size方法获取数据的大小,然后调用Semaphore::available_permits方法获取可用空间,如果可用空间大于数据大小,则发送成功,否则发送失败。impl<T: GetSize> SizedSender<T> {pub fn new(inner: mpsc::UnboundedSender<T>, size_semaphore: Arc<(Semaphore, usize)>) -> Self {Self {inner,size_semaphore,}}fn do_send(&self,message: T,permits: Option<SemaphorePermit<'_>>,) -> Result<(), SendError<T>> {match self.inner.send(message) {Ok(r) => {if let Some(permits) = permits {permits.forget();}Ok(r)}Err(e) => {log::debug!("send value error!");Err(e)}}}pub async fn send(&self, message: T) -> Result<(), SendError<T>> {let message_size = message.get_size();if message_size > self.size_semaphore.1 {return Err(SendError(message));}let size = match u32::try_from(message_size) {Ok(size) => size,Err(_) => {return Err(SendError(message));}};if self.size_semaphore.0.available_permits() < size as usize {// The buffer is about to be depleted, sending may be blocked.}let permits = match self.size_semaphore.0.acquire_many(size).await {Ok(perimits) => Some(perimits),Err(_) => {return Err(SendError(message));}};self.do_send(message, permits)}} -
SizedReceiver的实现接收端接收时需要调用
get_size方法获取数据的大小,然后将相应大小的permits还给信号量即可。impl<T: GetSize> SizedReceiver<T> { pub fn new(inner: mpsc::UnboundedReceiver<T>, size_semaphore: Arc<(Semaphore, usize)>) -> Self {Self {inner,size_semaphore,} }pub async fn recv(&mut self) -> Option<T> {self.inner.recv().await.map(|r| {let message_size = r.get_size();self.size_semaphore.0.add_permits(message_size);r}) } } -
其他
在上述实现的基础上,还可以实现更多方法,比如
try_send、try_recv等。
相关文章:
Rust实现基于Tokio的限制内存占用的channel
Rust实现基于Tokio的限制内存占用的channel 简介 本文介绍如何基于tokio的channel实现一个限制内存占用的channel。 Tokio提供了多种协程间同步的接口,用于在不同的协程中同步数据。 常用的channel有两种:bounded和unbounded,其中ubbounded的channel可…...
【C++】C++入门(上)--命名空间 输入输出 缺省参数 函数重载
目录 一 命名空间 1 命名空间的定义 2 命名空间的使用 二 C输入和输出 1 输出 2 输入 三 缺省参数 1 缺省参数概念 2 缺省参数分类 (1) 全缺省参数 (2)半缺省参数 四 函数重载 1 函数重载概念 2 分类 1 参数类型不同 2 参数个数不同 3 参数类型顺序不同 3 C为什…...
设计模式:原型模式(C#、JAVA、JavaScript、C++、Python、Go、PHP)
上一篇《访问者模式》 下一篇《享元模式》 简介: 原型模式,它是一种创建型设计模式,它允许通过复制原型对象来创建新的对象,而无需知道创建的细节。其工作原…...
SpringMVC 资源状态转移RESTful
文章目录 1、RESTful简介a>资源b>资源的表述c>状态转移 2、RESTful的实现HiddenHttpMethodFilterRESTful案例 1、RESTful简介 REST:Representational State Transfer,表现层资源状态转移。 a>资源 资源是一种看待服务器的方式,…...
verilog vscode linux
安装 vscode 插件 插件:Verilog-HDL/SystemVerilog/Bluespec SystemVerilog 功能:.xdc .ucf .v 等代码高亮、代码格式化、语法检查(Linting)、光标放到变量上提示变量的信息等 关于其他语言的依赖工具等信息查看插件说明 代码对齐…...
Postman日常操作
一.Postman介绍 1.1第一个简单的demo 路特斯(英国汽车品牌)_百度百科 (baidu.com) 1.2 cookie 用postman测试需要登录权限的接口时,会被拦截,解决办法就是每次请求接口前,先执行登录,然后记住cookie或者to…...
10月份程序员书单推荐
新书书单 1、C程序设计教程(第9版) 1.广受认可的《C程序设计教程》系列的第9版(个别版本也译作《C语言大学教程》),秉承了该系列一贯的丰富而详细的风格。该系列一些版本因封面画有蚂蚁形象而被称为“C语言蚂蚁书”。…...
【ChatGPT系列】ChatGPT:创新工具还是失业威胁?
💝💝💝欢迎来到我的博客,很高兴能够在这里和您见面!希望您在这里可以感受到一份轻松愉快的氛围,不仅可以获得有趣的内容和知识,也可以畅所欲言、分享您的想法和见解。 推荐:kuan 的首页,持续学…...
C++ 实现定时器的两种方法(线程定时和时间轮算法修改版)
定时器要求在固定的时间异步执行一个操作,比如boost库中的boost::asio::deadline_timer,以及MFC中的定时器。也可以利用c11的thread, mutex, condition_variable 来实现一个定时器。 1、使用C11中的thread, mutex, condition_variable来实现一个定时器。…...
2023mathorcup大数据竞赛选题建议及思路
大家好呀,昨天6点2023年第四届MathorCup高校数学建模挑战赛——大数据竞赛开赛,在这里给大家带来初步的选题建议及思路。 注意,本文章只是比较简略的图文讲解,更加详细完整的视频讲解请移步: 2023mathorcup大数据数学…...
部署vuepress项目到githubPage
部署vuepress项目到githubPage 1. 项目文件夹下有两个分支(main和gh-page) 1.1 main分支存放项目代码 1.2 gh-page分支存放 npm run docs:build之后的dist里面的所有文件 2. 分别提交到github上 3. 你的项目/docs/.vuepress/config.js module.export…...
ORACLE表空间说明及操作
ORACLE 表空间作用 数据存储:表空间是数据库中存储数据的逻辑结构。它提供了用于存储表、索引、视图、存储过程等数据库对象的空间。通过划分数据和索引等对象的存储,可以更好地管理和组织数据库的物理存储结构。性能管理和优化:通过将不同类…...
vue使用Element-plus的Image预览时样式崩乱
🔥博客主页: 破浪前进 🔖系列专栏: Vue、React、PHP ❤️感谢大家点赞👍收藏⭐评论✍️ 问题: 在使用组件库的image时出现了点小问题,预览的图片层级反而没有表格的层级高 效果图:…...
安装使用vcpkg的简易教程
目录 1. 首先安装vcpkg2. 在vcpkg目录下运行bootstrap-vcpkg.bat 命令3. 接着vs进行集成4. 使用vcpkg搜索可用的包5.下载安装所需包6.下载安装完成 1. 首先安装vcpkg 使用git命令下载 git clone https://github.com/Microsoft/vcpkg.git如果下载失败可直接下载文件 (vcpkg-ma…...
制作一个简单的C语言词法分析程序
1.分析组成 C语言的程序中,有很单词多符号和保留字。一些单词符号还有对应的左线性文法。所以我们需要先做出一个单词字符表,给出对应的识别码,然后跟据对应的表格来写出程序 2.程序设计 程序主要有循环判断构成。不需推理即可产生的符号我…...
Java项目中将MySQL改为8.0以上
博主主页:猫头鹰源码 博主简介:Java领域优质创作者、CSDN博客专家、公司架构师、全网粉丝5万、专注Java技术领域和毕业设计项目实战 主要内容:毕业设计(Javaweb项目|小程序等)、简历模板、学习资料、面试题库、技术咨询 文末联系获取 maven依…...
软考高项-计算题(2)
题4 项目的总预算是包含管理储备的,所以总预算应该是:13238102*360 ETC(BAC-EV)/CPI BAC60 EV60*0.318 CPI18/200.9 ETC42/0.9 答案选择C A 题5 因为题目中提到了“按目前的状况继续发展”,那么是:ETC(BAC-EV)/CPI EV1230*0…...
Centos使用war文件部署jenkins
部署jenkins所需要的jdk环境如下: 这里下载官网最新的版本: 选择jenkins2.414.3版本,所以jdk环境最低得是java11 安装java11环境 这里直接安装open-jdk yum -y install java-11-openjdk.x86_64 java-11-openjdk-devel.x86_64下载jenkins最新…...
数据结构和算法——用C语言实现所有排序算法
文章目录 前言排序算法的基本概念内部排序插入排序直接插入排序折半插入排序希尔排序 交换排序冒泡排序快速排序 选择排序简单选择排序堆排序 归并排序基数排序 外部排序多路归并败者树置换——选择排序最佳归并树 前言 本文所有代码均在仓库中,这是一个完整的由纯…...
吃豆人C语言开发—Day2 需求分析 流程图 原型图
目录 需求分析 流程图 原型图 主菜单: 设置界面: 地图选择: 游戏界面: 收集完成提示: 游戏胜利界面: 游戏失败界面 死亡提示: 这个项目是我和朋友们一起开发的,在此声明一下…...
【kafka】Golang实现分布式Masscan任务调度系统
要求: 输出两个程序,一个命令行程序(命令行参数用flag)和一个服务端程序。 命令行程序支持通过命令行参数配置下发IP或IP段、端口、扫描带宽,然后将消息推送到kafka里面。 服务端程序: 从kafka消费者接收…...
【第二十一章 SDIO接口(SDIO)】
第二十一章 SDIO接口 目录 第二十一章 SDIO接口(SDIO) 1 SDIO 主要功能 2 SDIO 总线拓扑 3 SDIO 功能描述 3.1 SDIO 适配器 3.2 SDIOAHB 接口 4 卡功能描述 4.1 卡识别模式 4.2 卡复位 4.3 操作电压范围确认 4.4 卡识别过程 4.5 写数据块 4.6 读数据块 4.7 数据流…...
Golang dig框架与GraphQL的完美结合
将 Go 的 Dig 依赖注入框架与 GraphQL 结合使用,可以显著提升应用程序的可维护性、可测试性以及灵活性。 Dig 是一个强大的依赖注入容器,能够帮助开发者更好地管理复杂的依赖关系,而 GraphQL 则是一种用于 API 的查询语言,能够提…...
多模态商品数据接口:融合图像、语音与文字的下一代商品详情体验
一、多模态商品数据接口的技术架构 (一)多模态数据融合引擎 跨模态语义对齐 通过Transformer架构实现图像、语音、文字的语义关联。例如,当用户上传一张“蓝色连衣裙”的图片时,接口可自动提取图像中的颜色(RGB值&…...
React19源码系列之 事件插件系统
事件类别 事件类型 定义 文档 Event Event 接口表示在 EventTarget 上出现的事件。 Event - Web API | MDN UIEvent UIEvent 接口表示简单的用户界面事件。 UIEvent - Web API | MDN KeyboardEvent KeyboardEvent 对象描述了用户与键盘的交互。 KeyboardEvent - Web…...
DIY|Mac 搭建 ESP-IDF 开发环境及编译小智 AI
前一阵子在百度 AI 开发者大会上,看到基于小智 AI DIY 玩具的演示,感觉有点意思,想着自己也来试试。 如果只是想烧录现成的固件,乐鑫官方除了提供了 Windows 版本的 Flash 下载工具 之外,还提供了基于网页版的 ESP LA…...
k8s业务程序联调工具-KtConnect
概述 原理 工具作用是建立了一个从本地到集群的单向VPN,根据VPN原理,打通两个内网必然需要借助一个公共中继节点,ktconnect工具巧妙的利用k8s原生的portforward能力,简化了建立连接的过程,apiserver间接起到了中继节…...
Java面试专项一-准备篇
一、企业简历筛选规则 一般企业的简历筛选流程:首先由HR先筛选一部分简历后,在将简历给到对应的项目负责人后再进行下一步的操作。 HR如何筛选简历 例如:Boss直聘(招聘方平台) 直接按照条件进行筛选 例如:…...
均衡后的SNRSINR
本文主要摘自参考文献中的前两篇,相关文献中经常会出现MIMO检测后的SINR不过一直没有找到相关数学推到过程,其中文献[1]中给出了相关原理在此仅做记录。 1. 系统模型 复信道模型 n t n_t nt 根发送天线, n r n_r nr 根接收天线的 MIMO 系…...
【笔记】WSL 中 Rust 安装与测试完整记录
#工作记录 WSL 中 Rust 安装与测试完整记录 1. 运行环境 系统:Ubuntu 24.04 LTS (WSL2)架构:x86_64 (GNU/Linux)Rust 版本:rustc 1.87.0 (2025-05-09)Cargo 版本:cargo 1.87.0 (2025-05-06) 2. 安装 Rust 2.1 使用 Rust 官方安…...
