当前位置: 首页 > news >正文

源码解析Flink源节点数据读取是如何与checkpoint串行执行

文章目录

        • 源码解析Flink源节点数据读取是如何与checkpoint串行执行
          • Checkpoint阶段
            • StreamTask类变量actionExecutor的实现和初始化
            • 小结
          • 数据读取阶段
            • 小结
          • 总结

源码解析Flink源节点数据读取是如何与checkpoint串行执行

Flink版本:1.13.6

前置知识:源节点的Checkpoint是由Checkpointcoordinate触发,具体是通过RPC调用TaskManager中对应的Task的StreamTask类的performChecpoint方法执行Checkpoint。

本文思路:本文先分析checkpoint阶段,然后再分析数据读取阶段,最后得出结论:源节点Checkpoint时和源节点读取数据时,都需要抢SourceStreamTask类中lock变量的锁,最终实现串行执行checkpoint与写数据

Checkpoint阶段

Checkpoint在StreamTask的performCheckpoint方法中执行,该方法调用过程如下

// 在StreamTask类中 执行checkpoint操作
private boolean performCheckpoint(CheckpointMetaData checkpointMetaData,CheckpointOptions checkpointOptions,CheckpointMetricsBuilder checkpointMetrics )throws Exception {if (isRunning) {//使用actionExecutor 同步触发checkpointactionExecutor.runThrowing(() -> {....//经过一系列检查subtaskCheckpointCoordinator.checkpointState(checkpointMetaData,checkpointOptions,checkpointMetrics,operatorChain,this::isRunning);});return true;} else {....}}

从上述代码可以看出,Checkpoint执行是由actionExecutor执行器执行

StreamTask类变量actionExecutor的实现和初始化

StreamTask类变量actionExecution的实现

通过代码注释可以知道该执行器的实现是StreamTaskActionExecutor.SynchronizedStreamTaskActionExecutor;从SynchronizedStreamTaskActionExecutor源代码可知,该执行器每次执行都需要获得mutex对象锁

  /*** All actions outside of the task {@link #mailboxProcessor mailbox} (i.e. performed by another* thread) must be executed through this executor to ensure that we don't have concurrent method* calls that void consistent checkpoints.** <p>CheckpointLock is superseded by {@link MailboxExecutor}, with {@link* StreamTaskActionExecutor.SynchronizedStreamTaskActionExecutor* SynchronizedStreamTaskActionExecutor} to provide lock to {@link SourceStreamTask}.*/
private final StreamTaskActionExecutor actionExecutor;class SynchronizedStreamTaskActionExecutor implements StreamTaskActionExecutor {private final Object mutex;public SynchronizedStreamTaskActionExecutor(Object mutex) {this.mutex = mutex;}@Overridepublic void run(RunnableWithException runnable) throws Exception {synchronized (mutex) {runnable.run();}}
}

StreamTask变量actionExecution初始化

actionExecutor变量在StreamTask中定义,在构造方法中初始化;该构造方法由SourceStreamTask调用,并传入SynchronizedStreamTaskActionExecutor对象,代码如下所示

//   SourceStreamTask的方法
private SourceStreamTask(Environment env, Object lock) throws Exception {//调用的StreamTask构造函数,传入SynchronizedStreamTaskActionExecutor对象super(env,null,FatalExitExceptionHandler.INSTANCE,//初始化actionExecutorStreamTaskActionExecutor.synchronizedExecutor(lock));//将lock对象赋值给类变量lockthis.lock = Preconditions.checkNotNull(lock);this.sourceThread = new LegacySourceFunctionThread();getEnvironment().getMetricGroup().getIOMetricGroup().setEnableBusyTime(false);
}//  StreamTask的方法
protected StreamTask(Environment environment,@Nullable TimerService timerService,Thread.UncaughtExceptionHandler uncaughtExceptionHandler,//初始化actionExecutorStreamTaskActionExecutor actionExecutor)throws Exception {this(environment,timerService,uncaughtExceptionHandler,actionExecutor,new TaskMailboxImpl(Thread.currentThread()));
}protected StreamTask(Environment environment,@Nullable TimerService timerService,Thread.UncaughtExceptionHandler uncaughtExceptionHandler,StreamTaskActionExecutor actionExecutor,TaskMailbox mailbox)throws Exception {super(environment);this.configuration = new StreamConfig(getTaskConfiguration());this.recordWriter = createRecordWriterDelegate(configuration, environment);//初始化actionExecutorthis.actionExecutor = Preconditions.checkNotNull(actionExecutor);this.mailboxProcessor = new MailboxProcessor(this::processInput, mailbox, actionExecutor);.......}
小结

actionExecutor执行器每次执行都需要获得mutex对象,mutex对象就是SourceStreamTask类中的lock对象;即算子每次执行Checkpoint时都需要获得SourceStreamTask类中lock对象锁才能进行

数据读取阶段

在执行Checkpoint时控制读取源端,则控制点必定是在调用SourceContext的collect方法时

@Override
public void run(SourceContext<String> ctx) throws Exception {int i = 0;while (true) {//在这个方法里处理ctx.collect(String.valueOf(i));}
}

点击collection查看实现,选择NonTimestampContext查看代码,collect()实现如下

@Override
public void collect(T element) {synchronized (lock) {output.collect(reuse.replace(element));}
}

所以这里控制数据读取发送是通过lock来控制,lock是如何初始化的?

通过NonTimestampContext构造方法可以定位到StreamSourceContexts->getSourceContext方法;

public static <OUT> SourceFunction.SourceContext<OUT> getSourceContext(TimeCharacteristic timeCharacteristic,ProcessingTimeService processingTimeService,Object checkpointLock,StreamStatusMaintainer streamStatusMaintainer,Output<StreamRecord<OUT>> output,long watermarkInterval,long idleTimeout) {final SourceFunction.SourceContext<OUT> ctx;switch (timeCharacteristic) {....case ProcessingTime://初始化NonTimestampContextctx = new NonTimestampContext<>(checkpointLock, output);break;default:throw new IllegalArgumentException(String.valueOf(timeCharacteristic));}return ctx;
}

向上追踪,在StreamSource类中调用getSourceContext:

public void run(final Object lockingObject,final StreamStatusMaintainer streamStatusMaintainer,final Output<StreamRecord<OUT>> collector,final OperatorChain<?, ?> operatorChain)throws Exception {....this.ctx =StreamSourceContexts.getSourceContext(timeCharacteristic,getProcessingTimeService(),lockingObject,streamStatusMaintainer,collector,watermarkInterval,-1);....}
// 再向上最终run方法的调用点->是由内部方法run调用
public void run(final Object lockingObject,final StreamStatusMaintainer streamStatusMaintainer,final OperatorChain<?, ?> operatorChain)throws Exception {run(lockingObject, streamStatusMaintainer, output, operatorChain);
}//再向上最终run方法的调用点->SourceStreamTask 调用run 然后再代用mainOpterator run方法
@Override
public void run() {try {// 使用的是类变量lockmainOperator.run(lock, getStreamStatusMaintainer(), operatorChain);if (!wasStoppedExternally && !isCanceled()) {synchronized (lock) {operatorChain.setIgnoreEndOfInput(false);}}completionFuture.complete(null);} catch (Throwable t) {// Note, t can be also an InterruptedExceptioncompletionFuture.completeExceptionally(t);}
}
小结

所以在源端写数据时,必须获得SourceStreamTask中的类变量lock的锁才能进行写数据;类变量lock刚好和执行器时同一个对象

总结

flink的source算子在Checkpoint时,是通过锁对象SourceStreamTask.lock,来控制源端数据产生和Checkpoint的有序进行

相关文章:

源码解析Flink源节点数据读取是如何与checkpoint串行执行

文章目录 源码解析Flink源节点数据读取是如何与checkpoint串行执行Checkpoint阶段StreamTask类变量actionExecutor的实现和初始化小结 数据读取阶段小结 总结 源码解析Flink源节点数据读取是如何与checkpoint串行执行 Flink版本&#xff1a;1.13.6 前置知识&#xff1a;源节点…...

进阶:Docker容器管理工具——Docker-Compose使用

文章目录 前言Compose大杀器编排服务 1、docker-compose安装curl方式安装增加可执行权限查看版本 2、Docker-compose.yaml命令3、 docker-compose实战4、Docker网络路由docker的跨主机网络路由**问题由来**:方案两台机分别配置路由表ip_forward配置 总结 前言 容器的管理工具&…...

策略模式(Strategy)

策略模式是一种行为设计模式&#xff0c;就是定义一系列算法&#xff0c;然后将每一个算法封装起来&#xff0c;并使它们可相互替换。本模式通过定义一组可相互替换的算法&#xff0c;实现将算法独立于使用它的用户而变化。 Strategy is a behavioral design pattern that def…...

webpack基础知识十:与webpack类似的工具还有哪些?区别?

一、模块化工具 模块化是一种处理复杂系统分解为更好的可管理模块的方式 可以用来分割&#xff0c;组织和打包应用。每个模块完成一个特定的子功能&#xff0c;所有的模块按某种方法组装起来&#xff0c;成为一个整体(bundle) 在前端领域中&#xff0c;并非只有webpack这一款…...

分享kubernetes部署:基于Ansible自动安装kubernetes

基于Ansible自动安装kubernetes 环境准备 我们以如下机器环境为例&#xff1a; 开放端口&#xff1a; 控制平面节点 工作节点 请按如上中规定的开放端口&#xff0c;或关闭防火墙&#xff1a; systemctlstopfirewalld&&\ systemctldisablefirewalld 安装常用工具 sudo…...

【Kubernetes部署篇】基于Ubuntu20.04操作系统搭建K8S1.23版本集群

文章目录 一、集群架构规划信息二、系统初始化准备(所有节点同步操作)三、安装kubeadm(所有节点同步操作)四、初始化K8S集群(master节点操作)五、添加Node节点到K8S集群中六、安装Calico网络插件七、测试CoreDNS可用性 一、集群架构规划信息 pod网段&#xff1a;10.244.0.0/16…...

c++--二叉树应用

1.根据二叉树创建字符串 力扣 给你二叉树的根节点 root &#xff0c;请你采用前序遍历的方式&#xff0c;将二叉树转化为一个由括号和整数组成的字符串&#xff0c;返回构造出的字符串。 空节点使用一对空括号对 "()" 表示&#xff0c;转化后需要省略所有不影响字符…...

以太网DHCP协议(十)

目录 一、工作原理 二、DHCP报文 2.1 DHCP报文类型 2.2 DHCP报文格式 当网络内部的主机设备数量过多是&#xff0c;IP地址的手动设置是一件非常繁琐的事情。为了实现自动设置IP地址、统一管理IP地址分配&#xff0c;TCPIP协议栈中引入了DHCP协议。 一、工作原理 使用DHCP之…...

企业服务器器中了360后缀勒索病毒怎么解决,勒索病毒解密数据恢复

随着网络威胁的增加&#xff0c;企业服务器成为黑客攻击的目标之一。近期&#xff0c;上海某知名律师事务所的数据库遭到了360后缀的勒索病毒攻击&#xff0c;导致企业服务器内的数据库被360后缀勒索病毒加密。许多重要的数据被锁定无法正常读取&#xff0c;严重影响了企业的正…...

详解Kafka分区机制原理|Kafka 系列 二

Kafka 系列第二篇&#xff0c;详解分区机制原理。为了不错过更新&#xff0c;请大家将本号“设为星标”。 点击上方“后端开发技术”&#xff0c;选择“设为星标” &#xff0c;优质资源及时送达 上一篇文章介绍了 Kafka 的基本概念和术语&#xff0c;里面有个概念是 分区(Part…...

CSS学习记录(基础笔记)

CSS简介: CSS 指的是层叠样式表* (Cascading Style Sheets)&#xff0c;主要用于设置HTML页面的文字内容&#xff08;字体、大小、对齐方式&#xff09;&#xff0c;图片的外形&#xff08;边框&#xff09; CSS 描述了如何在屏幕、纸张或其他媒体上显示 HTML 元素 CSS 节省…...

Chatgpt AI newbing作画,文字生成图 BingImageCreator 二次开发,对接wxbot

开源项目 https://github.com/acheong08/BingImageCreator 获取cookie信息 cookieStore.get("_U").then(result > console.log(result.value)) pip3 install --upgrade BingImageCreator import os import BingImageCreatoros.environ["http_proxy"]…...

PPT忘记密码如何解除?

PPT文件所带有的两种加密方式&#xff0c;打开密码以及修改权限&#xff0c;两种密码在打开文件的时候都会有相应的提示&#xff0c;但不同的是两种加密忘记密码之后是不同的。 如果忘记了打开密码&#xff0c;我们就没办法打开PPT文件了&#xff1b;如果是忘记了修改密码&…...

绘制曲线python

文章目录 import matplotlib.pyplot as plt# 提供的数据 x= [1,1.1,1.2,1.3,1.4,1.5,1.6,1.7,1.8,1.9,2,2.1,2.2,2.3,2.4,2.5,2.6,2.7,2.8,2.9,3,3.1,3.2,3.3,3.4,3.5,3.6,3.7,3.8,3.9,4,4.1,4.2,4.3,4.4,4.5,4.6,4.7,4.8,4.9,5,5.1,5.2,5.3,5.4,5.5,5.6,5.7,5.8,5.9,6,6.1,6.2…...

CentOs 8 常见问题处理

CentOs 8 常见问题处理 vmware虚拟机新增网卡操作 vmware虚拟机新增网卡操作 [rootcentos ~]# ip add 1: lo: <LOOPBACK,UP,LOWER_UP> mtu 65536 qdisc noqueue state UNKNOWN group default qlen 1000link/loopback 00:00:00:00:00:00 brd 00:00:00:00:00:00inet 127.0…...

OpenAI将GPT-4设置为ChatGPT Plus付费用户的默认模型

OpenAI最近为ChatGPT引入了一系列新功能&#xff0c;这些更新旨在增强用户体验&#xff0c;提供更多指导和更多的功能。其中最显著的功能之一是将GPT-4设置为ChatGPT Plus付费用户的默认模型&#xff0c;这意味着付费订阅用户无需手动切换到其他公开可用的语言模型&#xff0c;…...

textarea 标签如何创建多行文本输入框?

聚沙成塔每天进步一点点 ⭐ 专栏简介⭐ textarea 的写法⭐ 代码含义⭐ 写在最后 ⭐ 专栏简介 前端入门之旅&#xff1a;探索Web开发的奇妙世界 记得点击上方或者右侧链接订阅本专栏哦 几何带你启航前端之旅 欢迎来到前端入门之旅&#xff01;这个专栏是为那些对Web开发感兴趣、…...

(15)Qt绘图(two)

目录 坐标变换 平移坐标轴 缩放坐标轴 旋转坐标轴 定时器加坐标轴旋转实现动画旋转 transform旋转&#xff08;可设置旋转轴&#xff09; 绕X轴旋转 绕Y轴旋转 绕Z轴旋转 错切 Y轴错切 X轴错切 画家的保存与坐标复原 基本图形绘制 绘制点 绘制线 绘制矩形 普…...

用队列实现栈——数据结构与算法

&#x1f636;‍&#x1f32b;️Take your time ! &#x1f636;‍&#x1f32b;️ &#x1f4a5;个人主页&#xff1a;&#x1f525;&#x1f525;&#x1f525;大魔王&#x1f525;&#x1f525;&#x1f525; &#x1f4a5;代码仓库&#xff1a;&#x1f525;&#x1f525;魔…...

Python“牵手”1688商品详情页数据采集方法,1688API接口申请指南

1688详情接口 API 是开放平台提供的一种 API 接口&#xff0c;它可以帮助开发者获取商品的详细信息&#xff0c;包括商品的标题、描述、图片等信息。在电商平台的开发中&#xff0c;详情接口API是非常常用的 API&#xff0c;因此本文将详细介绍详情接口 API 的使用。 一、1688…...

【网络】每天掌握一个Linux命令 - iftop

在Linux系统中&#xff0c;iftop是网络管理的得力助手&#xff0c;能实时监控网络流量、连接情况等&#xff0c;帮助排查网络异常。接下来从多方面详细介绍它。 目录 【网络】每天掌握一个Linux命令 - iftop工具概述安装方式核心功能基础用法进阶操作实战案例面试题场景生产场景…...

手游刚开服就被攻击怎么办?如何防御DDoS?

开服初期是手游最脆弱的阶段&#xff0c;极易成为DDoS攻击的目标。一旦遭遇攻击&#xff0c;可能导致服务器瘫痪、玩家流失&#xff0c;甚至造成巨大经济损失。本文为开发者提供一套简洁有效的应急与防御方案&#xff0c;帮助快速应对并构建长期防护体系。 一、遭遇攻击的紧急应…...

React Native在HarmonyOS 5.0阅读类应用开发中的实践

一、技术选型背景 随着HarmonyOS 5.0对Web兼容层的增强&#xff0c;React Native作为跨平台框架可通过重新编译ArkTS组件实现85%以上的代码复用率。阅读类应用具有UI复杂度低、数据流清晰的特点。 二、核心实现方案 1. 环境配置 &#xff08;1&#xff09;使用React Native…...

浅谈不同二分算法的查找情况

二分算法原理比较简单&#xff0c;但是实际的算法模板却有很多&#xff0c;这一切都源于二分查找问题中的复杂情况和二分算法的边界处理&#xff0c;以下是博主对一些二分算法查找的情况分析。 需要说明的是&#xff0c;以下二分算法都是基于有序序列为升序有序的情况&#xf…...

【无标题】路径问题的革命性重构:基于二维拓扑收缩色动力学模型的零点隧穿理论

路径问题的革命性重构&#xff1a;基于二维拓扑收缩色动力学模型的零点隧穿理论 一、传统路径模型的根本缺陷 在经典正方形路径问题中&#xff08;图1&#xff09;&#xff1a; mermaid graph LR A((A)) --- B((B)) B --- C((C)) C --- D((D)) D --- A A -.- C[无直接路径] B -…...

解读《网络安全法》最新修订,把握网络安全新趋势

《网络安全法》自2017年施行以来&#xff0c;在维护网络空间安全方面发挥了重要作用。但随着网络环境的日益复杂&#xff0c;网络攻击、数据泄露等事件频发&#xff0c;现行法律已难以完全适应新的风险挑战。 2025年3月28日&#xff0c;国家网信办会同相关部门起草了《网络安全…...

Golang——9、反射和文件操作

反射和文件操作 1、反射1.1、reflect.TypeOf()获取任意值的类型对象1.2、reflect.ValueOf()1.3、结构体反射 2、文件操作2.1、os.Open()打开文件2.2、方式一&#xff1a;使用Read()读取文件2.3、方式二&#xff1a;bufio读取文件2.4、方式三&#xff1a;os.ReadFile读取2.5、写…...

android RelativeLayout布局

<?xml version"1.0" encoding"utf-8"?> <RelativeLayout xmlns:android"http://schemas.android.com/apk/res/android"android:layout_width"match_parent"android:layout_height"match_parent"android:gravity&…...

【前端异常】JavaScript错误处理:分析 Uncaught (in promise) error

在前端开发中&#xff0c;JavaScript 异常是不可避免的。随着现代前端应用越来越多地使用异步操作&#xff08;如 Promise、async/await 等&#xff09;&#xff0c;开发者常常会遇到 Uncaught (in promise) error 错误。这个错误是由于未正确处理 Promise 的拒绝&#xff08;r…...

0x-3-Oracle 23 ai-sqlcl 25.1 集成安装-配置和优化

是不是受够了安装了oracle database之后sqlplus的简陋&#xff0c;无法删除无法上下翻页的苦恼。 可以安装readline和rlwrap插件的话&#xff0c;配置.bahs_profile后也能解决上下翻页这些&#xff0c;但是很多生产环境无法安装rpm包。 oracle提供了sqlcl免费许可&#xff0c…...