Kafka 入门与应用实战:吞吐量优化与与 RabbitMQ、RocketMQ 的对比
前言
在现代微服务架构和分布式系统中,消息队列作为解耦组件,承担着重要的职责。它不仅提供了异步处理的能力,还能确保系统的高可用性、容错性和扩展性。常见的消息队列包括 Kafka、RabbitMQ 和 RocketMQ,其中 Kafka 因其高吞吐量、分布式特性和可靠性成为大规模数据流处理的首选。
本篇文章将深入介绍 Kafka 的基本概念、执行流程、吞吐量优化策略、生命周期,重点对比 Kafka 与 RabbitMQ 和 RocketMQ 的异同,最后演示如何在 Spring Boot 中使用 Kafka,并提供相应的代码示例与配置。
1. 什么是 Kafka?
Kafka 是一个高吞吐量、分布式的消息流平台,最初由 LinkedIn 开发,后来捐赠给 Apache 基金会。它的主要优势在于能处理大量的实时数据流,常用于日志聚合、流式处理和数据传输等场景。
1.1 Kafka 的核心组成
Kafka 的核心组件包括:
- Producer(生产者):负责向 Kafka 中发送消息。例如,在一个电商系统中,订单创建后,订单数据会由生产者发送到 Kafka 中。
- Consumer(消费者):从 Kafka 中拉取消息进行处理。比如,电商系统中的库存管理模块,会作为消费者从 Kafka 中获取订单消息,进而更新库存。
- Broker(代理):Kafka 服务的节点,负责存储消息和分发消息。可以把 Broker 理解为一个仓库,消息在这里暂存和被分发。
- Zookeeper:Kafka 集群的元数据和协调管理服务,保证 Kafka 集群的高可用性和一致性。Zookeeper 就像是一个指挥中心,协调着各个 Broker 的工作。
Kafka 集群的高可用性和横向扩展能力,允许 Kafka 能在大规模生产环境中运行,并提供强大的消息持久化和可靠性。
1.2 Kafka 核心概念
- Topic(主题):消息的分类,生产者向主题发送消息,消费者从主题中接收消息。例如,在一个电商系统中,“订单消息” 可以作为一个 Topic,所有与订单相关的消息都发送到这个主题中。
- Partition(分区):每个 Topic 可以划分成多个分区。分区使得 Kafka 可以水平扩展,并且增加并发处理能力。比如,按照不同地区(如华北、华南等)划分 Partition,这样可以并行处理不同地区的订单消息。
- Offset(偏移量):每个消息在分区中的唯一标识,消费者根据 Offset 读取消息。Offset 就像是订单流水号,记录着消息在分区中的位置。
- Replication(副本):Kafka 支持为每个分区设置副本数量,以保证高可用性。例如,订单消息在不同数据中心的备份就是副本,即使某个数据中心出现故障,其他副本也能保证数据不丢失。
2. Kafka 的执行流程与吞吐量优化
Kafka 的消息处理流程可以分为以下几个步骤:
2.1 生产者发送消息
- 连接 Kafka 集群:Kafka 生产者与 Kafka Broker 通过 TCP 连接。
- 选择分区:根据分区策略(例如轮询、哈希)选择目标分区。
- 消息传输与存储:生产者将消息发送到指定的 Broker,Broker 将消息存储到日志中。
- 消息确认:根据生产者的配置,Kafka 可以在消息成功写入磁盘后确认消息,或仅在消息被接收后确认。
2.2 消费者消费消息
- 订阅 Topic:消费者通过订阅 Topic,开始接收该主题中的消息。
- 拉取消息:消费者定期向 Kafka 请求消息,Kafka 返回符合消费者偏移量的消息。
- 确认偏移量:消费者可以显式或隐式地提交消息的偏移量,确保消息的准确消费。
2.3 吞吐量优化策略
Kafka 的高吞吐量来源于其设计架构和优化策略,以下是一些关键的优化方向:
2.3.1 分区与副本数
Kafka 通过将 Topic 划分为多个分区(Partition),实现数据的水平分布和并发处理。每个分区的消息是有序的,但跨分区的消息没有顺序保障。分区数越多,能够支持的消费者并发度也越高。副本数则保证了 Kafka 在单个节点故障时,依然能够保持数据的可用性和可靠性。
优化建议:
- 增加分区数:增加分区数可以提升 Kafka 的并发能力,尤其是在消费端和生产端之间的数据流动非常活跃时。例如,当电商促销活动期间,订单量剧增,增加分区数可以更好地处理大量订单消息。
- 合理配置副本数:副本数的增加虽然提高了可靠性,但会带来更多的网络和存储压力。通常,副本数为 3 是一个常见的配置。
2.3.2 批量发送消息
Kafka 支持批量发送消息,生产者将多个消息一起发送到服务器,而不是一个消息一个消息地发送。批量发送减少了网络延迟和磁盘 I/O,从而提高了吞吐量。
优化建议:
设置适当的 batch.size
和 linger.ms
参数。batch.size
控制批次的最大大小,linger.ms
控制生产者等待时间。适当增加这些参数能够减少网络请求次数,提升吞吐量。例如:
Properties props = new Properties();
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
props.put(ProducerConfig.LINGER_MS_CONFIG, 10);
// 其他配置...
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
2.3.3 消息压缩
Kafka 支持消息压缩,生产者可以使用 GZIP、Snappy 或 LZ4 等压缩算法来减少消息的大小,进而提高网络带宽的利用率。
优化建议:
开启压缩,尤其是在消息体较大的情况下,可以显著减少传输的流量。通过设置 compression.type
参数,可以选择适合的压缩算法。
2.3.4 消费者并发处理
Kafka 的消费者群组机制允许多个消费者并行消费消息。通过增加消费者的数量,可以提高消费速度。
优化建议:
消费者的数量应当根据分区数来合理配置,消费者数量过多会导致某些消费者处于空闲状态,而过少则会影响消费效率。
2.3.5 Kafka 配置优化
Kafka 的一些配置项可以进一步提升系统的吞吐量:
- acks 配置:生产者的
acks
配置决定了消息确认的策略。acks = 1
表示生产者等待 Leader 写入日志并返回确认即可,acks = all
则要求所有副本都写入日志。acks = 1
通常可以获得更高的吞吐量。 - compression.type:启用消息压缩,如 snappy、gzip 等,减少网络传输开销。
- buffer.memory:设置生产者端缓冲区的大小,影响消息的积压情况。
3. Kafka 与 RabbitMQ、RocketMQ 的对比
3.1 Kafka vs RabbitMQ
对比项 | Kafka | RabbitMQ |
---|---|---|
架构设计 | 采用分布式日志架构,每个主题(Topic)由多个分区组成,保证高吞吐量和数据可持久化 | 采用 AMQP 协议,基于队列和交换机的模式,提供更多的消息传递功能(例如消息确认、路由) |
吞吐量 | 提供了更高的吞吐量,特别适合大数据、日志流等场景 | 适用于低延迟和高可靠性的应用,但在高吞吐量场景下表现较差 |
使用场景 | 适用于实时数据流处理、大数据流式计算等高吞吐量场景 | 更适用于任务队列、消息分发、延时消息等应用 |
3.2 Kafka vs RocketMQ
对比项 | Kafka | RocketMQ |
---|---|---|
架构设计 | 基于分区和日志的存储,适用于海量数据的存储和流式传输 | 基于主题和队列,支持事务消息和顺序消息,适用于金融等高可靠性要求的场景 |
吞吐量 | 吞吐量通常比 RocketMQ 高,适合处理大量的实时数据流 | 支持顺序消费和事务消息,对于对数据一致性要求较高的应用场景更为合适 |
使用场景 | 更适合用于数据流处理、大规模日志聚合等 | 适合于分布式事务、高可靠消息传递等场景 |
4. 在 Spring Boot 中使用 Kafka
4.1 Maven 依赖配置
首先,在 Spring Boot 项目中添加 Kafka 的依赖。在 pom.xml
中加入:
<dependencies><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>2.8.0</version> <!-- 根据实际版本号调整 --></dependency>
</dependencies>
4.2 配置 Kafka
在 application.yml
或 application.properties
文件中配置 Kafka:
spring:kafka:bootstrap-servers: localhost:9092consumer:group-id: test-groupauto-offset-reset: earliestproducer:key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializer
4.3 Kafka 生产者示例代码
@Service
public class KafkaProducer {private final KafkaTemplate<String, String> kafkaTemplate;// 这里的KafkaTemplate<String, String>是用于发送消息的模板类,它定义了发送消息的方式和相关配置public KafkaProducer(KafkaTemplate<String, String> kafkaTemplate) {this.kafkaTemplate = kafkaTemplate;}// sendMessage方法用于将消息发送到指定的主题public void sendMessage(String topic, String message) {kafkaTemplate.send(topic, message);}
}
4.4 Kafka 消费者示例代码
@Service
public class KafkaConsumer {// @KafkaListener注解表示该方法是一个Kafka消息监听器,监听指定的主题和组@KafkaListener(topics = "test-topic", groupId = "test-group")public void listen(String message) {System.out.println("Received message: " + message);}
}
4.5 启动 Spring Boot 应用
启动 Spring Boot 应用后,Kafka 生产者和消费者将自动处理消息的发送与接收。如果是本地环境,启动 Kafka 服务可以使用相应的命令行操作,例如在 Kafka 的安装目录下执行 bin/kafka-server-start.sh config/server.properties
(Linux 或 macOS 系统)。如果连接失败,可能会出现如 “Connection refused” 等错误提示,此时需要检查 Kafka 服务是否正常运行、配置的连接地址和端口是否正确等。
5. 总结
-
Kafka 作为一款卓越的分布式流平台,在大数据处理和分布式系统领域有着举足轻重的地位 。它具备高吞吐量、分布式特性以及出色的可靠性,使其成为处理大规模实时数据流的不二之选。
-
在基础概念方面,Kafka 拥有 Producer、Consumer、Broker 和 Zookeeper 等核心组件,这些组件相互协作,构建起了一个稳定且高效的消息处理体系。同时,Topic、Partition、Offset 和 Replication 等核心概念,是深入理解 Kafka 工作机制的关键。例如,Partition 实现了数据的水平扩展和并发处理,而 Replication 则确保了数据的高可用性。
-
在执行流程上,生产者发送消息时,会经历连接 Kafka 集群、选择分区、传输存储以及消息确认等步骤;消费者消费消息则通过订阅 Topic、拉取消息以及确认偏移量来完成。这一过程看似简单,却蕴含着诸多内部机制,如生产者如何保证消息在分区内的顺序性,消费者显式和隐式提交偏移量的区别等,这些细节对于优化 Kafka 性能和保障消息准确处理至关重要。
-
吞吐量优化是 Kafka 的一大亮点。通过合理调整分区与副本数、采用批量发送消息、开启消息压缩、优化消费者并发处理以及精细配置 Kafka 参数等策略,可以显著提升 Kafka 的性能。例如,增加分区数能提升并发能力,但要注意避免分区过多导致管理成本增加;批量发送消息和消息压缩可以有效减少网络延迟和带宽占用。
-
与 RabbitMQ 和 RocketMQ 相比,Kafka 在架构设计、吞吐量和使用场景上各有特点。RabbitMQ 基于 AMQP 协议,侧重于消息传递功能和低延迟高可靠性;RocketMQ 支持事务消息和顺序消息,适用于对数据一致性要求极高的场景。而 Kafka 凭借其分布式日志架构和高吞吐量,在实时数据流处理和大规模日志聚合等方面表现出色。
-
在实际应用中,通过在 Spring Boot 中集成 Kafka,可以轻松搭建高效的消息处理系统。从添加 Maven 依赖到配置 Kafka,再到编写生产者和消费者示例代码,每一步都为实现可靠的消息通信奠定了基础。
-
总之,Kafka 的强大功能和广泛适用性,使其成为现代分布式系统中不可或缺的一部分。无论是处理海量数据的实时分析,还是构建高可靠的异步消息处理机制,Kafka 都能提供卓越的解决方案。随着技术的不断发展,Kafka 也将持续演进,为开发者带来更多的便利和创新。
相关文章:

Kafka 入门与应用实战:吞吐量优化与与 RabbitMQ、RocketMQ 的对比
前言 在现代微服务架构和分布式系统中,消息队列作为解耦组件,承担着重要的职责。它不仅提供了异步处理的能力,还能确保系统的高可用性、容错性和扩展性。常见的消息队列包括 Kafka、RabbitMQ 和 RocketMQ,其中 Kafka 因其高吞吐量…...

“推理”(Inference)在深度学习和机器学习的语境
“推理”(Inference)在深度学习和机器学习的语境中,是指使用经过训练的模型对新数据进行预测的过程。将其简单地理解为“模型的应用阶段”。在这一阶段,我们不再进行模型训练,而是利用已训练好且保存下来的模型来获取对…...

字节腾讯阿里大厂面经汇总:Java集合(容器)大厂面试题及参考答案
ArrayList 的扩容机制以及删除操作的时间复杂度 ArrayList 是 Java 中非常常用的一个集合类,它是基于数组实现的动态数组。当我们创建一个 ArrayList 时,如果不指定初始容量,它会有一个默认的初始容量(通常是 10)。当我们向 ArrayList 中添加元素时,如果元素的数量达到了…...

数据结构(初阶)(一)----算法复杂度
算法复杂度 算法复杂度数据结构算法算法效率复杂度的概念 数据结构 数据结构(Data Structure)是计算机存储、组织数据的⽅式,指相互之间存在⼀种或多种特定关系的数据元素的集合。没有⼀种单⼀的数据结构对所有⽤途都有⽤,所以我们要学各式各样的数据结…...

构建高效稳定的网络环境
概述 网络技术是当今IT行业的重要组成部分,构建高效稳定的网络环境对于企业、个人和互联网发展至关重要。本文将探讨网络技术中的关键要素,包括网络协议、网络架构、网络安全和网络优化,并提供实用的技巧和最佳实践,以帮助您构建…...

使用Edge打开visio文件
使用Edge打开visio文件 打开Edge浏览器搜索‘vsdx edge’ 打开第一个搜索结果 Microsoft Support 根据上述打开的页面进行操作 第一步:安装Visio Viewer 第二步:添加注册表 桌面新增文本文件,将下面的内容放入新建文本中,修…...

ChatGPT Prompt 编写指南
一、第一原则:明确的意图 你需要明确地表达你的意图和要求,尽可能具体、描述性、详细地描述所需的上下文、你期望的结果等。你的要求越明确,越有希望获得你想要的答案。 糟糕的案例 ❌ 写一首关于 OpenAI 的诗。 更好的案…...

蚁群算法 (Ant Colony Optimization) 算法详解及案例分析
蚁群算法 (Ant Colony Optimization) 算法详解及案例分析 目录 蚁群算法 (Ant Colony Optimization) 算法详解及案例分析1. 引言2. 蚁群算法 (ACO) 算法原理2.1 蚂蚁觅食行为2.2 算法步骤2.3 数学公式3. 蚁群算法的优势与局限性3.1 优势3.2 局限性4. 案例分析4.1 案例1: 旅行商…...

安卓动态设置Unity图形API
命令行方式 Unity图像api设置为自动,安卓动态设置Vulkan、OpenGLES Unity设置 安卓设置 创建自定义活动并将其设置为应用程序入口点。 在自定义活动中,覆盖字符串UnityPlayerActivity。updateunitycommandlineararguments (String cmdLine)方法。 在该方法中,将cmdLine…...

通信协议—WebSocket
一、WebSocket编程概念 1.1 什么是WebSocket WebSocket 是一种全双工通信协议,允许在客户端(通常是浏览器)和服务器之间建立持久连接,以实现实时的双向通信。它是 HTML5 标准的一部分,相比传统的 HTTP 请求ÿ…...

helm推送到harbor私有库--http: server gave HTTP response to HTTPS client
harbor私有库访问的是http模式 harbor 2.8版本以上可以存储helm镜像 docker镜像推送的时候需要docker端配置insecure-registries 发现helm推送只能在harbor部署的本机使用localhost才能推送成功,即 helm push xxx.tgz oci://localhost:80/library 使用helm pus…...

数据结构——实验一·线性表
海~~欢迎来到Tubishu的博客🌸如果你也是一名在校大学生,正在寻找各种变成资源,那么你就来对地方啦🌟 Tubishu是一名计算机本科生,会不定期整理和分享学习中的优质资源,希望能为你的编程之路添砖加瓦⭐&…...

快速搭建深度学习环境(Linux:miniconda+pytorch+jupyter notebook)
本文基于服务器端环境展开,使用的虚拟终端为Xshell。 miniconda miniconda是Anaconda的轻量版,仅包含Conda和Python,如果只做深度学习,可使用miniconda。 [注]:Anaconda、Conda与Miniconda Conda:创建和管…...

OpenCV相机标定与3D重建(66)对立体匹配生成的视差图(disparity map)进行验证的函数validateDisparity()的使用
操作系统:ubuntu22.04 OpenCV版本:OpenCV4.9 IDE:Visual Studio Code 编程语言:C11 算法描述 使用左右检查来验证视差。矩阵 “cost” 应该由立体对应算法计算。 cv::validateDisparity 函数是 OpenCV 库中用于对立体匹配生成的视差图&…...

2025年新开局!谁在引领汽车AI风潮?
汽车AI革命已来。 在2025年伊始开幕的CES展上,AI汽车、AI座舱无疑成为了今年汽车行业的最大热点。其中不少车企在2025年CES上展示了其新一代AI座舱,为下一代智能汽车的人机交互、场景创新率先打样。 其中,东软集团也携带AI驱动、大数据支撑…...

Spring自定义BeanPostProcessor实现bean的代理Java动态代理知识
上文:https://blog.csdn.net/qq_26437925/article/details/145241149 中大致了解了spring aop的代理的实现,其实就是有个BeanPostProcessor代理了bean对象。顺便复习下java代理相关知识 目录 自定义BeanPostProcessor实现aopJava动态代理知识动态代理的几…...

三篇物联网漏洞挖掘综述
由于物联网设备存在硬件资源受限、硬件复杂异构, 代码、文档未公开的问题, 物联网设备的漏洞挖掘存在较大的挑战: 硬件资源受限性: 通用动态二进分析技术需要在运行程序外围实施监控分析。由于物联网设备存储资源(存储)的受限性,…...

Pytorch深度学习指南 卷I --编程基础(A Beginner‘s Guide) 第1章 一个简单的回归
本章正式开始使用pytorch的接口来实现对应的numpy的学习的过程,来学习模型的实现,我们会介绍numpy是如何学习的,以及我们如何一步步的通过torch的接口来实现简单化的过程,优雅的展示我们的代码,已经我们的代码完成的事…...

【EXCEL_VBA_实战】多工作薄合并深入理解
工作背景:多个工作薄存在冲突的名称,需快速合并 困难点:工作表移动复制时,若有冲突的名称,会不断弹出对话框待人工确认 思路:利用代码确认弹出的对话框 关键代码:Application.DisplayAlerts …...

mysql之表的外键约束
MySQL表的外键约束详细介绍及代码示例 外键约束是数据库中用于维护数据完整性和一致性的重要机制。它确保一个表中的数据与另一个表中的数据相关联,防止无效的数据引用。本文将详细介绍了外键约束的各个方面,并通过具体的代码示例进行演示。 1. 外键约束…...

Tuning the Go HTTP Client Settings
记录一次Go HTTP Client TIME_WAIT的优化 业务流程 分析 通过容器监控发现服务到事件总线的负载均衡之间有大量的短链接,回看一下代码 发送请求的代码 func SendToKEvent(ev *KEvent) error {data, err : json.Marshal(ev.Data)if err ! nil {return err}log.Pri…...

第二十四课 Vue中子组件调用父组件数据
Vue中子组件调用父组件数据 Vue是不建议在不同的组件直接传递值的,我们需要使用props方法来进行组件间的值传递 子组件调用父组件数据 父模板的数据,子组件是无法直接调用的 无法直接调用 1)组件调用顶级对象中的data <div class&quo…...

Jenkins-pipeline语法说明
一. 简述: Jenkins Pipeline 是一种持续集成和持续交付(CI/CD)工具,它允许用户通过代码定义构建、测试和部署流程。 二. 关于jenkinsfile: 1. Sections部分: Pipeline里的Sections通常包含一个或多个Direc…...

小米Vela操作系统开源:AIoT时代的全新引擎
小米近日正式开源了其物联网嵌入式软件平台——Vela操作系统,并将其命名为OpenVela。这一举动在AIoT(人工智能物联网)领域掀起了不小的波澜,也为开发者们提供了一个强大的AI代码生成器和开发平台。OpenVela项目源代码已托管至GitH…...

NodeJs如何做API接口单元测试? --【elpis全栈项目】
NodeJs API接口单元测试 api单元测试需要用到的 assert:断言库 (还要一些断言库比如:Chai)supertest: 模拟http请求 简单的例子: const express require(express); const supertest require(supertest); const assert require(assert);…...

bundletool来特定设备规范的json安装aab包
1、获取自己设备的设备规范json java -jar ./bundletool.jar get-device-spec --outputj:/device-spec.json 2、根据设备规范生成apks包 java -jar ./bundletool.jar build-apks --device-specj:/device-spec.json --bundleapp-dev-release.aab --output随便的文件名.apks -…...

2024年第十五届蓝桥杯青少组国赛(c++)真题—快速分解质因数
快速分解质因数 完整题目和在线测评可点击下方链接前往: 快速分解质因数_C_少儿编程题库学习中心-嗨信奥https://www.hixinao.com/tiku/cpp/show-3781.htmlhttps://www.hixinao.com/tiku/cpp/show-3781.html 若如其他赛事真题可自行前往题库中心查找,题…...

.Net Core微服务入门全纪录(四)——Ocelot-API网关(上)
系列文章目录 1、.Net Core微服务入门系列(一)——项目搭建 2、.Net Core微服务入门全纪录(二)——Consul-服务注册与发现(上) 3、.Net Core微服务入门全纪录(三)——Consul-服务注…...

chrome游览器JSON Formatter插件无效问题排查,FastJsonHttpMessageConverter导致Content-Type返回不正确
问题描述 chrome游览器又一款JSON插件叫JSON Formatter,游览器GET请求调用接口时,如果返回的数据是json格式,则会自动格式化展示,类似这样: 但是今天突然发现怎么也格式化不了,打开一个json文件倒是可以格…...

[Qt]系统相关-网络编程-TCP、UDP、HTTP协议
目录 前言 一、UDP网络编程 1.Qt项目文件 2.UDP类 QUdpSocket QNetworkDatagram 3.UDP回显服务器案例 细节 服务器设计 客户端设计 二、TCP网络编程 1.TCP类 QTcpServer QTcpSocket 2.TCP回显服务器案例 细节 服务器设计 客户端设计 三、HTTP客户端 1.HTTP…...