【kafka系列】Kafka如何保证消息不丢失?
目录
1. 生产者端:确保消息成功发送到Broker
核心机制:
关键步骤:
2. Broker端:持久化与副本同步
核心机制:
关键源码逻辑:
3. 消费者端:可靠消费与Offset提交
核心机制:
关键步骤:
4. 全链路保障流程
消息丢失的典型场景与规避
总结
- 生产者端:
- 设置
acks=all确保所有ISR副本写入成功。- 启用重试(
retries)和幂等性(enable.idempotence=true,依赖ProducerId和SequenceNumber)。
- Broker端:
- 副本数
replication.factor≥3,ISR最小副本数min.insync.replicas≥2。- 使用
flush机制定期刷盘(通过log.flush.interval.messages配置)。
- 消费者端:
- 手动提交Offset(
enable.auto.commit=false),处理完消息后调用commitSync()
Kafka通过生产者端确认机制、Broker端持久化与副本同步、消费者端可靠消费三个核心环节保障消息不丢失。以下是具体实现机制与步骤:
1. 生产者端:确保消息成功发送到Broker
核心机制:
acks确认机制:
-
acks=0:生产者不等待Broker确认,可能丢失消息(不推荐)。acks=1:Leader副本写入即确认,若Leader宕机且未同步到其他副本,可能丢失。acks=all(或acks=-1):必须等待所有ISR副本写入成功,才返回确认(最高可靠性)。
- 重试机制:
-
- 配置
retries=N(如3次),在Broker临时故障时自动重试。 - 幂等性(
enable.idempotence=true):通过Producer ID和Sequence Number去重,避免网络重试导致消息重复。
- 配置
关键步骤:
// 生产者配置示例
Properties props = new Properties();
props.put("acks", "all"); // 必须所有ISR副本确认
props.put("retries", 3); // 重试次数
props.put("enable.idempotence", "true"); // 开启幂等性
2. Broker端:持久化与副本同步
核心机制:
- 副本机制(Replication):
-
- 每个Partition有多个副本(
replication.factor≥3),Leader处理读写,Follower同步数据。 - ISR(In-Sync Replicas):只有与Leader保持同步的副本才属于ISR集合。
min.insync.replicas=2:至少需要2个ISR副本写入成功,否则生产者抛出NotEnoughReplicasException。
- 每个Partition有多个副本(
- 持久化策略:
-
- 页缓存(Page Cache):依赖操作系统缓存加速写入,数据异步刷盘。
- 强制刷盘:通过
log.flush.interval.messages和log.flush.interval.ms控制刷盘频率(高可靠性场景建议启用)。
- Leader选举与数据恢复:
-
- 若Leader宕机,Controller从ISR中选举新Leader,确保数据不丢失。
- 若所有ISR副本宕机,需配置
unclean.leader.election.enable=false(禁止非ISR副本成为Leader)。
关键源码逻辑:
- 副本同步:Leader通过
ReplicaFetcherThread向Follower同步数据(源码见kafka.server.ReplicaFetcherThread)。 - ISR管理:Broker定期检查Follower的同步状态,延迟超过
replica.lag.time.max.ms的副本会被移出ISR。
3. 消费者端:可靠消费与Offset提交
核心机制:
- 手动提交Offset:
-
- 关闭自动提交(
enable.auto.commit=false),在消息处理完成后手动调用commitSync()或commitAsync()。 - 若消费者崩溃,下次启动时从最后提交的Offset恢复,避免消息丢失。
- 关闭自动提交(
- 事务性消费:
-
- 结合Kafka事务(
isolation.level=read_committed),仅消费已提交的事务消息。
- 结合Kafka事务(
关键步骤:
// 消费者配置示例
props.put("enable.auto.commit", "false"); // 关闭自动提交
while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));for (ConsumerRecord<String, String> record : records) {processRecord(record); // 处理消息consumer.commitSync(); // 处理完成后提交Offset}
}
4. 全链路保障流程
- 生产者发送:
-
- 消息发送后等待
acks=all确认。 - 若Broker未确认,按
retries重试。
- 消息发送后等待
- Broker持久化:
-
- Leader和ISR副本将消息写入日志文件。
- 根据配置决定是否强制刷盘。
- 消费者消费:
-
- 处理消息后手动提交Offset。
- 若消费者崩溃,从已提交Offset恢复。
消息丢失的典型场景与规避
| 场景 | 规避措施 |
| 生产者 ,Leader宕机 | 使用 + 。 |
| ISR副本不足导致写入失败 | 增加 ,确保 ≤ 当前ISR副本数。 |
| 消费者自动提交Offset,消息未处理 | 关闭自动提交,处理完成后手动提交。 |
| 磁盘故障导致数据丢失 | 使用RAID或分布式存储,确保多副本分布在不同物理节点。 |
总结
Kafka通过以下组合策略保障消息不丢失:
- 生产者端:
acks=all+ 幂等性 + 重试。 - Broker端:多副本同步 + ISR管理 + 强制刷盘。
- 消费者端:手动提交Offset + 事务性消费。
正确配置后,Kafka可提供至少一次(At-Least-Once)或精确一次(Exactly-Once) 的语义保障。
相关文章:
【kafka系列】Kafka如何保证消息不丢失?
目录 1. 生产者端:确保消息成功发送到Broker 核心机制: 关键步骤: 2. Broker端:持久化与副本同步 核心机制: 关键源码逻辑: 3. 消费者端:可靠消费与Offset提交 核心机制: 关…...
AtCoder Beginner Contest 393 —— E - GCD of Subset 补题 + 题解 python
AtCoder Beginner Contest 393 E - GCD of Subset Problem Statement You are given a sequence A ( A 1 , A 2 , … , A N ) A (A_1, A_2, \dots, A_N) A(A1,A2,…,AN) of length N N N and a positive integer K K K (at most N N N). For each i 1 , 2 , … …...
vue3响应式丢失解决办法(三)
vue3的响应式的理解,与普通对象的区别(一) vue3 分析总结响应式丢失问题原因(二) 经过前面2篇文章,知道了响应式为什么丢失了,但是还是碰到了丢失情况,并且通过之前的内容还不能解…...
BY组态:构建灵活、可扩展的自动化系统
引言 在现代工业自动化领域,BY组态(Build Your Own Configuration)作为一种灵活、可扩展的解决方案,正逐渐成为工程师和系统集成商的首选。BY组态允许用户根据具体需求自定义系统配置,从而优化生产效率、降低成本并提…...
2025 (ISC)²CCSP 回忆录
2025.1.20 广州,周一,我一次性通过了CCSP的考试。 为什么要考证? 个人成长所需 职业热情:做一行爱一行,既然我投入了美好的青春年华到网络安全行业当中,那么对于这个行业最有权威的认证,是肯定…...
强化学习笔记7——DDPG到TD3
前提:基于TD 的方法多少都会有高估问题,即Q值偏大。原因两个:一、TD目标是真实动作的高估。 二:自举法高估。 DDPG 属于AC方法:异策略,适合连续动作空间,因为他的策略网络直接输出的动作&#…...
win10 系统 自定义Ollama安装路径 及模型下载位置
win10 系统 自定义Ollama安装路径 及模型下载位置 由于Ollama的exe安装软件双击安装的时候默认是在C盘,以及后续的模型数据下载也在C盘,导致会占用C盘空间,所以这里单独写了一个自定义安装Ollama安装目录的教程。 Ollama官网地址࿱…...
-bash:/usr/bin/rm: Argument list too long 解决办法
问题概述 小文件日志太多导致无法使用rm命令,因为命令行参数列表的长度超过了系统允许的最大值。 需要删除/tmp目录下的所有文件,文件数量比较多。 ls -lt /tmp | wc -l 5682452 解决方法如下: 使用find -exec 遍历,然后执行删…...
内容中台重构企业内容管理流程驱动智能协作升级
内容概要 内容中台作为企业数字化转型的核心基础设施,通过技术架构革新与功能模块整合,重构了传统内容管理流程的底层逻辑。其核心价值在于构建动态化、智能化的内容生产与流转体系,将分散的创作、存储、审核及分发环节纳入统一平台管理。基…...
python实现YouTube关键词爬虫(2025/02/11)
在当今数字化时代,YouTube作为全球最大的视频分享平台之一,拥有海量的视频资源。无论是进行市场调研、内容创作还是学术研究,能够高效地获取YouTube上的相关视频信息都显得尤为重要。今天,我将为大家介绍一个基于Python实现的YouT…...
【效率技巧】怎么做思维导图||数学思维||费曼学习法
目录标题 常见问题:认知误区和建议:思维导图按照功能分类思维导图好处步骤(拆解的步骤) 常见问题: 1、做好的思维导图浪费时间 2、做简单的思维导图没有效果 认知误区和建议: 1、做思维导图工具…...
LabVIEW与USB设备开发
开发一台USB设备并使用LabVIEW进行上位机开发,涉及底层驱动的编写、USB通信协议的实现以及LabVIEW与设备的接口设计。本文将详细介绍如何开发USB设备驱动、实现LabVIEW与USB设备的通信以及优化数据传输,帮助用户顺利完成项目开发。下面是一个详细的说明&…...
动态规划LeetCode-416.分割等和子集
给你一个 只包含正整数 的 非空 数组 nums 。请你判断是否可以将这个数组分割成两个子集,使得两个子集的元素和相等。 示例 1: 输入:nums [1,5,11,5] 输出:true 解释:数组可以分割成 [1, 5, 5] 和 [11] 。 示例 2&…...
云原生(五十五) | ECS中自建数据库迁移到RDS
文章目录 ECS中自建数据库迁移到RDS 一、场景说明 二、ECS中自建数据库迁移到RDS实现步骤 三、 创建wordpress数据库 四、登录ECS导出wordpress数据库 五、返回RDS数据库管理控制台 六、开启外网地址并设置白名单 七、获取RDS外网访问地址 八、重新设置wordpress的wp-…...
【吾爱出品】 视频批量分段工具
视频批量分段工具 链接:https://pan.xunlei.com/s/VOJDvtHQE7GOiJ84WNea5Ay1A1?pwd5nta# 选择视频文件 启动程序后,点击 "文件" 菜单下的 "选择视频文件" 按钮,或者直接将视频文件拖放到程序窗口中的视频列表区域。支…...
HTML【详解】input 标签
input 标签主要用于接收用户的输入,随 type 属性值的不同,变换其具体功能。 通用属性 属性属性值功能name字符串定义输入字段的名称,在表单提交时,服务器通过该名称来获取对应的值disabled布尔值禁用输入框,使其无法被…...
二叉搜索树的实现(C++)
前言 二叉搜索树(搜索二叉树,Binary search tree)是一种特殊的二叉树。其规则为:左子树的值一定小于等于根,右子树的值一定大于等于根,并且左右子树也为搜索二叉树。 二叉搜索树的插入 1.若树为空…...
vue2老版本 npm install 安装失败_安装卡主
vue2老版本 npm install 安装失败_安装卡主 特别说明:vue2老版本安装慢、运行慢,建议升级vue3element plus vite 解决方案1: 第一步、修改npm 镜像为国内镜像 使用淘宝镜像: npm config set registry https://registry.npmmir…...
【MySQL】索引篇
1.什么时候适用索引? 字段有唯一限制,比如商品编码经常用于where查询条件的字段经常用于group by和order by 的字段 2.什么时候不需要创建索引? 字段中存在大量重复经常更新的字段表数据太少的时候 where条件、group by,order by里…...
Arduino 第十六章:pir红外人体传感器练习
Arduino 第十六章:PIR 传感器练习 一、引言 在 Arduino 的众多有趣项目中,传感器的应用是非常重要的一部分。今天我们要学习的主角是 PIR(被动红外)传感器。PIR 传感器能够检测人体发出的红外线,常用于安防系统、自动…...
docker详细操作--未完待续
docker介绍 docker官网: Docker:加速容器应用程序开发 harbor官网:Harbor - Harbor 中文 使用docker加速器: Docker镜像极速下载服务 - 毫秒镜像 是什么 Docker 是一种开源的容器化平台,用于将应用程序及其依赖项(如库、运行时环…...
模型参数、模型存储精度、参数与显存
模型参数量衡量单位 M:百万(Million) B:十亿(Billion) 1 B 1000 M 1B 1000M 1B1000M 参数存储精度 模型参数是固定的,但是一个参数所表示多少字节不一定,需要看这个参数以什么…...
从深圳崛起的“机器之眼”:赴港乐动机器人的万亿赛道赶考路
进入2025年以来,尽管围绕人形机器人、具身智能等机器人赛道的质疑声不断,但全球市场热度依然高涨,入局者持续增加。 以国内市场为例,天眼查专业版数据显示,截至5月底,我国现存在业、存续状态的机器人相关企…...
【大模型RAG】Docker 一键部署 Milvus 完整攻略
本文概要 Milvus 2.5 Stand-alone 版可通过 Docker 在几分钟内完成安装;只需暴露 19530(gRPC)与 9091(HTTP/WebUI)两个端口,即可让本地电脑通过 PyMilvus 或浏览器访问远程 Linux 服务器上的 Milvus。下面…...
Cilium动手实验室: 精通之旅---20.Isovalent Enterprise for Cilium: Zero Trust Visibility
Cilium动手实验室: 精通之旅---20.Isovalent Enterprise for Cilium: Zero Trust Visibility 1. 实验室环境1.1 实验室环境1.2 小测试 2. The Endor System2.1 部署应用2.2 检查现有策略 3. Cilium 策略实体3.1 创建 allow-all 网络策略3.2 在 Hubble CLI 中验证网络策略源3.3 …...
土地利用/土地覆盖遥感解译与基于CLUE模型未来变化情景预测;从基础到高级,涵盖ArcGIS数据处理、ENVI遥感解译与CLUE模型情景模拟等
🔍 土地利用/土地覆盖数据是生态、环境和气象等诸多领域模型的关键输入参数。通过遥感影像解译技术,可以精准获取历史或当前任何一个区域的土地利用/土地覆盖情况。这些数据不仅能够用于评估区域生态环境的变化趋势,还能有效评价重大生态工程…...
mac 安装homebrew (nvm 及git)
mac 安装nvm 及git 万恶之源 mac 安装这些东西离不开Xcode。及homebrew 一、先说安装git步骤 通用: 方法一:使用 Homebrew 安装 Git(推荐) 步骤如下:打开终端(Terminal.app) 1.安装 Homebrew…...
redis和redission的区别
Redis 和 Redisson 是两个密切相关但又本质不同的技术,它们扮演着完全不同的角色: Redis: 内存数据库/数据结构存储 本质: 它是一个开源的、高性能的、基于内存的 键值存储数据库。它也可以将数据持久化到磁盘。 核心功能: 提供丰…...
c# 局部函数 定义、功能与示例
C# 局部函数:定义、功能与示例 1. 定义与功能 局部函数(Local Function)是嵌套在另一个方法内部的私有方法,仅在包含它的方法内可见。 • 作用:封装仅用于当前方法的逻辑,避免污染类作用域,提升…...
aardio 自动识别验证码输入
技术尝试 上周在发学习日志时有网友提议“在网页上识别验证码”,于是尝试整合图像识别与网页自动化技术,完成了这套模拟登录流程。核心思路是:截图验证码→OCR识别→自动填充表单→提交并验证结果。 代码在这里 import soImage; import we…...
