当前位置: 首页 > news >正文

Kafka核心参数与使用02

一、从基础的客户端说起

Kafka 提供了非常简单的生产者(Producer)和消费者(Consumer)API。通过引入相应依赖后,可以快速上手编写生产者和消费者的示例。

1. 消息发送者主流程

一个最基础的 Producer 发送消息的步骤如下:

  1. 设置 Producer 核心属性

    • 例如:bootstrap.servers(集群地址)、key.serializervalue.serializer 等。
    • 大多数核心配置在 ProducerConfig 中都有对应的注释说明。
  2. 构建消息

    • Kafka 消息是一个 Key-Value 结构,Key 常用于分区路由,Value 则是业务真正要传递的内容。
  3. 发送消息

    • 单向发送producer.send(record); 仅发出消息,不关心服务端响应。
    • 同步发送producer.send(record).get(); 获取服务端响应前会阻塞。
    • 异步发送producer.send(record, callback); 服务端响应时会回调。

示例核心代码示意:

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());Producer<String,String> producer = new KafkaProducer<>(props);
ProducerRecord<String, String> record =new ProducerRecord<>(TOPIC, key, value);
// 单向发送
producer.send(record);
// 同步发送
producer.send(record).get();
// 异步发送
producer.send(record, new Callback() { ... });

2. 消息消费者主流程

Consumer 侧,同样有三步:

  1. 设置 Consumer 核心属性

    • 例如:bootstrap.serversgroup.idkey.deserializervalue.deserializer 等。
  2. 拉取消息

    • Kafka 采用 Pull 模式:消费者主动调用 poll() 拉取消息。
  3. 处理消息,提交位点

    • 手动提交:consumer.commitSync()consumer.commitAsync()
    • 自动提交:设置 enable.auto.commit = true 及相应提交周期参数。

示例核心代码示意:

Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());Consumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList(TOPIC));while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));for (ConsumerRecord<String, String> record : records) {// 业务处理}// 手动提交 offsetconsumer.commitSync();
}

二、从客户端属性来梳理客户端工作机制

Kafka 的核心特色在于高并发、高吞吐以及在网络不稳定、服务随时会崩溃等复杂场景下仍能保证消息安全性。以下属性与机制能帮助我们从客户端视角理解 Kafka。

1. 消费者分组消费机制

  • Group 机制
    每个 Consumer 都会指定一个 group.id。同一个 Consumer Group 内,Topic 的每个 Partition 只会被同组里的一个 Consumer 消费。不同组之间则是互不影响、各自消费。
  • offset 提交
    • offset 保存在 Broker 端,但由 Consumer “主导”提交。
    • 提交方式有:
      • 同步:commitSync(),安全但速度慢;
      • 异步:commitAsync(),快但可能丢失或重复消费。
    • auto.offset.reset
      • 当 Broker 端没有找到该 Group 相应的 offset 时,可以根据配置(earliest, latest, none)决定从何处开始消费。

提示:Offset 提交与消息处理之间并非完全同步,一旦无法保证强一致性,可能出现消息重复与消息丢失。可根据业务需求与场景选择手动提交或自动提交,也可将 offset 存入外部存储(如 Redis)自行管理。

2. 生产者拦截器机制

  • 通过配置 interceptor.classes 可以指定一个或多个实现了 ProducerInterceptor 接口的拦截器。
  • 典型功能:在发送前统一添加/修改消息内容,或者在发送后做监控/统计等操作。

3. 消息序列化机制

  • Producer 端:
    • key.serializer / value.serializer:将对象序列化为 byte[]
    • 内置如 StringSerializerIntegerSerializer 等;可自定义自定义序列化类。
  • Consumer 端:
    • key.deserializer / value.deserializer:将 byte[] 反序列化为业务对象。
  • 如果使用自定义类型(POJO)进行传输,则需要编写自定义 Serializer/Deserializer。
    • 核心思想:定长字段不定长字段的序列化与反序列化。

4. 消息分区路由机制

  • Producer 侧:
    • 通过 partitioner.class 指定自定义的分区器(Partitioner 接口)。Kafka 内置默认逻辑:
      • 若无 key,则采用黏性分区策略(Sticky Partition);
      • 若指定 key,则对 key 进行哈希得到分区;
      • 也可改为轮询策略(RoundRobinPartitioner)。
  • Consumer 侧:
    • 通过 partition.assignment.strategy 指定分区分配策略,内置 RangeAssignor, RoundRobinAssignor, StickyAssignor, CooperativeStickyAssignor 等。
      • Range:按顺序将分区切块分配。
      • RoundRobin:轮询分配。
      • Sticky:尽可能保持现有分配不变,同时保证分配均匀。

5. 生产者消息缓存机制

生产者为了提高吞吐量,会将消息先写入一个本地缓存(RecordAccumulator),然后 sender 线程批量发送到 Broker:

  • buffer.memory:缓存总大小,默认 32MB。
  • batch.size:每个分区发送批次大小,默认 16KB。
  • linger.ms:即便 batch.size 未填满,等待 linger.ms 毫秒后也会将消息批量发送。
  • max.in.flight.requests.per.connection:同一连接上未收到响应的请求数上限。

6. 发送应答机制

  • acks 用于控制生产者发送完消息后何时认为消息“成功”:
    • acks=0:不等待 Broker 确认,吞吐量高,安全性低。
    • acks=1:只等待 Leader 分区写入,常见设置。
    • acks=all-1:等待所有副本写入,安全性最高,吞吐量相对低。
  • 还需配合 Broker 端 min.insync.replicas 参数,控制副本个数不足时直接返回错误。

7. 生产者消息幂等性

  • 为保证 Exactly-once 语义,需要开启 enable.idempotence(幂等性)。
  • 幂等性主要依赖 PID + SequenceNumber 机制:
    • Producer 向同一分区发送消息时,每条消息都有一个单调递增的序列号。
    • Broker 针对 <PID, Partition> 维护序列号,只接收递增消息,防止消息重复写入。
  • 幂等性要求:
    • acks=all
    • retries>0
    • max.in.flight.requests.per.connection<=5

8. 生产者消息事务

  • 幂等性只能保证单个分区的 Exactly-once,如果涉及 多个分区/Topic 则需要“事务”来保证一批消息的一致性。
  • 主要 API:
    • initTransactions()
    • beginTransaction()
    • commitTransaction()
    • abortTransaction()
  • 事务依赖于 transaction.id 来区分不同的 Producer 实例,以便在崩溃重启后继续补偿或回滚先前未完成的事务,保证多分区的一致写入。

三、客户端流程总结

  1. Producer:

    • 属性配置(序列化、分区器、拦截器、幂等性/事务等) → 将消息提交到 RecordAccumulatorSender 线程批量发送到 Broker → 按 acks 等待 Broker 响应 → 提交或重试。
  2. Consumer:

    • 属性配置(反序列化、消费组、分区分配策略等) → poll() 拉取消息 → 业务处理 → 提交 offset(手动或自动),与 Broker 同步消费进度。
  3. 重点:

    • 消息在 Producer 端的缓存发送机制消息在 Consumer 端的主动拉取、分组消费、offset 提交 是理解 Kafka 高并发、高吞吐、高可用的关键。
    • 其他如 幂等性(保证单分区 Exactly-once)和 事务(保证多分区一致性)是针对数据安全性和业务需求的更深入扩展。

四、Spring Boot 集成 Kafka

Spring Boot 中集成 Kafka 本质也是对上述 Producer/Consumer 的封装。

  1. 引入依赖

    <dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId>
    </dependency>
    
  2. application.properties 中配置 Kafka 相关参数

    • 和原生 Kafka 参数名称基本一致,如 spring.kafka.producer.*spring.kafka.consumer.* 等。
    • 典型参数:bootstrap-servers, acks, batch-size, enable-auto-commit, auto-offset-reset 等。
  3. 使用 KafkaTemplate 发送消息

    @RestController
    public class KafkaProducerController {@Autowiredprivate KafkaTemplate<String, Object> kafkaTemplate;@GetMapping("/kafka/send/{message}")public void sendMessage(@PathVariable("message") String msg) {kafkaTemplate.send("topic1", msg);}
    }
    
  4. 使用 @KafkaListener 声明消息消费者

    @Component
    public class KafkaConsumerListener {@KafkaListener(topics = {"topic1"})public void onMessage(ConsumerRecord<?, ?> record) {System.out.println("消费内容:" + record.value());}
    }
    

 


结语

  • 想要真正掌握 Kafka,重点在于建立整体的数据流转模型
    • Producer 端如何将消息分区、缓存、发送、应答、重试、保证幂等与事务;
    • Consumer 端如何分组消费、订阅分区、拉取消息、提交 offset。
  • 熟悉这些机制后,再去看各种客户端配置就会轻松许多,能够结合实际业务场景做灵活配置与调优。
  • Spring Boot 也只是对原生 Kafka 客户端的进一步封装,一旦理解 Kafka 底层机制与各项参数原理,使用 Spring Boot 时只需“对号入座”地进行配置即可。

相关文章:

Kafka核心参数与使用02

一、从基础的客户端说起 Kafka 提供了非常简单的生产者&#xff08;Producer&#xff09;和消费者&#xff08;Consumer&#xff09;API。通过引入相应依赖后&#xff0c;可以快速上手编写生产者和消费者的示例。 1. 消息发送者主流程 一个最基础的 Producer 发送消息的步骤…...

Three.js 渲染技术:打造逼真3D体验的幕后功臣

文章目录 前言一、着色器&#xff08;Shaders&#xff09;二、后处理&#xff08;Post-processing&#xff09;三、抗锯齿&#xff08;Anti-aliasing&#xff09;四、实时渲染与离线渲染五、光照模型与材质优化六、环境映射&#xff08;Environment Mapping&#xff09;七、纹理…...

QTcpSocket 如何统计在线时长

基本原理 QTcpSocket是 Qt 库中用于 TCP 通信的类。要统计在线时长,关键思路是记录连接建立的时间和当前时间,通过计算两者的差值来得到在线时长。实现步骤 记录连接建立时间: 在连接成功的信号槽函数中记录开始时间。例如,当QTcpSocket成功连接到服务器时,会发出connecte…...

【Altium】AD使用智能粘贴功能把多个网络标签改成端口

1、 文档目标 使用智能粘贴功能把多个网络标签&#xff08;net lable&#xff09;改成端口&#xff08;port&#xff09; 2、 问题场景 客户有一份原理图&#xff0c;网络用的是net label&#xff0c;没用Port&#xff0c;然后创建一个sheet symbol&#xff0c;但是sheet sy…...

.NET 终止或结束进程

如何使用 C# 终止进程。 使用简单的方法终止.NET中的现有进程Process.Kill()。有一个可选参数 true 或 false&#xff0c;用于结束与要结束的进程相关的所有子进程。 了解如何创建流程。 结束当前进程&#xff1a; System.Diagnostics.Process.GetCurrentProcess().Kill(tru…...

R.swift库的详细用法

R.swift 是一个 Swift 工具库,它提供了一个自动生成的类 R,使得你可以通过类型安全的方式访问项目中的资源,例如图片、字体、颜色、XIB 文件等。通过 R.swift,你可以避免字符串类型的错误,提升代码的可维护性。 以下是 R.swift 库的详细用法: 1. 安装 R.swift 使用 Sw…...

Js的回调函数

一、什么是回调函数&#xff08;Callback&#xff09;&#xff1f; 回调函数&#xff08;Callback Function&#xff09;是指一个函数被作为参数传递给另一个函数&#xff0c;并在特定事件发生或操作完成时执行。 可以通俗地理解为一种“委托”机制。 在JavaScript中&#xff0…...

flutter 独立开发之笔记

1、# use: - [flutter_launcher_icons:] 每次修改完icon后&#xff0c;都需要执行一遍 dart run flutter_launcher_icons 2、开启混淆并打包apk flutter build apk --obfuscate --split-debug-info./out/android/app.android-arm64.symbols 3、开启windows支持 flutter con…...

PHP的扩展Imagick的安装

windows下的安装 下载&#xff1a;Imagick扩展 PECL :: Package :: imagick 3.7.0 for Windows​​​​​​​ 下载&#xff1a;ghostscript&#xff08;PDF提取图片时用到&#xff0c;不处理PDF可以不安装&#xff09; Ghostscript : Downloads 安装扩展 Imagick解压后&…...

【git】在服务器使用docker设置了一个gogs服务器,访问和现实都不理想

以下问题应该都可以通过设置custom/conf/app.ini来解决 配置文档参考地址:https://www.bookstack.cn/read/gogs_zh/advanced-configuration_cheat_sheet.md domain显示的事localhost&#xff0c;实际上应该是一个IP地址。 关键字&#xff1a; DOMAIN ROOT_URL 因为是docker…...

多台PC共用同一套鼠标键盘

当环境中有多个桌面 pc 需要操作的时候&#xff0c;在 多台 pc 之间切换会造成很多的不方便 可以通过远程进行连接&#xff0c;但是有一个更好的方案是让多台机器之间共用同一套键盘鼠标 常用的解决方案 synergy 和 sharemouse&#xff0c;通过移动光标在不同的 pc 间切换 s…...

大语言模型是如何训练出来的?

近期听了不少与AI相关的播客&#xff0c;有理想转型AI的分享&#xff0c;有Character.ai出来同事的分享等&#xff0c;结合对Transformer架构的理解尝试大致还原大语言模型的训练过程。不过&#xff0c;当我这样的“中国大妈”也能够大致琢磨明白大语言模型是如何训练出来的时候…...

Vue2与Vue3在项目开发中的选择:深入探讨

文章目录 前言一、Vue2的优势与挑战二、Vue3的进步与特性三、如何做出选择&#xff1f;结语 前言 Vue.js 是一个用于构建用户界面的渐进式JavaScript框架。Vue2和Vue3是其两个主要版本&#xff0c;它们各自拥有一系列特点和优势。随着Vue3的发布&#xff0c;开发者们面临着在新…...

Web枚举:深入了解目标应用系统

Web枚举是渗透测试中重要的第一步&#xff0c;旨在全面收集目标系统的信息&#xff0c;以便后续攻击载荷的构建更具针对性和效率。本文将详细讨论如何通过各种方法识别目标Web应用的技术栈&#xff0c;并提取关键信息。 1. 识别目标系统的技术栈 技术栈指Web应用所依赖的技术组…...

RabbitMQ介绍与使用

RabbitMQ官网 RabbitMQ 介绍 RabbitMQ 是一个开源的消息代理和队列服务器&#xff0c;基于 AMQP&#xff08;高级消息队列协议&#xff09;标准&#xff0c;使用 Erlang 编程语言构建。它是消息队列&#xff08;MQ&#xff09;的一种&#xff0c;广泛应用于分布式系统中&#x…...

从0到机器视觉工程师(六):配置OpenCV和Qt环境

CMake配置OpenCV CMakeLists.txt文件的编写 cmake_minimum_required(VERSION 3.20) project(test_opencv LANGUAGES CXX) #寻找Opencv库 FIND_PACKAGE(OpenCV REQUIRED) include_directories(test_opencv ${OpenCV_INCLUDE_DIRS}) add_executable(test_opencv main.cpp) TARGE…...

计算机毕业设计Python机器学习农作物健康识别系统 人工智能 图像识别 机器学习 大数据毕业设计 算法

温馨提示&#xff1a;文末有 CSDN 平台官方提供的学长联系方式的名片&#xff01; 温馨提示&#xff1a;文末有 CSDN 平台官方提供的学长联系方式的名片&#xff01; 温馨提示&#xff1a;文末有 CSDN 平台官方提供的学长联系方式的名片&#xff01; 作者简介&#xff1a;Java领…...

(Arxiv-2023)LORA-FA:针对大型语言模型微调的内存高效低秩自适应

LORA-FA&#xff1a;针对大型语言模型微调的内存高效低秩自适应 paper是香港浸会大学发表在Arxiv 2023的工作 paper title&#xff1a;LORA-FA: MEMORY-EFFICIENT LOW-RANK ADAPTATION FOR LARGE LANGUAGE MODELS FINE-TUNING ABSTRACT 低秩自适应 (LoRA) 方法可以大大减少微调…...

huggingface/bert/transformer的模型默认下载路径以及自定义路径

当使用 BertTokenizer.from_pretrained(bert-base-uncased) 加载预训练的 BERT 模型时&#xff0c;Hugging Face 的 transformers 库会从 Hugging Face Model Hub 下载所需的模型文件和分词器文件&#xff08;如果它们不在本地缓存中&#xff09;。 默认情况下&#xff0c;这些…...

从 0 开始上手 Solana 智能合约

Solana CLI 基础知识 Solana CLI 是一个命令行界面工具&#xff0c;提供了一系列用于与 Solana Cluster 交互的命令。 我们将介绍一些最常见的命令&#xff0c;但你始终可以通过运行 solana --help 查看所有可能的 Solana CLI 命令列表。 Solana CLI 配置 Solana CLI 存储了…...

(六)CAN总线通讯

文章目录 CAN总线回环测试第一种基于板载CAN测试第一步确认板载是否支持第二步关闭 CAN 接口将 CAN 接口置于非活动状态第三步 配置 CAN 接口第一步 设置 CAN 接口比特率第二步 设置 CAN 启用回环模式第三步 启用 CAN 接口 第四步 测试CAN总线回环捕获 CAN 消息发送 CAN 消息 第…...

新一代智能工控系统网络安全合规解决方案

01.新一代智能工控系统概述 新一代智能工控系统是工业自动化的核心&#xff0c;它通过集成人工智能、工业大模型、物联网、5G等技术&#xff0c;实现生产过程的智能化管理和控制。这些系统具备实时监控、自动化优化、灵活调整等特点&#xff0c;能够提升生产效率、保证产品质量…...

Vivado中Tri_mode_ethernet_mac的时序约束、分析、调整——(一)时序约束的基本概念

1、基本概念 推荐阅读&#xff0c;Ally Zhou编写的《Vivado使用误区与进阶》系列文章&#xff0c;熟悉基本概念、tcl语句的使用。 《Vivado使用误区与进阶》电子书开放下载&#xff01;&#xff01; 2、Vivado中的语法例程 1&#xff09;语法例程 约束的语句可以参考vivado…...

车载网络:现代汽车的数字心跳

在汽车领域&#xff0c;“智能汽车”一词毫不夸张。如今的汽车已不再是原始的机械工程&#xff0c;而是通过先进的车载网络无缝连接的精密数字生态系统。这些滚动计算机由复杂的电子控制单元(ECU)网络提供动力&#xff0c;ECU是负责管理从发动机性能到信息娱乐系统等一切事务的…...

python基础和redis

1. Map函数 2. filter函数 numbers generate_numbers() filtered_numbers filter(lambda x: x % 2 0, numbers) for _ in range(5):print(next(filtered_numbers)) # 输出: 0 2 4 6 83. filter map 和 reduce 4. picking and unpicking 5. python 没有函数的重载&#xff0…...

w~自动驾驶~合集16

我自己的原文哦~ https://blog.51cto.com/whaosoft/12765612 #SIMPL 用于自动驾驶的简单高效的多智能体运动预测基准 原标题&#xff1a;SIMPL: A Simple and Efficient Multi-agent Motion Prediction Baseline for Autonomous Driving 论文链接&#xff1a;https://ar…...

最长的指定瑕疵度的元音子串

一、题目 最长的指定瑕疵度的元音子串 定义&#xff1a;开头和结尾都是元音字母&#xff08;aeiouAEIOU&#xff09;的字符串为 元音字符串 &#xff0c;其中混杂的非元音字母数量为其 瑕疵度 。比如: “a” 、 "aa"是元音字符串&#xff0c;其瑕疵度都为0 "aiu…...

每日算法Day15【组合、组合总和III、电话号码的字母组合】

77. 组合 算法链接: 77. 组合 - 力扣&#xff08;LeetCode&#xff09; 类型: 回溯 难度: 中等 回溯三步法&#xff1a; 1、确定参数返回值 2、确定终止条件 3、单层搜索逻辑 剪枝操作&#xff1a; 当path容量超过k时的数据可以不用遍历&#xff0c;故遍历边界条件判断: …...

C语言教程——指针进阶(2)

目录 一、函数指针数组 1.1函数指针数组写法 1.2函数指针用途 二、指向函数指针数组的指针 2.1概念 三、回调函数 3.1用法 3.2qsort排序 总结 前言 我们接着上一篇的函数指针往下学习。 一、函数指针数组 1.1函数指针数组写法 我们都知道指针数组&#xff0c;里面可以…...

调和级数不为整数的证明

文章目录 1. 问题引入2. 证明2.1 引理12.2 引理22.3 引理3&#xff1a;2.4 核心证明&#xff1a; 3. 参考 1. 问题引入 s ( n ) 1 1 2 1 3 ⋯ 1 n , n ∈ N ∗ , n ≥ 2 s(n) 1\frac{1}{2}\frac{1}{3}\cdots\frac{1}{n}, \quad \\n \in N^*, n \ge2 s(n)121​31​⋯n1​,…...