MQTT宝典
文章目录
- 1.介绍
- 2.发布和订阅
- 3.MQTT 数据包结构
- 4.Demo
- 5.EMQX
1.介绍
什么是MQTT协议
MQTT(消息队列遥测传输协议),是一种基于发布/订阅(publish/subscribe)模式的“轻量级”通讯协议,该协议构建于TCP/IP协议上。
MQTT最大优点在于,用极少的代码和有限的带宽,为连接远程设备提供实时可靠的消息服务。
作为一种低开销、低带宽占用的即时通讯协议,使其在物联网、小型设备、移动应用等方面有较广泛的应用。
特点
MQTT是一个基于客户端-服务器的消息发布/订阅传输协议。
MQTT协议是轻量、简单、开放和易于实现的,这些特点使它适用范围非常广泛。在很多情况下,包括受限的环境中,如:机器与机器(M2M)通信和物联网(IoT)。
MQTT 与 HTTP 一样,MQTT 运行在传输控制协议/互联网协议 (TCP/IP) 堆栈之上。
2.发布和订阅
MQTT使用的发布/订阅消息模式,它提供了一对多的消息分发机制,从而实现与应用程序的解耦。
这是一种消息传递模式,消息不是直接从发送器发送到接收器(即点对点),而是由MQTT server(或称为 MQTT Broker)分发的。

MQTT 服务器是发布-订阅架构的核心。
它可以非常简单地在Raspberry Pi或NAS等单板计算机上实现,当然也可以在大型机或 Internet 服务器上实现。
服务器分发消息,因此必须是发布者,但绝不是订阅者!
客户端可以发布消息(发送方)、订阅消息(接收方)或两者兼而有之。
客户端(也称为节点)是一种智能设备,如微控制器或具有 TCP/IP 堆栈和实现 MQTT 协议的软件的计算机。
消息在允许过滤的主题下发布。主题是分层划分的 UTF-8 字符串。不同的主题级别用斜杠/作为分隔符号。
EG:


QoS(Quality of Service levels)
服务质量是 MQTT 的一个重要特性。当我们使用 TCP/IP 时,连接已经在一定程度上受到保护。但是在无线网络中,中断和干扰很频繁,MQTT 在这里帮助避免信息丢失及其服务质量水平。这些级别在发布时使用。如果客户端发布到 MQTT 服务器,则客户端将是发送者,MQTT 服务器将是接收者。当MQTT服务器向客户端发布消息时,服务器是发送者,客户端是接收者。
- QoS 0 : 这一级别会发生消息丢失或重复,消息发布依赖于底层TCP/IP网络。即:<=1

- QoS 1 : 承诺消息将至少传送一次给订阅者。

- QoS 2 : 我们保证消息仅传送到目的地一次。为此,带有唯一消息 ID 的消息会存储两次,首先来自发送者,然后是接收者。QoS 级别 2 在网络中具有最高的开销,因为在发送方和接收方之间需要两个流。

3.MQTT 数据包结构
-
固定头(Fixed header),存在于所有MQTT数据包中,表示数据包类型及数据包的分组类标识;
-
可变头(Variable header),存在于部分MQTT数据包中,数据包类型决定了可变头是否存在及其具体内容;
-
消息体(Payload),存在于部分MQTT数据包中,表示客户端收到的具体内容;
整体MQTT的消息格式如下图所示:

MQTT固定头
固定头存在于所有MQTT数据包中,其结构如下:

固定头的消息格式
- 消息类型 / message type
位置: byte 1, bits 7-4
4位的无符号值,类型如下:

- 标识位 / DUP/RET
位置: byte 1, bits 3-0。
在不使用标识位的消息类型中,标识位被作为保留位。如果收到无效的标志时,接收端必须关闭网络连接:

DUP:发布消息的副本。用来在保证消息的可靠传输,如果设置为 1,则在下面的变长中增加MessageId,并且需要回复确认,以保证消息传输完成,但不能用于检测消息重复发送。
QoS发布消息的服务质量,即:保证消息传递的次数

RETAIN:发布保留标识,表示服务器要保留这次推送的信息,如果有新的订阅者出现,就把这消息推送给它,如果设有那么推送至当前订阅者后释放。
- 剩余长度
位置:byte 1
固定头的第二字节用来保存变长头部和消息体的总大小的,但不是直接保存的。这一字节是可以扩展,其保存机制,前7位用于保存长度,后一部用做标识。当最后一位为 1时,表示长度不足,需要使用二个字节继续保存。例如:计算出后面的大小为0
MQTT可变头 / Variable header
MQTT数据包中包含一个可变头,它驻位于固定的头和负载之间。可变头的内容因数据包类型而不同,较常的应用是做为包的标识:

很多类型数据包中都包括一个2字节的数据包标识字段,这些类型的包有:
PUBLISH (QoS > 0)、PUBACK、PUBREC、PUBREL、PUBCOMP、
SUBSCRIBE、SUBACK、UNSUBSCRIBE、UNSUBACK
Payload消息体
Payload消息体是MQTT数据包的第三部分,CONNECT、SUBSCRIBE、SUBACK、UNSUBSCRIBE四种类型的消息 有消息体:
-
CONNECT,消息体内容主要是:客户端的ClientID、订阅的Topic、Message以及用户名和密码
-
SUBSCRIBE,消息体内容是一系列的要订阅的主题以及QoS。
-
SUBACK,消息体内容是服务器对于SUBSCRIBE所申请的主题及QoS进行确认和回复。
-
UNSUBSCRIBE,消息体内容是要订阅的主题。
4.Demo
- DEMO1
public static void main(String[] args) {String broker = "tcp://172.168.1.122:2314";String clientId = "JavaSample";//Use the memory persistenceMemoryPersistence persistence = new MemoryPersistence();try {MqttClient sampleClient = new MqttClient(broker, clientId, persistence);MqttConnectOptions connOpts = new MqttConnectOptions();connOpts.setCleanSession(true);System.out.println("broker:" + broker);sampleClient.connect(connOpts);System.out.println("Connected");String topic = "demo/topics";System.out.println("Subscribe to topic:" + topic);sampleClient.subscribe(topic);//订阅主题sampleClient.setCallback(new MqttCallback() {public void messageArrived(String topic, MqttMessage message) throws Exception {String theMsg = MessageFormat.format("{0} is arrived for topic {1}.", new String(message.getPayload()), topic);System.out.println(theMsg);}public void deliveryComplete(IMqttDeliveryToken token) {}public void connectionLost(Throwable throwable) {}});String content = "Message from MqttPublishSample";int qos = 2;System.out.println("Publishing message:" + content);MqttMessage message = new MqttMessage(content.getBytes());message.setQos(qos);sampleClient.publish(topic, message);//发布消息System.out.println("Message published");} catch (MqttException me) {System.out.println("reason" + me.getReasonCode());System.out.println("msg" + me.getMessage());System.out.println("loc" + me.getLocalizedMessage());System.out.println("cause" + me.getCause());System.out.println("excep" + me);me.printStackTrace();}}
5.EMQX
- 本地搭建EMQX服务
1、下载emqx压缩文件:wget https://www.emqx.com/zh/downloads/broker/5.0.3/emqx-5.0.3-el7-amd64.tar.gz
2、解压文件:mkdir -p emqx && tar -zxvf emqx-5.0.3-el7-amd64.tar.gz -C emqx
3、启动服务:./emqx/bin/emqx start

- 整合使用

配置
<dependency><groupId>org.eclipse.paho</groupId><artifactId>org.eclipse.paho.client.mqttv3</artifactId><version>1.2.2</version></dependency><!-- MQTT --><dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-mqtt</artifactId></dependency>
# Mqtt配置
mqtt:#我是在本地搭建了emqx服务,192.168.2.31就是我本地emqx服务的地址。也可用公共测试不需要搭建emqx服务,将ip改为broker.emqx.io即可。serverURIs: tcp://192.168.111.5:1883username: #可不填写password: #可不填写qos: 2 #等级 有 0 1 2 三种clientId: mqttxxxtopic: testTopic #订阅的主题,多个时可以使用逗号分开 如:topic1,topic2,topicenabled: true #是否打开mqtt服务keepalive: 100 #心跳时间 不需要动timeout: 100 # 超时时间秒 不需要动
@Configuration
public class MqttConfig {@Autowiredprivate MqttPushClient mqttPushClient;@Value("${mqtt.username:{null}}")private String username;@Value("${mqtt.password:{null}}")private String password;@Value("${mqtt.serverURIs:{null}}")private String hostUrl;@Value("${mqtt.clientId:{null}}")private String clientId;@Value("${mqtt.topic:{null}}")private String defaultTopic;@Value("${mqtt.qos:{null}}")private int qos;@Value("${mqtt.enabled:{null}}")private boolean enabled;@Value("${mqtt.keepalive:{null}}")private int keepalive;@Value("${mqtt.timeout:{null}}")private int timeout;//订阅主体@Beanpublic MqttPushClient getMqttPushClient() {if(enabled == true){String mqtt_topic[] = defaultTopic.split(",");mqttPushClient.connect(hostUrl, clientId, username, password, timeout, keepalive);//连接for(int i=0; i<mqtt_topic.length; i++){mqttPushClient.subscribe(mqtt_topic[i], 0);//订阅主题}}return mqttPushClient;}//发送消息到对应主题@Beanpublic MqttPushClient pushMessage() {mqttPushClient.publish(0,true,"wsy","呵呵哈哈哈 你好呀");return mqttPushClient;}}
@Component
public class MqttPushClient {private static final Logger logger = LoggerFactory.getLogger(MqttPushClient.class);@Autowiredprivate PushCallback pushCallback;private static MqttClient client;private static MqttClient getClient() {return client;}private static void setClient(MqttClient client) {MqttPushClient.client = client;}/*** 客户端连接** @param host ip+端口* @param clientID 客户端Id* @param username 用户名* @param password 密码* @param timeout 超时时间* @param keepalive 保留数*/public void connect(String host, String clientID, String username, String password, int timeout, int keepalive) {MqttClient client;try {client = new MqttClient(host, clientID, new MemoryPersistence());MqttConnectOptions options = new MqttConnectOptions();options.setCleanSession(true);options.setUserName(username);options.setPassword(password.toCharArray());options.setConnectionTimeout(timeout);options.setKeepAliveInterval(keepalive);MqttPushClient.setClient(client);try {client.setCallback(pushCallback);client.connect(options);} catch (Exception e) {e.printStackTrace();}} catch (Exception e) {e.printStackTrace();}}/*** 发布** @param qos 连接方式* @param retained 是否保留* @param topic 主题* @param pushMessage 消息体*/public boolean publish(int qos, boolean retained, String topic, String pushMessage) {MqttMessage message = new MqttMessage();message.setQos(qos);message.setRetained(retained);message.setPayload(pushMessage.getBytes());MqttTopic mTopic = MqttPushClient.getClient().getTopic(topic);if (null == mTopic) {logger.error("topic not exist");}MqttDeliveryToken token;try {token = mTopic.publish(message);token.waitForCompletion();return true;} catch (MqttPersistenceException e) {e.printStackTrace();return false;} catch (MqttException e) {e.printStackTrace();return false;}}/*** 订阅某个主题** @param topic 主题* @param qos 连接方式*/public void subscribe(String topic, int qos) {logger.info("开始订阅主题" + topic);try {MqttPushClient.getClient().subscribe(topic, qos);} catch (MqttException e) {e.printStackTrace();}}}
@Component
public class PushCallback implements MqttCallback {private static final Logger logger = LoggerFactory.getLogger(MqttPushClient.class);@Autowiredprivate MqttConfig mqttConfig;private static MqttClient client;private static String _topic;private static String _qos;private static String _msg;@Overridepublic void connectionLost(Throwable throwable) {// 连接丢失后,一般在这里面进行重连logger.info("连接断开,可以做重连");if (client == null || !client.isConnected()) {mqttConfig.getMqttPushClient();}}@Overridepublic void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {// subscribe后得到的消息会执行到这里面logger.info("接收消息主题 : " + topic);logger.info("接收消息Qos : " + mqttMessage.getQos());logger.info("接收消息内容 : " + new String(mqttMessage.getPayload()));_topic = topic;_qos = mqttMessage.getQos()+"";_msg = new String(mqttMessage.getPayload());}@Overridepublic void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {logger.info("deliveryComplete---------" + iMqttDeliveryToken.isComplete());}//别的Controller层会调用这个方法来 获取 接收到的硬件数据public String receive() {JSONObject jsonObject = new JSONObject();jsonObject.put("topic", _topic);jsonObject.put("qos", _qos);jsonObject.put("msg", _msg);return jsonObject.toString();}
}
注意:
clientId不能重复,若存在重复的 clientId 连接,会导致争抢而连接不上。(就是存在两个客户端 clientId 相同,两个在打架,一直争同一连接,导致一直重连,重复发布消息)
发布:



订阅:



参考资料
https://mcxiaoke.gitbooks.io/mqtt-cn/content/mqtt/01-Introduction.html
相关文章:
MQTT宝典
文章目录 1.介绍2.发布和订阅3.MQTT 数据包结构4.Demo5.EMQX 1.介绍 什么是MQTT协议 MQTT(消息队列遥测传输协议),是一种基于发布/订阅(publish/subscribe)模式的“轻量级”通讯协议,该协议构建于TCP/IP协…...
【前端】CSS水平居中的6种方法
文章目录 flex绝对定位margin:auto绝对定位margin:负值定位transformtext-align: center;margin: 0 auto;思维导图 后文:【前端】CSS垂直居中的7种方法_karshey的博客-CSDN博客 左右两边间隔相等的居中 flex display: flex;justify-content: center; <div clas…...
nginx如何获取真实的ip
我这里使用是springboot项目,使用nginx做代理,但header里面的参数没有将ip带过来,所有需要配置nginx将ip带过来。 nginx.conf文件 server {listen 80;listen 443 ssl;server_name xxx.xxx.com;ssl_certificate /web/project/ai…...
kotlin + LiveData 测试
viewModel测试:https://developer.android.com/codelabs/basic-android-kotlin-compose-test-viewmodel#3 androidTestImplementation "org.jetbrains.kotlin:kotlin-test:1.9.0"androidTestImplementation org.jetbrains.kotlinx:kotlinx-coroutines-tes…...
【dnf5文档】新一代RedHat自动化包管理器
前言 HI,CSDN的码友们,距离上一次我发文章已经过去了半年的时间,现在我又来介绍自己新发现和探究的开源技术了。计算机的发展总是飞速的,当我在写这篇文章的时候,Fedora rawhide已经进入了40版本、默认采用的自动化包管理器为dnf…...
数据可视化工具的三大类报表制作流程分享
电脑(pc)、移动、大屏三大类型的BI数据可视化报表制作步骤基本相同,差别就在于尺寸调整和具体的报表布局。这对于采用点击、拖拉拽方式来制作报表的奥威BI数据可视化工具来说就显得特别简单。接下来,我们就一起看看不这三大类型的…...
lua使用心得
lua语言的一些注意事项 在控制结构的条件中除了false和nil为假,其他值都为真。所以Lua认为0和空串都是真。lua5.3之前的版本只支持浮点数,lua5.3才引入了对整数的支持,/仅支持浮点数除法,要实现C里的整除效果必须使用双斜杠//超过…...
Docker升级后,出现Error response from daemon: Unknown runtime specified docker-runc
现象:docker升级版本后,重启docker服务出现: [rootDocker scripts]# docker start registry Error response from daemon: Unknown runtime specified docker-runc Error: failed to start containers: registry解决办法: 改完之…...
[Poetize6] IncDec Sequence
题目描述 给定一个长度为 n 的数列 a_1,a_2,...,a_n,每次可以选择一个区间[l,r],使这个区间内的数都加 1 或者都减 1。 请问至少需要多少次操作才能使数列中的所有数都一样,并求出在保证最少次数的前提下,最终得到的数列有多…...
通过Microsoft Loopback Adapter实现虚拟机和物理机的通信
问题 问:不借助路由器或交换机的情况下,能不能实现主机和虚拟及之间两个软件的通信呢?要求主机和虚拟及均有独立的ip地址,从而进行指定源的组播通信。 答:可以。通过借助虚拟网络适配器,不需要路由器或交…...
算法leetcode|70. 爬楼梯(rust重拳出击)
文章目录 70. 爬楼梯:样例 1:样例 2:提示: 分析:题解:rust:go:c:python:java: 70. 爬楼梯: 假设你正在爬楼梯。需要 n 阶你才能到达楼…...
基于epoll的TCP服务器端(C++)
网络编程——C实现socket通信(TCP)高并发之epoll模式_tcp通信c 多客户端epoll_n大橘为重n的博客-CSDN博客 网络编程——C实现socket通信(TCP)高并发之select模式_n大橘为重n的博客-CSDN博客 server.cpp #include <stdio.h> #include <sys/types.h> #include <…...
实时安全分析监控加强网络安全
网络犯罪分子只需几分钟,有时甚至几秒钟即可泄露敏感数据。但是,IT 团队可能无法在数周内发现这些违规行为。通常,这些违规行为是由外部方或客户发现的,到那时为时已晚。随着网络漏洞的激增,对安全分析的需求空前高涨。…...
基于ipad协议的gewe框架进行微信群组管理(二)
友情链接 geweapi.com 点击访问即可。 获取群组详情 小提示: 该接口可以一次查询20个群组查询出来的信息是不带公告的 请求URL: http://域名地址/api/group/detail 请求方式: POST 请求头: Content-Type:applica…...
大数据-玩转数据-Flink网页埋点PV统计
一、说明 衡量网站流量一个最简单的指标,就是网站的页面浏览量(Page View,PV)。用户每次打开一个页面便记录1次PV,多次打开同一页面则浏览量累计。 一般来说,PV与来访者的数量成正比,但是PV并不…...
什么是伪类选择器?
聚沙成塔每天进步一点点 ⭐ 专栏简介⭐ 伪类选择器⭐ 一些常见的伪类选择器示例::hover:active:focus:nth-child(n):first-child 和 :last-child ⭐ 写在最后 ⭐ 专栏简介 前端入门之旅:探索Web开发的奇妙世界 记得点击上方或者右侧链接订阅本专栏哦 几何…...
PLY模型格式详解【3D】
本文介绍PLY 多边形文件格式,这是一种用于存储被描述为多边形集合的图形对象。 PLY文件格式的目标是提供一种简单且易于实现但通用的格式足以适用于各种模型。 PLY有两种子格式:易于入门的 ASCII 表示形式和用于紧凑存储和快速保存和加载的二进制格式。 …...
Java的反射机制、Lambda表达式和枚举
Java的反射机制、Lambda表达式和枚举 文章目录 Java的反射机制、Lambda表达式和枚举1.反射机制反射的概念、用途、优缺点反射相关的类及使用(重要!!)相关类Class类:代表类实体,表示类和接口Field类…...
数据结构:堆的实现
1.堆的概念 如果有一个关键码的集合 K { k1 ,k2 ,k3 ,…,kn },把它的所有元素按完全二叉树的顺序存储方式存储在一个一维数组中,并且 k(i) < k(i*21) 和 k(i) < k(i*22), i 0 ÿ…...
zabbix-6.4 监控 MySQL
目录 1、rpm安装zabbix_agentd服务 2、编写zabbix_agentd.conf文件 3、编写模板文件 4、创建mysql用户并赋权限 5、创建.my.cnf文件 6、将规则添加到SELinux策略中 注意: 若模板无法读取.my.cnf 信息,从而导致监控报错,可以尝试修改模…...
TDengine 快速体验(Docker 镜像方式)
简介 TDengine 可以通过安装包、Docker 镜像 及云服务快速体验 TDengine 的功能,本节首先介绍如何通过 Docker 快速体验 TDengine,然后介绍如何在 Docker 环境下体验 TDengine 的写入和查询功能。如果你不熟悉 Docker,请使用 安装包的方式快…...
跨链模式:多链互操作架构与性能扩展方案
跨链模式:多链互操作架构与性能扩展方案 ——构建下一代区块链互联网的技术基石 一、跨链架构的核心范式演进 1. 分层协议栈:模块化解耦设计 现代跨链系统采用分层协议栈实现灵活扩展(H2Cross架构): 适配层…...
纯 Java 项目(非 SpringBoot)集成 Mybatis-Plus 和 Mybatis-Plus-Join
纯 Java 项目(非 SpringBoot)集成 Mybatis-Plus 和 Mybatis-Plus-Join 1、依赖1.1、依赖版本1.2、pom.xml 2、代码2.1、SqlSession 构造器2.2、MybatisPlus代码生成器2.3、获取 config.yml 配置2.3.1、config.yml2.3.2、项目配置类 2.4、ftl 模板2.4.1、…...
RSS 2025|从说明书学习复杂机器人操作任务:NUS邵林团队提出全新机器人装配技能学习框架Manual2Skill
视觉语言模型(Vision-Language Models, VLMs),为真实环境中的机器人操作任务提供了极具潜力的解决方案。 尽管 VLMs 取得了显著进展,机器人仍难以胜任复杂的长时程任务(如家具装配),主要受限于人…...
GO协程(Goroutine)问题总结
在使用Go语言来编写代码时,遇到的一些问题总结一下 [参考文档]:https://www.topgoer.com/%E5%B9%B6%E5%8F%91%E7%BC%96%E7%A8%8B/goroutine.html 1. main()函数默认的Goroutine 场景再现: 今天在看到这个教程的时候,在自己的电…...
从“安全密码”到测试体系:Gitee Test 赋能关键领域软件质量保障
关键领域软件测试的"安全密码":Gitee Test如何破解行业痛点 在数字化浪潮席卷全球的今天,软件系统已成为国家关键领域的"神经中枢"。从国防军工到能源电力,从金融交易到交通管控,这些关乎国计民生的关键领域…...
Kafka主题运维全指南:从基础配置到故障处理
#作者:张桐瑞 文章目录 主题日常管理1. 修改主题分区。2. 修改主题级别参数。3. 变更副本数。4. 修改主题限速。5.主题分区迁移。6. 常见主题错误处理常见错误1:主题删除失败。常见错误2:__consumer_offsets占用太多的磁盘。 主题日常管理 …...
MyBatis中关于缓存的理解
MyBatis缓存 MyBatis系统当中默认定义两级缓存:一级缓存、二级缓存 默认情况下,只有一级缓存开启(sqlSession级别的缓存)二级缓存需要手动开启配置,需要局域namespace级别的缓存 一级缓存(本地缓存&#…...
【无标题】湖北理元理律师事务所:债务优化中的生活保障与法律平衡之道
文/法律实务观察组 在债务重组领域,专业机构的核心价值不仅在于减轻债务数字,更在于帮助债务人在履行义务的同时维持基本生活尊严。湖北理元理律师事务所的服务实践表明,合法债务优化需同步实现三重平衡: 法律刚性(债…...
Python训练营-Day26-函数专题1:函数定义与参数
题目1:计算圆的面积 任务: 编写一个名为 calculate_circle_area 的函数,该函数接收圆的半径 radius 作为参数,并返回圆的面积。圆的面积 π * radius (可以使用 math.pi 作为 π 的值)要求:函数接收一个位置参数 radi…...
