当前位置: 首页 > news >正文

记一次Kafka重复消费解决过程

        起因:车联网项目开发,车辆发生故障需要给三个系统推送消息,故障上报较为频繁,所以为了不阻塞主流程,采用了使用kafka。消费方负责推送并保存推送记录,但在一次压测中发现,实际只发生了10次故障,但是推送记录却有30多条。

        问题排查,发现是因为其中一个系统宕机,导致往这个系统推送消息时,一直连接超时,导致每条消息的推送时长被拉长。而且kafka消息拉取参数max-poll-records设置了500,意味着一次会批量拉取500条消息到本地处理,而max.poll.interval.ms参数默认是5分钟,当500条消息处理时长超过5分钟后,就会认为消费者死掉了,触发再均衡,导致同一个消息被重复消费。

解决:

        主要是提高消费者的处理速度,避免不必要的Rebalance。主要采用2种措施:

  1. 减少每次拉去消息数max-poll-records,从500,降到20
  2. 拉取到消息之后异步处理(创建线程池,对推送消息的部分利用多线程处理)

常见配置

fetch.min.byte:配置Consumer一次拉取请求中能从Kafka中拉取的最小数据量,默认为1B,如果小于这个参数配置的值,就需要进行等待,直到数据量满足这个参数的配置大小。调大可以提交吞吐量,但也会造成延迟

fetch.max.bytes,一次拉取数据的最大数据量,默认为52428800B,也就是50M,但是如果设置的值过小,甚至小于每条消息的值,实际上也是能消费成功的

fetch.wait.max.ms,若是不满足fetch.min.bytes时,等待消费端请求的最长等待时间,默认是500ms

max.poll.records,单次poll调用返回的最大消息记录数,如果处理逻辑很轻量,可以适当提高该值。一次从kafka中poll出来的数据条数,max.poll.records条数据需要在在session.timeout.ms这个时间内处理完,默认值为500

consumer.poll(100) ,100 毫秒是一个超时时间,一旦拿到足够多的数据(fetch.min.bytes 参数设置),consumer.poll(100)会立即返回 ConsumerRecords<String, String> records。如果没有拿到足够多的数据,会阻塞100ms,但不会超过100ms就会返回

max.poll.interval.ms,两次拉取消息的间隔,默认5分钟;通过消费组管理消费者时,该配置指定拉取消息线程最长空闲时间,若超过这个时间间隔没有发起poll操作,则消费组认为该消费者已离开了消费组,将进行再均衡操作(将分区分配给组内其他消费者成员)

若超过这个时间则报如下异常:

org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has alreadyrebalanced and assigned the partitions to another member. This means that the time between subsequent calls 
to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is 
spending too much time message processing. You can address this either by increasing the session timeout or byreducing the maximum size of batches returned in poll() with max.poll.records. 

  即:无法完成提交,因为组已经重新平衡并将分区分配给另一个成员。这意味着对poll()的后续调用之间的时间比配置的max.poll.interval.ms长,这通常意味着poll循环花费了太多的时间来处理消息。

可以通过增加max.poll.interval.ms来解决这个问题,也可以通过减少在poll()中使用max.poll.records返回的批的最大大小来解决这个问题。

max.partition.fetch.bytes:该属性指定了服务器从每个分区返回给消费者的最大字节数,默认为 1MB。

session.timeout.ms:消费者在被认为死亡之前可以与服务器断开连接的时间,默认是 3s,将触发再均衡操作。

对于每一个Consumer Group,Kafka集群为其从Broker集群中选择一个Broker作为其Coordinator。Coordinator主要做两件事:

  1. 维持Group成员的组成。这包括加入新的成员,检测成员的存活性,清除不再存活的成员。

  2. 协调Group成员的行为。

poll机制

  •    每次poll的消息处理完成之后再进行下一次poll,是同步操作
  •    每次poll之前检查是否可以进行位移提交,如果可以,那么就会提交上一次轮询的位移
  •    每次poll时,consumer都将尝试使用上次消费的offset作为起始offset,然后依次拉取消息
  •    poll(long timeout),timeout指等待轮询缓冲区的数据所花费的时间,单位是毫秒

相关文章:

记一次Kafka重复消费解决过程

起因&#xff1a;车联网项目开发&#xff0c;车辆发生故障需要给三个系统推送消息&#xff0c;故障上报较为频繁&#xff0c;所以为了不阻塞主流程&#xff0c;采用了使用kafka。消费方负责推送并保存推送记录&#xff0c;但在一次压测中发现&#xff0c;实际只发生了10次故障&…...

人工智能在公检系统中的应用:校对软件助推刑事侦查工作

人工智能在公检系统中的应用&#xff0c;尤其是校对软件的应用&#xff0c;可以有效地助推刑事侦查工作。 以下是校对软件在刑事侦查工作中的一些应用方面&#xff1a; 1.自动校对和纠错&#xff1a;校对软件可以自动检测和纠正刑事侦查报告中的语法、拼写和标点错误等问题。通…...

OSI七层模型和TCP/IP四层模型

OSI七层模型和TCP/IP四层模型 七层模型(OSI) OSI七层模型&#xff08;Open Systems Interconnection Reference Model&#xff09;是一个用于计算机网络体系结构的标准化框架&#xff0c;旨在定义网络通信中不同层次的功能和协议。 各个层次具体如下&#xff1a; 物理层&am…...

vant金额输入框

1.在components中新建文件夹currency&#xff0c;新建index.js import Currency from ./src/currency.vueCurrency.install function (Vue) {Vue.component(Currency.name, Currency) }export default Currency 2.在currency中新建文件夹src&#xff0c;在src中间currency.v…...

uni-app base64转图片

pathToBase64 pathToBase64(path).then(base64 > {console.log(base64)}).catch(error > {console.error(error)})base64ToPath base64ToPath(base64).then(path > {console.log(path)}).catch(error > {console.error(error)})首先将插件引入项目。按照image-to…...

【webpack】自定义loader

&#x1f4dd;个人主页&#xff1a;爱吃炫迈 &#x1f48c;系列专栏&#xff1a;前端工程化 &#x1f9d1;‍&#x1f4bb;座右铭&#xff1a;道阻且长&#xff0c;行则将至&#x1f497; 文章目录 loaderloader引入方式loader传入/接收参数传入参数接收参数 loader返回值retur…...

【kubernetes】在k8s集群环境上,部署kubesphere

部署kubesphere 学习于尚硅谷kubesphere课程 前置环境配置-部署默认存储类型 这里使用nfs #所有节点安装 yum install -y nfs-utils# 在master节点执行以下命令 echo "/nfs/data/ *(insecure,rw,sync,no_root_squash)" > /etc/exports # 执行以下命令&#xff…...

STM32 F103C8T6学习笔记4:时钟树、滴答计时器、定时器定时中断

今日理解一下STM32F103 C8T6的时钟与时钟系统、滴答计时器、定时器计时中断的配置&#xff0c;文章提供原理&#xff0c;代码&#xff0c;测试工程下载。 目录 时钟树与时钟系统&#xff1a; 滴答计时器&#xff1a; 定时器计时中断&#xff1a; 测试结果&#xff1a; 测…...

代理模式【Proxy Pattern】

什么是代理模式呢&#xff1f;我很忙&#xff0c;忙的没空理你&#xff0c;那你要找我呢就先找我的代理人吧&#xff0c;那代理人总要知道 被代理人能做哪些事情不能做哪些事情吧&#xff0c;那就是两个人具备同一个接口&#xff0c;代理人虽然不能干活&#xff0c;但是被 代…...

Oracle切割字符串的方法,SQL语句完成。

Oracle用正则的方式循环切割字符串 需求&#xff1a;有一个这样子的 Str “‘CNJ-520-180500000001|CNJ-520-181200000001|CNJ-520-190300000001|CNJ-520-190100000001|CNJ-520-181200000002’” &#xff0c;然后我需要拿到每一个单号&#xff0c;每一个单号都要走一遍固定的…...

Https、CA证书、数字签名

Https Http协议 Http协议是目前应用比较多应用层协议&#xff0c;浏览器对于Http协议已经实现。Http协议基本的构成部分有 请求行 &#xff1a; 请求报文的第一行请求头 &#xff1a; 从第二行开始为请求头内容的开始部分。每一个请求头都是由K-V键值对组成。请求体&#xf…...

Jmeter-压测时接口按照顺序执行-临界部分控制器

文章目录 临界部分控制器存在问题 临界部分控制器 在进行压力测试时&#xff0c;需要按照顺序进行压测&#xff0c;比如按照接口1、接口2、接口3、接口4 进行执行 查询结果是很混乱的&#xff0c;如果请求次数少&#xff0c;可能会按照顺序执行&#xff0c;但是随着次数增加&a…...

linux 文件权限识别及其修改

一、文件权限认识 在 Linux 系统中&#xff0c;一切皆文件&#xff0c;目录也是一种文件形式叫目录文件&#xff0c;它们的属性主要包含&#xff1a;索引节点(inode)&#xff0c;类型、权限属性、链接数、所归属的用户和用户组、最近修改时间等内容。 如下为根目录下目录&…...

Java:简单算法:冒泡排序、选择排序、二分查找

冒泡排序 // 1、准备一个数组 int[] arr {5&#xff0c;2&#xff0c;3&#xff0c;1};//2、定义一个循环控制排几轮 for (int i 0; i < arr.length - 1; i) { // i 0 1 2 【5&#xff0c;2&#xff0c;3&#xff0c;1】 次数 // i 0 第一轮 0 1 2 …...

C、C++项目中 configure、makefile.am、makefile.in、makefile 之间的关系

一、configure、makefile.am、makefile.in、makefile 之间的关系 这四个文件都是与 GNU Make&#xff08;一个用于管理程序的编译和安装过程的工具&#xff09;有关的文件&#xff0c;它们的关系如下&#xff1a; configure&#xff1a;是一个脚本文件&#xff0c;用于根据系统…...

【网络】传输层——UDP | TCP(协议格式确认应答超时重传连接管理)

&#x1f431;作者&#xff1a;一只大喵咪1201 &#x1f431;专栏&#xff1a;《网络》 &#x1f525;格言&#xff1a;你只管努力&#xff0c;剩下的交给时间&#xff01; 现在是传输层&#xff0c;在应用层中的报文(报头 有效载荷)就不能被叫做报文了&#xff0c;而是叫做数…...

198.打家劫舍 ● 213.打家劫舍II ● 337.打家劫舍III

198.打家劫舍 class Solution { public:int rob(vector<int>& nums) {if(nums.size()0)return 0;if(nums.size()1)return nums[0];vector<int>dp(nums.size());dp[0]nums[0];dp[1]max(nums[0],nums[1]);for(int i2;i<nums.size();i)dp[i]max(dp[i-1],dp[i-…...

ArcGIS Maps SDK for JavaScript系列之一:在Vue3中加载ArcGIS地图

目录 ArcGIS Maps SDK for JavaScript简介ArcGIS Maps SDK for JavaScript 4.x 的主要特点和功能AMD modules 和 ES modules两种方式比较Vue3中使用ArcGIS Maps SDK for JavaScript的步骤创建 Vue 3 项目安装 ArcGIS Maps SDK for JavaScript创建地图组件 ArcGIS Maps SDK for …...

服务器扩展未生效

服务器扩容未生效 在阿里云付费扩容后&#xff0c;在服务器里面看未生效。 阿里云->实例与镜像->实例->选择实例->云盘->扩容进入linux服务器查看&#xff1a; df -h vda1扩容未生效。原40g->扩容后100g 解决方法&#xff1a; 1、安装growpart yum inst…...

Jenkins构建自由风格项目发布jar到服务器

前面的文章有介绍 docker安装jenkins 和 dockerjenkins发布spring项目&#xff1b;这里就不做过多的介绍&#xff0c;直接说明构建步骤。 1、选择构建一个自由风格的项目 2、 选择丢弃旧的构建 3、配置Git信息 4、构建触发器 和 构建环境可以直接跳过 5、直接来到Build Step…...

【大模型RAG】拍照搜题技术架构速览:三层管道、两级检索、兜底大模型

摘要 拍照搜题系统采用“三层管道&#xff08;多模态 OCR → 语义检索 → 答案渲染&#xff09;、两级检索&#xff08;倒排 BM25 向量 HNSW&#xff09;并以大语言模型兜底”的整体框架&#xff1a; 多模态 OCR 层 将题目图片经过超分、去噪、倾斜校正后&#xff0c;分别用…...

AI-调查研究-01-正念冥想有用吗?对健康的影响及科学指南

点一下关注吧&#xff01;&#xff01;&#xff01;非常感谢&#xff01;&#xff01;持续更新&#xff01;&#xff01;&#xff01; &#x1f680; AI篇持续更新中&#xff01;&#xff08;长期更新&#xff09; 目前2025年06月05日更新到&#xff1a; AI炼丹日志-28 - Aud…...

SkyWalking 10.2.0 SWCK 配置过程

SkyWalking 10.2.0 & SWCK 配置过程 skywalking oap-server & ui 使用Docker安装在K8S集群以外&#xff0c;K8S集群中的微服务使用initContainer按命名空间将skywalking-java-agent注入到业务容器中。 SWCK有整套的解决方案&#xff0c;全安装在K8S群集中。 具体可参…...

ffmpeg(四):滤镜命令

FFmpeg 的滤镜命令是用于音视频处理中的强大工具&#xff0c;可以完成剪裁、缩放、加水印、调色、合成、旋转、模糊、叠加字幕等复杂的操作。其核心语法格式一般如下&#xff1a; ffmpeg -i input.mp4 -vf "滤镜参数" output.mp4或者带音频滤镜&#xff1a; ffmpeg…...

学习STC51单片机31(芯片为STC89C52RCRC)OLED显示屏1

每日一言 生活的美好&#xff0c;总是藏在那些你咬牙坚持的日子里。 硬件&#xff1a;OLED 以后要用到OLED的时候找到这个文件 OLED的设备地址 SSD1306"SSD" 是品牌缩写&#xff0c;"1306" 是产品编号。 驱动 OLED 屏幕的 IIC 总线数据传输格式 示意图 …...

在Ubuntu中设置开机自动运行(sudo)指令的指南

在Ubuntu系统中&#xff0c;有时需要在系统启动时自动执行某些命令&#xff0c;特别是需要 sudo权限的指令。为了实现这一功能&#xff0c;可以使用多种方法&#xff0c;包括编写Systemd服务、配置 rc.local文件或使用 cron任务计划。本文将详细介绍这些方法&#xff0c;并提供…...

ServerTrust 并非唯一

NSURLAuthenticationMethodServerTrust 只是 authenticationMethod 的冰山一角 要理解 NSURLAuthenticationMethodServerTrust, 首先要明白它只是 authenticationMethod 的选项之一, 并非唯一 1 先厘清概念 点说明authenticationMethodURLAuthenticationChallenge.protectionS…...

成都鼎讯硬核科技!雷达目标与干扰模拟器,以卓越性能制胜电磁频谱战

在现代战争中&#xff0c;电磁频谱已成为继陆、海、空、天之后的 “第五维战场”&#xff0c;雷达作为电磁频谱领域的关键装备&#xff0c;其干扰与抗干扰能力的较量&#xff0c;直接影响着战争的胜负走向。由成都鼎讯科技匠心打造的雷达目标与干扰模拟器&#xff0c;凭借数字射…...

根据万维钢·精英日课6的内容,使用AI(2025)可以参考以下方法:

根据万维钢精英日课6的内容&#xff0c;使用AI&#xff08;2025&#xff09;可以参考以下方法&#xff1a; 四个洞见 模型已经比人聪明&#xff1a;以ChatGPT o3为代表的AI非常强大&#xff0c;能运用高级理论解释道理、引用最新学术论文&#xff0c;生成对顶尖科学家都有用的…...

NXP S32K146 T-Box 携手 SD NAND(贴片式TF卡):驱动汽车智能革新的黄金组合

在汽车智能化的汹涌浪潮中&#xff0c;车辆不再仅仅是传统的交通工具&#xff0c;而是逐步演变为高度智能的移动终端。这一转变的核心支撑&#xff0c;来自于车内关键技术的深度融合与协同创新。车载远程信息处理盒&#xff08;T-Box&#xff09;方案&#xff1a;NXP S32K146 与…...