如何在 Rust 中通过 Rumqttc 实现 MQTT 通信
Rust 简介
Rust 是一门系统级编程语言,以其卓越的性能、并发能力以及内存安全特性著称。Rust 由 Mozilla 推出,目标是在现代软件开发中提供一种安全高效的编程语言。其设计旨在提供安全、并发和高效的编程体验,同时保持开发效率和代码质量不受影响。
Rust 的核心特性包括:
- 内存安全:Rust 通过所有权系统和借用检查器确保内存安全。所有权系统在编译时追踪每个值的所有权,并负责管理内存释放。借用检查器防止空指针引用和数据竞争等常见的内存问题。
- 并发性:Rust 提供了一组轻量级的并发工具,让开发人员能够轻松编写安全的并发程序。通过 std::thread 模块,可以方便创建和管理线程,而 std::sync 模块则提供了如互斥锁、信号量和通道等同步原语,保证线程之间安全的数据共享和通信。
- 高性能:Rust 强调零成本抽象和极低的运行时开销。它支持内联汇编、无锁编程和异步编程等高级功能,帮助开发者编写高性能的系统应用和网络服务。
总的来说,Rust 是一门功能强大、安全可靠、高性能的编程语言,适用于广泛的应用场景,从嵌入式开发到大规模分布式系统,甚至网络服务等领域。随着其生态系统的不断完善和活跃的社区支持,Rust 正逐渐成为开发人员的热门选择。
选择基于 Rust 的 MQTT 库
在 Rust 生态系统中,有几种常见的 MQTT 库,其中最受欢迎的是 rumqtt 和 paho-mqtt。
paho-mqtt
paho-mqtt 是 Eclipse Paho 项目的一部分,它是一个跨平台的 MQTT 客户端库,支持包括 Rust 在内的多种编程语言。paho-mqtt 支持 MQTT v3.1 和 v5.0 协议,以稳定和成熟著称。
特点:paho-mqtt 在众多项目中得到了广泛应用,并拥有活跃的社区支持。它提供了同步和异步 API,适用于多种应用场景。
rumqtt
rumqtt 是一个用 Rust 编写的开源库,旨在实现 MQTT 协议,具有简单、健壮和高性能的特点。该项目包括两个主要组件:rumqttc 和 rumqttd。
-
rumqttc
rumqttc 是一个纯 Rust 实现的 MQTT 客户端,设计目标是稳健、高效且易于使用。它基于异步(使用 tokio)的事件循环,使开发者能够方便地发送和接收 MQTT 消息,与 MQTT Broker 进行通信。
-
rumqttd
rumqttd 是一个高性能的 Rust 实现的 MQTT Broker,它的设计轻量且可嵌入,可以将其作为库集成到代码中,甚至扩展其功能。
特点:rumqtt 采用现代设计,提供符合 Rust 异步编程模型的异步 API。其轻量级和高性能的设计使其即使在资源有限的环境中也能表现出色。此外,rumqtt 的 API 设计简洁明了,遵循 Rust 的语言风格,易于使用和理解。
选择 rumqtt 的理由:
- 现代设计
- 轻量级且高性能
- 简洁的 API
- 活跃的社区支持
- 灵活的配置选项
在本文中,我们将使用 rumqttc 进行示例演示。
在 Rust 中使用 MQTT 的示例程序
以下示例将演示如何使用 rumqttc 库创建一个 MQTT 客户端,并实现消息的发布和订阅。通过这些示例,您将学习如何初始化客户端、设置选项、连接到 MQTT 服务器,以及发布/订阅消息。
准备工作
本示例使用 EMQX 提供的免费公共 MQTT 服务器进行测试,连接信息如下:
Broker:broker.emqx.io
TCP 端口:1883
Websocket 端口:8083
-
创建一个新的 Rust 项目:
$ cargo new mqtt-rust-exampleCreated binary (application) `mqtt-rust-example` package
-
修改 Cargo.toml 文件,添加所需的依赖项:
[dependencies] rumqttc = "0.24.0" pretty_env_logger = "0.4" tokio = { version = "1", features = ["full"] }
同步订阅和发布 MQTT 消息
下面的内容展示了如何实现同步订阅和发布 MQTT 消息。
-
修改 Cargo.toml:
[[bin]] name = "syncpubsub" path = "src/syncpubsub.rs"
-
在项目的 src 目录下创建 syncpubsub.rs 文件,并添加以下代码:
use rumqttc::{Client, LastWill, MqttOptions, QoS}; use std::thread; use std::time::Duration;/** 这是程序的主函数。* 在该函数中,将初始化 MQTT 客户端、设置连接选项和遗嘱消息。* 然后,创建客户端和连接、并在新线程中调用发布函数。* 最后,使用 connection.iter() 方法遍历并处理接中的每个通知。*/ fn main() {// 初始化日志记录器pretty_env_logger::init();// 设置 MQTT 连接选项和遗嘱消息let mut mqttoptions = MqttOptions::new("test-1", "broker.emqx.io", 1883);let will = LastWill::new("hello/world", "good bye", QoS::AtMostOnce, false);mqttoptions.set_keep_alive(Duration::from_secs(5)).set_last_will(will);// 创建 MQTT 客户端和连接,并启动新线程进行消息发布let (client, mut connection) = Client::new(mqttoptions, 10);thread::spawn(move || publish(client));// 遍历并处理连接中的每个通知for (i, notification) in connection.iter().enumerate() {match notification {Ok(notif) => {println!("{i}. Notification = {notif:?}");}Err(error) => {println!("{i}. Notification = {error:?}");return;}}} println!("Done with the stream!!"); }/** 这是一个用于发布 MQTT 消息的辅助函数。* 在该函数中,首先休眠一秒钟,然后订阅一个主题。* 接着,循环发送 10 条长度从 0 到 9 不等的消息,每条消息的 QoS 都设置为“至少一次”。*/ fn publish(client: Client) {// 订阅主题前等待一秒thread::sleep(Duration::from_secs(1));client.subscribe("hello/+/world", QoS::AtMostOnce).unwrap();// 发送 10 条消息,长度从 0 到 9 不等,每条消息的 QoS 都设置为“至少一次”for i in 0..10_usize {let payload = vec![1; i]; let topic = format!("hello/{i}/world");let qos = QoS::AtLeastOnce;client.publish(topic, qos, true, payload).unwrap();}thread::sleep(Duration::from_secs(1)); }
-
编译:
$ cargo build
-
运行 syncpubsub:
$ ./target/debug/syncpubsub 0. Notification = Incoming(ConnAck(ConnAck { session_present: false, code: Success })) 1. Notification = Outgoing(Subscribe(1)) 2. Notification = Outgoing(Publish(2)) 3. Notification = Outgoing(Publish(3)) 4. Notification = Outgoing(Publish(4)) 5. Notification = Outgoing(Publish(5)) 6. Notification = Outgoing(Publish(6)) 7. Notification = Outgoing(Publish(7)) 8. Notification = Outgoing(Publish(8)) 9. Notification = Outgoing(Publish(9)) 10. Notification = Outgoing(Publish(10)) 11. Notification = Outgoing(Publish(11)) 12. Notification = Incoming(Publish(Topic = hello/9/world, Qos = AtMostOnce, Retain = true, Pkid = 0, Payload Size = 9)) 13. Notification = Incoming(Publish(Topic = hello/8/world, Qos = AtMostOnce, Retain = true, Pkid = 0, Payload Size = 8)) 14. Notification = Incoming(Publish(Topic = hello/7/world, Qos = AtMostOnce, Retain = true, Pkid = 0, Payload Size = 7)) 15. Notification = Incoming(Publish(Topic = hello/6/world, Qos = AtMostOnce, Retain = true, Pkid = 0, Payload Size = 6)) ...
异步订阅和发布 MQTT 消息
下面的示例展示了如何使用 tokio 库有效管理异步任务,实现异步订阅和发布 MQTT 消息。
-
修改 Cargo.toml:
[[bin]] name = "asyncpubsub" path = "src/asyncpubsub.rs"
-
在项目的 src 目录下创建 asyncpubsub.rs 文件,并添加以下代码:
/** 这行代码从 tokio 库导入了 task 和 time 模块,* 用于管理异步任务和处理与时间相关的操作。*/ use tokio::{task, time};use rumqttc::{AsyncClient, MqttOptions, QoS}; use std::error::Error; use std::time::Duration;/** 这个宏注解表明使用的是 tokio 运行时,* 其中 current_thread 表示异步代码将在单线程上下文中运行。*/ #[tokio::main(flavor = "current_thread")] /** 这是程序的主函数,是一个异步函数。* 在这个函数中,首先初始化一个 MQTT 客户端并设置连接选项。* 然后,创建异步客户端和事件循环,并在任务中调用请求函数。* 最后,通过事件循环轮询并处理事件。*/ async fn main() -> Result<(), Box<dyn Error>> {// 初始化日志记录器pretty_env_logger::init();// color_backtrace::install();// 设置 MQTT 连接选项let mut mqttoptions = MqttOptions::new("test-1", "broker.emqx.io", 1883);mqttoptions.set_keep_alive(Duration::from_secs(5));// 创建异步 MQTT 客户端和事件循环let (client, mut eventloop) = AsyncClient::new(mqttoptions, 10);/** 创建一个包含闭包的异步任务。* 在闭包内部,首先调用 requests(client).await;执行消息发布和订阅操作。* 然后,使用 time::sleep(Duration::from_secs(3)).await; 让任务休眠 3 秒。*/task::spawn(async move {requests(client).await;time::sleep(Duration::from_secs(3)).await;}); loop {// 在事件循环中等待并获取下一个事件。let event = eventloop.poll().await;// 对检索到的事件执行模式匹配,以确定其类型match &event {Ok(v) => {println!("Event = {v:?}");}Err(e) => {println!("Error = {e:?}");return Ok(());}}} }/** 这是一个异步函数,用于发布和订阅消息。* 在此函数中,订阅一个主题,并循环发送长度从 1 到 10 的消息,每秒发送一条信息。* 最后,休眠 120 秒,再处理后续事件。*/ async fn requests(client: AsyncClient) {/** 用于订阅 MQTT 服务器上的特定主题("hello/world")。* 指定服务质量(QoS)为 AtMostOnce,表示最多一次消息传递。*/client.subscribe("hello/world", QoS::AtMostOnce).await.unwrap();/** 向“hello/world”主题发送 10 条消息,每条消息的长度从 1 到 10 递增,发送间隔为 1 秒。* 每条消息的服务质量(QoS)设置为 ExactlyOnce。*/for i in 1..=10 {client.publish("hello/world", QoS::ExactlyOnce, false, vec![1; i]) .await.unwrap();time::sleep(Duration::from_secs(1)).await;}time::sleep(Duration::from_secs(120)).await; }
-
编译:
$ cargo build
-
运行 asyncpubsub:
$ ./target/debug/asyncpubsub Event = Incoming(ConnAck(ConnAck { session_present: false, code: Success })) Event = Outgoing(Subscribe(1)) Event = Outgoing(Publish(2)) Event = Incoming(SubAck(SubAck { pkid: 1, return_codes: [Success(ExactlyOnce)] })) Event = Outgoing(PubRel(2)) Event = Incoming(PubRec(PubRec { pkid: 2 })) Event = Incoming(Publish(Topic = hello/world, Qos = AtMostOnce, Retain = false, Pkid = 0, Payload Size = 1)) Event = Incoming(PubComp(PubComp { pkid: 2 })) Event = Outgoing(Publish(3)) Event = Outgoing(PubRel(3)) ...
结语
通过以上基于 rumqttc 的示例,我们演示了如何实现简单的异步订阅和发布功能。除了基本的 MQTT 功能,rumqttc 还支持 MQTT v5 的新特性,如用户属性等。了解更多信息,请参考 rumqtt 示例。
相关文章:
如何在 Rust 中通过 Rumqttc 实现 MQTT 通信
Rust 简介 Rust 是一门系统级编程语言,以其卓越的性能、并发能力以及内存安全特性著称。Rust 由 Mozilla 推出,目标是在现代软件开发中提供一种安全高效的编程语言。其设计旨在提供安全、并发和高效的编程体验,同时保持开发效率和代码质量不…...

广东高校建设AIGC实验室时需要注意哪几个关键点?
随着人工智能技术的飞速发展,特别是生成式人工智能(AIGC)在各行各业中的广泛应用,它已经成为推动新一轮科技革命和产业变革的关键力量。教育部等相关部门近年来也高度重视人工智能领域的人才培养工作,强调要加快推动高…...
设计模式-PIMPL 模式
PIMPL(Pointer to IMPLementation),又称Opaque Pointer模式或编译防火墙,是一种在C中广泛应用的编程技术。其核心思想是将类的实现细节从公共接口中分离出来,通过指向实现类的指针来访问类的具体功能。这种模式在提高代…...
Docker部署MongoDB教程
嘿,大家好!今天我在三丰云免费服务器上进行了一次激动人心的MongoDB部署测试。这款免费云服务器1核CPU、1G内存、10G硬盘、5M带宽,是不错的免费服务器选择。 首先,让我们简要介绍一下使用到的Docker和MongoDB软件。Docker是一个开…...

堆排序易错点
1.建堆和调整堆(插入和删除) 建堆和调整堆的过程是不一样的: 建堆 从非终端节点编号的结点开始依次建立大根堆,例如: 拿第2个图说,首先比较-1,7,从中选一个小的,即“-1”…...

安卓13长按电源按键直接关机 andriod13不显示关机对话框直接关机
总纲 android13 rom 开发总纲说明 文章目录 1.前言2.问题分析3.代码分析4.代码修改5.编译6.彩蛋1.前言 有些设备需要在长按电源键的时候,直接关机。不需要弹出对话框进行询问。 2.问题分析 过滤电源按键,需要在系统里面处理的话,那么我们需要熟悉android的事件分发,然后再…...

React学习笔记(四)——React 组件生命周期
目录 1. 生命周期-概览 2. 生命周期-挂载阶段 3. 生命周期-更新阶段 4. 生命周期-卸载阶段 5. setState扩展-发现问题 6. setState扩展-更多用法 7. setState扩展-异步 1. 生命周期-概览 了解react类组件生命周期整体情况 大致步骤: 什么是生命周期React类组…...
PHP的guzzlehttp/guzzle库在碰到各种异常时的场景
PHP的guzzlehttp/guzzle库在碰到各种异常时的场景 结论: 经过测试得知 在http状态码为1xx, 2xx, 3xx时, 会在111处输出返回 在http状态码为4xx, 5xx时, 会在222处被捕获 在目标服务不可达或其他异常时会在333处被捕获 测试过程: 用其他程序写个接口, 实现输入什么状态码就返…...

多机部署,负载均衡-LoadBalance
文章目录 多机部署,负载均衡-LoadBalance1. 开启多个服务2. 什么是负载均衡负载均衡的实现客户端负载均衡 3. Spring Cloud LoadBalance快速上手使用Spring Cloud LoadBalance实现负载均衡修改IP,端口号为服务名称启动多个服务 负载均衡策略自定义负载均衡策略 LoadBalance原理…...

Hadoop安装与配置
一、Hadoop安装与配置 1、解压Hadoop安装包 找到hadoop-2.6.0.tar.gz,将其复到master0节点的”/home/csu”目录内,解压hadoop [csumaster0 ~]$ tar -zxvf ~/hadoop-2.6.0.tar.gz 解压成成功后自动在csu目录下创建hadoop-2.6.0子目录,可以用cd hadoo…...
一个自制的比较low的刷题软件
一个自制的比较low的刷题软件 一、背景 工作中往往涉及一些考试,比如阿里云ACP认证,华为GAUSS认证、软考等,应对这些考试的时候,我们往往是先看书后刷题(当然也有直接刷题的大神,毕竟考试,懂的…...

【Java 集合】List接口 —— ArrayList 与 LinkedList 详解
List接口继承自Collection接口,是单列集合的一个重要分支。 在List集合中允许出现重复的元素,所有的元素是以一种线性方式进行存储的,在程序中可以通过索引(类似于数组中的元素角标)来访问集合中的指定元素。另外&…...

通信工程学习:什么是PNF物理网络功能
PNF:物理网络功能 PNF(Physical Network Function)即物理网络功能,是指支持网络功能的物理设备。以下是关于PNF的详细解释: 一、定义与特点 定义: PNF是网络设备厂商(如Cisco、华为、H3C等)通过专用硬件实体提供软件功能的设备。这些设备直接在物理服务器上运…...
Unity的Text组件中实现输入内容的渐变色效果
要在Unity的Text组件中实现输入内容的渐变色效果,默认的Text组件不直接支持渐变色。但是,你可以通过以下几种方式实现: ### 1. **使用Shader**来实现渐变效果 通过自定义Shader为Text组件创建一个渐变效果。这是一个常用的做法࿰…...

network-scripts目录下没有ens33文件的问题
作者:程序那点事儿 日期:2023/11/09 06:52 systemctl start NetworkManager #开启网络管理器nmcli con show #查看ens33网卡对应的是ifcfg-Wired_connection_3这个文件(网络管理器要开启,不然报错),或者根据…...

OpenHarmony(鸿蒙南向)——平台驱动指南【DAC】
往期知识点记录: 鸿蒙(HarmonyOS)应用层开发(北向)知识点汇总 鸿蒙(OpenHarmony)南向开发保姆级知识点汇总~ 持续更新中…… 概述 功能简介 DAC(Digital to Analog Converter&…...

10.Lab Nine —— file system-下
Symbolic links 添加符号链接 1.添加有关symlink系统调用的定义声明,包括kernel/syscall.h, kernel/syscall.c, user/usys.pl 和 user/user.h. 2.添加新的文件类型T_SYMLINK到kernel/stat.h中,添加新的文件标识位O_NOFOLLOW到kernel/fcntl.h中 3.在ken…...
低代码中实现数据映射的必要性与方案
在数字化转型的浪潮中,低代码平台因其快速开发和灵活性而受到越来越多企业的青睐。然而,随着业务需求的复杂化,单纯依赖低代码工具往往难以满足企业在数据处理和业务逻辑上的要求。数据映射作为连接不同数据源和业务逻辑的桥梁,显…...

SpringBoot集成阿里easyexcel(一)基础导入导出
easyexcel主要用于excel文件的读写,可使用model实体类来定义文件读写的模板,对开发人员来说实现简单Excel文件的读写很便捷。可参考官方文档 https://github.com/alibaba/easyexcel 一、引入依赖 <!-- 阿里开源EXCEL --><dependency><gr…...

四元组问题
目录 问题描述 输入格式 输出格式 样例输入 样例输出 说明 评测数据规模 运行限制 原题链接 代码思路 问题描述 从小学开始,小明就是一个非常喜欢数学的孩子。他喜欢用数学的方式解决各种问题。在他的高中时期,他遇到了一个非常有趣的问题&…...

基于距离变化能量开销动态调整的WSN低功耗拓扑控制开销算法matlab仿真
目录 1.程序功能描述 2.测试软件版本以及运行结果展示 3.核心程序 4.算法仿真参数 5.算法理论概述 6.参考文献 7.完整程序 1.程序功能描述 通过动态调整节点通信的能量开销,平衡网络负载,延长WSN生命周期。具体通过建立基于距离的能量消耗模型&am…...

LeetCode - 394. 字符串解码
题目 394. 字符串解码 - 力扣(LeetCode) 思路 使用两个栈:一个存储重复次数,一个存储字符串 遍历输入字符串: 数字处理:遇到数字时,累积计算重复次数左括号处理:保存当前状态&a…...

【Redis技术进阶之路】「原理分析系列开篇」分析客户端和服务端网络诵信交互实现(服务端执行命令请求的过程 - 初始化服务器)
服务端执行命令请求的过程 【专栏简介】【技术大纲】【专栏目标】【目标人群】1. Redis爱好者与社区成员2. 后端开发和系统架构师3. 计算机专业的本科生及研究生 初始化服务器1. 初始化服务器状态结构初始化RedisServer变量 2. 加载相关系统配置和用户配置参数定制化配置参数案…...

2.Vue编写一个app
1.src中重要的组成 1.1main.ts // 引入createApp用于创建应用 import { createApp } from "vue"; // 引用App根组件 import App from ./App.vue;createApp(App).mount(#app)1.2 App.vue 其中要写三种标签 <template> <!--html--> </template>…...

定时器任务——若依源码分析
分析util包下面的工具类schedule utils: ScheduleUtils 是若依中用于与 Quartz 框架交互的工具类,封装了定时任务的 创建、更新、暂停、删除等核心逻辑。 createScheduleJob createScheduleJob 用于将任务注册到 Quartz,先构建任务的 JobD…...
【HTTP三个基础问题】
面试官您好!HTTP是超文本传输协议,是互联网上客户端和服务器之间传输超文本数据(比如文字、图片、音频、视频等)的核心协议,当前互联网应用最广泛的版本是HTTP1.1,它基于经典的C/S模型,也就是客…...

Redis数据倾斜问题解决
Redis 数据倾斜问题解析与解决方案 什么是 Redis 数据倾斜 Redis 数据倾斜指的是在 Redis 集群中,部分节点存储的数据量或访问量远高于其他节点,导致这些节点负载过高,影响整体性能。 数据倾斜的主要表现 部分节点内存使用率远高于其他节…...

保姆级教程:在无网络无显卡的Windows电脑的vscode本地部署deepseek
文章目录 1 前言2 部署流程2.1 准备工作2.2 Ollama2.2.1 使用有网络的电脑下载Ollama2.2.2 安装Ollama(有网络的电脑)2.2.3 安装Ollama(无网络的电脑)2.2.4 安装验证2.2.5 修改大模型安装位置2.2.6 下载Deepseek模型 2.3 将deepse…...

人机融合智能 | “人智交互”跨学科新领域
本文系统地提出基于“以人为中心AI(HCAI)”理念的人-人工智能交互(人智交互)这一跨学科新领域及框架,定义人智交互领域的理念、基本理论和关键问题、方法、开发流程和参与团队等,阐述提出人智交互新领域的意义。然后,提出人智交互研究的三种新范式取向以及它们的意义。最后,总结…...

在Mathematica中实现Newton-Raphson迭代的收敛时间算法(一般三次多项式)
考察一般的三次多项式,以r为参数: p[z_, r_] : z^3 (r - 1) z - r; roots[r_] : z /. Solve[p[z, r] 0, z]; 此多项式的根为: 尽管看起来这个多项式是特殊的,其实一般的三次多项式都是可以通过线性变换化为这个形式…...