如何用代码提交spark任务并且获取任务权柄
在国内说所有可能有些绝对,因为确实有少数大厂技术底蕴确实没的说能做出自己的东西,但其他的至少95%数据中台平台研发方案,都是集群中有一个持久化的程序,来接收任务信息,并向集群提交任务同时获取任务的权柄,把任务的appid和日志通过套接字的方式向外提供。
对于spark任务来说无非就是两种形式,要不传过来的是个jar包,要不就是一个sql语句,其他的就是一些任务参数,整体上就和正常的web项目开发没太大差别,不同的就是服务端是以哪种方式处理任务的提交的,给大家分享我经历过的项目中用过的三种处理方式,当然这不是全部,业内确实有真东西,不过人家不开源罢了。
第一种:spark官方提供的SparkLauncherAPI
这种方式,你可以在b站上常见,但是它使用起来限制特别大,我也只用过一次,而且还是在尝试阶段就被放弃了,感觉就和一个半成品一样,最让人难受的一点是它监听任务的最终状态是4个独立的不可变枚举值,而监听程序会终止在第一个触发到的不可变枚举值,就是说如果任务先进入了完成状态,但它的最终状态是失败,那权柄只能生效到完成阶段,后面就监听不到了,观察过源码到时找到了底层更新状态的依据,但是属于受保护包下,不能被public直接调用,还有很多其他的坑,所以说像个半成品
使用它,首先导入SparkLauncherAPI的依赖
<dependency><groupId>org.apache.spark</groupId><artifactId>spark-launcher_2.11</artifactId> <!-- 这里要替换为你的 scala 同版本 ,spark-launcher版本不同,支持的scala也不同具体去maven官方仓库中看--><version>2.1.1</version> <!--SparkLauncherAPI要和你用的 Spark 同版本 -->
</dependency>
随后它的提交任务代码如下
package com.wy;import org.apache.spark.launcher.SparkAppHandle;
import org.apache.spark.launcher.SparkLauncher;import java.io.*;
import java.util.concurrent.CountDownLatch;public class Main {public static void main(String[] args) throws IOException, InterruptedException {// 使用SparkLauncher提交任务,将任务需要的环境进行封装//这里只是核心使用需要的设置,sparkLauncher还支持其他的方法设置其他的内容需要的自己看SparkLauncher sparkLauncher = new SparkLauncher().setSparkHome("D:\\mydevtool\\spark-2.1.1-hadoop2.7")//这个程序最终运行的服务器需要同时存在一个spark的install路径.setAppResource("D:\\test\\myscala-1.0.jar") //你的任务jar包.setMainClass("test.Test1")//任务主类.setMaster("local")//master 或者 yarn.setAppName("代码提交")//任务名称.addAppArgs("D:\\test\\123.txt")//任务的主类入参,这里是个可变参.setVerbose(false);/*这里注释是写一个伪代码,意在你可以处理出任务运行配置,比如内存资源等,传递给sparkLauncherfor (Map.Entry<String, String> conf : otherConfigParams.entrySet()) {sparkLauncher.setConf(conf.getKey(), conf.getValue());}*//*这里注释是写一个伪代码,意在你可以处理出任务主类入参后传递给sparkLauncherif (mainParams.length != 0) {launcher.addAppArgs(mainParams);}*///同步时,必须使用CountDownLatch 不然监听程序是异步的,拿不到任务权柄CountDownLatch countDownLatch = new CountDownLatch(1);// 启动应用并获取任务权柄,并传入一个监听类,监听任务不同状态时的事件SparkAppHandle sparkAppHandle = sparkLauncher.startApplication(new SparkAppHandle.Listener() {//任务运行状态改变的时候触发的操作@Overridepublic void stateChanged(SparkAppHandle sparkAppHandle) {//状态发生变更时,此时将任务id拿出来if ( sparkAppHandle.getAppId() != null ){System.out.println("任务状态:"+sparkAppHandle.getState().toString());System.out.println("任务ID:"+sparkAppHandle.getAppId());}//诸如此类,你可以按照你的需求定义任务不同状态下要干的事if (sparkAppHandle.getState().compareTo(SparkAppHandle.State.RUNNING)==0){System.out.println("任务开始运行");}//诸如此类,你可以按照你的需求定义任务不同状态下要干的事if (sparkAppHandle.getState().compareTo(SparkAppHandle.State.FINISHED)==0){System.out.println("任务正常完成");countDownLatch.countDown();}if (sparkAppHandle.getState().compareTo(SparkAppHandle.State.FAILED)==0){System.out.println("任务发生错误");countDownLatch.countDown();}if (sparkAppHandle.getState().compareTo(SparkAppHandle.State.KILLED)==0){System.out.println("任务被终止");countDownLatch.countDown();}}//任务的上下文发生变动时的事件,一般不用@Overridepublic void infoChanged(SparkAppHandle sparkAppHandle) {}});//这里的日志的代码,但是输出不能放在这里,按照整体来讲应该要有一个阻塞方法任务开始运行获取流,它实际使用起来发现只有放在监听里面才能正常获取日志,但是开头也说了,监听的生命周期有问题,这也是最终放弃使用的原因之一,你要是只想体验一下,把这部分代码放在监听中在开始run的状态下开始输出就行BufferedReader reader = null;try{String line;reader = new BufferedReader(new InputStreamReader(sparkLauncher.launch().getInputStream(),"UTF-8"));while ((line = reader.readLine()) != null) {System.out.println("日志流在输出:"+line);}} catch (UnsupportedEncodingException e) {throw new RuntimeException(e);} catch (IOException e) {throw new RuntimeException(e);} finally {}countDownLatch.await();}
}
第二种:Runtime直接启动脚本
这种方式是最简单的,也是大部分中小项目用的方式,和第一种方式一样的是程序运行在集群中,将submit的日志直返回,使用方通过判断的方式处理出任务的appid和url
public static void main(String[] args) {String command = "spark-submit --master yarn .......";Process p = null;String line = null;try {p = Runtime.getRuntime().exec(command);BufferedReader br = new BufferedReader(new InputStreamReader(p.getInputStream()));while ((line = br.readLine()) != null){//这里将日志输出出去}br.close();} catch (IOException e) {e.printStackTrace();}}
第三种:kyuubi
在开源spark之上,业内有一个叫kyuubi的二次开发引擎,专门做sql查询的,它对sql开发做了很多优化,比如文件聚合、最终一次数据整理等,此外提供了专门的接口来查询任务的信息,一般不投入大成本又能达到效果的就用kyuubi了
相关文章:
如何用代码提交spark任务并且获取任务权柄
在国内说所有可能有些绝对,因为确实有少数大厂技术底蕴确实没的说能做出自己的东西,但其他的至少95%数据中台平台研发方案,都是集群中有一个持久化的程序,来接收任务信息,并向集群提交任务同时获取任务的权柄ÿ…...
关于Mac中的shell
1 MacOS中的shell 介绍: 在 macOS 系统中,Shell 是命令行与系统交互的工具,用于执行命令、运行脚本和管理系统。macOS 提供了多种 Shell,主要包括 bash 和 zsh。在 macOS Catalina(10.15)之前,…...
【npm依赖包介绍】借助rimraf依赖包,在用npm run build构建项目时,清空dist目录,避免新旧混合
文章目录 背景如何使用附上rimraf的介绍和说明主要作用使用场景安装使用示例异步删除同步删除 参考资料 背景 在npm run build时,一般都会清空项目中已有的dist目录再构建,避免新旧混合。 如何使用 可以简单使用rimraf这个npm依赖包。 目前rimraf的最…...
爬虫学习记录
1.概念 通过编写程序,模拟浏览器上网,然后让其去互联网上抓取数据的过程 通用爬虫:抓取的是一整张页面数据聚焦爬虫:抓取的是页面中的特定局部内容增量式爬虫:监测网站中数据更新的情况,只会抓取网站中最新更新出来的数据 robots.txt协议: 君子协议,网站后面添加robotx.txt…...
Java Spring Boot实现基于URL + IP访问频率限制
点击下载《Java Spring Boot实现基于URL IP访问频率限制(源代码)》 1. 引言 在现代 Web 应用中,接口被恶意刷新或暴力请求是一种常见的攻击手段。为了保护系统资源,防止服务器过载或服务不可用,需要对接口的访问频率进行限制。本文将介绍如…...
C4D2025 win版本安装完无法打开,提示请将你的maxon App更新至最新版本,如何解决
最近安装C4D2025 win版本时,明明按步骤安装完成,结果打开提示提示请将你的maxon App更新至最新版本?遇到这种情况该如何解决呢。 一开始我的思路以为是旧版本没有删除干净,所以将电脑里有关maxon的软件插件都卸载了,重…...
微信小程序实现登录注册
文章目录 1. 官方文档教程2. 注册实现3. 登录实现4. 关于作者其它项目视频教程介绍 1. 官方文档教程 https://developers.weixin.qq.com/miniprogram/dev/framework/路由跳转的几种方式: https://developers.weixin.qq.com/miniprogram/dev/api/route/wx.switchTab…...
SpringBoot环境和Maven配置
SpringBoot环境和Maven配置 1. 环境准备2. Maven2.1 什么是Maven2.2 为什么要学 Maven2.3 创建一个 Maven项目2.4 Maven核心功能2.4.1 项目构建2.4.2 依赖管理2.4.3 Maven Help插件 2.5 Maven 仓库2.5.1本地仓库2.5.2 中央仓库2.5.3 私有服务器, 也称为私服 2.6 Maven设置国内源…...
大语言模型训练所需的最低显存,联邦大语言模型训练的传输优化技术
联邦大语言模型训练的传输优化技术 目录 联邦大语言模型训练的传输优化技术大语言模型训练所需的最低显存大语言模型训练所需的最低显存 基于模型微调、压缩和分布式并行处理的方法,介绍了相关开源模型及技术应用 核心创新点 多维度优化策略:综合运用基于模型微调、模型压缩和…...
1.07 标准IO
1.思维导图 2.先编写以下结构体 struct Student { char name[20]; double math; double chinese; double english; double physical; double chemical; double…...
恒压恒流原边反馈控制芯片 CRE6289F
CRE6289F 系列产品是一款内置高压 MOS 功率开关管的高性能多模式原边控制的开关电源芯片。较少的外围元器件、较低的系统成本设计出高性能的交直流转换开关电源。CRE6289F 系列产品提供了极为全面和性能优异的智能化保护功能,包括逐周期过流保护、软启动、芯片过温保…...
Java中线程中断的几种方式,你了解吗?
Java中线程,可以使用 interrupt() 方法来实现线程的中断,那么,线程中中断的方式有几种呢?接下来,我们将介绍3种不同的线程中断方式,跟随我们的脚步,一起去看看! 目录 第一招…...
Tesseract5.4.0自定义LSTM训练
准备jTessBoxEditor,然后配置环境变量。 1、将图片转换成tif格式的,这里需要用画图工具另存为; 2、生成box文件 执行命令: tesseract agv.normal.exp1.tif agv.normal.exp1 -l eng --psm 6 batch.nochop makebox 关于box文件…...
centOS7
特殊权限 set_uid 赋予所有者身份 chmod us 文件 set_gid 赋予所有组身份 chmod gs 文件/目录 sticky_bit 防火墙 firewall-cmd 开启端口 firewall-cmd --zonepublic --add-port8080/tcp --permanent 重启防火墙 systemctl restart firewalld 查看开启的所有端口 fi…...
HTML5 弹跳动画(Bounce Animation)详解
HTML5 弹跳动画(Bounce Animation)详解 弹跳动画是一种动态效果,使元素在出现或消失时看起来像是在跳动。这种效果可以通过 CSS 动画或 JavaScript 来实现,增强用户体验。 1. 使用 CSS 实现弹跳动画 可以使用 CSS 的 keyframes…...
4.1.3 串
文章目录 串的基本概念串的基本操作串的存储结构 串的基本概念 串,仅由字符构成的有限序列。 串长:串中的字符个数。空串:长度为0的串。空格串:一个或多个空格构成的串。子串:串中任意长度连续字符构成的序列。含有字…...
国产编辑器EverEdit - 两种删除空白行的方法
1 使用技巧:删除空白行 1.1 应用场景 用户在编辑文档时,可能会遇到很多空白行需要删除的情况,比如从网页上拷贝文字,可能就会存在大量的空白行要删除。 1.2 使用方法 1.2.1 方法1: 使用编辑主菜单 选择主菜单编辑 …...
1月7日星期二今日早报简报微语报早读
1月7日星期二,农历腊月初八,早报#微语早读。 1、公安部:已为一线民警配备执法记录仪130万余部,规范现场执法; 2、浙江提出2035年全省域基本实现共同富裕; 3、“汕头牛肉丸”有新标准!1月6日起…...
随机置矩阵列为0[矩阵乘法pytorch版]
文章目录 1. 举例:2. python 代码 1. 举例: A [ 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 ] , r a n d [ 0 , 5 , 2 ] → A [ 0 1 0 3 4 0 6 7 0 9…...
C# 中mysql数据库,已经在原有数据库升级数据库脚本,去管理可以一次,和多次执行的,nuget包
在C#中,如果你需要管理数据库升级脚本,并且希望这些脚本能够支持一次执行和多次执行(即幂等性),你可以使用一些现成的NuGet包来简化这个过程。以下是一些常用的NuGet包: 1. DbUp 描述: DbUp 是一个轻量级…...
conda相比python好处
Conda 作为 Python 的环境和包管理工具,相比原生 Python 生态(如 pip 虚拟环境)有许多独特优势,尤其在多项目管理、依赖处理和跨平台兼容性等方面表现更优。以下是 Conda 的核心好处: 一、一站式环境管理:…...
HTML 语义化
目录 HTML 语义化HTML5 新特性HTML 语义化的好处语义化标签的使用场景最佳实践 HTML 语义化 HTML5 新特性 标准答案: 语义化标签: <header>:页头<nav>:导航<main>:主要内容<article>&#x…...
如何在看板中有效管理突发紧急任务
在看板中有效管理突发紧急任务需要:设立专门的紧急任务通道、重新调整任务优先级、保持适度的WIP(Work-in-Progress)弹性、优化任务处理流程、提高团队应对突发情况的敏捷性。其中,设立专门的紧急任务通道尤为重要,这能…...
SpringBoot+uniapp 的 Champion 俱乐部微信小程序设计与实现,论文初版实现
摘要 本论文旨在设计并实现基于 SpringBoot 和 uniapp 的 Champion 俱乐部微信小程序,以满足俱乐部线上活动推广、会员管理、社交互动等需求。通过 SpringBoot 搭建后端服务,提供稳定高效的数据处理与业务逻辑支持;利用 uniapp 实现跨平台前…...
【JavaSE】绘图与事件入门学习笔记
-Java绘图坐标体系 坐标体系-介绍 坐标原点位于左上角,以像素为单位。 在Java坐标系中,第一个是x坐标,表示当前位置为水平方向,距离坐标原点x个像素;第二个是y坐标,表示当前位置为垂直方向,距离坐标原点y个像素。 坐标体系-像素 …...
HashMap中的put方法执行流程(流程图)
1 put操作整体流程 HashMap 的 put 操作是其最核心的功能之一。在 JDK 1.8 及以后版本中,其主要逻辑封装在 putVal 这个内部方法中。整个过程大致如下: 初始判断与哈希计算: 首先,putVal 方法会检查当前的 table(也就…...
处理vxe-table 表尾数据是单独一个接口,表格tableData数据更新后,需要点击两下,表尾才是正确的
修改bug思路: 分别把 tabledata 和 表尾相关数据 console.log() 发现 更新数据先后顺序不对 settimeout延迟查询表格接口 ——测试可行 升级↑:async await 等接口返回后再开始下一个接口查询 ________________________________________________________…...
第7篇:中间件全链路监控与 SQL 性能分析实践
7.1 章节导读 在构建数据库中间件的过程中,可观测性 和 性能分析 是保障系统稳定性与可维护性的核心能力。 特别是在复杂分布式场景中,必须做到: 🔍 追踪每一条 SQL 的生命周期(从入口到数据库执行)&#…...
Vue3中的computer和watch
computed的写法 在页面中 <div>{{ calcNumber }}</div>script中 写法1 常用 import { computed, ref } from vue; let price ref(100);const priceAdd () > { //函数方法 price 1price.value ; }//计算属性 let calcNumber computed(() > {return ${p…...
Vue3 PC端 UI组件库我更推荐Naive UI
一、Vue3生态现状与UI库选择的重要性 随着Vue3的稳定发布和Composition API的广泛采用,前端开发者面临着UI组件库的重新选择。一个好的UI库不仅能提升开发效率,还能确保项目的长期可维护性。本文将对比三大主流Vue3 UI库(Naive UI、Element …...
