Kafka核心参数与使用02
一、从基础的客户端说起
Kafka 提供了非常简单的生产者(Producer)和消费者(Consumer)API。通过引入相应依赖后,可以快速上手编写生产者和消费者的示例。
1. 消息发送者主流程
一个最基础的 Producer 发送消息的步骤如下:
-
设置 Producer 核心属性
- 例如:
bootstrap.servers(集群地址)、key.serializer、value.serializer等。 - 大多数核心配置在
ProducerConfig中都有对应的注释说明。
- 例如:
-
构建消息
- Kafka 消息是一个
Key-Value结构,Key常用于分区路由,Value则是业务真正要传递的内容。
- Kafka 消息是一个
-
发送消息
- 单向发送:
producer.send(record);仅发出消息,不关心服务端响应。 - 同步发送:
producer.send(record).get();获取服务端响应前会阻塞。 - 异步发送:
producer.send(record, callback);服务端响应时会回调。
- 单向发送:
示例核心代码示意:
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());Producer<String,String> producer = new KafkaProducer<>(props);
ProducerRecord<String, String> record =new ProducerRecord<>(TOPIC, key, value);
// 单向发送
producer.send(record);
// 同步发送
producer.send(record).get();
// 异步发送
producer.send(record, new Callback() { ... });
2. 消息消费者主流程
在 Consumer 侧,同样有三步:
-
设置 Consumer 核心属性
- 例如:
bootstrap.servers、group.id、key.deserializer、value.deserializer等。
- 例如:
-
拉取消息
- Kafka 采用 Pull 模式:消费者主动调用
poll()拉取消息。
- Kafka 采用 Pull 模式:消费者主动调用
-
处理消息,提交位点
- 手动提交:
consumer.commitSync()或consumer.commitAsync() - 自动提交:设置
enable.auto.commit = true及相应提交周期参数。
- 手动提交:
示例核心代码示意:
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());Consumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList(TOPIC));while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));for (ConsumerRecord<String, String> record : records) {// 业务处理}// 手动提交 offsetconsumer.commitSync();
}
二、从客户端属性来梳理客户端工作机制
Kafka 的核心特色在于高并发、高吞吐以及在网络不稳定、服务随时会崩溃等复杂场景下仍能保证消息安全性。以下属性与机制能帮助我们从客户端视角理解 Kafka。
1. 消费者分组消费机制
- Group 机制
每个Consumer都会指定一个group.id。同一个 Consumer Group 内,Topic 的每个 Partition 只会被同组里的一个 Consumer 消费。不同组之间则是互不影响、各自消费。 - offset 提交
- offset 保存在 Broker 端,但由 Consumer “主导”提交。
- 提交方式有:
- 同步:
commitSync(),安全但速度慢; - 异步:
commitAsync(),快但可能丢失或重复消费。
- 同步:
- auto.offset.reset
- 当 Broker 端没有找到该 Group 相应的 offset 时,可以根据配置(
earliest,latest,none)决定从何处开始消费。
- 当 Broker 端没有找到该 Group 相应的 offset 时,可以根据配置(
提示:Offset 提交与消息处理之间并非完全同步,一旦无法保证强一致性,可能出现消息重复与消息丢失。可根据业务需求与场景选择手动提交或自动提交,也可将 offset 存入外部存储(如 Redis)自行管理。
2. 生产者拦截器机制
- 通过配置
interceptor.classes可以指定一个或多个实现了ProducerInterceptor接口的拦截器。 - 典型功能:在发送前统一添加/修改消息内容,或者在发送后做监控/统计等操作。
3. 消息序列化机制
- Producer 端:
key.serializer/value.serializer:将对象序列化为byte[]。- 内置如
StringSerializer、IntegerSerializer等;可自定义自定义序列化类。
- Consumer 端:
key.deserializer/value.deserializer:将byte[]反序列化为业务对象。
- 如果使用自定义类型(POJO)进行传输,则需要编写自定义 Serializer/Deserializer。
- 核心思想:定长字段与不定长字段的序列化与反序列化。
4. 消息分区路由机制
- Producer 侧:
- 通过
partitioner.class指定自定义的分区器(Partitioner接口)。Kafka 内置默认逻辑:- 若无 key,则采用黏性分区策略(Sticky Partition);
- 若指定 key,则对 key 进行哈希得到分区;
- 也可改为轮询策略(RoundRobinPartitioner)。
- 通过
- Consumer 侧:
- 通过
partition.assignment.strategy指定分区分配策略,内置RangeAssignor,RoundRobinAssignor,StickyAssignor,CooperativeStickyAssignor等。- Range:按顺序将分区切块分配。
- RoundRobin:轮询分配。
- Sticky:尽可能保持现有分配不变,同时保证分配均匀。
- 通过
5. 生产者消息缓存机制
生产者为了提高吞吐量,会将消息先写入一个本地缓存(RecordAccumulator),然后 sender 线程批量发送到 Broker:
buffer.memory:缓存总大小,默认 32MB。batch.size:每个分区发送批次大小,默认 16KB。linger.ms:即便batch.size未填满,等待linger.ms毫秒后也会将消息批量发送。max.in.flight.requests.per.connection:同一连接上未收到响应的请求数上限。
6. 发送应答机制
acks用于控制生产者发送完消息后何时认为消息“成功”:acks=0:不等待 Broker 确认,吞吐量高,安全性低。acks=1:只等待 Leader 分区写入,常见设置。acks=all或-1:等待所有副本写入,安全性最高,吞吐量相对低。
- 还需配合 Broker 端
min.insync.replicas参数,控制副本个数不足时直接返回错误。
7. 生产者消息幂等性
- 为保证 Exactly-once 语义,需要开启
enable.idempotence(幂等性)。 - 幂等性主要依赖
PID+SequenceNumber机制:- Producer 向同一分区发送消息时,每条消息都有一个单调递增的序列号。
- Broker 针对
<PID, Partition>维护序列号,只接收递增消息,防止消息重复写入。
- 幂等性要求:
acks=allretries>0max.in.flight.requests.per.connection<=5
8. 生产者消息事务
- 幂等性只能保证单个分区的 Exactly-once,如果涉及 多个分区/Topic 则需要“事务”来保证一批消息的一致性。
- 主要 API:
initTransactions()beginTransaction()commitTransaction()abortTransaction()
- 事务依赖于
transaction.id来区分不同的 Producer 实例,以便在崩溃重启后继续补偿或回滚先前未完成的事务,保证多分区的一致写入。
三、客户端流程总结
-
Producer:
- 属性配置(序列化、分区器、拦截器、幂等性/事务等) → 将消息提交到
RecordAccumulator→Sender线程批量发送到 Broker → 按acks等待 Broker 响应 → 提交或重试。
- 属性配置(序列化、分区器、拦截器、幂等性/事务等) → 将消息提交到
-
Consumer:
- 属性配置(反序列化、消费组、分区分配策略等) →
poll()拉取消息 → 业务处理 → 提交 offset(手动或自动),与 Broker 同步消费进度。
- 属性配置(反序列化、消费组、分区分配策略等) →
-
重点:
- 消息在 Producer 端的缓存发送机制 与 消息在 Consumer 端的主动拉取、分组消费、offset 提交 是理解 Kafka 高并发、高吞吐、高可用的关键。
- 其他如 幂等性(保证单分区 Exactly-once)和 事务(保证多分区一致性)是针对数据安全性和业务需求的更深入扩展。
四、Spring Boot 集成 Kafka
Spring Boot 中集成 Kafka 本质也是对上述 Producer/Consumer 的封装。
-
引入依赖
<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId> </dependency> -
在
application.properties中配置 Kafka 相关参数- 和原生 Kafka 参数名称基本一致,如
spring.kafka.producer.*、spring.kafka.consumer.*等。 - 典型参数:
bootstrap-servers,acks,batch-size,enable-auto-commit,auto-offset-reset等。
- 和原生 Kafka 参数名称基本一致,如
-
使用
KafkaTemplate发送消息@RestController public class KafkaProducerController {@Autowiredprivate KafkaTemplate<String, Object> kafkaTemplate;@GetMapping("/kafka/send/{message}")public void sendMessage(@PathVariable("message") String msg) {kafkaTemplate.send("topic1", msg);} } -
使用
@KafkaListener声明消息消费者@Component public class KafkaConsumerListener {@KafkaListener(topics = {"topic1"})public void onMessage(ConsumerRecord<?, ?> record) {System.out.println("消费内容:" + record.value());} }
结语
- 想要真正掌握 Kafka,重点在于建立整体的数据流转模型:
- Producer 端如何将消息分区、缓存、发送、应答、重试、保证幂等与事务;
- Consumer 端如何分组消费、订阅分区、拉取消息、提交 offset。
- 熟悉这些机制后,再去看各种客户端配置就会轻松许多,能够结合实际业务场景做灵活配置与调优。
- Spring Boot 也只是对原生 Kafka 客户端的进一步封装,一旦理解 Kafka 底层机制与各项参数原理,使用 Spring Boot 时只需“对号入座”地进行配置即可。
相关文章:
Kafka核心参数与使用02
一、从基础的客户端说起 Kafka 提供了非常简单的生产者(Producer)和消费者(Consumer)API。通过引入相应依赖后,可以快速上手编写生产者和消费者的示例。 1. 消息发送者主流程 一个最基础的 Producer 发送消息的步骤…...
Three.js 渲染技术:打造逼真3D体验的幕后功臣
文章目录 前言一、着色器(Shaders)二、后处理(Post-processing)三、抗锯齿(Anti-aliasing)四、实时渲染与离线渲染五、光照模型与材质优化六、环境映射(Environment Mapping)七、纹理…...
QTcpSocket 如何统计在线时长
基本原理 QTcpSocket是 Qt 库中用于 TCP 通信的类。要统计在线时长,关键思路是记录连接建立的时间和当前时间,通过计算两者的差值来得到在线时长。实现步骤 记录连接建立时间: 在连接成功的信号槽函数中记录开始时间。例如,当QTcpSocket成功连接到服务器时,会发出connecte…...
【Altium】AD使用智能粘贴功能把多个网络标签改成端口
1、 文档目标 使用智能粘贴功能把多个网络标签(net lable)改成端口(port) 2、 问题场景 客户有一份原理图,网络用的是net label,没用Port,然后创建一个sheet symbol,但是sheet sy…...
.NET 终止或结束进程
如何使用 C# 终止进程。 使用简单的方法终止.NET中的现有进程Process.Kill()。有一个可选参数 true 或 false,用于结束与要结束的进程相关的所有子进程。 了解如何创建流程。 结束当前进程: System.Diagnostics.Process.GetCurrentProcess().Kill(tru…...
R.swift库的详细用法
R.swift 是一个 Swift 工具库,它提供了一个自动生成的类 R,使得你可以通过类型安全的方式访问项目中的资源,例如图片、字体、颜色、XIB 文件等。通过 R.swift,你可以避免字符串类型的错误,提升代码的可维护性。 以下是 R.swift 库的详细用法: 1. 安装 R.swift 使用 Sw…...
Js的回调函数
一、什么是回调函数(Callback)? 回调函数(Callback Function)是指一个函数被作为参数传递给另一个函数,并在特定事件发生或操作完成时执行。 可以通俗地理解为一种“委托”机制。 在JavaScript中࿰…...
flutter 独立开发之笔记
1、# use: - [flutter_launcher_icons:] 每次修改完icon后,都需要执行一遍 dart run flutter_launcher_icons 2、开启混淆并打包apk flutter build apk --obfuscate --split-debug-info./out/android/app.android-arm64.symbols 3、开启windows支持 flutter con…...
PHP的扩展Imagick的安装
windows下的安装 下载:Imagick扩展 PECL :: Package :: imagick 3.7.0 for Windows 下载:ghostscript(PDF提取图片时用到,不处理PDF可以不安装) Ghostscript : Downloads 安装扩展 Imagick解压后&…...
【git】在服务器使用docker设置了一个gogs服务器,访问和现实都不理想
以下问题应该都可以通过设置custom/conf/app.ini来解决 配置文档参考地址:https://www.bookstack.cn/read/gogs_zh/advanced-configuration_cheat_sheet.md domain显示的事localhost,实际上应该是一个IP地址。 关键字: DOMAIN ROOT_URL 因为是docker…...
多台PC共用同一套鼠标键盘
当环境中有多个桌面 pc 需要操作的时候,在 多台 pc 之间切换会造成很多的不方便 可以通过远程进行连接,但是有一个更好的方案是让多台机器之间共用同一套键盘鼠标 常用的解决方案 synergy 和 sharemouse,通过移动光标在不同的 pc 间切换 s…...
大语言模型是如何训练出来的?
近期听了不少与AI相关的播客,有理想转型AI的分享,有Character.ai出来同事的分享等,结合对Transformer架构的理解尝试大致还原大语言模型的训练过程。不过,当我这样的“中国大妈”也能够大致琢磨明白大语言模型是如何训练出来的时候…...
Vue2与Vue3在项目开发中的选择:深入探讨
文章目录 前言一、Vue2的优势与挑战二、Vue3的进步与特性三、如何做出选择?结语 前言 Vue.js 是一个用于构建用户界面的渐进式JavaScript框架。Vue2和Vue3是其两个主要版本,它们各自拥有一系列特点和优势。随着Vue3的发布,开发者们面临着在新…...
Web枚举:深入了解目标应用系统
Web枚举是渗透测试中重要的第一步,旨在全面收集目标系统的信息,以便后续攻击载荷的构建更具针对性和效率。本文将详细讨论如何通过各种方法识别目标Web应用的技术栈,并提取关键信息。 1. 识别目标系统的技术栈 技术栈指Web应用所依赖的技术组…...
RabbitMQ介绍与使用
RabbitMQ官网 RabbitMQ 介绍 RabbitMQ 是一个开源的消息代理和队列服务器,基于 AMQP(高级消息队列协议)标准,使用 Erlang 编程语言构建。它是消息队列(MQ)的一种,广泛应用于分布式系统中&#x…...
从0到机器视觉工程师(六):配置OpenCV和Qt环境
CMake配置OpenCV CMakeLists.txt文件的编写 cmake_minimum_required(VERSION 3.20) project(test_opencv LANGUAGES CXX) #寻找Opencv库 FIND_PACKAGE(OpenCV REQUIRED) include_directories(test_opencv ${OpenCV_INCLUDE_DIRS}) add_executable(test_opencv main.cpp) TARGE…...
计算机毕业设计Python机器学习农作物健康识别系统 人工智能 图像识别 机器学习 大数据毕业设计 算法
温馨提示:文末有 CSDN 平台官方提供的学长联系方式的名片! 温馨提示:文末有 CSDN 平台官方提供的学长联系方式的名片! 温馨提示:文末有 CSDN 平台官方提供的学长联系方式的名片! 作者简介:Java领…...
(Arxiv-2023)LORA-FA:针对大型语言模型微调的内存高效低秩自适应
LORA-FA:针对大型语言模型微调的内存高效低秩自适应 paper是香港浸会大学发表在Arxiv 2023的工作 paper title:LORA-FA: MEMORY-EFFICIENT LOW-RANK ADAPTATION FOR LARGE LANGUAGE MODELS FINE-TUNING ABSTRACT 低秩自适应 (LoRA) 方法可以大大减少微调…...
huggingface/bert/transformer的模型默认下载路径以及自定义路径
当使用 BertTokenizer.from_pretrained(bert-base-uncased) 加载预训练的 BERT 模型时,Hugging Face 的 transformers 库会从 Hugging Face Model Hub 下载所需的模型文件和分词器文件(如果它们不在本地缓存中)。 默认情况下,这些…...
从 0 开始上手 Solana 智能合约
Solana CLI 基础知识 Solana CLI 是一个命令行界面工具,提供了一系列用于与 Solana Cluster 交互的命令。 我们将介绍一些最常见的命令,但你始终可以通过运行 solana --help 查看所有可能的 Solana CLI 命令列表。 Solana CLI 配置 Solana CLI 存储了…...
OpenLayers 可视化之热力图
注:当前使用的是 ol 5.3.0 版本,天地图使用的key请到天地图官网申请,并替换为自己的key 热力图(Heatmap)又叫热点图,是一种通过特殊高亮显示事物密度分布、变化趋势的数据可视化技术。采用颜色的深浅来显示…...
java调用dll出现unsatisfiedLinkError以及JNA和JNI的区别
UnsatisfiedLinkError 在对接硬件设备中,我们会遇到使用 java 调用 dll文件 的情况,此时大概率出现UnsatisfiedLinkError链接错误,原因可能有如下几种 类名错误包名错误方法名参数错误使用 JNI 协议调用,结果 dll 未实现 JNI 协…...
postgresql|数据库|只读用户的创建和删除(备忘)
CREATE USER read_only WITH PASSWORD 密码 -- 连接到xxx数据库 \c xxx -- 授予对xxx数据库的只读权限 GRANT CONNECT ON DATABASE xxx TO read_only; GRANT USAGE ON SCHEMA public TO read_only; GRANT SELECT ON ALL TABLES IN SCHEMA public TO read_only; GRANT EXECUTE O…...
自然语言处理——循环神经网络
自然语言处理——循环神经网络 循环神经网络应用到基于机器学习的自然语言处理任务序列到类别同步的序列到序列模式异步的序列到序列模式 参数学习和长程依赖问题基于门控的循环神经网络门控循环单元(GRU)长短期记忆神经网络(LSTM)…...
Java编程之桥接模式
定义 桥接模式(Bridge Pattern)属于结构型设计模式,它的核心意图是将抽象部分与实现部分分离,使它们可以独立地变化。这种模式通过组合关系来替代继承关系,从而降低了抽象和实现这两个可变维度之间的耦合度。 用例子…...
人工智能(大型语言模型 LLMs)对不同学科的影响以及由此产生的新学习方式
今天是关于AI如何在教学中增强学生的学习体验,我把重要信息标红了。人文学科的价值被低估了 ⬇️ 转型与必要性 人工智能正在深刻地改变教育,这并非炒作,而是已经发生的巨大变革。教育机构和教育者不能忽视它,试图简单地禁止学生使…...
在鸿蒙HarmonyOS 5中使用DevEco Studio实现企业微信功能
1. 开发环境准备 安装DevEco Studio 3.1: 从华为开发者官网下载最新版DevEco Studio安装HarmonyOS 5.0 SDK 项目配置: // module.json5 {"module": {"requestPermissions": [{"name": "ohos.permis…...
django blank 与 null的区别
1.blank blank控制表单验证时是否允许字段为空 2.null null控制数据库层面是否为空 但是,要注意以下几点: Django的表单验证与null无关:null参数控制的是数据库层面字段是否可以为NULL,而blank参数控制的是Django表单验证时字…...
论文阅读:LLM4Drive: A Survey of Large Language Models for Autonomous Driving
地址:LLM4Drive: A Survey of Large Language Models for Autonomous Driving 摘要翻译 自动驾驶技术作为推动交通和城市出行变革的催化剂,正从基于规则的系统向数据驱动策略转变。传统的模块化系统受限于级联模块间的累积误差和缺乏灵活性的预设规则。…...
spring Security对RBAC及其ABAC的支持使用
RBAC (基于角色的访问控制) RBAC (Role-Based Access Control) 是 Spring Security 中最常用的权限模型,它将权限分配给角色,再将角色分配给用户。 RBAC 核心实现 1. 数据库设计 users roles permissions ------- ------…...
