KafkaConsumer 消费逻辑
版本:kafka-clients-2.0.1.jar
之前想写个插件修改 kafkaConsumer 消费者的逻辑,根据 header 过滤一些消息。于是需要了解一下 kafkaConsumer 具体是如何拉取消费消息的,确认在消费之前过滤掉消息是否会有影响。
下面是相关的源码,并通过注释的方式进行说明。
先结论:kafkaConsumer 拉取消息的 offset 是存本地的,根据 offset 拉取消息。开启自动提交时,会自动提交 offset 到 broker(在一些场景下会手动检查是否需要提交),防止重启或reblance时 offset 丢失。而本地保存的 offset 是本地拉取到消息时就更新的,所以自动提交的场景下,在消费前过滤掉消息没有影响。
拉取消息
KafkaConsumer#poll
private ConsumerRecords<K, V> poll(final long timeoutMs, final boolean includeMetadataInTimeout) {// note: 获取轻锁同时检查非多线程环境,并检查 consumer 开启状态 (可以close的)acquireAndEnsureOpen();try {if (timeoutMs < 0) throw new IllegalArgumentException("Timeout must not be negative");// note: subscriptions:SubscriptionState 维护了当前消费者订阅的主题列表的状态信息(组、offset等)// 方法判断是否未订阅或未分配分区if (this.subscriptions.hasNoSubscriptionOrUserAssignment()) {throw new IllegalStateException("Consumer is not subscribed to any topics or assigned any partitions");}// poll for new data until the timeout expireslong elapsedTime = 0L;do {// note: 是否触发了唤醒操作 (调用了当前对象的 wakeup 方法) 通过抛异常的方式退出当前方法,(这里是while循环,可能一直在拉取消息,(无新消息时))client.maybeTriggerWakeup();final long metadataEnd;if (includeMetadataInTimeout) {final long metadataStart = time.milliseconds();// note: 更新分区分配元数据以及offset, remain是用来算剩余时间的// 内部逻辑:// 1 协调器 ConsumerCoordinator.poll 拉取协调器事件(期间会发送心跳、自动提交)// 2 updateFetchPositions 更新positions,(但本地有positions数据就不更新,更新完pos后,如果还有缺的,就先使用reset策略,最后异步设置pos)if (!updateAssignmentMetadataIfNeeded(remainingTimeAtLeastZero(timeoutMs, elapsedTime))) {return ConsumerRecords.empty();}metadataEnd = time.milliseconds();elapsedTime += metadataEnd - metadataStart;} else {while (!updateAssignmentMetadataIfNeeded(Long.MAX_VALUE)) {log.warn("Still waiting for metadata");}metadataEnd = time.milliseconds();}//note: 这里终于开始拉取消息了,下面单独讲一下final Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollForFetches(remainingTimeAtLeastZero(timeoutMs, elapsedTime));if (!records.isEmpty()) {//note: 翻译:返回之前,发送下一个拉取的请求避免阻塞response// before returning the fetched records, we can send off the next round of fetches// and avoid block waiting for their responses to enable pipelining while the user// is handling the fetched records.//// NOTE: since the consumed position has already been updated, we must not allow// wakeups or any other errors to be triggered prior to returning the fetched records.if (fetcher.sendFetches() > 0 || client.hasPendingRequests()) {client.pollNoWakeup();}//note: 这里使用拦截器拦截一下,这里可以对消息进行修改或过滤,但需要注意commit的问题return this.interceptors.onConsume(new ConsumerRecords<>(records));}final long fetchEnd = time.milliseconds();elapsedTime += fetchEnd - metadataEnd;} while (elapsedTime < timeoutMs);return ConsumerRecords.empty();} finally {release();}
}
关于 pollForFetches 的逻辑
pollForFetches
private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollForFetches(final long timeoutMs) {final long startMs = time.milliseconds();long pollTimeout = Math.min(coordinator.timeToNextPoll(startMs), timeoutMs);// note: 先获取已经拉取了的消息,存在就直接返回// fetcher 内部有一个 completedFetches 暂存预拉取的请求,可解析出 nextLineRecords 用于暂存预拉取的消息// 从 nextLineRecords 获取消息时,先判断一下状态(如assigned、paused、position),// 然后获取到消息后,再更新 subscriptions 中的 position 位置(值为下一个的offset), 注意这个时候还没commit// if data is available already, return it immediatelyfinal Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchedRecords();if (!records.isEmpty()) {return records;}// note: 没有预拉取的消息,发送拉取请求(实际没发) // 先找到partition的leader,检查可用,检查没有待处理的请求,然后从 subscriptions 获取 position,构建ClientRequest暂存// 以及设置listener (成功则处理结果入队列completedFetches)// send any new fetches (won't resend pending fetches)fetcher.sendFetches();// We do not want to be stuck blocking in poll if we are missing some positions// since the offset lookup may be backing off after a failure// NOTE: the use of cachedSubscriptionHashAllFetchPositions means we MUST call// updateAssignmentMetadataIfNeeded before this method.if (!cachedSubscriptionHashAllFetchPositions && pollTimeout > retryBackoffMs) {pollTimeout = retryBackoffMs;}// note: 轮询等待,详见下文client.poll(pollTimeout, startMs, () -> {// since a fetch might be completed by the background thread, we need this poll condition// to ensure that we do not block unnecessarily in poll()return !fetcher.hasCompletedFetches();});// after the long poll, we should check whether the group needs to rebalance// prior to returning data so that the group can stabilize fasterif (coordinator.rejoinNeededOrPending()) {return Collections.emptyMap();}return fetcher.fetchedRecords();
}
ConsumerNetworkClient#poll
/*** Poll for any network IO.* @param timeout timeout in milliseconds* @param now current time in milliseconds* @param disableWakeup If TRUE disable triggering wake-ups*/
public void poll(long timeout, long now, PollCondition pollCondition, boolean disableWakeup) {// note: 触发已完成的请求的回调处理器 (有一个pendingCompletion的队列)// there may be handlers which need to be invoked if we woke up the previous call to pollfirePendingCompletedRequests();lock.lock();try {// note: 处理断开的连接 (pendingDisconnects队列)// Handle async disconnects prior to attempting any sendshandlePendingDisconnects();// note: 实际上这里才真正发出请求。。 前面那个feature只是构建request// 前面准备的 ClientRequest 放在一个 UnsentRequests (内部map, key:Node,val: requests)中// 这里面取出来进行发送, kafkaClient.ready -> send// send all the requests we can send nowlong pollDelayMs = trySend(now);timeout = Math.min(timeout, pollDelayMs);// note: 这里主要是判断是否需要阻塞 poll (timeout是否为0) 如果没有待完成且判断应该阻塞(completedFetches为空)则阻塞// poll 里面是从 sockets 里面读写数据// check whether the poll is still needed by the caller. Note that if the expected completion// condition becomes satisfied after the call to shouldBlock() (because of a fired completion// handler), the client will be woken up.if (pendingCompletion.isEmpty() && (pollCondition == null || pollCondition.shouldBlock())) {// if there are no requests in flight, do not block longer than the retry backoffif (client.inFlightRequestCount() == 0)timeout = Math.min(timeout, retryBackoffMs);client.poll(Math.min(maxPollTimeoutMs, timeout), now);now = time.milliseconds();} else {client.poll(0, now);}// note: 检查断开的链接,判断node连接是否断开,是则从unset中取出对应requests,构建response加到completedFetches中// handle any disconnects by failing the active requests. note that disconnects must// be checked immediately following poll since any subsequent call to client.ready()// will reset the disconnect statuscheckDisconnects(now);if (!disableWakeup) {// trigger wakeups after checking for disconnects so that the callbacks will be ready// to be fired on the next call to poll()maybeTriggerWakeup();}// throw InterruptException if this thread is interruptedmaybeThrowInterruptException();// note: 再发一次请求,推测是可能部分 node 的连接在第一次没有ready (没ready会进行初始化,并返回false)// try again to send requests since buffer space may have been// cleared or a connect finished in the polltrySend(now);// fail requests that couldn't be sent if they have expiredfailExpiredRequests(now);// clean unsent requests collection to keep the map from growing indefinitelyunsent.clean();} finally {lock.unlock();}// called without the lock to avoid deadlock potential if handlers need to acquire locksfirePendingCompletedRequests();
}
自动提交
提交 offset 是为了防止重启或 rebalance 后,导致本地 position 丢失无法正常拉取后面的消息。
入口是 ConsumerCoordinator#maybeAutoCommitOffsetsAsync
触发逻辑主要是
KafkaConsumer#poll拉消息- ->
KafkaConsumer#updateAssignmentMetadataIfNeeded - ->
ConsumerCoordinator#poll->maybeAutoCommitOffsetsAsync(也是先构建请求存 unset 里面,等拉消息的时候再发出去)
public void maybeAutoCommitOffsetsAsync(long now) {// 这里用来判断是否满足自动提交的间隔if (autoCommitEnabled && now >= nextAutoCommitDeadline) {this.nextAutoCommitDeadline = now + autoCommitIntervalMs;doAutoCommitOffsetsAsync();}}相关文章:
KafkaConsumer 消费逻辑
版本:kafka-clients-2.0.1.jar 之前想写个插件修改 kafkaConsumer 消费者的逻辑,根据 header 过滤一些消息。于是需要了解一下 kafkaConsumer 具体是如何拉取消费消息的,确认在消费之前过滤掉消息是否会有影响。 下面是相关的源码࿰…...
scss 实用教程
变量 $ 定义变量 $link-color: blue;变量名可以与css中的属性名和选择器名称相同 使用变量 a {color: $link_color; }$highlight-border: 1px solid $link_color;中划线和下划线相互兼容,即中划线声明的变量可以使用下划线的方式引用,反之亦然。 $li…...
NO.304 二维区域和检索 - 矩阵不可变
题目 给定一个二维矩阵 matrix,以下类型的多个请求: 计算其子矩形范围内元素的总和,该子矩阵的 左上角 为 (row1, col1) ,右下角 为 (row2, col2) 。 实现 NumMatrix 类: NumMatrix(int[][] matrix) 给定整数矩阵 …...
牛客---简单密码python
现在有一种密码变换算法。 九键手机键盘上的数字与字母的对应: 1--1, abc--2, def--3, ghi--4, jkl--5, mno--6, pqrs--7, tuv--8 wxyz--9, 0--0,把密码中出现的小写字母都变成九键键盘对应的数字,如:a 变成 2&#x…...
devops完整搭建教程(gitlab、jenkins、harbor、docker)
devops完整搭建教程(gitlab、jenkins、harbor、docker) 文章目录 devops完整搭建教程(gitlab、jenkins、harbor、docker)1.简介:2.工作流程:3.优缺点4.环境说明5.部署前准备工作5.1.所有主机永久关闭防火墙…...
页面上时间显示为数字 后端返回给前端 response java系统
有时候,在一个系统里,会看到,有的页面时间显示正常,有的页面时间显示成数字。像这样: "createTime": 1698706491000 这是因为出参没有做转换,直接将java.util.Date类型的数据返回给前端了。 返…...
idea怎么配置tomcat
要在IntelliJ IDEA中配置Tomcat,请按照以下步骤操作: 打开IntelliJ IDEA,点击File -> Settings(或者使用快捷键CtrlAltS)。 在设置窗口左侧导航栏中,选择Build, Execution, Deployment -> Applicati…...
GoLong的学习之路(二十三)进阶,语法之并发(go最重要的特点)(锁,sync包,原子操作)
这章是我并发系列中最后的一章。这章主要讲的是锁。但是也会讲上一章channl遗留下的一些没有讲到的内容。select关键字的用法,以及错误的一些channl用法。废话不多说。。。 文章目录 select多路复用通道错误示例并发安全和锁问题描述互斥锁读写互斥锁 syncsync.Wait…...
asp.net core 生命周期
在ASP.NET Core中,有三个重要的生命周期阶段: 请求生命周期(Request Lifecycle):请求生命周期从接收到客户端的HTTP请求开始,到响应结果发送给客户端结束。在请求生命周期中,ASP.NET Core会创建…...
Leetcode刷题详解—— 目标和
1. 题目链接:494. 目标和 2. 题目描述: 给你一个非负整数数组 nums 和一个整数 target 。 向数组中的每个整数前添加 或 - ,然后串联起所有整数,可以构造一个 表达式 : 例如,nums [2, 1] ,可…...
学习c#的第六天
目录 C# 判断 if 语句 嵌套 if 语句 switch 语句 嵌套 switch 语句 ? : 运算符 C# 循环 循环类型 while 循环 for/foreach 循环 do...while 循环 嵌套循环 循环控制语句 break 语句 continue 语句 无限循环 C# 判断 if 语句 在C#中,if语句用于根…...
第七章 :Spring Boot web开发常用注解(二)
第七章 :Spring Boot web开发常用注解(二) 前言 本章节知识重点:作者结合自身开发经验,以及觉察到的一个现象:Springboot注解全面理解和掌握的并不多,对注解进行了全面总结,共分两个章节,可以作为web开发工程师注解参考手册,SpringBoot常用注解大全,一目了然!。本…...
IOC - Google Guice
Google Guice是一个轻量级的依赖注入框架,专注于依赖注入和IoC,适用于中小型应用。 Spring Framework是一个全面的企业级框架,提供了广泛的功能,适用于大型企业应用。 是吧!IOC 容器不止Spring,还有Google Guice,来体…...
国际阿里云:Linux实例负载高问题排查和异常处理!!!
问题描述 在您使用ECS实例过程中,可能会遇到实例系统负载较高的情况,负载过高,可能会引发一系列异常问题,简单说您如下: CPU使用率或负载过高:一般来说,当CPU使用率≥80%时,定义为C…...
【数据结构】二叉树的遍历递归算法详解
二叉树的遍历 💫二叉树的结点结构定义💫创建一个二叉树结点💫在主函数中手动创建一颗二叉树💫二叉树的前序遍历💫调用栈递归——实现前序遍历💫递归实现中序和后序遍历 💫二叉树的结点结构定义 …...
百度王颖:百度文库以AI创作能力突破语言边界,促进思想碰撞和文化融通
1月9日,2023年世界互联网大会乌镇峰会“网络传播与文明交流互鉴论坛”召开。百度副总裁、互娱和垂类平台负责人王颖出席并发表“以技术搭建跨文化交流桥梁”主题演讲。她表示,在大模型的加持下,百度各个产品都在重构,通过技术助力…...
人工智能基础_机器学习023_理解套索回归_认识L1正则---人工智能工作笔记0063
然后上一节我们说了L1,L2正则是为了提高,模型的泛化能力, 提高泛化能力,实际上就是把模型的公式的w,权重值,变小对吧. 然后我们这里首先看第一个L1正则,是怎么做到把w权重变小的 可以看到最上面是线性回归的损失函数,然后 L1可以看到,这个正则,就是在损失函数的基础上给损失…...
Learning an Animatable Detailed 3D Face Model from In-The-Wild Images论文笔记
Learning an Animatable Detailed 3D Face Model from In-The-Wild Images论文笔记 论文目标:提出一个端到端的框架,可以从非受控的图片中学习高质量、可动画的3D人脸模型。论文方法:论文结果:论文意义: 论文目标:提出一个端到端的框架,可以从非受控的图片中学习高质量、可动画…...
Lenovo联想小新Air-14笔记本2021款AMD锐龙ALC版(82LM)原装出厂Win10镜像和Windows11预装OEM系统
下载链接:https://pan.baidu.com/s/1akLkXM2HIg3eO76jqM-LVA?pwdxvo6 提取码:xvo6 系统自带所有驱动、出厂主题壁纸、系统属性专属LOGO标志、Office办公软件、联想电脑管家等预装程序 所需要工具:16G或以上的U盘 文件格式:…...
在程序中链接静态库
现在我们把上面src目录中的add.cpp、div.cpp、mult.cpp、sub.cpp编译成一个静态库文件libcalc.a。 add_library(库名称 STATIC 源文件1 [源文件2] ...) link_libraries(<static lib> [<static lib>...]) 参数1:指定出要链接的静态库的名字 可以是全…...
边缘计算医疗风险自查APP开发方案
核心目标:在便携设备(智能手表/家用检测仪)部署轻量化疾病预测模型,实现低延迟、隐私安全的实时健康风险评估。 一、技术架构设计 #mermaid-svg-iuNaeeLK2YoFKfao {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg…...
vue3 字体颜色设置的多种方式
在Vue 3中设置字体颜色可以通过多种方式实现,这取决于你是想在组件内部直接设置,还是在CSS/SCSS/LESS等样式文件中定义。以下是几种常见的方法: 1. 内联样式 你可以直接在模板中使用style绑定来设置字体颜色。 <template><div :s…...
Java多线程实现之Callable接口深度解析
Java多线程实现之Callable接口深度解析 一、Callable接口概述1.1 接口定义1.2 与Runnable接口的对比1.3 Future接口与FutureTask类 二、Callable接口的基本使用方法2.1 传统方式实现Callable接口2.2 使用Lambda表达式简化Callable实现2.3 使用FutureTask类执行Callable任务 三、…...
【android bluetooth 框架分析 04】【bt-framework 层详解 1】【BluetoothProperties介绍】
1. BluetoothProperties介绍 libsysprop/srcs/android/sysprop/BluetoothProperties.sysprop BluetoothProperties.sysprop 是 Android AOSP 中的一种 系统属性定义文件(System Property Definition File),用于声明和管理 Bluetooth 模块相…...
leetcodeSQL解题:3564. 季节性销售分析
leetcodeSQL解题:3564. 季节性销售分析 题目: 表:sales ---------------------- | Column Name | Type | ---------------------- | sale_id | int | | product_id | int | | sale_date | date | | quantity | int | | price | decimal | -…...
12.找到字符串中所有字母异位词
🧠 题目解析 题目描述: 给定两个字符串 s 和 p,找出 s 中所有 p 的字母异位词的起始索引。 返回的答案以数组形式表示。 字母异位词定义: 若两个字符串包含的字符种类和出现次数完全相同,顺序无所谓,则互为…...
汇编常见指令
汇编常见指令 一、数据传送指令 指令功能示例说明MOV数据传送MOV EAX, 10将立即数 10 送入 EAXMOV [EBX], EAX将 EAX 值存入 EBX 指向的内存LEA加载有效地址LEA EAX, [EBX4]将 EBX4 的地址存入 EAX(不访问内存)XCHG交换数据XCHG EAX, EBX交换 EAX 和 EB…...
.Net Framework 4/C# 关键字(非常用,持续更新...)
一、is 关键字 is 关键字用于检查对象是否于给定类型兼容,如果兼容将返回 true,如果不兼容则返回 false,在进行类型转换前,可以先使用 is 关键字判断对象是否与指定类型兼容,如果兼容才进行转换,这样的转换是安全的。 例如有:首先创建一个字符串对象,然后将字符串对象隐…...
ip子接口配置及删除
配置永久生效的子接口,2个IP 都可以登录你这一台服务器。重启不失效。 永久的 [应用] vi /etc/sysconfig/network-scripts/ifcfg-eth0修改文件内内容 TYPE"Ethernet" BOOTPROTO"none" NAME"eth0" DEVICE"eth0" ONBOOT&q…...
SQL慢可能是触发了ring buffer
简介 最近在进行 postgresql 性能排查的时候,发现 PG 在某一个时间并行执行的 SQL 变得特别慢。最后通过监控监观察到并行发起得时间 buffers_alloc 就急速上升,且低水位伴随在整个慢 SQL,一直是 buferIO 的等待事件,此时也没有其他会话的争抢。SQL 虽然不是高效 SQL ,但…...
