java和python实现mqtt
说明:
MQTT 异步通信系统功能文档
- 系统概述
本系统基于 MQTT 协议实现异步通信,包含三个核心组件:
Broker(消息代理):负责消息的路由和转发。
Client(主客户端):定时发送时间戳消息并等待响应。
Echo Client(回显客户端):接收消息并原样返回。
所有组件均运行在本地(localhost),使用端口 10008 进行通信。
-
功能描述
2.1 Broker(消息代理)
持续运行,负责接收和转发消息。
监听 localhost:10008,确保客户端之间的通信畅通。
2.2 Client(主客户端)
每秒发布当前时间戳到 /ping 主题。
订阅 /pong 主题,等待回显消息。
检测连接状态,在断开时终止运行。
2.3 Echo Client(回显客户端)
订阅 /ping 主题,接收主客户端发送的消息。
将接收到的消息原样发布到 /pong 主题。
检测连接状态,在断开时终止运行。 -
通信流程
Client 发送消息
每秒生成时间戳,发布到 /ping 主题。
Echo Client 接收并回显
从 /ping 获取消息,原样转发到 /pong 主题。
Client 接收回显
从 /pong 获取消息,打印并进入下一轮循环。
如果任一客户端断开连接,系统会检测并终止运行。 -
运行机制
异步架构:使用 asyncio 实现非阻塞并发,提高效率。
自动重连:客户端连接失败时,以 100ms 间隔尝试重连。
日志输出:各组件打印关键操作(如发送/接收消息),便于调试。 -
适用场景
MQTT 协议学习:演示基本的发布/订阅机制。
设备间通信模拟:测试消息收发逻辑。
异步编程实践:展示 asyncio 与 MQTT 的结合使用。
系统设计简洁,便于扩展或集成到更复杂的项目中。
包含功能:
MQTT服务器
运行在本地端口10008,支持匿名访问
数据存储于内存中,无持久化
程序终止时自动关闭服务
Ping客户端
每秒发送一条时间戳消息到主题/ping
订阅主题/pong,接收并显示该主题的消息
Echo客户端
监听主题/ping,收到消息后立即将内容转发至主题/pong
实现自动应答机制
/我是分割线
python部分
step101:C:\Users\wangrusheng\PycharmProjects\FastAPIProject1\hello.py
import asyncio
import timeimport mqttoolsBROKER_PORT = 10008async def start_client():client = mqttools.Client('localhost', BROKER_PORT, connect_delays=[0.1])await client.start()return clientasync def client_main():"""Publish the current time to /ping and wait for the echo client topublish it back on /pong, with a one second interval."""client = await start_client()await client.subscribe('/pong')while True:print()message = str(int(time.time())).encode('ascii')print(f'client: Publishing {message} on /ping.')client.publish(mqttools.Message('/ping', message))message = await client.messages.get()print(f'client: Got {message.message} on {message.topic}.')if message is None:print('Client connection lost.')breakawait asyncio.sleep(1)async def echo_client_main():"""Wait for the client to publish to /ping, and publish /pong inresponse."""client = await start_client()await client.subscribe('/ping')while True:message = await client.messages.get()print(f'echo_client: Got {message.message} on {message.topic}.')if message is None:print('Echo client connection lost.')breakprint(f'echo_client: Publishing {message.message} on /pong.')client.publish(mqttools.Message('/pong', message.message))async def broker_main():"""The broker, serving both clients, forever."""broker = mqttools.Broker(('localhost', BROKER_PORT))await broker.serve_forever()async def main():await asyncio.gather(broker_main(),echo_client_main(),client_main())asyncio.run(main())
step102:运行
(.venv) PS C:\Users\wangrusheng\PycharmProjects\FastAPIProject1> python hello.py client: Publishing b'1744796556' on /ping.
echo_client: Got b'1744796556' on /ping.
echo_client: Publishing b'1744796556' on /pong.
client: Got b'1744796556' on /pong.client: Publishing b'1744796557' on /ping.
echo_client: Got b'1744796557' on /ping.
echo_client: Publishing b'1744796557' on /pong.
client: Got b'1744796557' on /pong.client: Publishing b'1744796558' on /ping.
echo_client: Got b'1744796558' on /ping.
echo_client: Publishing b'1744796558' on /pong.
client: Got b'1744796558' on /pong.
end
/我是分割线
step201:C:\Users\wangrusheng\IdeaProjects\untitled2\build.gradle
plugins {id 'java'
}group = 'org.example'
version = '1.0-SNAPSHOT'repositories {mavenCentral()
}dependencies {// MQTT 服务端依赖implementation 'io.moquette:moquette-broker:0.15'// MQTT 客户端依赖implementation 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.2.5'testImplementation platform('org.junit:junit-bom:5.10.0')testImplementation 'org.junit.jupiter:junit-jupiter'
}test {useJUnitPlatform()
}
step202:C:\Users\wangrusheng\IdeaProjects\untitled2\src\main\java\org\example\Main.java
package org.example;import org.eclipse.paho.client.mqttv3.MqttException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;public class Main {public static void main(String[] args) {// 启动MQTT服务器startMqttServer();// 等待服务器初始化try {Thread.sleep(1500);} catch (InterruptedException e) {Thread.currentThread().interrupt();}// 启动客户端startPingClient();startEchoClient();}private static void startMqttServer() {new Thread(() -> {try {System.out.println("Starting MQTT server...");MqttServer.startServer();} catch (Exception e) {System.err.println("Failed to start server: ");e.printStackTrace();System.exit(1);}}).start();}private static void startPingClient() {new Thread(() -> {try {MqttClientHandler client = new MqttClientHandler("ping-client");client.connect();client.subscribe("/pong", 0);// 定时发送ping消息ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();scheduler.scheduleAtFixedRate(() -> {try {String timestamp = String.valueOf(System.currentTimeMillis() / 1000);System.out.printf("client: Publishing %s on /ping.\n", timestamp);client.publish("/ping", timestamp, 0);} catch (MqttException e) {e.printStackTrace();}}, 0, 1, TimeUnit.SECONDS);// 保持线程运行Thread.sleep(Long.MAX_VALUE);} catch (Exception e) {e.printStackTrace();}}).start();}private static void startEchoClient() {new Thread(() -> {try {MqttClientHandler echoClient = new EchoClientHandler("echo-client");echoClient.connect();echoClient.subscribe("/ping", 0);// 保持线程运行Thread.sleep(Long.MAX_VALUE);} catch (Exception e) {e.printStackTrace();}}).start();}
}
step203:C:\Users\wangrusheng\IdeaProjects\untitled2\src\main\java\org\example\EchoClientHandler.java
package org.example;import org.eclipse.paho.client.mqttv3.MqttMessage;public class EchoClientHandler extends MqttClientHandler {public EchoClientHandler(String clientId) {super(clientId);}@Overridepublic void messageArrived(String topic, MqttMessage message) {System.out.printf("echo_client: Got %s on %s.\n", new String(message.getPayload()), topic);try {if ("/ping".equals(topic)) {String payload = new String(message.getPayload());System.out.printf("echo_client: Publishing %s on /pong.\n", payload);publish("/pong", payload, 0);}}catch (Exception e){e.printStackTrace();}}
}
step204:C:\Users\wangrusheng\IdeaProjects\untitled2\src\main\java\org\example\MqttServer.java
package org.example;import io.moquette.broker.Server;
import io.moquette.broker.config.IConfig;
import io.moquette.broker.config.MemoryConfig;
import java.io.IOException;
import java.util.Properties;public class MqttServer {private static Server mqttBroker;public static void startServer() throws IOException {if (mqttBroker == null) {mqttBroker = new Server();IConfig config = new MemoryConfig(serverConfig());mqttBroker.startServer(config);System.out.println("MQTT Broker started on port 10008");addShutdownHook();}}private static Properties serverConfig() {Properties props = new Properties();props.put("port", "10008"); // 修改端口props.put("host", "0.0.0.0");props.put("allow_anonymous", "true");props.put("persistence_store", "memory");return props;}private static void addShutdownHook() {Runtime.getRuntime().addShutdownHook(new Thread(() -> {System.out.println("Stopping MQTT broker...");mqttBroker.stopServer();System.out.println("MQTT broker stopped");}));}
}
step205:C:\Users\wangrusheng\IdeaProjects\untitled2\src\main\java\org\example\MqttClientHandler.java
package org.example;import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;public class MqttClientHandler implements MqttCallbackExtended {private static final String BROKER_URL = "tcp://localhost:10008";private final String clientId;private IMqttClient client;private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();public MqttClientHandler(String clientId) {this.clientId = clientId;}public void connect() throws MqttException {client = new MqttClient(BROKER_URL, clientId, new MemoryPersistence());client.setCallback(this);MqttConnectOptions options = new MqttConnectOptions();options.setAutomaticReconnect(true);options.setCleanSession(true);options.setConnectionTimeout(30);client.connect(options);}// 其他方法保持相同,只需修改messageArrived的日志输出@Overridepublic void messageArrived(String topic, MqttMessage message) {System.out.printf("client: Got %s on %s.\n", new String(message.getPayload()), topic);}// 其他原有方法保持不变...public void publish(String topic, String content, int qos) throws MqttException {MqttMessage message = new MqttMessage(content.getBytes());message.setQos(qos);client.publish(topic, message);}public void subscribe(String topic, int qos) throws MqttException {client.subscribe(topic, qos);}public void disconnect() throws MqttException {client.disconnect();client.close();scheduler.shutdown();}@Overridepublic void connectionLost(Throwable cause) {System.out.println("Connection lost, attempting reconnect...");scheduler.scheduleAtFixedRate(() -> {try {if (!client.isConnected()) {client.reconnect();System.out.println("Reconnected successfully");}} catch (MqttException e) {System.err.println("Reconnect failed: " + e.getMessage());}}, 0, 5, TimeUnit.SECONDS);}@Overridepublic void connectComplete(boolean reconnect, String serverURI) {System.out.println("Connection established: " + (reconnect ? "Reconnected" : "New connection"));}@Overridepublic void deliveryComplete(IMqttDeliveryToken token) {System.out.println("Message delivery confirmed");}}
step206:运行
ing ':org.example.Main.main()'…> Task :compileJava UP-TO-DATE
> Task :processResources NO-SOURCE
> Task :classes UP-TO-DATE> Task :org.example.Main.main()
Starting MQTT server...
MQTT Broker started on port 10008
log4j:WARN No appenders could be found for logger (io.moquette.broker.Server).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Connection established: New connection
Connection established: New connection
client: Publishing 1744797021 on /ping.
Message delivery confirmed
echo_client: Got 1744797021 on /ping.
echo_client: Publishing 1744797021 on /pong.
client: Publishing 1744797022 on /ping.
Message delivery confirmed
client: Got 1744797021 on /pong.
client: Publishing 1744797023 on /ping.
Message delivery confirmed
client: Publishing 1744797024 on /ping.
Message delivery confirmed
client: Publishing 1744797025 on /ping.
Message delivery confirmed
client: Publishing 1744797026 o
end
相关文章:
java和python实现mqtt
说明: MQTT 异步通信系统功能文档 系统概述 本系统基于 MQTT 协议实现异步通信,包含三个核心组件: Broker(消息代理):负责消息的路由和转发。 Client(主客户端):定时发…...
5.9 《GPT-4调试+测试金字塔:构建高可靠系统的5大实战策略》
5.4 测试与调试:构建企业级质量的保障体系 关键词:测试金字塔模型、GPT-4调试助手、LangChain调试模式、异步任务验证 测试策略设计(测试金字塔实践) #mermaid-svg-RblGbJVMnCIShiCW {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill…...
Linux——进程通信
我们知道,进程具有独立性,各进程之间互不干扰,但我们为什么还要让其联系,建立通信呢?比如:数据传输,资源共享,通知某个事件,或控制某个进程。因此,让进程间建…...
学习笔记十三—— 理解 Rust 闭包:从语法到 impl Fn vs Box<dyn Fn>
🧠 理解 Rust 闭包:从语法到 impl Fn vs Box 📚 目录 闭包是什么?和普通函数有什么不同?闭包的语法长什么样?闭包“捕获变量”是什么意思?闭包和所有权的关系Fn、FnMut、FnOnce 三种闭包类型的…...
【免费参会合集】2025年生物制药行业展会会议表格整理
全文精心整理, 建议今年参会前都好好收藏着,记得点赞! 医药人非常吃资源,资源从何而来?作为一名从事医药行业的工作者,可以很负责任的告诉诸位,其中非常重要的一个渠道就是会议会展! 建议所有医…...
腾讯云开发+MCP:旅游规划攻略
1.登录注册好之后进入腾讯云开发 2.创建环境 4.创建好环境之后点击去开发 5.进入控制台后,选择AI,找到MCP 6.点击创建MCP Server 使用腾讯云开发创建MCP目前需要云开发入门版99/月,我没开通,所以没办法往下进行。...
银河麒麟系统 达梦8 安装 dlask 框架后端环境
适配的一套环境为 dmPython2.5.8 dmSQLAlchemy1.4.39 Flask2.0.3 Flask-Cors3.0.10 Flask-SQLAlchemy2.5.1 SQLAlchemy1.4.54 Werkzeug2.2.2其中 # sqlalchemy-dm1.4.39 通过dmdbms目录内文件进行源码安装 (MindSpore) [ma-user python]$pwd /home/syl/dmdbms/drivers/python…...
Cribl (实验) vpc-flow 数据抽样
先看文档: Firewall Logs: VPC Flow Logs, Cisco ASA, Etc. | Cribl Docs Firewall Logs: VPC Flow Logs, Cisco ASA, Etc. Recipe for Sampling Firewall Logs Firewall logs are another source of important operational (and security) data. Typical examples include Ama…...
Sklearn入门之数据预处理preprocessing
、 Sklearn全称:Scipy-toolkit Learn是 一个基于scipy实现的的开源机器学习库。它提供了大量的算法和工具,用于数据挖掘和数据分析,包括分类、回归、聚类等多种任务。本文我将带你了解并入门Sklearn下的preprocessing在机器学习中的基本用法。 获取方式…...
我想自己组装一台服务器,微调大模型通义千问2.5 Omni 72B,但是我是个人购买,资金非常有限,最省的方案
目录 🧠 首先我们要搞清楚几个核心点: 🎯 目标:微调 Qwen2.5-Omni-72B 🚨 现实问题:作为个人用户,72B 模型几乎无法负担全量微调 💸 全量微调硬件需求: ✅ 最省的个人方案:不组 72B,只训练 Qwen2.5-Omni-7B 或 14B 💡 推荐方案 A:个人桌面级多卡训练服…...
家用打印机性价比排名及推荐
文章目录 品牌性价比一、核心参数对比与场景适配二、技术类型深度解析三、不同场景选择 相关文章 品牌 性价比 一、核心参数对比与场景适配 兄弟T436W 优势: 微压电技术,打印头寿命长,堵头率低。 支持A4无边距和5G WiFi,适合照片…...
KWDB(Knowledge Worker Database)基础概念与原理完整指南
KWDB(Knowledge Worker Database)基础概念与原理完整指南—目录 前言一、背景1.1 知识工作者的痛点1.2 技术演进推动 二、定义与定位2.1 什么是KWDB?2.2 KWDB与传统数据库的对比与传统关系型数据库(如MySQL)的对比与分…...
数字电子技术基础(四十七)——使用Mutlisim软件来模拟74LS85芯片
目录 1 使用74LS85N芯片完成四位二进制数的比较 1.1原理介绍 1.2 器件选择 1.3 运行电路 2 使用74LS85N完成更多位的二进制比较 1 使用74LS85N芯片完成四位二进制数的比较 1.1原理介绍 对于74LS85 是一款 4 位数值比较器集成电路,用于比较两个 4 位二进制数&…...
关于STM32创建工程文件启动文件选择
注意启动文件只要选择这几个 而不是要把所有都选上...
LLC电路工作在容性区的风险
在t0时刻之前,Q6Q7导通,回路如下所示,此时A点电压是低压,B点电压是高压 在t0时刻时,谐振电流相位发生变换,在t1时刻,Q5,Q8导通,对于Q8MOS管来说,B点电压在Q6Q…...
Linux Kernel 6
clone 系统调用(The clone system call) 在 Linux 中,使用 clone() 系统调用来创建新的线程或进程。fork() 系统调用和 pthread_create() 函数都基于 clone() 的实现。 clone() 系统调用允许调用者决定哪些资源应该与父进程共享,…...
【开源项目】Excel手撕AI算法深入理解(四):AlphaFold、Autoencoder
项目源码地址:https://github.com/ImagineAILab/ai-by-hand-excel.git 一、AlphaFold AlphaFold 是 DeepMind 开发的突破性 AI 算法,用于预测蛋白质的三维结构。它的出现解决了生物学领域长达 50 年的“蛋白质折叠问题”,被《科学》杂志评为…...
第IV部分有效应用程序的设计模式
第IV部分有效应用程序的设计模式 第IV部分有效应用程序的设计模式第23章:应用程序用户界面的架构设计23.1设计考量23.2示例1:用于非分布式有界上下文的一个基于HTMLAF的、服务器端的UI23.3示例2:用于分布式有界上下文的一个基于数据API的客户端UI23.4要点第24章:CQRS:一种…...
如何编制实施项目管理章程
本文档概述了一个项目管理系统的实施计划,旨在通过统一的业务规范和技术架构,加强集团公司的业务管控,并规范业务管理。系统建设将遵循集团统一模板,确保各单位项目系统建设的标准化和一致性。 实施范围涵盖投资管理、立项管理、设计管理、进度管理等多个方面,支持项目全生…...
排序(java)
一.概念 排序:对一组数据进行从小到大/从大到小的排序 稳定性:即使进行排序相对位置也不受影响如: 如果再排序后 L 在 i 的前面则稳定性差,像图中这样就是稳定性好。 二.常见的排序 三.常见算法的实现 1.插入排序 1.1 直…...
嵌入式C语言进阶(二+)内存管理补充版
C语言内存管理:从小白到大神的完全指南 前言:为什么需要理解内存管理 C语言以其高效性和灵活性著称,但这也意味着程序员需要手动管理内存。与Java、Python等高级语言不同,C语言没有自动垃圾回收机制,内存管理的重担完全落在开发者肩上。理解C语言的内存管理机制不仅能帮…...
【HDFS入门】HDFS副本策略:深入浅出副本机制
目录 1 HDFS副本机制概述 2 HDFS副本放置策略 3 副本策略的优势 4 副本因子配置 5 副本管理流程 6 最佳实践与调优 7 总结 1 HDFS副本机制概述 Hadoop分布式文件系统(HDFS)的核心设计原则之一就是通过数据冗余来保证可靠性,而这一功能正是通过副本策略实现的…...
Excel自定义函数取拼音首字母
1.启动Excel 2003(其它版本请仿照操作),打开相应的工作表; 2.执行“工具 > 宏 > Visual Basic编辑器”命令(或者直接按“AltF11”组合键),进入Visual Basic编辑状态; 3.执行“…...
智能 GitHub Copilot 副驾驶® 更新升级!
智能 GitHub Copilot 副驾驶 迎来重大升级!现在,所有 VS Code 用户都能体验支持 Multi-Context Protocol(MCP)的全新 Agent Mode。此外,微软还推出了智能 GitHub Copilot 副驾驶 Pro 订阅计划,提供更强大的…...
Android ViewPager使用预加载机制导致出现页面穿透问题
缘由 在应用中使用ViewPager,并且设置预加载页面。结果出现了一些异常的现象。 我们有4个页面,分别是4个Fragment,暂且称为FragmentA、FragmentB、FragmentC、FragmentD,ViewPager在MainActivity中,切换时&#x…...
【今日三题】添加字符(暴力枚举) / 数组变换(位运算) / 装箱问题(01背包)
⭐️个人主页:小羊 ⭐️所属专栏:每日两三题 很荣幸您能阅读我的文章,诚请评论指点,欢迎欢迎 ~ 目录 添加字符(暴力枚举)数组变换(位运算)装箱问题(01背包) 添加字符(暴力枚举) 添加字符 当在A的开头或结尾添加字符直到和B长度…...
【AIoT】智能硬件GPIO通信详解(二)
前言 上一篇我们深入解析了智能硬件GPIO通信原理(传送门:【AIoT】智能硬件GPIO通信详解(一))。接下来,我们将结合无人售货机控制场景,通过具体案例进一步剖析物联网底层通信机制的实际应用。 在智能零售领域,无人售货机通过AI技术升级为智能柜,其设备控制的底层通信…...
Python(18)Python中JSON的妙用:详解序列化与反序列化原理及实战案例
目录 一、背景:为什么Python需要JSON?二、核心技术解析:序列化与反序列化2.1 核心概念2.2 类型映射对照表 三、Python操作JSON的四大核心方法3.1 基础方法库3.2 方法详解1. json.dumps()2. json.loads()3. json.dump()4. json.load() 四、实战…...
【Python进阶】字典:高效键值存储的十大核心应用
目录 前言:技术背景与价值当前技术痛点解决方案概述目标读者说明 一、技术原理剖析核心概念图解核心作用讲解关键技术模块技术选型对比 二、实战演示环境配置要求核心代码实现(10个案例)案例1:基础操作案例2:字典推导式…...
MATLAB脚本实现了一个三自由度的通用航空运载器(CAV-H)的轨迹仿真,主要用于模拟升力体在不同飞行阶段(初始滑翔段、滑翔段、下压段)的运动轨迹
%升力体:通用航空运载器CAV-H %读取数据1 升力系数 alpha = [10 15 20]; Ma = [3.5 5 8 10 15 20 23]; alpha1 = 10:0.1:20; Ma1 = 3.5:0.1:23; [Ma1, alpha1] = meshgrid(Ma1, alpha1); CL = readmatrix(simulation.xlsx, Sheet, Sheet1, Range, B2:H4); CL1 = interp2(…...
