当前位置: 首页 > news >正文

Kafka核心参数与使用02

一、从基础的客户端说起

Kafka 提供了非常简单的生产者(Producer)和消费者(Consumer)API。通过引入相应依赖后,可以快速上手编写生产者和消费者的示例。

1. 消息发送者主流程

一个最基础的 Producer 发送消息的步骤如下:

  1. 设置 Producer 核心属性

    • 例如:bootstrap.servers(集群地址)、key.serializervalue.serializer 等。
    • 大多数核心配置在 ProducerConfig 中都有对应的注释说明。
  2. 构建消息

    • Kafka 消息是一个 Key-Value 结构,Key 常用于分区路由,Value 则是业务真正要传递的内容。
  3. 发送消息

    • 单向发送producer.send(record); 仅发出消息,不关心服务端响应。
    • 同步发送producer.send(record).get(); 获取服务端响应前会阻塞。
    • 异步发送producer.send(record, callback); 服务端响应时会回调。

示例核心代码示意:

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());Producer<String,String> producer = new KafkaProducer<>(props);
ProducerRecord<String, String> record =new ProducerRecord<>(TOPIC, key, value);
// 单向发送
producer.send(record);
// 同步发送
producer.send(record).get();
// 异步发送
producer.send(record, new Callback() { ... });

2. 消息消费者主流程

Consumer 侧,同样有三步:

  1. 设置 Consumer 核心属性

    • 例如:bootstrap.serversgroup.idkey.deserializervalue.deserializer 等。
  2. 拉取消息

    • Kafka 采用 Pull 模式:消费者主动调用 poll() 拉取消息。
  3. 处理消息,提交位点

    • 手动提交:consumer.commitSync()consumer.commitAsync()
    • 自动提交:设置 enable.auto.commit = true 及相应提交周期参数。

示例核心代码示意:

Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());Consumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList(TOPIC));while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));for (ConsumerRecord<String, String> record : records) {// 业务处理}// 手动提交 offsetconsumer.commitSync();
}

二、从客户端属性来梳理客户端工作机制

Kafka 的核心特色在于高并发、高吞吐以及在网络不稳定、服务随时会崩溃等复杂场景下仍能保证消息安全性。以下属性与机制能帮助我们从客户端视角理解 Kafka。

1. 消费者分组消费机制

  • Group 机制
    每个 Consumer 都会指定一个 group.id。同一个 Consumer Group 内,Topic 的每个 Partition 只会被同组里的一个 Consumer 消费。不同组之间则是互不影响、各自消费。
  • offset 提交
    • offset 保存在 Broker 端,但由 Consumer “主导”提交。
    • 提交方式有:
      • 同步:commitSync(),安全但速度慢;
      • 异步:commitAsync(),快但可能丢失或重复消费。
    • auto.offset.reset
      • 当 Broker 端没有找到该 Group 相应的 offset 时,可以根据配置(earliest, latest, none)决定从何处开始消费。

提示:Offset 提交与消息处理之间并非完全同步,一旦无法保证强一致性,可能出现消息重复与消息丢失。可根据业务需求与场景选择手动提交或自动提交,也可将 offset 存入外部存储(如 Redis)自行管理。

2. 生产者拦截器机制

  • 通过配置 interceptor.classes 可以指定一个或多个实现了 ProducerInterceptor 接口的拦截器。
  • 典型功能:在发送前统一添加/修改消息内容,或者在发送后做监控/统计等操作。

3. 消息序列化机制

  • Producer 端:
    • key.serializer / value.serializer:将对象序列化为 byte[]
    • 内置如 StringSerializerIntegerSerializer 等;可自定义自定义序列化类。
  • Consumer 端:
    • key.deserializer / value.deserializer:将 byte[] 反序列化为业务对象。
  • 如果使用自定义类型(POJO)进行传输,则需要编写自定义 Serializer/Deserializer。
    • 核心思想:定长字段不定长字段的序列化与反序列化。

4. 消息分区路由机制

  • Producer 侧:
    • 通过 partitioner.class 指定自定义的分区器(Partitioner 接口)。Kafka 内置默认逻辑:
      • 若无 key,则采用黏性分区策略(Sticky Partition);
      • 若指定 key,则对 key 进行哈希得到分区;
      • 也可改为轮询策略(RoundRobinPartitioner)。
  • Consumer 侧:
    • 通过 partition.assignment.strategy 指定分区分配策略,内置 RangeAssignor, RoundRobinAssignor, StickyAssignor, CooperativeStickyAssignor 等。
      • Range:按顺序将分区切块分配。
      • RoundRobin:轮询分配。
      • Sticky:尽可能保持现有分配不变,同时保证分配均匀。

5. 生产者消息缓存机制

生产者为了提高吞吐量,会将消息先写入一个本地缓存(RecordAccumulator),然后 sender 线程批量发送到 Broker:

  • buffer.memory:缓存总大小,默认 32MB。
  • batch.size:每个分区发送批次大小,默认 16KB。
  • linger.ms:即便 batch.size 未填满,等待 linger.ms 毫秒后也会将消息批量发送。
  • max.in.flight.requests.per.connection:同一连接上未收到响应的请求数上限。

6. 发送应答机制

  • acks 用于控制生产者发送完消息后何时认为消息“成功”:
    • acks=0:不等待 Broker 确认,吞吐量高,安全性低。
    • acks=1:只等待 Leader 分区写入,常见设置。
    • acks=all-1:等待所有副本写入,安全性最高,吞吐量相对低。
  • 还需配合 Broker 端 min.insync.replicas 参数,控制副本个数不足时直接返回错误。

7. 生产者消息幂等性

  • 为保证 Exactly-once 语义,需要开启 enable.idempotence(幂等性)。
  • 幂等性主要依赖 PID + SequenceNumber 机制:
    • Producer 向同一分区发送消息时,每条消息都有一个单调递增的序列号。
    • Broker 针对 <PID, Partition> 维护序列号,只接收递增消息,防止消息重复写入。
  • 幂等性要求:
    • acks=all
    • retries>0
    • max.in.flight.requests.per.connection<=5

8. 生产者消息事务

  • 幂等性只能保证单个分区的 Exactly-once,如果涉及 多个分区/Topic 则需要“事务”来保证一批消息的一致性。
  • 主要 API:
    • initTransactions()
    • beginTransaction()
    • commitTransaction()
    • abortTransaction()
  • 事务依赖于 transaction.id 来区分不同的 Producer 实例,以便在崩溃重启后继续补偿或回滚先前未完成的事务,保证多分区的一致写入。

三、客户端流程总结

  1. Producer:

    • 属性配置(序列化、分区器、拦截器、幂等性/事务等) → 将消息提交到 RecordAccumulatorSender 线程批量发送到 Broker → 按 acks 等待 Broker 响应 → 提交或重试。
  2. Consumer:

    • 属性配置(反序列化、消费组、分区分配策略等) → poll() 拉取消息 → 业务处理 → 提交 offset(手动或自动),与 Broker 同步消费进度。
  3. 重点:

    • 消息在 Producer 端的缓存发送机制消息在 Consumer 端的主动拉取、分组消费、offset 提交 是理解 Kafka 高并发、高吞吐、高可用的关键。
    • 其他如 幂等性(保证单分区 Exactly-once)和 事务(保证多分区一致性)是针对数据安全性和业务需求的更深入扩展。

四、Spring Boot 集成 Kafka

Spring Boot 中集成 Kafka 本质也是对上述 Producer/Consumer 的封装。

  1. 引入依赖

    <dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId>
    </dependency>
    
  2. application.properties 中配置 Kafka 相关参数

    • 和原生 Kafka 参数名称基本一致,如 spring.kafka.producer.*spring.kafka.consumer.* 等。
    • 典型参数:bootstrap-servers, acks, batch-size, enable-auto-commit, auto-offset-reset 等。
  3. 使用 KafkaTemplate 发送消息

    @RestController
    public class KafkaProducerController {@Autowiredprivate KafkaTemplate<String, Object> kafkaTemplate;@GetMapping("/kafka/send/{message}")public void sendMessage(@PathVariable("message") String msg) {kafkaTemplate.send("topic1", msg);}
    }
    
  4. 使用 @KafkaListener 声明消息消费者

    @Component
    public class KafkaConsumerListener {@KafkaListener(topics = {"topic1"})public void onMessage(ConsumerRecord<?, ?> record) {System.out.println("消费内容:" + record.value());}
    }
    

 


结语

  • 想要真正掌握 Kafka,重点在于建立整体的数据流转模型
    • Producer 端如何将消息分区、缓存、发送、应答、重试、保证幂等与事务;
    • Consumer 端如何分组消费、订阅分区、拉取消息、提交 offset。
  • 熟悉这些机制后,再去看各种客户端配置就会轻松许多,能够结合实际业务场景做灵活配置与调优。
  • Spring Boot 也只是对原生 Kafka 客户端的进一步封装,一旦理解 Kafka 底层机制与各项参数原理,使用 Spring Boot 时只需“对号入座”地进行配置即可。

相关文章:

Kafka核心参数与使用02

一、从基础的客户端说起 Kafka 提供了非常简单的生产者&#xff08;Producer&#xff09;和消费者&#xff08;Consumer&#xff09;API。通过引入相应依赖后&#xff0c;可以快速上手编写生产者和消费者的示例。 1. 消息发送者主流程 一个最基础的 Producer 发送消息的步骤…...

Three.js 渲染技术:打造逼真3D体验的幕后功臣

文章目录 前言一、着色器&#xff08;Shaders&#xff09;二、后处理&#xff08;Post-processing&#xff09;三、抗锯齿&#xff08;Anti-aliasing&#xff09;四、实时渲染与离线渲染五、光照模型与材质优化六、环境映射&#xff08;Environment Mapping&#xff09;七、纹理…...

QTcpSocket 如何统计在线时长

基本原理 QTcpSocket是 Qt 库中用于 TCP 通信的类。要统计在线时长,关键思路是记录连接建立的时间和当前时间,通过计算两者的差值来得到在线时长。实现步骤 记录连接建立时间: 在连接成功的信号槽函数中记录开始时间。例如,当QTcpSocket成功连接到服务器时,会发出connecte…...

【Altium】AD使用智能粘贴功能把多个网络标签改成端口

1、 文档目标 使用智能粘贴功能把多个网络标签&#xff08;net lable&#xff09;改成端口&#xff08;port&#xff09; 2、 问题场景 客户有一份原理图&#xff0c;网络用的是net label&#xff0c;没用Port&#xff0c;然后创建一个sheet symbol&#xff0c;但是sheet sy…...

.NET 终止或结束进程

如何使用 C# 终止进程。 使用简单的方法终止.NET中的现有进程Process.Kill()。有一个可选参数 true 或 false&#xff0c;用于结束与要结束的进程相关的所有子进程。 了解如何创建流程。 结束当前进程&#xff1a; System.Diagnostics.Process.GetCurrentProcess().Kill(tru…...

R.swift库的详细用法

R.swift 是一个 Swift 工具库,它提供了一个自动生成的类 R,使得你可以通过类型安全的方式访问项目中的资源,例如图片、字体、颜色、XIB 文件等。通过 R.swift,你可以避免字符串类型的错误,提升代码的可维护性。 以下是 R.swift 库的详细用法: 1. 安装 R.swift 使用 Sw…...

Js的回调函数

一、什么是回调函数&#xff08;Callback&#xff09;&#xff1f; 回调函数&#xff08;Callback Function&#xff09;是指一个函数被作为参数传递给另一个函数&#xff0c;并在特定事件发生或操作完成时执行。 可以通俗地理解为一种“委托”机制。 在JavaScript中&#xff0…...

flutter 独立开发之笔记

1、# use: - [flutter_launcher_icons:] 每次修改完icon后&#xff0c;都需要执行一遍 dart run flutter_launcher_icons 2、开启混淆并打包apk flutter build apk --obfuscate --split-debug-info./out/android/app.android-arm64.symbols 3、开启windows支持 flutter con…...

PHP的扩展Imagick的安装

windows下的安装 下载&#xff1a;Imagick扩展 PECL :: Package :: imagick 3.7.0 for Windows​​​​​​​ 下载&#xff1a;ghostscript&#xff08;PDF提取图片时用到&#xff0c;不处理PDF可以不安装&#xff09; Ghostscript : Downloads 安装扩展 Imagick解压后&…...

【git】在服务器使用docker设置了一个gogs服务器,访问和现实都不理想

以下问题应该都可以通过设置custom/conf/app.ini来解决 配置文档参考地址:https://www.bookstack.cn/read/gogs_zh/advanced-configuration_cheat_sheet.md domain显示的事localhost&#xff0c;实际上应该是一个IP地址。 关键字&#xff1a; DOMAIN ROOT_URL 因为是docker…...

多台PC共用同一套鼠标键盘

当环境中有多个桌面 pc 需要操作的时候&#xff0c;在 多台 pc 之间切换会造成很多的不方便 可以通过远程进行连接&#xff0c;但是有一个更好的方案是让多台机器之间共用同一套键盘鼠标 常用的解决方案 synergy 和 sharemouse&#xff0c;通过移动光标在不同的 pc 间切换 s…...

大语言模型是如何训练出来的?

近期听了不少与AI相关的播客&#xff0c;有理想转型AI的分享&#xff0c;有Character.ai出来同事的分享等&#xff0c;结合对Transformer架构的理解尝试大致还原大语言模型的训练过程。不过&#xff0c;当我这样的“中国大妈”也能够大致琢磨明白大语言模型是如何训练出来的时候…...

Vue2与Vue3在项目开发中的选择:深入探讨

文章目录 前言一、Vue2的优势与挑战二、Vue3的进步与特性三、如何做出选择&#xff1f;结语 前言 Vue.js 是一个用于构建用户界面的渐进式JavaScript框架。Vue2和Vue3是其两个主要版本&#xff0c;它们各自拥有一系列特点和优势。随着Vue3的发布&#xff0c;开发者们面临着在新…...

Web枚举:深入了解目标应用系统

Web枚举是渗透测试中重要的第一步&#xff0c;旨在全面收集目标系统的信息&#xff0c;以便后续攻击载荷的构建更具针对性和效率。本文将详细讨论如何通过各种方法识别目标Web应用的技术栈&#xff0c;并提取关键信息。 1. 识别目标系统的技术栈 技术栈指Web应用所依赖的技术组…...

RabbitMQ介绍与使用

RabbitMQ官网 RabbitMQ 介绍 RabbitMQ 是一个开源的消息代理和队列服务器&#xff0c;基于 AMQP&#xff08;高级消息队列协议&#xff09;标准&#xff0c;使用 Erlang 编程语言构建。它是消息队列&#xff08;MQ&#xff09;的一种&#xff0c;广泛应用于分布式系统中&#x…...

从0到机器视觉工程师(六):配置OpenCV和Qt环境

CMake配置OpenCV CMakeLists.txt文件的编写 cmake_minimum_required(VERSION 3.20) project(test_opencv LANGUAGES CXX) #寻找Opencv库 FIND_PACKAGE(OpenCV REQUIRED) include_directories(test_opencv ${OpenCV_INCLUDE_DIRS}) add_executable(test_opencv main.cpp) TARGE…...

计算机毕业设计Python机器学习农作物健康识别系统 人工智能 图像识别 机器学习 大数据毕业设计 算法

温馨提示&#xff1a;文末有 CSDN 平台官方提供的学长联系方式的名片&#xff01; 温馨提示&#xff1a;文末有 CSDN 平台官方提供的学长联系方式的名片&#xff01; 温馨提示&#xff1a;文末有 CSDN 平台官方提供的学长联系方式的名片&#xff01; 作者简介&#xff1a;Java领…...

(Arxiv-2023)LORA-FA:针对大型语言模型微调的内存高效低秩自适应

LORA-FA&#xff1a;针对大型语言模型微调的内存高效低秩自适应 paper是香港浸会大学发表在Arxiv 2023的工作 paper title&#xff1a;LORA-FA: MEMORY-EFFICIENT LOW-RANK ADAPTATION FOR LARGE LANGUAGE MODELS FINE-TUNING ABSTRACT 低秩自适应 (LoRA) 方法可以大大减少微调…...

huggingface/bert/transformer的模型默认下载路径以及自定义路径

当使用 BertTokenizer.from_pretrained(bert-base-uncased) 加载预训练的 BERT 模型时&#xff0c;Hugging Face 的 transformers 库会从 Hugging Face Model Hub 下载所需的模型文件和分词器文件&#xff08;如果它们不在本地缓存中&#xff09;。 默认情况下&#xff0c;这些…...

从 0 开始上手 Solana 智能合约

Solana CLI 基础知识 Solana CLI 是一个命令行界面工具&#xff0c;提供了一系列用于与 Solana Cluster 交互的命令。 我们将介绍一些最常见的命令&#xff0c;但你始终可以通过运行 solana --help 查看所有可能的 Solana CLI 命令列表。 Solana CLI 配置 Solana CLI 存储了…...

网络编程(Modbus进阶)

思维导图 Modbus RTU&#xff08;先学一点理论&#xff09; 概念 Modbus RTU 是工业自动化领域 最广泛应用的串行通信协议&#xff0c;由 Modicon 公司&#xff08;现施耐德电气&#xff09;于 1979 年推出。它以 高效率、强健性、易实现的特点成为工业控制系统的通信标准。 包…...

Python|GIF 解析与构建(5):手搓截屏和帧率控制

目录 Python&#xff5c;GIF 解析与构建&#xff08;5&#xff09;&#xff1a;手搓截屏和帧率控制 一、引言 二、技术实现&#xff1a;手搓截屏模块 2.1 核心原理 2.2 代码解析&#xff1a;ScreenshotData类 2.2.1 截图函数&#xff1a;capture_screen 三、技术实现&…...

java_网络服务相关_gateway_nacos_feign区别联系

1. spring-cloud-starter-gateway 作用&#xff1a;作为微服务架构的网关&#xff0c;统一入口&#xff0c;处理所有外部请求。 核心能力&#xff1a; 路由转发&#xff08;基于路径、服务名等&#xff09;过滤器&#xff08;鉴权、限流、日志、Header 处理&#xff09;支持负…...

shell脚本--常见案例

1、自动备份文件或目录 2、批量重命名文件 3、查找并删除指定名称的文件&#xff1a; 4、批量删除文件 5、查找并替换文件内容 6、批量创建文件 7、创建文件夹并移动文件 8、在文件夹中查找文件...

vue3 字体颜色设置的多种方式

在Vue 3中设置字体颜色可以通过多种方式实现&#xff0c;这取决于你是想在组件内部直接设置&#xff0c;还是在CSS/SCSS/LESS等样式文件中定义。以下是几种常见的方法&#xff1a; 1. 内联样式 你可以直接在模板中使用style绑定来设置字体颜色。 <template><div :s…...

微信小程序 - 手机震动

一、界面 <button type"primary" bindtap"shortVibrate">短震动</button> <button type"primary" bindtap"longVibrate">长震动</button> 二、js逻辑代码 注&#xff1a;文档 https://developers.weixin.qq…...

【Java_EE】Spring MVC

目录 Spring Web MVC ​编辑注解 RestController RequestMapping RequestParam RequestParam RequestBody PathVariable RequestPart 参数传递 注意事项 ​编辑参数重命名 RequestParam ​编辑​编辑传递集合 RequestParam 传递JSON数据 ​编辑RequestBody ​…...

ardupilot 开发环境eclipse 中import 缺少C++

目录 文章目录 目录摘要1.修复过程摘要 本节主要解决ardupilot 开发环境eclipse 中import 缺少C++,无法导入ardupilot代码,会引起查看不方便的问题。如下图所示 1.修复过程 0.安装ubuntu 软件中自带的eclipse 1.打开eclipse—Help—install new software 2.在 Work with中…...

学习STC51单片机32(芯片为STC89C52RCRC)OLED显示屏2

每日一言 今天的每一份坚持&#xff0c;都是在为未来积攒底气。 案例&#xff1a;OLED显示一个A 这边观察到一个点&#xff0c;怎么雪花了就是都是乱七八糟的占满了屏幕。。 解释 &#xff1a; 如果代码里信号切换太快&#xff08;比如 SDA 刚变&#xff0c;SCL 立刻变&#…...

论文笔记——相干体技术在裂缝预测中的应用研究

目录 相关地震知识补充地震数据的认识地震几何属性 相干体算法定义基本原理第一代相干体技术&#xff1a;基于互相关的相干体技术&#xff08;Correlation&#xff09;第二代相干体技术&#xff1a;基于相似的相干体技术&#xff08;Semblance&#xff09;基于多道相似的相干体…...