当前位置: 首页 > article >正文

Kafka 消息 0 丢失的最佳实践

文章目录

  • Kafka 消息 0 丢失的最佳实践
  • 生产者端的最佳实践
    • 使用带有回调的 producer.send(msg, callback) 方法
    • 设置 acks = all
    • 设置 retries 为一个较大的值
    • 启用幂等性与事务(Kafka 0.11+)
    • 正确关闭生产者与 flush() 方法
  • Broker 端的最佳实践
    • 设置 unclean.leader.election.enable = false
    • 设置 replication.factor >= 3
    • 设置 min.insync.replicas > 1
    • 确保 replication.factor > min.insync.replicas
    • 优化 Broker 存储与磁盘配置
  • 消费者端的最佳实践
    • 确保消息消费完成再提交
    • 处理 Rebalance 事件
    • 异常重试与死信队列(DLQ)
  • 业务维度的 0 丢失架构
    • 本地消息表 + 定时扫描
  • 监控与告警
  • 结论


Kafka 消息 0 丢失的最佳实践

在分布式系统中,消息队列(如 Kafka)是核心组件之一,用于解耦系统、异步通信和流量削峰。
然而,消息丢失是生产环境中必须解决的关键问题。尽管 Kafka 本身设计为高可靠、高吞吐的系统,但在实际使用中,仍需通过合理的配置和最佳实践来确保消息的 0 丢失。
本文将详细介绍 Kafka 消息 0 丢失的最佳实践,涵盖生产者Broker消费者三方面的配置与优化。


生产者端的最佳实践

使用带有回调的 producer.send(msg, callback) 方法

Kafka 的 producer.send(msg) 方法虽然可以发送消息,但它无法提供消息发送成功与否的反馈。为了确保消息发送的可靠性,必须使用带有回调的 producer.send(msg, callback) 方法。回调函数可以在消息发送成功或失败时通知开发者,从而在应用层执行适当的补救措施。

示例代码:

from kafka import KafkaProducerproducer = KafkaProducer(bootstrap_servers='localhost:9092')def callback(record_metadata, exception):if exception:print(f"Message failed to send: {exception}")else:print(f"Message sent to {record_metadata.topic} partition {record_metadata.partition} at offset {record_metadata.offset}")producer.send('my-topic', b'Hello, Kafka!', callback=callback)

设置 acks = all

acks 参数用于控制 Kafka 消息发送的确认机制。当 acks=all 时,Kafka 会要求所有副本的 Broker 都成功接收到消息后才认为消息“已提交”。这是 Kafka 提供的最严格的确认机制,能够有效防止消息丢失。

配置方法:

producer = KafkaProducer(bootstrap_servers='localhost:9092',acks='all'  # 设置为 all 以确保所有副本都成功接收消息
)

acks = 0 (No acknowledgment)

在这种模式下,生产者在发送消息后不会等待任何确认。即,消息发送后立即返回,生产者不会知道消息是否成功到达 Kafka 集群。这种模式的性能最好,因为它不需要等待 Kafka 进行任何确认,但它的可靠性较差。

优点:

  • 性能非常高,因为生产者发送完消息后就立即继续执行,不会等待任何确认。
  • 延迟最小,适用于对消息丢失容忍度较高的场景。

缺点:

  • 消息丢失的风险较高。如果消息在网络传输过程中丢失,生产者无法知道,因此无法做出补救。
  • 对于大多数生产环境不建议使用,因为会丢失数据。

适用场景:

  • 对消息丢失不敏感的场景,比如一些日志系统、缓存系统等。

acks = 1 (Leader acknowledgment)

在这种模式下,生产者会等待 Kafka 集群的 Leader 节点确认收到消息。Leader 节点收到消息后会立即向生产者发送确认,不需要等待副本节点的响应。如果 Leader 成功接收到消息,那么生产者会认为该消息已经成功发送。

优点:

  • 相对于 acks=0,可靠性更高,因为至少 Leader 节点会确认收到消息。
  • 仍然保持较好的性能,延迟比 acks=all 要低。

缺点:

  • 如果 Leader 收到消息后崩溃,但副本节点还未同步数据,消息可能会丢失。
  • 不能保证消息最终会被所有副本保存。

适用场景:

  • 对消息丢失容忍度较高,但仍希望比 acks=0 更加可靠的场景。

acks = all (All acknowledgments)

在这种模式下,生产者会等待 Kafka 集群中所有副本的确认。即,生产者只有在所有副本都确认收到消息后才会认为消息发送成功。这是 Kafka 中最严格的消息确认机制,确保消息不会丢失。

优点:

  • 提供最强的消息可靠性,因为只有当所有副本都接收到消息后,生产者才会收到成功确认。
  • 即使 Kafka 集群的某些节点发生故障,消息依然可以保证不会丢失。

缺点:

  • 性能较低,因为生产者需要等待所有副本的确认,增加了延迟。
  • 可能导致较高的网络负载和集群负担,尤其在集群副本数较多时。

适用场景:

  • 对消息可靠性要求极高的场景,比如金融交易系统、在线支付、订单处理等。

总结

  • acks=0:适合对数据丢失不敏感且要求极高性能的场景。
  • acks=1:适合对性能要求高,但也需要一定可靠性的场景。
  • acks=all:适合对可靠性要求极高,愿意牺牲一定性能来保证数据不丢失的场景。

设置 retries 为一个较大的值

在网络波动或 Broker 暂时不可用的情况下,消息发送可能会失败。通过设置 retries 参数,可以让 Kafka 在消息发送失败时自动重试,确保消息最终能够成功传输。

配置方法:

producer = KafkaProducer(bootstrap_servers='localhost:9092',retries=10  # 设置重试次数,确保网络波动时消息不会丢失
)

启用幂等性与事务(Kafka 0.11+)

在 Kafka 0.11+ 版本中,可以启用幂等性(enable.idempotence=True)防止生产者重复发送消息(如因网络重试导致的重复),同时结合事务(Transactional API)确保端到端的 Exactly-Once 语义。

配置方法:

producer = KafkaProducer(bootstrap_servers='localhost:9092',acks='all',enable_idempotence=True,transactional_id='my-transaction-id'
)
producer.init_transactions()
try:producer.begin_transaction()producer.send('my-topic', b'Transactional message')producer.commit_transaction()
except Exception as e:producer.abort_transaction()

正确关闭生产者与 flush() 方法

在生产者发送消息后,尤其是在批量发送或高吞吐场景下,务必在关闭生产者前调用 flush() 方法,确保所有缓冲区的消息都被发送。否则,未发送的消息可能在程序异常终止时丢失。

示例代码:

producer.send('my-topic', b'Final message')
producer.flush()  # 确保所有消息发送完成
producer.close()

Broker 端的最佳实践

设置 unclean.leader.election.enable = false

unclean.leader.election.enable 参数控制哪些 Broker 有资格竞选分区的 Leader。如果设置为 true,即使某个 Broker 落后原先的 Leader 很多,它仍然可以成为新的 Leader,这可能导致消息丢失。因此,建议将该参数设置为 false

配置方法:

unclean.leader.election.enable=false

设置 replication.factor >= 3

通过增加分区副本数量,可以有效避免单点故障导致的数据丢失。通常建议设置 replication.factor >= 3,即每个分区有至少三个副本。

配置方法:

replication.factor=3

设置 min.insync.replicas > 1

min.insync.replicas 参数控制消息至少需要写入到多少个副本才算“已提交”。将其设置为大于 1,能够确保消息在多个副本上持久化,提升系统的容错能力。

配置方法:

min.insync.replicas=2

确保 replication.factor > min.insync.replicas

为了确保 Kafka 集群在面对副本丢失时仍能提供高可用性,replication.factor 应该大于 min.insync.replicas。否则,在某些副本故障时,分区将无法正常工作,导致消息丢失。

推荐配置:

replication.factor=3
min.insync.replicas=2

优化 Broker 存储与磁盘配置

  • 文件系统选择:使用 XFS 或 ext4 等具备高效持久化能力的文件系统。
  • 磁盘配置:避免使用 NAS/SAN 等网络存储,优先本地磁盘,并确保写缓存策略正确(如内核参数 fsync 配置)。
  • 日志刷写策略:调整 log.flush.interval.messageslog.flush.interval.ms(默认不推荐修改,但在极端情况下可适当调整)。

消费者端的最佳实践

确保消息消费完成再提交

Kafka 的 Consumer 端提供了 enable.auto.commit 配置项来控制位移提交。将其设置为 false,并结合 commitSync()commitAsync() 方法进行手动提交,可以确保每个消息都被成功处理后才提交位移,防止消费失败时丢失消息。

配置方法:

consumer = KafkaConsumer('my-topic', enable_auto_commit=False)# 手动提交位移
consumer.commitSync()

处理 Rebalance 事件

消费者需正确处理 Rebalance 事件,避免在分区重新分配时消息处理未完成导致偏移量提交错误。实现 ConsumerRebalanceListener 并在失去分区所有权前提交偏移量。

示例代码:

from kafka import ConsumerRebalanceListenerclass RebalanceListener(ConsumerRebalanceListener):def on_partitions_revoked(self, revoked):consumer.commitSync()def on_partitions_assigned(self, assigned):passconsumer = KafkaConsumer('my-topic', enable_auto_commit=False)
consumer.subscribe(topics=['my-topic'], listener=RebalanceListener())

异常重试与死信队列(DLQ)

在消费逻辑中捕获异常并实现重试机制,若多次重试失败则将消息转入死信队列,避免阻塞消费且保留异常数据。

示例代码:

for message in consumer:try:process_message(message)consumer.commitSync()except Exception as e:send_to_dlq(message)consumer.commitSync()  # 避免重复消费

业务维度的 0 丢失架构

本地消息表 + 定时扫描

在高可靠性要求的业务场景中,可以通过结合业务系统本地的消息表和定时扫描机制,进一步增强消息丢失的防范能力。
例如,业务系统可以在本地保存未成功消费的消息,在系统启动时或者定时进行消息的重新扫描和处理,从而避免消息丢失。


监控与告警

  • 生产者监控:跟踪 record-error-raterequest-latency 等指标。
  • Broker 监控:关注 UnderReplicatedPartitionsIsrShrinksPerSecOfflinePartitionsCount
  • 消费者监控:监控 Consumer Lag(滞后量),确保消费进度正常。
  • 告警规则:当 ISR 数量小于 min.insync.replicas 或副本不足时触发告警。

结论

通过结合 Kafka 的配置和应用层的最佳实践,我们可以最大程度上防止消息丢失。尤其是在高可靠性要求的场景中,务必遵循上述实践,保证 Kafka 消息系统的稳定性和可靠性。你可以根据实际业务的需求,对 Kafka 配置做进一步的优化。通过这些措施,Kafka 能够提供近乎零丢失的消息传输服务。

相关文章:

Kafka 消息 0 丢失的最佳实践

文章目录 Kafka 消息 0 丢失的最佳实践生产者端的最佳实践使用带有回调的 producer.send(msg, callback) 方法设置 acks all设置 retries 为一个较大的值启用幂等性与事务(Kafka 0.11)正确关闭生产者与 flush() 方法 Broker 端的最佳实践设置 unclean.l…...

vue3(笔记)3.0 Pinia状态管理数据.持久化插件.内置vue devtools调试工具

---pinia状态管理数据(vuex升级版) 官网镜像:(https://pinia.vuejs.org/zh/core-concepts/) 安装(手动): npm install pinia 导入pinia: 组合式写法的格式: 使用前需要导入: import {defineStore} from piniaactions:支持了同步和异步的方法(融合了mutations) 在组件中调…...

装饰器模式:灵活扩展对象功能的利器

一、从咖啡加料说起:什么是装饰器模式? 假设您走进咖啡馆点单: 基础款:美式咖啡(15元)加料需求:加牛奶(3元)、加焦糖(5元)、加奶油(…...

linux应用:errno、perror、open、fopen

errno errno 是一个全局变量,定义在 头文件中。当系统调用(如 open、read、write 等)或库函数执行失败时,会将一个错误码赋值给 errno。不同的错误码代表不同的错误类型,通过检查 errno 的值,可以判断具体…...

网络原理--HTTP协议

http中文名为超文本传输协议,所谓“超文本”就是指传输范围超出了能在UTF8等码表上找到的字符的范围,包含一些图片,特殊格式之类的。 HTTP的发展简介 从图中可以看出到现在已经发展出了HTTP3,但是市面上的主流还是以HTTP1.0为主。…...

编译可以在Android手机上运行的ffmpeg程序

下载代码 git clone gitgithub.com:FFmpeg/FFmpeg.git git checkout n7.0建立build目录 mkdir build cd build创建build.sh脚本 vim build.sh这段脚本的主要功能是配置和编译 FFmpeg,使其能够在 Android 平台上运行,通过设置不同的架构和 API 级别&am…...

华为hcia——Datacom实验指南——配置手工模式以太网链路聚合

什么是以太网链路聚合(Eth-trunk) 是一种将多个物理链路捆绑在一起,让设备以为是一条大链路,能够增加带宽,增加冗余度,提升可靠性,实现负载平衡。 传输方式有两种 基于数据流传输和基于数据包…...

【C语言6】数组和函数实践:扫雷游戏的简单实现

文章目录 一、扫雷游戏分析和设计1.1 扫雷游戏的功能说明1.2 游戏的分析和设计1.2.1 数据结构的分析1.2.2 文件结构设计 二、扫雷游戏的代码实现三、扫雷游戏的扩展总结 一、扫雷游戏分析和设计 1.1 扫雷游戏的功能说明 使用控制台实现经典的扫雷游戏游戏可以通过菜单实现继续…...

LeetCode 热题 100----1.两数之和

LeetCode 热题 100----1.两数之和 题目描述 我的解法 语言:js 思路就是:用双重循环去找哪两个数字相加等于target,目前的时间复杂度为O(n2),之后右优化思路再更新。...

《模式和状态管理》知识总结三-EcuM与BswM模块的交互

前言 这篇文章主要搞清楚在模式管理中,BswM和EcuM各自的分工。距离学完模式管理也有几天时间了,写这篇文章算是复习一下。 EcuM及BswM交互总览 EcuM负责Ecu的上下电状态的处理,当Ecu处于正常运行状态的时候,EcuM会将Ecu的控制权…...

RK3568平台(网络篇)RTL8111网卡

RTL8111 是 Realtek 推出的一款高性能千兆以太网控制器芯片,广泛应用于 PCIE 网卡中。 其工作原理涉及 数据链路层 和 物理层 的协同工作,以下是其核心原理的详细说明: 一.网卡的基本功能 1.数据封装与解封装: 网卡负责将计算机中的数据封装成网络传输的帧(Frame),并…...

客户需求模糊或频繁变更怎么办

应对客户需求模糊或频繁变更的关键在于 明确沟通、敏捷应对、科学决策。其中,明确沟通尤为重要,因为通过有效沟通,不仅能迅速厘清客户真实需求,还能及时发现隐藏问题,降低项目风险,为后续调整提供有力数据支…...

动静态库-Linux 学习

在软件开发中,程序库是一组预先编写好的程序代码,它们存储了常用的函数、变量和数据结构等。这些库可以帮助开发者节省大量的时间和精力,避免重复编写相同的代码。当我们在 Linux 系统中开发程序时,经常会用到两种类型的程序库&am…...

DeepSeek 系列模型:论文精读《A Survey of DeepSeek Models》

引言:一篇快速了解 DeepSeek 系列的论文。我在翻译时加入了一些可以提高 “可读性” 的连词 ✅ NLP 研 2 选手的学习笔记 笔者简介:Wang Linyong,NPU,2023级,计算机技术 研究方向:文本生成、大语言模型 论文…...

Python解决“找出整形数组中占比超过一半的数”问题

这里写目录标题 问题描述测试样例解决思路代码法1法2 问题描述 小R从班级中抽取了一些同学,每位同学都会给出一个数字。已知在这些数字中,某个数字的出现次数超过了数字总数的一半。现在需要你帮助小R找到这个数字。 测试样例 样例1: 输入&…...

机器人学习模拟框架 robosuite (3) 机器人控制代码示例

Robosuite框架是一个用于机器人模拟和控制的强大工具,支持多种类型的机器人。 官方文档:Overview — robosuite 1.5 documentation 开源地址:https://github.com/ARISE-Initiative/robosuite 目录 1、通过键盘或SpaceMouse远程控制机器人…...

kakfa-3:ISR机制、HWLEO、生产者、消费者、核心参数负载均衡

1. kafka内核原理 1.1 ISR机制 光是依靠多副本机制能保证Kafka的高可用性,但是能保证数据不丢失吗?不行,因为如果leader宕机,但是leader的数据还没同步到follower上去,此时即使选举了follower作为新的leader&#xff…...

【微知】如何查看Mellanox网卡上的光模块的信息?(ethtool -m enp1s0f0 看型号、厂商、生产日期等)

背景 服务器上插入的光模块经常被忽略,往往这里是定位问题最根本的地方。如何通过命令查看? 命令 ethtool提供了-m参数,m是module-info的意思,他是从光模块的eeprom中读取数据。(应该是用i2c协议读取的)…...

yum源选要配置华为云的源,阿里云用不了的情况

curl -O /etc/yum.repos.d/CentOS-Base.repo https://repo.huaweicloud.com/repository/conf/CentOS-7-reg.repo...

nginx accesslog 打印自定义header

比如我在请求的header中添加了一个path-match-type,那我现在nginx的accesslog 中打印出来,应该如何配置呢? rootnginx-59f5d66df6-jw5k8:/# cat /etc/nginx/nginx.conf user nginx; worker_processes auto;error_log /var/log/nginx/erro…...

好数——前缀和思想(题目分享)

今天我的舍友去参加“传智杯”广东省的省赛,跟我说了这样一道题,他说他想不出来怎么去优化代码,怎么做都是套用两层for循环超时,下面我就根据题意,使用前缀和的算法去优化一下思路,题目本身是不难的&#x…...

MWC 2025 | 移远通信大模型解决方案加速落地,引领服务机器人创新变革

随着人工智能、大模型等技术的蓬勃发展,生成式AI应用全面爆发。在此背景下,服务机器人作为大模型技术在端侧落地的关键场景,迎来了前所未有的发展机遇。 作为与用户直接交互的智能设备,服务机器人需要应对复杂场景下的感知、决策和…...

【大模型基础_毛玉仁】0.概述

更多内容:XiaoJ的知识星球 【大模型基础_毛玉仁】 系列文章参考 系列文章 【大模型基础_毛玉仁】0.概述 【大模型基础_毛玉仁】1.1 基于统计方法的语言模型 更新中。。。。。。 参考 书籍:大模型基础_完整版.pdf Github:https://github.co…...

ADB、Appium 和 大模型融合开展移动端自动化测试

将 ADB、Appium 和 大模型(如 GPT、LLM) 结合,可以显著提升移动端自动化测试的智能化水平和效率。以下是具体的实现思路和应用场景: 1. 核心组件的作用 ADB(Android Debug Bridge): 用于与 Android 设备通信,执行设备操作(如安装应用、获取日志、截图等)。Appium: 用…...

springboot425-基于SpringBoot的BUG管理系统(源码+数据库+纯前后端分离+部署讲解等)

💕💕作者: 爱笑学姐 💕💕个人简介:十年Java,Python美女程序员一枚,精通计算机专业前后端各类框架。 💕💕各类成品Java毕设 。javaweb,ssm&#xf…...

Ubuntu系统上部署Node.js项目的完整流程

以下是在Ubuntu系统上部署Node.js项目的完整流程,分为系统初始化、环境配置、项目部署三个部分: 一、系统初始化 & 环境准备 bash # 1. 更新系统软件包 sudo apt update && sudo apt upgrade -y# 2. 安装基础工具 sudo apt install -y buil…...

X Window---图形接口

摘抄自 鸟哥的linux私房菜 基础篇 第四版 有鉴于图形用户接口(Graphical User Interface, GUI) 的需求日益加重,在 1984 年由 MIT 与其他第三方首次发表了 X Window System ,并且更在 1988 年成立了非营利性质的 XFree86 这个组织。所谓的XFree86 其实是…...

数据序列化协议 Protobuf 3 介绍(Go 语言)

Protobuf 3 入门 1. 什么是序列化? 1.1 概念 序列化(Serialization 或 Marshalling) 是指将数据结构或对象的状态转换成可存储或传输的格式。反向操作称为反序列化(Deserialization 或 Unmarshalling),它…...

FineReport 操作注意

1.父单元格重复的时候,如何取消合并 效果如下: 只需要在单元格中,将数据设置为【列表】即可。 2.待定...

3D手眼标定转换详细实施步骤及原理概述

3D手眼标定转换详细实施步骤及原理概述 一、手眼标定的核心目标二、3D手眼标定的原理概述一、基本概念与坐标系定义**二、数学建模与方程推导****1. 坐标变换的齐次矩阵表示****2. 手眼标定方程推导** **三、方程求解方法****1. 分离旋转与平移****2. 旋转矩阵求解****3. 平移向…...