2023_Spark_实验十四:SparkSQL入门操作
1、将emp.csv、dept.csv文件上传到分布式环境,再用
hdfs dfs -put dept.csv /input/
hdfs dfs -put emp.csv /input/
将本地文件put到hdfs文件系统的input目录下
2、或者调用本地文件也可以。区别:sc.textFile("file:///D:\\temp\\emp.csv")
import org.apache.spark.sql.SparkSessionimport org.apache.spark.sql.types._import spark.implicits._case classEmp(empno:Int,ename:String,job:String,mgr:String,hiredate:String,sal:Int,comm:String,deptno:Int)val lines =sc.textFile("hdfs://Master:9000/input/emp.csv").map(_.split(","))val allEmp = lines.map(x=>Emp(x(0).toInt,x(1),x(2),x(3),x(4),x(5).toInt,x(6),x(7).toInt))val allEmpDF = allEmp.toDFallEmpDF.show

-
StructType 是个case class,一般用于构建schema.
-
因为是case class,所以使用的时候可以不用new关键字
构造函数
-
可以传入Seq,List,Array,都是可以的~
-
还可以用无参的构造器,因为它有一个无参的构造器.
例子
private val schema: StructType = StructType(List(StructField("name", DataTypes.StringType),StructField("age", DataTypes.IntegerType)))
也可以是
private val schema: StructType = StructType(Array(StructField("name", DataTypes.StringType),StructField("age", DataTypes.IntegerType)))
-
还可以调用无参构造器,这么写
private val schema = (new StructType).add(StructField("name", DataTypes.StringType)).add(StructField("age", DataTypes.IntegerType))
-
这个无参的构造器,调用了一个有参构造器.this里面是个方法,这个方法的返回值是Array类型,实际上就是无参构造器调用了主构造器
def this() = this(Array.empty[StructField])case class StructType(fields: Array[StructField]) extends DataType with Seq[StructField] {}
import org.apache.spark.sql.types._val myschema =StructType(List(StructField("empno",DataTypes.IntegerType),StructField("ename",DataTypes.StringType),StructField("job",DataTypes.StringType),StructField("mgr",DataTypes.StringType),StructField("hiredate",DataTypes.StringType),StructField("sal",DataTypes.IntegerType),StructField("comm",DataTypes.StringType),StructField("deptno",DataTypes.IntegerType)))val empcsvRDD = sc.textFile("hdfs://Master:9000/input/emp.csv").map(_.split(","))import org.apache.spark.sql.Rowval rowRDD=empcsvRDD.map(line => Row (line(0).toInt,line(1),line(2),line(3),line(4),line(5).toInt,line(6),line(7).toInt))val df = spark.createDataFrame(rowRDD,myschema)

将people.json文件上传到分布式环境
hdfs dfs -put people.json /inputhdfs dfs -put emp.json /input
//读json文件
val df = spark.read.json("hdfs://Master:9000/input/emp.json")df.show

df.select ("ename").show

df.select($"ename").show

df.select($"ename",$"sal",$"sal"+100).show

df.filter($"sal">2000).show

df.groupBy($"deptno").count.show

df.createOrReplaceTempView("emp")
spark.sql("select * from emp").show

spark.sql("select * from emp where deptno=10").show

spark.sql("select deptno,sum(sal) from emp group by deptno").show

//1 创建一个普通的 view 和一个全局的 viewdf.createOrReplaceTempView("emp1")df.createGlobalTempView("emp2")//2 在当前会话中执行查询,均可查询出结果spark.sql("select * from emp1").showspark.sql("select * from global_temp.emp2").show//3 开启一个新的会话,执行同样的查询spark.newSession.sql("select * from emp1").show //运行出错spark.newSession.sql("select * from global_temp.emp2").show

//7、创建 Datasets//创建 DataSet,方式一:使用序列//1、定义 case classcase class MyData(a:Int,b:String)//2、生成序列,并创建 DataSetval ds = Seq(MyData(1,"Tom"),MyData(2,"Mary")).toDS//3、查看结果ds.showds.collect

//创建 DataSet,方式二:使用 JSON 数据//1、定义 case classcase class Person(name: String, gender: String)//2、通过 JSON 数据生成 DataFrameval df = spark.read.json(sc.parallelize("""{"gender": "Male", "name": "Tom"}""":: Nil))//3、将 DataFrame 转成 DataSetdf.as[Person].showdf.as[Person].collect

//创建 DataSet,方式三:使用 HDFS 数据val linesDS = spark.read.text("hdfs://Master:9000/input/word.txt").as[String]val words = linesDS.flatMap(_.split(" ")).filter(_.length > 3)words.showwords.collect

val result = linesDS.flatMap(_.split(" ")).map((_,1)).groupByKey(x => x._1).countresult.showresult.orderBy($"value").show

1、将emp.json文件上传到分布式环境,再用
hdfs dfs -put emp.json /input/
将本地文件put到hdfs文件系统的input目录下
//8、Datasets 的操作案例//1.使用 emp.json 生成 DataFrameval empDF = spark.read.json("hdfs://Master:9000/input/emp.json")//查询工资大于 3000 的员工empDF.where($"sal" >= 3000).show//创建 case classcase classEmp(empno:Long,ename:String,job:String,hiredate:String,mgr:String,sal:Long,comm:String,deptno:Long)//生成 DataSets,并查询数据val empDS = empDF.as[Emp]//查询工资大于 3000 的员工empDS.filter(_.sal > 3000).show//查看 10 号部门的员工empDS.filter(_.deptno == 10).show//多表查询//1、创建部门表val deptRDD=sc.textFile("hdfs://Master:9000/input/dept.csv").map(_.split(","))case class Dept(deptno:Int,dname:String,loc:String)val deptDS = deptRDD.map(x=>Dept(x(0).toInt,x(1),x(2))).toDS//2、创建员工表case classEmp(empno:Int,ename:String,job:String,mgr:String,hiredate:String,sal:Int,comm:String,deptno:Int)val empRDD = sc.textFile("hdfs://Master:9000/input/emp.csv").map(_.split(","))val empDS = empRDD.map(x =>Emp(x(0).toInt,x(1),x(2),x(3),x(4),x(5).toInt,x(6),x(7).toInt)).toDS//3、执行多表查询:等值链接val result = deptDS.join(empDS,"deptno")//另一种写法:注意有三个等号val result = deptDS.joinWith(empDS,deptDS("deptno")===empDS("deptno"))//查看执行计划:result.explain
相关文章:
2023_Spark_实验十四:SparkSQL入门操作
1、将emp.csv、dept.csv文件上传到分布式环境,再用 hdfs dfs -put dept.csv /input/ hdfs dfs -put emp.csv /input/ 将本地文件put到hdfs文件系统的input目录下 2、或者调用本地文件也可以。区别:sc.textFile("file:///D:\\temp\\emp.csv&qu…...
如何将几个模型合并成一个
1、什么时候需要合并模型? 组装和装配:当你需要将多个零件或组件组装成一个整体时,可以合并它们成为一个模型。例如,在制造业中,当需要设计和展示一个完整的机械装置或产品时,可以将各个零部件合并成一个模…...
异常气体识别与飘移
Olfactory Target/Background Odor Detection via Self-expression Model 解决非目标气体检测 摘要:提出了SeELM模型(自表达ELM模型) 分为两步:1.对获得的数据集进行建模,计算出自我表达系数矩阵,2.对于异…...
分类预测 | Matlab实现WOA-BiLSTM鲸鱼算法优化双向长短期记忆神经网络的数据多输入分类预测
分类预测 | Matlab实现WOA-BiLSTM鲸鱼算法优化双向长短期记忆神经网络的数据多输入分类预测 目录 分类预测 | Matlab实现WOA-BiLSTM鲸鱼算法优化双向长短期记忆神经网络的数据多输入分类预测分类效果基本描述程序设计参考资料 分类效果 基本描述 1.Matlab实现WOA-BiLSTM鲸鱼算法…...
35 机器学习(三):混淆矩阵|朴素贝叶斯|决策树|随机森林
文章目录 分类模型的评估混淆矩阵精确率和召回率 接口介绍其他的补充 朴素贝叶斯基础原理介绍拉普拉斯平滑下面给出应用的例子朴素贝叶斯的思辨 决策树基础使用基本原理信息熵信息增益信息增益率Gini指数 剪枝api介绍 随机森林------集成学习初识基本使用api介绍 分类模型的评估…...
ImportError: urllib3 v2.0 only supports OpenSSL 1.1.1+
该错误提示表示您的 OpenSSL 版本过低,无法兼容 urllib3 v2.0。 解决此问题的方法是升级您的 OpenSSL 版本至 1.1.1 或以上。具体操作如下: 方法一: 检查您的 OpenSSL 版本,使用以下命令: openssl version 如果您的…...
webrtc gcc算法(1)
老的webrtc gcc算法,大概流程: 这两个拥塞控制算法分别是在发送端和接收端实现的, 接收端的拥塞控制算法所计算出的估计带宽, 会通过RTCP的remb反馈到发送端, 发送端综合两个控制算法的结果得到一个最终的发送码率,并以…...
2022年亚太杯APMCM数学建模大赛C题全球变暖与否全过程文档及程序
2022年亚太杯APMCM数学建模大赛 C题 全球变暖与否 原题再现: 加拿大的49.6C创造了地球北纬50以上地区的气温新纪录,一周内数百人死于高温;美国加利福尼亚州死亡谷是54.4C,这是有史以来地球上记录的最高温度;科威特53…...
苹果开发者 Xcode发布TestFlight全流程
打包前注意事项 使用Xcode导出安装包之前,必须先确认账户的所有合约是否全部同意,如果有不同意的,在出包的时候会弹出报错 这是什么意思 这意味着您有一些需要在应用商店连接上验证的协议(protocol)/契约(Contract)。解决方案 连接到应用商店…...
Spring Security—Servlet 应用架构
目录 一、Filter(过滤器)回顾 二、DelegatingFilterProxy 三、FilterChainProxy 四、SecurityFilterChain 五、Security Filter 六、打印出 Security Filter 七、添加自定义 Filter 到 Filter Chain 八、处理 Security 异常 九、保存认证之间的…...
排序优化:如何实现一个通用的、高性能的排序函数?
文章来源于极客时间前google工程师−王争专栏。 几乎所有的编程语言都会提供排序函数,比如java中的Collections.sort()。在平时的开发中,我们都是直接使用,这些排序函数是如何实现的?底层都利用了哪种排序算法呢? 问题…...
车载开发学习——CAN总线
CAN总线又称为汽车总线,全程为“控制器局域网(Controller Area Network)”,即区域网络控制器,它将区域内的单一控制单元以某种形式连接在一起,形成一个系统。在这个系统内,大家以一种大家都认可…...
2023年知名国产数据库厂家汇总
随着信创国产化的崛起,大家纷纷在寻找可替代的国产数据库厂家。这里小编就给大家汇总了一些国内知名数据库厂家,仅供参考哦! 2023年知名国产数据库厂家汇总 1、人大金仓 2、瀚高 3、高斯 4、阿里云 5、华为云 6、浪潮 7、达梦 8、南大…...
【ARM Coresight SoC-400/SoC-600 专栏导读】
文章目录 1. ARM Coresight SoC-400/SoC-600 专栏导读目录1.1 Coresight 专题1.1.1 Performance Profiling1.1.2 ARM Coresight DS-5 系列 1. ARM Coresight SoC-400/SoC-600 专栏导读目录 本专栏全面介绍 ARM Coresight 系统 及SoC-400, SoC-600 中的各个组件。 1.1 Coresigh…...
在Go中创建自定义错误
引言 Go提供了两种在标准库中创建错误的方法,[errors.New和fmt.Errorf],当与用户交流更复杂的错误信息时,或在调试时与未来的自己交流时,有时这两种机制不足以充分捕获和报告所发生的情况。为了传达更复杂的错误信息并实现更多的…...
Vue.js2+Cesium1.103.0 十三、通过经纬度查询 GeoServer 发布的 wms 服务下的 feature 对象的相关信息
Vue.js2Cesium1.103.0 十三、通过经纬度查询 GeoServer 发布的 wms 服务下的 feature 对象的相关信息 Demo <template><divid"cesium-container"style"width: 100%; height: 100%;"><div style"position: absolute;z-index: 999;bott…...
使用STM32怎么喂狗 (IWDG)
STM32F1 的独立看门狗(以下简称 IWDG)。 STM32F1内部自带了两个看门狗,一个是独立看门狗 IWDG,另一个是窗口看门狗 WWDG, 本章只介绍独立看门狗 IWDG,窗口看门狗 WWDG 会在后面章节介绍。 本章要实现的功能…...
GEE:计算和打印GEE程序的执行时间
作者:CSDN @ _养乐多_ 本文记录了计算和打印程序的执行时间的Google Earth Engine (GEE)代码,并举例说明。 大家在执行GEE代码的时候,有时候为了对比两个不同的脚本,不知道代码执行花费了多少时间。本文记录了打印代码执行时间的函数,并举了一个应用案例说明。可以知道…...
GDPU 数据结构 天码行空5
一、实验目的 1.掌握队列的顺序存储结构 2.掌握队列先进先出运算原则在解决实际问题中的应用 二、实验内容 仿照教材顺序循环队列的例子,设计一个只使用队头指针和计数器的顺序循环队列抽象数据类型。其中操作包括:初始化、入队…...
SQLAlchemy学习-12.查询之 order_by 按desc 降序排序
前言 sqlalchemy的query默认是按id升序进行排序的,当我们需要按某个字段降序排序,就需要用到 order_by。 order_by 排序 默认情况下 sqlalchemy 的 query 默认是按 id 升序进行排序的 res session.query(Project).all() print(res) # [<Project…...
线性结构之链表[基于郝斌课程]
每个结点只有一个前续结点每个结点只有一个后续结点首结点没有前续结点尾结点没有后续结点专业术语:首结点:第一个有效结点,存放第一个有效数据尾结点:最后一个有效结点,存放最后一个有效数据头结点:在首结…...
构建企业级AI智能体:LangGraph多智能体框架实战指南
构建企业级AI智能体:LangGraph多智能体框架实战指南 【免费下载链接】langgraph Build resilient language agents as graphs. 项目地址: https://gitcode.com/GitHub_Trending/la/langgraph 在当今AI应用开发中,开发者面临着一个核心挑战&#x…...
零基础新手指南:借助快马AI无需代码构建你的第一篇论文官网
作为一个完全没有编程基础的研究生,我曾经为了搭建个人论文展示网站头疼不已。直到发现了InsCode(快马)平台,整个过程变得异常简单。下面分享我的完整实践过程,希望能帮助到同样需要展示学术成果的朋友们。 明确网站需求结构 在开始前&#x…...
新手福音:用快马生成你的第一个c盘自动清理python脚本
今天想和大家分享一个特别实用的Python小工具——C盘自动清理脚本。作为一个刚接触编程的新手,我发现清理C盘空间是个常见需求,但手动操作既麻烦又容易误删重要文件。于是我用InsCode(快马)平台生成了一个简单实用的脚本,整个过程特别适合编程…...
指挥OpenClaw抓取数据折腾了一夜,我终于想到了邪修玩法
这段时间玩小龙虾玩得真上头,突然想起之前一直想要统计公众号的数据。 这工作交给小龙虾妥妥能胜任啊!但是吧……实际上执行出来的结果却不是这样的。 因为小白本地使用的是OpenClawAtomgit的方案,Atomgit主打一个不费一分钱,免…...
4款降AI率工具实测横评:最便宜和最贵的效果差多少?
花了几百块,测了一圈,现在把结果告诉你。 降AI率工具、降AI工具保姆级测评2026、降AI这个需求,不同工具之间差距其实挺明显的,不是"随便用一个都一样"。 我的结论:嘎嘎降AI(www.aigcleaner.com…...
[AI/应用/MCP] MCP Server/Tool 开发指南
1. 智能软件工程的范式转移:从库集成到原生框架演进 在生成式人工智能(Generative AI)从单纯的文本生成向具备自主规划与执行能力的“代理化(Agentic)”系统跨越的过程中,.NET 生态系统正在经历一场自该平台…...
Kettle数据迁移实战:从CSV到MySQL的高效导入指南
1. 为什么选择Kettle进行CSV到MySQL的数据迁移 第一次接触数据迁移任务时,我试过用Python脚本逐行读取CSV写入MySQL,结果导入10万条数据花了近20分钟。后来发现Kettle这个神器,同样的数据量只需要2分钟就能搞定,效率提升简直惊人。…...
Doris集群部署避坑指南:3FE+3BE配置全流程(含Java环境配置与常见问题解决)
Doris集群部署实战:3FE3BE高可用架构搭建与深度调优 在企业级数据分析场景中,Doris凭借其出色的实时分析性能和高并发处理能力,已成为众多企业的首选OLAP引擎。本文将基于3FE(Frontend)3BE(Backend…...
LangChain框架使用说明
LangChain框架的安装与环境配置 LangChain的安装可通过Python包管理器快速完成。核心库包括langchain、langchain-community和langchain-core,建议使用以下命令进行完整安装: pip install langchain langchain-community langchain-core openai环境配…...
