当前位置: 首页 > news >正文

Java使用MQTT协议

MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议)是一种轻量级的、基于发布/订阅模式的物联网通信协议。它构建于TCP/IP协议之上,由IBM在1999年发布。MQTT的主要特点包括:

  • 轻量级与高效:MQTT设计用于在带宽有限、网络不稳定的环境中工作,具有较小的数据包开销和较低的带宽占用。
  • 高可靠性:使用TCP协议传输,确保消息传递的可靠性。
  • 发布/订阅模式:支持一对多的消息发布,降低应用程序之间的耦合度。
  • 广泛适用性:广泛应用于物联网、智能家居、小型设备等领域,特别适用于机器与机器(M2M)通信。

MQTT协议通过简单的发布和订阅机制,实现了消息的可靠传输和分发,是物联网领域中的重要通信协议之一。

1、需要了解的理论知识

1.1、MQTT 的工作原理

要了解 MQTT 的工作原理,首先需要掌握以下几个概念:MQTT 客户端、MQTT Broker、发布-订阅模式、主题、QoS。

MQTT 客户端

任何运行 MQTT 客户端库的应用或设备都是 MQTT 客户端。例如,使用 MQTT 的即时通讯应用是客户端,使用 MQTT 上报数据的各种传感器是客户端,各种 MQTT 测试工具也是客户端。

MQTT Broker

MQTT Broker 是负责处理客户端请求的关键组件,包括建立连接、断开连接、订阅和取消订阅等操作,同时还负责消息的转发。一个高效强大的 MQTT Broker 能够轻松应对海量连接和百万级消息吞吐量,从而帮助物联网服务提供商专注于业务发展,快速构建可靠的 MQTT 应用。

发布-订阅模式

发布-订阅模式与客户端-服务器模式的不同之处在于,它将发送消息的客户端(发布者)和接收消息的客户端(订阅者)进行了解耦。发布者和订阅者之间无需建立直接连接,而是通过 MQTT Broker 来负责消息的路由和分发。

主题

MQTT 协议根据主题来转发消息。主题通过 / 来区分层级,类似于 URL 路径,例如:test/topic

1.2、MQTT 的工作流程

在了解了 MQTT 的基本组件之后,让我们来看看它的一般工作流程:

  1. 客户端使用 TCP/IP 协议与 Broker 建立连接,可以选择使用 TLS/SSL 加密来实现安全通信。客户端提供认证信息,并指定会话类型(Clean Session 或 Persistent Session)。
  2. **客户端既可以向特定主题发布消息,也可以订阅主题以接收消息。**当客户端发布消息时,它会将消息发送给 MQTT Broker;而当客户端订阅消息时,它会接收与订阅主题相关的消息。
  3. MQTT Broker 接收发布的消息,并将这些消息转发给订阅了对应主题的客户端。它根据 QoS 等级确保消息可靠传递,并根据会话类型为断开连接的客户端存储消息。

2、代码实现

2.1、Maven依赖

MQTT协议有两个版本,一个是3.x,另一个是5.x。本文使用的是3.x

MQTT v3.1

<!--  MQTT v3.1 -->
<dependency><groupId>org.eclipse.paho</groupId><artifactId>org.eclipse.paho.client.mqttv3</artifactId><version>1.2.5</version>
</dependency><!--  MQTT 5.0 -->
<!--		<dependency>-->
<!--			<groupId>org.eclipse.paho</groupId>-->
<!--			<artifactId>org.eclipse.paho.mqttv5.client</artifactId>-->
<!--			<version>1.2.5</version>-->
<!--		</dependency>-->

2.2、配置文件

broker这里使用免费的公共的服务,也可以自己使用开源项目emqx搭建

# httpserver.port=8091#server.servlet.context-path=/hub# mqtt
mqtt.url=tcp://broker.emqx.io:1883
mqtt.username=
mqtt.password=
mqtt.clientId=java-mqtt-client
mqtt.defaultTopic=test/topic
mqtt.cleanSession=true

2.3、基于MQTT写个service

依赖了lombok简化代码,按需导入

<dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional>
</dependency>
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;@Service
@Slf4j
public class MqttService implements MqttCallback {@Value("${mqtt.url}")private String url;@Value("${mqtt.username}")private String username;@Value("${mqtt.password}")private String password;@Value("${mqtt.clientId}")private String clientId;@Value("${mqtt.cleanSession}")private boolean cleanSession;private MqttClient client;private int reconnectDelay = 2000; // 初始重连延迟2秒private int maxReconnectAttempts = 3; // 最大重连尝试次数private int reconnectAttempts = 0;@PostConstructpublic void connect() {try {client = new MqttClient(url, clientId);MqttConnectOptions options = new MqttConnectOptions();// cleanSession为 false 时表示创建一个持久会话,在客户端断开连接时,会话仍然保持并保存离线消息,直到会话超时注销。// cleanSession为 true 时表示创建一个新的临时会话,在客户端断开时,会话自动销毁。// 注意:持久会话恢复的前提是客户端使用固定的 Client ID 再次连接,如果 Client ID 是动态的,那么连接成功后将会创建一个新的持久会话。options.setCleanSession(cleanSession);// 禁用Paho的自动重连,自己控制options.setAutomaticReconnect(false);if (username != null && !username.isEmpty()) {options.setUserName(username);options.setPassword(password.toCharArray());}client.setCallback(this);client.connect(options);// 订阅一个或多个主题client.subscribe("test/topic");} catch (MqttException e){log.error("---mqtt connect fail", e);}}// 实现MqttCallback的方法:connectionLost, messageArrived, deliveryComplete@Overridepublic void connectionLost(Throwable cause) {log.info("---Connection lost! " + cause.getMessage());// 这里可以重新连接MQTT服务器reconnect();}private void reconnect() {if (reconnectAttempts < maxReconnectAttempts) {reconnectAttempts++;log.info("---Attempting to reconnect in " + reconnectDelay + "ms");// 使用ScheduledExecutorService方式实现延迟重连// 为简单起见,使用Thread.sleeptry {Thread.sleep(reconnectDelay);} catch (InterruptedException e) {Thread.currentThread().interrupt();}reconnectDelay *= 2; // 增大重连间隔connect(); // 尝试重新连接} else {log.warn("---Max reconnect attempts reached");// 可以考虑执行一些清理操作或通知操作}}@Overridepublic void messageArrived(String topic, MqttMessage message) throws Exception {// 当消息到达时调用log.info("---Message arrived. Topic: " + topic + " Message: " + new String(message.getPayload()));// 处理消息的逻辑}@Overridepublic void deliveryComplete(IMqttDeliveryToken token) {// 当消息被完全传送出去后调用log.info("---Delivery complete!");// 可以在这里处理一些发送完成后的清理工作}// 发送消息的方法public void publish(String topic, String payload) throws MqttException {MqttMessage message = new MqttMessage(payload.getBytes());// QoS 0,最多交付一次。可能丢失消息// QoS 1,至少交付一次。可以保证收到消息,但消息可能重复// QoS 2,只交付一次。可以保证消息既不丢失也不重复message.setQos(2);client.publish(topic, message);log.info("---Message published: {}", payload);}// 断开连接的方法@PreDestroypublic void disconnect() throws MqttException {if (client != null && client.isConnected()) {client.disconnect();log.info("---Disconnected");}}
}

2.4、基于MQTT写个service

写个控制器测试

import org.eclipse.paho.client.mqttv3.MqttException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;@RestController
@RequestMapping("/mqtt")
public class MqttController {@Autowiredprivate MqttService mqttService;@RequestMapping(value = "/send", method = {RequestMethod.GET, RequestMethod.POST})public ResponseEntity<String> sendMessage(@RequestParam String topic, @RequestParam String message) {topic = "test/topic";try {mqttService.publish(topic, message);return ResponseEntity.ok("Message sent successfully");} catch (MqttException e) {return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body("Failed to send message: " + e.getMessage());}}
}

浏览器输入:http://localhost:8091/mqtt/send?topic&message=我来了
控制台日志如下:符合预期

2024-08-10 23:13:17 [MQTT Call: java-mqtt-client] INFO  cn.talktrip.mqtt.MqttService - ---Message arrived. Topic: test/topic Message: 我来了
2024-08-10 23:13:17 [http-nio-8091-exec-8] INFO  cn.talktrip.mqtt.MqttService - ---Message published: 我来了
2024-08-10 23:13:17 [MQTT Call: java-mqtt-client] INFO  cn.talktrip.mqtt.MqttService - ---Delivery complete!

参考文档:https://www.emqx.com/zh/blog/the-easiest-guide-to-getting-started-with-mqtt

相关文章:

Java使用MQTT协议

MQTT&#xff08;Message Queuing Telemetry Transport&#xff0c;消息队列遥测传输协议&#xff09;是一种轻量级的、基于发布/订阅模式的物联网通信协议。它构建于TCP/IP协议之上&#xff0c;由IBM在1999年发布。MQTT的主要特点包括&#xff1a; 轻量级与高效&#xff1a;M…...

等级+时间的优先级算法

简介 本算法为等级与时间结合计算对应优先级逻辑 等级越高者优先级越高 同等级下&#xff0c;时间越小者优先级越高 实现 主方法 calculatePriority import com.zk.blog.enums.TypeEnum; import org.apache.commons.lang3.StringUtils;/*** program: * description:* autho…...

物流仓库安全视频智能管理方案:构建全方位、高效能的防护体系

一、背景分析 随着物流行业的快速发展和仓储需求的日益增长&#xff0c;仓库安全成为企业运营中不可忽视的重要环节。传统的人工监控方式不仅效率低下&#xff0c;且难以做到全天候、无死角覆盖&#xff0c;给仓库资产和人员安全带来潜在风险。因此&#xff0c;引入仓库安全视…...

jackson反序列化漏洞

jackson反序列化漏洞 反序列化漏洞触发根因jackson介绍jackson反序列化漏洞关键点enableDefaultTypingactivateDefaultTypingJsonTypeInfo 漏洞触发场景漏洞复现环境引入依赖pocactivateDefaultTypingenableDefaultTypingJsonTypeInfo 参考 很久没写blog&#xff0c;最近慢慢开…...

Java | Leetcode Java题解之第328题奇偶链表

题目&#xff1a; 题解&#xff1a; class Solution {public ListNode oddEvenList(ListNode head) {if (head null) {return head;}ListNode evenHead head.next;ListNode odd head, even evenHead;while (even ! null && even.next ! null) {odd.next even.nex…...

100 Exercises To Learn Rust 挑战!准备篇

公司内部的学习会非常活跃&#xff01;我也参与了Rust学习会&#xff0c;并且一直在研究rustlings。最近&#xff0c;我发现了一个类似于rustlings的新教程网站&#xff1a;Welcome - 100 Exercises To Learn Rust。 rustlings是基于Rust的权威官方文档《The Rust Programming…...

瑞_RabbitMQ_初识MQ

文章目录 1 初识MQ1.1 同步调用1.1.1 同步调用的优势1.1.2 同步调用的缺点 1.2 异步调用1.2.1 异步调用的角色1.2.2 异步调用的优势1.2.3 异步调用的缺点1.2.4 异步调用的场景 1.3 MQ技术选型 2 RabbitMQ2.1 安装2.1.1 资源准备2.1.2 安装步骤 2.2 RabbitMQ架构2.3 RabbitMQ管理…...

系统内存管理:虚拟内存、内存分段与分页、页表缓存TLB以及Linux内存管理

虚拟内存 虚拟内存是一种操作系统提供的机制&#xff0c;用于将每个进程分配的独立的虚拟地址空间映射到实际的物理内存地址空间上。通过使用虚拟内存&#xff0c;操作系统可以有效地解决多个应用程序直接操作物理内存可能引发的冲突问题。 在使用虚拟内存的情况下&#xff0…...

Java每日一练_模拟面试题5(堆和栈的区别)

在Java中&#xff0c;堆&#xff08;Heap&#xff09;和栈&#xff08;Stack&#xff09;是两个不同的内存区域&#xff0c;它们在存储内容、管理方式、空间大小、分配方式等多个方面存在显著的区别。以下是Java中堆和栈的主要区别&#xff1a; 1. 存储内容不同 堆&#xff1…...

传感器校正和测试

是 一。舵机在使用过程中为了防止手动扭动损坏其中的齿轮&#xff0c;一般会使用代码测试并校正到0位。 #include <Servo.h> Servo myservo; // 创建一个Servo对象 // 连接到舵机信号线的Arduino引脚 int servoPin 9; void setup() { myservo.attach(servoPin…...

Eclipse 悬浮提示:提高编程效率的利器

Eclipse 悬浮提示&#xff1a;提高编程效率的利器 引言 在当今的软件开发领域&#xff0c;Eclipse 是一款广受欢迎的集成开发环境&#xff08;IDE&#xff09;。它以其强大的功能和灵活性而著称&#xff0c;被全球的开发者用于各种编程语言和项目。Eclipse 的一个显著特点是其…...

Vault系列之:创建令牌

Vault系列之&#xff1a;创建令牌 一、Vault令牌二、令牌认证三、创建一个新的令牌四、使用令牌登陆五、 撤销令牌 一、Vault令牌 Vault令牌是Vault服务器提供的一种身份验证方式&#xff0c;用于授权和访问Vault中存储的资源。Vault令牌可以是客户端令牌或服务令牌。客户端令…...

如何在 Windows 10 环境下安装和配置 MySQL:初学者指南

如何在 Windows 10 环境下安装和配置 MySQL&#xff1a;初学者指南 MySQL 是一个流行的开源数据库管理系统&#xff0c;广泛应用于各种应用程序中。对于初学者来说&#xff0c;了解如何在 Windows 10 环境下安装和配置 MySQL 是一个重要的第一步。本篇博客将详细介绍如何完成这…...

Ubuntu 24.04上报:Error: could not connect to ollama app, is it running?的解决方法

说起来这个问题真实让人无语。按照我之前说过的方法&#xff1a;设置Ollama在局域网中访问的方法&#xff08;Ubuntu&#xff09;_ollama 局域网访问-CSDN博客 把Ollama的默认端口修改后&#xff0c;如果再运行&#xff1a; ollama ps 则会报下面的错&#xff1a; Error: c…...

字典树查重(到底要开多大的空间啊)

前言&#xff1a;烦死了&#xff0c;这个题目一看就是用字典树来做&#xff0c;但是空间不知道开多大&#xff0c;烦死了 后来发现其实tree的第一维空间直接开极端的情况就行&#xff0c;就好像这一题&#xff0c;最多有 1e4 个字符串&#xff0c;每个字符串最长为 50&#xff…...

财务会计与管理会计(二)

文章目录 多工作表销售数据汇总1、INDIRECT函数2、HLOOKUP函数 多表筛选分类求和1、SUMIF函数2、INDIRECT函数 两组数据比对详解VLOOKUP函数的应用 多工作表销售数据汇总 1、INDIRECT函数 INDIRECT(""&D$4&"!D4:M24") 1月!D4:M24 HLOOKUP($A$1,I…...

技术周总结 08.05-08.11周日

文章目录 一、08.06 周二1.1) 问题01 mac安装 scala:1. 使用 Homebrew2. 使用 SDKMAN!其他注意事项1. 确认 Scala 安装位置2. 设置 PATH 环境变量对于 zsh (macOS Catalina 及更高版本默认使用 zsh):对于 bash (如果您使用的是 bash shell): 3. 验证安装 二、08.09 周五2.1&…...

B树和B+树的插入、删除

1. B树 1.1 B树的定义 树也称树&#xff0c;它是一颗多路平衡查找树。我们描述一颗树时需要指定它的阶数&#xff0c;阶数表示了一个结点最多有多少个孩子结点&#xff0c;用字母表示阶数。当取时&#xff0c;就是我们常见的二叉搜索树。 一颗阶的树定义如下&#xff1a; 每…...

Axios网络请求总结

在实际项目开发中&#xff0c;前端页面所需要的数据往往需要从服务器端获取&#xff0c;这必然涉及与服务器的通信。Axios 是一个基于 promise 网络请求库&#xff0c;作用于node.js 和浏览器中。Axios 在浏览器端使用XMLHttpRequests发送网络请求&#xff0c;并能自动完成JSON…...

立仪科技光谱共焦应用之金属隔膜静态重复性测量

01&#xff5c;检测需求&#xff1a;金属隔膜重复性测量 02&#xff5c;检测方式 为了保证精度&#xff0c;首先先用千分尺进行测量&#xff0c;得出相应的厚度数据&#xff0c;在选择合适的侧头&#xff0c;根据结果&#xff0c;我们现在立仪科技H4UO控制器搭配D27A20侧头 03&…...

以下是对华为 HarmonyOS NETX 5属性动画(ArkTS)文档的结构化整理,通过层级标题、表格和代码块提升可读性:

一、属性动画概述NETX 作用&#xff1a;实现组件通用属性的渐变过渡效果&#xff0c;提升用户体验。支持属性&#xff1a;width、height、backgroundColor、opacity、scale、rotate、translate等。注意事项&#xff1a; 布局类属性&#xff08;如宽高&#xff09;变化时&#…...

关于nvm与node.js

1 安装nvm 安装过程中手动修改 nvm的安装路径&#xff0c; 以及修改 通过nvm安装node后正在使用的node的存放目录【这句话可能难以理解&#xff0c;但接着往下看你就了然了】 2 修改nvm中settings.txt文件配置 nvm安装成功后&#xff0c;通常在该文件中会出现以下配置&…...

STM32标准库-DMA直接存储器存取

文章目录 一、DMA1.1简介1.2存储器映像1.3DMA框图1.4DMA基本结构1.5DMA请求1.6数据宽度与对齐1.7数据转运DMA1.8ADC扫描模式DMA 二、数据转运DMA2.1接线图2.2代码2.3相关API 一、DMA 1.1简介 DMA&#xff08;Direct Memory Access&#xff09;直接存储器存取 DMA可以提供外设…...

TRS收益互换:跨境资本流动的金融创新工具与系统化解决方案

一、TRS收益互换的本质与业务逻辑 &#xff08;一&#xff09;概念解析 TRS&#xff08;Total Return Swap&#xff09;收益互换是一种金融衍生工具&#xff0c;指交易双方约定在未来一定期限内&#xff0c;基于特定资产或指数的表现进行现金流交换的协议。其核心特征包括&am…...

Spring是如何解决Bean的循环依赖:三级缓存机制

1、什么是 Bean 的循环依赖 在 Spring框架中,Bean 的循环依赖是指多个 Bean 之间‌互相持有对方引用‌,形成闭环依赖关系的现象。 多个 Bean 的依赖关系构成环形链路,例如: 双向依赖:Bean A 依赖 Bean B,同时 Bean B 也依赖 Bean A(A↔B)。链条循环: Bean A → Bean…...

Linux nano命令的基本使用

参考资料 GNU nanoを使いこなすnano基础 目录 一. 简介二. 文件打开2.1 普通方式打开文件2.2 只读方式打开文件 三. 文件查看3.1 打开文件时&#xff0c;显示行号3.2 翻页查看 四. 文件编辑4.1 Ctrl K 复制 和 Ctrl U 粘贴4.2 Alt/Esc U 撤回 五. 文件保存与退出5.1 Ctrl …...

DBLP数据库是什么?

DBLP&#xff08;Digital Bibliography & Library Project&#xff09;Computer Science Bibliography是全球著名的计算机科学出版物的开放书目数据库。DBLP所收录的期刊和会议论文质量较高&#xff0c;数据库文献更新速度很快&#xff0c;很好地反映了国际计算机科学学术研…...

02.运算符

目录 什么是运算符 算术运算符 1.基本四则运算符 2.增量运算符 3.自增/自减运算符 关系运算符 逻辑运算符 &&&#xff1a;逻辑与 ||&#xff1a;逻辑或 &#xff01;&#xff1a;逻辑非 短路求值 位运算符 按位与&&#xff1a; 按位或 | 按位取反~ …...

2025年- H71-Lc179--39.组合总和(回溯,组合)--Java版

1.题目描述 2.思路 当前的元素可以重复使用。 &#xff08;1&#xff09;确定回溯算法函数的参数和返回值&#xff08;一般是void类型&#xff09; &#xff08;2&#xff09;因为是用递归实现的&#xff0c;所以我们要确定终止条件 &#xff08;3&#xff09;单层搜索逻辑 二…...

边缘计算网关提升水产养殖尾水处理的远程运维效率

一、项目背景 随着水产养殖行业的快速发展&#xff0c;养殖尾水的处理成为了一个亟待解决的环保问题。传统的尾水处理方式不仅效率低下&#xff0c;而且难以实现精准监控和管理。为了提升尾水处理的效果和效率&#xff0c;同时降低人力成本&#xff0c;某大型水产养殖企业决定…...