当前位置: 首页 > news >正文

Kafka 消息 0 丢失的最佳实践

文章目录

  • Kafka 消息 0 丢失的最佳实践
  • 生产者端的最佳实践
    • 使用带有回调的 producer.send(msg, callback) 方法
    • 设置 acks = all
    • 设置 retries 为一个较大的值
    • 启用幂等性与事务(Kafka 0.11+)
    • 正确关闭生产者与 flush() 方法
  • Broker 端的最佳实践
    • 设置 unclean.leader.election.enable = false
    • 设置 replication.factor >= 3
    • 设置 min.insync.replicas > 1
    • 确保 replication.factor > min.insync.replicas
    • 优化 Broker 存储与磁盘配置
  • 消费者端的最佳实践
    • 确保消息消费完成再提交
    • 处理 Rebalance 事件
    • 异常重试与死信队列(DLQ)
  • 业务维度的 0 丢失架构
    • 本地消息表 + 定时扫描
  • 监控与告警
  • 结论


Kafka 消息 0 丢失的最佳实践

在分布式系统中,消息队列(如 Kafka)是核心组件之一,用于解耦系统、异步通信和流量削峰。
然而,消息丢失是生产环境中必须解决的关键问题。尽管 Kafka 本身设计为高可靠、高吞吐的系统,但在实际使用中,仍需通过合理的配置和最佳实践来确保消息的 0 丢失。
本文将详细介绍 Kafka 消息 0 丢失的最佳实践,涵盖生产者Broker消费者三方面的配置与优化。


生产者端的最佳实践

使用带有回调的 producer.send(msg, callback) 方法

Kafka 的 producer.send(msg) 方法虽然可以发送消息,但它无法提供消息发送成功与否的反馈。为了确保消息发送的可靠性,必须使用带有回调的 producer.send(msg, callback) 方法。回调函数可以在消息发送成功或失败时通知开发者,从而在应用层执行适当的补救措施。

示例代码:

from kafka import KafkaProducerproducer = KafkaProducer(bootstrap_servers='localhost:9092')def callback(record_metadata, exception):if exception:print(f"Message failed to send: {exception}")else:print(f"Message sent to {record_metadata.topic} partition {record_metadata.partition} at offset {record_metadata.offset}")producer.send('my-topic', b'Hello, Kafka!', callback=callback)

设置 acks = all

acks 参数用于控制 Kafka 消息发送的确认机制。当 acks=all 时,Kafka 会要求所有副本的 Broker 都成功接收到消息后才认为消息“已提交”。这是 Kafka 提供的最严格的确认机制,能够有效防止消息丢失。

配置方法:

producer = KafkaProducer(bootstrap_servers='localhost:9092',acks='all'  # 设置为 all 以确保所有副本都成功接收消息
)

acks = 0 (No acknowledgment)

在这种模式下,生产者在发送消息后不会等待任何确认。即,消息发送后立即返回,生产者不会知道消息是否成功到达 Kafka 集群。这种模式的性能最好,因为它不需要等待 Kafka 进行任何确认,但它的可靠性较差。

优点:

  • 性能非常高,因为生产者发送完消息后就立即继续执行,不会等待任何确认。
  • 延迟最小,适用于对消息丢失容忍度较高的场景。

缺点:

  • 消息丢失的风险较高。如果消息在网络传输过程中丢失,生产者无法知道,因此无法做出补救。
  • 对于大多数生产环境不建议使用,因为会丢失数据。

适用场景:

  • 对消息丢失不敏感的场景,比如一些日志系统、缓存系统等。

acks = 1 (Leader acknowledgment)

在这种模式下,生产者会等待 Kafka 集群的 Leader 节点确认收到消息。Leader 节点收到消息后会立即向生产者发送确认,不需要等待副本节点的响应。如果 Leader 成功接收到消息,那么生产者会认为该消息已经成功发送。

优点:

  • 相对于 acks=0,可靠性更高,因为至少 Leader 节点会确认收到消息。
  • 仍然保持较好的性能,延迟比 acks=all 要低。

缺点:

  • 如果 Leader 收到消息后崩溃,但副本节点还未同步数据,消息可能会丢失。
  • 不能保证消息最终会被所有副本保存。

适用场景:

  • 对消息丢失容忍度较高,但仍希望比 acks=0 更加可靠的场景。

acks = all (All acknowledgments)

在这种模式下,生产者会等待 Kafka 集群中所有副本的确认。即,生产者只有在所有副本都确认收到消息后才会认为消息发送成功。这是 Kafka 中最严格的消息确认机制,确保消息不会丢失。

优点:

  • 提供最强的消息可靠性,因为只有当所有副本都接收到消息后,生产者才会收到成功确认。
  • 即使 Kafka 集群的某些节点发生故障,消息依然可以保证不会丢失。

缺点:

  • 性能较低,因为生产者需要等待所有副本的确认,增加了延迟。
  • 可能导致较高的网络负载和集群负担,尤其在集群副本数较多时。

适用场景:

  • 对消息可靠性要求极高的场景,比如金融交易系统、在线支付、订单处理等。

总结

  • acks=0:适合对数据丢失不敏感且要求极高性能的场景。
  • acks=1:适合对性能要求高,但也需要一定可靠性的场景。
  • acks=all:适合对可靠性要求极高,愿意牺牲一定性能来保证数据不丢失的场景。

设置 retries 为一个较大的值

在网络波动或 Broker 暂时不可用的情况下,消息发送可能会失败。通过设置 retries 参数,可以让 Kafka 在消息发送失败时自动重试,确保消息最终能够成功传输。

配置方法:

producer = KafkaProducer(bootstrap_servers='localhost:9092',retries=10  # 设置重试次数,确保网络波动时消息不会丢失
)

启用幂等性与事务(Kafka 0.11+)

在 Kafka 0.11+ 版本中,可以启用幂等性(enable.idempotence=True)防止生产者重复发送消息(如因网络重试导致的重复),同时结合事务(Transactional API)确保端到端的 Exactly-Once 语义。

配置方法:

producer = KafkaProducer(bootstrap_servers='localhost:9092',acks='all',enable_idempotence=True,transactional_id='my-transaction-id'
)
producer.init_transactions()
try:producer.begin_transaction()producer.send('my-topic', b'Transactional message')producer.commit_transaction()
except Exception as e:producer.abort_transaction()

正确关闭生产者与 flush() 方法

在生产者发送消息后,尤其是在批量发送或高吞吐场景下,务必在关闭生产者前调用 flush() 方法,确保所有缓冲区的消息都被发送。否则,未发送的消息可能在程序异常终止时丢失。

示例代码:

producer.send('my-topic', b'Final message')
producer.flush()  # 确保所有消息发送完成
producer.close()

Broker 端的最佳实践

设置 unclean.leader.election.enable = false

unclean.leader.election.enable 参数控制哪些 Broker 有资格竞选分区的 Leader。如果设置为 true,即使某个 Broker 落后原先的 Leader 很多,它仍然可以成为新的 Leader,这可能导致消息丢失。因此,建议将该参数设置为 false

配置方法:

unclean.leader.election.enable=false

设置 replication.factor >= 3

通过增加分区副本数量,可以有效避免单点故障导致的数据丢失。通常建议设置 replication.factor >= 3,即每个分区有至少三个副本。

配置方法:

replication.factor=3

设置 min.insync.replicas > 1

min.insync.replicas 参数控制消息至少需要写入到多少个副本才算“已提交”。将其设置为大于 1,能够确保消息在多个副本上持久化,提升系统的容错能力。

配置方法:

min.insync.replicas=2

确保 replication.factor > min.insync.replicas

为了确保 Kafka 集群在面对副本丢失时仍能提供高可用性,replication.factor 应该大于 min.insync.replicas。否则,在某些副本故障时,分区将无法正常工作,导致消息丢失。

推荐配置:

replication.factor=3
min.insync.replicas=2

优化 Broker 存储与磁盘配置

  • 文件系统选择:使用 XFS 或 ext4 等具备高效持久化能力的文件系统。
  • 磁盘配置:避免使用 NAS/SAN 等网络存储,优先本地磁盘,并确保写缓存策略正确(如内核参数 fsync 配置)。
  • 日志刷写策略:调整 log.flush.interval.messageslog.flush.interval.ms(默认不推荐修改,但在极端情况下可适当调整)。

消费者端的最佳实践

确保消息消费完成再提交

Kafka 的 Consumer 端提供了 enable.auto.commit 配置项来控制位移提交。将其设置为 false,并结合 commitSync()commitAsync() 方法进行手动提交,可以确保每个消息都被成功处理后才提交位移,防止消费失败时丢失消息。

配置方法:

consumer = KafkaConsumer('my-topic', enable_auto_commit=False)# 手动提交位移
consumer.commitSync()

处理 Rebalance 事件

消费者需正确处理 Rebalance 事件,避免在分区重新分配时消息处理未完成导致偏移量提交错误。实现 ConsumerRebalanceListener 并在失去分区所有权前提交偏移量。

示例代码:

from kafka import ConsumerRebalanceListenerclass RebalanceListener(ConsumerRebalanceListener):def on_partitions_revoked(self, revoked):consumer.commitSync()def on_partitions_assigned(self, assigned):passconsumer = KafkaConsumer('my-topic', enable_auto_commit=False)
consumer.subscribe(topics=['my-topic'], listener=RebalanceListener())

异常重试与死信队列(DLQ)

在消费逻辑中捕获异常并实现重试机制,若多次重试失败则将消息转入死信队列,避免阻塞消费且保留异常数据。

示例代码:

for message in consumer:try:process_message(message)consumer.commitSync()except Exception as e:send_to_dlq(message)consumer.commitSync()  # 避免重复消费

业务维度的 0 丢失架构

本地消息表 + 定时扫描

在高可靠性要求的业务场景中,可以通过结合业务系统本地的消息表和定时扫描机制,进一步增强消息丢失的防范能力。
例如,业务系统可以在本地保存未成功消费的消息,在系统启动时或者定时进行消息的重新扫描和处理,从而避免消息丢失。


监控与告警

  • 生产者监控:跟踪 record-error-raterequest-latency 等指标。
  • Broker 监控:关注 UnderReplicatedPartitionsIsrShrinksPerSecOfflinePartitionsCount
  • 消费者监控:监控 Consumer Lag(滞后量),确保消费进度正常。
  • 告警规则:当 ISR 数量小于 min.insync.replicas 或副本不足时触发告警。

结论

通过结合 Kafka 的配置和应用层的最佳实践,我们可以最大程度上防止消息丢失。尤其是在高可靠性要求的场景中,务必遵循上述实践,保证 Kafka 消息系统的稳定性和可靠性。你可以根据实际业务的需求,对 Kafka 配置做进一步的优化。通过这些措施,Kafka 能够提供近乎零丢失的消息传输服务。

相关文章:

Kafka 消息 0 丢失的最佳实践

文章目录 Kafka 消息 0 丢失的最佳实践生产者端的最佳实践使用带有回调的 producer.send(msg, callback) 方法设置 acks all设置 retries 为一个较大的值启用幂等性与事务(Kafka 0.11)正确关闭生产者与 flush() 方法 Broker 端的最佳实践设置 unclean.l…...

机器学习算法——回归任务

回归分析是估计因变量和自变量之间关系的过程。 目录 1、多元线性回归 2、岭回归 3、Lasso回归 4、弹性网络回归 5、多项式回归 6、指数回归 7、自然对数回归 8、广义线性模型 GLM 9、Cox比例风险模型 10、决策树回归 11、随机森林回归 12、梯度提升回归 13、XGBoost回归 14、…...

【数据库】数据库基础

第一章 数据库基础 一、数据库基础1.1 数据库系统的体系结构 (三层模式两级映像)1.1.1 逻辑模式1.1.2 外模式1.1.3 内模式1.1.4 外模式/模式映象1.1.5 逻辑模式/内模式映象1.1.6 逻辑独立性1.1.7 物理独立性 1.2 数据模型 一、数据库基础 1.1 数据库系统…...

端到端自动驾驶——cnn网络搭建

论文参考:https://arxiv.org/abs/1604.07316 demo 今天主要来看一个如何通过图像直接到控制的自动驾驶端到端的项目,首先需要配置好我的仿真环境,下载软件udacity: https://d17h27t6h515a5.cloudfront.net/topher/2016/November…...

深度学习R8周:RNN实现阿尔兹海默症(pytorch)

🍨 本文为🔗365天深度学习训练营中的学习记录博客🍖 原作者:K同学啊 数据集包含2149名患者的广泛健康信息,每名患者的ID范围从4751到6900不等。该数据集包括人口统计详细信息、生活方式因素、病史、临床测量、认知和功…...

vuex中的state是响应式的吗?

在 Vue.js 中,Vuex 的 state 是响应式的。这意味着当你更改 state 中的数据时,依赖于这些数据的 Vue 组件会自动更新。这是通过 Vue 的响应式系统实现的,该系统使用了 ES6 的 Proxy 对象来监听数据的变化。 当你在 Vuex 中定义了一个 state …...

JavaScript系列05-现代JavaScript新特性

JavaScript作为网络的核心语言之一,近年来发展迅速。从ES6(ECMAScript 2015)开始,JavaScript几乎每年都有新的语言特性加入,极大地改善了开发体验和代码质量。本文主要内容包括: ES6关键特性:解构赋值与扩展运算符&am…...

【量化金融自学笔记】--开篇.基本术语及学习路径建议

在当今这个信息爆炸的时代,金融领域正经历着一场前所未有的变革。传统的金融分析方法逐渐被更加科学、精准的量化技术所取代。量化金融,这个曾经高不可攀的领域,如今正逐渐走进大众的视野。它将数学、统计学、计算机科学与金融学深度融合&…...

3d投影到2d python opencv

目录 cv2.projectPoints 投影 矩阵计算投影 cv2.projectPoints 投影 cv2.projectPoints() 是 OpenCV 中的一个函数,用于将三维空间中的点(3D points)投影到二维图像平面上。这在计算机视觉中经常用于相机标定、物体姿态估计、3D物体与2D图…...

26-小迪安全-模块引用,mvc框架,渲染,数据联动0-rce安全

先创建一个新闻需要的库 这样id值可以逐级递增 然后随便写个值,让他输出一下看看 模板引入 但是这样不够美观,这就涉及到了引入html模板 模板引入是html有一个的地方值可以通过php代码去传入过去,其他的html界面直接调用,这样页…...

【第14节】C++设计模式(行为模式)-Strategy (策略)模式

一、问题的提出 Strategy 模式:算法实现与抽象接口的解耦 Strategy 模式和 Template 模式要解决的问题是相似的,都是为了将业务逻辑(算法)的具体实现与抽象接口解耦。Strategy 模式通过将算法封装到一个类(Context&am…...

播放器系列4——PCM重采样

FFmpeg重采样过程 #mermaid-svg-QydNPsDAlg9lTn6z {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-QydNPsDAlg9lTn6z .error-icon{fill:#552222;}#mermaid-svg-QydNPsDAlg9lTn6z .error-text{fill:#552222;stroke:#5…...

网络安全需要学多久才能入门?

网络安全是一个复杂且不断发展的领域,想要入行该领域,我们需要付出足够多的时间和精力好好学习相关知识,才可以获得一份不错的工作,那么网络安全需要学多久才能入门?我们通过这篇文章来了解一下。 学习网络安全的入门时间因个人的…...

通俗版解释:分布式和微服务就像开餐厅

一、分布式系统:把大厨房拆成多个小厨房 想象你开了一家超火爆的餐厅,但原来的厨房太小了: 问题:一个厨师要同时切菜、炒菜、烤面包,手忙脚乱还容易出错。 解决方案: 拆分成多个小厨房(分布式…...

JAVA安全—手搓内存马

前言 最近在学这个内存马,就做一个记录,说实话这个内存马还是有点难度的。 什么是内存马 首先什么是内存马呢,顾名思义就是把木马打进内存中。传统的webshell一旦把文件删除就断开连接了,而Java内存马则不同,它将恶…...

【神经网络】python实现神经网络(一)——数据集获取

一.概述 在文章【机器学习】一个例子带你了解神经网络是什么中,我们大致了解神经网络的正向信息传导、反向传导以及学习过程的大致流程,现在我们正式开始进行代码的实现,首先我们来实现第一步的运算过程模拟讲解:正向传导。本次代…...

历年湖南大学计算机复试上机真题

历年湖南大学计算机复试机试真题 在线评测:https://app2098.acapp.acwing.com.cn/ 杨辉三角形 题目描述 提到杨辉三角形。 大家应该都很熟悉。 这是我国宋朝数学家杨辉在公元 1261 年著书《详解九章算法》提出的。 1 1 1 1 2 1 1 3 3 1 1 4 6 4 1 1 5 10 10 …...

[LeetCode]day33 150.逆波兰式求表达值 + 239.滑动窗口最大值

逆波兰式求表达值 题目链接 题目描述 给你一个字符串数组 tokens ,表示一个根据 逆波兰表示法 表示的算术表达式。 请你计算该表达式。返回一个表示表达式值的整数。 注意: 有效的算符为 ‘’、‘-’、‘*’ 和 ‘/’ 。 每个操作数(运…...

【银河麒麟高级服务器操作系统实际案例分享】数据库资源重启现象分析及处理全过程

更多银河麒麟操作系统产品及技术讨论,欢迎加入银河麒麟操作系统官方论坛 https://forum.kylinos.cn 了解更多银河麒麟操作系统全新产品,请点击访问 麒麟软件产品专区:https://product.kylinos.cn 开发者专区:https://developer…...

C#中泛型的协变和逆变

协变: 在泛型接口中,使用out关键字可以声明协变。这意味着接口的泛型参数只能作为返回类型出现,而不能作为方法的参数类型。 示例:泛型接口中的协变 假设我们有一个基类Animal和一个派生类Dog: csharp复制 public…...

【人工智能】神经网络的优化器optimizer(二):Adagrad自适应学习率优化器

一.自适应梯度算法Adagrad概述 Adagrad(Adaptive Gradient Algorithm)是一种自适应学习率的优化算法,由Duchi等人在2011年提出。其核心思想是针对不同参数自动调整学习率,适合处理稀疏数据和不同参数梯度差异较大的场景。Adagrad通…...

FFmpeg 低延迟同屏方案

引言 在实时互动需求激增的当下,无论是在线教育中的师生同屏演示、远程办公的屏幕共享协作,还是游戏直播的画面实时传输,低延迟同屏已成为保障用户体验的核心指标。FFmpeg 作为一款功能强大的多媒体框架,凭借其灵活的编解码、数据…...

屋顶变身“发电站” ,中天合创屋面分布式光伏发电项目顺利并网!

5月28日,中天合创屋面分布式光伏发电项目顺利并网发电,该项目位于内蒙古自治区鄂尔多斯市乌审旗,项目利用中天合创聚乙烯、聚丙烯仓库屋面作为场地建设光伏电站,总装机容量为9.96MWp。 项目投运后,每年可节约标煤3670…...

Springcloud:Eureka 高可用集群搭建实战(服务注册与发现的底层原理与避坑指南)

引言:为什么 Eureka 依然是存量系统的核心? 尽管 Nacos 等新注册中心崛起,但金融、电力等保守行业仍有大量系统运行在 Eureka 上。理解其高可用设计与自我保护机制,是保障分布式系统稳定的必修课。本文将手把手带你搭建生产级 Eur…...

初探Service服务发现机制

1.Service简介 Service是将运行在一组Pod上的应用程序发布为网络服务的抽象方法。 主要功能:服务发现和负载均衡。 Service类型的包括ClusterIP类型、NodePort类型、LoadBalancer类型、ExternalName类型 2.Endpoints简介 Endpoints是一种Kubernetes资源&#xf…...

【Android】Android 开发 ADB 常用指令

查看当前连接的设备 adb devices 连接设备 adb connect 设备IP 断开已连接的设备 adb disconnect 设备IP 安装应用 adb install 安装包的路径 卸载应用 adb uninstall 应用包名 查看已安装的应用包名 adb shell pm list packages 查看已安装的第三方应用包名 adb shell pm list…...

关于easyexcel动态下拉选问题处理

前些日子突然碰到一个问题,说是客户的导入文件模版想支持部分导入内容的下拉选,于是我就找了easyexcel官网寻找解决方案,并没有找到合适的方案,没办法只能自己动手并分享出来,针对Java生成Excel下拉菜单时因选项过多导…...

掌握 HTTP 请求:理解 cURL GET 语法

cURL 是一个强大的命令行工具,用于发送 HTTP 请求和与 Web 服务器交互。在 Web 开发和测试中,cURL 经常用于发送 GET 请求来获取服务器资源。本文将详细介绍 cURL GET 请求的语法和使用方法。 一、cURL 基本概念 cURL 是 "Client URL" 的缩写…...

抽象类和接口(全)

一、抽象类 1.概念:如果⼀个类中没有包含⾜够的信息来描绘⼀个具体的对象,这样的类就是抽象类。 像是没有实际⼯作的⽅法,我们可以把它设计成⼀个抽象⽅法,包含抽象⽅法的类我们称为抽象类。 2.语法 在Java中,⼀个类如果被 abs…...

写一个shell脚本,把局域网内,把能ping通的IP和不能ping通的IP分类,并保存到两个文本文件里

写一个shell脚本&#xff0c;把局域网内&#xff0c;把能ping通的IP和不能ping通的IP分类&#xff0c;并保存到两个文本文件里 脚本1 #!/bin/bash #定义变量 ip10.1.1 #循环去ping主机的IP for ((i1;i<10;i)) doping -c1 $ip.$i &>/dev/null[ $? -eq 0 ] &&am…...