当前位置: 首页 > news >正文

MQTT客户端实战:从连接到通信。详细说明MQTT客户端和MQTT代理进行通信

在这里插入图片描述

EMQX安装

EMQX服务器安装

安装文档,见链接不另外写

https://docs.emqx.com/zh/emqx/latest/deploy/install-ubuntu.html

启动 EMQX

启动为一个 systemd 服务:

sudo systemctl start emqx

在windows安装客户端

在线 MQTT WebSocket 客户端工具,MQTTX Web 是开源的 MQTT 5.0 浏览器客户端,但是经我测试没有成功,好像有bug.

建议使用MQTT 5.0 命令行客户端工具。使用命令行上的 MQTTX,旨在帮助开发者在不需要使用图形化界面的基础上,也能更快的开发和调试 MQTT 服务与应用。
在这里插入图片描述
由于是后期被写的博文,图是借官方的。请自行区分一下。

平台安装后的地址

1,平台的地址

  • http://127.0.0.1:18083
    后台登录 用户名:test 密码:test

Laravel中处理MQTT订阅

1,安装MQTT客户端库

在Laravel项目中安装一个MQTT客户端库。你可以使用Composer来安装 php-mqtt/client:

composer require php-mqtt/client

2, 新建command文件

文件路径:app/Console/Commands/MqttClientCommand.php

这段PHP代码是一个用于处理MQTT消息的命令行工具,它使用了Simps的MQTT客户端库。代码中定义了两个类:MQTTUserConfig 和 MqttClientCommand。

MQTTUserConfig 类定义了一些常量,这些常量用于配置MQTT连接。

MqttClientCommand 类继承自 Illuminate\Console\Command,是一个命令行工具,用于订阅或发布MQTT消息。

<?phpnamespace App\Console\Commands;use App\Http\Controllers\Wxapi\DeviceReportController;
use Illuminate\Console\Command;
use Illuminate\Support\Facades\DB;use Simps\MQTT\Protocol\Types;
use Simps\MQTT\Protocol\V5;
use Simps\MQTT\Tools\Common;
use Simps\MQTT\Client;
use Simps\MQTT\Config\ClientConfig;
use Simps\MQTT\Hex\ReasonCode;use Swoole\Coroutine;
use Illuminate\Support\Facades\Redis;class MQTTUserConfig
{    const SIMPS_MQTT_REMOTE_HOST = '*';const SIMPS_MQTT_PORT = 1883;const SIMPS_MQTT_SUBSCRIBE_PORT = 8083;const SIMPS_MQTT_USER = 'test*';const SIMPS_MQTT_PASSWORD = 'test*';
}class MqttClientCommand extends Command
{protected $signature = 'mqtt:handle {param1}';protected $description = '订阅物联网mqtt消息 param1:null 订阅消息, param1:public 发布消息';protected  $mqtt ;const SWOOLE_MQTT_CONFIG = ['open_mqtt_protocol' => true,'package_max_length' => 2 * 1024 * 1024,'connect_timeout' => 5.0,'write_timeout' => 5.0,'read_timeout' => 5.0,];//模拟设备const CLiENT_IDs = ['mqttx_devA','mqttx_devB','mqttx_devC','mqttx_devD'];public function __construct(){parent::__construct();}public function handle(){$param1 =$this->argument('param1');
//        $param2 =$this->argument('param2');if ($param1=='subscribe') {$this->info('启动订阅...');$this->subscribeMqtt();} elseif ($param1=='public') {$this->info('启动发布...');$this->publishMQTT();}echo '\r\n\r\n分配工作执行完成!!!';}protected function getTestMQTT5ConnectConfig(){$config = new ClientConfig();$UserConfig = new MQTTUserConfig();return $config->setUserName($UserConfig::SIMPS_MQTT_USER)->setPassword($UserConfig::SIMPS_MQTT_PASSWORD)->setClientId(Client::genClientID())->setKeepAlive(10)->setDelay(3000) // 3s->setMaxAttempts(5)->setProperties(['session_expiry_interval' => 60,'receive_maximum' => 65535,'topic_alias_maximum' => 65535,])->setProtocolLevel(5)->setSwooleConfig( ['open_mqtt_protocol' => true,'package_max_length' => 2 * 1024 * 1024,'connect_timeout' => 5.0,'write_timeout' => 5.0,'read_timeout' => 5.0,]);}private function heartbeat($message) {if ($message) {parse_str($message,$array);$device = $array['imei'];$hash = ':mqtt:heartbeat:online'.":{$device}";Redis::expire($hash,30);  ##30s有效Redis::sAdd($hash,1);}}/** 订阅*  private function subscribeMqtt(){Coroutine\run(function () {$client = new Client('39.108.230.87', 1883, $this->getTestMQTT5ConnectConfig());....*/private function subscribeMqtt(){Coroutine\run(function () {$UserConfig = new MQTTUserConfig();$client = new Client($UserConfig::SIMPS_MQTT_REMOTE_HOST, 1883,$this->getTestMQTT5ConnectConfig());$will = ['topic' => 'simps-mqtt/dinweiyi/delete','qos' => 1,'retain' => 0,'message' => 'byebye','properties' => ['will_delay_interval' => 60,'message_expiry_interval' => 60,'content_type' => 'test','payload_format_indicator' => true, // false 0 1],];$client->connect(true, $will);$topics['simps-mqtt/dinweiyi/subscribe_message'] = ['qos' => 2,'no_local' => true,'retain_as_published' => true,'retain_handling' => 2,];$res = $client->subscribe($topics);$timeSincePing = time();var_dump($res);echo '\r\n\r\n connect success !!!';while (true) {try {$buffer = $client->recv();$message = null;if ($buffer && $buffer !== true) {$message = $buffer["message"];// QoS1 PUBACKif ($buffer['type'] === Types::PUBLISH && $buffer['qos'] === 1) {$client->send(['type' => Types::PUBACK,'message_id' => $buffer['message_id'],],false);}if ($buffer['type'] === Types::DISCONNECT) {echo sprintf("Broker is disconnected, The reason is %s [%d]\n",ReasonCode::getReasonPhrase($buffer['code']),$buffer['code']);$client->close($buffer['code']);break;}$reportObj = new DeviceReportController();$ret = $reportObj->store($message);var_dump("182>>>",$ret);unset($reportObj);}if ($timeSincePing <= (time() - $client->getConfig()->getKeepAlive())) {$buffer = $client->ping();if ($buffer) {echo 'send ping success ...' ;$this->heartbeat($message);$timeSincePing = time();}}} catch (\Throwable $e) {throw $e;}}});}protected function getMessage() {$client_ids = ['mqttx_devA',
//            'mqttx_devB','mqttx_devC','mqttx_devD'];$message = [];$message['clientID'] = self::CLiENT_IDs[array_rand($client_ids)];$message['time'] = time();$message['location'] = ["x"=>rand(1000,9999),"y"=>rand(1000,9999)];return json_encode($message);}/** 发布*/public function publishMQTT() {Coroutine\run(function () {$UserConfig = new MQTTUserConfig();$client = new Client($UserConfig::SIMPS_MQTT_REMOTE_HOST, $UserConfig::SIMPS_MQTT_PORT,$this->getTestMQTT5ConnectConfig());$client->connect();while (true) {$message = $this->getMessage();$response = $client->publish('simps-mqtt/user/subscribe_message',$message,1,0,0,['topic_alias' => 1,'message_expiry_interval' => 12,]);var_dump( 'publishMQTT>>>',$message);Coroutine::sleep(1);}});}}

3, 代码流程图

使用Mermaid语法描述的上述PHP代码的流程图:

subscribe
public
收到消息
心跳超时
开始
构造函数 __construct
handle 方法
param1 参数
调用 subscribeMqtt
调用 publishMQTT
Coroutine 运行 subscribeMqtt
创建 MQTT 客户端并连接
设置遗嘱消息
订阅主题
接收消息
处理消息
发送心跳
心跳函数 heartbeat
存储消息
是否断开连接
关闭连接
Coroutine 运行 publishMQTT
创建 MQTT 客户端并连接
循环发布消息
获取测试消息
发布消息
结束

流程说明:

  1. 开始:程序启动。
  2. 构造函数 __construct:初始化命令行工具。
  3. handle 方法:处理命令行输入。
  4. param1 参数:根据输入的参数决定是订阅还是发布。
  5. 调用 subscribeMqtt:如果参数是subscribe,则调用此方法。
  6. 调用 publishMQTT:如果参数是public,则调用此方法。
  7. Coroutine 运行 subscribeMqtt:在协程中运行订阅方法。
  8. 创建 MQTT 客户端并连接:创建MQTT客户端并连接到服务器。
  9. 设置遗嘱消息:设置遗嘱消息,以便在客户端意外断开时发送。
  10. 订阅主题:订阅特定的MQTT主题。
  11. 接收消息:持续监听并接收消息。
  12. 处理消息:对接收到的消息进行处理。
  13. 心跳函数 heartbeat:检查设备心跳。
  14. 存储消息:将消息存储到数据库或其他存储系统。
  15. 是否断开连接:检查客户端是否断开连接。
  16. 关闭连接:如果断开,则关闭连接。
  17. Coroutine 运行 publishMQTT:在协程中运行发布方法。
  18. 创建 MQTT 客户端并连接:创建MQTT客户端并连接到服务器。
  19. 循环发布消息:循环发布消息。
  20. 获取测试消息:生成要发布的测试消息。
  21. 发布消息:将消息发布到MQTT服务器。
  22. 结束:程序结束。

后台常驻运行

1,php artisan命令在后台运行
  1. 打开您的终端或SSH到您的服务器。
  2. 使用nohup命令运行您的Artisan命令进行测试,如下所示
php /www/wwwroot/denwei_laraveladmin/artisan mqtt:handle subscribe

3.命令行的php的版本与web php的版本号要一致

2,使用宝塔的守护进程开启进程

在这里插入图片描述
也可以添加守护进程。
以上2种最好是只选一个

测试

打开emqx web ,在浏览器输入http://127.0.0.0.1:18083/#/websocket

在这里插入图片描述
主题:
主题跟php代码内的主题是一致的。
Payload:
是发出的字符串。由于在测试中遇到json字符串转换失败。所以选择了组装字符格式。
已发送
会出现发布的主题和内容

检查发送的结果

打开数据库,检查device_report表是否成功。成功应下图所示:
在这里插入图片描述

实操完成

相关文章:

MQTT客户端实战:从连接到通信。详细说明MQTT客户端和MQTT代理进行通信

EMQX安装 EMQX服务器安装 安装文档&#xff0c;见链接不另外写 https://docs.emqx.com/zh/emqx/latest/deploy/install-ubuntu.html 启动 EMQX 启动为一个 systemd 服务&#xff1a; sudo systemctl start emqx在windows安装客户端 在线 MQTT WebSocket 客户端工具&…...

【go/方法记录】cgo静态库编译以及使用dlv定位cgo崩溃问题

目录 说在前面文件树静态库编译cgo使用崩溃模拟使用dlv定位崩溃参考 说在前面 测试环境&#xff1a;WSL2go版本&#xff1a;go version go1.23.1 linux/amd64gcc版本&#xff1a;gcc (Ubuntu 11.4.0-1ubuntu1~22.04) 11.4.0cmake版本&#xff1a;3.22.1 文件树 ├── buffer …...

(笔记自用)位运算总结+LeetCode例题:颠倒二进制位+位1的个数

一.位运算总结: 在解题之前理解一下为什么需要位运算&#xff1f;它的本质是什么&#xff1f; 力扣上不少位运算相关的题&#xff0c;并且很多题也会用到位运算的技巧。这又是为什么&#xff1f; 位运算的由来 在计算机里面&#xff0c;任何数据最终都是用数字来表示的&…...

024.PL-SQL进阶—游标

课 程 推 荐我 的 个 人 主 页&#xff1a;&#x1f449;&#x1f449; 失心疯的个人主页 &#x1f448;&#x1f448;入 门 教 程 推 荐 &#xff1a;&#x1f449;&#x1f449; Python零基础入门教程合集 &#x1f448;&#x1f448;虚 拟 环 境 搭 建 &#xff1a;&#x1…...

从零开始使用树莓派debian系统使用opencv4.10.0进行人脸识别(保姆级教程)

一、总体架构 本文主要是使用树莓派自带的csi摄像头&#xff0c;搭配上opencv4.10.0进行物体的识别。本文使用的环境是python3.7.3&#xff0c;环境不一样有可能安装的opencv的过程也会很不一样&#xff0c;但是python的环境我们可以自己自行安装。 二、树莓派系统的安装 本文…...

golang qq邮件发送验证码

验证码的使用场景 注册/登录&#xff1a;使用验证码可以有效减少垃圾账号注册和恶意登录&#xff1b;短信接口保护&#xff1a;高效减少防止短信接口被刷情况&#xff1b;提交/投票&#xff1a;有效减少恶意刷单、恶意提交、恶意投票等情况&#xff1b;密码找回&#xff1a;用…...

鸿蒙 OS 开发单词打卡 APP 项目实战 20240922 笔记和源码分享

配套有完整的录播课, 需要的私信. 零基础入门级别, 有点前端基础都能学会. 效果截图: 代码截图: 页面完整代码: import { AnswerStatus } from ../enums/AnswerStatus import { PracticeStatus } from ../enums/PracticeStatus import { getRandomQuestions, Question …...

力扣P1706全排列问题 很好的引入暴力 递归 回溯 dfs

代码思路是受一个洛谷题解里面大佬的启发。应该算是一个dfs和回溯的入门题目&#xff0c;很好的入门题目了下面我会先给我原题解思路我想可以很快了解这个思路。下面是我自己根据力扣大佬写的。 我会进行详细讲解并配上图辅助理解大家请往下看 #include<iostream> #inc…...

使用Python Pandas导入数据库和文件数据

大家好&#xff0c;在数据分析过程中&#xff0c;数据的导入是第一步&#xff0c;也是最重要的一步。Python的Pandas提供了强大的数据读取功能&#xff0c;支持从多种数据源导入数据&#xff0c;包括CSV、Excel、JSON、SQL数据库、网页等。Pandas库不仅能够处理常见的文件格式&…...

lef 中antenna解释

这些规则主要涉及集成电路设计中的天线效应(Antenna Effect)和通孔(Via)设计规则。 ANTENNAAREADIFFREDUCEPWL 这条规则指定了一个分段线性函数,用于根据连接到切割层的扩散区面积来计算cut_area的缩减因子。扩散区面积值应从0开始单调增加。如果没有定义此规则,PAR(mi)方程中的…...

初试Bootstrap前端框架

文章目录 一、Bootstrap概述二、Bootstrap实例1、创建网页2、编写代码3、代码说明4、浏览网页&#xff0c;查看结果5、登录按钮事件处理6、浏览网页&#xff0c;查看结果 三、实战小结 一、Bootstrap概述 大家好&#xff0c;今天我们将一起学习一个非常流行的前端框架——Boot…...

mysql数据库:超键、候选键、主键与外键

mysql数据库&#xff1a;超键、候选键、主键与外键 1、超键&#xff08;Superkey&#xff09;2、候选键&#xff08;Candidate Key&#xff09;3、主键&#xff08;Primary Key&#xff09;4、外键&#xff08;Foreign Key&#xff09; &#x1f496;The Begin&#x1f496;点点…...

音频转MP3格式困难?如何轻松实现wav转mp3?

格式多样化为我们带来了灵活性和创意的无限可能&#xff0c;但同时&#xff0c;不同格式间的转换也成为了不少用户面临的难题。尤其是当你手握珍贵的WAV音频文件&#xff0c;却希望它们能在更多设备上流畅播放或节省存储空间时&#xff0c;wav转mp3的需求便应运而生。WAV以其无…...

基于vue框架的大连盐业有限公司生产管理系统的设计与实现3hk5y(程序+源码+数据库+调试部署+开发环境)系统界面在最后面。

系统程序文件列表 项目功能&#xff1a;计划员,工艺员,生产建模,生产计划,生产信息,生产监视,工艺质量,盐政信息 开题报告内容 一、引言 随着信息技术的飞速发展和市场竞争的日益激烈&#xff0c;传统盐业企业如大连盐业有限公司正面临着转型升级的迫切需求。传统管理模式下…...

《深入理解JAVA虚拟机(第2版)》- 第13章 - 学习笔记【终章】

第13章 线程安全与锁优化 13.1 概述 面向过程的编程思想 将数据和过程独立分开&#xff0c;数据是问题空间中的客体&#xff0c;程序代码是用来处理数据的&#xff0c;这种站在计算机角度来抽象和解决问题的思维方式&#xff0c;称为面向对象的编程思想。 面向对象的编程思想…...

网络工程师学习笔记——网络互连与互联网(三)

TCP三次握手 建立TCP连接是通过三次握手实现的&#xff0c;采用三报文握手主要是为了防止已失效的连接请求报文突然又传送到了&#xff0c;因而产生错误 主动发起TCP连接建立的称为客户端 被动等待的为TCP服务器&#xff0c;二者之间需要交换三个TCP报文段 首先是客户端主动…...

【Tomcat】常见面试题整理 共34题

文章目录 1. 简述什么是Tomcat&#xff1f;2. Tomcat的缺省端口是多少&#xff0c;怎么修改&#xff1f;3. 简述Tomcat 目录结构及作用4. 简述Tomcat有几种部署方式&#xff1f;5. 简述Tomcat容器是如何创建servlet类实例&#xff1f;6. Tomcat有哪几种Connector运行模式&#…...

到时间没回家又不接电话?如何迅速确定孩子的位置?

当孩子未按时回家且无法通过电话联系时&#xff0c;家长往往会感到焦虑。此时&#xff0c;如何迅速确定孩子的位置成为许多家长迫切需要解决的问题。 利用智能手机定位技术是最常见的方法之一。大多数智能手机都内置GPS定位功能&#xff0c;通过“查找设备”应用&#xff0c;家…...

接口自动化--commons内容详解-02

上篇文章主要讲解了接口自动化主要架构框架&#xff0c;这篇文庄主要讲解commons中的内容 1. requests_utils.py 首先讲解这个工具类&#xff0c;主要是因为在接口自动化中&#xff0c;基本都有的接口都是发送请求&#xff0c;获取响应结果&#xff0c;唯一不同的是&#xff0…...

WanFangAi论文写作研究生论文写作神器在线生成真实数据,标注参考文献位置,表格公式代码流程图查重20以内,研究生论文写作技巧

WanFangAi是一个专业的学术论文辅助平台&#xff0c;它提供了一系列工具来帮助用户提升论文写作的效率和质量。以下是WanFangAi的一些核心功能:1.主题探索与文献搜索:用户可以输入关键词和研究领域&#xff0c;WanFangAi会迅速推荐合适的论文主题并提供相关的文献搜索服务。系统…...

ONNXRuntime GPU推理想用BFloat16加速?手把手教你搞定PyTorch + CUDA环境配置与避坑

ONNXRuntime GPU推理想用BFloat16加速&#xff1f;手把手教你搞定PyTorch CUDA环境配置与避坑 在深度学习模型部署领域&#xff0c;BFloat16数据类型正逐渐成为提升推理性能的新宠。这种16位浮点格式保留了与32位浮点相同的指数位&#xff0c;在保持数值范围的同时减少了内存占…...

别再乱装CUDA了!用Anaconda为你的3060 Ti一键搞定PyTorch GPU环境(含CUDA 11.3实战)

3060 Ti显卡玩家的PyTorch环境配置指南&#xff1a;用Anaconda避开CUDA版本地狱 在深度学习领域&#xff0c;GPU加速已经成为提升模型训练效率的标配。然而&#xff0c;对于许多刚入门的开发者来说&#xff0c;配置PyTorch的GPU支持往往成为第一道门槛——尤其是当涉及到CUDA版…...

HS2-HF Patch:为《Honey Select 2》注入新生命的魔法补丁

HS2-HF Patch&#xff1a;为《Honey Select 2》注入新生命的魔法补丁 【免费下载链接】HS2-HF_Patch Automatically translate, uncensor and update HoneySelect2! 项目地址: https://gitcode.com/gh_mirrors/hs/HS2-HF_Patch 你是不是曾经打开《Honey Select 2》时&am…...

用PCA给高维数据‘瘦身’:从鸢尾花数据集到人脸图像,实战对比降维效果与可视化技巧

用PCA给高维数据‘瘦身’&#xff1a;从鸢尾花数据集到人脸图像&#xff0c;实战对比降维效果与可视化技巧 当面对成百上千维的数据时&#xff0c;我们常会陷入"维度灾难"的困境——计算资源吃紧、模型训练缓慢&#xff0c;更糟的是噪声干扰导致分析结果失真。主成分…...

Ruby中文分词利器Rurima:纯Ruby实现的高性能分词引擎详解

1. 项目概述&#xff1a;一个为Ruby打造的现代中文分词引擎在Ruby社区里&#xff0c;处理中文文本一直是个有点“硌脚”的活儿。如果你做过中文搜索、内容分析或者简单的词频统计&#xff0c;肯定遇到过这个经典难题&#xff1a;怎么把一串连续的中文字符&#xff0c;准确地切割…...

告别时间混乱:一份超全的Hive日期函数使用手册与常见错误排查

告别时间混乱&#xff1a;一份超全的Hive日期函数使用手册与常见错误排查 在数据开发领域&#xff0c;时间数据处理一直是高频且易错的环节。无论是日志分析、用户行为追踪还是财务报表生成&#xff0c;准确的时间计算都是确保数据质量的基础。Hive作为大数据生态中广泛使用的数…...

AI原生编程语言Reia:为LLM设计的编程范式变革

1. 项目概述&#xff1a;Reia&#xff0c;一个面向未来的AI原生编程语言最近在AI和编程语言交叉领域&#xff0c;一个名为Reia的项目引起了我的注意。它来自Quaint-Studios&#xff0c;定位是“AI原生”的编程语言。这听起来有点抽象&#xff0c;但简单来说&#xff0c;Reia试图…...

Cursor编辑器状态快照插件开发:一键保存与恢复工作区

1. 项目概述&#xff1a;一个专为开发者设计的“后悔药”如果你是一名重度使用 Cursor 编辑器的开发者&#xff0c;那么你一定经历过这样的场景&#xff1a;在沉浸式编码时&#xff0c;为了快速定位或修改&#xff0c;你可能会频繁地使用CtrlClick跳转到函数定义&#xff0c;或…...

Arm Fast Models中VGIC架构与中断虚拟化解析

1. Arm Fast Models中的VGIC架构解析虚拟通用中断控制器(Virtual Generic Interrupt Controller, VGIC)是Armv7/v8架构虚拟化扩展的核心组件之一。在Fast Models仿真环境中&#xff0c;Iris组件通过精确建模实现了VGIC的完整功能&#xff0c;包括&#xff1a;物理中断与虚拟中断…...

基于声明式Web自动化框架Hydra的电商数据监控实战

1. 项目概述&#xff1a;一个被低估的自动化利器 如果你经常需要处理一些重复性的、基于Web界面的操作&#xff0c;比如批量下载某个网站的资源、定时填写表单、或者监控网页内容的变化&#xff0c;那么你很可能已经厌倦了手动点击和等待。传统的脚本编写&#xff0c;尤其是涉及…...