MQTT+Springboot整合
1.mqttconfig配置(配置参数是从数据库查出来的)
package com.terminal.dc3.api.center.manager.config;import com.collection.common.utils.StringUtils;
import com.collection.system.mapper.MqttConfigMapper;
import lombok.Data;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;import java.util.List;
import java.util.concurrent.ConcurrentHashMap;@Component
@Data
public class MqttConfig {@Autowiredprivate MqttPushClient mqttPushClient;@Autowiredprivate MqttConfigMapper mqttConfigMapper;public static ConcurrentHashMap<String, List<MqttPushClient>> mapHashMap = new ConcurrentHashMap<>();/*** 用户名*/private String username;/*** 密码*/private String password;/*** 连接地址*/private String hostUrl;/*** 端口*/private String port;/*** 客户Id*/private String clientId;/*** 默认连接话题*/private String defaultTopic;/*** 超时时间*/private int timeout;/*** 保持连接数*/private int keepalive;/*** mqtt功能使能*/private boolean enabled;private boolean retained;/*** qos*/private int qos;public String getUsername() {return username;}public void setUsername(String username) {this.username = username;}public String getPassword() {return password;}public void setPassword(String password) {this.password = password;}public String getHostUrl() {return hostUrl;}public void setHostUrl(String hostUrl) {this.hostUrl = hostUrl;}public String getClientId() {return clientId;}public void setClientId(String clientId) {this.clientId = clientId;}public String getDefaultTopic() {return defaultTopic;}public void setDefaultTopic(String defaultTopic) {this.defaultTopic = defaultTopic;}public int getTimeout() {return timeout;}public void setTimeout(int timeout) {this.timeout = timeout;}public int getKeepalive() {return keepalive;}public void setKeepalive(int keepalive) {this.keepalive = keepalive;}public boolean isEnabled() {return enabled;}public void setEnabled(boolean enabled) {this.enabled = enabled;}public int getQos() {return qos;}public void setQos(int qos) {this.qos = qos;}// @Beanpublic MqttPushClient getMqttPushClient() {List<com.collection.system.domain.MqttConfig> mqttConfigs = mqttConfigMapper.selectMqttConfigList(new com.collection.system.domain.MqttConfig());for (com.collection.system.domain.MqttConfig mq : mqttConfigs) {mq.setHost("tcp://" + mq.getHost() + ":" + mq.getPort());if (mq.getEnabled()) {String mqtt_topic[] = StringUtils.split(mq.getTopic(), ",");mqttPushClient.connect(mq.getHost(), mq.getClientId(), mq.getUsername(),mq.getPassword(), mq.getTimeout(), mq.getKeepalive());//连接if (mqtt_topic != null) {for (int i = 0; i < mqtt_topic.length; i++) {mqttPushClient.subscribe(mqtt_topic[i], mq.getQos().intValue());//订阅主题}}}}return mqttPushClient;}
}
2.MqttPushClient客户端配置
package com.terminal.dc3.api.center.manager.config;import com.collection.common.core.redis.RedisCache;
import com.collection.common.utils.QueueUtils;
import com.collection.common.utils.SecurityUtils;
import com.collection.common.utils.StringUtils;
import com.collection.common.utils.ip.IpUtils;
import com.collection.common.utils.uuid.IdUtils;
import com.collection.ems.domain.MqttDatasLog;
import com.collection.ems.mapper.MqttDatasLogMapper;
import com.terminal.dc3.driver.service.mqtt.PushCallback;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import javax.annotation.Resource;
import java.util.Date;@Component
public class MqttPushClient {private static final Logger logger = LoggerFactory.getLogger(MqttPushClient.class);@Autowiredprivate PushCallback pushCallback;private static MqttConfig mqttConfig;@Autowiredpublic void setMqttConfig(MqttConfig mqttConfig) {this.mqttConfig = mqttConfig;}private static MqttClient client;private static MqttClient getClient() {return client;}private static void setClient(MqttClient client) {MqttPushClient.client = client;}private static MqttDatasLogMapper mqttDatasLogMapper;@Autowiredpublic void setMqttDatasLogMapper(MqttDatasLogMapper mqttDatasLogMapper) {this.mqttDatasLogMapper = mqttDatasLogMapper;}/*** 客户端连接** @param host ip+端口* @param clientID 客户端Id* @param username 用户名* @param password 密码* @param timeout 超时时间* @param keepalive 保留数*/public void connect(String host, String clientID, String username, String password, int timeout, int keepalive) {MqttClient client;try {if (host != null && clientID != null && username != null && password != null) {client = new MqttClient(host, clientID, new MemoryPersistence());MqttConnectOptions options = new MqttConnectOptions();options.setCleanSession(true);options.setUserName(username);options.setPassword(password.toCharArray());options.setConnectionTimeout(timeout);options.setKeepAliveInterval(keepalive);options.setAutomaticReconnect(true);MqttPushClient.setClient(client);client.setCallback(pushCallback);client.connect(options);}} catch (Exception e) {logger.error("connect error", e);}}/*** 发布消息** @param pubTopic 主题* @param message 内容* @param qos 连接方式*/public void publishMessage(String pubTopic, String message, int qos) {//重新进行连接if (client == null || !client.isConnected()) {mqttConfig.getMqttPushClient();logger.info("重新获取连接 {}" + client);}MqttMessage mqttMessage = new MqttMessage();mqttMessage.setQos(qos);mqttMessage.setPayload(message.getBytes());if (client == null) {return;}MqttTopic topic = client.getTopic(pubTopic);//记录发送日志MqttDatasLog mqttDatasLog = new MqttDatasLog();mqttDatasLog.setDataLog(IdUtils.randomUUID());mqttDatasLog.setLogType(pubTopic);mqttDatasLog.setContent(message);mqttDatasLog.setOperIp(IpUtils.getHostIp());mqttDatasLog.setOperTime(new Date());mqttDatasLog.setBusinessType(0L);if (null != topic) {try {MqttDeliveryToken publish = topic.publish(mqttMessage);if (!publish.isComplete()) {logger.info("发布消息成功");mqttDatasLog.setStatus(0l);//在队列中删除推送成功的数据QueueUtils.removeQueueObject(pubTopic, new String(mqttMessage.getPayload()));}} catch (Exception e) {logger.error("发布消息错误 error");mqttDatasLog.setStatus(1l);}}mqttDatasLogMapper.insertMqttDatasLog(mqttDatasLog);}/*** 订阅某个主题** @param topic 主题* @param qos 连接方式*/public static void subscribe(String topic, int qos) {logger.info("开始订阅主题" + topic + "连接方式" + qos);try {MqttPushClient.getClient().subscribe(topic, qos);} catch (Exception e) {logger.error("开始订阅主题错误 error");}}}
3.MQTTMessage数据类
package com.terminal.dc3.api.center.manager.config;import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;import java.io.Serializable;/*** 消息实体对象**/
@Data
@AllArgsConstructor
@NoArgsConstructor
public class MQTTMessage implements Serializable {/*** MQTT主题*/private String topic;/*** qos*/private Integer qos = 1;/*** MQTT内容*/private String content;}
4.MQTTLicenseTask(这是我的发送数据定时器,仅供参考)
package com.collection.web.controller.ems.task;import com.collection.common.queue.QueueCodeConstant;
import com.collection.common.utils.QueueUtils;
import com.collection.common.utils.StringUtils;
import com.collection.system.domain.MqttConfig;
import com.collection.system.mapper.MqttConfigMapper;
import com.terminal.dc3.api.center.manager.config.MqttPushClient;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import javax.annotation.Resource;
import java.util.List;@Component
@Slf4j
public class MQTTLicenseTask {@Resourceprivate MqttConfigMapper mqttConfigMapper;@Resourceprivate MqttPushClient mqttPushClient;// @Scheduled(fixedRate = 5000) // 每1000毫秒(1秒)执行一次@RabbitListener(queues = QueueCodeConstant.mqtt_task_config, containerFactory = "rabbitListenerContainerFactory")public void executeTask() {List<MqttConfig> mqttConfigs = mqttConfigMapper.selectMqttConfigList(new com.collection.system.domain.MqttConfig());for (com.collection.system.domain.MqttConfig mq : mqttConfigs) {String mqtt_topic[] = StringUtils.split(mq.getTopic(), ",");if (mqtt_topic != null) {for (int i = 0; i < mqtt_topic.length; i++) {Object queueObject = QueueUtils.getQueueObject(mqtt_topic[i]);if (queueObject != null) {log.info("定时推送");mqttPushClient.publishMessage(mqtt_topic[i], String.valueOf(queueObject), mq.getQos().intValue());}}}}}}
相关文章:
MQTT+Springboot整合
1.mqttconfig配置(配置参数是从数据库查出来的) package com.terminal.dc3.api.center.manager.config;import com.collection.common.utils.StringUtils; import com.collection.system.mapper.MqttConfigMapper; import lombok.Data; import org.springframework.beans.fact…...

ERROR TypeError: AutoImport is not a function
TypeError: AutoImport is not a function 原因:unplugin-auto-import 插件版本问题 Vue3基于Webpack,在vue.config.js中配置 当unplugin-vue-components版本小于0.26.0时,使用以下写法 const { defineConfig } require("vue/cli-se…...

软考教材重点内容 信息安全工程师 第 3 章 密码学基本理论
(本章相对老版本极大的简化,所有与算法相关的计算全部删除,因此考试需要了解各个常 用算法的基本参数以及考试中可能存在的古典密码算法的计算,典型的例子是 2021 和 2022 年分别考了 DES 算法中的 S 盒计算,RSA 中的已…...

微信小程序 https://thirdwx.qlogo.cn 不在以下 downloadFile 合法域名列表中
授权登录后,拿到用户头像进行加载,但报错提示: https://thirdwx.qlogo.cn 不在以下 downloadFile 合法域名列表中 解决方法一(未完全解决,临时处理):在微信开发者工具将不校验...勾上就可以访问…...

Linux性能优化之火焰图的起源
Linux火焰图的起源与性能优化专家 Brendan Gregg 密切相关,他在 2011 年首次提出这一工具,用于解决性能分析过程中可视化和数据解读的难题。 1. 背景:性能优化的需求 在现代计算中,性能优化往往需要对程序执行中的热点和瓶颈进行…...
《Markdown语法入门》
文章目录 《Markdown语法入门》1.标题2.段落2.1 换行2.2分割线 3.文字显示3.1 字体3.2 上下标 4. 列表4.1无序列表4.2 有序列表4.3 任务列表 5. 区块显示6. 代码显示6.1 行内代码6.2 代码块 7.插入超链接8.插入图片9. 插入表格 《Markdown语法入门》 【Typora 教程】手把手教你…...
Controller Baseband commands速览
目录 一、设备连接与通信控制类(34条) 1.1. 连接参数相关 1.1.1. 连接建立超时设置 1.1.2. 链路监督超时设置 1.1.3. Page操作超时设置 1.1.4. 扩展Page操作超时设置 1.1.5. 安全连接主机支持 1.2. 扫描操作相关 1.2.1. 扫描启用与禁用 1.2.2.…...
Redisson 3.39.0 发布
Redisson 3.39.0 发布,官方推荐的 Redis 客户端 Redisson 3.38.0 ,一个 Java 编写的 Redis 客户端。 此版本更新内容如下: RTopic 对象的 partitioning 实现 RShardedTopic对象的 partitioning 实现 RReliableTopic 对象的 partitioning 实…...
高阶C语言补充:柔性数组
C99中,结构体中最后一个元素允许时未知大小的数组,这就叫做柔性数组成员。 vs编译器也支持柔性数组。 之所以把柔性数组单独列出,是因为: 1、柔性数组是建立在结构体的基础上的。 2、柔性数组的使用用到了动态内存分配。 这使得柔…...

S32K324信息安全-使用IC5000/IC5700进行debug口解锁
文章目录 前言winIDEA配置参考 前言 由于信息安全要求,需要对debug口(JTAG)进行加密,本文介绍基于固定密码的方式,使用IC5000/IC5700进行debug口解锁的方法 winIDEA配置 点击 Hardware | CPU Options | Reset | Ini…...

简单实现QT对象的[json]序列化与反序列化
简单实现QT对象的[json]序列化与反序列化 简介应用场景qt元对象系统思路实现使用方式题外话 简介 众所周知json作为一种轻量级的数据交换格式,在开发中被广泛应用。因此如何方便的将对象数据转为json格式和从json格式中加载数据到对象中就变得尤为重要。 在python类…...

Unity肢体控制(关节控制)
前面的基础搭建网上自己搜,我这个任务模型网上也有,可以去官网看看更多模型,这里只讲述有模型如何驱动肢体的操作方式 第一步:创建脚本 第二步:创建Rig Builder 建空容器 加部件(Rig),加了之后…...

Node.js | Yarn下载安装与环境配置
一、安装Node.js Yarn 是 Node.js 下的包管理工具,因此想要使用 Yarn 就必须先下载 Node.js。 推荐参考:Node.js | npm下载安装及环境配置教程 二、Yarn安装 打开cmd,输入以下命令: npm install -g yarn检查是否安装成功&…...
WPF如何全局应用黑白主题效果
灰白色很多时候用于纪念,哀悼等。那么使用 WPF如何来做到这种效果呢?要实现的这种效果,我们会发现,它其实不仅仅是要针对图片,而是要针对整个窗口来实现灰白色。 如果只是针对图片的话,我可以可以对图片进…...
[Qt] Qt删除文本文件中的某一行
需求 我们经常读一个文件或者直接往一个空白文件中写文本,那么该如何使用Qt在一个文本文件中删除某一行 代码 #include <QCoreApplication> #include <QIODevice> #include <QFile> #include <QTextStream> #include <QString> #i…...

【HarmonyOS学习日志(9)】一次开发,多端部署之界面级一多开发
关于一次开发,多端部署 一次开发多端部署就是指一套代码工程,一次开发上架,多端按需部署(一多),用于支撑开发者快速高效地开发多终端设备上的应用,以节省开发成本。 HarmonyOS系统面向多终端&…...

基于Java+SSM+JSP+MYSQL实现的宠物领养收养管理系统功能设计与实现六
一、前言介绍: 免费学习:猿来入此 1.1 项目摘要 随着人们生活水平的提高,宠物已经成为越来越多家庭的重要成员。然而,宠物的数量增长也带来了一系列问题,如流浪宠物数量的增加、宠物健康管理的缺失以及宠物领养收养…...

Java项目实战II基于微信小程序的课堂助手(开发文档+数据库+源码)
目录 一、前言 二、技术介绍 三、系统实现 四、文档参考 五、核心代码 六、源码获取 全栈码农以及毕业设计实战开发,CSDN平台Java领域新星创作者,专注于大学生项目实战开发、讲解和毕业答疑辅导。获取源码联系方式请查看文末 一、前言 在数字化教…...
解析 Android WebChromeClient:提升 WebView 用户体验的关键组件
文章目录 一、总览二、详细说明三、一些实际和有趣的应用四、最佳实践五、与其他组件的比较六、安全性考虑:防止 XSS 攻击与数据泄露6.1 介绍6.2 代码案例6.2.1 输入过滤6.2.2 Content Security Policy (CSP) 案例 六、总结 在 Android 开发中,WebChrome…...

【LeetCode热题100】字符串
本篇博客记录了关于字符串相关的几道题目,包括最长公共前缀、最长回文子串、二进制求和、字符串相乘。 //解法1 class Solution { public:string longestCommonPrefix(vector<string>& strs) {string ret strs[0];for(int i 1 ; i < strs.size() ; i…...
React 第五十五节 Router 中 useAsyncError的使用详解
前言 useAsyncError 是 React Router v6.4 引入的一个钩子,用于处理异步操作(如数据加载)中的错误。下面我将详细解释其用途并提供代码示例。 一、useAsyncError 用途 处理异步错误:捕获在 loader 或 action 中发生的异步错误替…...

【kafka】Golang实现分布式Masscan任务调度系统
要求: 输出两个程序,一个命令行程序(命令行参数用flag)和一个服务端程序。 命令行程序支持通过命令行参数配置下发IP或IP段、端口、扫描带宽,然后将消息推送到kafka里面。 服务端程序: 从kafka消费者接收…...

Keil 中设置 STM32 Flash 和 RAM 地址详解
文章目录 Keil 中设置 STM32 Flash 和 RAM 地址详解一、Flash 和 RAM 配置界面(Target 选项卡)1. IROM1(用于配置 Flash)2. IRAM1(用于配置 RAM)二、链接器设置界面(Linker 选项卡)1. 勾选“Use Memory Layout from Target Dialog”2. 查看链接器参数(如果没有勾选上面…...
python爬虫:Newspaper3k 的详细使用(好用的新闻网站文章抓取和解析的Python库)
更多内容请见: 爬虫和逆向教程-专栏介绍和目录 文章目录 一、Newspaper3k 概述1.1 Newspaper3k 介绍1.2 主要功能1.3 典型应用场景1.4 安装二、基本用法2.2 提取单篇文章的内容2.2 处理多篇文档三、高级选项3.1 自定义配置3.2 分析文章情感四、实战案例4.1 构建新闻摘要聚合器…...

NFT模式:数字资产确权与链游经济系统构建
NFT模式:数字资产确权与链游经济系统构建 ——从技术架构到可持续生态的范式革命 一、确权技术革新:构建可信数字资产基石 1. 区块链底层架构的进化 跨链互操作协议:基于LayerZero协议实现以太坊、Solana等公链资产互通,通过零知…...
【JavaSE】绘图与事件入门学习笔记
-Java绘图坐标体系 坐标体系-介绍 坐标原点位于左上角,以像素为单位。 在Java坐标系中,第一个是x坐标,表示当前位置为水平方向,距离坐标原点x个像素;第二个是y坐标,表示当前位置为垂直方向,距离坐标原点y个像素。 坐标体系-像素 …...

全志A40i android7.1 调试信息打印串口由uart0改为uart3
一,概述 1. 目的 将调试信息打印串口由uart0改为uart3。 2. 版本信息 Uboot版本:2014.07; Kernel版本:Linux-3.10; 二,Uboot 1. sys_config.fex改动 使能uart3(TX:PH00 RX:PH01),并让boo…...
CSS设置元素的宽度根据其内容自动调整
width: fit-content 是 CSS 中的一个属性值,用于设置元素的宽度根据其内容自动调整,确保宽度刚好容纳内容而不会超出。 效果对比 默认情况(width: auto): 块级元素(如 <div>)会占满父容器…...

推荐 github 项目:GeminiImageApp(图片生成方向,可以做一定的素材)
推荐 github 项目:GeminiImageApp(图片生成方向,可以做一定的素材) 这个项目能干嘛? 使用 gemini 2.0 的 api 和 google 其他的 api 来做衍生处理 简化和优化了文生图和图生图的行为(我的最主要) 并且有一些目标检测和切割(我用不到) 视频和 imagefx 因为没 a…...

群晖NAS如何在虚拟机创建飞牛NAS
套件中心下载安装Virtual Machine Manager 创建虚拟机 配置虚拟机 飞牛官网下载 https://iso.liveupdate.fnnas.com/x86_64/trim/fnos-0.9.2-863.iso 群晖NAS如何在虚拟机创建飞牛NAS - 个人信息分享...