当前位置: 首页 > news >正文

Kafka 消息丢失如何处理?

今天给大家分享一个在面试中经常遇到的问题:Kafka 消息丢失该如何处理?

这个问题啊,看似简单,其实里面藏着很多“套路”。

来,咱们先讲一个面试的“真实”案例。

图片

面试官问:“Kafka 消息丢失如何处理?”

小明一听,反问:“你是怎么发现消息丢失了?”

面试官顿时一愣,沉默了片刻后,可能有点不耐烦,说道:“这个你不用管,反正现在发现消息丢失了,你就说如何处理。”

小明一头雾水:“问题是都不知道怎么丢的,处理起来岂不是瞎搞吗?”

画面一黑,面试官离开了会议室,留小明一个人凌乱在风中……

👀 这段子虽然搞笑,但实际工作中,确实“消息丢失”这个事儿有点让人摸不着头脑。

大家有没有想过:消息丢失的定义到底是什么?其实,发现消息丢失的过程,才是处理问题的关键!

图片

在用 Pub/Sub 类中间件,比如 Kafka 或 RocketMQ 时,消息丢失可能有很多原因,包括生产者、消费者和网络传输等各个环节。

我们今天就结合实际工作中遇到的情况,聊聊到底怎么发现消息丢失,又该怎么处理。😎

首先,我们要搞清楚消息丢失的几种典型场景:

1.生产者消息发送失败:这个比较简单,如果生产者发消息时,网络抖动、服务宕机或 Kafka broker 挂了,那消息就丢了。

这时候生产者通常会重试,但是如果重试策略不当,还是可能丢消息。

   

2.消费者消费消息失败:最常见的是消费者拉取了消息,但是业务处理失败,或者消费后没有提交 offset,导致消息“看似”消费了,实际根本没处理。

这种情况不算真正的消息丢失,但你业务数据不一致,这锅还是要 Kafka 来背。😂

3. 网络异常导致消息丢失:有时候消息发送成功了,但是因为网络问题,导致消费者没能拉到这些消息,这类情况更难排查。

OK,分析了几种可能性,接下来看看有哪些方法可以帮助我们及时发现这些问题。

1.监控和告警系统

  

监控是最基础的保障手段。一般来说,Kafka 提供了很多指标可以监控,比如生产端和消费端的吞吐量、消息积压(lag)情况、消费者组的 offset 等等。

通过这些监控指标,一旦消费端的消息积压开始异常增长,或者 offset 停滞不前,就说明很可能有消息丢失了。

很多公司会用 Prometheus + Grafana 来做监控和可视化,再配合告警系统(如 Alertmanager)实时提醒。

比如可以监控 `kafka_consumer_lag` 这个指标,一旦消息积压超过预设阈值,就触发告警。

# Prometheus 配置监控 Kafka 消费者积压
kafka_consumer_lag{consumer_group="your-consumer-group", topic="your-topic"} > 100

在工作中,这类告警往往是消息丢失的第一个信号,反应速度极快。

2.消息追踪机制

消息追踪就像在每个消息上打个“追踪码”,确保每条消息都能被追踪到。

具体做法是:生产者在发送每条消息时,生成一个唯一的 `message_id`,消费者在消费时同样记录消费的 `message_id`。

通过对比生产端和消费端的 ID,就可以发现有没有消息“掉队”了。

在实际应用中,通常会通过日志来记录这些 `message_id`,并定期检查对账,保证所有消息都正确处理了。

// 生产者发送消息时生成 message_id
String messageId = UUID.randomUUID().toString();
ProducerRecord<String, String> record = new ProducerRecord<>("your-topic", messageId, messageContent);
producer.send(record);// 消费者消费消息时记录 message_id
public void consumeMessage(ConsumerRecord<String, String> record) {String messageId = record.key();  // 获取 message_id// 将 message_id 存储到日志或数据库中,用于后续追踪log.info("Consumed message with ID: {}", messageId);
}

3.消息确认机制

Kafka 本身有个很经典的机制,就是手动提交offset。消费者在处理完消息后,才提交消费位置的 offset。

如果消费失败了,不提交 offset,Kafka 就会重新分配这条消息,避免消息丢失。

很多时候,消息丢失的“锅”其实是消费者自己在消费时出了问题,明明没处理完却偷偷提交了 offset,让 Kafka 以为消息已经处理完毕了。

手动提交 offset 就能很好地避免这种情况。

public void consumeMessages() {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));for (ConsumerRecord<String, String> record : records) {try {// 处理消息逻辑processMessage(record);// 成功处理后提交 offsetconsumer.commitSync();} catch (Exception e) {// 处理失败不提交 offset,Kafka 会重试log.error("Failed to process message, will retry.", e);}}
}

4.消息重试和补偿机制

为了解决偶发性的消费失败,很多公司会为 Kafka 消费端加一个重试机制。

当消息处理失败时,重新将消息放回队列,或者放到一个死信队列(Dead Letter Queue, DLQ)里,然后专门处理这些异常消息。

// 如果消息处理失败,将其放回死信队列
try {processMessage(record);
} catch (Exception e) {producer.send(new ProducerRecord<>("dlq-topic", record.key(), record.value()));
}

这个方式虽然不能彻底避免消息丢失,但能保证消息不会轻易丢失,特别是一些重要业务场景中,消息的可靠性至关重要。

5.多副本存储

Kafka 还有一个核心功能,就是多副本机制,即消息在多个 broker 上都有副本。这样即使某个 broker 挂了,其他副本也能提供消息。

通过设置 `replication.factor` 参数,我们可以指定 Kafka 每条消息的副本数,确保即使一台机器挂了,消息也不会丢失。

# Kafka Topic 多副本配置
replication.factor=3
 

最后,真正发现消息丢失了,怎么办呢?这里有一些基本的补救措施:

1.检查消费端日志:首先要确定消息到底有没有消费。如果消费端日志显示消费失败,重新处理即可。

   

2.重发消息:如果消费端确实没处理成功,可以将消息重新发送到 Kafka,或者从备份中恢复并重放消息。

3.处理丢失后的补偿:业务上可能会涉及补偿措施,比如通知相关人员手动处理,或者对丢失的数据进行回补。

总之,消息丢失不算是特别常见的问题,但一旦遇到,还是需要冷静排查问题源头。

Kafka 等 Pub/Sub 中间件本身已经有比较强大的机制来应对这些场景,只要结合业务需求,做好监控和容错机制,基本都能把问题压到最小。

相关文章:

Kafka 消息丢失如何处理?

今天给大家分享一个在面试中经常遇到的问题&#xff1a;Kafka 消息丢失该如何处理&#xff1f; 这个问题啊&#xff0c;看似简单&#xff0c;其实里面藏着很多“套路”。 来&#xff0c;咱们先讲一个面试的“真实”案例。 面试官问&#xff1a;“Kafka 消息丢失如何处理&#x…...

Mysql报错注入之floor报错详解

updatexml extractvalue floor 是mysql的函数 groupbyrandfloorcount 一、简述 利用 select count(),(floor(rand(0)2))x from table group by x&#xff0c;导致数据库报错&#xff0c;通过 concat 函数&#xff0c;连接注入语句与 floor(rand(0)*2)函数&#xff0c;实现将…...

EPS原理笔记

EPS UE(user equipment)&#xff0c;移动用户设备 LTE(Long Term Evolution)&#xff0c;无线接入网部分&#xff0c;E-UTRAN EPC(system Architecture Evolution、Evoloed Packet Core)&#xff0c;核心网部分&#xff0c;主要包括MME、S-GW、P-GW、HSS&#xff0c;连接Intern…...

LeetCode 876. 链表的中间结点

题目描述: 给你单链表的头结点 head &#xff0c;请你找出并返回链表的中间结点。 如果有两个中间结点&#xff0c;则返回第二个中间结点。 示例 1&#xff1a; 输入&#xff1a;head [1,2,3,4,5] 输出&#xff1a;[3,4,5] 解释&#xff1a;链表只有一个中间结点&#xff0…...

划界与分类的艺术:支持向量机(SVM)的深度解析

划界与分类的艺术&#xff1a;支持向量机&#xff08;SVM&#xff09;的深度解析 1. 引言 支持向量机&#xff08;Support Vector Machine, SVM&#xff09;是机器学习中的经典算法&#xff0c;以其强大的分类和回归能力在众多领域得到了广泛应用。SVM通过找到最优超平面来分…...

题目:100条经典C语言笔试题目(1-5)

题目&#xff1a; 1、请填写 bool , float, 指针变量 与“零值”比较的if 语句。 提示&#xff1a;这里“零值”可以是 0, 0.0 , FALSE 或者“空指针” 。例如 int 变量 n 与“零值”比较的 if 语句为&#xff1a; &#xff08;1&#xff09;请写出bool flag 与“零值”比较…...

python代码编写规范及注意事项

目录 1. 注意1.1 变量与常量解释&#xff1a;建议的修复&#xff1a; 1.2 Too many arguments 和 Too many local variables解决方案1. 减少参数数量2. 减少局部变量数量3. 调整 Pylint 配置 总结 1. 注意 1.1 变量与常量 解读下面的pylint问题 C0103: Constant name “file_p…...

【Linux】命令行参数 | 环境变量

&#x1fa90;&#x1fa90;&#x1fa90;欢迎来到程序员餐厅&#x1f4ab;&#x1f4ab;&#x1f4ab; 主厨&#xff1a;邪王真眼 主厨的主页&#xff1a;Chef‘s blog 所属专栏&#xff1a;青果大战linux 总有光环在陨落&#xff0c;总有新星在闪烁 前几天在搞硬件&…...

python 使用进程池并发执行 SQL 语句

这段代码使用了 Python 的 multiprocessing 模块来实现真正的并行处理&#xff0c;绕过 Python 的全局解释器锁&#xff08;GIL&#xff09;限制&#xff0c;从而在多核 CPU 上并发执行多个 SQL 语句。 from pyhive import hive import multiprocessing# 建立连接 conn hive.…...

我也谈AI

“随着人工智能技术的不断发展&#xff0c;我们已经看到了它在各行业带来的巨大变革。在医疗行业中&#xff0c;人工智能技术正在被应用于病例诊断、药物研发等方面&#xff0c;为医学研究和临床治疗提供了新的思路和方法&#xff1b;在企业中&#xff0c;人工智能技术可以通过…...

算法妙妙屋-------1.递归的深邃回响:二叉树的奇妙剪枝

大佬们好呀&#xff0c;这一次讲解的是二叉树的深度搜索&#xff0c;大佬们请阅 1.前言 ⼆叉树中的深搜&#xff08;介绍&#xff09; 深度优先遍历&#xff08;DFS&#xff0c;全称为DepthFirstTraversal&#xff09;&#xff0c;是我们树或者图这样的数据结构中常⽤的⼀种…...

编写第一个 Appium 测试脚本:从安装到运行!

前言 最近接到一个测试项目&#xff0c;简单描述一下&#xff0c;需求就是&#xff1a;一端发送指令&#xff0c;另一端接受指令并处理指令。大概看了看有上百条指令&#xff0c;点点点岂不是废了&#xff0c;而且后期迭代&#xff0c;每次都需要点点点&#xff0c;想想就头大…...

mysql查表相关练习

作业要求&#xff1a; 单表练习&#xff1a; 1 . 查询出部门编号为 D2019060011 的所有员工 2 . 所有财务总监的姓名、编号和部门编号。 3 . 找出奖金高于工资的员工。 4 . 找出奖金高于工资 40% 的员工。 5 找出部门编号为 D2019090011 中所有财务总监&#xff0c;和…...

airtest+poco多脚本、多设备批处理运行测试用例自动生成测试报告

一&#xff1a;主要内容 框架功能、框架架构及测试报告效果 airtest安装、环境搭建 框架搭建、框架运行说明 框架源码 二&#xff1a;框架功能及测试报告效果 1. 框架功能&#xff1a; 该框架笔者用来作为公司的项目的前端自动化&#xff0c;支持pc和app&#xff0c;本文…...

Prometheus套装部署到K8S+Dashboard部署详解

1、添加helm源并更新 helm repo add prometheus-community https://prometheus-community.github.io/helm-charts helm repo update2、创建namespace kubectl create namespace monitoring 3、安装Prometheus监控套装 helm install prometheus prometheus-community/prome…...

python使用pymysql

为了封装这个数据库操作为一个通用方法&#xff0c;我们可以创建一个函数&#xff0c;该函数接受数据库连接参数&#xff08;如主机名、用户名、密码、数据库名&#xff09;、SQL语句以及必要的参数&#xff08;用于参数化查询&#xff09;。下面是一个简单的封装示例&#xff…...

Vue3 + TypeScript 组件和文件命名规范及 setup 导入顺序规范

前言 在 Vue3 项目中&#xff0c;合理的文件命名规范和导入顺序不仅有助于提高代码的可读性&#xff0c;还能增强团队协作的效率。特别是在使用 TypeScript 和 Composition API 的项目中&#xff0c;清晰的组件和文件结构尤为重要。本文将详细介绍 Vue3 TypeScript 项目中的组…...

netty之处理连接源码分析

写在前面 在这篇文章看了netty服务是如何启动的&#xff0c;服务启动成功后&#xff0c;也就相当于是迎宾工作都已经准备好了&#xff0c;那么当客人来了怎么招待客人呢&#xff1f;也就是本文要看的处理连接的工作。 1&#xff1a;正文 先启动源码example模块的echoserver&a…...

Dockerfile文件编写

1、打nginx原始包 登录后复制 ROM nginxENV LANG zh_CN.UTF-8 ENV LC_ALL zh_CN.UTF-8 ENV TZ Asia/Singapore# 设置时区&#xff0c;同样保持在一层 RUN ln -sf /usr/share/zoneinfo/${TZ} /etc/localtime && \echo "${TZ}" > /etc/timezoneRUN apt-get …...

Oracle SQL 使用 ROWNUM 分页查询速度太慢的问题及解决方案!

在使用 Oracle 数据库进行数据查询时,分页查询是一种常见的需求。传统上,开发者常常使用 ROWNUM 来实现分页功能。 然而,当数据量较大时,使用 ROWNUM 进行分页查询可能会导致性能问题。本文将深入探讨这一问题的原因,并提供多种解决方案,以提高分页查询的性能。 一、RO…...

Linux链表操作全解析

Linux C语言链表深度解析与实战技巧 一、链表基础概念与内核链表优势1.1 为什么使用链表&#xff1f;1.2 Linux 内核链表与用户态链表的区别 二、内核链表结构与宏解析常用宏/函数 三、内核链表的优点四、用户态链表示例五、双向循环链表在内核中的实现优势5.1 插入效率5.2 安全…...

python打卡day49

知识点回顾&#xff1a; 通道注意力模块复习空间注意力模块CBAM的定义 作业&#xff1a;尝试对今天的模型检查参数数目&#xff0c;并用tensorboard查看训练过程 import torch import torch.nn as nn# 定义通道注意力 class ChannelAttention(nn.Module):def __init__(self,…...

相机Camera日志实例分析之二:相机Camx【专业模式开启直方图拍照】单帧流程日志详解

【关注我&#xff0c;后续持续新增专题博文&#xff0c;谢谢&#xff01;&#xff01;&#xff01;】 上一篇我们讲了&#xff1a; 这一篇我们开始讲&#xff1a; 目录 一、场景操作步骤 二、日志基础关键字分级如下 三、场景日志如下&#xff1a; 一、场景操作步骤 操作步…...

前端导出带有合并单元格的列表

// 导出async function exportExcel(fileName "共识调整.xlsx") {// 所有数据const exportData await getAllMainData();// 表头内容let fitstTitleList [];const secondTitleList [];allColumns.value.forEach(column > {if (!column.children) {fitstTitleL…...

【Go】3、Go语言进阶与依赖管理

前言 本系列文章参考自稀土掘金上的 【字节内部课】公开课&#xff0c;做自我学习总结整理。 Go语言并发编程 Go语言原生支持并发编程&#xff0c;它的核心机制是 Goroutine 协程、Channel 通道&#xff0c;并基于CSP&#xff08;Communicating Sequential Processes&#xff0…...

前端开发面试题总结-JavaScript篇(一)

文章目录 JavaScript高频问答一、作用域与闭包1.什么是闭包&#xff08;Closure&#xff09;&#xff1f;闭包有什么应用场景和潜在问题&#xff1f;2.解释 JavaScript 的作用域链&#xff08;Scope Chain&#xff09; 二、原型与继承3.原型链是什么&#xff1f;如何实现继承&a…...

【JavaSE】绘图与事件入门学习笔记

-Java绘图坐标体系 坐标体系-介绍 坐标原点位于左上角&#xff0c;以像素为单位。 在Java坐标系中,第一个是x坐标,表示当前位置为水平方向&#xff0c;距离坐标原点x个像素;第二个是y坐标&#xff0c;表示当前位置为垂直方向&#xff0c;距离坐标原点y个像素。 坐标体系-像素 …...

根据万维钢·精英日课6的内容,使用AI(2025)可以参考以下方法:

根据万维钢精英日课6的内容&#xff0c;使用AI&#xff08;2025&#xff09;可以参考以下方法&#xff1a; 四个洞见 模型已经比人聪明&#xff1a;以ChatGPT o3为代表的AI非常强大&#xff0c;能运用高级理论解释道理、引用最新学术论文&#xff0c;生成对顶尖科学家都有用的…...

Element Plus 表单(el-form)中关于正整数输入的校验规则

目录 1 单个正整数输入1.1 模板1.2 校验规则 2 两个正整数输入&#xff08;联动&#xff09;2.1 模板2.2 校验规则2.3 CSS 1 单个正整数输入 1.1 模板 <el-formref"formRef":model"formData":rules"formRules"label-width"150px"…...

laravel8+vue3.0+element-plus搭建方法

创建 laravel8 项目 composer create-project --prefer-dist laravel/laravel laravel8 8.* 安装 laravel/ui composer require laravel/ui 修改 package.json 文件 "devDependencies": {"vue/compiler-sfc": "^3.0.7","axios": …...