SparkSQL-初识
一、概览
Spark SQL and DataFrames - Spark 3.5.2 Documentation
我们先看下官网的描述:
SparkSQL是用于结构化数据处理的Spark模块,与基本的Spark RDD API不同。Spark SQL提供的接口为Spark提供了更多关于正在执行的数据和计算结构的信息。在内部,Spark SQL使用这些额外信息来执行额外的优化。
Spark SQL的一个用途是执行SQL查询。Spark SQL还可以用于从现有的Hive库表中读取数据。在另一种编程语言中运行SQL时,结果将作为Dataset/DataFrame返回。也可以使用命令行或通过JDBC/ODBC与执行SQL。
二、什么是Dataset
Dataset是Spark 1.6中添加的一个新接口,是数据的分布式集合。它兼容RDD和SparkSQL的优点:
1、RDD:强类型、使用强大lambda函数的能力,可以使用map()、flatMap()、filter()等转换算子
2、Spark SQL:优化执行引擎,可以使用select()、where()、groupBy()等DSL语法
Dataset是惰性的,只有在调用action算子时才会触发计算。在内部,Dataset表示一个逻辑计划,描述了生成数据所需的计算。当调用一个操作时,Spark的查询优化器会优化逻辑计划,并生成一个物理计划,以并行和分布式的方式高效执行。
从源码中我们可以看到,需要给定一个特定于域的类型“T”映射到Spark的内部类型系统。
class Dataset[T] private[sql](@DeveloperApi @Unstable @transient val queryExecution: QueryExecution,@DeveloperApi @Unstable @transient val encoder: Encoder[T])extends Serializable {//...........}
如果时基本类型可以通过导入spark.implicits来支持,如果是对象类型,需要自己定义,比如
case class Person(name: String, age: Long)
encoder会用于告诉Spark在运行时生成代码,将“Person”对象序列化为二进制结构。这种二进制结构通常具有更低的内存占用,并且针对数据处理的效率进行了优化(例如,以列格式)。
三、什么是DataFrame
通常调用spark.sql("select * from xxxxx") 或者 spark.read.json("xxx/xxx.json")时会返回DataFrame
下面我们看下DataFrame的类型是什么
package org.apache.spark
package object sql {type DataFrame = Dataset[Row]}
从源码中我们可以看到 DataFrame只是Dataset[Row]的一个类型别名
DataFrame是一个组织成命名列的数据集。它在概念上相当于关系数据库中的表或R/Python中的DataFrame,但底层有更丰富的优化。DataFrames可以从各种来源构建,例如:结构化数据文件、Hive中的表、外部数据库或现有的RDD。DataFrame API在Scala、Java、Python和R中可用。
四、SparkSession
它是使用Dataset和DataFrame API对Spark编程的入口点
创建一个新SparkSession
SparkSession.builder.master("local").appName("Word Count").config("spark.some.config.option", "some-value").getOrCreate()
我们来看下它的部分源码:
class SparkSession private(@transient val sparkContext: SparkContext,@transient private val existingSharedState: Option[SharedState],@transient private val parentSessionState: Option[SessionState],@transient private[sql] val extensions: SparkSessionExtensions,@transient private[sql] val initialSessionOptions: Map[String, String])extends Serializable with Closeable with Logging { self =>//此会话的封装版本为[[SQLContext]]形式,以实现向后兼容性val sqlContext: SQLContext = new SQLContext(this)//向 QueryExecutionListener 注册来监听查询度量def listenerManager: ExecutionListenerManager = sessionState.listenerManager//用于注册用户定义函数(UDF)//以下示例将Scala闭包注册为UDF://sparkSession.udf.register("myUDF", (arg1: Int, arg2: String) => arg2 + arg1)def udf: UDFRegistration = sessionState.udfRegistrationdef streams: StreamingQueryManager = sessionState.streamingQueryManager//以各种方式创建DataFramedef createDataFrame[A <: Product : TypeTag](rdd: RDD[A]): DataFrame = withActive {//........}def createDataFrame[A <: Product : TypeTag](data: Seq[A]): DataFrame = withActive {//........}def createDataFrame(rowRDD: RDD[Row], schema: StructType): DataFrame = withActive {//........}def createDataFrame(rows: java.util.List[Row], schema: StructType): DataFrame = withActive {//........}def createDataFrame(rdd: RDD[_], beanClass: Class[_]): DataFrame = withActive {//........}//以各种方式创建Datasetdef createDataset[T : Encoder](data: Seq[T]): Dataset[T] = {//........}//用户可以通过该界面创建、删除、更改或查询底层数据库、表、函数等@transient lazy val catalog: Catalog = new CatalogImpl(self)def table(tableName: String): DataFrame = {read.table(tableName)}//使用Spark执行SQL查询,将结果作为“DataFrame”返回。这个API急切地运行DDL/DML命令,但不用于SELECT查询。def sql(sqlText: String): DataFrame = withActive {//........}//在外部执行引擎而不是Spark中执行任意字符串命令。//当用户想在Spark外执行某些命令时,这可能很有用。//例如,为JDBC执行自定义DDL/DML命令,为ElasticSearch创建索引,为Solr创建内核等等。//调用此方法后,命令将被热切执行,返回的DataFrame将包含命令的输出(如果有的话)。def executeCommand(runner: String, command: String, options: Map[String, String]): DataFrame = {//........}//返回一个DataFrameReader,可用于将非流数据作为“DataFrame”读取。//示例:// sparkSession.read.parquet("/path/to/file.parquet")// sparkSession.read.schema(schema).json("/path/to/file.json")def read: DataFrameReader = new DataFrameReader(self)//禁用样式检查器,以便“隐含”对象可以以小写i开头//(特定于Scala)Scala中提供的隐式方法,用于将常见的Scala对象转换为`DataFrame`。object implicits extends SQLImplicits with Serializable {protected override def _sqlContext: SQLContext = SparkSession.this.sqlContext}}object SparkSession extends Logging {class Builder extends Logging {//为应用程序设置一个名称,该名称将显示在Spark web UI中。//如果没有设置应用程序名称,将使用随机生成的名称。def appName(name: String): Builder = config("spark.app.name", name)//设置配置选项。使用此方法设置的选项会自动传播到“SparkConf”和SparkSession自己的配置中。def config(key: String, value: String): Builder = synchronized {options += key -> valuethis}def config(conf: SparkConf): Builder = synchronized {conf.getAll.foreach { case (k, v) => options += k -> v }this}//设置要连接的Spark主URL,//例如“local”在本地运行,“local[4]”在4核本地运行,//或“spark://master:7077“在Spark独立集群上运行。def master(master: String): Builder = config("spark.master", master)//启用Hive支持,包括连接到持久Hive元存储、支持Hive服务器和Hive用户定义函数。def enableHiveSupport(): Builder = synchronized {if (hiveClassesArePresent) {config(CATALOG_IMPLEMENTATION.key, "hive")} else {抛异常 : 无法使用Hive支持实例化SparkSession,因为找不到Hive相关类}}//将扩展注入[[SparkSession]]。这允许用户添加分析器规则、优化器规则、//计划策略或自定义解析器。def withExtensions(f: SparkSessionExtensions => Unit): Builder = synchronized {f(extensions)this}//获取一个现有的 SparkSession,如果没有现有的,则创建一个新的。def getOrCreate(): SparkSession = synchronized {//......省略.......}}//创建一个SparkSession.Builder来构造一个SparkSessiondef builder(): Builder = new Builder}
五、使用示例
我们以Spark源码中的examples为例来看下SparkSQL是如何使用的
examples/src/main/resources/people.json
{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}
1、创建DataFrame
val df = spark.read.json("examples/src/main/resources/people.json")//将DataFrame的内容显示到stdout
df.show()
// +----+-------+
// | age| name|
// +----+-------+
// |null|Michael|
// | 30| Andy|
// | 19| Justin|
// +----+-------+//使用$符号需要此导入
import spark.implicits._
//以树格式打印schema
df.printSchema()
// root
// |-- age: long (nullable = true)
// |-- name: string (nullable = true)
//只选择name列进行打印
df.select("name").show()
// +-------+
// | name|
// +-------+
// |Michael|
// | Andy|
// | Justin|
// +-------+// 选择所有的列,但是对age列分别进行加1
df.select($"name", $"age" + 1).show()
// +-------+---------+
// | name|(age + 1)|
// +-------+---------+
// |Michael| null|
// | Andy| 31|
// | Justin| 20|
// +-------+---------+// 选择年龄大于21的人
df.filter($"age" > 21).show()
// +---+----+
// |age|name|
// +---+----+
// | 30|Andy|
// +---+----+// 按年龄统计人数
df.groupBy("age").count().show()
// +----+-----+
// | age|count|
// +----+-----+
// | 19| 1|
// |null| 1|
// | 30| 1|
// +----+-----+//将DataFrame注册为SQL临时视图
df.createOrReplaceTempView("people")val sqlDF = spark.sql("SELECT * FROM people")
sqlDF.show()
// +----+-------+
// | age| name|
// +----+-------+
// |null|Michael|
// | 30| Andy|
// | 19| Justin|
// +----+-------+
2、创建Dataset
import spark.implicits._
// 创建编码器
val caseClassDS = Seq(Person("Andy", 32)).toDS()
caseClassDS.show()
// +----+---+
// |name|age|
// +----+---+
// |Andy| 32|
// +----+---+// 大多数常见类型的编码器是通过导入spark.implicits自动提供的_
val primitiveDS = Seq(1, 2, 3).toDS()
primitiveDS.map(_ + 1).collect() // Returns: Array(2, 3, 4)// 通过提供类,DataFrames可以转换为Dataset。映射将按名称完成
val path = "examples/src/main/resources/people.json"
val peopleDS = spark.read.json(path).as[Person]
peopleDS.show()
// +----+-------+
// | age| name|
// +----+-------+
// |null|Michael|
// | 30| Andy|
// | 19| Justin|
// +----+-------+
3、用户自定义函数
// 定义并注册零参数非确定性UDF
// 默认情况下,UDF是确定性的,即对相同的输入产生相同的结果。
val random = udf(() => Math.random())
spark.udf.register("random", random.asNondeterministic())
spark.sql("SELECT random()").show()
// +-------+
// |UDF() |
// +-------+
// |xxxxxxx|
// +-------+// 定义并注册一个单参数UDF
val plusOne = udf((x: Int) => x + 1)
spark.udf.register("plusOne", plusOne)
spark.sql("SELECT plusOne(5)").show()
// +------+
// |UDF(5)|
// +------+
// | 6|
// +------+// 定义一个双参数UDF,并在一个步骤中向Spark注册
spark.udf.register("strLenScala", (_: String).length + (_: Int))
spark.sql("SELECT strLenScala('test', 1)").show()
// +--------------------+
// |strLenScala(test, 1)|
// +--------------------+
// | 5|
// +--------------------+//WHERE子句中的UDF
spark.udf.register("oneArgFilter", (n: Int) => { n > 5 })
spark.range(1, 10).createOrReplaceTempView("test")
spark.sql("SELECT * FROM test WHERE oneArgFilter(id)").show()
// +---+
// | id|
// +---+
// | 6|
// | 7|
// | 8|
// | 9|
// +---+
4、基于hive使用
使用Hive时,必须使用Hive支持实例化“SparkSession”,即enableHiveSupport()。包括连接到持久Hive元存储、支持Hive服务器和Hive用户定义函数。没有现有Hive部署的用户仍然可以启用Hive支持。当未由hive-site.xml配置时,上下文会自动在当前目录中创建“metastore_db”,并创建一个由“spark.sql.house.dir”配置的目录,该目录默认为启动spark应用程序的当前目录中的“spark warehouse”目录。
import spark.implicits._
import spark.sqlsql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive")
sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")// 普通查询
sql("SELECT * FROM src").show()
// +---+-------+
// |key| value|
// +---+-------+
// |238|val_238|
// | 86| val_86|
// |311|val_311|
// ...// 聚合查询
sql("SELECT COUNT(*) FROM src").show()
// +--------+
// |count(1)|
// +--------+
// | 500 |
// +--------+
相关文章:

SparkSQL-初识
一、概览 Spark SQL and DataFrames - Spark 3.5.2 Documentation 我们先看下官网的描述: SparkSQL是用于结构化数据处理的Spark模块,与基本的Spark RDD API不同。Spark SQL提供的接口为Spark提供了更多关于正在执行的数据和计算结构的信息。在内部&a…...

Go语言的垃圾回收(GC)机制的迭代和优化历史
Go语言的垃圾回收(GC)机制自Go语言发布以来经历了多次重要的迭代和优化,以提高性能和减少程序运行时的停顿时间。 以下是一些关键的版本和相应的GC优化: Go版本GC耗时情况主要改进点Go 1.0-1.4可能达到几百毫秒至秒级使用简单的标…...

thinkphp8 从入门到放弃(后面会完善用到哪里写到哪)
thinkphp8 从入门到放弃 引言 thinkphp* 大道至简一、 thinkphp8 安装安装Composerthinkphp 安装命令(tp-项目名称)多应用安装(一个项目不会只有一个应用)安装完文件目录如下本地部署配置伪静态好了项目可以run 二、架构服务(Service…...

对于电商跨境电商独立站中源代码建站和SaaS建站的区别
电商跨境电商独立站的搭建有多种方式,作为电商企业,搭建完全自主控制的电商独立站,对于电商企业的发展和运营有着至关重要的作用。下面推荐一个使用多年的跨境电商独立站系统源码,做简要介绍,据说前段时间火爆的Pandab…...

使用vite+react+ts+Ant Design开发后台管理项目(二)
前言 本文将引导开发者从零基础开始,运用vite、react、react-router、react-redux、Ant Design、less、tailwindcss、axios等前沿技术栈,构建一个高效、响应式的后台管理系统。通过详细的步骤和实践指导,文章旨在为开发者揭示如何利用这些技术…...

C++之 string(中)
C之 string string类对象的容量操作 resize 将有效字符的个数该成n个,多出的空间用字符c填充 虽然在string里用的不多,但是在vector里面常见 这里有三种情况: 1)resize小于当前的size 2)resize大于当前的size,小于capacity …...

双向链表的基本结构及功能实现
1.基本结构: 双向链表是一种链表数据结构,它由一系列节点组成,每个节点包含三个部分: (1).数据域:存储节点的数据 (2).前驱指针:指向前一个节点 (3).后驱指针:指向下一个节点 2.基本特性: 双向链接: 与单向链表…...

stm32定时触发软件中断
这里使用定时器作为延时,单位为秒,使用exti的软件触发方式,配置见代码,在main里进行触发软件中断 代码 #include "stm32f10x.h" #include "stm32f10x_gpio.h" #include "misc.h" #include "…...

blender设置背景图怎么添加?blender云渲染选择
Blender是一款功能强大的3D建模软件,它以流畅的操作体验和直观的用户界面而闻名。使用Blender,你可以轻松地为你的3D模型添加背景图片。 以下是具体的操作步骤: 1、启动Blender:首先,打开Blender软件。访问添加菜单&a…...

MMD模型及动作一键完美导入UE5-Blender方案(三)
1、下载并安装blender_mmd_tools插件 1、下载并安装Blender,Blender,下载Blender3.6,下载太新的版本可能会跟blender_mmd_tools不匹配 2、github下载blender_mmd_tools:https://github.com/UuuNyaa/blender_mmd_tools/ 3、Edit->Preference->Add ons->Install F…...

网络安全自学入门:(超详细)从入门到精通学习路线规划,学完即可就业
很多人上来就说想学习黑客,但是连方向都没搞清楚就开始学习,最终也只是会无疾而终!黑客是一个大的概念,里面包含了许多方向,不同的方向需要学习的内容也不一样。 算上从学校开始学习,已经在网安这条路上走…...

如何在O2OA中使用ElementUI组件进行审批流程工作表单设计
本文主要介绍如何在O2OA中进行审批流程表单或者工作流表单设计,O2OA主要采用拖拽可视化开发的方式完成流程表单的设计和配置,不需要过多的代码编写,业务人员可以直接进行修改操作。 在流程表单设计界面,可以在左边的工具栏找到Ele…...

三、LLM应用开发准备工作
LLM应用开发准备工作 开发基础开发工具大模型kxswkey的配置与使用工具推荐结语 开发基础 最好具备一定的Python开发基础,不需要特别深 如果不具备,可以先学习一下基础知识(概念),比如Python环境管理、包管理与使用、基…...

机器学习-可解释性机器学习:随机森林与fastshap的可视化模型解析
可解释性机器学习是指使机器学习模型的决策过程透明化,帮助用户理解模型如何得出特定结果。随机森林和 FastSHAP 是常用的工具,以下是对它们的简要解析和可视化方法。 随机森林 1. 概述 随机森林是一种集成学习方法,通过构建多个决策树并结…...

使用Assimp加载glb/gltf文件,然后使用Qt3D来渲染
文章目录 1.代码2.说明2.1.调用2.2.关于贴图 1.代码 ModelLoader.h #ifndef MODELLOADER_H #define MODELLOADER_H#include <QObject> #include <Qt3DRender> #include <QVector3D> #include <QGeometry>#include <assimp/Importer.hpp> #incl…...

vue实现左侧数据拖拽到右侧区域,且左侧数据保留且左侧数据不能互相拖拽改变顺序
一、案例效果 二、案例代码 封装左侧抽屉 DrawerSearch.vue<template><div><mtd-form :model="formDrawerSearch" ref="formCustom" inline><mtd-form-item><mtd-inputtype="text"v-model="formDrawerSearch.ho…...

人工智能与机器学习原理精解【21】
文章目录 SVM求两线段上距离最近的两个点问题描述:距离函数:解法:具体步骤:特别注意:示例代码 SVM思想的介入1. **SVM 的基本思想**超平面: 2. **分类间隔(Margin)**1. **分类间隔的…...

【MySQL 01】数据库基础
目录 1.数据库是什么 2.基本操作 数据库服务器连接操作 数据库和数据库表的创建 服务器,数据库,表关系 数据逻辑存储 3.MySQL架构 4.SQL分类 5.存储引擎 1.数据库是什么 mysql&&mysqld: mysql:这通常指的是 MySQL …...

C语言字符学习中级使用库解决问题
学习C语言中的字符处理,对于初学者来说,理解字符的基本概念以及如何进行操作是非常重要的。字符处理是指对单个字符或一组字符(字符串)的操作。为了更好地理解,下面从基础开始介绍,并结合一些常用的函数和示…...

网络管理:网络故障排查指南
在现代IT环境中,网络故障是不可避免的。快速、有效地排查和解决网络故障是确保业务连续性和用户满意度的关键。本文将详细介绍网络故障排查的基本方法和步骤,确保内容通俗易懂,并配以代码示例和必要的图片说明。 一、网络故障排查的基本步骤 确认故障现象 确认用户报告的故…...

Springboot常见问题(bean找不到)
如图错误显示userMapper bean没有找到。 解决方案: mapper包位置有问题:因为SpringBoot默认的包扫描机制会扫描启动类所在的包同级文件和子包下的文件。注解问题: 比如没有加mapper注解 然而无论是UserMapper所在的包位置还是Mapper注解都是…...

架构设计笔记-5-软件工程基础知识
知识要点 按软件过程活动,将软件工具分为软件开发工具、软件维护工具、软件管理和软件支持工具。 软件开发工具:需求分析工具、设计工具、编码与排错工具。 软件维护工具:版本控制工具、文档分析工具、开发信息库工具、逆向工程工具、再工…...

Solidity——抽象合约和接口详解
🚀本系列文章为个人学习笔记,目的是巩固知识并记录我的学习过程及理解。文笔和排版可能拙劣,望见谅。 Solidity中的抽象合约和接口详解 目录 什么是抽象合约?抽象合约的语法接口(Interface)的定义接口的语…...

Fyne ( go跨平台GUI )中文文档-入门(一)
本文档注意参考官网(developer.fyne.io/) 编写, 只保留基本用法go代码展示为Go 1.16 及更高版本, ide为goland2021.2 这是一个系列文章: Fyne ( go跨平台GUI )中文文档-入门(一)-CSDN博客 Fyne ( go跨平台GUI )中文文档-Fyne总览(二)-CSDN博客 Fyne ( go跨平台GUI )…...

Google 扩展 Chrome 安全和隐私功能
过去一周,谷歌一直在推出新特性和功能,旨在让用户在 Chrome 上的桌面体验更加安全,最新的举措是扩展在多个设备上保存密钥的功能。 到目前为止,Chrome 网络用户只能将密钥保存到 Android 上的 Google 密码管理器,然后…...

css 缩放会变动的需要使用转换
position: fixed;top: 170px;left: 50%;transform: translate(-50%, -50%);...

(17)数据库neo4j数据备份
图数据库备份 假设图数据库安装位置:/root/shuzihua/neo4j-community-3.5.8 1.数据导出 进入/root/shuzihua/neo4j-community-3.5.8/bin目录;执行 neo4j stop 停止服务;/root/shuzihua/neo4j-community-3.5.8/data/databases/graph.db&#…...

从零开始学习Python
目录 从零开始学习Python 引言 环境搭建 安装Python解释器 选择IDE 基础语法 注释 变量和数据类型 变量命名规则 数据类型 运算符 算术运算符 比较运算符 逻辑运算符 输入和输出 控制流 条件语句 循环语句 for循环 while循环 循环控制语句 函数和模块 定…...

前端框架的对比和选择
在当今的前端开发领域,有多种流行的前端框架可供选择,如 Vue、React 和 Angular。以下是这些框架的对比以及 Vue 的优势: 一、React 特点: 声明式编程:使用 JSX 语法,使得组件的结构和行为更加清晰。虚拟…...

《机器学习》周志华-CH7(贝叶斯分类)
7.1贝叶斯决策论 对分类任务而言,在所有相关概率已知的理想情形下,贝叶斯决策论考虑如何基于这些概率核误判损失来选择最优的类别标记。 R ( x i ∣ x ) ∑ j 1 N λ i j P ( c j ∣ x ) \begin{equation} R(x_{i}|x)\sum_{j1}^{N}\lambda_{ij}P(c_{j}…...