Kafka 如何保证不重复消费
在消息队列的使用场景中,避免消息重复消费是保障数据准确性和业务逻辑正确性的关键。对于 Kafka 而言,保证不重复消费并非单一机制就能实现,而是需要从生产者、消费者以及业务层等多个维度协同配合。接下来,我们将结合图文详细解析 Kafka 保证不重复消费的核心策略与实现方式。
一、消费者端:精确控制偏移量提交
在 Kafka 中,偏移量(Offset)是标识分区内消息位置的关键要素,消费者通过提交偏移量来标记已消费的消息位置。而合理管理偏移量提交,是避免重复消费的重要一环。
1.1 禁用自动提交,启用手动提交
自动提交偏移量(enable.auto.commit=true)是 Kafka 消费者的默认设置,但这种方式存在风险。因为自动提交可能在消息尚未完全处理完成时就执行,一旦消费者在此期间出现故障,重启后就会从已提交的偏移量位置开始消费,导致部分消息被重复处理。因此,为了更精确地控制消费进度,我们通常会禁用自动提交,改用手动提交。
props.put("enable.auto.commit", "false"); // 禁用自动提交
1.2 手动提交的正确时机
手动提交偏移量需要确保在消息完全处理成功后进行。以下是一段示例代码,展示了手动提交的逻辑:
try {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
processMessage(record); // 处理消息
}
consumer.commitSync(); // 批量提交偏移量(仅当所有消息处理完成)
} catch (Exception e) {
// 处理失败,不提交偏移量,重启后重新消费
}
在上述代码中,只有当processMessage(record)方法成功处理完所有拉取到的消息后,才会调用consumer.commitSync()提交偏移量。如果在处理过程中出现异常,偏移量不会被提交,消费者重启后将重新消费这些消息,从而保证消息至少被处理一次(At-Least-Once)。结合后续的去重逻辑,即可实现不重复消费(Exactly-Once)。
1.3 异步提交与回调处理
除了同步提交,Kafka 还支持异步提交偏移量,通过consumer.commitAsync()方法实现。异步提交不会阻塞线程,适用于对实时性要求较高的场景。不过,异步提交存在并发问题,例如旧偏移量可能覆盖新偏移量。因此,通常会搭配回调函数处理提交失败的情况:
consumer.commitAsync((offsets, exception) -> {
if (exception != null) {
log.error("Commit failed: {}", exception.getMessage());
// 可重试或记录日志
}
});
消费者偏移量提交逻辑示意图如下:
二、生产者端:幂等性与事务机制
如果生产者重复发送消息,即便消费者端精确管理了偏移量,仍然可能导致重复消费。为此,Kafka 在生产者端提供了幂等性和事务机制来解决这一问题。
2.1 幂等性生产者
幂等性生产者(Idempotent Producer)是 Kafka 从 0.11.0.0 版本开始引入的特性。其核心原理是 Kafka 为每个生产者分配唯一的Producer ID(PID),并为每条消息生成递增的Sequence Number。当生产者因网络问题等原因重复发送同一消息时,Broker 会根据 PID 和 Sequence Number 过滤掉重复消息,确保相同消息仅被写入一次。
开启幂等性生产者非常简单,只需在生产者配置中设置:
props.put("enable.idempotence", "true"); // 默认为 true
不过,需要注意的是,幂等性生产者仅能保证单分区内的幂等性,无法跨分区或跨会话保证消息不重复。
2.2 事务性生产者
对于需要跨分区或跨会话保证消息不重复的场景,就需要使用事务性生产者(Transactional Producer)。事务性生产者通过Transactional ID将多个分区的消息写入操作封装为一个原子操作,确保这些操作要么全部成功,要么全部回滚。
事务性生产者的关键操作步骤如下:
- 初始化事务:producer.beginTransaction();
- 发送消息到多个分区:producer.send(...);
- 提交事务:producer.commitTransaction();
- 若中途失败,回滚事务:producer.abortTransaction();
通过事务性生产者,即使生产者重启,新实例也能通过相同的 Transactional ID 继承旧 PID,避免重复消息的产生。同时,配合消费者的偏移量管理,能够实现端到端的不重复消费语义。
生产者幂等性与事务机制示意图如下:
三、业务层:添加去重逻辑
尽管 Kafka 在生产者和消费者端提供了多种机制来避免重复消费,但在一些极端情况下,例如下游系统处理消息时出现异常重试,仍然可能导致重复数据。因此,在业务层添加去重逻辑是保证不重复消费的最后一道防线。
3.1 为消息添加唯一标识
一种常见的去重方式是为每条消息添加唯一标识,例如 UUID。消费者在处理消息时,首先检查本地是否已处理过该标识的消息。如果已处理,则直接跳过;否则,进行正常的消息处理流程,并在处理完成后将该标识记录下来。
3.2 利用数据库特性
在将消息写入数据库时,可以利用数据库的特性实现去重。例如,在 MySQL 中使用INSERT IGNORE语句,当插入重复数据时,数据库会自动忽略该操作;或者结合版本号(Version)或时间戳(Timestamp)实现乐观锁,确保同一数据不会被重复更新。
以下是一个简单的伪代码示例,展示了业务层去重逻辑:
void processMessage(ConsumerRecord record) {
String messageId = record.value().getMessageId();
if (isProcessed(messageId)) { // 检查本地缓存或数据库
return; // 已处理,跳过
}
saveToDatabase(record.value()); // 写入业务系统
markAsProcessed(messageId); // 标记为已处理
}
四、不同场景下的配置组合与实践建议
在实际应用中,需要根据具体的业务场景选择合适的配置组合来保证不重复消费:
场景 | 生产者配置 | 消费者配置 | 去重方式 |
单分区,不跨会话 | 开启幂等性(默认) | 手动提交偏移量 | 可选(幂等性已保障) |
多分区,需跨会话 | 开启事务性(transactional.id) | 手动提交偏移量 + 事务性消费 | 可选(事务机制保障) |
下游系统无去重能力 | 幂等性 / 事务性 + 消息唯一标识 | 手动提交偏移量 | 业务层去重(必选) |
此外,在实际操作中还应注意以下几点:
- 监控消费者的consumer_lag(消费滞后量)和生产者的transactional_id_expiry(事务 ID 过期时间)等关键指标,及时发现潜在问题。
- 合理调整max.in.flight.requests.per.connection等参数,控制未确认请求数,避免重试时出现消息乱序。
Kafka 保证不重复消费是一个多机制协同工作的过程,需要从生产者、消费者和业务层等多个层面综合考虑和配置。通过正确运用这些机制和策略,能够在分布式消息处理场景中高效、可靠地避免重复消费,确保数据的准确性和业务的稳定性。
相关文章:

Kafka 如何保证不重复消费
在消息队列的使用场景中,避免消息重复消费是保障数据准确性和业务逻辑正确性的关键。对于 Kafka 而言,保证不重复消费并非单一机制就能实现,而是需要从生产者、消费者以及业务层等多个维度协同配合。接下来,我们将结合图文详细解析…...
SpringBoot整合MyBatis完整实践指南
在Java企业级应用开发中,SpringBoot和MyBatis的组合已经成为主流的技术选型方案之一。本文将详细介绍如何从零开始搭建一个基于SpringBoot和MyBatis的项目,包括环境配置、数据库设计、实体类创建、Mapper接口编写以及实际应用等完整流程。 一、环境准备…...

RNN结构扩展与改进:从简单循环网络到时间间隔网络的技术演进
本文系统介绍 RNN 结构的常见扩展与改进方案。涵盖 简单循环神经网络(SRN)、双向循环神经网络(BRNN)、深度循环神经网络(Deep RNN) 等多种变体,解析其核心架构、技术特点及应用场景,…...
docker中,容器时间和宿机主机时间不一致问题
win11下的docker中有个mysql。今天发现插入数据的时间不正确。后来发现原来是docker容器中的时间不正确。于是尝试了各种修改,什么run -e TZ"${tzutil /g}",TZ"Asia/Shanghai",还有初始化时带--mysqld一类的,…...
Unity Shader编程】之高级纹理
一,立方体纹理 Cubemap 用途 用途说明反射贴图表面镜面高光或金属反射环境光采样模拟环境对物体的影响天空盒背景使用六张图拼接场景背景全景投影做360度相机渲染、投影等 二,创建立方体纹理 在 Unity 中创建和保存一个 立方体纹理(Cubema…...

类 Excel 数据填报
类 Excel 填报模式,满足用户 Excel 使用习惯 数据填报,可作为独立的功能模块,用于管理业务流程、汇总采集数据,以及开发各类数据报送系统,因此,对于报表工具而言,其典型场景之一就是利用报表模…...
vscode调试stm32,Cortex Debug的配置文件lanuch.json如何写,日志
https://blog.csdn.net/jiladahe1997/article/details/122046665 https://discuss.em-ide.com/blog/67-cortex-debug 第一版 {// 使用 IntelliSense 了解相关属性。 // 悬停以查看现有属性的描述。// 欲了解更多信息,请访问: https://go.microsoft.com/fwlink/?li…...

Office文档图片批量导出工具
软件介绍 本文介绍一款专业的Office文档图片批量导出工具。 软件特点 这款软件能够批量导出Word、Excel和PPT中的图片,采用绿色单文件设计,体积小巧仅344KB。 基本操作流程 使用方法十分简单:直接将Word、Excel或PPT文件拖入软件…...

【iOS】ARC 与 Autorelease
ARC 与 Autorelease 文章目录 ARC 与 Autorelease前言何为ARC内存管理考虑方式自己生成的对象,自己持有非自己生成的对象,自己也可以持有不再需要自己持有的对象时释放非自己持有的对象无法释放 ARC的具体实现编译期和运行期ARC做的事情ARC实现: __autoreleasing 与 Autoreleas…...
人工智能在智能零售中的创新应用与未来趋势
随着电子商务的蓬勃发展和消费者需求的不断变化,零售行业正面临着前所未有的挑战和机遇。智能零售作为零售行业的重要发展方向,通过引入人工智能(AI)、物联网(IoT)、大数据和云计算等前沿技术,正…...
业务材料——半导体行业MES系统核心功能工业协议AI赋能
一、前置概念 半导体行业 半导体行业主要生产基于半导体材料(如硅、锗、化合物半导体等)的电子元器件及相关产品,广泛应用于计算、通信、能源、医疗等领域。 MES系统 MES系统(Manufacturing Execution System,制造…...
docker部署命令行 — 启动一个 MySQL 数据库服务 并且把它的数据存储挂载到卷(volume)里
挂载卷的配置写法: version: "3" services:db:image: mysqlvolumes:- mysql_data:/var/lib/mysqlvolumes:mysql_data:这段 docker-compose.yml 配置非常典型,是用来启动一个 MySQL 数据库服务 并且把它的数据存储挂载到卷(volume&…...

铁电液晶破局 VR/AR:10000PPI 重构元宇宙显示体验
一、VR/AR 沉浸感困境:传统显示技术的天花板在哪? (一)纱窗效应与眩晕感:近眼显示的双重枷锁 当用户戴上 VR 头显,眼前像素网格形成的 “纱窗效应” 瞬间打破沉浸感。传统液晶 500-600PPI 的像素密度&…...
2025年微信小程序开发:AR/VR与电商的最新案例
引言 微信小程序自2017年推出以来,已成为中国移动互联网生态的核心组成部分。根据最新数据,截至2025年,微信小程序的日活跃用户超过4.5亿,总数超过430万,覆盖电商、社交、线下服务等多个领域(WeChat Mini …...
从零开始,学会上传,更新,维护github仓库
以下是一份从头到尾、覆盖安装、配置、创建仓库、上传项目到 GitHub 的完整教程。全程使用通用示例,不包含任何具体的仓库链接,仅供参考。 一、准备工作 1. 注册 GitHub 账号 打开浏览器,访问 GitHub 官网(输入 “GitHub” 即可找…...
#STM32 HAL库实现的STM32F407时钟配置程序以及和STM32F103配置对比
以下是使用STM32 HAL库实现的STM32F407时钟配置完整代码(基于8MHz外部晶振,配置为168MHz系统时钟),包含详细注释和关键点说明: 完整HAL库实现(system_stm32f4xx.c main.c) 1. 首先在stm32f4xx…...

竞争加剧,美团的战略升维:反内卷、科技与全球化
5月26日,美团发布2025年第一季度业绩报告,交出了一份兼具韧性与创新性的成绩单。 报告显示,公司一季度总营收866亿元,同比增长18%;核心本地商业收入643亿元,同比增长18%;季度研发投入58亿元&a…...

(17)课36:窗口函数的例题:例三登录时间与连续三天登录,例四球员的进球时刻连续进球。
(89)例三登录时间 : 保留代码版本 : CREATE TABLE sql_8( user_id varchar(2), login_date date ); insert into sql_8(user_id,login_date) values(A,2024-09-02),(A,2024-09-03),(A,2024-09-04),(B,2023-11-25),(B,2023-12- 3…...

高性能分布式消息队列系统(二)
上一篇博客将C进行实现消息队列的用到的核心技术以及环境配置进行了详细的说明,这一篇博客进行记录消息队列进行实现的核心模块的设计 五、项目的需求分析 5.1、项目框架的概念性理解 5.1.1、消息队列的设计和生产消费者模型的关系 在现代系统架构中,…...
Spring 官方推荐构造函数注入
1. 依赖关系明确 构造函数注入可以清晰地声明类的依赖关系,所有必需的依赖项都通过构造函数参数传递,使得代码的可读性更高。这种方式让类的使用者能够直观地了解类的依赖,而不需要通过注解或反射来猜测。 2. 增强代码健壮性 构造函数注入…...

华为OD机试真题——天然蓄水库(2025A卷:200分)Java/python/JavaScript/C++/C语言/GO六种最佳实现
2025 A卷 200分 题型 本文涵盖详细的问题分析、解题思路、代码实现、代码详解、测试用例以及综合分析; 并提供Java、python、JavaScript、C++、C语言、GO六种语言的最佳实现方式! 2025华为OD真题目录+全流程解析/备考攻略/经验分享 华为OD机试真题《天然蓄水库》: 目录 题目…...

【Harmony OS】数据存储
目录 数据存储概述 首选项数据存储 关系型数据库 数据存储概述 • 数据存储 是为了解决应用数据持久化问题,使得数据能够存储在外存中,达到保存或共享目的。 • 鸿蒙应用数据存储包括 本地数据存储 和 分布式数据存储 。 • 本地数据存储 为应用…...

MybatisPlus--核心功能--service接口
Service接口 基本用法 MyBatisPlus同时也提供了service接口,继承后一些基础的增删改查的service代码,也不需要去书写。 接口名为Iservice,而Iservice也继承了IRepository,这里提供的方法跟BaseMapper相比只多不少,整…...

uniapp调试,设置默认展示的toolbar内容
uniapp调试,设置默认展示的toolbar内容 设置pages.json中 pages数组中 json的顺序就可以只需要调整顺序,不会影响该bar在页面中的显示默认展示第一条page...

笔记本电脑开机无线网卡自动禁用问题
1.问题环境 电脑品牌:华硕笔记本天选4 电脑型号:FX507VV 电脑系统:windows 11_x64_24h2 文档编写时间:2025年6月 2.问题现象 1. 笔记本电脑开机之后自动禁用无线网卡 使用USB转RJ45转接头同样无效,这个网卡也给禁…...

推荐一款使用html开发桌面应用的工具——mixone
简介 mixone是开发桌面应用(Win、Mac、Linux)的一款工具、其基于electron实现。其拥有简单的工程结构。以为熟悉前端开发的程序员可以很轻松的开发出桌面应用,它比electron的其他框架更简单,因为那些框架基本上还需要了解electro…...
支持TypeScript并打包为ESM/CommonJS/UMD三种格式的脚手架项目
支持TypeScript并打包为ESM、CommonJS和UMD三种格式的脚手架项目 码云地址 NODE 版本要求 node v16.17.1 npm 8.15.0 设置淘宝镜像 npm set registry https://registry.npmjs.org/ cnpm set registry https://registry.npmjs.org/安装依赖 npm install打包 npm run build…...

【云原生开发】如何通过client-go来操作K8S集群
✨✨ 欢迎大家来到景天科技苑✨✨ 🎈🎈 养成好习惯,先赞后看哦~🎈🎈 🏆 作者简介:景天科技苑 🏆《头衔》:大厂架构师,华为云开发者社区专家博主,…...

八.MySQL复合查询
一.基本查询回顾 分组统计 group by 函数作用示例语句说明count(*)统计记录条数select deptno, count(*) from emp group by deptno;每个部门有多少人?sum(sal)某字段求和select deptno, sum(sal) from emp group by deptno;每个部门总工资avg(sal)求平均值select…...
cacti导出的1分钟监控数据csv文件读取并按5分钟求平均值,然后计算95计费值,假设31天的月份
cacti导出的1分钟监控数据csv文件读取并按5分钟求平均值,然后计算95计费值,假设31天的月份 import pandas as pd import openpyxl from openpyxl.styles import Font from openpyxl.utils.dataframe import dataframe_to_rows import os import chardet…...