当前位置: 首页 > news >正文

Spark开发

第一步:创建RDD

Spark提供三种创建RDD方式:** 集合、本地文件、HDFS文件**

  1. 使用程序中的集合创建RDD,主要用于进行测试,可以在实际部署到集群运行之前,自己使用集合构造一些测试数据,来测试后面的spark应用程序的流程。
  2. 使用本地文件创建RDD,主要用于临时性地处理一些存储了大量数据的文件
  3. 使用HDFS文件创建RDD,是最常用的生产环境的处理方式,主要可以针对HDFS上存储的数据,进行离线批处理操作。
使用集合创建RDD

如果要通过集合来创建RDD,需要针对程序中的集合,调用SparkContext的parallelize()方法。Spark会将集合中的数据拷贝到集群上,形成一个分布式的数据集合,也就是一个RDD。相当于,集合中的部分数据会到一个节点上,而另一部分数据会到其它节点上。然后就可以用并行的方式来操作这个分布式数据集合了

	object CreateRddByArrayscala {def main(args: Array[String]): Unit = {val conf = new SparkConf()conf.setAppName("CreateRddByArrayscala").setMaster("local")val sc = new SparkContext(conf)//创建集合 driver中执行val arr = Array(1,2,3,4,5)//基于集合创建RDDval rdd =sc.parallelize(arr)//对集合数据求和val sum =rdd.reduce(_ + _)//这行代码再driver中执行println(sum)

** 注意**
val arr = Array(1,2,3,4,5)还有println(sum)代码是在driver进程中执行的,这些代码不会并行执行parallelize还有reduce之类的操作是在worker节点中执行的

使用本地文件和HDFS文件创建RDD

通过SparkContext的textFile()方法,可以针对本地文件或HDFS文件创建RDD,RDD中的每个元素就是文件中的一行文本内容。textFile()方法支持针对目录、压缩文件以及通配符创建RDD

/*** 通过文件创建RDD*/
object CreateRddByFilescala {def main(args: Array[String]): Unit = {val conf = new SparkConf()conf.setAppName("CreateRddByArrayscala").setMaster("local")val sc = new SparkContext(conf)var path = "D:\\hello.txt"//path = hdfs://bigdata01:9000/test/hello.txtvar rdd =sc.textFile(path,minPartitions = 2)//获取每一行数据的长度,计算文件内数据的总长度val length = rdd.map(_.length).reduce(_+_)println(length);sc.stop() }
}

** Spark中对RDD的操作**
Spark对RDD的操作可以整体分为两类:Transformation和Action

Transformation可以翻译为转换,表示是针对RDD中数据的转换操作,主要会针对已有的RDD创建一个新的RDD:常见的有map、flatMap、filter等等.
Action可以翻译为执行,表示是触发任务执行的操作,主要对RDD进行最后的操作,比如遍历、reduce、保存到文件等,并且还可以把结果返回给Driver程序.
不管是Transformation里面的操作还是Action里面的操作,我们一般会把它们称之为算子
其中Transformation算子有一个特性:** lazy **
lazy特性在这里指的是,如果一个spark任务中只定义了transformation算子,那么即使你执行这个任务,任务中的算子也不会执行.
只有当transformation之后,接着执行了一个action操作,那么所有的transformation才会执行。
Spark通过lazy这种特性,来进行底层的spark任务执行的优化,避免产生过多中间结果。
Action的特性:执行Action操作才会触发一个Spark 任务的运行,从而触发这个Action之前所有的Transformation的执行

算子 介绍
map       将RDD中的每个元素进行处理,一进一出
filter    对RDD中每个元素进行判断,返回true则保留
flatMap   与map类似,但是每个元素都可以返回一个或多个新元素
groupByKey 根据key进行分组,每个key对应一个Iterable<value>
reduceByKey 对每个相同key对应的value进行reduce操作
sortByKey  对每个相同key对应的value进行排序操作(全局排序)
join     对两个包含<key,value>对的RDD进行join操作
distinct 对RDD中的元素进行全局去重

Transformation操作开发实战

  1. map:对集合中每个元素乘以2
  2. filter:过滤出集合中的偶数
  3. flatMap:将行拆分为单词
  4. groupByKey:对每个大区的主播进行分组
  5. reduceByKey:统计每个大区的主播数量
  6. sortByKey:对主播的音浪收入排序
  7. join:打印每个主播的大区信息和音浪收入
  8. distinct:统计当天开播的大区信息

scala代码如下:

object TransformationOpScala {def main(args: Array[String]): Unit = {val sc=  getSparkContextgroupByKeyOp(sc)}//flatMap:将行拆分为单词def flatMapOp(sc: SparkContext): Unit = {val dataRdd = sc.parallelize(Array(" good good study","day day up"))dataRdd.flatMap(_.split(" ")).foreach(println(_))}//groupbyKey 对每个大区主播进行分组def groupByKeyOp(sc: SparkContext): Unit = {val dataRdd =sc.parallelize(Array((150001,"us"),(1500002,"CN"),(150003,"CN"),(1500004,"IN")))//需要使用map对tuple中的数据位置进行互换,因为需要把大区作为key进行分组操作dataRdd.map(tup=>(tup._2,tup._1)).groupByKey().foreach(tup=>{//获取大区val area=tup._1println(area+":")//获取同一个大区对应的所有用户idval it = tup._2for(uid <- it){println(uid+" ")}println()})}//filter:过滤出集合中的偶数def filterOp(sc: SparkContext): Unit = {val dataRdd = sc.parallelize(Array(1,2,3,4,5))dataRdd.filter(_ %2 ==0).foreach(println(_))}
//map:对集合中每个元素乘以2def mapOp(sc: SparkContext): Unit = {val dataRdd =  sc.parallelize(Array(1,2,3,4,5))dataRdd.map(_ * 2).foreach(println(_))}private def getSparkContext = {val conf = new SparkConf()conf.setAppName("CreateRddByArrayscala").setMaster("local")new SparkContext(conf)}
}

常用Action介绍

算子 介绍
reduce   将RDD中的所有元素进行聚合操作
collect  将RDD中所有元素获取到本地客户端(Driver)
count    获取RDD中元素总数
take(n)  获取RDD中前n个元素
saveAsTextFile 将RDD中元素保存到文件中,对每个元素调用toString
countByKey     对每个key对应的值进行count计数
foreach        遍历RDD中的每个元素

scala代码:

object ActionOpScala {def main(args: Array[String]): Unit = {val sc =getSparkContext//reduce聚合计算//reduceOp(sc)//collect:获取元素集合//colletOp(sc)// count:获取元素总数//countOp(sc)//saveAsTextFile:保存文件//saveAsTextFileOp(sc)//countByKey:统计相同的key出现多少次//countByKeyOp(sc)//foreach:迭代遍历元素foreachOp(sc)sc.stop()}//foreach:迭代遍历元素def foreachOp(sc: SparkContext): Unit = {val dataRdd = sc.parallelize(Array(1,2,3,4,5))dataRdd.foreach(println(_))}//countByKey:统计相同的key出现多少次def countByKeyOp(sc: SparkContext): Unit = {val dataRdd = sc.parallelize(Array(("A",1001),("B",1002),("A",1003),("C",1004)))val res = dataRdd.countByKey()for((k,v) <- res){println(k+","+v)}}//saveAsTextFile:保存文件def saveAsTextFileOp(sc: SparkContext): Unit = {val dataRdd = sc.parallelize(Array(1,2,3,4,5))dataRdd.saveAsTextFile("hdfs://bigdata01:9000/out001")}
// count:获取元素总数def countOp(sc: SparkContext): Unit = {val dataRdd = sc.parallelize(Array(1,2,3,4,5))val res = dataRdd.count()println(res)}//collect:获取元素集合def colletOp(sc: SparkContext): Unit = {val dataRdd = sc.parallelize(Array(1,2,3,4,5))//collect 返回的是一个Array数组val res = dataRdd.collect()for(item <- res){println(item)}}
//reduce聚合计算def reduceOp(sc: SparkContext): Unit = {val dataRdd = sc.parallelize(Array(1,2,3,4,5))val num = dataRdd.reduce(_ + _)println(num)}private def getSparkContext = {val conf = new SparkConf()conf.setAppName("CreateRddByArrayscala").setMaster("local")new SparkContext(conf)}
}

相关文章:

Spark开发

第一步&#xff1a;创建RDD Spark提供三种创建RDD方式&#xff1a;** 集合、本地文件、HDFS文件** 使用程序中的集合创建RDD&#xff0c;主要用于进行测试&#xff0c;可以在实际部署到集群运行之前&#xff0c;自己使用集合构造一些测试数据&#xff0c;来测试后面的spark应…...

Tornado异步框架

简介&#xff1a; tornado是Python的web框架。tornado和主流的web服务器框架有明显的区别&#xff1a;它是非阻塞式服务器&#xff0c;而且速度非常快&#xff0c;得力于其非阻塞的方式和epoll的运用tornado可以每秒处理数以千计的连接&#xff08;号称&#xff09; 基本配置 …...

openpnp - error - 吸嘴没下降到板子上, 就将元件松开

文章目录openpnp - error - 吸嘴没下降到板子上, 就将元件松开概述笔记ENDopenpnp - error - 吸嘴没下降到板子上, 就将元件松开 概述 以前用过国内一家openpnp厂家出的设备, 他们家的openpnp是自己改过的. 贴片流程已经走过一遍. 这次还是按照以前记录的笔记, 按照国内那家的…...

【Java】yyyy-MM-dd HH:mm:ss 时间格式 时间戳 全面解读超详细

时间格式 时间格式(协议)描述gg时期或纪元。y不包含纪元的年份。不具有前导零。yy不包含纪元的年份。具有前导零。yyyy包含纪元的四位数的年份。M月份数字。一位数的月份没有前导零。MM月份数字。一位数的月份有一个前导零。MMM月份的缩写名称&#xff0c;在AbbreviatedMonthN…...

快鲸SCRM发布口腔企业私域运营解决方案

口腔企业普遍面临着以下几方面运营痛点问题 1、获客成本居高不下&#xff0c;恶性竞争严重 2、管理系统落后&#xff0c;人员流失严重 3、客户顾虑多、决策时间长 4、老客户易流失&#xff0c;粘性差 以上这些痛点&#xff0c;不得不倒逼口腔企业向精细化运营客户迈进。 …...

Verilog实现组合逻辑电路

在verilog 中可以实现的数字电路主要分为两类----组合逻辑电路和时序逻辑电路。组合逻辑电路比较简单&#xff0c;仅由基本逻辑门组成---如与门、或门和非门等。当电路的输入发生变化时&#xff0c;输出几乎&#xff08;信号在电路中传递时会有一小段延迟&#xff09;立即就发生…...

2023前端菜鸟笔试血泪史html5-one--找到工作前都更新

1.说说对html语义化的理解 什么的HTML语义化&#xff0c;顾名思义&#xff0c;HTML语义化就是可以不通过了解HTML的内容&#xff0c;就可以知道这个部分所代表的的意义。 HTML语义化的意义&#xff1a;在使用HTML标签构建页面时&#xff0c;避免大篇幅的使用无语义的标签。 …...

蓝牙调试工具集合汇总

BLE 该部分主要分享一下常用的蓝牙调试工具&#xff0c;方便后续蓝牙抓包及分析。 目录 1 hciconfig 2 hcitool 3 hcidump 4 hciattach 5 btmon 6 bluetoothd 7 bluetoothctl 1 hciconfig 工具介绍&#xff1a;hciconfig&#xff0c;HCI 设备配置工具 命令格式&…...

Java 获取文件后缀名【一文总结所有方法】

✅作者简介&#xff1a;2022年博客新星 第八。热爱国学的Java后端开发者&#xff0c;修心和技术同步精进。 &#x1f34e;个人主页&#xff1a;Java Fans的博客 &#x1f34a;个人信条&#xff1a;不迁怒&#xff0c;不贰过。小知识&#xff0c;大智慧。 &#x1f49e;当前专栏…...

UML常见图的总结

一、概述 UML&#xff1a;Unified Modeling Language&#xff0c;统一建模语言&#xff0c;支持从需求分析开始的软件开发的全过程。是一个支持模型化和软件系统开发的图形化语言、为软件开发的所有阶段提供模型化和可视化支持&#xff0c;包括由需求分析到规格&#xff0c;到…...

WebRTC系列-工具系列之音频相关工具

文章目录 1. audio_util数据格式转换类2. WavFile文件读写类2.1 读取wav文件2.2 写入wav文件这篇文章主要介绍WebRTC中一些音频工具这些,大部分都在 common_audio目录下,这个文件夹下提供音频的大量算法,包括sinc重采样算法,音频数据格式的转换:例如 float转int16_t格式等…...

7 线性回归及Python实现

1 统计指标 随机变量XXX的理论平均值称为期望: μE(X)\mu E(X)μE(X)但现实中通常不知道μ\muμ, 因此使用已知样本来获取均值 X‾1n∑i1nXi.\overline{X} \frac{1}{n} \sum_{i 1}^n X_i. Xn1​i1∑n​Xi​.方差variance定义为&#xff1a; σ2E(∣X−μ∣2).\sigma^2 E(|…...

适合小团队协作、任务管理、计划和进度跟踪的项目任务管理工具有哪些?

适合小团队协作、任务管理、计划和进度跟踪的项目任务管理工具有哪些? 大家可以参考这个模板&#xff1a;http://s.fanruan.com/irhj8管理项目归根结底在管理人、物&#xff0c;扩展来说便是&#xff1a; 人&#xff1a;员工能力、组织机制&#xff1b; 物&#xff1a;项目内…...

从100%进口到自主可控,从600块降到10块,中科院攻克重要芯片

前言 2月28日&#xff0c;“20多位中科院专家把芯片价格打到10块”冲上微博热搜&#xff0c;据河南省官媒大象新闻报道&#xff0c;热搜中提到的中科院专家所在企业为全球最大的PLC分路器芯片制造商仕佳光子&#xff0c;坐落于河南鹤壁。 为实现芯片技术自主可控自立自强&#…...

关于git的一些基本点总结

1.什么是git? git是一个常用的分布式版本管理工具。 2.git 的常用命令: clone&#xff08;克隆&#xff09;: 从远程仓库中克隆代码到本地仓库 checkout &#xff08;检出&#xff09;:从本地仓库中检出一个仓库分支然后进行修订 add&#xff08;添加&#xff09;: 在提交前…...

PyTorch保姆级安装教程

1 安装CUDA1.1 查找Nvidia适用的CUDA版本桌面右键&#xff0c;【打开 NVIDIA控制面板】查看【系统信息】查看NVIDIA的支持的CUDA的版本&#xff0c;下图可知支持的版本是 10.11.2 下载CUDACUDA下载官方网址https://developer.nvidia.com/cuda-toolkit-archive找到适合的版本下载…...

MySQL 上亿大表如何优化?

背景XX 实例&#xff08;一主一从&#xff09;xxx 告警中每天凌晨在报 SLA 报警&#xff0c;该报警的意思是存在一定的主从延迟。&#xff08;若在此时发生主从切换&#xff0c;需要长时间才可以完成切换&#xff0c;要追延迟来保证主从数据的一致性&#xff09;XX 实例的慢查询…...

Git(狂神课堂笔记)

1.首先去git官网下载我们对应的版本Git - Downloading Package (git-scm.com) 2.安装后我们会发现git文件夹里有三个应用程序&#xff1a; Git Bash&#xff1a;Unix与Linux风格的命令行&#xff0c;使用最多&#xff0c;推荐最多 Git CMD&#xff1a;Windows风格的命令行 G…...

「2」指针进阶,最详细指针和数组难题解题思路

&#x1f436;博主主页&#xff1a;ᰔᩚ. 一怀明月ꦿ ❤️‍&#x1f525;专栏系列&#xff1a;线性代数&#xff0c;C初学者入门训练 &#x1f525;座右铭&#xff1a;“不要等到什么都没有了&#xff0c;才下定决心去做” &#x1f680;&#x1f680;&#x1f680;大家觉不错…...

云服务器是做什么的?云服务器典型的应用场景介绍

云服务器可能是很多企业以及个人上云用户的必选产品了&#xff0c;但是对于初学者或者非专业的用户来说云服务器还是比较陌生的&#xff0c;它到底是干什么的&#xff0c;如此生活中哪些地方可以接触到&#xff0c;这篇文章将详细的介绍云服务器使用的应用场景以及相关的操作 本…...

【Redis技术进阶之路】「原理分析系列开篇」分析客户端和服务端网络诵信交互实现(服务端执行命令请求的过程 - 初始化服务器)

服务端执行命令请求的过程 【专栏简介】【技术大纲】【专栏目标】【目标人群】1. Redis爱好者与社区成员2. 后端开发和系统架构师3. 计算机专业的本科生及研究生 初始化服务器1. 初始化服务器状态结构初始化RedisServer变量 2. 加载相关系统配置和用户配置参数定制化配置参数案…...

【2025年】解决Burpsuite抓不到https包的问题

环境&#xff1a;windows11 burpsuite:2025.5 在抓取https网站时&#xff0c;burpsuite抓取不到https数据包&#xff0c;只显示&#xff1a; 解决该问题只需如下三个步骤&#xff1a; 1、浏览器中访问 http://burp 2、下载 CA certificate 证书 3、在设置--隐私与安全--…...

【OSG学习笔记】Day 16: 骨骼动画与蒙皮(osgAnimation)

骨骼动画基础 骨骼动画是 3D 计算机图形中常用的技术&#xff0c;它通过以下两个主要组件实现角色动画。 骨骼系统 (Skeleton)&#xff1a;由层级结构的骨头组成&#xff0c;类似于人体骨骼蒙皮 (Mesh Skinning)&#xff1a;将模型网格顶点绑定到骨骼上&#xff0c;使骨骼移动…...

CSS设置元素的宽度根据其内容自动调整

width: fit-content 是 CSS 中的一个属性值&#xff0c;用于设置元素的宽度根据其内容自动调整&#xff0c;确保宽度刚好容纳内容而不会超出。 效果对比 默认情况&#xff08;width: auto&#xff09;&#xff1a; 块级元素&#xff08;如 <div>&#xff09;会占满父容器…...

MySQL JOIN 表过多的优化思路

当 MySQL 查询涉及大量表 JOIN 时&#xff0c;性能会显著下降。以下是优化思路和简易实现方法&#xff1a; 一、核心优化思路 减少 JOIN 数量 数据冗余&#xff1a;添加必要的冗余字段&#xff08;如订单表直接存储用户名&#xff09;合并表&#xff1a;将频繁关联的小表合并成…...

android RelativeLayout布局

<?xml version"1.0" encoding"utf-8"?> <RelativeLayout xmlns:android"http://schemas.android.com/apk/res/android"android:layout_width"match_parent"android:layout_height"match_parent"android:gravity&…...

安卓基础(Java 和 Gradle 版本)

1. 设置项目的 JDK 版本 方法1&#xff1a;通过 Project Structure File → Project Structure... (或按 CtrlAltShiftS) 左侧选择 SDK Location 在 Gradle Settings 部分&#xff0c;设置 Gradle JDK 方法2&#xff1a;通过 Settings File → Settings... (或 CtrlAltS)…...

Linux部署私有文件管理系统MinIO

最近需要用到一个文件管理服务&#xff0c;但是又不想花钱&#xff0c;所以就想着自己搭建一个&#xff0c;刚好我们用的一个开源框架已经集成了MinIO&#xff0c;所以就选了这个 我这边对文件服务性能要求不是太高&#xff0c;单机版就可以 安装非常简单&#xff0c;几个命令就…...

恶补电源:1.电桥

一、元器件的选择 搜索并选择电桥&#xff0c;再multisim中选择FWB&#xff0c;就有各种型号的电桥: 电桥是用来干嘛的呢&#xff1f; 它是一个由四个二极管搭成的“桥梁”形状的电路&#xff0c;用来把交流电&#xff08;AC&#xff09;变成直流电&#xff08;DC&#xff09;。…...

GraphRAG优化新思路-开源的ROGRAG框架

目前的如微软开源的GraphRAG的工作流程都较为复杂&#xff0c;难以孤立地评估各个组件的贡献&#xff0c;传统的检索方法在处理复杂推理任务时可能不够有效&#xff0c;特别是在需要理解实体间关系或多跳知识的情况下。先说结论&#xff0c;看完后感觉这个框架性能上不会比Grap…...