当前位置: 首页 > news >正文

Flink StreamTask启动和执行源码分析

文章目录

  • 前言
  • StreamTask 部署启动
  • Task 线程启动
  • StreamTask 初始化
  • StreamTask 执行


前言

Flink的StreamTask的启动和执行是一个复杂的过程,涉及多个关键步骤。以下是StreamTask启动和执行的主要流程:

  1. 初始化:StreamTask的初始化阶段涉及多个任务,包括Operator的配置、task特定的初始化以及初始化算子的State等。在这个阶段,Flink将业务处理函数抽象为operator,并通过operatorChain将业务代码串起来执行,以完成业务逻辑的处理。同时,还会调用具体task的init方法进行初始化。
  2. 读取数据和事件:StreamTask通过mailboxProcessor读取数据和事件。
  3. 运行业务逻辑:在StreamTask的beforeInvoke方法中,主要调用生成operatorChain并执行相关的业务逻辑。这些业务逻辑可能包括Source算子和map算子等,它们将被Chain在一起并在一个线程内同步执行。
  4. 资源清理:在执行完业务逻辑后,StreamTask会进行关闭和资源清理的操作,这部分在afterInvoke阶段完成。

值得注意的是,从资源角度来看,每个TaskManager内部有多个slot,每个slot内部运行着一个subtask,即每个slot内部运行着一个StreamTask。这意味着StreamTask是由TaskManager(TM)部署并执行的本地处理单元。

总的来说,Flink的StreamTask启动和执行是一个由多个阶段和组件协同工作的过程,涉及数据的读取、业务逻辑的执行以及资源的清理等多个方面。这些步骤确保了StreamTask能够高效、准确地处理数据流,并满足实时计算和分析的需求。


StreamTask 部署启动

当 TaskExecutor 接收提交 Task 执行的请求,则调用:

TaskExecutor.submitTask(TaskDeploymentDescriptor tdd, 
JobMasterId jobMasterId,Time timeout){// 构造 Task 对象Task task = new Task(jobInformation, taskInformation, ExecutionAttemptId,AllocationId, SubtaskIndex, ....);// 启动 Task 的执行task.startTaskThread();
}

Task对象的构造方法

public Task(.....){
// 封装一个 Task信息对象 TaskInfo,(TaskInfo, JobInfo,JobMasterInfo)
this.taskInfo = new TaskInfo(....);
// 各种成员变量赋值
......
// 一个Task的执行有输入也有输出: 关于输出的抽象: ResultPartition 和
ResultSubPartitionPipelinedSubpartition// 初始化 ResultPartition 和 ResultSubPartition
final ResultPartitionWriter[] resultPartitionWriters =
shuffleEnvironment.createResultPartitionWriters(....);
this.consumableNotifyingPartitionWriters =
ConsumableNotifyingResultPartitionWriterDecorator.decorate(....);
// 一个Task的执行有输入也有输出: 关于输入的抽象: InputGate 和 InputChannel(从上有
一个Task节点拉取数据)
// InputChannel 可能有两种实现: Local Remote
// 初始化 InputGate 和 InputChannel
final IndexedInputGate[] gates = shuffleEnvironment.createInputGates(.....);
// 初始化一个用来执行 Task 的线程,目标对象,就是 Task 自己
executingThread = new Thread(TASK_THREADS_GROUP, this, taskNameWithSubtask);
}

Task 线程启动

Task 的启动,是通过启动 Task 对象的内部 executingThread 来执行 Task 的,具体逻辑在 run 方法中:


private void doRun() {
// 1、先更改 Task 的状态: CREATED ==> DEPLOYING
transitionState(ExecutionState.CREATED, ExecutionState.DEPLOYING);
// 2、准备 ExecutionConfig
final ExecutionConfig executionConfig =
serializedExecutionConfig.deserializeValue(userCodeClassLoader);
// 3、初始化输入和输出组件, 拉起 ResultPartition 和 InputGate
setupPartitionsAndGates(consumableNotifyingPartitionWriters,
inputGates);
// 4、注册 输出
for(ResultPartitionWriter partitionWriter :
consumableNotifyingPartitionWriters) {
taskEventDispatcher.registerPartition(partitionWriter.getPartitionId());
} /
/ 5、初始 环境对象 RuntimeEnvironment, 包装在 Task 执行过程中需要的各种组件
Environment env = new RuntimeEnvironment(jobId, vertexId, executionId,
....);
// 6、初始化 调用对象
// 两种最常见的类型: SourceStreamTask、OneInputStreamTask、
TwoInputStreamTask
// 父类: StreamTask
// 通过反射实例化 StreamTask 实例(可能的两种情况: SourceStreamTask,
OneInputStreamTask)
AbstractInvokable invokable =
loadAndInstantiateInvokable(userCodeClassLoader, nameOfInvokableClass, env);
// 7、先更改 Task 的状态: DEPLOYING ==> RUNNING
transitionState(ExecutionState.DEPLOYING, ExecutionState.RUNNING);
// 8、真正把 Task 启动起来了
invokable.invoke();
// 9、StreamTask 需要正常结束,处理 buffer 中的数据
for(ResultPartitionWriter partitionWriter :
consumableNotifyingPartitionWriters) {
if(partitionWriter != null) {
partitionWriter.finish();
}
} /
/ 10、先更改 Task 的状态: RUNNING ==> FINISHED
transitionState(ExecutionState.RUNNING, ExecutionState.FINISHED);

StreamTask 初始化

StreamTask 初始化指的就是 SourceStreamTask 和 OneInputStreamTask 的实例对象的构建!Task 这个类,只是一个笼统意义上的 Task,就是一个通用 Task 的抽象,不管是批处理的,还是流式处理的,不管是 源Task, 还是逻辑处理 Task, 都被抽象成 Task 来进行调度执行!

private SourceStreamTask(Environment env, Object lock) throws Exception {super(env,null,FatalExitExceptionHandler.INSTANCE,StreamTaskActionExecutor.synchronizedExecutor(lock));this.lock = Preconditions.checkNotNull(lock);this.sourceThread = new LegacySourceFunctionThread();getEnvironment().getMetricGroup().getIOMetricGroup().setEnableBusyTime(false);}@Overrideprotected void init() {// we check if the source is actually inducing the checkpoints, rather// than the triggerSourceFunction<?> source = mainOperator.getUserFunction();if (source instanceof ExternallyInducedSource) {externallyInducedCheckpoints = true;ExternallyInducedSource.CheckpointTrigger triggerHook =new ExternallyInducedSource.CheckpointTrigger() {@Overridepublic void triggerCheckpoint(long checkpointId) throws FlinkException {// TODO - we need to see how to derive those. We should probably not// encode this in the// TODO -   source's trigger message, but do a handshake in this task// between the trigger// TODO -   message from the master, and the source's trigger// notificationfinal CheckpointOptions checkpointOptions =CheckpointOptions.forConfig(CheckpointType.CHECKPOINT,CheckpointStorageLocationReference.getDefault(),configuration.isExactlyOnceCheckpointMode(),configuration.isUnalignedCheckpointsEnabled(),configuration.getAlignedCheckpointTimeout().toMillis());final long timestamp = System.currentTimeMillis();final CheckpointMetaData checkpointMetaData =new CheckpointMetaData(checkpointId, timestamp, timestamp);try {SourceStreamTask.super.triggerCheckpointAsync(checkpointMetaData, checkpointOptions).get();} catch (RuntimeException e) {throw e;} catch (Exception e) {throw new FlinkException(e.getMessage(), e);}}};((ExternallyInducedSource<?, ?>) source).setCheckpointTrigger(triggerHook);}getEnvironment().getMetricGroup().getIOMetricGroup().gauge(MetricNames.CHECKPOINT_START_DELAY_TIME,this::getAsyncCheckpointStartDelayNanos);recordWriter.setMaxOverdraftBuffersPerGate(0);}

StreamTask 执行

核心步骤如下:

public final void invoke() throws Exception {
// Task 正式工作之前
beforeInvoke();
// Task 开始工作: 针对数据执行正儿八经的逻辑处理
runMailboxLoop();
// Task 要结束
afterInvoke();
// Task 最后执行清理
cleanUpInvoke();
}

总结一下要点:

  • 在 beforeInvoke() 中,主要是初始化 OperatorChain,然后调用 init() 执行初始化,然后恢复状态,更改 Task 自己的状态为 isRunning = true
  • 在 runMailboxLoop() 中,主要是不停的处理 mail,这里是 FLink-1.10 的一项改进,使用了mailbox 模型来处理任务
  • 在 afterInvoke() 中,主要是完成 Task 要结束之前需要完成的一些细节,比如,把 Buffer 中还没flush 的数据 flush 出来
  • 在 cleanUpInvoke() 中,主要做一些资源的释放,执行各种关闭动作:set false,interrupt,
    shutdown,close,cleanup,dispose 等

相关文章:

Flink StreamTask启动和执行源码分析

文章目录 前言StreamTask 部署启动Task 线程启动StreamTask 初始化StreamTask 执行 前言 Flink的StreamTask的启动和执行是一个复杂的过程&#xff0c;涉及多个关键步骤。以下是StreamTask启动和执行的主要流程&#xff1a; 初始化&#xff1a;StreamTask的初始化阶段涉及多个…...

【MySQL 系列】MySQL 语句篇_DCL 语句

DCL&#xff08; Data Control Language&#xff0c;数据控制语言&#xff09;用于对数据访问权限进行控制&#xff0c;定义数据库、表、字段、用户的访问权限和安全级别。主要关键字包括 GRANT、 REVOKE 等。 文章目录 1、MySQL 中的 DCL 语句1.1、数据控制语言--DCL1.2、MySQ…...

什么是序列化?为什么需要序列化?

1、典型回答 序列化(Serialization)序列化是将对象转换为可存储或传输的形式的过程(例如: 将对象转换为字节流) 反序列化(Deserialization) 是将序列化后的数据(例如: 二进制文件)转换回原始对象的过程。通过反序列化&#xff0c;可以从存储介质 (如磁盘、数据库) 或通过网络…...

Linux本地搭建FastDFS系统

文章目录 前言1. 本地搭建FastDFS文件系统1.1 环境安装1.2 安装libfastcommon1.3 安装FastDFS1.4 配置Tracker1.5 配置Storage1.6 测试上传下载1.7 与Nginx整合1.8 安装Nginx1.9 配置Nginx 2. 局域网测试访问FastDFS3. 安装cpolar内网穿透4. 配置公网访问地址5. 固定公网地址5.…...

docker和docker-compose安装

一、docker安装 1、移除旧版本 依次执行如下命令移除旧版本docker&#xff0c;如未安装过无需执行 yum -y remove docker docker-client docker-client-latest docker-common docker-latest docker-latest-logrotate docker-logrotate docker-selinux docker-engine-selinux…...

深入理解Spring的ApplicationContext:案例详解与应用

深入理解Spring的ApplicationContext&#xff1a;案例详解与应用 在Spring框架的丰富生态中&#xff0c;ApplicationContext扮演着至关重要的角色。作为BeanFactory的扩展&#xff0c;ApplicationContext不仅继承了其所有功能&#xff0c;还引入了更多高级特性&#xff0c;使得…...

6.Java并发编程—深入剖析Java Executors:探索创建线程的5种神奇方式

Executors快速创建线程池的方法 Java通过Executors 工厂提供了5种创建线程池的方法&#xff0c;具体方法如下 方法名描述newSingleThreadExecutor()创建一个单线程的线程池&#xff0c;该线程池中只有一个工作线程。所有任务按照提交的顺序依次执行&#xff0c;保证任务的顺序性…...

英语阅读挑战

英语阅读真是令人头痛的东西。可怜的子航想利用寒假时间突破英语难题。当他拿到一篇英语阅读时&#xff0c;他很好奇作者最喜欢用那些字母。 输入 一句30词以内的英语句子 输出 统计每个字母出现的次数 样例输入 复制 However,the British dont have a history of exporting th…...

备战蓝桥之思维

平台重叠真的坑 给你一句样例&#xff0c;如果你觉得自己的代码没问题那就试试吧 2 1 1 3 1 0 4 正确答案 0 0 0 0 P1105 平台 - 洛谷 | 计算机科学教育新生态 (luogu.com.cn) import java.awt.Checkbox; import java.awt.PageAttributes.OriginType; import java.io.B…...

09 string的实现

注意 实现仿cplus官网的的string类&#xff0c;对部分主要功能实现 实现 头文件 #pragma once #include <iostream> #include <assert.h> #include <string>namespace mystring {class string{friend std::ostream& operator<<(std::ostream&a…...

Git 进行版本控制时,配置 user.name 和 user.email

在使用 Git 进行版本控制时&#xff0c;配置 user.name 和 user.email 是一个非常重要的初始步骤&#xff0c;但不是绝对必须的。这两个配置项定义了当你进行提交&#xff08;commit&#xff09;时用于标识提交者的信息。 为什么建议配置 user.name 和 user.email 标识提交者…...

传统开发读写优化与HBase

目录: 一、传统开发数据读写性能优化 1. Mysql 分表、主从复制与读写分离 2. Redis(缓存型数据库)主从复制与读写分离 二、HBase 一、传统开发数据读写性能优化 1、Mysql 分表、主从复制与读写分离 mysql分库分表方案 一种分表方案&#xff1a;设置表A 表B 表A 自增列从1开始…...

【OpenGL实现 03】纹理贴图原理和实现

目录 一、说明二、纹理贴图原理2.1 纹理融合原理2.2 UV坐标原理 三、生成纹理对象3.1 需要在VAO上绑定纹理坐标3.2 纹理传递3.3 纹理buffer生成 四、代码实现&#xff1a;五、着色器4.1 片段4.2 顶点 五、后记 一、说明 本篇叙述在画出图元的时候&#xff0c;如何贴图纹理图片…...

FDU 2021 | 二叉树关键节点的个数

文章目录 1. 题目描述2. 我的尝试 1. 题目描述 给定一颗二叉树&#xff0c;树的每个节点的值为一个正整数。如果从根节点到节点 N 的路径上不存在比节点 N 的值大的节点&#xff0c;那么节点 N 被认为是树上的关键节点。求树上所有的关键节点的个数。请写出程序&#xff0c;并…...

精读《React Conf 2019 - Day2》

1 引言 这是继 精读《React Conf 2019 - Day1》 之后的第二篇&#xff0c;补充了 React Conf 2019 第二天的内容。 2 概述 & 精读 第二天的内容更为精彩&#xff0c;笔者会重点介绍比较干货的部分。 Fast refresh Fast refresh 是更好的 react-hot-loader 替代方案&am…...

向ChatGPT高效提问模板

PS: ChatGPT无限次数&#xff0c;无需魔法&#xff0c;登录即可使用,网页打开下面 tj4.mnsfdx.net [点击跳转链接](http://tj4.mnsfdx.net/) 我想请你XXXX&#xff0c;请问我应该如何向你提问才能得到最满意的答案&#xff0c;请提供全面、详细的建议&#xff0c;针对每一个建…...

android metaRTC编译

参考文章&#xff1a; metaRTC3.0稳定版本编译指南_metartc 编译-CSDN博客 源码下载&#xff1a; Releases metartc/metaRTC GitHub 版本v6.0-b4即可...

HDFS面试重点

文章目录 1. HDFS的架构2. HDFS的读写流程3.HDFS中&#xff0c;文件为什么以block块的方式存储&#xff1f; 1. HDFS的架构 HDFS的架构可以分为以下几个主要组件&#xff1a; NameNode&#xff08;名称节点&#xff09;&#xff1a; NameNode是HDFS的关键组件之一&#xff0c;…...

Java中的IO流是什么?

Java中的IO流&#xff08;Input/Output Stream&#xff09;是Java编程语言中用于处理输入和输出操作的一种重要机制。在Java中&#xff0c;IO流被用来读取和写入数据&#xff0c;这些数据可以来自各种来源&#xff0c;如文件、网络连接、内存缓冲区等。Java的IO流提供了丰富的类…...

Spring boot 集成netty实现websocket通信

一、netty介绍 Netty 是一个基于NIO的客户、服务器端的编程框架&#xff0c;使用Netty 可以确保你快速和简单的开发出一个网络应用&#xff0c;例如实现了某种协议的客户、服务端应用。Netty相当于简化和流线化了网络应用的编程开发过程&#xff0c;例如&#xff1a;基于TCP和U…...

STM32+rt-thread判断是否联网

一、根据NETDEV_FLAG_INTERNET_UP位判断 static bool is_conncected(void) {struct netdev *dev RT_NULL;dev netdev_get_first_by_flags(NETDEV_FLAG_INTERNET_UP);if (dev RT_NULL){printf("wait netdev internet up...");return false;}else{printf("loc…...

Python爬虫实战:研究feedparser库相关技术

1. 引言 1.1 研究背景与意义 在当今信息爆炸的时代,互联网上存在着海量的信息资源。RSS(Really Simple Syndication)作为一种标准化的信息聚合技术,被广泛用于网站内容的发布和订阅。通过 RSS,用户可以方便地获取网站更新的内容,而无需频繁访问各个网站。 然而,互联网…...

STM32F4基本定时器使用和原理详解

STM32F4基本定时器使用和原理详解 前言如何确定定时器挂载在哪条时钟线上配置及使用方法参数配置PrescalerCounter ModeCounter Periodauto-reload preloadTrigger Event Selection 中断配置生成的代码及使用方法初始化代码基本定时器触发DCA或者ADC的代码讲解中断代码定时启动…...

c++ 面试题(1)-----深度优先搜索(DFS)实现

操作系统&#xff1a;ubuntu22.04 IDE:Visual Studio Code 编程语言&#xff1a;C11 题目描述 地上有一个 m 行 n 列的方格&#xff0c;从坐标 [0,0] 起始。一个机器人可以从某一格移动到上下左右四个格子&#xff0c;但不能进入行坐标和列坐标的数位之和大于 k 的格子。 例…...

Qwen3-Embedding-0.6B深度解析:多语言语义检索的轻量级利器

第一章 引言&#xff1a;语义表示的新时代挑战与Qwen3的破局之路 1.1 文本嵌入的核心价值与技术演进 在人工智能领域&#xff0c;文本嵌入技术如同连接自然语言与机器理解的“神经突触”——它将人类语言转化为计算机可计算的语义向量&#xff0c;支撑着搜索引擎、推荐系统、…...

Spring AI 入门:Java 开发者的生成式 AI 实践之路

一、Spring AI 简介 在人工智能技术快速迭代的今天&#xff0c;Spring AI 作为 Spring 生态系统的新生力量&#xff0c;正在成为 Java 开发者拥抱生成式 AI 的最佳选择。该框架通过模块化设计实现了与主流 AI 服务&#xff08;如 OpenAI、Anthropic&#xff09;的无缝对接&…...

听写流程自动化实践,轻量级教育辅助

随着智能教育工具的发展&#xff0c;越来越多的传统学习方式正在被数字化、自动化所优化。听写作为语文、英语等学科中重要的基础训练形式&#xff0c;也迎来了更高效的解决方案。 这是一款轻量但功能强大的听写辅助工具。它是基于本地词库与可选在线语音引擎构建&#xff0c;…...

让回归模型不再被异常值“带跑偏“,MSE和Cauchy损失函数在噪声数据环境下的实战对比

在机器学习的回归分析中&#xff0c;损失函数的选择对模型性能具有决定性影响。均方误差&#xff08;MSE&#xff09;作为经典的损失函数&#xff0c;在处理干净数据时表现优异&#xff0c;但在面对包含异常值的噪声数据时&#xff0c;其对大误差的二次惩罚机制往往导致模型参数…...

基于 TAPD 进行项目管理

起因 自己写了个小工具&#xff0c;仓库用的Github。之前在用markdown进行需求管理&#xff0c;现在随着功能的增加&#xff0c;感觉有点难以管理了&#xff0c;所以用TAPD这个工具进行需求、Bug管理。 操作流程 注册 TAPD&#xff0c;需要提供一个企业名新建一个项目&#…...

通过 Ansible 在 Windows 2022 上安装 IIS Web 服务器

拓扑结构 这是一个用于通过 Ansible 部署 IIS Web 服务器的实验室拓扑。 前提条件&#xff1a; 在被管理的节点上安装WinRm 准备一张自签名的证书 开放防火墙入站tcp 5985 5986端口 准备自签名证书 PS C:\Users\azureuser> $cert New-SelfSignedCertificate -DnsName &…...