当前位置: 首页 > news >正文

使用java远程提交flink任务到yarn集群

使用java远程提交flink任务到yarn集群

背景

由于业务需要,使用命令行的方式提交flink任务比较麻烦,要么将后端任务部署到大数据集群,要么弄一个提交机,感觉都不是很离线。经过一些调研,发现可以实现远程的任务发布。接下来就记录一下实现过程。这里用flink on yarn 的Application模式实现

环境准备

  • 大数据集群,只要有hadoop就行
  • 后端服务器,linux mac都行,windows不行

正式开始

1. 上传flink jar包到hdfs

去flink官网下载你需要的版本,我这里用的是flink-1.18.1,把flink lib目录下的jar包传到hdfs中。

在这里插入图片描述
其中flink-yarn-1.18.1.jar需要大家自己去maven仓库下载。

2. 编写一段flink代码

随便写一段flink代码就行,我们目的是测试

package com.azt;import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;import java.util.Random;
import java.util.concurrent.TimeUnit;public class WordCount {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<String> source = env.addSource(new SourceFunction<String>() {@Overridepublic void run(SourceContext<String> ctx) throws Exception {String[] words = {"spark", "flink", "hadoop", "hdfs", "yarn"};Random random = new Random();while (true) {ctx.collect(words[random.nextInt(words.length)]);TimeUnit.SECONDS.sleep(1);}}@Overridepublic void cancel() {}});source.print();env.execute();}
}

3. 打包第二步的代码,上传到hdfs

在这里插入图片描述

4. 拷贝配置文件

  • 拷贝flink conf下的所有文件到java项目的resource中
  • 拷贝hadoop配置文件到到java项目的resource中

具体看截图
在这里插入图片描述

5. 编写java远程提交任务的程序

这一步有个注意的地方就是,如果你跟我一样是windows电脑,那么本地用idea提交会报错;如果你是mac或者linux,那么可以直接在idea中提交任务。

package com.test;import org.apache.flink.client.deployment.ClusterDeploymentException;
import org.apache.flink.client.deployment.ClusterSpecification;
import org.apache.flink.client.deployment.application.ApplicationConfiguration;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.ClusterClientProvider;
import org.apache.flink.configuration.*;
import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.yarn.YarnClientYarnClusterInformationRetriever;
import org.apache.flink.yarn.YarnClusterDescriptor;
import org.apache.flink.yarn.YarnClusterInformationRetriever;
import org.apache.flink.yarn.configuration.YarnConfigOptions;
import org.apache.flink.yarn.configuration.YarnDeploymentTarget;import org.apache.flink.yarn.configuration.YarnLogConfigUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;import static org.apache.flink.configuration.MemorySize.MemoryUnit.MEGA_BYTES;/*** @date :2021/5/12 7:16 下午*/
public class Main {public static void main(String[] args) throws Exception {///home/root/flink/lib/libSystem.setProperty("HADOOP_USER_NAME","root");
//        String configurationDirectory = "C:\\project\\test_flink_mode\\src\\main\\resources\\conf";String configurationDirectory = "/export/server/flink-1.18.1/conf";org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();conf.set("fs.hdfs.impl","org.apache.hadoop.hdfs.DistributedFileSystem");conf.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName());String flinkLibs = "hdfs://node1.itcast.cn/flink/lib";String userJarPath = "hdfs://node1.itcast.cn/flink/user-lib/original.jar";String flinkDistJar = "hdfs://node1.itcast.cn/flink/lib/flink-yarn-1.18.1.jar";YarnClient yarnClient = YarnClient.createYarnClient();YarnConfiguration yarnConfiguration = new YarnConfiguration();yarnClient.init(yarnConfiguration);yarnClient.start();YarnClusterInformationRetriever clusterInformationRetriever = YarnClientYarnClusterInformationRetriever.create(yarnClient);//获取flink的配置Configuration flinkConfiguration = GlobalConfiguration.loadConfiguration(configurationDirectory);flinkConfiguration.set(CheckpointingOptions.INCREMENTAL_CHECKPOINTS, true);flinkConfiguration.set(PipelineOptions.JARS,Collections.singletonList(userJarPath));YarnLogConfigUtil.setLogConfigFileInConfig(flinkConfiguration,configurationDirectory);Path remoteLib = new Path(flinkLibs);flinkConfiguration.set(YarnConfigOptions.PROVIDED_LIB_DIRS,Collections.singletonList(remoteLib.toString()));flinkConfiguration.set(YarnConfigOptions.FLINK_DIST_JAR,flinkDistJar);//设置为application模式flinkConfiguration.set(DeploymentOptions.TARGET,YarnDeploymentTarget.APPLICATION.getName());//yarn application nameflinkConfiguration.set(YarnConfigOptions.APPLICATION_NAME, "jobname");//设置配置,可以设置很多flinkConfiguration.set(JobManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.parse("1024",MEGA_BYTES));flinkConfiguration.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.parse("1024",MEGA_BYTES));flinkConfiguration.set(TaskManagerOptions.NUM_TASK_SLOTS, 4);flinkConfiguration.setInteger("parallelism.default", 4);ClusterSpecification clusterSpecification = new ClusterSpecification.ClusterSpecificationBuilder().createClusterSpecification();//		设置用户jar的参数和主类ApplicationConfiguration appConfig = new ApplicationConfiguration(args,"com.azt.WordCount");YarnClusterDescriptor yarnClusterDescriptor = new YarnClusterDescriptor(flinkConfiguration,yarnConfiguration,yarnClient,clusterInformationRetriever,true);ClusterClientProvider<ApplicationId> clusterClientProvider = null;try {clusterClientProvider = yarnClusterDescriptor.deployApplicationCluster(clusterSpecification,appConfig);} catch (ClusterDeploymentException e){e.printStackTrace();}ClusterClient<ApplicationId> clusterClient = clusterClientProvider.getClusterClient();System.out.println(clusterClient.getWebInterfaceURL());ApplicationId applicationId = clusterClient.getClusterId();System.out.println(applicationId);Collection<JobStatusMessage> jobStatusMessages = clusterClient.listJobs().get();int counts = 30;while (jobStatusMessages.size() == 0 && counts > 0) {Thread.sleep(1000);counts--;jobStatusMessages = clusterClient.listJobs().get();if (jobStatusMessages.size() > 0) {break;}}if (jobStatusMessages.size() > 0) {List<String> jids = new ArrayList<>();for (JobStatusMessage jobStatusMessage : jobStatusMessages) {jids.add(jobStatusMessage.getJobId().toHexString());}System.out.println(String.join(",",jids));}}
}

由于我这里是windows电脑,所以我打包放到服务器上去运行
执行命令 :

java -cp test_flink_mode-1.0-SNAPSHOT.jar com.test.Main

不出以外的话,会打印如下日志

log4j:WARN No appenders could be found for logger (org.apache.hadoop.util.Shell).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
http://node2:33811
application_1715418089838_0017
6d4d6ed5277a62fc9a3a274c4f34a468

复制打印的url连接,就可以打开flink的webui了,在yarn的前端页面中也可以看到flink任务。

相关文章:

使用java远程提交flink任务到yarn集群

使用java远程提交flink任务到yarn集群 背景 由于业务需要&#xff0c;使用命令行的方式提交flink任务比较麻烦&#xff0c;要么将后端任务部署到大数据集群&#xff0c;要么弄一个提交机&#xff0c;感觉都不是很离线。经过一些调研&#xff0c;发现可以实现远程的任务发布。…...

麻了!新增4.1分,CCF-C类,2区毕业神刊,被标记On Hold!

本周投稿推荐 SSCI • 2区社科类&#xff0c;3.0-4.0&#xff08;社科均可&#xff09; EI • 计算机工程类&#xff08;接收广&#xff0c;录用极快&#xff09; SCI&EI • 4区生物医学类&#xff0c;1.5-2.0&#xff08;录用率99%&#xff09; • 1区工程类&#…...

tomcat 的启动流程

tomcat 的启动流程 中 使用的Lifecycle 生命流程 。在这里还使用了设计模式中的模板模式&#xff08;LifecycleBase 是一个模板类&#xff09; init&#xff08;&#xff09;方法 start() 方法 container 的处理...

YOLOv9全网最新改进系列::YOLOv9完美融合双卷积核(DualConv)来构建轻量级深度神经网络,目标检测模型有效涨点神器!!!

YOLOv9全网最新改进系列&#xff1a;&#xff1a;YOLOv9完美融合双卷积核&#xff08;DualConv&#xff09;来构建轻量级深度神经网络,目标检测模型有效涨点神器&#xff01;&#xff01;&#xff01; YOLOv9原文链接戳这里&#xff0c;原文全文翻译请关注B站Ai学术叫叫首er …...

PCIE协议-2-事务层规范-MEM/IO/CFG request rules

2.2.7 内存、I/O和配置请求规则 以下规则适用于所有内存、I/O和配置请求。每种类型的请求还有特定的额外规则。 所有内存、I/O和配置请求除了常见的头标字段外&#xff0c;还包括以下字段&#xff1a;requester ID[15:0]和Tag[9:0]&#xff0c;形成事务ID。Last DW BE[3:0] a…...

jmeter分布式集群压测

目的&#xff1a;通过多台机器同时运行 性能压测 脚本&#xff0c;模拟更好的并发压力 简单点&#xff1a;就是一个人&#xff08;控制机controler/调度机 master&#xff09;做一个项目的时候&#xff0c;压力有点大&#xff0c;会导致结果不理想&#xff0c;这时候找几个人&a…...

美国加州正测试ChatGPT等生成式AI,在4大部门应用

5月11日&#xff0c;美联社消息&#xff0c;美国加州政府正在测试ChatGPT等生成式AI&#xff0c;应用在税收和收费管理部、交通部、公共卫生部以及卫生与公众服务部4大部门。 测试时间6个月&#xff0c;为其提供技术支持的一共有5家公司&#xff0c;分别是OpenAI、Anthropic、…...

【Kali Linux工具篇】wpscan的基本介绍与使用

介绍 WPScan是Kali Linux默认自带的一款漏洞扫描工具&#xff0c;它采用Ruby编写&#xff0c;能够扫描WordPress网站中的多种安全漏洞&#xff0c;其中包括主题漏洞、插件漏洞和WordPress本身的漏洞。最新版本WPScan的数据库中包含超过18000种插件漏洞和2600种主题漏洞&#x…...

C#算法之计数排序

算法释义&#xff1a;计数排序是一种非基于比较的排序算法&#xff0c;它不依赖于比较操作来确定元素的顺序&#xff0c;而是通过键值索引直接确定元素的输出位置。计数排序适用于一定范围内的整数排序。为什么说是一定范围之内呢&#xff1f;原因如下&#xff1a;计数排序的复…...

EasyExcel简单使用

EasyExcel简单使用 ​ 之前一直用的Apache POI来做数据的导入导出&#xff0c;但听说阿里的EasyExcel也拥有POI的功能的同时&#xff0c;在处理大数据量的导入导出的时候性能上比POI更好&#xff0c;所以就来尝试使用一下 导入Maven依赖&#xff1a; <dependency><…...

Notes客户端中的漫游功能

大家好&#xff0c;才是真的好。 故事&#xff0c;首先是从一个小图标开始的&#xff0c;很多人问我Domino公共通讯录中&#xff0c;个人文档前面有一个绿色小球图标&#xff0c;这是什么意思&#xff1f; 我的答案&#xff1a;这是Notes客户端中的漫游功能。 说到漫游&…...

为什么要内存对齐?

首先&#xff0c;我们介绍一下结构体内存对齐的规则&#xff1a; 1.第一个成员在与结构体偏移量为0的地址处。 2.其他成员变量要对其到某个数字&#xff08;对齐数&#xff09;的整数倍的地址处。 注&#xff1a;对齐数编译器默认的一个对齐数与该成员大小的较小值&#xff…...

23、Flink 的 Savepoints 详解

Savepoints 1.什么是 Savepoints Savepoint 是依据 Flink checkpointing 机制所创建的流作业执行状态的镜像&#xff0c;可以使用 Savepoint 进行 Flink 作业的停止、重启或更新。 Savepoint 由两部分组成&#xff1a;稳定存储&#xff08;例如 HDFS&#xff0c;S3&#xff…...

【Unity】Unity项目转抖音小游戏(二)云数据库和云函数

业务需求&#xff0c;开始接触一下抖音小游戏相关的内容&#xff0c;开发过程中记录一下流程。 抖音云官方文档&#xff1a;https://developer.open-douyin.com/docs/resource/zh-CN/developer/tools/cloud/develop-guide/cloud-function-debug 1.开通抖音云环境 抖音云地址&a…...

SpringBoot集成jasypt对yml文件指定参数加密并自定义@bean隐藏密钥

1、查看SpringBoot和jasypt对应版本。 Jasypt 1.9.x 通常与 Spring Boot 1.5.x 相对应。 Jasypt 2.1.x 通常与 Spring Boot 2.0.x 相对应。 Jasypt 3.x 通常与 Spring Boot 2.1.x相对应。 2、引入maven <dependency><groupId>com.github.ulisesbocchio</groupI…...

GDB的使用

即目标机直接使用GDB调试 源码安装&#xff1a; Index of /gnu/gdb 或者 wget https://ftp.gnu.org/gnu/gdb/gdb-8.3.1.tar.gz ./configure make main install 编译报错解决方法&#xff1a; 解决编译安装gdb-10.1 unistd.h:663:3: error: #error “Please include con…...

Linux处理用户输入

目录 一、传递参数 1.1 读取参数 1.2 读取脚本名 二、跟踪参数 三、移动参数 四、处理选项 4.1 查找选项 4.1.1 处理简单选项 4.1.2 分离参数和选项 4.1.3 处理含值的选项 五、选项标准化 5.1 使用 getopt 命令 5.1.1 命令格式 5.1.2 在脚本中使用getopt 5.2 使用…...

【代码笔记】高并发场景下问题解决思路

高并发指的是在单位时间内&#xff0c;瞬时流量激增&#xff0c;系统需要同时处理大量并行的请求或操作。这种情况通常出现在面向大量用户或服务的分布式系统中&#xff0c;尤其是当用户请求高度集中时&#xff0c;比如促销活动、秒杀活动、注册抢课、热点事件、定时任务调度等…...

【Docker系列】Linux部署Docker Compose

&#x1f49d;&#x1f49d;&#x1f49d;欢迎来到我的博客&#xff0c;很高兴能够在这里和您见面&#xff01;希望您在这里可以感受到一份轻松愉快的氛围&#xff0c;不仅可以获得有趣的内容和知识&#xff0c;也可以畅所欲言、分享您的想法和见解。 推荐:kwan 的首页,持续学…...

基于SSM的文化遗产的保护与旅游开发系统(有报告)。Javaee项目。ssm项目。

演示视频&#xff1a; 基于SSM的文化遗产的保护与旅游开发系统&#xff08;有报告&#xff09;。Javaee项目。ssm项目。 项目介绍&#xff1a; 采用M&#xff08;model&#xff09;V&#xff08;view&#xff09;C&#xff08;controller&#xff09;三层体系结构&#xff0c;…...

idea大量爆红问题解决

问题描述 在学习和工作中&#xff0c;idea是程序员不可缺少的一个工具&#xff0c;但是突然在有些时候就会出现大量爆红的问题&#xff0c;发现无法跳转&#xff0c;无论是关机重启或者是替换root都无法解决 就是如上所展示的问题&#xff0c;但是程序依然可以启动。 问题解决…...

日语AI面试高效通关秘籍:专业解读与青柚面试智能助攻

在如今就业市场竞争日益激烈的背景下&#xff0c;越来越多的求职者将目光投向了日本及中日双语岗位。但是&#xff0c;一场日语面试往往让许多人感到步履维艰。你是否也曾因为面试官抛出的“刁钻问题”而心生畏惧&#xff1f;面对生疏的日语交流环境&#xff0c;即便提前恶补了…...

Swift 协议扩展精进之路:解决 CoreData 托管实体子类的类型不匹配问题(下)

概述 在 Swift 开发语言中&#xff0c;各位秃头小码农们可以充分利用语法本身所带来的便利去劈荆斩棘。我们还可以恣意利用泛型、协议关联类型和协议扩展来进一步简化和优化我们复杂的代码需求。 不过&#xff0c;在涉及到多个子类派生于基类进行多态模拟的场景下&#xff0c;…...

解决Ubuntu22.04 VMware失败的问题 ubuntu入门之二十八

现象1 打开VMware失败 Ubuntu升级之后打开VMware上报需要安装vmmon和vmnet&#xff0c;点击确认后如下提示 最终上报fail 解决方法 内核升级导致&#xff0c;需要在新内核下重新下载编译安装 查看版本 $ vmware -v VMware Workstation 17.5.1 build-23298084$ lsb_release…...

淘宝扭蛋机小程序系统开发:打造互动性强的购物平台

淘宝扭蛋机小程序系统的开发&#xff0c;旨在打造一个互动性强的购物平台&#xff0c;让用户在购物的同时&#xff0c;能够享受到更多的乐趣和惊喜。 淘宝扭蛋机小程序系统拥有丰富的互动功能。用户可以通过虚拟摇杆操作扭蛋机&#xff0c;实现旋转、抽拉等动作&#xff0c;增…...

十九、【用户管理与权限 - 篇一】后端基础:用户列表与角色模型的初步构建

【用户管理与权限 - 篇一】后端基础:用户列表与角色模型的初步构建 前言准备工作第一部分:回顾 Django 内置的 `User` 模型第二部分:设计并创建 `Role` 和 `UserProfile` 模型第三部分:创建 Serializers第四部分:创建 ViewSets第五部分:注册 API 路由第六部分:后端初步测…...

Modbus RTU与Modbus TCP详解指南

目录 1. Modbus协议基础 1.1 什么是Modbus? 1.2 Modbus协议历史 1.3 Modbus协议族 1.4 Modbus通信模型 🎭 主从架构 🔄 请求响应模式 2. Modbus RTU详解 2.1 RTU是什么? 2.2 RTU物理层 🔌 连接方式 ⚡ 通信参数 2.3 RTU数据帧格式 📦 帧结构详解 🔍…...

Neko虚拟浏览器远程协作方案:Docker+内网穿透技术部署实践

前言&#xff1a;本文将向开发者介绍一款创新性协作工具——Neko虚拟浏览器。在数字化协作场景中&#xff0c;跨地域的团队常需面对实时共享屏幕、协同编辑文档等需求。通过本指南&#xff0c;你将掌握在Ubuntu系统中使用容器化技术部署该工具的具体方案&#xff0c;并结合内网…...

Linux基础开发工具——vim工具

文章目录 vim工具什么是vimvim的多模式和使用vim的基础模式vim的三种基础模式三种模式的初步了解 常用模式的详细讲解插入模式命令模式模式转化光标的移动文本的编辑 底行模式替换模式视图模式总结 使用vim的小技巧vim的配置(了解) vim工具 本文章仍然是继续讲解Linux系统下的…...

Canal环境搭建并实现和ES数据同步

作者&#xff1a;田超凡 日期&#xff1a;2025年6月7日 Canal安装&#xff0c;启动端口11111、8082&#xff1a; 安装canal-deployer服务端&#xff1a; https://github.com/alibaba/canal/releases/1.1.7/canal.deployer-1.1.7.tar.gz cd /opt/homebrew/etc mkdir canal…...