当前位置: 首页 > news >正文

深度了解flink(七) JobManager(1) 组件启动流程分析

前言

JobManager是Flink的核心进程,主要负责Flink集群的启动和初始化,包含多个重要的组件(JboMaster,Dispatcher,WebEndpoint等),本篇文章会基于源码分析JobManagr的启动流程,对其各个组件进行介绍,希望对JobManager有一个更全面的了解。

集群启动模式

ClusterEntryPoint是Flink集群的入口点的基类,该类是抽象类,类继承关系UML图如下

通过上图可知道,Flink有3种集群模式

Flink Session集群

根据不同的资源管理器,有3个不同的子类:

  • StandaloneSessionClusterEntrypoint Standalone session模式下集群的入口类
  • KubernetesSessionClusterEntrypoint K8s session模式下集群的入口类
  • YarnSessionClusterEntrypoint Yarn session模式下集群的入口类

集群生命周期

在 Flink Session 集群中,客户端连接到一个预先存在的、长期运行的集群,该集群可以接受多个作业提交。即使所有作业完成后,集群(和 JobManager)仍将继续运行直到手动停止 session 为止。因此,Flink Session 集群的寿命不受任何 Flink 作业寿命的约束。

资源隔离:

Flink作业共享集群的ResourceManager和Dispacher等组件,TaskManager slot 由 ResourceManager 在提交作业时分配,并在作业完成时释放。由于所有作业都共享同一集群,因此在集群资源方面存在一些竞争 — 例如提交工作阶段的网络带宽。此共享设置的局限性在于,如果 TaskManager 崩溃,则在此 TaskManager 上运行 task 的所有作业都将失败;类似的,如果 JobManager 上发生一些致命错误,它将影响集群中正在运行的所有作业。

适用场景:

因为组件共享,session集群资源使用率高,集群预先存在,不需要额外申请资源,适合一些比较小的,不是长期运行的作业,例如SQL预览,交互式查询,实时任务测试环境等

Flink Per Job集群

只要Yarn提供了继承的子类:YarnJobClusterEntrypoint

集群生命周期

在 Flink Job 集群中,可用的集群管理器(例如 YARN)用于为每个提交的作业启动一个集群,并且该集群仅可用于该作业。一旦作业完成,Flink Job 集群将被拆除。

资源隔离:

每一个提交的Flink应用程序单独创建一套完整集群环境,该Job独享使用的计算资源和组件服务。

使用场景:

实时由于Per Job模式下用户应用程序的main方法在客户端执行生成JobGraph,任务量大情况下存在性能瓶颈,目前已被标记为废弃状态。

Flink Application集群

根据不同的资源管理器,有3个不同的子类:

  • StandaloneApplicationClusterEntryPoint Standalone Application模式下集群的入口类
  • KubernetesApplicationClusterEntrypoint K8s Application模式下集群的入口类
  • YarnApplicationClusterEntryPoint Yarn Application模式下集群的入口类

集群生命周期

Flink Application 集群是专用的 Flink 集群,仅从 Flink 应用程序执行作业,并且main方法在集群上而不是客户端上运行。应用程序逻辑和依赖打包成一个可执行的作业 JAR 中,并且集群入口(ApplicationClusterEntryPoint)负责调用main方法来提取 JobGraph

资源隔离:

每一个提交的Flink应用程序单独创建一套完整集群环境,该Job独享使用的计算资源和组件服务。

使用场景:

Application模式资源隔离性好,Per Job模式的替换方案,适合长期运行、具有高稳定性的大型作业

JobManager启动流程

JobManger启动流程在不同模式下基本相同,Standalone模式可以在本地运行(可以参考),方便Debug,因为使用Standalone模式的入口类StandaloneSessionClusterEntrypoint进行启动流程的分析。

main方法入口

public static void main(String[] args) {// 打印系统相关信息EnvironmentInformation.logEnvironmentInfo(LOG, StandaloneSessionClusterEntrypoint.class.getSimpleName(), args);//信号注册器,注册系统级别的信号,接收到系统级别终止信号优雅的关闭SignalHandler.register(LOG);//注册一个安全的钩子,这样jvm停止之前会睡眠5s去释放资源,5s之后强制关闭JvmShutdownSafeguard.installAsShutdownHook(LOG);// 解析命令行参数,获取配置信final EntrypointClusterConfiguration entrypointClusterConfiguration =ClusterEntrypointUtils.parseParametersOrExit(args,new EntrypointClusterConfigurationParserFactory(),StandaloneSessionClusterEntrypoint.class);//加载config.yaml,构建Configuration对象Configuration configuration = loadConfiguration(entrypointClusterConfiguration);StandaloneSessionClusterEntrypoint entrypoint =new StandaloneSessionClusterEntrypoint(configuration);ClusterEntrypoint.runClusterEntrypoint(entrypoint);
}

主要步骤

  1. 打印系统信息。
  2. 注册信号处理器,注册系统级别的信号,确保优雅关闭。
  3. 注册一个安全的钩子,这样jvm停止之前会睡眠5s去释放资源,5s之后强制关闭。
  4. 解析命令行参数,加载配置文件。
  5. 初始化 StandaloneSessionClusterEntrypoint
  6. 调用 ClusterEntrypoint#runClusterEntrypoint 方法启动集群。

ClusterEntrypoint#runClusterEntrypoint

public static void runClusterEntrypoint(ClusterEntrypoint clusterEntrypoint) {final String clusterEntrypointName = clusterEntrypoint.getClass().getSimpleName();
try {//clusterEntrypoint.startCluster();
} catch (ClusterEntrypointException e) {LOG.error(String.format("Could not start cluster entrypoint %s.", clusterEntrypointName),e);System.exit(STARTUP_FAILURE_RETURN_CODE);
}//无关代码 无需关注
}

核心步骤

  • 调用 clusterEntrypoint.startCluster() 启动集群。

ClusterEntrypoint#startCluster

public void startCluster() throws ClusterEntrypointException {//无关代码 无需关注try {FlinkSecurityManager.setFromConfiguration(configuration);//插件管理类,用来加载插件。插件加载两种方式。//1).通过如下参数配置FLINK_PLUGINS_DIR。//2).将插件jar包放入到plugins下PluginManager pluginManager =PluginUtils.createPluginManagerFromRootFolder(configuration);//初始化文件系统的配置configureFileSystems(configuration, pluginManager);//初始化安全上下文环境 默认HadoopSecurityContext,Hadoop安全上下文,//使用先前初始化的UGI(UserGroupInformation)和适当的安全凭据。比如Kerberos。//总结:初始化安全环境,创建安全环境的时候会做一系列的检查。SecurityContext securityContext = installSecurityContext(configuration);ClusterEntrypointUtils.configureUncaughtExceptionHandler(configuration);//安全的情况下调用runCluster开始初始化组件securityContext.runSecured((Callable<Void>)() -> {runCluster(configuration, pluginManager);return null;});} catch (Throwable t) {//异常处理代码 无需关注}}

startCluster方法主要做了一些环境和配置初始化的工作

主要步骤

  1. 初始化插件管理器,用来加载插件。
  2. 初始化文件系统设置 例如 hdfs、本地file。此时只是初始化的配置。
  3. 初始化安全环境。
  4. 安全环境下调用 runCluster 方法。

ClusterEntrypoint#runCluster

private void  runCluster(Configuration configuration, PluginManager pluginManager)throws Exception {synchronized (lock) {//初始化集群所需要的服务:例如通信服务,监控服务,高可用服务等initializeServices(configuration, pluginManager);// write host information into configurationconfiguration.set(JobManagerOptions.ADDRESS, commonRpcService.getAddress());configuration.set(JobManagerOptions.PORT, commonRpcService.getPort());//创建Dispatcher和ResourceManger组件的工厂类final DispatcherResourceManagerComponentFactorydispatcherResourceManagerComponentFactory =createDispatcherResourceManagerComponentFactory(configuration);//创建Dispatcher和ResourceManger组件clusterComponent =dispatcherResourceManagerComponentFactory.create(configuration,resourceId.unwrap(),ioExecutor,commonRpcService,haServices,blobServer,heartbeatServices,delegationTokenManager,metricRegistry,executionGraphInfoStore,new RpcMetricQueryServiceRetriever(metricRegistry.getMetricQueryServiceRpcService()),failureEnrichers,this);//组件停止运行后的异步方法clusterComponent.getShutDownFuture().whenComplete(//代码省略)}}

主要步骤

1.初始化集群所需要的服务:例如通信服务,监控服务,高可用服务等

2.创建Dispatcher和ResourceManger组件的工厂类

3.创建Dispatcher和ResourceManger组件

4.定义组件停止运行后的异步方法

总结

本篇文章分享了Flink任务的集群模式,通过源码的方式分析了JobManger的启动流程,后续会对JobManger相关的服务和组件进行更详细的分析。

相关文章:

深度了解flink(七) JobManager(1) 组件启动流程分析

前言 JobManager是Flink的核心进程&#xff0c;主要负责Flink集群的启动和初始化&#xff0c;包含多个重要的组件(JboMaster&#xff0c;Dispatcher&#xff0c;WebEndpoint等)&#xff0c;本篇文章会基于源码分析JobManagr的启动流程&#xff0c;对其各个组件进行介绍&#x…...

PostgreSQL 约束

PostgreSQL 约束 介绍 PostgreSQL 是一种功能强大的开源对象关系数据库系统&#xff0c;它提供了多种约束来确保数据的完整性和一致性。约束是数据库规则&#xff0c;用于限制表中数据的类型和操作。在 PostgreSQL 中&#xff0c;约束可以分为几种类型&#xff0c;包括主键约…...

【Redis】

1、Redis 概述 远程字典服务器&#xff08;Remote Dictionary Server&#xff0c;Redis)&#xff1a;一个开源的、高性能的、轻量级、使用ANSI C语言编写、支持网络、可基于内存亦可持久化的日志型、Key-Value数据库&#xff0c;通过提供多种键值数据类型来试音不同场景下的缓…...

大厂面试真题-MVCC有哪些不好

MVCC&#xff08;Multi-Version Concurrency Control&#xff0c;多版本并发控制&#xff09;虽然具有提高数据库并发性能、避免脏读等优势&#xff0c;但也存在一些缺点。以下是对MVCC缺点的详细归纳&#xff1a; 一、存储开销增加 MVCC需要为每个数据行存储多个版本&#x…...

一篇教你多排轮播效果

多排轮播 提示&#xff1a;demo案例 效果看看把 这些都是可以单独左右滑动的 文章目录 多排轮播前言一、上才艺总结 前言 今天想着想着 看着别人这样 哎还挺好看&#xff0c;就自己弄了 提示&#xff1a;以下是本篇文章正文内容&#xff0c;下面案例可供参考 一、上才艺 &…...

安全警告您正在访问危险网站怎么关闭

在上网时&#xff0c;很多人可能遇到过“安全警告&#xff1a;您正在访问危险网站”的提示。这类警告通常由浏览器或安全软件自动弹出&#xff0c;旨在保护用户免受钓鱼网站、恶意软件等潜在安全威胁的侵害。这篇文章将带您了解这种安全警告的来源、关闭提示的步骤以及应采取的…...

群控系统服务端开发模式-应用开发-业务架构逻辑开发第一轮测试

整个系统的第一个层次已经开发完毕&#xff0c;已经有简单的中控&#xff0c;登录、退出、延迟登录时长、黑名单、数据层封装、验证层封装、RSA加解密、Redis等功能&#xff0c;还缺获取个人、角色按钮权限、角色菜单权限功能。角色按钮权限以及角色菜单权限等明后天开发&#…...

git 怎么保留某个文件夹忽略其下面的所有文件?

在 Git 中&#xff0c;如果你想要保留某个文件夹&#xff08;比如 folder/&#xff09;但忽略其下面的所有文件&#xff0c;可以使用 .gitignore 文件来实现。需要注意的是&#xff0c;Git 不会自动创建空目录。因此&#xff0c;为了让 Git 记录这个空目录&#xff0c;你需要在…...

Linux Shell 实现一键部署mariadb11.6

mariadb MariaDB数据库管理系统是MySQL的一个分支,主要由开源社区在维护,采用GPL授权许可 MariaDB的目的是完全兼容MySQL,包括API和命令行,使之能轻松成为MySQL的代替品。在存储引擎方面,使用XtraDB来代替MySQL的InnoDB。 MariaDB由MySQL的创始人Michael Widenius主导开发…...

Servlet 3.0 注解开发

文章目录 Servlet3.0注解开发修改idea创建注解的servlet模板内容讲解 关于servlet3.0注解开发的疑问_配置路径省略了属性urlPatterns内容讲解内容小结 Servlet3.0注解开发 【1】问题 说明&#xff1a;之前我们都是使用web.xml进行servlet映射路径的配置。这样配置的弊端&…...

rom定制系列------红米note8_miui14安卓13定制修改固件 带面具root权限 刷写以及界面预览

&#x1f49d;&#x1f49d;&#x1f49d;红米note8机型代码&#xff1a;ginkgo。高通芯片。此固件官方最终版为稳定版12.5.5安卓11的版本。目前很多工作室需要高安卓版本的固件来适应他们的软件。并且需要root权限。根据客户要求。修改固件为完全root。并且修改为可批量刷写的…...

Kaspa钱包ts代码封装

文章目录 1. 配置wasm2. 钱包地址创建3. KAS转账&余额查询4. KRC-20 处理5. 使用demo 1. 配置wasm 下载wasm地址&#xff1a;https://kaspa.aspectron.org/nightly/downloads/ 在项目根目录下添加wasm目录&#xff0c; 将下载的wasm文件中web目录下kaspa和kaspa-dev文件家…...

MySQL 数据库中 MyISAM 和 InnoDB 的区别:深入解析

MySQL 是目前最流行的开源数据库管理系统之一&#xff0c;支持多种存储引擎&#xff0c;其中最常用的就是 MyISAM 和 InnoDB。这两种存储引擎各有其特点&#xff0c;适用于不同的使用场景。理解它们之间的区别有助于数据库开发者和管理者根据应用需求选择合适的存储引擎。本文将…...

python中怎样实现闭包?

在Python中&#xff0c;闭包是指一个函数可以访问其自身范围之外的变量&#xff0c;即可以访问其外部函数作用域中的变量。要实现一个闭包&#xff0c;可以按照以下步骤进行&#xff1a; 内部函数引用外部函数的变量&#xff1a;在外部函数中定义一个内部函数&#xff0c;并在…...

论文阅读:MultiUI 利用网页UI进行丰富文本的视觉理解

《HARNESSING WEBPAGE UIS FOR TEXT-RICH VISUAL UNDERSTANDING》 利用网页UI进行丰富文本的视觉理解 总结 grounding和QA部分的数据集占比较大、同时消融实验显示其作用相对较大&#xff0c;并且grounding部分作用和效果呈现scaling正相关提供了很多web数据处理成多模态训练…...

【云原生】云原生后端详解:架构与实践

目录 引言一、云原生后端的核心概念1.1 微服务架构1.2 容器化1.3 可编排性1.4 弹性和可伸缩性 二、云原生后端的架构示意图三、云原生后端的最佳实践3.1 使用服务网格3.2 监控与日志管理3.3 CI/CD 流水线3.4 安全性 总结参考资料 引言 随着云计算的迅猛发展&#xff0c;云原生…...

MySQL覆盖索引

覆盖索引&#xff08;Covering Index&#xff09;是数据库优化中的一种重要技术 覆盖索引是指一个查询语句在执行时&#xff0c;所需的数据可以完全通过索引来获取&#xff0c;而无需访问实际的数据行。也就是说&#xff0c;查询语句所需的列都包含在了创建的索引中&#xff0c…...

「C/C++」C/C++ 之 循环结构详解

✨博客主页何曾参静谧的博客&#x1f4cc;文章专栏「C/C」C/C程序设计&#x1f4da;全部专栏「VS」Visual Studio「C/C」C/C程序设计「UG/NX」BlockUI集合「Win」Windows程序设计「DSA」数据结构与算法「UG/NX」NX二次开发「QT」QT5程序设计「File」数据文件格式「PK」Parasoli…...

json-server的基本使用

一、json-server工具的使用 可以快速的搭建符合RESTful API服务。返回符合RESTful规范的数据&#xff1b; 1、全局引入json-server包 npm install -g json-server2、创建json格式的db.json文件 {"jsonData": [{"name": "小明"}] }3、在json文…...

华为配置BFD状态与接口状态联动实验

组网图形 图1 配置BFD状态与接口状态联动组网图 BFD简介配置注意事项组网需求配置思路操作步骤配置文件 BFD简介 为了减小设备故障对业务的影响&#xff0c;提高网络的可靠性&#xff0c;网络设备需要能够尽快检测到与相邻设备间的通信故障&#xff0c;以便及时采取措施&…...

大语言模型如何处理长文本?常用文本分割技术详解

为什么需要文本分割? 引言:为什么需要文本分割?一、基础文本分割方法1. 按段落分割(Paragraph Splitting)2. 按句子分割(Sentence Splitting)二、高级文本分割策略3. 重叠分割(Sliding Window)4. 递归分割(Recursive Splitting)三、生产级工具推荐5. 使用LangChain的…...

力扣-35.搜索插入位置

题目描述 给定一个排序数组和一个目标值&#xff0c;在数组中找到目标值&#xff0c;并返回其索引。如果目标值不存在于数组中&#xff0c;返回它将会被按顺序插入的位置。 请必须使用时间复杂度为 O(log n) 的算法。 class Solution {public int searchInsert(int[] nums, …...

深度学习习题2

1.如果增加神经网络的宽度&#xff0c;精确度会增加到一个特定阈值后&#xff0c;便开始降低。造成这一现象的可能原因是什么&#xff1f; A、即使增加卷积核的数量&#xff0c;只有少部分的核会被用作预测 B、当卷积核数量增加时&#xff0c;神经网络的预测能力会降低 C、当卷…...

面向无人机海岸带生态系统监测的语义分割基准数据集

描述&#xff1a;海岸带生态系统的监测是维护生态平衡和可持续发展的重要任务。语义分割技术在遥感影像中的应用为海岸带生态系统的精准监测提供了有效手段。然而&#xff0c;目前该领域仍面临一个挑战&#xff0c;即缺乏公开的专门面向海岸带生态系统的语义分割基准数据集。受…...

Java求职者面试指南:Spring、Spring Boot、Spring MVC与MyBatis技术解析

Java求职者面试指南&#xff1a;Spring、Spring Boot、Spring MVC与MyBatis技术解析 一、第一轮基础概念问题 1. Spring框架的核心容器是什么&#xff1f;它的作用是什么&#xff1f; Spring框架的核心容器是IoC&#xff08;控制反转&#xff09;容器。它的主要作用是管理对…...

sshd代码修改banner

sshd服务连接之后会收到字符串&#xff1a; SSH-2.0-OpenSSH_9.5 容易被hacker识别此服务为sshd服务。 是否可以通过修改此banner达到让人无法识别此服务的目的呢&#xff1f; 不能。因为这是写的SSH的协议中的。 也就是协议规定了banner必须这么写。 SSH- 开头&#xff0c…...

LangChain 中的文档加载器(Loader)与文本切分器(Splitter)详解《二》

&#x1f9e0; LangChain 中 TextSplitter 的使用详解&#xff1a;从基础到进阶&#xff08;附代码&#xff09; 一、前言 在处理大规模文本数据时&#xff0c;特别是在构建知识库或进行大模型训练与推理时&#xff0c;文本切分&#xff08;Text Splitting&#xff09; 是一个…...

倒装芯片凸点成型工艺

UBM&#xff08;Under Bump Metallization&#xff09;与Bump&#xff08;焊球&#xff09;形成工艺流程。我们可以将整张流程图分为三大阶段来理解&#xff1a; &#x1f527; 一、UBM&#xff08;Under Bump Metallization&#xff09;工艺流程&#xff08;黄色区域&#xff…...

HTTPS证书一年多少钱?

HTTPS证书作为保障网站数据传输安全的重要工具&#xff0c;成为众多网站运营者的必备选择。然而&#xff0c;面对市场上种类繁多的HTTPS证书&#xff0c;其一年费用究竟是多少&#xff0c;又受哪些因素影响呢&#xff1f; 首先&#xff0c;HTTPS证书通常在PinTrust这样的专业平…...

41道Django高频题整理(附答案背诵版)

解释一下 Django 和 Tornado 的关系&#xff1f; Django和Tornado都是Python的web框架&#xff0c;但它们的设计哲学和应用场景有所不同。 Django是一个高级的Python Web框架&#xff0c;鼓励快速开发和干净、实用的设计。它遵循MVC设计&#xff0c;并强调代码复用。Django有…...