当前位置: 首页 > news >正文

KafkaConsumer 消费逻辑

版本:kafka-clients-2.0.1.jar

之前想写个插件修改 kafkaConsumer 消费者的逻辑,根据 header 过滤一些消息。于是需要了解一下 kafkaConsumer 具体是如何拉取消费消息的,确认在消费之前过滤掉消息是否会有影响。
下面是相关的源码,并通过注释的方式进行说明。

先结论:kafkaConsumer 拉取消息的 offset 是存本地的,根据 offset 拉取消息。开启自动提交时,会自动提交 offset 到 broker(在一些场景下会手动检查是否需要提交),防止重启或reblance时 offset 丢失。而本地保存的 offset 是本地拉取到消息时就更新的,所以自动提交的场景下,在消费前过滤掉消息没有影响。

拉取消息

KafkaConsumer#poll

private ConsumerRecords<K, V> poll(final long timeoutMs, final boolean includeMetadataInTimeout) {// note: 获取轻锁同时检查非多线程环境,并检查 consumer 开启状态 (可以close的)acquireAndEnsureOpen();try {if (timeoutMs < 0) throw new IllegalArgumentException("Timeout must not be negative");// note: subscriptions:SubscriptionState  维护了当前消费者订阅的主题列表的状态信息(组、offset等)//   方法判断是否未订阅或未分配分区if (this.subscriptions.hasNoSubscriptionOrUserAssignment()) {throw new IllegalStateException("Consumer is not subscribed to any topics or assigned any partitions");}// poll for new data until the timeout expireslong elapsedTime = 0L;do {// note: 是否触发了唤醒操作 (调用了当前对象的 wakeup 方法) 通过抛异常的方式退出当前方法,(这里是while循环,可能一直在拉取消息,(无新消息时))client.maybeTriggerWakeup();final long metadataEnd;if (includeMetadataInTimeout) {final long metadataStart = time.milliseconds();// note: 更新分区分配元数据以及offset, remain是用来算剩余时间的// 内部逻辑://  1 协调器 ConsumerCoordinator.poll 拉取协调器事件(期间会发送心跳、自动提交)//  2 updateFetchPositions 更新positions,(但本地有positions数据就不更新,更新完pos后,如果还有缺的,就先使用reset策略,最后异步设置pos)if (!updateAssignmentMetadataIfNeeded(remainingTimeAtLeastZero(timeoutMs, elapsedTime))) {return ConsumerRecords.empty();}metadataEnd = time.milliseconds();elapsedTime += metadataEnd - metadataStart;} else {while (!updateAssignmentMetadataIfNeeded(Long.MAX_VALUE)) {log.warn("Still waiting for metadata");}metadataEnd = time.milliseconds();}//note: 这里终于开始拉取消息了,下面单独讲一下final Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollForFetches(remainingTimeAtLeastZero(timeoutMs, elapsedTime));if (!records.isEmpty()) {//note: 翻译:返回之前,发送下一个拉取的请求避免阻塞response// before returning the fetched records, we can send off the next round of fetches// and avoid block waiting for their responses to enable pipelining while the user// is handling the fetched records.//// NOTE: since the consumed position has already been updated, we must not allow// wakeups or any other errors to be triggered prior to returning the fetched records.if (fetcher.sendFetches() > 0 || client.hasPendingRequests()) {client.pollNoWakeup();}//note:  这里使用拦截器拦截一下,这里可以对消息进行修改或过滤,但需要注意commit的问题return this.interceptors.onConsume(new ConsumerRecords<>(records));}final long fetchEnd = time.milliseconds();elapsedTime += fetchEnd - metadataEnd;} while (elapsedTime < timeoutMs);return ConsumerRecords.empty();} finally {release();}
}

关于 pollForFetches 的逻辑

pollForFetches

private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollForFetches(final long timeoutMs) {final long startMs = time.milliseconds();long pollTimeout = Math.min(coordinator.timeToNextPoll(startMs), timeoutMs);// note: 先获取已经拉取了的消息,存在就直接返回//  fetcher 内部有一个 completedFetches 暂存预拉取的请求,可解析出 nextLineRecords 用于暂存预拉取的消息//    从 nextLineRecords 获取消息时,先判断一下状态(如assigned、paused、position),//      然后获取到消息后,再更新 subscriptions 中的 position 位置(值为下一个的offset), 注意这个时候还没commit// if data is available already, return it immediatelyfinal Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchedRecords();if (!records.isEmpty()) {return records;}// note: 没有预拉取的消息,发送拉取请求(实际没发) //  先找到partition的leader,检查可用,检查没有待处理的请求,然后从 subscriptions 获取 position,构建ClientRequest暂存//  以及设置listener (成功则处理结果入队列completedFetches)// send any new fetches (won't resend pending fetches)fetcher.sendFetches();// We do not want to be stuck blocking in poll if we are missing some positions// since the offset lookup may be backing off after a failure// NOTE: the use of cachedSubscriptionHashAllFetchPositions means we MUST call// updateAssignmentMetadataIfNeeded before this method.if (!cachedSubscriptionHashAllFetchPositions && pollTimeout > retryBackoffMs) {pollTimeout = retryBackoffMs;}// note: 轮询等待,详见下文client.poll(pollTimeout, startMs, () -> {// since a fetch might be completed by the background thread, we need this poll condition// to ensure that we do not block unnecessarily in poll()return !fetcher.hasCompletedFetches();});// after the long poll, we should check whether the group needs to rebalance// prior to returning data so that the group can stabilize fasterif (coordinator.rejoinNeededOrPending()) {return Collections.emptyMap();}return fetcher.fetchedRecords();
}

ConsumerNetworkClient#poll

/*** Poll for any network IO.* @param timeout timeout in milliseconds* @param now current time in milliseconds* @param disableWakeup If TRUE disable triggering wake-ups*/
public void poll(long timeout, long now, PollCondition pollCondition, boolean disableWakeup) {// note: 触发已完成的请求的回调处理器  (有一个pendingCompletion的队列)// there may be handlers which need to be invoked if we woke up the previous call to pollfirePendingCompletedRequests();lock.lock();try {// note: 处理断开的连接 (pendingDisconnects队列)// Handle async disconnects prior to attempting any sendshandlePendingDisconnects();// note: 实际上这里才真正发出请求。。 前面那个feature只是构建request//  前面准备的 ClientRequest 放在一个 UnsentRequests (内部map, key:Node,val: requests)中//  这里面取出来进行发送, kafkaClient.ready -> send// send all the requests we can send nowlong pollDelayMs = trySend(now);timeout = Math.min(timeout, pollDelayMs);// note: 这里主要是判断是否需要阻塞 poll (timeout是否为0) 如果没有待完成且判断应该阻塞(completedFetches为空)则阻塞//  poll 里面是从 sockets 里面读写数据// check whether the poll is still needed by the caller. Note that if the expected completion// condition becomes satisfied after the call to shouldBlock() (because of a fired completion// handler), the client will be woken up.if (pendingCompletion.isEmpty() && (pollCondition == null || pollCondition.shouldBlock())) {// if there are no requests in flight, do not block longer than the retry backoffif (client.inFlightRequestCount() == 0)timeout = Math.min(timeout, retryBackoffMs);client.poll(Math.min(maxPollTimeoutMs, timeout), now);now = time.milliseconds();} else {client.poll(0, now);}// note: 检查断开的链接,判断node连接是否断开,是则从unset中取出对应requests,构建response加到completedFetches中// handle any disconnects by failing the active requests. note that disconnects must// be checked immediately following poll since any subsequent call to client.ready()// will reset the disconnect statuscheckDisconnects(now);if (!disableWakeup) {// trigger wakeups after checking for disconnects so that the callbacks will be ready// to be fired on the next call to poll()maybeTriggerWakeup();}// throw InterruptException if this thread is interruptedmaybeThrowInterruptException();// note: 再发一次请求,推测是可能部分 node 的连接在第一次没有ready (没ready会进行初始化,并返回false)// try again to send requests since buffer space may have been// cleared or a connect finished in the polltrySend(now);// fail requests that couldn't be sent if they have expiredfailExpiredRequests(now);// clean unsent requests collection to keep the map from growing indefinitelyunsent.clean();} finally {lock.unlock();}// called without the lock to avoid deadlock potential if handlers need to acquire locksfirePendingCompletedRequests();
}

自动提交

提交 offset 是为了防止重启或 rebalance 后,导致本地 position 丢失无法正常拉取后面的消息。

入口是 ConsumerCoordinator#maybeAutoCommitOffsetsAsync

触发逻辑主要是

  • KafkaConsumer#poll 拉消息
  • -> KafkaConsumer#updateAssignmentMetadataIfNeeded
  • -> ConsumerCoordinator#poll -> maybeAutoCommitOffsetsAsync (也是先构建请求存 unset 里面,等拉消息的时候再发出去)
    public void maybeAutoCommitOffsetsAsync(long now) {// 这里用来判断是否满足自动提交的间隔if (autoCommitEnabled && now >= nextAutoCommitDeadline) {this.nextAutoCommitDeadline = now + autoCommitIntervalMs;doAutoCommitOffsetsAsync();}}

相关文章:

KafkaConsumer 消费逻辑

版本&#xff1a;kafka-clients-2.0.1.jar 之前想写个插件修改 kafkaConsumer 消费者的逻辑&#xff0c;根据 header 过滤一些消息。于是需要了解一下 kafkaConsumer 具体是如何拉取消费消息的&#xff0c;确认在消费之前过滤掉消息是否会有影响。 下面是相关的源码&#xff0…...

scss 实用教程

变量 $ 定义变量 $link-color: blue;变量名可以与css中的属性名和选择器名称相同 使用变量 a {color: $link_color; }$highlight-border: 1px solid $link_color;中划线和下划线相互兼容&#xff0c;即中划线声明的变量可以使用下划线的方式引用&#xff0c;反之亦然。 $li…...

NO.304 二维区域和检索 - 矩阵不可变

题目 给定一个二维矩阵 matrix&#xff0c;以下类型的多个请求&#xff1a; 计算其子矩形范围内元素的总和&#xff0c;该子矩阵的 左上角 为 (row1, col1) &#xff0c;右下角 为 (row2, col2) 。 实现 NumMatrix 类&#xff1a; NumMatrix(int[][] matrix) 给定整数矩阵 …...

牛客---简单密码python

现在有一种密码变换算法。 九键手机键盘上的数字与字母的对应&#xff1a; 1--1&#xff0c; abc--2, def--3, ghi--4, jkl--5, mno--6, pqrs--7, tuv--8 wxyz--9, 0--0&#xff0c;把密码中出现的小写字母都变成九键键盘对应的数字&#xff0c;如&#xff1a;a 变成 2&#x…...

devops完整搭建教程(gitlab、jenkins、harbor、docker)

devops完整搭建教程&#xff08;gitlab、jenkins、harbor、docker&#xff09; 文章目录 devops完整搭建教程&#xff08;gitlab、jenkins、harbor、docker&#xff09;1.简介&#xff1a;2.工作流程&#xff1a;3.优缺点4.环境说明5.部署前准备工作5.1.所有主机永久关闭防火墙…...

页面上时间显示为数字 后端返回给前端 response java系统

有时候&#xff0c;在一个系统里&#xff0c;会看到&#xff0c;有的页面时间显示正常&#xff0c;有的页面时间显示成数字。像这样&#xff1a; "createTime": 1698706491000 这是因为出参没有做转换&#xff0c;直接将java.util.Date类型的数据返回给前端了。 返…...

idea怎么配置tomcat

要在IntelliJ IDEA中配置Tomcat&#xff0c;请按照以下步骤操作&#xff1a; 打开IntelliJ IDEA&#xff0c;点击File -> Settings&#xff08;或者使用快捷键CtrlAltS&#xff09;。 在设置窗口左侧导航栏中&#xff0c;选择Build, Execution, Deployment -> Applicati…...

GoLong的学习之路(二十三)进阶,语法之并发(go最重要的特点)(锁,sync包,原子操作)

这章是我并发系列中最后的一章。这章主要讲的是锁。但是也会讲上一章channl遗留下的一些没有讲到的内容。select关键字的用法&#xff0c;以及错误的一些channl用法。废话不多说。。。 文章目录 select多路复用通道错误示例并发安全和锁问题描述互斥锁读写互斥锁 syncsync.Wait…...

asp.net core 生命周期

在ASP.NET Core中&#xff0c;有三个重要的生命周期阶段&#xff1a; 请求生命周期&#xff08;Request Lifecycle&#xff09;&#xff1a;请求生命周期从接收到客户端的HTTP请求开始&#xff0c;到响应结果发送给客户端结束。在请求生命周期中&#xff0c;ASP.NET Core会创建…...

Leetcode刷题详解—— 目标和

1. 题目链接&#xff1a;494. 目标和 2. 题目描述&#xff1a; 给你一个非负整数数组 nums 和一个整数 target 。 向数组中的每个整数前添加 或 - &#xff0c;然后串联起所有整数&#xff0c;可以构造一个 表达式 &#xff1a; 例如&#xff0c;nums [2, 1] &#xff0c;可…...

学习c#的第六天

目录 C# 判断 if 语句 嵌套 if 语句 switch 语句 嵌套 switch 语句 ? : 运算符 C# 循环 循环类型 while 循环 for/foreach 循环 do...while 循环 嵌套循环 循环控制语句 break 语句 continue 语句 无限循环 C# 判断 if 语句 在C#中&#xff0c;if语句用于根…...

第七章 :Spring Boot web开发常用注解(二)

第七章 :Spring Boot web开发常用注解(二) 前言 本章节知识重点:作者结合自身开发经验,以及觉察到的一个现象:Springboot注解全面理解和掌握的并不多,对注解进行了全面总结,共分两个章节,可以作为web开发工程师注解参考手册,SpringBoot常用注解大全,一目了然!。本…...

IOC - Google Guice

Google Guice是一个轻量级的依赖注入框架&#xff0c;专注于依赖注入和IoC&#xff0c;适用于中小型应用。 Spring Framework是一个全面的企业级框架&#xff0c;提供了广泛的功能&#xff0c;适用于大型企业应用。 是吧&#xff01;IOC 容器不止Spring,还有Google Guice,来体…...

国际阿里云:Linux实例负载高问题排查和异常处理!!!

问题描述 在您使用ECS实例过程中&#xff0c;可能会遇到实例系统负载较高的情况&#xff0c;负载过高&#xff0c;可能会引发一系列异常问题&#xff0c;简单说您如下&#xff1a; CPU使用率或负载过高&#xff1a;一般来说&#xff0c;当CPU使用率≥80%时&#xff0c;定义为C…...

【数据结构】二叉树的遍历递归算法详解

二叉树的遍历 &#x1f4ab;二叉树的结点结构定义&#x1f4ab;创建一个二叉树结点&#x1f4ab;在主函数中手动创建一颗二叉树&#x1f4ab;二叉树的前序遍历&#x1f4ab;调用栈递归——实现前序遍历&#x1f4ab;递归实现中序和后序遍历 &#x1f4ab;二叉树的结点结构定义 …...

百度王颖:百度文库以AI创作能力突破语言边界,促进思想碰撞和文化融通

1月9日&#xff0c;2023年世界互联网大会乌镇峰会“网络传播与文明交流互鉴论坛”召开。百度副总裁、互娱和垂类平台负责人王颖出席并发表“以技术搭建跨文化交流桥梁”主题演讲。她表示&#xff0c;在大模型的加持下&#xff0c;百度各个产品都在重构&#xff0c;通过技术助力…...

人工智能基础_机器学习023_理解套索回归_认识L1正则---人工智能工作笔记0063

然后上一节我们说了L1,L2正则是为了提高,模型的泛化能力, 提高泛化能力,实际上就是把模型的公式的w,权重值,变小对吧. 然后我们这里首先看第一个L1正则,是怎么做到把w权重变小的 可以看到最上面是线性回归的损失函数,然后 L1可以看到,这个正则,就是在损失函数的基础上给损失…...

Learning an Animatable Detailed 3D Face Model from In-The-Wild Images论文笔记

Learning an Animatable Detailed 3D Face Model from In-The-Wild Images论文笔记 论文目标:提出一个端到端的框架,可以从非受控的图片中学习高质量、可动画的3D人脸模型。论文方法:论文结果:论文意义: 论文目标:提出一个端到端的框架,可以从非受控的图片中学习高质量、可动画…...

Lenovo联想小新Air-14笔记本2021款AMD锐龙ALC版(82LM)原装出厂Win10镜像和Windows11预装OEM系统

下载链接&#xff1a;https://pan.baidu.com/s/1akLkXM2HIg3eO76jqM-LVA?pwdxvo6 提取码&#xff1a;xvo6 系统自带所有驱动、出厂主题壁纸、系统属性专属LOGO标志、Office办公软件、联想电脑管家等预装程序 所需要工具&#xff1a;16G或以上的U盘 文件格式&#xff1a;…...

在程序中链接静态库

现在我们把上面src目录中的add.cpp、div.cpp、mult.cpp、sub.cpp编译成一个静态库文件libcalc.a。 add_library(库名称 STATIC 源文件1 [源文件2] ...) link_libraries(<static lib> [<static lib>...]) 参数1&#xff1a;指定出要链接的静态库的名字 可以是全…...

[2025CVPR]DeepVideo-R1:基于难度感知回归GRPO的视频强化微调框架详解

突破视频大语言模型推理瓶颈,在多个视频基准上实现SOTA性能 一、核心问题与创新亮点 1.1 GRPO在视频任务中的两大挑战 ​安全措施依赖问题​ GRPO使用min和clip函数限制策略更新幅度,导致: 梯度抑制:当新旧策略差异过大时梯度消失收敛困难:策略无法充分优化# 传统GRPO的梯…...

ESP32读取DHT11温湿度数据

芯片&#xff1a;ESP32 环境&#xff1a;Arduino 一、安装DHT11传感器库 红框的库&#xff0c;别安装错了 二、代码 注意&#xff0c;DATA口要连接在D15上 #include "DHT.h" // 包含DHT库#define DHTPIN 15 // 定义DHT11数据引脚连接到ESP32的GPIO15 #define D…...

解决本地部署 SmolVLM2 大语言模型运行 flash-attn 报错

出现的问题 安装 flash-attn 会一直卡在 build 那一步或者运行报错 解决办法 是因为你安装的 flash-attn 版本没有对应上&#xff0c;所以报错&#xff0c;到 https://github.com/Dao-AILab/flash-attention/releases 下载对应版本&#xff0c;cu、torch、cp 的版本一定要对…...

汇编常见指令

汇编常见指令 一、数据传送指令 指令功能示例说明MOV数据传送MOV EAX, 10将立即数 10 送入 EAXMOV [EBX], EAX将 EAX 值存入 EBX 指向的内存LEA加载有效地址LEA EAX, [EBX4]将 EBX4 的地址存入 EAX&#xff08;不访问内存&#xff09;XCHG交换数据XCHG EAX, EBX交换 EAX 和 EB…...

06 Deep learning神经网络编程基础 激活函数 --吴恩达

深度学习激活函数详解 一、核心作用 引入非线性:使神经网络可学习复杂模式控制输出范围:如Sigmoid将输出限制在(0,1)梯度传递:影响反向传播的稳定性二、常见类型及数学表达 Sigmoid σ ( x ) = 1 1 +...

【HarmonyOS 5 开发速记】如何获取用户信息(头像/昵称/手机号)

1.获取 authorizationCode&#xff1a; 2.利用 authorizationCode 获取 accessToken&#xff1a;文档中心 3.获取手机&#xff1a;文档中心 4.获取昵称头像&#xff1a;文档中心 首先创建 request 若要获取手机号&#xff0c;scope必填 phone&#xff0c;permissions 必填 …...

Typeerror: cannot read properties of undefined (reading ‘XXX‘)

最近需要在离线机器上运行软件&#xff0c;所以得把软件用docker打包起来&#xff0c;大部分功能都没问题&#xff0c;出了一个奇怪的事情。同样的代码&#xff0c;在本机上用vscode可以运行起来&#xff0c;但是打包之后在docker里出现了问题。使用的是dialog组件&#xff0c;…...

佰力博科技与您探讨热释电测量的几种方法

热释电的测量主要涉及热释电系数的测定&#xff0c;这是表征热释电材料性能的重要参数。热释电系数的测量方法主要包括静态法、动态法和积分电荷法。其中&#xff0c;积分电荷法最为常用&#xff0c;其原理是通过测量在电容器上积累的热释电电荷&#xff0c;从而确定热释电系数…...

数学建模-滑翔伞伞翼面积的设计,运动状态计算和优化 !

我们考虑滑翔伞的伞翼面积设计问题以及运动状态描述。滑翔伞的性能主要取决于伞翼面积、气动特性以及飞行员的重量。我们的目标是建立数学模型来描述滑翔伞的运动状态,并优化伞翼面积的设计。 一、问题分析 滑翔伞在飞行过程中受到重力、升力和阻力的作用。升力和阻力与伞翼面…...

认识CMake并使用CMake构建自己的第一个项目

1.CMake的作用和优势 跨平台支持&#xff1a;CMake支持多种操作系统和编译器&#xff0c;使用同一份构建配置可以在不同的环境中使用 简化配置&#xff1a;通过CMakeLists.txt文件&#xff0c;用户可以定义项目结构、依赖项、编译选项等&#xff0c;无需手动编写复杂的构建脚本…...