【kafka系列】生产者
目录
发送流程
1. 流程逻辑分析
阶段一:主线程处理
阶段二:Sender 线程异步发送
核心设计思想
2. 流程
关键点总结
重要参数
一、核心必填参数
二、可靠性相关参数
三、性能优化参数
四、高级配置
五、安全性配置(可选)
六、错误处理与监控
典型配置示例
关键注意事项
发送流程
- 序列化与分区:消息通过
Partitioner选择目标分区(默认轮询或哈希),序列化后加入RecordAccumulator缓冲区。- 批次合并:
Sender线程将同一分区的消息合并为ProducerBatch,减少网络请求(源码见Sender.run()方法)。- 发送至Broker:通过
NetworkClient异步发送,Broker的LogAppendTime处理写入请求。- ACK机制:根据
acks配置(0/1/all)等待Broker确认,通过Metadata类更新分区元数据
1. 流程逻辑分析
Kafka 生产者发送消息的核心流程分为 主线程处理 和 Sender 线程异步发送 两个阶段,具体步骤如下:
阶段一:主线程处理
- 创建 ProducerRecord
-
- 用户调用
producer.send(ProducerRecord),指定 Topic、Key、Value 和可选的分区或时间戳。
- 用户调用
- 选择分区(Partition)
-
- 若未指定分区,根据以下规则选择:
-
-
- 有 Key:对 Key 哈希取模(
hash(key) % 分区数),确保相同 Key 的消息进入同一分区。 - 无 Key:默认使用粘性分区策略(Sticky Partitioning,Kafka 2.4+),在批次填满或超时前发送到同一分区,提升性能。
- 有 Key:对 Key 哈希取模(
-
- 序列化(Serialize)
-
- 使用配置的
key.serializer和value.serializer对 Key 和 Value 序列化(如StringSerializer、ByteArraySerializer)。
- 使用配置的
- 追加到缓冲区(RecordAccumulator)
-
- 将消息按 Topic-Partition 分组,存入
RecordAccumulator的批次(Batch)中。 - 批次策略:
- 将消息按 Topic-Partition 分组,存入
-
-
batch.size:批次大小阈值(默认 16KB),达到阈值立即发送。linger.ms:批次等待时间(默认 0ms),超时后发送未满批次。
-
阶段二:Sender 线程异步发送
- Sender 线程拉取批次
-
- Sender 线程定期检查缓冲区,将满足条件的批次(已满或超时)封装为
ProducerRequest。
- Sender 线程定期检查缓冲区,将满足条件的批次(已满或超时)封装为
- 构建请求并发送到 Broker
-
- 根据分区的 Leader 副本所在 Broker,将请求发送到对应的节点。
- 关键配置:
-
-
acks:控制消息持久化确认级别:
-
-
-
-
0:不等待确认(可能丢失数据)。1:等待 Leader 确认(默认)。all:等待所有 ISR 副本确认(最高可靠性)。
-
-
-
-
max.in.flight.requests.per.connection:控制单个 Broker 的未确认请求数(默认 5)。
-
- 处理 Broker 响应
-
- 成功:触发用户设置的
Callback回调,并释放批次内存。 - 失败:
- 成功:触发用户设置的
-
-
- 可重试错误(如网络抖动、Leader 切换):根据
retries(默认 0)和retry.backoff.ms(默认 100ms)重试。 - 不可重试错误(如消息过大):直接触发回调并抛出异常。
- 可重试错误(如网络抖动、Leader 切换):根据
-
核心设计思想
- 异步批处理:通过缓冲区合并小消息,减少网络 I/O 次数。
- 零拷贝优化:使用
sendfile系统调用提升网络传输效率。 - 高可靠性:通过重试机制和
acks=all确保消息不丢失。
2. 流程

关键点总结
- 分区选择:优先使用 Key 哈希或粘性分区策略,保证消息顺序性和吞吐量。
- 批次优化:通过
batch.size和linger.ms平衡延迟与吞吐。 - 可靠性保障:通过
acks和retries配置确保消息持久化。 - 异步处理:主线程与 Sender 线程解耦,避免阻塞用户逻辑。
重要参数
以下是 Kafka 生产者(Producer)在日常开发中的 常见配置参数 及其作用,按功能分类整理成表格:
一、核心必填参数
| 参数名 | 默认值 | 说明 |
|
| 无 | Kafka 集群地址列表(逗号分隔,如 )。 |
|
| 无 | Key 的序列化类(如 )。 |
|
| 无 | Value 的序列化类(同上)。 |
二、可靠性相关参数
| 参数名 | 默认值 | 说明 |
|
|
| 消息持久化确认机制:
|
|
|
| 发送失败后的重试次数(建议设为 配合 )。 |
|
|
| 是否启用幂等性( 和 |
|
|
| 单个 Broker 的未确认请求数。若启用幂等性,建议设为 以保证顺序。 |
三、性能优化参数
| 参数名 | 默认值 | 说明 |
|
|
| 消息在缓冲区等待时间(毫秒),增大可提升吞吐量(但增加延迟)。 |
|
|
(16KB) | 单个批次的大小阈值,达到阈值后立即发送。 |
|
|
(32MB) | 生产者缓冲区的总内存大小。 |
|
|
| 消息压缩算法( 、 、 、 ),减少网络带宽占用。 |
四、高级配置
| 参数名 | 默认值 | 说明 |
|
|
(30秒) | 生产者等待 Broker 响应的超时时间。 |
|
|
(60秒) | 生产者缓冲区满或元数据不可用时的阻塞时间(超时抛异常)。 |
|
| 默认轮询/哈希策略 | 自定义分区策略(实现 接口)。 |
五、安全性配置(可选)
| 参数名 | 默认值 | 说明 |
|
|
| 安全协议(如 、 )。 |
|
| 无 | SSL 证书路径(客户端认证时需配置)。 |
|
| 无 | SASL 认证机制(如 、 )。 |
六、错误处理与监控
| 参数名 | 默认值 | 说明 |
|
| 无 | 生产者拦截器(实现 接口),用于监控或修改消息。 |
|
|
(30秒) | 性能指标采样窗口时间。 |
典型配置示例
Properties props = new Properties();
props.put("bootstrap.servers", "kafka1:9092,kafka2:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("acks", "all");
props.put("retries", 10);
props.put("linger.ms", 20);
props.put("batch.size", 32768);
props.put("compression.type", "snappy");
props.put("enable.idempotence", "true");
关键注意事项
- 可靠性 vs 性能:
-
acks=all和enable.idempotence=true提高可靠性,但可能降低吞吐量。- 增大
batch.size和linger.ms可提升吞吐量,但增加延迟。
- 幂等性限制:
-
- 需 Kafka 0.11+ 版本支持,且
max.in.flight.requests=1(或 Kafka 2.0+ 允许5)。
- 需 Kafka 0.11+ 版本支持,且
- 监控与调优:
-
- 通过
metrics和拦截器监控生产者性能,动态调整参数
- 通过
相关文章:
【kafka系列】生产者
目录 发送流程 1. 流程逻辑分析 阶段一:主线程处理 阶段二:Sender 线程异步发送 核心设计思想 2. 流程 关键点总结 重要参数 一、核心必填参数 二、可靠性相关参数 三、性能优化参数 四、高级配置 五、安全性配置(可选࿰…...
HCIA-路由器相关知识和面试问题
二、 路由器 2.1 关于路由器的知识 2.1.1 什么是路由器 路由器是一种网络层互联设备,主要用于连接多个逻辑上分开的网络,实现不同网络之间的数据路由和通信。它能根据网络层地址(如 IP 地址)来转发数据包,在网络中起…...
Unity 获取独立显卡数量
获取独立显卡数量 导入插件包打开Demo 运行看控制台日志 public class GetGraphicCountDemo : MonoBehaviour{public int count;// Start is called before the first frame updatevoid Start(){count this.GetIndependentGraphicsDeviceCount();}}...
【stm32】定时器输出PWM波形(hal库)
一. PWM基本原理 PWM是一种通过调节信号的占空比(Duty Cycle)来控制输出平均电压的技术。占空比是指高电平时间与整个周期时间的比值。例如: - 占空比为50%时,输出平均电压为电源电压的一半。 - 占空比为100%时,输出始…...
Deepseek R1模型本地化部署+API接口调用详细教程:释放AI生产力
文章目录 前言一、deepseek R1模型与chatGPT o1系列模型对比二、本地部署步骤1.安装ollama2部署DeepSeek R1模型删除已存在模型,以7b模型为例 三、DeepSeek API接口调用Cline配置 前言 随着最近人工智能 DeepSeek 的爆火,越来越多的技术大佬们开始关注如…...
Mac ARM 架构的命令行(终端)中,删除整行的快捷键是:Ctrl + U
在 Mac ARM 架构的命令行(终端)中,删除整行的快捷键是: Ctrl U这个快捷键会删除光标所在位置到行首之间的所有内容。如果你想删除光标后面的所有内容,可以使用: Ctrl K这两个快捷键可以帮助你快速清除当…...
用pytorch实现一个简单的图片预测类别
前言: 在阅读本文之前,你需要了解Python,Pytorch,神经网络的一些基础知识,比如什么是数据集,什么是张量,什么是神经网络,如何简单使用tensorboard,DataLoader。 本次模型训练使用的是…...
深度学习框架探秘|TensorFlow:AI 世界的万能钥匙
在人工智能(AI)蓬勃发展的时代,各种强大的工具和框架如雨后春笋般涌现,而 TensorFlow 无疑是其中最耀眼的明星之一。它不仅被广泛应用于学术界的前沿研究,更是工业界实现 AI 落地的关键技术。今天,就让我们…...
Linux: 调整套接字缓冲区大小相关内核参数
Linux: 调整套接字缓冲区大小相关内核参数 内核参数关于套接字缓冲区大小相关的设置,这些参数控制了TCP和UDP套接字的接收和发送缓冲区的最大值、默认值以及动态调整范围。 当前配置 net.core.rmem_max 212992 # 最大接收缓冲区大小(字节&#…...
Linux 服务器部署deepseek
把手教你在linux服务器部署deepseek,打造专属自己的数据库知识库 正文开始 第一步:安装Ollama 打开官方网址:https://ollama.com/download/linux 下载Ollama linux版本 复制命令到linux操作系统执行 [rootpostgresql ~]# curl -fsSL http…...
性能测试工具
Postman Postman 是一款功能强大的API开发协作平台,支持构建、测试和记录 API。这款工具提供了直观的图形界面来发送请求并查看响应数据。它还允许创建复杂的 HTTP 请求序列,并能通过内置脚本引擎实现自动化测试场景。 对于团队合作而言,Po…...
DeepSeek、Kimi、文心一言、通义千问:AI 大语言模型的对比分析
在人工智能领域,DeepSeek、Kimi、文心一言和通义千问作为国内领先的 AI 大语言模型,各自展现出了独特的特点和优势。本文将从技术基础、应用场景、用户体验和价格与性价比等方面对这四个模型进行对比分析,帮助您更好地了解它们的特点和优势。…...
反转链表2(92)
92. 反转链表 II - 力扣(LeetCode) 相关题目:206. 反转链表 - 力扣(LeetCode) 解法: /*** Definition for singly-linked list.* struct ListNode {* int val;* ListNode *next;* ListNode(…...
CSDN、markdown环境下如何插入各种图(流程图,时序图,甘特图)
流程图 横向流程图 mermaid graph LRA[方形] --> B{条件a}B -->|满足| C(圆角)B -->|不满足| D(圆角)C --> E[输出结果1]D --> E效果图: 竖向流程图 mermaid graph TDC{条件a} --> |a1| A[方形]C --> |a2| F[竖向流程图]A --> B(圆角)B …...
《探秘AI绿色计算:降低人工智能硬件能耗的热点技术》
在人工智能飞速发展的当下,其硬件能耗问题愈发凸显。据国际能源署预测,人工智能的能源消耗可能大幅增长。因此,降低人工智能硬件能耗,实现绿色计算,已成为行业关键课题。以下是一些正在崭露头角的热点技术。 新型硬件…...
《Foundation 起步》
《Foundation 起步》 引言 在当今快速发展的科技时代,了解并掌握最新的技术是至关重要的。本文旨在为初学者提供一个全面的《Foundation》起步指南,帮助大家快速入门并掌握这一强大的技术。 一、什么是Foundation? Foundation 是一个流行的前端框架,由 ZURB 公司开发。…...
【Elasticsearch】runtime_mappings搜索请求中定义运行时字段
在 Elasticsearch 中,在搜索请求中定义运行时字段(Runtime Fields)是一种强大的功能,允许用户在查询时动态添加和计算字段,而无需预先在索引映射中定义这些字段。这种方式提供了极大的灵活性,尤其是在处理动…...
unity学习40:导入模型的 Animations文件夹内容,动画属性和修改动画文件
目录 1 Animations文件夹内容 2 每个模型文件的4个标签 3 model 4 rig 动画类型 5 Animation 5.1 新增动画和修改动画 5.2 限制动画某个轴的变化,烘焙 6 material 材料 1 Animations文件夹内容 下面有很多文件夹每个文件夹都是不同的动作模型每个文件夹下…...
【ISO 14229-1:2023 UDS诊断全量测试用例清单系列:第十三节】
ISO 14229-1:2023 UDS诊断服务测试用例全解析(ControlDTCSetting_0x85服务) 作者:车端域控测试工程师 更新日期:2025年02月14日 关键词:UDS协议、0x85服务、DTC设置控制、NRC覆盖、ISO 14229-1:2023 一、服务功能概述…...
web第三次作业
弹窗案例 1.首页代码 <!DOCTYPE html><html lang"en"><head><meta charset"UTF-8"><meta name"viewport" content"widthdevice-width, initial-scale1.0"><title>综合案例</title><st…...
GMSL 实例1:当 MAX96717 遇上 MAX96724,打通 Camera 视频数据传输
新年伊始,继 Deepseek 在 AI 圈掀起风波之后。比亚迪在2月10日发布会上重磅官宣:全系车型将搭载自研的高阶智驾系统“天神之眼”,覆盖从10万元级入门车型到高端豪华车型的所有范围。此举如一颗重磅炸弹投向当前一卷再卷的新能源汽车赛道&…...
Python实现AWS Fargate自动化部署系统
一、背景介绍 在现代云原生应用开发中,自动化部署是提高开发效率和保证部署质量的关键。AWS Fargate作为一项无服务器计算引擎,可以让我们专注于应用程序开发而无需管理底层基础设施。本文将详细介绍如何使用Python实现AWS Fargate的完整自动化部署流程。 © ivwdcwso (ID…...
DeepSeek 助力 Vue 开发:打造丝滑的侧边栏(Sidebar)
前言:哈喽,大家好,今天给大家分享一篇文章!并提供具体代码帮助大家深入理解,彻底掌握!创作不易,如果能帮助到大家或者给大家一些灵感和启发,欢迎收藏关注哦 💕 目录 Deep…...
Android remount failed: Permission denied 失败解决方法
当我们adb remount 执行时有时候会报错: remount failed: Permission denied可以通过解决 adb disable-verity成功后会提示 Successfully disabled verity Now reboot your device for settings to take effect但如果设备此时OEM锁是关闭的则会,提示&…...
基于opencv的 24色卡IQA评测算法源码-可完全替代Imatest
1.概要 利用24色卡可以很快的分析到曝光误差,白平衡误差,噪声,色差,饱和度,gamma值。IQA或tuning工程一般用Imatest来手动计算,不便于产测部署,现利用opencv实现了imatest的全部功能,…...
数据结构与算法之排序算法-(计数,桶,基数排序)
排序算法是数据结构与算法中最基本的算法之一,其作用就是将一些可以比较大小的数据进行有规律的排序,而想要实现这种排序就拥有很多种方法~ 📚 非线性时间比较类: 那么我将通过几篇文章,将排序算法中各种算法细化的&a…...
【ISO 14229-1:2023 UDS诊断(会话控制0x10服务)测试用例CAPL代码全解析②】
ISO 14229-1:2023 UDS诊断【会话控制0x10服务】_TestCase02 作者:车端域控测试工程师 更新日期:2025年02月15日 关键词:UDS诊断、0x10服务、诊断会话控制、ECU测试、ISO 14229-1:2023 TC10-002测试用例 用例ID测试场景验证要点参考条款预期…...
MATLAB图像处理:图像特征概念及提取方法HOG、SIFT
图像特征是计算机视觉中用于描述图像内容的关键信息,其提取质量直接影响后续的目标检测、分类和匹配等任务性能。本文将系统解析 全局与局部特征的核心概念,深入讲解 HOG(方向梯度直方图)与SIFT(尺度不变特征变换&…...
kibana es 语法记录 elaticsearch
目录 一、认识elaticsearch 1、什么是正向索引 2、什么是倒排索引 二、概念 1、说明 2、mysql和es的对比 三、mapping属性 1、定义 四、CRUD 1、查看es中有哪些索引库 2、创建索引库 3、修改索引库 4、删除索引库 5、新增文档 6、删除文档 5、条件查询 一、认识…...
手写一个Java Android Binder服务及源码分析
手写一个Java Android Binder服务及源码分析 前言一、Java语言编写自己的Binder服务Demo1. binder服务demo功能介绍2. binder服务demo代码结构图3. binder服务demo代码实现3.1 IHelloService.aidl3.2 IHelloService.java(自动生成)3.3 HelloService.java…...
