Spark SQL 中DataFrame DSL的使用
在上一篇文章中已经大致说明了DataFrame APi,下面我们具体介绍DataFrame DSL的使用。DataFrame DSL是一种命令式编写Spark SQL的方式,使用的是一种类sql的风格语法。
文章链接:

一、单词统计案例引入
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}object Demo2DSLWordCount {def main(args: Array[String]): Unit = {/*** 在新版本的spark中,如果想要编写spark sql的话,需要使用新的spark入口类:SparkSession*/val sparkSession: SparkSession = SparkSession.builder().master("local").appName("wc spark sql").getOrCreate()/*** spark sql和spark core的核心数据类型不太一样** 1、读取数据构建一个DataFrame,相当于一张表*/val linesDF: DataFrame = sparkSession.read.format("csv") //指定读取数据的格式.schema("line STRING") //指定列的名和列的类型,多个列之间使用,分割.option("sep", "\n") //指定分割符,csv格式读取默认是英文逗号.load("spark/data/words.txt") // 指定要读取数据的位置,可以使用相对路径/*** DSL: 类SQL语法 api 介于代码和纯sql之间的一种api** spark在DSL语法api中,将纯sql中的函数都使用了隐式转换变成一个scala中的函数* 如果想要在DSL语法中使用这些函数,需要导入隐式转换**///导入Spark sql中所有的sql隐式转换函数import org.apache.spark.sql.functions._//导入另一个隐式转换,后面可以直接使用$函数引用字段进行处理import sparkSession.implicits._// linesDF.select(explode(split($"line","\\|")) as "word")
// .groupBy($"word")
// .count().show()val resultDF: DataFrame = linesDF.select(explode(split($"line", "\\|")) as "word").groupBy($"word").agg(count($"word") as "counts")/*** 保存数据*/resultDF.repartition(1).write.format("csv").option("sep","\t").mode(SaveMode.Overwrite).save("spark/data/sqlout2")}}
注意:show()可以指定两个参数,第一个参数为展现的条数,不指定默认展示前20条数据,第二个参数默认为false,代表的是如果数据过长展示就会不完全,可以指定为true,使得数据展示完整,比如 : show(200,truncate = false)
二、数据源获取
查看官方文档:Data Sources - Spark 3.5.1 Documentation,看到DataFrame支持多种数据源的获取。

1、csv-->json
val sparkSession: SparkSession = SparkSession.builder().master("local[2]").appName("多种类型数据源读取演示").config("spark.sql.shuffer.partitions", 1) //指定分区数为1,默认分区数是200个.getOrCreate()//导入spark sql中所有的隐式转换函数import org.apache.spark.sql.functions._//导入sparkSession下的所有隐式转换函数,后面可以直接使用$函数引用字段import sparkSession.implicits._/*** 读csv格式的文件-->写到json格式文件中*///1500100967,能映秋,21,女,文科五班val studentsDF: DataFrame = sparkSession.read.format("csv").schema("id String,name String,age Int,gender String,clazz String").option("sep", ",").load("spark/data/student.csv")studentsDF.write.format("json").mode(SaveMode.Overwrite).save("spark/data/students_out_json.json")
2、json-->parquet
val sparkSession: SparkSession = SparkSession.builder().master("local").appName("").config("spark.sql.shuffer.partitions", 1) //指定分区数为1,默认分区数是200个.getOrCreate()//导入spark sql中所有的隐式转换函数//导入sparkSession下的所有隐式转换函数,后面可以直接使用$函数引用字段/*** 读取json数据格式,因为json数据有键值对,会自动的将健作为列名,值作为列值,不需要手动的设置表结构*///1500100967,能映秋,21,女,文科五班//方式1:// val studentsJsonDF: DataFrame = sparkSession.read// .format("json")// .load("spark/data/students_out_json.json/part-00000-3f086bb2-23d9-4904-9814-3a34b21020ab-c000.json")//方式2:实际上也是调用方式1,只是更简洁了// def json(paths: String*): DataFrame = format("json").load(paths : _*)val studebtsReadDF: DataFrame = sparkSession.read.json("spark/data/students_out_json.json/part-00000-3f086bb2-23d9-4904-9814-3a34b21020ab-c000.json")studebtsReadDF.write.format("parquet").mode(SaveMode.Overwrite).save("spark/data/students_parquet")
3、parquet-->csv
val sparkSession: SparkSession = SparkSession.builder().master("local").appName("").config("spark.sql.shuffer.partitions", 1) //指定分区数为1,默认分区数是200个.getOrCreate()//导入Spark sql中所有的sql隐式转换函数import org.apache.spark.sql.functions._//导入另一个隐式转换,后面可以直接使用$函数引用字段进行处理import sparkSession.implicits._/*** parquet:压缩的比例由信息熵决定,通俗的说就是数据的重复程度决定*/val studebtsReadDF: DataFrame = sparkSession.read.format("parquet").load("spark/data/students_parquet/part-00000-8b815a03-97f7-4d71-8b71-4e7e30f60995-c000.snappy.parquet")studebtsReadDF.write.format("csv").mode(SaveMode.Overwrite).save("spark/data/students_csv")
4、数据库
下面我们以mysql为例:
val sparkSession: SparkSession = SparkSession.builder().master("local").appName("连接数据库").config("spark.sql.shuffle.partitions",1) //默认分区的数量是200个.getOrCreate()/*** 读取数据库中的数据,mysql* 如果链接失败,可以将参数补全:jdbc:mysql://192.168.19.100:3306?useUnicode=true&allowPublicKeyRetrieval=true&characterEncoding=utf8&useSSL=false*/val jdDF: DataFrame = sparkSession.read.format("jdbc").option("url", "jdbc:mysql://192.168.19.100:3306?useSSL=false").option("dbtable", "bigdata29.emp").option("user", "root").option("password", "123456").load()jdDF.show(10,truncate = false)
三、DataFrame DSL API的使用
1、select
import org.apache.spark.sql.{DataFrame, SparkSession}object Demo1Select {def main(args: Array[String]): Unit = {val sparkSession: SparkSession = SparkSession.builder().master("local").appName("select函数演示").getOrCreate()//导入Spark sql中所有的sql隐式转换函数import org.apache.spark.sql.functions._//导入另一个隐式转换,后面可以直接使用$函数引用字段进行处理import sparkSession.implicits._val studentsDF: DataFrame = sparkSession.read.format("csv").schema("id String,name String,age String,gender String,clazz String").option("sep", ",").load("spark/data/student.csv")/*** select函数*///方式1:只能查询原有字段,不能对字段做出处理,比如加减、起别名之类studentsDF.select("id", "name", "age")//方式2:弥补了方式1的不足studentsDF.selectExpr("id","name","age+1 as new_age")//方式3:使用隐式转换函数中的$将字段变为一个对象val stuDF: DataFrame = studentsDF.select($"id", $"name", $"age")//3.1使用对象对字段进行处理
// stuDF.select($"id", $"name", $"age",$"age".+(1) as "new_age").show() //不可使用未变为对象的字段stuDF.select($"id", $"name", $"age",$"age" + 1 as "new_age") // +是函数,可以等价于该语句//3.2可以在select中使用sql函数studentsDF.select($"id", $"name", $"age", substring($"id", 0, 2))}
}
2、where
/*** where函数:过滤数据*///方式1:直接将sql中的where语句以字符串形式传参studentsDF.where("clazz='文科一班' and gender='男'")//方式2:使用$列对象形式过滤/*** 注意在此种方式下:等于和不等于符号与我们平常使用的有所不同* 等于:===* 不等于:=!=*/studentsDF.where($"clazz" === "文科一班" and $"gender"=!="男").show()
3、groupBy和agg
/*** groupby:分组函数 agg:聚合函数* 注意:* 1、groupby与agg函数通常都是一起使用* 2、分组聚合之后的结果DataFrame中只会包含分组字段与聚合字段* 3、分组聚合之后select中无法出现不是分组的字段*///需求:根据班级分组,求每个班级的人数和平均年龄studentsDF.groupBy($"clazz").agg(count($"clazz") as "clazz_number",avg($"age") as "avg_age").show()
4、join
/*** 5、join:表关联*/val subjectDF1: DataFrame = sparkSession.read.format("csv").option("sep", ",").schema("id String,subject_id String,score Int").load("spark/data/score.csv")val subjectDF2: DataFrame = sparkSession.read.format("csv").option("sep", ",").schema("sid String,subject_id String,score Int").load("spark/data/score.csv")//关联场景1:所关联的字段名字一样studentsDF.join(subjectDF1,"id")//关联场景2:所关联的字段名字不一样studentsDF.join(subjectDF2,$"id"===$"sid","inner")
// studentsDF.join(subjectDF2,$"id"===$"sid","left").show()/*** 上面两种关联场景默认inner连接方式(内连接),可以指定参数选择连接方式,比如左连接、右连接、全连接之类* * @param joinType Type of join to perform. Default `inner`. Must be one of:* * `inner`, `cross`, `outer`, `full`, `fullouter`,`full_outer`, `left`,* * `leftouter`, `left_outer`, `right`, `rightouter`, `right_outer`.*/
5、开窗
/*** 开窗函数* 1、ROW_NUMBER():为分区中的每一行分配一个唯一的序号。序号是根据ORDER BY子句定义的顺序分配的* 2、RANK()和DENSE_RANK():为分区中的每一行分配一个排名。RANK()在遇到相同值时会产生间隙,而DENSE_RANK()则不会。**///需求:统计每个班级总分前三的学生val stu_scoreDF: DataFrame = studentsDF.join(subjectDF2, $"id" === $"sid")//方式1:在select中使用row_number() over Window.partitionBy().orderBy()stu_scoreDF.groupBy($"clazz", $"id").agg(sum($"score") as "sum_score").select($"clazz", $"id", $"sum_score", row_number() over Window.partitionBy($"clazz").orderBy($"sum_score".desc) as "score_rank").where($"score_rank" <= 3)//方式2:使用withcolumn()函数,会新增一列,但是要预先指定列名stu_scoreDF.repartition(1).groupBy($"clazz", $"id").agg(sum($"score") as "sum_score").withColumn("score_rank",row_number() over Window.partitionBy($"clazz").orderBy($"sum_score".desc)).where($"score_rank" <= 3).show()
注意:
DSL API 不直接对应 SQL 的关键字执行顺序(如 SELECT、FROM、WHERE、GROUP BY 等),但可以按照构建逻辑查询的方式来组织代码,使其与 SQL 查询的逻辑结构相似。
在构建 Spark DataFrame 转换和操作时,常用流程介绍:
- 选择数据源:使用
spark.read或从其他 DataFrame 派生。 - 转换:使用各种转换函数(如
select、filter、map、flatMap、join等)来修改 DataFrame。 - 聚合:使用
groupBy和聚合函数(如sum、avg、count等)对数据进行分组和汇总。 - 排序:使用
orderBy或sort对数据进行排序。 - 输出:使用
show、collect、write等函数将结果输出到控制台、收集到驱动程序或写入外部存储。
四、RDD与DataFrame的转换
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SparkSession}object RddToDf {def main(args: Array[String]): Unit = {val sparkSession: SparkSession = SparkSession.builder().appName("Rdd与Df之间的转换").master("local").config("spark.sql.shuffle.partitions", 1).getOrCreate()import org.apache.spark.sql.functions._import sparkSession.implicits._val sparkContext: SparkContext = sparkSession.sparkContextval idNameRdd: RDD[(String, String)] = sparkContext.textFile("spark/data/student.csv").map(_.split(",")).map {case Array(id: String, name: String, _, _, _) => (id, name)}/*** Rdd-->DF* 因为在Rdd中不会存储文件的结构(schema)信息,所以要指定字段*/val idNameDF: DataFrame = idNameRdd.toDF("id", "name")idNameDF.createOrReplaceTempView("idNameTb")sparkSession.sql("select id,name from idNameTb").show()/*** DF-->Rdd*/val idNameRdd2: RDD[Row] = idNameDF.rddidNameRdd2.foreach(println)}
}
相关文章:
Spark SQL 中DataFrame DSL的使用
在上一篇文章中已经大致说明了DataFrame APi,下面我们具体介绍DataFrame DSL的使用。DataFrame DSL是一种命令式编写Spark SQL的方式,使用的是一种类sql的风格语法。 文章链接: 一、单词统计案例引入 import org.apache.spark.sql.{DataFrame, SaveMod…...
qt 布局学习笔记
目录 qt下载地址: widget 宽高 管理信息列表源码 c版: pro文件: qt 设置水平布局,里面有两个按钮,每个按钮就变的很宽,怎么设置按钮的精确位置 设置固定大小: 使用弹性空间(…...
设计模式复习
一、模式所采用的关系(e.g.继承…) UML图例 二、各模式的特点、优缺点 1.创建型(5种创建型口诀: 抽象工厂 按照 工厂方法,建造 单例 原型) 将对象的使用和创建分离,使用对象时无需知道对象的创建细节&a…...
前后端开发入门全攻略:零基础学起
新书上架~👇全国包邮奥~ python实用小工具开发教程http://pythontoolsteach.com/3 欢迎关注我👆,收藏下次不迷路┗|`O′|┛ 嗷~~ 目录 一、前后端开发概览 二、后端开发基础:Flask框架入门 代码案例:Hel…...
Android Studio无法改变Button背景颜色解决办法
大家好,我是咕噜铁蛋!今天我来和大家探讨一个在Android开发中常见但可能让初学者感到困惑的问题——如何在Android Studio中改变Button的背景颜色。这个问题看似简单,但实际操作中可能会遇到一些意想不到的挑战。接下来,我将从多个…...
元宇宙三维互动展厅让体验者进入一个充满奇幻与创意的数字世界
元宇宙数字产品展厅搭建编辑器凭借强大的三维可视化互动功能,为用户带来前所未有的沉浸式数字展览体验。 在元宇宙数字产品展厅搭建编辑器中,用户可以轻松打造逼真的三维展览环境,通过VR虚拟现实技术,仿佛置身于一个充满奇幻与创意…...
java高级——Collection集合之List探索(包含ArrayList、LinkedList、Vector底层实现及区别,非常详细哦)
java高级——Collection集合之List探索 前情提要文章介绍提前了解的知识点1. 数组2. 单向链表3. 双向链表4. 为什么单向链表使用的较多5. 线程安全和线程不安全的概念 ArrayList介绍1. 继承结构解析1.1 三个标志性接口1.2 AbstractList和AbstractCollection 2. ArrayList底层代…...
JAVA-->方法的使用详解
JAVA–>方法的使用详解 1.方法的概念及使用 1.1 什么是方法 : 方法就是一个代码片段. 类似于 C 语言中的 “函数”。 1.2 方法定义 / 方法定义 修饰符 返回值类型 方法名称([参数类型 形参 ...]){方法体代码;[return 返回值]; }判断是否为闰年 public class Method{ //…...
基于 vLLM 搭建 DeepSeek-V2 Chat 服务
直奔主题。 安装vLLM 官方实现的代码还没有 merge 到 vLLM 主分支,所以直接 git clone DeepSeek 的分支。 git clone https://github.com/zwd003/vllm.git cd vllm pip install -e .源码安装大概耗时 10 分钟。 OpenAI 接口规范启动 官方 Github 放的是单条推理…...
Kafka 安装教程和基本操作
一、简介 Kafka 是最初由 Linkedin 公司开发,是一个分布式、分区的、多副本的、多订阅者,基于 zookeeper 协调的分布式日志系统(也可以当做 MQ 系统),常见可以用于 web/nginx 日志、访问日志,消息服务等等…...
Java 五种内部类演示及底层原理详解
内部类 什么是内部类 在A类的内部定义B类,B类就被称为内部类 发动机类单独存在没有意义 发动机为独立个体 可以在外部其他类里创建内部类的对象去调用方法 类的五大成员 属性 方法 构造方法 代码块 内部类 内部类的访问特点 内部类可以直接访问外部类的成员&a…...
【UnityShader入门精要学习笔记】第十五章 使用噪声
本系列为作者学习UnityShader入门精要而作的笔记,内容将包括: 书本中句子照抄 个人批注项目源码一堆新手会犯的错误潜在的太监断更,有始无终 我的GitHub仓库 总之适用于同样开始学习Shader的同学们进行有取舍的参考。 文章目录 使用噪声上…...
C++ ─── string的完整模拟实现
本博客实现了string的常见接口实现 下面是用到的一些函数,供大家回顾复习 string.h #define _CRT_SECURE_NO_WARNINGS 1 #pragma once #include<iostream> #include<assert.h> using namespace std;namespace bit {class string{public:typedef char*…...
安卓中的图片压缩
安卓中如何进行图片压缩? 在安卓中进行图片压缩通常有以下几种方法: 质量压缩: 通过降低图片的质量来减小文件大小。这可以通过Bitmap的compress()方法实现,其中可以设置压缩质量(0-100)。 ByteArrayOutputStream baos…...
centOS7.9 DNS配置
1.DNS规划 dns.sohu.com192.168.110.111Awww.sohucom192.168.110.112Aoa.sohu.com 192.168.110.113A 2.安装 bind yum install -y bind bind-utils 3. 编辑主配置文件 vim /etc/named.conflisten- on port 53 { any; }; allow- query { any; }; 4.配置区域文件 …...
设计模式20——职责链模式
写文章的初心主要是用来帮助自己快速的回忆这个模式该怎么用,主要是下面的UML图可以起到大作用,在你学习过一遍以后可能会遗忘,忘记了不要紧,只要看一眼UML图就能想起来了。同时也请大家多多指教。 职责链模式(Chain …...
android13 差分包制作命令
./out/host/linux-x86/bin/ota_from_target_files -v -iCode/SourceCode/android13/ntls/userdebug/hpg2_24-target_files-38.zip --block -p ./out/host/linux-x86 Code/SourceCode/android13/ntls/userdebug/hpg2_24-target_files-39.zip update_ud.zip 脚本命令行参数 命令…...
Flink-cdc更好的流式数据集成工具
What’s Flink-cdc? Flink CDC 是基于Apache Flink的一种数据变更捕获技术,用于从数据源(如数据库)中捕获和处理数据的变更事件。CDC技术允许实时地捕获数据库中的增、删、改操作,将这些变更事件转化为流式数据,并能够…...
C++|设计模式(三)|抽象工厂模式
抽象工厂模式仍然属于创建型模式,我们在【简单工厂和工厂方法模式】这篇文章中,描述了简单工厂和工厂方法模式,并在文末,简单介绍了工厂方法模式的局限性。 本文将通过汽车工厂的例子继续来阐述使用抽象工厂模式相比较于工厂方法…...
AVB协议分析(一) FQTSS协议介绍
FQTSS协议介绍 一、AVB整体架构二、概述三、协议作用及作用对象四、协议的实现五、参考文献: 一、AVB整体架构 可见FQTSS位于MAC层的上面,代码看不懂,咱们就从最底层开始,逐层分析协议,逐个击破,慢就是快。…...
KubeSphere 容器平台高可用:环境搭建与可视化操作指南
Linux_k8s篇 欢迎来到Linux的世界,看笔记好好学多敲多打,每个人都是大神! 题目:KubeSphere 容器平台高可用:环境搭建与可视化操作指南 版本号: 1.0,0 作者: 老王要学习 日期: 2025.06.05 适用环境: Ubuntu22 文档说…...
大数据学习栈记——Neo4j的安装与使用
本文介绍图数据库Neofj的安装与使用,操作系统:Ubuntu24.04,Neofj版本:2025.04.0。 Apt安装 Neofj可以进行官网安装:Neo4j Deployment Center - Graph Database & Analytics 我这里安装是添加软件源的方法 最新版…...
微信小程序之bind和catch
这两个呢,都是绑定事件用的,具体使用有些小区别。 官方文档: 事件冒泡处理不同 bind:绑定的事件会向上冒泡,即触发当前组件的事件后,还会继续触发父组件的相同事件。例如,有一个子视图绑定了b…...
基于uniapp+WebSocket实现聊天对话、消息监听、消息推送、聊天室等功能,多端兼容
基于 UniApp + WebSocket实现多端兼容的实时通讯系统,涵盖WebSocket连接建立、消息收发机制、多端兼容性配置、消息实时监听等功能,适配微信小程序、H5、Android、iOS等终端 目录 技术选型分析WebSocket协议优势UniApp跨平台特性WebSocket 基础实现连接管理消息收发连接…...
基于服务器使用 apt 安装、配置 Nginx
🧾 一、查看可安装的 Nginx 版本 首先,你可以运行以下命令查看可用版本: apt-cache madison nginx-core输出示例: nginx-core | 1.18.0-6ubuntu14.6 | http://archive.ubuntu.com/ubuntu focal-updates/main amd64 Packages ng…...
什么是EULA和DPA
文章目录 EULA(End User License Agreement)DPA(Data Protection Agreement)一、定义与背景二、核心内容三、法律效力与责任四、实际应用与意义 EULA(End User License Agreement) 定义: EULA即…...
聊一聊接口测试的意义有哪些?
目录 一、隔离性 & 早期测试 二、保障系统集成质量 三、验证业务逻辑的核心层 四、提升测试效率与覆盖度 五、系统稳定性的守护者 六、驱动团队协作与契约管理 七、性能与扩展性的前置评估 八、持续交付的核心支撑 接口测试的意义可以从四个维度展开,首…...
GitFlow 工作模式(详解)
今天再学项目的过程中遇到使用gitflow模式管理代码,因此进行学习并且发布关于gitflow的一些思考 Git与GitFlow模式 我们在写代码的时候通常会进行网上保存,无论是github还是gittee,都是一种基于git去保存代码的形式,这样保存代码…...
GO协程(Goroutine)问题总结
在使用Go语言来编写代码时,遇到的一些问题总结一下 [参考文档]:https://www.topgoer.com/%E5%B9%B6%E5%8F%91%E7%BC%96%E7%A8%8B/goroutine.html 1. main()函数默认的Goroutine 场景再现: 今天在看到这个教程的时候,在自己的电…...
省略号和可变参数模板
本文主要介绍如何展开可变参数的参数包 1.C语言的va_list展开可变参数 #include <iostream> #include <cstdarg>void printNumbers(int count, ...) {// 声明va_list类型的变量va_list args;// 使用va_start将可变参数写入变量argsva_start(args, count);for (in…...
