spring boot集成mqtt协议发送和订阅数据
maven的pom.xml引入包
<!--mqtt--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-integration</artifactId><version>2.3.6.RELEASE</version></dependency><dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-stream</artifactId><version>5.3.4.RELEASE</version></dependency><dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-mqtt</artifactId><version>5.3.4.RELEASE</version></dependency>
mqtt.yml配置文件
spring:mqtt:username: adminpassword: beyond_2021url: tcp://192.168.3.100:1883client-id: data-clientIdserver-id: data-serverIddata-topic: data/#will-topic: data-willwill-content: data server offlinecompletion-timeout: 10000
初始化MQTT配置bean
package com.beyond.config;import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.event.EventListener;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.event.MqttConnectionFailedEvent;
import org.springframework.integration.mqtt.event.MqttMessageDeliveredEvent;
import org.springframework.integration.mqtt.event.MqttMessageSentEvent;
import org.springframework.integration.mqtt.event.MqttSubscribedEvent;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;import java.security.SecureRandom;
import java.util.Date;@Configuration
@IntegrationComponentScan
public class MqttConfig {private static final Logger log = LoggerFactory.getLogger(MqttConfig.class);@Value("${spring.mqtt.username}")private String username;@Value("${spring.mqtt.password}")private String password;@Value("${spring.mqtt.url}")private String hostUrl;@Value("${spring.mqtt.client-id}")private String clientId;@Value("${spring.mqtt.server-id}")private String serverId;@Value("${spring.mqtt.data-topic:data/#}")private String dataTopic;@Value("${spring.mqtt.will-topic}")private String willTopic;@Value("${spring.mqtt.will-content}")private String willContent;/*** @desc 连接超时*/@Value("${spring.mqtt.completion-timeout}")private int completionTimeout ;@Beanpublic MqttConnectOptions getMqttConnectOptions(){// MQTT的连接设置MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();// 设置连接的用户名mqttConnectOptions.setUserName(username);// 设置连接的密码mqttConnectOptions.setPassword(password.toCharArray());// 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,// 把配置里的 cleanSession 设为false,客户端掉线后 服务器端不会清除session,// 当重连后可以接收之前订阅主题的消息。当客户端上线后会接受到它离线的这段时间的消息mqttConnectOptions.setCleanSession(true);// 设置发布端地址,多个用逗号分隔, 如:tcp://111:1883,tcp://222:1883// 当第一个111连接上后,222不会在连,如果111挂掉后,重试连111几次失败后,会自动去连接222mqttConnectOptions.setServerURIs(hostUrl.split(","));// 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制mqttConnectOptions.setKeepAliveInterval(20);mqttConnectOptions.setAutomaticReconnect(true);// 设置“遗嘱”消息的话题,若客户端与服务器之间的连接意外中断,服务器将发布客户端的“遗嘱”消息。mqttConnectOptions.setWill(willTopic, willContent.getBytes(), 2, false);mqttConnectOptions.setMaxInflight(1000000);return mqttConnectOptions;}@Beanpublic MqttPahoClientFactory mqttClientFactory() {DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();factory.setConnectionOptions(getMqttConnectOptions());return factory;}/*** @desc 发送通道配置 默认主题* @date 2021/3/16*/@Bean@ServiceActivator(inputChannel = "mqttOutboundChannel")public MessageHandler mqttOutbound() {//clientId每个连接必须唯一,否则,两个相同的clientId相互挤掉线String clientIdStr = clientId + new SecureRandom().nextInt(10);MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(clientIdStr, mqttClientFactory());//async如果为true,则调用方不会阻塞。而是在发送消息时等待传递确认。默认值为false(发送将阻塞,直到确认发送)messageHandler.setAsync(true);messageHandler.setAsyncEvents(true);messageHandler.setDefaultTopic(dataTopic);messageHandler.setDefaultQos(1);return messageHandler;}/*** @desc 发送通道* @date 2021/3/16*/@Beanpublic MessageChannel mqttOutboundChannel() {return new DirectChannel();}/*** @desc 接收通道* @date 2021/3/16*/@Beanpublic MessageChannel mqttInputChannel() {return new DirectChannel();}/*** @desc 配置监听的 topic 支持通配符* @date 2021/3/16*/@Beanpublic MessageProducer inbound() {//clientId每个连接必须唯一,否则,两个相同的clientId相互挤掉线String serverIdStr = serverId + new SecureRandom().nextInt(10);MqttPahoMessageDrivenChannelAdapter adapter =new MqttPahoMessageDrivenChannelAdapter(serverIdStr, mqttClientFactory(), dataTopic);adapter.setCompletionTimeout(completionTimeout);adapter.setConverter(new DefaultPahoMessageConverter());adapter.setQos(1);adapter.setOutputChannel(mqttInputChannel());return adapter;}/*** @desc 通过通道获取数据 订阅的数据* @date 2021/3/16*/@Bean@ServiceActivator(inputChannel = "mqttInputChannel")public MessageHandler handler() {return new MessageHandler() {@Overridepublic void handleMessage(Message<?> message) throws MessagingException {String payload = message.getPayload().toString();String topic = message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC).toString();//处理订阅topic:(data/#)到的所有的数据}};}/*** @desc mqtt连接失败或者订阅失败时,触发MqttConnectionFailedEvent事件* @date 2021/7/22*@param event* @return void*/@EventListener(MqttConnectionFailedEvent.class)public void mqttConnectionFailedEvent(MqttConnectionFailedEvent event) {log.error("mqttConnectionFailedEvent连接mqtt失败: " +"date={}, hostUrl={}, username={}, error={}",new Date(), hostUrl, username, event.getCause().getMessage());}/*** @desc 当async和async事件(async-events)都为true时,将发出MqttMessageSentEvent* 它包含消息、主题、客户端库生成的消息id、clientId和clientInstance(每次连接客户端时递增)* @date 2021/7/22*@param event* @return void*/@EventListener(MqttMessageSentEvent.class)public void mqttMessageSentEvent(MqttMessageSentEvent event) {log.info("mqttMessageSentEvent发送信息: date={}, info={}", new Date(), event.toString());}/*** @desc 当async和async事件(async-events)都为true时,将发出MqttMessageDeliveredEvent* 当客户端库确认传递时,将发出MqttMessageDeliveredEvent。它包含messageId、clientId和clientInstance,使传递与发送相关。* @date 2021/7/22*@param event* @return void*/@EventListener(MqttMessageDeliveredEvent.class)public void mqttMessageDeliveredEvent(MqttMessageDeliveredEvent event) {log.info("mqttMessageDeliveredEvent发送成功信息: date={}, info={}", new Date(), event.toString());}/*** @desc 成功订阅到主题,MqttSubscribedEvent事件就会被触发(多个主题,多次触发)* @date 2021/7/22*@param event* @return void*/@EventListener(MqttSubscribedEvent.class)public void mqttSubscribedEvent(MqttSubscribedEvent event) {log.info("mqttSubscribedEvent订阅成功信息: date={}, info={}", new Date(), event.toString());}}
mqtt发送数据网关配置
package com.beyond.data.component;import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;/*** @desc MQTT发送网关* @date 2021/3/12*/
@Component
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MqttGatewayComponent {void sendToMqtt(String data);void sendToMqtt(String payload, @Header(MqttHeaders.TOPIC) String topic);void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);}
发送数据到mqtt伪代码
@Autowired
private MqttGatewayComponent mqttGatewayComponent;//发送字符串或json字符串,到指定的topic
mqttGatewayComponent.sendToMqtt("json string", "data/abcd");
参考链接:
https://blog.csdn.net/sinat_21184471/article/details/87186186
https://blog.csdn.net/qq_29467891/article/details/107043225?utm_source=app
https://blog.csdn.net/myinsert/article/details/107715538
相关文章:
spring boot集成mqtt协议发送和订阅数据
maven的pom.xml引入包 <!--mqtt--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-integration</artifactId><version>2.3.6.RELEASE</version></dependency><dependency…...
【数据库】详解数据库架构优化思路(两主架构、主从复制、冷热分离)
文章目录 1、为什么对数据库做优化2、双主架构双主架构的工作方式如下:双主架构的优势包括:但是一般不用这种架构,原因是: 3、主从复制主从复制的工作方式如下:主从复制的优势包括:主从复制的缺点 4、冷热分…...
el-table 实现动态表头 静态内容 根据数据显示动态输入框
直接放代码了 <el-table:data"form.tableDataA"borderstripestyle"width: 100%; margin-top: 20px"><el-table-columnv-for"(category, categoryIndex) in form.tableDataA":key"categoryIndex":label"category.name&qu…...
Reids 的整合 Spring Data Redis使用
大家好 , 我是苏麟 , 今天带来强大的Redis . REmote DIctionary Server(Redis) 是一个由 Salvatore Sanfilippo 写的 key-value 存储系统,是跨平台的非关系型数据库。 Redis 是一个开源的使用 ANSI C 语言编写、遵守 BSD 协议、支持网络、可基于内存、分布式、可选…...
3D数据转换工具HOOPS Exchange概览
HOOPS Exchange SDK是一组C软件库,使开发团队能够快速为其应用程序添加可靠的2D和3D CAD导入和导出功能。这允许访问广泛的数据,包括边界表示(BREP)、产品制造信息(PMI)、模型树、视图、持久ID、样式、构造…...
【从零开始的rust web开发之路 一】axum学习使用
系列文章目录 第一章 axum学习使用 文章目录 系列文章目录前言老规矩先看官方文档介绍高级功能兼容性 二、hello world三、路由四,handler和提取器五,响应 前言 本职java开发,兼架构设计。空闲时间学习了rust,目前还不熟练掌握。…...
oracle警告日志\跟踪日志磁盘空间清理
oracle警告日志\跟踪日志磁盘空间清理 问题现象: 通过查看排查到alert和tarce占用大量磁盘空间 警告日志 /u01/app/oracle/diag/rdbms/orcl/orcl/alert 跟踪日志 /u01/app/oracle/diag/rdbms/orcl/orcl/trace 解决方案: 用adrci清除日志 确定目…...
【vue】el-table 数据更新后,刷新表格数据
表格里面的数据更新后,可以通过以下方法来刷新表格 方法1 用更新后的数据,覆盖之前的数据 var newTableData[];for(var i0;i<that.tableData.length;i){ if(aIdthat.selectStationId&&bIdthat.selectDeviceId){that.tableData[i].physica…...
AVL——平衡搜索树
✅<1>主页:我的代码爱吃辣📃<2>知识讲解:数据结构——AVL树☂️<3>开发环境:Visual Studio 2022💬<4>前言:AVL树是对二叉搜索树的严格高度控制,所以AVL树的搜索效率很高…...
TCP通信流程以及一些TCP的相关概念
1.TCP和UDP区别 都为传输层协议 UDP:用户数据报协议,面向无连接,可以单播,多播,广播,面向数据报,不可靠 TCP:传输控制协议,面向连接的,可靠的,基…...
PyTorch学习笔记(十七)——完整的模型验证(测试,demo)套路
完整代码: import torch import torchvision from PIL import Image from torch import nnimage_path "../imgs/dog.png" image Image.open(image_path) print(image)# 因为png格式是四个通道,除了RGB三通道外,还有一个透明度通…...
WPF开篇
一、为什么要学习WPF 大环境不好,公司要求逐年提高,既要会后端又要会客户端WPF相对于WinForm来说用户界面效果更好,图像更加立体化也是给自己增加一项技能,谨记一句话,技多不压身;多一份技能就多一份竞争力…...
linux 压缩解压缩
压缩解压缩 linux中压缩和解压文件也是很常见的 zip格式 zip格式的压缩包在windows很常见,linux中也有zip格式的压缩包 #压缩#zip [选项] 压缩包名 文件(多个文件空格隔开)zip 1.zip 123.txt 456.txt zip -r 2.zip /home/user1 ---------------------- -r 压缩目录 …...
centos9 mysql8修改数据库的存储路径
一、环境 系统:CentOS Stream release 9 mysql版本:mysql Ver 8.0.34 for Linux on x86_64 (MySQL Community Server - GPL) 二、修改mysql的数据库,存储路径 查看目录数据存储的位置 cat /etc/my.cnf操作 1、新建存放的目录,…...
【C++】<Windows编程中消息即事件的处理>
目录 一、注册窗口类,指定消息处理函数,捕获消息并发给处理函数 二、消息处理函数 三、通用窗口消息 四、其他消息 1.滚动条消息 2.按钮控件消息 3.按钮控件通知消息 4.按键消息 5.系统菜单等消息 6.组合框控件消息 7.组合框控件通知消息 8.列…...
数据库SQL语句使用
-- 查询所有数据库 show databases; -- 创建数据库,数据库名为mydatabase create database mydatabase; -- 如果没有名为 mydatabase的数据库则创建,有就不创建 create database if not exists mydatabase; -- 如果没有名为 mydatabase的数据库则创建…...
从零开始 Spring Cloud 12:Sentinel
从零开始 Spring Cloud 12:Sentinel 1.初识 Sentinel 1.1雪崩问题 1.1.1什么是雪崩问题 微服务中,服务间调用关系错综复杂,一个微服务往往依赖于多个其它微服务。 如图,如果服务提供者I发生了故障,当前的应用的部分…...
@Resurce和@Autowired的区别
Resource 和 Autowired 是 Java 中常用的两个注解,用于自动装配依赖对象。它们的主要区别如下: 来源不同: Resource 是 Java EE 提供的注解,属于 J2EE 的一部分,它由 JSR-250 规范定义。 Autowired 是 Spring 框架提供…...
ResNet简介
ResNet (Residual Network) 此网络于2015年,国人何先生提出,用于解决随着深度学习的层数加深造成的网络退化现象和梯度消失、梯度爆炸。 问题1 退化现象 当深度学习的各项指标能够随着训练轮数收敛的情况下,网络的层数增强未能像理论一样&…...
了解单例模式,工厂模式(简单易懂)
文章目录 单例模式饿汉模式懒汉模式对比 工厂模式简单工厂模式(Simple Factory Pattern)工厂方法模式(Factory Method Pattern)抽象工厂模式(Abstract Factory Pattern)对比 单例模式 什么是单例ÿ…...
Chapter03-Authentication vulnerabilities
文章目录 1. 身份验证简介1.1 What is authentication1.2 difference between authentication and authorization1.3 身份验证机制失效的原因1.4 身份验证机制失效的影响 2. 基于登录功能的漏洞2.1 密码爆破2.2 用户名枚举2.3 有缺陷的暴力破解防护2.3.1 如果用户登录尝试失败次…...
使用分级同态加密防御梯度泄漏
抽象 联邦学习 (FL) 支持跨分布式客户端进行协作模型训练,而无需共享原始数据,这使其成为在互联和自动驾驶汽车 (CAV) 等领域保护隐私的机器学习的一种很有前途的方法。然而,最近的研究表明&…...
相机Camera日志分析之三十一:高通Camx HAL十种流程基础分析关键字汇总(后续持续更新中)
【关注我,后续持续新增专题博文,谢谢!!!】 上一篇我们讲了:有对最普通的场景进行各个日志注释讲解,但相机场景太多,日志差异也巨大。后面将展示各种场景下的日志。 通过notepad++打开场景下的日志,通过下列分类关键字搜索,即可清晰的分析不同场景的相机运行流程差异…...
Java入门学习详细版(一)
大家好,Java 学习是一个系统学习的过程,核心原则就是“理论 实践 坚持”,并且需循序渐进,不可过于着急,本篇文章推出的这份详细入门学习资料将带大家从零基础开始,逐步掌握 Java 的核心概念和编程技能。 …...
Java多线程实现之Thread类深度解析
Java多线程实现之Thread类深度解析 一、多线程基础概念1.1 什么是线程1.2 多线程的优势1.3 Java多线程模型 二、Thread类的基本结构与构造函数2.1 Thread类的继承关系2.2 构造函数 三、创建和启动线程3.1 继承Thread类创建线程3.2 实现Runnable接口创建线程 四、Thread类的核心…...
Device Mapper 机制
Device Mapper 机制详解 Device Mapper(简称 DM)是 Linux 内核中的一套通用块设备映射框架,为 LVM、加密磁盘、RAID 等提供底层支持。本文将详细介绍 Device Mapper 的原理、实现、内核配置、常用工具、操作测试流程,并配以详细的…...
Aspose.PDF 限制绕过方案:Java 字节码技术实战分享(仅供学习)
Aspose.PDF 限制绕过方案:Java 字节码技术实战分享(仅供学习) 一、Aspose.PDF 简介二、说明(⚠️仅供学习与研究使用)三、技术流程总览四、准备工作1. 下载 Jar 包2. Maven 项目依赖配置 五、字节码修改实现代码&#…...
Golang——7、包与接口详解
包与接口详解 1、Golang包详解1.1、Golang中包的定义和介绍1.2、Golang包管理工具go mod1.3、Golang中自定义包1.4、Golang中使用第三包1.5、init函数 2、接口详解2.1、接口的定义2.2、空接口2.3、类型断言2.4、结构体值接收者和指针接收者实现接口的区别2.5、一个结构体实现多…...
论文阅读笔记——Muffin: Testing Deep Learning Libraries via Neural Architecture Fuzzing
Muffin 论文 现有方法 CRADLE 和 LEMON,依赖模型推理阶段输出进行差分测试,但在训练阶段是不可行的,因为训练阶段直到最后才有固定输出,中间过程是不断变化的。API 库覆盖低,因为各个 API 都是在各种具体场景下使用。…...
消防一体化安全管控平台:构建消防“一张图”和APP统一管理
在城市的某个角落,一场突如其来的火灾打破了平静。熊熊烈火迅速蔓延,滚滚浓烟弥漫开来,周围群众的生命财产安全受到严重威胁。就在这千钧一发之际,消防救援队伍迅速行动,而豪越科技消防一体化安全管控平台构建的消防“…...
