SpringBoot整合MQTT实战:基于EMQX构建高可靠物联网通信,从零到一实现设备云端双向对话
一、引言
随着物联网(IoT)技术的快速发展,MQTT(Message Queuing Telemetry Transport)协议因其轻量级、低功耗和高效的特点,已成为物联网设备通信的事实标准。本文将详细介绍如何使用SpringBoot框架整合MQTT协议,基于开源MQTT代理EMQX实现设备与服务器之间的双向通信。
二、技术选型与环境准备
2.1 技术栈介绍
-
SpringBoot 2.7.x:简化Spring应用初始搭建和开发过程
-
EMQX 5.0:开源的大规模分布式MQTT消息服务器
-
Eclipse Paho:流行的MQTT客户端库
-
Lombok:简化Java Bean编写
2.2 环境准备
-
安装EMQX服务器(可使用Docker快速部署):
docker run -d --name emqx -p 1883:1883 -p 8083:8083 -p 8084:8084 -p 8883:8883 -p 18083:18083 emqx/emqx:5.0.14
-
确保Java开发环境(JDK 11+)和Maven已安装
三、SpringBoot项目集成MQTT
3.1 创建SpringBoot项目并添加依赖
在pom.xml
中添加必要的依赖:
<dependencies><!-- SpringBoot Starter --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><!-- MQTT Paho Client --><dependency><groupId>org.eclipse.paho</groupId><artifactId>org.eclipse.paho.client.mqttv3</artifactId><version>1.2.5</version></dependency><!-- Lombok --><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><!-- JSON处理 --><dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId></dependency>
</dependencies>
3.2 配置MQTT连接参数
在application.yml
中添加配置:
mqtt:broker-url: tcp://localhost:1883username: emqxpassword: publicclient-id: springboot-serverdefault-topic: device/statustimeout: 30keepalive: 60qos: 1clean-session: true
创建配置类MqttProperties.java
:
@Data
@Configuration
@ConfigurationProperties(prefix = "mqtt")
public class MqttProperties {private String brokerUrl;private String username;private String password;private String clientId;private String defaultTopic;private int timeout;private int keepalive;private int qos;private boolean cleanSession;
}
3.3 实现MQTT客户端配置
创建MqttConfiguration.java
:
@Configuration
@RequiredArgsConstructor
public class MqttConfiguration {private final MqttProperties mqttProperties;@Beanpublic MqttConnectOptions mqttConnectOptions() {MqttConnectOptions options = new MqttConnectOptions();options.setServerURIs(new String[]{mqttProperties.getBrokerUrl()});options.setUserName(mqttProperties.getUsername());options.setPassword(mqttProperties.getPassword().toCharArray());options.setConnectionTimeout(mqttProperties.getTimeout());options.setKeepAliveInterval(mqttProperties.getKeepalive());options.setCleanSession(mqttProperties.isCleanSession());options.setAutomaticReconnect(true);return options;}@Beanpublic IMqttClient mqttClient() throws MqttException {IMqttClient client = new MqttClient(mqttProperties.getBrokerUrl(), mqttProperties.getClientId(), new MemoryPersistence());client.connect(mqttConnectOptions());return client;}
}
3.4 实现MQTT消息发布服务
创建MqttPublisher.java
:
@Service
@RequiredArgsConstructor
@Slf4j
public class MqttPublisher {private final IMqttClient mqttClient;private final MqttProperties mqttProperties;public void publish(String topic, String payload) throws MqttException {if (!mqttClient.isConnected()) {mqttClient.reconnect();}MqttMessage message = new MqttMessage(payload.getBytes());message.setQos(mqttProperties.getQos());message.setRetained(true);mqttClient.publish(topic, message);log.info("MQTT message published to topic: {}, payload: {}", topic, payload);}public void publish(String payload) throws MqttException {publish(mqttProperties.getDefaultTopic(), payload);}
}
3.5 实现MQTT消息订阅服务
创建MqttSubscriber.java
:
@Service
@RequiredArgsConstructor
@Slf4j
public class MqttSubscriber {private final IMqttClient mqttClient;private final MqttProperties mqttProperties;@PostConstructpublic void init() throws MqttException {subscribe(mqttProperties.getDefaultTopic());}public void subscribe(String topic) throws MqttException {if (!mqttClient.isConnected()) {mqttClient.reconnect();}mqttClient.subscribe(topic, mqttProperties.getQos(), this::handleMessage);log.info("Subscribed to MQTT topic: {}", topic);}private void handleMessage(String topic, MqttMessage message) {String payload = new String(message.getPayload());log.info("Received MQTT message from topic: {}, payload: {}", topic, payload);// 这里可以添加业务逻辑处理接收到的消息processMessage(topic, payload);}private void processMessage(String topic, String payload) {// 示例:解析JSON格式的消息try {ObjectMapper mapper = new ObjectMapper();JsonNode jsonNode = mapper.readTree(payload);// 根据不同的topic和payload内容进行业务处理if (topic.startsWith("device/status")) {handleDeviceStatus(jsonNode);} else if (topic.startsWith("device/control")) {handleDeviceControl(jsonNode);}} catch (JsonProcessingException e) {log.error("Failed to parse MQTT message payload: {}", payload, e);}}private void handleDeviceStatus(JsonNode jsonNode) {// 处理设备状态上报String deviceId = jsonNode.get("deviceId").asText();String status = jsonNode.get("status").asText();log.info("Device {} status updated to: {}", deviceId, status);}private void handleDeviceControl(JsonNode jsonNode) {// 处理设备控制指令响应String deviceId = jsonNode.get("deviceId").asText();String command = jsonNode.get("command").asText();String result = jsonNode.get("result").asText();log.info("Device {} executed command {} with result: {}", deviceId, command, result);}
}
四、实现双向通信
4.1 服务器向设备发送控制指令
创建REST API接口用于发送控制指令:
@RestController
@RequestMapping("/api/device")
@RequiredArgsConstructor
@Slf4j
public class DeviceController {private final MqttPublisher mqttPublisher;@PostMapping("/control")public ResponseEntity<String> sendControlCommand(@RequestBody DeviceCommand command) {try {ObjectMapper mapper = new ObjectMapper();String payload = mapper.writeValueAsString(command);String topic = "device/control/" + command.getDeviceId();mqttPublisher.publish(topic, payload);return ResponseEntity.ok("Control command sent successfully");} catch (Exception e) {log.error("Failed to send control command", e);return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body("Failed to send control command: " + e.getMessage());}}@Data@NoArgsConstructor@AllArgsConstructorpublic static class DeviceCommand {private String deviceId;private String command;private Map<String, Object> params;}
}
4.2 设备模拟客户端
为了测试双向通信,我们可以创建一个简单的设备模拟客户端:
@Component
@Slf4j
public class DeviceSimulator {private final MqttPublisher mqttPublisher;private final MqttProperties mqttProperties;private IMqttClient deviceClient;public DeviceSimulator(MqttPublisher mqttPublisher, MqttProperties mqttProperties) {this.mqttPublisher = mqttPublisher;this.mqttProperties = mqttProperties;initDeviceClient();}private void initDeviceClient() {try {String deviceId = "device-" + UUID.randomUUID().toString().substring(0, 8);deviceClient = new MqttClient(mqttProperties.getBrokerUrl(), deviceId, new MemoryPersistence());MqttConnectOptions options = new MqttConnectOptions();options.setUserName(mqttProperties.getUsername());options.setPassword(mqttProperties.getPassword().toCharArray());options.setAutomaticReconnect(true);deviceClient.connect(options);// 订阅控制主题String controlTopic = "device/control/" + deviceId;deviceClient.subscribe(controlTopic, (topic, message) -> {String payload = new String(message.getPayload());log.info("Device received control command: {}", payload);// 模拟设备执行命令并返回响应executeCommand(payload, deviceId);});// 模拟设备定期上报状态simulatePeriodicStatusReport(deviceId);} catch (MqttException e) {log.error("Failed to initialize device simulator", e);}}private void executeCommand(String payload, String deviceId) {try {ObjectMapper mapper = new ObjectMapper();JsonNode jsonNode = mapper.readTree(payload);String command = jsonNode.get("command").asText();// 模拟命令执行Thread.sleep(1000); // 模拟执行耗时// 构造响应ObjectNode response = mapper.createObjectNode();response.put("deviceId", deviceId);response.put("command", command);response.put("result", "success");response.put("timestamp", System.currentTimeMillis());// 发布响应String responseTopic = "device/control/response/" + deviceId;mqttPublisher.publish(responseTopic, response.toString());} catch (Exception e) {log.error("Failed to execute command", e);}}private void simulatePeriodicStatusReport(String deviceId) {ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();executor.scheduleAtFixedRate(() -> {try {ObjectMapper mapper = new ObjectMapper();ObjectNode status = mapper.createObjectNode();status.put("deviceId", deviceId);status.put("status", "online");status.put("cpuUsage", Math.random() * 100);status.put("memoryUsage", 30 + Math.random() * 50);status.put("timestamp", System.currentTimeMillis());String topic = "device/status/" + deviceId;mqttPublisher.publish(topic, status.toString());} catch (Exception e) {log.error("Failed to send status report", e);}}, 0, 10, TimeUnit.SECONDS);}
}
五、测试与验证
5.1 测试设备状态上报
-
启动SpringBoot应用
-
观察日志输出,应该能看到设备模拟客户端定期上报状态信息
5.2 测试服务器控制指令
使用Postman或curl发送控制指令:
curl -X POST http://localhost:8080/api/device/control \
-H "Content-Type: application/json" \
-d '{"deviceId": "device-123456","command": "restart","params": {"delay": 5}
}'
5.3 验证双向通信
-
服务器发送控制指令到特定设备
-
设备接收指令并执行
-
设备发送执行结果回服务器
-
服务器接收并处理设备响应
六、高级功能扩展
6.1 消息持久化与QoS级别
-
QoS 0:最多一次,消息可能丢失
-
QoS 1:至少一次,消息不会丢失但可能重复
-
QoS 2:恰好一次,消息不丢失且不重复
根据业务需求选择合适的QoS级别:
// 在发布消息时设置QoS
message.setQos(2); // 使用最高级别的QoS
6.2 安全配置
-
启用TLS加密:
mqtt:broker-url: ssl://localhost:8883
-
配置EMQX的ACL规则,限制客户端权限
6.3 集群部署
对于生产环境,可以部署EMQX集群:
# 启动第一个节点
docker run -d --name emqx1 -p 1883:1883 -p 8081:8081 -e EMQX_NODE_NAME=emqx@node1.emqx.io -e EMQX_CLUSTER__DISCOVERY=static -e EMQX_CLUSTER__STATIC__SEEDS="emqx@node1.emqx.io,emqx@node2.emqx.io" emqx/emqx:5.0.14# 启动第二个节点
docker run -d --name emqx2 -p 1884:1883 -p 8082:8081 -e EMQX_NODE_NAME=emqx@node2.emqx.io -e EMQX_CLUSTER__DISCOVERY=static -e EMQX_CLUSTER__STATIC__SEEDS="emqx@node1.emqx.io,emqx@node2.emqx.io" emqx/emqx:5.0.14
6.4 消息桥接与WebHook
通过EMQX的桥接功能,可以将消息转发到其他MQTT服务器或Kafka等消息队列。也可以通过WebHook将消息推送到HTTP服务。
七、总结
本文详细介绍了如何使用SpringBoot整合MQTT协议,基于EMQX实现设备与服务器之间的双向通信。主要内容包括:
-
SpringBoot项目中集成MQTT客户端
-
实现消息发布和订阅功能
-
设计双向通信机制
-
设备模拟与测试验证
-
高级功能扩展建议
这种架构非常适合物联网场景,能够支持海量设备连接和实时消息通信。开发者可以根据实际业务需求,在此基础上进行扩展和优化,构建稳定可靠的物联网平台。
八、参考资料
-
EMQX官方文档:Introduction | EMQX 5.0 Docs
-
Eclipse Paho项目:Eclipse Paho | The Eclipse Foundation
-
MQTT协议规范:MQTT Version 3.1.1
-
Spring Boot官方文档:Spring Boot
相关文章:
SpringBoot整合MQTT实战:基于EMQX构建高可靠物联网通信,从零到一实现设备云端双向对话
一、引言 随着物联网(IoT)技术的快速发展,MQTT(Message Queuing Telemetry Transport)协议因其轻量级、低功耗和高效的特点,已成为物联网设备通信的事实标准。本文将详细介绍如何使用SpringBoot框架整合MQTT协议,基于开源MQTT代理EMQX实现设…...
AI与机器学习深度集成:从设备端能力爆发到开发工具智能化
简介 AI与机器学习技术正以惊人的速度在移动开发领域深入集成,设备端AI能力爆发与AI辅助开发工具的崛起,为开发者带来了前所未有的高效开发体验和应用创新机遇。本文将全面解析Google最新AI技术栈(包括ML Kit 2.0和Gemini Nano模型)的特性与应用场景,探索Android Studio …...

界面控件DevExpress WinForms v24.2 - 数据处理功能增强
DevExpress WinForms拥有180组件和UI库,能为Windows Forms平台创建具有影响力的业务解决方案。DevExpress WinForms能完美构建流畅、美观且易于使用的应用程序,无论是Office风格的界面,还是分析处理大批量的业务数据,它都能轻松胜…...

Linux的MySQL头文件和找不到头文件问题解决
头文件 #include <iostream> #include <mysql_driver.h> #include <mysql_connection.h> #include <cppconn/statement.h> #include <cppconn/resultset.h> #include <cppconn/prepared_statement.h> #include <cppconn/exception.h&g…...

wps excel将表格输出pdf时所有列在一张纸上
记录:wps excel将表格输出pdf时所有列在一张纸上 1,调整缩放比例,或选择将所有列打印在一页 2,将表格的所有铺满到这套虚线...

zabbix7.2最新版本 nginx自定义监控(三) 设置触发器
安装zabbix-get服务 在zabbix-server端口安装zabbix-get服务 [rootlocalhost ~]# dnf install -y zabbix-get Last metadata expiration check: 1:55:49 ago on Wed 14 May 2025 09:24:49 AM CST. Dependencies resolved. Package Architectur…...
CDN加速对云手机延迟的影响
一、CDN加速对云手机延迟的核心作用 缩短物理距离,降低网络延迟 CDN通过全球分布的节点,将云手机的服务内容(如应用数据、画面流)缓存至离用户最近的服务器,减少数据传输的物理距离。例如,用户在中国访问美…...
为什么 Docker 建议关闭 Swap
在使用 Docker 时,关闭系统 Swap(交换分区) 是一个常见的推荐做法,尤其是在生产环境中。虽然 Docker 不强制要求禁用 Swap,但出于性能、稳定性、可控性和资源管理的目的,通常建议这样做。 为什么 Docker 建…...

缓存的相关内容
缓存是一种介于数据永久存储介质与数据应用之间数据临时的存储介质 实用化保存可以有效地减少低俗数据读取的次数 (例如磁盘IO), 提高系统性能 缓存不仅可以用于提高永久性存储介质的数据读取效率,还可以提供临时的数据存储空间 spring boot中提供了缓存技术, 方便…...

[ctfshow web入门] web77
信息收集 上一题的读取flag方式不能用了,使用后的回显是:could not find driver 解题 同样的查目录方法 cvar_export(scandir("glob:///*"));die();cforeach(new DirectoryIterator("glob:///*") as $a){echo($a->__toString…...

C++学习-入门到精通-【7】类的深入剖析
C学习-入门到精通-【7】类的深入剖析 类的深入剖析 C学习-入门到精通-【7】类的深入剖析一、Time类的实例研究二、组成和继承三、类的作用域和类成员的访问类作用域和块作用域圆点成员选择运算符(.)和箭头成员选择运算符(->)访问函数和工具函数 四、具有默认实参的构造函数重…...
API 加速方案:如何使用 Redis 与 Memcached 进行高效缓存优化
API 加速方案:如何使用 Redis 与 Memcached 进行高效缓存优化 1. 引言 在现代 Web 开发中,API 响应速度至关重要。用户期望实时访问数据,而后端服务可能受到数据库查询、计算开销或网络传输的限制。这时候,缓存技术可以有效减少 API 延迟,提升系统性能。 本篇文章将深入…...

主成分分析的应用之sklearn.decomposition模块的PCA函数
主成分分析的应用之sklearn.decomposition模块的PCA函数 一、模型建立整体步骤 二、数据 2297.86 589.62 474.74 164.19 290.91 626.21 295.20 199.03 2262.19 571.69 461.25 185.90 337.83 604.78 354.66 198.96 2303.29 589.99 516.21 236.55 403.92 730.05 438.41 225.80 …...

1. Go 语言环境安装
👑 博主简介:高级开发工程师 👣 出没地点:北京 💊 人生目标:自由 ——————————————————————————————————————————— 版权声明:本文为原创文章…...

IP协议深度解析:互联网世界的核心基石
作为互联网通信的基础协议,IP(Internet Protocol)承载着全球99%的网络数据流量。本文将深入剖析IP协议的核心特性、工作原理及演进历程,通过技术原理、协议对比和实战案例分析,为您揭示这个数字世界"隐形交通规则…...

Oracle DBMS_STATS.GATHER_DATABASE_STATS 默认行为
Oracle DBMS_STATS.GATHER_DATABASE_STATS 默认行为 DBMS_STATS.GATHER_DATABASE_STATS的默认选项究竟是’GATHER’还是’GATHER AUTO’?这个问题非常重要,因为理解默认行为直接影响统计信息收集策略。 一 官方文档确认 根据Oracle 19c官方文档&#…...

C++天空之城的树 全国信息素养大赛复赛决赛 C++小学/初中组 算法创意实践挑战赛 内部集训模拟题详细解析
C++天空之城的树 全国青少年信息素养大赛 C++复赛/决赛模拟练习题 博主推荐 所有考级比赛学习相关资料合集【推荐收藏】1、C++专栏 电子学会C++一级历年真题解析...
HTTP 请求走私(HTTP Request Smuggling)
HTTP 请求走私(HTTP Request Smuggling)是一种通过利用前端代理(如负载均衡器、CDN)和后端服务器在 解析 HTTP 请求时存在不一致性 的漏洞,从而实现 注入恶意请求 的攻击技术。 一、基本原理 HTTP 请求走私主要依赖两…...
基于WebRTC的实时语音对话系统:从语音识别到AI回复
基于WebRTC的实时语音对话系统:从语音识别到AI回复 在当今数字化时代,实时语音交互已成为人机界面的重要组成部分。本文将深入探讨一个基于WebRTC技术的实时语音对话系统,该系统集成了语音识别(ASR)、大语言模型(LLM)和语音合成(TTS)技术&am…...
typeof运算符和深拷贝
typeof运算符 识别所有值类型识别函数判断是否是引用类型(不可再细分) //判断所有值类型 let a; typeof a //undefined const strabc; typeof str //string const n100; typeof n //number const …...
.Net HttpClient 使用 Cookie
在 HttpClient 中使用 Cookie Cookie 是服务器存储在客户端的小型数据片段,可用于身份验证、会话跟踪等。 .Net HttpClient 支持 Cookie 功能,本教程详细介绍了Cookie 的管理与使用。 初始化 #!import "./Ini.ipynb"什么是 Cookie Cookie …...
Python爬虫实战:通过PyExecJS库实现逆向解密
1. 核心定义 1.1 PyExecJS 库 PyExecJS 是 Python 的第三方库,通过调用 JavaScript 运行时环境(如 Node.js、PhantomJS),实现 Python 与 JavaScript 的无缝交互。其核心功能包括: JavaScript 代码编译与执行跨语言函数调用与数据传递多引擎支持与自动环境检测1.2 字段加…...
Java中的伪共享(False Sharing):隐藏的性能杀手与高并发优化实战
引言 在高性能Java应用中,开发者通常会关注锁竞争、GC频率等显性问题,但一个更隐蔽的陷阱——伪共享(False Sharing)——却可能让精心设计的并发代码性能骤降50%以上。伪共享是由CPU缓存架构引发的底层问题,常见于多…...

GO语言语法---switch语句
文章目录 基本语法1. 特点1.1 不需要break1.2 表达式可以是任何类型1.3 省略比较表达式1.4 多值匹配1.5 类型switch1.6 case穿透1.7 switch后直接声明变量1.7.1 基本语法1.7.2 带比较表达式1.7.3 不带比较表达式1.7.4 结合类型判断 1.8 switch后的表达式必须与case语句中的表达…...

开疆智能Profient转ModbusTCP网关连接ABB机器人MODBUS TCP通讯案例
本案例是通过开疆智能Profinet转ModbusTCP网关将西门子PLC与ABB机器人进行通讯 因西门子PLC采用Profinet协议,而ABB机器人采用的是ModbusTCP通讯。故采取此种方案。 配置过程: 1.MODBUS/TCP基于以太网,故ABB机器人在使用时需要有616-1PCIN…...

解决qt.network.ssl: QSslSocket::connectToHostEncrypted: TLS initialization failed
可以参考:解决qt.network.ssl: QSslSocket::connectToHostEncrypted: TLS initialization failed-CSDN博客 讲的是程序执行目录下可能缺少了: libssl-1_1-x64.dll 和 libcrypto-1_1-x64.dll 库文件,将其复制到可执行文件exe的同级目录下即可…...
【洛谷P3386】二分图最大匹配之Kuhn算法/匈牙利算法:直观理解
题目:洛谷P3386 【模板】二分图最大匹配 🥕 匈牙利算法本来是针对带权图最大匹配的,这里由于题目只是求最大匹配的边数,所以我们也只考虑无权的情况。 🚀 本文旨在服务于看了别的关于匈牙利算法的文章但不甚理解的童…...

Text2SQL:自助式数据报表开发---0517
Text2SQL技术 早期阶段:依赖于人工编写的规则模板来匹配自然语言和SQL语句之间的对应关系 机器学习阶段:采用序列到序列模型等机器学习方法来学习自然语言与SQL之间的关系 LLM阶段:借助LLM强大的语言理解和代码生成能力,利用提示…...

使用Visual Studio将C#程序发布为.exe文件
说明 .exe 是可执行文件(Executable File)的扩展名。这类文件包含计算机可以直接运行的机器代码指令,通常由编程语言(如 C、C、C#、Python 等)编译或打包生成。可以用于执行自动化操作(执行脚本或批处理操…...
写spark程序数据计算( 数据库的计算,求和,汇总之类的)连接mysql数据库,写入计算结果
1. 添加依赖 在项目的 pom.xml(Maven)中添加以下依赖: xml <!-- Spark SQL --> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.12</artifactId> <version>3.3.0…...