当前位置: 首页 > news >正文

Java使用MQTT协议

MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议)是一种轻量级的、基于发布/订阅模式的物联网通信协议。它构建于TCP/IP协议之上,由IBM在1999年发布。MQTT的主要特点包括:

  • 轻量级与高效:MQTT设计用于在带宽有限、网络不稳定的环境中工作,具有较小的数据包开销和较低的带宽占用。
  • 高可靠性:使用TCP协议传输,确保消息传递的可靠性。
  • 发布/订阅模式:支持一对多的消息发布,降低应用程序之间的耦合度。
  • 广泛适用性:广泛应用于物联网、智能家居、小型设备等领域,特别适用于机器与机器(M2M)通信。

MQTT协议通过简单的发布和订阅机制,实现了消息的可靠传输和分发,是物联网领域中的重要通信协议之一。

1、需要了解的理论知识

1.1、MQTT 的工作原理

要了解 MQTT 的工作原理,首先需要掌握以下几个概念:MQTT 客户端、MQTT Broker、发布-订阅模式、主题、QoS。

MQTT 客户端

任何运行 MQTT 客户端库的应用或设备都是 MQTT 客户端。例如,使用 MQTT 的即时通讯应用是客户端,使用 MQTT 上报数据的各种传感器是客户端,各种 MQTT 测试工具也是客户端。

MQTT Broker

MQTT Broker 是负责处理客户端请求的关键组件,包括建立连接、断开连接、订阅和取消订阅等操作,同时还负责消息的转发。一个高效强大的 MQTT Broker 能够轻松应对海量连接和百万级消息吞吐量,从而帮助物联网服务提供商专注于业务发展,快速构建可靠的 MQTT 应用。

发布-订阅模式

发布-订阅模式与客户端-服务器模式的不同之处在于,它将发送消息的客户端(发布者)和接收消息的客户端(订阅者)进行了解耦。发布者和订阅者之间无需建立直接连接,而是通过 MQTT Broker 来负责消息的路由和分发。

主题

MQTT 协议根据主题来转发消息。主题通过 / 来区分层级,类似于 URL 路径,例如:test/topic

1.2、MQTT 的工作流程

在了解了 MQTT 的基本组件之后,让我们来看看它的一般工作流程:

  1. 客户端使用 TCP/IP 协议与 Broker 建立连接,可以选择使用 TLS/SSL 加密来实现安全通信。客户端提供认证信息,并指定会话类型(Clean Session 或 Persistent Session)。
  2. **客户端既可以向特定主题发布消息,也可以订阅主题以接收消息。**当客户端发布消息时,它会将消息发送给 MQTT Broker;而当客户端订阅消息时,它会接收与订阅主题相关的消息。
  3. MQTT Broker 接收发布的消息,并将这些消息转发给订阅了对应主题的客户端。它根据 QoS 等级确保消息可靠传递,并根据会话类型为断开连接的客户端存储消息。

2、代码实现

2.1、Maven依赖

MQTT协议有两个版本,一个是3.x,另一个是5.x。本文使用的是3.x

MQTT v3.1

<!--  MQTT v3.1 -->
<dependency><groupId>org.eclipse.paho</groupId><artifactId>org.eclipse.paho.client.mqttv3</artifactId><version>1.2.5</version>
</dependency><!--  MQTT 5.0 -->
<!--		<dependency>-->
<!--			<groupId>org.eclipse.paho</groupId>-->
<!--			<artifactId>org.eclipse.paho.mqttv5.client</artifactId>-->
<!--			<version>1.2.5</version>-->
<!--		</dependency>-->

2.2、配置文件

broker这里使用免费的公共的服务,也可以自己使用开源项目emqx搭建

# httpserver.port=8091#server.servlet.context-path=/hub# mqtt
mqtt.url=tcp://broker.emqx.io:1883
mqtt.username=
mqtt.password=
mqtt.clientId=java-mqtt-client
mqtt.defaultTopic=test/topic
mqtt.cleanSession=true

2.3、基于MQTT写个service

依赖了lombok简化代码,按需导入

<dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional>
</dependency>
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;@Service
@Slf4j
public class MqttService implements MqttCallback {@Value("${mqtt.url}")private String url;@Value("${mqtt.username}")private String username;@Value("${mqtt.password}")private String password;@Value("${mqtt.clientId}")private String clientId;@Value("${mqtt.cleanSession}")private boolean cleanSession;private MqttClient client;private int reconnectDelay = 2000; // 初始重连延迟2秒private int maxReconnectAttempts = 3; // 最大重连尝试次数private int reconnectAttempts = 0;@PostConstructpublic void connect() {try {client = new MqttClient(url, clientId);MqttConnectOptions options = new MqttConnectOptions();// cleanSession为 false 时表示创建一个持久会话,在客户端断开连接时,会话仍然保持并保存离线消息,直到会话超时注销。// cleanSession为 true 时表示创建一个新的临时会话,在客户端断开时,会话自动销毁。// 注意:持久会话恢复的前提是客户端使用固定的 Client ID 再次连接,如果 Client ID 是动态的,那么连接成功后将会创建一个新的持久会话。options.setCleanSession(cleanSession);// 禁用Paho的自动重连,自己控制options.setAutomaticReconnect(false);if (username != null && !username.isEmpty()) {options.setUserName(username);options.setPassword(password.toCharArray());}client.setCallback(this);client.connect(options);// 订阅一个或多个主题client.subscribe("test/topic");} catch (MqttException e){log.error("---mqtt connect fail", e);}}// 实现MqttCallback的方法:connectionLost, messageArrived, deliveryComplete@Overridepublic void connectionLost(Throwable cause) {log.info("---Connection lost! " + cause.getMessage());// 这里可以重新连接MQTT服务器reconnect();}private void reconnect() {if (reconnectAttempts < maxReconnectAttempts) {reconnectAttempts++;log.info("---Attempting to reconnect in " + reconnectDelay + "ms");// 使用ScheduledExecutorService方式实现延迟重连// 为简单起见,使用Thread.sleeptry {Thread.sleep(reconnectDelay);} catch (InterruptedException e) {Thread.currentThread().interrupt();}reconnectDelay *= 2; // 增大重连间隔connect(); // 尝试重新连接} else {log.warn("---Max reconnect attempts reached");// 可以考虑执行一些清理操作或通知操作}}@Overridepublic void messageArrived(String topic, MqttMessage message) throws Exception {// 当消息到达时调用log.info("---Message arrived. Topic: " + topic + " Message: " + new String(message.getPayload()));// 处理消息的逻辑}@Overridepublic void deliveryComplete(IMqttDeliveryToken token) {// 当消息被完全传送出去后调用log.info("---Delivery complete!");// 可以在这里处理一些发送完成后的清理工作}// 发送消息的方法public void publish(String topic, String payload) throws MqttException {MqttMessage message = new MqttMessage(payload.getBytes());// QoS 0,最多交付一次。可能丢失消息// QoS 1,至少交付一次。可以保证收到消息,但消息可能重复// QoS 2,只交付一次。可以保证消息既不丢失也不重复message.setQos(2);client.publish(topic, message);log.info("---Message published: {}", payload);}// 断开连接的方法@PreDestroypublic void disconnect() throws MqttException {if (client != null && client.isConnected()) {client.disconnect();log.info("---Disconnected");}}
}

2.4、基于MQTT写个service

写个控制器测试

import org.eclipse.paho.client.mqttv3.MqttException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;@RestController
@RequestMapping("/mqtt")
public class MqttController {@Autowiredprivate MqttService mqttService;@RequestMapping(value = "/send", method = {RequestMethod.GET, RequestMethod.POST})public ResponseEntity<String> sendMessage(@RequestParam String topic, @RequestParam String message) {topic = "test/topic";try {mqttService.publish(topic, message);return ResponseEntity.ok("Message sent successfully");} catch (MqttException e) {return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body("Failed to send message: " + e.getMessage());}}
}

浏览器输入:http://localhost:8091/mqtt/send?topic&message=我来了
控制台日志如下:符合预期

2024-08-10 23:13:17 [MQTT Call: java-mqtt-client] INFO  cn.talktrip.mqtt.MqttService - ---Message arrived. Topic: test/topic Message: 我来了
2024-08-10 23:13:17 [http-nio-8091-exec-8] INFO  cn.talktrip.mqtt.MqttService - ---Message published: 我来了
2024-08-10 23:13:17 [MQTT Call: java-mqtt-client] INFO  cn.talktrip.mqtt.MqttService - ---Delivery complete!

参考文档:https://www.emqx.com/zh/blog/the-easiest-guide-to-getting-started-with-mqtt

相关文章:

Java使用MQTT协议

MQTT&#xff08;Message Queuing Telemetry Transport&#xff0c;消息队列遥测传输协议&#xff09;是一种轻量级的、基于发布/订阅模式的物联网通信协议。它构建于TCP/IP协议之上&#xff0c;由IBM在1999年发布。MQTT的主要特点包括&#xff1a; 轻量级与高效&#xff1a;M…...

等级+时间的优先级算法

简介 本算法为等级与时间结合计算对应优先级逻辑 等级越高者优先级越高 同等级下&#xff0c;时间越小者优先级越高 实现 主方法 calculatePriority import com.zk.blog.enums.TypeEnum; import org.apache.commons.lang3.StringUtils;/*** program: * description:* autho…...

物流仓库安全视频智能管理方案:构建全方位、高效能的防护体系

一、背景分析 随着物流行业的快速发展和仓储需求的日益增长&#xff0c;仓库安全成为企业运营中不可忽视的重要环节。传统的人工监控方式不仅效率低下&#xff0c;且难以做到全天候、无死角覆盖&#xff0c;给仓库资产和人员安全带来潜在风险。因此&#xff0c;引入仓库安全视…...

jackson反序列化漏洞

jackson反序列化漏洞 反序列化漏洞触发根因jackson介绍jackson反序列化漏洞关键点enableDefaultTypingactivateDefaultTypingJsonTypeInfo 漏洞触发场景漏洞复现环境引入依赖pocactivateDefaultTypingenableDefaultTypingJsonTypeInfo 参考 很久没写blog&#xff0c;最近慢慢开…...

Java | Leetcode Java题解之第328题奇偶链表

题目&#xff1a; 题解&#xff1a; class Solution {public ListNode oddEvenList(ListNode head) {if (head null) {return head;}ListNode evenHead head.next;ListNode odd head, even evenHead;while (even ! null && even.next ! null) {odd.next even.nex…...

100 Exercises To Learn Rust 挑战!准备篇

公司内部的学习会非常活跃&#xff01;我也参与了Rust学习会&#xff0c;并且一直在研究rustlings。最近&#xff0c;我发现了一个类似于rustlings的新教程网站&#xff1a;Welcome - 100 Exercises To Learn Rust。 rustlings是基于Rust的权威官方文档《The Rust Programming…...

瑞_RabbitMQ_初识MQ

文章目录 1 初识MQ1.1 同步调用1.1.1 同步调用的优势1.1.2 同步调用的缺点 1.2 异步调用1.2.1 异步调用的角色1.2.2 异步调用的优势1.2.3 异步调用的缺点1.2.4 异步调用的场景 1.3 MQ技术选型 2 RabbitMQ2.1 安装2.1.1 资源准备2.1.2 安装步骤 2.2 RabbitMQ架构2.3 RabbitMQ管理…...

系统内存管理:虚拟内存、内存分段与分页、页表缓存TLB以及Linux内存管理

虚拟内存 虚拟内存是一种操作系统提供的机制&#xff0c;用于将每个进程分配的独立的虚拟地址空间映射到实际的物理内存地址空间上。通过使用虚拟内存&#xff0c;操作系统可以有效地解决多个应用程序直接操作物理内存可能引发的冲突问题。 在使用虚拟内存的情况下&#xff0…...

Java每日一练_模拟面试题5(堆和栈的区别)

在Java中&#xff0c;堆&#xff08;Heap&#xff09;和栈&#xff08;Stack&#xff09;是两个不同的内存区域&#xff0c;它们在存储内容、管理方式、空间大小、分配方式等多个方面存在显著的区别。以下是Java中堆和栈的主要区别&#xff1a; 1. 存储内容不同 堆&#xff1…...

传感器校正和测试

是 一。舵机在使用过程中为了防止手动扭动损坏其中的齿轮&#xff0c;一般会使用代码测试并校正到0位。 #include <Servo.h> Servo myservo; // 创建一个Servo对象 // 连接到舵机信号线的Arduino引脚 int servoPin 9; void setup() { myservo.attach(servoPin…...

Eclipse 悬浮提示:提高编程效率的利器

Eclipse 悬浮提示&#xff1a;提高编程效率的利器 引言 在当今的软件开发领域&#xff0c;Eclipse 是一款广受欢迎的集成开发环境&#xff08;IDE&#xff09;。它以其强大的功能和灵活性而著称&#xff0c;被全球的开发者用于各种编程语言和项目。Eclipse 的一个显著特点是其…...

Vault系列之:创建令牌

Vault系列之&#xff1a;创建令牌 一、Vault令牌二、令牌认证三、创建一个新的令牌四、使用令牌登陆五、 撤销令牌 一、Vault令牌 Vault令牌是Vault服务器提供的一种身份验证方式&#xff0c;用于授权和访问Vault中存储的资源。Vault令牌可以是客户端令牌或服务令牌。客户端令…...

如何在 Windows 10 环境下安装和配置 MySQL:初学者指南

如何在 Windows 10 环境下安装和配置 MySQL&#xff1a;初学者指南 MySQL 是一个流行的开源数据库管理系统&#xff0c;广泛应用于各种应用程序中。对于初学者来说&#xff0c;了解如何在 Windows 10 环境下安装和配置 MySQL 是一个重要的第一步。本篇博客将详细介绍如何完成这…...

Ubuntu 24.04上报:Error: could not connect to ollama app, is it running?的解决方法

说起来这个问题真实让人无语。按照我之前说过的方法&#xff1a;设置Ollama在局域网中访问的方法&#xff08;Ubuntu&#xff09;_ollama 局域网访问-CSDN博客 把Ollama的默认端口修改后&#xff0c;如果再运行&#xff1a; ollama ps 则会报下面的错&#xff1a; Error: c…...

字典树查重(到底要开多大的空间啊)

前言&#xff1a;烦死了&#xff0c;这个题目一看就是用字典树来做&#xff0c;但是空间不知道开多大&#xff0c;烦死了 后来发现其实tree的第一维空间直接开极端的情况就行&#xff0c;就好像这一题&#xff0c;最多有 1e4 个字符串&#xff0c;每个字符串最长为 50&#xff…...

财务会计与管理会计(二)

文章目录 多工作表销售数据汇总1、INDIRECT函数2、HLOOKUP函数 多表筛选分类求和1、SUMIF函数2、INDIRECT函数 两组数据比对详解VLOOKUP函数的应用 多工作表销售数据汇总 1、INDIRECT函数 INDIRECT(""&D$4&"!D4:M24") 1月!D4:M24 HLOOKUP($A$1,I…...

技术周总结 08.05-08.11周日

文章目录 一、08.06 周二1.1) 问题01 mac安装 scala:1. 使用 Homebrew2. 使用 SDKMAN!其他注意事项1. 确认 Scala 安装位置2. 设置 PATH 环境变量对于 zsh (macOS Catalina 及更高版本默认使用 zsh):对于 bash (如果您使用的是 bash shell): 3. 验证安装 二、08.09 周五2.1&…...

B树和B+树的插入、删除

1. B树 1.1 B树的定义 树也称树&#xff0c;它是一颗多路平衡查找树。我们描述一颗树时需要指定它的阶数&#xff0c;阶数表示了一个结点最多有多少个孩子结点&#xff0c;用字母表示阶数。当取时&#xff0c;就是我们常见的二叉搜索树。 一颗阶的树定义如下&#xff1a; 每…...

Axios网络请求总结

在实际项目开发中&#xff0c;前端页面所需要的数据往往需要从服务器端获取&#xff0c;这必然涉及与服务器的通信。Axios 是一个基于 promise 网络请求库&#xff0c;作用于node.js 和浏览器中。Axios 在浏览器端使用XMLHttpRequests发送网络请求&#xff0c;并能自动完成JSON…...

立仪科技光谱共焦应用之金属隔膜静态重复性测量

01&#xff5c;检测需求&#xff1a;金属隔膜重复性测量 02&#xff5c;检测方式 为了保证精度&#xff0c;首先先用千分尺进行测量&#xff0c;得出相应的厚度数据&#xff0c;在选择合适的侧头&#xff0c;根据结果&#xff0c;我们现在立仪科技H4UO控制器搭配D27A20侧头 03&…...

vue3实现video视频+弹幕评论

vue3实现视频加评论 之前写了一篇博客使用了弹幕插件http://t.csdnimg.cn/616mlvue3 使用弹幕插件&#xff0c;今天对这个页面进行了升级 变成了 vue3使用video 这个没有使用插件&#xff0c;昨天看了好多&#xff0c;没发现有用的插件&#xff0c;下载了几个都没办法使用就用…...

STM32-OTA升级

一、OTA&#xff08;Over-The-Air&#xff09; OTA&#xff08;Over-The-Air&#xff09;是一种通过无线通信方式&#xff0c;为设备分发新软件、配置甚至更新加密密钥的技术。它允许中心位置向所有用户发送更新&#xff0c;确保每个接收者都无法拒绝、破坏或改变这些更新&…...

一种JSON多态表示法

介绍 假设现在需要实现一种功能: 从某个远程的组件(消息队列或远程文件)拉取最后几条记录做一个展示. 需要支持如下的组件: Kafka RocketMQ OSS 假设还有很多, 这里不列了 … 显然, 每种组件需要的参数各不一样, 那么此时如何使用一个统一的结构来表达这些组件的参数呢?…...

C语言实现单链表

一、什么是单链表 1.链表就是一种在物理存储上各个节点非连续的&#xff0c;随机的&#xff0c;元素的逻辑顺序是通过链表中的指针链接的次序而实现的。 图示&#xff1a; 二、单链表中节点的定义 #include<stdio.h> #include<stdlib.h> #include<string.h>…...

循环神经网络三

一.介绍 在普通的神经网络中&#xff0c;信息的传递是单向的&#xff0c;这种限制虽然使得网络变得更容易学习&#xff0c;单在一定程度上也减弱了神经网络模型的能力。特别是在现实生活中&#xff0c;网络的输出不仅和当前时刻的输入相关&#xff0c;也过去一段时间的输出相关…...

优购电商小程序的设计

管理员账户功能包括&#xff1a;系统首页&#xff0c;个人中心&#xff0c;用户管理&#xff0c;商品分类管理&#xff0c;商品信息管理&#xff0c;留言板管理&#xff0c;订单管理&#xff0c;系统管理 微信端账号功能包括&#xff1a;系统首页&#xff0c;商品信息&#xf…...

【ARM】v8架构programmer guide(4)_ARMv8的寄存器

目录 4.4Endianness&#xff08;端序或字节序&#xff09; 4.5 改变execution state 4.5.1 Registers at AArch32 4.5.2 PSTATE at AArch32 4.6 NEON 和浮点数寄存器 4.6.1 AArch64中浮点寄存器的组织结构 4.6.2 标量寄存器大小 4.6.3 向量寄存器大小 4.6.4 NEON在AArc…...

Java设计模式详细讲解

目录 设计模式概述 1.1 什么是设计模式1.2 设计模式的类型1.3 设计模式的历史与发展1.4 设计模式在软件开发中的重要性 创建型模式 2.1 单例模式2.2 工厂方法模式2.3 抽象工厂模式2.4 建造者模式2.5 原型模式 结构型模式 3.1 适配器模式3.2 装饰器模式3.3 代理模式3.4 外观模…...

图论------弗洛伊德(Floyd-Warshall)算法

题目描述&#xff1a; 在每年的校赛里&#xff0c;所有进入决赛的同学都会获得一件很漂亮的 T-shirt。但是每当我们的工作人员把上百件的衣服从商店运回到赛场的时候&#xff0c;却是非常累的&#xff01;所以现在他们想要寻找最短的从商店到赛场的路线&#xff0c;你可以帮助…...

C#实现动画效果

在C#中&#xff0c;实现动画效果通常可以使用Windows Forms的Timer类或者使用System.Windows.Media.Animation命名空间下的类&#xff08;如果是WPF应用&#xff09;。以下是一个Windows Forms应用中使用Timer类来创建简单的动画效果的例子。 假设我们有一个窗体&#xff08;F…...