Spark【Spark SQL(三)DataSet】
DataSet
DataFrame 的出现,让 Spark 可以更好地处理结构化数据的计算,但存在一个问题:编译时的类型安全问题,为了解决它,Spark 引入了 DataSet API(DataFrame API 的扩展)。DataSet 是分布式的数据集合,它提供了强类型支持,也就是给 RDD 的每行数据都添加了类型约束。
在 Spark 2.0 中,DataFrame 和 DataSet 被合并为 DataSet 。DataSet包含 DataFrame 的功能,DataFrame 表示为 DataSet[Row] ,即DataSet 的子集。
三种 API 的选择
RDD 是DataFrame 和 DataSet 的底层,如果需要更多的控制功能(比如精确控制Spark 怎么执行一条查询),尽量使用 RDD。
如果希望在编译时获得更高的类型安全,建议使用 DataSet。
如果想统一简化 Spark 的API ,则使用 DataFrame 和 DataSet。
基于 DataFrame API 和 DataSet API 开发的程序会被自动优化,开发人员不需要操作底层的RDD API 来手动优化,大大提高了开发效率。但是RDD API 对于非结构化数据的处理有独特的优势(比如文本数据流),而且更方便我们做底层的操作。
DataSet 的创建
1、使用createDataset()方法创建
def main(args: Array[String]): Unit = {//local代表本地单线程模式 local[*]代表本地多线程模式val spark = SparkSession.builder().appName("create dataset").master("local[*]").getOrCreate()//一定要导入它 不然无法创建DataSet对象import spark.implicits._val ds1 = spark.createDataset(1 to 5)ds1.show()val ds2 = spark.createDataset(spark.sparkContext.textFile("data/sql/people.txt"))ds2.show()spark.stop()}
运行结果:
+-----+
|value|
+-----+
| 1|
| 2|
| 3|
| 4|
| 5|
+-----++--------+
| value|
+--------+
| Tom, 21|
|Mike, 25|
|Andy, 18|
+--------+
2、通过 toDS 方法生成
import org.apache.spark.sql.{Dataset, SparkSession}object DataSetCreate {//case类一定要写到main方法之外case class Person(name:String,age:Int)def main(args: Array[String]): Unit = {//local代表本地单线程模式 local[*]代表本地多线程模式val spark = SparkSession.builder().appName("create dataset").master("local[*]").getOrCreate()//一定要导入 SparkSession对象下的implicitsimport spark.implicits._val data = List(Person("Tom",21),Person("Andy",22))val ds: Dataset[Person] = data.toDS()ds.show()spark.stop()}
}
运行结果:
+----+---+
|name|age|
+----+---+
| Tom| 21|
|Andy| 22|
+----+---+
3、通过DataFrame 转换生成
object DataSetCreate{case class Person(name:String,age:Long,sex:String)
def main(args: Array[String]): Unit = {//local代表本地单线程模式 local[*]代表本地多线程模式val spark = SparkSession.builder().appName("create dataset").master("local[*]").getOrCreate()import spark.implicits._val df = spark.read.json("data/sql/people.json")val ds = df.as[Person]ds.show()spark.stop()}
}
RDD、DataFrame 和 DataSet 之间的相互转换
RDD <=> DataFrame
- RDD 转 DataFrame ,也就是上一篇博客中介绍的两种方法:
- 能创建case类,就直接映射出一个RDD[Person],然后调用toDF方法利用反射机制推断RDD模式。
- 无法创建case类,就使用编程方式定义RDD模式,使用 createDataFrame(rowRDD,schema) 指定rowRDD:RDD[Row]和schema:StructType。
- DataFrame 转 RDD,直接使用 rdd() 方法。
package com.study.spark.core.sqlimport org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SparkSession}object TransForm {case class Person(name:String,age:Int) //txt文件age字段可以用Int,但json文件尽量用Longdef main(args: Array[String]): Unit = {val spark = SparkSession.builder().appName("transform").master("local[*]").getOrCreate()import spark.implicits._//1.RDD和DataFrame之间互相转换//1.1 创建RDD对象val rdd: RDD[Person] = spark.sparkContext.textFile("data/sql/people.txt").map(_.split(",")).map(attr => Person(attr(0), attr(1).trim.toInt))rdd.foreach(println)/*Person(Andy,18)Person(Tom,21)Person(Mike,25)*///1.2 RDD转DataFrameval df = rdd.toDF()df.show()/*+----+---+|name|age|+----+---+| Tom| 21||Mike| 25||Andy| 18|+----+---+*///1.3 DataFrame转RDDval res: RDD[Row] = df.rdd/*[Andy,18][Tom,21][Mike,25]*/res.foreach(println)spark.stop()}
}
可以看到,RDD[Person]转为DataFrame后,再从DataFrame转回RDD就变成了RDD[Row] 类型了。
RDD <=> DataSet
RDD 和 DataSet 之间的转换比较简单:
- RDD 转 DataSet 直接使用case 类(比如Person),然后映射出 RDD[Person] ,直接调用 toDS方法。
- DataSet 转 RDD 直接调用 rdd方法即可。
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SparkSession}object TransForm {case class Person(name:String,age:Int) //txt文件age字段可以用Int,但json文件尽量用Longdef main(args: Array[String]): Unit = {val spark = SparkSession.builder().appName("transform").master("local[*]").getOrCreate()import spark.implicits._//1.RDD和DataSet之间互相转换//1.1 创建RDD对象val rdd: RDD[Person] = spark.sparkContext.textFile("data/sql/people.txt").map(_.split(",")).map(attr => Person(attr(0), attr(1).trim.toInt))rdd.foreach(println)/*Person(Andy,18)Person(Tom,21)Person(Mike,25)*///1.2 RDD转DataSetval ds = rdd.toDS()ds.show()/*+----+---+|name|age|+----+---+| Tom| 21||Mike| 25||Andy| 18|+----+---+*///1.3 DataFrame转RDDval res: RDD[Person] = ds.rddres.foreach(println)/*Person(Andy,18)Person(Tom,21)Person(Mike,25)*/spark.stop()}
}
可以看到,相比RDD和DataFrame互相转换,RDD和DataSet转换的过程中,不会有数据类型的变化,而DataFrame转RDD的过程就会把我们定义的case类转为Row对象。
DataFrame <=> DataSet
- DataFrame 转 DataSet 先使用case类,然后直接使用 as[case 类] 方法。
- DataSet 转 DataFrame 直接使用 toDF 方法。
import org.apache.spark.sql.{DataFrame, Row, SparkSession}object TransForm {case class Person(name:String,age:Long,sex:String) //txt文件age字段可以用Int,但json文件尽量用Longdef main(args: Array[String]): Unit = {val spark = SparkSession.builder().appName("transform").master("local[*]").getOrCreate()import spark.implicits._//1.DataFrame和DataSet之间互相转换//1.1 创建DataFrame对象val df = spark.read.json("data/sql/people.json")df.show()/*+---+----------+---+|age| name|sex|+---+----------+---+| 30| Michael| 男|| 19| Andy| 女|| 19| Justin| 男|| 20|Bernadette| 女|| 23| Gretchen| 女|| 27| David| 男|| 33| Joseph| 女|| 27| Trish| 女|| 33| Alex| 女|| 25| Ben| 男|+---+----------+---+*///1.2 DataFrame转DataSetval ds = df.as[Person]ds.show()/*+---+----------+---+|age| name|sex|+---+----------+---+| 30| Michael| 男|| 19| Andy| 女|| 19| Justin| 男|| 20|Bernadette| 女|| 23| Gretchen| 女|| 27| David| 男|| 33| Joseph| 女|| 27| Trish| 女|| 33| Alex| 女|| 25| Ben| 男|+---+----------+---+*///1.3 DataSet转DataFrameval res: DataFrame = ds.toDF()res.show()/*+---+----------+---+|age| name|sex|+---+----------+---+| 30| Michael| 男|| 19| Andy| 女|| 19| Justin| 男|| 20|Bernadette| 女|| 23| Gretchen| 女|| 27| David| 男|| 33| Joseph| 女|| 27| Trish| 女|| 33| Alex| 女|| 25| Ben| 男|+---+----------+---+*/spark.stop()}
}
DataSet 实现 WordCount
def main(args: Array[String]): Unit = {val spark = SparkSession.builder().appName("create dataset").master("local[*]").getOrCreate()import spark.implicits._val res: Dataset[(String, Long)] = spark.read.text("data/word.txt").as[String].flatMap(_.split(" ")).groupByKey(_.toLowerCase).count()res.show()spark.stop()}
运行结果:
| key|count(1)|
+------+--------+
| fast| 1|
| is| 3|
| spark| 2|
|better| 1|
| good| 1|
|hadoop| 1|
+------+--------+
总结
剩下来就是不断练习各种DataFrame和DataSet的操作、熟悉各种转换和行动操作。
相关文章:
Spark【Spark SQL(三)DataSet】
DataSet DataFrame 的出现,让 Spark 可以更好地处理结构化数据的计算,但存在一个问题:编译时的类型安全问题,为了解决它,Spark 引入了 DataSet API(DataFrame API 的扩展)。DataSet 是分布式的数…...
制作立体图像实用软件:3DMasterKit 10.7 Crack
3DMasterKit 软件专为创建具有逼真 3D 和运动效果的光栅图片而设计:翻转、动画、变形和缩放。 打印机、广告工作室、摄影工作室和摄影师将发现 3DMasterKit 是一种有用且经济高效的解决方案,可将其业务扩展到新的维度,提高生成的 3D 图像和光…...
高校 Web 站点网络安全面临的主要的威胁
校园网 Web 站点的主要安全威胁来源于计算机病毒、内部用户恶意攻击和 破坏、内部用户非恶意的错误操作和网络黑客入侵等。 2.1 计算机病毒 计算机病毒是指编制者在计算机程序中插入的破坏计算机功能或者数据, 影响计算机使用并且能够自我复制的一组计算机指令或…...
vue前端解决跨域
1,首先 axios请求,看后端接口路径,http://122.226.146.110:25002/api/xx/ResxxList,所以baseURL地址改成 ‘/api’ let setAxios originAxios.create({baseURL: /api, //这里要改掉timeout: 20000 // request timeout}); export default s…...
【Cicadaplayer】解码线程及队列实现
4.4分支https://github.com/alibaba/CicadaPlayer/blob/release/0.4.4/framework/codec/ActiveDecoder.h对外:送入多个包,获取一个帧 int send_packet(std::unique_ptr<IAFPacket> &packet, uint64_t timeOut) override;int getFrame(std::u...
把文件上传到Gitee的详细步骤
目录 第一步:创建一个空仓库 第二步:找到你想上传的文件所在的地址,打开命令窗口,git init 第三步:git add 想上传的文件 ,git commit -m "给这次提交取个名字" 第四步:和咱们在第…...
基于keras中Lenet对于mnist的处理
文章目录 MNIST导入必要的包加载数据可视化数据集查看数据集的分布开始训练画出loss图画出accuracy图 使用数据外的图来测试图片可视化转化灰度图的可视化可视化卷积层的特征图第一层卷积 conv1 和 pool1第二层卷积 conv2 和 pool2 MNIST MNIST(Modified National …...
Python爬虫 教程:IP池的使用
前言 嗨喽~大家好呀,这里是魔王呐 ❤ ~! python更多源码/资料/解答/教程等 点击此处跳转文末名片免费获取 一、简介 爬虫中为什么需要使用代理 一些网站会有相应的反爬虫措施,例如很多网站会检测某一段时间某个IP的访问次数,如果访问频率…...
Ansible之playbook剧本
一、playbook概述1.1 playbook 介绍1.2 playbook 组成部分 二、playbook 示例2.1 playbook 启动及检测2.2 实例一2.3 vars 定义、引用变量2.4 指定远程主机sudo切换用户2.5 when条件判断2.6 迭代2.7 Templates 模块1.先准备一个以 .j2 为后缀的 template 模板文件,设…...
unique_ptr的大小探讨
unique_ptr大小和删除器有很大关系,具体区别看如下代码的分析。不要让unique_ptr占用的空间太大,否则不会达到裸指针同样的效果。 #include <iostream> #include <memory> using namespace std;class Widget {int m_x;int m_y;int m_z;publ…...
人工智能TensorFlow PyTorch物体分类和目标检测合集【持续更新】
1. 基于TensorFlow2.3.0的花卉识别 基于TensorFlow2.3.0的花卉识别Android APP设计_基于安卓的花卉识别_lilihewo的博客-CSDN博客 2. 基于TensorFlow2.3.0的垃圾分类 基于TensorFlow2.3.0的垃圾分类Android APP设计_def model_load(img_shape(224, 224, 3)_lilihewo的博客-CS…...
ElementPlus·面包屑导航实现
面包屑导航 使用vue3中的UI框架elementPlus的 <el-breadcrumb> 实现面包屑导航 <template><!-- 面包屑 --><div class"bread-container" ><el-breadcrumb separator">"><el-breadcrumb-item :to"{ path:/ }&quo…...
【项目管理】PM vs PMO 18点区别
导读:项目经理跟PMO主要有哪些区别?首先从定义上了解,然后根据其他维度进行对比分析,基本可以了解这二者的区别,文中罗列18点区别供各位参考。 目录 1、定义 1.1 PMO 1.2 PM 2、两者区别 2.1 ROI 2.2 项目成功率…...
13 Python使用Json
概述 在上一节,我们介绍了如何在Python中使用xml,包括:SAX、DOM、ElementTree等内容。在这一节,我们将介绍如何在Python中使用Json。Json的英文全称为JavaScript Object Notation,中文为JavaScript对象表示法ÿ…...
PDFBOX和ASPOSE.PDF
一、aspose.pdf 文档 https://docs.aspose.com/pdf/java/ 1、按段落分段 /*** docx文本按段分段*/ public static void main(String[] args) {int i 1;try {// 打开文件流FileInputStream file new FileInputStream("I:\\范文.docx");// 创建 Word 文档对象XWPFDo…...
第51节:cesium 范围查询(含源码+视频)
结果示例: 完整源码: <template><div class="viewer"><el-button-group class="top_item"><el-button type=...
YOLOv5改进算法之添加CA注意力机制模块
目录 1.CA注意力机制 2.YOLOv5添加注意力机制 送书活动 1.CA注意力机制 CA(Coordinate Attention)注意力机制是一种用于加强深度学习模型对输入数据的空间结构理解的注意力机制。CA 注意力机制的核心思想是引入坐标信息,以便模型可以更好地…...
Jmeter系列-阶梯加压线程组Stepping Thread Group详解(6)
前言 tepping Thread Group是第一个自定义线程组但,随着版本的迭代,已经有更好的线程组代替Stepping Thread Group了【Concurrency Thread Group】,所以说Stepping Thread Group已经是过去式了,但还是介绍一下 Stepping Thread …...
图像的几何变换(缩放、平移、旋转)
图像的几何变换 学习目标 掌握图像的缩放、平移、旋转等了解数字图像的仿射变换和透射变换 1 图像的缩放 缩放是对图像的大小进行调整,即 使图像放大或缩小 cv2.resize(src,dsize,fx0,fy0,interpolationcv2.INTER_LINEAR) 参数: src :输入图像dsize…...
计算机网络第四章——网络层(上)
提示:朝碧海而暮苍梧,睹青天而攀白日 文章目录 网络层是路由器的最高层次,通过网络层就可以将各个设备连接到一起,从而实现这两个主机的数据通信和资源共享,之前学的数据链路层和物理层也是将两端连接起来,但是却没有网…...
eNSP-Cloud(实现本地电脑与eNSP内设备之间通信)
说明: 想象一下,你正在用eNSP搭建一个虚拟的网络世界,里面有虚拟的路由器、交换机、电脑(PC)等等。这些设备都在你的电脑里面“运行”,它们之间可以互相通信,就像一个封闭的小王国。 但是&#…...
Android Wi-Fi 连接失败日志分析
1. Android wifi 关键日志总结 (1) Wi-Fi 断开 (CTRL-EVENT-DISCONNECTED reason3) 日志相关部分: 06-05 10:48:40.987 943 943 I wpa_supplicant: wlan0: CTRL-EVENT-DISCONNECTED bssid44:9b:c1:57:a8:90 reason3 locally_generated1解析: CTR…...
抖音增长新引擎:品融电商,一站式全案代运营领跑者
抖音增长新引擎:品融电商,一站式全案代运营领跑者 在抖音这个日活超7亿的流量汪洋中,品牌如何破浪前行?自建团队成本高、效果难控;碎片化运营又难成合力——这正是许多企业面临的增长困局。品融电商以「抖音全案代运营…...
Rust 异步编程
Rust 异步编程 引言 Rust 是一种系统编程语言,以其高性能、安全性以及零成本抽象而著称。在多核处理器成为主流的今天,异步编程成为了一种提高应用性能、优化资源利用的有效手段。本文将深入探讨 Rust 异步编程的核心概念、常用库以及最佳实践。 异步编程基础 什么是异步…...
laravel8+vue3.0+element-plus搭建方法
创建 laravel8 项目 composer create-project --prefer-dist laravel/laravel laravel8 8.* 安装 laravel/ui composer require laravel/ui 修改 package.json 文件 "devDependencies": {"vue/compiler-sfc": "^3.0.7","axios": …...
2025季度云服务器排行榜
在全球云服务器市场,各厂商的排名和地位并非一成不变,而是由其独特的优势、战略布局和市场适应性共同决定的。以下是根据2025年市场趋势,对主要云服务器厂商在排行榜中占据重要位置的原因和优势进行深度分析: 一、全球“三巨头”…...
《C++ 模板》
目录 函数模板 类模板 非类型模板参数 模板特化 函数模板特化 类模板的特化 模板,就像一个模具,里面可以将不同类型的材料做成一个形状,其分为函数模板和类模板。 函数模板 函数模板可以简化函数重载的代码。格式:templa…...
在QWebEngineView上实现鼠标、触摸等事件捕获的解决方案
这个问题我看其他博主也写了,要么要会员、要么写的乱七八糟。这里我整理一下,把问题说清楚并且给出代码,拿去用就行,照着葫芦画瓢。 问题 在继承QWebEngineView后,重写mousePressEvent或event函数无法捕获鼠标按下事…...
视觉slam十四讲实践部分记录——ch2、ch3
ch2 一、使用g++编译.cpp为可执行文件并运行(P30) g++ helloSLAM.cpp ./a.out运行 二、使用cmake编译 mkdir build cd build cmake .. makeCMakeCache.txt 文件仍然指向旧的目录。这表明在源代码目录中可能还存在旧的 CMakeCache.txt 文件,或者在构建过程中仍然引用了旧的路…...
Java毕业设计:WML信息查询与后端信息发布系统开发
JAVAWML信息查询与后端信息发布系统实现 一、系统概述 本系统基于Java和WML(无线标记语言)技术开发,实现了移动设备上的信息查询与后端信息发布功能。系统采用B/S架构,服务器端使用Java Servlet处理请求,数据库采用MySQL存储信息࿰…...
