Rust实现基于Tokio的限制内存占用的channel
Rust实现基于Tokio的限制内存占用的channel
简介
本文介绍如何基于tokio的channel实现一个限制内存占用的channel。
Tokio提供了多种协程间同步的接口,用于在不同的协程中同步数据。
常用的channel有两种:bounded和unbounded,其中ubbounded的channel可以无限的发送数据,而bounded的channel则有限的发送数据。两种channel都没有对自身的内存占用做出限制。
异步网络编程中常用一个channel连接两个task,其中业务task与业务交互:将要发送的数据发送到channel,而网络task与操作系统交互:从channel中接收数据并写入socket。单有时候带宽有限或者对端接收速率过慢时,而网络task从channel中接收的速度小于业务task向channel中发送的速度时,会造成大量的数据阻塞在channel中,如果不对channel的占用内存做限制,则会造成内存占用过多甚至进程被OOM。
实现
-
获取数据大小
要想限制channel总的内存占用,必须要直到每个数据的大小。比较常见的作法是所有需要发送到channel的内容都必须实现一个Trait,此Trait中定义了一个
get_size方法,用于获取数据的大小。pub trait GetSize {/// get total sizefn get_size(&self) -> usize; }要发送的内容必须实现
GetSize的Trait,并实现get_size方法。注意:get_size方法获取到的大小需包括栈空间和堆空间,例如:struct MyData {data: Vec<u8>,}impl GetSize for MyData {fn get_size(&self) -> usize {return std::mem::size_of::<MyData>() + self.data.len();//stack size + heap size}} -
创建
SizedSender和SizedReceiverSizedSender和SizedReceiver都可以基于tokio的UnboundedSender和UnboundedReceiver实现。在tokio的基础上,需要共享一个条件变量用于在sender和receiver之间同步当前是否还有可用空间。pub struct SizedSender<T: GetSize> {inner: mpsc::UnboundedSender<T>,size_semaphore: Arc<(Semaphore, usize)>, } pub struct SizedReceiver<T: GetSize> {inner: mpsc::UnboundedReceiver<T>,size_semaphore: Arc<(Semaphore, usize)>, }/// Limit space usage but not limit the number of messages, bytes_size must bigger than 0. pub fn sized_channel<T: GetSize>(bytes_size: usize) -> (SizedSender<T>, SizedReceiver<T>) {let (tx, rx) = mpsc::unbounded_channel::<T>();let semaphore = Arc::new((Semaphore::new(bytes_size), bytes_size));(SizedSender::new(tx, semaphore.clone()),SizedReceiver::new(rx, semaphore),) } -
SizedSender实现发送端发送时需要调用
get_size方法获取数据的大小,然后调用Semaphore::available_permits方法获取可用空间,如果可用空间大于数据大小,则发送成功,否则发送失败。impl<T: GetSize> SizedSender<T> {pub fn new(inner: mpsc::UnboundedSender<T>, size_semaphore: Arc<(Semaphore, usize)>) -> Self {Self {inner,size_semaphore,}}fn do_send(&self,message: T,permits: Option<SemaphorePermit<'_>>,) -> Result<(), SendError<T>> {match self.inner.send(message) {Ok(r) => {if let Some(permits) = permits {permits.forget();}Ok(r)}Err(e) => {log::debug!("send value error!");Err(e)}}}pub async fn send(&self, message: T) -> Result<(), SendError<T>> {let message_size = message.get_size();if message_size > self.size_semaphore.1 {return Err(SendError(message));}let size = match u32::try_from(message_size) {Ok(size) => size,Err(_) => {return Err(SendError(message));}};if self.size_semaphore.0.available_permits() < size as usize {// The buffer is about to be depleted, sending may be blocked.}let permits = match self.size_semaphore.0.acquire_many(size).await {Ok(perimits) => Some(perimits),Err(_) => {return Err(SendError(message));}};self.do_send(message, permits)}} -
SizedReceiver的实现接收端接收时需要调用
get_size方法获取数据的大小,然后将相应大小的permits还给信号量即可。impl<T: GetSize> SizedReceiver<T> { pub fn new(inner: mpsc::UnboundedReceiver<T>, size_semaphore: Arc<(Semaphore, usize)>) -> Self {Self {inner,size_semaphore,} }pub async fn recv(&mut self) -> Option<T> {self.inner.recv().await.map(|r| {let message_size = r.get_size();self.size_semaphore.0.add_permits(message_size);r}) } } -
其他
在上述实现的基础上,还可以实现更多方法,比如
try_send、try_recv等。
相关文章:
Rust实现基于Tokio的限制内存占用的channel
Rust实现基于Tokio的限制内存占用的channel 简介 本文介绍如何基于tokio的channel实现一个限制内存占用的channel。 Tokio提供了多种协程间同步的接口,用于在不同的协程中同步数据。 常用的channel有两种:bounded和unbounded,其中ubbounded的channel可…...
【C++】C++入门(上)--命名空间 输入输出 缺省参数 函数重载
目录 一 命名空间 1 命名空间的定义 2 命名空间的使用 二 C输入和输出 1 输出 2 输入 三 缺省参数 1 缺省参数概念 2 缺省参数分类 (1) 全缺省参数 (2)半缺省参数 四 函数重载 1 函数重载概念 2 分类 1 参数类型不同 2 参数个数不同 3 参数类型顺序不同 3 C为什…...
设计模式:原型模式(C#、JAVA、JavaScript、C++、Python、Go、PHP)
上一篇《访问者模式》 下一篇《享元模式》 简介: 原型模式,它是一种创建型设计模式,它允许通过复制原型对象来创建新的对象,而无需知道创建的细节。其工作原…...
SpringMVC 资源状态转移RESTful
文章目录 1、RESTful简介a>资源b>资源的表述c>状态转移 2、RESTful的实现HiddenHttpMethodFilterRESTful案例 1、RESTful简介 REST:Representational State Transfer,表现层资源状态转移。 a>资源 资源是一种看待服务器的方式,…...
verilog vscode linux
安装 vscode 插件 插件:Verilog-HDL/SystemVerilog/Bluespec SystemVerilog 功能:.xdc .ucf .v 等代码高亮、代码格式化、语法检查(Linting)、光标放到变量上提示变量的信息等 关于其他语言的依赖工具等信息查看插件说明 代码对齐…...
Postman日常操作
一.Postman介绍 1.1第一个简单的demo 路特斯(英国汽车品牌)_百度百科 (baidu.com) 1.2 cookie 用postman测试需要登录权限的接口时,会被拦截,解决办法就是每次请求接口前,先执行登录,然后记住cookie或者to…...
10月份程序员书单推荐
新书书单 1、C程序设计教程(第9版) 1.广受认可的《C程序设计教程》系列的第9版(个别版本也译作《C语言大学教程》),秉承了该系列一贯的丰富而详细的风格。该系列一些版本因封面画有蚂蚁形象而被称为“C语言蚂蚁书”。…...
【ChatGPT系列】ChatGPT:创新工具还是失业威胁?
💝💝💝欢迎来到我的博客,很高兴能够在这里和您见面!希望您在这里可以感受到一份轻松愉快的氛围,不仅可以获得有趣的内容和知识,也可以畅所欲言、分享您的想法和见解。 推荐:kuan 的首页,持续学…...
C++ 实现定时器的两种方法(线程定时和时间轮算法修改版)
定时器要求在固定的时间异步执行一个操作,比如boost库中的boost::asio::deadline_timer,以及MFC中的定时器。也可以利用c11的thread, mutex, condition_variable 来实现一个定时器。 1、使用C11中的thread, mutex, condition_variable来实现一个定时器。…...
2023mathorcup大数据竞赛选题建议及思路
大家好呀,昨天6点2023年第四届MathorCup高校数学建模挑战赛——大数据竞赛开赛,在这里给大家带来初步的选题建议及思路。 注意,本文章只是比较简略的图文讲解,更加详细完整的视频讲解请移步: 2023mathorcup大数据数学…...
部署vuepress项目到githubPage
部署vuepress项目到githubPage 1. 项目文件夹下有两个分支(main和gh-page) 1.1 main分支存放项目代码 1.2 gh-page分支存放 npm run docs:build之后的dist里面的所有文件 2. 分别提交到github上 3. 你的项目/docs/.vuepress/config.js module.export…...
ORACLE表空间说明及操作
ORACLE 表空间作用 数据存储:表空间是数据库中存储数据的逻辑结构。它提供了用于存储表、索引、视图、存储过程等数据库对象的空间。通过划分数据和索引等对象的存储,可以更好地管理和组织数据库的物理存储结构。性能管理和优化:通过将不同类…...
vue使用Element-plus的Image预览时样式崩乱
🔥博客主页: 破浪前进 🔖系列专栏: Vue、React、PHP ❤️感谢大家点赞👍收藏⭐评论✍️ 问题: 在使用组件库的image时出现了点小问题,预览的图片层级反而没有表格的层级高 效果图:…...
安装使用vcpkg的简易教程
目录 1. 首先安装vcpkg2. 在vcpkg目录下运行bootstrap-vcpkg.bat 命令3. 接着vs进行集成4. 使用vcpkg搜索可用的包5.下载安装所需包6.下载安装完成 1. 首先安装vcpkg 使用git命令下载 git clone https://github.com/Microsoft/vcpkg.git如果下载失败可直接下载文件 (vcpkg-ma…...
制作一个简单的C语言词法分析程序
1.分析组成 C语言的程序中,有很单词多符号和保留字。一些单词符号还有对应的左线性文法。所以我们需要先做出一个单词字符表,给出对应的识别码,然后跟据对应的表格来写出程序 2.程序设计 程序主要有循环判断构成。不需推理即可产生的符号我…...
Java项目中将MySQL改为8.0以上
博主主页:猫头鹰源码 博主简介:Java领域优质创作者、CSDN博客专家、公司架构师、全网粉丝5万、专注Java技术领域和毕业设计项目实战 主要内容:毕业设计(Javaweb项目|小程序等)、简历模板、学习资料、面试题库、技术咨询 文末联系获取 maven依…...
软考高项-计算题(2)
题4 项目的总预算是包含管理储备的,所以总预算应该是:13238102*360 ETC(BAC-EV)/CPI BAC60 EV60*0.318 CPI18/200.9 ETC42/0.9 答案选择C A 题5 因为题目中提到了“按目前的状况继续发展”,那么是:ETC(BAC-EV)/CPI EV1230*0…...
Centos使用war文件部署jenkins
部署jenkins所需要的jdk环境如下: 这里下载官网最新的版本: 选择jenkins2.414.3版本,所以jdk环境最低得是java11 安装java11环境 这里直接安装open-jdk yum -y install java-11-openjdk.x86_64 java-11-openjdk-devel.x86_64下载jenkins最新…...
数据结构和算法——用C语言实现所有排序算法
文章目录 前言排序算法的基本概念内部排序插入排序直接插入排序折半插入排序希尔排序 交换排序冒泡排序快速排序 选择排序简单选择排序堆排序 归并排序基数排序 外部排序多路归并败者树置换——选择排序最佳归并树 前言 本文所有代码均在仓库中,这是一个完整的由纯…...
吃豆人C语言开发—Day2 需求分析 流程图 原型图
目录 需求分析 流程图 原型图 主菜单: 设置界面: 地图选择: 游戏界面: 收集完成提示: 游戏胜利界面: 游戏失败界面 死亡提示: 这个项目是我和朋友们一起开发的,在此声明一下…...
超短脉冲激光自聚焦效应
前言与目录 强激光引起自聚焦效应机理 超短脉冲激光在脆性材料内部加工时引起的自聚焦效应,这是一种非线性光学现象,主要涉及光学克尔效应和材料的非线性光学特性。 自聚焦效应可以产生局部的强光场,对材料产生非线性响应,可能…...
【入坑系列】TiDB 强制索引在不同库下不生效问题
文章目录 背景SQL 优化情况线上SQL运行情况分析怀疑1:执行计划绑定问题?尝试:SHOW WARNINGS 查看警告探索 TiDB 的 USE_INDEX 写法Hint 不生效问题排查解决参考背景 项目中使用 TiDB 数据库,并对 SQL 进行优化了,添加了强制索引。 UAT 环境已经生效,但 PROD 环境强制索…...
【Linux】C语言执行shell指令
在C语言中执行Shell指令 在C语言中,有几种方法可以执行Shell指令: 1. 使用system()函数 这是最简单的方法,包含在stdlib.h头文件中: #include <stdlib.h>int main() {system("ls -l"); // 执行ls -l命令retu…...
HTML 列表、表格、表单
1 列表标签 作用:布局内容排列整齐的区域 列表分类:无序列表、有序列表、定义列表。 例如: 1.1 无序列表 标签:ul 嵌套 li,ul是无序列表,li是列表条目。 注意事项: ul 标签里面只能包裹 li…...
深入理解JavaScript设计模式之单例模式
目录 什么是单例模式为什么需要单例模式常见应用场景包括 单例模式实现透明单例模式实现不透明单例模式用代理实现单例模式javaScript中的单例模式使用命名空间使用闭包封装私有变量 惰性单例通用的惰性单例 结语 什么是单例模式 单例模式(Singleton Pattern&#…...
linux arm系统烧录
1、打开瑞芯微程序 2、按住linux arm 的 recover按键 插入电源 3、当瑞芯微检测到有设备 4、松开recover按键 5、选择升级固件 6、点击固件选择本地刷机的linux arm 镜像 7、点击升级 (忘了有没有这步了 估计有) 刷机程序 和 镜像 就不提供了。要刷的时…...
Unity | AmplifyShaderEditor插件基础(第七集:平面波动shader)
目录 一、👋🏻前言 二、😈sinx波动的基本原理 三、😈波动起来 1.sinx节点介绍 2.vertexPosition 3.集成Vector3 a.节点Append b.连起来 4.波动起来 a.波动的原理 b.时间节点 c.sinx的处理 四、🌊波动优化…...
QT3D学习笔记——圆台、圆锥
类名作用Qt3DWindow3D渲染窗口容器QEntity场景中的实体(对象或容器)QCamera控制观察视角QPointLight点光源QConeMesh圆锥几何网格QTransform控制实体的位置/旋转/缩放QPhongMaterialPhong光照材质(定义颜色、反光等)QFirstPersonC…...
Rust 开发环境搭建
环境搭建 1、开发工具RustRover 或者vs code 2、Cygwin64 安装 https://cygwin.com/install.html 在工具终端执行: rustup toolchain install stable-x86_64-pc-windows-gnu rustup default stable-x86_64-pc-windows-gnu 2、Hello World fn main() { println…...
Python 训练营打卡 Day 47
注意力热力图可视化 在day 46代码的基础上,对比不同卷积层热力图可视化的结果 import torch import torch.nn as nn import torch.optim as optim from torchvision import datasets, transforms from torch.utils.data import DataLoader import matplotlib.pypl…...
