当前位置: 首页 > news >正文

源码解析FlinkKafkaConsumer支持周期性水位线发送

背景

当flink消费kafka的消息时,我们经常会用到FlinkKafkaConsumer进行水位线的发送,本文就从源码看下FlinkKafkaConsumer.assignTimestampsAndWatermarks指定周期性水位线发送的流程

FlinkKafkaConsumer水位线发送

1.首先从Fetcher类开始,创建Fetcher类的时候会构建一个周期性的水位线发送线程并启动

        // if we have periodic watermarks, kick off the interval schedulerif (timestampWatermarkMode == WITH_WATERMARK_GENERATOR && autoWatermarkInterval > 0) {PeriodicWatermarkEmitter<T, KPH> periodicEmitter =new PeriodicWatermarkEmitter<>(checkpointLock,subscribedPartitionStates,watermarkOutputMultiplexer,processingTimeProvider,autoWatermarkInterval);periodicEmitter.start();}

2.随后,PeriodicWatermarkEmitter中注册处理时间定时器,周期性执行

        public void start() {timerService.registerTimer(timerService.getCurrentProcessingTime() + interval, this);}@Overridepublic void onProcessingTime(long timestamp) {synchronized (checkpointLock) {for (KafkaTopicPartitionState<?, ?> state : allPartitions) {// 这里当前算子任务消费的kafka 分区分别记录每个分区的水位值state.onPeriodicEmit();}//这里当前算子会把自己消费的kafka分区的所有水位线取最小值后当成当前算子任务自身的水位线发送出去,注意这里是当前算子任务级别的watermarkOutputMultiplexer.onPeriodicEmit();}// schedule the next watermarktimerService.registerTimer(timerService.getCurrentProcessingTime() + interval, this);}}

3.对应state.onPeriodicEmit();记录每个kafka分区的水位线方法

    @Overridepublic void onPeriodicEmit(WatermarkOutput output) {final org.apache.flink.streaming.api.watermark.Watermark next = wms.getCurrentWatermark();if (next != null) {output.emitWatermark(new Watermark(next.getTimestamp()));}}
其中 WatermarkOutput output.emitWatermark(new Watermark(next.getTimestamp()))代码如下:public DeferredOutput(OutputState state) {this.state = state;}@Overridepublic void emitWatermark(Watermark watermark) {state.setWatermark(watermark.getTimestamp());}
所以这里最终效果只是对应state(kafka分区[注意,一个算子任务有可能消费好几个kafka分区])上设置了水位线/*** Returns true if the watermark was advanced, that is if the new watermark is larger than* the previous one.** <p>Setting a watermark will clear the idleness flag.*/public boolean setWatermark(long watermark) {this.idle = false;final boolean updated = watermark > this.watermark;// 这里也可以看出来,即使代码里面发送了更小值的水位线,水位线也不会回退this.watermark = Math.max(watermark, this.watermark);return updated;}        

4.对应算子任务组合当前任务消费的所有分区水位线的方法

private void updateCombinedWatermark() {long minimumOverAllOutputs = Long.MAX_VALUE;boolean hasOutputs = false;boolean allIdle = true;for (OutputState outputState : watermarkOutputs) {if (!outputState.isIdle()) {minimumOverAllOutputs = Math.min(minimumOverAllOutputs, outputState.getWatermark());allIdle = false;}hasOutputs = true;}// if we don't have any outputs minimumOverAllOutputs is not valid, it's still// at its initial Long.MAX_VALUE state and we must not emit that// 如果算子任务不消费任何分区,它不会发出任何水位线,这里是不是就是kafka消费者要小于kafka主题的原因所在???if (!hasOutputs) {return;}if (allIdle) {// 如果当前算子任务处于空闲时间,标识空闲,以便后续算子可以继续推进underlyingOutput.markIdle();} else if (minimumOverAllOutputs > combinedWatermark) {combinedWatermark = minimumOverAllOutputs;underlyingOutput.emitWatermark(new Watermark(minimumOverAllOutputs));}}```

相关文章:

源码解析FlinkKafkaConsumer支持周期性水位线发送

背景 当flink消费kafka的消息时&#xff0c;我们经常会用到FlinkKafkaConsumer进行水位线的发送&#xff0c;本文就从源码看下FlinkKafkaConsumer.assignTimestampsAndWatermarks指定周期性水位线发送的流程 FlinkKafkaConsumer水位线发送 1.首先从Fetcher类开始&#xff0c…...

Nginx:动静分离(示意图+配置讲解)

示意图&#xff1a; 动静分离 动静分离是指将动态内容和静态内容分开处理的一种方式。通常&#xff0c;动态内容是指由服务器端处理的&#xff0c;例如动态生成的网页、数据库查询等。静态内容是指不需要经过服务器端处理的&#xff0c;例如图片、CSS、JavaScript文件等。通过…...

通讯网关软件024——利用CommGate X2Access实现Modbus TCP数据转储Access

本文介绍利用CommGate X2ACCESS实现从Modbus TCP设备读取数据并转储至ACCESS数据库。CommGate X2ACCESS是宁波科安网信开发的网关软件&#xff0c;软件可以登录到网信智汇(http://wangxinzhihui.com)下载。 【案例】如下图所示&#xff0c;实现从Modbus TCP设备读取数据并转储…...

vim工具的使用

目录 vi/vim键盘图 1、vim的基本概念 2、vim的基本使用 3、vim命令模式命令集 4、vim底行模式命令集 5、参考资料 vi/vim键盘图 1、vim的基本概念 vi和vim的区别&#xff1a;vi和vim的区别简单点来说&#xff0c;它们都是多模式编辑器&#xff0c;不同的是vim是vi…...

Docker学习_存储篇

当以默认的方式创建容器时&#xff0c;容器中的数据无法直接和其他容器或宿主机共享。为了解决这个问题需要学习一些Docker 存储卷的知识。 Docker提供了三种存储的方式。 bind mount共享宿主机文件目录volume共享docker存储卷tmpfs mount共享内存 volume* volume方式是容器…...

微信小程序获取当前日期时间

一、直接使用方式 在小程序中获取当前系统日期和时间&#xff0c;可直接拿来使用的常用的日期格式 //1. 当前日期 YYYY-MM-DDnew Date().toISOString().substring(0, 10)new Date().toJSON().substring(0, 10)//2. 当前日期 YYYY/MM/DDnew Date().toLocaleDateString()//3.…...

Unity关键词语音识别

一、背景 最近使用unity开发语音交互内容的时候&#xff0c;遇到了这样的需求&#xff0c;就是需要使用语音关键字来唤醒应用程序&#xff0c;然后再和程序做交互&#xff0c;有点像智能音箱的意思。具体的技术方案方面&#xff0c;也找了一些第三方的服务&#xff0c;比如百度…...

SpringBoot的配置文件——.yml和.properties

目录 1. Spring Boot 配置文件的使用场景 2. 配置文件的两种格式 2.0 特殊说明&#xff1a; 2.1 .properties 2.1.1 格式 2.2.2 缺陷 2.2.3 解决中文乱码的问题 2.2 .yml 2.2.3 格式 配置数据库连接 注意转义字符 ​编辑 ​编辑 配置null 配置对象 从.yml读取文件举例 Stud…...

Retrieve Anything To Augment Large Language Models

简介 论文主要介绍了一套通过对比学习和蒸馏学习的方法&#xff0c;来增强学习了embedding向量&#xff0c;然后能够在知识增强&#xff0c;长上下文建模&#xff0c;ICL和工具学习等方面来增强大模型能力。...

什么是面向对象编程

面向对象编程&#xff08;Object-oriented programming&#xff0c;简称OOP&#xff09;是一种编程范型&#xff0c;通过将数据和方法&#xff08;即属性和行为&#xff09;组织在一个单元中&#xff0c;以模拟现实世界中的实体或概念。在面向对象编程中&#xff0c;数据和方法…...

c++视觉处理----固定阈值操作:Threshold()函数,实时处理:二值化,反二值化,截断,设为零,反向设为零

固定阈值操作&#xff1a; Threshold()函数 cv::threshold() 函数是OpenCV中用于执行固定阈值二值化操作的函数。它可以用来将图像中的像素值根据用户定义的阈值转换为二进制值&#xff08;0或255&#xff09;&#xff0c;以便进行图像分割、物体检测和特征提取等任务。 cv::…...

KWin、libdrm、DRM从上到下全过程 —— drmModeAddFBxxx(8)

接前一篇文章:KWin、libdrm、DRM从上到下全过程 —— drmModeAddFBxxx(7) 上一回讲到了drm_internal_framebuffer_create函数中的framebuffer_check函数中的drm_get_format_info函数,讲解了该函数的第一部分暨前一部分,本文讲解后一部分。为了便于理解以及理清脉络和当前所…...

【问题解决】Ubuntu 安装 SeisSol 依赖 easi 报错解决: undefined reference to `H5free_memory‘

兼职帮客户安装 SeisSol 时问题解决&#xff0c;安装 easi 这个报错卡了很久&#xff08;搞了一天&#xff09;&#xff0c;记录下&#xff0c;以备后用~ # 编译器问题 rootubuntu:/opt/easi# make -j install [ 4%] Building CXX object CMakeFiles/easi.dir/src/component/…...

循环小数(Repeating Decimals, ACM/ICPC World Finals 1990, UVa202)rust解法

输入整数a和b&#xff08;0≤a≤3000&#xff0c;1≤b≤3000&#xff09;&#xff0c;输出a/b的循环小数表示以及循环节长度。例如a5&#xff0c;b43&#xff0c;小数表示为0.(116279069767441860465)&#xff0c;循环节长度为21。 解法 就是模拟竖式除法 use std::{collecti…...

[GAMES101]透视投影变换矩阵中为什么需要改变z值

一、问题提出 在GAMES101-Lecture4 Transformation Matrices 一节中&#xff0c;闫老师介绍了正交投影和透视投影。 在讲透视投影变换矩阵 M p e r s p → o r t h o M_{persp→ortho} Mpersp→ortho​时&#xff0c;同学们对矩阵中的z分量是变化的还是不变的有很多争论。即下…...

sklearn处理离散变量的问题——以决策树为例

最近做项目遇到的数据集中&#xff0c;有许多高维类别特征。catboost是可以直接指定categorical_columns的【直接进行ordered TS编码】&#xff0c;但是XGboost和随机森林甚至决策树都没有这个接口。但是在学习决策树的时候&#xff08;无论是ID3、C4.5还是CART&#xff09;&am…...

QT 数据库表格----QSqlTableModel

将数据库数据以表格的形式转化处理的方法很多&#xff0c;但我觉得QSqlTableModel这个model应算是非常好用的&#xff1b; msql.exec("create table alldata(照片,车牌号 "",入车时间,出车时间,金额,状态,看守人员);"); //创建表格 //msql 打开的数据库即Q…...

Vue_Bug Failed to fetch extension, trying 4 more times

Bug描述&#xff1a; 启动electron时出现Failed to fetch extension, trying 4 more times的问题 解决方法&#xff1a; 去src/background.js文件中进行代码注释工作 app.on(ready, async() > {// if (isDevelopment && !process.env.IS_TEST) {// // Install V…...

缩短从需求到上线的距离:集成多种工程实践的稳定框架 | 开源日报 No.55

zeromicro/go-zero Stars: 25.7k License: MIT go-zero 是一个集成了各种工程实践的 web 和 rpc 框架。通过弹性设计保障了大并发服务端的稳定性&#xff0c;经受了充分的实战检验。 go-zero 包含极简的 API 定义和生成工具 goctl&#xff0c;可以根据定义的 api 文件一键生成…...

基于秃鹰优化的BP神经网络(分类应用) - 附代码

基于秃鹰优化的BP神经网络&#xff08;分类应用&#xff09; - 附代码 文章目录 基于秃鹰优化的BP神经网络&#xff08;分类应用&#xff09; - 附代码1.鸢尾花iris数据介绍2.数据集整理3.秃鹰优化BP神经网络3.1 BP神经网络参数设置3.2 秃鹰算法应用 4.测试结果&#xff1a;5.M…...

python如何将word的doc另存为docx

将 DOCX 文件另存为 DOCX 格式&#xff08;Python 实现&#xff09; 在 Python 中&#xff0c;你可以使用 python-docx 库来操作 Word 文档。不过需要注意的是&#xff0c;.doc 是旧的 Word 格式&#xff0c;而 .docx 是新的基于 XML 的格式。python-docx 只能处理 .docx 格式…...

leetcodeSQL解题:3564. 季节性销售分析

leetcodeSQL解题&#xff1a;3564. 季节性销售分析 题目&#xff1a; 表&#xff1a;sales ---------------------- | Column Name | Type | ---------------------- | sale_id | int | | product_id | int | | sale_date | date | | quantity | int | | price | decimal | -…...

Ascend NPU上适配Step-Audio模型

1 概述 1.1 简述 Step-Audio 是业界首个集语音理解与生成控制一体化的产品级开源实时语音对话系统&#xff0c;支持多语言对话&#xff08;如 中文&#xff0c;英文&#xff0c;日语&#xff09;&#xff0c;语音情感&#xff08;如 开心&#xff0c;悲伤&#xff09;&#x…...

让AI看见世界:MCP协议与服务器的工作原理

让AI看见世界&#xff1a;MCP协议与服务器的工作原理 MCP&#xff08;Model Context Protocol&#xff09;是一种创新的通信协议&#xff0c;旨在让大型语言模型能够安全、高效地与外部资源进行交互。在AI技术快速发展的今天&#xff0c;MCP正成为连接AI与现实世界的重要桥梁。…...

SpringTask-03.入门案例

一.入门案例 启动类&#xff1a; package com.sky;import lombok.extern.slf4j.Slf4j; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.cache.annotation.EnableCach…...

Spring AI与Spring Modulith核心技术解析

Spring AI核心架构解析 Spring AI&#xff08;https://spring.io/projects/spring-ai&#xff09;作为Spring生态中的AI集成框架&#xff0c;其核心设计理念是通过模块化架构降低AI应用的开发复杂度。与Python生态中的LangChain/LlamaIndex等工具类似&#xff0c;但特别为多语…...

使用 Streamlit 构建支持主流大模型与 Ollama 的轻量级统一平台

🎯 使用 Streamlit 构建支持主流大模型与 Ollama 的轻量级统一平台 📌 项目背景 随着大语言模型(LLM)的广泛应用,开发者常面临多个挑战: 各大模型(OpenAI、Claude、Gemini、Ollama)接口风格不统一;缺乏一个统一平台进行模型调用与测试;本地模型 Ollama 的集成与前…...

初探Service服务发现机制

1.Service简介 Service是将运行在一组Pod上的应用程序发布为网络服务的抽象方法。 主要功能&#xff1a;服务发现和负载均衡。 Service类型的包括ClusterIP类型、NodePort类型、LoadBalancer类型、ExternalName类型 2.Endpoints简介 Endpoints是一种Kubernetes资源&#xf…...

《Docker》架构

文章目录 架构模式单机架构应用数据分离架构应用服务器集群架构读写分离/主从分离架构冷热分离架构垂直分库架构微服务架构容器编排架构什么是容器&#xff0c;docker&#xff0c;镜像&#xff0c;k8s 架构模式 单机架构 单机架构其实就是应用服务器和单机服务器都部署在同一…...

第八部分:阶段项目 6:构建 React 前端应用

现在&#xff0c;是时候将你学到的 React 基础知识付诸实践&#xff0c;构建一个简单的前端应用来模拟与后端 API 的交互了。在这个阶段&#xff0c;你可以先使用模拟数据&#xff0c;或者如果你的后端 API&#xff08;阶段项目 5&#xff09;已经搭建好&#xff0c;可以直接连…...