当前位置: 首页 > news >正文

如何用代码提交spark任务并且获取任务权柄

在国内说所有可能有些绝对,因为确实有少数大厂技术底蕴确实没的说能做出自己的东西,但其他的至少95%数据中台平台研发方案,都是集群中有一个持久化的程序,来接收任务信息,并向集群提交任务同时获取任务的权柄,把任务的appid和日志通过套接字的方式向外提供。

对于spark任务来说无非就是两种形式,要不传过来的是个jar包,要不就是一个sql语句,其他的就是一些任务参数,整体上就和正常的web项目开发没太大差别,不同的就是服务端是以哪种方式处理任务的提交的,给大家分享我经历过的项目中用过的三种处理方式,当然这不是全部,业内确实有真东西,不过人家不开源罢了。

第一种:spark官方提供的SparkLauncherAPI

这种方式,你可以在b站上常见,但是它使用起来限制特别大,我也只用过一次,而且还是在尝试阶段就被放弃了,感觉就和一个半成品一样,最让人难受的一点是它监听任务的最终状态是4个独立的不可变枚举值,而监听程序会终止在第一个触发到的不可变枚举值,就是说如果任务先进入了完成状态,但它的最终状态是失败,那权柄只能生效到完成阶段,后面就监听不到了,观察过源码到时找到了底层更新状态的依据,但是属于受保护包下,不能被public直接调用,还有很多其他的坑,所以说像个半成品

使用它,首先导入SparkLauncherAPI的依赖

<dependency><groupId>org.apache.spark</groupId><artifactId>spark-launcher_2.11</artifactId> <!-- 这里要替换为你的 scala 同版本 ,spark-launcher版本不同,支持的scala也不同具体去maven官方仓库中看--><version>2.1.1</version> <!--SparkLauncherAPI要和你用的 Spark 同版本 -->
</dependency>

随后它的提交任务代码如下

package com.wy;import org.apache.spark.launcher.SparkAppHandle;
import org.apache.spark.launcher.SparkLauncher;import java.io.*;
import java.util.concurrent.CountDownLatch;public class Main {public static void main(String[] args) throws IOException, InterruptedException {// 使用SparkLauncher提交任务,将任务需要的环境进行封装//这里只是核心使用需要的设置,sparkLauncher还支持其他的方法设置其他的内容需要的自己看SparkLauncher sparkLauncher = new SparkLauncher().setSparkHome("D:\\mydevtool\\spark-2.1.1-hadoop2.7")//这个程序最终运行的服务器需要同时存在一个spark的install路径.setAppResource("D:\\test\\myscala-1.0.jar") //你的任务jar包.setMainClass("test.Test1")//任务主类.setMaster("local")//master 或者 yarn.setAppName("代码提交")//任务名称.addAppArgs("D:\\test\\123.txt")//任务的主类入参,这里是个可变参.setVerbose(false);/*这里注释是写一个伪代码,意在你可以处理出任务运行配置,比如内存资源等,传递给sparkLauncherfor (Map.Entry<String, String> conf : otherConfigParams.entrySet()) {sparkLauncher.setConf(conf.getKey(), conf.getValue());}*//*这里注释是写一个伪代码,意在你可以处理出任务主类入参后传递给sparkLauncherif (mainParams.length != 0) {launcher.addAppArgs(mainParams);}*///同步时,必须使用CountDownLatch 不然监听程序是异步的,拿不到任务权柄CountDownLatch countDownLatch = new CountDownLatch(1);// 启动应用并获取任务权柄,并传入一个监听类,监听任务不同状态时的事件SparkAppHandle sparkAppHandle = sparkLauncher.startApplication(new SparkAppHandle.Listener() {//任务运行状态改变的时候触发的操作@Overridepublic void stateChanged(SparkAppHandle sparkAppHandle) {//状态发生变更时,此时将任务id拿出来if ( sparkAppHandle.getAppId() != null ){System.out.println("任务状态:"+sparkAppHandle.getState().toString());System.out.println("任务ID:"+sparkAppHandle.getAppId());}//诸如此类,你可以按照你的需求定义任务不同状态下要干的事if (sparkAppHandle.getState().compareTo(SparkAppHandle.State.RUNNING)==0){System.out.println("任务开始运行");}//诸如此类,你可以按照你的需求定义任务不同状态下要干的事if (sparkAppHandle.getState().compareTo(SparkAppHandle.State.FINISHED)==0){System.out.println("任务正常完成");countDownLatch.countDown();}if (sparkAppHandle.getState().compareTo(SparkAppHandle.State.FAILED)==0){System.out.println("任务发生错误");countDownLatch.countDown();}if (sparkAppHandle.getState().compareTo(SparkAppHandle.State.KILLED)==0){System.out.println("任务被终止");countDownLatch.countDown();}}//任务的上下文发生变动时的事件,一般不用@Overridepublic void infoChanged(SparkAppHandle sparkAppHandle) {}});//这里的日志的代码,但是输出不能放在这里,按照整体来讲应该要有一个阻塞方法任务开始运行获取流,它实际使用起来发现只有放在监听里面才能正常获取日志,但是开头也说了,监听的生命周期有问题,这也是最终放弃使用的原因之一,你要是只想体验一下,把这部分代码放在监听中在开始run的状态下开始输出就行BufferedReader reader = null;try{String line;reader = new BufferedReader(new InputStreamReader(sparkLauncher.launch().getInputStream(),"UTF-8"));while ((line = reader.readLine()) != null) {System.out.println("日志流在输出:"+line);}} catch (UnsupportedEncodingException e) {throw new RuntimeException(e);} catch (IOException e) {throw new RuntimeException(e);} finally {}countDownLatch.await();}
}

第二种:Runtime直接启动脚本

这种方式是最简单的,也是大部分中小项目用的方式,和第一种方式一样的是程序运行在集群中,将submit的日志直返回,使用方通过判断的方式处理出任务的appid和url

public static void main(String[] args) {String command = "spark-submit --master yarn .......";Process p = null;String line = null;try {p = Runtime.getRuntime().exec(command);BufferedReader br = new BufferedReader(new InputStreamReader(p.getInputStream()));while ((line = br.readLine()) != null){//这里将日志输出出去}br.close();} catch (IOException e) {e.printStackTrace();}}

第三种:kyuubi

在开源spark之上,业内有一个叫kyuubi的二次开发引擎,专门做sql查询的,它对sql开发做了很多优化,比如文件聚合、最终一次数据整理等,此外提供了专门的接口来查询任务的信息,一般不投入大成本又能达到效果的就用kyuubi了

相关文章:

如何用代码提交spark任务并且获取任务权柄

在国内说所有可能有些绝对&#xff0c;因为确实有少数大厂技术底蕴确实没的说能做出自己的东西&#xff0c;但其他的至少95%数据中台平台研发方案&#xff0c;都是集群中有一个持久化的程序&#xff0c;来接收任务信息&#xff0c;并向集群提交任务同时获取任务的权柄&#xff…...

关于Mac中的shell

1 MacOS中的shell 介绍&#xff1a; 在 macOS 系统中&#xff0c;Shell 是命令行与系统交互的工具&#xff0c;用于执行命令、运行脚本和管理系统。macOS 提供了多种 Shell&#xff0c;主要包括 bash 和 zsh。在 macOS Catalina&#xff08;10.15&#xff09;之前&#xff0c…...

【npm依赖包介绍】借助rimraf依赖包,在用npm run build构建项目时,清空dist目录,避免新旧混合

文章目录 背景如何使用附上rimraf的介绍和说明主要作用使用场景安装使用示例异步删除同步删除 参考资料 背景 在npm run build时&#xff0c;一般都会清空项目中已有的dist目录再构建&#xff0c;避免新旧混合。 如何使用 可以简单使用rimraf这个npm依赖包。 目前rimraf的最…...

爬虫学习记录

1.概念 通过编写程序,模拟浏览器上网,然后让其去互联网上抓取数据的过程 通用爬虫:抓取的是一整张页面数据聚焦爬虫:抓取的是页面中的特定局部内容增量式爬虫:监测网站中数据更新的情况,只会抓取网站中最新更新出来的数据 robots.txt协议: 君子协议,网站后面添加robotx.txt…...

Java Spring Boot实现基于URL + IP访问频率限制

点击下载《Java Spring Boot实现基于URL IP访问频率限制(源代码)》 1. 引言 在现代 Web 应用中&#xff0c;接口被恶意刷新或暴力请求是一种常见的攻击手段。为了保护系统资源&#xff0c;防止服务器过载或服务不可用&#xff0c;需要对接口的访问频率进行限制。本文将介绍如…...

C4D2025 win版本安装完无法打开,提示请将你的maxon App更新至最新版本,如何解决

最近安装C4D2025 win版本时&#xff0c;明明按步骤安装完成&#xff0c;结果打开提示提示请将你的maxon App更新至最新版本&#xff1f;遇到这种情况该如何解决呢。 一开始我的思路以为是旧版本没有删除干净&#xff0c;所以将电脑里有关maxon的软件插件都卸载了&#xff0c;重…...

微信小程序实现登录注册

文章目录 1. 官方文档教程2. 注册实现3. 登录实现4. 关于作者其它项目视频教程介绍 1. 官方文档教程 https://developers.weixin.qq.com/miniprogram/dev/framework/路由跳转的几种方式&#xff1a; https://developers.weixin.qq.com/miniprogram/dev/api/route/wx.switchTab…...

SpringBoot环境和Maven配置

SpringBoot环境和Maven配置 1. 环境准备2. Maven2.1 什么是Maven2.2 为什么要学 Maven2.3 创建一个 Maven项目2.4 Maven核心功能2.4.1 项目构建2.4.2 依赖管理2.4.3 Maven Help插件 2.5 Maven 仓库2.5.1本地仓库2.5.2 中央仓库2.5.3 私有服务器, 也称为私服 2.6 Maven设置国内源…...

大语言模型训练所需的最低显存,联邦大语言模型训练的传输优化技术

联邦大语言模型训练的传输优化技术 目录 联邦大语言模型训练的传输优化技术大语言模型训练所需的最低显存大语言模型训练所需的最低显存 基于模型微调、压缩和分布式并行处理的方法,介绍了相关开源模型及技术应用 核心创新点 多维度优化策略:综合运用基于模型微调、模型压缩和…...

1.07 标准IO

1.思维导图 2.先编写以下结构体 struct Student { char name[20]&#xff1b; double math&#xff1b; double chinese&#xff1b; double english&#xff1b; double physical&#xff1b; double chemical&#xff1b; double…...

恒压恒流原边反馈控制芯片 CRE6289F

CRE6289F 系列产品是一款内置高压 MOS 功率开关管的高性能多模式原边控制的开关电源芯片。较少的外围元器件、较低的系统成本设计出高性能的交直流转换开关电源。CRE6289F 系列产品提供了极为全面和性能优异的智能化保护功能&#xff0c;包括逐周期过流保护、软启动、芯片过温保…...

Java中线程中断的几种方式,你了解吗?

Java中线程&#xff0c;可以使用 interrupt() 方法来实现线程的中断&#xff0c;那么&#xff0c;线程中中断的方式有几种呢&#xff1f;接下来&#xff0c;我们将介绍3种不同的线程中断方式&#xff0c;跟随我们的脚步&#xff0c;一起去看看&#xff01; 目录 第一招&#xf…...

Tesseract5.4.0自定义LSTM训练

准备jTessBoxEditor&#xff0c;然后配置环境变量。 1、将图片转换成tif格式的&#xff0c;这里需要用画图工具另存为&#xff1b; 2、生成box文件 执行命令&#xff1a; tesseract agv.normal.exp1.tif agv.normal.exp1 -l eng --psm 6 batch.nochop makebox 关于box文件…...

centOS7

特殊权限 set_uid 赋予所有者身份 chmod us 文件 set_gid 赋予所有组身份 chmod gs 文件/目录 sticky_bit 防火墙 firewall-cmd 开启端口 firewall-cmd --zonepublic --add-port8080/tcp --permanent 重启防火墙 systemctl restart firewalld 查看开启的所有端口 fi…...

HTML5 弹跳动画(Bounce Animation)详解

HTML5 弹跳动画&#xff08;Bounce Animation&#xff09;详解 弹跳动画是一种动态效果&#xff0c;使元素在出现或消失时看起来像是在跳动。这种效果可以通过 CSS 动画或 JavaScript 来实现&#xff0c;增强用户体验。 1. 使用 CSS 实现弹跳动画 可以使用 CSS 的 keyframes…...

4.1.3 串

文章目录 串的基本概念串的基本操作串的存储结构 串的基本概念 串&#xff0c;仅由字符构成的有限序列。 串长&#xff1a;串中的字符个数。空串&#xff1a;长度为0的串。空格串&#xff1a;一个或多个空格构成的串。子串&#xff1a;串中任意长度连续字符构成的序列。含有字…...

国产编辑器EverEdit - 两种删除空白行的方法

1 使用技巧&#xff1a;删除空白行 1.1 应用场景 用户在编辑文档时&#xff0c;可能会遇到很多空白行需要删除的情况&#xff0c;比如从网页上拷贝文字&#xff0c;可能就会存在大量的空白行要删除。 1.2 使用方法 1.2.1 方法1&#xff1a; 使用编辑主菜单 选择主菜单编辑 …...

1月7日星期二今日早报简报微语报早读

1月7日星期二&#xff0c;农历腊月初八&#xff0c;早报#微语早读。 1、公安部&#xff1a;已为一线民警配备执法记录仪130万余部&#xff0c;规范现场执法&#xff1b; 2、浙江提出2035年全省域基本实现共同富裕&#xff1b; 3、“汕头牛肉丸”有新标准&#xff01;1月6日起…...

随机置矩阵列为0[矩阵乘法pytorch版]

文章目录 1. 举例&#xff1a;2. python 代码 1. 举例&#xff1a; A [ 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 ] , r a n d [ 0 , 5 , 2 ] → A [ 0 1 0 3 4 0 6 7 0 9…...

C# 中mysql数据库,已经在原有数据库升级数据库脚本,去管理可以一次,和多次执行的,nuget包

在C#中&#xff0c;如果你需要管理数据库升级脚本&#xff0c;并且希望这些脚本能够支持一次执行和多次执行&#xff08;即幂等性&#xff09;&#xff0c;你可以使用一些现成的NuGet包来简化这个过程。以下是一些常用的NuGet包&#xff1a; 1. DbUp 描述: DbUp 是一个轻量级…...

FFmpeg 低延迟同屏方案

引言 在实时互动需求激增的当下&#xff0c;无论是在线教育中的师生同屏演示、远程办公的屏幕共享协作&#xff0c;还是游戏直播的画面实时传输&#xff0c;低延迟同屏已成为保障用户体验的核心指标。FFmpeg 作为一款功能强大的多媒体框架&#xff0c;凭借其灵活的编解码、数据…...

在四层代理中还原真实客户端ngx_stream_realip_module

一、模块原理与价值 PROXY Protocol 回溯 第三方负载均衡&#xff08;如 HAProxy、AWS NLB、阿里 SLB&#xff09;发起上游连接时&#xff0c;将真实客户端 IP/Port 写入 PROXY Protocol v1/v2 头。Stream 层接收到头部后&#xff0c;ngx_stream_realip_module 从中提取原始信息…...

CMake 从 GitHub 下载第三方库并使用

有时我们希望直接使用 GitHub 上的开源库,而不想手动下载、编译和安装。 可以利用 CMake 提供的 FetchContent 模块来实现自动下载、构建和链接第三方库。 FetchContent 命令官方文档✅ 示例代码 我们将以 fmt 这个流行的格式化库为例,演示如何: 使用 FetchContent 从 GitH…...

【论文阅读28】-CNN-BiLSTM-Attention-(2024)

本文把滑坡位移序列拆开、筛优质因子&#xff0c;再用 CNN-BiLSTM-Attention 来动态预测每个子序列&#xff0c;最后重构出总位移&#xff0c;预测效果超越传统模型。 文章目录 1 引言2 方法2.1 位移时间序列加性模型2.2 变分模态分解 (VMD) 具体步骤2.3.1 样本熵&#xff08;S…...

[免费]微信小程序问卷调查系统(SpringBoot后端+Vue管理端)【论文+源码+SQL脚本】

大家好&#xff0c;我是java1234_小锋老师&#xff0c;看到一个不错的微信小程序问卷调查系统(SpringBoot后端Vue管理端)【论文源码SQL脚本】&#xff0c;分享下哈。 项目视频演示 【免费】微信小程序问卷调查系统(SpringBoot后端Vue管理端) Java毕业设计_哔哩哔哩_bilibili 项…...

【p2p、分布式,区块链笔记 MESH】Bluetooth蓝牙通信 BLE Mesh协议的拓扑结构 定向转发机制

目录 节点的功能承载层&#xff08;GATT/Adv&#xff09;局限性&#xff1a; 拓扑关系定向转发机制定向转发意义 CG 节点的功能 节点的功能由节点支持的特性和功能决定。所有节点都能够发送和接收网格消息。节点还可以选择支持一个或多个附加功能&#xff0c;如 Configuration …...

破解路内监管盲区:免布线低位视频桩重塑停车管理新标准

城市路内停车管理常因行道树遮挡、高位设备盲区等问题&#xff0c;导致车牌识别率低、逃费率高&#xff0c;传统模式在复杂路段束手无策。免布线低位视频桩凭借超低视角部署与智能算法&#xff0c;正成为破局关键。该设备安装于车位侧方0.5-0.7米高度&#xff0c;直接规避树枝遮…...

leetcode73-矩阵置零

leetcode 73 思路 记录 0 元素的位置&#xff1a;遍历整个矩阵&#xff0c;找出所有值为 0 的元素&#xff0c;并将它们的坐标记录在数组zeroPosition中置零操作&#xff1a;遍历记录的所有 0 元素位置&#xff0c;将每个位置对应的行和列的所有元素置为 0 具体步骤 初始化…...

Selenium 查找页面元素的方式

Selenium 查找页面元素的方式 Selenium 提供了多种方法来查找网页中的元素&#xff0c;以下是主要的定位方式&#xff1a; 基本定位方式 通过ID定位 driver.find_element(By.ID, "element_id")通过Name定位 driver.find_element(By.NAME, "element_name"…...

aurora与pcie的数据高速传输

设备&#xff1a;zynq7100&#xff1b; 开发环境&#xff1a;window&#xff1b; vivado版本&#xff1a;2021.1&#xff1b; 引言 之前在前面两章已经介绍了aurora读写DDR,xdma读写ddr实验。这次我们做一个大工程&#xff0c;pc通过pcie传输给fpga&#xff0c;fpga再通过aur…...