【kafka系列】Kafka如何保证消息不丢失?
目录
1. 生产者端:确保消息成功发送到Broker
核心机制:
关键步骤:
2. Broker端:持久化与副本同步
核心机制:
关键源码逻辑:
3. 消费者端:可靠消费与Offset提交
核心机制:
关键步骤:
4. 全链路保障流程
消息丢失的典型场景与规避
总结
- 生产者端:
- 设置
acks=all确保所有ISR副本写入成功。- 启用重试(
retries)和幂等性(enable.idempotence=true,依赖ProducerId和SequenceNumber)。
- Broker端:
- 副本数
replication.factor≥3,ISR最小副本数min.insync.replicas≥2。- 使用
flush机制定期刷盘(通过log.flush.interval.messages配置)。
- 消费者端:
- 手动提交Offset(
enable.auto.commit=false),处理完消息后调用commitSync()
Kafka通过生产者端确认机制、Broker端持久化与副本同步、消费者端可靠消费三个核心环节保障消息不丢失。以下是具体实现机制与步骤:
1. 生产者端:确保消息成功发送到Broker
核心机制:
acks确认机制:
-
acks=0:生产者不等待Broker确认,可能丢失消息(不推荐)。acks=1:Leader副本写入即确认,若Leader宕机且未同步到其他副本,可能丢失。acks=all(或acks=-1):必须等待所有ISR副本写入成功,才返回确认(最高可靠性)。
- 重试机制:
-
- 配置
retries=N(如3次),在Broker临时故障时自动重试。 - 幂等性(
enable.idempotence=true):通过Producer ID和Sequence Number去重,避免网络重试导致消息重复。
- 配置
关键步骤:
// 生产者配置示例
Properties props = new Properties();
props.put("acks", "all"); // 必须所有ISR副本确认
props.put("retries", 3); // 重试次数
props.put("enable.idempotence", "true"); // 开启幂等性
2. Broker端:持久化与副本同步
核心机制:
- 副本机制(Replication):
-
- 每个Partition有多个副本(
replication.factor≥3),Leader处理读写,Follower同步数据。 - ISR(In-Sync Replicas):只有与Leader保持同步的副本才属于ISR集合。
min.insync.replicas=2:至少需要2个ISR副本写入成功,否则生产者抛出NotEnoughReplicasException。
- 每个Partition有多个副本(
- 持久化策略:
-
- 页缓存(Page Cache):依赖操作系统缓存加速写入,数据异步刷盘。
- 强制刷盘:通过
log.flush.interval.messages和log.flush.interval.ms控制刷盘频率(高可靠性场景建议启用)。
- Leader选举与数据恢复:
-
- 若Leader宕机,Controller从ISR中选举新Leader,确保数据不丢失。
- 若所有ISR副本宕机,需配置
unclean.leader.election.enable=false(禁止非ISR副本成为Leader)。
关键源码逻辑:
- 副本同步:Leader通过
ReplicaFetcherThread向Follower同步数据(源码见kafka.server.ReplicaFetcherThread)。 - ISR管理:Broker定期检查Follower的同步状态,延迟超过
replica.lag.time.max.ms的副本会被移出ISR。
3. 消费者端:可靠消费与Offset提交
核心机制:
- 手动提交Offset:
-
- 关闭自动提交(
enable.auto.commit=false),在消息处理完成后手动调用commitSync()或commitAsync()。 - 若消费者崩溃,下次启动时从最后提交的Offset恢复,避免消息丢失。
- 关闭自动提交(
- 事务性消费:
-
- 结合Kafka事务(
isolation.level=read_committed),仅消费已提交的事务消息。
- 结合Kafka事务(
关键步骤:
// 消费者配置示例
props.put("enable.auto.commit", "false"); // 关闭自动提交
while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));for (ConsumerRecord<String, String> record : records) {processRecord(record); // 处理消息consumer.commitSync(); // 处理完成后提交Offset}
}
4. 全链路保障流程
- 生产者发送:
-
- 消息发送后等待
acks=all确认。 - 若Broker未确认,按
retries重试。
- 消息发送后等待
- Broker持久化:
-
- Leader和ISR副本将消息写入日志文件。
- 根据配置决定是否强制刷盘。
- 消费者消费:
-
- 处理消息后手动提交Offset。
- 若消费者崩溃,从已提交Offset恢复。
消息丢失的典型场景与规避
| 场景 | 规避措施 |
| 生产者 ,Leader宕机 | 使用 + 。 |
| ISR副本不足导致写入失败 | 增加 ,确保 ≤ 当前ISR副本数。 |
| 消费者自动提交Offset,消息未处理 | 关闭自动提交,处理完成后手动提交。 |
| 磁盘故障导致数据丢失 | 使用RAID或分布式存储,确保多副本分布在不同物理节点。 |
总结
Kafka通过以下组合策略保障消息不丢失:
- 生产者端:
acks=all+ 幂等性 + 重试。 - Broker端:多副本同步 + ISR管理 + 强制刷盘。
- 消费者端:手动提交Offset + 事务性消费。
正确配置后,Kafka可提供至少一次(At-Least-Once)或精确一次(Exactly-Once) 的语义保障。
相关文章:
【kafka系列】Kafka如何保证消息不丢失?
目录 1. 生产者端:确保消息成功发送到Broker 核心机制: 关键步骤: 2. Broker端:持久化与副本同步 核心机制: 关键源码逻辑: 3. 消费者端:可靠消费与Offset提交 核心机制: 关…...
新建github操作
1.在github.com的主页根据提示新建一个depository。 2.配置用户名和邮箱 git config --global user.name "name" git config --global user.email "email" 3.生成ssh秘钥 ssh-keygen -t rsa 找到public key 对应的文件路径 cat /root/.ssh/id_rsa 复制显…...
第 15 天:数据存储,打造存档 读取系统!
🎯 目标: ✅ 掌握 UE5 SaveGame 存档系统 ✅ 在 C 创建存档类,存储游戏数据 ✅ 实现存档 & 读取功能,让游戏状态可持久化 ✅ 在 BP_PlayerCharacter 里实现: * 游戏开始时自动加载存档 * 玩家受到伤害时自动存档 …...
Flutter 异步编程利器:Future 与 Stream 深度解析
目录 一、Future:处理单次异步操作 1. 概念解读 2. 使用场景 3. 基本用法 3.1 创建 Future 3.2 使用 then 消费 Future 3.3 特性 二、Stream:处理连续异步事件流 1. 概念解读 2. 使用场景 3. 基本用法 3.1 创建 Stream 3.2 监听 Stream 3.…...
Java短信验证功能简单使用
注册登录阿里云官网:https://www.aliyun.com/ 搜索短信服务 自己一步步申请就可以了 开发文档: https://next.api.aliyun.com/api-tools/sdk/Dysmsapi?version2017-05-25&languagejava-tea&tabprimer-doc 1.引入依赖 <dependency>…...
React进阶之React核心源码解析(一)
React核心源码解析 react 特点CPU卡顿IO 卡顿 新老 react 架构对比v15v16.8Scheduler 调度器Reconciler 协调器 React fiber原理更新dommount 构建过程 render阶段 — scheduler reconcilerreact源码解析react-domreact-dom/src/client/ReactDOMRoot.js react-reconcilerreact-…...
【Vue】打包vue3+vite项目发布到github page的完整过程
文章目录 第一步:打包第二步:github仓库设置第三步:安装插件gh-pages第四步:两个配置第五步:上传github其他问题1. 路由2.待补充 参考文章: 环境: vue3vite windows11(使用终端即可&…...
类加载机制及双亲委派模型
一、引言 二、类加载流程 1. 加载 2. 连接 2.1 验证 2.2 准备 2.3 解析 3. 初始化 三、类加载器 类加载器的类型 双亲委派模型 打破双亲委派模型 双亲委派模型优点 一、引言 在 Java 的运行机制中,类加载是一个至关重要的环节。它不仅决定了 Java 程序的动态…...
tcp/ip协议设置参数,tcp/ip协议6设置
TCP/IP协议设置参数主要涉及到IP地址、子网掩码、网关地址以及DNS服务器地址等关键参数。这些参数的配置确保了网络设备能够正确地接入互联网并与其他设备进行通信。以下是对这些参数设置的详细说明: 1. IP地址 定义:IP地址是互联网中用于唯一标识每一…...
如何在Java EE中使用标签库?
在Java EE(现在称为Jakarta EE)中使用标签库(Tag Library),主要是通过JSP标准标签库(JSTL)或自定义标签库来实现的。标签库允许在JSP页面中使用自定义的标签,从而简化页面逻辑、增强…...
【java】方法的基本内存原理(栈和堆)
java内存主要分为栈和堆,方法相关的部分主要在栈内存里,每个方法调用时会在栈里创建一个栈帧,存放局部变量和方法执行的信息。执行完后栈帧被销毁,局部变量消失。而对象实例存在堆里,由垃圾回收器管理。 **Java方法内…...
今日AI和商界事件(2025-02-15)
根据2025年2月15日的科技动态,以下是今日AI领域的重要事件及相关进展总结: 1. DeepSeek日活突破3000万,开源生态加速AI普惠 里程碑意义:开源大模型DeepSeek宣布日活跃用户数突破3000万,其R1模型凭借开源策略和低成本优…...
尚硅谷课程【笔记】——大数据之Hadoop【一】
课程视频链接:尚硅谷Hadoop3.x教程 一、大数据概论 1)大数据概念 大数据(Big Data):指无法再一定时间范围内用常规软件工具进行捕捉、管理和处理的数据集合,是需要新处理模式才能具有更强的决策力、洞察发…...
SQL 建表语句详解
SQL 建表语句详解 在 SQL 中,创建表(Table)是数据库设计的基础。表是存储数据的基本单位,每个表由行和列组成。创建表的过程涉及到定义表的结构,包括列名、数据类型、约束等。本文将详细介绍 SQL 中的建表语句&#x…...
wordpress主题插件开发中高频使用的38个函数
核心模板函数 get_header()/get_footer()/get_sidebar() – 加载模板部件 the_title()/the_content()/the_excerpt() – 显示文章标题、内容、摘要 the_post() – 循环中获取文章数据 bloginfo(‘url’) – 获取站点URL wp_head()/wp_footer() – 输出头部/尾部代码 wp_n…...
DockerFile优化镜像体积
title: DockerFile优化镜像体积 date: 2025-02-15 15:22:40 tags: DockerFile优化镜像体积DockerFile优化镜像体积 DockerFile优化镜像体积前文回顾:一、细数优化镜像体积的思路与方式二、优化Dockfile文件编辑 Dockerfile2文件三、构建镜像四、运行镜像五、查看运行效果原文 …...
使用 playwright 自定义 js 下载的路径和文件名
遇到一个问题,点击按钮自动下载文件,路径和文件名都不能自定义,可以用 playwright 来解决这个问题 from playwright.sync_api import sync_playwright import os import time class ExcelDownloader: def __init__(self, download_pat…...
Open FPV VTX开源之OSD使用分类
Open FPV VTX开源之OSD使用分类 1. 源由2. 硬件2.1 【天空端】SigmaStar2.2 【天空端】Raspberry Pi2.3 【地面端】 3. 软件3.1 天空端软件3.2 地面端软件 4. 分类4.1 嵌入式OSD分类A1-嵌入式OSD:SigmaStar Android分类A2-嵌入式OSD:SigmaStar Hi3536分…...
题解:洛谷 P4113 [HEOI2012] 采花
题目https://www.luogu.com.cn/problem/P4113 运用类似于P1972 [SDOI2009] HH的项链的操作,将数据离线下来处理。 按照区间右端点从小到大排序。 问题是数量大于等于 的时候才能算进去。 于是乎我们用两个数组维护倒数第二次出现和最后一次出现的地方。 每次在…...
linux概念详解
用户守护进程 用户空间守护进程是一些在后台运行的长期服务程序,提供系统级服务。 下面举一些例子。 网络服务: 如sshd(SSH服务)、httpd(HTTP服务)。 sshd:sshd 守护进程会在后台运行&#x…...
easyexcel快速使用
1.easyexcel EasyExcel是一个基于ava的简单、省内存的读写Excel的开源项目。在尽可能节约内存的情况下支持读写百M的Excel 即通过java完成对excel的读写操作, 上传下载 2.easyexcel写操作 把java类中的对象写入到excel表格中 步骤 1.引入依赖 <depen…...
fetch() 与 XMLHttpRequest 的差异
fetch() 与 XMLHttpRequest 的差异 fetch() 的功能与 XMLHttpRequest 基本相同,都是向服务器发出 HTTP 请求,但有三个主要的差异。 (1)fetch()使用 Promise,不使用回调函数,因此大大简化了写法࿰…...
【java面向对象的三大特性】封装、继承和多态
目录标题 一、封装(Encapsulation):二、继承(Inheritance):三、多态(Polymorphism):1. 多态的三个必要条件:2.多态的具体实现:3.多态的使用场景&a…...
c# textbox 设置不获取光标
[DllImport("user32",EntryPoint "HideCaret")] private static extern bool HideCaret(IntPtr hWnd); //需引入命名空间using System.Runtime.InteropServices; private void Txt_RecInfo_MouseDown(object sender, MouseEventArgs e) { …...
算法13-BFPRT算法
一、BFPRT 算法概念 BFPRT 算法(Blum-Floyd-Pratt-Rivest-Tarjan 算法)是一种用于在无序数组中快速找到第 k 小(或第 k 大)元素的高效算法。它的时间复杂度为 O(n),在最坏情况下也能保证线性时间复杂度。BFPRT 算法的…...
android studio下载安装汉化-Flutter安装
1、下载android studio官方地址:(这个网址可能直接打不开,需要VPN) https://developer.android.com/studio?hlzh-cn mac版本分为X86和arm版本,电脑显示芯片是Inter的就是x86的,显示m1和m2的就是arm的 …...
Seaweedfs(master volume filer) docker run参数帮助文档
文章目录 进入容器后执行获取weed -h英文中文 weed server -h英文中文 weed volume -h英文中文 关键点测试了一下,这个-volume.minFreeSpace string有点狠,比如设置值为10(10%),它直接给系统只留下10%的空间࿰…...
嵌套调用实现数组元素逆序存放
主函数调用reverse_array(int ptr[],int cnt)函数,该函数在调用inplace_swap(int *x,int *y)函数时,把两个不同的地址送给inplace_swap(int *x,int *y)函数,实现这两个位置处元素的交换。 令*xa,*yb 则*y *x^*y执行后,*xa,*ya^b…...
【工业安全】-CVE-2022-35555- Tenda W6路由器 命令注入漏洞
文章目录 1.漏洞描述 2.环境搭建 3.漏洞复现 4.漏洞分析 4.1:代码分析 4.2:流量分析 5.poc代码: 1.漏洞描述 漏洞编号:CVE-2022-35555 漏洞名称:Tenda W6 命令注入 威胁等级:高危 漏洞详情࿱…...
Spark 和 Flink
Spark 和 Flink 都是目前流行的大数据处理引擎,但它们在架构设计、应用场景、性能和生态方面有较大区别。以下是详细对比: 1. 架构与核心概念 方面Apache SparkApache Flink计算模型微批(Micro-Batch)为主,但支持结构…...
