MQTT服务搭建及python使用示例
1、MQTT协议
1.1、MQTT介绍
MQTT(Message Queuing Telemetry Transport)是一种轻量级的、基于发布/订阅模式的通信协议,通常用于物联网设备之间的通讯。它具有低带宽、低功耗和开放性等特点,适合在网络带宽有限或者网络连接不稳定的环境下使用。MQTT协议使用TCP/IP协议栈进行通讯,支持多种编程语言和平台,并且能够提供可靠的消息传递机制。
在MQTT中,设备可以发布消息到特定的主题(topic),同时其他设备可以订阅这些主题以接收相应的消息。这种发布/订阅模式使得设备之间的通讯更加灵活和高效。MQTT还支持三种服务质量等级:至多一次(at most once)、至少一次(at least once)和恰好一次(exactly once),以满足不同场景下对消息传递可靠性的需求。
优点:代码量少,开销低,带宽占用小,即时通讯协议。
1.2、MQTT概念
订阅(Subscribtion):订阅包含主题筛选器( Topic Filter )和最大服务质量( QoS )。订阅会与一个会话( Session )关联。一个会话可以包含多个订阅。每一个会话中的每个订阅都有一个不同的主题筛选器。
会话(Session):每个客户端与服务器建立连接后就是一个会话,客户端和服务器之间有状态交互。会话存在于一个网络之间,也可能在客户端和服务器之间跨越多个连续的网络连接。
主题名(Topic Name):连接到一个应用程序消息的标签,该标签与服务器的订阅相匹配。服务器会将消息发送给订阅所匹配标签的每个客户端。需要注意的是, MQTT 中消息主题按照层级命名,使用 ‘/’ 进行分割。此外,主题中可以使用通配符进行多个主题或多层级的订阅,有两种常见的通配符:
单层通配符 + :单层通配符只能匹配一层的主题,例如: China/Beijing/+ ,可以匹配的只有 Beijing 这个主 题下面一层的主题,例如 Xicheng, DongCheng, Xuanwu 等等。
多层通配符 # :顾名思义,多层通配符就是可以匹配多个层级的主题,例如: China/# ,可以匹配到的主题可能有:China/Beijing/Dongcheng, China/Shanghai/PuDong ,等等。
主题筛选器(Topic Filter):一个对主题名通配符筛选器,在订阅表达式中使用,表示订阅所匹配到的多个主题。
消息订阅:消息订阅者所具体接收的内容。
1.3、MQTT中的角色
Publisher 和 Subscriber 为客户端,Broker 为服务器端,消息主题为消息类型,Broker 根据 Topic 过滤消息,并将消息向客户端推送。

MQTT 中用 QoS 表示服务质量, MQTT 协议中有三种服务质量 (QoS) :
1 ) QoS =0 ,至多一次,可能会出现丢包的情况,使用在对实时性要求不高的情况,例如,将此服务质量与通信环境传感器数据一起使用。 对于是否丢失个别读取或是否稍后立即发布新的读取并不重要。
2 ) QoS =1, 至少一次,保证包会到达目的地,但是可能出现重包。
3 ) QoS =2, 刚好一次,保证包会到达目的地,且不会出现重包的现象。
客户端:
Publisher 和 Subscriber 都属于客户端。
发布应用消息给其它相关的客户端。
订阅以请求接受相关的应用消息。
取消订阅以移除接受应用消息的请求。
从服务端断开连接
服务器:
服务器端即所谓的 MQTT Broker 服务器。
接受来自客户端的网络连接。
接受客户端发布的应用消息。
处理客户端的订阅和取消订阅请求。
转发应用消息给符合条件的已订阅客户端。
MQTT 提供的公共服务器端( Broker )有:
test.mosquitto.orgbroker.hivemq.comiot.eclipse.org
2、基于公共服务示例
在次,选择使用EMQX提供的免费MQTT公共服务器,但同样可以选择其他任何MQTT broker。The Free Global Public MQTT Broker | Try Now | EMQ (emqx.com)
Broker: broker.emqx.io
TCP Port: 1883
Websocket Port: 8083
python库:pip install paho-mqtt=="1.6.1"
消息发布代码,pub.py
import time
import random
from paho.mqtt import client as mqtt_clientbroker = 'broker.emqx.io'
port = 1883
topic = "/flask/mqtt"
# generate client ID with pub prefix randomly
client_id = f'python-mqtt-{random.randint(0, 1000)}'def connect_mqtt():def on_connect(client, userdata, flags, rc):if rc == 0:print("Connected to MQTT Broker!")else:print("Failed to connect, return code %d\n", rc)client = mqtt_client.Client(client_id)client.on_connect = on_connectclient.connect(broker, port)return clientdef publish(client):msg_count = 0while True:time.sleep(1)msg = f"messages: {msg_count}"result = client.publish(topic, msg)# result: [0, 1]status = result[0]if status == 0:print(f"Send `{msg}` to topic `{topic}`")else:print(f"Failed to send message to topic {topic}")msg_count += 1def run():client = connect_mqtt()client.loop_start()publish(client)if __name__ == '__main__':run()
运行打印结果:

消息订阅代码,sub.py:
import random
from paho.mqtt import client as mqtt_clientbroker = 'broker.emqx.io'
port = 1883
topic = "/flask/mqtt"
# generate client ID with pub prefix randomly
client_id = f'python-mqtt-{random.randint(0, 100)}'def connect_mqtt() -> mqtt_client:def on_connect(client, userdata, flags, rc):if rc == 0:print("Connected to MQTT Broker!")else:print("Failed to connect, return code %d\n", rc)client = mqtt_client.Client(client_id)client.on_connect = on_connectclient.connect(broker, port)return clientdef subscribe(client: mqtt_client):def on_message(client, userdata, msg):print(f"Received `{msg.payload.decode()}` from `{msg.topic}` topic")client.subscribe(topic)client.on_message = on_messagedef run():client = connect_mqtt()subscribe(client)client.loop_forever()if __name__ == '__main__':run()
运行打印结果:

可以看到,发布和订阅成功。注意:每个客户端的ID唯一,不能重复!
3、基于Apollo服务示例
3.1、安装Apollo服务
服务器端搭建:
Index of /dist/activemq/activemq-apollo/1.7.1
解压并打开目标如下:
Apollo中间件其实是免安装的,我们只需要下载apache-apollo-1.7.1-windows-distro.zip,然后解压到某个文件夹就可以了。在这里我解压到D:\dist\apache-apollo-1.7.1。在apache-apollo-1.7.1/bin目录下打开终端执行命令:apollo create myapollo
若出现如下错误 Loading configuration file ‘D:\phpStudy\apache-apollo-1.7.1\bin\mybroker\etc\apollo.xml’.Startup failed: java.lang.NoClassDefFoundError: javax/xml/bind/ValidationEventHandler,处理方式:换上jdk1.8版本即可。
然后在生成的myapollo目录的bin目录下执行:pollo-broker.cmd run,结果如下:

服务搭建成功,进入后台管理,打开网页,输入ip + : 61680 进入后台管理 ,默认用户名admin 密码 password,例如:http://127.0.0.1:61680


3.2、示例代码
本地服务基础信息:
host="127.0.0.1"
port = 61613
用户名:admin
密码:password
python库:pip install paho-mqtt=="1.6.1"
发布主题,publish.py
import sys
import time
import paho.mqtt.client as mqttdef on_connect(client, userdata, flags, rc):print("Connected with result code " + str(rc))def on_subscribe(client, userdata, mid, granted_qos):print("消息发送成功")client = mqtt.Client(protocol=3)
client.username_pw_set("admin", "password")
client.on_connect = on_connect
client.on_subscribe = on_subscribe
client.connect(host="127.0.0.1", port=61613, keepalive=60) # 订阅频道
time.sleep(1)i = 0
while True:try:# 发布MQTT信息sensor_data = "test" + str(i)client.publish(topic="public", payload=sensor_data, qos=0)time.sleep(5)i += 1except KeyboardInterrupt:print("EXIT")client.disconnect()sys.exit(0)
订阅主题,subscribe.py
import time
import paho.mqtt.client as mqtt# The callback for when the client receives a CONNACK response from the server.
def on_connect(client, userdata, flags, rc):if rc == 0:print("连接成功")print("Connected with result code " + str(rc))def on_message(client, userdata, msg):print(msg.topic + " " + str(msg.payload))client = mqtt.Client(protocol=3)
client.username_pw_set("admin", "password")
client.on_connect = on_connect
client.on_message = on_message
client.connect(host="127.0.0.1", port=61613, keepalive=60) # 订阅频道
time.sleep(1)# client.subscribe("public")
client.subscribe([("public", 0), ("test", 2)]) # 订阅
client.loop_forever()
执行脚本后,正确大小消息。浏览器可见连接记录。

4、EMQX客户端工具
下载地址:MQTTX 下载选择适合您的平台,立即开始使用 MQTTX。
https://mqttx.app/zh/downloads。

4.1、公共服务测试消息接收
创建连接:
Host:为代码中定义好的 broker.emqx.io
Port:为代码中定义好的 1883
用户名、密码根据需要添加

添加订阅:
主题为:/flask/mqtt

在MQTTX中发布消息:

测试成功。
4.2、自建服务测试消息接收

MQTT版本选择3.1.1,以下参数,与服务保持一致:
host="127.0.0.1"
port = 61613
用户名:admin
密码:password
其它测试步骤,一样。(创建主题,测试)

5、EMQX代理服务器
windows下搭建MQTT代理服务,与Apollo服务功能一样,更方便好用,下载地址:https://www.emqx.io/zh/downloads
下载后解压,进入目录bin文件下,执行:emqx start

打开浏览器输入 http://localhost:18083 进入EMQ的web控制台,输入用户名:admin 密码:public 进行登录。登录进入后,界面如下:

至此,代理服务器已经创建完成!客户端就可以连接代理服务器进行消息的分发和订阅!


附录:
java1.8.1下载,地址:Java Downloads | Oracle
https://www.oracle.com/java/technologies/downloads/#java8-windows

参考:
1、嵌入式QT- QT使用MQTT
嵌入式QT- QT使用MQTT_qt mqtt-CSDN博客
2、Python实现通信协议(mqtt)5星,mqtt flask
【小沐学Python】Python实现通信协议(mqtt)_python mqtt-CSDN博客
相关文章:
MQTT服务搭建及python使用示例
1、MQTT协议 1.1、MQTT介绍 MQTT(Message Queuing Telemetry Transport)是一种轻量级的、基于发布/订阅模式的通信协议,通常用于物联网设备之间的通讯。它具有低带宽、低功耗和开放性等特点,适合在网络带宽有限或者网络连接不稳定…...
Ubuntu如何设置中文输入法
概述 Ubuntu 是一个基于 Debian 构建的开源操作系统,拥有广泛的用户群体和强大的社区支持。是免费、开源的操作系统。被设计为一个适用于个人电脑、服务器和云平台的通用操作系统。Ubuntu的目标是提供一个稳定、易于使用和免费的操作系统,以促进人们在计…...
PostgreSQL的pg_dump和 pg_dumpall 异同点
PostgreSQL的pg_dump和 pg_dumpall 异同点 基础信息 OS版本:Red Hat Enterprise Linux Server release 7.9 (Maipo) DB版本:16.2 pg软件目录:/home/pg16/soft pg数据目录:/home/pg16/data 端口:5777pg_dump 和 pg_dum…...
【Ping】Windows 网络延迟测试 ping 、telnet、tcping 工具
ping 命令 属于网络层的ICMP协议,只能检查 IP 的连通性或网络连接速度, 无法检测IP的端口状态。 telnet telnet命令,属于应用层的协议,用于远程登录,也可用于检测IP的端口状态。但是功能有限,只能检测一时…...
DuDuTalk:4G桌面拾音设备在银行网点服务场景的应用价值
随着科技的飞速发展,银行业也在不断地寻求创新以提高服务质量和效率。在这个过程中,4G桌面拾音设备作为一种新型的智能设备,其在银行网点服务场景中的应用价值逐渐凸显出来。本文将从多个角度探讨4G桌面拾音设备在银行网点服务场景的应用价值…...
QT 设置窗口不透明度
在窗口作为子窗口时,setWindowOpacity设置窗口的不透明度可能会失效。 QGraphicsOpacityEffect *opacityEffect new QGraphicsOpacityEffect(this); opacityEffect->setOpacity(1.0); this->setGraphicsEffect(opacityEffect);// 创建属性动画对象ÿ…...
如何在Python中实现文本相似度比较?
在Python中实现文本相似度比较可以通过多种方法,每种方法都有其适用场景和优缺点。以下是一些常见的文本相似度比较方法: 1. 余弦相似度(Cosine Similarity) 余弦相似度是通过计算两个向量之间夹角的余弦值来确定它们之间的相似…...
韩顺平0基础学Java——第7天
p110-p154 控制结构(第四章) 多分支 if-elseif-else import java.util.Scanner; public class day7{public static void main(String[] args) {Scanner myscanner new Scanner(System.in);System.out.println("input your score?");int s…...
性能远超GPT-4!谷歌发布Med-Gemini医疗模型;李飞飞首次创业瞄准空间智能;疫苗巨头联合OpenAl助力AI医疗...
AI for Science 企业动态速览—— * 谷歌 Med-Gemini 医疗 AI 模型性能远超 GPT-4 * 斯坦福李飞飞首次创业瞄准「空间智能」 * 疫苗巨头 Moderna 与 OpenAl 达成合作 * 美国能源部推动 AI 在清洁能源领域的应用 * 美年健康荣获「2024福布斯中国人工智能创新场景应用企业TOP10」…...
中国科技大航海时代,“掘金”一带一路
文|白 鸽 编|王一粟 “这不就是90年代的内地吗?” 在深度考察完沙特市场后,华盛集团联合创始人兼CEO张霆对镜相工作室感慨道。 在张霆看来,沙特落后的基建(意味着大量创新空间)、刚刚开放…...
ffmpeg7.0 flv支持hdr
ffmpeg7.0 flv支持hdr 自从ffmpeg6.0应用enhance rtmp支持h265/av1的flv格式后,7.0迎来了flv的hdr能力。本文介绍ffmpeg7.0如何支持hdr in flv。 如果对enhance rtmp如何支持h265不了解,推荐详解Enhanced-RTMP支持H.265 1. enhance rtmp关于hdr 文档…...
【教程】极简Python接入免费语音识别API
转载请注明出处:小锋学长生活大爆炸[xfxuezhagn.cn] 如果本文帮助到了你,请不吝给个[点赞、收藏、关注]哦~ 安装库: pip install SpeechRecognition 使用方法: import speech_recognition as srr sr.Recognizer() harvard sr…...
详解typora配置亚马逊云科技Amazon S3图床
欢迎免费试用亚马逊云科技产品:https://mic.anruicloud.com/url/1333 当前有很多不同的博客社区,不同的博客社区使用的编辑器也不尽相同,大概可以分为两种,一种是markdown格式,另外一种是富文本格式。例如华为云开发者…...
Python sqlite3库 实现 数据库基础及应用 输入地点,可输出该地点的爱国主义教育基地名称和批次的查询结果。
目录 【第11次课】实验十数据库基础及应用1-查询 要求: 提示: 运行结果: 【第11次课】实验十数据库基础及应用1-查询 声明:著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。 1.简答题 数据库文件Edu_Base.db&#…...
iOS-SSL固定证书
文章目录 1. SSL简介2. 证书锁定原理1.1 证书锁定1.2 公钥锁定1.3 客户端获取公钥1.4 客户端使用SSL锁定选择1.5 项目支持SSL证书锁定1.6 问题记录1. SSL简介 证书锁定(SSL/TLS Pinning)顾名思义,将服务器提供的SSL/TLS证书内置到移动端开发的APP客户端中,当客户端发起请求…...
docker 开启 tcp 端口
前言:查了很多网上资料 都说要修改daemons,json 完全不管用,而且还导致添加 {“host”:["tcp://0.0.0.0:2375","unix:///var/lib/docker.sock"]} 后,docker restart 失败,浪费了不少时间 !&am…...
zookeeper之分布式环境搭建
ZooKeeper的分布式环境搭建是一个涉及多个步骤的过程,主要包括准备工作、安装ZooKeeper、配置集群、启动服务以及验证集群状态。以下是搭建ZooKeeper分布式环境的基本步骤: 1. 准备工作 确保所有节点的系统时间同步。确保所有节点之间网络互通…...
java设计模式三
工厂模式是一种创建型设计模式,它提供了一个创建对象的接口,但允许子类决定实例化哪一个类。工厂模式有几种变体,包括简单工厂模式、工厂方法模式和抽象工厂模式。下面通过一个简化的案例和对Java标准库中使用工厂模式的源码分析来说明这一模…...
##12 深入了解正则化与超参数调优:提升神经网络性能的关键策略
文章目录 前言1. 正则化技术的重要性1.1 L1和L2正则化1.2 Dropout1.3 批量归一化 2. 超参数调优技术2.1 网格搜索2.2 随机搜索2.3 贝叶斯优化 3. 实践案例3.1 设置实验3.2 训练和测试 4. 结论 前言 在深度学习中,构建一个高性能的模型不仅需要一个好的架构…...
TODESK怎么查看有人在远程访问
odesk怎么查看有人在远程访问 Todesk作为一款远程桌面控制软件,为用户提供了便捷的远程访问与控制功能。但在享受这种便利的同时,许多用户也关心如何确保自己设备的安全,特别是如何知道是否有人在未经授权的情况下远程访问自己的电脑。本文将…...
CMake 从 GitHub 下载第三方库并使用
有时我们希望直接使用 GitHub 上的开源库,而不想手动下载、编译和安装。 可以利用 CMake 提供的 FetchContent 模块来实现自动下载、构建和链接第三方库。 FetchContent 命令官方文档✅ 示例代码 我们将以 fmt 这个流行的格式化库为例,演示如何: 使用 FetchContent 从 GitH…...
有限自动机到正规文法转换器v1.0
1 项目简介 这是一个功能强大的有限自动机(Finite Automaton, FA)到正规文法(Regular Grammar)转换器,它配备了一个直观且完整的图形用户界面,使用户能够轻松地进行操作和观察。该程序基于编译原理中的经典…...
算法:模拟
1.替换所有的问号 1576. 替换所有的问号 - 力扣(LeetCode) 遍历字符串:通过外层循环逐一检查每个字符。遇到 ? 时处理: 内层循环遍历小写字母(a 到 z)。对每个字母检查是否满足: 与…...
Python基于历史模拟方法实现投资组合风险管理的VaR与ES模型项目实战
说明:这是一个机器学习实战项目(附带数据代码文档),如需数据代码文档可以直接到文章最后关注获取。 1.项目背景 在金融市场日益复杂和波动加剧的背景下,风险管理成为金融机构和个人投资者关注的核心议题之一。VaR&…...
使用Spring AI和MCP协议构建图片搜索服务
目录 使用Spring AI和MCP协议构建图片搜索服务 引言 技术栈概览 项目架构设计 架构图 服务端开发 1. 创建Spring Boot项目 2. 实现图片搜索工具 3. 配置传输模式 Stdio模式(本地调用) SSE模式(远程调用) 4. 注册工具提…...
探索Selenium:自动化测试的神奇钥匙
目录 一、Selenium 是什么1.1 定义与概念1.2 发展历程1.3 功能概述 二、Selenium 工作原理剖析2.1 架构组成2.2 工作流程2.3 通信机制 三、Selenium 的优势3.1 跨浏览器与平台支持3.2 丰富的语言支持3.3 强大的社区支持 四、Selenium 的应用场景4.1 Web 应用自动化测试4.2 数据…...
Spring Security 认证流程——补充
一、认证流程概述 Spring Security 的认证流程基于 过滤器链(Filter Chain),核心组件包括 UsernamePasswordAuthenticationFilter、AuthenticationManager、UserDetailsService 等。整个流程可分为以下步骤: 用户提交登录请求拦…...
人工智能 - 在Dify、Coze、n8n、FastGPT和RAGFlow之间做出技术选型
在Dify、Coze、n8n、FastGPT和RAGFlow之间做出技术选型。这些平台各有侧重,适用场景差异显著。下面我将从核心功能定位、典型应用场景、真实体验痛点、选型决策关键点进行拆解,并提供具体场景下的推荐方案。 一、核心功能定位速览 平台核心定位技术栈亮…...
HTML中各种标签的作用
一、HTML文件主要标签结构及说明 1. <!DOCTYPE html> 作用:声明文档类型,告知浏览器这是 HTML5 文档。 必须:是。 2. <html lang“zh”>. </html> 作用:包裹整个网页内容,lang"z…...
Git 命令全流程总结
以下是从初始化到版本控制、查看记录、撤回操作的 Git 命令全流程总结,按操作场景分类整理: 一、初始化与基础操作 操作命令初始化仓库git init添加所有文件到暂存区git add .提交到本地仓库git commit -m "提交描述"首次提交需配置身份git c…...
