源码解析Flink源节点数据读取是如何与checkpoint串行执行
文章目录
- 源码解析Flink源节点数据读取是如何与checkpoint串行执行
- Checkpoint阶段
- StreamTask类变量actionExecutor的实现和初始化
- 小结
- 数据读取阶段
- 小结
- 总结
源码解析Flink源节点数据读取是如何与checkpoint串行执行
Flink版本:1.13.6
前置知识:源节点的Checkpoint是由Checkpointcoordinate触发,具体是通过RPC调用TaskManager中对应的Task的StreamTask类的performChecpoint方法执行Checkpoint。
本文思路:本文先分析checkpoint阶段,然后再分析数据读取阶段,最后得出结论:源节点Checkpoint时和源节点读取数据时,都需要抢SourceStreamTask类中lock变量的锁,最终实现串行执行checkpoint与写数据
Checkpoint阶段
Checkpoint在StreamTask的performCheckpoint方法中执行,该方法调用过程如下
// 在StreamTask类中 执行checkpoint操作
private boolean performCheckpoint(CheckpointMetaData checkpointMetaData,CheckpointOptions checkpointOptions,CheckpointMetricsBuilder checkpointMetrics )throws Exception {if (isRunning) {//使用actionExecutor 同步触发checkpointactionExecutor.runThrowing(() -> {....//经过一系列检查subtaskCheckpointCoordinator.checkpointState(checkpointMetaData,checkpointOptions,checkpointMetrics,operatorChain,this::isRunning);});return true;} else {....}}
从上述代码可以看出,Checkpoint执行是由actionExecutor执行器执行
StreamTask类变量actionExecutor的实现和初始化
StreamTask类变量actionExecution的实现
通过代码注释可以知道该执行器的实现是StreamTaskActionExecutor.SynchronizedStreamTaskActionExecutor;从SynchronizedStreamTaskActionExecutor源代码可知,该执行器每次执行都需要获得mutex对象锁
/*** All actions outside of the task {@link #mailboxProcessor mailbox} (i.e. performed by another* thread) must be executed through this executor to ensure that we don't have concurrent method* calls that void consistent checkpoints.** <p>CheckpointLock is superseded by {@link MailboxExecutor}, with {@link* StreamTaskActionExecutor.SynchronizedStreamTaskActionExecutor* SynchronizedStreamTaskActionExecutor} to provide lock to {@link SourceStreamTask}.*/
private final StreamTaskActionExecutor actionExecutor;class SynchronizedStreamTaskActionExecutor implements StreamTaskActionExecutor {private final Object mutex;public SynchronizedStreamTaskActionExecutor(Object mutex) {this.mutex = mutex;}@Overridepublic void run(RunnableWithException runnable) throws Exception {synchronized (mutex) {runnable.run();}}
}
StreamTask变量actionExecution初始化
actionExecutor变量在StreamTask中定义,在构造方法中初始化;该构造方法由SourceStreamTask调用,并传入SynchronizedStreamTaskActionExecutor对象,代码如下所示
// SourceStreamTask的方法
private SourceStreamTask(Environment env, Object lock) throws Exception {//调用的StreamTask构造函数,传入SynchronizedStreamTaskActionExecutor对象super(env,null,FatalExitExceptionHandler.INSTANCE,//初始化actionExecutorStreamTaskActionExecutor.synchronizedExecutor(lock));//将lock对象赋值给类变量lockthis.lock = Preconditions.checkNotNull(lock);this.sourceThread = new LegacySourceFunctionThread();getEnvironment().getMetricGroup().getIOMetricGroup().setEnableBusyTime(false);
}// StreamTask的方法
protected StreamTask(Environment environment,@Nullable TimerService timerService,Thread.UncaughtExceptionHandler uncaughtExceptionHandler,//初始化actionExecutorStreamTaskActionExecutor actionExecutor)throws Exception {this(environment,timerService,uncaughtExceptionHandler,actionExecutor,new TaskMailboxImpl(Thread.currentThread()));
}protected StreamTask(Environment environment,@Nullable TimerService timerService,Thread.UncaughtExceptionHandler uncaughtExceptionHandler,StreamTaskActionExecutor actionExecutor,TaskMailbox mailbox)throws Exception {super(environment);this.configuration = new StreamConfig(getTaskConfiguration());this.recordWriter = createRecordWriterDelegate(configuration, environment);//初始化actionExecutorthis.actionExecutor = Preconditions.checkNotNull(actionExecutor);this.mailboxProcessor = new MailboxProcessor(this::processInput, mailbox, actionExecutor);.......}
小结
actionExecutor执行器每次执行都需要获得mutex对象,mutex对象就是SourceStreamTask类中的lock对象;即算子每次执行Checkpoint时都需要获得SourceStreamTask类中lock对象锁才能进行
数据读取阶段
在执行Checkpoint时控制读取源端,则控制点必定是在调用SourceContext的collect方法时
@Override
public void run(SourceContext<String> ctx) throws Exception {int i = 0;while (true) {//在这个方法里处理ctx.collect(String.valueOf(i));}
}
点击collection查看实现,选择NonTimestampContext查看代码,collect()实现如下
@Override
public void collect(T element) {synchronized (lock) {output.collect(reuse.replace(element));}
}
所以这里控制数据读取发送是通过lock来控制,lock是如何初始化的?
通过NonTimestampContext构造方法可以定位到StreamSourceContexts->getSourceContext方法;
public static <OUT> SourceFunction.SourceContext<OUT> getSourceContext(TimeCharacteristic timeCharacteristic,ProcessingTimeService processingTimeService,Object checkpointLock,StreamStatusMaintainer streamStatusMaintainer,Output<StreamRecord<OUT>> output,long watermarkInterval,long idleTimeout) {final SourceFunction.SourceContext<OUT> ctx;switch (timeCharacteristic) {....case ProcessingTime://初始化NonTimestampContextctx = new NonTimestampContext<>(checkpointLock, output);break;default:throw new IllegalArgumentException(String.valueOf(timeCharacteristic));}return ctx;
}
向上追踪,在StreamSource类中调用getSourceContext:
public void run(final Object lockingObject,final StreamStatusMaintainer streamStatusMaintainer,final Output<StreamRecord<OUT>> collector,final OperatorChain<?, ?> operatorChain)throws Exception {....this.ctx =StreamSourceContexts.getSourceContext(timeCharacteristic,getProcessingTimeService(),lockingObject,streamStatusMaintainer,collector,watermarkInterval,-1);....}
// 再向上最终run方法的调用点->是由内部方法run调用
public void run(final Object lockingObject,final StreamStatusMaintainer streamStatusMaintainer,final OperatorChain<?, ?> operatorChain)throws Exception {run(lockingObject, streamStatusMaintainer, output, operatorChain);
}//再向上最终run方法的调用点->SourceStreamTask 调用run 然后再代用mainOpterator run方法
@Override
public void run() {try {// 使用的是类变量lockmainOperator.run(lock, getStreamStatusMaintainer(), operatorChain);if (!wasStoppedExternally && !isCanceled()) {synchronized (lock) {operatorChain.setIgnoreEndOfInput(false);}}completionFuture.complete(null);} catch (Throwable t) {// Note, t can be also an InterruptedExceptioncompletionFuture.completeExceptionally(t);}
}
小结
所以在源端写数据时,必须获得SourceStreamTask中的类变量lock的锁才能进行写数据;类变量lock刚好和执行器时同一个对象
总结
flink的source算子在Checkpoint时,是通过锁对象SourceStreamTask.lock,来控制源端数据产生和Checkpoint的有序进行
相关文章:
源码解析Flink源节点数据读取是如何与checkpoint串行执行
文章目录 源码解析Flink源节点数据读取是如何与checkpoint串行执行Checkpoint阶段StreamTask类变量actionExecutor的实现和初始化小结 数据读取阶段小结 总结 源码解析Flink源节点数据读取是如何与checkpoint串行执行 Flink版本:1.13.6 前置知识:源节点…...
进阶:Docker容器管理工具——Docker-Compose使用
文章目录 前言Compose大杀器编排服务 1、docker-compose安装curl方式安装增加可执行权限查看版本 2、Docker-compose.yaml命令3、 docker-compose实战4、Docker网络路由docker的跨主机网络路由**问题由来**:方案两台机分别配置路由表ip_forward配置 总结 前言 容器的管理工具&…...
策略模式(Strategy)
策略模式是一种行为设计模式,就是定义一系列算法,然后将每一个算法封装起来,并使它们可相互替换。本模式通过定义一组可相互替换的算法,实现将算法独立于使用它的用户而变化。 Strategy is a behavioral design pattern that def…...
webpack基础知识十:与webpack类似的工具还有哪些?区别?
一、模块化工具 模块化是一种处理复杂系统分解为更好的可管理模块的方式 可以用来分割,组织和打包应用。每个模块完成一个特定的子功能,所有的模块按某种方法组装起来,成为一个整体(bundle) 在前端领域中,并非只有webpack这一款…...
分享kubernetes部署:基于Ansible自动安装kubernetes
基于Ansible自动安装kubernetes 环境准备 我们以如下机器环境为例: 开放端口: 控制平面节点 工作节点 请按如上中规定的开放端口,或关闭防火墙: systemctlstopfirewalld&&\ systemctldisablefirewalld 安装常用工具 sudo…...
【Kubernetes部署篇】基于Ubuntu20.04操作系统搭建K8S1.23版本集群
文章目录 一、集群架构规划信息二、系统初始化准备(所有节点同步操作)三、安装kubeadm(所有节点同步操作)四、初始化K8S集群(master节点操作)五、添加Node节点到K8S集群中六、安装Calico网络插件七、测试CoreDNS可用性 一、集群架构规划信息 pod网段:10.244.0.0/16…...
c++--二叉树应用
1.根据二叉树创建字符串 力扣 给你二叉树的根节点 root ,请你采用前序遍历的方式,将二叉树转化为一个由括号和整数组成的字符串,返回构造出的字符串。 空节点使用一对空括号对 "()" 表示,转化后需要省略所有不影响字符…...
以太网DHCP协议(十)
目录 一、工作原理 二、DHCP报文 2.1 DHCP报文类型 2.2 DHCP报文格式 当网络内部的主机设备数量过多是,IP地址的手动设置是一件非常繁琐的事情。为了实现自动设置IP地址、统一管理IP地址分配,TCPIP协议栈中引入了DHCP协议。 一、工作原理 使用DHCP之…...
企业服务器器中了360后缀勒索病毒怎么解决,勒索病毒解密数据恢复
随着网络威胁的增加,企业服务器成为黑客攻击的目标之一。近期,上海某知名律师事务所的数据库遭到了360后缀的勒索病毒攻击,导致企业服务器内的数据库被360后缀勒索病毒加密。许多重要的数据被锁定无法正常读取,严重影响了企业的正…...
详解Kafka分区机制原理|Kafka 系列 二
Kafka 系列第二篇,详解分区机制原理。为了不错过更新,请大家将本号“设为星标”。 点击上方“后端开发技术”,选择“设为星标” ,优质资源及时送达 上一篇文章介绍了 Kafka 的基本概念和术语,里面有个概念是 分区(Part…...
CSS学习记录(基础笔记)
CSS简介: CSS 指的是层叠样式表* (Cascading Style Sheets),主要用于设置HTML页面的文字内容(字体、大小、对齐方式),图片的外形(边框) CSS 描述了如何在屏幕、纸张或其他媒体上显示 HTML 元素 CSS 节省…...
Chatgpt AI newbing作画,文字生成图 BingImageCreator 二次开发,对接wxbot
开源项目 https://github.com/acheong08/BingImageCreator 获取cookie信息 cookieStore.get("_U").then(result > console.log(result.value)) pip3 install --upgrade BingImageCreator import os import BingImageCreatoros.environ["http_proxy"]…...
PPT忘记密码如何解除?
PPT文件所带有的两种加密方式,打开密码以及修改权限,两种密码在打开文件的时候都会有相应的提示,但不同的是两种加密忘记密码之后是不同的。 如果忘记了打开密码,我们就没办法打开PPT文件了;如果是忘记了修改密码&…...
绘制曲线python
文章目录 import matplotlib.pyplot as plt# 提供的数据 x= [1,1.1,1.2,1.3,1.4,1.5,1.6,1.7,1.8,1.9,2,2.1,2.2,2.3,2.4,2.5,2.6,2.7,2.8,2.9,3,3.1,3.2,3.3,3.4,3.5,3.6,3.7,3.8,3.9,4,4.1,4.2,4.3,4.4,4.5,4.6,4.7,4.8,4.9,5,5.1,5.2,5.3,5.4,5.5,5.6,5.7,5.8,5.9,6,6.1,6.2…...
CentOs 8 常见问题处理
CentOs 8 常见问题处理 vmware虚拟机新增网卡操作 vmware虚拟机新增网卡操作 [rootcentos ~]# ip add 1: lo: <LOOPBACK,UP,LOWER_UP> mtu 65536 qdisc noqueue state UNKNOWN group default qlen 1000link/loopback 00:00:00:00:00:00 brd 00:00:00:00:00:00inet 127.0…...
OpenAI将GPT-4设置为ChatGPT Plus付费用户的默认模型
OpenAI最近为ChatGPT引入了一系列新功能,这些更新旨在增强用户体验,提供更多指导和更多的功能。其中最显著的功能之一是将GPT-4设置为ChatGPT Plus付费用户的默认模型,这意味着付费订阅用户无需手动切换到其他公开可用的语言模型,…...
textarea 标签如何创建多行文本输入框?
聚沙成塔每天进步一点点 ⭐ 专栏简介⭐ textarea 的写法⭐ 代码含义⭐ 写在最后 ⭐ 专栏简介 前端入门之旅:探索Web开发的奇妙世界 记得点击上方或者右侧链接订阅本专栏哦 几何带你启航前端之旅 欢迎来到前端入门之旅!这个专栏是为那些对Web开发感兴趣、…...
(15)Qt绘图(two)
目录 坐标变换 平移坐标轴 缩放坐标轴 旋转坐标轴 定时器加坐标轴旋转实现动画旋转 transform旋转(可设置旋转轴) 绕X轴旋转 绕Y轴旋转 绕Z轴旋转 错切 Y轴错切 X轴错切 画家的保存与坐标复原 基本图形绘制 绘制点 绘制线 绘制矩形 普…...
用队列实现栈——数据结构与算法
😶🌫️Take your time ! 😶🌫️ 💥个人主页:🔥🔥🔥大魔王🔥🔥🔥 💥代码仓库:🔥🔥魔…...
Python“牵手”1688商品详情页数据采集方法,1688API接口申请指南
1688详情接口 API 是开放平台提供的一种 API 接口,它可以帮助开发者获取商品的详细信息,包括商品的标题、描述、图片等信息。在电商平台的开发中,详情接口API是非常常用的 API,因此本文将详细介绍详情接口 API 的使用。 一、1688…...
后进先出(LIFO)详解
LIFO 是 Last In, First Out 的缩写,中文译为后进先出。这是一种数据结构的工作原则,类似于一摞盘子或一叠书本: 最后放进去的元素最先出来 -想象往筒状容器里放盘子: (1)你放进的最后一个盘子(…...
(LeetCode 每日一题) 3442. 奇偶频次间的最大差值 I (哈希、字符串)
题目:3442. 奇偶频次间的最大差值 I 思路 :哈希,时间复杂度0(n)。 用哈希表来记录每个字符串中字符的分布情况,哈希表这里用数组即可实现。 C版本: class Solution { public:int maxDifference(string s) {int a[26]…...
挑战杯推荐项目
“人工智能”创意赛 - 智能艺术创作助手:借助大模型技术,开发能根据用户输入的主题、风格等要求,生成绘画、音乐、文学作品等多种形式艺术创作灵感或初稿的应用,帮助艺术家和创意爱好者激发创意、提高创作效率。 - 个性化梦境…...
第19节 Node.js Express 框架
Express 是一个为Node.js设计的web开发框架,它基于nodejs平台。 Express 简介 Express是一个简洁而灵活的node.js Web应用框架, 提供了一系列强大特性帮助你创建各种Web应用,和丰富的HTTP工具。 使用Express可以快速地搭建一个完整功能的网站。 Expre…...
【人工智能】神经网络的优化器optimizer(二):Adagrad自适应学习率优化器
一.自适应梯度算法Adagrad概述 Adagrad(Adaptive Gradient Algorithm)是一种自适应学习率的优化算法,由Duchi等人在2011年提出。其核心思想是针对不同参数自动调整学习率,适合处理稀疏数据和不同参数梯度差异较大的场景。Adagrad通…...
Docker 运行 Kafka 带 SASL 认证教程
Docker 运行 Kafka 带 SASL 认证教程 Docker 运行 Kafka 带 SASL 认证教程一、说明二、环境准备三、编写 Docker Compose 和 jaas文件docker-compose.yml代码说明:server_jaas.conf 四、启动服务五、验证服务六、连接kafka服务七、总结 Docker 运行 Kafka 带 SASL 认…...
跨链模式:多链互操作架构与性能扩展方案
跨链模式:多链互操作架构与性能扩展方案 ——构建下一代区块链互联网的技术基石 一、跨链架构的核心范式演进 1. 分层协议栈:模块化解耦设计 现代跨链系统采用分层协议栈实现灵活扩展(H2Cross架构): 适配层…...
unix/linux,sudo,其发展历程详细时间线、由来、历史背景
sudo 的诞生和演化,本身就是一部 Unix/Linux 系统管理哲学变迁的微缩史。来,让我们拨开时间的迷雾,一同探寻 sudo 那波澜壮阔(也颇为实用主义)的发展历程。 历史背景:su的时代与困境 ( 20 世纪 70 年代 - 80 年代初) 在 sudo 出现之前,Unix 系统管理员和需要特权操作的…...
工业自动化时代的精准装配革新:迁移科技3D视觉系统如何重塑机器人定位装配
AI3D视觉的工业赋能者 迁移科技成立于2017年,作为行业领先的3D工业相机及视觉系统供应商,累计完成数亿元融资。其核心技术覆盖硬件设计、算法优化及软件集成,通过稳定、易用、高回报的AI3D视觉系统,为汽车、新能源、金属制造等行…...
微信小程序云开发平台MySQL的连接方式
注:微信小程序云开发平台指的是腾讯云开发 先给结论:微信小程序云开发平台的MySQL,无法通过获取数据库连接信息的方式进行连接,连接只能通过云开发的SDK连接,具体要参考官方文档: 为什么? 因为…...
