MQTT的连接配置以及重连机制和遇到的问题--------求如何修改更加好
今天遇到了一个mqtt的问题,虽然解决了,但是感觉不是很好,希望大家多指点
这是配置文件
customer:mqtt:broker: tcp://ip:1883clientList:- clientId: nays_servicesubscribeTopic: xxxxxx- clientId: receive_servicesubscribeTopic: xxxxxx
MqttConfig 读取配置文件的
@Data
@Configuration
@ConfigurationProperties(prefix = "customer.mqtt")
public class MqttConfig {/*** mqtt broker地址*/String broker;/*** 需要创建的MQTT客户端*/List<MqttClient> clientList;
}
一个MqttClient类用来构造配置文件中的数据对象
@Data
public class MqttClient {/*** 客户端ID*/private String clientId;/*** 监听主题*/private String subscribeTopic;/*** 用户名*/private String userName;/*** 密码*/private String password;
}
服务运行的时候进行mqtt客户端创建,创建的数据从配置文件中读取
/*** MQTT客户端创建*/
@Component
@Slf4j
public class MqttClientCreate {@Resourceprivate MqttClientManager mqttClientManager;@Resourceprivate MqttConfig mqttConfig;/*** 创建MQTT客户端*/@PostConstructpublic void createMqttClient() {// 会读取配置文件中的clientListList<MqttClient> mqttClientList = mqttConfig.getClientList();// 遍历去创建for (MqttClient mqttClient : mqttClientList) {log.info("{}", mqttClient);mqttClientManager.createMqttClient(mqttClient.getClientId(), mqttClient.getSubscribeTopic());}}
}
这是创建的代码,问题很多(请看代码的注释部分)
```java
@Slf4j
@Component
public class MqttClientManager {@Value("${customer.mqtt.broker}")private String mqttBroker;@Resourceprivate MqttCallBackContext mqttCallBackContext;/*** 存储MQTT客户端*/public static Map<String, MqttClient> MQTT_CLIENT_MAP = new ConcurrentHashMap<>();public MqttClient getMqttClientById(String clientId) {return MQTT_CLIENT_MAP.get(clientId);}/*** 创建mqtt客户端* @param clientId 客户端ID* @param subscribeTopic 订阅主题,可为空*/public void createMqttClient(String clientId, String subscribeTopic) {// 它将消息存储在内存中,而不是持久存储到文件或其他存储介质中MemoryPersistence persistence = new MemoryPersistence();try {MqttClient client = new MqttClient(mqttBroker, clientId, persistence);MqttConnectOptions connOpts = new MqttConnectOptions();// 客户端每次连接到 MQTT 服务器时都会被视为一个全新的会话。connOpts.setCleanSession(true);if (null != subscribeTopic && !subscribeTopic.isEmpty()) {AbsMqttCallBack callBack = mqttCallBackContext.getCallBack(clientId);// 这里的default就是DefaultMqttCallBack, 一开始创建的时候走的就是这个// 问题最大的地方在这,通过这样方式拿回来的是同一个对象,hashCode也相同// 现在想到的做法是深拷贝,有没有什么好的做法,比如通过构造方法if (null == callBack) {// 一开始这里的操作直接是, 当创建多个客户端的时候拿到的对象都是同一个// callback = mqttCallBackContext.getCallBack("default");AbsMqttCallBack original = mqttCallBackContext.getCallBack("default");callBack = original.deepCopy();}callBack.setClientId(clientId);callBack.setConnectOptions(connOpts);client.setCallback(callBack);}//连接mqtt服务端brokerclient.connect(connOpts);log.info("客户端 {} 连接成功状态 {}", clientId, client.isConnected());// 订阅主题if (null != subscribeTopic && !subscribeTopic.isEmpty()) {if (subscribeTopic.contains("-")) {client.subscribe(subscribeTopic.split("-"));}else {client.subscribe(subscribeTopic);}}MQTT_CLIENT_MAP.putIfAbsent(clientId, client);} catch (MqttException e) {log.error("创建mqttClient失败!", e);}}
}
这是用于存储每个mqtt客户端的回调方法类
/*** MQTT订阅回调环境类*/
@Component
@Slf4j
public class MqttCallBackContext {// 在 Spring 中,当你注入一个 Map<String, AbsMqttCallBack> 类型的字段时,// Spring 会自动将所有实现了 AbsMqttCallBack 接口的 Bean 收集起来,// 并将它们的名称作为键值。因此,DefaultMqttCallBack 会被注入到 callBackMap 中,键值为 "default"。private final Map<String, AbsMqttCallBack> callBackMap = new ConcurrentHashMap<>();/*** 默认构造函数** @param callBackMap 回调集合*/public MqttCallBackContext(Map<String, AbsMqttCallBack> callBackMap) {this.callBackMap.putAll(callBackMap);}/*** 获取MQTT回调类** @param clientId 客户端ID* @return MQTT回调类*/public AbsMqttCallBack getCallBack(String clientId) {return this.callBackMap.get(clientId);}
}
这里遇到的问题就是mqtt断了之后进行重新连接的机制,在MqttClientManager这个代码中之前的回调类是callback = mqttCallBackContext.getCallBack(“default”);这样拿的,通过hashCode来看,都一样,说明每次创建都会对这个对象进行修改,那么这里赋值的clientId就会变成最后一个创建的mqtt对象id,所以在重连代码中,每次进来的对象虽然是另外一个mqtt客户端,但是拿到的clientid都是同一个,没有办法进行获取和其它的操作
/*** MQTT回调抽象类*/
@Setter
@Getter
@Slf4j
public abstract class AbsMqttCallBack implements MqttCallback {private String clientId;private MqttConnectOptions connectOptions;@Resourceprivate MqttConfig mqttConfig;@Resourceprivate MqttClientManager mqttClientManager;/*** 失去连接操作,进行重连** @param throwable 异常*/@Overridepublic void connectionLost(Throwable throwable) {log.info("{}失去连接,进行尝试重连", this.clientId);MqttClient mqttClient = MqttClientManager.MQTT_CLIENT_MAP.get(clientId);String subscribeTopic = mqttConfig.getClientList().stream().filter(item -> item.getClientId().equals(clientId)).map(com.ruoyi.web.core.mottconfig.MqttClient::getSubscribeTopic).findFirst().orElse(null);if (mqttClient != null) {MqttConnectOptions connOpts = new MqttConnectOptions();connOpts.setCleanSession(true); // 可以根据实际需求配置// 重连的尝试int retryCount = 0;int maxRetries = 10; // 最大重连次数while (retryCount < maxRetries) {try {if (mqttClient.isConnected()) {log.info("{} 重连成功", clientId);return;}// 重新连接mqttClient.connect(connOpts);log.info("{} 重连成功", clientId);if (null != subscribeTopic && !subscribeTopic.isEmpty()) {if (subscribeTopic.contains("-")) {mqttClient.subscribe(subscribeTopic.split("-"));}else {mqttClient.subscribe(subscribeTopic);}}break;} catch (MqttException e) {retryCount++;log.error("{} 重连失败,尝试第 {} 次重连", clientId, retryCount, e);// 可设置重连间隔,比如等待2秒钟后再尝试重连try {Thread.sleep(2000);} catch (InterruptedException ie) {Thread.currentThread().interrupt();}}}if (retryCount == maxRetries) {log.error("{} 超过最大重连次数,重连失败", clientId);}}}/*** 接收订阅消息* @param topic 主题* @param mqttMessage 接收消息* @throws Exception 异常*/@Overridepublic void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {String content = new String(mqttMessage.getPayload(), StandardCharsets.UTF_8);handleReceiveMessage(topic, content);}/*** 消息发送成功** @param iMqttDeliveryToken toke*/@Overridepublic void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {log.info("消息发送成功");}/*** 处理接收的消息* @param topic 主题* @param message 消息内容*/protected abstract void handleReceiveMessage(String topic, String message);// 深拷贝方法public abstract AbsMqttCallBack deepCopy();}
这里就是 最后的callback实现类进行业务处理
/*** 默认回调*/
@Slf4j
@Component("default")
public class DefaultMqttCallBack extends AbsMqttCallBack {@Autowiredprivate AlarmListService alarmListService;@Autowiredprivate OperateService operateService;@Autowiredprivate INrDeviceService iNrDeviceService;//private static final String TOPIC1 = 1;/*** @param topic 主题* @param message 消息内容*/@Overrideprotected void handleReceiveMessage(String topic, String message) {log.info("订阅的主题---{}", topic);log.info("接收到消息---{}", message);// 业务操作}@Overridepublic AbsMqttCallBack deepCopy() {DefaultMqttCallBack copy = new DefaultMqttCallBack();copy.setClientId(this.getClientId());copy.setConnectOptions(this.getConnectOptions()); copy.setMqttConfig(this.getMqttConfig());copy.setMqttClientManager(this.getMqttClientManager());return copy;}
}
我是通过深拷贝来做的,应该是可以通过构造方法来,但是对这个整体的代码还是不够熟悉,想看看应该如何优化,还请指点,最好笑的是:这段代码是公司一直使用的,用在了好几个项目上,我真是服了!!!!
相关文章:
MQTT的连接配置以及重连机制和遇到的问题--------求如何修改更加好
今天遇到了一个mqtt的问题,虽然解决了,但是感觉不是很好,希望大家多指点 这是配置文件 customer:mqtt:broker: tcp://ip:1883clientList:- clientId: nays_servicesubscribeTopic: xxxxxx- clientId: receive_servicesubscribeTopic: xxxxx…...
大数据学习之任务流调度系统Azkaban、Superset可视化系统
一.任务流调度系统Azkaban 1.课程介绍 2.为什么需要工作流调度系统 3.AZKABAN是什么 4.AZKABAN下载 5.制作安装包 6.tar包准备 7.MYSQL配置AZKABAN 8.配置EXECUTOR SERVER 9.配置WEBSERVER 10.单作业实战_yaml语言(今天稍晚更新) 11.单作业实战 12.多作业依赖实战 13.失败自动重…...
在VS-qt的程序中,后期增加PCH预编译功能,提高编译速度
由于前期创建qt程序的时候未勾选pch功能,导致没有启动预编译的功能. 这种情况下需要增加pch功能应该怎么做? 在项目中增加2个文件 stdafx.h和stdafx.cpp文件 stdafx.h增加qt常用头文件 #pragma once //windows #include <windows.h>//qt常用 #include <QObject&g…...
蓝桥云客 路径之谜
11.路径之谜 - 蓝桥云课 路径之谜 题目描述 小明冒充X星球的骑士,进入了一个奇怪的城堡。 城堡里边什么都没有,只有方形石头铺成的地面。 假设城堡地面是nn个方格。如下图所示。 按习俗,骑士要从西北角走到东南角。可以横向或纵向移动&…...
ES6相关操作
一.JavaScript的基础语法 1.Demo1.html <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><meta name"viewport" content"widthdevice-width, initial-scale1.0"><title>JavaScrip…...
在Linux上创建一个Docker容器并在其中执行Python脚本
在Linux上创建一个Docker容器并在其中执行Python脚本的过程,涉及多个方面的内容,包括安装Docker、编写Dockerfile、构建镜像、运行容器等。 1. 安装Docker 在Linux上使用Docker之前,你需要确保系统已安装Docker。Docker支持的Linux发行版有…...
Ubuntu 下 nginx-1.24.0 源码分析 - ngx_os_init 函数
ngx_os_init 声明在 src/os/unix/ngx_os.h ngx_int_t ngx_os_init(ngx_log_t *log); 定义在 src\os\unix\ngx_posix_init.c ngx_int_t ngx_os_init(ngx_log_t *log) {ngx_time_t *tp;ngx_uint_t n; #if (NGX_HAVE_LEVEL1_DCACHE_LINESIZE)long size; #endif#if (NGX…...
【Python项目】基于Python的语音数据及标注核对审核系统
【Python项目】基于Python的语音数据及标注核对审核系统 技术简介: 采用Python技术、MySQL数据库、Django框架等实现。 系统简介: 语音数据及标注核对审核系统是一个基于B/S架构的语音数据处理平台,旨在通过自动化的方式对语音数据进行标…...
深入解析BFS算法:C++实现无权图最短路径的高效解决方案
在无权图中,广度优先搜索(BFS)是解决最短路径问题的高效算法。接下来博主从专业角度深入探讨其实现细节,并给出C代码示例: 目录 一、核心原理 二、算法步骤 三、C实现关键点 1. 数据结构 2. 边界检查 3. 路径回溯…...
LeetCode刷题---二分查找---441
排列硬币 441. 排列硬币 - 力扣(LeetCode) 题目 你总共有 n 枚硬币,并计划将它们按阶梯状排列。对于一个由 k 行组成的阶梯,其第 i 行必须正好有 i 枚硬币。阶梯的最后一行 可能 是不完整的。 给你一个数字 n ,计算…...
Unity结合Vuforia虚拟按键实现AR机械仿真动画效果
零、最终效果 待上传 一、资源准备 1、Vuforia Vuforia版本不能高于10.17.4(往上的版本虚拟按键功能被删除) 2、Unity Unity版本必须要高于2022.3.x,不然使用Vuforia插件时会出现bug 二、主要内容 1、添加虚拟按钮 2、为虚拟按钮设置…...
网络安全 linux学习计划 linux网络安全精要
🍅 点击文末小卡片 ,免费获取网络安全全套资料,资料在手,涨薪更快 2.使用命令行 文件系统层次标准(FHS)是一个文件和目录在Unix和Linux操作系统上面应该如何存储的定义。 /bin 重要的二进制可执行程序/bo…...
深度解析2025最新微服务版本特性
当程序猿张三在凌晨三点对着满屏报错日志抓狂时,他绝对想不到2025年的微服务架构已经进化成了会哄睡的技术保姆。这年头要是谁家系统还像俄罗斯套娃般环环相扣,出门都不好意思跟同行打招呼。且看这群代码世界的乐高大师们,今年又给我们整了哪…...
世界棒球经典赛(World Baseball Classic)·棒球1号位
世界棒球经典赛(World Baseball Classic)是一项由美国职棒大联盟(MLB)和国际棒球总会(IBAF,现更名为世界棒垒球联盟WBSC)共同主办的国际棒球赛事。该赛事吸引了来自世界各地的顶尖棒球队伍参与&…...
为AI聊天工具添加一个知识系统 之115 详细设计之56 知识表征 之2
本文要点 要点 知识表征的顶级范畴中最好是先将九个原语primitive T, ⊥, Independent, Relative, Mediating, Physical, Abstract, Continuant,和 Occurrent 进行分组(分成2大组 和 4个小组)并写出它们的满足公司,然后将它们和三种设计&am…...
rust 实例化动态对象
在功能开发中,动态创建或获取某个对象的情况很多。在前端JS开发中,可以使用工厂函数,通过给定的类型标识创建不同的对象实例;还可以通过对象映射来实现动态创建对象。 在Rust中,我们也可以使用这两种方式去创建对象实…...
支持向量机 (Support Vector Machine, SVM)
支持向量机 (Support Vector Machine, SVM) 支持向量机(SVM)是一种广泛应用于分类、回归分析以及异常检测的监督学习算法。它基于结构风险最小化(Structural Risk Minimization,SRM)原则,通过寻找一个最优…...
C#初级教程(1)——C# 与.NET 框架:探索微软平台编程的强大组合
图片来源: https://www.lvhang.site/docs/dotnettimeline 即梦AI - 一站式AI创作平台 一、历史发展脉络 在早期的微软平台编程中,常用的编程语言有 Visual Basic、C、C。到了 20 世纪 90 年代末,Win32 API、MFC(Microsoft Found…...
Mac m1 连接公司内网
1、创建VPN 1、在系统偏好设置 2、选择网络 3、进行添加 2、添加设置 1、选择VPN 2、类型选择L2TP/IPSec 3、填写服务器IP和账号 4、点击认证设置-填写密码 。然后应用 3、进行特殊配置 网上说苹果系统的问题。 1、创建命令 sudo vim /etc/ppp/options 2、添加内容-主要别…...
C++:类与对象,定义类和构造函数
#define _CRT_SECURE_NO_WARNINGS 1 #include <iostream> using namespace std; //如何让定义一个类 // 封装 // 1、将数据和方法定义到一起。 // 2、把想给你看的数据给你看,不想给你看的封装起来。 通过访问限定符来实现 class Stack { public: //1.成…...
杨校老师课堂之信息学奥赛结构体操作使用经典题集锦汇总
C基础:结构体数组综合训练 员工信息处理系统题目描述输入描述输出描述解题思路参考代码 员工信息处理系统 题目描述 在一家企业中,员工信息的准确性和时效性是日常人事管理工作的关键。由于企业员工数量众多,手动统计与更新员工信息不仅耗费大量时间&a…...
8. Flink-CDC
1. Flink-CDC的介绍 Flink-cdc主要是用来同步数据库中的数据,它的主要优势在于基于Flink框架直接用Flink Stream Api 或Flink SQL 直接编程,不需要引入第三方组件 2.Flink-CDC的使用 Flink-cdc在使用上需要注意的点 注意Flink-cdc在2.1版本之前需要导…...
Windows 权限结构和原理:深入浅出
一、什么是权限? 权限,是指在操作系统或应用程序中,某个对象(如用户、程序、设备等)对特定资源的可操作范围。具体来说,权限控制了一个主体(通常是用户或应用程序)对某个资源&#…...
Nginx环境安装
一、官网地址 Nginx官网:http://nginx.org/ Nginx中文网:https://nginx.p2hp.com/ 二、Nginx版本 mainline version 开发版本stableversion 稳定版本legacy version 历史版本 三、Windows系统安装Nginx 第一步:选择Windows版本,…...
Spring AI + Ollama 实现调用DeepSeek-R1模型API
一、前言 随着人工智能技术的飞速发展,大语言模型(LLM)在各个领域的应用越来越广泛。DeepSeek 作为一款备受瞩目的国产大语言模型,凭借其强大的自然语言处理能力和丰富的知识储备,迅速成为业界关注的焦点。无论是文本生…...
android系统SystemServer进程启动流程分析
目录 一,SystemServer整体框架 二,SystemServer启动源码分析 2.1,重要的概念 2.2,启动入口 2.3,创建对应进程的binder 三,binder驱动和binder线程池 四,SystemServer真正启动方法 4.1 SystemServer main方法里面主要做了几件事情 1)创建SystemServiceManager管理所有的…...
【雅思博客06】Daily Life
对话 A: Honey, the house is such a mess! I need you to help me tidy up a bit. My boss and her husband are coming over for dinner, and the house needs to be spotless! B: I’m in the middle of something right now. I’ll be there in a second. A: This can’t …...
Oracle 深入理解Lock和Latch ,解析访问数据块全流程
Oracle 锁机制介绍 根据保护对象的不同,单实例Oracle数据库锁可以分为以下几大类: DML lock(data locks,数据锁):用于保护数据的完整性; DDL lock(dictionary locks,字典…...
19、《Springboot+MongoDB整合:玩转文档型数据库》
SpringbootMongoDB整合:玩转文档型数据库 摘要:本文全面讲解Spring Boot与MongoDB的整合实践,涵盖环境搭建、CRUD操作、聚合查询、事务管理、性能优化等核心内容。通过15个典型代码示例,演示如何高效操作文档数据库,深…...
如何基于transformers库通过训练Qwen/DeepSeek模型的传统分类能力实现文本分类任务
文章目录 模型与环境准备文档分析源码解读模型训练及推理方式进阶:CPU与显存的切换进阶:多卡数据并行训练🔑 DDP 训练过程核心步骤🚫 DDP 不适用于模型并行⚖️ DDP vs. Model Parallelism⚙️ 解决大模型训练的推荐方法🎉进入大模型应用与实战专栏 | 🚀查看更多专栏…...
