python实现MQTT协议(发布者,订阅者,topic)
python实现MQTT协议
一、简介
1.1 概述
本文章针对物联网MQTT协议完成python实现
1.2 环境
- Apache-apollo创建broker
- Python实现发布者和订阅者
1.3 内容
-
MQTT协议架构说明 :
-
利用仿真服务体会 MQTT协议
-
针对MQTT协议进行测试
任务1:MQTT协议应用场景
说明: MQ 遥测传输 (MQTT) 是轻量级基于代理的发布/订阅的消息传输协议,设计思想是开放、简单、轻量、 易于实现。这些特点使它适用于受限环境。该协议的特点有:
1 使用发布/订阅消息模式,提供一对多的消息发布,解除应用程序耦合。
2 对负载内容屏蔽的消息传输。
3 使用 TCP/IP 提供网络连接。
物联网应用场景:

协议角色分工:
客户端分为2种角色:发布者(Publisher)和订阅者(Subscriber)。
每一个发布者(Publisher)可以发送不同类型的消息,我们把消息的类型叫做主题(topic),
MQTT通信中的消息都属于某一个主题 ,而只有订阅了这个主题的订阅者(Subscriber)才能收到属于这个主题的消息。
发布者和订阅者不需要在意和知道对方的存在(不需要知道对方的IP和端口),也不需要直接与对方建立连接。因为通信中存在着一个叫代理 (MQTT broker)的第三种角色,也可以叫MQTT服务器(MQTT server)。
发布者、订阅者只需要知道MQTT服务器的IP和端口即可,并和它直接建立连接通信。MQTT代理作为消息的 中转,它过滤所有接受到的消息,并按照一定的机制(MQTT标准规定是基于主题的消息过滤派发方式,而具 体的MQTT服务器软件也提供了其他的派发方式)分发它们,使得所有注册到MQTT代理的订阅者只接收到他 们订阅了的消息,而不会收到他不关心的消息。
当发布者发布一条消息的时候,他必须同时指定消息的主题和消息的负载。MQTT代理在收到发布者发过来的 消息时,无需访问消息负载,他只是访问消息的主题信息,然后根据这主题派发给订阅者。需要注意的是,一个客户端可以同时既当发布者又当订阅者。比如一个开发板连接了一盏LED灯,它可以发布灯的暗/亮状态 信息,也可以从其他节点订阅对灯的控制消息。
3.生产者(发布消息)和消费者(消耗消息-订阅者)模式理解
生产者:wifi设备采集各种物联网传感器比如温度重力传感器
消费者:客户端比如手机
原理如下:

任务2:搭建Mqtt协议服务 (broker)
前提:安装JDK和JAVA_HOME环境变量
1 下载Apollo服务器 地址 http://archive.apache.org/dist/activemq/activemq-apollo/1.7.1/
2 进入bin目录命令行 输入:
D:\softwares\apache-apollo-1.7.1\bin\apollo.cmd create jwbroker
3:broker\etc\apollo.xml文件下是配置服务器信息的文件
初始默认帐号是admin,密码password;
4:启动命令行: (以一个实例为单位进行创建的)
进入... jwbroker创建实例的\bin\ 目录,
在CMD输入命令apollo-broker.cmd run,可以使用TAB键自动补全,运行后输出信息如下:验证:
MQTT服务器TCP连接端口: tcp://0.0.0.0:61613
后台web管理页面:https://127.0.0.1:61681/或http://127.0.0.1:61680/
出现如下图表示启动成功

安装mqtt需要的包:
pip install paho-mqtt
发布者publish创建:
import time
from paho.mqtt import publish
#源码中只需要知道 ip + 端口 + 订阅的主题
HOST ="127.0.0.1"
PORT =61613def on_connect(client,userdata, flags,rc):print("Connected with result code" + str(rc))client.subscribe("jw-temperature") # 发布主题def on_message(client,userdata,msg):print(msg.topc + "消息发送!" + msg.payload.decode("utf-8"))if __name__ == '__main__':print("消息发布!----我是一个发布者:正在给设备和传感器发布主题-----")client_id = time.strftime('%Y%m%d%H%M%S', time.localtime(time.time()))for i in range(20):time.sleep(2)publish.single("lightChange","现在天黑了", qos = 1, hostname = HOST, port = PORT, client_id = client_id,auth = {'username': "admin", 'password': "password"})print("已发送"+str(i+1)+"条消息")
订阅者light_subcribe创建:
import paho.mqtt.client as mqtt
import time#源码中只需要知道 ip + 端口 + 订阅的主题
HOST ="127.0.0.1"
PORT =61613'''
The callback for when the client receives a CONNACK response from the server
客户端接收到服务器端的确认连接请求时,回调on_connect服务端发送CONNACK报文响应从客户端收到的CONNECT报文。
服务端发送给客户端的第一个报文必须是CONNACK [MQTT-3.2.0-1].
'''
def on_connect(client,userdata, flags,rc):print("Connected with result code" + str(rc))'''Subscribing in on_connect() means that if we lose the connection and reconnect then subscriptions wil be renewed(恢复、续订).'''client.subscribe("lightChange") # 订阅主题'''
The callback for when a PUBLISH message is received from the server.
客户端接收到服务器向其传输的消息时,
回调on_messagePUBLISH控制报文是指从客户端向服务端或者服务端向客户端传输一个应用消息。
'''
def on_message(client,userdata,msg):print(msg.topic+ msg.payload.decode("utf-8") + ",回调消息:收到收到!我已经接收到发布者的消息,并且打开了光传感器" )def client_loop():'''注意,client_id是必须的,并且是唯一的。否则可能会出现如下错误[WinError 10054] 远程主机强迫关闭了一个现有的连接'''client_id = time.strftime('%Y%m%d%H%M%S', time.localtime(time.time()))client = mqtt.Client(client_id) # Client_id 不能重复,所以使用当前时间client.username_pw_set("admin","password") # 必须设置,否则会返回 /Connected with result code 4/client.on_connect = on_connectclient.on_message = on_message'''拥塞回调:处理网络流量,调度回调和重连接。Blocking call that processes network traffic, dispatches callbacks and handles reconnecting.Other loop*() functions are available that give a threaded interface and amanual- interface...I'''try:client.connect(HOST,PORT,60)client.loop_forever()except KeyboardInterrupt:client.disconnect()if __name__ == '__main__':print("手电筒打开----我是一个订阅者:需要消费主题-----")client_loop()
订阅者phone_subcribe创建:
import paho.mqtt.client as mqtt
import time#源码中只需要知道 ip + 端口 + 订阅的主题
HOST ="127.0.0.1"
PORT =61613
'''
The callback for when the client receives a CONNACK response from the server
客户端接收到服务器端的确认连接请求时,回调on_connect服务端发送CONNACK报文响应从客户端收到的CONNECT报文。
服务端发送给客户端的第一个报文必须是CONNACK [MQTT-3.2.0-1].'''
def on_connect(client,userdata, flags,rc):print("Connected with result code" + str(rc))'''Subscribing in on_connect() means that if we lose the connection and reconnect then subscriptions wil be renewed(恢复、续订).'''client.subscribe("lightChange") # 订阅主题'''
The callback for when a PUBLISH message is received from the server.
客户端接收到服务器向其传输的消息时,
回调on_messagePUBLISH控制报文是指从客户端向服务端或者服务端向客户端传输一个应用消息。
'''
def on_message(client,userdata,msg):print(msg.topic + msg.payload.decode("utf-8")+ ",回调消息:收到收到!我已经接收到发布者的消息并给用户反馈手电筒已经打开")def client_loop():'''注意,client_id是必须的,并且是唯一的。否则可能会出现如下错误[WinError 10054] 远程主机强迫关闭了一个现有的连接'''client_id = time.strftime('%Y%m%d%H%M%S', time.localtime(time.time()))client = mqtt.Client(client_id) # Client_id 不能重复,所以使用当前时间client.username_pw_set("admin","password") # 必须设置,否则会返回 /Connected with result code 4/client.on_connect = on_connectclient.on_message = on_message'''拥塞回调:处理网络流量,调度回调和重连接。Blocking call that processes network traffic, dispatches callbacks and handles reconnecting.Other loop*() functions are available that give a threaded interface and amanual- interface...I'''try:client.connect(HOST,PORT,60)client.loop_forever()except KeyboardInterrupt:client.disconnect()if __name__ == '__main__':print("手机启动----我是一个订阅者:需要消费主题-----")client_loop()
演示:分别运行publish和light_subcribe和phone_subcribe
publish:
light_subcribe:
phone_subcribe:

相关文章:
python实现MQTT协议(发布者,订阅者,topic)
python实现MQTT协议 一、简介 1.1 概述 本文章针对物联网MQTT协议完成python实现 1.2 环境 Apache-apollo创建brokerPython实现发布者和订阅者 1.3 内容 MQTT协议架构说明 : 利用仿真服务体会 MQTT协议 针对MQTT协议进行测试 任务1:MQTT协议应…...
2023年09月03日-----16:58
协同过滤推荐和矩阵分解本质上有什么不同?协同过滤推荐和矩阵分解是两种推荐系统方法,它们在某些方面有相似之处,但也有一些本质不同之处。 基本原理: 协同过滤推荐:协同过滤是一种基于用户行为数据的推荐方法,它依赖于用户-物品交互数据,如用户的评分或点击历史。协同过…...
HTTP状态码504(Gateway Timeout)报错原因分析和解决办法
文章目录 504报错原因分析一、用户角度1. 代理服务器问题2. 网络问题 二、网站管理员角度1. 服务器负载过重2. 网关配置问题3. 目标服务器响应慢4. IIS/nginx/apache服务关闭5. 维护或故障6. 数据库的慢处理也会导致504 用户角度可以采取哪些措施解决504错误1. 刷新页面2. 检查…...
《凤凰架构》第三章——事务处理
前言 由于一些地方原文感觉不太清楚,有些地方用小林coding的文章代替。 总结 事务处理主要的目的就是要让数据在各种条件下,最终的运行结果都能符合你的期望。要达成这个目标有三点需要满足:原子性(业务要么同时成功࿰…...
音视频添 加水印
一、文字水印 在视频中增加文字水印需要准备的条件比较多,需要有文字字库处理的相关文件,在编译FFmpeg时需要支持FreeType、FontConfig、iconv,系统中需要有相关的字库,在FFmpeg中增加纯字母水印可以使用drawtext滤镜进行支持&am…...
使用Python的requests库与chatGPT进行通信
前言 在人工智能领域,自然语言处理模型如OpenAI GPT-3.5 Turbo具有广泛的应用。虽然官方提供了Python库来与这些模型进行交互,但也有一些人更喜欢使用requests库来自定义请求和处理响应,比如现在很多第三方LLM都提供了与chatGPT类似的http请…...
SASS常用内置函数
1,math 引入:use "sass:math"; 常用函数: ceil($number) - 向上取整floor($number) - 向下取整round($number) - 四舍五入max($number...) - 比较若干数值并取最大值min($number...) - 比较若干数值并取最小值random() - 取0~1之…...
2023年05月 C/C++(四级)真题解析#中国电子学会#全国青少年软件编程等级考试
C/C++编程(1~8级)全部真题・点这里 第1题:怪盗基德的滑翔翼 怪盗基德是一个充满传奇色彩的怪盗,专门以珠宝为目标的超级盗窃犯。而他最为突出的地方,就是他每次都能逃脱中村警部的重重围堵,而这也很大程度上是多亏了他随身携带的便于操作的滑翔翼。 有一天,怪盗基德像往…...
Emmet 使用笔记小结
Emmet 使用笔记小结 最近在跟视频走 CSS 的教程,然后要写很多的 HTML 结构,就想着总结一下 Emmet 的语法。 Emmet 是一个工具可以用来加速 HTML 和 CSS 的开发过程,不过 emmet 只支持 HTML & XML 文件结构,所以我个人觉得对…...
如何使用Puppeteer进行新闻网站数据抓取和聚合
导语 Puppeteer是一个基于Node.js的库,它提供了一个高级的API来控制Chrome或Chromium浏览器。通过Puppeteer,我们可以实现各种自动化任务,如网页截图、PDF生成、表单填写、网络监控等。本文将介绍如何使用Puppeteer进行新闻网站数据抓取和聚…...
【LeetCode每日一题合集】2023.8.7-2023.8.13(动态规划分治)
文章目录 344. 反转字符串1749. 任意子数组和的绝对值的最大值(最大子数组和)1281. 整数的各位积和之差1289. 下降路径最小和 II解法1——动态规划 O ( n 3 ) O(n^3) O(n3)解法2——转移过程优化 O ( n 2 ) O(n^2) O(n2) ⭐ 1572. 矩阵对角线元素的和解法…...
微信小程序修改vant组件样式
1 背景 在使用vant组件开发微信小程序的时候,想更改vant组件内部样式,达到自己想要的目的(van-grid组件改成宫格背景色为透明,默认为白色),官网没有示例,通过以下几步修改成功。 2 步骤 2.1 …...
yum 、rpm、yumdownloader、repotrack 学习笔记
1 Linux 包管理器概述 rpm的使用: rpm -ivh filename.rpm#这列出该packageName(包名)安装的所有文件列表。 rpm -ql packageName #查询已安装的该packageName的详细信息,包括版本、发布日期等。 rpm -qi packageName #列出该pac…...
python检测CPU占用、内存和磁盘剩余空间 脚本
python检测CPU占用、内存和磁盘剩余空间 脚本。后续将其加入到计划列表中即可 # codingutf-8 import time import psutil import osimport smtplibfrom email.mime.multipart import MIMEMultipart from email.mime.text import MIMEText # email 用于构建邮件内容 from email…...
量化策略:CTA,市场中性,指数增强
CTA 策略 commodity Trading Advisor Strategy,即“商品交易顾问策略”,也被称作管理期货策略。 期货T0,股票T1双向交易:就单向交易而言的,不仅能先买入再卖出(做多),而且可以先卖…...
L1-051 打折(Python实现) 测试点全过
前言: {\color{Blue}前言:} 前言: 本系列题使用的是,“PTA中的团体程序设计天梯赛——练习集”的题库,难度有L1、L2、L3三个等级,分别对应团体程序设计天梯赛的三个难度。更新取决于题目的难度,…...
任意文件读取和漏洞复现
任意文件读取 1. 概述 一些网站的需求,可能会提供文件查看与下载的功能。如果对用户查看或下载的文件没有限制或者限制绕过,就可以查看或下载任意文件。这些文件可以是漂代码文件,配置文件,敏感文件等等。 任意文件读取会造成&…...
编译KArchive在windows10下
使用QT6和VS2019编译KArchive的简要步骤: 安装 Qt ,我是用源码自己编译的 "F:\qtbuild"安装CMakefile并配置环境变量安装Git下载ECM源码 https://github.com/KDE/extra-cmake-modules.git-------------------------------------------------…...
【Python】批量下载页面资源
【背景】 有一些非常不错的资源网站,比如一些MP3资源网站。资源很丰富,但是每一个资源都不大,一个一个下载费时费力,想用Python快速实现可复用的批量下载程序。 【思路】 获得包含资源链接的静态页面,用beautifulsoup分析页面,获得所有MP3资源的实际地址,然后下载。…...
Windows NUMA编程实践 – 处理器组、组亲和性、处理器亲和性及版本变化
Windows在设计之初没有考虑过对大数量的多CPU和NUMA架构的设备的支持,大部分关于CPU的设计按照64个为上限来设计。核心数越来越多的多核处理器的进入市场使得微软不得不做较大的改动来进行支持,因此Windows 的进程、线程和NUMA API在各个版本中行为不一样…...
Linux 文件类型,目录与路径,文件与目录管理
文件类型 后面的字符表示文件类型标志 普通文件:-(纯文本文件,二进制文件,数据格式文件) 如文本文件、图片、程序文件等。 目录文件:d(directory) 用来存放其他文件或子目录。 设备…...
以下是对华为 HarmonyOS NETX 5属性动画(ArkTS)文档的结构化整理,通过层级标题、表格和代码块提升可读性:
一、属性动画概述NETX 作用:实现组件通用属性的渐变过渡效果,提升用户体验。支持属性:width、height、backgroundColor、opacity、scale、rotate、translate等。注意事项: 布局类属性(如宽高)变化时&#…...
基础测试工具使用经验
背景 vtune,perf, nsight system等基础测试工具,都是用过的,但是没有记录,都逐渐忘了。所以写这篇博客总结记录一下,只要以后发现新的用法,就记得来编辑补充一下 perf 比较基础的用法: 先改这…...
ardupilot 开发环境eclipse 中import 缺少C++
目录 文章目录 目录摘要1.修复过程摘要 本节主要解决ardupilot 开发环境eclipse 中import 缺少C++,无法导入ardupilot代码,会引起查看不方便的问题。如下图所示 1.修复过程 0.安装ubuntu 软件中自带的eclipse 1.打开eclipse—Help—install new software 2.在 Work with中…...
【RockeMQ】第2节|RocketMQ快速实战以及核⼼概念详解(二)
升级Dledger高可用集群 一、主从架构的不足与Dledger的定位 主从架构缺陷 数据备份依赖Slave节点,但无自动故障转移能力,Master宕机后需人工切换,期间消息可能无法读取。Slave仅存储数据,无法主动升级为Master响应请求ÿ…...
CMake 从 GitHub 下载第三方库并使用
有时我们希望直接使用 GitHub 上的开源库,而不想手动下载、编译和安装。 可以利用 CMake 提供的 FetchContent 模块来实现自动下载、构建和链接第三方库。 FetchContent 命令官方文档✅ 示例代码 我们将以 fmt 这个流行的格式化库为例,演示如何: 使用 FetchContent 从 GitH…...
全志A40i android7.1 调试信息打印串口由uart0改为uart3
一,概述 1. 目的 将调试信息打印串口由uart0改为uart3。 2. 版本信息 Uboot版本:2014.07; Kernel版本:Linux-3.10; 二,Uboot 1. sys_config.fex改动 使能uart3(TX:PH00 RX:PH01),并让boo…...
#Uniapp篇:chrome调试unapp适配
chrome调试设备----使用Android模拟机开发调试移动端页面 Chrome://inspect/#devices MuMu模拟器Edge浏览器:Android原生APP嵌入的H5页面元素定位 chrome://inspect/#devices uniapp单位适配 根路径下 postcss.config.js 需要装这些插件 “postcss”: “^8.5.…...
免费PDF转图片工具
免费PDF转图片工具 一款简单易用的PDF转图片工具,可以将PDF文件快速转换为高质量PNG图片。无需安装复杂的软件,也不需要在线上传文件,保护您的隐私。 工具截图 主要特点 🚀 快速转换:本地转换,无需等待上…...
uniapp 字符包含的相关方法
在uniapp中,如果你想检查一个字符串是否包含另一个子字符串,你可以使用JavaScript中的includes()方法或者indexOf()方法。这两种方法都可以达到目的,但它们在处理方式和返回值上有所不同。 使用includes()方法 includes()方法用于判断一个字…...
