Spark SQL 中DataFrame DSL的使用
在上一篇文章中已经大致说明了DataFrame APi,下面我们具体介绍DataFrame DSL的使用。DataFrame DSL是一种命令式编写Spark SQL的方式,使用的是一种类sql的风格语法。
文章链接:

一、单词统计案例引入
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}object Demo2DSLWordCount {def main(args: Array[String]): Unit = {/*** 在新版本的spark中,如果想要编写spark sql的话,需要使用新的spark入口类:SparkSession*/val sparkSession: SparkSession = SparkSession.builder().master("local").appName("wc spark sql").getOrCreate()/*** spark sql和spark core的核心数据类型不太一样** 1、读取数据构建一个DataFrame,相当于一张表*/val linesDF: DataFrame = sparkSession.read.format("csv") //指定读取数据的格式.schema("line STRING") //指定列的名和列的类型,多个列之间使用,分割.option("sep", "\n") //指定分割符,csv格式读取默认是英文逗号.load("spark/data/words.txt") // 指定要读取数据的位置,可以使用相对路径/*** DSL: 类SQL语法 api 介于代码和纯sql之间的一种api** spark在DSL语法api中,将纯sql中的函数都使用了隐式转换变成一个scala中的函数* 如果想要在DSL语法中使用这些函数,需要导入隐式转换**///导入Spark sql中所有的sql隐式转换函数import org.apache.spark.sql.functions._//导入另一个隐式转换,后面可以直接使用$函数引用字段进行处理import sparkSession.implicits._// linesDF.select(explode(split($"line","\\|")) as "word")
// .groupBy($"word")
// .count().show()val resultDF: DataFrame = linesDF.select(explode(split($"line", "\\|")) as "word").groupBy($"word").agg(count($"word") as "counts")/*** 保存数据*/resultDF.repartition(1).write.format("csv").option("sep","\t").mode(SaveMode.Overwrite).save("spark/data/sqlout2")}}
注意:show()可以指定两个参数,第一个参数为展现的条数,不指定默认展示前20条数据,第二个参数默认为false,代表的是如果数据过长展示就会不完全,可以指定为true,使得数据展示完整,比如 : show(200,truncate = false)
二、数据源获取
查看官方文档:Data Sources - Spark 3.5.1 Documentation,看到DataFrame支持多种数据源的获取。

1、csv-->json
val sparkSession: SparkSession = SparkSession.builder().master("local[2]").appName("多种类型数据源读取演示").config("spark.sql.shuffer.partitions", 1) //指定分区数为1,默认分区数是200个.getOrCreate()//导入spark sql中所有的隐式转换函数import org.apache.spark.sql.functions._//导入sparkSession下的所有隐式转换函数,后面可以直接使用$函数引用字段import sparkSession.implicits._/*** 读csv格式的文件-->写到json格式文件中*///1500100967,能映秋,21,女,文科五班val studentsDF: DataFrame = sparkSession.read.format("csv").schema("id String,name String,age Int,gender String,clazz String").option("sep", ",").load("spark/data/student.csv")studentsDF.write.format("json").mode(SaveMode.Overwrite).save("spark/data/students_out_json.json")
2、json-->parquet
val sparkSession: SparkSession = SparkSession.builder().master("local").appName("").config("spark.sql.shuffer.partitions", 1) //指定分区数为1,默认分区数是200个.getOrCreate()//导入spark sql中所有的隐式转换函数//导入sparkSession下的所有隐式转换函数,后面可以直接使用$函数引用字段/*** 读取json数据格式,因为json数据有键值对,会自动的将健作为列名,值作为列值,不需要手动的设置表结构*///1500100967,能映秋,21,女,文科五班//方式1:// val studentsJsonDF: DataFrame = sparkSession.read// .format("json")// .load("spark/data/students_out_json.json/part-00000-3f086bb2-23d9-4904-9814-3a34b21020ab-c000.json")//方式2:实际上也是调用方式1,只是更简洁了// def json(paths: String*): DataFrame = format("json").load(paths : _*)val studebtsReadDF: DataFrame = sparkSession.read.json("spark/data/students_out_json.json/part-00000-3f086bb2-23d9-4904-9814-3a34b21020ab-c000.json")studebtsReadDF.write.format("parquet").mode(SaveMode.Overwrite).save("spark/data/students_parquet")
3、parquet-->csv
val sparkSession: SparkSession = SparkSession.builder().master("local").appName("").config("spark.sql.shuffer.partitions", 1) //指定分区数为1,默认分区数是200个.getOrCreate()//导入Spark sql中所有的sql隐式转换函数import org.apache.spark.sql.functions._//导入另一个隐式转换,后面可以直接使用$函数引用字段进行处理import sparkSession.implicits._/*** parquet:压缩的比例由信息熵决定,通俗的说就是数据的重复程度决定*/val studebtsReadDF: DataFrame = sparkSession.read.format("parquet").load("spark/data/students_parquet/part-00000-8b815a03-97f7-4d71-8b71-4e7e30f60995-c000.snappy.parquet")studebtsReadDF.write.format("csv").mode(SaveMode.Overwrite).save("spark/data/students_csv")
4、数据库
下面我们以mysql为例:
val sparkSession: SparkSession = SparkSession.builder().master("local").appName("连接数据库").config("spark.sql.shuffle.partitions",1) //默认分区的数量是200个.getOrCreate()/*** 读取数据库中的数据,mysql* 如果链接失败,可以将参数补全:jdbc:mysql://192.168.19.100:3306?useUnicode=true&allowPublicKeyRetrieval=true&characterEncoding=utf8&useSSL=false*/val jdDF: DataFrame = sparkSession.read.format("jdbc").option("url", "jdbc:mysql://192.168.19.100:3306?useSSL=false").option("dbtable", "bigdata29.emp").option("user", "root").option("password", "123456").load()jdDF.show(10,truncate = false)
三、DataFrame DSL API的使用
1、select
import org.apache.spark.sql.{DataFrame, SparkSession}object Demo1Select {def main(args: Array[String]): Unit = {val sparkSession: SparkSession = SparkSession.builder().master("local").appName("select函数演示").getOrCreate()//导入Spark sql中所有的sql隐式转换函数import org.apache.spark.sql.functions._//导入另一个隐式转换,后面可以直接使用$函数引用字段进行处理import sparkSession.implicits._val studentsDF: DataFrame = sparkSession.read.format("csv").schema("id String,name String,age String,gender String,clazz String").option("sep", ",").load("spark/data/student.csv")/*** select函数*///方式1:只能查询原有字段,不能对字段做出处理,比如加减、起别名之类studentsDF.select("id", "name", "age")//方式2:弥补了方式1的不足studentsDF.selectExpr("id","name","age+1 as new_age")//方式3:使用隐式转换函数中的$将字段变为一个对象val stuDF: DataFrame = studentsDF.select($"id", $"name", $"age")//3.1使用对象对字段进行处理
// stuDF.select($"id", $"name", $"age",$"age".+(1) as "new_age").show() //不可使用未变为对象的字段stuDF.select($"id", $"name", $"age",$"age" + 1 as "new_age") // +是函数,可以等价于该语句//3.2可以在select中使用sql函数studentsDF.select($"id", $"name", $"age", substring($"id", 0, 2))}
}
2、where
/*** where函数:过滤数据*///方式1:直接将sql中的where语句以字符串形式传参studentsDF.where("clazz='文科一班' and gender='男'")//方式2:使用$列对象形式过滤/*** 注意在此种方式下:等于和不等于符号与我们平常使用的有所不同* 等于:===* 不等于:=!=*/studentsDF.where($"clazz" === "文科一班" and $"gender"=!="男").show()
3、groupBy和agg
/*** groupby:分组函数 agg:聚合函数* 注意:* 1、groupby与agg函数通常都是一起使用* 2、分组聚合之后的结果DataFrame中只会包含分组字段与聚合字段* 3、分组聚合之后select中无法出现不是分组的字段*///需求:根据班级分组,求每个班级的人数和平均年龄studentsDF.groupBy($"clazz").agg(count($"clazz") as "clazz_number",avg($"age") as "avg_age").show()
4、join
/*** 5、join:表关联*/val subjectDF1: DataFrame = sparkSession.read.format("csv").option("sep", ",").schema("id String,subject_id String,score Int").load("spark/data/score.csv")val subjectDF2: DataFrame = sparkSession.read.format("csv").option("sep", ",").schema("sid String,subject_id String,score Int").load("spark/data/score.csv")//关联场景1:所关联的字段名字一样studentsDF.join(subjectDF1,"id")//关联场景2:所关联的字段名字不一样studentsDF.join(subjectDF2,$"id"===$"sid","inner")
// studentsDF.join(subjectDF2,$"id"===$"sid","left").show()/*** 上面两种关联场景默认inner连接方式(内连接),可以指定参数选择连接方式,比如左连接、右连接、全连接之类* * @param joinType Type of join to perform. Default `inner`. Must be one of:* * `inner`, `cross`, `outer`, `full`, `fullouter`,`full_outer`, `left`,* * `leftouter`, `left_outer`, `right`, `rightouter`, `right_outer`.*/
5、开窗
/*** 开窗函数* 1、ROW_NUMBER():为分区中的每一行分配一个唯一的序号。序号是根据ORDER BY子句定义的顺序分配的* 2、RANK()和DENSE_RANK():为分区中的每一行分配一个排名。RANK()在遇到相同值时会产生间隙,而DENSE_RANK()则不会。**///需求:统计每个班级总分前三的学生val stu_scoreDF: DataFrame = studentsDF.join(subjectDF2, $"id" === $"sid")//方式1:在select中使用row_number() over Window.partitionBy().orderBy()stu_scoreDF.groupBy($"clazz", $"id").agg(sum($"score") as "sum_score").select($"clazz", $"id", $"sum_score", row_number() over Window.partitionBy($"clazz").orderBy($"sum_score".desc) as "score_rank").where($"score_rank" <= 3)//方式2:使用withcolumn()函数,会新增一列,但是要预先指定列名stu_scoreDF.repartition(1).groupBy($"clazz", $"id").agg(sum($"score") as "sum_score").withColumn("score_rank",row_number() over Window.partitionBy($"clazz").orderBy($"sum_score".desc)).where($"score_rank" <= 3).show()
注意:
DSL API 不直接对应 SQL 的关键字执行顺序(如 SELECT、FROM、WHERE、GROUP BY 等),但可以按照构建逻辑查询的方式来组织代码,使其与 SQL 查询的逻辑结构相似。
在构建 Spark DataFrame 转换和操作时,常用流程介绍:
- 选择数据源:使用
spark.read或从其他 DataFrame 派生。 - 转换:使用各种转换函数(如
select、filter、map、flatMap、join等)来修改 DataFrame。 - 聚合:使用
groupBy和聚合函数(如sum、avg、count等)对数据进行分组和汇总。 - 排序:使用
orderBy或sort对数据进行排序。 - 输出:使用
show、collect、write等函数将结果输出到控制台、收集到驱动程序或写入外部存储。
四、RDD与DataFrame的转换
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SparkSession}object RddToDf {def main(args: Array[String]): Unit = {val sparkSession: SparkSession = SparkSession.builder().appName("Rdd与Df之间的转换").master("local").config("spark.sql.shuffle.partitions", 1).getOrCreate()import org.apache.spark.sql.functions._import sparkSession.implicits._val sparkContext: SparkContext = sparkSession.sparkContextval idNameRdd: RDD[(String, String)] = sparkContext.textFile("spark/data/student.csv").map(_.split(",")).map {case Array(id: String, name: String, _, _, _) => (id, name)}/*** Rdd-->DF* 因为在Rdd中不会存储文件的结构(schema)信息,所以要指定字段*/val idNameDF: DataFrame = idNameRdd.toDF("id", "name")idNameDF.createOrReplaceTempView("idNameTb")sparkSession.sql("select id,name from idNameTb").show()/*** DF-->Rdd*/val idNameRdd2: RDD[Row] = idNameDF.rddidNameRdd2.foreach(println)}
}
相关文章:
Spark SQL 中DataFrame DSL的使用
在上一篇文章中已经大致说明了DataFrame APi,下面我们具体介绍DataFrame DSL的使用。DataFrame DSL是一种命令式编写Spark SQL的方式,使用的是一种类sql的风格语法。 文章链接: 一、单词统计案例引入 import org.apache.spark.sql.{DataFrame, SaveMod…...
qt 布局学习笔记
目录 qt下载地址: widget 宽高 管理信息列表源码 c版: pro文件: qt 设置水平布局,里面有两个按钮,每个按钮就变的很宽,怎么设置按钮的精确位置 设置固定大小: 使用弹性空间(…...
设计模式复习
一、模式所采用的关系(e.g.继承…) UML图例 二、各模式的特点、优缺点 1.创建型(5种创建型口诀: 抽象工厂 按照 工厂方法,建造 单例 原型) 将对象的使用和创建分离,使用对象时无需知道对象的创建细节&a…...
前后端开发入门全攻略:零基础学起
新书上架~👇全国包邮奥~ python实用小工具开发教程http://pythontoolsteach.com/3 欢迎关注我👆,收藏下次不迷路┗|`O′|┛ 嗷~~ 目录 一、前后端开发概览 二、后端开发基础:Flask框架入门 代码案例:Hel…...
Android Studio无法改变Button背景颜色解决办法
大家好,我是咕噜铁蛋!今天我来和大家探讨一个在Android开发中常见但可能让初学者感到困惑的问题——如何在Android Studio中改变Button的背景颜色。这个问题看似简单,但实际操作中可能会遇到一些意想不到的挑战。接下来,我将从多个…...
元宇宙三维互动展厅让体验者进入一个充满奇幻与创意的数字世界
元宇宙数字产品展厅搭建编辑器凭借强大的三维可视化互动功能,为用户带来前所未有的沉浸式数字展览体验。 在元宇宙数字产品展厅搭建编辑器中,用户可以轻松打造逼真的三维展览环境,通过VR虚拟现实技术,仿佛置身于一个充满奇幻与创意…...
java高级——Collection集合之List探索(包含ArrayList、LinkedList、Vector底层实现及区别,非常详细哦)
java高级——Collection集合之List探索 前情提要文章介绍提前了解的知识点1. 数组2. 单向链表3. 双向链表4. 为什么单向链表使用的较多5. 线程安全和线程不安全的概念 ArrayList介绍1. 继承结构解析1.1 三个标志性接口1.2 AbstractList和AbstractCollection 2. ArrayList底层代…...
JAVA-->方法的使用详解
JAVA–>方法的使用详解 1.方法的概念及使用 1.1 什么是方法 : 方法就是一个代码片段. 类似于 C 语言中的 “函数”。 1.2 方法定义 / 方法定义 修饰符 返回值类型 方法名称([参数类型 形参 ...]){方法体代码;[return 返回值]; }判断是否为闰年 public class Method{ //…...
基于 vLLM 搭建 DeepSeek-V2 Chat 服务
直奔主题。 安装vLLM 官方实现的代码还没有 merge 到 vLLM 主分支,所以直接 git clone DeepSeek 的分支。 git clone https://github.com/zwd003/vllm.git cd vllm pip install -e .源码安装大概耗时 10 分钟。 OpenAI 接口规范启动 官方 Github 放的是单条推理…...
Kafka 安装教程和基本操作
一、简介 Kafka 是最初由 Linkedin 公司开发,是一个分布式、分区的、多副本的、多订阅者,基于 zookeeper 协调的分布式日志系统(也可以当做 MQ 系统),常见可以用于 web/nginx 日志、访问日志,消息服务等等…...
Java 五种内部类演示及底层原理详解
内部类 什么是内部类 在A类的内部定义B类,B类就被称为内部类 发动机类单独存在没有意义 发动机为独立个体 可以在外部其他类里创建内部类的对象去调用方法 类的五大成员 属性 方法 构造方法 代码块 内部类 内部类的访问特点 内部类可以直接访问外部类的成员&a…...
【UnityShader入门精要学习笔记】第十五章 使用噪声
本系列为作者学习UnityShader入门精要而作的笔记,内容将包括: 书本中句子照抄 个人批注项目源码一堆新手会犯的错误潜在的太监断更,有始无终 我的GitHub仓库 总之适用于同样开始学习Shader的同学们进行有取舍的参考。 文章目录 使用噪声上…...
C++ ─── string的完整模拟实现
本博客实现了string的常见接口实现 下面是用到的一些函数,供大家回顾复习 string.h #define _CRT_SECURE_NO_WARNINGS 1 #pragma once #include<iostream> #include<assert.h> using namespace std;namespace bit {class string{public:typedef char*…...
安卓中的图片压缩
安卓中如何进行图片压缩? 在安卓中进行图片压缩通常有以下几种方法: 质量压缩: 通过降低图片的质量来减小文件大小。这可以通过Bitmap的compress()方法实现,其中可以设置压缩质量(0-100)。 ByteArrayOutputStream baos…...
centOS7.9 DNS配置
1.DNS规划 dns.sohu.com192.168.110.111Awww.sohucom192.168.110.112Aoa.sohu.com 192.168.110.113A 2.安装 bind yum install -y bind bind-utils 3. 编辑主配置文件 vim /etc/named.conflisten- on port 53 { any; }; allow- query { any; }; 4.配置区域文件 …...
设计模式20——职责链模式
写文章的初心主要是用来帮助自己快速的回忆这个模式该怎么用,主要是下面的UML图可以起到大作用,在你学习过一遍以后可能会遗忘,忘记了不要紧,只要看一眼UML图就能想起来了。同时也请大家多多指教。 职责链模式(Chain …...
android13 差分包制作命令
./out/host/linux-x86/bin/ota_from_target_files -v -iCode/SourceCode/android13/ntls/userdebug/hpg2_24-target_files-38.zip --block -p ./out/host/linux-x86 Code/SourceCode/android13/ntls/userdebug/hpg2_24-target_files-39.zip update_ud.zip 脚本命令行参数 命令…...
Flink-cdc更好的流式数据集成工具
What’s Flink-cdc? Flink CDC 是基于Apache Flink的一种数据变更捕获技术,用于从数据源(如数据库)中捕获和处理数据的变更事件。CDC技术允许实时地捕获数据库中的增、删、改操作,将这些变更事件转化为流式数据,并能够…...
C++|设计模式(三)|抽象工厂模式
抽象工厂模式仍然属于创建型模式,我们在【简单工厂和工厂方法模式】这篇文章中,描述了简单工厂和工厂方法模式,并在文末,简单介绍了工厂方法模式的局限性。 本文将通过汽车工厂的例子继续来阐述使用抽象工厂模式相比较于工厂方法…...
AVB协议分析(一) FQTSS协议介绍
FQTSS协议介绍 一、AVB整体架构二、概述三、协议作用及作用对象四、协议的实现五、参考文献: 一、AVB整体架构 可见FQTSS位于MAC层的上面,代码看不懂,咱们就从最底层开始,逐层分析协议,逐个击破,慢就是快。…...
VB.net复制Ntag213卡写入UID
本示例使用的发卡器:https://item.taobao.com/item.htm?ftt&id615391857885 一、读取旧Ntag卡的UID和数据 Private Sub Button15_Click(sender As Object, e As EventArgs) Handles Button15.Click轻松读卡技术支持:网站:Dim i, j As IntegerDim cardidhex, …...
Appium+python自动化(十六)- ADB命令
简介 Android 调试桥(adb)是多种用途的工具,该工具可以帮助你你管理设备或模拟器 的状态。 adb ( Android Debug Bridge)是一个通用命令行工具,其允许您与模拟器实例或连接的 Android 设备进行通信。它可为各种设备操作提供便利,如安装和调试…...
工业安全零事故的智能守护者:一体化AI智能安防平台
前言: 通过AI视觉技术,为船厂提供全面的安全监控解决方案,涵盖交通违规检测、起重机轨道安全、非法入侵检测、盗窃防范、安全规范执行监控等多个方面,能够实现对应负责人反馈机制,并最终实现数据的统计报表。提升船厂…...
模型参数、模型存储精度、参数与显存
模型参数量衡量单位 M:百万(Million) B:十亿(Billion) 1 B 1000 M 1B 1000M 1B1000M 参数存储精度 模型参数是固定的,但是一个参数所表示多少字节不一定,需要看这个参数以什么…...
【SpringBoot】100、SpringBoot中使用自定义注解+AOP实现参数自动解密
在实际项目中,用户注册、登录、修改密码等操作,都涉及到参数传输安全问题。所以我们需要在前端对账户、密码等敏感信息加密传输,在后端接收到数据后能自动解密。 1、引入依赖 <dependency><groupId>org.springframework.boot</groupId><artifactId...
Go 语言接口详解
Go 语言接口详解 核心概念 接口定义 在 Go 语言中,接口是一种抽象类型,它定义了一组方法的集合: // 定义接口 type Shape interface {Area() float64Perimeter() float64 } 接口实现 Go 接口的实现是隐式的: // 矩形结构体…...
2025 后端自学UNIAPP【项目实战:旅游项目】6、我的收藏页面
代码框架视图 1、先添加一个获取收藏景点的列表请求 【在文件my_api.js文件中添加】 // 引入公共的请求封装 import http from ./my_http.js// 登录接口(适配服务端返回 Token) export const login async (code, avatar) > {const res await http…...
【OSG学习笔记】Day 16: 骨骼动画与蒙皮(osgAnimation)
骨骼动画基础 骨骼动画是 3D 计算机图形中常用的技术,它通过以下两个主要组件实现角色动画。 骨骼系统 (Skeleton):由层级结构的骨头组成,类似于人体骨骼蒙皮 (Mesh Skinning):将模型网格顶点绑定到骨骼上,使骨骼移动…...
网络编程(UDP编程)
思维导图 UDP基础编程(单播) 1.流程图 服务器:短信的接收方 创建套接字 (socket)-----------------------------------------》有手机指定网络信息-----------------------------------------------》有号码绑定套接字 (bind)--------------…...
解读《网络安全法》最新修订,把握网络安全新趋势
《网络安全法》自2017年施行以来,在维护网络空间安全方面发挥了重要作用。但随着网络环境的日益复杂,网络攻击、数据泄露等事件频发,现行法律已难以完全适应新的风险挑战。 2025年3月28日,国家网信办会同相关部门起草了《网络安全…...
