当前位置: 首页 > news >正文

源码解析FlinkKafkaConsumer支持punctuated水位线发送

背景

FlinkKafkaConsumer支持当收到某个kafka分区中的某条记录时发送水位线,比如这条特殊的记录代表一个完整记录的结束等,本文就来解析下发送punctuated水位线的源码

punctuated 水位线发送源码解析

1.首先KafkaFetcher中的runFetchLoop方法

public void runFetchLoop() throws Exception {try {// kick off the actual Kafka consumerconsumerThread.start();while (running) {// this blocks until we get the next records// it automatically re-throws exceptions encountered in the consumer threadfinal ConsumerRecords<byte[], byte[]> records = handover.pollNext();// get the records for each topic partitionfor (KafkaTopicPartitionState<T, TopicPartition> partition :subscribedPartitionStates()) {List<ConsumerRecord<byte[], byte[]>> partitionRecords =records.records(partition.getKafkaPartitionHandle());
// 算子任务消费的每个分区都调用这个方法partitionConsumerRecordsHandler(partitionRecords, partition);}}} finally {// this signals the consumer thread that no more work is to be doneconsumerThread.shutdown();}

2.查看partitionConsumerRecordsHandler方法处理当前算子任务对应的每个分区的水位线

    protected void emitRecordsWithTimestamps(Queue<T> records,KafkaTopicPartitionState<T, KPH> partitionState,long offset,long kafkaEventTimestamp) {// emit the records, using the checkpoint lock to guarantee// atomicity of record emission and offset state updatesynchronized (checkpointLock) {T record;while ((record = records.poll()) != null) {long timestamp = partitionState.extractTimestamp(record, kafkaEventTimestamp);// 发送kafka记录到下游算子sourceContext.collectWithTimestamp(record, timestamp);// this might emit a watermark, so do it after emitting the record// 处理分区的水位线,记录这个分区的水位线,并在满足条件时更新整个算子任务的水位线partitionState.onEvent(record, timestamp);}partitionState.setOffset(offset);}}```3.处理每个分区的水位线```javapublic void onEvent(T event, long timestamp) {watermarkGenerator.onEvent(event, timestamp, immediateOutput);}public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {final org.apache.flink.streaming.api.watermark.Watermark next =wms.checkAndGetNextWatermark(event, eventTimestamp);if (next != null) {output.emitWatermark(new Watermark(next.getTimestamp()));}}其中 output.emitWatermark(new Watermark(next.getTimestamp()));对应方法如下public void emitWatermark(Watermark watermark) {long timestamp = watermark.getTimestamp();// 更新每个分区对应的水位线,并且更新boolean wasUpdated = state.setWatermark(timestamp);// if it's higher than the max watermark so far we might have to update the// combined watermark 这个表明这个算子任务的最低水位线,也就是算子任务级别的水位线,而不是分区级别的了if (wasUpdated && timestamp > combinedWatermark) {updateCombinedWatermark();}}//每个分区水位线的更新如下public boolean setWatermark(long watermark) {this.idle = false;final boolean updated = watermark > this.watermark;this.watermark = Math.max(watermark, this.watermark);return updated;}       

4.最后是发送算子任务级别的水位线的方法

private void updateCombinedWatermark() {long minimumOverAllOutputs = Long.MAX_VALUE;boolean hasOutputs = false;boolean allIdle = true;for (OutputState outputState : watermarkOutputs) {if (!outputState.isIdle()) {minimumOverAllOutputs = Math.min(minimumOverAllOutputs, outputState.getWatermark());allIdle = false;}hasOutputs = true;}// if we don't have any outputs minimumOverAllOutputs is not valid, it's still// at its initial Long.MAX_VALUE state and we must not emit thatif (!hasOutputs) {return;}if (allIdle) {underlyingOutput.markIdle();} else if (minimumOverAllOutputs > combinedWatermark) {combinedWatermark = minimumOverAllOutputs;underlyingOutput.emitWatermark(new Watermark(minimumOverAllOutputs));}}

你可以看这个流程,是不是意味着如果使用Punctuated的方式,是不支持Idle空闲时间的?–答案是的

相关文章:

源码解析FlinkKafkaConsumer支持punctuated水位线发送

背景 FlinkKafkaConsumer支持当收到某个kafka分区中的某条记录时发送水位线&#xff0c;比如这条特殊的记录代表一个完整记录的结束等&#xff0c;本文就来解析下发送punctuated水位线的源码 punctuated 水位线发送源码解析 1.首先KafkaFetcher中的runFetchLoop方法 public…...

vue3学习(五)--- 父子组件传值

文章目录 defineProps普通写法TS写法 defineEmits普通写法TS写法 defineExpose defineProps 和 defineEmits 都是只能在 <script setup> 中使用的编译器宏。他们不需要导入&#xff0c;且会随着 <script setup> 的处理过程一同被编译掉。 defineProps 接收父组件传…...

寻找AI时代的关键拼图,从美国橡树岭国家实验室读懂AI存力信标

超算&#xff0c;是计算产业的明珠&#xff0c;是人类探索未知的航船。超算的发展与变化&#xff0c;不仅代表着各个国家与地区间的科技竞争力&#xff0c;更将作为趋势风向标&#xff0c;影响整个数字化体系的走向。 在目前阶段&#xff0c;超算与AI计算的融合是大势所趋。为了…...

多线程并发篇---第十二篇

系列文章目录 文章目录 系列文章目录一、说说ThreadLocal原理?二、线程池原理知道吗?以及核心参数三、线程池的拒绝策略有哪些?一、说说ThreadLocal原理? hreadLocal可以理解为线程本地变量,他会在每个线程都创建一个副本,那么在线程之间访问内部 副本变量就行了,做到了…...

P7537 [COCI2016-2017#4] Rima

由于题目涉及到后缀&#xff0c;不难想到用 trie 树处理。 将每个字符串翻转插入 trie&#xff0c;后缀就变成了前缀&#xff0c;方便处理。 条件 LCS ( A , B ) ≥ max ⁡ ( ∣ A ∣ , ∣ B ∣ ) − 1 \text{LCS}(A,B) \ge \max(|A|,|B|)-1 LCS(A,B)≥max(∣A∣,∣B∣)−1&…...

SwiftUI Swift CoreData 计算某实体某属性总和

有一个名为 Item 的实体&#xff0c;它有一个名为 amount 的 Double 属性&#xff0c;向你的 View 添加一个计算属性&#xff1a; Code: struct ContentView: View {Environment(\.managedObjectContext) private var viewContextFetchRequest(sortDescriptors: [NSSortDescri…...

docker安装skyWalking笔记

确保安装了docker和docker-compose sudo docker -v Docker version 20.10.12, build 20.10.12-0ubuntu4 sudo docker-compose -v docker-compose version 1.29.2, build unknown 编写docker-compose.yml version: "3.1" services: skywalking-oap:image: apach…...

【Codeforces】 CF1097G Vladislav and a Great Legend

题目链接 CF方向 Luogu方向 题目解法 首先一个套路是普通幂转下降幂&#xff08;为什么&#xff1f;因为观察到 k k k 很小&#xff0c;下降幂可以转化组合数问题&#xff0c;从而 d p dp dp 求解&#xff09; 即 f ( X ) k ∑ i 0 k { k i } i ! ( f ( X ) i ) f(X)^k…...

力扣每日一题36:有效的数独

题目描述&#xff1a; 请你判断一个 9 x 9 的数独是否有效。只需要 根据以下规则 &#xff0c;验证已经填入的数字是否有效即可。 数字 1-9 在每一行只能出现一次。数字 1-9 在每一列只能出现一次。数字 1-9 在每一个以粗实线分隔的 3x3 宫内只能出现一次。&#xff08;请参考…...

钉钉数字校园小程序开发:开启智慧教育新时代

随着信息技术的快速发展和校园管理的日益复杂化&#xff0c;数字校园已成为现代教育的重要趋势。钉钉数字校园小程序作为一种创新应用&#xff0c;以其专业性、思考深度和逻辑性&#xff0c;为学校提供了全新的管理、教学和沟方式。本文从需求分析、技术实现和应用思考三个方面…...

数据结构与算法--其他算法

数据结构与算法--其他算法 1 汉诺塔问题 2 字符串的全部子序列 3 字符串的全排列 4 纸牌问题 5 逆序栈问题 6 数字和字符串转换问题 7 背包问题 8 N皇后问题 暴力递归就是尝试 1&#xff0c;把问题转化为规模缩小了的同类问题的子问题 2&#xff0c;有明确的不需要继续…...

矩阵键盘行列扫描

/*----------------------------------------------- 内容&#xff1a;如计算器输入数据形式相同 从右至左 使用行列扫描方法 ------------------------------------------------*/ #include<reg52.h> //包含头文件&#xff0c;一般情况不需要改动&#xff0c;头文件包含…...

unity 实现拖动ui填空,并判断对错

参考&#xff1a;https://ask.csdn.net/questions/7971448 根据自己的需求修改为如下代码 使用过程中&#xff0c;出现拖动ui位置错误的情况&#xff0c;修改为使用 localPosition 但是吸附到指定位置却需要用的position public class DragAndDrop : MonoBehaviour, IBeginDr…...

《机器学习》第5章 神经网络

文章目录 5.1 神经元模型5.2 感知机与多层网络5.3 误差逆传播算法5.4 全局最小与局部最小5.5 其他常见神经网络RBF网络ART网络SOM网络级联相关网络Elman网络Boltzmann机 5.6 深度学习 5.1 神经元模型 神经网络是由具有适应性的简单单元组成的广泛并行互连的网络&#xff0c;它…...

FPGA project : flash_erasure

SPI是什么&#xff1a; SPI&#xff08;Serial Peripheral Interface&#xff0c;串行外围设备接口&#xff09;通讯协议&#xff0c;是Motorola公司提出的一种同步串行接口技术&#xff0c;是一种高速、全双工、同步通信总线&#xff0c;在芯片中只占用四根管脚用来控制及数据…...

AC修炼计划(AtCoder Regular Contest 166)

传送门&#xff1a;AtCoder Regular Contest 166 - AtCoder 一直修炼cf&#xff0c;觉得遇到了瓶颈了&#xff0c;所以想在atcode上寻求一些突破&#xff0c;今天本来想尝试vp AtCoder Regular Contest 166&#xff0c;但结局本不是很好&#xff0c;被卡了半天&#xff0c;止步…...

Android---Android 是如何通过 Activity 进行交互的

相信对于 Android 工程师来说&#xff0c;startActivity 就像初恋一般。要求低&#xff0c;见效快&#xff0c;是每一个菜鸟 Android 工程师迈向高级 Android 工程师的必经阶段。经过这么多年的发展&#xff0c;startActivity 在 google 的调教下已经变得愈发成熟&#xff0c;对…...

【论文解读】单目3D目标检测 MonoCon(AAAI2022)

本文分享单目3D目标检测&#xff0c;MonoCon模型的论文解读&#xff0c;了解它的设计思路&#xff0c;论文核心观点&#xff0c;模型结构&#xff0c;以及效果和性能。 目录 一、MonoCon简介 二、论文核心观点 三、模型框架 四、模型预测信息与3D框联系 五、损失函数 六、…...

Angular知识点系列(5)-每天10个小知识

目录 41. Angular的路由守卫42. 处理文件的上传和下载43. Angular的动画系统44. 使用第三方库和选择评估45. 性能优化46. AOT和JIT编译47. 处理响应式布局和适配不同屏幕尺寸48. Angular的国际化&#xff08;i18n&#xff09;49. Angular的PWA开发50. 使用Angular Material或其…...

基于海洋捕食者优化的BP神经网络(分类应用) - 附代码

基于海洋捕食者优化的BP神经网络&#xff08;分类应用&#xff09; - 附代码 文章目录 基于海洋捕食者优化的BP神经网络&#xff08;分类应用&#xff09; - 附代码1.鸢尾花iris数据介绍2.数据集整理3.海洋捕食者优化BP神经网络3.1 BP神经网络参数设置3.2 海洋捕食者算法应用 4…...

基于算法竞赛的c++编程(28)结构体的进阶应用

结构体的嵌套与复杂数据组织 在C中&#xff0c;结构体可以嵌套使用&#xff0c;形成更复杂的数据结构。例如&#xff0c;可以通过嵌套结构体描述多层级数据关系&#xff1a; struct Address {string city;string street;int zipCode; };struct Employee {string name;int id;…...

Cursor实现用excel数据填充word模版的方法

cursor主页&#xff1a;https://www.cursor.com/ 任务目标&#xff1a;把excel格式的数据里的单元格&#xff0c;按照某一个固定模版填充到word中 文章目录 注意事项逐步生成程序1. 确定格式2. 调试程序 注意事项 直接给一个excel文件和最终呈现的word文件的示例&#xff0c;…...

RocketMQ延迟消息机制

两种延迟消息 RocketMQ中提供了两种延迟消息机制 指定固定的延迟级别 通过在Message中设定一个MessageDelayLevel参数&#xff0c;对应18个预设的延迟级别指定时间点的延迟级别 通过在Message中设定一个DeliverTimeMS指定一个Long类型表示的具体时间点。到了时间点后&#xf…...

vue3 定时器-定义全局方法 vue+ts

1.创建ts文件 路径&#xff1a;src/utils/timer.ts 完整代码&#xff1a; import { onUnmounted } from vuetype TimerCallback (...args: any[]) > voidexport function useGlobalTimer() {const timers: Map<number, NodeJS.Timeout> new Map()// 创建定时器con…...

令牌桶 滑动窗口->限流 分布式信号量->限并发的原理 lua脚本分析介绍

文章目录 前言限流限制并发的实际理解限流令牌桶代码实现结果分析令牌桶lua的模拟实现原理总结&#xff1a; 滑动窗口代码实现结果分析lua脚本原理解析 限并发分布式信号量代码实现结果分析lua脚本实现原理 双注解去实现限流 并发结果分析&#xff1a; 实际业务去理解体会统一注…...

Ascend NPU上适配Step-Audio模型

1 概述 1.1 简述 Step-Audio 是业界首个集语音理解与生成控制一体化的产品级开源实时语音对话系统&#xff0c;支持多语言对话&#xff08;如 中文&#xff0c;英文&#xff0c;日语&#xff09;&#xff0c;语音情感&#xff08;如 开心&#xff0c;悲伤&#xff09;&#x…...

数据库分批入库

今天在工作中&#xff0c;遇到一个问题&#xff0c;就是分批查询的时候&#xff0c;由于批次过大导致出现了一些问题&#xff0c;一下是问题描述和解决方案&#xff1a; 示例&#xff1a; // 假设已有数据列表 dataList 和 PreparedStatement pstmt int batchSize 1000; // …...

JUC笔记(上)-复习 涉及死锁 volatile synchronized CAS 原子操作

一、上下文切换 即使单核CPU也可以进行多线程执行代码&#xff0c;CPU会给每个线程分配CPU时间片来实现这个机制。时间片非常短&#xff0c;所以CPU会不断地切换线程执行&#xff0c;从而让我们感觉多个线程是同时执行的。时间片一般是十几毫秒(ms)。通过时间片分配算法执行。…...

C++八股 —— 单例模式

文章目录 1. 基本概念2. 设计要点3. 实现方式4. 详解懒汉模式 1. 基本概念 线程安全&#xff08;Thread Safety&#xff09; 线程安全是指在多线程环境下&#xff0c;某个函数、类或代码片段能够被多个线程同时调用时&#xff0c;仍能保证数据的一致性和逻辑的正确性&#xf…...

【C++特殊工具与技术】优化内存分配(一):C++中的内存分配

目录 一、C 内存的基本概念​ 1.1 内存的物理与逻辑结构​ 1.2 C 程序的内存区域划分​ 二、栈内存分配​ 2.1 栈内存的特点​ 2.2 栈内存分配示例​ 三、堆内存分配​ 3.1 new和delete操作符​ 4.2 内存泄漏与悬空指针问题​ 4.3 new和delete的重载​ 四、智能指针…...