SparkSQL-初识
一、概览
Spark SQL and DataFrames - Spark 3.5.2 Documentation
我们先看下官网的描述:
SparkSQL是用于结构化数据处理的Spark模块,与基本的Spark RDD API不同。Spark SQL提供的接口为Spark提供了更多关于正在执行的数据和计算结构的信息。在内部,Spark SQL使用这些额外信息来执行额外的优化。
Spark SQL的一个用途是执行SQL查询。Spark SQL还可以用于从现有的Hive库表中读取数据。在另一种编程语言中运行SQL时,结果将作为Dataset/DataFrame返回。也可以使用命令行或通过JDBC/ODBC与执行SQL。
二、什么是Dataset
Dataset是Spark 1.6中添加的一个新接口,是数据的分布式集合。它兼容RDD和SparkSQL的优点:
1、RDD:强类型、使用强大lambda函数的能力,可以使用map()、flatMap()、filter()等转换算子
2、Spark SQL:优化执行引擎,可以使用select()、where()、groupBy()等DSL语法
Dataset是惰性的,只有在调用action算子时才会触发计算。在内部,Dataset表示一个逻辑计划,描述了生成数据所需的计算。当调用一个操作时,Spark的查询优化器会优化逻辑计划,并生成一个物理计划,以并行和分布式的方式高效执行。
从源码中我们可以看到,需要给定一个特定于域的类型“T”映射到Spark的内部类型系统。
class Dataset[T] private[sql](@DeveloperApi @Unstable @transient val queryExecution: QueryExecution,@DeveloperApi @Unstable @transient val encoder: Encoder[T])extends Serializable {//...........}
如果时基本类型可以通过导入spark.implicits来支持,如果是对象类型,需要自己定义,比如
case class Person(name: String, age: Long)
encoder会用于告诉Spark在运行时生成代码,将“Person”对象序列化为二进制结构。这种二进制结构通常具有更低的内存占用,并且针对数据处理的效率进行了优化(例如,以列格式)。
三、什么是DataFrame
通常调用spark.sql("select * from xxxxx") 或者 spark.read.json("xxx/xxx.json")时会返回DataFrame
下面我们看下DataFrame的类型是什么
package org.apache.spark
package object sql {type DataFrame = Dataset[Row]}
从源码中我们可以看到 DataFrame只是Dataset[Row]的一个类型别名
DataFrame是一个组织成命名列的数据集。它在概念上相当于关系数据库中的表或R/Python中的DataFrame,但底层有更丰富的优化。DataFrames可以从各种来源构建,例如:结构化数据文件、Hive中的表、外部数据库或现有的RDD。DataFrame API在Scala、Java、Python和R中可用。
四、SparkSession
它是使用Dataset和DataFrame API对Spark编程的入口点
创建一个新SparkSession
SparkSession.builder.master("local").appName("Word Count").config("spark.some.config.option", "some-value").getOrCreate()
我们来看下它的部分源码:
class SparkSession private(@transient val sparkContext: SparkContext,@transient private val existingSharedState: Option[SharedState],@transient private val parentSessionState: Option[SessionState],@transient private[sql] val extensions: SparkSessionExtensions,@transient private[sql] val initialSessionOptions: Map[String, String])extends Serializable with Closeable with Logging { self =>//此会话的封装版本为[[SQLContext]]形式,以实现向后兼容性val sqlContext: SQLContext = new SQLContext(this)//向 QueryExecutionListener 注册来监听查询度量def listenerManager: ExecutionListenerManager = sessionState.listenerManager//用于注册用户定义函数(UDF)//以下示例将Scala闭包注册为UDF://sparkSession.udf.register("myUDF", (arg1: Int, arg2: String) => arg2 + arg1)def udf: UDFRegistration = sessionState.udfRegistrationdef streams: StreamingQueryManager = sessionState.streamingQueryManager//以各种方式创建DataFramedef createDataFrame[A <: Product : TypeTag](rdd: RDD[A]): DataFrame = withActive {//........}def createDataFrame[A <: Product : TypeTag](data: Seq[A]): DataFrame = withActive {//........}def createDataFrame(rowRDD: RDD[Row], schema: StructType): DataFrame = withActive {//........}def createDataFrame(rows: java.util.List[Row], schema: StructType): DataFrame = withActive {//........}def createDataFrame(rdd: RDD[_], beanClass: Class[_]): DataFrame = withActive {//........}//以各种方式创建Datasetdef createDataset[T : Encoder](data: Seq[T]): Dataset[T] = {//........}//用户可以通过该界面创建、删除、更改或查询底层数据库、表、函数等@transient lazy val catalog: Catalog = new CatalogImpl(self)def table(tableName: String): DataFrame = {read.table(tableName)}//使用Spark执行SQL查询,将结果作为“DataFrame”返回。这个API急切地运行DDL/DML命令,但不用于SELECT查询。def sql(sqlText: String): DataFrame = withActive {//........}//在外部执行引擎而不是Spark中执行任意字符串命令。//当用户想在Spark外执行某些命令时,这可能很有用。//例如,为JDBC执行自定义DDL/DML命令,为ElasticSearch创建索引,为Solr创建内核等等。//调用此方法后,命令将被热切执行,返回的DataFrame将包含命令的输出(如果有的话)。def executeCommand(runner: String, command: String, options: Map[String, String]): DataFrame = {//........}//返回一个DataFrameReader,可用于将非流数据作为“DataFrame”读取。//示例:// sparkSession.read.parquet("/path/to/file.parquet")// sparkSession.read.schema(schema).json("/path/to/file.json")def read: DataFrameReader = new DataFrameReader(self)//禁用样式检查器,以便“隐含”对象可以以小写i开头//(特定于Scala)Scala中提供的隐式方法,用于将常见的Scala对象转换为`DataFrame`。object implicits extends SQLImplicits with Serializable {protected override def _sqlContext: SQLContext = SparkSession.this.sqlContext}}object SparkSession extends Logging {class Builder extends Logging {//为应用程序设置一个名称,该名称将显示在Spark web UI中。//如果没有设置应用程序名称,将使用随机生成的名称。def appName(name: String): Builder = config("spark.app.name", name)//设置配置选项。使用此方法设置的选项会自动传播到“SparkConf”和SparkSession自己的配置中。def config(key: String, value: String): Builder = synchronized {options += key -> valuethis}def config(conf: SparkConf): Builder = synchronized {conf.getAll.foreach { case (k, v) => options += k -> v }this}//设置要连接的Spark主URL,//例如“local”在本地运行,“local[4]”在4核本地运行,//或“spark://master:7077“在Spark独立集群上运行。def master(master: String): Builder = config("spark.master", master)//启用Hive支持,包括连接到持久Hive元存储、支持Hive服务器和Hive用户定义函数。def enableHiveSupport(): Builder = synchronized {if (hiveClassesArePresent) {config(CATALOG_IMPLEMENTATION.key, "hive")} else {抛异常 : 无法使用Hive支持实例化SparkSession,因为找不到Hive相关类}}//将扩展注入[[SparkSession]]。这允许用户添加分析器规则、优化器规则、//计划策略或自定义解析器。def withExtensions(f: SparkSessionExtensions => Unit): Builder = synchronized {f(extensions)this}//获取一个现有的 SparkSession,如果没有现有的,则创建一个新的。def getOrCreate(): SparkSession = synchronized {//......省略.......}}//创建一个SparkSession.Builder来构造一个SparkSessiondef builder(): Builder = new Builder}
五、使用示例
我们以Spark源码中的examples为例来看下SparkSQL是如何使用的

examples/src/main/resources/people.json
{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}
1、创建DataFrame
val df = spark.read.json("examples/src/main/resources/people.json")//将DataFrame的内容显示到stdout
df.show()
// +----+-------+
// | age| name|
// +----+-------+
// |null|Michael|
// | 30| Andy|
// | 19| Justin|
// +----+-------+//使用$符号需要此导入
import spark.implicits._
//以树格式打印schema
df.printSchema()
// root
// |-- age: long (nullable = true)
// |-- name: string (nullable = true)
//只选择name列进行打印
df.select("name").show()
// +-------+
// | name|
// +-------+
// |Michael|
// | Andy|
// | Justin|
// +-------+// 选择所有的列,但是对age列分别进行加1
df.select($"name", $"age" + 1).show()
// +-------+---------+
// | name|(age + 1)|
// +-------+---------+
// |Michael| null|
// | Andy| 31|
// | Justin| 20|
// +-------+---------+// 选择年龄大于21的人
df.filter($"age" > 21).show()
// +---+----+
// |age|name|
// +---+----+
// | 30|Andy|
// +---+----+// 按年龄统计人数
df.groupBy("age").count().show()
// +----+-----+
// | age|count|
// +----+-----+
// | 19| 1|
// |null| 1|
// | 30| 1|
// +----+-----+//将DataFrame注册为SQL临时视图
df.createOrReplaceTempView("people")val sqlDF = spark.sql("SELECT * FROM people")
sqlDF.show()
// +----+-------+
// | age| name|
// +----+-------+
// |null|Michael|
// | 30| Andy|
// | 19| Justin|
// +----+-------+
2、创建Dataset
import spark.implicits._
// 创建编码器
val caseClassDS = Seq(Person("Andy", 32)).toDS()
caseClassDS.show()
// +----+---+
// |name|age|
// +----+---+
// |Andy| 32|
// +----+---+// 大多数常见类型的编码器是通过导入spark.implicits自动提供的_
val primitiveDS = Seq(1, 2, 3).toDS()
primitiveDS.map(_ + 1).collect() // Returns: Array(2, 3, 4)// 通过提供类,DataFrames可以转换为Dataset。映射将按名称完成
val path = "examples/src/main/resources/people.json"
val peopleDS = spark.read.json(path).as[Person]
peopleDS.show()
// +----+-------+
// | age| name|
// +----+-------+
// |null|Michael|
// | 30| Andy|
// | 19| Justin|
// +----+-------+
3、用户自定义函数
// 定义并注册零参数非确定性UDF
// 默认情况下,UDF是确定性的,即对相同的输入产生相同的结果。
val random = udf(() => Math.random())
spark.udf.register("random", random.asNondeterministic())
spark.sql("SELECT random()").show()
// +-------+
// |UDF() |
// +-------+
// |xxxxxxx|
// +-------+// 定义并注册一个单参数UDF
val plusOne = udf((x: Int) => x + 1)
spark.udf.register("plusOne", plusOne)
spark.sql("SELECT plusOne(5)").show()
// +------+
// |UDF(5)|
// +------+
// | 6|
// +------+// 定义一个双参数UDF,并在一个步骤中向Spark注册
spark.udf.register("strLenScala", (_: String).length + (_: Int))
spark.sql("SELECT strLenScala('test', 1)").show()
// +--------------------+
// |strLenScala(test, 1)|
// +--------------------+
// | 5|
// +--------------------+//WHERE子句中的UDF
spark.udf.register("oneArgFilter", (n: Int) => { n > 5 })
spark.range(1, 10).createOrReplaceTempView("test")
spark.sql("SELECT * FROM test WHERE oneArgFilter(id)").show()
// +---+
// | id|
// +---+
// | 6|
// | 7|
// | 8|
// | 9|
// +---+
4、基于hive使用
使用Hive时,必须使用Hive支持实例化“SparkSession”,即enableHiveSupport()。包括连接到持久Hive元存储、支持Hive服务器和Hive用户定义函数。没有现有Hive部署的用户仍然可以启用Hive支持。当未由hive-site.xml配置时,上下文会自动在当前目录中创建“metastore_db”,并创建一个由“spark.sql.house.dir”配置的目录,该目录默认为启动spark应用程序的当前目录中的“spark warehouse”目录。
import spark.implicits._
import spark.sqlsql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive")
sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")// 普通查询
sql("SELECT * FROM src").show()
// +---+-------+
// |key| value|
// +---+-------+
// |238|val_238|
// | 86| val_86|
// |311|val_311|
// ...// 聚合查询
sql("SELECT COUNT(*) FROM src").show()
// +--------+
// |count(1)|
// +--------+
// | 500 |
// +--------+
相关文章:
SparkSQL-初识
一、概览 Spark SQL and DataFrames - Spark 3.5.2 Documentation 我们先看下官网的描述: SparkSQL是用于结构化数据处理的Spark模块,与基本的Spark RDD API不同。Spark SQL提供的接口为Spark提供了更多关于正在执行的数据和计算结构的信息。在内部&a…...
Go语言的垃圾回收(GC)机制的迭代和优化历史
Go语言的垃圾回收(GC)机制自Go语言发布以来经历了多次重要的迭代和优化,以提高性能和减少程序运行时的停顿时间。 以下是一些关键的版本和相应的GC优化: Go版本GC耗时情况主要改进点Go 1.0-1.4可能达到几百毫秒至秒级使用简单的标…...
thinkphp8 从入门到放弃(后面会完善用到哪里写到哪)
thinkphp8 从入门到放弃 引言 thinkphp* 大道至简一、 thinkphp8 安装安装Composerthinkphp 安装命令(tp-项目名称)多应用安装(一个项目不会只有一个应用)安装完文件目录如下本地部署配置伪静态好了项目可以run 二、架构服务(Service…...
对于电商跨境电商独立站中源代码建站和SaaS建站的区别
电商跨境电商独立站的搭建有多种方式,作为电商企业,搭建完全自主控制的电商独立站,对于电商企业的发展和运营有着至关重要的作用。下面推荐一个使用多年的跨境电商独立站系统源码,做简要介绍,据说前段时间火爆的Pandab…...
使用vite+react+ts+Ant Design开发后台管理项目(二)
前言 本文将引导开发者从零基础开始,运用vite、react、react-router、react-redux、Ant Design、less、tailwindcss、axios等前沿技术栈,构建一个高效、响应式的后台管理系统。通过详细的步骤和实践指导,文章旨在为开发者揭示如何利用这些技术…...
C++之 string(中)
C之 string string类对象的容量操作 resize 将有效字符的个数该成n个,多出的空间用字符c填充 虽然在string里用的不多,但是在vector里面常见 这里有三种情况: 1)resize小于当前的size 2)resize大于当前的size,小于capacity …...
双向链表的基本结构及功能实现
1.基本结构: 双向链表是一种链表数据结构,它由一系列节点组成,每个节点包含三个部分: (1).数据域:存储节点的数据 (2).前驱指针:指向前一个节点 (3).后驱指针:指向下一个节点 2.基本特性: 双向链接: 与单向链表…...
stm32定时触发软件中断
这里使用定时器作为延时,单位为秒,使用exti的软件触发方式,配置见代码,在main里进行触发软件中断 代码 #include "stm32f10x.h" #include "stm32f10x_gpio.h" #include "misc.h" #include "…...
blender设置背景图怎么添加?blender云渲染选择
Blender是一款功能强大的3D建模软件,它以流畅的操作体验和直观的用户界面而闻名。使用Blender,你可以轻松地为你的3D模型添加背景图片。 以下是具体的操作步骤: 1、启动Blender:首先,打开Blender软件。访问添加菜单&a…...
MMD模型及动作一键完美导入UE5-Blender方案(三)
1、下载并安装blender_mmd_tools插件 1、下载并安装Blender,Blender,下载Blender3.6,下载太新的版本可能会跟blender_mmd_tools不匹配 2、github下载blender_mmd_tools:https://github.com/UuuNyaa/blender_mmd_tools/ 3、Edit->Preference->Add ons->Install F…...
网络安全自学入门:(超详细)从入门到精通学习路线规划,学完即可就业
很多人上来就说想学习黑客,但是连方向都没搞清楚就开始学习,最终也只是会无疾而终!黑客是一个大的概念,里面包含了许多方向,不同的方向需要学习的内容也不一样。 算上从学校开始学习,已经在网安这条路上走…...
如何在O2OA中使用ElementUI组件进行审批流程工作表单设计
本文主要介绍如何在O2OA中进行审批流程表单或者工作流表单设计,O2OA主要采用拖拽可视化开发的方式完成流程表单的设计和配置,不需要过多的代码编写,业务人员可以直接进行修改操作。 在流程表单设计界面,可以在左边的工具栏找到Ele…...
三、LLM应用开发准备工作
LLM应用开发准备工作 开发基础开发工具大模型kxswkey的配置与使用工具推荐结语 开发基础 最好具备一定的Python开发基础,不需要特别深 如果不具备,可以先学习一下基础知识(概念),比如Python环境管理、包管理与使用、基…...
机器学习-可解释性机器学习:随机森林与fastshap的可视化模型解析
可解释性机器学习是指使机器学习模型的决策过程透明化,帮助用户理解模型如何得出特定结果。随机森林和 FastSHAP 是常用的工具,以下是对它们的简要解析和可视化方法。 随机森林 1. 概述 随机森林是一种集成学习方法,通过构建多个决策树并结…...
使用Assimp加载glb/gltf文件,然后使用Qt3D来渲染
文章目录 1.代码2.说明2.1.调用2.2.关于贴图 1.代码 ModelLoader.h #ifndef MODELLOADER_H #define MODELLOADER_H#include <QObject> #include <Qt3DRender> #include <QVector3D> #include <QGeometry>#include <assimp/Importer.hpp> #incl…...
vue实现左侧数据拖拽到右侧区域,且左侧数据保留且左侧数据不能互相拖拽改变顺序
一、案例效果 二、案例代码 封装左侧抽屉 DrawerSearch.vue<template><div><mtd-form :model="formDrawerSearch" ref="formCustom" inline><mtd-form-item><mtd-inputtype="text"v-model="formDrawerSearch.ho…...
人工智能与机器学习原理精解【21】
文章目录 SVM求两线段上距离最近的两个点问题描述:距离函数:解法:具体步骤:特别注意:示例代码 SVM思想的介入1. **SVM 的基本思想**超平面: 2. **分类间隔(Margin)**1. **分类间隔的…...
【MySQL 01】数据库基础
目录 1.数据库是什么 2.基本操作 数据库服务器连接操作 数据库和数据库表的创建 服务器,数据库,表关系 数据逻辑存储 3.MySQL架构 4.SQL分类 5.存储引擎 1.数据库是什么 mysql&&mysqld: mysql:这通常指的是 MySQL …...
C语言字符学习中级使用库解决问题
学习C语言中的字符处理,对于初学者来说,理解字符的基本概念以及如何进行操作是非常重要的。字符处理是指对单个字符或一组字符(字符串)的操作。为了更好地理解,下面从基础开始介绍,并结合一些常用的函数和示…...
网络管理:网络故障排查指南
在现代IT环境中,网络故障是不可避免的。快速、有效地排查和解决网络故障是确保业务连续性和用户满意度的关键。本文将详细介绍网络故障排查的基本方法和步骤,确保内容通俗易懂,并配以代码示例和必要的图片说明。 一、网络故障排查的基本步骤 确认故障现象 确认用户报告的故…...
盘古信息PCB行业解决方案:以全域场景重构,激活智造新未来
一、破局:PCB行业的时代之问 在数字经济蓬勃发展的浪潮中,PCB(印制电路板)作为 “电子产品之母”,其重要性愈发凸显。随着 5G、人工智能等新兴技术的加速渗透,PCB行业面临着前所未有的挑战与机遇。产品迭代…...
【力扣数据库知识手册笔记】索引
索引 索引的优缺点 优点1. 通过创建唯一性索引,可以保证数据库表中每一行数据的唯一性。2. 可以加快数据的检索速度(创建索引的主要原因)。3. 可以加速表和表之间的连接,实现数据的参考完整性。4. 可以在查询过程中,…...
线程与协程
1. 线程与协程 1.1. “函数调用级别”的切换、上下文切换 1. 函数调用级别的切换 “函数调用级别的切换”是指:像函数调用/返回一样轻量地完成任务切换。 举例说明: 当你在程序中写一个函数调用: funcA() 然后 funcA 执行完后返回&…...
页面渲染流程与性能优化
页面渲染流程与性能优化详解(完整版) 一、现代浏览器渲染流程(详细说明) 1. 构建DOM树 浏览器接收到HTML文档后,会逐步解析并构建DOM(Document Object Model)树。具体过程如下: (…...
C++八股 —— 单例模式
文章目录 1. 基本概念2. 设计要点3. 实现方式4. 详解懒汉模式 1. 基本概念 线程安全(Thread Safety) 线程安全是指在多线程环境下,某个函数、类或代码片段能够被多个线程同时调用时,仍能保证数据的一致性和逻辑的正确性…...
让回归模型不再被异常值“带跑偏“,MSE和Cauchy损失函数在噪声数据环境下的实战对比
在机器学习的回归分析中,损失函数的选择对模型性能具有决定性影响。均方误差(MSE)作为经典的损失函数,在处理干净数据时表现优异,但在面对包含异常值的噪声数据时,其对大误差的二次惩罚机制往往导致模型参数…...
七、数据库的完整性
七、数据库的完整性 主要内容 7.1 数据库的完整性概述 7.2 实体完整性 7.3 参照完整性 7.4 用户定义的完整性 7.5 触发器 7.6 SQL Server中数据库完整性的实现 7.7 小结 7.1 数据库的完整性概述 数据库完整性的含义 正确性 指数据的合法性 有效性 指数据是否属于所定…...
Ubuntu Cursor升级成v1.0
0. 当前版本低 使用当前 Cursor v0.50时 GitHub Copilot Chat 打不开,快捷键也不好用,当看到 Cursor 升级后,还是蛮高兴的 1. 下载 Cursor 下载地址:https://www.cursor.com/cn/downloads 点击下载 Linux (x64) ,…...
vue3 daterange正则踩坑
<el-form-item label"空置时间" prop"vacantTime"> <el-date-picker v-model"form.vacantTime" type"daterange" start-placeholder"开始日期" end-placeholder"结束日期" clearable :editable"fal…...
自然语言处理——文本分类
文本分类 传统机器学习方法文本表示向量空间模型 特征选择文档频率互信息信息增益(IG) 分类器设计贝叶斯理论:线性判别函数 文本分类性能评估P-R曲线ROC曲线 将文本文档或句子分类为预定义的类或类别, 有单标签多类别文本分类和多…...
