当前位置: 首页 > news >正文

SpringBoot中使用MQTT实现消息的订阅和发布

SpringBoot中使用MQTT实现消息的订阅和发布

背景 java框架SpringBoot通过mQTT通信 控制物联网设备

还是直接上代码
第一步依赖:

      <!--mqtt相关依赖--><dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-mqtt</artifactId><version>5.5.14</version></dependency>

第二步配置文件

#mqtt
mqtt:mqttUrl: tcp://127.0.0.1mqttPort: 1883mqttUsername: adminmqttPassword: publicmqttClientId: aaa# MQTT回调类型 按一个MQTT服务区分
# 如果MQTT服务端换了 回调处理的是新的业务需求  就把这个换了
#  然后在MQTT配置文件中扩展新的回调类
mqttTypeCallback: breakerCallback  

第三步 config类

package com.xxx.iotjava.mqtt.config;import com.xxx.iotjava.mqtt.callback.BreakerCallback;import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
import org.springframework.util.StringUtils;/*** User:Json* Date: 2024/6/17**/
@Configuration
@Slf4j
public class MqttConfig {@Value("${mqtt.mqttUsername}")private String mqttUsername;@Value("${mqtt.mqttPassword}")private String mqttPassword;@Value("${mqtt.mqttUrl}")private String mqttUrl;@Value("${mqtt.mqttPort}")private Integer mqttPort;@Value("${mqtt.mqttClientId}")private String mqttClientId;@Value("${mqttTypeCallback}")private String mqttTypeCallback;private static String breakerCallback = "breakerCallback";/*** 客户端对象*/private MqttClient client;/*** 客户端连接服务端* 目前只支持一个 MQTT服务端 如果后续一个项目有多个MQTT服务端那就设计成工厂模式*/public boolean connect() {if (isMqtt()){return false;}try {//new MemoryPersistence() 使用内存持久化// 优点:不会在文件系统中创建任何文件(如 .lck 文件),适合对会话持久性没有要求的场景。// 缺点:  客户端断开连接或重启后,会话数据会丢失,无法保留订阅信息和未发送的消息// String persistenceDirectory = "/path/to/your/mqtt/persistence";//new MqttDefaultFilePersistence(persistenceDirectory) 使用文件持久化//如果persistenceDirectory 不写 他默认创建 根目录 linux要给权限// 优点: 客户端断开连接或重启后,能够保留订阅信息和未发送的消息。这对于需要保持会话状态的应用非常重要// 缺点 会在指定的目录中创建文件(如 .lck 文件),需要确保指定的目录是有效的,并且应用有权限访问该目录//创建MQTT客户端对象client = new MqttClient(mqttUrl + ":" + mqttPort, mqttClientId,new MemoryPersistence());//连接设置MqttConnectOptions options = new MqttConnectOptions();//是否清空session,设置false表示服务器会保留客户端的连接记录(订阅主题,qos),客户端重连之后能获取到服务器在客户端断开连接期间推送的消息//设置为true表示每次连接服务器都是以新的身份//如果他为true 会出现一个问题 //当网络断开后,客户端会进行重连,但是重连之前订阅的主题就失效了,不再接受之前订阅主题的消息。//因为配置里将cleanSession 设为 true ,当客户端掉线时 ,//服务器端会清除 客户端 session 。 重连后 客户端会有一个新的session。 // 所以如果大家把他为true 重新连接mqtt后,要注意需要手动再订阅一下主题// 推荐文档:https://www.cnblogs.com/A-yes/p/9894144.htmloptions.setCleanSession(true);//设置连接用户名options.setUserName(mqttUsername);//设置连接密码options.setPassword(mqttPassword.toCharArray());options.setAutomaticReconnect(true);  // 启用自动重连//设置超时时间,单位为秒  如果在指定的时间内未能建立连接,客户端会放弃连接尝试并抛出异常。options.setConnectionTimeout(100);//设置心跳时间 单位为秒,表示服务器每隔 1.5*20秒的时间向客户端发送心跳判断客户端是否在线options.setKeepAliveInterval(20);//设置遗嘱消息的话题,若客户端和服务器之间的连接意外断开,服务器将发布客户端的遗嘱信息//  options.setWill("willTopic",(mqttClientId + ":与服务器断开连接").getBytes(),0,false);if (StringUtils.isEmpty(mqttTypeCallback)) {log.error("MQTT回调类型为空,请去java_config配置文件配置!");}//设置回调if (breakerCallback.equals(mqttTypeCallback)) {//断路器回调client.setCallback(new BreakerCallback());}client.connect(options);return true;} catch (MqttException e) {log.error("MQTT启动报错:" + e.getMessage());e.printStackTrace();return false;}}/*** qos* 0  最多一次传递【适用于对消息丢失不敏感的场景,如传感器数据频繁发送,可以接受偶尔的数据丢失】* 1 至少一次传递  【消息至少传递一次,但可能会重复(即重复消息)】* 2 仅一次传递 【消息确保仅传递一次,既不会丢失也不会重复。】* retained* 保留消息:如果 retained 参数设置为 true,消息会被代理保留。代理将记住这个消息,并在新客户端订阅该主题时立即发送这个消息。* 非保留消息:如果 retained 参数设置为 false,消息不会被保留,只会发送给当前在线并订阅该主题的客户端。* topic    主题* message  内容*/public void publish(int qos, boolean retained, String topic, String message) {log.info("topic为:【"+topic+"】,qos为:【"+qos+"】 mqtt 发布数据为:"+message);MqttMessage mqttMessage = new MqttMessage();mqttMessage.setQos(qos);mqttMessage.setRetained(retained); //代理将记住这个消息,并在新客户端订阅该主题时立即发送这个消息。mqttMessage.setPayload(message.getBytes());//主题的目的地,用于发布信息MqttTopic mqttTopic = client.getTopic(topic);MqttDeliveryToken token;try {//将指定消息发布到主题,但不等待消息传递完成,返回的token可用于跟踪消息的传递状态token = mqttTopic.publish(mqttMessage);//token.waitForCompletion(); // 等待完成 会堵塞} catch (MqttException e) {log.warn("ClientId【" + mqttClientId + "】发布失败!主题【" + topic + "】,发布数据为:" + message);e.printStackTrace();}}/*** 断开连接*/public void disConnect() {try {client.disconnect();} catch (MqttException e) {e.printStackTrace();}}/****  手动连接*  可用于断线后 手动重连* ***/public boolean againConnect() {try {if (client != null && !client.isConnected()) {client.connect();}return true;} catch (MqttException e) {e.printStackTrace();return false;}}//验证是否启动mqtt连接private boolean isMqtt(){if (StringUtils.isEmpty(mqttUrl) || StringUtils.isEmpty(mqttPort)|| StringUtils.isEmpty(mqttUsername) || StringUtils.isEmpty(mqttPassword)|| StringUtils.isEmpty(mqttClientId)) {log.info("==========mqtt 参数不全,无需启动MQTT连接==================");return true;}return false;}/*** 订阅指定主题* @param topic 订阅的主题* @param qos   订阅的服务质量*/public boolean subscribe(String topic, int qos) {if (isMqtt()){return false;}try {if (client != null && client.isConnected()) {client.subscribe(topic, qos);log.info("订阅主题 {} 成功!", topic);} else {log.error("MQTT客户端尚未连接,无法订阅主题 {}!", topic);}return true;} catch (MqttException e) {log.error("订阅主题 {} 失败:{}", topic, e.getMessage());e.printStackTrace();return false;}}/*** 批量订阅主题*  消息等级,和主题数组一一对应,服务端将按照指定等级给订阅了主题的客户端推送消息* @param topic 订阅的主题集合* @param qos   订阅的服务质量集合*/public boolean subscribe(String[] topic, int[] qos) {if (isMqtt()){return false;}try {if (client != null && client.isConnected()) {client.subscribe(topic, qos);log.info("订阅主题 {} 成功!", topic);} else {log.error("MQTT客户端尚未连接,无法订阅主题 {}!", topic);}return true;} catch (MqttException e) {log.error("订阅主题 {} 失败:{}", topic, e.getMessage());e.printStackTrace();return false;}}
}

第四步 回调类

package com.xxx.iotjava.mqtt.callback;import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.xxx.iotjava.entities.BreakerData;
import com.xxx.iotjava.enums.breaker.BreakerKeywordsEnum;
import com.xxx.iotjava.enums.breaker.BreakerKeywordsValueEnum;
import com.xxx.iotjava.enums.breaker.BreakerOperationEnum;
import com.xxx.iotjava.service.inteface.IBreakerDataService;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;import com.alibaba.fastjson.JSONArray;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils;/*** User:Json* Date: 2024/6/17**/
@Component
@Slf4j
public class BreakerCallback implements MqttCallback {@AutowiredIBreakerDataService iBreakerDataService;/*** 与服务器断开的回调 *  这里可以做手动连接 但是配置config类 配置了 自动检测异常 true 这里可以也不做*      options.setAutomaticReconnect(true);  // 启用自动重连*/@Overridepublic void connectionLost(Throwable throwable) {log.error("MQTT连接有异常:" + throwable.getMessage());}/*** 订阅的回调* 消息到达的回调* 注意 如果这个回调方法 如果有异常 报错 ,mqtt会重新连接 * 因为配置文件 设置了  options.setAutomaticReconnect(true);  // 启用自动重连* 如果自动重连了 如果是开启新的会话 以前的订阅会消失  具体操作 再上面的配置文件类说明过了*/@Overridepublic void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
//        System.out.println("上报时间:"+ LocalDateTime.now());
//        System.out.println(String.format("接收消息主题 : %s",topic));
//        System.out.println(String.format("接收消息Qos : %d",mqttMessage.getQos()));
//        System.out.println(String.format("接收消息内容 : %s",new String(mqttMessage.getPayload())));
//        System.out.println(String.format("接收消息retained : %b",mqttMessage.isRetained()));}/*** 发布的回调* 消息发布成功的回调*/@Overridepublic void deliveryComplete(IMqttDeliveryToken token) {IMqttAsyncClient client = token.getClient();log.info(client.getClientId() + "发布消息成功!");}}

第五步 mqtt工具类

package com.xxx.iotjava.utils;import com.xxx.init.utils.AppContextUtil;import com.xxx.iotjava.enums.breaker.BreakerOperationTopicEnum;
import com.xxx.iotjava.mqtt.config.MqttConfig;import lombok.extern.slf4j.Slf4j;/*** User:Json* Date: 2024/6/17**/
@Slf4j
public class MqttUtils {private static MqttConfig mqttConfig;public static MqttConfig getMqttConfig() {if (mqttConfig == null)mqttConfig = AppContextUtil.getBean(MqttConfig.class);return mqttConfig;}//初始化 订阅public  static boolean subscribeInit(){return getMqttConfig().subscribe(BreakerOperationTopicEnum.REPORTING_API.getTopic(), 0);}/*** 发送消息* qos 0 最多一次传递  1 至少一次传递  2 仅一次传递* retained  true 保留消息  false 非保留消息* topic    主题* message  内容*/public static boolean sendMqttMsg(int qos, boolean retained, String topic, String message) {try {getMqttConfig().publish(qos, retained, topic, message);return true;} catch (Exception e) {e.printStackTrace();log.error("MQtt发送消息报错:" + e.getMessage());return false;}}/** topic 主题* message 内容* */public static boolean sendMqttMsg(String topic, String message) {return sendMqttMsg(1, false, topic, message);}}

第六步 调用测试
发布
MqttUtils.sendMqttMsg(topic, data)
//订阅 我做的是启动的时候 初始化订阅 所以 直接根据定义的 topic 常量进行初始化订阅
//BreakerOperationTopicEnum.REPORTING_API.getTopic() 我 定义的topic 枚举类 常量
// 这里就不分享了
MqttUtils.subscribeInit();

完成

相关文章:

SpringBoot中使用MQTT实现消息的订阅和发布

SpringBoot中使用MQTT实现消息的订阅和发布 背景 java框架SpringBoot通过mQTT通信 控制物联网设备 还是直接上代码 第一步依赖&#xff1a; <!--mqtt相关依赖--><dependency><groupId>org.springframework.integration</groupId><artifactId>s…...

等保测评练习10

等级保护初级测评师试题10 姓名&#xff1a; 成绩&#xff1a; 判断题&#xff08;10110分&#xff09; 1.等级保护2.0三级系统测评合格最低分为60分&#xff08;&#xff09; 70分且不能有高风险 2.当远程管理云计算平台中设备是…...

VBA学习(16):工作表事件示例:输入数据后锁定单元格

在工作表单元格中输入数据后&#xff0c;该单元格就被锁定&#xff0c;不能再编辑。 打开VBE&#xff0c;在工程资源管理器中双击该工作表名称打开其代码模块&#xff0c;在其中输入下面的代码&#xff1a; 假设整个工作表的LockedFalse Private Sub Worksheet_Change(ByVal …...

mysql学习——SQL中的DDL和DML

SQL中的DDL和DML DDL数据库操作&#xff1a;表操作 DML添加数据修改数据删除数据 学习黑马MySQL课程&#xff0c;记录笔记&#xff0c;用于复习。 DDL DDL&#xff1a;Data Definition Language&#xff0c;数据定义语言&#xff0c;用来定义数据库对象(数据库&#xff0c;表&…...

什么是多态?一文彻底搞懂!

什么是多态 面向对象程序设计有三要素&#xff1a;封装、继承&#xff08;或组合&#xff09;、多态&#xff0c;前两者较好理解&#xff0c;多态总让人困惑&#xff0c;不知道具体有什么作用&#xff0c;更不知道为什么要用多态。今天就来详细分析下什么是多态&#xff0c;以…...

CST电磁仿真软件的参数类型和含义【电磁仿真入门教程】

如果你是一位工程师或设计师&#xff0c;那你对电磁仿真软件CST Studio Suite一定不会感到陌生。CST软件可以帮助你模拟电磁场和电路行为&#xff0c;从而优化产品设计。本文将带你了解CST电磁仿真软件的一些关键参数&#xff0c;并解释其含义。CST电磁仿真软件的参数是指在使用…...

华为HCIA综合实验(结合前几期所有内容)

第一章 实验目的 &#xff08;1&#xff09;配置Telnet&#xff0c;要求所有网络设备支持远程管理&#xff0c;密码为admin&#xff08;2&#xff09;配置Trunk&#xff0c;交换机之间的链路均为Trunk模式&#xff08;3&#xff09;配置VLAN&#xff0c;在SW2和SW3上创建相关…...

git 拉下来的项目,出现“加载失败”的解决方法

现象&#xff1a; 1、对加载失败的项目&#xff0c;尝试重新加载 解决思路&#xff1a;根据上面的提示&#xff0c;打开F盘对应的 .vcxproj文件&#xff0c;查看里面关于opencv454.props的内容 先删了&#xff0c;后面再补 2、当前的工作重点是消除加载失败的情况&#xff0c;…...

sql资料库

1、distinct(关键词distinct用于返回唯一不同的值)&#xff1a;查询结果中去除重复行的关键字 select distinct(university) from user_profile select distinct university from user_profile distinct是紧跟在select后面的&#xff0c;不能在其他位置&#xff0c;不然就…...

【python入门】运算符

文章目录 算术运算符比较运算符赋值运算符逻辑运算符位运算符成员运算符身份运算符优先级 算术运算符 # 加法 print(5 3) # 输出: 8# 减法 print(5 - 3) # 输出: 2# 乘法 print(4 * 3) # 输出: 12# 除法&#xff08;结果为浮点数&#xff09; print(8.0 / 3) # 输出: 2.6…...

【C++高阶】掌握AVL树:构建与维护平衡二叉搜索树的艺术

&#x1f4dd;个人主页&#x1f339;&#xff1a;Eternity._ ⏩收录专栏⏪&#xff1a;C “ 登神长阶 ” &#x1f921;往期回顾&#x1f921;&#xff1a;STL-> map与set &#x1f339;&#x1f339;期待您的关注 &#x1f339;&#x1f339; ❀AVL树 &#x1f4d2;1. AVL树…...

机器学习-课程整理及初步介绍

简介: 机器学习是人工智能的一个分支&#xff0c;它使计算机系统能够从经验中学习并改进其在特定任务上的表现&#xff0c;而无需进行明确的编程。机器学习涉及多种算法和统计模型&#xff0c;它们可以从数据中学习规律&#xff0c;并做出预测或决策。机器学习的应用非常广泛&…...

北斗三号短报文通信终端 | 助力户外无网络场景作业

北斗三号短报文通信终端是一款专为户外无网络场景作业设计的先进通信工具&#xff0c;它依托于中国自主研发的北斗卫星导航系统&#xff0c;为用户在偏远地区或无网络覆盖区域提供了可靠的通信保障。以下是关于北斗三号短报文通信终端的详细介绍&#xff1a; 一、功能特点 北斗…...

RERCS系统开发实战案例-Part05 FPM Application的Feeder Class搜索组件的实施

1、通过事务码 SE24对Feeder Class实施 1&#xff09;接口页签的简单说明&#xff1a; ① IF_FPM_GUIBB&#xff1a;通用UI构建块&#xff0c;整个UIBB模块的基础接口&#xff1b; ② IF_FPM_GUIBB_SEARCH&#xff1a;通用搜索UI构建块&#xff0c;搜索组件UIBB的基础接口&…...

算法常见手写代码

1.NMS def py_cpu_nms(dets, thresh):"""Pure Python NMS baseline."""#x1、y1、x2、y2、以及score赋值x1 dets[:, 0]y1 dets[:, 1]x2 dets[:, 2]y2 dets[:, 3]scores dets[:, 4]#每一个检测框的面积areas (x2 - x1 1) * (y2 - y1 1)#按…...

数据结构9——排序

一、冒泡排序 冒泡排序&#xff08;Bubble Sort&#xff09;&#xff0c;顾名思义&#xff0c;就是指越小的元素会经由交换慢慢“浮”到数列的顶端。 算法原理 从左到右&#xff0c;依次比较相邻的元素大小&#xff0c;更大的元素交换到右边&#xff1b;从第一组相邻元素比较…...

分布式锁实现方案-基于Redis实现的分布式锁

目录 一、基于Lua看门狗实现 1.1 缓存实体 1.2 延迟队列存储实体 1.3 分布式锁RedisDistributedLockWithDog 1.4 看门狗线程续期 1.5 测试类 1.6 测试结果 1.7 总结 二、RedLock分布式锁 2.1 Redlock分布式锁简介 2.2 RedLock测试例子 2.3 RedLock 加锁核心源码分析…...

MTK7628+MT7612 加PA定频数据

1、硬件型号TR726A5G121-DPA PC9.02.0017。如下所示&#xff1a; 2、WIFI5.8 AC模式 42&#xff08;5120MHz&#xff09;信道&#xff0c;80带宽 3、WIFI5.8 AC模式 38&#xff08;5190MHz&#xff09;信道&#xff0c;40带宽 4、WIFI5.8 AC模式 36&#xff08;5180 MHz&…...

[信号与系统]关于双线性变换

前言 本文还是前置知识 双线性变换法 双线性变换法&#xff08;Bilinear Transform&#xff09;是一种用于将模拟滤波器转换为数字滤波器的方法。它通过将模拟域中的s平面上的传递函数映射到数字域中的z平面上的传递函数来实现这一转换。双线性变换法保证了频率响应在转换过…...

763. 划分字母区间

题目&#xff1a;给你一个字符串 s 。我们要把这个字符串划分为尽可能多的片段&#xff0c;同一字母最多出现在一个片段中。注意&#xff0c;划分结果需要满足&#xff1a;将所有划分结果按顺序连接&#xff0c;得到的字符串仍然是 s 。返回一个表示每个字符串片段的长度的列表…...

ElevenLabs泰米尔文语音API调用性能突降?紧急修复方案:更换Region为ap-southeast-1后P95延迟从2.4s降至380ms(附curl压测脚本)

更多请点击&#xff1a; https://intelliparadigm.com 第一章&#xff1a;ElevenLabs泰米尔文语音API性能突降事件全貌 2024年9月中旬起&#xff0c;多位集成ElevenLabs泰米尔文&#xff08;ta-IN&#xff09;语音合成服务的开发者报告异常延迟与高失败率——典型请求响应时间…...

82、【Agent】【OpenCode】bash 工具提示词(amend 风险)

【声明】本博客所有内容均为个人业余时间创作&#xff0c;所述技术案例均来自公开开源项目&#xff08;如Github&#xff0c;Apache基金会&#xff09;&#xff0c;不涉及任何企业机密或未公开技术&#xff0c;如有侵权请联系删除 背景 上篇 blog 【Agent】【OpenCode】bash 工…...

基于eNSP的园区网络高可用与安全隔离综合实验

1. 实验背景与核心价值 园区网络作为企业数字化转型的基础设施&#xff0c;其稳定性和安全性直接关系到日常运营效率。记得去年参与某金融机构网络改造项目时&#xff0c;他们的核心业务系统因为单点故障导致全网瘫痪4小时&#xff0c;直接损失超过百万。这个案例让我深刻认识到…...

终极Citra 3DS模拟器完整指南:在电脑上免费畅玩任天堂3DS游戏

终极Citra 3DS模拟器完整指南&#xff1a;在电脑上免费畅玩任天堂3DS游戏 【免费下载链接】citra A Nintendo 3DS Emulator 项目地址: https://gitcode.com/GitHub_Trending/ci/citra 想要在电脑上重温《精灵宝可梦》系列、《塞尔达传说》等经典3DS游戏吗&#xff1f;Ci…...

别再只会调PWM占空比了!用STM32F103实现直流电机精准调速,从硬件选型到PID参数整定全流程复盘

从PWM到PID&#xff1a;STM32F103直流电机精准调速实战指南 第一次用STM32驱动直流电机时&#xff0c;我天真地以为只要会调PWM占空比就能搞定一切。直到亲眼看到电机在空载时转速飘忽不定&#xff0c;带载后响应迟缓得像老牛拉车&#xff0c;才明白工业级控制远非改变几个寄存…...

Whisky完整指南:在macOS上运行Windows应用的终极解决方案

Whisky完整指南&#xff1a;在macOS上运行Windows应用的终极解决方案 【免费下载链接】Whisky A modern Wine wrapper for macOS built with SwiftUI 项目地址: https://gitcode.com/gh_mirrors/wh/Whisky 想要在Apple Silicon Mac上流畅运行Windows专属软件和游戏&…...

从电赛A题到实战:手把手教你搞定单相交流电子负载的SPWM控制与功率因数调节

从电赛A题到实战&#xff1a;手把手教你搞定单相交流电子负载的SPWM控制与功率因数调节 在电子设计竞赛中&#xff0c;单相交流电子负载的设计一直是极具挑战性的题目。它不仅考验参赛者对电力电子技术的理解&#xff0c;更要求具备将理论转化为实际电路的能力。本文将从硬件选…...

LeaderKey.app开发者指南:深入源码解析架构设计

LeaderKey.app开发者指南&#xff1a;深入源码解析架构设计 【免费下载链接】LeaderKey The *faster than your launcher* launcher 项目地址: https://gitcode.com/gh_mirrors/le/LeaderKey LeaderKey.app是一款轻量级启动器应用&#xff0c;以"比你的启动器更快&…...

如何用MAA自动化助手彻底解放你的《明日方舟》游戏时间:5个实用技巧

如何用MAA自动化助手彻底解放你的《明日方舟》游戏时间&#xff1a;5个实用技巧 【免费下载链接】MaaAssistantArknights 《明日方舟》小助手&#xff0c;全日常一键长草&#xff01;| A one-click tool for the daily tasks of Arknights, supporting all clients. 项目地址…...

除了 Docker 还能用什么?一文看懂容器技术的“四大门派”

除了 Docker 还能用什么&#xff1f;一文看懂容器技术的“四大门派” 在云原生时代&#xff0c;Docker 几乎成了容器的代名词。但实际上&#xff0c;容器技术是一片茂密的森林&#xff0c;除了 Docker&#xff0c;还有许多针对特定痛点&#xff08;如安全、性能、隔离性&#x…...