当前位置: 首页 > news >正文

2023_Spark_实验十四:SparkSQL入门操作

1、将emp.csv、dept.csv文件上传到分布式环境,再用 

hdfs  dfs -put dept.csv /input/

hdfs  dfs -put emp.csv /input/

将本地文件put到hdfs文件系统的input目录下

2、或者调用本地文件也可以。区别:sc.textFile("file:///D:\\temp\\emp.csv")


import org.apache.spark.sql.SparkSessionimport org.apache.spark.sql.types._import spark.implicits._case classEmp(empno:Int,ename:String,job:String,mgr:String,hiredate:String,sal:Int,comm:String,deptno:Int)val lines =sc.textFile("hdfs://Master:9000/input/emp.csv").map(_.split(","))val allEmp = lines.map(x=>Emp(x(0).toInt,x(1),x(2),x(3),x(4),x(5).toInt,x(6),x(7).toInt))val allEmpDF = allEmp.toDFallEmpDF.show

  • StructType 是个case class,一般用于构建schema.

  • 因为是case class,所以使用的时候可以不用new关键字

构造函数

  • 可以传入Seq,List,Array,都是可以的~

  • 还可以用无参的构造器,因为它有一个无参的构造器.

例子

private val schema: StructType = StructType(List(StructField("name", DataTypes.StringType),StructField("age", DataTypes.IntegerType)))

也可以是

private val schema: StructType = StructType(Array(StructField("name", DataTypes.StringType),StructField("age", DataTypes.IntegerType)))

  • 还可以调用无参构造器,这么写

private val schema = (new StructType).add(StructField("name", DataTypes.StringType)).add(StructField("age", DataTypes.IntegerType))

  • 这个无参的构造器,调用了一个有参构造器.this里面是个方法,这个方法的返回值是Array类型,实际上就是无参构造器调用了主构造器

def this() = this(Array.empty[StructField])case class StructType(fields: Array[StructField]) extends DataType with Seq[StructField] {}

import org.apache.spark.sql.types._val myschema =StructType(List(StructField("empno",DataTypes.IntegerType),StructField("ename",DataTypes.StringType),StructField("job",DataTypes.StringType),StructField("mgr",DataTypes.StringType),StructField("hiredate",DataTypes.StringType),StructField("sal",DataTypes.IntegerType),StructField("comm",DataTypes.StringType),StructField("deptno",DataTypes.IntegerType)))val empcsvRDD = sc.textFile("hdfs://Master:9000/input/emp.csv").map(_.split(","))import org.apache.spark.sql.Rowval rowRDD=empcsvRDD.map(line => Row (line(0).toInt,line(1),line(2),line(3),line(4),line(5).toInt,line(6),line(7).toInt))val df = spark.createDataFrame(rowRDD,myschema)

将people.json文件上传到分布式环境

hdfs  dfs -put people.json /inputhdfs  dfs -put emp.json /input

//读json文件

val df = spark.read.json("hdfs://Master:9000/input/emp.json")df.show

df.select ("ename").show

df.select($"ename").show

df.select($"ename",$"sal",$"sal"+100).show

df.filter($"sal">2000).show

df.groupBy($"deptno").count.show

df.createOrReplaceTempView("emp")

spark.sql("select * from emp").show

spark.sql("select * from emp where deptno=10").show

spark.sql("select deptno,sum(sal) from emp group by deptno").show


//1 创建一个普通的 view 和一个全局的 viewdf.createOrReplaceTempView("emp1")df.createGlobalTempView("emp2")//2 在当前会话中执行查询,均可查询出结果spark.sql("select * from emp1").showspark.sql("select * from global_temp.emp2").show//3 开启一个新的会话,执行同样的查询spark.newSession.sql("select * from emp1").show //运行出错spark.newSession.sql("select * from global_temp.emp2").show

//7、创建 Datasets//创建 DataSet,方式一:使用序列//1、定义 case classcase class MyData(a:Int,b:String)//2、生成序列,并创建 DataSetval ds = Seq(MyData(1,"Tom"),MyData(2,"Mary")).toDS//3、查看结果ds.showds.collect


//创建 DataSet,方式二:使用 JSON 数据//1、定义 case classcase class Person(name: String, gender: String)//2、通过 JSON 数据生成 DataFrameval df = spark.read.json(sc.parallelize("""{"gender": "Male", "name": "Tom"}""":: Nil))//3、将 DataFrame 转成 DataSetdf.as[Person].showdf.as[Person].collect


//创建 DataSet,方式三:使用 HDFS 数据val linesDS = spark.read.text("hdfs://Master:9000/input/word.txt").as[String]val words = linesDS.flatMap(_.split(" ")).filter(_.length > 3)words.showwords.collect


val result = linesDS.flatMap(_.split(" ")).map((_,1)).groupByKey(x => x._1).countresult.showresult.orderBy($"value").show

1、将emp.json文件上传到分布式环境,再用 

hdfs  dfs -put emp.json /input/

将本地文件put到hdfs文件系统的input目录下


//8、Datasets 的操作案例//1.使用 emp.json 生成 DataFrameval empDF = spark.read.json("hdfs://Master:9000/input/emp.json")//查询工资大于 3000 的员工empDF.where($"sal" >= 3000).show//创建 case classcase classEmp(empno:Long,ename:String,job:String,hiredate:String,mgr:String,sal:Long,comm:String,deptno:Long)//生成 DataSets,并查询数据val empDS = empDF.as[Emp]//查询工资大于 3000 的员工empDS.filter(_.sal > 3000).show//查看 10 号部门的员工empDS.filter(_.deptno == 10).show//多表查询//1、创建部门表val deptRDD=sc.textFile("hdfs://Master:9000/input/dept.csv").map(_.split(","))case class Dept(deptno:Int,dname:String,loc:String)val deptDS = deptRDD.map(x=>Dept(x(0).toInt,x(1),x(2))).toDS//2、创建员工表case classEmp(empno:Int,ename:String,job:String,mgr:String,hiredate:String,sal:Int,comm:String,deptno:Int)val empRDD = sc.textFile("hdfs://Master:9000/input/emp.csv").map(_.split(","))val empDS = empRDD.map(x =>Emp(x(0).toInt,x(1),x(2),x(3),x(4),x(5).toInt,x(6),x(7).toInt)).toDS//3、执行多表查询:等值链接val result = deptDS.join(empDS,"deptno")//另一种写法:注意有三个等号val result = deptDS.joinWith(empDS,deptDS("deptno")===empDS("deptno"))//查看执行计划:result.explain

相关文章:

2023_Spark_实验十四:SparkSQL入门操作

1、将emp.csv、dept.csv文件上传到分布式环境,再用 hdfs dfs -put dept.csv /input/ hdfs dfs -put emp.csv /input/ 将本地文件put到hdfs文件系统的input目录下 2、或者调用本地文件也可以。区别:sc.textFile("file:///D:\\temp\\emp.csv&qu…...

如何将几个模型合并成一个

1、什么时候需要合并模型? 组装和装配:当你需要将多个零件或组件组装成一个整体时,可以合并它们成为一个模型。例如,在制造业中,当需要设计和展示一个完整的机械装置或产品时,可以将各个零部件合并成一个模…...

异常气体识别与飘移

Olfactory Target/Background Odor Detection via Self-expression Model 解决非目标气体检测 摘要:提出了SeELM模型(自表达ELM模型) 分为两步:1.对获得的数据集进行建模,计算出自我表达系数矩阵,2.对于异…...

分类预测 | Matlab实现WOA-BiLSTM鲸鱼算法优化双向长短期记忆神经网络的数据多输入分类预测

分类预测 | Matlab实现WOA-BiLSTM鲸鱼算法优化双向长短期记忆神经网络的数据多输入分类预测 目录 分类预测 | Matlab实现WOA-BiLSTM鲸鱼算法优化双向长短期记忆神经网络的数据多输入分类预测分类效果基本描述程序设计参考资料 分类效果 基本描述 1.Matlab实现WOA-BiLSTM鲸鱼算法…...

35 机器学习(三):混淆矩阵|朴素贝叶斯|决策树|随机森林

文章目录 分类模型的评估混淆矩阵精确率和召回率 接口介绍其他的补充 朴素贝叶斯基础原理介绍拉普拉斯平滑下面给出应用的例子朴素贝叶斯的思辨 决策树基础使用基本原理信息熵信息增益信息增益率Gini指数 剪枝api介绍 随机森林------集成学习初识基本使用api介绍 分类模型的评估…...

ImportError: urllib3 v2.0 only supports OpenSSL 1.1.1+

该错误提示表示您的 OpenSSL 版本过低,无法兼容 urllib3 v2.0。 解决此问题的方法是升级您的 OpenSSL 版本至 1.1.1 或以上。具体操作如下: 方法一: 检查您的 OpenSSL 版本,使用以下命令: openssl version 如果您的…...

webrtc gcc算法(1)

老的webrtc gcc算法,大概流程: 这两个拥塞控制算法分别是在发送端和接收端实现的, 接收端的拥塞控制算法所计算出的估计带宽, 会通过RTCP的remb反馈到发送端, 发送端综合两个控制算法的结果得到一个最终的发送码率,并以…...

2022年亚太杯APMCM数学建模大赛C题全球变暖与否全过程文档及程序

2022年亚太杯APMCM数学建模大赛 C题 全球变暖与否 原题再现: 加拿大的49.6C创造了地球北纬50以上地区的气温新纪录,一周内数百人死于高温;美国加利福尼亚州死亡谷是54.4C,这是有史以来地球上记录的最高温度;科威特53…...

苹果开发者 Xcode发布TestFlight全流程

打包前注意事项 使用Xcode导出安装包之前,必须先确认账户的所有合约是否全部同意,如果有不同意的,在出包的时候会弹出报错 这是什么意思 这意味着您有一些需要在应用商店连接上验证的协议(protocol)/契约(Contract)。解决方案 连接到应用商店…...

Spring Security—Servlet 应用架构

目录 一、Filter(过滤器)回顾 二、DelegatingFilterProxy 三、FilterChainProxy 四、SecurityFilterChain 五、Security Filter 六、打印出 Security Filter 七、添加自定义 Filter 到 Filter Chain 八、处理 Security 异常 九、保存认证之间的…...

排序优化:如何实现一个通用的、高性能的排序函数?

文章来源于极客时间前google工程师−王争专栏。 几乎所有的编程语言都会提供排序函数,比如java中的Collections.sort()。在平时的开发中,我们都是直接使用,这些排序函数是如何实现的?底层都利用了哪种排序算法呢? 问题…...

车载开发学习——CAN总线

CAN总线又称为汽车总线,全程为“控制器局域网(Controller Area Network)”,即区域网络控制器,它将区域内的单一控制单元以某种形式连接在一起,形成一个系统。在这个系统内,大家以一种大家都认可…...

2023年知名国产数据库厂家汇总

随着信创国产化的崛起,大家纷纷在寻找可替代的国产数据库厂家。这里小编就给大家汇总了一些国内知名数据库厂家,仅供参考哦! 2023年知名国产数据库厂家汇总 1、人大金仓 2、瀚高 3、高斯 4、阿里云 5、华为云 6、浪潮 7、达梦 8、南大…...

【ARM Coresight SoC-400/SoC-600 专栏导读】

文章目录 1. ARM Coresight SoC-400/SoC-600 专栏导读目录1.1 Coresight 专题1.1.1 Performance Profiling1.1.2 ARM Coresight DS-5 系列 1. ARM Coresight SoC-400/SoC-600 专栏导读目录 本专栏全面介绍 ARM Coresight 系统 及SoC-400, SoC-600 中的各个组件。 1.1 Coresigh…...

在Go中创建自定义错误

引言 Go提供了两种在标准库中创建错误的方法,[errors.New和fmt.Errorf],当与用户交流更复杂的错误信息时,或在调试时与未来的自己交流时,有时这两种机制不足以充分捕获和报告所发生的情况。为了传达更复杂的错误信息并实现更多的…...

Vue.js2+Cesium1.103.0 十三、通过经纬度查询 GeoServer 发布的 wms 服务下的 feature 对象的相关信息

Vue.js2Cesium1.103.0 十三、通过经纬度查询 GeoServer 发布的 wms 服务下的 feature 对象的相关信息 Demo <template><divid"cesium-container"style"width: 100%; height: 100%;"><div style"position: absolute;z-index: 999;bott…...

使用STM32怎么喂狗 (IWDG)

STM32F1 的独立看门狗&#xff08;以下简称 IWDG&#xff09;。 STM32F1内部自带了两个看门狗&#xff0c;一个是独立看门狗 IWDG&#xff0c;另一个是窗口看门狗 WWDG&#xff0c; 本章只介绍独立看门狗 IWDG&#xff0c;窗口看门狗 WWDG 会在后面章节介绍。 本章要实现的功能…...

GEE:计算和打印GEE程序的执行时间

作者:CSDN @ _养乐多_ 本文记录了计算和打印程序的执行时间的Google Earth Engine (GEE)代码,并举例说明。 大家在执行GEE代码的时候,有时候为了对比两个不同的脚本,不知道代码执行花费了多少时间。本文记录了打印代码执行时间的函数,并举了一个应用案例说明。可以知道…...

GDPU 数据结构 天码行空5

一、实验目的 1&#xff0e;掌握队列的顺序存储结构 2&#xff0e;掌握队列先进先出运算原则在解决实际问题中的应用 二、实验内容 仿照教材顺序循环队列的例子&#xff0c;设计一个只使用队头指针和计数器的顺序循环队列抽象数据类型。其中操作包括&#xff1a;初始化、入队…...

SQLAlchemy学习-12.查询之 order_by 按desc 降序排序

前言 sqlalchemy的query默认是按id升序进行排序的&#xff0c;当我们需要按某个字段降序排序&#xff0c;就需要用到 order_by。 order_by 排序 默认情况下 sqlalchemy 的 query 默认是按 id 升序进行排序的 res session.query(Project).all() print(res) # [<Project…...

【Python】 -- 趣味代码 - 小恐龙游戏

文章目录 文章目录 00 小恐龙游戏程序设计框架代码结构和功能游戏流程总结01 小恐龙游戏程序设计02 百度网盘地址00 小恐龙游戏程序设计框架 这段代码是一个基于 Pygame 的简易跑酷游戏的完整实现,玩家控制一个角色(龙)躲避障碍物(仙人掌和乌鸦)。以下是代码的详细介绍:…...

椭圆曲线密码学(ECC)

一、ECC算法概述 椭圆曲线密码学&#xff08;Elliptic Curve Cryptography&#xff09;是基于椭圆曲线数学理论的公钥密码系统&#xff0c;由Neal Koblitz和Victor Miller在1985年独立提出。相比RSA&#xff0c;ECC在相同安全强度下密钥更短&#xff08;256位ECC ≈ 3072位RSA…...

树莓派超全系列教程文档--(62)使用rpicam-app通过网络流式传输视频

使用rpicam-app通过网络流式传输视频 使用 rpicam-app 通过网络流式传输视频UDPTCPRTSPlibavGStreamerRTPlibcamerasrc GStreamer 元素 文章来源&#xff1a; http://raspberry.dns8844.cn/documentation 原文网址 使用 rpicam-app 通过网络流式传输视频 本节介绍来自 rpica…...

[10-3]软件I2C读写MPU6050 江协科技学习笔记(16个知识点)

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16...

解决本地部署 SmolVLM2 大语言模型运行 flash-attn 报错

出现的问题 安装 flash-attn 会一直卡在 build 那一步或者运行报错 解决办法 是因为你安装的 flash-attn 版本没有对应上&#xff0c;所以报错&#xff0c;到 https://github.com/Dao-AILab/flash-attention/releases 下载对应版本&#xff0c;cu、torch、cp 的版本一定要对…...

Spring Boot+Neo4j知识图谱实战:3步搭建智能关系网络!

一、引言 在数据驱动的背景下&#xff0c;知识图谱凭借其高效的信息组织能力&#xff0c;正逐步成为各行业应用的关键技术。本文聚焦 Spring Boot与Neo4j图数据库的技术结合&#xff0c;探讨知识图谱开发的实现细节&#xff0c;帮助读者掌握该技术栈在实际项目中的落地方法。 …...

零基础设计模式——行为型模式 - 责任链模式

第四部分&#xff1a;行为型模式 - 责任链模式 (Chain of Responsibility Pattern) 欢迎来到行为型模式的学习&#xff01;行为型模式关注对象之间的职责分配、算法封装和对象间的交互。我们将学习的第一个行为型模式是责任链模式。 核心思想&#xff1a;使多个对象都有机会处…...

NFT模式:数字资产确权与链游经济系统构建

NFT模式&#xff1a;数字资产确权与链游经济系统构建 ——从技术架构到可持续生态的范式革命 一、确权技术革新&#xff1a;构建可信数字资产基石 1. 区块链底层架构的进化 跨链互操作协议&#xff1a;基于LayerZero协议实现以太坊、Solana等公链资产互通&#xff0c;通过零知…...

汇编常见指令

汇编常见指令 一、数据传送指令 指令功能示例说明MOV数据传送MOV EAX, 10将立即数 10 送入 EAXMOV [EBX], EAX将 EAX 值存入 EBX 指向的内存LEA加载有效地址LEA EAX, [EBX4]将 EBX4 的地址存入 EAX&#xff08;不访问内存&#xff09;XCHG交换数据XCHG EAX, EBX交换 EAX 和 EB…...

什么是Ansible Jinja2

理解 Ansible Jinja2 模板 Ansible 是一款功能强大的开源自动化工具&#xff0c;可让您无缝地管理和配置系统。Ansible 的一大亮点是它使用 Jinja2 模板&#xff0c;允许您根据变量数据动态生成文件、配置设置和脚本。本文将向您介绍 Ansible 中的 Jinja2 模板&#xff0c;并通…...