源码解析FlinkKafkaConsumer支持周期性水位线发送
背景
当flink消费kafka的消息时,我们经常会用到FlinkKafkaConsumer进行水位线的发送,本文就从源码看下FlinkKafkaConsumer.assignTimestampsAndWatermarks指定周期性水位线发送的流程
FlinkKafkaConsumer水位线发送
1.首先从Fetcher类开始,创建Fetcher类的时候会构建一个周期性的水位线发送线程并启动
// if we have periodic watermarks, kick off the interval schedulerif (timestampWatermarkMode == WITH_WATERMARK_GENERATOR && autoWatermarkInterval > 0) {PeriodicWatermarkEmitter<T, KPH> periodicEmitter =new PeriodicWatermarkEmitter<>(checkpointLock,subscribedPartitionStates,watermarkOutputMultiplexer,processingTimeProvider,autoWatermarkInterval);periodicEmitter.start();}
2.随后,PeriodicWatermarkEmitter中注册处理时间定时器,周期性执行
public void start() {timerService.registerTimer(timerService.getCurrentProcessingTime() + interval, this);}@Overridepublic void onProcessingTime(long timestamp) {synchronized (checkpointLock) {for (KafkaTopicPartitionState<?, ?> state : allPartitions) {// 这里当前算子任务消费的kafka 分区分别记录每个分区的水位值state.onPeriodicEmit();}//这里当前算子会把自己消费的kafka分区的所有水位线取最小值后当成当前算子任务自身的水位线发送出去,注意这里是当前算子任务级别的watermarkOutputMultiplexer.onPeriodicEmit();}// schedule the next watermarktimerService.registerTimer(timerService.getCurrentProcessingTime() + interval, this);}}
3.对应state.onPeriodicEmit();记录每个kafka分区的水位线方法
@Overridepublic void onPeriodicEmit(WatermarkOutput output) {final org.apache.flink.streaming.api.watermark.Watermark next = wms.getCurrentWatermark();if (next != null) {output.emitWatermark(new Watermark(next.getTimestamp()));}}
其中 WatermarkOutput output.emitWatermark(new Watermark(next.getTimestamp()))代码如下:public DeferredOutput(OutputState state) {this.state = state;}@Overridepublic void emitWatermark(Watermark watermark) {state.setWatermark(watermark.getTimestamp());}
所以这里最终效果只是对应state(kafka分区[注意,一个算子任务有可能消费好几个kafka分区])上设置了水位线/*** Returns true if the watermark was advanced, that is if the new watermark is larger than* the previous one.** <p>Setting a watermark will clear the idleness flag.*/public boolean setWatermark(long watermark) {this.idle = false;final boolean updated = watermark > this.watermark;// 这里也可以看出来,即使代码里面发送了更小值的水位线,水位线也不会回退this.watermark = Math.max(watermark, this.watermark);return updated;}
4.对应算子任务组合当前任务消费的所有分区水位线的方法
private void updateCombinedWatermark() {long minimumOverAllOutputs = Long.MAX_VALUE;boolean hasOutputs = false;boolean allIdle = true;for (OutputState outputState : watermarkOutputs) {if (!outputState.isIdle()) {minimumOverAllOutputs = Math.min(minimumOverAllOutputs, outputState.getWatermark());allIdle = false;}hasOutputs = true;}// if we don't have any outputs minimumOverAllOutputs is not valid, it's still// at its initial Long.MAX_VALUE state and we must not emit that// 如果算子任务不消费任何分区,它不会发出任何水位线,这里是不是就是kafka消费者要小于kafka主题的原因所在???if (!hasOutputs) {return;}if (allIdle) {// 如果当前算子任务处于空闲时间,标识空闲,以便后续算子可以继续推进underlyingOutput.markIdle();} else if (minimumOverAllOutputs > combinedWatermark) {combinedWatermark = minimumOverAllOutputs;underlyingOutput.emitWatermark(new Watermark(minimumOverAllOutputs));}}```相关文章:
源码解析FlinkKafkaConsumer支持周期性水位线发送
背景 当flink消费kafka的消息时,我们经常会用到FlinkKafkaConsumer进行水位线的发送,本文就从源码看下FlinkKafkaConsumer.assignTimestampsAndWatermarks指定周期性水位线发送的流程 FlinkKafkaConsumer水位线发送 1.首先从Fetcher类开始,…...
Nginx:动静分离(示意图+配置讲解)
示意图: 动静分离 动静分离是指将动态内容和静态内容分开处理的一种方式。通常,动态内容是指由服务器端处理的,例如动态生成的网页、数据库查询等。静态内容是指不需要经过服务器端处理的,例如图片、CSS、JavaScript文件等。通过…...
通讯网关软件024——利用CommGate X2Access实现Modbus TCP数据转储Access
本文介绍利用CommGate X2ACCESS实现从Modbus TCP设备读取数据并转储至ACCESS数据库。CommGate X2ACCESS是宁波科安网信开发的网关软件,软件可以登录到网信智汇(http://wangxinzhihui.com)下载。 【案例】如下图所示,实现从Modbus TCP设备读取数据并转储…...
vim工具的使用
目录 vi/vim键盘图 1、vim的基本概念 2、vim的基本使用 3、vim命令模式命令集 4、vim底行模式命令集 5、参考资料 vi/vim键盘图 1、vim的基本概念 vi和vim的区别:vi和vim的区别简单点来说,它们都是多模式编辑器,不同的是vim是vi…...
Docker学习_存储篇
当以默认的方式创建容器时,容器中的数据无法直接和其他容器或宿主机共享。为了解决这个问题需要学习一些Docker 存储卷的知识。 Docker提供了三种存储的方式。 bind mount共享宿主机文件目录volume共享docker存储卷tmpfs mount共享内存 volume* volume方式是容器…...
微信小程序获取当前日期时间
一、直接使用方式 在小程序中获取当前系统日期和时间,可直接拿来使用的常用的日期格式 //1. 当前日期 YYYY-MM-DDnew Date().toISOString().substring(0, 10)new Date().toJSON().substring(0, 10)//2. 当前日期 YYYY/MM/DDnew Date().toLocaleDateString()//3.…...
Unity关键词语音识别
一、背景 最近使用unity开发语音交互内容的时候,遇到了这样的需求,就是需要使用语音关键字来唤醒应用程序,然后再和程序做交互,有点像智能音箱的意思。具体的技术方案方面,也找了一些第三方的服务,比如百度…...
SpringBoot的配置文件——.yml和.properties
目录 1. Spring Boot 配置文件的使用场景 2. 配置文件的两种格式 2.0 特殊说明: 2.1 .properties 2.1.1 格式 2.2.2 缺陷 2.2.3 解决中文乱码的问题 2.2 .yml 2.2.3 格式 配置数据库连接 注意转义字符 编辑 编辑 配置null 配置对象 从.yml读取文件举例 Stud…...
Retrieve Anything To Augment Large Language Models
简介 论文主要介绍了一套通过对比学习和蒸馏学习的方法,来增强学习了embedding向量,然后能够在知识增强,长上下文建模,ICL和工具学习等方面来增强大模型能力。...
什么是面向对象编程
面向对象编程(Object-oriented programming,简称OOP)是一种编程范型,通过将数据和方法(即属性和行为)组织在一个单元中,以模拟现实世界中的实体或概念。在面向对象编程中,数据和方法…...
c++视觉处理----固定阈值操作:Threshold()函数,实时处理:二值化,反二值化,截断,设为零,反向设为零
固定阈值操作: Threshold()函数 cv::threshold() 函数是OpenCV中用于执行固定阈值二值化操作的函数。它可以用来将图像中的像素值根据用户定义的阈值转换为二进制值(0或255),以便进行图像分割、物体检测和特征提取等任务。 cv::…...
KWin、libdrm、DRM从上到下全过程 —— drmModeAddFBxxx(8)
接前一篇文章:KWin、libdrm、DRM从上到下全过程 —— drmModeAddFBxxx(7) 上一回讲到了drm_internal_framebuffer_create函数中的framebuffer_check函数中的drm_get_format_info函数,讲解了该函数的第一部分暨前一部分,本文讲解后一部分。为了便于理解以及理清脉络和当前所…...
【问题解决】Ubuntu 安装 SeisSol 依赖 easi 报错解决: undefined reference to `H5free_memory‘
兼职帮客户安装 SeisSol 时问题解决,安装 easi 这个报错卡了很久(搞了一天),记录下,以备后用~ # 编译器问题 rootubuntu:/opt/easi# make -j install [ 4%] Building CXX object CMakeFiles/easi.dir/src/component/…...
循环小数(Repeating Decimals, ACM/ICPC World Finals 1990, UVa202)rust解法
输入整数a和b(0≤a≤3000,1≤b≤3000),输出a/b的循环小数表示以及循环节长度。例如a5,b43,小数表示为0.(116279069767441860465),循环节长度为21。 解法 就是模拟竖式除法 use std::{collecti…...
[GAMES101]透视投影变换矩阵中为什么需要改变z值
一、问题提出 在GAMES101-Lecture4 Transformation Matrices 一节中,闫老师介绍了正交投影和透视投影。 在讲透视投影变换矩阵 M p e r s p → o r t h o M_{persp→ortho} Mpersp→ortho时,同学们对矩阵中的z分量是变化的还是不变的有很多争论。即下…...
sklearn处理离散变量的问题——以决策树为例
最近做项目遇到的数据集中,有许多高维类别特征。catboost是可以直接指定categorical_columns的【直接进行ordered TS编码】,但是XGboost和随机森林甚至决策树都没有这个接口。但是在学习决策树的时候(无论是ID3、C4.5还是CART)&am…...
QT 数据库表格----QSqlTableModel
将数据库数据以表格的形式转化处理的方法很多,但我觉得QSqlTableModel这个model应算是非常好用的; msql.exec("create table alldata(照片,车牌号 "",入车时间,出车时间,金额,状态,看守人员);"); //创建表格 //msql 打开的数据库即Q…...
Vue_Bug Failed to fetch extension, trying 4 more times
Bug描述: 启动electron时出现Failed to fetch extension, trying 4 more times的问题 解决方法: 去src/background.js文件中进行代码注释工作 app.on(ready, async() > {// if (isDevelopment && !process.env.IS_TEST) {// // Install V…...
缩短从需求到上线的距离:集成多种工程实践的稳定框架 | 开源日报 No.55
zeromicro/go-zero Stars: 25.7k License: MIT go-zero 是一个集成了各种工程实践的 web 和 rpc 框架。通过弹性设计保障了大并发服务端的稳定性,经受了充分的实战检验。 go-zero 包含极简的 API 定义和生成工具 goctl,可以根据定义的 api 文件一键生成…...
基于秃鹰优化的BP神经网络(分类应用) - 附代码
基于秃鹰优化的BP神经网络(分类应用) - 附代码 文章目录 基于秃鹰优化的BP神经网络(分类应用) - 附代码1.鸢尾花iris数据介绍2.数据集整理3.秃鹰优化BP神经网络3.1 BP神经网络参数设置3.2 秃鹰算法应用 4.测试结果:5.M…...
【根据当天日期输出明天的日期(需对闰年做判定)。】2022-5-15
缘由根据当天日期输出明天的日期(需对闰年做判定)。日期类型结构体如下: struct data{ int year; int month; int day;};-编程语言-CSDN问答 struct mdata{ int year; int month; int day; }mdata; int 天数(int year, int month) {switch (month){case 1: case 3:…...
超短脉冲激光自聚焦效应
前言与目录 强激光引起自聚焦效应机理 超短脉冲激光在脆性材料内部加工时引起的自聚焦效应,这是一种非线性光学现象,主要涉及光学克尔效应和材料的非线性光学特性。 自聚焦效应可以产生局部的强光场,对材料产生非线性响应,可能…...
Debian系统简介
目录 Debian系统介绍 Debian版本介绍 Debian软件源介绍 软件包管理工具dpkg dpkg核心指令详解 安装软件包 卸载软件包 查询软件包状态 验证软件包完整性 手动处理依赖关系 dpkg vs apt Debian系统介绍 Debian 和 Ubuntu 都是基于 Debian内核 的 Linux 发行版ÿ…...
visual studio 2022更改主题为深色
visual studio 2022更改主题为深色 点击visual studio 上方的 工具-> 选项 在选项窗口中,选择 环境 -> 常规 ,将其中的颜色主题改成深色 点击确定,更改完成...
理解 MCP 工作流:使用 Ollama 和 LangChain 构建本地 MCP 客户端
🌟 什么是 MCP? 模型控制协议 (MCP) 是一种创新的协议,旨在无缝连接 AI 模型与应用程序。 MCP 是一个开源协议,它标准化了我们的 LLM 应用程序连接所需工具和数据源并与之协作的方式。 可以把它想象成你的 AI 模型 和想要使用它…...
华为OD机试-食堂供餐-二分法
import java.util.Arrays; import java.util.Scanner;public class DemoTest3 {public static void main(String[] args) {Scanner in new Scanner(System.in);// 注意 hasNext 和 hasNextLine 的区别while (in.hasNextLine()) { // 注意 while 处理多个 caseint a in.nextIn…...
如何将联系人从 iPhone 转移到 Android
从 iPhone 换到 Android 手机时,你可能需要保留重要的数据,例如通讯录。好在,将通讯录从 iPhone 转移到 Android 手机非常简单,你可以从本文中学习 6 种可靠的方法,确保随时保持连接,不错过任何信息。 第 1…...
【python异步多线程】异步多线程爬虫代码示例
claude生成的python多线程、异步代码示例,模拟20个网页的爬取,每个网页假设要0.5-2秒完成。 代码 Python多线程爬虫教程 核心概念 多线程:允许程序同时执行多个任务,提高IO密集型任务(如网络请求)的效率…...
加密通信 + 行为分析:运营商行业安全防御体系重构
在数字经济蓬勃发展的时代,运营商作为信息通信网络的核心枢纽,承载着海量用户数据与关键业务传输,其安全防御体系的可靠性直接关乎国家安全、社会稳定与企业发展。随着网络攻击手段的不断升级,传统安全防护体系逐渐暴露出局限性&a…...
CMS内容管理系统的设计与实现:多站点模式的实现
在一套内容管理系统中,其实有很多站点,比如企业门户网站,产品手册,知识帮助手册等,因此会需要多个站点,甚至PC、mobile、ipad各有一个站点。 每个站点关联的有站点所在目录及所属的域名。 一、站点表设计…...
