当前位置: 首页 > news >正文

MQTT+Disruptor 提高物联网高并发

基于springboot2.5.7

废话不多说,直接上干货:

@Slf4j
@Configuration
@EnableConfigurationProperties(MqttProperties.class)
@IntegrationComponentScan(basePackages = {"扫描包路径","扫描包路径"})
public class MqttAutoConfig {@Autowiredprivate MqttProperties mqttProperties;@Autowiredprivate ApplicationContext applicationContext;@Autowiredprivate DisruptorProperties disruptorProperties;@RefreshScope@Bean(value = "mqttParallelQueueHandler",initMethod = "start",destroyMethod = "shutDown")@ConditionalOnProperty(prefix = "custom-config.mqtt", name = "disruptor", havingValue = "true")public ParallelQueueHandler mqttParallelQueueHandler(){log.info("初始化Disruptor...");return new ParallelQueueHandler.Builder<DisruptorEventData>().setDisruptorProperties(disruptorProperties).setWaitStrategy(new BlockingWaitStrategy()).setListener(new MQTTMsgListener()).build();}@Bean@ConditionalOnProperty(prefix = "custom-config.mqtt", value = {"username","password", "host-url"})public MqttConnectOptions getReceiverMqttConnectOptionsForSub(){MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();mqttConnectOptions.setUserName(mqttProperties.getUsername());mqttConnectOptions.setPassword(mqttProperties.getPassword().toCharArray());List<String> hostList = Arrays.asList(mqttProperties.getHostUrl().trim().split(","));String[] serverURIs = new String[hostList.size()];hostList.toArray(serverURIs);mqttConnectOptions.setServerURIs(serverURIs);mqttConnectOptions.setKeepAliveInterval(2);mqttConnectOptions.setAutomaticReconnect(true);return mqttConnectOptions;}/***  MQTT 连接工厂* @return MqttPahoClientFactory*/@Bean@ConditionalOnMissingBeanpublic MqttPahoClientFactory receiverMqttClientFactoryForSub(MqttConnectOptions mqttConnectOptions) {DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();factory.setConnectionOptions(mqttConnectOptions);log.info("【MQTT】-初始化连接工厂...");return factory;}/*** 出站通道* @return MessageChannel*/@Beanpublic MessageChannel mqttOutboundChannel() {return new DirectChannel();}/***  MQTT 消息发送处理器* @return MessageHandler*/@Bean@ServiceActivator(inputChannel = "mqttOutboundChannel")public MessageHandler mqttOutbound(MqttPahoClientFactory factory) {MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(mqttProperties.getClientId()+"out", factory);messageHandler.setDefaultQos(1);//开启异步messageHandler.setAsync(true);messageHandler.setDefaultTopic("test");return messageHandler;}/*** 此处可以使用其他消息通道* Spring Integration默认的消息通道,它允许将消息发送给一个订阅者,然后阻碍发送直到消息被接收。** @return MessageChannel*/@Beanpublic MessageChannel mqttInBoundChannel() {return new DirectChannel();}/*** 适配器, 多个topic共用一个adapter* 客户端作为消费者,订阅主题,消费消息*/@Bean@ConditionalOnMissingBeanpublic MqttPahoMessageDrivenChannelAdapter mqttInbound(MqttPahoClientFactory factory) {List<String> topics = mqttProperties.getSubscribeTopics();String[] topicArray = new String[topics.size()];for (int i = 0; i < topics.size(); i++) {topicArray[i] = "$queue/"+ topics.get(i);}log.info("【MQTT】-订阅TOPIC:{}", Arrays.toString(topicArray));MqttPahoMessageDrivenChannelAdapter adapter =new MqttPahoMessageDrivenChannelAdapter(mqttProperties.getClientId(), factory, topicArray);adapter.setCompletionTimeout(mqttProperties.getCompletionTimeout());adapter.setConverter(new DefaultPahoMessageConverter());adapter.setRecoveryInterval(10000);adapter.setQos(1);adapter.setOutputChannel(mqttInBoundChannel());return adapter;}@Autowired(required = false)@Qualifier("mqttParallelQueueHandler")private ParallelQueueHandler<DisruptorEventData> parallelQueueHandler;/*** mqtt入站消息处理工具,对于指定消息入站通道接收到生产者生产的消息后处理消息的工具。* @return MessageHandler*/@Bean@RefreshScope@ServiceActivator(inputChannel = "mqttInBoundChannel")public MessageHandler mqttMessageHandler() {// 获取配置中的设备品牌MqttProperties.DeviceBrand deviceBrand = mqttProperties.getDeviceBrand();boolean disruptor = mqttProperties.isDisruptor();// 获取所有实现了 CustomMqttMessageHandler 接口的 Beanreturn message -> {log.info("【MQTT】-收到MQTT消息,Topic: {}, Payload: {}",message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC),message.getPayload());String topic = (String) message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC);Map<String, CustomMqttMessageReceiverHandler> handlers = applicationContext.getBeansOfType(CustomMqttMessageReceiverHandler.class);boolean handled = false;if (Objects.nonNull(deviceBrand)){CustomMqttMessageReceiverHandler handler = handlers.get(deviceBrand.getServiceName());if (Objects.nonNull(handler)){if (handler.supportsTopic(topic)) {handled = this.run(handler,message,topic,disruptor);}}else {log.error("【MQTT】-未找到设备品牌消息接收处理器,deviceBrand->{}",deviceBrand);}}else {for (CustomMqttMessageReceiverHandler handler : handlers.values()) {if (handler.supportsTopic(topic)) {handled = this.run(handler,message,topic,disruptor);}}}if (!handled) {log.warn("【MQTT】-未找到匹配的处理器来处理Topic {} 的消息", topic);}};}@Bean@ConditionalOnProperty(prefix = "custom-config.mqtt", value = {"username","password", "host-url"})public MqttMessageSender mqttMessageSender(){return new MqttMessageSender();}private boolean run(CustomMqttMessageReceiverHandler handler,Message<?> message,String topic,boolean disruptor){try {String traceId = MDC.get("traceId");if (!StringUtils.hasText(traceId)){traceId = UUID.randomUUID().toString().replaceAll("-", "");MDC.put("traceId",traceId);}if (disruptor && Objects.nonNull(parallelQueueHandler)){log.info("【MQTT】-使用Disruptor处理...");DisruptorEventData data = new DisruptorEventData();Map<String,Object> map = new HashMap<>();map.put("data",message);map.put("handler",handler);map.put("traceId",traceId);data.setMessage(map);parallelQueueHandler.add(data);}else {handler.handleMessage(message);}return true;} catch (Exception e) {log.error("【MQTT】-Handler {} 处理Topic {} 的消息时出错", handler.getClass().getSimpleName(), topic, e);return false;}finally {MDC.clear();}}

由于涉及隐私,其余代码可以留言

相关文章:

MQTT+Disruptor 提高物联网高并发

基于springboot2.5.7 废话不多说&#xff0c;直接上干货&#xff1a; Slf4j Configuration EnableConfigurationProperties(MqttProperties.class) IntegrationComponentScan(basePackages {"扫描包路径","扫描包路径"}) public class MqttAutoConfig {…...

SpringBoot项目集成ONLYOFFICE

ONLYOFFICE 文档8.2版本已发布&#xff1a;PDF 协作编辑、改进界面、性能优化、表格中的 RTL 支持等更新 文章目录 前言ONLYOFFICE 产品简介功能与特点Spring Boot 项目中集成 OnlyOffice1. 环境准备2. 部署OnlyOffice Document Server3. 配置Spring Boot项目4. 实现文档编辑功…...

用于nodejs的开源违禁词检测工具 JavaScript node-word-detection

地址 : https://www.npmjs.com/package/node-word-detection github地址: https://github.com/xiaobaidadada/node-word-detection 非常节省内存的轻量级快速违禁词、词典库 检测工具 、 50万个词大约需要300MB内存、被检测的文本100字内结果在1毫秒左右。本项目没有提供词库请…...

FFmpeg 4.3 音视频-多路H265监控录放C++开发十二:在屏幕上显示多路视频播放,可以有不同的分辨率,格式和帧率。

上图是在安防领域的要求&#xff0c;一般都是一个屏幕上有显示多个摄像头捕捉到的画面&#xff0c;这一节&#xff0c;我们是从文件中读取多个文件&#xff0c;显示在屏幕上。...

Linux权限问题(账号切换,权限,粘滞位)

1.什么是权限&#xff1f; 在Linux下有两种用户&#xff0c;分别是超级用户&#xff08;root&#xff09;和普通用户。超级用户可以在Linux下做任何事情&#xff0c;几乎不受限制&#xff0c;而普通用户一般只能在自己的工作目录下&#xff08;/home/xxx&#xff09;工作&#…...

el-upload,上传文件,后端提示信息,前端需要再次重新上传(不用重新选择文件)

1.el-upload 上传附件&#xff1a; <el-uploadref"upload":action"upload.url ?updateSupport upload.updateSupport":auto-upload"false":disabled"upload.isUploading":headers"upload.headers":limit"1"…...

数字信号处理Python示例(5)使用实指数函数仿真PN结二极管的正向特性

文章目录 前言一、二极管的电流-电压关系——Shockley方程二、PN结二极管正向特性的Python仿真三、仿真结果分析写在后面的话 前言 使用Python代码仿真了描述二极管的电流-电压关系的Shockley方程&#xff0c;对仿真结果进行了分析&#xff0c;说明在正向偏置区域&#xff0c;…...

ctfshow(89,90,92,93)--PHP特性--intval函数

Web89 源代码&#xff1a; include("flag.php"); highlight_file(__FILE__);if(isset($_GET[num])){$num $_GET[num];if(preg_match("/[0-9]/", $num)){die("no no no!");}if(intval($num)){echo $flag;} }审计 GET传参num。 如果在参数num中…...

构建ubuntu22.04.4私有源服务以及配置ubuntu私有源

构建ubuntu22.04.4私有源服务以及配置ubuntu私有源 一、环境说明1.1 私有源服务器1.2 客户机二 、构建私有源服务2.1 服务构建2.2 发布新的deb包到源服务器1. 准备新的 `.deb` 包2. 将 `.deb` 包添加到仓库目录3. 更新 `Packages` 文件4. 更新仓库的发布文件(可选)5. 通知客户…...

模块功能的描述方法

目录 行为描述方法 语句块 过程赋值语句 高级程序语句 循环语句 数据流描述 结构描述 混合描述方法 module 模块名(端口列表); // 模块声明// 端口定义input [数据类型] [位宽] 输入端口列表; output [数据类型] [位宽] 输出端口列表; inout [数据类…...

【WPF】MatrixTransform类

【WPF】MatrixTransform类 主要特性使用场景示例 在WPF&#xff08;Windows Presentation Foundation&#xff09;中&#xff0c;MatrixTransform 类是用于表示一个仿射变换的类&#xff0c;它允许开发者通过一个矩阵来定义一个二维空间中的线性变换。这种变换可以包括平移&…...

【C++】继承的理解

1.继承的概念和定义 1.1继承的概念 继承 (inheritance) 机制是面向对象程序设计 使代码可以复用 的最重要的手段&#xff0c;它允许程序员在 保 持原有类特性的基础上进行扩展 &#xff0c;增加功能&#xff0c;这样产生新的类&#xff0c;称派生类。继承 呈现了面向对象 程序…...

day50 图论章节刷题Part02(99.岛屿数量 深搜、99.岛屿数量 广搜、100.岛屿的最大面积)

前言&#xff1a;前段时间论文开题落下了很多进度&#xff0c;今天开始会尽快赶上 99.岛屿数量 深搜 思路&#xff1a;对地图进行遍历遇到一个没有遍历过的陆地节点&#xff0c;计数器就1&#xff0c;并把该节点所能遍历到的陆地都标记上&#xff1b;遇到标记过的陆地节点和海…...

超详细从基准将VMware ESXi 升级到 vSphere 6.7U1教程

哈喽大家好&#xff0c;欢迎来到虚拟化时代君&#xff08;XNHCYL&#xff09;&#xff0c;收不到通知请将我点击星标&#xff01; “ 大家好&#xff0c;我是虚拟化时代君&#xff0c;一位潜心于互联网的技术宅男。这里每天为你分享各种你感兴趣的技术、教程、软件、资源、福…...

华为OD机试 - 打印机队列 - 优先队列(Java 2024 E卷 200分)

华为OD机试 2024E卷题库疯狂收录中&#xff0c;刷题点这里 专栏导读 本专栏收录于《华为OD机试&#xff08;JAVA&#xff09;真题&#xff08;E卷D卷A卷B卷C卷&#xff09;》。 刷的越多&#xff0c;抽中的概率越大&#xff0c;私信哪吒&#xff0c;备注华为OD&#xff0c;加…...

MatrixOne 助力西安天能替换MySQL+MongoDB+ES打造一体化物联网平台

物联网&#xff08;IoT&#xff09;时代&#xff0c;企业正以前所未有的速度加快数字化转型。西安天能软件科技有限责任公司&#xff08;Skyable&#xff09;作为工业物联网领域的领先企业&#xff0c;携手MatrixOne&#xff0c;共同构建新一代一体化物联网平台&#xff0c;实现…...

正则表达式---元字符

简介 正则表达式分为两种语法&#xff1a;POSIX标准的语法&#xff0c;Perl语法。 正则表达式的POSIX规范&#xff0c;分为基本型正则表达式&#xff08;Basic Regular Expression, BRE&#xff09;&#xff0c;扩展型正则表达式&#xff08;Extended Regular Expression&…...

数据库Redis篇

系列文章目录 第一章 C/C语言篇第二章 计算机网络篇第三章 操作系统篇第四章 数据库MySQL篇第五章 数据库Redis篇第六章 场景题/算法题第七篇 常见HR问题篇 本系列专栏&#xff1a;点击进入 后端开发面经 关注走一波 秋招阶段&#xff0c;面过很多大中小厂&#xff0c;积攒了…...

在区块链技术中,什么是权益证明(PoS)?

权益证明&#xff08;Proof of Stake, PoS&#xff09;是一种与工作量证明&#xff08;Proof of Work, PoW&#xff09;类似的共识机制&#xff0c;但它通过不同的方式来确保区块链网络的安全性和一致性。PoS的主要目标是解决PoW中存在的高能耗问题&#xff0c;并提高网络的扩展…...

Spring Boot——日志介绍和配置

1. 日志的介绍 在前面的学习中&#xff0c;控制台上打印出来的一大堆内容就是日志&#xff0c;可以帮助我们发现问题&#xff0c;分析问题&#xff0c;定位问题&#xff0c;除此之外&#xff0c;日志还可以进行系统的监控&#xff0c;数据采集等 2. 日志的使用 在程序中获取日…...

Linux链表操作全解析

Linux C语言链表深度解析与实战技巧 一、链表基础概念与内核链表优势1.1 为什么使用链表&#xff1f;1.2 Linux 内核链表与用户态链表的区别 二、内核链表结构与宏解析常用宏/函数 三、内核链表的优点四、用户态链表示例五、双向循环链表在内核中的实现优势5.1 插入效率5.2 安全…...

可靠性+灵活性:电力载波技术在楼宇自控中的核心价值

可靠性灵活性&#xff1a;电力载波技术在楼宇自控中的核心价值 在智能楼宇的自动化控制中&#xff0c;电力载波技术&#xff08;PLC&#xff09;凭借其独特的优势&#xff0c;正成为构建高效、稳定、灵活系统的核心解决方案。它利用现有电力线路传输数据&#xff0c;无需额外布…...

对WWDC 2025 Keynote 内容的预测

借助我们以往对苹果公司发展路径的深入研究经验&#xff0c;以及大语言模型的分析能力&#xff0c;我们系统梳理了多年来苹果 WWDC 主题演讲的规律。在 WWDC 2025 即将揭幕之际&#xff0c;我们让 ChatGPT 对今年的 Keynote 内容进行了一个初步预测&#xff0c;聊作存档。等到明…...

如何为服务器生成TLS证书

TLS&#xff08;Transport Layer Security&#xff09;证书是确保网络通信安全的重要手段&#xff0c;它通过加密技术保护传输的数据不被窃听和篡改。在服务器上配置TLS证书&#xff0c;可以使用户通过HTTPS协议安全地访问您的网站。本文将详细介绍如何在服务器上生成一个TLS证…...

QT: `long long` 类型转换为 `QString` 2025.6.5

在 Qt 中&#xff0c;将 long long 类型转换为 QString 可以通过以下两种常用方法实现&#xff1a; 方法 1&#xff1a;使用 QString::number() 直接调用 QString 的静态方法 number()&#xff0c;将数值转换为字符串&#xff1a; long long value 1234567890123456789LL; …...

第 86 场周赛:矩阵中的幻方、钥匙和房间、将数组拆分成斐波那契序列、猜猜这个单词

Q1、[中等] 矩阵中的幻方 1、题目描述 3 x 3 的幻方是一个填充有 从 1 到 9 的不同数字的 3 x 3 矩阵&#xff0c;其中每行&#xff0c;每列以及两条对角线上的各数之和都相等。 给定一个由整数组成的row x col 的 grid&#xff0c;其中有多少个 3 3 的 “幻方” 子矩阵&am…...

有限自动机到正规文法转换器v1.0

1 项目简介 这是一个功能强大的有限自动机&#xff08;Finite Automaton, FA&#xff09;到正规文法&#xff08;Regular Grammar&#xff09;转换器&#xff0c;它配备了一个直观且完整的图形用户界面&#xff0c;使用户能够轻松地进行操作和观察。该程序基于编译原理中的经典…...

GruntJS-前端自动化任务运行器从入门到实战

Grunt 完全指南&#xff1a;从入门到实战 一、Grunt 是什么&#xff1f; Grunt是一个基于 Node.js 的前端自动化任务运行器&#xff0c;主要用于自动化执行项目开发中重复性高的任务&#xff0c;例如文件压缩、代码编译、语法检查、单元测试、文件合并等。通过配置简洁的任务…...

mac 安装homebrew (nvm 及git)

mac 安装nvm 及git 万恶之源 mac 安装这些东西离不开Xcode。及homebrew 一、先说安装git步骤 通用&#xff1a; 方法一&#xff1a;使用 Homebrew 安装 Git&#xff08;推荐&#xff09; 步骤如下&#xff1a;打开终端&#xff08;Terminal.app&#xff09; 1.安装 Homebrew…...

【 java 虚拟机知识 第一篇 】

目录 1.内存模型 1.1.JVM内存模型的介绍 1.2.堆和栈的区别 1.3.栈的存储细节 1.4.堆的部分 1.5.程序计数器的作用 1.6.方法区的内容 1.7.字符串池 1.8.引用类型 1.9.内存泄漏与内存溢出 1.10.会出现内存溢出的结构 1.内存模型 1.1.JVM内存模型的介绍 内存模型主要分…...