使用Springboot实现MQTT通信
目录
一、MQ协议
MQTT 特点
MQTT 工作原理
MQTT 主要应用场景
MQTT 配置与注意事项
二、MQTT服务器搭建
三、参考案例
MQTT(Message Queuing Telemetry Transport)是一种基于发布/订阅模型的轻量级消息传输协议,常用于物联网(IoT)场景中。它设计简洁、带宽占用少,非常适合资源受限的设备和网络环境。
一、MQ协议
MQTT 特点
-
轻量级协议:
- 设计简单,占用带宽少,特别适合嵌入式设备和不稳定的网络环境。
-
发布/订阅模型:
- 客户端通过主题(Topic)发布消息,订阅者通过主题接收消息,彼此不直接通信。
-
可靠性保障:
- 提供三种服务质量(QoS)等级,确保消息可靠传输:
- QoS 0:至多一次(不确认,可能丢失)。
- QoS 1:至少一次(需要确认,但可能重复)。
- QoS 2:仅一次(确保消息不丢失且不重复)。
- 提供三种服务质量(QoS)等级,确保消息可靠传输:
-
持续连接:
- 使用 TCP/IP 连接,通过心跳包(Keep-Alive)保持连接稳定。
-
支持离线消息:
- 使用“保留消息”和“持久会话”功能,实现离线设备接收消息。
-
安全性:
- 支持 SSL/TLS 加密,结合用户名和密码进行身份验证。
MQTT 工作原理
-
连接:
- 客户端通过
CONNECT消息向服务器建立连接,服务器返回CONNACK消息。
- 客户端通过
-
发布:
- 客户端通过
PUBLISH消息向服务器发布消息,指定消息的主题。
- 客户端通过
-
订阅:
- 客户端通过
SUBSCRIBE消息订阅一个或多个主题,服务器将匹配主题的消息推送给客户端。
- 客户端通过
-
心跳:
- 客户端和服务器定期发送心跳包(PINGREQ 和 PINGRESP),确保连接有效。
-
断开:
- 客户端通过
DISCONNECT消息通知服务器主动断开连接。
- 客户端通过
MQTT 主要应用场景
-
物联网(IoT):
- 设备状态监控、数据收集和远程控制。
-
智能家居:
- 控制家电、监控传感器数据。
-
车联网:
- 实时车辆数据传输、位置追踪。
-
移动应用:
- 消息推送、实时聊天。
-
工业领域:
- 设备数据采集和分析。
MQTT 配置与注意事项
-
主题命名:
- 使用层级结构(如
/iot/device/status),便于管理。 - 避免过于复杂的主题结构。
- 使用层级结构(如
-
QoS 选择:
- 根据应用需求选择适合的 QoS 等级,平衡可靠性和性能。
-
安全措施:
- 启用 SSL/TLS 加密。
- 配置用户名和密码,限制匿名连接。
- 控制主题的访问权限。
-
性能优化:
- 控制消息大小,减少带宽占用。
- 调整心跳时间,优化连接稳定性。
二、MQTT服务器搭建
1、在springboot项目工程pom文件下引入相关依赖
<!--mqtt相关依赖--><dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-stream</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-integration</artifactId></dependency><dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-mqtt</artifactId></dependency>
2、修改application.yml配置文件
spring:application:name: provider#MQTT配置信息mqtt:#MQTT服务地址,端口号默认1883,如果有多个,用逗号隔开url: tcp://127.0.0.1:1883#用户名username: guest#密码password: guest#客户端id(不能重复)client:id: provider-id#MQTT默认的消息推送主题,实际可在调用接口是指定default: topic: topic
server:port: 8080
3、消息发布者客户端配置
?
package com.three.demo.mqtt.config;import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;import javax.annotation.PostConstruct;
import java.time.LocalDateTime;@Configuration
@Slf4j
public class MqttClientConfig {@Value("${spring.mqtt.username}")private String username;@Value("${spring.mqtt.password}")private String password;@Value("${spring.mqtt.url}")private String hostUrl;@Value("${spring.mqtt.client.id}")private String clientId;@Autowiredprivate MqttClientCallBack mqttClientCallBack;/*** 客户端对象*/private MqttAsyncClient client;/*** 在bean初始化后连接到服务器*/@PostConstructpublic void init() {connect();}/*** 客户端连接服务端*/public void connect() {//连接设置MqttConnectOptions options = new MqttConnectOptions();//是否清空session,设置false表示服务器会保留客户端的连接记录(订阅主题,qos),客户端重连之后能获取到服务器在客户端断开连接期间推送的消息//设置为true表示每次连接服务器都是以新的身份options.setCleanSession(false);//设置连接用户名options.setUserName(username);//设置连接密码options.setPassword(password.toCharArray());//设置超时时间,单位为秒options.setConnectionTimeout(60);//设置心跳时间 单位为秒,表示服务器每隔 1.5*10秒的时间向客户端发送心跳判断客户端是否在线options.setKeepAliveInterval(20);// 开启自动重连options.setAutomaticReconnect(true);// 设置最大重连时间间隔 (可选),单位是毫秒,设置为 5000 表示最多等待 5 秒再尝试重连options.setMaxReconnectDelay(5000);//设置遗嘱消息的话题,若客户端和服务器之间的连接意外断开,服务器将发布客户端的遗嘱信息options.setWill("willTopic", (clientId + "与服务器断开连接").getBytes(), 0, false);try {//创建MQTT客户端对象client = new MqttAsyncClient(hostUrl, clientId, new MemoryPersistence());//设置回调client.setCallback(mqttClientCallBack);// 使用异步连接client.connect(options, null, new IMqttActionListener() {@Overridepublic void onSuccess(IMqttToken asyncActionToken) {log.info("MQTT连接成功");}@Overridepublic void onFailure(IMqttToken asyncActionToken, Throwable exception) {log.error("MQTT连接失败:" + exception.getMessage());}});} catch (MqttException e) {log.error("mqtt连接失败。。" + e.getMessage());}}public void publish(int qos, boolean retained) {MqttMessage mqttMessage = new MqttMessage();mqttMessage.setQos(qos);mqttMessage.setRetained(retained);mqttMessage.setPayload(pushLog.getData().getBytes());try {// 使用异步客户端发布消息,并处理结果client.publish(pushLog.getTopic(), mqttMessage, null, new IMqttActionListener() {@Overridepublic void onSuccess(IMqttToken asyncActionToken) {System.out.println("发送成功");}@Overridepublic void onFailure(IMqttToken asyncActionToken, Throwable exception) {log.error("发送失败:" + exception.getMessage());}});} catch (MqttException e) {log.error("发送失败:" + e.getMessage());}}/*** 断开连接*/public void disConnect() {try {client.disconnect();} catch (MqttException e) {e.printStackTrace();}}
}?
4、消息发布客户端回调
package com.three.demo.mqtt.config;import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.IMqttAsyncClient;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;@Slf4j
@Component
public class MqttClientCallBack implements MqttCallback {@Value("${spring.mqtt.client.id}")private String clientId;/*** 与服务器断开的回调*/@Overridepublic void connectionLost(Throwable cause) {log.error(clientId + "与服务器断开连接!!" + cause.getMessage());}/*** 消息发布成功的回调*/@Overridepublic void deliveryComplete(IMqttDeliveryToken token) {IMqttAsyncClient client = token.getClient();System.out.println(client.getClientId()+"发布消息成功!");}}
5、创建控制器测试发布信息
package com.three.demo.mqtt.controller;import com.three.demo.mqtt.config.MqttClientConfig;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;@Controller
public class SendController {@Autowiredprivate MqttClientConfig client;@RequestMapping("/sendMessage")@ResponseBodypublic String sendMessage(int qos,boolean retained,String topic,String message){try {client.publish(qos, retained, topic, message);return "发送成功";} catch (Exception e) {e.printStackTrace();return "发送失败";}}
}
6、消息接收者配置
这里我对之前的代码进行改造
/*** 客户端连接服务端*/public void connect() {//连接设置MqttConnectOptions options = new MqttConnectOptions();//是否清空session,设置false表示服务器会保留客户端的连接记录(订阅主题,qos),客户端重连之后能获取到服务器在客户端断开连接期间推送的消息//设置为true表示每次连接服务器都是以新的身份options.setCleanSession(false);//设置连接用户名options.setUserName(username);//设置连接密码options.setPassword(password.toCharArray());//设置超时时间,单位为秒options.setConnectionTimeout(60);//设置心跳时间 单位为秒,表示服务器每隔 1.5*10秒的时间向客户端发送心跳判断客户端是否在线options.setKeepAliveInterval(20);// 开启自动重连options.setAutomaticReconnect(true);// 设置最大重连时间间隔 (可选),单位是毫秒,设置为 5000 表示最多等待 5 秒再尝试重连options.setMaxReconnectDelay(5000);//设置遗嘱消息的话题,若客户端和服务器之间的连接意外断开,服务器将发布客户端的遗嘱信息options.setWill("willTopic", (clientId + "与服务器断开连接").getBytes(), 0, false);try {//创建MQTT客户端对象client = new MqttAsyncClient(hostUrl, clientId, new MemoryPersistence());//设置回调client.setCallback(mqttClientCallBack);// 使用异步连接client.connect(options, null, new IMqttActionListener() {@Overridepublic void onSuccess(IMqttToken asyncActionToken) {log.info("MQTT连接成功");// 连接成功后订阅主题try {//订阅主题//消息等级,和主题数组一一对应,服务端将按照指定等级给订阅了主题的客户端推送消息int[] qos = {2, 2};String[] topics = {"/iot/msg/topic1","/iot/msg/topic2"};client.subscribe(topics, qos);log.info("订阅主题成功");} catch (MqttException e) {log.error("订阅主题失败:" + e.getMessage());}}@Overridepublic void onFailure(IMqttToken asyncActionToken, Throwable exception) {log.error("MQTT连接失败:" + exception.getMessage());}});} catch (MqttException e) {e.printStackTrace();log.error("mqtt连接失败。。" + e.getMessage());}}
然后在消息客户端回调类这里
package com.ruoyi.yyt.mqtt.config;import org.eclipse.paho.client.mqttv3.IMqttAsyncClient;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.stereotype.Component;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;@Slf4j
@Component
public class MqttClientCallBack implements MqttCallback {@Value("${spring.mqtt.client.id}")private String clientId;/*** 客户端断开连接的回调*/@Overridepublic void connectionLost(Throwable throwable) {log.error(clientId + "与服务器断开连接!!" + cause.getMessage());}/*** 消息到达的回调*/@Overridepublic void messageArrived(String topic, MqttMessage message) throws Exception {System.out.println(String.format("接收消息主题 : %s",topic));System.out.println(String.format("接收消息Qos : %d",message.getQos()));System.out.println(String.format("接收消息内容 : %s",new String(message.getPayload())));System.out.println(String.format("接收消息retained : %b",message.isRetained()));}/*** 消息发布成功的回调*/@Overridepublic void deliveryComplete(IMqttDeliveryToken token) {IMqttAsyncClient client = token.getClient();System.out.println(client.getClientId() + "发布消息成功!");}}
这个时候我们启动服务,调用测试接口

就可以看到接口返回发布成功,并且能看到后台服务的打印日志了

至此大功告成了!
三、参考案例
参考
相关文章:
使用Springboot实现MQTT通信
目录 一、MQ协议 MQTT 特点 MQTT 工作原理 MQTT 主要应用场景 MQTT 配置与注意事项 二、MQTT服务器搭建 三、参考案例 MQTT(Message Queuing Telemetry Transport)是一种基于发布/订阅模型的轻量级消息传输协议,常用于物联网ÿ…...
植物大战僵尸融合版(电脑/安卓)
《植物大战僵尸融合版》是一款由B站UP主“蓝飘飘fly”制作的同人策略塔防游戏,基于经典《植物大战僵尸》玩法,加入了独特的植物融合系统。 出于方便,软件是便携版,解压后双击即可畅玩。 游戏主页依旧是植物大战僵尸经典界面。右下…...
02DevOps基础环境准备
准备两台Linux的操作系统,最简单的方式就是在本机上使用虚拟机搭建两个操作系统(实际生产环境是两台服务器,虚拟机的方式用于学习使用) 我搭建的两台服务器的ip分别是192.168.1.10、192.168.1.11 192.168.1.10服务器用于安装doc…...
苍穹外卖-day12(工作台、数据导出)
工作台Apache POI导出运营数据Excel报表 功能实现:工作台、数据导出 工作台效果图: 数据导出效果图: 在数据统计页面点击数据导出:生成Excel报表 1. 工作台 1.1 需求分析和设计 1.1.1 产品原型 工作台是系统运营的数据看板&…...
说一下 Tcp 粘包是怎么产生的?
TCP 粘包是什么? TCP 粘包(TCP Packet Merging) 是指多个小的数据包在 TCP 传输过程中被合并在一起,接收方读取时无法正确分辨数据边界,导致数据解析错误。 TCP 是流式协议,没有数据包的概念,…...
详解享元模式
引言 在计算机中,内存是非常宝贵的资源,而程序中可能会有大量相似或相同的对象,它们的存在浪费了许多空间。而享元模式通过共享这些对象,从而解决这种问题的。 1.概念 享元模式(Flyweight Pattern):运用共享技术有效地…...
第18章 不可变对象设计模式(Java高并发编程详解:多线程与系统设计)
1.线程安全 所谓共享的资源,是指在多个线程同时对其进行访问的情况下,各线程都会使其发生变化,而线程安全性的主要目的就在于在受控的并发访问中防止数据发生变化。除了使用synchronized关键字同步对资源的写操作之外, 还可以在线…...
openEuler22.03LTS系统升级docker至26.1.4以支持启用ip6tables功能
本文记录了openEuler22.03LTS将docker升级由18.09.0升级至26.1.4的过程(当前docker最新版本为27.5.1,生产环境为保障稳定性,选择升级到上一个大版本26的最新小版本)。 一、现有环境 1、系统版本 [rootlocalhost opt]# cat /etc…...
< OS 有关 > Ubuntu 版本升级 实践 24.04 -> 24.10, 安装 .NET
原因: 想安装 .NET 9 去编译 GitHut 项目,这回用不熟悉的 Ubuntu来做,不知道怎么拐去给 Ubuntu 升级,看到现在版本是 24.10 但不是 LTS 版本,记录下升级过程。 一、实践过程: 1. 查看当前版本 命令1: l…...
某咨询大数据解决方案介绍(32页PPT)
本文档介绍了一个大数据平台解决方案,旨在解决企业当前面临的数据问题,包括数据定义缺失、重复采集和存储、数据不完整以及缺乏可靠决策依据等。通过引入大数据技术,该方案强调从被动的IT支撑向主动的数据核心服务转型,以实现科学…...
ZooKeeper作为注册中心有什么问题? ZooKeeper作为注册中心,海量服务同时重启有什么问题?
目录 ZooKeeper作为注册中心存在的问题 性能瓶颈 一致性保证 复杂性 扩展性 单点故障 数据模型限制 社区和生态 安全性 总结 ZooKeeper作为注册中心,海量服务同时重启有的问题 1. ZooKeeper集群压力剧增 2. ZooKeeper Leader节点压力 3. 会话和临时节点管理 4.…...
matlab simulink 汽车四分之一模型主动被动悬架-LQR
1、内容简介 略 matlab simulink 可以交流、咨询、答疑 124- 2、内容说明 略汽车悬架系统由弹性元件、导向元件和减振器组成,是车身与车轴之间连接的所有组合体零件的总称,也是车架(或承载式车身)与车桥(或车轮)之间一切力传递装置的总称,其主要功能是使车轮与地面有很好的…...
从零开始:OpenCV 图像处理快速入门教程
文章大纲 第1章 OpenCV 概述 1.1 OpenCV的模块与功能 1.2 OpenCV的发展 1.3 OpenCV的应用 第2章 基本数据类型 2.1 cv::Vec类 2.2 cv::Point类 2.3 cv::Rng类 2.4 cv::Size类 2.5 cv:&…...
基础相对薄弱怎么考研
复习总体规划 明确目标 选择专业和院校:根据你的兴趣、职业规划和自身实力,选择适合自己的专业和院校。可以参考往年的分数线、报录比、复试难度等。了解考试科目:不同专业考试科目不同,一般包括: 公共课:…...
强化学习笔记6——异同策略、AC、等其他模型总结
异步两种方法:1:经验回放 2:数据动作非同时产生 举例QLearning为什么是异策略? 生成动作时e的概率从Q表选,1-e概况随机。 更新策略时,贪心策略选择Q_max作为动作。 策略优化两种主要方法:基于梯…...
Linux提权--passwd提权
passwd 命令用于更改用户密码。在 Linux 系统中,普通用户可以通过 passwd 更改自己的密码,但如果攻击者能够以某种方式执行 passwd 命令更改 root 用户的密码,他们就能获取 root 权限。 1.常见的 passwd 提权方法 SUID 设置࿱…...
一、本地部署安装 DeepSeek 并训练本地知识库,并调用对话框进行问答
本地部署安装 DeepSeek 1、硬件环境 操作系统:Windows10 内存:16G 显卡:NIVIDIA GeForce RTX 2060 6G 2、安装步骤 (1)安装 Ollama 访问Ollama 官网,点击 “Download for Windows” 下载安装程序。下载…...
海思的一站式集成环境Hispark Studio更新了
HiSpark Studio是海思提供的面向智能设备开发者提供一站式集成开发环境,支持代码编辑、编译、烧录和调试等功能。我以前在评测星闪芯片的时候用过,当时写了篇博客:【星闪开发连载】WS63E开发板Windows环境的构建_hispark studio-CSDN博客。那…...
从零开始构建强大 AI 对话系统:ollama + deepseek + open-webui 完整部署教程(Docker 版)
文章目录 前言一、工具简介二、前期准备三、部署步骤1. 安装并配置 ollama2. 部署 open-webui 四、调试与验证五、Docker Compose 简化部署六、注意事项与常见问题1. ollama run 500 报错2. 硬件配置对性能的影响3. **ollama** 启动与 **open-webui** 调用速度差异4. 内存不足导…...
unity学习29:摄像机camera相关skybox 和 Render Texture测试效果
目录 1 摄像机 1.1 每个Scene里都自带一个摄像机 camera 1.2 可以创建多个camera 1.3 下面先看backgroundtype: 2 backgroundtype: 天空盒 skybox 2.1 清除标志,清除:天空盒 自选天空盒 2.2 window /Asset Store 2.3 导入skybox 3 backgroundtype: 纯色…...
【Elasticsearch】Geo-distance聚合
geo_distance聚合的形状是圆形。它基于一个中心点(origin)和一系列距离范围来计算每个文档与中心点的距离,并将文档分配到相应的距离范围内。这种聚合方式本质上是以中心点为圆心,以指定的距离范围为半径的圆形区域来划分数据。 为…...
音频进阶学习十二——Z变换
文章目录 前言一、Z变换1.Z变换的作用2.Z变换公式3.Z的状态表示1) r 1 r1 r12) 0 < r < 1 0<r<1 0<r<13) r > 1 r>1 r>1 4.关于Z的解释 二、收敛域1.收敛域的定义2.收敛域的表示方式3.ROC的分析1)当 …...
easyxor
easyxor 一、查壳 无壳,64位 二、IDA分析 1.main 2.查看key与r(shifee提取) 三、脚本 r [0x01, 0x01, 0x01, 0x01, 0x01, 0x01, 0x01, 0x01, 0x01, 0x01, 0x01, 0x01, 0x01, 0x01, 0x01, 0x01, 0x01, 0x01, 0x01, 0x01, 0x01, 0x01, 0x01, 0x01, 0x01, 0x01, …...
通过多层混合MTL结构提升股票市场预测的准确性,R²最高为0.98
“Boosting the Accuracy of Stock Market Prediction via Multi-Layer Hybrid MTL Structure” 论文地址:https://arxiv.org/pdf/2501.09760 摘要 本研究引入了一种创新的多层次混合多任务学习架构,致力于提升股市预测的效能。此架构融…...
日本游戏机市场5年来首次陷入萎缩;特斯拉招人推进人形机器人量产;任天堂专利显示Switch2手柄可用作鼠标...| 游戏智眼日报
美团成立“算法顾问委员会” 美团宣布,近日,由外部专家学者组成的算法顾问委员会成立,为美团改进算法提供常态化咨询和指导。每个季度美团将举办算法恳谈会,持续邀请骑手、商家、用户、专家学者和媒体代表等共同参加。美团表示&a…...
114-机器学习分类算法
1、内容简介 略 matlab simulink 114-机器学习分类算法可以交流、咨询、答疑 2、内容说明 略 Elong_6.24。ROCAUC confusion newdata Unbalanced_LR.car 3、仿真分析 略 4、参考论文 略...
【论文阅读】On the Security of “VOSA“
On the Security of Verifiable and Oblivious Secure Aggregation for Privacy-Preserving Federated Learning -- 关于隐私保护联邦中可验证与遗忘的安全聚合的安全性 论文来源摘要Introduction回顾 VOSA 方案对VOSA不可伪造性的攻击对于类型 I 的攻击对于类型 II 的攻击 论文…...
12.6 LangChain检索器(Retrievers)全解析:构建高效RAG应用的核心引擎
LangChain检索器(Retrievers)全解析:构建高效RAG应用的核心引擎 一、检索器的核心价值 检索器是大模型应用的智能导航系统,通过将用户查询与知识库精准匹配,解决了传统搜索的三大痛点: 语义鸿沟:突破关键词匹配,理解用户真实意图多源整合:融合向量搜索、关键词搜索和…...
707设计链表(链表操作)
1、题目描述 你可以选择使用单链表或者双链表,设计并实现自己的链表。 单链表中的节点应该具备两个属性:val 和 next 。val 是当前节点的值,next 是指向下一个节点的指针/引用。 如果是双向链表,则还需要属性 prev 以指示链表中…...
储能系统-系统架构
已更新系列文章包括104、61850、modbus 、单片机等,欢迎关注 IEC61850实现方案和测试-1-CSDN博客 快速了解104协议-CSDN博客 104调试工具2_104协议调试工具-CSDN博客 1 电池储能系统(BESS) 架构 电池储能系统主要包括、电池、pcs、本地控制…...
