当前位置: 首页 > news >正文

源码解析FlinkKafkaConsumer支持punctuated水位线发送

背景

FlinkKafkaConsumer支持当收到某个kafka分区中的某条记录时发送水位线,比如这条特殊的记录代表一个完整记录的结束等,本文就来解析下发送punctuated水位线的源码

punctuated 水位线发送源码解析

1.首先KafkaFetcher中的runFetchLoop方法

public void runFetchLoop() throws Exception {try {// kick off the actual Kafka consumerconsumerThread.start();while (running) {// this blocks until we get the next records// it automatically re-throws exceptions encountered in the consumer threadfinal ConsumerRecords<byte[], byte[]> records = handover.pollNext();// get the records for each topic partitionfor (KafkaTopicPartitionState<T, TopicPartition> partition :subscribedPartitionStates()) {List<ConsumerRecord<byte[], byte[]>> partitionRecords =records.records(partition.getKafkaPartitionHandle());
// 算子任务消费的每个分区都调用这个方法partitionConsumerRecordsHandler(partitionRecords, partition);}}} finally {// this signals the consumer thread that no more work is to be doneconsumerThread.shutdown();}

2.查看partitionConsumerRecordsHandler方法处理当前算子任务对应的每个分区的水位线

    protected void emitRecordsWithTimestamps(Queue<T> records,KafkaTopicPartitionState<T, KPH> partitionState,long offset,long kafkaEventTimestamp) {// emit the records, using the checkpoint lock to guarantee// atomicity of record emission and offset state updatesynchronized (checkpointLock) {T record;while ((record = records.poll()) != null) {long timestamp = partitionState.extractTimestamp(record, kafkaEventTimestamp);// 发送kafka记录到下游算子sourceContext.collectWithTimestamp(record, timestamp);// this might emit a watermark, so do it after emitting the record// 处理分区的水位线,记录这个分区的水位线,并在满足条件时更新整个算子任务的水位线partitionState.onEvent(record, timestamp);}partitionState.setOffset(offset);}}```3.处理每个分区的水位线```javapublic void onEvent(T event, long timestamp) {watermarkGenerator.onEvent(event, timestamp, immediateOutput);}public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {final org.apache.flink.streaming.api.watermark.Watermark next =wms.checkAndGetNextWatermark(event, eventTimestamp);if (next != null) {output.emitWatermark(new Watermark(next.getTimestamp()));}}其中 output.emitWatermark(new Watermark(next.getTimestamp()));对应方法如下public void emitWatermark(Watermark watermark) {long timestamp = watermark.getTimestamp();// 更新每个分区对应的水位线,并且更新boolean wasUpdated = state.setWatermark(timestamp);// if it's higher than the max watermark so far we might have to update the// combined watermark 这个表明这个算子任务的最低水位线,也就是算子任务级别的水位线,而不是分区级别的了if (wasUpdated && timestamp > combinedWatermark) {updateCombinedWatermark();}}//每个分区水位线的更新如下public boolean setWatermark(long watermark) {this.idle = false;final boolean updated = watermark > this.watermark;this.watermark = Math.max(watermark, this.watermark);return updated;}       

4.最后是发送算子任务级别的水位线的方法

private void updateCombinedWatermark() {long minimumOverAllOutputs = Long.MAX_VALUE;boolean hasOutputs = false;boolean allIdle = true;for (OutputState outputState : watermarkOutputs) {if (!outputState.isIdle()) {minimumOverAllOutputs = Math.min(minimumOverAllOutputs, outputState.getWatermark());allIdle = false;}hasOutputs = true;}// if we don't have any outputs minimumOverAllOutputs is not valid, it's still// at its initial Long.MAX_VALUE state and we must not emit thatif (!hasOutputs) {return;}if (allIdle) {underlyingOutput.markIdle();} else if (minimumOverAllOutputs > combinedWatermark) {combinedWatermark = minimumOverAllOutputs;underlyingOutput.emitWatermark(new Watermark(minimumOverAllOutputs));}}

你可以看这个流程,是不是意味着如果使用Punctuated的方式,是不支持Idle空闲时间的?–答案是的

相关文章:

源码解析FlinkKafkaConsumer支持punctuated水位线发送

背景 FlinkKafkaConsumer支持当收到某个kafka分区中的某条记录时发送水位线&#xff0c;比如这条特殊的记录代表一个完整记录的结束等&#xff0c;本文就来解析下发送punctuated水位线的源码 punctuated 水位线发送源码解析 1.首先KafkaFetcher中的runFetchLoop方法 public…...

vue3学习(五)--- 父子组件传值

文章目录 defineProps普通写法TS写法 defineEmits普通写法TS写法 defineExpose defineProps 和 defineEmits 都是只能在 <script setup> 中使用的编译器宏。他们不需要导入&#xff0c;且会随着 <script setup> 的处理过程一同被编译掉。 defineProps 接收父组件传…...

寻找AI时代的关键拼图,从美国橡树岭国家实验室读懂AI存力信标

超算&#xff0c;是计算产业的明珠&#xff0c;是人类探索未知的航船。超算的发展与变化&#xff0c;不仅代表着各个国家与地区间的科技竞争力&#xff0c;更将作为趋势风向标&#xff0c;影响整个数字化体系的走向。 在目前阶段&#xff0c;超算与AI计算的融合是大势所趋。为了…...

多线程并发篇---第十二篇

系列文章目录 文章目录 系列文章目录一、说说ThreadLocal原理?二、线程池原理知道吗?以及核心参数三、线程池的拒绝策略有哪些?一、说说ThreadLocal原理? hreadLocal可以理解为线程本地变量,他会在每个线程都创建一个副本,那么在线程之间访问内部 副本变量就行了,做到了…...

P7537 [COCI2016-2017#4] Rima

由于题目涉及到后缀&#xff0c;不难想到用 trie 树处理。 将每个字符串翻转插入 trie&#xff0c;后缀就变成了前缀&#xff0c;方便处理。 条件 LCS ( A , B ) ≥ max ⁡ ( ∣ A ∣ , ∣ B ∣ ) − 1 \text{LCS}(A,B) \ge \max(|A|,|B|)-1 LCS(A,B)≥max(∣A∣,∣B∣)−1&…...

SwiftUI Swift CoreData 计算某实体某属性总和

有一个名为 Item 的实体&#xff0c;它有一个名为 amount 的 Double 属性&#xff0c;向你的 View 添加一个计算属性&#xff1a; Code: struct ContentView: View {Environment(\.managedObjectContext) private var viewContextFetchRequest(sortDescriptors: [NSSortDescri…...

docker安装skyWalking笔记

确保安装了docker和docker-compose sudo docker -v Docker version 20.10.12, build 20.10.12-0ubuntu4 sudo docker-compose -v docker-compose version 1.29.2, build unknown 编写docker-compose.yml version: "3.1" services: skywalking-oap:image: apach…...

【Codeforces】 CF1097G Vladislav and a Great Legend

题目链接 CF方向 Luogu方向 题目解法 首先一个套路是普通幂转下降幂&#xff08;为什么&#xff1f;因为观察到 k k k 很小&#xff0c;下降幂可以转化组合数问题&#xff0c;从而 d p dp dp 求解&#xff09; 即 f ( X ) k ∑ i 0 k { k i } i ! ( f ( X ) i ) f(X)^k…...

力扣每日一题36:有效的数独

题目描述&#xff1a; 请你判断一个 9 x 9 的数独是否有效。只需要 根据以下规则 &#xff0c;验证已经填入的数字是否有效即可。 数字 1-9 在每一行只能出现一次。数字 1-9 在每一列只能出现一次。数字 1-9 在每一个以粗实线分隔的 3x3 宫内只能出现一次。&#xff08;请参考…...

钉钉数字校园小程序开发:开启智慧教育新时代

随着信息技术的快速发展和校园管理的日益复杂化&#xff0c;数字校园已成为现代教育的重要趋势。钉钉数字校园小程序作为一种创新应用&#xff0c;以其专业性、思考深度和逻辑性&#xff0c;为学校提供了全新的管理、教学和沟方式。本文从需求分析、技术实现和应用思考三个方面…...

数据结构与算法--其他算法

数据结构与算法--其他算法 1 汉诺塔问题 2 字符串的全部子序列 3 字符串的全排列 4 纸牌问题 5 逆序栈问题 6 数字和字符串转换问题 7 背包问题 8 N皇后问题 暴力递归就是尝试 1&#xff0c;把问题转化为规模缩小了的同类问题的子问题 2&#xff0c;有明确的不需要继续…...

矩阵键盘行列扫描

/*----------------------------------------------- 内容&#xff1a;如计算器输入数据形式相同 从右至左 使用行列扫描方法 ------------------------------------------------*/ #include<reg52.h> //包含头文件&#xff0c;一般情况不需要改动&#xff0c;头文件包含…...

unity 实现拖动ui填空,并判断对错

参考&#xff1a;https://ask.csdn.net/questions/7971448 根据自己的需求修改为如下代码 使用过程中&#xff0c;出现拖动ui位置错误的情况&#xff0c;修改为使用 localPosition 但是吸附到指定位置却需要用的position public class DragAndDrop : MonoBehaviour, IBeginDr…...

《机器学习》第5章 神经网络

文章目录 5.1 神经元模型5.2 感知机与多层网络5.3 误差逆传播算法5.4 全局最小与局部最小5.5 其他常见神经网络RBF网络ART网络SOM网络级联相关网络Elman网络Boltzmann机 5.6 深度学习 5.1 神经元模型 神经网络是由具有适应性的简单单元组成的广泛并行互连的网络&#xff0c;它…...

FPGA project : flash_erasure

SPI是什么&#xff1a; SPI&#xff08;Serial Peripheral Interface&#xff0c;串行外围设备接口&#xff09;通讯协议&#xff0c;是Motorola公司提出的一种同步串行接口技术&#xff0c;是一种高速、全双工、同步通信总线&#xff0c;在芯片中只占用四根管脚用来控制及数据…...

AC修炼计划(AtCoder Regular Contest 166)

传送门&#xff1a;AtCoder Regular Contest 166 - AtCoder 一直修炼cf&#xff0c;觉得遇到了瓶颈了&#xff0c;所以想在atcode上寻求一些突破&#xff0c;今天本来想尝试vp AtCoder Regular Contest 166&#xff0c;但结局本不是很好&#xff0c;被卡了半天&#xff0c;止步…...

Android---Android 是如何通过 Activity 进行交互的

相信对于 Android 工程师来说&#xff0c;startActivity 就像初恋一般。要求低&#xff0c;见效快&#xff0c;是每一个菜鸟 Android 工程师迈向高级 Android 工程师的必经阶段。经过这么多年的发展&#xff0c;startActivity 在 google 的调教下已经变得愈发成熟&#xff0c;对…...

【论文解读】单目3D目标检测 MonoCon(AAAI2022)

本文分享单目3D目标检测&#xff0c;MonoCon模型的论文解读&#xff0c;了解它的设计思路&#xff0c;论文核心观点&#xff0c;模型结构&#xff0c;以及效果和性能。 目录 一、MonoCon简介 二、论文核心观点 三、模型框架 四、模型预测信息与3D框联系 五、损失函数 六、…...

Angular知识点系列(5)-每天10个小知识

目录 41. Angular的路由守卫42. 处理文件的上传和下载43. Angular的动画系统44. 使用第三方库和选择评估45. 性能优化46. AOT和JIT编译47. 处理响应式布局和适配不同屏幕尺寸48. Angular的国际化&#xff08;i18n&#xff09;49. Angular的PWA开发50. 使用Angular Material或其…...

基于海洋捕食者优化的BP神经网络(分类应用) - 附代码

基于海洋捕食者优化的BP神经网络&#xff08;分类应用&#xff09; - 附代码 文章目录 基于海洋捕食者优化的BP神经网络&#xff08;分类应用&#xff09; - 附代码1.鸢尾花iris数据介绍2.数据集整理3.海洋捕食者优化BP神经网络3.1 BP神经网络参数设置3.2 海洋捕食者算法应用 4…...

【JavaEE】-- HTTP

1. HTTP是什么&#xff1f; HTTP&#xff08;全称为"超文本传输协议"&#xff09;是一种应用非常广泛的应用层协议&#xff0c;HTTP是基于TCP协议的一种应用层协议。 应用层协议&#xff1a;是计算机网络协议栈中最高层的协议&#xff0c;它定义了运行在不同主机上…...

如何在看板中体现优先级变化

在看板中有效体现优先级变化的关键措施包括&#xff1a;采用颜色或标签标识优先级、设置任务排序规则、使用独立的优先级列或泳道、结合自动化规则同步优先级变化、建立定期的优先级审查流程。其中&#xff0c;设置任务排序规则尤其重要&#xff0c;因为它让看板视觉上直观地体…...

全球首个30米分辨率湿地数据集(2000—2022)

数据简介 今天我们分享的数据是全球30米分辨率湿地数据集&#xff0c;包含8种湿地亚类&#xff0c;该数据以0.5X0.5的瓦片存储&#xff0c;我们整理了所有属于中国的瓦片名称与其对应省份&#xff0c;方便大家研究使用。 该数据集作为全球首个30米分辨率、覆盖2000–2022年时间…...

Spring Boot面试题精选汇总

&#x1f91f;致敬读者 &#x1f7e9;感谢阅读&#x1f7e6;笑口常开&#x1f7ea;生日快乐⬛早点睡觉 &#x1f4d8;博主相关 &#x1f7e7;博主信息&#x1f7e8;博客首页&#x1f7eb;专栏推荐&#x1f7e5;活动信息 文章目录 Spring Boot面试题精选汇总⚙️ **一、核心概…...

解决本地部署 SmolVLM2 大语言模型运行 flash-attn 报错

出现的问题 安装 flash-attn 会一直卡在 build 那一步或者运行报错 解决办法 是因为你安装的 flash-attn 版本没有对应上&#xff0c;所以报错&#xff0c;到 https://github.com/Dao-AILab/flash-attention/releases 下载对应版本&#xff0c;cu、torch、cp 的版本一定要对…...

全面解析各类VPN技术:GRE、IPsec、L2TP、SSL与MPLS VPN对比

目录 引言 VPN技术概述 GRE VPN 3.1 GRE封装结构 3.2 GRE的应用场景 GRE over IPsec 4.1 GRE over IPsec封装结构 4.2 为什么使用GRE over IPsec&#xff1f; IPsec VPN 5.1 IPsec传输模式&#xff08;Transport Mode&#xff09; 5.2 IPsec隧道模式&#xff08;Tunne…...

分布式增量爬虫实现方案

之前我们在讨论的是分布式爬虫如何实现增量爬取。增量爬虫的目标是只爬取新产生或发生变化的页面&#xff0c;避免重复抓取&#xff0c;以节省资源和时间。 在分布式环境下&#xff0c;增量爬虫的实现需要考虑多个爬虫节点之间的协调和去重。 另一种思路&#xff1a;将增量判…...

视觉slam十四讲实践部分记录——ch2、ch3

ch2 一、使用g++编译.cpp为可执行文件并运行(P30) g++ helloSLAM.cpp ./a.out运行 二、使用cmake编译 mkdir build cd build cmake .. makeCMakeCache.txt 文件仍然指向旧的目录。这表明在源代码目录中可能还存在旧的 CMakeCache.txt 文件,或者在构建过程中仍然引用了旧的路…...

全面解析数据库:从基础概念到前沿应用​

在数字化时代&#xff0c;数据已成为企业和社会发展的核心资产&#xff0c;而数据库作为存储、管理和处理数据的关键工具&#xff0c;在各个领域发挥着举足轻重的作用。从电商平台的商品信息管理&#xff0c;到社交网络的用户数据存储&#xff0c;再到金融行业的交易记录处理&a…...

规则与人性的天平——由高考迟到事件引发的思考

当那位身着校服的考生在考场关闭1分钟后狂奔而至&#xff0c;他涨红的脸上写满绝望。铁门内秒针划过的弧度&#xff0c;成为改变人生的残酷抛物线。家长声嘶力竭的哀求与考务人员机械的"这是规定"&#xff0c;构成当代中国教育最尖锐的隐喻。 一、刚性规则的必要性 …...