Flink任务提交流程和运行模式
任务提交流程
Flink 的提交流程随着部署模式、资源管理平台的不同,会有不同的变化。这里做进一步的抽象,形成一个大概高视角的任务执行流程图,如下:

Flink按照集群和资源管理的划分运行模式有:Standalone、Flink On Yarn、K8S等。
Standalone
Standalone为独立模式,独立运行,不依赖外部资源调度管理框架。如果资源不足或出现故障,没有自动扩展和重分配的机制,需要手动处理。一般适合开发测试和作业较少的场景。支持的部署模式有:会话部署模式、应用部署模式。不支持PerJob(单作业)部署模式。
会话模式
首先启动集群,然后Web访问JobManager的8081端口提交任务或命令提交,提交任务如下:
cd ${FLINK_HOME}
bin/start-cluster.sh # 启动集群,根据配置文件TM的slot划分成静态的
bin/flink run -m master:8081 -c pers.xxm.flink.MyFlink /tmp/mytask.jar
bin/flink cancel <app_id> # id可通过flink list或UI查看
bin/stop-cluster.sh # 停止集群
再次提交第二个Job时,JobManager和TaskManager还是原来的进程,在JobManager内部会重新启动JobMaster线程,类似Spark的Driver。新的任务继续占用TaskManager的插槽,如果插槽不足任务提交失败。
应用模式
该模式不用启动集群。提交任务如下:
cd ${FLINK_HOME}
mv /tmp/mytask.jar lib/ # 将jar包放入lib目录
bin/standalone-job.sh start --job-classname pers.xxm.flink.MyFlink # JobManager机器执行
bin/taskmanager.sh start # 在所有需要跑TaskManager的机器执行
bin/taskmanager.sh stop # 停止集群,同时作业停止,集群销毁
bin/standalone-job.sh stop # 停止集群
该模式在8081端口也可以看到集群和任务运行。此时如果通过UI取消作业运行集群也会销毁。
Flink On Yarn
Flink集群安装在Hadoop集群上或者下载Flink依赖的Jar包,建议安装在Hadoop集群上,参照环境配置如下:
# 配置环境变量,所有机器
vim /etc/profile.d/my_env.sh # 内容如下4行
HADOOP_HOME=/usr/local/hadoop
export PATH=$PATH:$HADOOP_HOME/bin:$HADOOP_HOME/sbin
export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
export HADOOP_CLASSPATH=`hadoop classpath`
source /etc/profile.d/my_env.sh
# 启动YARN集群,master运行,当前节点启动RM,在slaves配置的节点启动NM
start-yarn.sh
start-dfs.sh # 如果需要也可启动HDFS
YARN模式根据并行度除以每个TM插槽数向上取整动态申请TM,每个TM的slot参考Flink配置文件。该运行模式下支持的部署包括会话、单作业、应用模式。
会话模式
首先需要申请YARN会话,Yarn Session,然后启动Flink集群。启动会话应用如下:
cd $FLINK_HOME
# 执行后在YARN的8088端口UI查看生成了一个应用
bin/yarn-session.sh -nm my_app
此时在flink-conf.yaml的配置被覆盖即无效。启动后YARN会自动分配JM的机器和端口,在终端日志中会打印JM Web Interface地址,也可通过YARN的界面找到Tracking UI进入JM,这时使用YARN代理的方式进入。
未提交作业时,TM个数为0,因为Flink会根据运行在JM的作业所需slot动态分配TM。可提交多个作业。Job取消或结束后,占用的slot和tm会显示为可用,但过一会会被回收,显示总数和可用都为0。也可通过命令行提交任务到YARN会话,如下:
# 提交时查找/tmp/.yarn-properties-<username>文件,根据该文件对应YARN提交任务
bin/flink run -c pers.xxm.flink.MyFlink /tmp/mytask.jar
# 关闭YARN会话集群,该命令在启动上述会话时已经打印在标准输出中
echo "stop" | bin/yarn-session.sh -id <app_id>
单作业模式
每个作业占用一个YARN应用,即YARN集群,提交方式如下:
# 加上参数-d会推出占用模式在后台运行,CTRL+C退出不会影响集群的运行
bin/flink run -t yarn-per-job -c pers.xxm.flink.MyFlink /tmp/mytask.jar
bin/flink list -t yarn-per-job -Dyarn.application.id=<appid> # 查看集群中作业ID
bin/flink cancel -t yarn-per-job -Dyarn.application.id=<appid> <job_id> # 关闭上面的作业ID
在Flink的UI界面CANCEL任务后,YARN的应用状态变为FINISHED,这种关闭和上面命令行是一样的。
应用模式
Flink-1.11之后加入应用模式,和上个YARN单作业提交区别是此时提交的客户端不做代码解析等操作,这也是推荐的模式。提交方式如下:
bin/flink run-application -t yarn-application -c pers.xxm.flink.MyFlink /tmp/mytask.jar
bin/flink list -t yarn-application -Dyarn.application.id=<appid> # 查看集群中作业ID
bin/flink cancel -t yarn-application -Dyarn.application.id=<appid> <job_id> # 关闭作业ID
YARN模式优化
YARN的工作原理是每次执行任务时,都需要将Flink和用户的Jar包上传到HDFS上,所以在YARN的单作业和应用部署模式下,可以将依赖JAR包先上传到HDFS,然后指定路径,此时每次提交任务不会再次上传jar包到HDFS。
hadoop fs -mkdir /yarn/flink/dist # 创建目录
hadoop fs -mkdir /yarn/flink/jobs # 创建目录
hadoop fs -put lib/ /yarn/flink/dist # 将lib目录上传到dist目录下
hadoop fs -put plugins/ /yarn/flink/dist
hadoop fs -put /tmp/mytask.jar /yarn/flink/jobs # 自己的任务jar包放到jobs目录
# 此时以应用部署模式为例提交任务如下
bin/flink run-application -t yarn-application -Dyarn.provided.lib.dirs="hdfs://master:8020/yarn/flink/dist" -c pers.xxm.flink.MyFlink hdfs://master:8020/yarn/flink/jobs/mytask.jar
相关文章:
Flink任务提交流程和运行模式
任务提交流程 Flink 的提交流程随着部署模式、资源管理平台的不同,会有不同的变化。这里做进一步的抽象,形成一个大概高视角的任务执行流程图,如下: Flink按照集群和资源管理的划分运行模式有:Standalone、Flink On…...
【机器学习】 Sigmoid函数:机器学习中的关键激活函数
🌈个人主页: 鑫宝Code 🔥热门专栏: 闲话杂谈| 炫酷HTML | JavaScript基础 💫个人格言: "如无必要,勿增实体" 文章目录 Sigmoid函数:机器学习中的关键激活函数1. 引言2. Sigmoid函数定义3.…...
Vue+Element Plus后台管理主界面搭建实现
续接Django REST Framework,使用Vite构建Vue3的前端项目 1. 后台管理系统主界面框架搭建 后台系统主界面搭建 新建后台管理文件目录 完成后台整体布局 // 1.主界面 index.vue<script setup lang"ts"></script><template><el-…...
JAVA—异常
认识异常,学会从报错信息中发现问题,解决问题。并学会构建自定义异常,提醒编程时注意 目录 1.认识异常 2.自定义异常 1.自定义运行时异常 2.自定义编译时异常 3.异常的处理 1.认识异常 异常就是代表程序出现的问题,用来查询B…...
常见八股面试题:Dubbo 和 Spring Cloud Gateway 有什么区别?
大家好,我是鸭鸭! 此答案节选自鸭鸭最近弄的面试刷题神器面试鸭,更多大厂常问面试题,可以点击进行阅读哈! 目前这个面试刷题神器刚出,有网页和小程序双端可以阅读! 回归面试题! …...
k8s分布式存储-ceph
文章目录 Cephdeploy-ceph部署1.系统环境初始化1.1 修改主机名,DNS解析1.2 时间同步1.3 配置apt基础源与ceph源1.4关闭selinux与防火墙1.5 **创建** ceph **集群部署用户** cephadmin1.6分发密钥 2. ceph部署2.1 **安装** ceph 部署工具2.2 **初始化** mon **节点**…...
Redis cluster集群部署
redis搭建集群模式、Cluster模式(6节点,3主3从集群模式,添加删除节点)_redis cluster节点带数据增减-CSDN博客...
Java泛型的理解
前言 泛型是Java中一个比较重要的特性,是于JDK5引入新特性,其主要目的是为了提供编译时的类型安全检测机制和简化代码。本文主要探讨一下泛型的使用。 假如说没有泛型 假如说没有泛型,可以举一个例子: ArrayList list new Ar…...
Linux 照片图像编辑器
前言 照片图像编辑器是一种软件程序,它允许用户对数字照片或图像进行各种编辑和修改。以下是一些常见的功能及其解释: 裁剪与旋转 : 裁剪:移除图像的某些部分,以改善构图或符合特定尺寸要求。旋转:改变图像的方向,可以校正歪斜的照片或者为了艺术效果而旋转。调整亮度…...
【51单片机仿真】基于51单片机设计的智能六位密码锁(匿*输入/密码修改/警示/保存/恢复/初始密码)源码仿真设计文档演示视频——文末资料下载
基于51单片机设计的智能六位密码锁 演示视频 基于51单片机设计的智能六位密码锁 功能简介 - 能够从键盘中输入密码,并相应地在显示器上显示"*" - 能够判断密码是否正确,正确则开锁,错误则输出相应信息 - 能够实现密码的修改 -…...
【Vue3】组件通信之mitt
【Vue3】组件通信之mitt 背景简介开发环境开发步骤及源码总结 背景 随着年龄的增长,很多曾经烂熟于心的技术原理已被岁月摩擦得愈发模糊起来,技术出身的人总是很难放下一些执念,遂将这些知识整理成文,以纪念曾经努力学习奋斗的日…...
状态压缩动态规划——状压dp
状压dp:意思是将状态进行压缩,从而更容易地写出状态转移方程 通常做法:将每个状态(一个集合)用二进制表示,每个位的1就代表着这个编号的元素存在,0就代表着这个编号的元素不存在,如…...
【算法】最短路径算法思路小结
一、基础:二叉树的遍历->图的遍历 提到搜索算法,就不得不说两个最基础的思想: BFS(Breadth First Search)广度优先搜索 DFS(Depth First Search)深度优先搜索 刚开始是在二叉树遍历中接触这…...
zabbix7.0TLS-05-快速入门-触发器
文章目录 1 概述2 查看主机的触发器3 添加触发器3.1 触发器配置项介绍3.2 扩展文档3.2.1 关于配置项中每个键值返回值的说明3.2.2 触发器函数文档 4 验证触发器5 问题5.1 查了问题总列表5.2 查看问题详情5.3 更新处理问题5.4 查看已经处理的问题 6 问题恢复 1 概述 监控项用于…...
vue关于双向数据绑定的骚操作
组件传值大家都知道 直接上代码 computed: {optionModel: {get() {return this.selectedWidget.options;},set(newValue) {this.selectedWidget.options newValue;}}} 我们将optionModel传递给子组件 子组件可以直接修改props 来实现双向数据绑定 但是正常来时我们是不能修…...
基于Jeecgboot3.6.3的vue3版本的流程中仿钉钉流程的鼠标拖动功能支持
因为这个项目license问题无法开源,更多技术支持与服务请加入我的知识星球。 1、因为原先仿钉钉流程里不能进行鼠标拖动来查看流程,所以根据作者提供的信息进行修改,在hooks下增加下面文件useDraggableScroll.ts import { ref, onMounted, on…...
Docker Compse单机编排
一.Docker Compse 介绍 Docker Compose 是一个用于定义和运行多容器 Docker 应用程序的工具。通过 Compose,你可以使用 YAML 文件来配置应用程序的服务、网络和卷,然后使用单个命令创建和启动所有服务。这使得在开发、测试和部署过程中管理多容器应用程…...
“AI+Security”系列第2期(一):对抗!大模型自身安全的攻防博弈
近日,由安全极客、Wisemodel 社区和 InForSec 网络安全研究国际学术论坛联合主办的“AISecurity”系列第 2 期——对抗!大模型自身安全的攻防博弈线上活动如期举行。本次活动邀请了君同未来创始人兼 CEO 韩蒙、前阿里云高级安全专家郑瀚、ChaMd5 AI 组负…...
Python Static Typing: 提升代码可靠性与可读性的使用技巧
💝💝💝欢迎莅临我的博客,很高兴能够在这里和您见面!希望您在这里可以感受到一份轻松愉快的氛围,不仅可以获得有趣的内容和知识,也可以畅所欲言、分享您的想法和见解。 推荐:「storm…...
Datawhale多模态赛事(1)
赛事说明:https://tianchi.aliyun.com/competition/entrance/532251/introduction?spma2c22.12281925.0.0.2f307137p8qZmp 学习平台:https://linklearner.com/home 第一天 1.报名赛道学习赛事 https://tianchi.aliyun.com/competition/entrance/53225…...
SkyWalking 10.2.0 SWCK 配置过程
SkyWalking 10.2.0 & SWCK 配置过程 skywalking oap-server & ui 使用Docker安装在K8S集群以外,K8S集群中的微服务使用initContainer按命名空间将skywalking-java-agent注入到业务容器中。 SWCK有整套的解决方案,全安装在K8S群集中。 具体可参…...
select、poll、epoll 与 Reactor 模式
在高并发网络编程领域,高效处理大量连接和 I/O 事件是系统性能的关键。select、poll、epoll 作为 I/O 多路复用技术的代表,以及基于它们实现的 Reactor 模式,为开发者提供了强大的工具。本文将深入探讨这些技术的底层原理、优缺点。 一、I…...
Map相关知识
数据结构 二叉树 二叉树,顾名思义,每个节点最多有两个“叉”,也就是两个子节点,分别是左子 节点和右子节点。不过,二叉树并不要求每个节点都有两个子节点,有的节点只 有左子节点,有的节点只有…...
4. TypeScript 类型推断与类型组合
一、类型推断 (一) 什么是类型推断 TypeScript 的类型推断会根据变量、函数返回值、对象和数组的赋值和使用方式,自动确定它们的类型。 这一特性减少了显式类型注解的需要,在保持类型安全的同时简化了代码。通过分析上下文和初始值,TypeSc…...
作为测试我们应该关注redis哪些方面
1、功能测试 数据结构操作:验证字符串、列表、哈希、集合和有序的基本操作是否正确 持久化:测试aof和aof持久化机制,确保数据在开启后正确恢复。 事务:检查事务的原子性和回滚机制。 发布订阅:确保消息正确传递。 2、性…...
ThreadLocal 源码
ThreadLocal 源码 此类提供线程局部变量。这些变量不同于它们的普通对应物,因为每个访问一个线程局部变量的线程(通过其 get 或 set 方法)都有自己独立初始化的变量副本。ThreadLocal 实例通常是类中的私有静态字段,这些类希望将…...
内窥镜检查中基于提示的息肉分割|文献速递-深度学习医疗AI最新文献
Title 题目 Prompt-based polyp segmentation during endoscopy 内窥镜检查中基于提示的息肉分割 01 文献速递介绍 以下是对这段英文内容的中文翻译: ### 胃肠道癌症的发病率呈上升趋势,且有年轻化倾向(Bray等人,2018&#x…...
k8s从入门到放弃之Pod的容器探针检测
k8s从入门到放弃之Pod的容器探针检测 在Kubernetes(简称K8s)中,容器探测是指kubelet对容器执行定期诊断的过程,以确保容器中的应用程序处于预期的状态。这些探测是保障应用健康和高可用性的重要机制。Kubernetes提供了两种种类型…...
统计学(第8版)——统计抽样学习笔记(考试用)
一、统计抽样的核心内容与问题 研究内容 从总体中科学抽取样本的方法利用样本数据推断总体特征(均值、比率、总量)控制抽样误差与非抽样误差 解决的核心问题 在成本约束下,用少量样本准确推断总体特征量化估计结果的可靠性(置…...
java+webstock
maven依赖 <dependency><groupId>org.java-websocket</groupId><artifactId>Java-WebSocket</artifactId><version>1.3.5</version></dependency><dependency><groupId>org.apache.tomcat.websocket</groupId&…...
