当前位置: 首页 > news >正文

Kafka重平衡导致无限循环消费问题

1. 问题描述

Kafka消费者消费消息超过了5分钟,不停的触发重平衡,消费者的offset因为重平衡提交失败,重复拉取消费,重复消费。

2. 问题原因

kafka默认的消息消费超时时间max.poll.interval.ms = 300000, 也就是5分钟,超过5分钟,会认为当前消费者已经死掉,会主动发起重平衡。

3. 解决方案

  1. 减小kafka一批次拉取的数据量: max.poll.records,加快消费者消费的速度,控制拉取的消息能在短时间内消费掉
  2. 拉取到消息之后异步处理(保证成功消费)
  3. 调大 拉取消息线程最长空闲时间:max.poll.interval.ms

重复消费有哪些原因

消息重复消费的根本原因都在于:已经消费了数据,但是offset没有成功提交。

offset提交失败,很大一部分原因在于发生了重平衡:

  1. 消费者宕机、重启等。导致消息已经消费但是没有提交offset。
  2. 消费者使用自动提交offset,但当还没有提交的时候,有新的消费者加入或者移除,发生了rebalance。再次消费的时候,消费者会根据提交的偏移量来,于是重复消费了数据。
  3. 消息处理耗时,或者消费者拉取的消息量太多,处理耗时,超过了max.poll.interval.ms的配置时间,导致认为当前消费者已经死掉,触发重平衡。

Kafka知识回顾

1. 常见配置参数
### broker
# offset 保留时间,默认为7天
offsets.retention.minutes=10080### producer
# 生产者会尝试将业务发送到相同的 Partition 的消息合包发送到 Broker,batch.size 设置合包的大小上限。默认为 16KB。batch.size 设太小会导致吞吐下降,设太大会导致内存使用过多。
batch.size=16384# Kafka producer 的 ack 有 3 种机制,分别说明如下:
# -1 或 all:Broker 在 leader 收到数据并同步给所有 ISR 中的 follower 后,才应答给 Producer 继续发送下一条(批)消息。 这种配置提供了最高的数据可靠性,只要有一个已同步的副本存活就不会有消息丢失。注意:这种配置不能确保所有的副本读写入该数据才返回,可以配合 Topic 级别参数 min.insync.replicas 使用。
# 0:生产者不等待来自 broker 同步完成的确认,继续发送下一条(批)消息。这种配置生产性能最高,但数据可靠性最低(当服务器故障时可能会有数据丢失,如果 leader 已死但是 producer 不知情,则 broker 收不到消息)
# 1:生产者在 leader 已成功收到的数据并得到确认后再发送下一条(批)消息。这种配置是在生产吞吐和数据可靠性之间的权衡(如果leader已死但是尚未复制,则消息可能丢失)
# 用户不显示配置时,默认值为1。用户根据自己的业务情况进行设置
acks=1# 控制生产请求在 Broker 等待副本同步满足 acks 设置的条件所等待的最大时间
timeout.ms=30000# 配置生产者用来缓存消息等待发送到 Broker 的内存。用户要根据生产者所在进程的内存总大小调节
buffer.memory=33554432# 当生产消息的速度比 Sender 线程发送到 Broker 速度快,导致 buffer.memory 配置的内存用完时会阻塞生产者 send 操作,该参数设置最大的阻塞时间
max.block.ms=60000# 设置消息延迟发送的时间(ms),这样可以等待更多的消息组成 batch 发送。默认为0表示立即发送。当待发送的消息达到 batch.size 设置的大小时,不管是否达到 linger.ms 设置的时间,请求也会立即发送
# 推荐用户根据实际使用场景,设置linger.ms在100~1000之间,更大的取值相对有更大的吞吐但会相应增加时延
linger.ms=100# 设置分区缓存消息量(bytes),达到该数值时生产者将批量的消息发送给broker。默认值为16384,过小的batch.size会增加发送请求次数可能会损耗性能和影响稳定性,用户根据实际场景,可适当增大该值。注:该值为上限值,当还未达到时若时间已经达到linger.ms时生产者会发送消息
batch.size=16384# 生产者能够发送的请求包大小上限,默认为1MB。在修改该值时注意不能超过 Broker 配置的包大小上限16MB
max.request.size=1048576# 压缩格式配置,目前 0.9(包含)以下版本不允许使用压缩,0.10(包含)以上不允许使用 GZip 压缩
compression.type=[none, snappy, lz4]# 客户端发送给 Broker 的请求的超时时间,不能小于 Broker 配置的 replica.lag.time.max.ms,目前该值为10000ms
request.timeout.ms=30000# 客户端在每个连接上最多可发送的最大的未确认请求数,该参数大于1且 retries 大于0时可能导致数据乱序。 希望消息严格有序时,建议客户将该值设置1
max.in.flight.requests.per.connection=5# 请求发生错误时重试次数,建议将该值设置为大于0,失败重试最大程度保证消息不丢失
retries=0# 发送请求失败时到下一次重试请求之间的时间
retry.backoff.ms=100### Consumer
# 是否在消费消息后将 offset 同步到 Broker,当 Consumer 失败后就能从 Broker 获取最新的 offset, 如果使用 spring-kafka,即使设置false,也会自动提交,因为false表示的是kafka客户端放弃自动提交,把权利交给Spring框架,消息消费后Spring框架还是会帮你自动提交
enable.auto.commit=true# 当 auto.commit.enable=true 时,自动提交 Offset 的时间间隔,建议设置至少1000
auto.commit.interval.ms=5000# 当 Broker 端没有 offset(如第一次消费或 offset 超过7天过期)时如何初始化 offset,当收到 OFFSET_OUT_OF_RANGE 错误时,如何重置 Offset
# earliest:表示自动重置到 partition 的最小 offset
# latest:默认为 latest,表示自动重置到 partition 的最大 offset
# none:不自动进行 offset 重置,抛出 OffsetOutOfRangeException 异常
auto.offset.reset=latest# 标识消费者所属的消费分组
group.id=""# 使用 Kafka 消费分组机制时,消费者超时时间。当 Broker 在该时间内没有收到消费者的心跳时,认为该消费者故障失败,Broker 发起重新 Rebalance 过程。目前该值的配置必须在 Broker 配置group.min.session.timeout.ms=6000和group.max.session.timeout.ms=300000 之间
session.timeout.ms=10000# 使用 Kafka 消费分组机制时,消费者发送心跳的间隔。这个值必须小于 session.timeout.ms,一般小于它的三分之一
heartbeat.interval.ms=3000# 使用 Kafka 消费分组机制时,再次调用 poll 允许的最大间隔。如果在该时间内没有再次调用 poll,则认为该消费者已经失败,Broker 会重新发起 Rebalance 把分配给它的 partition 分配给其他消费者
max.poll.interval.ms=300000# Fecth 请求最少返回的数据大小。默认设置为 1B,表示请求能够尽快返回。增大该值会增加吞吐,同时也会增加延迟
fetch.min.bytes=1# Fetch 请求最多返回的数据大小,默认设置为 50MB
fetch.max.bytes=52428800# Fetch 请求等待时间
fetch.max.wait.ms=500# Fetch 请求每个 partition 返回的最大数据大小,默认为1MB
max.partition.fetch.bytes=1048576# 在一次 poll 调用中返回的记录数
max.poll.records=500# 客户端请求超时时间,如果超过这个时间没有收到应答,则请求超时失败
request.timeout.ms=305000
2. poll机制
  1. 每次poll的消息处理完成之后再进行下一次poll,是同步操作
  2. 每次poll之前检查是否可以进行位移提交,如果可以,那么就会提交上一次轮询的位移
  3. 每次poll时,consumer都将尝试使用上次消费的offset作为起始offset,然后依次拉取消息
  4. poll(long timeout),timeout指等待轮询缓冲区的数据所花费的时间,单位是毫秒
3. 重平衡 rebalance

将分区的所有权从一个消费者转移到其他消费者的行为称为再均衡(重平衡,rebalance)。

消费者通过向组织协调者(kafka broker)发送心跳来维护自己是消费者组的一员并确认其拥有的分区。对于不同不的消费群体来说,其组织协调者可以是不同的。只要消费者定期发送心跳,就会认为 消费者是存活的并处理其分区中的消息。当消费者检索记录或者提交它所消费的记录时就会发送心跳。

如果过了一段时间Kafka停止发送心跳了,会话(session)就会过期,组织协调者就会认为这个consumer已经死亡,就会触发一次重平衡。如果消费者宕机并且停止发送消息,组织协调者会等待几秒钟,确认它死亡了才会触发重平衡。在这段时间里,死亡的消费者将不处理任何消息。在清理消费者时,消费者将通知协调者它要离开群组,组织协调者会触发一次重平衡,尽量降低处理停顿。

重平衡是一把双刃剑,它为消费者群组带来高可用性和伸缩性的同时,还有有一些明显的缺点(bug),而这些 bug 到现在社区还无法修改。也就是说,在重平衡期间,消费者组中的消费者实例都会停止消费(Stop The World),等待重平衡的完成。而且重平衡这个过程很慢。

触发重平衡的三种情况:
1. 消费者组内成员发生变更,这个变更包括了增加和减少消费者。注意这里的减少有很大的可能是被动的,就是某个消费者崩溃退出了
2. 主题的分区数发生变更,分区增加或者减少时就会触发重平衡
3. 订阅的主题发生变化,当消费者组使用正则表达式订阅主题,而恰好又新建了对应的主题,就会触发重平衡

重平衡异常:
max.poll.interval.ms,两次拉取消息的间隔,默认5分钟;通过消费组管理消费者时,该配置指定拉取消息线程最长空闲时间,若超过这个时间间隔没有发起poll操作,则消费组认为该消费者已离开了消费组,将进行重平衡操作(将分区分配给组内其他消费者成员)若超过这个时间则报如下异常:

>org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records
4. 重复消费的解决方案

由于网络问题,重复消费不可避免,因此,消费者需要实现消费幂等。

方案:

  1. 消息表
  2. 数据库唯一索引
  3. 缓存消费过的消息id
5. 消费者消费异常,会重试消费吗
  1. enable.auto.commit=truekafka消费者拉取到消息后,即使消费异常也会自动提交offset,不会重新消费 .
  2. enable.auto.commit=false
    2.1 手动提交offset,提交offset之前抛异常,则不会提交offset,因此会再次重新消费
    2.2 spring-kafka帮忙提交offset,不手动提交,spring框架会在最后提交offset,因此消费过程中抛异常,也不会提交offset,因此会再次重新消费

相关文章:

Kafka重平衡导致无限循环消费问题

1. 问题描述 Kafka消费者消费消息超过了5分钟,不停的触发重平衡,消费者的offset因为重平衡提交失败,重复拉取消费,重复消费。 2. 问题原因 kafka默认的消息消费超时时间max.poll.interval.ms 300000, 也就是5分钟,…...

执行shell脚本时为什么要写成./test.sh,而不是test.sh?

一定要写成 ./test.sh,而不是 test.sh 运行其它二进制的程序也一样! 直接写 test.sh,linux 系统会去 PATH (系统环境)里寻找有没有叫 test.sh 的! 而只有 /bin, /sbin, /usr/bin,/usr/sbin 这…...

【人工智能】第一部分:ChatGPT的基本概念和技术背景

人不走空 🌈个人主页:人不走空 💖系列专栏:算法专题 ⏰诗词歌赋:斯是陋室,惟吾德馨 目录 🌈个人主页:人不走空 💖系列专栏:算法专题 ⏰诗词歌…...

雪花算法详解及源码分析

雪花算法的简介: 雪花算法用来实现全局唯一ID的业务主键,解决分库分表之后主键的唯一性问题,所以就单从全局唯一性来说,其实有很多的解决方法,比如说UUID、数据库的全局表的自增ID 但是在实际的开发过程中&#xff0…...

Golang TCP网络编程

文章目录 网络编程介绍TCP网络编程服务器监听客户端连接服务器服务端获取连接向连接中写入数据从连接中读取数据关闭连接/监听器 简易的TCP回声服务器效果展示服务端处理逻辑客户端处理逻辑 网络编程介绍 网络编程介绍 网络编程是指通过计算机网络实现程序间通信的一种编程技术…...

先进制造aps专题十 aps项目成功指南

aps项目成功指南 为了保证aps项目的成功 现在国内的aps项目 一是看aps软件本身是不是实现了复杂的排程算法和优化算法,算法引擎使用c高性能编译语言开发,支持工序的复杂关系,考虑副资源约束和特殊规格约束,提供了能考虑各种约束…...

实现Dropdown下拉菜单监听键盘上下键选中功能-React

用过ant design的小伙伴都知道,select组件是支持联想搜索跟上下键选中的效果的,但是在项目中我们可能会遇到用select组件无法实现我们的需求的情况,比如说一个div框,里面有input,又有tag标签,在input中输入…...

Ubuntu系统升级k8s节点的node节点遇到的问题

从1.23版本升级到1.28版本 node节点的是Ubuntu系统20.04的版本 Q1 node节点版本1.23升级1.28失败 解决办法: # 改为阿里云镜像 vim /etc/apt/sources.list.d/kubernetes.list# 新增 deb https://mirrors.aliyun.com/kubernetes/apt/ kubernetes-xenial main# 执…...

前端将DOM元素导出为图片

前端工作中经常会用到把一些元素导出,比如表格,正好项目有遇到导出为excel和导出为图片,就都封装实现了一下,以供其他需求的开发者使用: 1.导出为文档 这个说白了就是下载的功能,传过去检索参数&#xff…...

变现 5w+,一个被严重低估的 AI 蓝海赛道,居然用这个免费的AI绘画工具就能做!

大家好,我是画画的小强,致力于分享各类的 AI 工具,包括 AI 绘画工具、AI 视频工具、AI 写作工具等等。 但单纯地为了学而学,是没有任何意义的。 这些 AI 工具,学会了,用起来,才能发挥出他们的…...

Ubuntu server 24 (Linux) 安装部署smartdns 搭建智能DNS服务器

SmartDNS是推荐本地运行的DNS服务器,SmartDNS接受本地客户端的DNS查询请求,从多个上游DNS服务器获取DNS查询结果,并将访问速度最快的结果返回给客户端,提高网络访问速度和准确性。 支持指定域名IP地址,达到禁止过滤的效…...

正点原子[第二期]Linux之ARM(MX6U)裸机篇学习笔记-24.5,6 SPI驱动实验-ICM20608 ADC采样值

前言: 本文是根据哔哩哔哩网站上“正点原子[第二期]Linux之ARM(MX6U)裸机篇”视频的学习笔记,在这里会记录下正点原子 I.MX6ULL 开发板的配套视频教程所作的实验和学习笔记内容。本文大量引用了正点原子教学视频和链接中的内容。…...

安装vllm的时候卡主:Collecting vllm-nccl-cu12<2.19,>=2.18 (from vllm)

按照vllm的时候卡主: ... Requirement already satisfied: typing-extensions in /home/wangguisen/miniconda3/lib/python3.10/site-packages (from vllm) (4.9.0) Requirement already satisfied: filelock>3.10.4 in /home/wangguisen/miniconda3/lib/python…...

O2O : Finetuning Offline World Models in the Real World

CoRL 2023 Oral paper code Intro 算法基于TD-MPC,利用离线数据训练世界模型,然后在线融合基于集成Q的不确定性估计实现Planning。得到的在线数据将联合离线数据共同训练目标策略。 Method TD-MPC TD-MPC由五部分构成: 状态特征提取 z h θ ( s ) …...

嵌入式学习(Day:31 网络编程2:TCP)

client, server browser b/s http p2p peer TCP的特征:1.有链接;2.可靠传输;3.流式套接字 1、模式 C/S 模式 》服务器/客户端模型(服务端1个,客户端很多个) server:socket()-->bind()---…...

正则表达式 0.1v

正则表达式 扩展 --> :% s/\///g //文件里面所有的 / 去掉 * 通配符 \ //转义,让字符变成原本的意思 ^ //行首 $ //行尾 [0-9] //数字 [a-z] //小写字母 [A-Z] //大写字母 把文件的小写字母替换为大写字母? 固定写法 :% s/[a-…...

免费的仓库出入库管理软件有哪些?

中小企业因为预算有限,所以希望能在出入库管理软件方面能够减少成本。 但我们必须清醒地认识到,所谓的“永久免费”往往只是一个幌子。这些软件要么是新上市的、功能尚未完善的产品,试图通过免费吸引用户试用;要么在数据安全和客…...

python 办公自动化-生成ppt文本和图

最终样式 代码实现 # 可编辑折线写入文字 成功 # 问题: 设置字体类型和加粗和字体为微软雅黑,是只改了字母和数字的字体,中文没变化 pip install pptx_ea_font 这个库可以解决这个问题 import pandas as pd import pptx_ea_font import mat…...

「动态规划」买卖股票的最佳时机

力扣原题链接,点击跳转。 给定一个整数数组prices,prices[i]表示股票在第i天的价格。你最多完成2笔交易。你不能同时参与多笔交易(你必须在再次购买前出售掉之前的股票)。设计一个算法计算最大利润。 我们用动态规划的思想来解决…...

Java 并发编程面试二

目录 一、并发编程三要素? 二、实现可见性的方法有哪些? 三、多线程的价值? 四、创建线程的有哪些方式? 五、创建线程的三种方式的对比? 六、Java 线程具有五中基本状态 七、什么是线程池?有哪几种创建方式 八、四种线程池的创建 九、线程池的优点? 十、常用的…...

网络六边形受到攻击

大家读完觉得有帮助记得关注和点赞!!! 抽象 现代智能交通系统 (ITS) 的一个关键要求是能够以安全、可靠和匿名的方式从互联车辆和移动设备收集地理参考数据。Nexagon 协议建立在 IETF 定位器/ID 分离协议 (…...

Chapter03-Authentication vulnerabilities

文章目录 1. 身份验证简介1.1 What is authentication1.2 difference between authentication and authorization1.3 身份验证机制失效的原因1.4 身份验证机制失效的影响 2. 基于登录功能的漏洞2.1 密码爆破2.2 用户名枚举2.3 有缺陷的暴力破解防护2.3.1 如果用户登录尝试失败次…...

业务系统对接大模型的基础方案:架构设计与关键步骤

业务系统对接大模型:架构设计与关键步骤 在当今数字化转型的浪潮中,大语言模型(LLM)已成为企业提升业务效率和创新能力的关键技术之一。将大模型集成到业务系统中,不仅可以优化用户体验,还能为业务决策提供…...

iOS 26 携众系统重磅更新,但“苹果智能”仍与国行无缘

美国西海岸的夏天,再次被苹果点燃。一年一度的全球开发者大会 WWDC25 如期而至,这不仅是开发者的盛宴,更是全球数亿苹果用户翘首以盼的科技春晚。今年,苹果依旧为我们带来了全家桶式的系统更新,包括 iOS 26、iPadOS 26…...

线程与协程

1. 线程与协程 1.1. “函数调用级别”的切换、上下文切换 1. 函数调用级别的切换 “函数调用级别的切换”是指:像函数调用/返回一样轻量地完成任务切换。 举例说明: 当你在程序中写一个函数调用: funcA() 然后 funcA 执行完后返回&…...

1.3 VSCode安装与环境配置

进入网址Visual Studio Code - Code Editing. Redefined下载.deb文件,然后打开终端,进入下载文件夹,键入命令 sudo dpkg -i code_1.100.3-1748872405_amd64.deb 在终端键入命令code即启动vscode 需要安装插件列表 1.Chinese简化 2.ros …...

DIY|Mac 搭建 ESP-IDF 开发环境及编译小智 AI

前一阵子在百度 AI 开发者大会上,看到基于小智 AI DIY 玩具的演示,感觉有点意思,想着自己也来试试。 如果只是想烧录现成的固件,乐鑫官方除了提供了 Windows 版本的 Flash 下载工具 之外,还提供了基于网页版的 ESP LA…...

HBuilderX安装(uni-app和小程序开发)

下载HBuilderX 访问官方网站:https://www.dcloud.io/hbuilderx.html 根据您的操作系统选择合适版本: Windows版(推荐下载标准版) Windows系统安装步骤 运行安装程序: 双击下载的.exe安装文件 如果出现安全提示&…...

MySQL账号权限管理指南:安全创建账户与精细授权技巧

在MySQL数据库管理中,合理创建用户账号并分配精确权限是保障数据安全的核心环节。直接使用root账号进行所有操作不仅危险且难以审计操作行为。今天我们来全面解析MySQL账号创建与权限分配的专业方法。 一、为何需要创建独立账号? 最小权限原则&#xf…...

Git 3天2K星标:Datawhale 的 Happy-LLM 项目介绍(附教程)

引言 在人工智能飞速发展的今天,大语言模型(Large Language Models, LLMs)已成为技术领域的焦点。从智能写作到代码生成,LLM 的应用场景不断扩展,深刻改变了我们的工作和生活方式。然而,理解这些模型的内部…...