当前位置: 首页 > news >正文

Spark---资源、任务调度

一、Spark资源调度源码

1、Spark资源调度源码过程

Spark资源调度源码是在Driver启动之后注册Application完成后开始的。Spark资源调度主要就是Spark集群如何给当前提交的Spark application在Worker资源节点上划分资源。Spark资源调度源码在Master.scala类中的schedule()中进行的。

2、Spark资源调度源码结论

  1. Executor在集群中分散启动,有利于task计算的数据本地化。
  2. 默认情况下(提交任务的时候没有设置--executor-cores选项),每一个Worker为当前的Application启动一个Executor,这个Executor会使用这个Worker的所有的cores和1G内存。
  3. 如果想在Worker上启动多个Executor,提交Application的时候要加--executor-cores这个选项。
  4. 默认情况下没有设置--total-executor-cores,一个Application会使用Spark集群中所有的cores。
  5. 启动Executor不仅和core有关还和内存有关。

3、资源调度源码结论验证

使用Spark-submit提交任务演示。也可以使用spark-shell来验证。

1、默认情况每个worker为当前的Application启动一个Executor,这个Executor使用集群中所有的cores和1G内存。

./spark-submit 
--master spark://node1:7077--class org.apache.spark.examples.SparkPi../lib/spark-examples-1.6.0-hadoop2.6.0.jar 
10000

2、在workr上启动多个Executor,设置--executor-cores参数指定每个executor使用的core数量。

./spark-submit--master  spark://node1:7077--executor-cores 1 --class org.apache.spark.examples.SparkPi 
../lib/spark-examples-1.6.0-hadoop2.6.0.jar 
10000

3、内存不足的情况下启动core的情况。Spark启动是不仅看core配置参数,也要看配置的core的内存是否够用。

./spark-submit 
--master  spark://node1:7077 
--executor-cores 1  
--executor-memory 3g 
--class org.apache.spark.examples.SparkPi../lib/spark-examples-1.6.0-hadoop2.6.0.jar 
10000

--total-executor-cores集群中共使用多少cores

注意:一个进程不能让集群多个节点共同启动。

./spark-submit 
--master  spark://node1:7077 
--executor-cores 1  
--executor-memory 2g 
--total-executor-cores 3
--class org.apache.spark.examples.SparkPi../lib/spark-examples-1.6.0-hadoop2.6.0.jar 
10000

二、Spark任务调度源码

Spark任务调度源码是从Spark Application的一个Action算子开始的。action算子开始执行,会调用RDD的一系列触发job的逻辑。其中也有stage的划分过程:

三、Spark二次排序和分组取topN

1、二次排序

大数据中很多排序场景是需要先根据一列进行排序,如果当前列数据相同,再对其他某列进行排序的场景,这就是二次排序场景。例如:要找出网站活跃的前10名用户,活跃用户的评测标准就是用户在当前季度中登录网站的天数最多,如果某些用户在当前季度登录网站的天数相同,那么再比较这些用户的当前登录网站的时长进行排序,找出活跃用户。这就是一个典型的二次排序场景。

解决二次排序问题可以采用封装对象的方式,对象中实现对应的比较方法。

1.SparkConf sparkConf = new SparkConf()
2..setMaster("local")
3..setAppName("SecondarySortTest");
4.final JavaSparkContext sc = new JavaSparkContext(sparkConf);
5.
6.JavaRDD<String> secondRDD = sc.textFile("secondSort.txt");
7.
8.JavaPairRDD<SecondSortKey, String> pairSecondRDD = secondRDD.mapToPair(new PairFunction<String, SecondSortKey, String>() {
9.
10.  /**
11.   * 
12.  */
13.  private static final long serialVersionUID = 1L;
14.
15.  @Override
16.  public Tuple2<SecondSortKey, String> call(String line) throws Exception {
17.    String[] splited = line.split(" ");
18.    int first = Integer.valueOf(splited[0]);
19.    int second = Integer.valueOf(splited[1]);
20.    SecondSortKey secondSortKey = new SecondSortKey(first,second);
21.    return new Tuple2<SecondSortKey, String>(secondSortKey,line);
22.  }
23.});
24.
25.pairSecondRDD.sortByKey(false).foreach(new 
26.VoidFunction<Tuple2<SecondSortKey,String>>() {
27.
28.  /**
29.   * 
30.   */
31.  private static final long serialVersionUID = 1L;
32.
33.    @Override
34.    public void call(Tuple2<SecondSortKey, String> tuple) throws Exception {
35.      System.out.println(tuple._2);
36.  }
37.});
38.
39.
40.
41.public class SecondSortKey implements Serializable,Comparable<SecondSortKey>{
42.  /**
43.   * 
44.   */
45.  private static final long serialVersionUID = 1L;
46.  private int first;
47.  private int second;
48.  public int getFirst() {
49.    return first;
50.  }
51.  public void setFirst(int first) {
52.    this.first = first;
53.  }
54.  public int getSecond() {
55.    return second;
56.  }
57.  public void setSecond(int second) {
58.    this.second = second;
59.  }
60.  public SecondSortKey(int first, int second) {
61.    super();
62.    this.first = first;
63.    this.second = second;
64.  }
65.  @Override
66.  public int compareTo(SecondSortKey o1) {
67.    if(getFirst() - o1.getFirst() ==0 ){
68.      return getSecond() - o1.getSecond();
69.    }else{
70.      return getFirst() - o1.getFirst();
71.    }
72.  }
73.}

2、分组取topN

大数据中按照某个Key进行分组,找出每个组内数据的topN时,这种情况就是分组取topN问题。

解决分组取TopN问题有两种方式,第一种就是直接分组,对分组内的数据进行排序处理。第二种方式就是直接使用定长数组的方式解决分组取topN问题。

1.SparkConf conf = new SparkConf()
2..setMaster("local")
3..setAppName("TopOps");
4.JavaSparkContext sc = new JavaSparkContext(conf);
5.JavaRDD<String> linesRDD = sc.textFile("scores.txt");
6.
7.JavaPairRDD<String, Integer> pairRDD = linesRDD.mapToPair(new PairFunction<String, String, Integer>() {
8.
9.  /**
10.   * 
11.  */
12.  private static final long serialVersionUID = 1L;
13.
14.  @Override
15.  public Tuple2<String, Integer> call(String str) throws Exception {
16.    String[] splited = str.split("\t");
17.    String clazzName = splited[0];
18.    Integer score = Integer.valueOf(splited[1]);
19.    return new Tuple2<String, Integer> (clazzName,score);
20.  }
21.});
22.
23.pairRDD.groupByKey().foreach(new 
24.VoidFunction<Tuple2<String,Iterable<Integer>>>() {
25.
26.  /**
27.   * 
28.   */
29.  private static final long serialVersionUID = 1L;
30.
31.  @Override
32.  public void call(Tuple2<String, Iterable<Integer>> tuple) throws Exception {
33.    String clazzName = tuple._1;
34.    Iterator<Integer> iterator = tuple._2.iterator();
35.
36.    Integer[] top3 = new Integer[3];
37.
38.    while (iterator.hasNext()) {
39.      Integer score = iterator.next();
40.
41.      for (int i = 0; i < top3.length; i++) {
42.        if(top3[i] == null){
43.          top3[i] = score;
44.          break;
45.        }else if(score > top3[i]){
46.        for (int j = 2; j > i; j--) {
47.          top3[j] = top3[j-1];
48.        }
49.        top3[i] = score;
50.        break;
51.      }
52.    }
53.  }
54.  System.out.println("class Name:"+clazzName);
55.  for(Integer sscore : top3){
56.    System.out.println(sscore);
57.  }
58.}
59.});

相关文章:

Spark---资源、任务调度

一、Spark资源调度源码 1、Spark资源调度源码过程 Spark资源调度源码是在Driver启动之后注册Application完成后开始的。Spark资源调度主要就是Spark集群如何给当前提交的Spark application在Worker资源节点上划分资源。Spark资源调度源码在Master.scala类中的schedule()中进行…...

单片机开发常见问题集合

文章目录 发送串口数据偶尔丢失字节 发送串口数据偶尔丢失字节 场景&#xff1a; 在STM32单片机中进行串口数据发送&#xff0c;在Linux/Windows上进行串口数据接收&#xff0c;会偶发出现接收到的数据有某些字节丢失。 分析&#xff1a; 在STM32中可以使用printf用于发送串口…...

Matlab 点云曲率计算(之二)

文章目录 一、简介二、实现代码三、实现效果参考资料一、简介 之前已经讨论过许多关于计算曲率的问题,这里使用一个通过拟合三次曲面方程的方式来计算曲率,计算过程如下图所示: 二、实现代码 %********...

C++11的原子变量

C11提供了一个原子类型std::atomic<T>&#xff0c;可以使用任意类型作为模板参数&#xff0c;C11内置了整型的原子变量&#xff0c;可以更方便的使用原子变量&#xff0c;使用原子变量就不需要使用互斥量来保护该变量了&#xff0c;用起来更简洁。例如&#xff0c;要做一…...

北京交通大学 计算机网络体系与协议(研) 考试试卷

计算机网络体系与协议2023年期末考试 时长&#xff1a;120分钟 学院&#xff1a; 学号&#xff1a; 姓名&#xff1a; 一、简答题&#xff08;每题5分&#xff09; 1.简述公开密钥密码体制的工作原理…...

python之pyqt专栏7-信号与槽3

在上一篇文章中python之pyqt专栏6-信号与槽2-CSDN博客中&#xff0c;我们可以了解到对象可以使用内置信号&#xff0c;这些信号来自于类定义或者继承过来的。我们可以对这些信号可以通过connect连接槽函数。 需求 现在有一个需求&#xff0c;有两个UI界面“untitled.ui”和“u…...

高噪点灰度图目标粗定位CoraseLocation

高噪点的灰度图目标粗定位 /* ** name: CoraseLocation ** brief: 粗定位 ** param:[in] srcGray 灰度图&#xff08;&#xff09; ** param:[in] box 目标尺寸&#xff08;像素&#xff09; ** param:[ou] roi 目标定位结果 ** return: true成功&#xff0c;false…...

Android:Google三方库之Firebase集成详细步骤(二)

Analytics分析 1、将 Firebase 添加到您的 Android 项目&#xff08;如果尚未添加&#xff09;&#xff0c;并确保在 Firebase 项目中启用了 Google Analytics&#xff08;分析&#xff09;&#xff1a; 如果您要创建新的 Firebase 项目&#xff0c;请在项目创建过程中启用 G…...

java使用freemarker模板生成html,再生成pdf

1.freemarker模板生成html 添加Maven依赖 在pom.xml文件中添加以下依赖&#xff1a; <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-freemarker</artifactId> </dependency>创建Freemarker…...

图解系列--Web服务器,Http首部

1.用单台虚拟主机实现多个域名 HTTP/1.1 规范允许一台 HTTP 服务器搭建多个 Web 站点。。比如&#xff0c;提供 Web 托管服务&#xff08;Web Hosting Service&#xff09;的供应商&#xff0c;可以用一台服务器为多位客户服务&#xff0c;也可以以每位客户持有的域名运行各自不…...

直线(蓝桥杯)

直线 题目描述 本题为填空题&#xff0c;只需要算出结果后&#xff0c;在代码中使用输出语句将所填结果输出即可。 在平面直角坐标系中&#xff0c;两点可以确定一条直线。如果有多点在一条直线上&#xff0c; 那么这些点中任意两点确定的直线是同一条。 给定平面上 2 3 个…...

Android:从源码看FragmentManager如何工作

一个Activity中&#xff0c;在某一个容器中&#xff0c;更换不同的Fragment&#xff0c;从而显示不同的界面&#xff0c;这个场景相信大家已经非常熟悉了&#xff0c;也知道Activity是通过FragmentManager来管理嵌入的Fragments的&#xff0c;所以今天就来看看FragmentManager是…...

LabVIEW通过编程将图形类控件的X轴显示为时间戳

LabVIEW通过编程将图形类控件的X轴显示为时间戳 每个版本的LabVIEW中都有属性节点&#xff0c;可以以编程方式调整X轴和Y轴格式。对于不同版本的LabVIEW&#xff0c;这些属性节点无法在同一个位置找到。请参阅以下部分&#xff0c;了解特定版本LabVIEW的相关属性节点的位置。 …...

Spring Boot进行单元测试,一个思路解决重启低效难题!

所谓单元测试就是对功能最小粒度的测试&#xff0c;落实到JAVA中就是对单个方法的测试。 junit可以完成单个方法的测试&#xff0c;但是对于Spring体系下的web应用的单元测试是无能为力的。因为spring体系下的web应用都采用了MVC三层架构&#xff0c;依托于IOC&#xff0c;层级…...

c/c++ header_only 头文件实现的关键点

header_only 头文件实现的关键点 ------------------------------------------------------------------------- author: hjjdebug date: 2023年 11月 28日 星期二 16:58:38 CST descriptor: header_only 头文件实现的关键点1. 对外声明的函数必需加上inline, 消除连接的歧义…...

Linux(CentOS7.5):通过docker安装redis

一、准备配置文件 在宿主机&#xff0c;准备映射配置文件的目录下&#xff0c;运行如下&#xff1a; wget http://download.redis.io/redis-stable/redis.conf二、安装 docker run \ --restartalways \ --log-opt max-size100m \ --log-opt max-file2 \ -p 6380:6379 \ -v /opt…...

唯创知音WT588F02B-8S语音芯片:灵活更换语音内容,降低开发成本与备货压力

在电子产品的开发阶段&#xff0c;语音芯片的选择与使用对于产品的功能、成本和上市时间都有着重要影响。唯创知音的WT588F02B-8S语音芯片以其独特的优势&#xff0c;成为工程师们的理想选择&#xff0c;尤其在样品阶段&#xff0c;它为工程师提供了自行更换语音内容的便利&…...

git的创建以及使用

1、上传本地仓库 首先确定项目根目录中没有.git文件&#xff0c;有的话就删了&#xff0c;没有就下一步。在终端中输入git init命令。注意必须是根目录&#xff01; 将代码存到暂存区 将代码保存到本地仓库 2、创建git仓库 仓库名称和路径&#xff08;name&#xff09;随便写…...

面试笔记--Linux常用命令

文件和目录操作&#xff1a; ls: 列出目录内容 例子&#xff1a;ls -l - 列出详细信息&#xff0c;包括权限、所有者等 cd: 切换目录 例子&#xff1a;cd Documents - 进入 “Documents” 目录 pwd: 显示当前工作目录 例子&#xff1a;pwd - 显示当前工作目录的路径 cp: 复制文…...

【小黑嵌入式系统第十课】μC/OS-III概况——实时操作系统的特点、基本概念(内核任务中断)、与硬件的关系实现

文章目录 一、为什么要学习μC/OS-III二、嵌入式操作系统的发展历史三、实时操作系统的特点四、基本概念1. 前后台系统2. 操作系统3. 实时操作系统&#xff08;RTOS&#xff09;4. 内核5. 任务6. 任务优先级7. 任务切换8. 调度9. 非抢占式&#xff08;合作式&#xff09;内核10…...

未来机器人的大脑:如何用神经网络模拟器实现更智能的决策?

编辑&#xff1a;陈萍萍的公主一点人工一点智能 未来机器人的大脑&#xff1a;如何用神经网络模拟器实现更智能的决策&#xff1f;RWM通过双自回归机制有效解决了复合误差、部分可观测性和随机动力学等关键挑战&#xff0c;在不依赖领域特定归纳偏见的条件下实现了卓越的预测准…...

Linux应用开发之网络套接字编程(实例篇)

服务端与客户端单连接 服务端代码 #include <sys/socket.h> #include <sys/types.h> #include <netinet/in.h> #include <stdio.h> #include <stdlib.h> #include <string.h> #include <arpa/inet.h> #include <pthread.h> …...

以下是对华为 HarmonyOS NETX 5属性动画(ArkTS)文档的结构化整理,通过层级标题、表格和代码块提升可读性:

一、属性动画概述NETX 作用&#xff1a;实现组件通用属性的渐变过渡效果&#xff0c;提升用户体验。支持属性&#xff1a;width、height、backgroundColor、opacity、scale、rotate、translate等。注意事项&#xff1a; 布局类属性&#xff08;如宽高&#xff09;变化时&#…...

在HarmonyOS ArkTS ArkUI-X 5.0及以上版本中,手势开发全攻略:

在 HarmonyOS 应用开发中&#xff0c;手势交互是连接用户与设备的核心纽带。ArkTS 框架提供了丰富的手势处理能力&#xff0c;既支持点击、长按、拖拽等基础单一手势的精细控制&#xff0c;也能通过多种绑定策略解决父子组件的手势竞争问题。本文将结合官方开发文档&#xff0c…...

【位运算】消失的两个数字(hard)

消失的两个数字&#xff08;hard&#xff09; 题⽬描述&#xff1a;解法&#xff08;位运算&#xff09;&#xff1a;Java 算法代码&#xff1a;更简便代码 题⽬链接&#xff1a;⾯试题 17.19. 消失的两个数字 题⽬描述&#xff1a; 给定⼀个数组&#xff0c;包含从 1 到 N 所有…...

《通信之道——从微积分到 5G》读书总结

第1章 绪 论 1.1 这是一本什么样的书 通信技术&#xff0c;说到底就是数学。 那些最基础、最本质的部分。 1.2 什么是通信 通信 发送方 接收方 承载信息的信号 解调出其中承载的信息 信息在发送方那里被加工成信号&#xff08;调制&#xff09; 把信息从信号中抽取出来&am…...

Cloudflare 从 Nginx 到 Pingora:性能、效率与安全的全面升级

在互联网的快速发展中&#xff0c;高性能、高效率和高安全性的网络服务成为了各大互联网基础设施提供商的核心追求。Cloudflare 作为全球领先的互联网安全和基础设施公司&#xff0c;近期做出了一个重大技术决策&#xff1a;弃用长期使用的 Nginx&#xff0c;转而采用其内部开发…...

相机Camera日志分析之三十一:高通Camx HAL十种流程基础分析关键字汇总(后续持续更新中)

【关注我,后续持续新增专题博文,谢谢!!!】 上一篇我们讲了:有对最普通的场景进行各个日志注释讲解,但相机场景太多,日志差异也巨大。后面将展示各种场景下的日志。 通过notepad++打开场景下的日志,通过下列分类关键字搜索,即可清晰的分析不同场景的相机运行流程差异…...

WEB3全栈开发——面试专业技能点P2智能合约开发(Solidity)

一、Solidity合约开发 下面是 Solidity 合约开发 的概念、代码示例及讲解&#xff0c;适合用作学习或写简历项目背景说明。 &#x1f9e0; 一、概念简介&#xff1a;Solidity 合约开发 Solidity 是一种专门为 以太坊&#xff08;Ethereum&#xff09;平台编写智能合约的高级编…...

DeepSeek 技术赋能无人农场协同作业:用 AI 重构农田管理 “神经网”

目录 一、引言二、DeepSeek 技术大揭秘2.1 核心架构解析2.2 关键技术剖析 三、智能农业无人农场协同作业现状3.1 发展现状概述3.2 协同作业模式介绍 四、DeepSeek 的 “农场奇妙游”4.1 数据处理与分析4.2 作物生长监测与预测4.3 病虫害防治4.4 农机协同作业调度 五、实际案例大…...