源码解析FlinkKafkaConsumer支持punctuated水位线发送
背景
FlinkKafkaConsumer支持当收到某个kafka分区中的某条记录时发送水位线,比如这条特殊的记录代表一个完整记录的结束等,本文就来解析下发送punctuated水位线的源码
punctuated 水位线发送源码解析
1.首先KafkaFetcher中的runFetchLoop方法
public void runFetchLoop() throws Exception {try {// kick off the actual Kafka consumerconsumerThread.start();while (running) {// this blocks until we get the next records// it automatically re-throws exceptions encountered in the consumer threadfinal ConsumerRecords<byte[], byte[]> records = handover.pollNext();// get the records for each topic partitionfor (KafkaTopicPartitionState<T, TopicPartition> partition :subscribedPartitionStates()) {List<ConsumerRecord<byte[], byte[]>> partitionRecords =records.records(partition.getKafkaPartitionHandle());
// 算子任务消费的每个分区都调用这个方法partitionConsumerRecordsHandler(partitionRecords, partition);}}} finally {// this signals the consumer thread that no more work is to be doneconsumerThread.shutdown();}
2.查看partitionConsumerRecordsHandler方法处理当前算子任务对应的每个分区的水位线
protected void emitRecordsWithTimestamps(Queue<T> records,KafkaTopicPartitionState<T, KPH> partitionState,long offset,long kafkaEventTimestamp) {// emit the records, using the checkpoint lock to guarantee// atomicity of record emission and offset state updatesynchronized (checkpointLock) {T record;while ((record = records.poll()) != null) {long timestamp = partitionState.extractTimestamp(record, kafkaEventTimestamp);// 发送kafka记录到下游算子sourceContext.collectWithTimestamp(record, timestamp);// this might emit a watermark, so do it after emitting the record// 处理分区的水位线,记录这个分区的水位线,并在满足条件时更新整个算子任务的水位线partitionState.onEvent(record, timestamp);}partitionState.setOffset(offset);}}```3.处理每个分区的水位线```javapublic void onEvent(T event, long timestamp) {watermarkGenerator.onEvent(event, timestamp, immediateOutput);}public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {final org.apache.flink.streaming.api.watermark.Watermark next =wms.checkAndGetNextWatermark(event, eventTimestamp);if (next != null) {output.emitWatermark(new Watermark(next.getTimestamp()));}}其中 output.emitWatermark(new Watermark(next.getTimestamp()));对应方法如下public void emitWatermark(Watermark watermark) {long timestamp = watermark.getTimestamp();// 更新每个分区对应的水位线,并且更新boolean wasUpdated = state.setWatermark(timestamp);// if it's higher than the max watermark so far we might have to update the// combined watermark 这个表明这个算子任务的最低水位线,也就是算子任务级别的水位线,而不是分区级别的了if (wasUpdated && timestamp > combinedWatermark) {updateCombinedWatermark();}}//每个分区水位线的更新如下public boolean setWatermark(long watermark) {this.idle = false;final boolean updated = watermark > this.watermark;this.watermark = Math.max(watermark, this.watermark);return updated;}
4.最后是发送算子任务级别的水位线的方法
private void updateCombinedWatermark() {long minimumOverAllOutputs = Long.MAX_VALUE;boolean hasOutputs = false;boolean allIdle = true;for (OutputState outputState : watermarkOutputs) {if (!outputState.isIdle()) {minimumOverAllOutputs = Math.min(minimumOverAllOutputs, outputState.getWatermark());allIdle = false;}hasOutputs = true;}// if we don't have any outputs minimumOverAllOutputs is not valid, it's still// at its initial Long.MAX_VALUE state and we must not emit thatif (!hasOutputs) {return;}if (allIdle) {underlyingOutput.markIdle();} else if (minimumOverAllOutputs > combinedWatermark) {combinedWatermark = minimumOverAllOutputs;underlyingOutput.emitWatermark(new Watermark(minimumOverAllOutputs));}}
你可以看这个流程,是不是意味着如果使用Punctuated的方式,是不支持Idle空闲时间的?–答案是的
相关文章:
源码解析FlinkKafkaConsumer支持punctuated水位线发送
背景 FlinkKafkaConsumer支持当收到某个kafka分区中的某条记录时发送水位线,比如这条特殊的记录代表一个完整记录的结束等,本文就来解析下发送punctuated水位线的源码 punctuated 水位线发送源码解析 1.首先KafkaFetcher中的runFetchLoop方法 public…...
vue3学习(五)--- 父子组件传值
文章目录 defineProps普通写法TS写法 defineEmits普通写法TS写法 defineExpose defineProps 和 defineEmits 都是只能在 <script setup> 中使用的编译器宏。他们不需要导入,且会随着 <script setup> 的处理过程一同被编译掉。 defineProps 接收父组件传…...
寻找AI时代的关键拼图,从美国橡树岭国家实验室读懂AI存力信标
超算,是计算产业的明珠,是人类探索未知的航船。超算的发展与变化,不仅代表着各个国家与地区间的科技竞争力,更将作为趋势风向标,影响整个数字化体系的走向。 在目前阶段,超算与AI计算的融合是大势所趋。为了…...
多线程并发篇---第十二篇
系列文章目录 文章目录 系列文章目录一、说说ThreadLocal原理?二、线程池原理知道吗?以及核心参数三、线程池的拒绝策略有哪些?一、说说ThreadLocal原理? hreadLocal可以理解为线程本地变量,他会在每个线程都创建一个副本,那么在线程之间访问内部 副本变量就行了,做到了…...
P7537 [COCI2016-2017#4] Rima
由于题目涉及到后缀,不难想到用 trie 树处理。 将每个字符串翻转插入 trie,后缀就变成了前缀,方便处理。 条件 LCS ( A , B ) ≥ max ( ∣ A ∣ , ∣ B ∣ ) − 1 \text{LCS}(A,B) \ge \max(|A|,|B|)-1 LCS(A,B)≥max(∣A∣,∣B∣)−1&…...
SwiftUI Swift CoreData 计算某实体某属性总和
有一个名为 Item 的实体,它有一个名为 amount 的 Double 属性,向你的 View 添加一个计算属性: Code: struct ContentView: View {Environment(\.managedObjectContext) private var viewContextFetchRequest(sortDescriptors: [NSSortDescri…...
docker安装skyWalking笔记
确保安装了docker和docker-compose sudo docker -v Docker version 20.10.12, build 20.10.12-0ubuntu4 sudo docker-compose -v docker-compose version 1.29.2, build unknown 编写docker-compose.yml version: "3.1" services: skywalking-oap:image: apach…...
【Codeforces】 CF1097G Vladislav and a Great Legend
题目链接 CF方向 Luogu方向 题目解法 首先一个套路是普通幂转下降幂(为什么?因为观察到 k k k 很小,下降幂可以转化组合数问题,从而 d p dp dp 求解) 即 f ( X ) k ∑ i 0 k { k i } i ! ( f ( X ) i ) f(X)^k…...
力扣每日一题36:有效的数独
题目描述: 请你判断一个 9 x 9 的数独是否有效。只需要 根据以下规则 ,验证已经填入的数字是否有效即可。 数字 1-9 在每一行只能出现一次。数字 1-9 在每一列只能出现一次。数字 1-9 在每一个以粗实线分隔的 3x3 宫内只能出现一次。(请参考…...
钉钉数字校园小程序开发:开启智慧教育新时代
随着信息技术的快速发展和校园管理的日益复杂化,数字校园已成为现代教育的重要趋势。钉钉数字校园小程序作为一种创新应用,以其专业性、思考深度和逻辑性,为学校提供了全新的管理、教学和沟方式。本文从需求分析、技术实现和应用思考三个方面…...
数据结构与算法--其他算法
数据结构与算法--其他算法 1 汉诺塔问题 2 字符串的全部子序列 3 字符串的全排列 4 纸牌问题 5 逆序栈问题 6 数字和字符串转换问题 7 背包问题 8 N皇后问题 暴力递归就是尝试 1,把问题转化为规模缩小了的同类问题的子问题 2,有明确的不需要继续…...
矩阵键盘行列扫描
/*----------------------------------------------- 内容:如计算器输入数据形式相同 从右至左 使用行列扫描方法 ------------------------------------------------*/ #include<reg52.h> //包含头文件,一般情况不需要改动,头文件包含…...
unity 实现拖动ui填空,并判断对错
参考:https://ask.csdn.net/questions/7971448 根据自己的需求修改为如下代码 使用过程中,出现拖动ui位置错误的情况,修改为使用 localPosition 但是吸附到指定位置却需要用的position public class DragAndDrop : MonoBehaviour, IBeginDr…...
《机器学习》第5章 神经网络
文章目录 5.1 神经元模型5.2 感知机与多层网络5.3 误差逆传播算法5.4 全局最小与局部最小5.5 其他常见神经网络RBF网络ART网络SOM网络级联相关网络Elman网络Boltzmann机 5.6 深度学习 5.1 神经元模型 神经网络是由具有适应性的简单单元组成的广泛并行互连的网络,它…...
FPGA project : flash_erasure
SPI是什么: SPI(Serial Peripheral Interface,串行外围设备接口)通讯协议,是Motorola公司提出的一种同步串行接口技术,是一种高速、全双工、同步通信总线,在芯片中只占用四根管脚用来控制及数据…...
AC修炼计划(AtCoder Regular Contest 166)
传送门:AtCoder Regular Contest 166 - AtCoder 一直修炼cf,觉得遇到了瓶颈了,所以想在atcode上寻求一些突破,今天本来想尝试vp AtCoder Regular Contest 166,但结局本不是很好,被卡了半天,止步…...
Android---Android 是如何通过 Activity 进行交互的
相信对于 Android 工程师来说,startActivity 就像初恋一般。要求低,见效快,是每一个菜鸟 Android 工程师迈向高级 Android 工程师的必经阶段。经过这么多年的发展,startActivity 在 google 的调教下已经变得愈发成熟,对…...
【论文解读】单目3D目标检测 MonoCon(AAAI2022)
本文分享单目3D目标检测,MonoCon模型的论文解读,了解它的设计思路,论文核心观点,模型结构,以及效果和性能。 目录 一、MonoCon简介 二、论文核心观点 三、模型框架 四、模型预测信息与3D框联系 五、损失函数 六、…...
Angular知识点系列(5)-每天10个小知识
目录 41. Angular的路由守卫42. 处理文件的上传和下载43. Angular的动画系统44. 使用第三方库和选择评估45. 性能优化46. AOT和JIT编译47. 处理响应式布局和适配不同屏幕尺寸48. Angular的国际化(i18n)49. Angular的PWA开发50. 使用Angular Material或其…...
基于海洋捕食者优化的BP神经网络(分类应用) - 附代码
基于海洋捕食者优化的BP神经网络(分类应用) - 附代码 文章目录 基于海洋捕食者优化的BP神经网络(分类应用) - 附代码1.鸢尾花iris数据介绍2.数据集整理3.海洋捕食者优化BP神经网络3.1 BP神经网络参数设置3.2 海洋捕食者算法应用 4…...
【SpringBoot】100、SpringBoot中使用自定义注解+AOP实现参数自动解密
在实际项目中,用户注册、登录、修改密码等操作,都涉及到参数传输安全问题。所以我们需要在前端对账户、密码等敏感信息加密传输,在后端接收到数据后能自动解密。 1、引入依赖 <dependency><groupId>org.springframework.boot</groupId><artifactId...
JVM垃圾回收机制全解析
Java虚拟机(JVM)中的垃圾收集器(Garbage Collector,简称GC)是用于自动管理内存的机制。它负责识别和清除不再被程序使用的对象,从而释放内存空间,避免内存泄漏和内存溢出等问题。垃圾收集器在Ja…...
2025盘古石杯决赛【手机取证】
前言 第三届盘古石杯国际电子数据取证大赛决赛 最后一题没有解出来,实在找不到,希望有大佬教一下我。 还有就会议时间,我感觉不是图片时间,因为在电脑看到是其他时间用老会议系统开的会。 手机取证 1、分析鸿蒙手机检材&#x…...
汇编常见指令
汇编常见指令 一、数据传送指令 指令功能示例说明MOV数据传送MOV EAX, 10将立即数 10 送入 EAXMOV [EBX], EAX将 EAX 值存入 EBX 指向的内存LEA加载有效地址LEA EAX, [EBX4]将 EBX4 的地址存入 EAX(不访问内存)XCHG交换数据XCHG EAX, EBX交换 EAX 和 EB…...
MySQL账号权限管理指南:安全创建账户与精细授权技巧
在MySQL数据库管理中,合理创建用户账号并分配精确权限是保障数据安全的核心环节。直接使用root账号进行所有操作不仅危险且难以审计操作行为。今天我们来全面解析MySQL账号创建与权限分配的专业方法。 一、为何需要创建独立账号? 最小权限原则…...
使用Matplotlib创建炫酷的3D散点图:数据可视化的新维度
文章目录 基础实现代码代码解析进阶技巧1. 自定义点的大小和颜色2. 添加图例和样式美化3. 真实数据应用示例实用技巧与注意事项完整示例(带样式)应用场景在数据科学和可视化领域,三维图形能为我们提供更丰富的数据洞察。本文将手把手教你如何使用Python的Matplotlib库创建引…...
Xen Server服务器释放磁盘空间
disk.sh #!/bin/bashcd /run/sr-mount/e54f0646-ae11-0457-b64f-eba4673b824c # 全部虚拟机物理磁盘文件存储 a$(ls -l | awk {print $NF} | cut -d. -f1) # 使用中的虚拟机物理磁盘文件 b$(xe vm-disk-list --multiple | grep uuid | awk {print $NF})printf "%s\n"…...
CSS设置元素的宽度根据其内容自动调整
width: fit-content 是 CSS 中的一个属性值,用于设置元素的宽度根据其内容自动调整,确保宽度刚好容纳内容而不会超出。 效果对比 默认情况(width: auto): 块级元素(如 <div>)会占满父容器…...
面向无人机海岸带生态系统监测的语义分割基准数据集
描述:海岸带生态系统的监测是维护生态平衡和可持续发展的重要任务。语义分割技术在遥感影像中的应用为海岸带生态系统的精准监测提供了有效手段。然而,目前该领域仍面临一个挑战,即缺乏公开的专门面向海岸带生态系统的语义分割基准数据集。受…...
如何更改默认 Crontab 编辑器 ?
在 Linux 领域中,crontab 是您可能经常遇到的一个术语。这个实用程序在类 unix 操作系统上可用,用于调度在预定义时间和间隔自动执行的任务。这对管理员和高级用户非常有益,允许他们自动执行各种系统任务。 编辑 Crontab 文件通常使用文本编…...
