Spark SQL 中DataFrame DSL的使用
在上一篇文章中已经大致说明了DataFrame APi,下面我们具体介绍DataFrame DSL的使用。DataFrame DSL是一种命令式编写Spark SQL的方式,使用的是一种类sql的风格语法。
文章链接:

一、单词统计案例引入
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}object Demo2DSLWordCount {def main(args: Array[String]): Unit = {/*** 在新版本的spark中,如果想要编写spark sql的话,需要使用新的spark入口类:SparkSession*/val sparkSession: SparkSession = SparkSession.builder().master("local").appName("wc spark sql").getOrCreate()/*** spark sql和spark core的核心数据类型不太一样** 1、读取数据构建一个DataFrame,相当于一张表*/val linesDF: DataFrame = sparkSession.read.format("csv") //指定读取数据的格式.schema("line STRING") //指定列的名和列的类型,多个列之间使用,分割.option("sep", "\n") //指定分割符,csv格式读取默认是英文逗号.load("spark/data/words.txt") // 指定要读取数据的位置,可以使用相对路径/*** DSL: 类SQL语法 api 介于代码和纯sql之间的一种api** spark在DSL语法api中,将纯sql中的函数都使用了隐式转换变成一个scala中的函数* 如果想要在DSL语法中使用这些函数,需要导入隐式转换**///导入Spark sql中所有的sql隐式转换函数import org.apache.spark.sql.functions._//导入另一个隐式转换,后面可以直接使用$函数引用字段进行处理import sparkSession.implicits._// linesDF.select(explode(split($"line","\\|")) as "word")
// .groupBy($"word")
// .count().show()val resultDF: DataFrame = linesDF.select(explode(split($"line", "\\|")) as "word").groupBy($"word").agg(count($"word") as "counts")/*** 保存数据*/resultDF.repartition(1).write.format("csv").option("sep","\t").mode(SaveMode.Overwrite).save("spark/data/sqlout2")}}
注意:show()可以指定两个参数,第一个参数为展现的条数,不指定默认展示前20条数据,第二个参数默认为false,代表的是如果数据过长展示就会不完全,可以指定为true,使得数据展示完整,比如 : show(200,truncate = false)
二、数据源获取
查看官方文档:Data Sources - Spark 3.5.1 Documentation,看到DataFrame支持多种数据源的获取。

1、csv-->json
val sparkSession: SparkSession = SparkSession.builder().master("local[2]").appName("多种类型数据源读取演示").config("spark.sql.shuffer.partitions", 1) //指定分区数为1,默认分区数是200个.getOrCreate()//导入spark sql中所有的隐式转换函数import org.apache.spark.sql.functions._//导入sparkSession下的所有隐式转换函数,后面可以直接使用$函数引用字段import sparkSession.implicits._/*** 读csv格式的文件-->写到json格式文件中*///1500100967,能映秋,21,女,文科五班val studentsDF: DataFrame = sparkSession.read.format("csv").schema("id String,name String,age Int,gender String,clazz String").option("sep", ",").load("spark/data/student.csv")studentsDF.write.format("json").mode(SaveMode.Overwrite).save("spark/data/students_out_json.json")
2、json-->parquet
val sparkSession: SparkSession = SparkSession.builder().master("local").appName("").config("spark.sql.shuffer.partitions", 1) //指定分区数为1,默认分区数是200个.getOrCreate()//导入spark sql中所有的隐式转换函数//导入sparkSession下的所有隐式转换函数,后面可以直接使用$函数引用字段/*** 读取json数据格式,因为json数据有键值对,会自动的将健作为列名,值作为列值,不需要手动的设置表结构*///1500100967,能映秋,21,女,文科五班//方式1:// val studentsJsonDF: DataFrame = sparkSession.read// .format("json")// .load("spark/data/students_out_json.json/part-00000-3f086bb2-23d9-4904-9814-3a34b21020ab-c000.json")//方式2:实际上也是调用方式1,只是更简洁了// def json(paths: String*): DataFrame = format("json").load(paths : _*)val studebtsReadDF: DataFrame = sparkSession.read.json("spark/data/students_out_json.json/part-00000-3f086bb2-23d9-4904-9814-3a34b21020ab-c000.json")studebtsReadDF.write.format("parquet").mode(SaveMode.Overwrite).save("spark/data/students_parquet")
3、parquet-->csv
val sparkSession: SparkSession = SparkSession.builder().master("local").appName("").config("spark.sql.shuffer.partitions", 1) //指定分区数为1,默认分区数是200个.getOrCreate()//导入Spark sql中所有的sql隐式转换函数import org.apache.spark.sql.functions._//导入另一个隐式转换,后面可以直接使用$函数引用字段进行处理import sparkSession.implicits._/*** parquet:压缩的比例由信息熵决定,通俗的说就是数据的重复程度决定*/val studebtsReadDF: DataFrame = sparkSession.read.format("parquet").load("spark/data/students_parquet/part-00000-8b815a03-97f7-4d71-8b71-4e7e30f60995-c000.snappy.parquet")studebtsReadDF.write.format("csv").mode(SaveMode.Overwrite).save("spark/data/students_csv")
4、数据库
下面我们以mysql为例:
val sparkSession: SparkSession = SparkSession.builder().master("local").appName("连接数据库").config("spark.sql.shuffle.partitions",1) //默认分区的数量是200个.getOrCreate()/*** 读取数据库中的数据,mysql* 如果链接失败,可以将参数补全:jdbc:mysql://192.168.19.100:3306?useUnicode=true&allowPublicKeyRetrieval=true&characterEncoding=utf8&useSSL=false*/val jdDF: DataFrame = sparkSession.read.format("jdbc").option("url", "jdbc:mysql://192.168.19.100:3306?useSSL=false").option("dbtable", "bigdata29.emp").option("user", "root").option("password", "123456").load()jdDF.show(10,truncate = false)
三、DataFrame DSL API的使用
1、select
import org.apache.spark.sql.{DataFrame, SparkSession}object Demo1Select {def main(args: Array[String]): Unit = {val sparkSession: SparkSession = SparkSession.builder().master("local").appName("select函数演示").getOrCreate()//导入Spark sql中所有的sql隐式转换函数import org.apache.spark.sql.functions._//导入另一个隐式转换,后面可以直接使用$函数引用字段进行处理import sparkSession.implicits._val studentsDF: DataFrame = sparkSession.read.format("csv").schema("id String,name String,age String,gender String,clazz String").option("sep", ",").load("spark/data/student.csv")/*** select函数*///方式1:只能查询原有字段,不能对字段做出处理,比如加减、起别名之类studentsDF.select("id", "name", "age")//方式2:弥补了方式1的不足studentsDF.selectExpr("id","name","age+1 as new_age")//方式3:使用隐式转换函数中的$将字段变为一个对象val stuDF: DataFrame = studentsDF.select($"id", $"name", $"age")//3.1使用对象对字段进行处理
// stuDF.select($"id", $"name", $"age",$"age".+(1) as "new_age").show() //不可使用未变为对象的字段stuDF.select($"id", $"name", $"age",$"age" + 1 as "new_age") // +是函数,可以等价于该语句//3.2可以在select中使用sql函数studentsDF.select($"id", $"name", $"age", substring($"id", 0, 2))}
}
2、where
/*** where函数:过滤数据*///方式1:直接将sql中的where语句以字符串形式传参studentsDF.where("clazz='文科一班' and gender='男'")//方式2:使用$列对象形式过滤/*** 注意在此种方式下:等于和不等于符号与我们平常使用的有所不同* 等于:===* 不等于:=!=*/studentsDF.where($"clazz" === "文科一班" and $"gender"=!="男").show()
3、groupBy和agg
/*** groupby:分组函数 agg:聚合函数* 注意:* 1、groupby与agg函数通常都是一起使用* 2、分组聚合之后的结果DataFrame中只会包含分组字段与聚合字段* 3、分组聚合之后select中无法出现不是分组的字段*///需求:根据班级分组,求每个班级的人数和平均年龄studentsDF.groupBy($"clazz").agg(count($"clazz") as "clazz_number",avg($"age") as "avg_age").show()
4、join
/*** 5、join:表关联*/val subjectDF1: DataFrame = sparkSession.read.format("csv").option("sep", ",").schema("id String,subject_id String,score Int").load("spark/data/score.csv")val subjectDF2: DataFrame = sparkSession.read.format("csv").option("sep", ",").schema("sid String,subject_id String,score Int").load("spark/data/score.csv")//关联场景1:所关联的字段名字一样studentsDF.join(subjectDF1,"id")//关联场景2:所关联的字段名字不一样studentsDF.join(subjectDF2,$"id"===$"sid","inner")
// studentsDF.join(subjectDF2,$"id"===$"sid","left").show()/*** 上面两种关联场景默认inner连接方式(内连接),可以指定参数选择连接方式,比如左连接、右连接、全连接之类* * @param joinType Type of join to perform. Default `inner`. Must be one of:* * `inner`, `cross`, `outer`, `full`, `fullouter`,`full_outer`, `left`,* * `leftouter`, `left_outer`, `right`, `rightouter`, `right_outer`.*/
5、开窗
/*** 开窗函数* 1、ROW_NUMBER():为分区中的每一行分配一个唯一的序号。序号是根据ORDER BY子句定义的顺序分配的* 2、RANK()和DENSE_RANK():为分区中的每一行分配一个排名。RANK()在遇到相同值时会产生间隙,而DENSE_RANK()则不会。**///需求:统计每个班级总分前三的学生val stu_scoreDF: DataFrame = studentsDF.join(subjectDF2, $"id" === $"sid")//方式1:在select中使用row_number() over Window.partitionBy().orderBy()stu_scoreDF.groupBy($"clazz", $"id").agg(sum($"score") as "sum_score").select($"clazz", $"id", $"sum_score", row_number() over Window.partitionBy($"clazz").orderBy($"sum_score".desc) as "score_rank").where($"score_rank" <= 3)//方式2:使用withcolumn()函数,会新增一列,但是要预先指定列名stu_scoreDF.repartition(1).groupBy($"clazz", $"id").agg(sum($"score") as "sum_score").withColumn("score_rank",row_number() over Window.partitionBy($"clazz").orderBy($"sum_score".desc)).where($"score_rank" <= 3).show()
注意:
DSL API 不直接对应 SQL 的关键字执行顺序(如 SELECT、FROM、WHERE、GROUP BY 等),但可以按照构建逻辑查询的方式来组织代码,使其与 SQL 查询的逻辑结构相似。
在构建 Spark DataFrame 转换和操作时,常用流程介绍:
- 选择数据源:使用
spark.read或从其他 DataFrame 派生。 - 转换:使用各种转换函数(如
select、filter、map、flatMap、join等)来修改 DataFrame。 - 聚合:使用
groupBy和聚合函数(如sum、avg、count等)对数据进行分组和汇总。 - 排序:使用
orderBy或sort对数据进行排序。 - 输出:使用
show、collect、write等函数将结果输出到控制台、收集到驱动程序或写入外部存储。
四、RDD与DataFrame的转换
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SparkSession}object RddToDf {def main(args: Array[String]): Unit = {val sparkSession: SparkSession = SparkSession.builder().appName("Rdd与Df之间的转换").master("local").config("spark.sql.shuffle.partitions", 1).getOrCreate()import org.apache.spark.sql.functions._import sparkSession.implicits._val sparkContext: SparkContext = sparkSession.sparkContextval idNameRdd: RDD[(String, String)] = sparkContext.textFile("spark/data/student.csv").map(_.split(",")).map {case Array(id: String, name: String, _, _, _) => (id, name)}/*** Rdd-->DF* 因为在Rdd中不会存储文件的结构(schema)信息,所以要指定字段*/val idNameDF: DataFrame = idNameRdd.toDF("id", "name")idNameDF.createOrReplaceTempView("idNameTb")sparkSession.sql("select id,name from idNameTb").show()/*** DF-->Rdd*/val idNameRdd2: RDD[Row] = idNameDF.rddidNameRdd2.foreach(println)}
}
相关文章:
Spark SQL 中DataFrame DSL的使用
在上一篇文章中已经大致说明了DataFrame APi,下面我们具体介绍DataFrame DSL的使用。DataFrame DSL是一种命令式编写Spark SQL的方式,使用的是一种类sql的风格语法。 文章链接: 一、单词统计案例引入 import org.apache.spark.sql.{DataFrame, SaveMod…...
qt 布局学习笔记
目录 qt下载地址: widget 宽高 管理信息列表源码 c版: pro文件: qt 设置水平布局,里面有两个按钮,每个按钮就变的很宽,怎么设置按钮的精确位置 设置固定大小: 使用弹性空间(…...
设计模式复习
一、模式所采用的关系(e.g.继承…) UML图例 二、各模式的特点、优缺点 1.创建型(5种创建型口诀: 抽象工厂 按照 工厂方法,建造 单例 原型) 将对象的使用和创建分离,使用对象时无需知道对象的创建细节&a…...
前后端开发入门全攻略:零基础学起
新书上架~👇全国包邮奥~ python实用小工具开发教程http://pythontoolsteach.com/3 欢迎关注我👆,收藏下次不迷路┗|`O′|┛ 嗷~~ 目录 一、前后端开发概览 二、后端开发基础:Flask框架入门 代码案例:Hel…...
Android Studio无法改变Button背景颜色解决办法
大家好,我是咕噜铁蛋!今天我来和大家探讨一个在Android开发中常见但可能让初学者感到困惑的问题——如何在Android Studio中改变Button的背景颜色。这个问题看似简单,但实际操作中可能会遇到一些意想不到的挑战。接下来,我将从多个…...
元宇宙三维互动展厅让体验者进入一个充满奇幻与创意的数字世界
元宇宙数字产品展厅搭建编辑器凭借强大的三维可视化互动功能,为用户带来前所未有的沉浸式数字展览体验。 在元宇宙数字产品展厅搭建编辑器中,用户可以轻松打造逼真的三维展览环境,通过VR虚拟现实技术,仿佛置身于一个充满奇幻与创意…...
java高级——Collection集合之List探索(包含ArrayList、LinkedList、Vector底层实现及区别,非常详细哦)
java高级——Collection集合之List探索 前情提要文章介绍提前了解的知识点1. 数组2. 单向链表3. 双向链表4. 为什么单向链表使用的较多5. 线程安全和线程不安全的概念 ArrayList介绍1. 继承结构解析1.1 三个标志性接口1.2 AbstractList和AbstractCollection 2. ArrayList底层代…...
JAVA-->方法的使用详解
JAVA–>方法的使用详解 1.方法的概念及使用 1.1 什么是方法 : 方法就是一个代码片段. 类似于 C 语言中的 “函数”。 1.2 方法定义 / 方法定义 修饰符 返回值类型 方法名称([参数类型 形参 ...]){方法体代码;[return 返回值]; }判断是否为闰年 public class Method{ //…...
基于 vLLM 搭建 DeepSeek-V2 Chat 服务
直奔主题。 安装vLLM 官方实现的代码还没有 merge 到 vLLM 主分支,所以直接 git clone DeepSeek 的分支。 git clone https://github.com/zwd003/vllm.git cd vllm pip install -e .源码安装大概耗时 10 分钟。 OpenAI 接口规范启动 官方 Github 放的是单条推理…...
Kafka 安装教程和基本操作
一、简介 Kafka 是最初由 Linkedin 公司开发,是一个分布式、分区的、多副本的、多订阅者,基于 zookeeper 协调的分布式日志系统(也可以当做 MQ 系统),常见可以用于 web/nginx 日志、访问日志,消息服务等等…...
Java 五种内部类演示及底层原理详解
内部类 什么是内部类 在A类的内部定义B类,B类就被称为内部类 发动机类单独存在没有意义 发动机为独立个体 可以在外部其他类里创建内部类的对象去调用方法 类的五大成员 属性 方法 构造方法 代码块 内部类 内部类的访问特点 内部类可以直接访问外部类的成员&a…...
【UnityShader入门精要学习笔记】第十五章 使用噪声
本系列为作者学习UnityShader入门精要而作的笔记,内容将包括: 书本中句子照抄 个人批注项目源码一堆新手会犯的错误潜在的太监断更,有始无终 我的GitHub仓库 总之适用于同样开始学习Shader的同学们进行有取舍的参考。 文章目录 使用噪声上…...
C++ ─── string的完整模拟实现
本博客实现了string的常见接口实现 下面是用到的一些函数,供大家回顾复习 string.h #define _CRT_SECURE_NO_WARNINGS 1 #pragma once #include<iostream> #include<assert.h> using namespace std;namespace bit {class string{public:typedef char*…...
安卓中的图片压缩
安卓中如何进行图片压缩? 在安卓中进行图片压缩通常有以下几种方法: 质量压缩: 通过降低图片的质量来减小文件大小。这可以通过Bitmap的compress()方法实现,其中可以设置压缩质量(0-100)。 ByteArrayOutputStream baos…...
centOS7.9 DNS配置
1.DNS规划 dns.sohu.com192.168.110.111Awww.sohucom192.168.110.112Aoa.sohu.com 192.168.110.113A 2.安装 bind yum install -y bind bind-utils 3. 编辑主配置文件 vim /etc/named.conflisten- on port 53 { any; }; allow- query { any; }; 4.配置区域文件 …...
设计模式20——职责链模式
写文章的初心主要是用来帮助自己快速的回忆这个模式该怎么用,主要是下面的UML图可以起到大作用,在你学习过一遍以后可能会遗忘,忘记了不要紧,只要看一眼UML图就能想起来了。同时也请大家多多指教。 职责链模式(Chain …...
android13 差分包制作命令
./out/host/linux-x86/bin/ota_from_target_files -v -iCode/SourceCode/android13/ntls/userdebug/hpg2_24-target_files-38.zip --block -p ./out/host/linux-x86 Code/SourceCode/android13/ntls/userdebug/hpg2_24-target_files-39.zip update_ud.zip 脚本命令行参数 命令…...
Flink-cdc更好的流式数据集成工具
What’s Flink-cdc? Flink CDC 是基于Apache Flink的一种数据变更捕获技术,用于从数据源(如数据库)中捕获和处理数据的变更事件。CDC技术允许实时地捕获数据库中的增、删、改操作,将这些变更事件转化为流式数据,并能够…...
C++|设计模式(三)|抽象工厂模式
抽象工厂模式仍然属于创建型模式,我们在【简单工厂和工厂方法模式】这篇文章中,描述了简单工厂和工厂方法模式,并在文末,简单介绍了工厂方法模式的局限性。 本文将通过汽车工厂的例子继续来阐述使用抽象工厂模式相比较于工厂方法…...
AVB协议分析(一) FQTSS协议介绍
FQTSS协议介绍 一、AVB整体架构二、概述三、协议作用及作用对象四、协议的实现五、参考文献: 一、AVB整体架构 可见FQTSS位于MAC层的上面,代码看不懂,咱们就从最底层开始,逐层分析协议,逐个击破,慢就是快。…...
浏览器访问 AWS ECS 上部署的 Docker 容器(监听 80 端口)
✅ 一、ECS 服务配置 Dockerfile 确保监听 80 端口 EXPOSE 80 CMD ["nginx", "-g", "daemon off;"]或 EXPOSE 80 CMD ["python3", "-m", "http.server", "80"]任务定义(Task Definition&…...
后进先出(LIFO)详解
LIFO 是 Last In, First Out 的缩写,中文译为后进先出。这是一种数据结构的工作原则,类似于一摞盘子或一叠书本: 最后放进去的元素最先出来 -想象往筒状容器里放盘子: (1)你放进的最后一个盘子(…...
STM32+rt-thread判断是否联网
一、根据NETDEV_FLAG_INTERNET_UP位判断 static bool is_conncected(void) {struct netdev *dev RT_NULL;dev netdev_get_first_by_flags(NETDEV_FLAG_INTERNET_UP);if (dev RT_NULL){printf("wait netdev internet up...");return false;}else{printf("loc…...
【单片机期末】单片机系统设计
主要内容:系统状态机,系统时基,系统需求分析,系统构建,系统状态流图 一、题目要求 二、绘制系统状态流图 题目:根据上述描述绘制系统状态流图,注明状态转移条件及方向。 三、利用定时器产生时…...
解决本地部署 SmolVLM2 大语言模型运行 flash-attn 报错
出现的问题 安装 flash-attn 会一直卡在 build 那一步或者运行报错 解决办法 是因为你安装的 flash-attn 版本没有对应上,所以报错,到 https://github.com/Dao-AILab/flash-attention/releases 下载对应版本,cu、torch、cp 的版本一定要对…...
拉力测试cuda pytorch 把 4070显卡拉满
import torch import timedef stress_test_gpu(matrix_size16384, duration300):"""对GPU进行压力测试,通过持续的矩阵乘法来最大化GPU利用率参数:matrix_size: 矩阵维度大小,增大可提高计算复杂度duration: 测试持续时间(秒&…...
LLMs 系列实操科普(1)
写在前面: 本期内容我们继续 Andrej Karpathy 的《How I use LLMs》讲座内容,原视频时长 ~130 分钟,以实操演示主流的一些 LLMs 的使用,由于涉及到实操,实际上并不适合以文字整理,但还是决定尽量整理一份笔…...
Selenium常用函数介绍
目录 一,元素定位 1.1 cssSeector 1.2 xpath 二,操作测试对象 三,窗口 3.1 案例 3.2 窗口切换 3.3 窗口大小 3.4 屏幕截图 3.5 关闭窗口 四,弹窗 五,等待 六,导航 七,文件上传 …...
【SpringBoot自动化部署】
SpringBoot自动化部署方法 使用Jenkins进行持续集成与部署 Jenkins是最常用的自动化部署工具之一,能够实现代码拉取、构建、测试和部署的全流程自动化。 配置Jenkins任务时,需要添加Git仓库地址和凭证,设置构建触发器(如GitHub…...
Unity中的transform.up
2025年6月8日,周日下午 在Unity中,transform.up是Transform组件的一个属性,表示游戏对象在世界空间中的“上”方向(Y轴正方向),且会随对象旋转动态变化。以下是关键点解析: 基本定义 transfor…...
