Flink源码之StreamTask启动流程
每个ExecutionVertex分配Slot后,JobMaster就会向Slot所在的TaskExecutor提交RPC请求执行Task,接口为TaskExecutorGateway::submitTask
CompletableFuture<Acknowledge> submitTask(TaskDeploymentDescriptor tdd, JobMasterId jobMasterId, @RpcTimeout Time timeout);
TaskDeploymentDescriptor 中包含当前Task的执行逻辑、Job信息、输入输出信息

submitTask 方法核心就是构造org.apache.flink.runtime.taskmanager.Task实例,该实例继承自Runnable接口,有个Thread成员变量,构造完成后就启动线程执行Task逻辑。
TaskExecutor::submitTask
Task.startTaskThread
Task.run
Task.doRun
Task::setupPartitionsAndGates //初始化Task的输入输出
RuntimeEnvironment::new //封装task执行上下文信息
Task::loadAndInstantiateInvokable //TaskInvokables实例化
StreamTask::newStreamTask::createRecordWriterDelegate //创建Writer,为每个StreamEdge创建一个WriterStreamTask::createStateBackend //创建StateBackend,一个task一个StateBackend实例StreamTask::createCheckpointStorageSubtaskCheckpointCoordinatorImpl::new
Task::restoreAndInvoke
TaskInvokable::restore
TaskInvokable::invoke //处理输入元素
TaskInvokable::cleanUp
Task的Invokable Class是在StreamGraph中添加Operator形成StreamNode时确定的,对不同的算子有不同的InvokableClass:
- SourceStreamTask.class //LegacySource算子
- SourceOperatorStreamTask //Source算子
- OneInputStreamTask.class //输入是一个算子
- TwoInputStreamTask:class //输入是两个算子
- MultipleInputStreamTask.class //输入有多个算子
以上这些类都继承自org.apache.flink.streaming.runtime.tasks.StreamTask

在调用TaskInvokable::restore时会执行:
StreamTask::restore
StreamTask::restoreInternal //创建OperatorChain
RegularOperatorChain::new
OperatorChain::new
OperatorChain::createOutputCollector
OperatorChain::createOperatorChain
OperatorChain::createOperator
StreamOperatorFactoryUtil.createOperator //创建Operator,在每个算子的StreamConfig中定义了每个Operator具体类型,比如StreamMap, StreamFlatMap
SimpleOperatorFactory::createStreamOperator //创建StreamOperator包装了用户函数,, StreamOperator包装了代码中用户函数,会调用用户函数中的open/close等生命周期函数AbstractUdfStreamOperator::setupAbstractStreamOperator::setup //设置用用自定义函数中的RuntimeContext成员变量StreamingRuntimeContext::new //StreamTask::init //子类做初始化,创建InputGate、StreamTaskInput、DataOutput及InputProcessor
StreamTask::restoreGatesStreamTask::createStreamTaskStateInitializerStreamTaskStateInitializerImpl::new //OperatorChain::initializeStateAndOpenOperators //调用每个Operator的initializeState和Open方法AbstractStreamOperator::initializeState StreamTaskStateInitializerImpl::streamOperatorStateContext //此时会创建keyedStatedBackend和operatorStateBackendStreamOperatorStateHandler::new //初始化StreamOperator的stateHandler成员变量,用于状态管理StreamOperatorStateHandler::initializeOperatorStateStateInitializationContextImpl::newAbstractUdfStreamOperator::initializeState//调用用户定义函数中的initializeState方法,可获取Operator StateStreamingFunctionUtils::restoreFunctionStateStreamingRuntimeContext::setKeyedStateStoreStreamOperator::open //调用getRuntimeContext().getState可获取keySate
StreamTask::invoke
StreamTask::runMailboxLoop
MailboxProcessor::runMailboxLoop
StreamTask::processInput
整个过程在StreamTask.java的注释中有说明:
* -- invoke()* |* +----> Create basic utils (config, etc) and load the chain of operators* +----> operators.setup()* +----> task specific init()* +----> initialize-operator-states()* +----> open-operators()* +----> run()* +----> finish-operators()* +----> close-operators()* +----> common cleanup* +----> task specific cleanup()
- 首先创建OperatorChain,依次创建出每个StreamOperator
- 调用Operator的setup方法,初始化StreamingRuntimeContext
- 调用子类init方法初始化
- 调用initializeState初始化每个算子的状态,此时会为每个StreamOperator创建keyedStatedBackend和operatorStateBackend,然后会调用用户定义函数中的initializeState方法,用于创建Operator State
- 调用算子的open方法,便于用户在自定义函数open中进行初始化,比如初始化keyState
- 调用processInput处理流中数据
SourceStreamTask重载了StreamTask::processInput,该方法中直接起一个线程调用SourceFunction::run方法。
OneInputStreamTask则不同,它重载了StreamTask的init方法,在init方法中创建了StreamOneInputProcessor
OneInputStreamTask::init
OneInputStreamTask::createCheckpointedInputGate
OneInputStreamTask::createDataOutput //创建StreamTaskNetworkOutput
OneInputStreamTask::createTaskInput //创建StreamTaskNetworkInput
StreamOneInputProcessor::new
在StreamTask::processInput则是调用InputProcessor::processInput不断读取数据进行处理
StreamOneInputProcessor::processInput
StreamTaskNetworkInput::emitNext(StreamTaskNetworkOutput)
AbstractStreamTaskNetworkInput::emitNext //循环不断从buffer中读取StreamElement
处理
AbstractStreamTaskNetworkInput::processElementStreamTaskNetworkOutput::emitRecord //调用operator的setKeyContextElement和processElementOneInputStreamOperator::setKeyContextElementAbstractStreamOperator::setKeyContextElement1AbstractStreamOperator::setCurrentKey //StreamOperatorStateHandler::setCurrentKey //设置状态当前keyInput::processElement //调用StreamOperator的processElement方法
以上Task从提交到起线程执行起来的整个过程,在初始化过程中为每个StreamOperator进行状态后端的初始化相当重要,后续处理流的过程中会使用这些状态后端存储管理状态。
相关文章:
Flink源码之StreamTask启动流程
每个ExecutionVertex分配Slot后,JobMaster就会向Slot所在的TaskExecutor提交RPC请求执行Task,接口为TaskExecutorGateway::submitTask CompletableFuture<Acknowledge> submitTask(TaskDeploymentDescriptor tdd, JobMasterId jobMasterId, RpcTi…...
【BASH】回顾与知识点梳理(三十)
【BASH】回顾与知识点梳理 三十 三十. 进程的观察30.1 ps :将某个时间点的进程运作情况撷取下来仅观察自己的 bash 相关进程: ps -l观察系统所有进程: ps aux 30.2 top:动态观察进程的变化30.3 pstree 该系列目录 --> 【BASH】…...
亿赛通电子文档安全管理系统任意文件上传漏洞复现
0x01 产品简介 亿赛通电子文档安全管理系统(简称:CDG)是一款电子文档安全加密软件,该系统利用驱动层透明加密技术,通过对电子文档的加密保护,防止内部员工泄密和外部人员非法窃取企业核心重要数据资产&…...
java:数据库连接池
概念 举个例子来说吧,假设我们开了一家餐馆,客人来了,我们就请一个服务员,使用完后再把他开除了,下个客人再来了,我们再请一个,使用完再开除。 这是不是我们现在使用 JDBC 连接数据库的场景&a…...
可视化绘图技巧100篇基础篇(三)-条形图(一)
目录 前言 适用场景 图例 条形图分类 多系列条形图 单系列条形图...
如何使用Redis实现附近商家查询
导读 在日常生活中,我们经常能看见查询附近商家的功能。 常见的场景有,比如你在点外卖的时候,就可能需要按照距离查询附近几百米或者几公里的商家。 本文将介绍如何使用Redis实现按照距离查询附近商户的功能,并以SpringBoot项目…...
于vue3+vite+element pro + pnpm开源项目
河码桌面是一个基于vue3viteelement pro pnpm 创建的monorepo项目,项目采用的是类操作系统的web界面,操作起来简单又方便,符合用户习惯,又没有操作系统的复杂! 有两个两个分支,一个是web版本,…...
18-组件化开发 根组件
组件化开发 & 根组件: 1. 组件化:一个页面可以拆分成一个个组件,每个组件有着自己独立的结构、样式、行为. 好处:便于维护,利于复用->提升开发效率 组件分类: 普通组件 , 根组件 2. 根组件:整个应用最上层的组件,包裹所有普通小组件…...
springboot集成ES
1.引入pom依赖2.application 配置3.JavaBean配置以及ES相关注解 3.1 Student实体类3.2 Teacher实体类3.3 Headmaster 实体类4. 启动类配置5.elasticsearchRestTemplate 新增 5.1 createIndex && putMapping 创建索引及映射 5.1.1 Controller层5.1.2 service层5.1.3 ser…...
Maven 生成编译时间和版本Java类
本文使用Maven插件来自动生成一个 Version.java 类,可以在Java代码中使用里面对应的常量,获取当前版本号和构建时间。 Maven编译后自动生成的 Version.java 文件内容如下所示: package com.shanhy.demo;public final class Version {public…...
关于uniapp微信小程序scroll-view组件使用show-scrollbar隐藏不了滚动条
这里关于使用 scroll-view组件 时候有滚动条 想要隐藏滚动条但是使用show-scrollbar没有效果 这时候又使用类名隐藏滚动条 使用id隐藏滚动条都不行 解决方法:在使用 scroll-view组件 的页面或者app 页面加上以下代码就可以了 ::-webkit-scrollbar {displa…...
CSS:filter滤镜 详解(用法 + 代码 + 例子 + 效果)
文章目录 filter 滤镜blur() 模糊度例子 渐变光晕 brightness() 元素亮度contrast() 对比度grayscale() 元素灰度hue-rorate() 色相opacity() 透明度invert() 反转颜色saturate() 饱和度 backdrop-filter 蒙版,滤镜例子 卷轴展开 filter 滤镜 动图为效果添加前后对…...
【Unity每日一记】Physics.Raycast 相关_Unity中的“X光射线”
👨💻个人主页:元宇宙-秩沅 👨💻 hallo 欢迎 点赞👍 收藏⭐ 留言📝 加关注✅! 👨💻 本文由 秩沅 原创 👨💻 收录于专栏:uni…...
软件报错msvcr90.dll丢失的解决方法,亲测可以修复
我曾经遇到过一个令人头疼的问题:msvcr90.dll丢失。这个问题导致了我的程序无法正常运行,让我感到非常苦恼。然而,在经过一番努力后,我终于成功地修复了这个问题,这让我感到非常欣慰和满足。 msvcr90.dll丢失的原因可能…...
第一百一十八回 如何获取蓝牙连接状态
文章目录 知识回顾实现方法示例代码我们在上一章回中介绍了如何连接蓝牙设备相关的内容,本章回中将介绍如何获取蓝牙连接状态.闲话休提,让我们一起Talk Flutter吧。 知识回顾 我们在上一章回中介绍如何连接蓝牙设备,但是如何知道蓝牙设备是否连接成功呢?这就需要我们去获取…...
C++11并发与多线程笔记(12) windows临界区、其他各种mutex互斥量
C11并发与多线程笔记(12) windows临界区、其他各种mutex互斥量 1、windows临界区2、自动析构技术3、递归独占互斥量 std::recursive_mutex4、带超时的互斥量 std::timed_mutex 和 std::recursive_timed_mutex4.1 std::timed_mutex:是带超时的…...
[MAUI]在.NET MAUI中实现可拖拽排序列表
文章目录 创建可拖放控件创建绑定服务类拖拽(Drag)拖拽悬停,经过(DragOver)释放(Drop) 创建页面元素最终效果项目地址 .NET MAUI 中提供了拖放(drag-drop)手势识别器,允许用户通过拖…...
Linux 内核内存管理 pfn_to_online_page宏
文章目录 一、Memory Hotplug1.1 简介1.2 热插拔事件通知机制 二、pfn_to_online_page2.1 pfn_to_online_page2.2 pfn_to_section_nr2.3 online_section_nr 参考资料 一、Memory Hotplug 1.1 简介 Linux 内存热插拔(Memory Hotplug)是指在运行时动态增…...
『C语言初阶』第八章 -结构体
前言 今天小羊又来给铁汁们分享关于C语言的结构体,在C语言中,结构体类型属于一种构造类型(其他的构造类型还有:数组类型,联合类型),今天我们主要简单了解一下结构体。 一、结构体是什么&#x…...
chatgpt汇编,两个关于chatgpt的问题
1.用C写一个高级的可自主学习的AI语言模型 编写一个高级的可自主学习的AI语言模型是一个复杂且庞大的工程,需要深入的专业知识和大量的时间精力。下面是一个简化的示例,演示如何使用C实现一个基本的可自主学习的AI语言模型。 首先,你需要定…...
SpringBoot-17-MyBatis动态SQL标签之常用标签
文章目录 1 代码1.1 实体User.java1.2 接口UserMapper.java1.3 映射UserMapper.xml1.3.1 标签if1.3.2 标签if和where1.3.3 标签choose和when和otherwise1.4 UserController.java2 常用动态SQL标签2.1 标签set2.1.1 UserMapper.java2.1.2 UserMapper.xml2.1.3 UserController.ja…...
零门槛NAS搭建:WinNAS如何让普通电脑秒变私有云?
一、核心优势:专为Windows用户设计的极简NAS WinNAS由深圳耘想存储科技开发,是一款收费低廉但功能全面的Windows NAS工具,主打“无学习成本部署” 。与其他NAS软件相比,其优势在于: 无需硬件改造:将任意W…...
51c自动驾驶~合集58
我自己的原文哦~ https://blog.51cto.com/whaosoft/13967107 #CCA-Attention 全局池化局部保留,CCA-Attention为LLM长文本建模带来突破性进展 琶洲实验室、华南理工大学联合推出关键上下文感知注意力机制(CCA-Attention),…...
抖音增长新引擎:品融电商,一站式全案代运营领跑者
抖音增长新引擎:品融电商,一站式全案代运营领跑者 在抖音这个日活超7亿的流量汪洋中,品牌如何破浪前行?自建团队成本高、效果难控;碎片化运营又难成合力——这正是许多企业面临的增长困局。品融电商以「抖音全案代运营…...
python如何将word的doc另存为docx
将 DOCX 文件另存为 DOCX 格式(Python 实现) 在 Python 中,你可以使用 python-docx 库来操作 Word 文档。不过需要注意的是,.doc 是旧的 Word 格式,而 .docx 是新的基于 XML 的格式。python-docx 只能处理 .docx 格式…...
JVM暂停(Stop-The-World,STW)的原因分类及对应排查方案
JVM暂停(Stop-The-World,STW)的完整原因分类及对应排查方案,结合JVM运行机制和常见故障场景整理而成: 一、GC相关暂停 1. 安全点(Safepoint)阻塞 现象:JVM暂停但无GC日志,日志显示No GCs detected。原因:JVM等待所有线程进入安全点(如…...
Python Ovito统计金刚石结构数量
大家好,我是小马老师。 本文介绍python ovito方法统计金刚石结构的方法。 Ovito Identify diamond structure命令可以识别和统计金刚石结构,但是无法直接输出结构的变化情况。 本文使用python调用ovito包的方法,可以持续统计各步的金刚石结构,具体代码如下: from ovito…...
搭建DNS域名解析服务器(正向解析资源文件)
正向解析资源文件 1)准备工作 服务端及客户端都关闭安全软件 [rootlocalhost ~]# systemctl stop firewalld [rootlocalhost ~]# setenforce 0 2)服务端安装软件:bind 1.配置yum源 [rootlocalhost ~]# cat /etc/yum.repos.d/base.repo [Base…...
实战三:开发网页端界面完成黑白视频转为彩色视频
一、需求描述 设计一个简单的视频上色应用,用户可以通过网页界面上传黑白视频,系统会自动将其转换为彩色视频。整个过程对用户来说非常简单直观,不需要了解技术细节。 效果图 二、实现思路 总体思路: 用户通过Gradio界面上…...
C++_哈希表
本篇文章是对C学习的哈希表部分的学习分享 相信一定会对你有所帮助~ 那咱们废话不多说,直接开始吧! 一、基础概念 1. 哈希核心思想: 哈希函数的作用:通过此函数建立一个Key与存储位置之间的映射关系。理想目标:实现…...
