当前位置: 首页 > news >正文

源码解析FlinkKafkaConsumer支持punctuated水位线发送

背景

FlinkKafkaConsumer支持当收到某个kafka分区中的某条记录时发送水位线,比如这条特殊的记录代表一个完整记录的结束等,本文就来解析下发送punctuated水位线的源码

punctuated 水位线发送源码解析

1.首先KafkaFetcher中的runFetchLoop方法

public void runFetchLoop() throws Exception {try {// kick off the actual Kafka consumerconsumerThread.start();while (running) {// this blocks until we get the next records// it automatically re-throws exceptions encountered in the consumer threadfinal ConsumerRecords<byte[], byte[]> records = handover.pollNext();// get the records for each topic partitionfor (KafkaTopicPartitionState<T, TopicPartition> partition :subscribedPartitionStates()) {List<ConsumerRecord<byte[], byte[]>> partitionRecords =records.records(partition.getKafkaPartitionHandle());
// 算子任务消费的每个分区都调用这个方法partitionConsumerRecordsHandler(partitionRecords, partition);}}} finally {// this signals the consumer thread that no more work is to be doneconsumerThread.shutdown();}

2.查看partitionConsumerRecordsHandler方法处理当前算子任务对应的每个分区的水位线

    protected void emitRecordsWithTimestamps(Queue<T> records,KafkaTopicPartitionState<T, KPH> partitionState,long offset,long kafkaEventTimestamp) {// emit the records, using the checkpoint lock to guarantee// atomicity of record emission and offset state updatesynchronized (checkpointLock) {T record;while ((record = records.poll()) != null) {long timestamp = partitionState.extractTimestamp(record, kafkaEventTimestamp);// 发送kafka记录到下游算子sourceContext.collectWithTimestamp(record, timestamp);// this might emit a watermark, so do it after emitting the record// 处理分区的水位线,记录这个分区的水位线,并在满足条件时更新整个算子任务的水位线partitionState.onEvent(record, timestamp);}partitionState.setOffset(offset);}}```3.处理每个分区的水位线```javapublic void onEvent(T event, long timestamp) {watermarkGenerator.onEvent(event, timestamp, immediateOutput);}public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {final org.apache.flink.streaming.api.watermark.Watermark next =wms.checkAndGetNextWatermark(event, eventTimestamp);if (next != null) {output.emitWatermark(new Watermark(next.getTimestamp()));}}其中 output.emitWatermark(new Watermark(next.getTimestamp()));对应方法如下public void emitWatermark(Watermark watermark) {long timestamp = watermark.getTimestamp();// 更新每个分区对应的水位线,并且更新boolean wasUpdated = state.setWatermark(timestamp);// if it's higher than the max watermark so far we might have to update the// combined watermark 这个表明这个算子任务的最低水位线,也就是算子任务级别的水位线,而不是分区级别的了if (wasUpdated && timestamp > combinedWatermark) {updateCombinedWatermark();}}//每个分区水位线的更新如下public boolean setWatermark(long watermark) {this.idle = false;final boolean updated = watermark > this.watermark;this.watermark = Math.max(watermark, this.watermark);return updated;}       

4.最后是发送算子任务级别的水位线的方法

private void updateCombinedWatermark() {long minimumOverAllOutputs = Long.MAX_VALUE;boolean hasOutputs = false;boolean allIdle = true;for (OutputState outputState : watermarkOutputs) {if (!outputState.isIdle()) {minimumOverAllOutputs = Math.min(minimumOverAllOutputs, outputState.getWatermark());allIdle = false;}hasOutputs = true;}// if we don't have any outputs minimumOverAllOutputs is not valid, it's still// at its initial Long.MAX_VALUE state and we must not emit thatif (!hasOutputs) {return;}if (allIdle) {underlyingOutput.markIdle();} else if (minimumOverAllOutputs > combinedWatermark) {combinedWatermark = minimumOverAllOutputs;underlyingOutput.emitWatermark(new Watermark(minimumOverAllOutputs));}}

你可以看这个流程,是不是意味着如果使用Punctuated的方式,是不支持Idle空闲时间的?–答案是的

相关文章:

源码解析FlinkKafkaConsumer支持punctuated水位线发送

背景 FlinkKafkaConsumer支持当收到某个kafka分区中的某条记录时发送水位线&#xff0c;比如这条特殊的记录代表一个完整记录的结束等&#xff0c;本文就来解析下发送punctuated水位线的源码 punctuated 水位线发送源码解析 1.首先KafkaFetcher中的runFetchLoop方法 public…...

vue3学习(五)--- 父子组件传值

文章目录 defineProps普通写法TS写法 defineEmits普通写法TS写法 defineExpose defineProps 和 defineEmits 都是只能在 <script setup> 中使用的编译器宏。他们不需要导入&#xff0c;且会随着 <script setup> 的处理过程一同被编译掉。 defineProps 接收父组件传…...

寻找AI时代的关键拼图,从美国橡树岭国家实验室读懂AI存力信标

超算&#xff0c;是计算产业的明珠&#xff0c;是人类探索未知的航船。超算的发展与变化&#xff0c;不仅代表着各个国家与地区间的科技竞争力&#xff0c;更将作为趋势风向标&#xff0c;影响整个数字化体系的走向。 在目前阶段&#xff0c;超算与AI计算的融合是大势所趋。为了…...

多线程并发篇---第十二篇

系列文章目录 文章目录 系列文章目录一、说说ThreadLocal原理?二、线程池原理知道吗?以及核心参数三、线程池的拒绝策略有哪些?一、说说ThreadLocal原理? hreadLocal可以理解为线程本地变量,他会在每个线程都创建一个副本,那么在线程之间访问内部 副本变量就行了,做到了…...

P7537 [COCI2016-2017#4] Rima

由于题目涉及到后缀&#xff0c;不难想到用 trie 树处理。 将每个字符串翻转插入 trie&#xff0c;后缀就变成了前缀&#xff0c;方便处理。 条件 LCS ( A , B ) ≥ max ⁡ ( ∣ A ∣ , ∣ B ∣ ) − 1 \text{LCS}(A,B) \ge \max(|A|,|B|)-1 LCS(A,B)≥max(∣A∣,∣B∣)−1&…...

SwiftUI Swift CoreData 计算某实体某属性总和

有一个名为 Item 的实体&#xff0c;它有一个名为 amount 的 Double 属性&#xff0c;向你的 View 添加一个计算属性&#xff1a; Code: struct ContentView: View {Environment(\.managedObjectContext) private var viewContextFetchRequest(sortDescriptors: [NSSortDescri…...

docker安装skyWalking笔记

确保安装了docker和docker-compose sudo docker -v Docker version 20.10.12, build 20.10.12-0ubuntu4 sudo docker-compose -v docker-compose version 1.29.2, build unknown 编写docker-compose.yml version: "3.1" services: skywalking-oap:image: apach…...

【Codeforces】 CF1097G Vladislav and a Great Legend

题目链接 CF方向 Luogu方向 题目解法 首先一个套路是普通幂转下降幂&#xff08;为什么&#xff1f;因为观察到 k k k 很小&#xff0c;下降幂可以转化组合数问题&#xff0c;从而 d p dp dp 求解&#xff09; 即 f ( X ) k ∑ i 0 k { k i } i ! ( f ( X ) i ) f(X)^k…...

力扣每日一题36:有效的数独

题目描述&#xff1a; 请你判断一个 9 x 9 的数独是否有效。只需要 根据以下规则 &#xff0c;验证已经填入的数字是否有效即可。 数字 1-9 在每一行只能出现一次。数字 1-9 在每一列只能出现一次。数字 1-9 在每一个以粗实线分隔的 3x3 宫内只能出现一次。&#xff08;请参考…...

钉钉数字校园小程序开发:开启智慧教育新时代

随着信息技术的快速发展和校园管理的日益复杂化&#xff0c;数字校园已成为现代教育的重要趋势。钉钉数字校园小程序作为一种创新应用&#xff0c;以其专业性、思考深度和逻辑性&#xff0c;为学校提供了全新的管理、教学和沟方式。本文从需求分析、技术实现和应用思考三个方面…...

数据结构与算法--其他算法

数据结构与算法--其他算法 1 汉诺塔问题 2 字符串的全部子序列 3 字符串的全排列 4 纸牌问题 5 逆序栈问题 6 数字和字符串转换问题 7 背包问题 8 N皇后问题 暴力递归就是尝试 1&#xff0c;把问题转化为规模缩小了的同类问题的子问题 2&#xff0c;有明确的不需要继续…...

矩阵键盘行列扫描

/*----------------------------------------------- 内容&#xff1a;如计算器输入数据形式相同 从右至左 使用行列扫描方法 ------------------------------------------------*/ #include<reg52.h> //包含头文件&#xff0c;一般情况不需要改动&#xff0c;头文件包含…...

unity 实现拖动ui填空,并判断对错

参考&#xff1a;https://ask.csdn.net/questions/7971448 根据自己的需求修改为如下代码 使用过程中&#xff0c;出现拖动ui位置错误的情况&#xff0c;修改为使用 localPosition 但是吸附到指定位置却需要用的position public class DragAndDrop : MonoBehaviour, IBeginDr…...

《机器学习》第5章 神经网络

文章目录 5.1 神经元模型5.2 感知机与多层网络5.3 误差逆传播算法5.4 全局最小与局部最小5.5 其他常见神经网络RBF网络ART网络SOM网络级联相关网络Elman网络Boltzmann机 5.6 深度学习 5.1 神经元模型 神经网络是由具有适应性的简单单元组成的广泛并行互连的网络&#xff0c;它…...

FPGA project : flash_erasure

SPI是什么&#xff1a; SPI&#xff08;Serial Peripheral Interface&#xff0c;串行外围设备接口&#xff09;通讯协议&#xff0c;是Motorola公司提出的一种同步串行接口技术&#xff0c;是一种高速、全双工、同步通信总线&#xff0c;在芯片中只占用四根管脚用来控制及数据…...

AC修炼计划(AtCoder Regular Contest 166)

传送门&#xff1a;AtCoder Regular Contest 166 - AtCoder 一直修炼cf&#xff0c;觉得遇到了瓶颈了&#xff0c;所以想在atcode上寻求一些突破&#xff0c;今天本来想尝试vp AtCoder Regular Contest 166&#xff0c;但结局本不是很好&#xff0c;被卡了半天&#xff0c;止步…...

Android---Android 是如何通过 Activity 进行交互的

相信对于 Android 工程师来说&#xff0c;startActivity 就像初恋一般。要求低&#xff0c;见效快&#xff0c;是每一个菜鸟 Android 工程师迈向高级 Android 工程师的必经阶段。经过这么多年的发展&#xff0c;startActivity 在 google 的调教下已经变得愈发成熟&#xff0c;对…...

【论文解读】单目3D目标检测 MonoCon(AAAI2022)

本文分享单目3D目标检测&#xff0c;MonoCon模型的论文解读&#xff0c;了解它的设计思路&#xff0c;论文核心观点&#xff0c;模型结构&#xff0c;以及效果和性能。 目录 一、MonoCon简介 二、论文核心观点 三、模型框架 四、模型预测信息与3D框联系 五、损失函数 六、…...

Angular知识点系列(5)-每天10个小知识

目录 41. Angular的路由守卫42. 处理文件的上传和下载43. Angular的动画系统44. 使用第三方库和选择评估45. 性能优化46. AOT和JIT编译47. 处理响应式布局和适配不同屏幕尺寸48. Angular的国际化&#xff08;i18n&#xff09;49. Angular的PWA开发50. 使用Angular Material或其…...

基于海洋捕食者优化的BP神经网络(分类应用) - 附代码

基于海洋捕食者优化的BP神经网络&#xff08;分类应用&#xff09; - 附代码 文章目录 基于海洋捕食者优化的BP神经网络&#xff08;分类应用&#xff09; - 附代码1.鸢尾花iris数据介绍2.数据集整理3.海洋捕食者优化BP神经网络3.1 BP神经网络参数设置3.2 海洋捕食者算法应用 4…...

Lift, Splat, Shoot图像BEV安装与模型详解

1 前言 计算机视觉算法通常使用图像是作为输入并输出预测的结果,但是对结果所在的坐标系却并不关心,例如图像分类、图像分割、图像检测等任务中,输出的结果均在原始的图像坐标系中。因此这种范式不能很好的与自动驾驶契合。 在自动驾驶中,多个相机传感器的数据一起作为输…...

MySQL简介

数据库管理系统 1、关系型数据库管理系统: Oracle:Oracle是一种商业级关系型数据库管理系统,支持高可用性、高安全性以及广泛的企业级应用需求。SQL Server:SQL Server是Microsoft开发的企业级关系型数据库管理系统,广泛应用于Windows环境下的软件开发。MySQL:MySQL是一…...

php代码优化---本人的例子

直接上货&#xff1a; 1&#xff1a;数据统计 店铺数量、提现金额、收益金额、用户数量 旧&#xff1a; // //店铺// $storey db( store )->whereTime( addtime, yesterday )->count();//昨天// $stored db( store )->whereTime( addtime, d )->count();//今天…...

EMC Unity存储(VNXe) service Mode和Normal Mode的一些说明

本文介绍下EMC unity存储设备&#xff08;也包含VNXe存储设备&#xff09;的两种工作模式&#xff1a; Service mode&#xff1a;也叫做rescue mode&#xff0c;存储OS工作不正常或者有其他故障&#xff0c;就会进入这个模式&#xff0c;无法对外提供服务Normal mode&#xff…...

基于全景运动感知的飞行视觉脑关节神经网络全方位碰撞检测

https:/doi.org/10.1155/2023/5784720 摘要&#xff1a; 生物系统有大量的视觉运动检测神经元&#xff0c;其中一些神经元可以优先对特定的视觉区域做出反应。然而&#xff0c;关于如何使用它们来开发用于全向碰撞检测的神经网络模型&#xff0c;很少有人做过工作。为此&#…...

Java 继承与实现

一、继承&#xff08;extends&#xff09; 1.1 继承概念 继承是面向对象的基本特征&#xff0c;它允许子类继承父类的特征和行为&#xff0c;以提高代码的复用率和维护性等。下面一张图生动地展示了继承和类之间的关系&#xff1a; 继承图 上图中&#xff0c;“动物”、“食草…...

Unity 3D基础——计算两个物体之间的距离

1.在场景中新建两个 Cube 立方体&#xff0c;在 Scene 视图中将两个 Cude的位置错开。 2.新建 C# 脚本 Distance.cs&#xff08;写完记得保存&#xff09; using System.Collections; using System.Collections.Generic; using UnityEngine;public class Distance : MonoBehav…...

css常见问题处理

文章目录 1&#xff1a;禁止文字被复制粘贴1.1 Css 处理1.2 Js 处理 2&#xff1a;元素垂直水平居中2.1:方案一2.2 方案二2.3 方案三2.4 方案四2.5 方案五 1&#xff1a;禁止文字被复制粘贴 1.1 Css 处理 <div class"text">我不可以复制信息</div> <…...

蓝桥杯(迷宫,C++)

输入&#xff1a; 思路&#xff1a; 1、注意输入用字符串。 2、采用广度搜素的方法来求解。 3、因为最后要求字典序最小且D<L<R<U,所以在遍历四个方向的时候&#xff0c; 先向下&#xff0c;再向左、右&#xff0c;最后向上。 #include<iostream> #include…...

Python爬虫selenium安装谷歌驱动解决办法

驱动下载链接&#xff1a;CNPM Binaries Mirror (npmmirror.com) 谷歌浏览器老版本下载&#xff1a;Google Chrome 64bit Windows版_chrome浏览器,chrome插件,谷歌浏览器下载,谈笑有鸿儒 (chromedownloads.net) 驱动下载后解压缩直接放入python相应文件夹&#xff1a; 最后&a…...