当前位置: 首页 > news >正文

Spark SQL 学习总结

文章目录

  • (一)Spark SQL
  • (二)SParkSession
  • (三)DataFrame常见算子操作
  • (四)DataFrame的sql操作
  • (五)RDD转换为DataFrame
    • (1)反射方式
    • (2)编程方式
  • (六)load和save操作
    • (1)load操作
    • (2)SaveMode
  • (七)内置函数

(一)Spark SQL

Spark SQL和我们之前讲Hive的时候说的hive on spark是不一样的。
hive on spark是表示把底层的mapreduce引擎替换为spark引擎。
而Spark SQL是Spark自己实现的一套SQL处理引擎。
Spark SQL是Spark中的一个模块,主要用于进行结构化数据的处理。它提供的最核心的编程抽象,就是DataFrame。
DataFrame=RDD+Schema 。
它其实和关系型数据库中的表非常类似,RDD可以认为是表中的数据,Schema是表结构信息。
DataFrame可以通过很多来源进行构建,包括:结构化的数据文件,Hive中的表,外部的关系型数据库,以及RDD
Spark1.3出现的 DataFrame ,Spark1.6出现了 DataSet ,在Spark2.0中两者统一,DataFrame等于DataSet[Row]

(二)SParkSession

要使用Spark SQL,首先需要创建一个SpakSession对象
SparkSession中包含了SparkContext和SqlContext
所以说想通过SparkSession来操作RDD的话需要先通过它来获取SparkContext
这个SqlContext是使用sparkSQL操作hive的时候会用到的。

使用案例
添加依赖:

<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.4.3</version>
</dependency>
object SqlDemoScala {def main(args: Array[String]): Unit = {val conf = new SparkConf().setMaster("local")val sparkSession=SparkSession.builder().appName("sqlDemoScala").config(conf).getOrCreate()//读取json文件,获取Dataframeval stuDf = sparkSession.read.json("D:\\student.json")//查看Dataframe中的数据stuDf.show()sparkSession.stop()}}

结果:

+---+------+------+
|age|  name|   sex|
+---+------+------+
| 19|  jack|  male|
| 18|   tom|  male|
| 27|jessic|female|
| 18|  hehe|female|
| 15|  haha|  male|
+---+------+------+

(三)DataFrame常见算子操作

  • printSchema()
  • show()
  • select()
  • filter()、where()
  • groupBy()
  • count()

使用案例

object DataFrameOpScala {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
.setMaster("local")
//创建SparkSession对象,里面包含SparkContext和SqlContext
val sparkSession = SparkSession.builder()
.appName("DataFrameOpScala")
.config(conf)
.getOrCreate()val stuDf = sparkSession.read.json("D:\\student.json")
//打印schema信息
stuDf.printSchema()
//默认显示所有数据,可以通过参数控制显示多少条
stuDf.show(2)
//查询数据中的指定字段信息
stuDf.select("name","age").show()
//在使用select的时候可以对数据做一些操作,需要添加隐式转换函数,否则语法报错
import sparkSession.implicits._
stuDf.select($"name",$"age" + 1).show()
//对数据进行过滤,需要添加隐式转换函数,否则语法报错
stuDf.filter($"age">18).show()
//where底层调用的就是filter
stuDf.where($"age">18).show()
//对数据进行分组求和
stuDf.groupBy("age").count().show()
sparkSession.stop()
}
}

(四)DataFrame的sql操作

想要实现直接支持sql语句查询DataFrame中的数据
需要两步操作

  1. 先将DataFrame注册为一个临时表
  2. 使用sparkSession中的sql函数执行sql语句

案例实现:

object DataFrameSqlScala {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
.setMaster("local")
//创建SparkSession对象,里面包含SparkContext和SqlContext
val sparkSession = SparkSession.builder()
.appName("DataFrameSqlScala")
.config(conf)
.getOrCreate()
val stuDf = sparkSession.read.json("D:\\student.json")
//将DataFrame注册为一个临时表
stuDf.createOrReplaceTempView("student")
//使用sql查询临时表中的数据
sparkSession.sql("select age,count(*) as num from student group by age")
.show()
sparkSession.stop()
}
}

(五)RDD转换为DataFrame

为什么要将RDD转换为DataFrame?
在实际工作中我们可能会先把hdfs上的一些日志数据加载进来,然后进行一些处理,最终变成结构化的数据,希望对这些数据做一些统计分析,当然了我们可以使用spark中提供transformation算子来实现,只不过会有一些麻烦,毕竟是需要写代码的,如果能够使用sql实现,其实是更加方便的。所以可以针对我们前面创建的RDD,将它转换为DataFrame,这样就可以使用dataFrame中的一些算子或者直接写sql来操作数据了。
Spark SQL支持这两种方式将RDD转换为DataFrame

  1. 反射方式
  2. 编程方式

(1)反射方式

这种方式是使用反射来推断RDD中的元数据。
基于反射的方式,代码比较简洁,也就是说当你在写代码的时候,已经知道了RDD中的元数据,这样的话使用反射这种方式是一种非常不错的选择。

Scala具有隐式转换的特性,所以spark sql的scala接口是支持自动将包含了case class的RDD转换为DataFrame的
下面来举一个例子:

object RddToDataFrameByReflectScala {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
.setMaster("local")
//创建SparkSession对象,里面包含SparkContext和SqlContext
val sparkSession = SparkSession.builder()
.appName("RddToDataFrameByReflectScala")
.config(conf)
.getOrCreate()
//获取SparkContext
val sc = sparkSession.sparkContext
val dataRDD = sc.parallelize(Array(("jack",18),("tom",20),("jessic",30)))
//基于反射直接将包含Student对象的dataRDD转换为DataFrame
//需要导入隐式转换
import sparkSession.implicits._
val stuDf = dataRDD.map(tup=>Student(tup._1,tup._2)).toDF()
//下面就可以通过DataFrame的方式操作dataRDD中的数据了
stuDf.createOrReplaceTempView("student")
//执行sql查询
val resDf = sparkSession.sql("select name,age from student where age > 18
//将DataFrame转化为RDD
val resRDD = resDf.rdd
//从row中取数据,封装成student,打印到控制台
resRDD.map(row=>Student(row(0).toString,row(1).toString.toInt))
.collect()
.foreach(println(_))
//使用row的getAs()方法,获取指定列名的值
resRDD.map(row=>Student(row.getAs[String]("name"),row.getAs[Int]("age")))
.collect()
.foreach(println(_))
sparkSession.stop()
}
}
//定义一个Student
case class Student(name: String,age: Int)

(2)编程方式

接下来是编程的方式
这种方式是通过编程接口来创建DataFrame,你可以在程序运行时动态构建一份元数据,就是Schema,然后将其应用到已经存在的RDD上。这种方式的代码比较冗长,但是如果在编写程序时,还不知道RDD的元数据,只有在程序运行时,才能动态得知其元数据,那么只能通过这种动态构建元数据的方式。
也就是说当case calss中的字段无法预先定义的时候,就只能用编程方式动态指定元数据了

案例:

object RddToDataFrameByProgramScala {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
.setMaster("local")
//创建SparkSession对象,里面包含SparkContext和SqlContext
val sparkSession = SparkSession.builder()
.appName("RddToDataFrameByProgramScala")
.config(conf)
.getOrCreate()
//获取SparkContext
val sc = sparkSession.sparkContext
val dataRDD = sc.parallelize(Array(("jack",18),("tom",20),("jessic",30)))
//组装rowRDD
val rowRDD = dataRDD.map(tup=>Row(tup._1,tup._2))
//指定元数据信息【这个元数据信息就可以动态从外部获取了,比较灵活】
val schema = StructType(Array(
StructField("name",StringType,true),
StructField("age",IntegerType,true)
))
//组装DataFrame
val stuDf = sparkSession.createDataFrame(rowRDD,schema)
//下面就可以通过DataFrame的方式操作dataRDD中的数据了
stuDf.createOrReplaceTempView("student")
//执行sql查询
val resDf = sparkSession.sql("select name,age from student where age > 18
//将DataFrame转化为RDD
val resRDD = resDf.rdd
resRDD.map(row=>(row(0).toString,row(1).toString.toInt))
.collect()
.foreach(println(_))
sparkSession.stop()
}
}

(六)load和save操作

对于Spark SQL的DataFrame来说,无论是从什么数据源创建出来的DataFrame,都有一些共同的load和save操作。
load操作主要用于加载数据,创建出DataFrame;
save操作,主要用于将DataFrame中的数据保存到文件中。

(1)load操作

我们前面操作json格式的数据的时候好像没有使用load方法,而是直接使用的json方法,这是什么特殊用法吗?
查看json方法的源码会发现,它底层调用的是format和load方法
注意:如果看不到源码,需要点击idea右上角的download source提示信息下载依赖的源码。

此时如果不指定format,则默认读取的数据源格式是parquet,也可以手动指定数据源格式。Spark SQL
内置了一些常见的数据源类型,比如json, parquet, jdbc, orc, csv, text
通过这个功能,就可以在不同类型的数据源之间进行转换了。

案例:

object LoadAndSaveOpScala {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
.setMaster("local")
//创建SparkSession对象,里面包含SparkContext和SqlContext
val sparkSession = SparkSession.builder()
.appName("LoadAndSaveOpScala")
.config(conf)
.getOrCreate()
//读取数据
val stuDf = sparkSession.read
.format("json")
.load("D:\\student.json")
//保存数据
stuDf.select("name","age")
.write
.format("csv")
.save("hdfs://bigdata01:9000/out-save001")
sparkSession.stop()
}
}

执行代码,查看结果,csv文件是使用逗号分隔的:

jack,19
tom,18
jessic,27
hehe,18
haha,15

(2)SaveMode

Spark SQL对于save操作,提供了不同的save mode。
主要用来处理,当目标位置已经有数据时应该如何处理。save操作不会执行锁操作,并且也不是原子的,因此是有一定风险出现脏数据的。

SaveMode 解释
SaveMode.ErrorIfExists (默认) 如果目标位置已经存在数据,那么抛出一个异常
SaveMode.Append 如果目标位置已经存在数据,那么将数据追加进去
SaveMode.Overwrite 如果目标位置已经存在数据,那么就将已经存在的数据删
SaveMode.Ignore 如果目标位置已经存在数据,那么就忽略,不做任何操作

在LoadAndSaveOpScala中增加SaveMode的设置,重新执行,验证结果将SaveMode设置为Append,如果目标已存在,则追加

(七)内置函数

Spark提供了很多内置函数

种类 函数
聚合函数 avg, count, countDistinct, first, last, max, mean, min, sum,
集合函数 array_contains, explode, size
日期/时间函数 datediff, date_add, date_sub, add_months, last_day, next_day,
数学函数 abs, ceil, floor, round
混合函数 if, isnull, md5, not, rand, when
字符串函数 concat, get_json_object, length, reverse, split, upper
窗口函数 denseRank, rank, rowNumber

其实这里面的函数和hive中的函数是类似的
注意:SparkSQL中的SQL函数文档不全,其实在使用这些函数的时候,大家完全可以去查看hive中sql的文档,使用的时候都是一样的。

相关文章:

Spark SQL 学习总结

文章目录&#xff08;一&#xff09;Spark SQL&#xff08;二&#xff09;SParkSession&#xff08;三&#xff09;DataFrame常见算子操作&#xff08;四&#xff09;DataFrame的sql操作&#xff08;五&#xff09;RDD转换为DataFrame&#xff08;1&#xff09;反射方式&#x…...

深度学习 - 37.TF x Keras Deep Cross Network DCN 实现

目录 一.引言 二.模型简介 1.Embedding and stacking layer 2.Cross Network 2.1 模型架构分析 2.2 计算逻辑...

Ubuntu中使用Synaptic进行包管理

Synaptic概况 Synaptic 是一个轻量级的 apt 软件包管理器系统的 GUI 前端&#xff0c;所有你可以在终端中使用 apt-get 命令来做的事&#xff0c;都可以通过 Synaptic 来实现。优势 图形化安装界面&#xff0c;同时可以安装配置相关依赖&#xff0c;避免由于依赖问题导致的各类…...

python之selenium库安装及用法(定位法、获取文本、文本框输入、鼠标点击、滑动滚动条)

一、selenium库安装 pip install selenium二、浏览器驱动安装 谷歌浏览器驱动下载地址&#xff1a;https://chromedriver.storage.googleapis.com/index.html 根据你电脑的谷歌浏览器版本&#xff0c;下载相应的就行。我下载的是110.0.5481.XX中的chromedriver_win32.zip 下载…...

FPGA纯verilog实现图像视频旋转 串口指令控制旋转角度 提供工程源码和技术支持

目录1、前言2、理论基础3、设计思路和框架图像输入和采集图像旋转处理图像缓存图像输出4、vivado工程详解5、上板调试验证6、福利&#xff1a;工程代码的获取1、前言 图像旋转是一种常用的图像处理技术&#xff0c;其基本原理就是指图像以某一点为中心旋转一定的角度&#xff…...

EventGraph:Event Extraction as Semantic Graph Parsing 论文解读

EventGraph: Event Extraction as Semantic Graph Parsing 论文&#xff1a;2022.case-1.2.pdf (aclanthology.org) 代码&#xff1a;huiling-y/EventGraph (github.com) 期刊/会议&#xff1a;CASE 2022 摘要 事件抽取涉及到事件触发词和相应事件论元的检测和抽取。现有系…...

【蓝桥杯集训·每日一题】AcWing 3696. 构造有向无环图

文章目录一、题目1、原题链接2、题目描述二、解题报告1、思路分析2、时间复杂度3、代码详解三、知识风暴拓扑排序一、题目 1、原题链接 3696. 构造有向无环图 2、题目描述 给定一个由 n 个点和 m 条边构成的图。 不保证给定的图是连通的。 图中的一部分边的方向已经确定&#…...

国内vs国外:外贸建站该如何选择?

外贸建站找国内还是国外&#xff1f; 答案是&#xff1a;国内。 随着互联网的发展&#xff0c;越来越多的企业开始意识到在网络上进行商业活动的重要性。 其中&#xff0c;建立一个专业的外贸网站是企业在国际市场上拓展业务的关键。 然而&#xff0c;对于选择国内还是国外…...

HLS协议有哪些特别优势

阿酷TONY / 2023-3-3 / 长沙 可以实现码率的动态自适应&#xff0c;清晰度动态成为可能&#xff1b;HLS是基于HTTP 协议的&#xff0c;更易于做各平台的适配与兼容&#xff1b;多终端跨平台的支持性&#xff1a; PC端, Android端, IOS 平台&#xff0c;微信之类的都支持&am…...

JavaScript里的回调函数属于闭包吗?

回调函数本身不一定属于闭包&#xff0c;但是在某些情况下&#xff0c;它们可能会涉及闭包。 回调函数通常是指在异步操作完成时执行的函数。它们在 JavaScript 中被广泛使用&#xff0c;例如在处理 AJAX 请求、定时器、事件处理程序等方面。 在使用回调函数时&#xff0c;如…...

编程基本概念

程序的构成 python程序由模块组成&#xff0c;一个模块对应一个python源文件&#xff0c;后缀为.py模块由语句组成&#xff0c;运行python程序时&#xff0c;按照模块中的语句顺序依次执行语句是python程序的构造单元&#xff0c;用于创建对象&#xff0c;变量赋值&#xff0c…...

Azure OpenAI 官方指南02|ChatGPT 的架构设计与应用实例

ChatGPT 作为即将在微软全球 Azure 公有云平台正式发布的服务&#xff0c;已经迅速成为了众多用户关心的服务之一。而由 OpenAI 发布的 ChatGPT 产品&#xff0c;仅仅上线两个月&#xff0c;就成为互联网历史上最快突破一亿月活的应用。本期从技术角度深度解析 ChatGPT 的架构设…...

RK3568核心板以太网大数据测试报告-万象奥科

1. 测试对象HD-RK3568-IOT 底板基于HD-RK3568-CORE工业级核心板设计&#xff08;双网口、双CAN、5路串口&#xff09;&#xff0c;接口丰富&#xff0c;适用于工业现场应用需求&#xff0c;亦方便用户评估核心板及CPU的性能。适用于工业自动化控制、人机界面、中小型医疗分析器…...

来 CSDN 三年,我写了一本Python书

大家好&#xff0c;我是朱小五。转眼间已经来 CSDN 3年了&#xff0c;其中给大家一共分享了252篇Python文章。 但这三年&#xff0c;最大的收获还是写了一本Python书&#xff01; 在这个自动化时代&#xff0c;我们有很多重复无聊的工作要做。想想这些你不再需要一次又一次地做…...

TIA博途中通过SCL语言实现快速排序的具体方法示例

TIA博途中通过SCL语言实现快速排序的具体方法示例 首先,了解一下快速排序的原理: 排序算法的思想非常简单,在待排序的数列中,我们首先要找一个数字作为基准数。为了方便,我们一般选择第 1 个数字作为基准数(其实选择第几个并没有关系)。接下来我们需要将这个待排序的数列…...

第 46 届世界技能大赛浙江省选拔赛“网络安全“项目B模块任务书

第46届世界技能大赛浙江省选拔赛"网络安全"项目B模块&#xff08;网络安全事件响应、数字取证调查&#xff09;第46届世界技能大赛浙江省选拔赛"网络安全"项目B模块2.1 第一部分 事件响应2.2 第二部分 数字取证调查2.3 第三部分 应用程序安全第46届世界技能…...

【C】字符串操作函数

初始化字符串 #include <string.h>void *memset(void *s, int c, size_t n); 返回值&#xff1a;s指向哪&#xff0c;返回的指针就指向哪memset函数把s所指的内存地址开始的n个字节都填充为c的值。通常c的值为0&#xff0c;把一块内存区清零。例如定义char buf[10];&…...

【python】 pytest自动化测试框架--selenium,requests,appium自动化工具

一、pytest简介 pytest是python的一种单元测试框架&#xff0c;与python自带的unittest测试框架类似&#xff0c;但是比unittest框架使用起来更简洁&#xff0c;效率更高 二、 pytest 单元测试框架 1、pytest 特点 pytest是python当中的一个单元框架&#xff0c;比unittest更灵…...

Spring boot 实战指南(三):配置事务,整合Elasticsearch、swagger、redis、rabbitMQ

文章目录一、配置事务依赖使用注解二、Elasticsearch创建项目配置maven完善依赖es连接配置实体映射repositoryservicecontroller三、swagger依赖启动类路径匹配配置配置类controller注解四、redis&#xff08;代码实现&#xff09;依赖yml配置配置类封装redisTamplate五、rabbi…...

九、Bean的循环依赖问题

1 什么是Bean的循环依赖 A对象中有B属性。B对象中有A属性。这就是循环依赖。我依赖你&#xff0c;你也依赖我。 比如&#xff1a;丈夫类Husband&#xff0c;妻子类Wife。Husband中有Wife的引用。Wife中有Husband的引用。 2 singleton下的set注入产生的循环依赖 丈夫类 pac…...

使用docker在3台服务器上搭建基于redis 6.x的一主两从三台均是哨兵模式

一、环境及版本说明 如果服务器已经安装了docker,则忽略此步骤,如果没有安装,则可以按照一下方式安装: 1. 在线安装(有互联网环境): 请看我这篇文章 传送阵>> 点我查看 2. 离线安装(内网环境):请看我这篇文章 传送阵>> 点我查看 说明&#xff1a;假设每台服务器已…...

装饰模式(Decorator Pattern)重构java邮件发奖系统实战

前言 现在我们有个如下的需求&#xff0c;设计一个邮件发奖的小系统&#xff0c; 需求 1.数据验证 → 2. 敏感信息加密 → 3. 日志记录 → 4. 实际发送邮件 装饰器模式&#xff08;Decorator Pattern&#xff09;允许向一个现有的对象添加新的功能&#xff0c;同时又不改变其…...

HTML 语义化

目录 HTML 语义化HTML5 新特性HTML 语义化的好处语义化标签的使用场景最佳实践 HTML 语义化 HTML5 新特性 标准答案&#xff1a; 语义化标签&#xff1a; <header>&#xff1a;页头<nav>&#xff1a;导航<main>&#xff1a;主要内容<article>&#x…...

FastAPI 教程:从入门到实践

FastAPI 是一个现代、快速&#xff08;高性能&#xff09;的 Web 框架&#xff0c;用于构建 API&#xff0c;支持 Python 3.6。它基于标准 Python 类型提示&#xff0c;易于学习且功能强大。以下是一个完整的 FastAPI 入门教程&#xff0c;涵盖从环境搭建到创建并运行一个简单的…...

HTML 列表、表格、表单

1 列表标签 作用&#xff1a;布局内容排列整齐的区域 列表分类&#xff1a;无序列表、有序列表、定义列表。 例如&#xff1a; 1.1 无序列表 标签&#xff1a;ul 嵌套 li&#xff0c;ul是无序列表&#xff0c;li是列表条目。 注意事项&#xff1a; ul 标签里面只能包裹 li…...

家政维修平台实战20:权限设计

目录 1 获取工人信息2 搭建工人入口3 权限判断总结 目前我们已经搭建好了基础的用户体系&#xff0c;主要是分成几个表&#xff0c;用户表我们是记录用户的基础信息&#xff0c;包括手机、昵称、头像。而工人和员工各有各的表。那么就有一个问题&#xff0c;不同的角色&#xf…...

反射获取方法和属性

Java反射获取方法 在Java中&#xff0c;反射&#xff08;Reflection&#xff09;是一种强大的机制&#xff0c;允许程序在运行时访问和操作类的内部属性和方法。通过反射&#xff0c;可以动态地创建对象、调用方法、改变属性值&#xff0c;这在很多Java框架中如Spring和Hiberna…...

NFT模式:数字资产确权与链游经济系统构建

NFT模式&#xff1a;数字资产确权与链游经济系统构建 ——从技术架构到可持续生态的范式革命 一、确权技术革新&#xff1a;构建可信数字资产基石 1. 区块链底层架构的进化 跨链互操作协议&#xff1a;基于LayerZero协议实现以太坊、Solana等公链资产互通&#xff0c;通过零知…...

优选算法第十二讲:队列 + 宽搜 优先级队列

优选算法第十二讲&#xff1a;队列 宽搜 && 优先级队列 1.N叉树的层序遍历2.二叉树的锯齿型层序遍历3.二叉树最大宽度4.在每个树行中找最大值5.优先级队列 -- 最后一块石头的重量6.数据流中的第K大元素7.前K个高频单词8.数据流的中位数 1.N叉树的层序遍历 2.二叉树的锯…...

论文笔记——相干体技术在裂缝预测中的应用研究

目录 相关地震知识补充地震数据的认识地震几何属性 相干体算法定义基本原理第一代相干体技术&#xff1a;基于互相关的相干体技术&#xff08;Correlation&#xff09;第二代相干体技术&#xff1a;基于相似的相干体技术&#xff08;Semblance&#xff09;基于多道相似的相干体…...