使用MASA全家桶从零开始搭建IoT平台(三)管理设备的连接状态
文章目录
- 前言
- 分析
- 方案1:遗嘱消息
- 演示遗嘱消息的使用
- 实施流程
- 方案2:使用WebHook
- 开启WebHook
- 演示Webhook
- 编写代码
前言
获取一个设备的在线和离线状态,是一个很关键的功能。我们对设备下发的控制指令,设备处于在线状态才能及时给我们反馈。这里的在线和离线,我们可以简单的理解为设备与MQTT的连接状态。
分析
我们打电话的时候经常能听到:"您拨打的用户已关机“和”用户不在服务区或暂时无法接通“,这两种的区别是什么?

1、当用户开机时,会自动向最近的移动基站注册,基站标记该用户为"attach"(在线)状态。
2、当用户关机时,手机会发起datach流程,告知基站自己关机了,基站标记该用户为"detach"(离线)状态。这样再次拨打就可以节省寻呼资源,直接提示用户关机。
3、当用户忽然进入无网络的环境,或者手机故障,导致来不及发起datach流程,基站还认为用户"在线",当有人拨打用户号码时,基站测会对用户进行寻呼,但是超时得不到回应后,就会提示"不在服务区"或者"暂时无法接通" 的语音。
其实这个方案在IoT上也是可行的,我们可以让设备在线和离线的过程中向特定Topic发送状态消息,但是存在问题,我们需要一个单独的Broker去订阅这个Topic,但是这个单独的Broker很容易成为单点故障点。而且如果设备数量很大,这种意外离线的设备也很难及时发现,需要下发指令后等待设备响应超时才能发现。
方案1:遗嘱消息
MQTT 遗嘱消息可以在客户端意外断线时将“遗嘱”优雅地发送给第三方订阅者,以实现离线通知、设备状态更新等业务。其中意外断线指客户端断开前未向服务器发送 DISCONNECT 消息,比如:
因网络故障或网络波动,设备在保持连接周期内未能通讯,连接被服务端关闭
设备意外掉电
设备尝试进行不被允许的操作而被服务端关闭连接,例如订阅自身权限以外的主题等
遗嘱消息在 MQTT 客户端向服务器端 CONNECT 请求时设置,可选属性包括是否发送遗嘱消息 (Will Message)标志,和遗嘱消息主题 (Topic) 与内容(Payload) 以及 Properties。
值得一提的,遗嘱消息发布的时间可能会有延迟:通常意外断线时,服务器无法立即检测到断线行为,需要通过连接保活心跳机制并经过一定周期后才会触发;MQTT 5.0 提供的遗嘱延迟间隔(Will Delay Interval)属性也会影响发布时间。
演示遗嘱消息的使用
我们使用A、B两台电脑使用MQTT X来演示。
我们在A电脑的 MQTT X 中新建一个名为 Test 的连接,Host 修改为 修改为我们的MQTT地址(192.120.5.204),并输入账号密码,在 Advanced 部分选择 MQTT Version 为 5.0,并且将 Session Expiry Interval 设置为 10,确保会话不会在遗嘱消息发布前过期。

然后在 Lass Will and Testament 部分将 Last-Will Topic 设置为 offline,Last-Will Payload 设置为 I’m offline,Will Delay Interval (s) 设置为 5。

完成以上设置后,我们点击右上角的 Connect 按钮以建立连接。
我们在B电脑的MQTTX中新建一个连接Sub,mqtt地址同样指向我们的mqtt服务器(192.120.5.204)

并订阅offline主题

我们用任务管理器直接结束A电脑的MQTTX进程,这是连接会被直接断开,模拟了设备断电的场景,在5s之后,在B电脑的MQTTX订阅中收到了一条内容为 I‘m offline 的遗嘱消息。

实施流程
1、设备遗嘱消息内容设置为offline,该遗嘱主题与一个普通发送状态的主题设定成同一个 {设备名称}/status。例如 284202304230001/status
2、当设备连接时,向主题 {设备名称}/status 发送内容为 online 的Retained消息,其它客户端订阅主题 {设备名称}/status 的时候,将获取到 Retained 消息为 online。
保留消息(Retain )
MQTT 服务端收到 Retain 标志为 1 的 PUBLISH 报文时,会将该报文视为保留消息,除了被正常转发以外, 保留消息会被存储在服务端,每个主题下只能存在一份保留消息,因此如果已经存在相同主题的保留消息,则该保留消息被替换。
当客户端建立订阅时,如果服务端存在主题匹配的保留消息,则这些保留消息将被立即发送给该客户端。 借助保留消息,新的订阅者能够立即获取最近的状态,而不需要等待无法预期的时间,这在很多场景下是非常重要的。
EMQX 默认开启保留消息的能力和服务,可以在 etc/emqx.conf 中修改 mqtt.retain_available 为 false 来关闭保留消息的能力, 这样客户端将被禁止发送 Retain 标志为 1 的 PUBLISH 报文,否则,客户端将会收到原因码为 0x9A(不支持保留消息)的 DISCONNECT 报文。
保留消息的服务会存储和管理客户端发送的保留消息,并发送给相应的订阅者。
3、当客户端异常断开时,系统自动向主题 {设备名称}/status 发送内容为 offline 的消息,其它订阅了此主题的客户端会马上收到 offline 消息;如果遗嘱消息设置了 Will Retain,那么此时如果有新的订阅 A/status 主题的客户端上线,也将获取到内容为 offline 的遗嘱消息。
方案2:使用WebHook
方案1需要设备主动设置遗嘱消息才能实现,那么有没有更简单的方式,直接通过设备与Mqtt的连接事件来获取连接状态呢。
EMQX 设计了一套WebHook系统,可以通过这个自带的WebHook系统获取内部的事件并进行处理。WebHook的原理很简单,当设备与mqtt建立连接或者断开连接时,EMQX会把事件的信息通过我们的配置调用特定的URL上的接口,实现通知。
使用WebHook还可以有限避免单点故障。所以本项目会采用WebHook的方式来实现对设备在线和离线的管理。
开启WebHook
在数据集成 -> 数据桥接 中创建一个Webhook

名称设置为ConnectedEvent,URL 中填写我们的Webhook地址,也就是触发事件之后的调用接口地址,这里我们填:
http://192.120.5.204:5000/api/Device/ConnectedEvent
请求方式为Post,其他内容保持默认不变

这里注意URL可以通过${field}的方式拼接,请求体也可以自己指定,如果留空会原样转发消息,我们这里请求体留空
设备在线和离线的事件转发的消息格式如下
{"username": "284202304230001","timestamp": 1682652598840,"sockname": "172.17.0.5:1883","receive_maximum": 32,"proto_ver": 5,"proto_name": "MQTT","peername": "172.17.0.1:48524","node": "emqx@172.17.0.5","mountpoint": "undefined","metadata": {"rule_id": "rule_3hsx"},"keepalive": 60,"is_bridge": false,"expiry_interval": 10,"event": "client.connected","connected_at": 1682652598840,"conn_props": {"User-Property": {},"Session-Expiry-Interval": 10},"clientid": "mqttx_c4491df0","clean_start": false
}
我们点击 创建 ,并继续点击 创建规则

我们在创建规则中指定新的规则名称 rule_client_connected,并在SQL编辑器复制以下内容
SELECT*
FROM"$events/client_connected","$events/client_disconnected"
在右侧的事件中,我们可以看到所有可用的事件,我们选择了连接和断开两个事件,在这两个事件触发时会通过Webhook调用我们配置的接口,这样我们就能获取到设备的在线、离线状态了。

我们点击 创建按钮 完成规则的创建
我们可以看见我们创建好的规则

点击规则ID,还可以看到统计数据

在FLows中还可以看到整个工作流程

演示Webhook
我们使用MQTTX模拟一次设备连接和断开动作,可以在规则统计界面看到我们的操作已经被记录。

编写代码
我们这里采用方案2。
我们需要实现之前配置的ConnectedEvent接口
/// <summary>/// 连接事件请求/// </summary>public class ConnectedEventRequest{/// <summary>/// 设备名称/// </summary>public string Username { get; set; }/// <summary>/// 时间戳/// </summary>public long Timestamp { get; set; }/// <summary>/// 事件(连接/断开)/// </summary>public string Event { get; set; }/// <summary>/// 连接时间(断开事件中为0)/// </summary>public long Connected_at { get; set; }/// <summary>/// Client ID/// </summary>public string Clientid { get; set; }}
/// <summary>/// 更新设备在线状态/// </summary>/// <param name="deviceName"></param>/// <param name="onlineStatus"></param>/// <returns></returns>public async Task UpdateDeviceOnlineStatusAsync(string deviceName, OnLineStates onlineStatus){var device = await _ioTDbContext.IoTDeviceInfo.Include(o => o.IoTDeviceExtend).AsNoTracking().FirstOrDefaultAsync(o => o.DeviceName == deviceName);if (device == null){return;}else{if (device.IoTDeviceExtend == null) //扩展表为空{device.IoTDeviceExtend = new IoTDeviceExtend{DeviceInfoId = device.Id,OnLineStates = (int)onlineStatus,};_ioTDbContext.Attach(device.IoTDeviceExtend);_ioTDbContext.Entry(device.IoTDeviceExtend).State = EntityState.Added;_ioTDbContext.Entry(device.IoTDeviceExtend).Property(o => o.OnLineStates).IsModified = true;await _ioTDbContext.SaveChangesAsync();}if (device.IoTDeviceExtend.OnLineStates != (int)onlineStatus) //在线状态不一致{device.IoTDeviceExtend.OnLineStates = (int)onlineStatus;_ioTDbContext.Attach(device.IoTDeviceExtend);//防止更新其他字段_ioTDbContext.Entry(device.IoTDeviceExtend).State = EntityState.Unchanged;_ioTDbContext.Entry(device.IoTDeviceExtend).Property(o => o.OnLineStates).IsModified = true;await _ioTDbContext.SaveChangesAsync();}}}
我们根据Event中的内容来判断是 连接(client.connected)/断开(client.disconnected) 的事件
/// <summary>/// 连接、断开事件/// </summary>/// <param name="request"></param>/// <returns></returns>[HttpPost]public async Task ConnectedEventAsync([FromBody] ConnectedEventRequest request){var onlineStatus = request.Event switch{"client.connected" => OnLineStates.OnLine,_ => OnLineStates.OffLine};await _deviceHandler.UpdateDeviceOnlineStatusAsync(request.Username, onlineStatus);}
#总结
以上就是本文要讲的内容,我们可以通过MQTTX来测试我们的代码有效性。
该方案还存在部分缺点,例如:
1、每次设备上下线会导致频繁的请求接口,在大量设备接入的场景中需要考虑接口性能。
2、由于网络等问题,Web调用顺序可能不能完全保证,也许离线会比在线事件更早处理,从而导致状态不一致。我们后面会尝试用其他方案来替代WebHook,尝试解决上述问题,在此之前我们都会继续使用WebHook进行功能演示。
完整代码在这里:https://github.com/sunday866/MASA.IoT-Training-Demos
如果你对我们的 MASA 感兴趣,无论是代码贡献、使用、提 Issue,欢迎联系我们
WeChat:MasaStackTechOps
QQ:7424099
相关文章:
使用MASA全家桶从零开始搭建IoT平台(三)管理设备的连接状态
文章目录 前言分析方案1:遗嘱消息演示遗嘱消息的使用实施流程 方案2:使用WebHook开启WebHook演示Webhook编写代码 前言 获取一个设备的在线和离线状态,是一个很关键的功能。我们对设备下发的控制指令,设备处于在线状态才能及时给我们反馈。这里的在线和…...
我的新书上架了!
talk is cheap,show you my book! 新书《从0开始学ARM》终于在各大平台上架了!! 一、关于本书 1. 本书主要内容 ARM体系架构是目前市面上的主流处理器体系架构,在手机芯片和嵌入式芯片领域,ARM体系架构…...
语言与专业的奇迹:如何利用ChatGPT优化跨国贸易
贸易公司,在进行跨国贸易时,往往需要面对不同国家的甲方或者乙方,在与之沟通的过程中,语言和专业是必须要过的一关,顺畅的交流,往往会带来更好的收益。 今天以“茶”为例,给大家介绍一“知否AI…...
云服务器安装宝塔Linux面板命令脚本大全
阿里云服务器安装宝塔Linux面板,操作系统不同安装命令脚本也不同,支持CentOS、Alibaba Cloud Linux、Ubuntu/Deepin等Linux系统,阿里云服务器网分享阿里云服务器安装宝塔Linux面板命令脚本大全: 云服务器安装宝塔Linux面板命令 …...
zed2i相机中imu内参的标定及外参标定
zed2i中imu内参的标定 参考: https://blog.csdn.net/weixin_42681311/article/details/126109617 https://blog.csdn.net/weixin_43135184/article/details/123444090 值得注意,imu内参的标定其实不是那么重要,大致上给一个值应该影响不大…...
Java中的JUnit是什么?如何使用JUnit进行单元测试
JUnit是Java中最流行的单元测试框架之一。它可以帮助开发人员在代码编写过程中检测出错误和异常,从而提高代码的质量和可靠性。 什么是JUnit? JUnit是一个由Kent Beck和Erich Gamma创建的开源Java单元测试框架,它已经成为Java开发中最常用的…...
【seata的部署和集成】
seata的部署和集成 seata的部署和集成一、部署Seata的tc-server1.下载2.解压3.修改配置4.在nacos添加配置5.创建数据库表6.启动TC服务 二、微服务集成seata1.引入依赖2.修改配置文件 三、TC服务的高可用和异地容灾1.模拟异地容灾的TC集群2.将事务组映射配置到nacos3.微服务读取…...
uniapp学习日记之request自定义请求头
uniapp学习日记之request自定义请求头 在学习uniapp的过程中,由于笔者是从Vue项目转来学习uniapp,在使用uni.request时,发现在浏览器调试时,无法在请求头header中添加token字段,愤而弃之,便开始使用axios组…...
【Rust】速度入门---打印个螃蟹先
参考: 菜鸟教程 1 输出到命令行 这不得打印个螃蟹 // 代码来自官方入门教程 // ferris_say需要另外安装 use ferris_says::say; use std::io::{stdout, BufWriter};fn main() {let stdout: std::io::Stdout stdout();let msg: String String::from("Hello fellow Rusta…...
《Linux 内核设计与实现》12. 内存管理
文章目录 页区获得页获得填充为 0 的页释放页 kmalloc()gfp_mask 标志kfree()vmalloc() slab 层slab 层的设计slab 分配器的接口 在栈上的静态分配单页内核栈 高端内存的映射永久映射临时映射 每个 CPU 的分配新的每个 CPU 接口 页 struct page 结构表示系统中的物理页&#x…...
公司新来个卷王,让人崩溃...
最近内卷严重,各种跳槽裁员,相信很多小伙伴也在准备今年的面试计划。 在此展示一套学习笔记 / 面试手册,年后跳槽的朋友可以好好刷一刷,还是挺有必要的,它几乎涵盖了所有的软件测试技术栈,非常珍贵&#x…...
Docker 安全及日志管理
Docker 安全及日志管理 Docker 容器与虚拟机的区别隔离与共享性能与损耗 Docker 存在的安全问题Docker 自身漏洞Docker 源码问题Docker 架构缺陷与安全机制Docker 安全基线标准 容器相关的常用安全配置方法容器最小化Docker 远程 API 访问控制重启 Docker在宿主机的 firewalld …...
大厂面试必备 - MAC 地址 和 IP 地址分别有什么作用?
数据链路层 1、MAC 地址 和 IP 地址分别有什么作用? MAC 地址是数据链路层和物理层使用的地址,是写在网卡上的物理地址。MAC 地址用来定义网络设备的位置。IP 地址是网络层和以上各层使用的地址,是一种逻辑地址。IP 地址用来区别网络上的计…...
【sqlite】联查Join更新
系列文章 C#底层库–MySQLBuilder脚本构建类(select、insert、update、in、带条件的SQL自动生成) 本文链接:https://blog.csdn.net/youcheng_ge/article/details/129179216 C#底层库–MySQL数据库操作辅助类(推荐阅读࿰…...
asp.net+C#德育课程分数统计管理系统
本中小学德育管理系统主要学校内部提供服务,系统分为管理员,教师和学生3个大模块。 本研究课题重点主要包括了下面几大模块:用户登录,管理员信息管理学生信息管理,教师信息管理,班级成绩管理,学…...
Figma中文网?比Figma更懂你的设计网站!
一个比 Figma 更懂你的设计网站的 Figma 中文网 —— 即时设计是一个非常有用的设计资源平台,它提供了大量的免费设计素材,包括来自各大厂商的 UI 组件库、精美的模板、插画设计和矢量图标素材等等。设计师可以从中学习到大师的设计技巧和规范࿰…...
Nacos-01-Nacos基本介绍
背景 服务发现是⼀个古老的话题,当应用开始脱离单机运行和访问时,服务发现就诞生了。目前的网络架构是每个主机都有⼀个独立的 IP 地址,那么服务发现基本上都是通过某种方式获取到服务所部署的 IP 地址。DNS 协议是最早将⼀个网络名称翻译…...
SpringBoot集成Dubbo启用gRPC协议
文章目录 前言项目结构代码示例父工程api moduleservice module 注意事项区别 本文记录下SpringBoot集成Dubbo启用gRPC协议,以及与原生 gRPC 在代码编写过程中的区别。 下面还有投票,帮忙投个票👍 前言 Dubbo 在 2.7.5 版本开始支持原生 gRP…...
Kali HTTrack演示-渗透测试察打一体(1)
HTTrack是一个免费并易于使用的线下浏览器工具,全称是HTTrack Website Copier for Windows,它能够让你从互联网上下载指定的网站进行线下浏览(离线浏览),也可以用来收集信息(甚至有网站使用隐藏的密码文件),一些仿真度极高的伪网站(为了骗取用户密码),也是使用类似工具做…...
ThreeJS进阶之使用后期处理
什么是后期处理? 很多three.js应用程序是直接将三维物体渲染到屏幕上的。 有时,你或许希望应用一个或多个图形效果,例如景深、发光、胶片微粒或是各种类型的抗锯齿。 后期处理是一种被广泛使用、用于来实现这些效果的方式。 首先,场景被渲染到一个渲染目标上,渲染目标表示…...
利用ngx_stream_return_module构建简易 TCP/UDP 响应网关
一、模块概述 ngx_stream_return_module 提供了一个极简的指令: return <value>;在收到客户端连接后,立即将 <value> 写回并关闭连接。<value> 支持内嵌文本和内置变量(如 $time_iso8601、$remote_addr 等)&a…...
PPT|230页| 制造集团企业供应链端到端的数字化解决方案:从需求到结算的全链路业务闭环构建
制造业采购供应链管理是企业运营的核心环节,供应链协同管理在供应链上下游企业之间建立紧密的合作关系,通过信息共享、资源整合、业务协同等方式,实现供应链的全面管理和优化,提高供应链的效率和透明度,降低供应链的成…...
c#开发AI模型对话
AI模型 前面已经介绍了一般AI模型本地部署,直接调用现成的模型数据。这里主要讲述讲接口集成到我们自己的程序中使用方式。 微软提供了ML.NET来开发和使用AI模型,但是目前国内可能使用不多,至少实践例子很少看见。开发训练模型就不介绍了&am…...
【Linux】Linux安装并配置RabbitMQ
目录 1. 安装 Erlang 2. 安装 RabbitMQ 2.1.添加 RabbitMQ 仓库 2.2.安装 RabbitMQ 3.配置 3.1.启动和管理服务 4. 访问管理界面 5.安装问题 6.修改密码 7.修改端口 7.1.找到文件 7.2.修改文件 1. 安装 Erlang 由于 RabbitMQ 是用 Erlang 编写的,需要先安…...
路由基础-路由表
本篇将会向读者介绍路由的基本概念。 前言 在一个典型的数据通信网络中,往往存在多个不同的IP网段,数据在不同的IP网段之间交互是需要借助三层设备的,这些设备具备路由能力,能够实现数据的跨网段转发。 路由是数据通信网络中最基…...
从0开始学习R语言--Day17--Cox回归
Cox回归 在用医疗数据作分析时,最常见的是去预测某类病的患者的死亡率或预测他们的结局。但是我们得到的病人数据,往往会有很多的协变量,即使我们通过计算来减少指标对结果的影响,我们的数据中依然会有很多的协变量,且…...
LeetCode - 148. 排序链表
目录 题目 思路 基本情况检查 复杂度分析 执行示例 读者可能出的错误 正确的写法 题目 148. 排序链表 - 力扣(LeetCode) 思路 链表归并排序采用"分治"的策略,主要分为三个步骤: 分割:将链表从中间…...
迁移科技3D视觉系统:重塑纸箱拆垛场景的智能革命
一、传统拆垛场景的困局与破局之道 在汽车零部件仓库中,每天有超过2万只异形纸箱需要拆垛分拣。传统人工拆垛面临三大挑战: 效率瓶颈:工人每小时仅能处理200-300件,且存在间歇性疲劳安全隐患:20kg以上重箱搬运导致年…...
Unity VR/MR开发-开发环境准备
视频讲解链接: 【XR马斯维】UnityVR/MR开发环境准备【UnityVR/MR开发教程--入门】_哔哩哔哩_bilibili...
从零开始学Flink:揭开实时计算的神秘面纱
一、为什么需要Flink? 当你在电商平台秒杀商品时,1毫秒的延迟可能导致交易失败;当自动驾驶汽车遇到障碍物时,10毫秒的计算延迟可能酿成事故。这些场景揭示了一个残酷事实:数据的价值随时间呈指数级衰减。 传统批处理…...
