当前位置: 首页 > news >正文

springboot集成mqtt

引入jar包

<dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-mqtt</artifactId>
</dependency>
<dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId>
</dependency>

配置

mqtt:username: admin  #用户名password: admin  #密码address: tcp://192.168.236.136:1883   #地址通过TCP访问,realtime:topicprefix: /realtime/    #实时数据消息topic前缀

mqtt生产者配置类

@Configuration
@IntegrationComponentScan
public class MqttConfig {@Value("${mqtt.username}")private String userName;@Value("${mqtt.password}")private String passWord;@Value("${mqtt.address}")private String address;@Beanpublic MqttPahoClientFactory mqttClientFactory() {DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();MqttConnectOptions options = new MqttConnectOptions();options.setCleanSession(true);options.setConnectionTimeout(10);options.setKeepAliveInterval(90);options.setAutomaticReconnect(true);options.setKeepAliveInterval(2);options.setServerURIs(new String[]{address});options.setUserName(userName);options.setPassword(passWord.toCharArray());factory.setConnectionOptions(options);return factory;}/*** @return 创建推送通道。*/@Beanpublic MessageChannel mqttOutboundChannel() {return new DirectChannel();}/*** @return 默认输出。*/@Bean@ServiceActivator(inputChannel = "mqttOutboundChannel")public MessageHandler outbound() {// 在这里进行mqttOutboundChannel的相关设置MqttPahoMessageHandler messageHandler =new MqttPahoMessageHandler("publishClient", mqttClientFactory());messageHandler.setAsync(true); //如果设置成true,发送消息时将不会阻塞。messageHandler.setDefaultTopic("mqttTopic");return messageHandler;}/*** 推送MQTT的网关。*/@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")public interface MqttSender {/*** 发送信息到MQTT服务器** @param data 发送的文本*/void sendToMqtt(String data);/*** 发送信息到MQTT服务器** @param topic   主题* @param payload 消息主体*/void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic,String payload);/*** 发送信息到MQTT服务器** @param topic   主题* @param qos     对消息处理的几种机制。<br> 0 表示的是订阅者没收到消息不会再次发送,消息会丢失。<br>*                1 表示的是会尝试重试,一直到接收到消息,但这种情况可能导致订阅者收到多次重复消息。<br>*                2 多了一次去重的动作,确保订阅者收到的消息有一次。* @param payload 消息主体*/void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic,@Header(MqttHeaders.QOS) int qos,String payload);}
}

mqtt生产者测试

@RestController
@RequestMapping("/v1/mqtt")
public class MqttController {@Autowiredprivate MqttConfig.MqttSender mqttSender;@PostMapping("/v1/create")public HttpResult<Boolean> create(@RequestBody final RemoteControlRequestBean requestBean) {mqttSender.sendToMqtt(requestBean.getTopic(), 0, requestBean.getMsg());return DefaultHttpResultFactory.success("发送成功",Boolean.TRUE);}
}

mqtt消费者配置

@Slf4j
@Configuration
@IntegrationComponentScan
public class MqttConfig {@Value("${mqtt.username}")private String userName;@Value("${mqtt.password}")private String passWord;@Value("${mqtt.address}")private String address;@Beanpublic MqttPahoClientFactory mqttClientFactory() {DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();MqttConnectOptions options = new MqttConnectOptions();options.setCleanSession(true);options.setConnectionTimeout(10);options.setKeepAliveInterval(90);options.setAutomaticReconnect(true);options.setKeepAliveInterval(2);options.setServerURIs(new String[]{address});options.setUserName(userName);options.setPassword(passWord.toCharArray());factory.setConnectionOptions(options);return factory;}@Beanpublic MessageProducer inbound() {String[] topics = new String[]{"test","topicTest","123"};MqttPahoMessageDrivenChannelAdapter adapter =new MqttPahoMessageDrivenChannelAdapter( "123_consumer", mqttClientFactory(), topics);adapter.setCompletionTimeout(30000);DefaultPahoMessageConverter converter = new DefaultPahoMessageConverter();converter.setPayloadAsBytes(true);adapter.setConverter(converter);adapter.setQos(2);adapter.setOutputChannel(mqttInputChannel());return adapter;}/*** MQTT信息通道(消费者)**/@Beanpublic MessageChannel mqttInputChannel() {return new DirectChannel();}/*** MQTT消息处理器(消费者)**/@Bean@ServiceActivator(inputChannel = "mqttInputChannel")public MessageHandler handler() {return new MessageHandler() {@Overridepublic void handleMessage(Message<?> message) throws MessagingException {log.info("收到订阅消息: {}", message);String topic = message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC).toString();log.info("消息主题:{}", topic);Object payLoad = message.getPayload();log.info("发送的Packet数据{}", JSON.toJSONString(payLoad));}};}
}

topic定义的核心是区分业务场景。

mqtt中topic定义和规范

所有的主题名和主题过滤器必须至少包含一个字符。
主题名和主题过滤器是大小写敏感的。如:ACCOUNTS 和 Accounts 是不同的主题名。
主题名和主题过滤器可以包含空格字符。如:Accounts payable 是合法的主题名
主题名或主题过滤器以前置或后置斜杠 / 区分。如:/finance 和 finance 是不同的。
只包含斜杠 / 的主题名或主题过滤器是合法的。
主题名和主题过滤器不能包含 null 字符(Unicode U+0000)。
主题名和主题过滤器是 UTF-8 编码字符串,除了不能超过 UTF-8 编码字符串的长度限制之外,主题名或主题过滤器的层级数量没有其它限制。

mqtt中topic层级

MQTT 协议主题可以通过斜杠(“/” U+002F)将主题分割成多个层级;作为消息通道,客户端可以通过定义主题层级来实现对消息类型的细分;
例如:一个主机厂有多个车型,每个车型下面有多个车联网业务,我们在定义车机向对某个车型业务系统发消息时可以向<车型A>/ <车辆唯一标识>/<业务X>主题发消息;
当然在MQTT世界中主题可以有很多层(MQTT 协议中没有限制层级数量),比如:<车型A>/<车辆唯一标识(车架号)>/<业务X>/<子业务1>
这样,我们在定义车联网分层级的业务通道的时候可以按主题层级来设计。

topic中通配符

多层通配符#

#字符号(“#” U+0023)是用于匹配主题中任意层级的通配符。多层通配符表示它的父级和任意数量的子层级。如:订阅者可以通过订阅<车型A>/# 接收到:
<车型A>
<车型A>/<车架号1>
<车型A>/<车架号1>/<业务X>
这几类主题的消息。

单层通配符+

加号 (“+” U+002B) 用于单个主题层级匹配的通配符。如:订阅者可以通过订阅<车型A>/+ 来接收
<车型A>/<车架号1>
<车型A>/<车架号2>
不同于多层通配符,使用单层通配符的时候无法匹配子层级的主题,比如:<车型A>/<车架号1>/<业务X>的主题消息就无法接收到。

相关文章:

springboot集成mqtt

引入jar包 <dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-mqtt</artifactId> </dependency> <dependency><groupId>com.alibaba</groupId><artifactId>fastjs…...

Lecture3 梯度下降(Gradient Descent)

目录 1 问题背景 2 批量梯度下降 (Batch Gradient Descent) 3 鞍点(Saddle Point) 3 随机梯度下降 (Stochastic Gradient Descent) 4 小批量梯度下降 (Mini-batch Gradient Descent) 1 问题背景 图1 上节课讲述的穷举法求最优权重值在Lecture2中&#xff0c;介绍了使用穷举…...

深入了解DSP

一、时钟和电源 问&#xff1a;DSP的电源设计和时钟设计应该特别注意哪些方面&#xff1f;外接晶振选用有源的好还是无源的好&#xff1f; 答&#xff1a;时钟一般使用晶体&#xff0c;电源可用TI的配套电源。外接晶振用无源的好。 问&#xff1a;TMS320LF2407的A/D转换精度保证…...

Flink反压如何排查

Flink反压利用了网络传输和动态限流。Flink的任务的组成由流和算子组成&#xff0c;那么流中的数据在算子之间转换的时候&#xff0c;会放入分布式的阻塞队列中。当消费者的阻塞队列满的时候&#xff0c;则会降低生产者的处理速度。 如上图所示&#xff0c;当Task C 的数据处…...

windows无法访问指定设备路径或文件怎么办?2个解决方案

有时候Win10电脑打不开程序或文件&#xff0c;windows无法访问指定设备路径或文件该怎么办&#xff1f;原因是什么呢&#xff1f;一般导致这种情况的出现&#xff0c;大多是因为我们的电脑缺乏相应的查看权限&#xff0c;我们只需要通过赋予权限就可以解决这个难题了。 操作环境…...

冷知识|鹤顶红还能用来修长城?

大家好&#xff0c;我是建模助手。 在上篇浅浅地蹭了波热点之后&#xff0c;我灵机一动&#xff0c;倒不如也搞一搞建筑方面的冷知识&#xff1f;冷热搭配&#xff0c;事半功倍... 问问大家&#xff0c;如果谈起古建筑&#xff0c;关键词都有什么&#xff1f;是庄严、震撼、壮…...

【GD32F427开发板试用】在IAR环境中移植RTX5

本篇文章来自极术社区与兆易创新组织的GD32F427开发板评测活动&#xff0c;更多开发板试用活动请关注极术社区网站。作者&#xff1a;吴金刚 0.前言 首先感谢极术社区和兆易创新给了这次试用GD32F427开发板的机会。 板子做的虽然简约&#xff0c;但是自带了GD-link所以一根USB…...

MySQl学习(从入门到精通15)

MySQl学习&#xff08;从入门到精通15&#xff09;第 18 章_MySQL 8 其它新特性1. MySQL 8 新特性概述1. 1 MySQL 8. 0 新增特性1. 2 MySQL 8. 0 移除的旧特性2. 新特性 1 &#xff1a;窗口函数2. 1 使用窗口函数前后对比2. 2 窗口函数分类2. 3 语法结构2. 4 分类讲解1. 序号函…...

前端构建工具 Vite

文章目录参考环境构建工具构建工具的主要功能目前主流的前端构建工具Vite为什么使用 Vite冷启动WebpackVite热更新优化热更新优化预构建依赖Webpack VS ViteVite 的缺点首屏性能懒加载与 Vite 相关的基本操作获取create-vite创建项目Project nameSelect a frameworkSelect a va…...

若依框架---PageHelper分页(十)

在前几天的文章中&#xff0c;我们介绍了PageHelper的分页方法&#xff0c;研读代码定位到了ExecutorUtil.pageQuery(...)方法&#xff0c;并阅读到了其中的部分代码。 今天我们将看到重要的SQL修改代码。 getPageSql 我们接着看代码&#xff1a; if (!dialect.beforePage(…...

苹果手机专用蓝牙耳机有哪些?与iphone兼容性好的蓝牙耳机

蓝牙耳机摆脱了线缆的束缚&#xff0c;在地以各种方式轻松通话。自从蓝牙耳机问世以来&#xff0c;一直是行动商务族提升效率的好工具&#xff0c;苹果产品一直都是受欢迎的数码产品&#xff0c;下面推荐几款与iphone兼容性好的蓝牙耳机。 第一款&#xff1a;南卡小音舱蓝牙耳…...

CS-TPGS;壳聚糖修饰维生素E;Chitosan-g-TPGS

Chitosan-g-TPGS,CS-TPGS壳聚糖修饰维生素E聚乙二醇1000琥珀酸酯外观呈现白色固体或者粘稠液体。长期保存需要在-20℃,避光,干燥条件下存放&#xff0c;注意取用一定要干燥,避免频繁溶冻。 维生素E聚乙二醇琥珀酸酯(简称TPGS)是维生素E的水溶性衍生物,由维生素E琥珀酸酯的羧基与…...

easyx的基本使用(万字解析)

easyx的基本使用一.基本框架1.创建文件2.创建窗体-initgraph,closegraph,getchar二.简单的绘制1.圆形-circle2.坐标系统-setorigin,setaspectratio三.简单图形1.绘制点-putpixel2.简单的直线-line3.矩形-rectangle4.椭圆-ellipse5.圆角矩形-roundrect6.扇形-pie7.圆弧-arc四.多…...

基于OpenCV 的车牌识别

基于OpenCV 的车牌识别 车牌识别是一种图像处理技术&#xff0c;用于识别不同车辆。这项技术被广泛用于各种安全检测中。现在让我一起基于 OpenCV 编写 Python 代码来完成这一任务。 车牌识别的相关步骤 1. 车牌检测&#xff1a;第一步是从汽车上检测车牌所在位置。我们将使用…...

C#【必备技能篇】Winform跨线程更新进度条的实例

文章目录实例一&#xff1a;【方便理解&#xff0c;常用&#xff01;】源码&#xff1a;运行效果&#xff1a;实例二&#xff1a;【重在理解代码本身】源码&#xff1a;运行效果&#xff1a;参考&#xff1a;实例一&#xff1a;【方便理解&#xff0c;常用&#xff01;】 跨线…...

(1分钟速通面试) 矩阵分解相关内容

矩阵分解算法--总结QR分解 LU分解本篇博客总结一下QR分解和LU分解&#xff0c;这些都是矩阵加速的操作&#xff0c;在slam里面还算是比较常用的内容&#xff0c;这个地方在isam的部分出现过。(当然isam也是一个坑&#xff0c;想要出点创新成果的话 可能是不太现实的 短期来讲 哈…...

this指向

&#xff08;1&#xff09;在全局环境中的this——window 无论是否在严格模式下&#xff0c;在全局执行环境中&#xff08;在任何函数体外部&#xff09;this 都指向全局对象。 "use strict"console.log(this); //windowconsole.log(thiswindow);//true &#xff08…...

安卓小游戏:小板弹球

安卓小游戏&#xff1a;小板弹球 前言 这个是通过自定义View实现小游戏的第三篇&#xff0c;是小时候玩的那种五块钱的游戏机上的&#xff0c;和俄罗斯方块很像&#xff0c;小时候觉得很有意思&#xff0c;就模仿了一下。 需求 这里的逻辑就是板能把球弹起来&#xff0c;球…...

7、单行函数

文章目录1 函数的理解1.1 什么是函数1.2 不同DBMS函数的差异1.3 MySQL的内置函数及分类2 数值函数2.1 基本函数2.2 角度与弧度互换函数2.3 三角函数2.4 指数与对数2.5 进制间的转换3 字符串函数4 日期和时间函数4.1 获取日期、时间4.2 日期与时间戳的转换4.3 获取月份、星期、星…...

华为机试题:HJ56 完全数计算(python)

文章目录博主精品专栏导航知识点详解1、input()&#xff1a;获取控制台&#xff08;任意形式&#xff09;的输入。输出均为字符串类型。1.1、input() 与 list(input()) 的区别、及其相互转换方法2、print() &#xff1a;打印输出。3、整型int() &#xff1a;将指定进制&#xf…...

Cursor实现用excel数据填充word模版的方法

cursor主页&#xff1a;https://www.cursor.com/ 任务目标&#xff1a;把excel格式的数据里的单元格&#xff0c;按照某一个固定模版填充到word中 文章目录 注意事项逐步生成程序1. 确定格式2. 调试程序 注意事项 直接给一个excel文件和最终呈现的word文件的示例&#xff0c;…...

电脑插入多块移动硬盘后经常出现卡顿和蓝屏

当电脑在插入多块移动硬盘后频繁出现卡顿和蓝屏问题时&#xff0c;可能涉及硬件资源冲突、驱动兼容性、供电不足或系统设置等多方面原因。以下是逐步排查和解决方案&#xff1a; 1. 检查电源供电问题 问题原因&#xff1a;多块移动硬盘同时运行可能导致USB接口供电不足&#x…...

将对透视变换后的图像使用Otsu进行阈值化,来分离黑色和白色像素。这句话中的Otsu是什么意思?

Otsu 是一种自动阈值化方法&#xff0c;用于将图像分割为前景和背景。它通过最小化图像的类内方差或等价地最大化类间方差来选择最佳阈值。这种方法特别适用于图像的二值化处理&#xff0c;能够自动确定一个阈值&#xff0c;将图像中的像素分为黑色和白色两类。 Otsu 方法的原…...

unix/linux,sudo,其发展历程详细时间线、由来、历史背景

sudo 的诞生和演化,本身就是一部 Unix/Linux 系统管理哲学变迁的微缩史。来,让我们拨开时间的迷雾,一同探寻 sudo 那波澜壮阔(也颇为实用主义)的发展历程。 历史背景:su的时代与困境 ( 20 世纪 70 年代 - 80 年代初) 在 sudo 出现之前,Unix 系统管理员和需要特权操作的…...

ardupilot 开发环境eclipse 中import 缺少C++

目录 文章目录 目录摘要1.修复过程摘要 本节主要解决ardupilot 开发环境eclipse 中import 缺少C++,无法导入ardupilot代码,会引起查看不方便的问题。如下图所示 1.修复过程 0.安装ubuntu 软件中自带的eclipse 1.打开eclipse—Help—install new software 2.在 Work with中…...

QT: `long long` 类型转换为 `QString` 2025.6.5

在 Qt 中&#xff0c;将 long long 类型转换为 QString 可以通过以下两种常用方法实现&#xff1a; 方法 1&#xff1a;使用 QString::number() 直接调用 QString 的静态方法 number()&#xff0c;将数值转换为字符串&#xff1a; long long value 1234567890123456789LL; …...

GC1808高性能24位立体声音频ADC芯片解析

1. 芯片概述 GC1808是一款24位立体声音频模数转换器&#xff08;ADC&#xff09;&#xff0c;支持8kHz~96kHz采样率&#xff0c;集成Δ-Σ调制器、数字抗混叠滤波器和高通滤波器&#xff0c;适用于高保真音频采集场景。 2. 核心特性 高精度&#xff1a;24位分辨率&#xff0c…...

在QWebEngineView上实现鼠标、触摸等事件捕获的解决方案

这个问题我看其他博主也写了&#xff0c;要么要会员、要么写的乱七八糟。这里我整理一下&#xff0c;把问题说清楚并且给出代码&#xff0c;拿去用就行&#xff0c;照着葫芦画瓢。 问题 在继承QWebEngineView后&#xff0c;重写mousePressEvent或event函数无法捕获鼠标按下事…...

Java + Spring Boot + Mybatis 实现批量插入

在 Java 中使用 Spring Boot 和 MyBatis 实现批量插入可以通过以下步骤完成。这里提供两种常用方法&#xff1a;使用 MyBatis 的 <foreach> 标签和批处理模式&#xff08;ExecutorType.BATCH&#xff09;。 方法一&#xff1a;使用 XML 的 <foreach> 标签&#xff…...

在 Spring Boot 中使用 JSP

jsp&#xff1f; 好多年没用了。重新整一下 还费了点时间&#xff0c;记录一下。 项目结构&#xff1a; pom: <?xml version"1.0" encoding"UTF-8"?> <project xmlns"http://maven.apache.org/POM/4.0.0" xmlns:xsi"http://ww…...