源码解析FlinkKafkaConsumer支持周期性水位线发送
背景
当flink消费kafka的消息时,我们经常会用到FlinkKafkaConsumer进行水位线的发送,本文就从源码看下FlinkKafkaConsumer.assignTimestampsAndWatermarks指定周期性水位线发送的流程
FlinkKafkaConsumer水位线发送
1.首先从Fetcher类开始,创建Fetcher类的时候会构建一个周期性的水位线发送线程并启动
// if we have periodic watermarks, kick off the interval schedulerif (timestampWatermarkMode == WITH_WATERMARK_GENERATOR && autoWatermarkInterval > 0) {PeriodicWatermarkEmitter<T, KPH> periodicEmitter =new PeriodicWatermarkEmitter<>(checkpointLock,subscribedPartitionStates,watermarkOutputMultiplexer,processingTimeProvider,autoWatermarkInterval);periodicEmitter.start();}
2.随后,PeriodicWatermarkEmitter中注册处理时间定时器,周期性执行
public void start() {timerService.registerTimer(timerService.getCurrentProcessingTime() + interval, this);}@Overridepublic void onProcessingTime(long timestamp) {synchronized (checkpointLock) {for (KafkaTopicPartitionState<?, ?> state : allPartitions) {// 这里当前算子任务消费的kafka 分区分别记录每个分区的水位值state.onPeriodicEmit();}//这里当前算子会把自己消费的kafka分区的所有水位线取最小值后当成当前算子任务自身的水位线发送出去,注意这里是当前算子任务级别的watermarkOutputMultiplexer.onPeriodicEmit();}// schedule the next watermarktimerService.registerTimer(timerService.getCurrentProcessingTime() + interval, this);}}
3.对应state.onPeriodicEmit();记录每个kafka分区的水位线方法
@Overridepublic void onPeriodicEmit(WatermarkOutput output) {final org.apache.flink.streaming.api.watermark.Watermark next = wms.getCurrentWatermark();if (next != null) {output.emitWatermark(new Watermark(next.getTimestamp()));}}
其中 WatermarkOutput output.emitWatermark(new Watermark(next.getTimestamp()))代码如下:public DeferredOutput(OutputState state) {this.state = state;}@Overridepublic void emitWatermark(Watermark watermark) {state.setWatermark(watermark.getTimestamp());}
所以这里最终效果只是对应state(kafka分区[注意,一个算子任务有可能消费好几个kafka分区])上设置了水位线/*** Returns true if the watermark was advanced, that is if the new watermark is larger than* the previous one.** <p>Setting a watermark will clear the idleness flag.*/public boolean setWatermark(long watermark) {this.idle = false;final boolean updated = watermark > this.watermark;// 这里也可以看出来,即使代码里面发送了更小值的水位线,水位线也不会回退this.watermark = Math.max(watermark, this.watermark);return updated;}
4.对应算子任务组合当前任务消费的所有分区水位线的方法
private void updateCombinedWatermark() {long minimumOverAllOutputs = Long.MAX_VALUE;boolean hasOutputs = false;boolean allIdle = true;for (OutputState outputState : watermarkOutputs) {if (!outputState.isIdle()) {minimumOverAllOutputs = Math.min(minimumOverAllOutputs, outputState.getWatermark());allIdle = false;}hasOutputs = true;}// if we don't have any outputs minimumOverAllOutputs is not valid, it's still// at its initial Long.MAX_VALUE state and we must not emit that// 如果算子任务不消费任何分区,它不会发出任何水位线,这里是不是就是kafka消费者要小于kafka主题的原因所在???if (!hasOutputs) {return;}if (allIdle) {// 如果当前算子任务处于空闲时间,标识空闲,以便后续算子可以继续推进underlyingOutput.markIdle();} else if (minimumOverAllOutputs > combinedWatermark) {combinedWatermark = minimumOverAllOutputs;underlyingOutput.emitWatermark(new Watermark(minimumOverAllOutputs));}}```相关文章:
源码解析FlinkKafkaConsumer支持周期性水位线发送
背景 当flink消费kafka的消息时,我们经常会用到FlinkKafkaConsumer进行水位线的发送,本文就从源码看下FlinkKafkaConsumer.assignTimestampsAndWatermarks指定周期性水位线发送的流程 FlinkKafkaConsumer水位线发送 1.首先从Fetcher类开始,…...
Nginx:动静分离(示意图+配置讲解)
示意图: 动静分离 动静分离是指将动态内容和静态内容分开处理的一种方式。通常,动态内容是指由服务器端处理的,例如动态生成的网页、数据库查询等。静态内容是指不需要经过服务器端处理的,例如图片、CSS、JavaScript文件等。通过…...
通讯网关软件024——利用CommGate X2Access实现Modbus TCP数据转储Access
本文介绍利用CommGate X2ACCESS实现从Modbus TCP设备读取数据并转储至ACCESS数据库。CommGate X2ACCESS是宁波科安网信开发的网关软件,软件可以登录到网信智汇(http://wangxinzhihui.com)下载。 【案例】如下图所示,实现从Modbus TCP设备读取数据并转储…...
vim工具的使用
目录 vi/vim键盘图 1、vim的基本概念 2、vim的基本使用 3、vim命令模式命令集 4、vim底行模式命令集 5、参考资料 vi/vim键盘图 1、vim的基本概念 vi和vim的区别:vi和vim的区别简单点来说,它们都是多模式编辑器,不同的是vim是vi…...
Docker学习_存储篇
当以默认的方式创建容器时,容器中的数据无法直接和其他容器或宿主机共享。为了解决这个问题需要学习一些Docker 存储卷的知识。 Docker提供了三种存储的方式。 bind mount共享宿主机文件目录volume共享docker存储卷tmpfs mount共享内存 volume* volume方式是容器…...
微信小程序获取当前日期时间
一、直接使用方式 在小程序中获取当前系统日期和时间,可直接拿来使用的常用的日期格式 //1. 当前日期 YYYY-MM-DDnew Date().toISOString().substring(0, 10)new Date().toJSON().substring(0, 10)//2. 当前日期 YYYY/MM/DDnew Date().toLocaleDateString()//3.…...
Unity关键词语音识别
一、背景 最近使用unity开发语音交互内容的时候,遇到了这样的需求,就是需要使用语音关键字来唤醒应用程序,然后再和程序做交互,有点像智能音箱的意思。具体的技术方案方面,也找了一些第三方的服务,比如百度…...
SpringBoot的配置文件——.yml和.properties
目录 1. Spring Boot 配置文件的使用场景 2. 配置文件的两种格式 2.0 特殊说明: 2.1 .properties 2.1.1 格式 2.2.2 缺陷 2.2.3 解决中文乱码的问题 2.2 .yml 2.2.3 格式 配置数据库连接 注意转义字符 编辑 编辑 配置null 配置对象 从.yml读取文件举例 Stud…...
Retrieve Anything To Augment Large Language Models
简介 论文主要介绍了一套通过对比学习和蒸馏学习的方法,来增强学习了embedding向量,然后能够在知识增强,长上下文建模,ICL和工具学习等方面来增强大模型能力。...
什么是面向对象编程
面向对象编程(Object-oriented programming,简称OOP)是一种编程范型,通过将数据和方法(即属性和行为)组织在一个单元中,以模拟现实世界中的实体或概念。在面向对象编程中,数据和方法…...
c++视觉处理----固定阈值操作:Threshold()函数,实时处理:二值化,反二值化,截断,设为零,反向设为零
固定阈值操作: Threshold()函数 cv::threshold() 函数是OpenCV中用于执行固定阈值二值化操作的函数。它可以用来将图像中的像素值根据用户定义的阈值转换为二进制值(0或255),以便进行图像分割、物体检测和特征提取等任务。 cv::…...
KWin、libdrm、DRM从上到下全过程 —— drmModeAddFBxxx(8)
接前一篇文章:KWin、libdrm、DRM从上到下全过程 —— drmModeAddFBxxx(7) 上一回讲到了drm_internal_framebuffer_create函数中的framebuffer_check函数中的drm_get_format_info函数,讲解了该函数的第一部分暨前一部分,本文讲解后一部分。为了便于理解以及理清脉络和当前所…...
【问题解决】Ubuntu 安装 SeisSol 依赖 easi 报错解决: undefined reference to `H5free_memory‘
兼职帮客户安装 SeisSol 时问题解决,安装 easi 这个报错卡了很久(搞了一天),记录下,以备后用~ # 编译器问题 rootubuntu:/opt/easi# make -j install [ 4%] Building CXX object CMakeFiles/easi.dir/src/component/…...
循环小数(Repeating Decimals, ACM/ICPC World Finals 1990, UVa202)rust解法
输入整数a和b(0≤a≤3000,1≤b≤3000),输出a/b的循环小数表示以及循环节长度。例如a5,b43,小数表示为0.(116279069767441860465),循环节长度为21。 解法 就是模拟竖式除法 use std::{collecti…...
[GAMES101]透视投影变换矩阵中为什么需要改变z值
一、问题提出 在GAMES101-Lecture4 Transformation Matrices 一节中,闫老师介绍了正交投影和透视投影。 在讲透视投影变换矩阵 M p e r s p → o r t h o M_{persp→ortho} Mpersp→ortho时,同学们对矩阵中的z分量是变化的还是不变的有很多争论。即下…...
sklearn处理离散变量的问题——以决策树为例
最近做项目遇到的数据集中,有许多高维类别特征。catboost是可以直接指定categorical_columns的【直接进行ordered TS编码】,但是XGboost和随机森林甚至决策树都没有这个接口。但是在学习决策树的时候(无论是ID3、C4.5还是CART)&am…...
QT 数据库表格----QSqlTableModel
将数据库数据以表格的形式转化处理的方法很多,但我觉得QSqlTableModel这个model应算是非常好用的; msql.exec("create table alldata(照片,车牌号 "",入车时间,出车时间,金额,状态,看守人员);"); //创建表格 //msql 打开的数据库即Q…...
Vue_Bug Failed to fetch extension, trying 4 more times
Bug描述: 启动electron时出现Failed to fetch extension, trying 4 more times的问题 解决方法: 去src/background.js文件中进行代码注释工作 app.on(ready, async() > {// if (isDevelopment && !process.env.IS_TEST) {// // Install V…...
缩短从需求到上线的距离:集成多种工程实践的稳定框架 | 开源日报 No.55
zeromicro/go-zero Stars: 25.7k License: MIT go-zero 是一个集成了各种工程实践的 web 和 rpc 框架。通过弹性设计保障了大并发服务端的稳定性,经受了充分的实战检验。 go-zero 包含极简的 API 定义和生成工具 goctl,可以根据定义的 api 文件一键生成…...
基于秃鹰优化的BP神经网络(分类应用) - 附代码
基于秃鹰优化的BP神经网络(分类应用) - 附代码 文章目录 基于秃鹰优化的BP神经网络(分类应用) - 附代码1.鸢尾花iris数据介绍2.数据集整理3.秃鹰优化BP神经网络3.1 BP神经网络参数设置3.2 秃鹰算法应用 4.测试结果:5.M…...
铭豹扩展坞 USB转网口 突然无法识别解决方法
当 USB 转网口扩展坞在一台笔记本上无法识别,但在其他电脑上正常工作时,问题通常出在笔记本自身或其与扩展坞的兼容性上。以下是系统化的定位思路和排查步骤,帮助你快速找到故障原因: 背景: 一个M-pard(铭豹)扩展坞的网卡突然无法识别了,扩展出来的三个USB接口正常。…...
高频面试之3Zookeeper
高频面试之3Zookeeper 文章目录 高频面试之3Zookeeper3.1 常用命令3.2 选举机制3.3 Zookeeper符合法则中哪两个?3.4 Zookeeper脑裂3.5 Zookeeper用来干嘛了 3.1 常用命令 ls、get、create、delete、deleteall3.2 选举机制 半数机制(过半机制࿰…...
2025 后端自学UNIAPP【项目实战:旅游项目】6、我的收藏页面
代码框架视图 1、先添加一个获取收藏景点的列表请求 【在文件my_api.js文件中添加】 // 引入公共的请求封装 import http from ./my_http.js// 登录接口(适配服务端返回 Token) export const login async (code, avatar) > {const res await http…...
土地利用/土地覆盖遥感解译与基于CLUE模型未来变化情景预测;从基础到高级,涵盖ArcGIS数据处理、ENVI遥感解译与CLUE模型情景模拟等
🔍 土地利用/土地覆盖数据是生态、环境和气象等诸多领域模型的关键输入参数。通过遥感影像解译技术,可以精准获取历史或当前任何一个区域的土地利用/土地覆盖情况。这些数据不仅能够用于评估区域生态环境的变化趋势,还能有效评价重大生态工程…...
关于 WASM:1. WASM 基础原理
一、WASM 简介 1.1 WebAssembly 是什么? WebAssembly(WASM) 是一种能在现代浏览器中高效运行的二进制指令格式,它不是传统的编程语言,而是一种 低级字节码格式,可由高级语言(如 C、C、Rust&am…...
Swagger和OpenApi的前世今生
Swagger与OpenAPI的关系演进是API标准化进程中的重要篇章,二者共同塑造了现代RESTful API的开发范式。 本期就扒一扒其技术演进的关键节点与核心逻辑: 🔄 一、起源与初创期:Swagger的诞生(2010-2014) 核心…...
Java + Spring Boot + Mybatis 实现批量插入
在 Java 中使用 Spring Boot 和 MyBatis 实现批量插入可以通过以下步骤完成。这里提供两种常用方法:使用 MyBatis 的 <foreach> 标签和批处理模式(ExecutorType.BATCH)。 方法一:使用 XML 的 <foreach> 标签ÿ…...
mac 安装homebrew (nvm 及git)
mac 安装nvm 及git 万恶之源 mac 安装这些东西离不开Xcode。及homebrew 一、先说安装git步骤 通用: 方法一:使用 Homebrew 安装 Git(推荐) 步骤如下:打开终端(Terminal.app) 1.安装 Homebrew…...
c++第七天 继承与派生2
这一篇文章主要内容是 派生类构造函数与析构函数 在派生类中重写基类成员 以及多继承 第一部分:派生类构造函数与析构函数 当创建一个派生类对象时,基类成员是如何初始化的? 1.当派生类对象创建的时候,基类成员的初始化顺序 …...
WEB3全栈开发——面试专业技能点P4数据库
一、mysql2 原生驱动及其连接机制 概念介绍 mysql2 是 Node.js 环境中广泛使用的 MySQL 客户端库,基于 mysql 库改进而来,具有更好的性能、Promise 支持、流式查询、二进制数据处理能力等。 主要特点: 支持 Promise / async-await…...
