Kafka 消息 0 丢失的最佳实践
文章目录
- Kafka 消息 0 丢失的最佳实践
- 生产者端的最佳实践
- 使用带有回调的 producer.send(msg, callback) 方法
- 设置 acks = all
- 设置 retries 为一个较大的值
- 启用幂等性与事务(Kafka 0.11+)
- 正确关闭生产者与 flush() 方法
- Broker 端的最佳实践
- 设置 unclean.leader.election.enable = false
- 设置 replication.factor >= 3
- 设置 min.insync.replicas > 1
- 确保 replication.factor > min.insync.replicas
- 优化 Broker 存储与磁盘配置
- 消费者端的最佳实践
- 确保消息消费完成再提交
- 处理 Rebalance 事件
- 异常重试与死信队列(DLQ)
- 业务维度的 0 丢失架构
- 本地消息表 + 定时扫描
- 监控与告警
- 结论
Kafka 消息 0 丢失的最佳实践
在分布式系统中,消息队列(如 Kafka)是核心组件之一,用于解耦系统、异步通信和流量削峰。
然而,消息丢失是生产环境中必须解决的关键问题。尽管 Kafka 本身设计为高可靠、高吞吐的系统,但在实际使用中,仍需通过合理的配置和最佳实践来确保消息的 0 丢失。
本文将详细介绍 Kafka 消息 0 丢失的最佳实践,涵盖生产者、Broker 和消费者三方面的配置与优化。
生产者端的最佳实践
使用带有回调的 producer.send(msg, callback) 方法
Kafka 的 producer.send(msg) 方法虽然可以发送消息,但它无法提供消息发送成功与否的反馈。为了确保消息发送的可靠性,必须使用带有回调的 producer.send(msg, callback) 方法。回调函数可以在消息发送成功或失败时通知开发者,从而在应用层执行适当的补救措施。
示例代码:
from kafka import KafkaProducerproducer = KafkaProducer(bootstrap_servers='localhost:9092')def callback(record_metadata, exception):if exception:print(f"Message failed to send: {exception}")else:print(f"Message sent to {record_metadata.topic} partition {record_metadata.partition} at offset {record_metadata.offset}")producer.send('my-topic', b'Hello, Kafka!', callback=callback)
设置 acks = all
acks 参数用于控制 Kafka 消息发送的确认机制。当 acks=all 时,Kafka 会要求所有副本的 Broker 都成功接收到消息后才认为消息“已提交”。这是 Kafka 提供的最严格的确认机制,能够有效防止消息丢失。
配置方法:
producer = KafkaProducer(bootstrap_servers='localhost:9092',acks='all' # 设置为 all 以确保所有副本都成功接收消息
)
acks = 0 (No acknowledgment)
在这种模式下,生产者在发送消息后不会等待任何确认。即,消息发送后立即返回,生产者不会知道消息是否成功到达 Kafka 集群。这种模式的性能最好,因为它不需要等待 Kafka 进行任何确认,但它的可靠性较差。
优点:
- 性能非常高,因为生产者发送完消息后就立即继续执行,不会等待任何确认。
- 延迟最小,适用于对消息丢失容忍度较高的场景。
缺点:
- 消息丢失的风险较高。如果消息在网络传输过程中丢失,生产者无法知道,因此无法做出补救。
- 对于大多数生产环境不建议使用,因为会丢失数据。
适用场景:
- 对消息丢失不敏感的场景,比如一些日志系统、缓存系统等。
acks = 1 (Leader acknowledgment)
在这种模式下,生产者会等待 Kafka 集群的 Leader 节点确认收到消息。Leader 节点收到消息后会立即向生产者发送确认,不需要等待副本节点的响应。如果 Leader 成功接收到消息,那么生产者会认为该消息已经成功发送。
优点:
- 相对于
acks=0,可靠性更高,因为至少 Leader 节点会确认收到消息。 - 仍然保持较好的性能,延迟比
acks=all要低。
缺点:
- 如果 Leader 收到消息后崩溃,但副本节点还未同步数据,消息可能会丢失。
- 不能保证消息最终会被所有副本保存。
适用场景:
- 对消息丢失容忍度较高,但仍希望比
acks=0更加可靠的场景。
acks = all (All acknowledgments)
在这种模式下,生产者会等待 Kafka 集群中所有副本的确认。即,生产者只有在所有副本都确认收到消息后才会认为消息发送成功。这是 Kafka 中最严格的消息确认机制,确保消息不会丢失。
优点:
- 提供最强的消息可靠性,因为只有当所有副本都接收到消息后,生产者才会收到成功确认。
- 即使 Kafka 集群的某些节点发生故障,消息依然可以保证不会丢失。
缺点:
- 性能较低,因为生产者需要等待所有副本的确认,增加了延迟。
- 可能导致较高的网络负载和集群负担,尤其在集群副本数较多时。
适用场景:
- 对消息可靠性要求极高的场景,比如金融交易系统、在线支付、订单处理等。
总结
acks=0:适合对数据丢失不敏感且要求极高性能的场景。acks=1:适合对性能要求高,但也需要一定可靠性的场景。acks=all:适合对可靠性要求极高,愿意牺牲一定性能来保证数据不丢失的场景。
设置 retries 为一个较大的值
在网络波动或 Broker 暂时不可用的情况下,消息发送可能会失败。通过设置 retries 参数,可以让 Kafka 在消息发送失败时自动重试,确保消息最终能够成功传输。
配置方法:
producer = KafkaProducer(bootstrap_servers='localhost:9092',retries=10 # 设置重试次数,确保网络波动时消息不会丢失
)
启用幂等性与事务(Kafka 0.11+)
在 Kafka 0.11+ 版本中,可以启用幂等性(enable.idempotence=True)防止生产者重复发送消息(如因网络重试导致的重复),同时结合事务(Transactional API)确保端到端的 Exactly-Once 语义。
配置方法:
producer = KafkaProducer(bootstrap_servers='localhost:9092',acks='all',enable_idempotence=True,transactional_id='my-transaction-id'
)
producer.init_transactions()
try:producer.begin_transaction()producer.send('my-topic', b'Transactional message')producer.commit_transaction()
except Exception as e:producer.abort_transaction()
正确关闭生产者与 flush() 方法
在生产者发送消息后,尤其是在批量发送或高吞吐场景下,务必在关闭生产者前调用 flush() 方法,确保所有缓冲区的消息都被发送。否则,未发送的消息可能在程序异常终止时丢失。
示例代码:
producer.send('my-topic', b'Final message')
producer.flush() # 确保所有消息发送完成
producer.close()
Broker 端的最佳实践
设置 unclean.leader.election.enable = false
unclean.leader.election.enable 参数控制哪些 Broker 有资格竞选分区的 Leader。如果设置为 true,即使某个 Broker 落后原先的 Leader 很多,它仍然可以成为新的 Leader,这可能导致消息丢失。因此,建议将该参数设置为 false。
配置方法:
unclean.leader.election.enable=false
设置 replication.factor >= 3
通过增加分区副本数量,可以有效避免单点故障导致的数据丢失。通常建议设置 replication.factor >= 3,即每个分区有至少三个副本。
配置方法:
replication.factor=3
设置 min.insync.replicas > 1
min.insync.replicas 参数控制消息至少需要写入到多少个副本才算“已提交”。将其设置为大于 1,能够确保消息在多个副本上持久化,提升系统的容错能力。
配置方法:
min.insync.replicas=2
确保 replication.factor > min.insync.replicas
为了确保 Kafka 集群在面对副本丢失时仍能提供高可用性,replication.factor 应该大于 min.insync.replicas。否则,在某些副本故障时,分区将无法正常工作,导致消息丢失。
推荐配置:
replication.factor=3
min.insync.replicas=2
优化 Broker 存储与磁盘配置
- 文件系统选择:使用 XFS 或 ext4 等具备高效持久化能力的文件系统。
- 磁盘配置:避免使用 NAS/SAN 等网络存储,优先本地磁盘,并确保写缓存策略正确(如内核参数
fsync配置)。 - 日志刷写策略:调整
log.flush.interval.messages和log.flush.interval.ms(默认不推荐修改,但在极端情况下可适当调整)。
消费者端的最佳实践
确保消息消费完成再提交
Kafka 的 Consumer 端提供了 enable.auto.commit 配置项来控制位移提交。将其设置为 false,并结合 commitSync() 或 commitAsync() 方法进行手动提交,可以确保每个消息都被成功处理后才提交位移,防止消费失败时丢失消息。
配置方法:
consumer = KafkaConsumer('my-topic', enable_auto_commit=False)# 手动提交位移
consumer.commitSync()
处理 Rebalance 事件
消费者需正确处理 Rebalance 事件,避免在分区重新分配时消息处理未完成导致偏移量提交错误。实现 ConsumerRebalanceListener 并在失去分区所有权前提交偏移量。
示例代码:
from kafka import ConsumerRebalanceListenerclass RebalanceListener(ConsumerRebalanceListener):def on_partitions_revoked(self, revoked):consumer.commitSync()def on_partitions_assigned(self, assigned):passconsumer = KafkaConsumer('my-topic', enable_auto_commit=False)
consumer.subscribe(topics=['my-topic'], listener=RebalanceListener())
异常重试与死信队列(DLQ)
在消费逻辑中捕获异常并实现重试机制,若多次重试失败则将消息转入死信队列,避免阻塞消费且保留异常数据。
示例代码:
for message in consumer:try:process_message(message)consumer.commitSync()except Exception as e:send_to_dlq(message)consumer.commitSync() # 避免重复消费
业务维度的 0 丢失架构
本地消息表 + 定时扫描
在高可靠性要求的业务场景中,可以通过结合业务系统本地的消息表和定时扫描机制,进一步增强消息丢失的防范能力。
例如,业务系统可以在本地保存未成功消费的消息,在系统启动时或者定时进行消息的重新扫描和处理,从而避免消息丢失。
监控与告警
- 生产者监控:跟踪
record-error-rate、request-latency等指标。 - Broker 监控:关注
UnderReplicatedPartitions、IsrShrinksPerSec、OfflinePartitionsCount。 - 消费者监控:监控
Consumer Lag(滞后量),确保消费进度正常。 - 告警规则:当 ISR 数量小于
min.insync.replicas或副本不足时触发告警。
结论
通过结合 Kafka 的配置和应用层的最佳实践,我们可以最大程度上防止消息丢失。尤其是在高可靠性要求的场景中,务必遵循上述实践,保证 Kafka 消息系统的稳定性和可靠性。你可以根据实际业务的需求,对 Kafka 配置做进一步的优化。通过这些措施,Kafka 能够提供近乎零丢失的消息传输服务。
相关文章:
Kafka 消息 0 丢失的最佳实践
文章目录 Kafka 消息 0 丢失的最佳实践生产者端的最佳实践使用带有回调的 producer.send(msg, callback) 方法设置 acks all设置 retries 为一个较大的值启用幂等性与事务(Kafka 0.11)正确关闭生产者与 flush() 方法 Broker 端的最佳实践设置 unclean.l…...
机器学习算法——回归任务
回归分析是估计因变量和自变量之间关系的过程。 目录 1、多元线性回归 2、岭回归 3、Lasso回归 4、弹性网络回归 5、多项式回归 6、指数回归 7、自然对数回归 8、广义线性模型 GLM 9、Cox比例风险模型 10、决策树回归 11、随机森林回归 12、梯度提升回归 13、XGBoost回归 14、…...
【数据库】数据库基础
第一章 数据库基础 一、数据库基础1.1 数据库系统的体系结构 (三层模式两级映像)1.1.1 逻辑模式1.1.2 外模式1.1.3 内模式1.1.4 外模式/模式映象1.1.5 逻辑模式/内模式映象1.1.6 逻辑独立性1.1.7 物理独立性 1.2 数据模型 一、数据库基础 1.1 数据库系统…...
端到端自动驾驶——cnn网络搭建
论文参考:https://arxiv.org/abs/1604.07316 demo 今天主要来看一个如何通过图像直接到控制的自动驾驶端到端的项目,首先需要配置好我的仿真环境,下载软件udacity: https://d17h27t6h515a5.cloudfront.net/topher/2016/November…...
深度学习R8周:RNN实现阿尔兹海默症(pytorch)
🍨 本文为🔗365天深度学习训练营中的学习记录博客🍖 原作者:K同学啊 数据集包含2149名患者的广泛健康信息,每名患者的ID范围从4751到6900不等。该数据集包括人口统计详细信息、生活方式因素、病史、临床测量、认知和功…...
vuex中的state是响应式的吗?
在 Vue.js 中,Vuex 的 state 是响应式的。这意味着当你更改 state 中的数据时,依赖于这些数据的 Vue 组件会自动更新。这是通过 Vue 的响应式系统实现的,该系统使用了 ES6 的 Proxy 对象来监听数据的变化。 当你在 Vuex 中定义了一个 state …...
JavaScript系列05-现代JavaScript新特性
JavaScript作为网络的核心语言之一,近年来发展迅速。从ES6(ECMAScript 2015)开始,JavaScript几乎每年都有新的语言特性加入,极大地改善了开发体验和代码质量。本文主要内容包括: ES6关键特性:解构赋值与扩展运算符&am…...
【量化金融自学笔记】--开篇.基本术语及学习路径建议
在当今这个信息爆炸的时代,金融领域正经历着一场前所未有的变革。传统的金融分析方法逐渐被更加科学、精准的量化技术所取代。量化金融,这个曾经高不可攀的领域,如今正逐渐走进大众的视野。它将数学、统计学、计算机科学与金融学深度融合&…...
3d投影到2d python opencv
目录 cv2.projectPoints 投影 矩阵计算投影 cv2.projectPoints 投影 cv2.projectPoints() 是 OpenCV 中的一个函数,用于将三维空间中的点(3D points)投影到二维图像平面上。这在计算机视觉中经常用于相机标定、物体姿态估计、3D物体与2D图…...
26-小迪安全-模块引用,mvc框架,渲染,数据联动0-rce安全
先创建一个新闻需要的库 这样id值可以逐级递增 然后随便写个值,让他输出一下看看 模板引入 但是这样不够美观,这就涉及到了引入html模板 模板引入是html有一个的地方值可以通过php代码去传入过去,其他的html界面直接调用,这样页…...
【第14节】C++设计模式(行为模式)-Strategy (策略)模式
一、问题的提出 Strategy 模式:算法实现与抽象接口的解耦 Strategy 模式和 Template 模式要解决的问题是相似的,都是为了将业务逻辑(算法)的具体实现与抽象接口解耦。Strategy 模式通过将算法封装到一个类(Context&am…...
播放器系列4——PCM重采样
FFmpeg重采样过程 #mermaid-svg-QydNPsDAlg9lTn6z {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-QydNPsDAlg9lTn6z .error-icon{fill:#552222;}#mermaid-svg-QydNPsDAlg9lTn6z .error-text{fill:#552222;stroke:#5…...
网络安全需要学多久才能入门?
网络安全是一个复杂且不断发展的领域,想要入行该领域,我们需要付出足够多的时间和精力好好学习相关知识,才可以获得一份不错的工作,那么网络安全需要学多久才能入门?我们通过这篇文章来了解一下。 学习网络安全的入门时间因个人的…...
通俗版解释:分布式和微服务就像开餐厅
一、分布式系统:把大厨房拆成多个小厨房 想象你开了一家超火爆的餐厅,但原来的厨房太小了: 问题:一个厨师要同时切菜、炒菜、烤面包,手忙脚乱还容易出错。 解决方案: 拆分成多个小厨房(分布式…...
JAVA安全—手搓内存马
前言 最近在学这个内存马,就做一个记录,说实话这个内存马还是有点难度的。 什么是内存马 首先什么是内存马呢,顾名思义就是把木马打进内存中。传统的webshell一旦把文件删除就断开连接了,而Java内存马则不同,它将恶…...
【神经网络】python实现神经网络(一)——数据集获取
一.概述 在文章【机器学习】一个例子带你了解神经网络是什么中,我们大致了解神经网络的正向信息传导、反向传导以及学习过程的大致流程,现在我们正式开始进行代码的实现,首先我们来实现第一步的运算过程模拟讲解:正向传导。本次代…...
历年湖南大学计算机复试上机真题
历年湖南大学计算机复试机试真题 在线评测:https://app2098.acapp.acwing.com.cn/ 杨辉三角形 题目描述 提到杨辉三角形。 大家应该都很熟悉。 这是我国宋朝数学家杨辉在公元 1261 年著书《详解九章算法》提出的。 1 1 1 1 2 1 1 3 3 1 1 4 6 4 1 1 5 10 10 …...
[LeetCode]day33 150.逆波兰式求表达值 + 239.滑动窗口最大值
逆波兰式求表达值 题目链接 题目描述 给你一个字符串数组 tokens ,表示一个根据 逆波兰表示法 表示的算术表达式。 请你计算该表达式。返回一个表示表达式值的整数。 注意: 有效的算符为 ‘’、‘-’、‘*’ 和 ‘/’ 。 每个操作数(运…...
【银河麒麟高级服务器操作系统实际案例分享】数据库资源重启现象分析及处理全过程
更多银河麒麟操作系统产品及技术讨论,欢迎加入银河麒麟操作系统官方论坛 https://forum.kylinos.cn 了解更多银河麒麟操作系统全新产品,请点击访问 麒麟软件产品专区:https://product.kylinos.cn 开发者专区:https://developer…...
C#中泛型的协变和逆变
协变: 在泛型接口中,使用out关键字可以声明协变。这意味着接口的泛型参数只能作为返回类型出现,而不能作为方法的参数类型。 示例:泛型接口中的协变 假设我们有一个基类Animal和一个派生类Dog: csharp复制 public…...
微信QQ防撤回终极指南:RevokeMsgPatcher技术深度解析
微信QQ防撤回终极指南:RevokeMsgPatcher技术深度解析 【免费下载链接】RevokeMsgPatcher :trollface: A hex editor for WeChat/QQ/TIM - PC版微信/QQ/TIM防撤回补丁(我已经看到了,撤回也没用了) 项目地址: https://gitcode.com…...
离线地图项目救星:手把手教你用微图批量下载并管理多源瓦片(附避坑点)
离线地图实战指南:微图工具链与多源瓦片管理全解析 在智慧园区建设、车载导航系统开发或野外作业场景中,稳定可靠的地图服务往往是刚需。但现实情况是,这些场景常面临网络覆盖不稳定甚至完全离线的挑战。传统解决方案要么依赖预装商业地图数…...
Nodejs后端服务集成Taotoken实现AI对话功能的具体配置指南
🚀 告别海外账号与网络限制!稳定直连全球优质大模型,限时半价接入中。 👉 点击领取海量免费额度 Nodejs后端服务集成Taotoken实现AI对话功能的具体配置指南 1. 准备工作:获取API密钥与模型ID 在开始编写代码之前&…...
自然语言处理进阶:用BERT实现文本相似度计算
在软件测试领域,文本相似度计算是一项极具实用价值的技术。它能助力测试人员高效完成重复用例排查、智能测试用例生成、用户反馈聚类等任务,大幅提升测试工作的效率与精准度。传统的文本相似度计算方法,如基于词频的TF-IDF、基于词向量的Word…...
力扣算法面试150题——个人笔记——复习用
双指针 第一题: 125. 验证回文串https://leetcode.cn/problems/valid-palindrome/ 题目内容 如果在将所有大写字符转换为小写字符、并移除所有非字母数字字符之后,短语正着读和反着读都一样。则可以认为该短语是一个 回文串 。 字母和数字都属于字母…...
正交张量、正定张量与材料稳定性:在有限元分析ABAQUS中的实际应用与参数设置
正交张量、正定张量与材料稳定性:在有限元分析ABAQUS中的实际应用与参数设置 当工程师在ABAQUS中遇到材料刚度矩阵非正定警告时,往往意味着仿真结果可能失去物理意义。这种警告背后隐藏着深刻的张量数学原理——正定张量的性质直接决定了材料本构模型的稳…...
车间管理越管越乱?找准根源+避坑,跳出管理内耗
很多车间管理者都深陷这样的困境:每天忙得脚不沾地,盯进度、查卫生、处理各类现场异常,耗尽心力却收效甚微,车间反而越管越乱——物料堆放杂乱无章、工序衔接频频脱节、员工操作随心所欲、设备故障时有发生,产能上不去…...
从Caffeine源码到实战:手把手教你用Checker Framework给Java代码做‘体检’
从Caffeine源码到实战:手把手教你用Checker Framework给Java代码做‘体检’ 在阅读Caffeine这样的高质量开源项目时,细心的开发者常会注意到一些独特的编译注解——比如Nullable、GuardedBy这类标记。这些看似简单的注解背后,其实隐藏着一个强…...
中兴B862AV3.2M盒子救砖记:免拆机、免ADB,一根双公头USB线搞定刷机
中兴B862AV3.2M盒子救砖实战:零门槛线刷方案详解 当你的中兴B862AV3.2M电视盒子突然黑屏、卡在开机LOGO或完全无法响应时,那种焦虑感与技术无助感往往让人手足无措。不同于常规的系统升级,设备"变砖"状态下的恢复操作需要更谨慎的步…...
SNMP Exporter实战指南:构建企业级网络监控架构的深度解析
SNMP Exporter实战指南:构建企业级网络监控架构的深度解析 【免费下载链接】snmp_exporter SNMP Exporter for Prometheus 项目地址: https://gitcode.com/gh_mirrors/sn/snmp_exporter SNMP Exporter作为Prometheus生态中的关键组件,专为网络设备…...
