Kafka 消息 0 丢失的最佳实践
文章目录
- Kafka 消息 0 丢失的最佳实践
- 生产者端的最佳实践
- 使用带有回调的 producer.send(msg, callback) 方法
- 设置 acks = all
- 设置 retries 为一个较大的值
- 启用幂等性与事务(Kafka 0.11+)
- 正确关闭生产者与 flush() 方法
- Broker 端的最佳实践
- 设置 unclean.leader.election.enable = false
- 设置 replication.factor >= 3
- 设置 min.insync.replicas > 1
- 确保 replication.factor > min.insync.replicas
- 优化 Broker 存储与磁盘配置
- 消费者端的最佳实践
- 确保消息消费完成再提交
- 处理 Rebalance 事件
- 异常重试与死信队列(DLQ)
- 业务维度的 0 丢失架构
- 本地消息表 + 定时扫描
- 监控与告警
- 结论
Kafka 消息 0 丢失的最佳实践
在分布式系统中,消息队列(如 Kafka)是核心组件之一,用于解耦系统、异步通信和流量削峰。
然而,消息丢失是生产环境中必须解决的关键问题。尽管 Kafka 本身设计为高可靠、高吞吐的系统,但在实际使用中,仍需通过合理的配置和最佳实践来确保消息的 0 丢失。
本文将详细介绍 Kafka 消息 0 丢失的最佳实践,涵盖生产者
、Broker
和消费者
三方面的配置与优化。
生产者端的最佳实践
使用带有回调的 producer.send(msg, callback) 方法
Kafka 的 producer.send(msg)
方法虽然可以发送消息,但它无法提供消息发送成功与否的反馈。为了确保消息发送的可靠性,必须使用带有回调的 producer.send(msg, callback)
方法。回调函数可以在消息发送成功或失败时通知开发者,从而在应用层执行适当的补救措施。
示例代码:
from kafka import KafkaProducerproducer = KafkaProducer(bootstrap_servers='localhost:9092')def callback(record_metadata, exception):if exception:print(f"Message failed to send: {exception}")else:print(f"Message sent to {record_metadata.topic} partition {record_metadata.partition} at offset {record_metadata.offset}")producer.send('my-topic', b'Hello, Kafka!', callback=callback)
设置 acks = all
acks
参数用于控制 Kafka 消息发送的确认机制。当 acks=all
时,Kafka 会要求所有副本的 Broker 都成功接收到消息后才认为消息“已提交”。这是 Kafka 提供的最严格的确认机制,能够有效防止消息丢失。
配置方法:
producer = KafkaProducer(bootstrap_servers='localhost:9092',acks='all' # 设置为 all 以确保所有副本都成功接收消息
)
acks = 0
(No acknowledgment)
在这种模式下,生产者在发送消息后不会等待任何确认。即,消息发送后立即返回,生产者不会知道消息是否成功到达 Kafka 集群。这种模式的性能最好,因为它不需要等待 Kafka 进行任何确认,但它的可靠性较差。
优点:
- 性能非常高,因为生产者发送完消息后就立即继续执行,不会等待任何确认。
- 延迟最小,适用于对消息丢失容忍度较高的场景。
缺点:
- 消息丢失的风险较高。如果消息在网络传输过程中丢失,生产者无法知道,因此无法做出补救。
- 对于大多数生产环境不建议使用,因为会丢失数据。
适用场景:
- 对消息丢失不敏感的场景,比如一些日志系统、缓存系统等。
acks = 1
(Leader acknowledgment)
在这种模式下,生产者会等待 Kafka 集群的 Leader 节点确认收到消息。Leader 节点收到消息后会立即向生产者发送确认,不需要等待副本节点的响应。如果 Leader 成功接收到消息,那么生产者会认为该消息已经成功发送。
优点:
- 相对于
acks=0
,可靠性更高,因为至少 Leader 节点会确认收到消息。 - 仍然保持较好的性能,延迟比
acks=all
要低。
缺点:
- 如果 Leader 收到消息后崩溃,但副本节点还未同步数据,消息可能会丢失。
- 不能保证消息最终会被所有副本保存。
适用场景:
- 对消息丢失容忍度较高,但仍希望比
acks=0
更加可靠的场景。
acks = all
(All acknowledgments)
在这种模式下,生产者会等待 Kafka 集群中所有副本的确认。即,生产者只有在所有副本都确认收到消息后才会认为消息发送成功。这是 Kafka 中最严格的消息确认机制,确保消息不会丢失。
优点:
- 提供最强的消息可靠性,因为只有当所有副本都接收到消息后,生产者才会收到成功确认。
- 即使 Kafka 集群的某些节点发生故障,消息依然可以保证不会丢失。
缺点:
- 性能较低,因为生产者需要等待所有副本的确认,增加了延迟。
- 可能导致较高的网络负载和集群负担,尤其在集群副本数较多时。
适用场景:
- 对消息可靠性要求极高的场景,比如金融交易系统、在线支付、订单处理等。
总结
acks=0
:适合对数据丢失不敏感且要求极高性能的场景。acks=1
:适合对性能要求高,但也需要一定可靠性的场景。acks=all
:适合对可靠性要求极高,愿意牺牲一定性能来保证数据不丢失的场景。
设置 retries 为一个较大的值
在网络波动或 Broker 暂时不可用的情况下,消息发送可能会失败。通过设置 retries
参数,可以让 Kafka 在消息发送失败时自动重试,确保消息最终能够成功传输。
配置方法:
producer = KafkaProducer(bootstrap_servers='localhost:9092',retries=10 # 设置重试次数,确保网络波动时消息不会丢失
)
启用幂等性与事务(Kafka 0.11+)
在 Kafka 0.11+ 版本中,可以启用幂等性(enable.idempotence=True
)防止生产者重复发送消息(如因网络重试导致的重复),同时结合事务(Transactional API)确保端到端的 Exactly-Once 语义。
配置方法:
producer = KafkaProducer(bootstrap_servers='localhost:9092',acks='all',enable_idempotence=True,transactional_id='my-transaction-id'
)
producer.init_transactions()
try:producer.begin_transaction()producer.send('my-topic', b'Transactional message')producer.commit_transaction()
except Exception as e:producer.abort_transaction()
正确关闭生产者与 flush() 方法
在生产者发送消息后,尤其是在批量发送或高吞吐场景下,务必在关闭生产者前调用 flush()
方法,确保所有缓冲区的消息都被发送。否则,未发送的消息可能在程序异常终止时丢失。
示例代码:
producer.send('my-topic', b'Final message')
producer.flush() # 确保所有消息发送完成
producer.close()
Broker 端的最佳实践
设置 unclean.leader.election.enable = false
unclean.leader.election.enable
参数控制哪些 Broker 有资格竞选分区的 Leader。如果设置为 true
,即使某个 Broker 落后原先的 Leader 很多,它仍然可以成为新的 Leader,这可能导致消息丢失。因此,建议将该参数设置为 false
。
配置方法:
unclean.leader.election.enable=false
设置 replication.factor >= 3
通过增加分区副本数量,可以有效避免单点故障导致的数据丢失。通常建议设置 replication.factor >= 3
,即每个分区有至少三个副本。
配置方法:
replication.factor=3
设置 min.insync.replicas > 1
min.insync.replicas
参数控制消息至少需要写入到多少个副本才算“已提交”。将其设置为大于 1,能够确保消息在多个副本上持久化,提升系统的容错能力。
配置方法:
min.insync.replicas=2
确保 replication.factor > min.insync.replicas
为了确保 Kafka 集群在面对副本丢失时仍能提供高可用性,replication.factor
应该大于 min.insync.replicas
。否则,在某些副本故障时,分区将无法正常工作,导致消息丢失。
推荐配置:
replication.factor=3
min.insync.replicas=2
优化 Broker 存储与磁盘配置
- 文件系统选择:使用 XFS 或 ext4 等具备高效持久化能力的文件系统。
- 磁盘配置:避免使用 NAS/SAN 等网络存储,优先本地磁盘,并确保写缓存策略正确(如内核参数
fsync
配置)。 - 日志刷写策略:调整
log.flush.interval.messages
和log.flush.interval.ms
(默认不推荐修改,但在极端情况下可适当调整)。
消费者端的最佳实践
确保消息消费完成再提交
Kafka 的 Consumer 端提供了 enable.auto.commit
配置项来控制位移提交。将其设置为 false
,并结合 commitSync()
或 commitAsync()
方法进行手动提交,可以确保每个消息都被成功处理后才提交位移,防止消费失败时丢失消息。
配置方法:
consumer = KafkaConsumer('my-topic', enable_auto_commit=False)# 手动提交位移
consumer.commitSync()
处理 Rebalance 事件
消费者需正确处理 Rebalance 事件,避免在分区重新分配时消息处理未完成导致偏移量提交错误。实现 ConsumerRebalanceListener
并在失去分区所有权前提交偏移量。
示例代码:
from kafka import ConsumerRebalanceListenerclass RebalanceListener(ConsumerRebalanceListener):def on_partitions_revoked(self, revoked):consumer.commitSync()def on_partitions_assigned(self, assigned):passconsumer = KafkaConsumer('my-topic', enable_auto_commit=False)
consumer.subscribe(topics=['my-topic'], listener=RebalanceListener())
异常重试与死信队列(DLQ)
在消费逻辑中捕获异常并实现重试机制,若多次重试失败则将消息转入死信队列,避免阻塞消费且保留异常数据。
示例代码:
for message in consumer:try:process_message(message)consumer.commitSync()except Exception as e:send_to_dlq(message)consumer.commitSync() # 避免重复消费
业务维度的 0 丢失架构
本地消息表 + 定时扫描
在高可靠性要求的业务场景中,可以通过结合业务系统本地的消息表和定时扫描机制,进一步增强消息丢失的防范能力。
例如,业务系统可以在本地保存未成功消费的消息,在系统启动时或者定时进行消息的重新扫描和处理,从而避免消息丢失。
监控与告警
- 生产者监控:跟踪
record-error-rate
、request-latency
等指标。 - Broker 监控:关注
UnderReplicatedPartitions
、IsrShrinksPerSec
、OfflinePartitionsCount
。 - 消费者监控:监控
Consumer Lag
(滞后量),确保消费进度正常。 - 告警规则:当 ISR 数量小于
min.insync.replicas
或副本不足时触发告警。
结论
通过结合 Kafka 的配置和应用层的最佳实践,我们可以最大程度上防止消息丢失。尤其是在高可靠性要求的场景中,务必遵循上述实践,保证 Kafka 消息系统的稳定性和可靠性。你可以根据实际业务的需求,对 Kafka 配置做进一步的优化。通过这些措施,Kafka 能够提供近乎零丢失的消息传输服务。
相关文章:
Kafka 消息 0 丢失的最佳实践
文章目录 Kafka 消息 0 丢失的最佳实践生产者端的最佳实践使用带有回调的 producer.send(msg, callback) 方法设置 acks all设置 retries 为一个较大的值启用幂等性与事务(Kafka 0.11)正确关闭生产者与 flush() 方法 Broker 端的最佳实践设置 unclean.l…...

机器学习算法——回归任务
回归分析是估计因变量和自变量之间关系的过程。 目录 1、多元线性回归 2、岭回归 3、Lasso回归 4、弹性网络回归 5、多项式回归 6、指数回归 7、自然对数回归 8、广义线性模型 GLM 9、Cox比例风险模型 10、决策树回归 11、随机森林回归 12、梯度提升回归 13、XGBoost回归 14、…...

【数据库】数据库基础
第一章 数据库基础 一、数据库基础1.1 数据库系统的体系结构 (三层模式两级映像)1.1.1 逻辑模式1.1.2 外模式1.1.3 内模式1.1.4 外模式/模式映象1.1.5 逻辑模式/内模式映象1.1.6 逻辑独立性1.1.7 物理独立性 1.2 数据模型 一、数据库基础 1.1 数据库系统…...

端到端自动驾驶——cnn网络搭建
论文参考:https://arxiv.org/abs/1604.07316 demo 今天主要来看一个如何通过图像直接到控制的自动驾驶端到端的项目,首先需要配置好我的仿真环境,下载软件udacity: https://d17h27t6h515a5.cloudfront.net/topher/2016/November…...

深度学习R8周:RNN实现阿尔兹海默症(pytorch)
🍨 本文为🔗365天深度学习训练营中的学习记录博客🍖 原作者:K同学啊 数据集包含2149名患者的广泛健康信息,每名患者的ID范围从4751到6900不等。该数据集包括人口统计详细信息、生活方式因素、病史、临床测量、认知和功…...
vuex中的state是响应式的吗?
在 Vue.js 中,Vuex 的 state 是响应式的。这意味着当你更改 state 中的数据时,依赖于这些数据的 Vue 组件会自动更新。这是通过 Vue 的响应式系统实现的,该系统使用了 ES6 的 Proxy 对象来监听数据的变化。 当你在 Vuex 中定义了一个 state …...

JavaScript系列05-现代JavaScript新特性
JavaScript作为网络的核心语言之一,近年来发展迅速。从ES6(ECMAScript 2015)开始,JavaScript几乎每年都有新的语言特性加入,极大地改善了开发体验和代码质量。本文主要内容包括: ES6关键特性:解构赋值与扩展运算符&am…...
【量化金融自学笔记】--开篇.基本术语及学习路径建议
在当今这个信息爆炸的时代,金融领域正经历着一场前所未有的变革。传统的金融分析方法逐渐被更加科学、精准的量化技术所取代。量化金融,这个曾经高不可攀的领域,如今正逐渐走进大众的视野。它将数学、统计学、计算机科学与金融学深度融合&…...

3d投影到2d python opencv
目录 cv2.projectPoints 投影 矩阵计算投影 cv2.projectPoints 投影 cv2.projectPoints() 是 OpenCV 中的一个函数,用于将三维空间中的点(3D points)投影到二维图像平面上。这在计算机视觉中经常用于相机标定、物体姿态估计、3D物体与2D图…...

26-小迪安全-模块引用,mvc框架,渲染,数据联动0-rce安全
先创建一个新闻需要的库 这样id值可以逐级递增 然后随便写个值,让他输出一下看看 模板引入 但是这样不够美观,这就涉及到了引入html模板 模板引入是html有一个的地方值可以通过php代码去传入过去,其他的html界面直接调用,这样页…...

【第14节】C++设计模式(行为模式)-Strategy (策略)模式
一、问题的提出 Strategy 模式:算法实现与抽象接口的解耦 Strategy 模式和 Template 模式要解决的问题是相似的,都是为了将业务逻辑(算法)的具体实现与抽象接口解耦。Strategy 模式通过将算法封装到一个类(Context&am…...
播放器系列4——PCM重采样
FFmpeg重采样过程 #mermaid-svg-QydNPsDAlg9lTn6z {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-QydNPsDAlg9lTn6z .error-icon{fill:#552222;}#mermaid-svg-QydNPsDAlg9lTn6z .error-text{fill:#552222;stroke:#5…...

网络安全需要学多久才能入门?
网络安全是一个复杂且不断发展的领域,想要入行该领域,我们需要付出足够多的时间和精力好好学习相关知识,才可以获得一份不错的工作,那么网络安全需要学多久才能入门?我们通过这篇文章来了解一下。 学习网络安全的入门时间因个人的…...
通俗版解释:分布式和微服务就像开餐厅
一、分布式系统:把大厨房拆成多个小厨房 想象你开了一家超火爆的餐厅,但原来的厨房太小了: 问题:一个厨师要同时切菜、炒菜、烤面包,手忙脚乱还容易出错。 解决方案: 拆分成多个小厨房(分布式…...

JAVA安全—手搓内存马
前言 最近在学这个内存马,就做一个记录,说实话这个内存马还是有点难度的。 什么是内存马 首先什么是内存马呢,顾名思义就是把木马打进内存中。传统的webshell一旦把文件删除就断开连接了,而Java内存马则不同,它将恶…...

【神经网络】python实现神经网络(一)——数据集获取
一.概述 在文章【机器学习】一个例子带你了解神经网络是什么中,我们大致了解神经网络的正向信息传导、反向传导以及学习过程的大致流程,现在我们正式开始进行代码的实现,首先我们来实现第一步的运算过程模拟讲解:正向传导。本次代…...
历年湖南大学计算机复试上机真题
历年湖南大学计算机复试机试真题 在线评测:https://app2098.acapp.acwing.com.cn/ 杨辉三角形 题目描述 提到杨辉三角形。 大家应该都很熟悉。 这是我国宋朝数学家杨辉在公元 1261 年著书《详解九章算法》提出的。 1 1 1 1 2 1 1 3 3 1 1 4 6 4 1 1 5 10 10 …...

[LeetCode]day33 150.逆波兰式求表达值 + 239.滑动窗口最大值
逆波兰式求表达值 题目链接 题目描述 给你一个字符串数组 tokens ,表示一个根据 逆波兰表示法 表示的算术表达式。 请你计算该表达式。返回一个表示表达式值的整数。 注意: 有效的算符为 ‘’、‘-’、‘*’ 和 ‘/’ 。 每个操作数(运…...

【银河麒麟高级服务器操作系统实际案例分享】数据库资源重启现象分析及处理全过程
更多银河麒麟操作系统产品及技术讨论,欢迎加入银河麒麟操作系统官方论坛 https://forum.kylinos.cn 了解更多银河麒麟操作系统全新产品,请点击访问 麒麟软件产品专区:https://product.kylinos.cn 开发者专区:https://developer…...
C#中泛型的协变和逆变
协变: 在泛型接口中,使用out关键字可以声明协变。这意味着接口的泛型参数只能作为返回类型出现,而不能作为方法的参数类型。 示例:泛型接口中的协变 假设我们有一个基类Animal和一个派生类Dog: csharp复制 public…...

UE5 学习系列(二)用户操作界面及介绍
这篇博客是 UE5 学习系列博客的第二篇,在第一篇的基础上展开这篇内容。博客参考的 B 站视频资料和第一篇的链接如下: 【Note】:如果你已经完成安装等操作,可以只执行第一篇博客中 2. 新建一个空白游戏项目 章节操作,重…...

stm32G473的flash模式是单bank还是双bank?
今天突然有人stm32G473的flash模式是单bank还是双bank?由于时间太久,我真忘记了。搜搜发现,还真有人和我一样。见下面的链接:https://shequ.stmicroelectronics.cn/forum.php?modviewthread&tid644563 根据STM32G4系列参考手…...

python打卡day49
知识点回顾: 通道注意力模块复习空间注意力模块CBAM的定义 作业:尝试对今天的模型检查参数数目,并用tensorboard查看训练过程 import torch import torch.nn as nn# 定义通道注意力 class ChannelAttention(nn.Module):def __init__(self,…...

UDP(Echoserver)
网络命令 Ping 命令 检测网络是否连通 使用方法: ping -c 次数 网址ping -c 3 www.baidu.comnetstat 命令 netstat 是一个用来查看网络状态的重要工具. 语法:netstat [选项] 功能:查看网络状态 常用选项: n 拒绝显示别名&#…...

定时器任务——若依源码分析
分析util包下面的工具类schedule utils: ScheduleUtils 是若依中用于与 Quartz 框架交互的工具类,封装了定时任务的 创建、更新、暂停、删除等核心逻辑。 createScheduleJob createScheduleJob 用于将任务注册到 Quartz,先构建任务的 JobD…...

如何在看板中有效管理突发紧急任务
在看板中有效管理突发紧急任务需要:设立专门的紧急任务通道、重新调整任务优先级、保持适度的WIP(Work-in-Progress)弹性、优化任务处理流程、提高团队应对突发情况的敏捷性。其中,设立专门的紧急任务通道尤为重要,这能…...

图表类系列各种样式PPT模版分享
图标图表系列PPT模版,柱状图PPT模版,线状图PPT模版,折线图PPT模版,饼状图PPT模版,雷达图PPT模版,树状图PPT模版 图表类系列各种样式PPT模版分享:图表系列PPT模板https://pan.quark.cn/s/20d40aa…...
Java求职者面试指南:Spring、Spring Boot、MyBatis框架与计算机基础问题解析
Java求职者面试指南:Spring、Spring Boot、MyBatis框架与计算机基础问题解析 一、第一轮提问(基础概念问题) 1. 请解释Spring框架的核心容器是什么?它在Spring中起到什么作用? Spring框架的核心容器是IoC容器&#…...
Java 与 MySQL 性能优化:MySQL 慢 SQL 诊断与分析方法详解
文章目录 一、开启慢查询日志,定位耗时SQL1.1 查看慢查询日志是否开启1.2 临时开启慢查询日志1.3 永久开启慢查询日志1.4 分析慢查询日志 二、使用EXPLAIN分析SQL执行计划2.1 EXPLAIN的基本使用2.2 EXPLAIN分析案例2.3 根据EXPLAIN结果优化SQL 三、使用SHOW PROFILE…...
Linux安全加固:从攻防视角构建系统免疫
Linux安全加固:从攻防视角构建系统免疫 构建坚不可摧的数字堡垒 引言:攻防对抗的新纪元 在日益复杂的网络威胁环境中,Linux系统安全已从被动防御转向主动免疫。2023年全球网络安全报告显示,高级持续性威胁(APT)攻击同比增长65%,平均入侵停留时间缩短至48小时。本章将从…...