Spring Boot 集成 Kafka
在现代软件开发中,分布式系统和微服务架构越来越受到关注。为了实现系统之间的异步通信和解耦,消息队列成为了一种重要的技术手段。Kafka 作为一种高性能、分布式的消息队列系统,被广泛应用于各种场景。而 Spring Boot 作为一种流行的 Java 开发框架,提供了便捷的方式来构建应用程序。本文将介绍如何在 Spring Boot 项目中集成 Kafka,包括 Kafka 的基本概念、Spring Boot 集成 Kafka 的步骤、配置项以及实际应用案例。
一、引言
随着软件系统的规模和复杂性不断增加,传统的同步通信方式已经无法满足需求。消息队列作为一种异步通信机制,可以有效地解耦系统之间的依赖关系,提高系统的可扩展性和可靠性。Kafka 以其高吞吐量、可扩展性和分布式特性,成为了许多企业级应用的首选消息队列系统。Spring Boot 则提供了一种快速、便捷的方式来构建应用程序,使得开发者可以更加专注于业务逻辑的实现。将 Spring Boot 与 Kafka 集成,可以充分发挥两者的优势,构建出高效、可靠的消息驱动应用。
二、Kafka 基础概念
(一)Kafka 简介
Kafka 是一个分布式的流处理平台,同时也可以作为一个高性能的消息队列系统使用。它最初由 LinkedIn 开发,后来成为了 Apache 软件基金会的一个开源项目。Kafka 具有以下几个主要特点:
- 高吞吐量:Kafka 能够处理大量的消息,每秒可以处理数十万条消息。
- 分布式架构:Kafka 可以在多个服务器上运行,实现分布式存储和处理消息。
- 可扩展性:可以根据需要动态地增加或减少服务器数量,以满足不同的负载需求。
- 持久化存储:Kafka 可以将消息持久化存储在磁盘上,保证消息不会丢失。
- 多消费者支持:多个消费者可以同时从同一个主题中读取消息,实现消息的广播和订阅。
(二)Kafka 核心概念
- 主题(Topic)
- 主题是 Kafka 中消息的逻辑分类。生产者将消息发送到特定的主题,消费者从相应的主题中读取消息。一个主题可以被分为多个分区(Partition),每个分区可以在不同的服务器上存储,以实现高吞吐量和可扩展性。
- 分区(Partition)
- 分区是主题的物理划分。每个分区都是一个有序的、不可变的消息序列。分区可以在不同的服务器上存储,以实现分布式存储和处理。消费者可以从一个或多个分区中读取消息,以实现并行处理。
- 生产者(Producer)
- 生产者是向 Kafka 主题发送消息的应用程序。生产者可以将消息发送到一个或多个主题,并可以指定消息的分区和键值对。生产者可以使用异步或同步的方式发送消息,以满足不同的应用场景需求。
- 消费者(Consumer)
- 消费者是从 Kafka 主题读取消息的应用程序。消费者可以订阅一个或多个主题,并可以从一个或多个分区中读取消息。消费者可以使用自动提交偏移量(Offset)或手动提交偏移量的方式来处理消息,以满足不同的应用场景需求。
- 偏移量(Offset)
- 偏移量是消费者在分区中读取消息的位置。每个分区都有一个唯一的偏移量,消费者可以通过偏移量来确定下一个要读取的消息。消费者可以自动提交偏移量或手动提交偏移量,以保证消息的处理顺序和可靠性。
(三)Kafka 架构
- Broker
- Broker 是 Kafka 中的服务器节点。每个 Broker 可以存储多个主题的分区,并可以接收生产者发送的消息和向消费者提供消息。Broker 之间通过网络通信,实现分布式存储和处理消息。
- Zookeeper
- Zookeeper 是一个分布式协调服务,用于管理 Kafka 集群的元数据。Zookeeper 存储了 Kafka 集群的配置信息、主题和分区的元数据、消费者的偏移量等信息。Kafka 客户端通过与 Zookeeper 通信,获取集群的元数据信息,并进行生产者和消费者的协调。
三、Spring Boot 集成 Kafka 的步骤
(一)添加依赖
在 Spring Boot 项目的 pom.xml 文件中添加以下依赖:
<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId>
</dependency>
这个依赖将引入 Spring Kafka 模块,使我们能够在 Spring Boot 项目中使用 Kafka。
(二)配置 Kafka
在 application.properties 或 application.yml 文件中添加 Kafka 的配置信息:
spring.kafka.bootstrap-servers=localhost:9092
这个配置指定了 Kafka 服务器的地址和端口。可以根据实际情况进行修改。
(三)创建生产者
-
创建一个生产者配置类,用于配置生产者的属性:
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;import java.util.HashMap;
import java.util.Map;@Configuration
public class KafkaProducerConfig {@Bean public ProducerFactory<String, String> producerFactory() {Map<String, Object> configProps = new HashMap<>();configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);return new DefaultKafkaProducerFactory<>(configProps); }@Bean public KafkaTemplate<String, String> kafkaTemplate() {return new KafkaTemplate<>(producerFactory()); }}
在这个配置类中,我们创建了一个ProducerFactory和一个KafkaTemplate。ProducerFactory用于创建生产者实例,KafkaTemplate是一个方便的工具类,用于发送消息。
2. 创建一个生产者服务类,用于发送消息:
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;@Service
public class KafkaProducerService {@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;public void sendMessage(String topic, String message) {kafkaTemplate.send(topic, message);}
}
这个服务类使用KafkaTemplate来发送消息。可以在其他地方注入这个服务类,并调用sendMessage方法来发送消息。
(四)创建消费者
-
创建一个消费者配置类,用于配置消费者的属性:
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;import java.util.HashMap;
import java.util.Map;@Configuration
public class KafkaConsumerConfig {@Bean public ConsumerFactory<String, String> consumerFactory() {Map<String, Object> configProps = new HashMap<>();configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);configProps.put(ConsumerConfig.GROUP_ID_CONFIG, "my-consumer-group");return new DefaultKafkaConsumerFactory<>(configProps); }@Bean public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory());return factory; }}
在这个配置类中,我们创建了一个ConsumerFactory和一个ConcurrentKafkaListenerContainerFactory。ConsumerFactory用于创建消费者实例,ConcurrentKafkaListenerContainerFactory是一个用于处理多个消费者的容器工厂。
2. 创建一个消费者服务类,用于处理接收到的消息:
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;@Service
public class KafkaConsumerService {@KafkaListener(topics = "my-topic", groupId = "my-consumer-group")public void consumeMessage(String message) {System.out.println("Received message: " + message);}
}
这个服务类使用@KafkaListener注解来定义一个消费者方法,该方法将在接收到消息时被调用。可以根据实际需求对消息进行处理。
四、Spring Boot 集成 Kafka 的配置项
(一)生产者配置项
bootstrap.servers:Kafka 服务器的地址和端口,多个服务器之间用逗号分隔。key.serializer:消息键的序列化器类名。value.serializer:消息值的序列化器类名。acks:生产者发送消息后,需要等待多少个副本确认才能认为消息发送成功。可选值有0(不等待确认)、1(等待首领副本确认)和all(等待所有副本确认)。retries:生产者发送消息失败后,重试的次数。
(二)消费者配置项
bootstrap.servers:Kafka 服务器的地址和端口,多个服务器之间用逗号分隔。key.deserializer:消息键的反序列化器类名。value.deserializer:消息值的反序列化器类名。group.id:消费者组的名称,用于区分不同的消费者组。auto.offset.reset:当消费者从没有偏移量的分区开始读取消息时,应该从哪里开始读取。可选值有earliest(从最早的消息开始读取)、latest(从最新的消息开始读取)和none(如果没有偏移量,则抛出异常)。
五、Spring Boot 集成 Kafka 的实际应用案例
(一)日志收集
- 场景描述
- 在一个分布式系统中,各个服务产生的日志需要集中收集和处理。可以使用 Kafka 作为日志收集的中间件,将各个服务的日志发送到 Kafka 主题中,然后由一个专门的日志处理服务从 Kafka 中读取日志并进行处理。
- 实现步骤
- 在各个服务中,使用 Spring Boot 集成 Kafka 的生产者功能,将日志发送到特定的 Kafka 主题中。
- 创建一个日志处理服务,使用 Spring Boot 集成 Kafka 的消费者功能,从 Kafka 主题中读取日志并进行处理,例如存储到数据库、进行分析等。
(二)订单处理系统
- 场景描述
- 在一个电商订单处理系统中,订单的创建、支付、发货等状态变化需要通知各个相关系统。可以使用 Kafka 作为消息中间件,将订单状态变化的消息发送到 Kafka 主题中,各个相关系统从 Kafka 中读取消息并进行相应的处理。
- 实现步骤
- 当订单状态发生变化时,使用 Spring Boot 集成 Kafka 的生产者功能,将订单状态变化的消息发送到特定的 Kafka 主题中。
- 各个相关系统,如库存管理系统、物流管理系统等,使用 Spring Boot 集成 Kafka 的消费者功能,从 Kafka 主题中读取订单状态变化的消息并进行相应的处理。
(三)实时数据处理
- 场景描述
- 在一个实时数据处理系统中,需要对大量的实时数据进行处理和分析。可以使用 Kafka 作为数据传输的中间件,将实时数据发送到 Kafka 主题中,然后由一个实时数据处理服务从 Kafka 中读取数据并进行处理。
- 实现步骤
- 数据源(如传感器、日志文件等)将实时数据发送到 Kafka 主题中。
- 使用 Spring Boot 集成 Kafka 的消费者功能,创建一个实时数据处理服务,从 Kafka 主题中读取实时数据并进行处理,例如进行数据分析、生成报表等。
六、性能优化和故障排除
(一)性能优化
- 调整 Kafka 服务器配置
- 根据实际情况调整 Kafka 服务器的配置参数,如内存分配、磁盘空间、网络参数等,以提高 Kafka 的性能。
- 优化生产者和消费者代码
- 在生产者和消费者代码中,避免不必要的序列化和反序列化操作,减少网络传输开销。
- 合理设置生产者的重试次数和等待确认的参数,以提高消息发送的成功率和性能。
- 对于消费者,可以根据实际情况调整拉取消息的频率和批量处理的大小,以提高消费效率。
- 使用分区和多消费者
- 根据业务需求合理划分 Kafka 主题的分区,并使用多个消费者同时从不同的分区中读取消息,以提高消费的并行度和性能。
(二)故障排除
- 消息丢失或重复
- 检查生产者和消费者的配置参数,确保消息的发送和消费过程正确。
- 检查 Kafka 服务器的配置参数,确保消息的持久化和副本机制正常工作。
- 如果出现消息丢失或重复的情况,可以通过调整生产者和消费者的配置参数,或者使用 Kafka 的事务功能来保证消息的一致性。
- 消费延迟
- 检查消费者的拉取频率和批量处理大小,是否设置合理。
- 检查 Kafka 服务器的负载情况,是否存在性能瓶颈。
- 如果消费延迟较高,可以考虑增加消费者的数量,或者调整 Kafka 服务器的配置参数,以提高消费效率。
- 连接问题
- 检查 Kafka 服务器的地址和端口是否正确配置。
- 检查网络连接是否正常,是否存在防火墙等限制。
- 如果出现连接问题,可以通过检查网络配置、调整防火墙规则等方式来解决。
七、总结
本文介绍了如何在 Spring Boot 项目中集成 Kafka,包括 Kafka 的基本概念、Spring Boot 集成 Kafka 的步骤、配置项以及实际应用案例。通过集成 Kafka,我们可以构建出高效、可靠的消息驱动应用,实现系统之间的异步通信和解耦。在实际应用中,我们还可以根据需要进行性能优化和故障排除,以确保系统的稳定运行。希望本文对大家在 Spring Boot 集成 Kafka 方面有所帮助。
相关文章:
Spring Boot 集成 Kafka
在现代软件开发中,分布式系统和微服务架构越来越受到关注。为了实现系统之间的异步通信和解耦,消息队列成为了一种重要的技术手段。Kafka 作为一种高性能、分布式的消息队列系统,被广泛应用于各种场景。而 Spring Boot 作为一种流行的 Java 开…...
CentOS中shell脚本对多台机器执行下载安装
1.建立免密ssh连接 详情见这篇: CentOS建立ssh免密连接(含流程剖析)-CSDN博客 2.脚本编写 我这里只是简单写了个demo进行演示,如果服务器很多可以先暂存成文件再逐行读取host进行连接并执行命令 用node1去ssh连接node2和node…...
浅析eBPF
目录 一、eBPF 原理 二、eBPF 已可投入使用的场景 三、eBPF 与 Jaeger/Zipkin 的区别及先进性 四、使用 eBPF 的开源软件 五、开源软件的局限性或待实现功能 猫哥说 一、eBPF 原理 eBPF (extended Berkeley Packet Filter) 是一种内核技术,允许用户在内核空间…...
HTML 基础 (快速入门)详细步骤和示例
目录 创建基本的 HTML 文件 添加内容到页面 页面布局与链接 HTML(超文本标记语言)是构建网页的基础技术,以下是 HTML 基础的详细步骤和示例: 创建基本的 HTML 文件 步骤一:新建文件 在本地计算机上选择一个合适的…...
力扣-动态规划-139 单词拆分
思路 dp数组定义:用wordDict数组可以完成不超过j的字符串的可能为dp[j]递推公式: tmp s.substr(j - wordDict[i].size(), wordDict[i].size()); dp[j] (dp[j - wordDict[i].size()] && wordDict[i] tmp) || dp[j]; dp数组初始化:…...
建筑能耗监测系统数据采集装置 物联网网关功能参数介绍
安科瑞刘鸿鹏 摘要 随着物联网(IoT)技术的迅猛发展,现代物联网系统的规模和复杂度不断增加,各种智能设备和传感器的广泛应用为数据采集和分析提供了丰富的信息源。然而,面对不同协议、标准和通信方式的设备ÿ…...
vue深拷贝:1、使用JSON.parse()和JSON.stringify();2、使用Lodash库;3、使用深拷贝函数(采用递归的方式)
文章目录 引言三种方法的优缺点在Vue中,实现数组的深拷贝I JSON.stringify和 JSON.parse的小技巧深拷贝步骤缺点:案例1:向后端请求路由数据案例2: 表单数据处理时复制用户输入的数据II 使用Lodash库步骤适用于复杂数据结构和需要处理循环引用的场景III 自定义的深拷贝函数(…...
ES 删除index 的curl
以下是使用 `curl` 命令删除 Elasticsearch 索引的格式和示例: ### 基本格式 ```bash curl -XDELETE "http://<node-ip|hostname>:9200/<index-name>" ``` - `<node-ip|hostname>`:Elasticsearch 节点的 IP 地址或主机名。 - `<index-name&g…...
游戏引擎学习第124天
仓库:https://gitee.com/mrxiao_com/2d_game_3 回顾/复习 今天是继续完善和调试多线程的任务队列。之前的几天,我们已经介绍了多线程的一些基础知识,包括如何创建工作队列以及如何在线程中处理任务。今天,重点是解决那些我们之前没有注意到…...
第十四届蓝桥杯Scratch11月stema选拔赛真题——小猫照镜子
编程实现: 小猫照镜子。(背景非源素材) 具体要求: 1). 运行程序,角色、背景如图所示; 完整题目可点击下方链接查看,支持在线编程~ 小猫照镜子_scratch_少儿编程题库学习中心-嗨信奥https://www.hixinao.com/tiku/s…...
使用vscode导出Markdown的PDF无法显示数学公式的问题
我的硬件环境是M2的MacBook air,在vscode中使用了Markdown PDF来导出md文件对应的PDF。但不管导出html还是PDF文件,数学公式都是显示的源代码。 我看了许多教程,给的是这个方法:在md文件对应的html文件中加上以下代码:…...
前端系列之:Blob
Blob 与二进制 什么是二进制? 二进制是计算机数据的基本表示形式,只使用 0 和 1 两个数字来表示数值。任何类型的数据(无论是文本、图片、音频文件等)都可以通过二进制表示。 什么是 Blob? 全称 Binary Large Object&a…...
【项目管理】基于 C 语言的 QQ 聊天室实现(TCP + 多线程 + SQLite3)
基于 C 语言的 QQ 聊天室(TCP + 多线程 + SQLite3) 项目功能基础功能: 登录、注册、添加好友、私聊、创建群聊、群聊扩展功能: 删除好友、注销账号、好友在线状态、群管理(拉人/踢人)、VIP 特权、邮件通知等 功能介绍:模拟QQ聊天客户端:登录界面:1、登录2、注册 //将用…...
Apache Flink:实时数据流处理的终极武器
Apache Flink:实时数据流处理的终极武器 在当今这个数据驱动的世界,实时数据流处理已经成为各行各业的核心需求。从金融风控到电商推荐,从物联网监控到网络安全,毫秒级的响应能力决定了一家公司在市场中的竞争力。而在众多流式计…...
点云处理入门--PointNetPointNet++论文与代码详解
基础知识 点云数据: 点云是一种通过三维扫描设备或计算机图形学技术获取的三维空间数据,通常由一系列点组成,每个点包含其在三维空间中的坐标(如 x,y,z),有时还可能包含颜色、强度等附加信息。 介绍几种常…...
通过Nginx负载均衡+Keepalived实现业务高可用
通过Nginx负载均衡和Keepalived可以实现业务的高可用,以下是详细的实现步骤: 环境准备 假设我们有3台服务器,IP地址分别为: 服务器1(Nginx Keepalived 主节点):192.168.1.100服务器2&#x…...
Spark技术系列(三):Spark算子全解析——从基础使用到高阶优化
Spark技术系列(三):Spark算子全解析——从基础使用到高阶优化 1. 算子核心概念与分类体系 1.1 算子本质解析 延迟执行机制:转换算子构建DAG,行动算子触发Job执行任务并行度:由RDD分区数决定(可通过spark.default.parallelism全局配置)执行位置优化:基于数据本地性的…...
ES6模块化详解:导入与导出方式
在现代 JavaScript 开发中,模块化是代码管理和组织的重要工具。ES6(ECMAScript 2015)引入了模块化的概念,通过 import 和 export 来组织代码,使得模块的管理变得更加清晰和简洁。本文将详细介绍 ES6 中的各种模块导入导…...
每日学习Java之一万个为什么?[MySQL面试篇]
分析SQL语句执行流程中遇到的问题 前言1 MySQL是怎么在一台服务器上启动的2 MySQL主库和从库是同时启动保持Alive的吗?3 如果不是主从怎么在启动的时候保证数据一致性4 ACID原则在MySQL上的体现5 数据在MySQL是通过什么DTO实现的6 客户端怎么与MySQL Server建立连接…...
常用空间数据结构对比
空间数据结构是用来组织和查询多维空间数据的算法结构。它们在地理信息系统 (GIS)、计算机图形学、机器人导航、机器学习等领域非常重要。以下是几种常见空间数据结构的对比: 1. 四叉树(Quadtree) 适用场景:二维空间数据&#x…...
RocketMQ延迟消息机制
两种延迟消息 RocketMQ中提供了两种延迟消息机制 指定固定的延迟级别 通过在Message中设定一个MessageDelayLevel参数,对应18个预设的延迟级别指定时间点的延迟级别 通过在Message中设定一个DeliverTimeMS指定一个Long类型表示的具体时间点。到了时间点后…...
利用ngx_stream_return_module构建简易 TCP/UDP 响应网关
一、模块概述 ngx_stream_return_module 提供了一个极简的指令: return <value>;在收到客户端连接后,立即将 <value> 写回并关闭连接。<value> 支持内嵌文本和内置变量(如 $time_iso8601、$remote_addr 等)&a…...
Spring Boot 实现流式响应(兼容 2.7.x)
在实际开发中,我们可能会遇到一些流式数据处理的场景,比如接收来自上游接口的 Server-Sent Events(SSE) 或 流式 JSON 内容,并将其原样中转给前端页面或客户端。这种情况下,传统的 RestTemplate 缓存机制会…...
Qwen3-Embedding-0.6B深度解析:多语言语义检索的轻量级利器
第一章 引言:语义表示的新时代挑战与Qwen3的破局之路 1.1 文本嵌入的核心价值与技术演进 在人工智能领域,文本嵌入技术如同连接自然语言与机器理解的“神经突触”——它将人类语言转化为计算机可计算的语义向量,支撑着搜索引擎、推荐系统、…...
【决胜公务员考试】求职OMG——见面课测验1
2025最新版!!!6.8截至答题,大家注意呀! 博主码字不易点个关注吧,祝期末顺利~~ 1.单选题(2分) 下列说法错误的是:( B ) A.选调生属于公务员系统 B.公务员属于事业编 C.选调生有基层锻炼的要求 D…...
鸿蒙中用HarmonyOS SDK应用服务 HarmonyOS5开发一个生活电费的缴纳和查询小程序
一、项目初始化与配置 1. 创建项目 ohpm init harmony/utility-payment-app 2. 配置权限 // module.json5 {"requestPermissions": [{"name": "ohos.permission.INTERNET"},{"name": "ohos.permission.GET_NETWORK_INFO"…...
Ascend NPU上适配Step-Audio模型
1 概述 1.1 简述 Step-Audio 是业界首个集语音理解与生成控制一体化的产品级开源实时语音对话系统,支持多语言对话(如 中文,英文,日语),语音情感(如 开心,悲伤)&#x…...
全面解析各类VPN技术:GRE、IPsec、L2TP、SSL与MPLS VPN对比
目录 引言 VPN技术概述 GRE VPN 3.1 GRE封装结构 3.2 GRE的应用场景 GRE over IPsec 4.1 GRE over IPsec封装结构 4.2 为什么使用GRE over IPsec? IPsec VPN 5.1 IPsec传输模式(Transport Mode) 5.2 IPsec隧道模式(Tunne…...
MySQL用户和授权
开放MySQL白名单 可以通过iptables-save命令确认对应客户端ip是否可以访问MySQL服务: test: # iptables-save | grep 3306 -A mp_srv_whitelist -s 172.16.14.102/32 -p tcp -m tcp --dport 3306 -j ACCEPT -A mp_srv_whitelist -s 172.16.4.16/32 -p tcp -m tcp -…...
rnn判断string中第一次出现a的下标
# coding:utf8 import torch import torch.nn as nn import numpy as np import random import json""" 基于pytorch的网络编写 实现一个RNN网络完成多分类任务 判断字符 a 第一次出现在字符串中的位置 """class TorchModel(nn.Module):def __in…...
