如何用rust实现一个异步channel
目录
- 前言
- 思路
- 实现功能
- 代码实现
- 测试
- 先引测试版包
- 测试代码
- 结果与分析
- 思考
- 尾语
前言
使用通信来共享内存,而不是通过共享内存来通信
上面这句话,是每个go开发者在 处理多线程通信时 的座右铭,go甚至把实现这个理念的channel直接焊在编译器里,几乎所有的go程序里都有channel的身影。
rust的异步和go的goroutine有异曲同工之妙,甚至可以把 tokio::spawn 理解为go关键字。但在rust中好像并没有异步channel的实现。本着求人不如求己的原则,决定diy一个类似go的channel。
思路
先看一下发送流程
再看一下接收流程
总体来说流程清晰易懂,不管接收还是发送,都是先尝试从缓存队列中操作值,不成功则加入到对应队列,等待再次执行。反之则唤起相关任务,结束操作。
实现功能
- 首先需要实现一个存放值的环形缓冲区,并且每个单元应该是单独加锁的,从而避免全局锁。
- 需要两个任务队列,用来存放在饥饿模式(从缓存操作失败)下的 发送任务和接受任务。
- 按照rust习惯,将发送者和接受者拆开,并各自实现future
- 因为唤醒不是同步的,需要通过一个唤醒器来唤醒沉默的任务。
- 使用原子操作替代锁
代码实现
具体的就不写了,放在github上了
github地址:https://github.com/woshihaoren4/wd_tools/tree/main/src/channel
测试
这里主要和async-channel测试一下
- async-channel 是最常见的异步channel,在crateio上有两千万的下载。
先引测试版包
cargo.toml
[dependencies]
tokio = {version = "1.22.0",features=["full"]}
wd_tools = {version = "0.8.3",features = ["sync","chan"]}
async-channel = "1.8.0"
- wd_tools 是我们的channel,这里引用的
syncchan两个feature,前者用于测试,后者是chan实现。
测试代码
测试场景:设置缓存长度为10,发100万数据,接100万数据。在1发送者1接受者,1发送者10接受者,10发送者1接受者,10发送者10接受者四种情况下的收发性能。
use std::fmt::Debug;
use wd_tools::channel as wd;
use async_channel as ac;#[tokio::main]
async fn main(){let ts = TestService::new(10);println!("test start ------------> wd_tools");ts.send_to_recv("1-v-1",true,100_0000,1,100_0000,1,|x|x).await;ts.send_to_recv("1-v-10",true,100_0000,1,10_0000,10,|x|x).await;ts.send_to_recv("10-v-1",true,10_0000,10,100_0000,1,|x|x).await;ts.send_to_recv("10-v-10",true,10_0000,10,10_0000,10,|x|x).await;println!("wd_tools <------------- test over");println!("test start ------------> async-channel");ts.send_to_recv("1-v-1",false,100_0000,1,100_0000,1,|x|x).await;ts.send_to_recv("1-v-10",false,100_0000,1,10_0000,10,|x|x).await;ts.send_to_recv("10-v-1",false,10_0000,10,100_0000,1,|x|x).await;ts.send_to_recv("10-v-10",false,10_0000,10,10_0000,10,|x|x).await;println!("async-channel <------------ test over");
}struct TestService<T>{wd_sender : wd::Sender<T>,wd_receiver : wd::Receiver<T>,ac_sender : ac::Sender<T>,ac_receiver : ac::Receiver<T>
}impl<T:Unpin+Send+Sync+Debug+'static> TestService<T>{pub fn new(cap:usize)->TestService<T>{let (wd_sender,wd_receiver) = wd::Channel::new(cap);let (ac_sender,ac_receiver) = ac::bounded(cap);TestService{wd_sender,wd_receiver,ac_sender,ac_receiver}}pub fn send<G:Fn(usize)->T+Send+Sync+'static>(&self,wg:wd_tools::sync::WaitGroup,is_wd:bool,max:usize,generater:G){let wd_sender = self.wd_sender.clone();let ac_sender = self.ac_sender.clone();wg.defer_args1(|is_wd|async move{for i in 0..max {let t = generater(i);if is_wd {wd_sender.send(t).await.expect(" 发送失败");}else{ac_sender.send(t).await.expect(" 发送失败");}}},is_wd);}pub fn recv(&self,wg:wd_tools::sync::WaitGroup,is_wd:bool,max:usize){let wd_receiver = self.wd_receiver.clone();let ac_receiver = self.ac_receiver.clone();wg.defer_args1(|is_wd|async move{for _i in 0..max {if is_wd {wd_receiver.recv().await.expect(" 接收失败");}else{ac_receiver.recv().await.expect(" 接收失败");}}},is_wd);}pub async fn send_to_recv<G:Fn(usize)->T+Send+Sync+Clone+'static>(&self,info:&'static str, is_wd:bool, sbase:usize, sgroup:usize, rbase:usize, rgroup:usize, generater:G){let now = std::time::Instant::now();let wg = wd_tools::sync::WaitGroup::default();let wg_send = wd_tools::sync::WaitGroup::default();let wg_recv = wd_tools::sync::WaitGroup::default();for _ in 0..sgroup{self.send(wg_send.clone(),is_wd,sbase,generater.clone());}for _ in 0..rgroup{self.recv(wg_recv.clone(),is_wd,rbase);}wg.defer(move ||async move{let now = std::time::Instant::now();wg_send.wait().await;println!("test[{}] ---> send use time:{}ms",info,now.elapsed().as_millis());});wg.defer(move ||async move{let now = std::time::Instant::now();wg_recv.wait().await;println!("test[{}] ---> recv use time:{}ms",info,now.elapsed().as_millis());});wg.wait().await;println!("test[{}] ---> all use time:{}ms",info,now.elapsed().as_millis());}
}
结果与分析
测试10次,取平均值做表,如下

如上图,得结论
- 在1发收者和10发收者的情况下,两种channel效率相差不多。
- 在发送者和接受者数量不等时,
wd_tools::channel的性能明显优于async-channel
思考
分析结论之前先看一下async-channel的实现。虽然async-channel也是异步,但它并不依赖某个异步运行时来进行任务的上线文切换,而是使用concurrent-queue和event-listener进行消息调度,底层依赖于std::thread::park_timeout。
相比event-listener的调度方式,直接管理tokio的Context则更适用于异步环境。尤其是存在大量等待的场景。如上面测试,接受者和发送者数量不等,需要长时间等待的情况。实际开发中,接受者或者发送者可能长时间处于饥饿的情况下,wd_tools::channel不会产生多余的资源开销,毕竟上下文被挂起了,也就不会被cpu执行。
当然实际是复杂的,因情而异,使用的CPU数量(线程数),缓存长度,异步任务数同样会影响消息队列的性能,尤其是不需要等待的场景下async-channel性能更优。
而wd_tools::channel则更适合tokio异步环境。并且不会引起线程park,而产生其他影响。
尾语
wd_tools::channel 目前只是一个初级版本,还有很多地方待优化,比如过多的状态判断,对缓存区直接轮训加锁,而没有采用优化算法, 唤醒器完全可以通过一定优化策略替换带。
但这个思路是没错的,欢迎有想法的同志加入进来。
相关文章:
如何用rust实现一个异步channel
目录 前言思路实现功能代码实现 测试先引测试版包测试代码结果与分析思考 尾语 前言 使用通信来共享内存,而不是通过共享内存来通信 上面这句话,是每个go开发者在 处理多线程通信时 的座右铭,go甚至把实现这个理念的channel直接焊在编译器里&…...
gitee上传项目到仓库
目录 一些常用的Git命令切换到其他盘符:列出目录下的所有文件:以树状图的形式显示目录下的文件和子目录:返回上一层目录:写的C#代码文件上传到新建的Git仓库中,可以按照以下步骤进行操作:出现的错误: 一些常…...
day27 贪心算法
1.什么是贪心? 比如10张钞票,有1,5,20,100等面额,取五张,如何取得到数额最多的钱?每次取面额最大的那张钞票;就是每个阶段的局部最优;全局最优就是最后拿到的…...
Java实现字符串反转
起因 自己在刷题的过程中,想把一个字符串翻转一下,便写了下面的代码: String str "abcd";str str.reverse();发现行不通,这是为什么呢? 分析 在Java中,字符串是不可变的对象,这意…...
vue - 常见的性能优化
文章目录 vue使用中常见的性能优化1, v-for 遍历避免同时使用 v-if2, 如果需要使用v-for给每项元素绑定事件时 可以使用事件代理**3, 一些数据不做响应式4,一些页面采用keep-alive缓存组件5,第三方UI库按需导入6&#…...
微服务系列文章 之 Nginx服务状态监控的方法
在Nginx的插件模块中有一个模块stub_status可以监控Nginx的一些状态信息,默认安装可能没有这个模块,手动编译的时候加一下即可。 1. 模块安装 先使用命令查看是否已经安装这个模块: [rootihxb123Z nginx]# ./nginx -V (V大写会显示版本号和…...
【网络系统集成】路由器实验
1.实验名称:路由器RIP协议配置 2.实验目的 在PacketTracer中进行模拟实验,配置RIP协议,验证RIP协议更新时间及路由状态变化,加深对路由器RIP协议相关知识的理解与掌握。 3.实验内容 (1)拓扑结构图 (2)ip地址分配与端口分配...
【mac 安装Miniconda】
1.下载Miniconda 注意mac是什么版本,m1下载m1版本 https://docs.conda.io/en/latest/miniconda.html#macos-installers 2.安装Miniconda 在下载文件所在目录下打开终端,输入一下命令: bash Miniconda3-latest-MacOSX-x86_64.sh 一路回车&…...
螺栓疲劳计算-风电行业,参考GL2010, ST0361,1993-1-9
由于不想再重新排版了,于是转成了图片。...
QT学习之旅 - QThread多线程
文章目录 首先是主线程 其次是一个程序 通过一个QThread来放入程序 进阶一点: 手动开启关闭线程俩个线程 其实QT中的thread(线程)是很容易的 首先是主线程 #include "mainwindow.h" #include "ui_mainwindow.h"#include <QDebug>MainWindow::MainWin…...
PROFINET转TCP/IP网关TCP/IP协议的含义是
大家好,今天要和大家分享一款自主研发的通讯网关,远创智控YC-PN-TCPIP。这款网关可是集多种功能于一身,PROFINET从站功能,让它在通讯领域独领风骚。想知道这款网关如何实现PROFINET和TCP/IP网络的连接吗?一起来看看吧&…...
计算机网络基础第六章
一、应用层概述 1.1 网络应用模型 1.1.1 客户/服务器(C/S)模型 1.1.2 P2P模型 二、域名解析系统——DNS系统 2.1 域名 2.2 域名服务器 2.3 域名解析过程 三、文件传输协议——FTP 3.1 FTP服务器和用户端 3.2 FTP工作原理 四、电子邮件 4.1 电子邮件系统概述 4.2 简单邮件传送…...
MobPush:Android客户端SDK厂商通道回执配置指南
华为厂商回执配置 登录华为AppGallery Connect网站。在左侧菜单点击增长 - 推送服务,进入推送服务页面后,点击配置页签,检查应用回执状态,如未开通请点击右侧开通按钮。 如已开通,点击修改按钮,弹出“选择…...
Karmada: Open, Multi-Cloud, Multi-Cluster Kubernetes Orchestration
Karmada是一个开源的多云应用编排和管理平台,旨在帮助用户在多个云提供商之间无缝地部署、编排和管理应用程序。 Karmada(Kubernetes Armada)是一个Kubernetes管理系统,它使您能够在多个Kubernetes集群和云环境中运行云原生应用程…...
arcgis拓扑检查
不能有悬挂点 不能有伪结点***路网处理很重要,看研究吧。 一直默认到最后。 导入要素类,单个 toupu2右键新建拓扑(T) 一般选不能有悬挂点,不能重叠。 一路默认 是 拉进图层可视化 线要素的话记得添加字段length&#…...
icp许可证 办理流程(icp资质申请条件)
icp许可证 办理流程(icp资质申请条件)是什么? ICP经营许可证是可以线上无忧办理的,包下证,流程也很简单,只需要你提供企业营业执照、法人身份证这些基础材料就可以。加急10-20工作日拿证,普通20-60工作日拿证。 在了解…...
三菱PLC 控制灯一秒钟交替闪烁
三菱PLC中常用的特殊继电器: M8000 上电一直ON标志 M8002 上电导通一次 M8004 PLC出错 M8005 PLC备用电池电量低标志 M8011 10ms时钟脉冲 M8012 100ms时钟脉冲 M8013 1s时钟脉冲 M8014 1min时钟脉冲 M8034…...
金融数据库的战场,太平洋保险和OceanBase打了场胜仗
点击关注 文丨刘雨琦 “数据库的国产替代,必须经过严格的考虑,保证不会出错,所以大多数企业的领导层选择按兵不动或者简单扩容。因为不换就不会错,选了很久如果选错,还可能会出现重大事故。” 某银行数据库技术人员…...
IP协议【图解TCP/IP(笔记九)】
文章目录 IP即网际协议IP相当于OSI参考模型的第3层网络层与数据链路层的关系 IP基础知识IP地址属于网络层地址路由控制■ 发送数据至最终目标地址■ 路由控制表 数据链路的抽象化IP属于面向无连接型 IP即网际协议 TCP/IP的心脏是互联网层。这一层主要由IP(Internet…...
C#仿热血江湖
目录 1 定义属性 2 GClass86_0 3 Int32 4 List 5 NpcID 定义属性 private int _NpcID; private GClass86 gclass86_0 = new GClass86(); private int int_0;...
超短脉冲激光自聚焦效应
前言与目录 强激光引起自聚焦效应机理 超短脉冲激光在脆性材料内部加工时引起的自聚焦效应,这是一种非线性光学现象,主要涉及光学克尔效应和材料的非线性光学特性。 自聚焦效应可以产生局部的强光场,对材料产生非线性响应,可能…...
云计算——弹性云计算器(ECS)
弹性云服务器:ECS 概述 云计算重构了ICT系统,云计算平台厂商推出使得厂家能够主要关注应用管理而非平台管理的云平台,包含如下主要概念。 ECS(Elastic Cloud Server):即弹性云服务器,是云计算…...
Python:操作 Excel 折叠
💖亲爱的技术爱好者们,热烈欢迎来到 Kant2048 的博客!我是 Thomas Kant,很开心能在CSDN上与你们相遇~💖 本博客的精华专栏: 【自动化测试】 【测试经验】 【人工智能】 【Python】 Python 操作 Excel 系列 读取单元格数据按行写入设置行高和列宽自动调整行高和列宽水平…...
Qt Widget类解析与代码注释
#include "widget.h" #include "ui_widget.h"Widget::Widget(QWidget *parent): QWidget(parent), ui(new Ui::Widget) {ui->setupUi(this); }Widget::~Widget() {delete ui; }//解释这串代码,写上注释 当然可以!这段代码是 Qt …...
MVC 数据库
MVC 数据库 引言 在软件开发领域,Model-View-Controller(MVC)是一种流行的软件架构模式,它将应用程序分为三个核心组件:模型(Model)、视图(View)和控制器(Controller)。这种模式有助于提高代码的可维护性和可扩展性。本文将深入探讨MVC架构与数据库之间的关系,以…...
生成 Git SSH 证书
🔑 1. 生成 SSH 密钥对 在终端(Windows 使用 Git Bash,Mac/Linux 使用 Terminal)执行命令: ssh-keygen -t rsa -b 4096 -C "your_emailexample.com" 参数说明: -t rsa&#x…...
学习STC51单片机31(芯片为STC89C52RCRC)OLED显示屏1
每日一言 生活的美好,总是藏在那些你咬牙坚持的日子里。 硬件:OLED 以后要用到OLED的时候找到这个文件 OLED的设备地址 SSD1306"SSD" 是品牌缩写,"1306" 是产品编号。 驱动 OLED 屏幕的 IIC 总线数据传输格式 示意图 …...
NXP S32K146 T-Box 携手 SD NAND(贴片式TF卡):驱动汽车智能革新的黄金组合
在汽车智能化的汹涌浪潮中,车辆不再仅仅是传统的交通工具,而是逐步演变为高度智能的移动终端。这一转变的核心支撑,来自于车内关键技术的深度融合与协同创新。车载远程信息处理盒(T-Box)方案:NXP S32K146 与…...
PAN/FPN
import torch import torch.nn as nn import torch.nn.functional as F import mathclass LowResQueryHighResKVAttention(nn.Module):"""方案 1: 低分辨率特征 (Query) 查询高分辨率特征 (Key, Value).输出分辨率与低分辨率输入相同。"""def __…...
怎么让Comfyui导出的图像不包含工作流信息,
为了数据安全,让Comfyui导出的图像不包含工作流信息,导出的图像就不会拖到comfyui中加载出来工作流。 ComfyUI的目录下node.py 直接移除 pnginfo(推荐) 在 save_images 方法中,删除或注释掉所有与 metadata …...
