当前位置: 首页 > news >正文

spring boot集成mqtt协议发送和订阅数据

maven的pom.xml引入包
        <!--mqtt--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-integration</artifactId><version>2.3.6.RELEASE</version></dependency><dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-stream</artifactId><version>5.3.4.RELEASE</version></dependency><dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-mqtt</artifactId><version>5.3.4.RELEASE</version></dependency>
mqtt.yml配置文件
spring:mqtt:username: adminpassword: beyond_2021url: tcp://192.168.3.100:1883client-id: data-clientIdserver-id: data-serverIddata-topic: data/#will-topic: data-willwill-content: data server offlinecompletion-timeout: 10000
初始化MQTT配置bean
package com.beyond.config;import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.event.EventListener;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.event.MqttConnectionFailedEvent;
import org.springframework.integration.mqtt.event.MqttMessageDeliveredEvent;
import org.springframework.integration.mqtt.event.MqttMessageSentEvent;
import org.springframework.integration.mqtt.event.MqttSubscribedEvent;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;import java.security.SecureRandom;
import java.util.Date;@Configuration
@IntegrationComponentScan
public class MqttConfig {private static final Logger log = LoggerFactory.getLogger(MqttConfig.class);@Value("${spring.mqtt.username}")private String username;@Value("${spring.mqtt.password}")private String password;@Value("${spring.mqtt.url}")private String hostUrl;@Value("${spring.mqtt.client-id}")private String clientId;@Value("${spring.mqtt.server-id}")private String serverId;@Value("${spring.mqtt.data-topic:data/#}")private String dataTopic;@Value("${spring.mqtt.will-topic}")private String willTopic;@Value("${spring.mqtt.will-content}")private String willContent;/*** @desc 连接超时*/@Value("${spring.mqtt.completion-timeout}")private int completionTimeout ;@Beanpublic MqttConnectOptions getMqttConnectOptions(){// MQTT的连接设置MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();// 设置连接的用户名mqttConnectOptions.setUserName(username);// 设置连接的密码mqttConnectOptions.setPassword(password.toCharArray());// 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,// 把配置里的 cleanSession 设为false,客户端掉线后 服务器端不会清除session,// 当重连后可以接收之前订阅主题的消息。当客户端上线后会接受到它离线的这段时间的消息mqttConnectOptions.setCleanSession(true);// 设置发布端地址,多个用逗号分隔, 如:tcp://111:1883,tcp://222:1883// 当第一个111连接上后,222不会在连,如果111挂掉后,重试连111几次失败后,会自动去连接222mqttConnectOptions.setServerURIs(hostUrl.split(","));// 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制mqttConnectOptions.setKeepAliveInterval(20);mqttConnectOptions.setAutomaticReconnect(true);// 设置“遗嘱”消息的话题,若客户端与服务器之间的连接意外中断,服务器将发布客户端的“遗嘱”消息。mqttConnectOptions.setWill(willTopic, willContent.getBytes(), 2, false);mqttConnectOptions.setMaxInflight(1000000);return mqttConnectOptions;}@Beanpublic MqttPahoClientFactory mqttClientFactory() {DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();factory.setConnectionOptions(getMqttConnectOptions());return factory;}/*** @desc 发送通道配置 默认主题* @date 2021/3/16*/@Bean@ServiceActivator(inputChannel = "mqttOutboundChannel")public MessageHandler mqttOutbound() {//clientId每个连接必须唯一,否则,两个相同的clientId相互挤掉线String clientIdStr = clientId + new SecureRandom().nextInt(10);MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(clientIdStr, mqttClientFactory());//async如果为true,则调用方不会阻塞。而是在发送消息时等待传递确认。默认值为false(发送将阻塞,直到确认发送)messageHandler.setAsync(true);messageHandler.setAsyncEvents(true);messageHandler.setDefaultTopic(dataTopic);messageHandler.setDefaultQos(1);return messageHandler;}/*** @desc 发送通道* @date 2021/3/16*/@Beanpublic MessageChannel mqttOutboundChannel() {return new DirectChannel();}/*** @desc 接收通道* @date 2021/3/16*/@Beanpublic MessageChannel mqttInputChannel() {return new DirectChannel();}/*** @desc 配置监听的 topic 支持通配符* @date 2021/3/16*/@Beanpublic MessageProducer inbound() {//clientId每个连接必须唯一,否则,两个相同的clientId相互挤掉线String serverIdStr = serverId + new SecureRandom().nextInt(10);MqttPahoMessageDrivenChannelAdapter adapter =new MqttPahoMessageDrivenChannelAdapter(serverIdStr, mqttClientFactory(), dataTopic);adapter.setCompletionTimeout(completionTimeout);adapter.setConverter(new DefaultPahoMessageConverter());adapter.setQos(1);adapter.setOutputChannel(mqttInputChannel());return adapter;}/*** @desc 通过通道获取数据 订阅的数据* @date 2021/3/16*/@Bean@ServiceActivator(inputChannel = "mqttInputChannel")public MessageHandler handler() {return new MessageHandler() {@Overridepublic void handleMessage(Message<?> message) throws MessagingException {String payload = message.getPayload().toString();String topic = message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC).toString();//处理订阅topic:(data/#)到的所有的数据}};}/*** @desc mqtt连接失败或者订阅失败时,触发MqttConnectionFailedEvent事件* @date 2021/7/22*@param event* @return void*/@EventListener(MqttConnectionFailedEvent.class)public void mqttConnectionFailedEvent(MqttConnectionFailedEvent event) {log.error("mqttConnectionFailedEvent连接mqtt失败: " +"date={}, hostUrl={}, username={}, error={}",new Date(), hostUrl, username, event.getCause().getMessage());}/*** @desc 当async和async事件(async-events)都为true时,将发出MqttMessageSentEvent* 它包含消息、主题、客户端库生成的消息id、clientId和clientInstance(每次连接客户端时递增)* @date 2021/7/22*@param event* @return void*/@EventListener(MqttMessageSentEvent.class)public void mqttMessageSentEvent(MqttMessageSentEvent event) {log.info("mqttMessageSentEvent发送信息: date={}, info={}", new Date(), event.toString());}/*** @desc 当async和async事件(async-events)都为true时,将发出MqttMessageDeliveredEvent* 当客户端库确认传递时,将发出MqttMessageDeliveredEvent。它包含messageId、clientId和clientInstance,使传递与发送相关。* @date 2021/7/22*@param event* @return void*/@EventListener(MqttMessageDeliveredEvent.class)public void mqttMessageDeliveredEvent(MqttMessageDeliveredEvent event) {log.info("mqttMessageDeliveredEvent发送成功信息: date={}, info={}", new Date(), event.toString());}/*** @desc 成功订阅到主题,MqttSubscribedEvent事件就会被触发(多个主题,多次触发)* @date 2021/7/22*@param event* @return void*/@EventListener(MqttSubscribedEvent.class)public void mqttSubscribedEvent(MqttSubscribedEvent event) {log.info("mqttSubscribedEvent订阅成功信息: date={}, info={}", new Date(), event.toString());}}
mqtt发送数据网关配置
package com.beyond.data.component;import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;/*** @desc MQTT发送网关* @date 2021/3/12*/
@Component
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MqttGatewayComponent {void sendToMqtt(String data);void sendToMqtt(String payload, @Header(MqttHeaders.TOPIC) String topic);void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);}
发送数据到mqtt伪代码
@Autowired
private MqttGatewayComponent mqttGatewayComponent;//发送字符串或json字符串,到指定的topic
mqttGatewayComponent.sendToMqtt("json string", "data/abcd");

参考链接:
https://blog.csdn.net/sinat_21184471/article/details/87186186
https://blog.csdn.net/qq_29467891/article/details/107043225?utm_source=app
https://blog.csdn.net/myinsert/article/details/107715538

相关文章:

spring boot集成mqtt协议发送和订阅数据

maven的pom.xml引入包 <!--mqtt--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-integration</artifactId><version>2.3.6.RELEASE</version></dependency><dependency…...

【数据库】详解数据库架构优化思路(两主架构、主从复制、冷热分离)

文章目录 1、为什么对数据库做优化2、双主架构双主架构的工作方式如下&#xff1a;双主架构的优势包括&#xff1a;但是一般不用这种架构&#xff0c;原因是&#xff1a; 3、主从复制主从复制的工作方式如下&#xff1a;主从复制的优势包括&#xff1a;主从复制的缺点 4、冷热分…...

el-table 实现动态表头 静态内容 根据数据显示动态输入框

直接放代码了 <el-table:data"form.tableDataA"borderstripestyle"width: 100%; margin-top: 20px"><el-table-columnv-for"(category, categoryIndex) in form.tableDataA":key"categoryIndex":label"category.name&qu…...

Reids 的整合 Spring Data Redis使用

大家好 , 我是苏麟 , 今天带来强大的Redis . REmote DIctionary Server(Redis) 是一个由 Salvatore Sanfilippo 写的 key-value 存储系统&#xff0c;是跨平台的非关系型数据库。 Redis 是一个开源的使用 ANSI C 语言编写、遵守 BSD 协议、支持网络、可基于内存、分布式、可选…...

3D数据转换工具HOOPS Exchange概览

HOOPS Exchange SDK是一组C软件库&#xff0c;使开发团队能够快速为其应用程序添加可靠的2D和3D CAD导入和导出功能。这允许访问广泛的数据&#xff0c;包括边界表示&#xff08;BREP&#xff09;、产品制造信息&#xff08;PMI&#xff09;、模型树、视图、持久ID、样式、构造…...

【从零开始的rust web开发之路 一】axum学习使用

系列文章目录 第一章 axum学习使用 文章目录 系列文章目录前言老规矩先看官方文档介绍高级功能兼容性 二、hello world三、路由四&#xff0c;handler和提取器五&#xff0c;响应 前言 本职java开发&#xff0c;兼架构设计。空闲时间学习了rust&#xff0c;目前还不熟练掌握。…...

oracle警告日志\跟踪日志磁盘空间清理

oracle警告日志\跟踪日志磁盘空间清理 问题现象&#xff1a; 通过查看排查到alert和tarce占用大量磁盘空间 警告日志 /u01/app/oracle/diag/rdbms/orcl/orcl/alert 跟踪日志 /u01/app/oracle/diag/rdbms/orcl/orcl/trace 解决方案&#xff1a; 用adrci清除日志 确定目…...

【vue】el-table 数据更新后,刷新表格数据

表格里面的数据更新后&#xff0c;可以通过以下方法来刷新表格 方法1 用更新后的数据&#xff0c;覆盖之前的数据 var newTableData[];for(var i0;i<that.tableData.length;i){ if(aIdthat.selectStationId&&bIdthat.selectDeviceId){that.tableData[i].physica…...

AVL——平衡搜索树

✅<1>主页&#xff1a;我的代码爱吃辣&#x1f4c3;<2>知识讲解&#xff1a;数据结构——AVL树☂️<3>开发环境&#xff1a;Visual Studio 2022&#x1f4ac;<4>前言&#xff1a;AVL树是对二叉搜索树的严格高度控制&#xff0c;所以AVL树的搜索效率很高…...

TCP通信流程以及一些TCP的相关概念

1.TCP和UDP区别 都为传输层协议 UDP&#xff1a;用户数据报协议&#xff0c;面向无连接&#xff0c;可以单播&#xff0c;多播&#xff0c;广播&#xff0c;面向数据报&#xff0c;不可靠 TCP&#xff1a;传输控制协议&#xff0c;面向连接的&#xff0c;可靠的&#xff0c;基…...

PyTorch学习笔记(十七)——完整的模型验证(测试,demo)套路

完整代码&#xff1a; import torch import torchvision from PIL import Image from torch import nnimage_path "../imgs/dog.png" image Image.open(image_path) print(image)# 因为png格式是四个通道&#xff0c;除了RGB三通道外&#xff0c;还有一个透明度通…...

WPF开篇

一、为什么要学习WPF 大环境不好&#xff0c;公司要求逐年提高&#xff0c;既要会后端又要会客户端WPF相对于WinForm来说用户界面效果更好&#xff0c;图像更加立体化也是给自己增加一项技能&#xff0c;谨记一句话&#xff0c;技多不压身&#xff1b;多一份技能就多一份竞争力…...

linux 压缩解压缩

压缩解压缩 linux中压缩和解压文件也是很常见的 zip格式 zip格式的压缩包在windows很常见&#xff0c;linux中也有zip格式的压缩包 #压缩#zip [选项] 压缩包名 文件(多个文件空格隔开)zip 1.zip 123.txt 456.txt zip -r 2.zip /home/user1 ---------------------- -r 压缩目录 …...

centos9 mysql8修改数据库的存储路径

一、环境 系统&#xff1a;CentOS Stream release 9 mysql版本&#xff1a;mysql Ver 8.0.34 for Linux on x86_64 (MySQL Community Server - GPL) 二、修改mysql的数据库&#xff0c;存储路径 查看目录数据存储的位置 cat /etc/my.cnf操作 1、新建存放的目录&#xff0c;…...

【C++】<Windows编程中消息即事件的处理>

目录 一、注册窗口类&#xff0c;指定消息处理函数&#xff0c;捕获消息并发给处理函数 二、消息处理函数 三、通用窗口消息 四、其他消息 1.滚动条消息 2.按钮控件消息 3.按钮控件通知消息 4.按键消息 5.系统菜单等消息 6.组合框控件消息 7.组合框控件通知消息 8.列…...

数据库SQL语句使用

-- 查询所有数据库 show databases; -- 创建数据库&#xff0c;数据库名为mydatabase create database mydatabase; -- 如果没有名为 mydatabase的数据库则创建&#xff0c;有就不创建 create database if not exists mydatabase; -- 如果没有名为 mydatabase的数据库则创建…...

从零开始 Spring Cloud 12:Sentinel

从零开始 Spring Cloud 12&#xff1a;Sentinel 1.初识 Sentinel 1.1雪崩问题 1.1.1什么是雪崩问题 微服务中&#xff0c;服务间调用关系错综复杂&#xff0c;一个微服务往往依赖于多个其它微服务。 如图&#xff0c;如果服务提供者I发生了故障&#xff0c;当前的应用的部分…...

@Resurce和@Autowired的区别

Resource 和 Autowired 是 Java 中常用的两个注解&#xff0c;用于自动装配依赖对象。它们的主要区别如下&#xff1a; 来源不同&#xff1a; Resource 是 Java EE 提供的注解&#xff0c;属于 J2EE 的一部分&#xff0c;它由 JSR-250 规范定义。 Autowired 是 Spring 框架提供…...

ResNet简介

ResNet (Residual Network) 此网络于2015年&#xff0c;国人何先生提出&#xff0c;用于解决随着深度学习的层数加深造成的网络退化现象和梯度消失、梯度爆炸。 问题1 退化现象 当深度学习的各项指标能够随着训练轮数收敛的情况下&#xff0c;网络的层数增强未能像理论一样&…...

了解单例模式,工厂模式(简单易懂)

文章目录 单例模式饿汉模式懒汉模式对比 工厂模式简单工厂模式&#xff08;Simple Factory Pattern&#xff09;工厂方法模式&#xff08;Factory Method Pattern&#xff09;抽象工厂模式&#xff08;Abstract Factory Pattern&#xff09;对比 单例模式 什么是单例&#xff…...

synchronized 学习

学习源&#xff1a; https://www.bilibili.com/video/BV1aJ411V763?spm_id_from333.788.videopod.episodes&vd_source32e1c41a9370911ab06d12fbc36c4ebc 1.应用场景 不超卖&#xff0c;也要考虑性能问题&#xff08;场景&#xff09; 2.常见面试问题&#xff1a; sync出…...

Spring AI 入门:Java 开发者的生成式 AI 实践之路

一、Spring AI 简介 在人工智能技术快速迭代的今天&#xff0c;Spring AI 作为 Spring 生态系统的新生力量&#xff0c;正在成为 Java 开发者拥抱生成式 AI 的最佳选择。该框架通过模块化设计实现了与主流 AI 服务&#xff08;如 OpenAI、Anthropic&#xff09;的无缝对接&…...

让AI看见世界:MCP协议与服务器的工作原理

让AI看见世界&#xff1a;MCP协议与服务器的工作原理 MCP&#xff08;Model Context Protocol&#xff09;是一种创新的通信协议&#xff0c;旨在让大型语言模型能够安全、高效地与外部资源进行交互。在AI技术快速发展的今天&#xff0c;MCP正成为连接AI与现实世界的重要桥梁。…...

DeepSeek 技术赋能无人农场协同作业:用 AI 重构农田管理 “神经网”

目录 一、引言二、DeepSeek 技术大揭秘2.1 核心架构解析2.2 关键技术剖析 三、智能农业无人农场协同作业现状3.1 发展现状概述3.2 协同作业模式介绍 四、DeepSeek 的 “农场奇妙游”4.1 数据处理与分析4.2 作物生长监测与预测4.3 病虫害防治4.4 农机协同作业调度 五、实际案例大…...

MySQL账号权限管理指南:安全创建账户与精细授权技巧

在MySQL数据库管理中&#xff0c;合理创建用户账号并分配精确权限是保障数据安全的核心环节。直接使用root账号进行所有操作不仅危险且难以审计操作行为。今天我们来全面解析MySQL账号创建与权限分配的专业方法。 一、为何需要创建独立账号&#xff1f; 最小权限原则&#xf…...

初探Service服务发现机制

1.Service简介 Service是将运行在一组Pod上的应用程序发布为网络服务的抽象方法。 主要功能&#xff1a;服务发现和负载均衡。 Service类型的包括ClusterIP类型、NodePort类型、LoadBalancer类型、ExternalName类型 2.Endpoints简介 Endpoints是一种Kubernetes资源&#xf…...

Go 并发编程基础:通道(Channel)的使用

在 Go 中&#xff0c;Channel 是 Goroutine 之间通信的核心机制。它提供了一个线程安全的通信方式&#xff0c;用于在多个 Goroutine 之间传递数据&#xff0c;从而实现高效的并发编程。 本章将介绍 Channel 的基本概念、用法、缓冲、关闭机制以及 select 的使用。 一、Channel…...

【Post-process】【VBA】ETABS VBA FrameObj.GetNameList and write to EXCEL

ETABS API实战:导出框架元素数据到Excel 在结构工程师的日常工作中,经常需要从ETABS模型中提取框架元素信息进行后续分析。手动复制粘贴不仅耗时,还容易出错。今天我们来用简单的VBA代码实现自动化导出。 🎯 我们要实现什么? 一键点击,就能将ETABS中所有框架元素的基…...

二维FDTD算法仿真

二维FDTD算法仿真&#xff0c;并带完全匹配层&#xff0c;输入波形为高斯波、平面波 FDTD_二维/FDTD.zip , 6075 FDTD_二维/FDTD_31.m , 1029 FDTD_二维/FDTD_32.m , 2806 FDTD_二维/FDTD_33.m , 3782 FDTD_二维/FDTD_34.m , 4182 FDTD_二维/FDTD_35.m , 4793...

rm视觉学习1-自瞄部分

首先先感谢中南大学的开源&#xff0c;提供了很全面的思路&#xff0c;减少了很多基础性的开发研究 我看的阅读的是中南大学FYT战队开源视觉代码 链接&#xff1a;https://github.com/CSU-FYT-Vision/FYT2024_vision.git 1.框架&#xff1a; 代码框架结构&#xff1a;readme有…...