源码解析Flink源节点数据读取是如何与checkpoint串行执行
文章目录
- 源码解析Flink源节点数据读取是如何与checkpoint串行执行
- Checkpoint阶段
- StreamTask类变量actionExecutor的实现和初始化
- 小结
- 数据读取阶段
- 小结
- 总结
源码解析Flink源节点数据读取是如何与checkpoint串行执行
Flink版本:1.13.6
前置知识:源节点的Checkpoint是由Checkpointcoordinate触发,具体是通过RPC调用TaskManager中对应的Task的StreamTask类的performChecpoint方法执行Checkpoint。
本文思路:本文先分析checkpoint阶段,然后再分析数据读取阶段,最后得出结论:源节点Checkpoint时和源节点读取数据时,都需要抢SourceStreamTask类中lock变量的锁,最终实现串行执行checkpoint与写数据
Checkpoint阶段
Checkpoint在StreamTask的performCheckpoint方法中执行,该方法调用过程如下
// 在StreamTask类中 执行checkpoint操作
private boolean performCheckpoint(CheckpointMetaData checkpointMetaData,CheckpointOptions checkpointOptions,CheckpointMetricsBuilder checkpointMetrics )throws Exception {if (isRunning) {//使用actionExecutor 同步触发checkpointactionExecutor.runThrowing(() -> {....//经过一系列检查subtaskCheckpointCoordinator.checkpointState(checkpointMetaData,checkpointOptions,checkpointMetrics,operatorChain,this::isRunning);});return true;} else {....}}
从上述代码可以看出,Checkpoint执行是由actionExecutor执行器执行
StreamTask类变量actionExecutor的实现和初始化
StreamTask类变量actionExecution的实现
通过代码注释可以知道该执行器的实现是StreamTaskActionExecutor.SynchronizedStreamTaskActionExecutor;从SynchronizedStreamTaskActionExecutor源代码可知,该执行器每次执行都需要获得mutex对象锁
/*** All actions outside of the task {@link #mailboxProcessor mailbox} (i.e. performed by another* thread) must be executed through this executor to ensure that we don't have concurrent method* calls that void consistent checkpoints.** <p>CheckpointLock is superseded by {@link MailboxExecutor}, with {@link* StreamTaskActionExecutor.SynchronizedStreamTaskActionExecutor* SynchronizedStreamTaskActionExecutor} to provide lock to {@link SourceStreamTask}.*/
private final StreamTaskActionExecutor actionExecutor;class SynchronizedStreamTaskActionExecutor implements StreamTaskActionExecutor {private final Object mutex;public SynchronizedStreamTaskActionExecutor(Object mutex) {this.mutex = mutex;}@Overridepublic void run(RunnableWithException runnable) throws Exception {synchronized (mutex) {runnable.run();}}
}
StreamTask变量actionExecution初始化
actionExecutor变量在StreamTask中定义,在构造方法中初始化;该构造方法由SourceStreamTask调用,并传入SynchronizedStreamTaskActionExecutor对象,代码如下所示
// SourceStreamTask的方法
private SourceStreamTask(Environment env, Object lock) throws Exception {//调用的StreamTask构造函数,传入SynchronizedStreamTaskActionExecutor对象super(env,null,FatalExitExceptionHandler.INSTANCE,//初始化actionExecutorStreamTaskActionExecutor.synchronizedExecutor(lock));//将lock对象赋值给类变量lockthis.lock = Preconditions.checkNotNull(lock);this.sourceThread = new LegacySourceFunctionThread();getEnvironment().getMetricGroup().getIOMetricGroup().setEnableBusyTime(false);
}// StreamTask的方法
protected StreamTask(Environment environment,@Nullable TimerService timerService,Thread.UncaughtExceptionHandler uncaughtExceptionHandler,//初始化actionExecutorStreamTaskActionExecutor actionExecutor)throws Exception {this(environment,timerService,uncaughtExceptionHandler,actionExecutor,new TaskMailboxImpl(Thread.currentThread()));
}protected StreamTask(Environment environment,@Nullable TimerService timerService,Thread.UncaughtExceptionHandler uncaughtExceptionHandler,StreamTaskActionExecutor actionExecutor,TaskMailbox mailbox)throws Exception {super(environment);this.configuration = new StreamConfig(getTaskConfiguration());this.recordWriter = createRecordWriterDelegate(configuration, environment);//初始化actionExecutorthis.actionExecutor = Preconditions.checkNotNull(actionExecutor);this.mailboxProcessor = new MailboxProcessor(this::processInput, mailbox, actionExecutor);.......}
小结
actionExecutor执行器每次执行都需要获得mutex对象,mutex对象就是SourceStreamTask类中的lock对象;即算子每次执行Checkpoint时都需要获得SourceStreamTask类中lock对象锁才能进行
数据读取阶段
在执行Checkpoint时控制读取源端,则控制点必定是在调用SourceContext的collect方法时
@Override
public void run(SourceContext<String> ctx) throws Exception {int i = 0;while (true) {//在这个方法里处理ctx.collect(String.valueOf(i));}
}
点击collection查看实现,选择NonTimestampContext查看代码,collect()实现如下
@Override
public void collect(T element) {synchronized (lock) {output.collect(reuse.replace(element));}
}
所以这里控制数据读取发送是通过lock来控制,lock是如何初始化的?
通过NonTimestampContext构造方法可以定位到StreamSourceContexts->getSourceContext方法;
public static <OUT> SourceFunction.SourceContext<OUT> getSourceContext(TimeCharacteristic timeCharacteristic,ProcessingTimeService processingTimeService,Object checkpointLock,StreamStatusMaintainer streamStatusMaintainer,Output<StreamRecord<OUT>> output,long watermarkInterval,long idleTimeout) {final SourceFunction.SourceContext<OUT> ctx;switch (timeCharacteristic) {....case ProcessingTime://初始化NonTimestampContextctx = new NonTimestampContext<>(checkpointLock, output);break;default:throw new IllegalArgumentException(String.valueOf(timeCharacteristic));}return ctx;
}
向上追踪,在StreamSource类中调用getSourceContext:
public void run(final Object lockingObject,final StreamStatusMaintainer streamStatusMaintainer,final Output<StreamRecord<OUT>> collector,final OperatorChain<?, ?> operatorChain)throws Exception {....this.ctx =StreamSourceContexts.getSourceContext(timeCharacteristic,getProcessingTimeService(),lockingObject,streamStatusMaintainer,collector,watermarkInterval,-1);....}
// 再向上最终run方法的调用点->是由内部方法run调用
public void run(final Object lockingObject,final StreamStatusMaintainer streamStatusMaintainer,final OperatorChain<?, ?> operatorChain)throws Exception {run(lockingObject, streamStatusMaintainer, output, operatorChain);
}//再向上最终run方法的调用点->SourceStreamTask 调用run 然后再代用mainOpterator run方法
@Override
public void run() {try {// 使用的是类变量lockmainOperator.run(lock, getStreamStatusMaintainer(), operatorChain);if (!wasStoppedExternally && !isCanceled()) {synchronized (lock) {operatorChain.setIgnoreEndOfInput(false);}}completionFuture.complete(null);} catch (Throwable t) {// Note, t can be also an InterruptedExceptioncompletionFuture.completeExceptionally(t);}
}
小结
所以在源端写数据时,必须获得SourceStreamTask中的类变量lock的锁才能进行写数据;类变量lock刚好和执行器时同一个对象
总结
flink的source算子在Checkpoint时,是通过锁对象SourceStreamTask.lock,来控制源端数据产生和Checkpoint的有序进行
相关文章:
源码解析Flink源节点数据读取是如何与checkpoint串行执行
文章目录 源码解析Flink源节点数据读取是如何与checkpoint串行执行Checkpoint阶段StreamTask类变量actionExecutor的实现和初始化小结 数据读取阶段小结 总结 源码解析Flink源节点数据读取是如何与checkpoint串行执行 Flink版本:1.13.6 前置知识:源节点…...
进阶:Docker容器管理工具——Docker-Compose使用
文章目录 前言Compose大杀器编排服务 1、docker-compose安装curl方式安装增加可执行权限查看版本 2、Docker-compose.yaml命令3、 docker-compose实战4、Docker网络路由docker的跨主机网络路由**问题由来**:方案两台机分别配置路由表ip_forward配置 总结 前言 容器的管理工具&…...
策略模式(Strategy)
策略模式是一种行为设计模式,就是定义一系列算法,然后将每一个算法封装起来,并使它们可相互替换。本模式通过定义一组可相互替换的算法,实现将算法独立于使用它的用户而变化。 Strategy is a behavioral design pattern that def…...
webpack基础知识十:与webpack类似的工具还有哪些?区别?
一、模块化工具 模块化是一种处理复杂系统分解为更好的可管理模块的方式 可以用来分割,组织和打包应用。每个模块完成一个特定的子功能,所有的模块按某种方法组装起来,成为一个整体(bundle) 在前端领域中,并非只有webpack这一款…...
分享kubernetes部署:基于Ansible自动安装kubernetes
基于Ansible自动安装kubernetes 环境准备 我们以如下机器环境为例: 开放端口: 控制平面节点 工作节点 请按如上中规定的开放端口,或关闭防火墙: systemctlstopfirewalld&&\ systemctldisablefirewalld 安装常用工具 sudo…...
【Kubernetes部署篇】基于Ubuntu20.04操作系统搭建K8S1.23版本集群
文章目录 一、集群架构规划信息二、系统初始化准备(所有节点同步操作)三、安装kubeadm(所有节点同步操作)四、初始化K8S集群(master节点操作)五、添加Node节点到K8S集群中六、安装Calico网络插件七、测试CoreDNS可用性 一、集群架构规划信息 pod网段:10.244.0.0/16…...
c++--二叉树应用
1.根据二叉树创建字符串 力扣 给你二叉树的根节点 root ,请你采用前序遍历的方式,将二叉树转化为一个由括号和整数组成的字符串,返回构造出的字符串。 空节点使用一对空括号对 "()" 表示,转化后需要省略所有不影响字符…...
以太网DHCP协议(十)
目录 一、工作原理 二、DHCP报文 2.1 DHCP报文类型 2.2 DHCP报文格式 当网络内部的主机设备数量过多是,IP地址的手动设置是一件非常繁琐的事情。为了实现自动设置IP地址、统一管理IP地址分配,TCPIP协议栈中引入了DHCP协议。 一、工作原理 使用DHCP之…...
企业服务器器中了360后缀勒索病毒怎么解决,勒索病毒解密数据恢复
随着网络威胁的增加,企业服务器成为黑客攻击的目标之一。近期,上海某知名律师事务所的数据库遭到了360后缀的勒索病毒攻击,导致企业服务器内的数据库被360后缀勒索病毒加密。许多重要的数据被锁定无法正常读取,严重影响了企业的正…...
详解Kafka分区机制原理|Kafka 系列 二
Kafka 系列第二篇,详解分区机制原理。为了不错过更新,请大家将本号“设为星标”。 点击上方“后端开发技术”,选择“设为星标” ,优质资源及时送达 上一篇文章介绍了 Kafka 的基本概念和术语,里面有个概念是 分区(Part…...
CSS学习记录(基础笔记)
CSS简介: CSS 指的是层叠样式表* (Cascading Style Sheets),主要用于设置HTML页面的文字内容(字体、大小、对齐方式),图片的外形(边框) CSS 描述了如何在屏幕、纸张或其他媒体上显示 HTML 元素 CSS 节省…...
Chatgpt AI newbing作画,文字生成图 BingImageCreator 二次开发,对接wxbot
开源项目 https://github.com/acheong08/BingImageCreator 获取cookie信息 cookieStore.get("_U").then(result > console.log(result.value)) pip3 install --upgrade BingImageCreator import os import BingImageCreatoros.environ["http_proxy"]…...
PPT忘记密码如何解除?
PPT文件所带有的两种加密方式,打开密码以及修改权限,两种密码在打开文件的时候都会有相应的提示,但不同的是两种加密忘记密码之后是不同的。 如果忘记了打开密码,我们就没办法打开PPT文件了;如果是忘记了修改密码&…...
绘制曲线python
文章目录 import matplotlib.pyplot as plt# 提供的数据 x= [1,1.1,1.2,1.3,1.4,1.5,1.6,1.7,1.8,1.9,2,2.1,2.2,2.3,2.4,2.5,2.6,2.7,2.8,2.9,3,3.1,3.2,3.3,3.4,3.5,3.6,3.7,3.8,3.9,4,4.1,4.2,4.3,4.4,4.5,4.6,4.7,4.8,4.9,5,5.1,5.2,5.3,5.4,5.5,5.6,5.7,5.8,5.9,6,6.1,6.2…...
CentOs 8 常见问题处理
CentOs 8 常见问题处理 vmware虚拟机新增网卡操作 vmware虚拟机新增网卡操作 [rootcentos ~]# ip add 1: lo: <LOOPBACK,UP,LOWER_UP> mtu 65536 qdisc noqueue state UNKNOWN group default qlen 1000link/loopback 00:00:00:00:00:00 brd 00:00:00:00:00:00inet 127.0…...
OpenAI将GPT-4设置为ChatGPT Plus付费用户的默认模型
OpenAI最近为ChatGPT引入了一系列新功能,这些更新旨在增强用户体验,提供更多指导和更多的功能。其中最显著的功能之一是将GPT-4设置为ChatGPT Plus付费用户的默认模型,这意味着付费订阅用户无需手动切换到其他公开可用的语言模型,…...
textarea 标签如何创建多行文本输入框?
聚沙成塔每天进步一点点 ⭐ 专栏简介⭐ textarea 的写法⭐ 代码含义⭐ 写在最后 ⭐ 专栏简介 前端入门之旅:探索Web开发的奇妙世界 记得点击上方或者右侧链接订阅本专栏哦 几何带你启航前端之旅 欢迎来到前端入门之旅!这个专栏是为那些对Web开发感兴趣、…...
(15)Qt绘图(two)
目录 坐标变换 平移坐标轴 缩放坐标轴 旋转坐标轴 定时器加坐标轴旋转实现动画旋转 transform旋转(可设置旋转轴) 绕X轴旋转 绕Y轴旋转 绕Z轴旋转 错切 Y轴错切 X轴错切 画家的保存与坐标复原 基本图形绘制 绘制点 绘制线 绘制矩形 普…...
用队列实现栈——数据结构与算法
😶🌫️Take your time ! 😶🌫️ 💥个人主页:🔥🔥🔥大魔王🔥🔥🔥 💥代码仓库:🔥🔥魔…...
Python“牵手”1688商品详情页数据采集方法,1688API接口申请指南
1688详情接口 API 是开放平台提供的一种 API 接口,它可以帮助开发者获取商品的详细信息,包括商品的标题、描述、图片等信息。在电商平台的开发中,详情接口API是非常常用的 API,因此本文将详细介绍详情接口 API 的使用。 一、1688…...
《Playwright:微软的自动化测试工具详解》
Playwright 简介:声明内容来自网络,将内容拼接整理出来的文档 Playwright 是微软开发的自动化测试工具,支持 Chrome、Firefox、Safari 等主流浏览器,提供多语言 API(Python、JavaScript、Java、.NET)。它的特点包括&a…...
基于数字孪生的水厂可视化平台建设:架构与实践
分享大纲: 1、数字孪生水厂可视化平台建设背景 2、数字孪生水厂可视化平台建设架构 3、数字孪生水厂可视化平台建设成效 近几年,数字孪生水厂的建设开展的如火如荼。作为提升水厂管理效率、优化资源的调度手段,基于数字孪生的水厂可视化平台的…...
uniapp中使用aixos 报错
问题: 在uniapp中使用aixos,运行后报如下错误: AxiosError: There is no suitable adapter to dispatch the request since : - adapter xhr is not supported by the environment - adapter http is not available in the build 解决方案&…...
浅谈不同二分算法的查找情况
二分算法原理比较简单,但是实际的算法模板却有很多,这一切都源于二分查找问题中的复杂情况和二分算法的边界处理,以下是博主对一些二分算法查找的情况分析。 需要说明的是,以下二分算法都是基于有序序列为升序有序的情况…...
Device Mapper 机制
Device Mapper 机制详解 Device Mapper(简称 DM)是 Linux 内核中的一套通用块设备映射框架,为 LVM、加密磁盘、RAID 等提供底层支持。本文将详细介绍 Device Mapper 的原理、实现、内核配置、常用工具、操作测试流程,并配以详细的…...
蓝桥杯 冶炼金属
原题目链接 🔧 冶炼金属转换率推测题解 📜 原题描述 小蓝有一个神奇的炉子用于将普通金属 O O O 冶炼成为一种特殊金属 X X X。这个炉子有一个属性叫转换率 V V V,是一个正整数,表示每 V V V 个普通金属 O O O 可以冶炼出 …...
HDFS分布式存储 zookeeper
hadoop介绍 狭义上hadoop是指apache的一款开源软件 用java语言实现开源框架,允许使用简单的变成模型跨计算机对大型集群进行分布式处理(1.海量的数据存储 2.海量数据的计算)Hadoop核心组件 hdfs(分布式文件存储系统)&a…...
基于Java+MySQL实现(GUI)客户管理系统
客户资料管理系统的设计与实现 第一章 需求分析 1.1 需求总体介绍 本项目为了方便维护客户信息为了方便维护客户信息,对客户进行统一管理,可以把所有客户信息录入系统,进行维护和统计功能。可通过文件的方式保存相关录入数据,对…...
【C++特殊工具与技术】优化内存分配(一):C++中的内存分配
目录 一、C 内存的基本概念 1.1 内存的物理与逻辑结构 1.2 C 程序的内存区域划分 二、栈内存分配 2.1 栈内存的特点 2.2 栈内存分配示例 三、堆内存分配 3.1 new和delete操作符 4.2 内存泄漏与悬空指针问题 4.3 new和delete的重载 四、智能指针…...
Leetcode33( 搜索旋转排序数组)
题目表述 整数数组 nums 按升序排列,数组中的值 互不相同 。 在传递给函数之前,nums 在预先未知的某个下标 k(0 < k < nums.length)上进行了 旋转,使数组变为 [nums[k], nums[k1], …, nums[n-1], nums[0], nu…...
