Java模拟Mqtt客户端连接Mqtt Broker
Java模拟Mqtt客户端基本流程
引入Paho MQTT客户端库
<dependency><groupId>org.eclipse.paho</groupId><artifactId>org.eclipse.paho.mqttv5.client</artifactId><version>1.2.5</version>
</dependency>
设置mqtt配置数据
在application.yml中添加如下配置
mqtt:broker-url: tcp://42.194.132.44:1883client-id: mqtt_receive_serverusername: mqtt_serverpassword: 9b31fa798e16532b0285e130b004836d33391f908f043f2ce0897eea0a669fa0
MqttClient配置
将MqttClient加入到IoC容器,并连接客户端
package com.angel.ocean.config;import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class MqttConfig {@Value("${mqtt.broker-url}")private String brokerUrl;@Value("${mqtt.client-id}")private String clientId;@Value("${mqtt.username}")private String username;@Value("${mqtt.password}")private String password;@Beanpublic MqttClient mqttClient() throws MqttException {MqttClient client = new MqttClient(brokerUrl, clientId);MqttConnectOptions options = new MqttConnectOptions();options.setUserName(username);options.setPassword(password.toCharArray());options.setCleanSession(true);client.connect(options);return client;}
}
MqttService
mqtt客户端,一些基本操作:连接、订阅、发消息,断开连接
package com.angel.ocean.mqtt;import com.angel.ocean.contants.MqttTopicConstant;
import com.angel.ocean.kafka.KafkaService;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;@Slf4j
@Service
public class MqttService {@Resourceprivate MqttClient client;@Resourceprivate KafkaService kafkaService;@PostConstructpublic void init() throws MqttException {client.setCallback(new MqttCallbackHandler(kafkaService));subscribe(MqttTopicConstant.ACTIVATE);subscribe(MqttTopicConstant.RESET);subscribe(MqttTopicConstant.ONLINE);subscribe(MqttTopicConstant.OFFLINE);subscribe(MqttTopicConstant.REPORT);}/*** 连接*/public void connect(String username, String password) throws MqttException {if(!client.isConnected()) {MqttConnectOptions options = new MqttConnectOptions();options.setUserName(username);options.setPassword(password.toCharArray());options.setCleanSession(true);client.connect(options);}}/*** 发送消息*/public void publish(String topic, String data) {if(client.isConnected()) {MqttMessage message = new MqttMessage(data.getBytes());message.setQos(0);try {client.publish(topic, message);log.info("Message published:{}, topic:{}, content:{}", client.getClientId(), topic, data);} catch (MqttException e) {log.error("Message publish failed:{}, topic:{}", client.getClientId(), topic, e);}return;}log.info("Message publish failed, client:{} not online.", client.getClientId());}/*** 订阅*/public void subscribe(String topic) {if(client.isConnected()) {try {client.subscribe(topic);log.info("Message subscribed:{}, topic:{}", client.getClientId(), topic);} catch (MqttException e) {log.error("Message subscribe failed:{}, topic:{}", client.getClientId(), topic, e);}return;}log.info("Message subscribe failed, client:{} not online.", client.getClientId());}/*** 断开连接*/public void disconnect() {try {client.disconnect();client.close();log.info("Disconnected:{}", client.getClientId());} catch (MqttException e) {log.error("Message disconnect failed:{}", client.getClientId(), e);}}
}
自定义MqttCallback
对客户端连接丢失,收到消息做一些模拟处理
package com.angel.ocean.mqtt;import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;
import com.angel.ocean.domain.UpData;
import com.angel.ocean.domain.UpKafKaData;
import com.angel.ocean.kafka.KafkaService;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.springframework.stereotype.Component;
import static com.angel.ocean.contants.KafkaTopicConstant.UP_DATA_TOPIC;@Slf4j
public class MqttCallbackHandler implements MqttCallback {private KafkaService kafkaService;public MqttCallbackHandler(KafkaService kafkaService) {this.kafkaService = kafkaService;}@Overridepublic void connectionLost(Throwable cause) {// 连接丢失后,一般在这里面进行重连log.info("连接断开...", cause);}@Overridepublic void messageArrived(String topic, MqttMessage message) throws Exception {String data = new String(message.getPayload());log.info("接收消息主题:{}, Qos:{}, 消息内容:{}", topic, message.getQos(), data);UpData upData = JSONObject.parseObject(data, UpData.class);UpKafKaData upKafKaData = new UpKafKaData(topic, data);log.info("upKafKaData: {}", JSON.toJSONString(upKafKaData));kafkaService.sendData(UP_DATA_TOPIC, upData.getClientId(), JSON.toJSONString(upKafKaData));}@Overridepublic void deliveryComplete(IMqttDeliveryToken token) {log.info("deliveryComplete---------:{}", token.isComplete());}
}
MqttController
用于模拟客户端行为
package com.angel.ocean.controller;import com.angel.ocean.common.ApiResult;
import com.angel.ocean.common.BaseController;
import com.angel.ocean.mqtt.MqttService;
import io.swagger.annotations.Api;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;/*** 前端控制器** @author Jaime.yu* @time 2024-12-01*/
@Api(value = "接口", tags = {"相关接口"})
@RestController
@RequestMapping("/mqtt/client")
public class MqttController extends BaseController {@Resourceprivate MqttService mqttService;@GetMapping("/subscribe")public ApiResult<?> subscribe(String topic) {mqttService.subscribe(topic);return ApiResult.success();}@GetMapping("/publish")public ApiResult<?> publish(String topic, String message) {mqttService.publish(topic, message);return ApiResult.success();}@GetMapping("/disconnect")public ApiResult<?> disconnect() {mqttService.disconnect();return ApiResult.success();}
}
代码验证
启动mqtt客户端
如下图客户端已上线:

发送消息
如下图mqtt broker该客户端的日志,接收到了我们发送的数据:hello world

接收数据
首先我们先订阅个主题:mqtt/0/0

使用MQTTX客户端向该主题发消息

Java mqtt客户端接收数据
查询本地Java mqtt客户收到的消息,如下图收到该消息
mqtt broker 也可以看到该日志:

断开连接
如下图本地客户端862024121819020已断开连接:

相关文章:
Java模拟Mqtt客户端连接Mqtt Broker
Java模拟Mqtt客户端基本流程 引入Paho MQTT客户端库 <dependency><groupId>org.eclipse.paho</groupId><artifactId>org.eclipse.paho.mqttv5.client</artifactId><version>1.2.5</version> </dependency>设置mqtt配置数据 …...
【电商搜索】文档的信息论生成聚类
【电商搜索】文档的信息论生成聚类 目录 文章目录 【电商搜索】文档的信息论生成聚类目录文章信息概览研究背景技术挑战如何破局技术应用主要相关工作与参考文献后续优化方向 后记 文章信息 https://arxiv.org/pdf/2412.13534 概览 本文提出了一种基于信息论的生成聚类&#…...
在福昕(pdf)阅读器中导航到上次阅读页面的方法
文章目录 在福昕(pdf)阅读器中导航到上次阅读页面的方法概述笔记用书签的方法来导航用导航按钮的方法来导航 备注END 在福昕(pdf)阅读器中导航到上次阅读页面的方法 概述 喜欢用福昕(pdf)阅读器来看pdf文件。 但是有个小问题困扰了我好久。 e.g. 300页的pdf看了一半ÿ…...
基于Springboot的数字科技风险报告管理系统
博主介绍:java高级开发,从事互联网行业六年,熟悉各种主流语言,精通java、python、php、爬虫、web开发,已经做了多年的设计程序开发,开发过上千套设计程序,没有什么华丽的语言,只有实…...
【最后203篇系列】001 - 2024回顾
说明 最早在CSDN上写文章有两个目的: 1 自己梳理知识,以备日后查用2 曾经从别人的文章中得到过帮助,所以也希望能给人帮助 所以在这个过程中,我的文章基本上完全是原创,也非常强调落地与工程化。在不断写作的过程中…...
量子退火与机器学习(1):少量数据求解未知QUBO矩阵,以少见多
文章目录 前言ー、复习QUBO:中药配伍的复杂性1.QUBO 的介入:寻找最佳药材组合 二、难题:QUBO矩阵未知的问题1.为什么这么难? 三、稀疏建模(Sparse Modeling)1. 欠定系统中的稀疏解2. L1和L2的选择: 三、压缩感知算法(C…...
矩阵:Input-Output Interpretation of Matrices (中英双语)
矩阵的输入-输出解释:深入理解与应用 在线性代数中,矩阵与向量的乘积 ( y A x y Ax yAx ) 是一个极为重要的关系。通过这一公式,我们可以将矩阵 ( A A A ) 看作一个将输入向量 ( x x x ) 映射到输出向量 ( y y y ) 的线性变换。在这种…...
excel 使用vlook up找出两列中不同的内容
当使用 VLOOKUP 函数时,您可以将其用于比较两列的内容。假设您要比较 A 列和 B 列的内容,并将结果显示在 C 列,您可以在 C1 单元格中输入以下公式: 这个公式将在 B 列中的每个单元格中查找是否存在于 A 列中。如果在 A 列中找不到…...
YoloV8改进策略:Head改进|DynamicHead,利用注意力机制统一目标检测头部|即插即用
摘要 论文介绍 本文介绍了一种名为DynamicHead的模块,该模块旨在通过注意力机制统一目标检测头部,以提升目标检测的性能。论文详细阐述了DynamicHead的工作原理,并通过实验证明了其在COCO基准测试上的有效性和效率。 创新点 DynamicHead模块的创新之处在于它首次尝试在一…...
两地的日出日落时间差为啥不相等
悟空去延吉玩耍,在下午4点多的时候发来一张照片,说,天已经黑了!我赶紧地图上看了看,延吉居然和北京差了大约15度的经度差,那就是大约一小时的时差哦。次日我随便查了一下两地的日出日落时间,结果…...
Android Https和WebView
系统会提示说不安全,因为网站通过js就能调用你的android代码,如果你确认你的网站没用到JS的话就不要打开这个开关,如果用到了,就添加一个注解忽略它就行了。 后来就使用我们公司的网站了,发现也出不来,后来…...
2.5.1 文件管理基本概念
文章目录 文件文件系统文件分类 文件 文件:具有符号名,逻辑上有完整意义的一组相关信息的集合。 文件包含文件体、文件说明两部分。文件体存储文件的真实内容,文件说明存放操作系统管理文件所用的信息。 文件说明包含文件名、内部标识、类型、…...
在 PowerShell 中优雅地显示 Python 虚拟环境
在使用 Python 进行开发时,虚拟环境管理是一个非常重要的部分。无论是使用 venv 还是 conda,我们都希望能够清晰地看到当前所处的虚拟环境。本文将介绍如何在 PowerShell 中配置提示符,使其能够优雅地显示不同类型的 Python 虚拟环境。 问题…...
K8S Ingress 服务配置步骤说明
部署Pod服务 分别使用kubectl run和kubectl apply 部署nginx和tomcat服务 # 快速启动一个nginx服务 kubectl run my-nginx --imagenginx --port80# 使用yaml创建tomcat服务 kubectl apply -f my-tomcat.yamlmy-tomcat.yaml apiVersion: apps/v1 kind: Deployment metadata:n…...
观察者模式(sigslot in C++)
大家,我是东风,今天抽点时间整理一下我很久前关注的一个不错的库,可以支持我们在使用标准C的时候使用信号槽机制进行观察者模式设计,sigslot 官网: http://sigslot.sourceforge.net/ 本文较为详尽探讨了一种观察者模…...
python使用pip进行库的下载
前言 现如今有太多的python编译软件,其库的下载也是五花八门,但在作者看来,无论是哪种方法都是万变不离其宗,即pip下载。 pip是python的包管理工具,无论你是用的什么python软件,都可以用pip进行库的下载。 …...
C#(委托)
一、基本定义 在C#中,委托(Delegate)是一种引用类型,它用于封装一个方法(具有特定的参数列表和返回类型)。可以把委托想象成一个能存储方法的变量,这个变量能够像调用普通方法一样来调用它所存…...
《点点之歌》“意外”诞生记
世界是“点点”的,“点点”是世界的。 (笔记模板由python脚本于2024年12月23日 19:28:25创建,本篇笔记适合喜欢诗文的coder翻阅) 【学习的细节是欢悦的历程】 Python 官网:https://www.python.org/ Free:大咖免费“圣经”教程《 …...
ue5 pcg(程序内容生成)真的简单方便,就5个节点
总结: 前情提示 鼠标单击右键平移节点 1.编辑-》插件-》procedural->勾选两个插件 2.右键-》pcg图表-》拖拽进入场景 3.先看点point 右键-》调试(快捷键d)->右侧设置粒子数 3.1调整粒子数 可以在右侧输入框,使用加减乘除 4.1 表面采样器 …...
32岁前端干了8年,是继续做前端开发,还是转其它工作
前端发展有瓶颈,变来变去都是那一套,只是换了框架换了环境。换了框架后又得去学习,虽然很快上手,但是那些刚毕业的也很快上手了,入门门槛越来越低,想转行或继续卷,该如何破圈 这是一位网友的自述…...
RestClient
什么是RestClient RestClient 是 Elasticsearch 官方提供的 Java 低级 REST 客户端,它允许HTTP与Elasticsearch 集群通信,而无需处理 JSON 序列化/反序列化等底层细节。它是 Elasticsearch Java API 客户端的基础。 RestClient 主要特点 轻量级ÿ…...
云计算——弹性云计算器(ECS)
弹性云服务器:ECS 概述 云计算重构了ICT系统,云计算平台厂商推出使得厂家能够主要关注应用管理而非平台管理的云平台,包含如下主要概念。 ECS(Elastic Cloud Server):即弹性云服务器,是云计算…...
前端倒计时误差!
提示:记录工作中遇到的需求及解决办法 文章目录 前言一、误差从何而来?二、五大解决方案1. 动态校准法(基础版)2. Web Worker 计时3. 服务器时间同步4. Performance API 高精度计时5. 页面可见性API优化三、生产环境最佳实践四、终极解决方案架构前言 前几天听说公司某个项…...
Cinnamon修改面板小工具图标
Cinnamon开始菜单-CSDN博客 设置模块都是做好的,比GNOME简单得多! 在 applet.js 里增加 const Settings imports.ui.settings;this.settings new Settings.AppletSettings(this, HTYMenusonichy, instance_id); this.settings.bind(menu-icon, menu…...
掌握 HTTP 请求:理解 cURL GET 语法
cURL 是一个强大的命令行工具,用于发送 HTTP 请求和与 Web 服务器交互。在 Web 开发和测试中,cURL 经常用于发送 GET 请求来获取服务器资源。本文将详细介绍 cURL GET 请求的语法和使用方法。 一、cURL 基本概念 cURL 是 "Client URL" 的缩写…...
云安全与网络安全:核心区别与协同作用解析
在数字化转型的浪潮中,云安全与网络安全作为信息安全的两大支柱,常被混淆但本质不同。本文将从概念、责任分工、技术手段、威胁类型等维度深入解析两者的差异,并探讨它们的协同作用。 一、核心区别 定义与范围 网络安全:聚焦于保…...
PydanticAI快速入门示例
参考链接:https://ai.pydantic.dev/#why-use-pydanticai 示例代码 from pydantic_ai import Agent from pydantic_ai.models.openai import OpenAIModel from pydantic_ai.providers.openai import OpenAIProvider# 配置使用阿里云通义千问模型 model OpenAIMode…...
SpringCloud优势
目录 完善的微服务支持 高可用性和容错性 灵活的配置管理 强大的服务网关 分布式追踪能力 丰富的社区生态 易于与其他技术栈集成 完善的微服务支持 Spring Cloud 提供了一整套工具和组件来支持微服务架构的开发,包括服务注册与发现、负载均衡、断路器、配置管理等功能…...
Docker环境下安装 Elasticsearch + IK 分词器 + Pinyin插件 + Kibana(适配7.10.1)
做RAG自己打算使用esmilvus自己开发一个,安装时好像网上没有比较新的安装方法,然后找了个旧的方法对应试试: 🚀 本文将手把手教你在 Docker 环境中部署 Elasticsearch 7.10.1 IK分词器 拼音插件 Kibana,适配中文搜索…...
【Redis】Redis 的持久化策略
目录 一、RDB 定期备份 1.2 触发方式 1.2.1 手动触发 1.2.2.1 自动触发 RDB 持久化机制的场景 1.2.2.2 检查是否触发 1.2.2.3 线上运维配置 1.3 检索工具 1.4 RDB 备份实现原理 1.5 禁用 RDB 快照 1.6 RDB 优缺点分析 二、AOF 实时备份 2.1 配置文件解析 2.2 开启…...
