当前位置: 首页 > news >正文

python实现MQTT协议(发布者,订阅者,topic)

python实现MQTT协议

一、简介

1.1 概述

本文章针对物联网MQTT协议完成python实现

1.2 环境

  • Apache-apollo创建broker
  • Python实现发布者和订阅者

1.3 内容

  • MQTT协议架构说明 :

  • 利用仿真服务体会 MQTT协议

  • 针对MQTT协议进行测试

任务1:MQTT协议应用场景

说明: MQ 遥测传输 (MQTT) 是轻量级基于代理的发布/订阅的消息传输协议,设计思想是开放、简单、轻量、 易于实现。这些特点使它适用于受限环境。该协议的特点有:

1 使用发布/订阅消息模式,提供一对多的消息发布,解除应用程序耦合。
2 对负载内容屏蔽的消息传输。
3 使用 TCP/IP 提供网络连接。

物联网应用场景:

image-20230901160722558

协议角色分工:

客户端分为2种角色:发布者(Publisher)和订阅者(Subscriber)。
每一个发布者(Publisher)可以发送不同类型的消息,我们把消息的类型叫做主题(topic),
MQTT通信中的消息都属于某一个主题 ,而只有订阅了这个主题的订阅者(Subscriber)才能收到属于这个主题的消息。
发布者和订阅者不需要在意和知道对方的存在(不需要知道对方的IP和端口),也不需要直接与对方建立连接。因为通信中存在着一个叫代理 (MQTT broker)的第三种角色,也可以叫MQTT服务器(MQTT server)。 
发布者、订阅者只需要知道MQTT服务器的IP和端口即可,并和它直接建立连接通信。MQTT代理作为消息的 中转,它过滤所有接受到的消息,并按照一定的机制(MQTT标准规定是基于主题的消息过滤派发方式,而具 体的MQTT服务器软件也提供了其他的派发方式)分发它们,使得所有注册到MQTT代理的订阅者只接收到他 们订阅了的消息,而不会收到他不关心的消息。
当发布者发布一条消息的时候,他必须同时指定消息的主题和消息的负载。MQTT代理在收到发布者发过来的 消息时,无需访问消息负载,他只是访问消息的主题信息,然后根据这主题派发给订阅者。需要注意的是,一个客户端可以同时既当发布者又当订阅者。比如一个开发板连接了一盏LED灯,它可以发布灯的暗/亮状态 信息,也可以从其他节点订阅对灯的控制消息。

3.生产者(发布消息)和消费者(消耗消息-订阅者)模式理解

生产者:wifi设备采集各种物联网传感器比如温度重力传感器
消费者:客户端比如手机
原理如下:

image-20230901095257410

任务2:搭建Mqtt协议服务 (broker)

前提:安装JDK和JAVA_HOME环境变量

1 下载Apollo服务器 地址 http://archive.apache.org/dist/activemq/activemq-apollo/1.7.1/

2 进入bin目录命令行 输入:

D:\softwares\apache-apollo-1.7.1\bin\apollo.cmd create jwbroker

3:broker\etc\apollo.xml文件下是配置服务器信息的文件
初始默认帐号是admin,密码password;

4:启动命令行: (以一个实例为单位进行创建的)

进入... jwbroker创建实例的\bin\ 目录,
在CMD输入命令apollo-broker.cmd run,可以使用TAB键自动补全,运行后输出信息如下:验证:
MQTT服务器TCP连接端口: tcp://0.0.0.0:61613
后台web管理页面:https://127.0.0.1:61681/或http://127.0.0.1:61680/

出现如下图表示启动成功

image-20230901154500857

安装mqtt需要的包:

pip install paho-mqtt

发布者publish创建:

import time
from paho.mqtt import publish
#源码中只需要知道   ip + 端口 + 订阅的主题
HOST ="127.0.0.1"
PORT =61613def on_connect(client,userdata, flags,rc):print("Connected with result code" + str(rc))client.subscribe("jw-temperature") # 发布主题def on_message(client,userdata,msg):print(msg.topc +  "消息发送!" + msg.payload.decode("utf-8"))if __name__ == '__main__':print("消息发布!----我是一个发布者:正在给设备和传感器发布主题-----")client_id = time.strftime('%Y%m%d%H%M%S', time.localtime(time.time()))for i in range(20):time.sleep(2)publish.single("lightChange","现在天黑了", qos = 1, hostname = HOST, port = PORT, client_id = client_id,auth = {'username': "admin", 'password': "password"})print("已发送"+str(i+1)+"条消息")

订阅者light_subcribe创建:

import paho.mqtt.client as mqtt
import time#源码中只需要知道   ip + 端口 + 订阅的主题
HOST ="127.0.0.1"
PORT =61613'''
The callback for when the client receives a CONNACK response from the server
客户端接收到服务器端的确认连接请求时,回调on_connect服务端发送CONNACK报文响应从客户端收到的CONNECT报文。
服务端发送给客户端的第一个报文必须是CONNACK [MQTT-3.2.0-1].
'''
def on_connect(client,userdata, flags,rc):print("Connected with result code" + str(rc))'''Subscribing in on_connect() means that if we lose the connection and  reconnect then subscriptions wil be renewed(恢复、续订).'''client.subscribe("lightChange") # 订阅主题'''
The callback for when a PUBLISH message is received from the server.
客户端接收到服务器向其传输的消息时,
回调on_messagePUBLISH控制报文是指从客户端向服务端或者服务端向客户端传输一个应用消息。
'''
def on_message(client,userdata,msg):print(msg.topic+ msg.payload.decode("utf-8") +  ",回调消息:收到收到!我已经接收到发布者的消息,并且打开了光传感器" )def client_loop():'''注意,client_id是必须的,并且是唯一的。否则可能会出现如下错误[WinError 10054] 远程主机强迫关闭了一个现有的连接'''client_id = time.strftime('%Y%m%d%H%M%S', time.localtime(time.time()))client = mqtt.Client(client_id) # Client_id 不能重复,所以使用当前时间client.username_pw_set("admin","password") # 必须设置,否则会返回 /Connected with result code 4/client.on_connect = on_connectclient.on_message = on_message'''拥塞回调:处理网络流量,调度回调和重连接。Blocking call that processes network traffic, dispatches callbacks and handles reconnecting.Other loop*() functions are available that give a threaded interface and amanual- interface...I'''try:client.connect(HOST,PORT,60)client.loop_forever()except KeyboardInterrupt:client.disconnect()if __name__ == '__main__':print("手电筒打开----我是一个订阅者:需要消费主题-----")client_loop()

订阅者phone_subcribe创建:

import paho.mqtt.client as mqtt
import time#源码中只需要知道   ip + 端口 + 订阅的主题
HOST ="127.0.0.1"
PORT =61613
'''
The callback for when the client receives a CONNACK response from the server
客户端接收到服务器端的确认连接请求时,回调on_connect服务端发送CONNACK报文响应从客户端收到的CONNECT报文。
服务端发送给客户端的第一个报文必须是CONNACK [MQTT-3.2.0-1].'''
def on_connect(client,userdata, flags,rc):print("Connected with result code" + str(rc))'''Subscribing in on_connect() means that if we lose the connection and  reconnect then subscriptions wil be renewed(恢复、续订).'''client.subscribe("lightChange") # 订阅主题'''
The callback for when a PUBLISH message is received from the server.
客户端接收到服务器向其传输的消息时,
回调on_messagePUBLISH控制报文是指从客户端向服务端或者服务端向客户端传输一个应用消息。
'''
def on_message(client,userdata,msg):print(msg.topic  + msg.payload.decode("utf-8")+  ",回调消息:收到收到!我已经接收到发布者的消息并给用户反馈手电筒已经打开")def client_loop():'''注意,client_id是必须的,并且是唯一的。否则可能会出现如下错误[WinError 10054] 远程主机强迫关闭了一个现有的连接'''client_id = time.strftime('%Y%m%d%H%M%S', time.localtime(time.time()))client = mqtt.Client(client_id) # Client_id 不能重复,所以使用当前时间client.username_pw_set("admin","password") # 必须设置,否则会返回 /Connected with result code 4/client.on_connect = on_connectclient.on_message = on_message'''拥塞回调:处理网络流量,调度回调和重连接。Blocking call that processes network traffic, dispatches callbacks and handles reconnecting.Other loop*() functions are available that give a threaded interface and amanual- interface...I'''try:client.connect(HOST,PORT,60)client.loop_forever()except KeyboardInterrupt:client.disconnect()if __name__ == '__main__':print("手机启动----我是一个订阅者:需要消费主题-----")client_loop()

演示:分别运行publish和light_subcribe和phone_subcribe

publish:

image-20230901160129432

light_subcribe:

image-20230901160148825

phone_subcribe:

image-20230901160158514

相关文章:

python实现MQTT协议(发布者,订阅者,topic)

python实现MQTT协议 一、简介 1.1 概述 本文章针对物联网MQTT协议完成python实现 1.2 环境 Apache-apollo创建brokerPython实现发布者和订阅者 1.3 内容 MQTT协议架构说明 : 利用仿真服务体会 MQTT协议 针对MQTT协议进行测试 任务1:MQTT协议应…...

2023年09月03日-----16:58

协同过滤推荐和矩阵分解本质上有什么不同?协同过滤推荐和矩阵分解是两种推荐系统方法,它们在某些方面有相似之处,但也有一些本质不同之处。 基本原理: 协同过滤推荐:协同过滤是一种基于用户行为数据的推荐方法,它依赖于用户-物品交互数据,如用户的评分或点击历史。协同过…...

HTTP状态码504(Gateway Timeout)报错原因分析和解决办法

文章目录 504报错原因分析一、用户角度1. 代理服务器问题2. 网络问题 二、网站管理员角度1. 服务器负载过重2. 网关配置问题3. 目标服务器响应慢4. IIS/nginx/apache服务关闭5. 维护或故障6. 数据库的慢处理也会导致504 用户角度可以采取哪些措施解决504错误1. 刷新页面2. 检查…...

《凤凰架构》第三章——事务处理

前言 由于一些地方原文感觉不太清楚,有些地方用小林coding的文章代替。 总结 事务处理主要的目的就是要让数据在各种条件下,最终的运行结果都能符合你的期望。要达成这个目标有三点需要满足:原子性(业务要么同时成功&#xff0…...

音视频添 加水印

一、文字水印 在视频中增加文字水印需要准备的条件比较多,需要有文字字库处理的相关文件,在编译FFmpeg时需要支持FreeType、FontConfig、iconv,系统中需要有相关的字库,在FFmpeg中增加纯字母水印可以使用drawtext滤镜进行支持&am…...

使用Python的requests库与chatGPT进行通信

前言 在人工智能领域,自然语言处理模型如OpenAI GPT-3.5 Turbo具有广泛的应用。虽然官方提供了Python库来与这些模型进行交互,但也有一些人更喜欢使用requests库来自定义请求和处理响应,比如现在很多第三方LLM都提供了与chatGPT类似的http请…...

SASS常用内置函数

1,math 引入:use "sass:math"; 常用函数: ceil($number) - 向上取整floor($number) - 向下取整round($number) - 四舍五入max($number...) - 比较若干数值并取最大值min($number...) - 比较若干数值并取最小值random() - 取0~1之…...

2023年05月 C/C++(四级)真题解析#中国电子学会#全国青少年软件编程等级考试

C/C++编程(1~8级)全部真题・点这里 第1题:怪盗基德的滑翔翼 怪盗基德是一个充满传奇色彩的怪盗,专门以珠宝为目标的超级盗窃犯。而他最为突出的地方,就是他每次都能逃脱中村警部的重重围堵,而这也很大程度上是多亏了他随身携带的便于操作的滑翔翼。 有一天,怪盗基德像往…...

Emmet 使用笔记小结

Emmet 使用笔记小结 最近在跟视频走 CSS 的教程,然后要写很多的 HTML 结构,就想着总结一下 Emmet 的语法。 Emmet 是一个工具可以用来加速 HTML 和 CSS 的开发过程,不过 emmet 只支持 HTML & XML 文件结构,所以我个人觉得对…...

如何使用Puppeteer进行新闻网站数据抓取和聚合

导语 Puppeteer是一个基于Node.js的库,它提供了一个高级的API来控制Chrome或Chromium浏览器。通过Puppeteer,我们可以实现各种自动化任务,如网页截图、PDF生成、表单填写、网络监控等。本文将介绍如何使用Puppeteer进行新闻网站数据抓取和聚…...

【LeetCode每日一题合集】2023.8.7-2023.8.13(动态规划分治)

文章目录 344. 反转字符串1749. 任意子数组和的绝对值的最大值(最大子数组和)1281. 整数的各位积和之差1289. 下降路径最小和 II解法1——动态规划 O ( n 3 ) O(n^3) O(n3)解法2——转移过程优化 O ( n 2 ) O(n^2) O(n2) ⭐ 1572. 矩阵对角线元素的和解法…...

微信小程序修改vant组件样式

1 背景 在使用vant组件开发微信小程序的时候,想更改vant组件内部样式,达到自己想要的目的(van-grid组件改成宫格背景色为透明,默认为白色),官网没有示例,通过以下几步修改成功。 2 步骤 2.1 …...

yum 、rpm、yumdownloader、repotrack 学习笔记

1 Linux 包管理器概述 rpm的使用: rpm -ivh filename.rpm#这列出该packageName(包名)安装的所有文件列表。 rpm -ql packageName #查询已安装的该packageName的详细信息,包括版本、发布日期等。 rpm -qi packageName #列出该pac…...

python检测CPU占用、内存和磁盘剩余空间 脚本

python检测CPU占用、内存和磁盘剩余空间 脚本。后续将其加入到计划列表中即可 # codingutf-8 import time import psutil import osimport smtplibfrom email.mime.multipart import MIMEMultipart from email.mime.text import MIMEText # email 用于构建邮件内容 from email…...

量化策略:CTA,市场中性,指数增强

CTA 策略 commodity Trading Advisor Strategy,即“商品交易顾问策略”,也被称作管理期货策略。 期货T0,股票T1双向交易:就单向交易而言的,不仅能先买入再卖出(做多),而且可以先卖…...

L1-051 打折(Python实现) 测试点全过

前言: {\color{Blue}前言:} 前言: 本系列题使用的是,“PTA中的团体程序设计天梯赛——练习集”的题库,难度有L1、L2、L3三个等级,分别对应团体程序设计天梯赛的三个难度。更新取决于题目的难度,…...

任意文件读取和漏洞复现

任意文件读取 1. 概述 一些网站的需求,可能会提供文件查看与下载的功能。如果对用户查看或下载的文件没有限制或者限制绕过,就可以查看或下载任意文件。这些文件可以是漂代码文件,配置文件,敏感文件等等。 任意文件读取会造成&…...

编译KArchive在windows10下

使用QT6和VS2019编译KArchive的简要步骤: 安装 Qt ,我是用源码自己编译的 "F:\qtbuild"安装CMakefile并配置环境变量安装Git下载ECM源码 https://github.com/KDE/extra-cmake-modules.git-------------------------------------------------…...

【Python】批量下载页面资源

【背景】 有一些非常不错的资源网站,比如一些MP3资源网站。资源很丰富,但是每一个资源都不大,一个一个下载费时费力,想用Python快速实现可复用的批量下载程序。 【思路】 获得包含资源链接的静态页面,用beautifulsoup分析页面,获得所有MP3资源的实际地址,然后下载。…...

Windows NUMA编程实践 – 处理器组、组亲和性、处理器亲和性及版本变化

Windows在设计之初没有考虑过对大数量的多CPU和NUMA架构的设备的支持,大部分关于CPU的设计按照64个为上限来设计。核心数越来越多的多核处理器的进入市场使得微软不得不做较大的改动来进行支持,因此Windows 的进程、线程和NUMA API在各个版本中行为不一样…...

【大模型RAG】拍照搜题技术架构速览:三层管道、两级检索、兜底大模型

摘要 拍照搜题系统采用“三层管道(多模态 OCR → 语义检索 → 答案渲染)、两级检索(倒排 BM25 向量 HNSW)并以大语言模型兜底”的整体框架: 多模态 OCR 层 将题目图片经过超分、去噪、倾斜校正后,分别用…...

华为云AI开发平台ModelArts

华为云ModelArts:重塑AI开发流程的“智能引擎”与“创新加速器”! 在人工智能浪潮席卷全球的2025年,企业拥抱AI的意愿空前高涨,但技术门槛高、流程复杂、资源投入巨大的现实,却让许多创新构想止步于实验室。数据科学家…...

智慧医疗能源事业线深度画像分析(上)

引言 医疗行业作为现代社会的关键基础设施,其能源消耗与环境影响正日益受到关注。随着全球"双碳"目标的推进和可持续发展理念的深入,智慧医疗能源事业线应运而生,致力于通过创新技术与管理方案,重构医疗领域的能源使用模式。这一事业线融合了能源管理、可持续发…...

【Web 进阶篇】优雅的接口设计:统一响应、全局异常处理与参数校验

系列回顾: 在上一篇中,我们成功地为应用集成了数据库,并使用 Spring Data JPA 实现了基本的 CRUD API。我们的应用现在能“记忆”数据了!但是,如果你仔细审视那些 API,会发现它们还很“粗糙”:有…...

Robots.txt 文件

什么是robots.txt? robots.txt 是一个位于网站根目录下的文本文件(如:https://example.com/robots.txt),它用于指导网络爬虫(如搜索引擎的蜘蛛程序)如何抓取该网站的内容。这个文件遵循 Robots…...

Spring AI 入门:Java 开发者的生成式 AI 实践之路

一、Spring AI 简介 在人工智能技术快速迭代的今天,Spring AI 作为 Spring 生态系统的新生力量,正在成为 Java 开发者拥抱生成式 AI 的最佳选择。该框架通过模块化设计实现了与主流 AI 服务(如 OpenAI、Anthropic)的无缝对接&…...

汇编常见指令

汇编常见指令 一、数据传送指令 指令功能示例说明MOV数据传送MOV EAX, 10将立即数 10 送入 EAXMOV [EBX], EAX将 EAX 值存入 EBX 指向的内存LEA加载有效地址LEA EAX, [EBX4]将 EBX4 的地址存入 EAX(不访问内存)XCHG交换数据XCHG EAX, EBX交换 EAX 和 EB…...

.Net Framework 4/C# 关键字(非常用,持续更新...)

一、is 关键字 is 关键字用于检查对象是否于给定类型兼容,如果兼容将返回 true,如果不兼容则返回 false,在进行类型转换前,可以先使用 is 关键字判断对象是否与指定类型兼容,如果兼容才进行转换,这样的转换是安全的。 例如有:首先创建一个字符串对象,然后将字符串对象隐…...

力扣-35.搜索插入位置

题目描述 给定一个排序数组和一个目标值,在数组中找到目标值,并返回其索引。如果目标值不存在于数组中,返回它将会被按顺序插入的位置。 请必须使用时间复杂度为 O(log n) 的算法。 class Solution {public int searchInsert(int[] nums, …...

AI,如何重构理解、匹配与决策?

AI 时代,我们如何理解消费? 作者|王彬 封面|Unplash 人们通过信息理解世界。 曾几何时,PC 与移动互联网重塑了人们的购物路径:信息变得唾手可得,商品决策变得高度依赖内容。 但 AI 时代的来…...