当前位置: 首页 > news >正文

Java模拟Mqtt客户端连接Mqtt Broker

Java模拟Mqtt客户端基本流程

引入Paho MQTT客户端库

<dependency><groupId>org.eclipse.paho</groupId><artifactId>org.eclipse.paho.mqttv5.client</artifactId><version>1.2.5</version>
</dependency>

设置mqtt配置数据

在application.yml中添加如下配置

mqtt:broker-url: tcp://42.194.132.44:1883client-id: mqtt_receive_serverusername: mqtt_serverpassword: 9b31fa798e16532b0285e130b004836d33391f908f043f2ce0897eea0a669fa0

MqttClient配置

将MqttClient加入到IoC容器,并连接客户端

package com.angel.ocean.config;import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class MqttConfig {@Value("${mqtt.broker-url}")private String brokerUrl;@Value("${mqtt.client-id}")private String clientId;@Value("${mqtt.username}")private String username;@Value("${mqtt.password}")private String password;@Beanpublic MqttClient mqttClient() throws MqttException {MqttClient client = new MqttClient(brokerUrl, clientId);MqttConnectOptions options = new MqttConnectOptions();options.setUserName(username);options.setPassword(password.toCharArray());options.setCleanSession(true);client.connect(options);return client;}
}

MqttService

mqtt客户端,一些基本操作:连接、订阅、发消息,断开连接

package com.angel.ocean.mqtt;import com.angel.ocean.contants.MqttTopicConstant;
import com.angel.ocean.kafka.KafkaService;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;@Slf4j
@Service
public class MqttService {@Resourceprivate MqttClient client;@Resourceprivate KafkaService kafkaService;@PostConstructpublic void init() throws MqttException {client.setCallback(new MqttCallbackHandler(kafkaService));subscribe(MqttTopicConstant.ACTIVATE);subscribe(MqttTopicConstant.RESET);subscribe(MqttTopicConstant.ONLINE);subscribe(MqttTopicConstant.OFFLINE);subscribe(MqttTopicConstant.REPORT);}/*** 连接*/public void connect(String username, String password) throws MqttException {if(!client.isConnected()) {MqttConnectOptions options = new MqttConnectOptions();options.setUserName(username);options.setPassword(password.toCharArray());options.setCleanSession(true);client.connect(options);}}/*** 发送消息*/public void publish(String topic, String data) {if(client.isConnected()) {MqttMessage message = new MqttMessage(data.getBytes());message.setQos(0);try {client.publish(topic, message);log.info("Message published:{}, topic:{}, content:{}", client.getClientId(), topic, data);} catch (MqttException e) {log.error("Message publish failed:{}, topic:{}", client.getClientId(), topic, e);}return;}log.info("Message publish failed, client:{} not online.", client.getClientId());}/*** 订阅*/public void subscribe(String topic) {if(client.isConnected()) {try {client.subscribe(topic);log.info("Message subscribed:{}, topic:{}", client.getClientId(), topic);} catch (MqttException e) {log.error("Message subscribe failed:{}, topic:{}", client.getClientId(), topic, e);}return;}log.info("Message subscribe failed, client:{} not online.", client.getClientId());}/*** 断开连接*/public void disconnect() {try {client.disconnect();client.close();log.info("Disconnected:{}", client.getClientId());} catch (MqttException e) {log.error("Message disconnect failed:{}", client.getClientId(), e);}}
}

自定义MqttCallback

对客户端连接丢失,收到消息做一些模拟处理

package com.angel.ocean.mqtt;import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;
import com.angel.ocean.domain.UpData;
import com.angel.ocean.domain.UpKafKaData;
import com.angel.ocean.kafka.KafkaService;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.springframework.stereotype.Component;
import static com.angel.ocean.contants.KafkaTopicConstant.UP_DATA_TOPIC;@Slf4j
public class MqttCallbackHandler implements MqttCallback {private KafkaService kafkaService;public MqttCallbackHandler(KafkaService kafkaService) {this.kafkaService = kafkaService;}@Overridepublic void connectionLost(Throwable cause) {// 连接丢失后,一般在这里面进行重连log.info("连接断开...", cause);}@Overridepublic void messageArrived(String topic, MqttMessage message) throws Exception {String data = new String(message.getPayload());log.info("接收消息主题:{}, Qos:{}, 消息内容:{}", topic, message.getQos(), data);UpData upData = JSONObject.parseObject(data, UpData.class);UpKafKaData upKafKaData = new UpKafKaData(topic, data);log.info("upKafKaData: {}", JSON.toJSONString(upKafKaData));kafkaService.sendData(UP_DATA_TOPIC, upData.getClientId(), JSON.toJSONString(upKafKaData));}@Overridepublic void deliveryComplete(IMqttDeliveryToken token) {log.info("deliveryComplete---------:{}", token.isComplete());}
}

MqttController

用于模拟客户端行为

package com.angel.ocean.controller;import com.angel.ocean.common.ApiResult;
import com.angel.ocean.common.BaseController;
import com.angel.ocean.mqtt.MqttService;
import io.swagger.annotations.Api;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;/***  前端控制器** @author Jaime.yu* @time 2024-12-01*/
@Api(value = "接口", tags = {"相关接口"})
@RestController
@RequestMapping("/mqtt/client")
public class MqttController extends BaseController {@Resourceprivate MqttService mqttService;@GetMapping("/subscribe")public ApiResult<?> subscribe(String topic) {mqttService.subscribe(topic);return ApiResult.success();}@GetMapping("/publish")public ApiResult<?> publish(String topic, String message) {mqttService.publish(topic, message);return ApiResult.success();}@GetMapping("/disconnect")public ApiResult<?> disconnect() {mqttService.disconnect();return ApiResult.success();}
}

代码验证

启动mqtt客户端

如下图客户端已上线:
在这里插入图片描述

发送消息

在这里插入图片描述如下图mqtt broker该客户端的日志,接收到了我们发送的数据:hello world
在这里插入图片描述

接收数据

首先我们先订阅个主题:mqtt/0/0

在这里插入图片描述

使用MQTTX客户端向该主题发消息

在这里插入图片描述

Java mqtt客户端接收数据

查询本地Java mqtt客户收到的消息,如下图收到该消息
在这里插入图片描述mqtt broker 也可以看到该日志:
在这里插入图片描述

断开连接

在这里插入图片描述如下图本地客户端862024121819020已断开连接:
在这里插入图片描述

相关文章:

Java模拟Mqtt客户端连接Mqtt Broker

Java模拟Mqtt客户端基本流程 引入Paho MQTT客户端库 <dependency><groupId>org.eclipse.paho</groupId><artifactId>org.eclipse.paho.mqttv5.client</artifactId><version>1.2.5</version> </dependency>设置mqtt配置数据 …...

【电商搜索】文档的信息论生成聚类

【电商搜索】文档的信息论生成聚类 目录 文章目录 【电商搜索】文档的信息论生成聚类目录文章信息概览研究背景技术挑战如何破局技术应用主要相关工作与参考文献后续优化方向 后记 文章信息 https://arxiv.org/pdf/2412.13534 概览 本文提出了一种基于信息论的生成聚类&#…...

在福昕(pdf)阅读器中导航到上次阅读页面的方法

文章目录 在福昕(pdf)阅读器中导航到上次阅读页面的方法概述笔记用书签的方法来导航用导航按钮的方法来导航 备注END 在福昕(pdf)阅读器中导航到上次阅读页面的方法 概述 喜欢用福昕(pdf)阅读器来看pdf文件。 但是有个小问题困扰了我好久。 e.g. 300页的pdf看了一半&#xff…...

基于Springboot的数字科技风险报告管理系统

博主介绍&#xff1a;java高级开发&#xff0c;从事互联网行业六年&#xff0c;熟悉各种主流语言&#xff0c;精通java、python、php、爬虫、web开发&#xff0c;已经做了多年的设计程序开发&#xff0c;开发过上千套设计程序&#xff0c;没有什么华丽的语言&#xff0c;只有实…...

【最后203篇系列】001 - 2024回顾

说明 最早在CSDN上写文章有两个目的&#xff1a; 1 自己梳理知识&#xff0c;以备日后查用2 曾经从别人的文章中得到过帮助&#xff0c;所以也希望能给人帮助 所以在这个过程中&#xff0c;我的文章基本上完全是原创&#xff0c;也非常强调落地与工程化。在不断写作的过程中…...

量子退火与机器学习(1):少量数据求解未知QUBO矩阵,以少见多

文章目录 前言ー、复习QUBO&#xff1a;中药配伍的复杂性1.QUBO 的介入&#xff1a;寻找最佳药材组合 二、难题&#xff1a;QUBO矩阵未知的问题1.为什么这么难&#xff1f; 三、稀疏建模(Sparse Modeling)1. 欠定系统中的稀疏解2. L1和L2的选择&#xff1a; 三、压缩感知算法(C…...

矩阵:Input-Output Interpretation of Matrices (中英双语)

矩阵的输入-输出解释&#xff1a;深入理解与应用 在线性代数中&#xff0c;矩阵与向量的乘积 ( y A x y Ax yAx ) 是一个极为重要的关系。通过这一公式&#xff0c;我们可以将矩阵 ( A A A ) 看作一个将输入向量 ( x x x ) 映射到输出向量 ( y y y ) 的线性变换。在这种…...

excel 使用vlook up找出两列中不同的内容

当使用 VLOOKUP 函数时&#xff0c;您可以将其用于比较两列的内容。假设您要比较 A 列和 B 列的内容&#xff0c;并将结果显示在 C 列&#xff0c;您可以在 C1 单元格中输入以下公式&#xff1a; 这个公式将在 B 列中的每个单元格中查找是否存在于 A 列中。如果在 A 列中找不到…...

YoloV8改进策略:Head改进|DynamicHead,利用注意力机制统一目标检测头部|即插即用

摘要 论文介绍 本文介绍了一种名为DynamicHead的模块,该模块旨在通过注意力机制统一目标检测头部,以提升目标检测的性能。论文详细阐述了DynamicHead的工作原理,并通过实验证明了其在COCO基准测试上的有效性和效率。 创新点 DynamicHead模块的创新之处在于它首次尝试在一…...

两地的日出日落时间差为啥不相等

悟空去延吉玩耍&#xff0c;在下午4点多的时候发来一张照片&#xff0c;说&#xff0c;天已经黑了&#xff01;我赶紧地图上看了看&#xff0c;延吉居然和北京差了大约15度的经度差&#xff0c;那就是大约一小时的时差哦。次日我随便查了一下两地的日出日落时间&#xff0c;结果…...

Android Https和WebView

系统会提示说不安全&#xff0c;因为网站通过js就能调用你的android代码&#xff0c;如果你确认你的网站没用到JS的话就不要打开这个开关&#xff0c;如果用到了&#xff0c;就添加一个注解忽略它就行了。 后来就使用我们公司的网站了&#xff0c;发现也出不来&#xff0c;后来…...

2.5.1 文件管理基本概念

文章目录 文件文件系统文件分类 文件 文件&#xff1a;具有符号名&#xff0c;逻辑上有完整意义的一组相关信息的集合。 文件包含文件体、文件说明两部分。文件体存储文件的真实内容&#xff0c;文件说明存放操作系统管理文件所用的信息。 文件说明包含文件名、内部标识、类型、…...

在 PowerShell 中优雅地显示 Python 虚拟环境

在使用 Python 进行开发时&#xff0c;虚拟环境管理是一个非常重要的部分。无论是使用 venv 还是 conda&#xff0c;我们都希望能够清晰地看到当前所处的虚拟环境。本文将介绍如何在 PowerShell 中配置提示符&#xff0c;使其能够优雅地显示不同类型的 Python 虚拟环境。 问题…...

K8S Ingress 服务配置步骤说明

部署Pod服务 分别使用kubectl run和kubectl apply 部署nginx和tomcat服务 # 快速启动一个nginx服务 kubectl run my-nginx --imagenginx --port80# 使用yaml创建tomcat服务 kubectl apply -f my-tomcat.yamlmy-tomcat.yaml apiVersion: apps/v1 kind: Deployment metadata:n…...

观察者模式(sigslot in C++)

大家&#xff0c;我是东风&#xff0c;今天抽点时间整理一下我很久前关注的一个不错的库&#xff0c;可以支持我们在使用标准C的时候使用信号槽机制进行观察者模式设计&#xff0c;sigslot 官网&#xff1a; http://sigslot.sourceforge.net/ 本文较为详尽探讨了一种观察者模…...

python使用pip进行库的下载

前言 现如今有太多的python编译软件&#xff0c;其库的下载也是五花八门&#xff0c;但在作者看来&#xff0c;无论是哪种方法都是万变不离其宗&#xff0c;即pip下载。 pip是python的包管理工具&#xff0c;无论你是用的什么python软件&#xff0c;都可以用pip进行库的下载。 …...

C#(委托)

一、基本定义 在C#中&#xff0c;委托&#xff08;Delegate&#xff09;是一种引用类型&#xff0c;它用于封装一个方法&#xff08;具有特定的参数列表和返回类型&#xff09;。可以把委托想象成一个能存储方法的变量&#xff0c;这个变量能够像调用普通方法一样来调用它所存…...

《点点之歌》“意外”诞生记

世界是“点点”的&#xff0c;“点点”是世界的。 (笔记模板由python脚本于2024年12月23日 19:28:25创建&#xff0c;本篇笔记适合喜欢诗文的coder翻阅) 【学习的细节是欢悦的历程】 Python 官网&#xff1a;https://www.python.org/ Free&#xff1a;大咖免费“圣经”教程《 …...

ue5 pcg(程序内容生成)真的简单方便,就5个节点

总结&#xff1a; 前情提示 鼠标单击右键平移节点 1.编辑-》插件-》procedural->勾选两个插件 2.右键-》pcg图表-》拖拽进入场景 3.先看点point 右键-》调试(快捷键d)->右侧设置粒子数 3.1调整粒子数 可以在右侧输入框&#xff0c;使用加减乘除 4.1 表面采样器 …...

32岁前端干了8年,是继续做前端开发,还是转其它工作

前端发展有瓶颈&#xff0c;变来变去都是那一套&#xff0c;只是换了框架换了环境。换了框架后又得去学习&#xff0c;虽然很快上手&#xff0c;但是那些刚毕业的也很快上手了&#xff0c;入门门槛越来越低&#xff0c;想转行或继续卷&#xff0c;该如何破圈 这是一位网友的自述…...

【演化博弈】期望收益函数公式、复制动态方程——化简功能技巧

期望化简 在演化博弈论的研究中&#xff0c;期望收益函数和复制动态方程是核心工具。化简这些公式的功能技巧具有以下几个重要作用&#xff1a; 提高公式的可读性和理解度 复杂的数学表达式可能让人感到困惑。通过化简&#xff0c;公式变得更加简单和易读&#xff0c;使研究者…...

opencv中的各种滤波器简介

在 OpenCV 中&#xff0c;滤波器是图像处理中的重要工具&#xff0c;用于对图像进行平滑、去噪、边缘检测等操作。以下是几种常见滤波器的简单介绍。 1. 均值滤波 (Mean Filter) 功能&#xff1a; 对图像进行平滑处理&#xff0c;减少噪声。 应用场景&#xff1a; 去除图像…...

[Effective C++]条款36-37 两个绝不

本文初发于 “天目中云的小站”&#xff0c;同步转载于此。 条款36 : 绝不重新定义继承而来的non-virtual函数 本条款很容易理解, 援引以前的条款就可以说明为什么 : 条款34中就提到过 : non-virtual函数意味着接口 强制性实现继承, 它不应当被改变. 重新定义继承而来的non-…...

各种网站(学习资源及其他)

欢迎围观笔者的个人博客~ 也欢迎通过RSS网址https://kangaroogao.github.io/atom.xml进行订阅~ 大学指南 上海交通大学生存手册中国科学技术大学人工智能与数据科学学院本科进阶指南USTC不完全入学指南大学生活质量指北科研论 信息搜集 AI信息搜集USTC飞跃网站计算机保研 技…...

docker怎么部署高斯数据库

部署高斯数据库&#xff08;openGauss&#xff09;到Docker的步骤如下&#xff1a; 安装Docker&#xff1a; 如果您的系统尚未安装Docker&#xff0c;需要先进行安装。以CentOS为例&#xff0c;可以使用以下命令安装Docker&#xff1a; yum install -y docker拉取镜像&#xff…...

VScode中配置ESlint+Prettier详细步骤(图文详情)

VScode中配置ESlintPrettier详细步骤&#xff08;图文详情&#xff09; 前置环境&#xff1a; node 18.19.0 vite 3.2.11 vue 3.2.47 本文将不在演示vue3基础工程创建&#xff0c;如果还没有vue3项目工程请参考文章&#xff1a; Vite创建Vue3工程并引入ElementPlus&#x…...

Leetcode打卡:考场就坐

执行结果&#xff1a;通过 题目&#xff1a; 855 考场就坐 在考场里&#xff0c;有 n 个座位排成一行&#xff0c;编号为 0 到 n - 1。 当学生进入考场后&#xff0c;他必须坐在离最近的人最远的座位上。如果有多个这样的座位&#xff0c;他会坐在编号最小的座位上。(另外&am…...

数据库压力测试详解

&#x1f345; 点击文末小卡片&#xff0c;免费获取软件测试全套资料&#xff0c;资料在手&#xff0c;涨薪更快 很多人提到 jmeter 时&#xff0c;只会说到 jmeter进行接口自动化或接口性能测试&#xff0c;其实jmeter还能对数据库进行自动化操作。个人常用的场景有以下&#…...

项目测试方案流程详解

&#x1f345; 点击文末小卡片&#xff0c;免费获取软件测试全套资料&#xff0c;资料在手&#xff0c;涨薪更快 作为一名软件测试工程师&#xff0c;为项目制作完成的测试方案并执行&#xff0c;是我们日常工作的重要部分&#xff0c;同时&#xff0c;也是一名合格的软件测试工…...

以二进制形式创建gitea仓库

1、官方文档&#xff1a; 数据库准备 | Gitea Documentation 使用二进制文件安装 | Gitea Documentation 2、具体操作 1&#xff09;创建gitea数据库 2&#xff09;检查是否安装 Git。要求 Git 版本 > 2.0。 如需升级git请参考以下链接&#xff1a;linux升级git版本-C…...