Kafka 消息 0 丢失的最佳实践
文章目录
- Kafka 消息 0 丢失的最佳实践
- 生产者端的最佳实践
- 使用带有回调的 producer.send(msg, callback) 方法
- 设置 acks = all
- 设置 retries 为一个较大的值
- 启用幂等性与事务(Kafka 0.11+)
- 正确关闭生产者与 flush() 方法
- Broker 端的最佳实践
- 设置 unclean.leader.election.enable = false
- 设置 replication.factor >= 3
- 设置 min.insync.replicas > 1
- 确保 replication.factor > min.insync.replicas
- 优化 Broker 存储与磁盘配置
- 消费者端的最佳实践
- 确保消息消费完成再提交
- 处理 Rebalance 事件
- 异常重试与死信队列(DLQ)
- 业务维度的 0 丢失架构
- 本地消息表 + 定时扫描
- 监控与告警
- 结论
Kafka 消息 0 丢失的最佳实践
在分布式系统中,消息队列(如 Kafka)是核心组件之一,用于解耦系统、异步通信和流量削峰。
然而,消息丢失是生产环境中必须解决的关键问题。尽管 Kafka 本身设计为高可靠、高吞吐的系统,但在实际使用中,仍需通过合理的配置和最佳实践来确保消息的 0 丢失。
本文将详细介绍 Kafka 消息 0 丢失的最佳实践,涵盖生产者
、Broker
和消费者
三方面的配置与优化。
生产者端的最佳实践
使用带有回调的 producer.send(msg, callback) 方法
Kafka 的 producer.send(msg)
方法虽然可以发送消息,但它无法提供消息发送成功与否的反馈。为了确保消息发送的可靠性,必须使用带有回调的 producer.send(msg, callback)
方法。回调函数可以在消息发送成功或失败时通知开发者,从而在应用层执行适当的补救措施。
示例代码:
from kafka import KafkaProducerproducer = KafkaProducer(bootstrap_servers='localhost:9092')def callback(record_metadata, exception):if exception:print(f"Message failed to send: {exception}")else:print(f"Message sent to {record_metadata.topic} partition {record_metadata.partition} at offset {record_metadata.offset}")producer.send('my-topic', b'Hello, Kafka!', callback=callback)
设置 acks = all
acks
参数用于控制 Kafka 消息发送的确认机制。当 acks=all
时,Kafka 会要求所有副本的 Broker 都成功接收到消息后才认为消息“已提交”。这是 Kafka 提供的最严格的确认机制,能够有效防止消息丢失。
配置方法:
producer = KafkaProducer(bootstrap_servers='localhost:9092',acks='all' # 设置为 all 以确保所有副本都成功接收消息
)
acks = 0
(No acknowledgment)
在这种模式下,生产者在发送消息后不会等待任何确认。即,消息发送后立即返回,生产者不会知道消息是否成功到达 Kafka 集群。这种模式的性能最好,因为它不需要等待 Kafka 进行任何确认,但它的可靠性较差。
优点:
- 性能非常高,因为生产者发送完消息后就立即继续执行,不会等待任何确认。
- 延迟最小,适用于对消息丢失容忍度较高的场景。
缺点:
- 消息丢失的风险较高。如果消息在网络传输过程中丢失,生产者无法知道,因此无法做出补救。
- 对于大多数生产环境不建议使用,因为会丢失数据。
适用场景:
- 对消息丢失不敏感的场景,比如一些日志系统、缓存系统等。
acks = 1
(Leader acknowledgment)
在这种模式下,生产者会等待 Kafka 集群的 Leader 节点确认收到消息。Leader 节点收到消息后会立即向生产者发送确认,不需要等待副本节点的响应。如果 Leader 成功接收到消息,那么生产者会认为该消息已经成功发送。
优点:
- 相对于
acks=0
,可靠性更高,因为至少 Leader 节点会确认收到消息。 - 仍然保持较好的性能,延迟比
acks=all
要低。
缺点:
- 如果 Leader 收到消息后崩溃,但副本节点还未同步数据,消息可能会丢失。
- 不能保证消息最终会被所有副本保存。
适用场景:
- 对消息丢失容忍度较高,但仍希望比
acks=0
更加可靠的场景。
acks = all
(All acknowledgments)
在这种模式下,生产者会等待 Kafka 集群中所有副本的确认。即,生产者只有在所有副本都确认收到消息后才会认为消息发送成功。这是 Kafka 中最严格的消息确认机制,确保消息不会丢失。
优点:
- 提供最强的消息可靠性,因为只有当所有副本都接收到消息后,生产者才会收到成功确认。
- 即使 Kafka 集群的某些节点发生故障,消息依然可以保证不会丢失。
缺点:
- 性能较低,因为生产者需要等待所有副本的确认,增加了延迟。
- 可能导致较高的网络负载和集群负担,尤其在集群副本数较多时。
适用场景:
- 对消息可靠性要求极高的场景,比如金融交易系统、在线支付、订单处理等。
总结
acks=0
:适合对数据丢失不敏感且要求极高性能的场景。acks=1
:适合对性能要求高,但也需要一定可靠性的场景。acks=all
:适合对可靠性要求极高,愿意牺牲一定性能来保证数据不丢失的场景。
设置 retries 为一个较大的值
在网络波动或 Broker 暂时不可用的情况下,消息发送可能会失败。通过设置 retries
参数,可以让 Kafka 在消息发送失败时自动重试,确保消息最终能够成功传输。
配置方法:
producer = KafkaProducer(bootstrap_servers='localhost:9092',retries=10 # 设置重试次数,确保网络波动时消息不会丢失
)
启用幂等性与事务(Kafka 0.11+)
在 Kafka 0.11+ 版本中,可以启用幂等性(enable.idempotence=True
)防止生产者重复发送消息(如因网络重试导致的重复),同时结合事务(Transactional API)确保端到端的 Exactly-Once 语义。
配置方法:
producer = KafkaProducer(bootstrap_servers='localhost:9092',acks='all',enable_idempotence=True,transactional_id='my-transaction-id'
)
producer.init_transactions()
try:producer.begin_transaction()producer.send('my-topic', b'Transactional message')producer.commit_transaction()
except Exception as e:producer.abort_transaction()
正确关闭生产者与 flush() 方法
在生产者发送消息后,尤其是在批量发送或高吞吐场景下,务必在关闭生产者前调用 flush()
方法,确保所有缓冲区的消息都被发送。否则,未发送的消息可能在程序异常终止时丢失。
示例代码:
producer.send('my-topic', b'Final message')
producer.flush() # 确保所有消息发送完成
producer.close()
Broker 端的最佳实践
设置 unclean.leader.election.enable = false
unclean.leader.election.enable
参数控制哪些 Broker 有资格竞选分区的 Leader。如果设置为 true
,即使某个 Broker 落后原先的 Leader 很多,它仍然可以成为新的 Leader,这可能导致消息丢失。因此,建议将该参数设置为 false
。
配置方法:
unclean.leader.election.enable=false
设置 replication.factor >= 3
通过增加分区副本数量,可以有效避免单点故障导致的数据丢失。通常建议设置 replication.factor >= 3
,即每个分区有至少三个副本。
配置方法:
replication.factor=3
设置 min.insync.replicas > 1
min.insync.replicas
参数控制消息至少需要写入到多少个副本才算“已提交”。将其设置为大于 1,能够确保消息在多个副本上持久化,提升系统的容错能力。
配置方法:
min.insync.replicas=2
确保 replication.factor > min.insync.replicas
为了确保 Kafka 集群在面对副本丢失时仍能提供高可用性,replication.factor
应该大于 min.insync.replicas
。否则,在某些副本故障时,分区将无法正常工作,导致消息丢失。
推荐配置:
replication.factor=3
min.insync.replicas=2
优化 Broker 存储与磁盘配置
- 文件系统选择:使用 XFS 或 ext4 等具备高效持久化能力的文件系统。
- 磁盘配置:避免使用 NAS/SAN 等网络存储,优先本地磁盘,并确保写缓存策略正确(如内核参数
fsync
配置)。 - 日志刷写策略:调整
log.flush.interval.messages
和log.flush.interval.ms
(默认不推荐修改,但在极端情况下可适当调整)。
消费者端的最佳实践
确保消息消费完成再提交
Kafka 的 Consumer 端提供了 enable.auto.commit
配置项来控制位移提交。将其设置为 false
,并结合 commitSync()
或 commitAsync()
方法进行手动提交,可以确保每个消息都被成功处理后才提交位移,防止消费失败时丢失消息。
配置方法:
consumer = KafkaConsumer('my-topic', enable_auto_commit=False)# 手动提交位移
consumer.commitSync()
处理 Rebalance 事件
消费者需正确处理 Rebalance 事件,避免在分区重新分配时消息处理未完成导致偏移量提交错误。实现 ConsumerRebalanceListener
并在失去分区所有权前提交偏移量。
示例代码:
from kafka import ConsumerRebalanceListenerclass RebalanceListener(ConsumerRebalanceListener):def on_partitions_revoked(self, revoked):consumer.commitSync()def on_partitions_assigned(self, assigned):passconsumer = KafkaConsumer('my-topic', enable_auto_commit=False)
consumer.subscribe(topics=['my-topic'], listener=RebalanceListener())
异常重试与死信队列(DLQ)
在消费逻辑中捕获异常并实现重试机制,若多次重试失败则将消息转入死信队列,避免阻塞消费且保留异常数据。
示例代码:
for message in consumer:try:process_message(message)consumer.commitSync()except Exception as e:send_to_dlq(message)consumer.commitSync() # 避免重复消费
业务维度的 0 丢失架构
本地消息表 + 定时扫描
在高可靠性要求的业务场景中,可以通过结合业务系统本地的消息表和定时扫描机制,进一步增强消息丢失的防范能力。
例如,业务系统可以在本地保存未成功消费的消息,在系统启动时或者定时进行消息的重新扫描和处理,从而避免消息丢失。
监控与告警
- 生产者监控:跟踪
record-error-rate
、request-latency
等指标。 - Broker 监控:关注
UnderReplicatedPartitions
、IsrShrinksPerSec
、OfflinePartitionsCount
。 - 消费者监控:监控
Consumer Lag
(滞后量),确保消费进度正常。 - 告警规则:当 ISR 数量小于
min.insync.replicas
或副本不足时触发告警。
结论
通过结合 Kafka 的配置和应用层的最佳实践,我们可以最大程度上防止消息丢失。尤其是在高可靠性要求的场景中,务必遵循上述实践,保证 Kafka 消息系统的稳定性和可靠性。你可以根据实际业务的需求,对 Kafka 配置做进一步的优化。通过这些措施,Kafka 能够提供近乎零丢失的消息传输服务。
相关文章:
Kafka 消息 0 丢失的最佳实践
文章目录 Kafka 消息 0 丢失的最佳实践生产者端的最佳实践使用带有回调的 producer.send(msg, callback) 方法设置 acks all设置 retries 为一个较大的值启用幂等性与事务(Kafka 0.11)正确关闭生产者与 flush() 方法 Broker 端的最佳实践设置 unclean.l…...

机器学习算法——回归任务
回归分析是估计因变量和自变量之间关系的过程。 目录 1、多元线性回归 2、岭回归 3、Lasso回归 4、弹性网络回归 5、多项式回归 6、指数回归 7、自然对数回归 8、广义线性模型 GLM 9、Cox比例风险模型 10、决策树回归 11、随机森林回归 12、梯度提升回归 13、XGBoost回归 14、…...

【数据库】数据库基础
第一章 数据库基础 一、数据库基础1.1 数据库系统的体系结构 (三层模式两级映像)1.1.1 逻辑模式1.1.2 外模式1.1.3 内模式1.1.4 外模式/模式映象1.1.5 逻辑模式/内模式映象1.1.6 逻辑独立性1.1.7 物理独立性 1.2 数据模型 一、数据库基础 1.1 数据库系统…...

端到端自动驾驶——cnn网络搭建
论文参考:https://arxiv.org/abs/1604.07316 demo 今天主要来看一个如何通过图像直接到控制的自动驾驶端到端的项目,首先需要配置好我的仿真环境,下载软件udacity: https://d17h27t6h515a5.cloudfront.net/topher/2016/November…...

深度学习R8周:RNN实现阿尔兹海默症(pytorch)
🍨 本文为🔗365天深度学习训练营中的学习记录博客🍖 原作者:K同学啊 数据集包含2149名患者的广泛健康信息,每名患者的ID范围从4751到6900不等。该数据集包括人口统计详细信息、生活方式因素、病史、临床测量、认知和功…...
vuex中的state是响应式的吗?
在 Vue.js 中,Vuex 的 state 是响应式的。这意味着当你更改 state 中的数据时,依赖于这些数据的 Vue 组件会自动更新。这是通过 Vue 的响应式系统实现的,该系统使用了 ES6 的 Proxy 对象来监听数据的变化。 当你在 Vuex 中定义了一个 state …...

JavaScript系列05-现代JavaScript新特性
JavaScript作为网络的核心语言之一,近年来发展迅速。从ES6(ECMAScript 2015)开始,JavaScript几乎每年都有新的语言特性加入,极大地改善了开发体验和代码质量。本文主要内容包括: ES6关键特性:解构赋值与扩展运算符&am…...
【量化金融自学笔记】--开篇.基本术语及学习路径建议
在当今这个信息爆炸的时代,金融领域正经历着一场前所未有的变革。传统的金融分析方法逐渐被更加科学、精准的量化技术所取代。量化金融,这个曾经高不可攀的领域,如今正逐渐走进大众的视野。它将数学、统计学、计算机科学与金融学深度融合&…...

3d投影到2d python opencv
目录 cv2.projectPoints 投影 矩阵计算投影 cv2.projectPoints 投影 cv2.projectPoints() 是 OpenCV 中的一个函数,用于将三维空间中的点(3D points)投影到二维图像平面上。这在计算机视觉中经常用于相机标定、物体姿态估计、3D物体与2D图…...

26-小迪安全-模块引用,mvc框架,渲染,数据联动0-rce安全
先创建一个新闻需要的库 这样id值可以逐级递增 然后随便写个值,让他输出一下看看 模板引入 但是这样不够美观,这就涉及到了引入html模板 模板引入是html有一个的地方值可以通过php代码去传入过去,其他的html界面直接调用,这样页…...

【第14节】C++设计模式(行为模式)-Strategy (策略)模式
一、问题的提出 Strategy 模式:算法实现与抽象接口的解耦 Strategy 模式和 Template 模式要解决的问题是相似的,都是为了将业务逻辑(算法)的具体实现与抽象接口解耦。Strategy 模式通过将算法封装到一个类(Context&am…...
播放器系列4——PCM重采样
FFmpeg重采样过程 #mermaid-svg-QydNPsDAlg9lTn6z {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-QydNPsDAlg9lTn6z .error-icon{fill:#552222;}#mermaid-svg-QydNPsDAlg9lTn6z .error-text{fill:#552222;stroke:#5…...

网络安全需要学多久才能入门?
网络安全是一个复杂且不断发展的领域,想要入行该领域,我们需要付出足够多的时间和精力好好学习相关知识,才可以获得一份不错的工作,那么网络安全需要学多久才能入门?我们通过这篇文章来了解一下。 学习网络安全的入门时间因个人的…...
通俗版解释:分布式和微服务就像开餐厅
一、分布式系统:把大厨房拆成多个小厨房 想象你开了一家超火爆的餐厅,但原来的厨房太小了: 问题:一个厨师要同时切菜、炒菜、烤面包,手忙脚乱还容易出错。 解决方案: 拆分成多个小厨房(分布式…...

JAVA安全—手搓内存马
前言 最近在学这个内存马,就做一个记录,说实话这个内存马还是有点难度的。 什么是内存马 首先什么是内存马呢,顾名思义就是把木马打进内存中。传统的webshell一旦把文件删除就断开连接了,而Java内存马则不同,它将恶…...

【神经网络】python实现神经网络(一)——数据集获取
一.概述 在文章【机器学习】一个例子带你了解神经网络是什么中,我们大致了解神经网络的正向信息传导、反向传导以及学习过程的大致流程,现在我们正式开始进行代码的实现,首先我们来实现第一步的运算过程模拟讲解:正向传导。本次代…...
历年湖南大学计算机复试上机真题
历年湖南大学计算机复试机试真题 在线评测:https://app2098.acapp.acwing.com.cn/ 杨辉三角形 题目描述 提到杨辉三角形。 大家应该都很熟悉。 这是我国宋朝数学家杨辉在公元 1261 年著书《详解九章算法》提出的。 1 1 1 1 2 1 1 3 3 1 1 4 6 4 1 1 5 10 10 …...

[LeetCode]day33 150.逆波兰式求表达值 + 239.滑动窗口最大值
逆波兰式求表达值 题目链接 题目描述 给你一个字符串数组 tokens ,表示一个根据 逆波兰表示法 表示的算术表达式。 请你计算该表达式。返回一个表示表达式值的整数。 注意: 有效的算符为 ‘’、‘-’、‘*’ 和 ‘/’ 。 每个操作数(运…...

【银河麒麟高级服务器操作系统实际案例分享】数据库资源重启现象分析及处理全过程
更多银河麒麟操作系统产品及技术讨论,欢迎加入银河麒麟操作系统官方论坛 https://forum.kylinos.cn 了解更多银河麒麟操作系统全新产品,请点击访问 麒麟软件产品专区:https://product.kylinos.cn 开发者专区:https://developer…...
C#中泛型的协变和逆变
协变: 在泛型接口中,使用out关键字可以声明协变。这意味着接口的泛型参数只能作为返回类型出现,而不能作为方法的参数类型。 示例:泛型接口中的协变 假设我们有一个基类Animal和一个派生类Dog: csharp复制 public…...

SpringBoot-17-MyBatis动态SQL标签之常用标签
文章目录 1 代码1.1 实体User.java1.2 接口UserMapper.java1.3 映射UserMapper.xml1.3.1 标签if1.3.2 标签if和where1.3.3 标签choose和when和otherwise1.4 UserController.java2 常用动态SQL标签2.1 标签set2.1.1 UserMapper.java2.1.2 UserMapper.xml2.1.3 UserController.ja…...

第19节 Node.js Express 框架
Express 是一个为Node.js设计的web开发框架,它基于nodejs平台。 Express 简介 Express是一个简洁而灵活的node.js Web应用框架, 提供了一系列强大特性帮助你创建各种Web应用,和丰富的HTTP工具。 使用Express可以快速地搭建一个完整功能的网站。 Expre…...

2025年能源电力系统与流体力学国际会议 (EPSFD 2025)
2025年能源电力系统与流体力学国际会议(EPSFD 2025)将于本年度在美丽的杭州盛大召开。作为全球能源、电力系统以及流体力学领域的顶级盛会,EPSFD 2025旨在为来自世界各地的科学家、工程师和研究人员提供一个展示最新研究成果、分享实践经验及…...

从深圳崛起的“机器之眼”:赴港乐动机器人的万亿赛道赶考路
进入2025年以来,尽管围绕人形机器人、具身智能等机器人赛道的质疑声不断,但全球市场热度依然高涨,入局者持续增加。 以国内市场为例,天眼查专业版数据显示,截至5月底,我国现存在业、存续状态的机器人相关企…...

2.Vue编写一个app
1.src中重要的组成 1.1main.ts // 引入createApp用于创建应用 import { createApp } from "vue"; // 引用App根组件 import App from ./App.vue;createApp(App).mount(#app)1.2 App.vue 其中要写三种标签 <template> <!--html--> </template>…...

SiFli 52把Imagie图片,Font字体资源放在指定位置,编译成指定img.bin和font.bin的问题
分区配置 (ptab.json) img 属性介绍: img 属性指定分区存放的 image 名称,指定的 image 名称必须是当前工程生成的 binary 。 如果 binary 有多个文件,则以 proj_name:binary_name 格式指定文件名, proj_name 为工程 名&…...

Linux 内存管理实战精讲:核心原理与面试常考点全解析
Linux 内存管理实战精讲:核心原理与面试常考点全解析 Linux 内核内存管理是系统设计中最复杂但也最核心的模块之一。它不仅支撑着虚拟内存机制、物理内存分配、进程隔离与资源复用,还直接决定系统运行的性能与稳定性。无论你是嵌入式开发者、内核调试工…...

【C++特殊工具与技术】优化内存分配(一):C++中的内存分配
目录 一、C 内存的基本概念 1.1 内存的物理与逻辑结构 1.2 C 程序的内存区域划分 二、栈内存分配 2.1 栈内存的特点 2.2 栈内存分配示例 三、堆内存分配 3.1 new和delete操作符 4.2 内存泄漏与悬空指针问题 4.3 new和delete的重载 四、智能指针…...
uniapp 实现腾讯云IM群文件上传下载功能
UniApp 集成腾讯云IM实现群文件上传下载功能全攻略 一、功能背景与技术选型 在团队协作场景中,群文件共享是核心需求之一。本文将介绍如何基于腾讯云IMCOS,在uniapp中实现: 群内文件上传/下载文件元数据管理下载进度追踪跨平台文件预览 二…...
怎么开发一个网络协议模块(C语言框架)之(六) ——通用对象池总结(核心)
+---------------------------+ | operEntryTbl[] | ← 操作对象池 (对象数组) +---------------------------+ | 0 | 1 | 2 | ... | N-1 | +---------------------------+↓ 初始化时全部加入 +------------------------+ +-------------------------+ | …...