Kafka核心参数与使用02
一、从基础的客户端说起
Kafka 提供了非常简单的生产者(Producer)和消费者(Consumer)API。通过引入相应依赖后,可以快速上手编写生产者和消费者的示例。
1. 消息发送者主流程
一个最基础的 Producer
发送消息的步骤如下:
-
设置 Producer 核心属性
- 例如:
bootstrap.servers
(集群地址)、key.serializer
、value.serializer
等。 - 大多数核心配置在
ProducerConfig
中都有对应的注释说明。
- 例如:
-
构建消息
- Kafka 消息是一个
Key-Value
结构,Key
常用于分区路由,Value
则是业务真正要传递的内容。
- Kafka 消息是一个
-
发送消息
- 单向发送:
producer.send(record);
仅发出消息,不关心服务端响应。 - 同步发送:
producer.send(record).get();
获取服务端响应前会阻塞。 - 异步发送:
producer.send(record, callback);
服务端响应时会回调。
- 单向发送:
示例核心代码示意:
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());Producer<String,String> producer = new KafkaProducer<>(props);
ProducerRecord<String, String> record =new ProducerRecord<>(TOPIC, key, value);
// 单向发送
producer.send(record);
// 同步发送
producer.send(record).get();
// 异步发送
producer.send(record, new Callback() { ... });
2. 消息消费者主流程
在 Consumer
侧,同样有三步:
-
设置 Consumer 核心属性
- 例如:
bootstrap.servers
、group.id
、key.deserializer
、value.deserializer
等。
- 例如:
-
拉取消息
- Kafka 采用 Pull 模式:消费者主动调用
poll()
拉取消息。
- Kafka 采用 Pull 模式:消费者主动调用
-
处理消息,提交位点
- 手动提交:
consumer.commitSync()
或consumer.commitAsync()
- 自动提交:设置
enable.auto.commit = true
及相应提交周期参数。
- 手动提交:
示例核心代码示意:
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());Consumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList(TOPIC));while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));for (ConsumerRecord<String, String> record : records) {// 业务处理}// 手动提交 offsetconsumer.commitSync();
}
二、从客户端属性来梳理客户端工作机制
Kafka 的核心特色在于高并发、高吞吐以及在网络不稳定、服务随时会崩溃等复杂场景下仍能保证消息安全性。以下属性与机制能帮助我们从客户端视角理解 Kafka。
1. 消费者分组消费机制
- Group 机制
每个Consumer
都会指定一个group.id
。同一个 Consumer Group 内,Topic 的每个 Partition 只会被同组里的一个 Consumer 消费。不同组之间则是互不影响、各自消费。 - offset 提交
- offset 保存在 Broker 端,但由 Consumer “主导”提交。
- 提交方式有:
- 同步:
commitSync()
,安全但速度慢; - 异步:
commitAsync()
,快但可能丢失或重复消费。
- 同步:
- auto.offset.reset
- 当 Broker 端没有找到该 Group 相应的 offset 时,可以根据配置(
earliest
,latest
,none
)决定从何处开始消费。
- 当 Broker 端没有找到该 Group 相应的 offset 时,可以根据配置(
提示:Offset 提交与消息处理之间并非完全同步,一旦无法保证强一致性,可能出现消息重复与消息丢失。可根据业务需求与场景选择手动提交或自动提交,也可将 offset 存入外部存储(如 Redis)自行管理。
2. 生产者拦截器机制
- 通过配置
interceptor.classes
可以指定一个或多个实现了ProducerInterceptor
接口的拦截器。 - 典型功能:在发送前统一添加/修改消息内容,或者在发送后做监控/统计等操作。
3. 消息序列化机制
- Producer 端:
key.serializer
/value.serializer
:将对象序列化为byte[]
。- 内置如
StringSerializer
、IntegerSerializer
等;可自定义自定义序列化类。
- Consumer 端:
key.deserializer
/value.deserializer
:将byte[]
反序列化为业务对象。
- 如果使用自定义类型(POJO)进行传输,则需要编写自定义 Serializer/Deserializer。
- 核心思想:定长字段与不定长字段的序列化与反序列化。
4. 消息分区路由机制
- Producer 侧:
- 通过
partitioner.class
指定自定义的分区器(Partitioner
接口)。Kafka 内置默认逻辑:- 若无 key,则采用黏性分区策略(Sticky Partition);
- 若指定 key,则对 key 进行哈希得到分区;
- 也可改为轮询策略(RoundRobinPartitioner)。
- 通过
- Consumer 侧:
- 通过
partition.assignment.strategy
指定分区分配策略,内置RangeAssignor
,RoundRobinAssignor
,StickyAssignor
,CooperativeStickyAssignor
等。- Range:按顺序将分区切块分配。
- RoundRobin:轮询分配。
- Sticky:尽可能保持现有分配不变,同时保证分配均匀。
- 通过
5. 生产者消息缓存机制
生产者为了提高吞吐量,会将消息先写入一个本地缓存(RecordAccumulator
),然后 sender
线程批量发送到 Broker:
buffer.memory
:缓存总大小,默认 32MB。batch.size
:每个分区发送批次大小,默认 16KB。linger.ms
:即便batch.size
未填满,等待linger.ms
毫秒后也会将消息批量发送。max.in.flight.requests.per.connection
:同一连接上未收到响应的请求数上限。
6. 发送应答机制
acks
用于控制生产者发送完消息后何时认为消息“成功”:acks=0
:不等待 Broker 确认,吞吐量高,安全性低。acks=1
:只等待 Leader 分区写入,常见设置。acks=all
或-1
:等待所有副本写入,安全性最高,吞吐量相对低。
- 还需配合 Broker 端
min.insync.replicas
参数,控制副本个数不足时直接返回错误。
7. 生产者消息幂等性
- 为保证 Exactly-once 语义,需要开启
enable.idempotence
(幂等性)。 - 幂等性主要依赖
PID
+SequenceNumber
机制:- Producer 向同一分区发送消息时,每条消息都有一个单调递增的序列号。
- Broker 针对
<PID, Partition>
维护序列号,只接收递增消息,防止消息重复写入。
- 幂等性要求:
acks=all
retries>0
max.in.flight.requests.per.connection<=5
8. 生产者消息事务
- 幂等性只能保证单个分区的 Exactly-once,如果涉及 多个分区/Topic 则需要“事务”来保证一批消息的一致性。
- 主要 API:
initTransactions()
beginTransaction()
commitTransaction()
abortTransaction()
- 事务依赖于
transaction.id
来区分不同的 Producer 实例,以便在崩溃重启后继续补偿或回滚先前未完成的事务,保证多分区的一致写入。
三、客户端流程总结
-
Producer:
- 属性配置(序列化、分区器、拦截器、幂等性/事务等) → 将消息提交到
RecordAccumulator
→Sender
线程批量发送到 Broker → 按acks
等待 Broker 响应 → 提交或重试。
- 属性配置(序列化、分区器、拦截器、幂等性/事务等) → 将消息提交到
-
Consumer:
- 属性配置(反序列化、消费组、分区分配策略等) →
poll()
拉取消息 → 业务处理 → 提交 offset(手动或自动),与 Broker 同步消费进度。
- 属性配置(反序列化、消费组、分区分配策略等) →
-
重点:
- 消息在 Producer 端的缓存发送机制 与 消息在 Consumer 端的主动拉取、分组消费、offset 提交 是理解 Kafka 高并发、高吞吐、高可用的关键。
- 其他如 幂等性(保证单分区 Exactly-once)和 事务(保证多分区一致性)是针对数据安全性和业务需求的更深入扩展。
四、Spring Boot 集成 Kafka
Spring Boot 中集成 Kafka 本质也是对上述 Producer/Consumer 的封装。
-
引入依赖
<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId> </dependency>
-
在
application.properties
中配置 Kafka 相关参数- 和原生 Kafka 参数名称基本一致,如
spring.kafka.producer.*
、spring.kafka.consumer.*
等。 - 典型参数:
bootstrap-servers
,acks
,batch-size
,enable-auto-commit
,auto-offset-reset
等。
- 和原生 Kafka 参数名称基本一致,如
-
使用
KafkaTemplate
发送消息@RestController public class KafkaProducerController {@Autowiredprivate KafkaTemplate<String, Object> kafkaTemplate;@GetMapping("/kafka/send/{message}")public void sendMessage(@PathVariable("message") String msg) {kafkaTemplate.send("topic1", msg);} }
-
使用
@KafkaListener
声明消息消费者@Component public class KafkaConsumerListener {@KafkaListener(topics = {"topic1"})public void onMessage(ConsumerRecord<?, ?> record) {System.out.println("消费内容:" + record.value());} }
结语
- 想要真正掌握 Kafka,重点在于建立整体的数据流转模型:
- Producer 端如何将消息分区、缓存、发送、应答、重试、保证幂等与事务;
- Consumer 端如何分组消费、订阅分区、拉取消息、提交 offset。
- 熟悉这些机制后,再去看各种客户端配置就会轻松许多,能够结合实际业务场景做灵活配置与调优。
- Spring Boot 也只是对原生 Kafka 客户端的进一步封装,一旦理解 Kafka 底层机制与各项参数原理,使用 Spring Boot 时只需“对号入座”地进行配置即可。
相关文章:
Kafka核心参数与使用02
一、从基础的客户端说起 Kafka 提供了非常简单的生产者(Producer)和消费者(Consumer)API。通过引入相应依赖后,可以快速上手编写生产者和消费者的示例。 1. 消息发送者主流程 一个最基础的 Producer 发送消息的步骤…...
Three.js 渲染技术:打造逼真3D体验的幕后功臣
文章目录 前言一、着色器(Shaders)二、后处理(Post-processing)三、抗锯齿(Anti-aliasing)四、实时渲染与离线渲染五、光照模型与材质优化六、环境映射(Environment Mapping)七、纹理…...
QTcpSocket 如何统计在线时长
基本原理 QTcpSocket是 Qt 库中用于 TCP 通信的类。要统计在线时长,关键思路是记录连接建立的时间和当前时间,通过计算两者的差值来得到在线时长。实现步骤 记录连接建立时间: 在连接成功的信号槽函数中记录开始时间。例如,当QTcpSocket成功连接到服务器时,会发出connecte…...

【Altium】AD使用智能粘贴功能把多个网络标签改成端口
1、 文档目标 使用智能粘贴功能把多个网络标签(net lable)改成端口(port) 2、 问题场景 客户有一份原理图,网络用的是net label,没用Port,然后创建一个sheet symbol,但是sheet sy…...

.NET 终止或结束进程
如何使用 C# 终止进程。 使用简单的方法终止.NET中的现有进程Process.Kill()。有一个可选参数 true 或 false,用于结束与要结束的进程相关的所有子进程。 了解如何创建流程。 结束当前进程: System.Diagnostics.Process.GetCurrentProcess().Kill(tru…...
R.swift库的详细用法
R.swift 是一个 Swift 工具库,它提供了一个自动生成的类 R,使得你可以通过类型安全的方式访问项目中的资源,例如图片、字体、颜色、XIB 文件等。通过 R.swift,你可以避免字符串类型的错误,提升代码的可维护性。 以下是 R.swift 库的详细用法: 1. 安装 R.swift 使用 Sw…...

Js的回调函数
一、什么是回调函数(Callback)? 回调函数(Callback Function)是指一个函数被作为参数传递给另一个函数,并在特定事件发生或操作完成时执行。 可以通俗地理解为一种“委托”机制。 在JavaScript中࿰…...
flutter 独立开发之笔记
1、# use: - [flutter_launcher_icons:] 每次修改完icon后,都需要执行一遍 dart run flutter_launcher_icons 2、开启混淆并打包apk flutter build apk --obfuscate --split-debug-info./out/android/app.android-arm64.symbols 3、开启windows支持 flutter con…...
PHP的扩展Imagick的安装
windows下的安装 下载:Imagick扩展 PECL :: Package :: imagick 3.7.0 for Windows 下载:ghostscript(PDF提取图片时用到,不处理PDF可以不安装) Ghostscript : Downloads 安装扩展 Imagick解压后&…...
【git】在服务器使用docker设置了一个gogs服务器,访问和现实都不理想
以下问题应该都可以通过设置custom/conf/app.ini来解决 配置文档参考地址:https://www.bookstack.cn/read/gogs_zh/advanced-configuration_cheat_sheet.md domain显示的事localhost,实际上应该是一个IP地址。 关键字: DOMAIN ROOT_URL 因为是docker…...

多台PC共用同一套鼠标键盘
当环境中有多个桌面 pc 需要操作的时候,在 多台 pc 之间切换会造成很多的不方便 可以通过远程进行连接,但是有一个更好的方案是让多台机器之间共用同一套键盘鼠标 常用的解决方案 synergy 和 sharemouse,通过移动光标在不同的 pc 间切换 s…...
大语言模型是如何训练出来的?
近期听了不少与AI相关的播客,有理想转型AI的分享,有Character.ai出来同事的分享等,结合对Transformer架构的理解尝试大致还原大语言模型的训练过程。不过,当我这样的“中国大妈”也能够大致琢磨明白大语言模型是如何训练出来的时候…...
Vue2与Vue3在项目开发中的选择:深入探讨
文章目录 前言一、Vue2的优势与挑战二、Vue3的进步与特性三、如何做出选择?结语 前言 Vue.js 是一个用于构建用户界面的渐进式JavaScript框架。Vue2和Vue3是其两个主要版本,它们各自拥有一系列特点和优势。随着Vue3的发布,开发者们面临着在新…...
Web枚举:深入了解目标应用系统
Web枚举是渗透测试中重要的第一步,旨在全面收集目标系统的信息,以便后续攻击载荷的构建更具针对性和效率。本文将详细讨论如何通过各种方法识别目标Web应用的技术栈,并提取关键信息。 1. 识别目标系统的技术栈 技术栈指Web应用所依赖的技术组…...

RabbitMQ介绍与使用
RabbitMQ官网 RabbitMQ 介绍 RabbitMQ 是一个开源的消息代理和队列服务器,基于 AMQP(高级消息队列协议)标准,使用 Erlang 编程语言构建。它是消息队列(MQ)的一种,广泛应用于分布式系统中&#x…...

从0到机器视觉工程师(六):配置OpenCV和Qt环境
CMake配置OpenCV CMakeLists.txt文件的编写 cmake_minimum_required(VERSION 3.20) project(test_opencv LANGUAGES CXX) #寻找Opencv库 FIND_PACKAGE(OpenCV REQUIRED) include_directories(test_opencv ${OpenCV_INCLUDE_DIRS}) add_executable(test_opencv main.cpp) TARGE…...

计算机毕业设计Python机器学习农作物健康识别系统 人工智能 图像识别 机器学习 大数据毕业设计 算法
温馨提示:文末有 CSDN 平台官方提供的学长联系方式的名片! 温馨提示:文末有 CSDN 平台官方提供的学长联系方式的名片! 温馨提示:文末有 CSDN 平台官方提供的学长联系方式的名片! 作者简介:Java领…...

(Arxiv-2023)LORA-FA:针对大型语言模型微调的内存高效低秩自适应
LORA-FA:针对大型语言模型微调的内存高效低秩自适应 paper是香港浸会大学发表在Arxiv 2023的工作 paper title:LORA-FA: MEMORY-EFFICIENT LOW-RANK ADAPTATION FOR LARGE LANGUAGE MODELS FINE-TUNING ABSTRACT 低秩自适应 (LoRA) 方法可以大大减少微调…...
huggingface/bert/transformer的模型默认下载路径以及自定义路径
当使用 BertTokenizer.from_pretrained(bert-base-uncased) 加载预训练的 BERT 模型时,Hugging Face 的 transformers 库会从 Hugging Face Model Hub 下载所需的模型文件和分词器文件(如果它们不在本地缓存中)。 默认情况下,这些…...
从 0 开始上手 Solana 智能合约
Solana CLI 基础知识 Solana CLI 是一个命令行界面工具,提供了一系列用于与 Solana Cluster 交互的命令。 我们将介绍一些最常见的命令,但你始终可以通过运行 solana --help 查看所有可能的 Solana CLI 命令列表。 Solana CLI 配置 Solana CLI 存储了…...

ElasticSearch搜索引擎之倒排索引及其底层算法
文章目录 一、搜索引擎1、什么是搜索引擎?2、搜索引擎的分类3、常用的搜索引擎4、搜索引擎的特点二、倒排索引1、简介2、为什么倒排索引不用B+树1.创建时间长,文件大。2.其次,树深,IO次数可怕。3.索引可能会失效。4.精准度差。三. 倒排索引四、算法1、Term Index的算法2、 …...

(转)什么是DockerCompose?它有什么作用?
一、什么是DockerCompose? DockerCompose可以基于Compose文件帮我们快速的部署分布式应用,而无需手动一个个创建和运行容器。 Compose文件是一个文本文件,通过指令定义集群中的每个容器如何运行。 DockerCompose就是把DockerFile转换成指令去运行。 …...

Android 之 kotlin 语言学习笔记三(Kotlin-Java 互操作)
参考官方文档:https://developer.android.google.cn/kotlin/interop?hlzh-cn 一、Java(供 Kotlin 使用) 1、不得使用硬关键字 不要使用 Kotlin 的任何硬关键字作为方法的名称 或字段。允许使用 Kotlin 的软关键字、修饰符关键字和特殊标识…...

Linux 内存管理实战精讲:核心原理与面试常考点全解析
Linux 内存管理实战精讲:核心原理与面试常考点全解析 Linux 内核内存管理是系统设计中最复杂但也最核心的模块之一。它不仅支撑着虚拟内存机制、物理内存分配、进程隔离与资源复用,还直接决定系统运行的性能与稳定性。无论你是嵌入式开发者、内核调试工…...

安全突围:重塑内生安全体系:齐向东在2025年BCS大会的演讲
文章目录 前言第一部分:体系力量是突围之钥第一重困境是体系思想落地不畅。第二重困境是大小体系融合瓶颈。第三重困境是“小体系”运营梗阻。 第二部分:体系矛盾是突围之障一是数据孤岛的障碍。二是投入不足的障碍。三是新旧兼容难的障碍。 第三部分&am…...

排序算法总结(C++)
目录 一、稳定性二、排序算法选择、冒泡、插入排序归并排序随机快速排序堆排序基数排序计数排序 三、总结 一、稳定性 排序算法的稳定性是指:同样大小的样本 **(同样大小的数据)**在排序之后不会改变原始的相对次序。 稳定性对基础类型对象…...
Java求职者面试指南:计算机基础与源码原理深度解析
Java求职者面试指南:计算机基础与源码原理深度解析 第一轮提问:基础概念问题 1. 请解释什么是进程和线程的区别? 面试官:进程是程序的一次执行过程,是系统进行资源分配和调度的基本单位;而线程是进程中的…...

【C++进阶篇】智能指针
C内存管理终极指南:智能指针从入门到源码剖析 一. 智能指针1.1 auto_ptr1.2 unique_ptr1.3 shared_ptr1.4 make_shared 二. 原理三. shared_ptr循环引用问题三. 线程安全问题四. 内存泄漏4.1 什么是内存泄漏4.2 危害4.3 避免内存泄漏 五. 最后 一. 智能指针 智能指…...
「全栈技术解析」推客小程序系统开发:从架构设计到裂变增长的完整解决方案
在移动互联网营销竞争白热化的当下,推客小程序系统凭借其裂变传播、精准营销等特性,成为企业抢占市场的利器。本文将深度解析推客小程序系统开发的核心技术与实现路径,助力开发者打造具有市场竞争力的营销工具。 一、系统核心功能架构&…...
【把数组变成一棵树】有序数组秒变平衡BST,原来可以这么优雅!
【把数组变成一棵树】有序数组秒变平衡BST,原来可以这么优雅! 🌱 前言:一棵树的浪漫,从数组开始说起 程序员的世界里,数组是最常见的基本结构之一,几乎每种语言、每种算法都少不了它。可你有没有想过,一组看似“线性排列”的有序数组,竟然可以**“长”成一棵平衡的二…...