当前位置: 首页 > news >正文

Spark:DataFrame介绍及使用

1. DataFrame详解

DataFrame是基于RDD进行封装的结构化数据类型,增加了schema元数据,最终DataFrame类型在计算时,还是转为rdd计算。DataFrame的结构化数据有Row(行数据)和schema元数据构成。

  • Row 类型 表示一行数据
    • DataFrame就算是多行构成
# 导入行类Row
from pyspark.sql import Row# 创建行数据
r1 = Row(1, '张三', 20)# 行数取取值 按照下标取值
data = r1[0]
print(data)
data1 = r1[1]
print(data1)# 指定字段创建行数据
r2 = Row(id=2, name='李四', age=22)
# 按照字段取值
data3 = r2['id']
print(data3)
data4 = r2['name']
print(data4)
  • schema表信息
    • 定义DataFrame中的表的字段名和字段类型。
# 导入数据类型
from pyspark.sql.types import *# 定义schema信息
# 使用StructType类进行定义
# add()方法是指定字段信息
# 第一参数,字段名
# 第二个参数,字段信息
# 第三个参数是否允许为空值  默认是True,允许为空
schema_type = StructType().\add('id',IntegerType()).\add('name',StringType()).\add('age',IntegerType(),False)

2. DataFrame创建

创建datafram数据需要使用一个sparksession的类创建,SparkSession类是在SparkContext的基础上进行了封装,也就是SparkSession类中包含了SparkContext。

2.1 基本创建

#DataFrame 的基本创建
#Row就是行数据定义的类
from pyspark.sql import Row, SparkSession
from pyspark.sql.types import *#行数据创建
r1 = Row(1,"刘向阳",23,'男')
print(r1)#行数据下标取值
print(r1[0])
print(r1[1])#创建行数据时可以指定字段名
r2 = Row(id=2,name='李四',age=20,gender='女')
print(r2)
#使用字段名取值
print(r2['name'])# 定义元数据
schema = (StructType().add('id', IntegerType()).add('username', StringType()).add('age', IntegerType()).add('gender', StringType()))
print(schema)# 将元数据和行数据放在一起合成DataFrame
ss = SparkSession.builder.getOrCreate()# 调用创建df的方法
df = ss.createDataFrame([r1,r2],schema=schema)# 查看df中数据
df.show()#查看元数据信息
df.printSchema()

运行结果:
在这里插入图片描述

2.2 RDD和DF之间的转化

  • rdd的二维数据转化为DataFrame
    • rdd.toDF()
      在这里插入图片描述
# rdd 和 dataframe的转化
from pyspark.sql import SparkSession#创建SparkSession对象
ss = SparkSession.builder.getOrCreate()#基于ss对象获取sparkContext
sc = ss.sparkContext#创建rdd , 要使用二维列表指定每行数据
rdd = sc.parallelize([[1,'张三',20,'男'],[2,'李四',20,'男']])#将rdd转为df
df = rdd.toDF(schema='id int,name string,age int,gender string')#df数据查看
df.show()
df.printSchema()#df可以转rdd
res = df.rdd.collect()
print(res)rdd2 = df.rdd.map(lambda x:x['name'])res2 = rdd2.collect()
print(res2)

运行结果:
在这里插入图片描述

2.3 pandas和spark之间转化

  • spark的df转为pandas的df
    • toPandas
#pandas 和 spark的dataframe转化
from pyspark.sql import SparkSession
import pandas as pdss = SparkSession.builder.getOrCreate()#创建pandas的df
df_pd = pd.DataFrame({'id':[1,2,3,4],'name':['张三','李四','王五','赵六'],'age':[1,2,3,4],'gender':['男','女','女','女']}
)
#查看数据
print(df_pd)#取值
name = df_pd['name'][0]
print(name)
# 将pandas中的df转为spark的df
df_spark = ss.createDataFrame(df_pd)#查看
df_spark.show()#取值
row = df_spark.limit(1).first()
print(row['name'])#将spark的df重新转为pandas的df
df_pandas = df_spark.toPandas()
print(df_pandas)

运行结果:
在这里插入图片描述

2.4 读取文件数据转为df

通过read方法读取数据转为df

  • ss.read
#读取文件转为df
from pyspark.sql import SparkSessionss = SparkSession.builder.getOrCreate()#读取不同文件数据转为df
# txt文件
df = ss.read.text('hdfs://node1:8020/data/students.txt')
df.show()# json 文件
df_json = ss.read.json('hdfs://node1:8020/data/baike_qa_valid.json')
df_json.show()#orc文件
df_orc = ss.read.orc('hdfs://node1:8020/data/users.orc')
df_orc.show()#去取csv文件
#header或csv文件中的第一行作为表头字段数据
df_csv = ss.read.csv('hdfs://node1:8020/data/students.csv')
df_csv.show()

3. DataFrame基本使用

3.1 SQL语句

使用sparksession提供的sql方法,编写sql语句执行

#使用sql操作dataframe结构化数据
from pyspark.sql import SparkSessionss = SparkSession.builder.getOrCreate()#读取文件数据转为df
df_csv = ss.read.csv('hdfs://node1:8020/data/students.csv', header=True,sep=',')#使用sql操作df数据
#将df指定一个临时表名
df_csv.createTempView('stu')#编写sql字符串语句,支持hivesql语法
sql_str ="""
select * from stu 
"""#执行sql语句,执行结果返回一个新的df
df_res = ss.sql(sql_str)
df_csv.show()
df_res.show()

3.2 DSL方法

DSL方法是df提供的数据操作函数
使用方式:

  • df.方法()
  • 可以进行链式调用
  • df.方法().方法().方法()
  • 方法执行后返回一个新的df保存计算结果
  • new_df = df.方法()

spark提供DSL方法和sql的关键词一样,使用方式和sql基本类似,在进行数据处理时,要按照sql的执行顺序去思考如何处理数据。
from join 知道数据在哪 df本身就是要处理的数据 df.join(df2) from 表
where 过滤需要处理的数据 df.join(df2).where()
group by 聚合 数据的计算 df.join(df2).where().groupby().sum()
having 计算后的数据进行过滤 df.join(df2).where().groupby().sum().where()
select 展示数据的字段 df.join(df2).where().groupby().sum().where().select()
order by 展示数据的排序 df.join(df2).where().groupby().sum().where().select().orderBy()
limit 展示数据的数量 df.join(df2).where().groupby().sum().where().select().orderBy().limit()
DSL方法执行完成后会得到一个处理后的新的df

#使用DSL方法操作dataframe
from pyspark.sql import SparkSessionss = SparkSession.builder.getOrCreate()#读取文件数据转为df
df_csv = ss.read.csv('hdfs://node1/data/students.csv', header=True,sep=',')#使用DSL方法对df数据进行操作
df2 = df_csv.select('id','name')#查看结果
df2.show()#第二种指定字段的方式
df3 = df_csv.select(df_csv.age,df_csv.gender)#给字段起别名
df4 = df_csv.select(df_csv.age.alias('new_age'),df_csv.gender)
df4.show()#修改字段类型
df_csv.printSchema()
df5 = df_csv.select(df_csv.age.cast('int'),df_csv.gender)
df5.printSchema()#where 的数据过滤
age = 20
df6 = df_csv.where(f'age > {age}')
df6.show()#过滤年龄大于20并且性别为女性的学生信息
df7 = df_csv.where(f'age > 20 and gender = "女" ')
df7.show()#使用第二种字段判断方式
df8 = df_csv.where(df_csv.age == age)
df8.show()#分组聚合计算
df9 = df_csv.select(df_csv.gender,df_csv.cls,df_csv.age.cast('int').alias('age')).groupby('gender','cls').sum('age')
df9.show()#分组后过滤where 聚合计算时只能一次计算一个聚合数据
df10 = df_csv.select(df_csv.gender,df_csv.cls,df_csv.age.cast('int').alias('age')).groupby('gender','cls').sum('age').where('sum(age) > 80')
df10.show()#排序
df11 = df_csv.orderBy('age')  #默认排序
df11.show()df12 = df_csv.orderBy('age',ascending=False)  #降序
df12.show()#分页
df13 = df_csv.limit(5)
df13.show()#转为rdd
res = df_csv.rdd.collect()[5:10]
print(res)
df_new = ss.createDataFrame(res)
df_new.show()

相关文章:

Spark:DataFrame介绍及使用

1. DataFrame详解 DataFrame是基于RDD进行封装的结构化数据类型,增加了schema元数据,最终DataFrame类型在计算时,还是转为rdd计算。DataFrame的结构化数据有Row(行数据)和schema元数据构成。 Row 类型 表示一行数据 …...

Linux系统:本机(物理主机)访问不了虚拟机中的apache服务问题的解决方案

学习目标: 提示:本文主要讲述-本机(物理主机)访问不了虚拟机中的apache服务情况下的解决方案 Linux系统:Ubuntu 23.04; 文中提到的“本机”:代表,宿主机,物理主机; 首先&#xff0c…...

望繁信科技成功签约国显科技 流程挖掘助力制造业智造未来

近日,上海望繁信科技有限公司(简称“望繁信科技”)成功与深圳市国显科技有限公司(简称“国显科技”)达成合作。国显科技作为全球领先的TFT-LCD液晶显示及Mini/Micro LED显示产品供应商,致力于为笔记本、手机…...

枚举在Java体系中的作用

1. 枚举 枚举是在JDK1.5以后引入的。主要用途是:将一组常量组织起来,在这之前表示一组常量通常使用定义常量的方式: //用public static final修饰常量 public static final int RED 1; public static final int GREEN 2; public static f…...

『气泡水』Web官网 案例赏析

前言 Schweppes,作为一家享誉全球的气泡水品牌,致力于与年轻消费者建立更紧密的联系,并提升品牌影响力。为此,其打造了一个充满创意和高度互动性的官网,利用前端技术和动画效果,将产品特性与用户浏览体验完…...

【前端】制作一个简单的网页(2)

单标签组成的元素 这类标签不需要内容产生效果&#xff0c;通常表示对网页的某种行为&#xff0c;它们不用标记任何内容&#xff0c;开始即是结束。 比如&#xff0c;<hr>标签的作用是在网页中添加一条分割线&#xff0c;它仅包含开始标签&#xff0c;是一个单标签元素。…...

OpenAI Canvas:提升编程与写作效率的全新工作界面

随着人工智能技术的飞速发展&#xff0c;大语言模型&#xff08;LLM&#xff09;不仅限于生成文本&#xff0c;还能逐步扩展至编程、设计等任务的支持。近期&#xff0c;OpenAI 推出了一个名为 Canvas 的全新功能&#xff0c;专门用于协助用户进行编程和写作。这一功能与 Claud…...

将SpringBoot的Maven项目打成jar包和war包

先需要明确的是&#xff0c;该项目打包的形态是可执行的jar包&#xff0c;还是在tomcat下运行的war包。 springboot自带的maven打包 1.创建一个springboot web项目 1.api控制层HelloWorld.java RestController RequestMapping("/hello") public class HelloWorld …...

【Iceberg分析】Spark与Iceberg集成之常用存储过程

文章目录 Spark与Iceberg集成之常用存储过程调用语法调用样例表快照管理快照回滚根据snapshotid进行回滚根据timestamp进行回滚 设置表当前生效的快照 表元数据管理设置快照过期时间清除孤岛文件重写数据文件运用参数示例optionsGeneral OptionsOptions for sort strategyOptio…...

[旧日谈]关于Qt的刷新事件频率,以及我们在Qt的框架上做实时的绘制操作时我们该关心什么。

[旧日谈]关于Qt的刷新事件频率&#xff0c;以及我们在Qt的框架上做实时的绘制操作时我们该关心什么。 最近在开发的时候&#xff0c;发现一个依赖事件来刷新渲染的控件会导致程序很容易异常和崩溃。 当程序在运行的时候&#xff0c;其实软件本身的负载并不高&#xff0c;所以…...

云上考场小程序+ssm论文源码调试讲解

2 关键技术简介 2.1 微信小程序 微信小程序&#xff0c;简称小程序&#xff0c;英文名Mini Program&#xff0c;是一种全新的连接用户与服务的方式&#xff0c;可以快速访问、快速传播&#xff0c;并具有良好的使用体验。 小程序的主要开发语言是JavaScript&#xff0c;它与…...

城域网——IP城域网、城域以太网、光城域网

一、城域网 1、城域网&#xff08;Metropolitan Area Network&#xff0c;MAN&#xff09;&#xff1a;一个城市范围内所建立的计算机通信网。 2、分布式队列双总线&#xff08;Distributed Queue Dual Bus&#xff0c;DQDB&#xff09;&#xff1a;即IEEE802.6&#xff0c;由…...

华为Eth-trunk链路聚合加入到E-trunk实现跨设备的链路聚合

一、适用场景&#xff08;注&#xff1a;e-trunk与eth-trunk是2个不同的概念&#xff09; 1、企业中有重要的server服务器业务不能中断的情况下&#xff0c;可将上行链路中的汇聚交换机&#xff0c;通过eth-trunk链路聚合技术&#xff0c;实现链路故障后&#xff0c;仍有可用的…...

【网络安全】JSONP劫持原理及攻击实战

未经许可,不得转载。 文章目录 JSONP简介JSONP工作原理JSONP劫持Callback可定义问题JSONP简介 JSONP(JavaScript Object Notation Padding)是一种用于绕过浏览器同源策略限制的技术,使得网页可以从不同域名的服务器请求数据。由于浏览器的同源策略限制,网页通常只能向与其…...

VR全景摄影的拍摄和编辑软件推荐

随着虚拟现实技术的不断进步&#xff0c;VR全景摄影逐渐成为商业、娱乐和教育等多个领域中的重要工具。通过专业的设备与软件&#xff0c;摄影师能够创作出沉浸式的360度全景作品&#xff0c;为观众提供身临其境的视觉体验。在这篇文章中&#xff0c;我们将介绍VR全景摄影的相关…...

linux:使用sar诊断问题

使用sar诊断问题 1. CPU 使用情况2. 内存与交换3. 磁盘 I/O 活动4. 网络 I/O 活动5. 进程与上下文切换6. 系统调用与文件活动7. 电源管理8. 延迟分析9. 系统全局统计10. 查看历史记录11. 特定时间段12. 自动定时采样其他参数&#xff1a;使用实例&#xff1a; sar&#xff08;S…...

CUDA编程技巧(不断搜集更新)

1 使用位运算替换部分乘法或除法 位移操作主要适用于无符号整数&#xff0c;对于带符号数的位移&#xff0c;特别是负数&#xff0c;可能会导致问题&#xff0c;如果你需要对负数执行除法或者乘法&#xff0c;最好谨慎使用位移运算。 1.1 替换除法 当需要将一个数除以 2、4、…...

云计算(第二阶段):mysql后的shell

第一章&#xff1a;变量 前言 什么是shell Shell 是一种提供用户与操作系统内核交互的工具&#xff0c;它接受用户输入的命令&#xff0c;解释后交给操作系统去执行。它不仅可以作为命令解释器&#xff0c;还可以通过脚本完成一系列自动化任务。 shell的特点 跨平台&#xff1a…...

Debian12离线部署Mysql全网最详细教程

一、下载安装所需要的库 1、所需要的库 # 所需要的库有 libc6_2.36-9deb12u8_amd64.deb libgcc-s1_12.2.0-14_amd64.deb libstdc6_12.2.0-14_amd64.deb gcc-12-base_12.2.0-14_amd64.deb psmisc_23.6-1_amd64.deb libnuma1_2.0.18-1_amd64.deb libmecab2_0.996-14b14_amd64.d…...

文本生成视频技术:艺术与科学的交汇点

在人工智能技术的飞速发展下&#xff0c;文本生成视频&#xff08;Text-to-Video&#xff09;技术已经成为现实。这项技术能够根据文本描述生成相应的视频内容&#xff0c;极大地拓展了内容创作的边界。本文将从三个主要方面对文本生成视频技术进行深入探讨&#xff1a;技术能达…...

中南大学无人机智能体的全面评估!BEDI:用于评估无人机上具身智能体的综合性基准测试

作者&#xff1a;Mingning Guo, Mengwei Wu, Jiarun He, Shaoxian Li, Haifeng Li, Chao Tao单位&#xff1a;中南大学地球科学与信息物理学院论文标题&#xff1a;BEDI: A Comprehensive Benchmark for Evaluating Embodied Agents on UAVs论文链接&#xff1a;https://arxiv.…...

基础测试工具使用经验

背景 vtune&#xff0c;perf, nsight system等基础测试工具&#xff0c;都是用过的&#xff0c;但是没有记录&#xff0c;都逐渐忘了。所以写这篇博客总结记录一下&#xff0c;只要以后发现新的用法&#xff0c;就记得来编辑补充一下 perf 比较基础的用法&#xff1a; 先改这…...

Springcloud:Eureka 高可用集群搭建实战(服务注册与发现的底层原理与避坑指南)

引言&#xff1a;为什么 Eureka 依然是存量系统的核心&#xff1f; 尽管 Nacos 等新注册中心崛起&#xff0c;但金融、电力等保守行业仍有大量系统运行在 Eureka 上。理解其高可用设计与自我保护机制&#xff0c;是保障分布式系统稳定的必修课。本文将手把手带你搭建生产级 Eur…...

WordPress插件:AI多语言写作与智能配图、免费AI模型、SEO文章生成

厌倦手动写WordPress文章&#xff1f;AI自动生成&#xff0c;效率提升10倍&#xff01; 支持多语言、自动配图、定时发布&#xff0c;让内容创作更轻松&#xff01; AI内容生成 → 不想每天写文章&#xff1f;AI一键生成高质量内容&#xff01;多语言支持 → 跨境电商必备&am…...

RNN避坑指南:从数学推导到LSTM/GRU工业级部署实战流程

本文较长&#xff0c;建议点赞收藏&#xff0c;以免遗失。更多AI大模型应用开发学习视频及资料&#xff0c;尽在聚客AI学院。 本文全面剖析RNN核心原理&#xff0c;深入讲解梯度消失/爆炸问题&#xff0c;并通过LSTM/GRU结构实现解决方案&#xff0c;提供时间序列预测和文本生成…...

Xen Server服务器释放磁盘空间

disk.sh #!/bin/bashcd /run/sr-mount/e54f0646-ae11-0457-b64f-eba4673b824c # 全部虚拟机物理磁盘文件存储 a$(ls -l | awk {print $NF} | cut -d. -f1) # 使用中的虚拟机物理磁盘文件 b$(xe vm-disk-list --multiple | grep uuid | awk {print $NF})printf "%s\n"…...

NXP S32K146 T-Box 携手 SD NAND(贴片式TF卡):驱动汽车智能革新的黄金组合

在汽车智能化的汹涌浪潮中&#xff0c;车辆不再仅仅是传统的交通工具&#xff0c;而是逐步演变为高度智能的移动终端。这一转变的核心支撑&#xff0c;来自于车内关键技术的深度融合与协同创新。车载远程信息处理盒&#xff08;T-Box&#xff09;方案&#xff1a;NXP S32K146 与…...

逻辑回归暴力训练预测金融欺诈

简述 「使用逻辑回归暴力预测金融欺诈&#xff0c;并不断增加特征维度持续测试」的做法&#xff0c;体现了一种逐步建模与迭代验证的实验思路&#xff0c;在金融欺诈检测中非常有价值&#xff0c;本文作为一篇回顾性记录了早年间公司给某行做反欺诈预测用到的技术和思路。百度…...

嵌入式常见 CPU 架构

架构类型架构厂商芯片厂商典型芯片特点与应用场景PICRISC (8/16 位)MicrochipMicrochipPIC16F877A、PIC18F4550简化指令集&#xff0c;单周期执行&#xff1b;低功耗、CIP 独立外设&#xff1b;用于家电、小电机控制、安防面板等嵌入式场景8051CISC (8 位)Intel&#xff08;原始…...

Kubernetes 节点自动伸缩(Cluster Autoscaler)原理与实践

在 Kubernetes 集群中&#xff0c;如何在保障应用高可用的同时有效地管理资源&#xff0c;一直是运维人员和开发者关注的重点。随着微服务架构的普及&#xff0c;集群内各个服务的负载波动日趋明显&#xff0c;传统的手动扩缩容方式已无法满足实时性和弹性需求。 Cluster Auto…...