Java集成MQTT和Kafka实现稳定、可靠、高性能的物联网消息处理系统
Java集成MQTT和Kafka实现高可用方案
1. 概述
在物联网(IoT)和分布式系统中,消息传递的可靠性和高可用性至关重要。本文将详细介绍如何使用Java集成MQTT和Kafka来构建一个高可用的消息处理系统。
MQTT(消息队列遥测传输)是一种轻量级的发布/订阅协议,适用于资源受限的设备和低带宽、高延迟网络。而Kafka是一个分布式流处理平台,提供高吞吐量、可扩展性和持久性。将两者结合,可以创建一个既能处理大量IoT设备连接,又能保证消息可靠传递和处理的系统。
2. 架构设计
我们的高可用架构设计如下:

主要组件:
- MQTT集群:使用EMQ X等MQTT代理实现集群
- Kafka集群:作为中央消息总线和持久化层
- 桥接组件:将MQTT消息转发到Kafka
- Java应用服务:处理和分析消息
- 监控系统:确保整个系统的健康运行
3. Java集成MQTT实现
3.1 Maven依赖
<dependencies><!-- MQTT客户端 --><dependency><groupId>org.eclipse.paho</groupId><artifactId>org.eclipse.paho.client.mqttv3</artifactId><version>1.2.5</version></dependency><!-- Spring Integration MQTT --><dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-mqtt</artifactId><version>5.5.15</version></dependency><!-- Spring Boot --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-integration</artifactId><version>2.7.8</version></dependency>
</dependencies>
3.2 MQTT配置类
@Configuration
public class MqttConfig {@Value("${mqtt.broker.urls}")private String[] brokerUrls; // 多个MQTT代理地址,用于故障转移@Value("${mqtt.client.id}")private String clientId;@Value("${mqtt.username}")private String username;@Value("${mqtt.password}")private String password;@Value("${mqtt.topics}")private String[] topics;@Beanpublic MqttPahoClientFactory mqttClientFactory() {DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();MqttConnectOptions options = new MqttConnectOptions();// 设置多个服务器地址,实现故障转移options.setServerURIs(brokerUrls);// 设置自动重连options.setAutomaticReconnect(true);options.setKeepAliveInterval(30);options.setConnectionTimeout(30);// 设置遗嘱消息,当客户端异常断开时发送options.setWill("clients/status", (clientId + ": disconnected").getBytes(), 1, true);if (username != null && !username.isEmpty()) {options.setUserName(username);options.setPassword(password.toCharArray());}// 设置清除会话,false表示客户端断开连接后,服务器保留其订阅信息options.setCleanSession(false);factory.setConnectionOptions(options);return factory;}// 出站通道(用于发送消息)@Beanpublic MessageChannel mqttOutboundChannel() {return new DirectChannel();}// 出站消息处理器@Bean@ServiceActivator(inputChannel = "mqttOutboundChannel")public MessageHandler mqttOutbound() {MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(clientId + "-pub", mqttClientFactory());messageHandler.setAsync(true);messageHandler.setDefaultQos(1);return messageHandler;}// 入站通道(用于接收消息)@Beanpublic MessageChannel mqttInboundChannel() {return new DirectChannel();}// 入站消息适配器@Beanpublic MessageProducer inbound() {MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(clientId + "-sub", mqttClientFactory(), topics);adapter.setCompletionTimeout(5000);adapter.setConverter(new DefaultPahoMessageConverter());adapter.setQos(1);adapter.setOutputChannel(mqttInboundChannel());return adapter;}
}
3.3 MQTT服务类
@Service
@Slf4j
public class MqttService {private final MessageChannel mqttOutboundChannel;@Autowiredpublic MqttService(MessageChannel mqttOutboundChannel) {this.mqttOutboundChannel = mqttOutboundChannel;}// 发布消息到MQTT主题public void publish(String topic, String payload) {log.info("Publishing message to topic {}: {}", topic, payload);Message<String> message = MessageBuilder.withPayload(payload).setHeader(MqttHeaders.TOPIC, topic).setHeader(MqttHeaders.QOS, 1).setHeader(MqttHeaders.RETAINED, false).build();mqttOutboundChannel.send(message);}// 处理接收到的MQTT消息@ServiceActivator(inputChannel = "mqttInboundChannel")public void handleMessage(Message<?> message) {String topic = (String) message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC);String payload = message.getPayload().toString();log.info("Received message from topic {}: {}", topic, payload);// 这里可以添加消息处理逻辑,或者转发到Kafka}
}
4. Java集成Kafka实现
4.1 Maven依赖
<dependencies><!-- Kafka客户端 --><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.3.2</version></dependency><!-- Spring Kafka --><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>2.9.5</version></dependency>
</dependencies>
4.2 Kafka配置类
@Configuration
public class KafkaConfig {@Value("${kafka.bootstrap.servers}")private String bootstrapServers;@Value("${kafka.consumer.group.id}")private String consumerGroupId;// Kafka生产者配置@Beanpublic ProducerFactory<String, String> producerFactory() {Map<String, Object> configProps = new HashMap<>();// 设置Kafka集群地址configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);// 高可用配置// acks=all表示所有副本都确认后才认为消息发送成功configProps.put(ProducerConfig.ACKS_CONFIG, "all");// 重试次数configProps.put(ProducerConfig.RETRIES_CONFIG, 10);// 启用幂等性,确保消息不会重复发送configProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);// 批处理大小configProps.put(ProducerConfig.BATCH_SIZE_CONFIG, 32768);// 批处理延迟configProps.put(ProducerConfig.LINGER_MS_CONFIG, 20);// 缓冲区大小configProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);return new DefaultKafkaProducerFactory<>(configProps);}@Beanpublic KafkaTemplate<String, String> kafkaTemplate() {return new KafkaTemplate<>(producerFactory());}// Kafka消费者配置@Beanpublic ConsumerFactory<String, String> consumerFactory() {Map<String, Object> configProps = new HashMap<>();configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);configProps.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupId);configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);// 高可用配置// 自动提交偏移量configProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);// 从最早的消息开始消费configProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");// 最大拉取记录数configProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);// 心跳间隔configProps.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 3000);// 会话超时configProps.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);// 最大拉取间隔configProps.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300000);return new DefaultKafkaConsumerFactory<>(configProps);}@Beanpublic ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory());// 设置并发消费者数量factory.setConcurrency(3);// 批量消费factory.setBatchListener(true);// 手动提交偏移量factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);return factory;}
}
4.3 Kafka服务类
@Service
相关文章:
Java集成MQTT和Kafka实现稳定、可靠、高性能的物联网消息处理系统
Java集成MQTT和Kafka实现高可用方案 1. 概述 在物联网(IoT)和分布式系统中,消息传递的可靠性和高可用性至关重要。本文将详细介绍如何使用Java集成MQTT和Kafka来构建一个高可用的消息处理系统。 MQTT(消息队列遥测传输)是一种轻量级的发布/订阅协议,适用于资源受限的设备和…...
【总结篇】java多线程,新建线程有几种写法,以及每种写法的优劣势
java多线程 新建线程有几种写法,以及每种写法的优劣势 [1/5]java多线程 新建线程有几种写法–继承Thread类以及他的优劣势[2/5]java多线程-新建线程有几种写法–实现Runnable接口以及他的优劣势[3/5]java多线程 新建线程有几种写法–实现Callable接口结合FutureTask使用以及他的…...
剑指 Offer II 107. 矩阵中的距离
comments: true edit_url: https://github.com/doocs/leetcode/edit/main/lcof2/%E5%89%91%E6%8C%87%20Offer%20II%20107.%20%E7%9F%A9%E9%98%B5%E4%B8%AD%E7%9A%84%E8%B7%9D%E7%A6%BB/README.md 剑指 Offer II 107. 矩阵中的距离 题目描述 给定一个由 0 和 1 组成的矩阵 mat …...
雅可比行列式
定义和推导 雅可比行列式,它是以n个n元函数的偏导数为元素的行列式。以下是雅可比式的推导过程: 二阶雅可比式的推导以二重积分中的极坐标变换为例,设 : ,则 x 和 y 的全微分分别为: 可以将 dx 与 dy 视作…...
UMA架构下的GPU 显存
GPU 显存 (Graphics Memory) 在大多数现代设备(包括 Android 手机、嵌入式设备等)上,确实是使用 DDR(Double Data Rate SDRAM) 类型的内存。 不过,具体实现方式根据硬件架构有所不同,主要分为以…...
【大模型基础_毛玉仁】3.2 上下文学习
目录 3.2 上下文学习3.2.1 上下文学习的定义3.2.2 演示示例选择1)直接检索2)聚类检索3)迭代检索 3.2.3 性能影响因素 3.2 上下文学习 随模型训练数据规模和参数量的扩大,大语言模型涌现出了上下文学习(In-Context Lea…...
在 ARM 嵌入式 Linux 下使用 C/C++ 实现 MQTT
在 ARM 嵌入式 Linux 下使用 C/C 实现 MQTT 通信是一个常见的需求,尤其是在资源受限的环境中。以下是一个详细的教程,使用 Eclipse Paho C Client 库来实现 MQTT 客户端。 1. 安装 Eclipse Paho C Client 库 Eclipse Paho C Client 是一个轻量级的 MQTT…...
Oraclelinux问题-/var/log/pcp/pmlogger/目录超大
有套19c rac环境,操作系统是oracle linux 8.10,日常巡检时发现/var/log/pcp/pmlogger/目录超大,如下 [rootdb1 ~]# du -sh /var/log/pcp/pmlogger/* 468G /var/log/pcp/pmlogger/db 1.3G /var/log/pcp/pmlogger/oracle06-106 754M /…...
【大语言模型_8】vllm启动的模型通过fastapi封装增加api-key验证
背景: vllm推理框架启动模型不具备api-key验证。需借助fastapi可以实现该功能 代码实现: rom fastapi import FastAPI, Header, HTTPException, Request,Response import httpx import logging# 创建 FastAPI 应用 app FastAPI() logging.basicConfig(…...
学习笔记 ASP.NET Core Web API 8.0部署到iis
一.修改配置文件 修改Program.cs配置文件将 if (app.Environment.IsDevelopment()) {app.UseSwagger();app.UseSwaggerUI(); }修改为 app.UseSwagger(); app.UseSwaggerUI(); 二.安装ASP.NET Core Runtime 8.0.14 文件位置https://dotnet.microsoft.com/en-us/download/do…...
Python散点图(Scatter Plot):高阶分析、散点图矩阵、三维散点图及综合应用
散点图:数据分析的利器 在数据分析领域,散点图是一种直观且强大的可视化工具,广泛应用于揭示变量间的相关性以及识别数据集中的异常值。本文将深入探讨散点图的这两种关键功能,并结合实际案例与Python代码示例,带您全面了解散点图的应用。 一、散点图如何展示变量间的相…...
第5章:Docker镜像管理实战:构建、推送与版本控制
第5章:Docker镜像管理实战:构建、推送与版本控制 作者:DogDog_Shuai 阅读时间:约25分钟 难度:中级 目录 第5章:Docker镜像管理实战:构建、推送与版本控制 目录1. 引言2. Docker镜像基础 2.1 镜像结构...
Microsoft Edge浏览器的取证分析(基于Chromium)
概述 早在2019年,微软就用Chromium替换了EdgeHTML浏览器引擎,这是微软支持谷歌Chrome浏览器的一个开源项目。通过切换到Chromium,Edge与Chrome浏览器共享一个共同的架构,这意味着用于Chrome浏览器调查的取证技术也适用于Edge。 …...
汽车一键启动系统使用方便,舒适出行,轻松匹配
汽车一键启动系统 系统定义 移动管家汽车一键启动系统是装置在智能汽车上的一部分,是实现简约打火和熄火过程的一个按钮装置。它可以在原车钥匙锁头的位置改装,也能独立面板改装,现在很多高低配置的车辆都可安装。 功能特点 基本功能 启…...
C语言复习笔记--数组
今天继续来浅浅推进一下C语言的复习,这次是数组的复习,话不多说,正文开始. 数组的概念 数组是⼀组相同类型元素的集合,一种自定义类型.数组中元素个数不能为0.数组分为⼀维数组和多维数组,多维数组⼀般⽐较多⻅的是⼆维数组. 下面从一维数组说起. 一维数组的创建和…...
海康SDK协议在智联视频超融合平台中的接入方法
一. 海康SDK协议详解 海康SDK协议原理 海康SDK协议是海康威视为开发者提供的一套软件开发工具包,用于与海康设备(如摄像头、NVR、DVR等)进行通信和控制。其核心原理包括: 网络通信:基于TCP/IP协议,实现设…...
腾讯云大模型知识引擎×DeepSeek:股票分析低代码应用实践
项目背景与发展历程 在金融科技快速发展的今天,股票分析作为投资决策的核心环节,正面临数据量激增和复杂性提升的挑战。传统股票分析依赖人工处理,效率低下且成本高昂,而人工智能(AI)的引入为这一领域带来…...
深入解析 SQL Server 锁机制:如何定位并解决表锁问题
在 SQL Server 中,锁是并发控制的关键机制,确保数据的完整性和一致性。然而,在高并发环境下,锁可能导致阻塞甚至死锁,影响系统性能。因此,理解 SQL Server 的锁机制,并掌握如何定位和解决锁问题…...
Spring Boot 异步返回对象深度解析
前言 在现代高并发、高响应的应用场景中,Spring Boot 的异步处理能力是提升系统吞吐量和用户体验的关键技术之一。无论是实时数据推送、大文件传输,还是复杂异步任务调度,Spring Boot 提供了多种灵活的异步处理机制以满足不同需求。本文将从…...
【工具】C#防沉迷进程监控工具使用手册
一、软件简介 本工具用于监控指定进程的运行时长,当达到预设时间时通过声音、弹窗、窗口抖动等方式进行提醒,帮助用户合理控制程序使用时间。 软件在上篇文章。 二、系统要求 Windows 7/10/11.NET Framework 4.5 或更高版本 三、快速入门 1. 配置文件…...
【docker】--- 详解 WSL2 中的 Ubuntu 和 Docker Desktop 的区别和关系!
在编程的艺术世界里,代码和灵感需要寻找到最佳的交融点,才能打造出令人为之惊叹的作品。而在这座秋知叶i博客的殿堂里,我们将共同追寻这种完美结合,为未来的世界留下属于我们的独特印记。【WSL 】--- Windows11 迁移 WSL 超详细指南 —— 给室友换一个宿舍! 开发环境一、引…...
强大的AI网站推荐(第一集)—— Devv AI
网站:Devv AI 号称:最懂程序员的新一代 AI 搜索引擎 博主评价:我的大学所有的代码都是使用它,极大地提升了我的学习和开发效率。 推荐指数:🌟🌟🌟🌟🌟&#x…...
模块二 单元4 安装AD+DC
模块二 单元4 安装ADDC 两个任务: 1.安装AD活动目录 2.升级当前服务器为DC域控制器 安装前的准备工作: 确定你要操作的服务器系统(Windows server 2022); 之前的服务器系统默认是工作组的模式workgroup模式(…...
蓝桥杯备考:数学问题模运算---》次大值
这道题,由于数据规模是2e5,我们直接暴力的话是一定会超时的 所以我们得想个办法,我们先把所有的数排序去重 我们先想想如果要找最大值,怎么找 这时候我们要分类讨论 ①如果是大数模小数,那结果肯定是小于小数的&am…...
k8s1.30 部署calio网络
一、介绍 网路组件有很多种,只需要部署其中一个,推荐calio。 calio是一个纯三成的数据中心网络方案,calico支持广泛的平台。如k8s,openstack等。 calio在每一个计算节点利用linux内核,实现了一个高效的虚拟路由器来…...
test skills
一、测试技术 1、python GitHub - taizilongxu/interview_python: 关于Python的面试题 GitHub - JushuangQiao/Python-Offer: 《剑指Offer》面试题Python实现 GitHub - vinta/awesome-python: An opinionated list of awesome Python frameworks, libraries, software and …...
Elasticsearch:为推理端点配置分块设置
推理端点对一次可处理的文本量有限,具体取决于模型的输入容量。分块(Chunking) 是指将输入文本拆分成符合这些限制的小块的过程,在将文档摄取到 semantic_text 字段时会进行分块。分块不仅有助于保持输入文本在可处理范围内&#…...
如何使用webpack预加载 CSS 中定义的资源和预加载 CSS 文件
在 Webpack 中预加载 CSS 文件及其内部定义的资源(如图片、字体等),可以通过 资源预加载(Preloading) 技术优化关键资源的加载优先级。以下是具体的实现方法和步骤: 一、预加载 CSS 文件 1. 使用 vue/prel…...
[工控机安全] 使用DriverView快速排查不可信第三方驱动(附详细图文教程)
导语: 在工业控制领域,设备驱动程序的安全性至关重要。第三方驱动可能存在兼容性问题、安全漏洞甚至恶意代码,威胁设备稳定运行。本文将手把手教你使用 DriverView工具,高效完成工控机驱动安全检查,精准识别可疑驱动&a…...
解决 React Native 0.76 中 com.facebook.react.settings 插件缺失问题
在使用 React Native 0.76 创建项目时,遇到以下错误: FAILURE: Build failed with an exception. * Where: Settings file /Users/wangxp/learn/AwesomeProject/android/settings.gradle line: 2 * What went wrong: Plugin [id: com.facebook.react.se…...
