源码解析FlinkKafkaConsumer支持周期性水位线发送
背景
当flink消费kafka的消息时,我们经常会用到FlinkKafkaConsumer进行水位线的发送,本文就从源码看下FlinkKafkaConsumer.assignTimestampsAndWatermarks指定周期性水位线发送的流程
FlinkKafkaConsumer水位线发送
1.首先从Fetcher类开始,创建Fetcher类的时候会构建一个周期性的水位线发送线程并启动
// if we have periodic watermarks, kick off the interval schedulerif (timestampWatermarkMode == WITH_WATERMARK_GENERATOR && autoWatermarkInterval > 0) {PeriodicWatermarkEmitter<T, KPH> periodicEmitter =new PeriodicWatermarkEmitter<>(checkpointLock,subscribedPartitionStates,watermarkOutputMultiplexer,processingTimeProvider,autoWatermarkInterval);periodicEmitter.start();}
2.随后,PeriodicWatermarkEmitter中注册处理时间定时器,周期性执行
public void start() {timerService.registerTimer(timerService.getCurrentProcessingTime() + interval, this);}@Overridepublic void onProcessingTime(long timestamp) {synchronized (checkpointLock) {for (KafkaTopicPartitionState<?, ?> state : allPartitions) {// 这里当前算子任务消费的kafka 分区分别记录每个分区的水位值state.onPeriodicEmit();}//这里当前算子会把自己消费的kafka分区的所有水位线取最小值后当成当前算子任务自身的水位线发送出去,注意这里是当前算子任务级别的watermarkOutputMultiplexer.onPeriodicEmit();}// schedule the next watermarktimerService.registerTimer(timerService.getCurrentProcessingTime() + interval, this);}}
3.对应state.onPeriodicEmit();记录每个kafka分区的水位线方法
@Overridepublic void onPeriodicEmit(WatermarkOutput output) {final org.apache.flink.streaming.api.watermark.Watermark next = wms.getCurrentWatermark();if (next != null) {output.emitWatermark(new Watermark(next.getTimestamp()));}}
其中 WatermarkOutput output.emitWatermark(new Watermark(next.getTimestamp()))代码如下:public DeferredOutput(OutputState state) {this.state = state;}@Overridepublic void emitWatermark(Watermark watermark) {state.setWatermark(watermark.getTimestamp());}
所以这里最终效果只是对应state(kafka分区[注意,一个算子任务有可能消费好几个kafka分区])上设置了水位线/*** Returns true if the watermark was advanced, that is if the new watermark is larger than* the previous one.** <p>Setting a watermark will clear the idleness flag.*/public boolean setWatermark(long watermark) {this.idle = false;final boolean updated = watermark > this.watermark;// 这里也可以看出来,即使代码里面发送了更小值的水位线,水位线也不会回退this.watermark = Math.max(watermark, this.watermark);return updated;}
4.对应算子任务组合当前任务消费的所有分区水位线的方法
private void updateCombinedWatermark() {long minimumOverAllOutputs = Long.MAX_VALUE;boolean hasOutputs = false;boolean allIdle = true;for (OutputState outputState : watermarkOutputs) {if (!outputState.isIdle()) {minimumOverAllOutputs = Math.min(minimumOverAllOutputs, outputState.getWatermark());allIdle = false;}hasOutputs = true;}// if we don't have any outputs minimumOverAllOutputs is not valid, it's still// at its initial Long.MAX_VALUE state and we must not emit that// 如果算子任务不消费任何分区,它不会发出任何水位线,这里是不是就是kafka消费者要小于kafka主题的原因所在???if (!hasOutputs) {return;}if (allIdle) {// 如果当前算子任务处于空闲时间,标识空闲,以便后续算子可以继续推进underlyingOutput.markIdle();} else if (minimumOverAllOutputs > combinedWatermark) {combinedWatermark = minimumOverAllOutputs;underlyingOutput.emitWatermark(new Watermark(minimumOverAllOutputs));}}```
相关文章:
源码解析FlinkKafkaConsumer支持周期性水位线发送
背景 当flink消费kafka的消息时,我们经常会用到FlinkKafkaConsumer进行水位线的发送,本文就从源码看下FlinkKafkaConsumer.assignTimestampsAndWatermarks指定周期性水位线发送的流程 FlinkKafkaConsumer水位线发送 1.首先从Fetcher类开始,…...

Nginx:动静分离(示意图+配置讲解)
示意图: 动静分离 动静分离是指将动态内容和静态内容分开处理的一种方式。通常,动态内容是指由服务器端处理的,例如动态生成的网页、数据库查询等。静态内容是指不需要经过服务器端处理的,例如图片、CSS、JavaScript文件等。通过…...

通讯网关软件024——利用CommGate X2Access实现Modbus TCP数据转储Access
本文介绍利用CommGate X2ACCESS实现从Modbus TCP设备读取数据并转储至ACCESS数据库。CommGate X2ACCESS是宁波科安网信开发的网关软件,软件可以登录到网信智汇(http://wangxinzhihui.com)下载。 【案例】如下图所示,实现从Modbus TCP设备读取数据并转储…...

vim工具的使用
目录 vi/vim键盘图 1、vim的基本概念 2、vim的基本使用 3、vim命令模式命令集 4、vim底行模式命令集 5、参考资料 vi/vim键盘图 1、vim的基本概念 vi和vim的区别:vi和vim的区别简单点来说,它们都是多模式编辑器,不同的是vim是vi…...

Docker学习_存储篇
当以默认的方式创建容器时,容器中的数据无法直接和其他容器或宿主机共享。为了解决这个问题需要学习一些Docker 存储卷的知识。 Docker提供了三种存储的方式。 bind mount共享宿主机文件目录volume共享docker存储卷tmpfs mount共享内存 volume* volume方式是容器…...

微信小程序获取当前日期时间
一、直接使用方式 在小程序中获取当前系统日期和时间,可直接拿来使用的常用的日期格式 //1. 当前日期 YYYY-MM-DDnew Date().toISOString().substring(0, 10)new Date().toJSON().substring(0, 10)//2. 当前日期 YYYY/MM/DDnew Date().toLocaleDateString()//3.…...

Unity关键词语音识别
一、背景 最近使用unity开发语音交互内容的时候,遇到了这样的需求,就是需要使用语音关键字来唤醒应用程序,然后再和程序做交互,有点像智能音箱的意思。具体的技术方案方面,也找了一些第三方的服务,比如百度…...

SpringBoot的配置文件——.yml和.properties
目录 1. Spring Boot 配置文件的使用场景 2. 配置文件的两种格式 2.0 特殊说明: 2.1 .properties 2.1.1 格式 2.2.2 缺陷 2.2.3 解决中文乱码的问题 2.2 .yml 2.2.3 格式 配置数据库连接 注意转义字符 编辑 编辑 配置null 配置对象 从.yml读取文件举例 Stud…...

Retrieve Anything To Augment Large Language Models
简介 论文主要介绍了一套通过对比学习和蒸馏学习的方法,来增强学习了embedding向量,然后能够在知识增强,长上下文建模,ICL和工具学习等方面来增强大模型能力。...
什么是面向对象编程
面向对象编程(Object-oriented programming,简称OOP)是一种编程范型,通过将数据和方法(即属性和行为)组织在一个单元中,以模拟现实世界中的实体或概念。在面向对象编程中,数据和方法…...

c++视觉处理----固定阈值操作:Threshold()函数,实时处理:二值化,反二值化,截断,设为零,反向设为零
固定阈值操作: Threshold()函数 cv::threshold() 函数是OpenCV中用于执行固定阈值二值化操作的函数。它可以用来将图像中的像素值根据用户定义的阈值转换为二进制值(0或255),以便进行图像分割、物体检测和特征提取等任务。 cv::…...
KWin、libdrm、DRM从上到下全过程 —— drmModeAddFBxxx(8)
接前一篇文章:KWin、libdrm、DRM从上到下全过程 —— drmModeAddFBxxx(7) 上一回讲到了drm_internal_framebuffer_create函数中的framebuffer_check函数中的drm_get_format_info函数,讲解了该函数的第一部分暨前一部分,本文讲解后一部分。为了便于理解以及理清脉络和当前所…...
【问题解决】Ubuntu 安装 SeisSol 依赖 easi 报错解决: undefined reference to `H5free_memory‘
兼职帮客户安装 SeisSol 时问题解决,安装 easi 这个报错卡了很久(搞了一天),记录下,以备后用~ # 编译器问题 rootubuntu:/opt/easi# make -j install [ 4%] Building CXX object CMakeFiles/easi.dir/src/component/…...
循环小数(Repeating Decimals, ACM/ICPC World Finals 1990, UVa202)rust解法
输入整数a和b(0≤a≤3000,1≤b≤3000),输出a/b的循环小数表示以及循环节长度。例如a5,b43,小数表示为0.(116279069767441860465),循环节长度为21。 解法 就是模拟竖式除法 use std::{collecti…...

[GAMES101]透视投影变换矩阵中为什么需要改变z值
一、问题提出 在GAMES101-Lecture4 Transformation Matrices 一节中,闫老师介绍了正交投影和透视投影。 在讲透视投影变换矩阵 M p e r s p → o r t h o M_{persp→ortho} Mpersp→ortho时,同学们对矩阵中的z分量是变化的还是不变的有很多争论。即下…...

sklearn处理离散变量的问题——以决策树为例
最近做项目遇到的数据集中,有许多高维类别特征。catboost是可以直接指定categorical_columns的【直接进行ordered TS编码】,但是XGboost和随机森林甚至决策树都没有这个接口。但是在学习决策树的时候(无论是ID3、C4.5还是CART)&am…...
QT 数据库表格----QSqlTableModel
将数据库数据以表格的形式转化处理的方法很多,但我觉得QSqlTableModel这个model应算是非常好用的; msql.exec("create table alldata(照片,车牌号 "",入车时间,出车时间,金额,状态,看守人员);"); //创建表格 //msql 打开的数据库即Q…...
Vue_Bug Failed to fetch extension, trying 4 more times
Bug描述: 启动electron时出现Failed to fetch extension, trying 4 more times的问题 解决方法: 去src/background.js文件中进行代码注释工作 app.on(ready, async() > {// if (isDevelopment && !process.env.IS_TEST) {// // Install V…...

缩短从需求到上线的距离:集成多种工程实践的稳定框架 | 开源日报 No.55
zeromicro/go-zero Stars: 25.7k License: MIT go-zero 是一个集成了各种工程实践的 web 和 rpc 框架。通过弹性设计保障了大并发服务端的稳定性,经受了充分的实战检验。 go-zero 包含极简的 API 定义和生成工具 goctl,可以根据定义的 api 文件一键生成…...

基于秃鹰优化的BP神经网络(分类应用) - 附代码
基于秃鹰优化的BP神经网络(分类应用) - 附代码 文章目录 基于秃鹰优化的BP神经网络(分类应用) - 附代码1.鸢尾花iris数据介绍2.数据集整理3.秃鹰优化BP神经网络3.1 BP神经网络参数设置3.2 秃鹰算法应用 4.测试结果:5.M…...

深入浅出Asp.Net Core MVC应用开发系列-AspNetCore中的日志记录
ASP.NET Core 是一个跨平台的开源框架,用于在 Windows、macOS 或 Linux 上生成基于云的新式 Web 应用。 ASP.NET Core 中的日志记录 .NET 通过 ILogger API 支持高性能结构化日志记录,以帮助监视应用程序行为和诊断问题。 可以通过配置不同的记录提供程…...
【磁盘】每天掌握一个Linux命令 - iostat
目录 【磁盘】每天掌握一个Linux命令 - iostat工具概述安装方式核心功能基础用法进阶操作实战案例面试题场景生产场景 注意事项 【磁盘】每天掌握一个Linux命令 - iostat 工具概述 iostat(I/O Statistics)是Linux系统下用于监视系统输入输出设备和CPU使…...
oracle与MySQL数据库之间数据同步的技术要点
Oracle与MySQL数据库之间的数据同步是一个涉及多个技术要点的复杂任务。由于Oracle和MySQL的架构差异,它们的数据同步要求既要保持数据的准确性和一致性,又要处理好性能问题。以下是一些主要的技术要点: 数据结构差异 数据类型差异ÿ…...
.Net Framework 4/C# 关键字(非常用,持续更新...)
一、is 关键字 is 关键字用于检查对象是否于给定类型兼容,如果兼容将返回 true,如果不兼容则返回 false,在进行类型转换前,可以先使用 is 关键字判断对象是否与指定类型兼容,如果兼容才进行转换,这样的转换是安全的。 例如有:首先创建一个字符串对象,然后将字符串对象隐…...

【C++特殊工具与技术】优化内存分配(一):C++中的内存分配
目录 一、C 内存的基本概念 1.1 内存的物理与逻辑结构 1.2 C 程序的内存区域划分 二、栈内存分配 2.1 栈内存的特点 2.2 栈内存分配示例 三、堆内存分配 3.1 new和delete操作符 4.2 内存泄漏与悬空指针问题 4.3 new和delete的重载 四、智能指针…...

【C++进阶篇】智能指针
C内存管理终极指南:智能指针从入门到源码剖析 一. 智能指针1.1 auto_ptr1.2 unique_ptr1.3 shared_ptr1.4 make_shared 二. 原理三. shared_ptr循环引用问题三. 线程安全问题四. 内存泄漏4.1 什么是内存泄漏4.2 危害4.3 避免内存泄漏 五. 最后 一. 智能指针 智能指…...

打手机检测算法AI智能分析网关V4守护公共/工业/医疗等多场景安全应用
一、方案背景 在现代生产与生活场景中,如工厂高危作业区、医院手术室、公共场景等,人员违规打手机的行为潜藏着巨大风险。传统依靠人工巡查的监管方式,存在效率低、覆盖面不足、判断主观性强等问题,难以满足对人员打手机行为精…...

MyBatis中关于缓存的理解
MyBatis缓存 MyBatis系统当中默认定义两级缓存:一级缓存、二级缓存 默认情况下,只有一级缓存开启(sqlSession级别的缓存)二级缓存需要手动开启配置,需要局域namespace级别的缓存 一级缓存(本地缓存&#…...

基于开源AI智能名片链动2 + 1模式S2B2C商城小程序的沉浸式体验营销研究
摘要:在消费市场竞争日益激烈的当下,传统体验营销方式存在诸多局限。本文聚焦开源AI智能名片链动2 1模式S2B2C商城小程序,探讨其在沉浸式体验营销中的应用。通过对比传统品鉴、工厂参观等初级体验方式,分析沉浸式体验的优势与价值…...
raid存储技术
1. 存储技术概念 数据存储架构是对数据存储方式、存储设备及相关组件的组织和规划,涵盖存储系统的布局、数据存储策略等,它明确数据如何存储、管理与访问,为数据的安全、高效使用提供支撑。 由计算机中一组存储设备、控制部件和管理信息调度的…...