源码解析FlinkKafkaConsumer支持punctuated水位线发送
背景
FlinkKafkaConsumer支持当收到某个kafka分区中的某条记录时发送水位线,比如这条特殊的记录代表一个完整记录的结束等,本文就来解析下发送punctuated水位线的源码
punctuated 水位线发送源码解析
1.首先KafkaFetcher中的runFetchLoop方法
public void runFetchLoop() throws Exception {try {// kick off the actual Kafka consumerconsumerThread.start();while (running) {// this blocks until we get the next records// it automatically re-throws exceptions encountered in the consumer threadfinal ConsumerRecords<byte[], byte[]> records = handover.pollNext();// get the records for each topic partitionfor (KafkaTopicPartitionState<T, TopicPartition> partition :subscribedPartitionStates()) {List<ConsumerRecord<byte[], byte[]>> partitionRecords =records.records(partition.getKafkaPartitionHandle());
// 算子任务消费的每个分区都调用这个方法partitionConsumerRecordsHandler(partitionRecords, partition);}}} finally {// this signals the consumer thread that no more work is to be doneconsumerThread.shutdown();}
2.查看partitionConsumerRecordsHandler方法处理当前算子任务对应的每个分区的水位线
protected void emitRecordsWithTimestamps(Queue<T> records,KafkaTopicPartitionState<T, KPH> partitionState,long offset,long kafkaEventTimestamp) {// emit the records, using the checkpoint lock to guarantee// atomicity of record emission and offset state updatesynchronized (checkpointLock) {T record;while ((record = records.poll()) != null) {long timestamp = partitionState.extractTimestamp(record, kafkaEventTimestamp);// 发送kafka记录到下游算子sourceContext.collectWithTimestamp(record, timestamp);// this might emit a watermark, so do it after emitting the record// 处理分区的水位线,记录这个分区的水位线,并在满足条件时更新整个算子任务的水位线partitionState.onEvent(record, timestamp);}partitionState.setOffset(offset);}}```3.处理每个分区的水位线```javapublic void onEvent(T event, long timestamp) {watermarkGenerator.onEvent(event, timestamp, immediateOutput);}public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {final org.apache.flink.streaming.api.watermark.Watermark next =wms.checkAndGetNextWatermark(event, eventTimestamp);if (next != null) {output.emitWatermark(new Watermark(next.getTimestamp()));}}其中 output.emitWatermark(new Watermark(next.getTimestamp()));对应方法如下public void emitWatermark(Watermark watermark) {long timestamp = watermark.getTimestamp();// 更新每个分区对应的水位线,并且更新boolean wasUpdated = state.setWatermark(timestamp);// if it's higher than the max watermark so far we might have to update the// combined watermark 这个表明这个算子任务的最低水位线,也就是算子任务级别的水位线,而不是分区级别的了if (wasUpdated && timestamp > combinedWatermark) {updateCombinedWatermark();}}//每个分区水位线的更新如下public boolean setWatermark(long watermark) {this.idle = false;final boolean updated = watermark > this.watermark;this.watermark = Math.max(watermark, this.watermark);return updated;}
4.最后是发送算子任务级别的水位线的方法
private void updateCombinedWatermark() {long minimumOverAllOutputs = Long.MAX_VALUE;boolean hasOutputs = false;boolean allIdle = true;for (OutputState outputState : watermarkOutputs) {if (!outputState.isIdle()) {minimumOverAllOutputs = Math.min(minimumOverAllOutputs, outputState.getWatermark());allIdle = false;}hasOutputs = true;}// if we don't have any outputs minimumOverAllOutputs is not valid, it's still// at its initial Long.MAX_VALUE state and we must not emit thatif (!hasOutputs) {return;}if (allIdle) {underlyingOutput.markIdle();} else if (minimumOverAllOutputs > combinedWatermark) {combinedWatermark = minimumOverAllOutputs;underlyingOutput.emitWatermark(new Watermark(minimumOverAllOutputs));}}
你可以看这个流程,是不是意味着如果使用Punctuated的方式,是不支持Idle空闲时间的?–答案是的
相关文章:
源码解析FlinkKafkaConsumer支持punctuated水位线发送
背景 FlinkKafkaConsumer支持当收到某个kafka分区中的某条记录时发送水位线,比如这条特殊的记录代表一个完整记录的结束等,本文就来解析下发送punctuated水位线的源码 punctuated 水位线发送源码解析 1.首先KafkaFetcher中的runFetchLoop方法 public…...
vue3学习(五)--- 父子组件传值
文章目录 defineProps普通写法TS写法 defineEmits普通写法TS写法 defineExpose defineProps 和 defineEmits 都是只能在 <script setup> 中使用的编译器宏。他们不需要导入,且会随着 <script setup> 的处理过程一同被编译掉。 defineProps 接收父组件传…...

寻找AI时代的关键拼图,从美国橡树岭国家实验室读懂AI存力信标
超算,是计算产业的明珠,是人类探索未知的航船。超算的发展与变化,不仅代表着各个国家与地区间的科技竞争力,更将作为趋势风向标,影响整个数字化体系的走向。 在目前阶段,超算与AI计算的融合是大势所趋。为了…...
多线程并发篇---第十二篇
系列文章目录 文章目录 系列文章目录一、说说ThreadLocal原理?二、线程池原理知道吗?以及核心参数三、线程池的拒绝策略有哪些?一、说说ThreadLocal原理? hreadLocal可以理解为线程本地变量,他会在每个线程都创建一个副本,那么在线程之间访问内部 副本变量就行了,做到了…...
P7537 [COCI2016-2017#4] Rima
由于题目涉及到后缀,不难想到用 trie 树处理。 将每个字符串翻转插入 trie,后缀就变成了前缀,方便处理。 条件 LCS ( A , B ) ≥ max ( ∣ A ∣ , ∣ B ∣ ) − 1 \text{LCS}(A,B) \ge \max(|A|,|B|)-1 LCS(A,B)≥max(∣A∣,∣B∣)−1&…...

SwiftUI Swift CoreData 计算某实体某属性总和
有一个名为 Item 的实体,它有一个名为 amount 的 Double 属性,向你的 View 添加一个计算属性: Code: struct ContentView: View {Environment(\.managedObjectContext) private var viewContextFetchRequest(sortDescriptors: [NSSortDescri…...

docker安装skyWalking笔记
确保安装了docker和docker-compose sudo docker -v Docker version 20.10.12, build 20.10.12-0ubuntu4 sudo docker-compose -v docker-compose version 1.29.2, build unknown 编写docker-compose.yml version: "3.1" services: skywalking-oap:image: apach…...
【Codeforces】 CF1097G Vladislav and a Great Legend
题目链接 CF方向 Luogu方向 题目解法 首先一个套路是普通幂转下降幂(为什么?因为观察到 k k k 很小,下降幂可以转化组合数问题,从而 d p dp dp 求解) 即 f ( X ) k ∑ i 0 k { k i } i ! ( f ( X ) i ) f(X)^k…...

力扣每日一题36:有效的数独
题目描述: 请你判断一个 9 x 9 的数独是否有效。只需要 根据以下规则 ,验证已经填入的数字是否有效即可。 数字 1-9 在每一行只能出现一次。数字 1-9 在每一列只能出现一次。数字 1-9 在每一个以粗实线分隔的 3x3 宫内只能出现一次。(请参考…...

钉钉数字校园小程序开发:开启智慧教育新时代
随着信息技术的快速发展和校园管理的日益复杂化,数字校园已成为现代教育的重要趋势。钉钉数字校园小程序作为一种创新应用,以其专业性、思考深度和逻辑性,为学校提供了全新的管理、教学和沟方式。本文从需求分析、技术实现和应用思考三个方面…...

数据结构与算法--其他算法
数据结构与算法--其他算法 1 汉诺塔问题 2 字符串的全部子序列 3 字符串的全排列 4 纸牌问题 5 逆序栈问题 6 数字和字符串转换问题 7 背包问题 8 N皇后问题 暴力递归就是尝试 1,把问题转化为规模缩小了的同类问题的子问题 2,有明确的不需要继续…...
矩阵键盘行列扫描
/*----------------------------------------------- 内容:如计算器输入数据形式相同 从右至左 使用行列扫描方法 ------------------------------------------------*/ #include<reg52.h> //包含头文件,一般情况不需要改动,头文件包含…...

unity 实现拖动ui填空,并判断对错
参考:https://ask.csdn.net/questions/7971448 根据自己的需求修改为如下代码 使用过程中,出现拖动ui位置错误的情况,修改为使用 localPosition 但是吸附到指定位置却需要用的position public class DragAndDrop : MonoBehaviour, IBeginDr…...

《机器学习》第5章 神经网络
文章目录 5.1 神经元模型5.2 感知机与多层网络5.3 误差逆传播算法5.4 全局最小与局部最小5.5 其他常见神经网络RBF网络ART网络SOM网络级联相关网络Elman网络Boltzmann机 5.6 深度学习 5.1 神经元模型 神经网络是由具有适应性的简单单元组成的广泛并行互连的网络,它…...

FPGA project : flash_erasure
SPI是什么: SPI(Serial Peripheral Interface,串行外围设备接口)通讯协议,是Motorola公司提出的一种同步串行接口技术,是一种高速、全双工、同步通信总线,在芯片中只占用四根管脚用来控制及数据…...

AC修炼计划(AtCoder Regular Contest 166)
传送门:AtCoder Regular Contest 166 - AtCoder 一直修炼cf,觉得遇到了瓶颈了,所以想在atcode上寻求一些突破,今天本来想尝试vp AtCoder Regular Contest 166,但结局本不是很好,被卡了半天,止步…...
Android---Android 是如何通过 Activity 进行交互的
相信对于 Android 工程师来说,startActivity 就像初恋一般。要求低,见效快,是每一个菜鸟 Android 工程师迈向高级 Android 工程师的必经阶段。经过这么多年的发展,startActivity 在 google 的调教下已经变得愈发成熟,对…...

【论文解读】单目3D目标检测 MonoCon(AAAI2022)
本文分享单目3D目标检测,MonoCon模型的论文解读,了解它的设计思路,论文核心观点,模型结构,以及效果和性能。 目录 一、MonoCon简介 二、论文核心观点 三、模型框架 四、模型预测信息与3D框联系 五、损失函数 六、…...
Angular知识点系列(5)-每天10个小知识
目录 41. Angular的路由守卫42. 处理文件的上传和下载43. Angular的动画系统44. 使用第三方库和选择评估45. 性能优化46. AOT和JIT编译47. 处理响应式布局和适配不同屏幕尺寸48. Angular的国际化(i18n)49. Angular的PWA开发50. 使用Angular Material或其…...

基于海洋捕食者优化的BP神经网络(分类应用) - 附代码
基于海洋捕食者优化的BP神经网络(分类应用) - 附代码 文章目录 基于海洋捕食者优化的BP神经网络(分类应用) - 附代码1.鸢尾花iris数据介绍2.数据集整理3.海洋捕食者优化BP神经网络3.1 BP神经网络参数设置3.2 海洋捕食者算法应用 4…...

【JavaEE】-- HTTP
1. HTTP是什么? HTTP(全称为"超文本传输协议")是一种应用非常广泛的应用层协议,HTTP是基于TCP协议的一种应用层协议。 应用层协议:是计算机网络协议栈中最高层的协议,它定义了运行在不同主机上…...

如何在看板中体现优先级变化
在看板中有效体现优先级变化的关键措施包括:采用颜色或标签标识优先级、设置任务排序规则、使用独立的优先级列或泳道、结合自动化规则同步优先级变化、建立定期的优先级审查流程。其中,设置任务排序规则尤其重要,因为它让看板视觉上直观地体…...

全球首个30米分辨率湿地数据集(2000—2022)
数据简介 今天我们分享的数据是全球30米分辨率湿地数据集,包含8种湿地亚类,该数据以0.5X0.5的瓦片存储,我们整理了所有属于中国的瓦片名称与其对应省份,方便大家研究使用。 该数据集作为全球首个30米分辨率、覆盖2000–2022年时间…...
Spring Boot面试题精选汇总
🤟致敬读者 🟩感谢阅读🟦笑口常开🟪生日快乐⬛早点睡觉 📘博主相关 🟧博主信息🟨博客首页🟫专栏推荐🟥活动信息 文章目录 Spring Boot面试题精选汇总⚙️ **一、核心概…...
解决本地部署 SmolVLM2 大语言模型运行 flash-attn 报错
出现的问题 安装 flash-attn 会一直卡在 build 那一步或者运行报错 解决办法 是因为你安装的 flash-attn 版本没有对应上,所以报错,到 https://github.com/Dao-AILab/flash-attention/releases 下载对应版本,cu、torch、cp 的版本一定要对…...
全面解析各类VPN技术:GRE、IPsec、L2TP、SSL与MPLS VPN对比
目录 引言 VPN技术概述 GRE VPN 3.1 GRE封装结构 3.2 GRE的应用场景 GRE over IPsec 4.1 GRE over IPsec封装结构 4.2 为什么使用GRE over IPsec? IPsec VPN 5.1 IPsec传输模式(Transport Mode) 5.2 IPsec隧道模式(Tunne…...

分布式增量爬虫实现方案
之前我们在讨论的是分布式爬虫如何实现增量爬取。增量爬虫的目标是只爬取新产生或发生变化的页面,避免重复抓取,以节省资源和时间。 在分布式环境下,增量爬虫的实现需要考虑多个爬虫节点之间的协调和去重。 另一种思路:将增量判…...

视觉slam十四讲实践部分记录——ch2、ch3
ch2 一、使用g++编译.cpp为可执行文件并运行(P30) g++ helloSLAM.cpp ./a.out运行 二、使用cmake编译 mkdir build cd build cmake .. makeCMakeCache.txt 文件仍然指向旧的目录。这表明在源代码目录中可能还存在旧的 CMakeCache.txt 文件,或者在构建过程中仍然引用了旧的路…...
全面解析数据库:从基础概念到前沿应用
在数字化时代,数据已成为企业和社会发展的核心资产,而数据库作为存储、管理和处理数据的关键工具,在各个领域发挥着举足轻重的作用。从电商平台的商品信息管理,到社交网络的用户数据存储,再到金融行业的交易记录处理&a…...
规则与人性的天平——由高考迟到事件引发的思考
当那位身着校服的考生在考场关闭1分钟后狂奔而至,他涨红的脸上写满绝望。铁门内秒针划过的弧度,成为改变人生的残酷抛物线。家长声嘶力竭的哀求与考务人员机械的"这是规定",构成当代中国教育最尖锐的隐喻。 一、刚性规则的必要性 …...