当前位置: 首页 > news >正文

Spark:DataFrame介绍及使用

1. DataFrame详解

DataFrame是基于RDD进行封装的结构化数据类型,增加了schema元数据,最终DataFrame类型在计算时,还是转为rdd计算。DataFrame的结构化数据有Row(行数据)和schema元数据构成。

  • Row 类型 表示一行数据
    • DataFrame就算是多行构成
# 导入行类Row
from pyspark.sql import Row# 创建行数据
r1 = Row(1, '张三', 20)# 行数取取值 按照下标取值
data = r1[0]
print(data)
data1 = r1[1]
print(data1)# 指定字段创建行数据
r2 = Row(id=2, name='李四', age=22)
# 按照字段取值
data3 = r2['id']
print(data3)
data4 = r2['name']
print(data4)
  • schema表信息
    • 定义DataFrame中的表的字段名和字段类型。
# 导入数据类型
from pyspark.sql.types import *# 定义schema信息
# 使用StructType类进行定义
# add()方法是指定字段信息
# 第一参数,字段名
# 第二个参数,字段信息
# 第三个参数是否允许为空值  默认是True,允许为空
schema_type = StructType().\add('id',IntegerType()).\add('name',StringType()).\add('age',IntegerType(),False)

2. DataFrame创建

创建datafram数据需要使用一个sparksession的类创建,SparkSession类是在SparkContext的基础上进行了封装,也就是SparkSession类中包含了SparkContext。

2.1 基本创建

#DataFrame 的基本创建
#Row就是行数据定义的类
from pyspark.sql import Row, SparkSession
from pyspark.sql.types import *#行数据创建
r1 = Row(1,"刘向阳",23,'男')
print(r1)#行数据下标取值
print(r1[0])
print(r1[1])#创建行数据时可以指定字段名
r2 = Row(id=2,name='李四',age=20,gender='女')
print(r2)
#使用字段名取值
print(r2['name'])# 定义元数据
schema = (StructType().add('id', IntegerType()).add('username', StringType()).add('age', IntegerType()).add('gender', StringType()))
print(schema)# 将元数据和行数据放在一起合成DataFrame
ss = SparkSession.builder.getOrCreate()# 调用创建df的方法
df = ss.createDataFrame([r1,r2],schema=schema)# 查看df中数据
df.show()#查看元数据信息
df.printSchema()

运行结果:
在这里插入图片描述

2.2 RDD和DF之间的转化

  • rdd的二维数据转化为DataFrame
    • rdd.toDF()
      在这里插入图片描述
# rdd 和 dataframe的转化
from pyspark.sql import SparkSession#创建SparkSession对象
ss = SparkSession.builder.getOrCreate()#基于ss对象获取sparkContext
sc = ss.sparkContext#创建rdd , 要使用二维列表指定每行数据
rdd = sc.parallelize([[1,'张三',20,'男'],[2,'李四',20,'男']])#将rdd转为df
df = rdd.toDF(schema='id int,name string,age int,gender string')#df数据查看
df.show()
df.printSchema()#df可以转rdd
res = df.rdd.collect()
print(res)rdd2 = df.rdd.map(lambda x:x['name'])res2 = rdd2.collect()
print(res2)

运行结果:
在这里插入图片描述

2.3 pandas和spark之间转化

  • spark的df转为pandas的df
    • toPandas
#pandas 和 spark的dataframe转化
from pyspark.sql import SparkSession
import pandas as pdss = SparkSession.builder.getOrCreate()#创建pandas的df
df_pd = pd.DataFrame({'id':[1,2,3,4],'name':['张三','李四','王五','赵六'],'age':[1,2,3,4],'gender':['男','女','女','女']}
)
#查看数据
print(df_pd)#取值
name = df_pd['name'][0]
print(name)
# 将pandas中的df转为spark的df
df_spark = ss.createDataFrame(df_pd)#查看
df_spark.show()#取值
row = df_spark.limit(1).first()
print(row['name'])#将spark的df重新转为pandas的df
df_pandas = df_spark.toPandas()
print(df_pandas)

运行结果:
在这里插入图片描述

2.4 读取文件数据转为df

通过read方法读取数据转为df

  • ss.read
#读取文件转为df
from pyspark.sql import SparkSessionss = SparkSession.builder.getOrCreate()#读取不同文件数据转为df
# txt文件
df = ss.read.text('hdfs://node1:8020/data/students.txt')
df.show()# json 文件
df_json = ss.read.json('hdfs://node1:8020/data/baike_qa_valid.json')
df_json.show()#orc文件
df_orc = ss.read.orc('hdfs://node1:8020/data/users.orc')
df_orc.show()#去取csv文件
#header或csv文件中的第一行作为表头字段数据
df_csv = ss.read.csv('hdfs://node1:8020/data/students.csv')
df_csv.show()

3. DataFrame基本使用

3.1 SQL语句

使用sparksession提供的sql方法,编写sql语句执行

#使用sql操作dataframe结构化数据
from pyspark.sql import SparkSessionss = SparkSession.builder.getOrCreate()#读取文件数据转为df
df_csv = ss.read.csv('hdfs://node1:8020/data/students.csv', header=True,sep=',')#使用sql操作df数据
#将df指定一个临时表名
df_csv.createTempView('stu')#编写sql字符串语句,支持hivesql语法
sql_str ="""
select * from stu 
"""#执行sql语句,执行结果返回一个新的df
df_res = ss.sql(sql_str)
df_csv.show()
df_res.show()

3.2 DSL方法

DSL方法是df提供的数据操作函数
使用方式:

  • df.方法()
  • 可以进行链式调用
  • df.方法().方法().方法()
  • 方法执行后返回一个新的df保存计算结果
  • new_df = df.方法()

spark提供DSL方法和sql的关键词一样,使用方式和sql基本类似,在进行数据处理时,要按照sql的执行顺序去思考如何处理数据。
from join 知道数据在哪 df本身就是要处理的数据 df.join(df2) from 表
where 过滤需要处理的数据 df.join(df2).where()
group by 聚合 数据的计算 df.join(df2).where().groupby().sum()
having 计算后的数据进行过滤 df.join(df2).where().groupby().sum().where()
select 展示数据的字段 df.join(df2).where().groupby().sum().where().select()
order by 展示数据的排序 df.join(df2).where().groupby().sum().where().select().orderBy()
limit 展示数据的数量 df.join(df2).where().groupby().sum().where().select().orderBy().limit()
DSL方法执行完成后会得到一个处理后的新的df

#使用DSL方法操作dataframe
from pyspark.sql import SparkSessionss = SparkSession.builder.getOrCreate()#读取文件数据转为df
df_csv = ss.read.csv('hdfs://node1/data/students.csv', header=True,sep=',')#使用DSL方法对df数据进行操作
df2 = df_csv.select('id','name')#查看结果
df2.show()#第二种指定字段的方式
df3 = df_csv.select(df_csv.age,df_csv.gender)#给字段起别名
df4 = df_csv.select(df_csv.age.alias('new_age'),df_csv.gender)
df4.show()#修改字段类型
df_csv.printSchema()
df5 = df_csv.select(df_csv.age.cast('int'),df_csv.gender)
df5.printSchema()#where 的数据过滤
age = 20
df6 = df_csv.where(f'age > {age}')
df6.show()#过滤年龄大于20并且性别为女性的学生信息
df7 = df_csv.where(f'age > 20 and gender = "女" ')
df7.show()#使用第二种字段判断方式
df8 = df_csv.where(df_csv.age == age)
df8.show()#分组聚合计算
df9 = df_csv.select(df_csv.gender,df_csv.cls,df_csv.age.cast('int').alias('age')).groupby('gender','cls').sum('age')
df9.show()#分组后过滤where 聚合计算时只能一次计算一个聚合数据
df10 = df_csv.select(df_csv.gender,df_csv.cls,df_csv.age.cast('int').alias('age')).groupby('gender','cls').sum('age').where('sum(age) > 80')
df10.show()#排序
df11 = df_csv.orderBy('age')  #默认排序
df11.show()df12 = df_csv.orderBy('age',ascending=False)  #降序
df12.show()#分页
df13 = df_csv.limit(5)
df13.show()#转为rdd
res = df_csv.rdd.collect()[5:10]
print(res)
df_new = ss.createDataFrame(res)
df_new.show()

相关文章:

Spark:DataFrame介绍及使用

1. DataFrame详解 DataFrame是基于RDD进行封装的结构化数据类型,增加了schema元数据,最终DataFrame类型在计算时,还是转为rdd计算。DataFrame的结构化数据有Row(行数据)和schema元数据构成。 Row 类型 表示一行数据 …...

Linux系统:本机(物理主机)访问不了虚拟机中的apache服务问题的解决方案

学习目标: 提示:本文主要讲述-本机(物理主机)访问不了虚拟机中的apache服务情况下的解决方案 Linux系统:Ubuntu 23.04; 文中提到的“本机”:代表,宿主机,物理主机; 首先&#xff0c…...

望繁信科技成功签约国显科技 流程挖掘助力制造业智造未来

近日,上海望繁信科技有限公司(简称“望繁信科技”)成功与深圳市国显科技有限公司(简称“国显科技”)达成合作。国显科技作为全球领先的TFT-LCD液晶显示及Mini/Micro LED显示产品供应商,致力于为笔记本、手机…...

枚举在Java体系中的作用

1. 枚举 枚举是在JDK1.5以后引入的。主要用途是:将一组常量组织起来,在这之前表示一组常量通常使用定义常量的方式: //用public static final修饰常量 public static final int RED 1; public static final int GREEN 2; public static f…...

『气泡水』Web官网 案例赏析

前言 Schweppes,作为一家享誉全球的气泡水品牌,致力于与年轻消费者建立更紧密的联系,并提升品牌影响力。为此,其打造了一个充满创意和高度互动性的官网,利用前端技术和动画效果,将产品特性与用户浏览体验完…...

【前端】制作一个简单的网页(2)

单标签组成的元素 这类标签不需要内容产生效果&#xff0c;通常表示对网页的某种行为&#xff0c;它们不用标记任何内容&#xff0c;开始即是结束。 比如&#xff0c;<hr>标签的作用是在网页中添加一条分割线&#xff0c;它仅包含开始标签&#xff0c;是一个单标签元素。…...

OpenAI Canvas:提升编程与写作效率的全新工作界面

随着人工智能技术的飞速发展&#xff0c;大语言模型&#xff08;LLM&#xff09;不仅限于生成文本&#xff0c;还能逐步扩展至编程、设计等任务的支持。近期&#xff0c;OpenAI 推出了一个名为 Canvas 的全新功能&#xff0c;专门用于协助用户进行编程和写作。这一功能与 Claud…...

将SpringBoot的Maven项目打成jar包和war包

先需要明确的是&#xff0c;该项目打包的形态是可执行的jar包&#xff0c;还是在tomcat下运行的war包。 springboot自带的maven打包 1.创建一个springboot web项目 1.api控制层HelloWorld.java RestController RequestMapping("/hello") public class HelloWorld …...

【Iceberg分析】Spark与Iceberg集成之常用存储过程

文章目录 Spark与Iceberg集成之常用存储过程调用语法调用样例表快照管理快照回滚根据snapshotid进行回滚根据timestamp进行回滚 设置表当前生效的快照 表元数据管理设置快照过期时间清除孤岛文件重写数据文件运用参数示例optionsGeneral OptionsOptions for sort strategyOptio…...

[旧日谈]关于Qt的刷新事件频率,以及我们在Qt的框架上做实时的绘制操作时我们该关心什么。

[旧日谈]关于Qt的刷新事件频率&#xff0c;以及我们在Qt的框架上做实时的绘制操作时我们该关心什么。 最近在开发的时候&#xff0c;发现一个依赖事件来刷新渲染的控件会导致程序很容易异常和崩溃。 当程序在运行的时候&#xff0c;其实软件本身的负载并不高&#xff0c;所以…...

云上考场小程序+ssm论文源码调试讲解

2 关键技术简介 2.1 微信小程序 微信小程序&#xff0c;简称小程序&#xff0c;英文名Mini Program&#xff0c;是一种全新的连接用户与服务的方式&#xff0c;可以快速访问、快速传播&#xff0c;并具有良好的使用体验。 小程序的主要开发语言是JavaScript&#xff0c;它与…...

城域网——IP城域网、城域以太网、光城域网

一、城域网 1、城域网&#xff08;Metropolitan Area Network&#xff0c;MAN&#xff09;&#xff1a;一个城市范围内所建立的计算机通信网。 2、分布式队列双总线&#xff08;Distributed Queue Dual Bus&#xff0c;DQDB&#xff09;&#xff1a;即IEEE802.6&#xff0c;由…...

华为Eth-trunk链路聚合加入到E-trunk实现跨设备的链路聚合

一、适用场景&#xff08;注&#xff1a;e-trunk与eth-trunk是2个不同的概念&#xff09; 1、企业中有重要的server服务器业务不能中断的情况下&#xff0c;可将上行链路中的汇聚交换机&#xff0c;通过eth-trunk链路聚合技术&#xff0c;实现链路故障后&#xff0c;仍有可用的…...

【网络安全】JSONP劫持原理及攻击实战

未经许可,不得转载。 文章目录 JSONP简介JSONP工作原理JSONP劫持Callback可定义问题JSONP简介 JSONP(JavaScript Object Notation Padding)是一种用于绕过浏览器同源策略限制的技术,使得网页可以从不同域名的服务器请求数据。由于浏览器的同源策略限制,网页通常只能向与其…...

VR全景摄影的拍摄和编辑软件推荐

随着虚拟现实技术的不断进步&#xff0c;VR全景摄影逐渐成为商业、娱乐和教育等多个领域中的重要工具。通过专业的设备与软件&#xff0c;摄影师能够创作出沉浸式的360度全景作品&#xff0c;为观众提供身临其境的视觉体验。在这篇文章中&#xff0c;我们将介绍VR全景摄影的相关…...

linux:使用sar诊断问题

使用sar诊断问题 1. CPU 使用情况2. 内存与交换3. 磁盘 I/O 活动4. 网络 I/O 活动5. 进程与上下文切换6. 系统调用与文件活动7. 电源管理8. 延迟分析9. 系统全局统计10. 查看历史记录11. 特定时间段12. 自动定时采样其他参数&#xff1a;使用实例&#xff1a; sar&#xff08;S…...

CUDA编程技巧(不断搜集更新)

1 使用位运算替换部分乘法或除法 位移操作主要适用于无符号整数&#xff0c;对于带符号数的位移&#xff0c;特别是负数&#xff0c;可能会导致问题&#xff0c;如果你需要对负数执行除法或者乘法&#xff0c;最好谨慎使用位移运算。 1.1 替换除法 当需要将一个数除以 2、4、…...

云计算(第二阶段):mysql后的shell

第一章&#xff1a;变量 前言 什么是shell Shell 是一种提供用户与操作系统内核交互的工具&#xff0c;它接受用户输入的命令&#xff0c;解释后交给操作系统去执行。它不仅可以作为命令解释器&#xff0c;还可以通过脚本完成一系列自动化任务。 shell的特点 跨平台&#xff1a…...

Debian12离线部署Mysql全网最详细教程

一、下载安装所需要的库 1、所需要的库 # 所需要的库有 libc6_2.36-9deb12u8_amd64.deb libgcc-s1_12.2.0-14_amd64.deb libstdc6_12.2.0-14_amd64.deb gcc-12-base_12.2.0-14_amd64.deb psmisc_23.6-1_amd64.deb libnuma1_2.0.18-1_amd64.deb libmecab2_0.996-14b14_amd64.d…...

文本生成视频技术:艺术与科学的交汇点

在人工智能技术的飞速发展下&#xff0c;文本生成视频&#xff08;Text-to-Video&#xff09;技术已经成为现实。这项技术能够根据文本描述生成相应的视频内容&#xff0c;极大地拓展了内容创作的边界。本文将从三个主要方面对文本生成视频技术进行深入探讨&#xff1a;技术能达…...

R语言AI模型部署方案:精准离线运行详解

R语言AI模型部署方案:精准离线运行详解 一、项目概述 本文将构建一个完整的R语言AI部署解决方案,实现鸢尾花分类模型的训练、保存、离线部署和预测功能。核心特点: 100%离线运行能力自包含环境依赖生产级错误处理跨平台兼容性模型版本管理# 文件结构说明 Iris_AI_Deployme…...

以下是对华为 HarmonyOS NETX 5属性动画(ArkTS)文档的结构化整理,通过层级标题、表格和代码块提升可读性:

一、属性动画概述NETX 作用&#xff1a;实现组件通用属性的渐变过渡效果&#xff0c;提升用户体验。支持属性&#xff1a;width、height、backgroundColor、opacity、scale、rotate、translate等。注意事项&#xff1a; 布局类属性&#xff08;如宽高&#xff09;变化时&#…...

在HarmonyOS ArkTS ArkUI-X 5.0及以上版本中,手势开发全攻略:

在 HarmonyOS 应用开发中&#xff0c;手势交互是连接用户与设备的核心纽带。ArkTS 框架提供了丰富的手势处理能力&#xff0c;既支持点击、长按、拖拽等基础单一手势的精细控制&#xff0c;也能通过多种绑定策略解决父子组件的手势竞争问题。本文将结合官方开发文档&#xff0c…...

【HarmonyOS 5.0】DevEco Testing:鸿蒙应用质量保障的终极武器

——全方位测试解决方案与代码实战 一、工具定位与核心能力 DevEco Testing是HarmonyOS官方推出的​​一体化测试平台​​&#xff0c;覆盖应用全生命周期测试需求&#xff0c;主要提供五大核心能力&#xff1a; ​​测试类型​​​​检测目标​​​​关键指标​​功能体验基…...

Python爬虫实战:研究feedparser库相关技术

1. 引言 1.1 研究背景与意义 在当今信息爆炸的时代,互联网上存在着海量的信息资源。RSS(Really Simple Syndication)作为一种标准化的信息聚合技术,被广泛用于网站内容的发布和订阅。通过 RSS,用户可以方便地获取网站更新的内容,而无需频繁访问各个网站。 然而,互联网…...

React19源码系列之 事件插件系统

事件类别 事件类型 定义 文档 Event Event 接口表示在 EventTarget 上出现的事件。 Event - Web API | MDN UIEvent UIEvent 接口表示简单的用户界面事件。 UIEvent - Web API | MDN KeyboardEvent KeyboardEvent 对象描述了用户与键盘的交互。 KeyboardEvent - Web…...

uniapp微信小程序视频实时流+pc端预览方案

方案类型技术实现是否免费优点缺点适用场景延迟范围开发复杂度​WebSocket图片帧​定时拍照Base64传输✅ 完全免费无需服务器 纯前端实现高延迟高流量 帧率极低个人demo测试 超低频监控500ms-2s⭐⭐​RTMP推流​TRTC/即构SDK推流❌ 付费方案 &#xff08;部分有免费额度&#x…...

微信小程序云开发平台MySQL的连接方式

注&#xff1a;微信小程序云开发平台指的是腾讯云开发 先给结论&#xff1a;微信小程序云开发平台的MySQL&#xff0c;无法通过获取数据库连接信息的方式进行连接&#xff0c;连接只能通过云开发的SDK连接&#xff0c;具体要参考官方文档&#xff1a; 为什么&#xff1f; 因为…...

SpringCloudGateway 自定义局部过滤器

场景&#xff1a; 将所有请求转化为同一路径请求&#xff08;方便穿网配置&#xff09;在请求头内标识原来路径&#xff0c;然后在将请求分发给不同服务 AllToOneGatewayFilterFactory import lombok.Getter; import lombok.Setter; import lombok.extern.slf4j.Slf4j; impor…...

Proxmox Mail Gateway安装指南:从零开始配置高效邮件过滤系统

&#x1f49d;&#x1f49d;&#x1f49d;欢迎莅临我的博客&#xff0c;很高兴能够在这里和您见面&#xff01;希望您在这里可以感受到一份轻松愉快的氛围&#xff0c;不仅可以获得有趣的内容和知识&#xff0c;也可以畅所欲言、分享您的想法和见解。 推荐&#xff1a;「storms…...