摸鱼大数据——Spark SQL——DataFrame详解一
1.DataFrame基本介绍
DataFrame表示的是一个二维的表。二维表,必然存在行、列等表结构描述信息表结构描述信息(元数据Schema): StructType对象字段: StructField对象,可以描述字段名称、字段数据类型、是否可以为空行: Row对象列: Column对象,包含字段名称和字段值在一个StructType对象下,由多个StructField组成,构建成一个完整的元数据信息
如何构建表结构信息数据:
2.DataFrame的构建方式
方式1: 使用SparkSession的createDataFrame(data,schema)函数创建data参数1.基于List列表数据进行创建2.基于RDD弹性分布式数据集进行创建3.基于pandas的DataFrame数据进行创建schema参数1: 字符串格式一 :“字段名1 字段类型,字段名2 字段类型”格式二(推荐):“字段名1:字段类型,字段名2:字段类型”2: List格式: ["字段名1","字段名2"] 3: DataType(推荐,用的最多)格式一:schema=StructType().add('字段名1',字段类型).add('字段名2',字段类型)格式二:schema=StructType([StructField('字段名1',类型),StructField('字段名1',类型)])方式2: 使用DataFrame的toDF(colNames)函数创建DataFrame的toDF方法是一个在Apache Spark的DataFrame API中用来创建一个新的DataFrame的方法。这个方法可以将一个RDD转换为DataFrame,或者将一个已存在的DataFrame转换为另一个DataFrame。在Python中,你可以使用toDF方法来指定列的名字。如果你不指定列的名字,那么默认的列的名字会是_1, _2等等。 格式: rdd.toDF([列名])方式3: 使用SparkSession的read()函数创建在 Spark 中,SparkSession 的 read 是用于读取数据的入口点之一,它提供了各种方法来读取不同格式的数据并将其加载到 Spark 中进行处理。统一API格式: spark.read.format('text|csv|json|parquet|orc|...') : 读取外部文件的方式.option('k','v') : 选项 可以设置相关的参数 (可选).schema(StructType | String) : 设置表的结构信息.load('加载数据路径') : 读取外部文件的路径, 支持 HDFS 也支持本地简写API格式:注意: 以上所有的外部读取方式,都有简单的写法。spark内置了一些常用的读取方案的简写格式: spark.read.文件读取方式()注意: parquet:是Spark中常用的一种列式存储文件格式和Hive中的ORC差不多, 他俩都是列存储格式
2.1 createDataFrame()创建
场景:一般用在开发和测试中。因为只能处理少量的数据
2.1.1 基于列表
# 导包import osfrom pyspark.sql import SparkSession# 绑定指定的python解释器os.environ['SPARK_HOME'] = '/export/server/spark'os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'# 创建main函数if __name__ == '__main__':# 1.创建SparkContext对象spark = SparkSession.builder.appName('pyspark_demo').master('local[*]').getOrCreate()# 2.创建DF对象data = [(1, '张三', 18), (2, '李四', 28), (3, '王五', 38)]df1 = spark.createDataFrame(data,schema=['id','name','age'])# 展示数据df1.show()# 查看结构信息df1.printSchema()print('---------------------------------------------------------')df2 = spark.createDataFrame(data,schema='id int,name string,age int')# 展示数据df2.show()# 查看结构信息df2.printSchema()print('---------------------------------------------------------')df3 = spark.createDataFrame(data,schema='id:int,name:string,age:int')# 展示数据df3.show()# 查看结构信息df3.printSchema()# 3.关闭资源spark.stop()
2.1.2 基于RDD普通方式
场景:RDD可以存储任意结构的数据;而DataFrame只能处理二维表数据。在使用Spark处理数据的初期,可能输入进来的数据是半结构化或者是非结构化的数据,那么可以先通过RDD对数据进行ETL处理成结构化数据,再使用开发效率高的SparkSQL来对后续数据进行处理分析。
Schema选择StructType对象来定义DataFrame的“表结构”转换RDD
# 导包import osfrom pyspark.sql import SparkSession# 绑定指定的python解释器from pyspark.sql.types import StructType, StringType, StructFieldos.environ['SPARK_HOME'] = '/export/server/spark'os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'# 创建main函数if __name__ == '__main__':# 1.创建SparkContext对象spark = SparkSession.builder.appName('pyspark_demo').master('local[*]').getOrCreate()sc = spark.sparkContext# 2.读取生成rddtextRDD = sc.textFile('file:///export/data/spark_project/spark_sql/data/data1.txt')print(type(textRDD)) # <class 'pyspark.rdd.RDD'>etlRDD = textRDD.map(lambda line:line.split(',')).map(lambda l:(l[0],l[1]))# 3.定义schema结构信息schema1 = StructType().add('name',StringType(),True).add('age',StringType(),True)schema2 = StructType([StructField('name',StringType(),True),StructField('age',StringType(),True)])schema3 = ['name','age']schema4 = 'name string,age string'schema5 = 'name:string,age:string'# 4.创建DF对象dfpeople = spark.createDataFrame(etlRDD,schema5)# 5.df展示结构信息dfpeople.show()dfpeople.printSchema()# 6.拓展: 创建临时视图,方便sql查询dfpeople.createTempView('peoples')r = spark.sql('select * from peoples')r.show()# 7.关闭资源sc.stop()spark.stop()
2.1.3 基于RDD反射方式
Schema使用反射方法来推断Schema模式Spark SQL 可以将 Row 对象的 RDD 转换为 DataFrame,从而推断数据类型。
# 导包import osfrom pyspark.sql import SparkSession# 绑定指定的python解释器from pyspark.sql.types import StructType, StringType, StructField, Rowos.environ['SPARK_HOME'] = '/export/server/spark'os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'# 创建main函数if __name__ == '__main__':# 1.创建SparkContext对象spark = SparkSession.builder.appName('pyspark_demo').master('local[*]').getOrCreate()sc = spark.sparkContext# 2.读取生成rdd# 3.定义schema结构信息textRDD = sc.textFile('file:///export/data/spark_project/spark_sql/data/data1.txt')etlRDD_schema = textRDD.map(lambda line:line.split(',')).map(lambda l:Row(name=l[0],age=l[1]))# 4.创建DF对象dfpeople = spark.createDataFrame(etlRDD_schema)# 5.df展示结构信息dfpeople.show()dfpeople.printSchema()# 6.拓展: 创建临时视图,方便sql查询dfpeople.createTempView('peoples')r = spark.sql('select * from peoples')r.show()# 7.关闭资源sc.stop()spark.stop()
2.2 toDF()创建
schema模式编码在字符串中,toDF参数用于指定列的名字。如果你不指定列的名字,那么默认的列的名字会是_1, _2等等。
# 导包import osfrom pyspark.sql import SparkSession# 绑定指定的python解释器from pyspark.sql.types import StructType, StringType, StructField, Rowos.environ['SPARK_HOME'] = '/export/server/spark'os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'# 创建main函数if __name__ == '__main__':# 1.创建SparkContext对象spark = SparkSession.builder.appName('pyspark_demo').master('local[*]').getOrCreate()sc = spark.sparkContext# 2.读取生成rdd# 3.定义schema结构信息textRDD = sc.textFile('file:///export/data/spark_project/spark_sql/data/data1.txt')etlRDD = textRDD.map(lambda line:line.split(','))# 4.创建DF对象dfpeople = etlRDD.toDF(['name','age'])# 5.df展示结构信息dfpeople.show()dfpeople.printSchema()# 6.拓展: 创建临时视图,方便sql查询dfpeople.createTempView('peoples')r = spark.sql('select * from peoples')r.show()# 7.关闭资源sc.stop()spark.stop()
2.3 read读取外部文件
复杂API
统一API格式: spark.read.format('text|csv|json|parquet|orc|avro|jdbc|.....') # 读取外部文件的方式.option('k','v') # 选项 可以设置相关的参数 (可选).schema(StructType | String) # 设置表的结构信息.load('加载数据路径') # 读取外部文件的路径, 支持 HDFS 也支持本地
简写API
请注意: 以上所有的外部读取方式,都有简单的写法。spark内置了一些常用的读取方案的简写格式: spark.read.读取方式()例如: df = spark.read.csv(path='file:///export/data/_03_spark_sql/data/stu.txt',header=True,sep=' ',inferSchema=True,encoding='utf-8',)
2.3.1 Text方式读取
text方式读取文件:1- 不管文件中内容是什么样的,text会将所有内容全部放到一个列中处理2- 默认生成的列名叫value,数据类型string3- 只能够在schema中修改字段value的名称,其他任何内容不能修改
# 导包import osfrom pyspark.sql import SparkSession# 绑定指定的python解释器from pyspark.sql.types import StructType, StringType, StructField, Rowos.environ['SPARK_HOME'] = '/export/server/spark'os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'# 创建main函数if __name__ == '__main__':# 1.创建SparkContext对象spark = SparkSession.builder.appName('pyspark_demo').master('local[*]').getOrCreate()# 2.读取数据# 注意: 读取text文件默认只有1列,且列名交value,可以通过schema修改df = spark.read\.format('text')\.schema('info string')\.load('file:///export/data/spark_project/spark_sql/data/data1.txt')# 5.df展示结构信息df.show()df.printSchema()# 6.拓展: 创建临时视图,方便sql查询df.createTempView('peoples')r = spark.sql('select * from peoples')r.show()# 6.关闭资源spark.stop()
2.3.2 CSV方式读取
csv格式读取外部文件:1- 复杂API和简写API都必须掌握2- 相关参数作用说明:2.1- path:指定读取的文件路径。支持HDFS和本地文件路径2.2- schema:手动指定元数据信息2.3- sep:指定字段间的分隔符2.4- encoding:指定文件的编码方式2.5- header:指定文件中的第一行是否是字段名称2.6- inferSchema:根据数据内容自动推断数据类型。但是,推断结果可能不精确
# 导包import osfrom pyspark.sql import SparkSession# 绑定指定的python解释器from pyspark.sql.types import StructType, StringType, StructField, Rowos.environ['SPARK_HOME'] = '/export/server/spark'os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'# 创建main函数if __name__ == '__main__':# 1.创建SparkContext对象spark = SparkSession.builder.appName('pyspark_demo').master('local[*]').getOrCreate()# 2.读取数据# 注意: csv文件可以识别多个列,可以使用schema指定列名,类型# 原始方式# df = spark.read\# .format('csv')\# .schema('name string,age int')\# .option('sep',',')\# .option('encoding','utf8')\# .option('header',False)\# .load('file:///export/data/spark_project/spark_sql/data/data1.txt')# 简化方式df = spark.read.csv(schema='name string,age int',sep=',',encoding='utf8',header=False,path='file:///export/data/spark_project/spark_sql/data/data1.txt')# 5.df展示结构信息df.show()df.printSchema()# 6.拓展: 创建临时视图,方便sql查询df.createTempView('peoples')r = spark.sql('select * from peoples')r.show()# 7.关闭资源spark.stop()
2.3.3 JSON方式读取
json读取数据:1- 需要手动指定schema信息。如果手动指定的时候,字段名称与json中的key名称不一致,会解析不成功,以null值填充2- csv/json中schema的结构,如果是字符串类型,那么字段名称和字段数据类型间,只能以空格分隔
json的数据内容
{'id': 1,'name': '张三','age': 20}{'id': 2,'name': '李四','age': 23,'address': '北京'}{'id': 3,'name': '王五','age': 25}{'id': 4,'name': '赵六','age': 29}
代码实现
# 导包import osfrom pyspark.sql import SparkSession# 绑定指定的python解释器from pyspark.sql.types import StructType, StringType, StructField, Rowos.environ['SPARK_HOME'] = '/export/server/spark'os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'# 创建main函数if __name__ == '__main__':# 1.创建SparkContext对象spark = SparkSession.builder.appName('pyspark_demo').master('local[*]').getOrCreate()# 2.读取数据# 注意: json的key和schema指定的字段名不一致,会用null补充,如果没有数据也是用null补充# 简化方式df = spark.read.json(schema='id int,name string,age int,address string',encoding='utf8',path='file:///export/data/spark_project/spark_sql/data/data2.txt')# 5.df展示结构信息df.show()df.printSchema()# 6.拓展: 创建临时视图,方便sql查询df.createTempView('peoples')r = spark.sql('select * from peoples')r.show()# 关闭资源spark.stop()
相关文章:

摸鱼大数据——Spark SQL——DataFrame详解一
1.DataFrame基本介绍 DataFrame表示的是一个二维的表。二维表,必然存在行、列等表结构描述信息表结构描述信息(元数据Schema): StructType对象字段: StructField对象,可以描述字段名称、字段数据类型、是否可以为空行: Row对象列: Column对象ÿ…...

【Java探索之旅】初识多态_概念_实现条件
文章目录 📑前言一、多态1.1 概念1.2 多态的实现条件 🌤️全篇总结 📑前言 多态作为面向对象编程中的重要概念,为我们提供了一种灵活而强大的编程方式。通过多态,同一种操作可以应用于不同的对象,并根据对象…...
[Day 26] 區塊鏈與人工智能的聯動應用:理論、技術與實踐
數據科學與AI的整合應用 數據科學(Data Science)和人工智能(AI)在現代技術世界中扮演著至關重要的角色。兩者的整合應用能夠為企業和研究人員提供強大的工具,以更好地理解、預測和解決各種複雜的問題。本文將深入探討…...

算法 —— 滑动窗口
目录 长度最小的子数组 无重复字符的最长子串 最大连续1的个数 将x减到0的最小操作数 找到字符串中所有字母异位词 最小覆盖子串 长度最小的子数组 sum比target小就进窗口,sum比target大就出窗口,由于数组是正数,所以相加会使sum变大&…...

【设计模式】工厂模式(定义 | 特点 | Demo入门讲解)
文章目录 定义简单工厂模式案例 | 代码Phone顶层接口设计Meizu品牌类Xiaomi品牌类PhoneFactory工厂类Customer 消费者类 工厂方法模式案例 | 代码PhoneFactory工厂类 Java高级特性---工厂模式与反射的高阶玩法方案:反射工厂模式 总结 其实工厂模式就是用一个代理类帮…...
Linux之计划和日志
计划任务 计划任务概念解析 在Linux操作系统中,除了用户即时执行的命令操作以外,还可以配置在指定的时间、指定的日期执行预先计划好的系统管理任务(如定期备份、定期采集监测数据)。通过安装at和crontabs这两个系统服务实现一次性、周期性计划任务的功能,并分别通过at、…...

C++ 多态篇
文章目录 1. 多态的概念和实现1.1 概念1.2 实现1.2.1 协变1.2.2 析构函数1.2.3 子类虚函数不加virtual 2. C11 final和override3.1 final3.2 override 3. 函数重载、重写与隐藏4. 多态的原理5. 抽象类6.单继承和多继承的虚表6.1 单继承6.2 多继承 7. 菱形继承的虚表(了解)7.1 菱…...

【LVGL-SquareLine Studio】
LVGL-SquareLine Studio ■ SquareLine Studio-官网下载地址■ SquareLine Studio-参考博客■ SquareLine Studio-安装■ SquareLine Studio-汉化■ SquareLine Studio-■ SquareLine Studio-■ SquareLine Studio-■ SquareLine Studio-■ SquareLine Studio- ■ SquareLine S…...
mysqli 与mysql 区别和联系, 举例说明
mysqli是一种PHP的扩展,用于与MySQL数据库进行交互。它提供了一套面向对象的接口,可以更方便地操作数据库。MySQL是一种关系型数据库管理系统,用于存储和管理数据。 区别: mysqli是MySQL的扩展,而不是单独的数据库管…...

【SpringCloud应用框架】Nacos安装和服务提供者注册
第二章 Spring Cloud Alibaba Nacos之Nacos安装和服务提供者注册 文章目录 Nacos介绍为何使用Nacos?一、Nacos下载和安装1. 下载2. 安装Linux/Unix/MacWindows 二、Nacos服务提供者注册1. Nacos代替Eureka2. Nacos服务注册中心3. 引入Nacos Discovery进行服务注册/发…...

英语学习交流小程序的设计
管理员账户功能包括:系统首页,个人中心,用户管理,每日打卡管理,备忘录管理,学习计划管理,学习资源管理,论坛交流 微信端账号功能包括:系统首页,学习资源&…...
实现Java多线程中的线程间通信
实现Java多线程中的线程间通信 大家好,我是微赚淘客系统3.0的小编,也是冬天不穿秋裤,天冷也要风度的程序猿! 1. 线程间通信的基本概念 在线程编程中,线程间通信是指多个线程之间通过共享内存或消息传递的方式进行交…...

C++模板元编程(一)——可变参数模板
这个系列主要记录C模板元编程的常用语法 文章目录 引言语法应用函数模板可变参数的打印可变参数的最小/最大函数 类模板 参考文献 引言 在C11之前,函数模板和类模板只支持含有固定数量的模板参数。C11增强了模板功能,允许模板定义中包含任意个(包括0个)…...

kafka中
Kafka RocketMQ概述 RabbitMQ概述 ActiveMQ概述 ZeroMQ概述 MQ对比选型 适用场景-从公司基础建设力量角度出发 适用场景-从业务场景出发 Kafka配置介绍 运行Kafka 安装ELAK 配置EFAK EFAK界面 KAFKA常用术语 Kafka常用指令 Kafka中消息读取 单播消息 group.id 相同 多播消息 g…...
Android 获取当前电池状态
在 API 级别 23 上获取充电状态 要在 API 级别 23 上获取电池的当前状态,只需使用电池管理器系统服务: BatteryManager batteryManager (BatteryManager) getSystemService(BATTERY_SERVICE); boolean isCharging batteryManager.isCharging();使用 S…...

【JVM 的内存模型】
1. JVM内存模型 下图为JVM内存结构模型: 两种执行方式: 解释执行:JVM是由C语言编写的,其中有C解释器,负责先将Java语言解释翻译为C语言。缺点是经过一次JVM翻译,速度慢一点。JIT执行:JIT编译器…...

【雷丰阳-谷粒商城 】【分布式高级篇-微服务架构篇】【17】认证服务01—短信/邮件/异常/MD5
持续学习&持续更新中… 守破离 【雷丰阳-谷粒商城 】【分布式高级篇-微服务架构篇】【17】认证服务01 环境搭建验证码倒计时短信服务邮件服务验证码短信形式:邮件形式: 异常机制MD5参考 环境搭建 C:\Windows\System32\drivers\etc\hosts 192.168.…...
geom buffer制作
1. auto buffer_geom line_string->buffer(15);//buffer //这个是x和y各扩大段15个单位 auto buffer_geom line_string->buffer(15);//buffer //这个是x和y各扩大段15米 获取buffer坐标 auto boundary buffer_geom->getBoundary(); auto boundary_coords boun…...

微软正在放弃React
最近,微软Edge团队撰写了一篇文章,介绍了微软团队如何努力提升Edge浏览器的性能。但在文中,微软对React提出了批评,并宣布他们将不再在Edge浏览器的开发中使用React。 我将详细解析他们的整篇文章内容,探讨这一决定对…...

U盘非安全退出后的格式化危机与高效恢复策略
在数字化时代,U盘作为数据存储与传输的重要工具,其数据安全备受关注。然而,一个常见的操作失误——U盘没有安全退出便直接拔出,随后再插入时却遭遇“需要格式化”的提示,这不仅让用户措手不及,更可能意味着…...
DockerHub与私有镜像仓库在容器化中的应用与管理
哈喽,大家好,我是左手python! Docker Hub的应用与管理 Docker Hub的基本概念与使用方法 Docker Hub是Docker官方提供的一个公共镜像仓库,用户可以在其中找到各种操作系统、软件和应用的镜像。开发者可以通过Docker Hub轻松获取所…...

阿里云ACP云计算备考笔记 (5)——弹性伸缩
目录 第一章 概述 第二章 弹性伸缩简介 1、弹性伸缩 2、垂直伸缩 3、优势 4、应用场景 ① 无规律的业务量波动 ② 有规律的业务量波动 ③ 无明显业务量波动 ④ 混合型业务 ⑤ 消息通知 ⑥ 生命周期挂钩 ⑦ 自定义方式 ⑧ 滚的升级 5、使用限制 第三章 主要定义 …...
ssc377d修改flash分区大小
1、flash的分区默认分配16M、 / # df -h Filesystem Size Used Available Use% Mounted on /dev/root 1.9M 1.9M 0 100% / /dev/mtdblock4 3.0M...

【大模型RAG】Docker 一键部署 Milvus 完整攻略
本文概要 Milvus 2.5 Stand-alone 版可通过 Docker 在几分钟内完成安装;只需暴露 19530(gRPC)与 9091(HTTP/WebUI)两个端口,即可让本地电脑通过 PyMilvus 或浏览器访问远程 Linux 服务器上的 Milvus。下面…...

1.3 VSCode安装与环境配置
进入网址Visual Studio Code - Code Editing. Redefined下载.deb文件,然后打开终端,进入下载文件夹,键入命令 sudo dpkg -i code_1.100.3-1748872405_amd64.deb 在终端键入命令code即启动vscode 需要安装插件列表 1.Chinese简化 2.ros …...
CSS设置元素的宽度根据其内容自动调整
width: fit-content 是 CSS 中的一个属性值,用于设置元素的宽度根据其内容自动调整,确保宽度刚好容纳内容而不会超出。 效果对比 默认情况(width: auto): 块级元素(如 <div>)会占满父容器…...

【Redis】笔记|第8节|大厂高并发缓存架构实战与优化
缓存架构 代码结构 代码详情 功能点: 多级缓存,先查本地缓存,再查Redis,最后才查数据库热点数据重建逻辑使用分布式锁,二次查询更新缓存采用读写锁提升性能采用Redis的发布订阅机制通知所有实例更新本地缓存适用读多…...
MySQL 8.0 事务全面讲解
以下是一个结合两次回答的 MySQL 8.0 事务全面讲解,涵盖了事务的核心概念、操作示例、失败回滚、隔离级别、事务性 DDL 和 XA 事务等内容,并修正了查看隔离级别的命令。 MySQL 8.0 事务全面讲解 一、事务的核心概念(ACID) 事务是…...
Caliper 配置文件解析:fisco-bcos.json
config.yaml 文件 config.yaml 是 Caliper 的主配置文件,通常包含以下内容: test:name: fisco-bcos-test # 测试名称description: Performance test of FISCO-BCOS # 测试描述workers:type: local # 工作进程类型number: 5 # 工作进程数量monitor:type: - docker- pro…...

Python 实现 Web 静态服务器(HTTP 协议)
目录 一、在本地启动 HTTP 服务器1. Windows 下安装 node.js1)下载安装包2)配置环境变量3)安装镜像4)node.js 的常用命令 2. 安装 http-server 服务3. 使用 http-server 开启服务1)使用 http-server2)详解 …...