如何用rust实现一个异步channel
目录
- 前言
- 思路
- 实现功能
- 代码实现
- 测试
- 先引测试版包
- 测试代码
- 结果与分析
- 思考
- 尾语
前言
使用通信来共享内存,而不是通过共享内存来通信
上面这句话,是每个go开发者在 处理多线程通信时 的座右铭,go甚至把实现这个理念的channel直接焊在编译器里,几乎所有的go程序里都有channel的身影。
rust的异步和go的goroutine有异曲同工之妙,甚至可以把 tokio::spawn 理解为go关键字。但在rust中好像并没有异步channel的实现。本着求人不如求己的原则,决定diy一个类似go的channel。
思路
先看一下发送流程
再看一下接收流程
总体来说流程清晰易懂,不管接收还是发送,都是先尝试从缓存队列中操作值,不成功则加入到对应队列,等待再次执行。反之则唤起相关任务,结束操作。
实现功能
- 首先需要实现一个存放值的环形缓冲区,并且每个单元应该是单独加锁的,从而避免全局锁。
- 需要两个任务队列,用来存放在饥饿模式(从缓存操作失败)下的 发送任务和接受任务。
- 按照rust习惯,将发送者和接受者拆开,并各自实现future
- 因为唤醒不是同步的,需要通过一个唤醒器来唤醒沉默的任务。
- 使用原子操作替代锁
代码实现
具体的就不写了,放在github上了
github地址:https://github.com/woshihaoren4/wd_tools/tree/main/src/channel
测试
这里主要和async-channel
测试一下
- async-channel 是最常见的异步channel,在crateio上有两千万的下载。
先引测试版包
cargo.toml
[dependencies]
tokio = {version = "1.22.0",features=["full"]}
wd_tools = {version = "0.8.3",features = ["sync","chan"]}
async-channel = "1.8.0"
- wd_tools 是我们的channel,这里引用的
sync
chan
两个feature,前者用于测试,后者是chan实现。
测试代码
测试场景:设置缓存长度为10,发100万数据,接100万数据。在1发送者1接受者,1发送者10接受者,10发送者1接受者,10发送者10接受者四种情况下的收发性能。
use std::fmt::Debug;
use wd_tools::channel as wd;
use async_channel as ac;#[tokio::main]
async fn main(){let ts = TestService::new(10);println!("test start ------------> wd_tools");ts.send_to_recv("1-v-1",true,100_0000,1,100_0000,1,|x|x).await;ts.send_to_recv("1-v-10",true,100_0000,1,10_0000,10,|x|x).await;ts.send_to_recv("10-v-1",true,10_0000,10,100_0000,1,|x|x).await;ts.send_to_recv("10-v-10",true,10_0000,10,10_0000,10,|x|x).await;println!("wd_tools <------------- test over");println!("test start ------------> async-channel");ts.send_to_recv("1-v-1",false,100_0000,1,100_0000,1,|x|x).await;ts.send_to_recv("1-v-10",false,100_0000,1,10_0000,10,|x|x).await;ts.send_to_recv("10-v-1",false,10_0000,10,100_0000,1,|x|x).await;ts.send_to_recv("10-v-10",false,10_0000,10,10_0000,10,|x|x).await;println!("async-channel <------------ test over");
}struct TestService<T>{wd_sender : wd::Sender<T>,wd_receiver : wd::Receiver<T>,ac_sender : ac::Sender<T>,ac_receiver : ac::Receiver<T>
}impl<T:Unpin+Send+Sync+Debug+'static> TestService<T>{pub fn new(cap:usize)->TestService<T>{let (wd_sender,wd_receiver) = wd::Channel::new(cap);let (ac_sender,ac_receiver) = ac::bounded(cap);TestService{wd_sender,wd_receiver,ac_sender,ac_receiver}}pub fn send<G:Fn(usize)->T+Send+Sync+'static>(&self,wg:wd_tools::sync::WaitGroup,is_wd:bool,max:usize,generater:G){let wd_sender = self.wd_sender.clone();let ac_sender = self.ac_sender.clone();wg.defer_args1(|is_wd|async move{for i in 0..max {let t = generater(i);if is_wd {wd_sender.send(t).await.expect(" 发送失败");}else{ac_sender.send(t).await.expect(" 发送失败");}}},is_wd);}pub fn recv(&self,wg:wd_tools::sync::WaitGroup,is_wd:bool,max:usize){let wd_receiver = self.wd_receiver.clone();let ac_receiver = self.ac_receiver.clone();wg.defer_args1(|is_wd|async move{for _i in 0..max {if is_wd {wd_receiver.recv().await.expect(" 接收失败");}else{ac_receiver.recv().await.expect(" 接收失败");}}},is_wd);}pub async fn send_to_recv<G:Fn(usize)->T+Send+Sync+Clone+'static>(&self,info:&'static str, is_wd:bool, sbase:usize, sgroup:usize, rbase:usize, rgroup:usize, generater:G){let now = std::time::Instant::now();let wg = wd_tools::sync::WaitGroup::default();let wg_send = wd_tools::sync::WaitGroup::default();let wg_recv = wd_tools::sync::WaitGroup::default();for _ in 0..sgroup{self.send(wg_send.clone(),is_wd,sbase,generater.clone());}for _ in 0..rgroup{self.recv(wg_recv.clone(),is_wd,rbase);}wg.defer(move ||async move{let now = std::time::Instant::now();wg_send.wait().await;println!("test[{}] ---> send use time:{}ms",info,now.elapsed().as_millis());});wg.defer(move ||async move{let now = std::time::Instant::now();wg_recv.wait().await;println!("test[{}] ---> recv use time:{}ms",info,now.elapsed().as_millis());});wg.wait().await;println!("test[{}] ---> all use time:{}ms",info,now.elapsed().as_millis());}
}
结果与分析
测试10次,取平均值做表,如下
如上图,得结论
- 在1发收者和10发收者的情况下,两种channel效率相差不多。
- 在发送者和接受者数量不等时,
wd_tools::channel
的性能明显优于async-channel
思考
分析结论之前先看一下async-channel
的实现。虽然async-channel
也是异步,但它并不依赖某个异步运行时来进行任务的上线文切换,而是使用concurrent-queue
和event-listener
进行消息调度,底层依赖于std::thread::park_timeout
。
相比event-listener
的调度方式,直接管理tokio的Context
则更适用于异步环境。尤其是存在大量等待的场景。如上面测试,接受者和发送者数量不等,需要长时间等待的情况。实际开发中,接受者或者发送者可能长时间处于饥饿的情况下,wd_tools::channel
不会产生多余的资源开销,毕竟上下文被挂起了,也就不会被cpu执行。
当然实际是复杂的,因情而异,使用的CPU数量(线程数),缓存长度,异步任务数同样会影响消息队列的性能,尤其是不需要等待的场景下async-channel
性能更优。
而wd_tools::channel
则更适合tokio异步环境。并且不会引起线程park
,而产生其他影响。
尾语
wd_tools::channel
目前只是一个初级版本,还有很多地方待优化,比如过多的状态判断,对缓存区直接轮训加锁,而没有采用优化算法, 唤醒器完全可以通过一定优化策略替换带。
但这个思路是没错的,欢迎有想法的同志加入进来。
相关文章:

如何用rust实现一个异步channel
目录 前言思路实现功能代码实现 测试先引测试版包测试代码结果与分析思考 尾语 前言 使用通信来共享内存,而不是通过共享内存来通信 上面这句话,是每个go开发者在 处理多线程通信时 的座右铭,go甚至把实现这个理念的channel直接焊在编译器里&…...
gitee上传项目到仓库
目录 一些常用的Git命令切换到其他盘符:列出目录下的所有文件:以树状图的形式显示目录下的文件和子目录:返回上一层目录:写的C#代码文件上传到新建的Git仓库中,可以按照以下步骤进行操作:出现的错误: 一些常…...

day27 贪心算法
1.什么是贪心? 比如10张钞票,有1,5,20,100等面额,取五张,如何取得到数额最多的钱?每次取面额最大的那张钞票;就是每个阶段的局部最优;全局最优就是最后拿到的…...
Java实现字符串反转
起因 自己在刷题的过程中,想把一个字符串翻转一下,便写了下面的代码: String str "abcd";str str.reverse();发现行不通,这是为什么呢? 分析 在Java中,字符串是不可变的对象,这意…...

vue - 常见的性能优化
文章目录 vue使用中常见的性能优化1, v-for 遍历避免同时使用 v-if2, 如果需要使用v-for给每项元素绑定事件时 可以使用事件代理**3, 一些数据不做响应式4,一些页面采用keep-alive缓存组件5,第三方UI库按需导入6&#…...
微服务系列文章 之 Nginx服务状态监控的方法
在Nginx的插件模块中有一个模块stub_status可以监控Nginx的一些状态信息,默认安装可能没有这个模块,手动编译的时候加一下即可。 1. 模块安装 先使用命令查看是否已经安装这个模块: [rootihxb123Z nginx]# ./nginx -V (V大写会显示版本号和…...

【网络系统集成】路由器实验
1.实验名称:路由器RIP协议配置 2.实验目的 在PacketTracer中进行模拟实验,配置RIP协议,验证RIP协议更新时间及路由状态变化,加深对路由器RIP协议相关知识的理解与掌握。 3.实验内容 (1)拓扑结构图 (2)ip地址分配与端口分配...
【mac 安装Miniconda】
1.下载Miniconda 注意mac是什么版本,m1下载m1版本 https://docs.conda.io/en/latest/miniconda.html#macos-installers 2.安装Miniconda 在下载文件所在目录下打开终端,输入一下命令: bash Miniconda3-latest-MacOSX-x86_64.sh 一路回车&…...

螺栓疲劳计算-风电行业,参考GL2010, ST0361,1993-1-9
由于不想再重新排版了,于是转成了图片。...
QT学习之旅 - QThread多线程
文章目录 首先是主线程 其次是一个程序 通过一个QThread来放入程序 进阶一点: 手动开启关闭线程俩个线程 其实QT中的thread(线程)是很容易的 首先是主线程 #include "mainwindow.h" #include "ui_mainwindow.h"#include <QDebug>MainWindow::MainWin…...

PROFINET转TCP/IP网关TCP/IP协议的含义是
大家好,今天要和大家分享一款自主研发的通讯网关,远创智控YC-PN-TCPIP。这款网关可是集多种功能于一身,PROFINET从站功能,让它在通讯领域独领风骚。想知道这款网关如何实现PROFINET和TCP/IP网络的连接吗?一起来看看吧&…...

计算机网络基础第六章
一、应用层概述 1.1 网络应用模型 1.1.1 客户/服务器(C/S)模型 1.1.2 P2P模型 二、域名解析系统——DNS系统 2.1 域名 2.2 域名服务器 2.3 域名解析过程 三、文件传输协议——FTP 3.1 FTP服务器和用户端 3.2 FTP工作原理 四、电子邮件 4.1 电子邮件系统概述 4.2 简单邮件传送…...

MobPush:Android客户端SDK厂商通道回执配置指南
华为厂商回执配置 登录华为AppGallery Connect网站。在左侧菜单点击增长 - 推送服务,进入推送服务页面后,点击配置页签,检查应用回执状态,如未开通请点击右侧开通按钮。 如已开通,点击修改按钮,弹出“选择…...

Karmada: Open, Multi-Cloud, Multi-Cluster Kubernetes Orchestration
Karmada是一个开源的多云应用编排和管理平台,旨在帮助用户在多个云提供商之间无缝地部署、编排和管理应用程序。 Karmada(Kubernetes Armada)是一个Kubernetes管理系统,它使您能够在多个Kubernetes集群和云环境中运行云原生应用程…...

arcgis拓扑检查
不能有悬挂点 不能有伪结点***路网处理很重要,看研究吧。 一直默认到最后。 导入要素类,单个 toupu2右键新建拓扑(T) 一般选不能有悬挂点,不能重叠。 一路默认 是 拉进图层可视化 线要素的话记得添加字段length&#…...

icp许可证 办理流程(icp资质申请条件)
icp许可证 办理流程(icp资质申请条件)是什么? ICP经营许可证是可以线上无忧办理的,包下证,流程也很简单,只需要你提供企业营业执照、法人身份证这些基础材料就可以。加急10-20工作日拿证,普通20-60工作日拿证。 在了解…...

三菱PLC 控制灯一秒钟交替闪烁
三菱PLC中常用的特殊继电器: M8000 上电一直ON标志 M8002 上电导通一次 M8004 PLC出错 M8005 PLC备用电池电量低标志 M8011 10ms时钟脉冲 M8012 100ms时钟脉冲 M8013 1s时钟脉冲 M8014 1min时钟脉冲 M8034…...

金融数据库的战场,太平洋保险和OceanBase打了场胜仗
点击关注 文丨刘雨琦 “数据库的国产替代,必须经过严格的考虑,保证不会出错,所以大多数企业的领导层选择按兵不动或者简单扩容。因为不换就不会错,选了很久如果选错,还可能会出现重大事故。” 某银行数据库技术人员…...

IP协议【图解TCP/IP(笔记九)】
文章目录 IP即网际协议IP相当于OSI参考模型的第3层网络层与数据链路层的关系 IP基础知识IP地址属于网络层地址路由控制■ 发送数据至最终目标地址■ 路由控制表 数据链路的抽象化IP属于面向无连接型 IP即网际协议 TCP/IP的心脏是互联网层。这一层主要由IP(Internet…...
C#仿热血江湖
目录 1 定义属性 2 GClass86_0 3 Int32 4 List 5 NpcID 定义属性 private int _NpcID; private GClass86 gclass86_0 = new GClass86(); private int int_0;...
SkyWalking 10.2.0 SWCK 配置过程
SkyWalking 10.2.0 & SWCK 配置过程 skywalking oap-server & ui 使用Docker安装在K8S集群以外,K8S集群中的微服务使用initContainer按命名空间将skywalking-java-agent注入到业务容器中。 SWCK有整套的解决方案,全安装在K8S群集中。 具体可参…...
【杂谈】-递归进化:人工智能的自我改进与监管挑战
递归进化:人工智能的自我改进与监管挑战 文章目录 递归进化:人工智能的自我改进与监管挑战1、自我改进型人工智能的崛起2、人工智能如何挑战人类监管?3、确保人工智能受控的策略4、人类在人工智能发展中的角色5、平衡自主性与控制力6、总结与…...
Golang 面试经典题:map 的 key 可以是什么类型?哪些不可以?
Golang 面试经典题:map 的 key 可以是什么类型?哪些不可以? 在 Golang 的面试中,map 类型的使用是一个常见的考点,其中对 key 类型的合法性 是一道常被提及的基础却很容易被忽视的问题。本文将带你深入理解 Golang 中…...

Xshell远程连接Kali(默认 | 私钥)Note版
前言:xshell远程连接,私钥连接和常规默认连接 任务一 开启ssh服务 service ssh status //查看ssh服务状态 service ssh start //开启ssh服务 update-rc.d ssh enable //开启自启动ssh服务 任务二 修改配置文件 vi /etc/ssh/ssh_config //第一…...
vue3 字体颜色设置的多种方式
在Vue 3中设置字体颜色可以通过多种方式实现,这取决于你是想在组件内部直接设置,还是在CSS/SCSS/LESS等样式文件中定义。以下是几种常见的方法: 1. 内联样式 你可以直接在模板中使用style绑定来设置字体颜色。 <template><div :s…...

【配置 YOLOX 用于按目录分类的图片数据集】
现在的图标点选越来越多,如何一步解决,采用 YOLOX 目标检测模式则可以轻松解决 要在 YOLOX 中使用按目录分类的图片数据集(每个目录代表一个类别,目录下是该类别的所有图片),你需要进行以下配置步骤&#x…...
重启Eureka集群中的节点,对已经注册的服务有什么影响
先看答案,如果正确地操作,重启Eureka集群中的节点,对已经注册的服务影响非常小,甚至可以做到无感知。 但如果操作不当,可能会引发短暂的服务发现问题。 下面我们从Eureka的核心工作原理来详细分析这个问题。 Eureka的…...

HarmonyOS运动开发:如何用mpchart绘制运动配速图表
##鸿蒙核心技术##运动开发##Sensor Service Kit(传感器服务)# 前言 在运动类应用中,运动数据的可视化是提升用户体验的重要环节。通过直观的图表展示运动过程中的关键数据,如配速、距离、卡路里消耗等,用户可以更清晰…...

【VLNs篇】07:NavRL—在动态环境中学习安全飞行
项目内容论文标题NavRL: 在动态环境中学习安全飞行 (NavRL: Learning Safe Flight in Dynamic Environments)核心问题解决无人机在包含静态和动态障碍物的复杂环境中进行安全、高效自主导航的挑战,克服传统方法和现有强化学习方法的局限性。核心算法基于近端策略优化…...
Java毕业设计:WML信息查询与后端信息发布系统开发
JAVAWML信息查询与后端信息发布系统实现 一、系统概述 本系统基于Java和WML(无线标记语言)技术开发,实现了移动设备上的信息查询与后端信息发布功能。系统采用B/S架构,服务器端使用Java Servlet处理请求,数据库采用MySQL存储信息࿰…...