MQTT客户端实战:从连接到通信。详细说明MQTT客户端和MQTT代理进行通信

EMQX安装
EMQX服务器安装
安装文档,见链接不另外写
https://docs.emqx.com/zh/emqx/latest/deploy/install-ubuntu.html
启动 EMQX
启动为一个 systemd 服务:
sudo systemctl start emqx
在windows安装客户端
在线 MQTT WebSocket 客户端工具,MQTTX Web 是开源的 MQTT 5.0 浏览器客户端,但是经我测试没有成功,好像有bug.
建议使用MQTT 5.0 命令行客户端工具。使用命令行上的 MQTTX,旨在帮助开发者在不需要使用图形化界面的基础上,也能更快的开发和调试 MQTT 服务与应用。

由于是后期被写的博文,图是借官方的。请自行区分一下。
平台安装后的地址
1,平台的地址
- http://127.0.0.1:18083
后台登录 用户名:test 密码:test
Laravel中处理MQTT订阅
1,安装MQTT客户端库
在Laravel项目中安装一个MQTT客户端库。你可以使用Composer来安装 php-mqtt/client:
composer require php-mqtt/client
2, 新建command文件
文件路径:app/Console/Commands/MqttClientCommand.php
这段PHP代码是一个用于处理MQTT消息的命令行工具,它使用了Simps的MQTT客户端库。代码中定义了两个类:MQTTUserConfig 和 MqttClientCommand。
MQTTUserConfig 类定义了一些常量,这些常量用于配置MQTT连接。
MqttClientCommand 类继承自 Illuminate\Console\Command,是一个命令行工具,用于订阅或发布MQTT消息。
<?phpnamespace App\Console\Commands;use App\Http\Controllers\Wxapi\DeviceReportController;
use Illuminate\Console\Command;
use Illuminate\Support\Facades\DB;use Simps\MQTT\Protocol\Types;
use Simps\MQTT\Protocol\V5;
use Simps\MQTT\Tools\Common;
use Simps\MQTT\Client;
use Simps\MQTT\Config\ClientConfig;
use Simps\MQTT\Hex\ReasonCode;use Swoole\Coroutine;
use Illuminate\Support\Facades\Redis;class MQTTUserConfig
{ const SIMPS_MQTT_REMOTE_HOST = '*';const SIMPS_MQTT_PORT = 1883;const SIMPS_MQTT_SUBSCRIBE_PORT = 8083;const SIMPS_MQTT_USER = 'test*';const SIMPS_MQTT_PASSWORD = 'test*';
}class MqttClientCommand extends Command
{protected $signature = 'mqtt:handle {param1}';protected $description = '订阅物联网mqtt消息 param1:null 订阅消息, param1:public 发布消息';protected $mqtt ;const SWOOLE_MQTT_CONFIG = ['open_mqtt_protocol' => true,'package_max_length' => 2 * 1024 * 1024,'connect_timeout' => 5.0,'write_timeout' => 5.0,'read_timeout' => 5.0,];//模拟设备const CLiENT_IDs = ['mqttx_devA','mqttx_devB','mqttx_devC','mqttx_devD'];public function __construct(){parent::__construct();}public function handle(){$param1 =$this->argument('param1');
// $param2 =$this->argument('param2');if ($param1=='subscribe') {$this->info('启动订阅...');$this->subscribeMqtt();} elseif ($param1=='public') {$this->info('启动发布...');$this->publishMQTT();}echo '\r\n\r\n分配工作执行完成!!!';}protected function getTestMQTT5ConnectConfig(){$config = new ClientConfig();$UserConfig = new MQTTUserConfig();return $config->setUserName($UserConfig::SIMPS_MQTT_USER)->setPassword($UserConfig::SIMPS_MQTT_PASSWORD)->setClientId(Client::genClientID())->setKeepAlive(10)->setDelay(3000) // 3s->setMaxAttempts(5)->setProperties(['session_expiry_interval' => 60,'receive_maximum' => 65535,'topic_alias_maximum' => 65535,])->setProtocolLevel(5)->setSwooleConfig( ['open_mqtt_protocol' => true,'package_max_length' => 2 * 1024 * 1024,'connect_timeout' => 5.0,'write_timeout' => 5.0,'read_timeout' => 5.0,]);}private function heartbeat($message) {if ($message) {parse_str($message,$array);$device = $array['imei'];$hash = ':mqtt:heartbeat:online'.":{$device}";Redis::expire($hash,30); ##30s有效Redis::sAdd($hash,1);}}/** 订阅* private function subscribeMqtt(){Coroutine\run(function () {$client = new Client('39.108.230.87', 1883, $this->getTestMQTT5ConnectConfig());....*/private function subscribeMqtt(){Coroutine\run(function () {$UserConfig = new MQTTUserConfig();$client = new Client($UserConfig::SIMPS_MQTT_REMOTE_HOST, 1883,$this->getTestMQTT5ConnectConfig());$will = ['topic' => 'simps-mqtt/dinweiyi/delete','qos' => 1,'retain' => 0,'message' => 'byebye','properties' => ['will_delay_interval' => 60,'message_expiry_interval' => 60,'content_type' => 'test','payload_format_indicator' => true, // false 0 1],];$client->connect(true, $will);$topics['simps-mqtt/dinweiyi/subscribe_message'] = ['qos' => 2,'no_local' => true,'retain_as_published' => true,'retain_handling' => 2,];$res = $client->subscribe($topics);$timeSincePing = time();var_dump($res);echo '\r\n\r\n connect success !!!';while (true) {try {$buffer = $client->recv();$message = null;if ($buffer && $buffer !== true) {$message = $buffer["message"];// QoS1 PUBACKif ($buffer['type'] === Types::PUBLISH && $buffer['qos'] === 1) {$client->send(['type' => Types::PUBACK,'message_id' => $buffer['message_id'],],false);}if ($buffer['type'] === Types::DISCONNECT) {echo sprintf("Broker is disconnected, The reason is %s [%d]\n",ReasonCode::getReasonPhrase($buffer['code']),$buffer['code']);$client->close($buffer['code']);break;}$reportObj = new DeviceReportController();$ret = $reportObj->store($message);var_dump("182>>>",$ret);unset($reportObj);}if ($timeSincePing <= (time() - $client->getConfig()->getKeepAlive())) {$buffer = $client->ping();if ($buffer) {echo 'send ping success ...' ;$this->heartbeat($message);$timeSincePing = time();}}} catch (\Throwable $e) {throw $e;}}});}protected function getMessage() {$client_ids = ['mqttx_devA',
// 'mqttx_devB','mqttx_devC','mqttx_devD'];$message = [];$message['clientID'] = self::CLiENT_IDs[array_rand($client_ids)];$message['time'] = time();$message['location'] = ["x"=>rand(1000,9999),"y"=>rand(1000,9999)];return json_encode($message);}/** 发布*/public function publishMQTT() {Coroutine\run(function () {$UserConfig = new MQTTUserConfig();$client = new Client($UserConfig::SIMPS_MQTT_REMOTE_HOST, $UserConfig::SIMPS_MQTT_PORT,$this->getTestMQTT5ConnectConfig());$client->connect();while (true) {$message = $this->getMessage();$response = $client->publish('simps-mqtt/user/subscribe_message',$message,1,0,0,['topic_alias' => 1,'message_expiry_interval' => 12,]);var_dump( 'publishMQTT>>>',$message);Coroutine::sleep(1);}});}}
3, 代码流程图
使用Mermaid语法描述的上述PHP代码的流程图:
流程说明:
- 开始:程序启动。
- 构造函数 __construct:初始化命令行工具。
- handle 方法:处理命令行输入。
- param1 参数:根据输入的参数决定是订阅还是发布。
- 调用 subscribeMqtt:如果参数是
subscribe,则调用此方法。 - 调用 publishMQTT:如果参数是
public,则调用此方法。 - Coroutine 运行 subscribeMqtt:在协程中运行订阅方法。
- 创建 MQTT 客户端并连接:创建MQTT客户端并连接到服务器。
- 设置遗嘱消息:设置遗嘱消息,以便在客户端意外断开时发送。
- 订阅主题:订阅特定的MQTT主题。
- 接收消息:持续监听并接收消息。
- 处理消息:对接收到的消息进行处理。
- 心跳函数 heartbeat:检查设备心跳。
- 存储消息:将消息存储到数据库或其他存储系统。
- 是否断开连接:检查客户端是否断开连接。
- 关闭连接:如果断开,则关闭连接。
- Coroutine 运行 publishMQTT:在协程中运行发布方法。
- 创建 MQTT 客户端并连接:创建MQTT客户端并连接到服务器。
- 循环发布消息:循环发布消息。
- 获取测试消息:生成要发布的测试消息。
- 发布消息:将消息发布到MQTT服务器。
- 结束:程序结束。
后台常驻运行
1,php artisan命令在后台运行
- 打开您的终端或SSH到您的服务器。
- 使用nohup命令运行您的Artisan命令进行测试,如下所示
php /www/wwwroot/denwei_laraveladmin/artisan mqtt:handle subscribe
3.命令行的php的版本与web php的版本号要一致
2,使用宝塔的守护进程开启进程

也可以添加守护进程。
以上2种最好是只选一个
测试
打开emqx web ,在浏览器输入http://127.0.0.0.1:18083/#/websocket


主题:
主题跟php代码内的主题是一致的。
Payload:
是发出的字符串。由于在测试中遇到json字符串转换失败。所以选择了组装字符格式。
已发送
会出现发布的主题和内容
检查发送的结果
打开数据库,检查device_report表是否成功。成功应下图所示:

实操完成
相关文章:
MQTT客户端实战:从连接到通信。详细说明MQTT客户端和MQTT代理进行通信
EMQX安装 EMQX服务器安装 安装文档,见链接不另外写 https://docs.emqx.com/zh/emqx/latest/deploy/install-ubuntu.html 启动 EMQX 启动为一个 systemd 服务: sudo systemctl start emqx在windows安装客户端 在线 MQTT WebSocket 客户端工具&…...
【go/方法记录】cgo静态库编译以及使用dlv定位cgo崩溃问题
目录 说在前面文件树静态库编译cgo使用崩溃模拟使用dlv定位崩溃参考 说在前面 测试环境:WSL2go版本:go version go1.23.1 linux/amd64gcc版本:gcc (Ubuntu 11.4.0-1ubuntu1~22.04) 11.4.0cmake版本:3.22.1 文件树 ├── buffer …...
(笔记自用)位运算总结+LeetCode例题:颠倒二进制位+位1的个数
一.位运算总结: 在解题之前理解一下为什么需要位运算?它的本质是什么? 力扣上不少位运算相关的题,并且很多题也会用到位运算的技巧。这又是为什么? 位运算的由来 在计算机里面,任何数据最终都是用数字来表示的&…...
024.PL-SQL进阶—游标
课 程 推 荐我 的 个 人 主 页:👉👉 失心疯的个人主页 👈👈入 门 教 程 推 荐 :👉👉 Python零基础入门教程合集 👈👈虚 拟 环 境 搭 建 :…...
从零开始使用树莓派debian系统使用opencv4.10.0进行人脸识别(保姆级教程)
一、总体架构 本文主要是使用树莓派自带的csi摄像头,搭配上opencv4.10.0进行物体的识别。本文使用的环境是python3.7.3,环境不一样有可能安装的opencv的过程也会很不一样,但是python的环境我们可以自己自行安装。 二、树莓派系统的安装 本文…...
golang qq邮件发送验证码
验证码的使用场景 注册/登录:使用验证码可以有效减少垃圾账号注册和恶意登录;短信接口保护:高效减少防止短信接口被刷情况;提交/投票:有效减少恶意刷单、恶意提交、恶意投票等情况;密码找回:用…...
鸿蒙 OS 开发单词打卡 APP 项目实战 20240922 笔记和源码分享
配套有完整的录播课, 需要的私信. 零基础入门级别, 有点前端基础都能学会. 效果截图: 代码截图: 页面完整代码: import { AnswerStatus } from ../enums/AnswerStatus import { PracticeStatus } from ../enums/PracticeStatus import { getRandomQuestions, Question …...
力扣P1706全排列问题 很好的引入暴力 递归 回溯 dfs
代码思路是受一个洛谷题解里面大佬的启发。应该算是一个dfs和回溯的入门题目,很好的入门题目了下面我会先给我原题解思路我想可以很快了解这个思路。下面是我自己根据力扣大佬写的。 我会进行详细讲解并配上图辅助理解大家请往下看 #include<iostream> #inc…...
使用Python Pandas导入数据库和文件数据
大家好,在数据分析过程中,数据的导入是第一步,也是最重要的一步。Python的Pandas提供了强大的数据读取功能,支持从多种数据源导入数据,包括CSV、Excel、JSON、SQL数据库、网页等。Pandas库不仅能够处理常见的文件格式&…...
lef 中antenna解释
这些规则主要涉及集成电路设计中的天线效应(Antenna Effect)和通孔(Via)设计规则。 ANTENNAAREADIFFREDUCEPWL 这条规则指定了一个分段线性函数,用于根据连接到切割层的扩散区面积来计算cut_area的缩减因子。扩散区面积值应从0开始单调增加。如果没有定义此规则,PAR(mi)方程中的…...
初试Bootstrap前端框架
文章目录 一、Bootstrap概述二、Bootstrap实例1、创建网页2、编写代码3、代码说明4、浏览网页,查看结果5、登录按钮事件处理6、浏览网页,查看结果 三、实战小结 一、Bootstrap概述 大家好,今天我们将一起学习一个非常流行的前端框架——Boot…...
mysql数据库:超键、候选键、主键与外键
mysql数据库:超键、候选键、主键与外键 1、超键(Superkey)2、候选键(Candidate Key)3、主键(Primary Key)4、外键(Foreign Key) 💖The Begin💖点点…...
音频转MP3格式困难?如何轻松实现wav转mp3?
格式多样化为我们带来了灵活性和创意的无限可能,但同时,不同格式间的转换也成为了不少用户面临的难题。尤其是当你手握珍贵的WAV音频文件,却希望它们能在更多设备上流畅播放或节省存储空间时,wav转mp3的需求便应运而生。WAV以其无…...
基于vue框架的大连盐业有限公司生产管理系统的设计与实现3hk5y(程序+源码+数据库+调试部署+开发环境)系统界面在最后面。
系统程序文件列表 项目功能:计划员,工艺员,生产建模,生产计划,生产信息,生产监视,工艺质量,盐政信息 开题报告内容 一、引言 随着信息技术的飞速发展和市场竞争的日益激烈,传统盐业企业如大连盐业有限公司正面临着转型升级的迫切需求。传统管理模式下…...
《深入理解JAVA虚拟机(第2版)》- 第13章 - 学习笔记【终章】
第13章 线程安全与锁优化 13.1 概述 面向过程的编程思想 将数据和过程独立分开,数据是问题空间中的客体,程序代码是用来处理数据的,这种站在计算机角度来抽象和解决问题的思维方式,称为面向对象的编程思想。 面向对象的编程思想…...
网络工程师学习笔记——网络互连与互联网(三)
TCP三次握手 建立TCP连接是通过三次握手实现的,采用三报文握手主要是为了防止已失效的连接请求报文突然又传送到了,因而产生错误 主动发起TCP连接建立的称为客户端 被动等待的为TCP服务器,二者之间需要交换三个TCP报文段 首先是客户端主动…...
【Tomcat】常见面试题整理 共34题
文章目录 1. 简述什么是Tomcat?2. Tomcat的缺省端口是多少,怎么修改?3. 简述Tomcat 目录结构及作用4. 简述Tomcat有几种部署方式?5. 简述Tomcat容器是如何创建servlet类实例?6. Tomcat有哪几种Connector运行模式&#…...
到时间没回家又不接电话?如何迅速确定孩子的位置?
当孩子未按时回家且无法通过电话联系时,家长往往会感到焦虑。此时,如何迅速确定孩子的位置成为许多家长迫切需要解决的问题。 利用智能手机定位技术是最常见的方法之一。大多数智能手机都内置GPS定位功能,通过“查找设备”应用,家…...
接口自动化--commons内容详解-02
上篇文章主要讲解了接口自动化主要架构框架,这篇文庄主要讲解commons中的内容 1. requests_utils.py 首先讲解这个工具类,主要是因为在接口自动化中,基本都有的接口都是发送请求,获取响应结果,唯一不同的是࿰…...
WanFangAi论文写作研究生论文写作神器在线生成真实数据,标注参考文献位置,表格公式代码流程图查重20以内,研究生论文写作技巧
WanFangAi是一个专业的学术论文辅助平台,它提供了一系列工具来帮助用户提升论文写作的效率和质量。以下是WanFangAi的一些核心功能:1.主题探索与文献搜索:用户可以输入关键词和研究领域,WanFangAi会迅速推荐合适的论文主题并提供相关的文献搜索服务。系统…...
【网络】每天掌握一个Linux命令 - iftop
在Linux系统中,iftop是网络管理的得力助手,能实时监控网络流量、连接情况等,帮助排查网络异常。接下来从多方面详细介绍它。 目录 【网络】每天掌握一个Linux命令 - iftop工具概述安装方式核心功能基础用法进阶操作实战案例面试题场景生产场景…...
linux之kylin系统nginx的安装
一、nginx的作用 1.可做高性能的web服务器 直接处理静态资源(HTML/CSS/图片等),响应速度远超传统服务器类似apache支持高并发连接 2.反向代理服务器 隐藏后端服务器IP地址,提高安全性 3.负载均衡服务器 支持多种策略分发流量…...
Vue3 + Element Plus + TypeScript中el-transfer穿梭框组件使用详解及示例
使用详解 Element Plus 的 el-transfer 组件是一个强大的穿梭框组件,常用于在两个集合之间进行数据转移,如权限分配、数据选择等场景。下面我将详细介绍其用法并提供一个完整示例。 核心特性与用法 基本属性 v-model:绑定右侧列表的值&…...
AtCoder 第409场初级竞赛 A~E题解
A Conflict 【题目链接】 原题链接:A - Conflict 【考点】 枚举 【题目大意】 找到是否有两人都想要的物品。 【解析】 遍历两端字符串,只有在同时为 o 时输出 Yes 并结束程序,否则输出 No。 【难度】 GESP三级 【代码参考】 #i…...
镜像里切换为普通用户
如果你登录远程虚拟机默认就是 root 用户,但你不希望用 root 权限运行 ns-3(这是对的,ns3 工具会拒绝 root),你可以按以下方法创建一个 非 root 用户账号 并切换到它运行 ns-3。 一次性解决方案:创建非 roo…...
Psychopy音频的使用
Psychopy音频的使用 本文主要解决以下问题: 指定音频引擎与设备;播放音频文件 本文所使用的环境: Python3.10 numpy2.2.6 psychopy2025.1.1 psychtoolbox3.0.19.14 一、音频配置 Psychopy文档链接为Sound - for audio playback — Psy…...
html-<abbr> 缩写或首字母缩略词
定义与作用 <abbr> 标签用于表示缩写或首字母缩略词,它可以帮助用户更好地理解缩写的含义,尤其是对于那些不熟悉该缩写的用户。 title 属性的内容提供了缩写的详细说明。当用户将鼠标悬停在缩写上时,会显示一个提示框。 示例&#x…...
MySQL 部分重点知识篇
一、数据库对象 1. 主键 定义 :主键是用于唯一标识表中每一行记录的字段或字段组合。它具有唯一性和非空性特点。 作用 :确保数据的完整性,便于数据的查询和管理。 示例 :在学生信息表中,学号可以作为主键ÿ…...
【Elasticsearch】Elasticsearch 在大数据生态圈的地位 实践经验
Elasticsearch 在大数据生态圈的地位 & 实践经验 1.Elasticsearch 的优势1.1 Elasticsearch 解决的核心问题1.1.1 传统方案的短板1.1.2 Elasticsearch 的解决方案 1.2 与大数据组件的对比优势1.3 关键优势技术支撑1.4 Elasticsearch 的竞品1.4.1 全文搜索领域1.4.2 日志分析…...
客户案例 | 短视频点播企业海外视频加速与成本优化:MediaPackage+Cloudfront 技术重构实践
01技术背景与业务挑战 某短视频点播企业深耕国内用户市场,但其后台应用系统部署于东南亚印尼 IDC 机房。 随着业务规模扩大,传统架构已较难满足当前企业发展的需求,企业面临着三重挑战: ① 业务:国内用户访问海外服…...
