Kafka核心参数与使用02
一、从基础的客户端说起
Kafka 提供了非常简单的生产者(Producer)和消费者(Consumer)API。通过引入相应依赖后,可以快速上手编写生产者和消费者的示例。
1. 消息发送者主流程
一个最基础的 Producer 发送消息的步骤如下:
-
设置 Producer 核心属性
- 例如:
bootstrap.servers(集群地址)、key.serializer、value.serializer等。 - 大多数核心配置在
ProducerConfig中都有对应的注释说明。
- 例如:
-
构建消息
- Kafka 消息是一个
Key-Value结构,Key常用于分区路由,Value则是业务真正要传递的内容。
- Kafka 消息是一个
-
发送消息
- 单向发送:
producer.send(record);仅发出消息,不关心服务端响应。 - 同步发送:
producer.send(record).get();获取服务端响应前会阻塞。 - 异步发送:
producer.send(record, callback);服务端响应时会回调。
- 单向发送:
示例核心代码示意:
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());Producer<String,String> producer = new KafkaProducer<>(props);
ProducerRecord<String, String> record =new ProducerRecord<>(TOPIC, key, value);
// 单向发送
producer.send(record);
// 同步发送
producer.send(record).get();
// 异步发送
producer.send(record, new Callback() { ... });
2. 消息消费者主流程
在 Consumer 侧,同样有三步:
-
设置 Consumer 核心属性
- 例如:
bootstrap.servers、group.id、key.deserializer、value.deserializer等。
- 例如:
-
拉取消息
- Kafka 采用 Pull 模式:消费者主动调用
poll()拉取消息。
- Kafka 采用 Pull 模式:消费者主动调用
-
处理消息,提交位点
- 手动提交:
consumer.commitSync()或consumer.commitAsync() - 自动提交:设置
enable.auto.commit = true及相应提交周期参数。
- 手动提交:
示例核心代码示意:
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());Consumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList(TOPIC));while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));for (ConsumerRecord<String, String> record : records) {// 业务处理}// 手动提交 offsetconsumer.commitSync();
}
二、从客户端属性来梳理客户端工作机制
Kafka 的核心特色在于高并发、高吞吐以及在网络不稳定、服务随时会崩溃等复杂场景下仍能保证消息安全性。以下属性与机制能帮助我们从客户端视角理解 Kafka。
1. 消费者分组消费机制
- Group 机制
每个Consumer都会指定一个group.id。同一个 Consumer Group 内,Topic 的每个 Partition 只会被同组里的一个 Consumer 消费。不同组之间则是互不影响、各自消费。 - offset 提交
- offset 保存在 Broker 端,但由 Consumer “主导”提交。
- 提交方式有:
- 同步:
commitSync(),安全但速度慢; - 异步:
commitAsync(),快但可能丢失或重复消费。
- 同步:
- auto.offset.reset
- 当 Broker 端没有找到该 Group 相应的 offset 时,可以根据配置(
earliest,latest,none)决定从何处开始消费。
- 当 Broker 端没有找到该 Group 相应的 offset 时,可以根据配置(
提示:Offset 提交与消息处理之间并非完全同步,一旦无法保证强一致性,可能出现消息重复与消息丢失。可根据业务需求与场景选择手动提交或自动提交,也可将 offset 存入外部存储(如 Redis)自行管理。
2. 生产者拦截器机制
- 通过配置
interceptor.classes可以指定一个或多个实现了ProducerInterceptor接口的拦截器。 - 典型功能:在发送前统一添加/修改消息内容,或者在发送后做监控/统计等操作。
3. 消息序列化机制
- Producer 端:
key.serializer/value.serializer:将对象序列化为byte[]。- 内置如
StringSerializer、IntegerSerializer等;可自定义自定义序列化类。
- Consumer 端:
key.deserializer/value.deserializer:将byte[]反序列化为业务对象。
- 如果使用自定义类型(POJO)进行传输,则需要编写自定义 Serializer/Deserializer。
- 核心思想:定长字段与不定长字段的序列化与反序列化。
4. 消息分区路由机制
- Producer 侧:
- 通过
partitioner.class指定自定义的分区器(Partitioner接口)。Kafka 内置默认逻辑:- 若无 key,则采用黏性分区策略(Sticky Partition);
- 若指定 key,则对 key 进行哈希得到分区;
- 也可改为轮询策略(RoundRobinPartitioner)。
- 通过
- Consumer 侧:
- 通过
partition.assignment.strategy指定分区分配策略,内置RangeAssignor,RoundRobinAssignor,StickyAssignor,CooperativeStickyAssignor等。- Range:按顺序将分区切块分配。
- RoundRobin:轮询分配。
- Sticky:尽可能保持现有分配不变,同时保证分配均匀。
- 通过
5. 生产者消息缓存机制
生产者为了提高吞吐量,会将消息先写入一个本地缓存(RecordAccumulator),然后 sender 线程批量发送到 Broker:
buffer.memory:缓存总大小,默认 32MB。batch.size:每个分区发送批次大小,默认 16KB。linger.ms:即便batch.size未填满,等待linger.ms毫秒后也会将消息批量发送。max.in.flight.requests.per.connection:同一连接上未收到响应的请求数上限。
6. 发送应答机制
acks用于控制生产者发送完消息后何时认为消息“成功”:acks=0:不等待 Broker 确认,吞吐量高,安全性低。acks=1:只等待 Leader 分区写入,常见设置。acks=all或-1:等待所有副本写入,安全性最高,吞吐量相对低。
- 还需配合 Broker 端
min.insync.replicas参数,控制副本个数不足时直接返回错误。
7. 生产者消息幂等性
- 为保证 Exactly-once 语义,需要开启
enable.idempotence(幂等性)。 - 幂等性主要依赖
PID+SequenceNumber机制:- Producer 向同一分区发送消息时,每条消息都有一个单调递增的序列号。
- Broker 针对
<PID, Partition>维护序列号,只接收递增消息,防止消息重复写入。
- 幂等性要求:
acks=allretries>0max.in.flight.requests.per.connection<=5
8. 生产者消息事务
- 幂等性只能保证单个分区的 Exactly-once,如果涉及 多个分区/Topic 则需要“事务”来保证一批消息的一致性。
- 主要 API:
initTransactions()beginTransaction()commitTransaction()abortTransaction()
- 事务依赖于
transaction.id来区分不同的 Producer 实例,以便在崩溃重启后继续补偿或回滚先前未完成的事务,保证多分区的一致写入。
三、客户端流程总结
-
Producer:
- 属性配置(序列化、分区器、拦截器、幂等性/事务等) → 将消息提交到
RecordAccumulator→Sender线程批量发送到 Broker → 按acks等待 Broker 响应 → 提交或重试。
- 属性配置(序列化、分区器、拦截器、幂等性/事务等) → 将消息提交到
-
Consumer:
- 属性配置(反序列化、消费组、分区分配策略等) →
poll()拉取消息 → 业务处理 → 提交 offset(手动或自动),与 Broker 同步消费进度。
- 属性配置(反序列化、消费组、分区分配策略等) →
-
重点:
- 消息在 Producer 端的缓存发送机制 与 消息在 Consumer 端的主动拉取、分组消费、offset 提交 是理解 Kafka 高并发、高吞吐、高可用的关键。
- 其他如 幂等性(保证单分区 Exactly-once)和 事务(保证多分区一致性)是针对数据安全性和业务需求的更深入扩展。
四、Spring Boot 集成 Kafka
Spring Boot 中集成 Kafka 本质也是对上述 Producer/Consumer 的封装。
-
引入依赖
<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId> </dependency> -
在
application.properties中配置 Kafka 相关参数- 和原生 Kafka 参数名称基本一致,如
spring.kafka.producer.*、spring.kafka.consumer.*等。 - 典型参数:
bootstrap-servers,acks,batch-size,enable-auto-commit,auto-offset-reset等。
- 和原生 Kafka 参数名称基本一致,如
-
使用
KafkaTemplate发送消息@RestController public class KafkaProducerController {@Autowiredprivate KafkaTemplate<String, Object> kafkaTemplate;@GetMapping("/kafka/send/{message}")public void sendMessage(@PathVariable("message") String msg) {kafkaTemplate.send("topic1", msg);} } -
使用
@KafkaListener声明消息消费者@Component public class KafkaConsumerListener {@KafkaListener(topics = {"topic1"})public void onMessage(ConsumerRecord<?, ?> record) {System.out.println("消费内容:" + record.value());} }
结语
- 想要真正掌握 Kafka,重点在于建立整体的数据流转模型:
- Producer 端如何将消息分区、缓存、发送、应答、重试、保证幂等与事务;
- Consumer 端如何分组消费、订阅分区、拉取消息、提交 offset。
- 熟悉这些机制后,再去看各种客户端配置就会轻松许多,能够结合实际业务场景做灵活配置与调优。
- Spring Boot 也只是对原生 Kafka 客户端的进一步封装,一旦理解 Kafka 底层机制与各项参数原理,使用 Spring Boot 时只需“对号入座”地进行配置即可。
相关文章:
Kafka核心参数与使用02
一、从基础的客户端说起 Kafka 提供了非常简单的生产者(Producer)和消费者(Consumer)API。通过引入相应依赖后,可以快速上手编写生产者和消费者的示例。 1. 消息发送者主流程 一个最基础的 Producer 发送消息的步骤…...
Three.js 渲染技术:打造逼真3D体验的幕后功臣
文章目录 前言一、着色器(Shaders)二、后处理(Post-processing)三、抗锯齿(Anti-aliasing)四、实时渲染与离线渲染五、光照模型与材质优化六、环境映射(Environment Mapping)七、纹理…...
QTcpSocket 如何统计在线时长
基本原理 QTcpSocket是 Qt 库中用于 TCP 通信的类。要统计在线时长,关键思路是记录连接建立的时间和当前时间,通过计算两者的差值来得到在线时长。实现步骤 记录连接建立时间: 在连接成功的信号槽函数中记录开始时间。例如,当QTcpSocket成功连接到服务器时,会发出connecte…...
【Altium】AD使用智能粘贴功能把多个网络标签改成端口
1、 文档目标 使用智能粘贴功能把多个网络标签(net lable)改成端口(port) 2、 问题场景 客户有一份原理图,网络用的是net label,没用Port,然后创建一个sheet symbol,但是sheet sy…...
.NET 终止或结束进程
如何使用 C# 终止进程。 使用简单的方法终止.NET中的现有进程Process.Kill()。有一个可选参数 true 或 false,用于结束与要结束的进程相关的所有子进程。 了解如何创建流程。 结束当前进程: System.Diagnostics.Process.GetCurrentProcess().Kill(tru…...
R.swift库的详细用法
R.swift 是一个 Swift 工具库,它提供了一个自动生成的类 R,使得你可以通过类型安全的方式访问项目中的资源,例如图片、字体、颜色、XIB 文件等。通过 R.swift,你可以避免字符串类型的错误,提升代码的可维护性。 以下是 R.swift 库的详细用法: 1. 安装 R.swift 使用 Sw…...
Js的回调函数
一、什么是回调函数(Callback)? 回调函数(Callback Function)是指一个函数被作为参数传递给另一个函数,并在特定事件发生或操作完成时执行。 可以通俗地理解为一种“委托”机制。 在JavaScript中࿰…...
flutter 独立开发之笔记
1、# use: - [flutter_launcher_icons:] 每次修改完icon后,都需要执行一遍 dart run flutter_launcher_icons 2、开启混淆并打包apk flutter build apk --obfuscate --split-debug-info./out/android/app.android-arm64.symbols 3、开启windows支持 flutter con…...
PHP的扩展Imagick的安装
windows下的安装 下载:Imagick扩展 PECL :: Package :: imagick 3.7.0 for Windows 下载:ghostscript(PDF提取图片时用到,不处理PDF可以不安装) Ghostscript : Downloads 安装扩展 Imagick解压后&…...
【git】在服务器使用docker设置了一个gogs服务器,访问和现实都不理想
以下问题应该都可以通过设置custom/conf/app.ini来解决 配置文档参考地址:https://www.bookstack.cn/read/gogs_zh/advanced-configuration_cheat_sheet.md domain显示的事localhost,实际上应该是一个IP地址。 关键字: DOMAIN ROOT_URL 因为是docker…...
多台PC共用同一套鼠标键盘
当环境中有多个桌面 pc 需要操作的时候,在 多台 pc 之间切换会造成很多的不方便 可以通过远程进行连接,但是有一个更好的方案是让多台机器之间共用同一套键盘鼠标 常用的解决方案 synergy 和 sharemouse,通过移动光标在不同的 pc 间切换 s…...
大语言模型是如何训练出来的?
近期听了不少与AI相关的播客,有理想转型AI的分享,有Character.ai出来同事的分享等,结合对Transformer架构的理解尝试大致还原大语言模型的训练过程。不过,当我这样的“中国大妈”也能够大致琢磨明白大语言模型是如何训练出来的时候…...
Vue2与Vue3在项目开发中的选择:深入探讨
文章目录 前言一、Vue2的优势与挑战二、Vue3的进步与特性三、如何做出选择?结语 前言 Vue.js 是一个用于构建用户界面的渐进式JavaScript框架。Vue2和Vue3是其两个主要版本,它们各自拥有一系列特点和优势。随着Vue3的发布,开发者们面临着在新…...
Web枚举:深入了解目标应用系统
Web枚举是渗透测试中重要的第一步,旨在全面收集目标系统的信息,以便后续攻击载荷的构建更具针对性和效率。本文将详细讨论如何通过各种方法识别目标Web应用的技术栈,并提取关键信息。 1. 识别目标系统的技术栈 技术栈指Web应用所依赖的技术组…...
RabbitMQ介绍与使用
RabbitMQ官网 RabbitMQ 介绍 RabbitMQ 是一个开源的消息代理和队列服务器,基于 AMQP(高级消息队列协议)标准,使用 Erlang 编程语言构建。它是消息队列(MQ)的一种,广泛应用于分布式系统中&#x…...
从0到机器视觉工程师(六):配置OpenCV和Qt环境
CMake配置OpenCV CMakeLists.txt文件的编写 cmake_minimum_required(VERSION 3.20) project(test_opencv LANGUAGES CXX) #寻找Opencv库 FIND_PACKAGE(OpenCV REQUIRED) include_directories(test_opencv ${OpenCV_INCLUDE_DIRS}) add_executable(test_opencv main.cpp) TARGE…...
计算机毕业设计Python机器学习农作物健康识别系统 人工智能 图像识别 机器学习 大数据毕业设计 算法
温馨提示:文末有 CSDN 平台官方提供的学长联系方式的名片! 温馨提示:文末有 CSDN 平台官方提供的学长联系方式的名片! 温馨提示:文末有 CSDN 平台官方提供的学长联系方式的名片! 作者简介:Java领…...
(Arxiv-2023)LORA-FA:针对大型语言模型微调的内存高效低秩自适应
LORA-FA:针对大型语言模型微调的内存高效低秩自适应 paper是香港浸会大学发表在Arxiv 2023的工作 paper title:LORA-FA: MEMORY-EFFICIENT LOW-RANK ADAPTATION FOR LARGE LANGUAGE MODELS FINE-TUNING ABSTRACT 低秩自适应 (LoRA) 方法可以大大减少微调…...
huggingface/bert/transformer的模型默认下载路径以及自定义路径
当使用 BertTokenizer.from_pretrained(bert-base-uncased) 加载预训练的 BERT 模型时,Hugging Face 的 transformers 库会从 Hugging Face Model Hub 下载所需的模型文件和分词器文件(如果它们不在本地缓存中)。 默认情况下,这些…...
从 0 开始上手 Solana 智能合约
Solana CLI 基础知识 Solana CLI 是一个命令行界面工具,提供了一系列用于与 Solana Cluster 交互的命令。 我们将介绍一些最常见的命令,但你始终可以通过运行 solana --help 查看所有可能的 Solana CLI 命令列表。 Solana CLI 配置 Solana CLI 存储了…...
网络编程(Modbus进阶)
思维导图 Modbus RTU(先学一点理论) 概念 Modbus RTU 是工业自动化领域 最广泛应用的串行通信协议,由 Modicon 公司(现施耐德电气)于 1979 年推出。它以 高效率、强健性、易实现的特点成为工业控制系统的通信标准。 包…...
Python|GIF 解析与构建(5):手搓截屏和帧率控制
目录 Python|GIF 解析与构建(5):手搓截屏和帧率控制 一、引言 二、技术实现:手搓截屏模块 2.1 核心原理 2.2 代码解析:ScreenshotData类 2.2.1 截图函数:capture_screen 三、技术实现&…...
java_网络服务相关_gateway_nacos_feign区别联系
1. spring-cloud-starter-gateway 作用:作为微服务架构的网关,统一入口,处理所有外部请求。 核心能力: 路由转发(基于路径、服务名等)过滤器(鉴权、限流、日志、Header 处理)支持负…...
shell脚本--常见案例
1、自动备份文件或目录 2、批量重命名文件 3、查找并删除指定名称的文件: 4、批量删除文件 5、查找并替换文件内容 6、批量创建文件 7、创建文件夹并移动文件 8、在文件夹中查找文件...
vue3 字体颜色设置的多种方式
在Vue 3中设置字体颜色可以通过多种方式实现,这取决于你是想在组件内部直接设置,还是在CSS/SCSS/LESS等样式文件中定义。以下是几种常见的方法: 1. 内联样式 你可以直接在模板中使用style绑定来设置字体颜色。 <template><div :s…...
微信小程序 - 手机震动
一、界面 <button type"primary" bindtap"shortVibrate">短震动</button> <button type"primary" bindtap"longVibrate">长震动</button> 二、js逻辑代码 注:文档 https://developers.weixin.qq…...
【Java_EE】Spring MVC
目录 Spring Web MVC 编辑注解 RestController RequestMapping RequestParam RequestParam RequestBody PathVariable RequestPart 参数传递 注意事项 编辑参数重命名 RequestParam 编辑编辑传递集合 RequestParam 传递JSON数据 编辑RequestBody …...
ardupilot 开发环境eclipse 中import 缺少C++
目录 文章目录 目录摘要1.修复过程摘要 本节主要解决ardupilot 开发环境eclipse 中import 缺少C++,无法导入ardupilot代码,会引起查看不方便的问题。如下图所示 1.修复过程 0.安装ubuntu 软件中自带的eclipse 1.打开eclipse—Help—install new software 2.在 Work with中…...
学习STC51单片机32(芯片为STC89C52RCRC)OLED显示屏2
每日一言 今天的每一份坚持,都是在为未来积攒底气。 案例:OLED显示一个A 这边观察到一个点,怎么雪花了就是都是乱七八糟的占满了屏幕。。 解释 : 如果代码里信号切换太快(比如 SDA 刚变,SCL 立刻变&#…...
论文笔记——相干体技术在裂缝预测中的应用研究
目录 相关地震知识补充地震数据的认识地震几何属性 相干体算法定义基本原理第一代相干体技术:基于互相关的相干体技术(Correlation)第二代相干体技术:基于相似的相干体技术(Semblance)基于多道相似的相干体…...
