当前位置: 首页 > news >正文

KafkaConsumer 消费逻辑

版本:kafka-clients-2.0.1.jar

之前想写个插件修改 kafkaConsumer 消费者的逻辑,根据 header 过滤一些消息。于是需要了解一下 kafkaConsumer 具体是如何拉取消费消息的,确认在消费之前过滤掉消息是否会有影响。
下面是相关的源码,并通过注释的方式进行说明。

先结论:kafkaConsumer 拉取消息的 offset 是存本地的,根据 offset 拉取消息。开启自动提交时,会自动提交 offset 到 broker(在一些场景下会手动检查是否需要提交),防止重启或reblance时 offset 丢失。而本地保存的 offset 是本地拉取到消息时就更新的,所以自动提交的场景下,在消费前过滤掉消息没有影响。

拉取消息

KafkaConsumer#poll

private ConsumerRecords<K, V> poll(final long timeoutMs, final boolean includeMetadataInTimeout) {// note: 获取轻锁同时检查非多线程环境,并检查 consumer 开启状态 (可以close的)acquireAndEnsureOpen();try {if (timeoutMs < 0) throw new IllegalArgumentException("Timeout must not be negative");// note: subscriptions:SubscriptionState  维护了当前消费者订阅的主题列表的状态信息(组、offset等)//   方法判断是否未订阅或未分配分区if (this.subscriptions.hasNoSubscriptionOrUserAssignment()) {throw new IllegalStateException("Consumer is not subscribed to any topics or assigned any partitions");}// poll for new data until the timeout expireslong elapsedTime = 0L;do {// note: 是否触发了唤醒操作 (调用了当前对象的 wakeup 方法) 通过抛异常的方式退出当前方法,(这里是while循环,可能一直在拉取消息,(无新消息时))client.maybeTriggerWakeup();final long metadataEnd;if (includeMetadataInTimeout) {final long metadataStart = time.milliseconds();// note: 更新分区分配元数据以及offset, remain是用来算剩余时间的// 内部逻辑://  1 协调器 ConsumerCoordinator.poll 拉取协调器事件(期间会发送心跳、自动提交)//  2 updateFetchPositions 更新positions,(但本地有positions数据就不更新,更新完pos后,如果还有缺的,就先使用reset策略,最后异步设置pos)if (!updateAssignmentMetadataIfNeeded(remainingTimeAtLeastZero(timeoutMs, elapsedTime))) {return ConsumerRecords.empty();}metadataEnd = time.milliseconds();elapsedTime += metadataEnd - metadataStart;} else {while (!updateAssignmentMetadataIfNeeded(Long.MAX_VALUE)) {log.warn("Still waiting for metadata");}metadataEnd = time.milliseconds();}//note: 这里终于开始拉取消息了,下面单独讲一下final Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollForFetches(remainingTimeAtLeastZero(timeoutMs, elapsedTime));if (!records.isEmpty()) {//note: 翻译:返回之前,发送下一个拉取的请求避免阻塞response// before returning the fetched records, we can send off the next round of fetches// and avoid block waiting for their responses to enable pipelining while the user// is handling the fetched records.//// NOTE: since the consumed position has already been updated, we must not allow// wakeups or any other errors to be triggered prior to returning the fetched records.if (fetcher.sendFetches() > 0 || client.hasPendingRequests()) {client.pollNoWakeup();}//note:  这里使用拦截器拦截一下,这里可以对消息进行修改或过滤,但需要注意commit的问题return this.interceptors.onConsume(new ConsumerRecords<>(records));}final long fetchEnd = time.milliseconds();elapsedTime += fetchEnd - metadataEnd;} while (elapsedTime < timeoutMs);return ConsumerRecords.empty();} finally {release();}
}

关于 pollForFetches 的逻辑

pollForFetches

private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollForFetches(final long timeoutMs) {final long startMs = time.milliseconds();long pollTimeout = Math.min(coordinator.timeToNextPoll(startMs), timeoutMs);// note: 先获取已经拉取了的消息,存在就直接返回//  fetcher 内部有一个 completedFetches 暂存预拉取的请求,可解析出 nextLineRecords 用于暂存预拉取的消息//    从 nextLineRecords 获取消息时,先判断一下状态(如assigned、paused、position),//      然后获取到消息后,再更新 subscriptions 中的 position 位置(值为下一个的offset), 注意这个时候还没commit// if data is available already, return it immediatelyfinal Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchedRecords();if (!records.isEmpty()) {return records;}// note: 没有预拉取的消息,发送拉取请求(实际没发) //  先找到partition的leader,检查可用,检查没有待处理的请求,然后从 subscriptions 获取 position,构建ClientRequest暂存//  以及设置listener (成功则处理结果入队列completedFetches)// send any new fetches (won't resend pending fetches)fetcher.sendFetches();// We do not want to be stuck blocking in poll if we are missing some positions// since the offset lookup may be backing off after a failure// NOTE: the use of cachedSubscriptionHashAllFetchPositions means we MUST call// updateAssignmentMetadataIfNeeded before this method.if (!cachedSubscriptionHashAllFetchPositions && pollTimeout > retryBackoffMs) {pollTimeout = retryBackoffMs;}// note: 轮询等待,详见下文client.poll(pollTimeout, startMs, () -> {// since a fetch might be completed by the background thread, we need this poll condition// to ensure that we do not block unnecessarily in poll()return !fetcher.hasCompletedFetches();});// after the long poll, we should check whether the group needs to rebalance// prior to returning data so that the group can stabilize fasterif (coordinator.rejoinNeededOrPending()) {return Collections.emptyMap();}return fetcher.fetchedRecords();
}

ConsumerNetworkClient#poll

/*** Poll for any network IO.* @param timeout timeout in milliseconds* @param now current time in milliseconds* @param disableWakeup If TRUE disable triggering wake-ups*/
public void poll(long timeout, long now, PollCondition pollCondition, boolean disableWakeup) {// note: 触发已完成的请求的回调处理器  (有一个pendingCompletion的队列)// there may be handlers which need to be invoked if we woke up the previous call to pollfirePendingCompletedRequests();lock.lock();try {// note: 处理断开的连接 (pendingDisconnects队列)// Handle async disconnects prior to attempting any sendshandlePendingDisconnects();// note: 实际上这里才真正发出请求。。 前面那个feature只是构建request//  前面准备的 ClientRequest 放在一个 UnsentRequests (内部map, key:Node,val: requests)中//  这里面取出来进行发送, kafkaClient.ready -> send// send all the requests we can send nowlong pollDelayMs = trySend(now);timeout = Math.min(timeout, pollDelayMs);// note: 这里主要是判断是否需要阻塞 poll (timeout是否为0) 如果没有待完成且判断应该阻塞(completedFetches为空)则阻塞//  poll 里面是从 sockets 里面读写数据// check whether the poll is still needed by the caller. Note that if the expected completion// condition becomes satisfied after the call to shouldBlock() (because of a fired completion// handler), the client will be woken up.if (pendingCompletion.isEmpty() && (pollCondition == null || pollCondition.shouldBlock())) {// if there are no requests in flight, do not block longer than the retry backoffif (client.inFlightRequestCount() == 0)timeout = Math.min(timeout, retryBackoffMs);client.poll(Math.min(maxPollTimeoutMs, timeout), now);now = time.milliseconds();} else {client.poll(0, now);}// note: 检查断开的链接,判断node连接是否断开,是则从unset中取出对应requests,构建response加到completedFetches中// handle any disconnects by failing the active requests. note that disconnects must// be checked immediately following poll since any subsequent call to client.ready()// will reset the disconnect statuscheckDisconnects(now);if (!disableWakeup) {// trigger wakeups after checking for disconnects so that the callbacks will be ready// to be fired on the next call to poll()maybeTriggerWakeup();}// throw InterruptException if this thread is interruptedmaybeThrowInterruptException();// note: 再发一次请求,推测是可能部分 node 的连接在第一次没有ready (没ready会进行初始化,并返回false)// try again to send requests since buffer space may have been// cleared or a connect finished in the polltrySend(now);// fail requests that couldn't be sent if they have expiredfailExpiredRequests(now);// clean unsent requests collection to keep the map from growing indefinitelyunsent.clean();} finally {lock.unlock();}// called without the lock to avoid deadlock potential if handlers need to acquire locksfirePendingCompletedRequests();
}

自动提交

提交 offset 是为了防止重启或 rebalance 后,导致本地 position 丢失无法正常拉取后面的消息。

入口是 ConsumerCoordinator#maybeAutoCommitOffsetsAsync

触发逻辑主要是

  • KafkaConsumer#poll 拉消息
  • -> KafkaConsumer#updateAssignmentMetadataIfNeeded
  • -> ConsumerCoordinator#poll -> maybeAutoCommitOffsetsAsync (也是先构建请求存 unset 里面,等拉消息的时候再发出去)
    public void maybeAutoCommitOffsetsAsync(long now) {// 这里用来判断是否满足自动提交的间隔if (autoCommitEnabled && now >= nextAutoCommitDeadline) {this.nextAutoCommitDeadline = now + autoCommitIntervalMs;doAutoCommitOffsetsAsync();}}

相关文章:

KafkaConsumer 消费逻辑

版本&#xff1a;kafka-clients-2.0.1.jar 之前想写个插件修改 kafkaConsumer 消费者的逻辑&#xff0c;根据 header 过滤一些消息。于是需要了解一下 kafkaConsumer 具体是如何拉取消费消息的&#xff0c;确认在消费之前过滤掉消息是否会有影响。 下面是相关的源码&#xff0…...

scss 实用教程

变量 $ 定义变量 $link-color: blue;变量名可以与css中的属性名和选择器名称相同 使用变量 a {color: $link_color; }$highlight-border: 1px solid $link_color;中划线和下划线相互兼容&#xff0c;即中划线声明的变量可以使用下划线的方式引用&#xff0c;反之亦然。 $li…...

NO.304 二维区域和检索 - 矩阵不可变

题目 给定一个二维矩阵 matrix&#xff0c;以下类型的多个请求&#xff1a; 计算其子矩形范围内元素的总和&#xff0c;该子矩阵的 左上角 为 (row1, col1) &#xff0c;右下角 为 (row2, col2) 。 实现 NumMatrix 类&#xff1a; NumMatrix(int[][] matrix) 给定整数矩阵 …...

牛客---简单密码python

现在有一种密码变换算法。 九键手机键盘上的数字与字母的对应&#xff1a; 1--1&#xff0c; abc--2, def--3, ghi--4, jkl--5, mno--6, pqrs--7, tuv--8 wxyz--9, 0--0&#xff0c;把密码中出现的小写字母都变成九键键盘对应的数字&#xff0c;如&#xff1a;a 变成 2&#x…...

devops完整搭建教程(gitlab、jenkins、harbor、docker)

devops完整搭建教程&#xff08;gitlab、jenkins、harbor、docker&#xff09; 文章目录 devops完整搭建教程&#xff08;gitlab、jenkins、harbor、docker&#xff09;1.简介&#xff1a;2.工作流程&#xff1a;3.优缺点4.环境说明5.部署前准备工作5.1.所有主机永久关闭防火墙…...

页面上时间显示为数字 后端返回给前端 response java系统

有时候&#xff0c;在一个系统里&#xff0c;会看到&#xff0c;有的页面时间显示正常&#xff0c;有的页面时间显示成数字。像这样&#xff1a; "createTime": 1698706491000 这是因为出参没有做转换&#xff0c;直接将java.util.Date类型的数据返回给前端了。 返…...

idea怎么配置tomcat

要在IntelliJ IDEA中配置Tomcat&#xff0c;请按照以下步骤操作&#xff1a; 打开IntelliJ IDEA&#xff0c;点击File -> Settings&#xff08;或者使用快捷键CtrlAltS&#xff09;。 在设置窗口左侧导航栏中&#xff0c;选择Build, Execution, Deployment -> Applicati…...

GoLong的学习之路(二十三)进阶,语法之并发(go最重要的特点)(锁,sync包,原子操作)

这章是我并发系列中最后的一章。这章主要讲的是锁。但是也会讲上一章channl遗留下的一些没有讲到的内容。select关键字的用法&#xff0c;以及错误的一些channl用法。废话不多说。。。 文章目录 select多路复用通道错误示例并发安全和锁问题描述互斥锁读写互斥锁 syncsync.Wait…...

asp.net core 生命周期

在ASP.NET Core中&#xff0c;有三个重要的生命周期阶段&#xff1a; 请求生命周期&#xff08;Request Lifecycle&#xff09;&#xff1a;请求生命周期从接收到客户端的HTTP请求开始&#xff0c;到响应结果发送给客户端结束。在请求生命周期中&#xff0c;ASP.NET Core会创建…...

Leetcode刷题详解—— 目标和

1. 题目链接&#xff1a;494. 目标和 2. 题目描述&#xff1a; 给你一个非负整数数组 nums 和一个整数 target 。 向数组中的每个整数前添加 或 - &#xff0c;然后串联起所有整数&#xff0c;可以构造一个 表达式 &#xff1a; 例如&#xff0c;nums [2, 1] &#xff0c;可…...

学习c#的第六天

目录 C# 判断 if 语句 嵌套 if 语句 switch 语句 嵌套 switch 语句 ? : 运算符 C# 循环 循环类型 while 循环 for/foreach 循环 do...while 循环 嵌套循环 循环控制语句 break 语句 continue 语句 无限循环 C# 判断 if 语句 在C#中&#xff0c;if语句用于根…...

第七章 :Spring Boot web开发常用注解(二)

第七章 :Spring Boot web开发常用注解(二) 前言 本章节知识重点:作者结合自身开发经验,以及觉察到的一个现象:Springboot注解全面理解和掌握的并不多,对注解进行了全面总结,共分两个章节,可以作为web开发工程师注解参考手册,SpringBoot常用注解大全,一目了然!。本…...

IOC - Google Guice

Google Guice是一个轻量级的依赖注入框架&#xff0c;专注于依赖注入和IoC&#xff0c;适用于中小型应用。 Spring Framework是一个全面的企业级框架&#xff0c;提供了广泛的功能&#xff0c;适用于大型企业应用。 是吧&#xff01;IOC 容器不止Spring,还有Google Guice,来体…...

国际阿里云:Linux实例负载高问题排查和异常处理!!!

问题描述 在您使用ECS实例过程中&#xff0c;可能会遇到实例系统负载较高的情况&#xff0c;负载过高&#xff0c;可能会引发一系列异常问题&#xff0c;简单说您如下&#xff1a; CPU使用率或负载过高&#xff1a;一般来说&#xff0c;当CPU使用率≥80%时&#xff0c;定义为C…...

【数据结构】二叉树的遍历递归算法详解

二叉树的遍历 &#x1f4ab;二叉树的结点结构定义&#x1f4ab;创建一个二叉树结点&#x1f4ab;在主函数中手动创建一颗二叉树&#x1f4ab;二叉树的前序遍历&#x1f4ab;调用栈递归——实现前序遍历&#x1f4ab;递归实现中序和后序遍历 &#x1f4ab;二叉树的结点结构定义 …...

百度王颖:百度文库以AI创作能力突破语言边界,促进思想碰撞和文化融通

1月9日&#xff0c;2023年世界互联网大会乌镇峰会“网络传播与文明交流互鉴论坛”召开。百度副总裁、互娱和垂类平台负责人王颖出席并发表“以技术搭建跨文化交流桥梁”主题演讲。她表示&#xff0c;在大模型的加持下&#xff0c;百度各个产品都在重构&#xff0c;通过技术助力…...

人工智能基础_机器学习023_理解套索回归_认识L1正则---人工智能工作笔记0063

然后上一节我们说了L1,L2正则是为了提高,模型的泛化能力, 提高泛化能力,实际上就是把模型的公式的w,权重值,变小对吧. 然后我们这里首先看第一个L1正则,是怎么做到把w权重变小的 可以看到最上面是线性回归的损失函数,然后 L1可以看到,这个正则,就是在损失函数的基础上给损失…...

Learning an Animatable Detailed 3D Face Model from In-The-Wild Images论文笔记

Learning an Animatable Detailed 3D Face Model from In-The-Wild Images论文笔记 论文目标:提出一个端到端的框架,可以从非受控的图片中学习高质量、可动画的3D人脸模型。论文方法:论文结果:论文意义: 论文目标:提出一个端到端的框架,可以从非受控的图片中学习高质量、可动画…...

Lenovo联想小新Air-14笔记本2021款AMD锐龙ALC版(82LM)原装出厂Win10镜像和Windows11预装OEM系统

下载链接&#xff1a;https://pan.baidu.com/s/1akLkXM2HIg3eO76jqM-LVA?pwdxvo6 提取码&#xff1a;xvo6 系统自带所有驱动、出厂主题壁纸、系统属性专属LOGO标志、Office办公软件、联想电脑管家等预装程序 所需要工具&#xff1a;16G或以上的U盘 文件格式&#xff1a;…...

在程序中链接静态库

现在我们把上面src目录中的add.cpp、div.cpp、mult.cpp、sub.cpp编译成一个静态库文件libcalc.a。 add_library(库名称 STATIC 源文件1 [源文件2] ...) link_libraries(<static lib> [<static lib>...]) 参数1&#xff1a;指定出要链接的静态库的名字 可以是全…...

Vim 调用外部命令学习笔记

Vim 外部命令集成完全指南 文章目录 Vim 外部命令集成完全指南核心概念理解命令语法解析语法对比 常用外部命令详解文本排序与去重文本筛选与搜索高级 grep 搜索技巧文本替换与编辑字符处理高级文本处理编程语言处理其他实用命令 范围操作示例指定行范围处理复合命令示例 实用技…...

linux之kylin系统nginx的安装

一、nginx的作用 1.可做高性能的web服务器 直接处理静态资源&#xff08;HTML/CSS/图片等&#xff09;&#xff0c;响应速度远超传统服务器类似apache支持高并发连接 2.反向代理服务器 隐藏后端服务器IP地址&#xff0c;提高安全性 3.负载均衡服务器 支持多种策略分发流量…...

C++初阶-list的底层

目录 1.std::list实现的所有代码 2.list的简单介绍 2.1实现list的类 2.2_list_iterator的实现 2.2.1_list_iterator实现的原因和好处 2.2.2_list_iterator实现 2.3_list_node的实现 2.3.1. 避免递归的模板依赖 2.3.2. 内存布局一致性 2.3.3. 类型安全的替代方案 2.3.…...

DIY|Mac 搭建 ESP-IDF 开发环境及编译小智 AI

前一阵子在百度 AI 开发者大会上&#xff0c;看到基于小智 AI DIY 玩具的演示&#xff0c;感觉有点意思&#xff0c;想着自己也来试试。 如果只是想烧录现成的固件&#xff0c;乐鑫官方除了提供了 Windows 版本的 Flash 下载工具 之外&#xff0c;还提供了基于网页版的 ESP LA…...

爬虫基础学习day2

# 爬虫设计领域 工商&#xff1a;企查查、天眼查短视频&#xff1a;抖音、快手、西瓜 ---> 飞瓜电商&#xff1a;京东、淘宝、聚美优品、亚马逊 ---> 分析店铺经营决策标题、排名航空&#xff1a;抓取所有航空公司价格 ---> 去哪儿自媒体&#xff1a;采集自媒体数据进…...

算法岗面试经验分享-大模型篇

文章目录 A 基础语言模型A.1 TransformerA.2 Bert B 大语言模型结构B.1 GPTB.2 LLamaB.3 ChatGLMB.4 Qwen C 大语言模型微调C.1 Fine-tuningC.2 Adapter-tuningC.3 Prefix-tuningC.4 P-tuningC.5 LoRA A 基础语言模型 A.1 Transformer &#xff08;1&#xff09;资源 论文&a…...

Spring是如何解决Bean的循环依赖:三级缓存机制

1、什么是 Bean 的循环依赖 在 Spring框架中,Bean 的循环依赖是指多个 Bean 之间‌互相持有对方引用‌,形成闭环依赖关系的现象。 多个 Bean 的依赖关系构成环形链路,例如: 双向依赖:Bean A 依赖 Bean B,同时 Bean B 也依赖 Bean A(A↔B)。链条循环: Bean A → Bean…...

动态 Web 开发技术入门篇

一、HTTP 协议核心 1.1 HTTP 基础 协议全称 &#xff1a;HyperText Transfer Protocol&#xff08;超文本传输协议&#xff09; 默认端口 &#xff1a;HTTP 使用 80 端口&#xff0c;HTTPS 使用 443 端口。 请求方法 &#xff1a; GET &#xff1a;用于获取资源&#xff0c;…...

C/C++ 中附加包含目录、附加库目录与附加依赖项详解

在 C/C 编程的编译和链接过程中&#xff0c;附加包含目录、附加库目录和附加依赖项是三个至关重要的设置&#xff0c;它们相互配合&#xff0c;确保程序能够正确引用外部资源并顺利构建。虽然在学习过程中&#xff0c;这些概念容易让人混淆&#xff0c;但深入理解它们的作用和联…...

C#学习第29天:表达式树(Expression Trees)

目录 什么是表达式树&#xff1f; 核心概念 1.表达式树的构建 2. 表达式树与Lambda表达式 3.解析和访问表达式树 4.动态条件查询 表达式树的优势 1.动态构建查询 2.LINQ 提供程序支持&#xff1a; 3.性能优化 4.元数据处理 5.代码转换和重写 适用场景 代码复杂性…...