SpringBoot集成MQTT实现交互服务通信
引言
本文是springboot集成mqtt的一个实战案例。
gitee代码库地址:源码地址
一、什么是MQTT
MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议),是一种基于发布/订阅(publish/subscribe)模式的“轻量级”通讯协议,该协议构建于 TCP/IP 协议上,由 IBM 于 1999 年发明。MQTT 协议的主要特征是开放、简单、轻量级和易于实现,这些特征使得它适用于受约束的应用环境,如:
网络受限:网络带宽较低且传输不可靠
终端受限:协议运行在嵌入式设备上,嵌入式终端的处理器、内存等是受限的
MQTT 非常适用于物联网领域,如传感器与服务器的通信、传感器信息采集等。
二、发布/订阅模式
发布/订阅模式(Publish/Subscribe Pattern,简称Pub/Sub)是一种消息通信模式,在这种模式下,消息的发送者(发布者)不会将消息直接发送给特定的接收者(订阅者)。而是将代表消息内容的通知(事件)发布到一个特定的主题或频道上,而订阅了这个主题的接收者会收到所有在这个主题上发布的通知。这种模式解耦了消息的发送者和接收者,使得系统更加灵活和可扩展。
主要组成部分
-
发布者(Publisher):负责生成消息并将其发布到特定的主题或频道。
-
订阅者(Subscriber):注册对特定主题的兴趣,并接收该主题上的所有消息。
-
消息代理(Message Broker):作为中间件,它接收来自发布者的消息,并将这些消息传递给所有相关的订阅者。
优点
-
解耦:发布者和订阅者之间不需要直接交互,这降低了系统的耦合度。
-
灵活性:可以动态添加或删除订阅者,不影响其他组件。
-
可扩展性:系统容易扩展,可以轻松增加新的发布者或订阅者。
缺点
-
复杂性:引入了额外的组件(如消息代理),增加了系统的复杂性和管理成本。
-
性能开销:消息的传递需要通过中间件,可能会有延迟和性能损失。
应用场景
-
事件驱动架构:在微服务架构中,不同的服务通过发布/订阅模式进行异步通信。
-
数据流处理:如实时数据分析,多个组件可以订阅数据流并进行处理。
-
分布式系统:用于跨系统或跨服务的消息传递。
发布/订阅模式并不是 MQTT 协议特有的模式,很多消息中间件都有使用发布/订阅模式,有同学可能认为这就是观察者模式,还真不是,这两个模式很容易混淆。观察者模式只有观察者 + 被观察者两个角色,而发布/订阅模式还有一个经纪人 Broker;往更深层次的讲观察者和被观察者,是松耦合的关系,而发布者和订阅者,则完全不存在耦合。
三、Windows下安装MQTT消息服务器
非常遗憾,EMQ X Broker 在 5.4.0 版本的发行版中已不支持 windows 版本的安装包了,笔者从网上找了一个最后支持版本的压缩包,已上传资源。
- 解压后,在bin文件下,使用cmd执行运行命令 .\emqx console
- 访问MQTT管理页面 http://localhost:18083/#/ 用户名密码 admin/public
如果报错缺少Erlang环境,需要自行安装下该环境
浏览器访问:http://localhost:18083/#,输入账号密码进入,会要求你修改密码,可以暂时跳过
四、Windows安装MQTT消息代理客户端MQTTX
下载地址:MQTTX下载地址
点击免费下载
选择64位版本
下好后点击安装,启动运行界面如下:
语言是英文,可以在设置按钮里调成中文。这个客户端代理主要是进行消息发送的测试服务。
五、新建MQTT集成项目
随便新建了一个springboot应用,用的是JDK17,在pom文件中引入如下依赖:
<!-- MQTT --><dependency><groupId>org.eclipse.paho</groupId><artifactId>org.eclipse.paho.client.mqttv3</artifactId><version>1.2.5</version></dependency>
5.1 yml配置
server:port: 8081#允许循环依赖
spring:main:allow-circular-references: truecustomer:mqtt:broker: tcp://localhost:1883clientList:#发布客户端ID- clientId: nays_service#监听主题 同时订阅多个主题 使用 - 分割开subscribeTopic: mqtt/publish#用户名userName: admin#密码password: public#接收客户端ID- clientId: receive_service#监听主题 同时订阅多个主题 使用 - 分割开subscribeTopic: mqtt/receive#用户名userName: admin#密码password: public
5.2 Mqtt配置类
package com.hulei.mqttproject.config;import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;import java.util.List;/*** Mqtt配置类*/
@Data
@Configuration
@ConfigurationProperties(prefix = "customer.mqtt")
public class MqttConfig {/*** mqtt broker地址*/String broker;/*** 需要创建的MQTT客户端*/List<MqttClient> clientList;
}
5.3 MQTT客户端
package com.hulei.mqttproject.config;import lombok.Data;/*** MQTT客户端*/
@Data
public class MqttClient {/*** 客户端ID*/private String clientId;/*** 监听主题*/private String subscribeTopic;/*** 用户名*/private String userName;/*** 密码*/private String password;
}
5.4 MQTT客户端管理类
package com.hulei.mqttproject.config;import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;/*** MQTT客户端管理类,如果客户端非常多后续可入redis缓存*/
@Slf4j
@Component
public class MqttClientManager {@Value("${customer.mqtt.broker}")private String mqttBroker;@Resourceprivate MqttCallBackContext mqttCallBackContext;/*** 存储MQTT客户端*/public static Map<String, MqttClient> MQTT_CLIENT_MAP = new ConcurrentHashMap<>();public MqttClient getMqttClientById(String clientId) {return MQTT_CLIENT_MAP.get(clientId);}/*** 创建mqtt客户端** @param clientId 客户端ID* @param subscribeTopic 订阅主题,可为空* @param userName 用户名,可为空* @param password 密码,可为空*/public void createMqttClient(String clientId, String subscribeTopic, String userName, String password) {MemoryPersistence persistence = new MemoryPersistence();try {MqttClient client = new MqttClient(mqttBroker, clientId, persistence);MqttConnectOptions connOpts = new MqttConnectOptions();if (null != userName && !userName.isEmpty()) {connOpts.setUserName(userName);}if (null != password && !password.isEmpty()) {connOpts.setPassword(password.toCharArray());}connOpts.setCleanSession(true);if (null != subscribeTopic && !subscribeTopic.isEmpty()) {AbsMqttCallBack callBack = mqttCallBackContext.getCallBack(clientId);if (null == callBack) {callBack = mqttCallBackContext.getCallBack("default");}callBack.setClientId(clientId);callBack.setConnectOptions(connOpts);client.setCallback(callBack);}//连接mqtt服务端brokerclient.connect(connOpts);// 订阅主题if (null != subscribeTopic && !subscribeTopic.isEmpty()) {if (subscribeTopic.contains("-"))client.subscribe(subscribeTopic.split("-"));else {client.subscribe(subscribeTopic);}}MQTT_CLIENT_MAP.putIfAbsent(clientId, client);} catch (MqttException e) {log.error("Create mqttClient failed!", e);}}
}
5.5 MQTT客户端创建
package com.hulei.mqttproject.config;import jakarta.annotation.PostConstruct;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;import java.util.List;/*** MQTT客户端创建*/
@Component
@Slf4j
public class MqttClientCreate {@Resourceprivate MqttClientManager mqttClientManager;@Resourceprivate MqttConfig mqttConfig;/*** 创建MQTT客户端*/@PostConstructpublic void createMqttClient() {List<MqttClient> mqttClientList = mqttConfig.getClientList();for (MqttClient mqttClient : mqttClientList) {log.info("{}", mqttClient);//创建客户端,客户端ID:demo,回调类跟客户端ID一致mqttClientManager.createMqttClient(mqttClient.getClientId(), mqttClient.getSubscribeTopic(), mqttClient.getUserName(), mqttClient.getPassword());}}
}
5.6 MQTT回调抽象类
package com.hulei.mqttproject.config;import jakarta.annotation.Resource;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttMessage;/*** MQTT回调抽象类*/
@Setter
@Getter
@Slf4j
public abstract class AbsMqttCallBack implements MqttCallback {private String clientId;private MqttConnectOptions connectOptions;@ResourceMqttClientManager mqttClientManager;/*** 失去连接操作,进行重连** @param throwable 异常*/@Overridepublic void connectionLost(Throwable throwable) {try {if (null != clientId) {if (null != connectOptions) {mqttClientManager.getMqttClientById(clientId).connect(connectOptions);} else {mqttClientManager.getMqttClientById(clientId).connect();}}} catch (Exception e) {log.error("{} reconnect failed!", e.getMessage(), e);}}/*** 接收订阅消息* @param topic 主题* @param mqttMessage 接收消息* @throws Exception 异常*/@Overridepublic void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {String content = new String(mqttMessage.getPayload());handleReceiveMessage(topic, content);}/*** 消息发送成功** @param iMqttDeliveryToken toke*/@Overridepublic void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {log.info("消息发送成功");}/*** 处理接收的消息* @param topic 主题* @param message 消息内容*/protected abstract void handleReceiveMessage(String topic, String message);
}
5.7 MQTT订阅回调环境类
package com.hulei.mqttproject.config;import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;/*** MQTT订阅回调环境类*/
@Component
@Slf4j
public class MqttCallBackContext {private final Map<String, AbsMqttCallBack> callBackMap = new ConcurrentHashMap<>();/*** 默认构造函数** @param callBackMap 回调集合*/public MqttCallBackContext(Map<String, AbsMqttCallBack> callBackMap) {this.callBackMap.putAll(callBackMap);}/*** 获取MQTT回调类** @param clientId 客户端ID* @return MQTT回调类*/public AbsMqttCallBack getCallBack(String clientId) {return this.callBackMap.get(clientId);}
}
5.8 默认回调类
package com.hulei.mqttproject.config;import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;/*** 默认回调*/
@Slf4j
@Component("default")
public class DefaultMqttCallBack extends AbsMqttCallBack {/*** @param topic 主题* @param message 消息内容*/@Overrideprotected void handleReceiveMessage(String topic, String message) {log.info("接收到主题---{}", topic);log.info("接收到消息---{}", message);// 自定义消息处理业务}
}
六、测试服务类
package com.hulei.mqttproject.controller;import com.hulei.mqttproject.config.MqttClientManager;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;@RestController
@Slf4j
public class SendController {@Resourceprivate MqttClientManager mqttClientManager;@RequestMapping("/sendMessage")public String sendMessage(String topic){try {MqttMessage mqttMessage = new MqttMessage("你好".getBytes());mqttClientManager.getMqttClientById("nays_service").publish(topic,mqttMessage);return "发送成功";} catch (Exception e) {log.error("发送失败",e);return "发送失败";}}
}
七、启动springboot
启动日志可以看到,mqtt消息服务器连接成功
EMQX工具显示发布客户端和接收客户端均已成功注册
使用Apifox测试下SendController中的接口,mqtt/receive是yaml中接收客户端订阅的主题,当然也可以往mqtt/publish主题发,mqtt中消息的发布者也可以订阅主题,监听某些消息。
相关文章:

SpringBoot集成MQTT实现交互服务通信
引言 本文是springboot集成mqtt的一个实战案例。 gitee代码库地址:源码地址 一、什么是MQTT MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议),是一种基于发布/订阅(publish/subscribe&…...
python实现插入排序、快速排序
python实现插入排序、快速排序 算法步骤: Python实现插入排序快速排序算法步骤: Python实现快速排序算法时间复杂度 插入排序是一种简单直观的排序算法。它的基本思想是通过构建有序序列,对于未排序数据,在已排序序列中从后向前扫…...

Spring Boot集成kudu快速入门Demo
1.什么是kudu 在Kudu出现前,由于传统存储系统的局限性,对于数据的快速输入和分析还没有一个完美的解决方案,要么以缓慢的数据输入为代价实现快速分析,要么以缓慢的分析为代价实现数据快速输入。随着快速输入和分析场景越来越多&a…...
html超文本传输协议
在今天的Web开发学习中,我掌握了一些HTML和CSS的基础知识,下面我将分享我的学习笔记,帮助大家快速构建一个简单的Web界面。 一、HTML基础标签 1. 网站头 使用<title>标签定义网页的标题。 html <title>我的第一个网页</t…...

利用AI辅助制作ppt封面
如何利用AI辅助制作一个炫酷的PPT封面 标题使用镂空字背景替换为动态视频 标题使用镂空字 1.首先,新建一个空白的ppt页面,插入一张你认为符合主题的图片,占满整个可视页面。 2.其次,插入一个矩形,右键选择设置形状格式…...

【spring boot】初学者项目快速练手
一小时带你从0到1实现一个SpringBoot项目开发_哔哩哔哩_bilibili 一、简介 二、项目结构 三、代码结构 1.生成框架 Spring Initializr 快速生成一个初始的项目代码,会生成一个demo文件 打开intellj idea,导入demo文件 2.目录结构 源码都放在src-ma…...
Laravel+swoole 实现websocket长链接
需要使用 swoole 扩展 我使用的是 swoole 5.x start 方法启动服务 和 定时器 调整 listenQueue 定时器可以降低消息通讯延迟 定时器会自动推送队列里面的消息 testMessage 方法测试给指定用户推送消息 使用 laravel console 启动 <?phpnamespace App\Console\Comman…...
【C#】Array和List
C#中的List<T>和数组(T[])在某些方面是相似的,因为它们都是用来存储一系列元素的集合。然而,它们在功能和使用上有一些重要的区别: 数组(Array) 固定大小:数组的大小在声明时…...

SpringCloud网关的实现原理与使用指南
Spring Cloud网关是一个基于Spring Cloud的微服务网关,它是一个独立的项目,可以对外提供API接口服务,负责请求的转发和路由。本文将介绍Spring Cloud网关的实现原理和使用指南。 一、Spring Cloud网关的实现原理 Spring Cloud网关基于Spring…...

LabVIEW 与 PLC 通讯方式
在工业自动化中,LabVIEW 与 PLC(可编程逻辑控制器)的通信至关重要,常见的通信方式包括 OPC、Modbus、EtherNet/IP、Profibus/Profinet 和 Serial(RS232/RS485)。这些通信协议各有特点和应用场景,…...

数据结构初阶·排序算法(内排序)
目录 前言: 1 冒泡排序 2 选择排序 3 插入排序 4 希尔排序 5 快速排序 5.1 Hoare版本 5.2 挖坑法 5.3 前后指针法 5.4 非递归快排 6 归并排序 6.1递归版本归并 6.2 非递归版本归并 7 计数排序 8 排序总结 前言: 目前常见的排序算法有9种…...
PL/SQL oracle上多表关联的一些记录
1.记录自己在PL/SQL上写的几张表的关联条件没有跑出来的一些优化 1. join后面跟上筛选条件 left join on t1.id t2.id and --- 带上分区字段,如 t1.month 202405, 操作跑不出来的一些问题,可能是数据量过大,未做分区过滤 2. 创建…...

Java.Net.UnknownHostException:揭开网络迷雾,解锁异常处理秘籍
在Java编程的浩瀚宇宙中,java.net.UnknownHostException犹如一朵不时飘过的乌云,让开发者在追求网络畅通无阻的道路上遭遇小挫。但别担心,今天我们就来一场说走就走的探险,揭秘这个异常的真面目,并手把手教你几招应对之…...

第十课:telnet(远程登入)
如何远程管理网络设备? 只要保证PC和路由器的ip是互通的,那么PC就可以远程管理路由器(用telnet技术管理)。 我们搭建一个下面这样的简单的拓扑图进行介绍 首先我们点击云,把云打开,点击增加 我们绑定vmn…...

【概率论三】参数估计:点估计(矩估计、极大似然法)、区间估计
文章目录 一. 点估计1. 矩估计法2. 极大似然法2.1. 似然函数2.2. 极大似然估计法 3. 评价估计量的标准3.1. 无偏性3.2. 有效性3.3. 一致性 二. 区间估计1. 区间估计的概念2. 正态总体参数的区间估计 参数估计讲什么 由样本来确定未知参数参数估计分为点估计与区间估计 一. 点估…...

自动化产线 搭配数据采集监控平台 创新与突破
自动化产线在现在的各行各业中应用广泛,已经是现在的生产趋势,不同的自动化生产设备充斥在各行各业中,自动化的设备会产生很多的数据,这些数据如何更科学化的管理,更优质的利用,就需要数据采集监控平台来完…...
【Karapathy大神build-nanogpt】Take Away Notes
B站翻译LINK Personal Note Andrej rebuild gpt2 in pytorch. Take Away Points Before entereing serious training, he use Shakespear’s work as a small debugging datset to see if a model can overfit. Overfitging is a should thing.If we use TF32 or BF32, (by…...

MySQL学习记录 —— 이십이 MySQL服务器日志
文章目录 1、日志介绍2、一般、慢查询日志1、一般查询日志2、慢查询日志FILE格式TABLE格式 3、错误日志4、二进制日志5、日志维护 1、日志介绍 中继服务器的数据来源于集群中的主服务。每次做一些操作时,把操作保存到重做日志,这样崩溃时就可以从重做日志…...

HTTPS请求头缺少HttpOnly和Secure属性解决方案
问题描述: 建立Filter拦截器类 package com.ruoyi.framework.security.filter;import com.ruoyi.common.core.domain.model.LoginUser; import com.ruoyi.common.utils.SecurityUtils; import com.ruoyi.common.utils.StringUtils; import com.ruoyi.framework.…...

react基础样式控制
行内样式 <div style{{width:500px, height:300px,background:#ccc,margin:200px auto}}>文本</div> class类名 注意:在react中使用class类名必须使用className 在外部src下新建index.css文件写入你的样式 .fontcolor{color:red } 在用到的页面引入…...
ES6从入门到精通:前言
ES6简介 ES6(ECMAScript 2015)是JavaScript语言的重大更新,引入了许多新特性,包括语法糖、新数据类型、模块化支持等,显著提升了开发效率和代码可维护性。 核心知识点概览 变量声明 let 和 const 取代 var…...

RocketMQ延迟消息机制
两种延迟消息 RocketMQ中提供了两种延迟消息机制 指定固定的延迟级别 通过在Message中设定一个MessageDelayLevel参数,对应18个预设的延迟级别指定时间点的延迟级别 通过在Message中设定一个DeliverTimeMS指定一个Long类型表示的具体时间点。到了时间点后…...

3.3.1_1 检错编码(奇偶校验码)
从这节课开始,我们会探讨数据链路层的差错控制功能,差错控制功能的主要目标是要发现并且解决一个帧内部的位错误,我们需要使用特殊的编码技术去发现帧内部的位错误,当我们发现位错误之后,通常来说有两种解决方案。第一…...

linux arm系统烧录
1、打开瑞芯微程序 2、按住linux arm 的 recover按键 插入电源 3、当瑞芯微检测到有设备 4、松开recover按键 5、选择升级固件 6、点击固件选择本地刷机的linux arm 镜像 7、点击升级 (忘了有没有这步了 估计有) 刷机程序 和 镜像 就不提供了。要刷的时…...
在四层代理中还原真实客户端ngx_stream_realip_module
一、模块原理与价值 PROXY Protocol 回溯 第三方负载均衡(如 HAProxy、AWS NLB、阿里 SLB)发起上游连接时,将真实客户端 IP/Port 写入 PROXY Protocol v1/v2 头。Stream 层接收到头部后,ngx_stream_realip_module 从中提取原始信息…...
三体问题详解
从物理学角度,三体问题之所以不稳定,是因为三个天体在万有引力作用下相互作用,形成一个非线性耦合系统。我们可以从牛顿经典力学出发,列出具体的运动方程,并说明为何这个系统本质上是混沌的,无法得到一般解…...
【Nginx】使用 Nginx+Lua 实现基于 IP 的访问频率限制
使用 NginxLua 实现基于 IP 的访问频率限制 在高并发场景下,限制某个 IP 的访问频率是非常重要的,可以有效防止恶意攻击或错误配置导致的服务宕机。以下是一个详细的实现方案,使用 Nginx 和 Lua 脚本结合 Redis 来实现基于 IP 的访问频率限制…...
CppCon 2015 学习:Time Programming Fundamentals
Civil Time 公历时间 特点: 共 6 个字段: Year(年)Month(月)Day(日)Hour(小时)Minute(分钟)Second(秒) 表示…...

从零开始了解数据采集(二十八)——制造业数字孪生
近年来,我国的工业领域正经历一场前所未有的数字化变革,从“双碳目标”到工业互联网平台的推广,国家政策和市场需求共同推动了制造业的升级。在这场变革中,数字孪生技术成为备受关注的关键工具,它不仅让企业“看见”设…...

Vue3 PC端 UI组件库我更推荐Naive UI
一、Vue3生态现状与UI库选择的重要性 随着Vue3的稳定发布和Composition API的广泛采用,前端开发者面临着UI组件库的重新选择。一个好的UI库不仅能提升开发效率,还能确保项目的长期可维护性。本文将对比三大主流Vue3 UI库(Naive UI、Element …...