Pyspark下操作dataframe方法(1)
文章目录
- Pyspark dataframe
- 创建DataFrame
- 使用Row对象
- 使用元组与scheam
- 使用字典与scheam
- 注意
 
- agg 聚合操作
- alias 设置别名
- 字段设置别名
- 设置dataframe别名
 
- cache 缓存
- checkpoint RDD持久化到外部存储
- coalesce 设置dataframe分区数量
- collect 拉取数据
- columns 获取dataframe列
 
Pyspark dataframe
创建DataFrame
from pyspark.sql import  SparkSession,Row
from pyspark.sql.types import *def init_spark():spark  = SparkSession.builder.appName('LDSX_TEST_DATAFrame') \.config('hive.metastore.uris', 'thrift://hadoop01:9083') \.config('spark.master', "local[2]") \.enableHiveSupport().getOrCreate()return spark
spark = init_spark()# 设置字段类型
schema = StructType([StructField("name", StringType(), True),StructField("age", StringType(), True),StructField("id", StringType(), True),StructField("gender", StringType(), True),
])
使用Row对象
cs = Row('name','age','id','gender')
row_list = [ cs('ldsx','12','1','男'),cs('test1','20','1','女'),cs('test2','26','1','男'),cs('test3','19','1','女'),cs('test4','51','1','女'),cs('test5','13','1','男')]
data = spark.createDataFrame(row_list)
data.show()+-----+---+---+---+
| name|age| id|gender|
+-----+---+---+---+
| ldsx| 12|  1| 男|
|test1| 20|  1| 女|
|test2| 26|  1| 男|
|test3| 19|  1| 女|
|test4| 51|  1| 女|
|test5| 13|  1| 男|
+-----+---+---+---+
data.printSchema()
root|-- name: string (nullable = true)|-- age: string (nullable = true)|-- id: string (nullable = true)|-- gender: string (nullable = true)
使用元组与scheam
park.createDataFrame([('ldsx1','12','1','男'),('ldsx2','12','1','男')],schema).show()
+-----+---+---+------+
| name|age| id|gender|
+-----+---+---+------+
|ldsx1| 12|  1|    男|
|ldsx2| 12|  1|    男|
+-----+---+---+------+
使用字典与scheam
spark.createDataFrame([{'name':'ldsx','age':'12','id':'1','gender':'女'}]).show()
+---+------+---+----+
|age|gender| id|name|
+---+------+---+----+
| 12|    女|  1|ldsx|
+---+------+---+----+
注意
scheam设置优先级高于row设置,dict设置的key
schema = StructType([StructField("name", StringType(), True),StructField("age", StringType(), True),StructField("id", StringType(), True),StructField("测试", StringType(), True),
])
spark.createDataFrame([{'name':'ldsx','age':'12','id':'1','gender':'女'}],schema).show()
+----+---+---+----+
|name|age| id|测试|
+----+---+---+----+
|ldsx| 12|  1|null|
+----+---+---+----+
agg 聚合操作
在 PySpark 中,agg(aggregate)函数用于对 DataFrame 进行聚合操作。它允许你在一个或多个列上应用一个或多个聚合函数,并返回计算后的结果。可以结合groupby使用。
from pyspark.sql import functions as sf
data.show()
+-----+---+---+------+
| name|age| id|gender|
+-----+---+---+------+
| ldsx| 12|  1|    男|
|test1| 20|  1|    女|
|test2| 26|  1|    男|
|test3| 19|  1|    女|
|test4| 51|  1|    女|
|test5| 13|  1|    男|
+-----+---+---+------+
data.agg({'age':'max'}).show()
+--------+
|max(age)|
+--------+
|      51|
+--------+
data.agg({'age':'max','gender':"max"}).show()
+-----------+--------+
|max(gender)|max(age)|
+-----------+--------+
|         男|      51|
+-----------+--------+data.agg(sf.min(data.age)).show()
+--------+
|min(age)|
+--------+
|      12|
+--------+
data.agg(sf.min(data.age),sf.min(data.name)).show()
+--------+---------+
|min(age)|min(name)|
+--------+---------+
|      12|     ldsx|
+--------+---------+结合groupby使用
data.groupBy('gender').agg(sf.min('age')).show()+------+--------+
|gender|min(age)|
+------+--------+
|    女|      19|
|    男|      12|
+------+--------+
data.groupBy('gender').agg(sf.min('age'),sf.max('name')).show()
+------+--------+---------+
|gender|min(age)|max(name)|
+------+--------+---------+
|    女|      19|    test4|
|    男|      12|    test5|
+------+--------+---------+alias 设置别名
字段设置别名
#字段设置别名
data.select(data['name'].alias('rename_name')).show()
+-----------+
|rename_name|
+-----------+
|       ldsx|
|      test1|
|      test2|
|      test3|
|      test4|
|      test5|
+-----------+
设置dataframe别名
d1 = data.alias('ldsx1')
d2 = data2.alias('ldsx2')
d1.show()
+-----+---+---+------+
| name|age| id|gender|
+-----+---+---+------+
| ldsx| 12|  1|    男|
|test1| 20|  1|    女|
|test2| 26|  1|    男|
|test3| 19|  1|    女|
|test4| 51|  1|    女|
|test5| 13|  1|    男|
+-----+---+---+------+
d2.show()
+-----+---+---+------+
| name|age| id|gender|
+-----+---+---+------+
|测试1| 12|  1|    男|
|测试2| 20|  1|    男|
+-----+---+---+------+d3 = d1.join(d2,col('ldsx1.gender')==col('ldsx2.gender'),'inner')
d3.show()
+-----+---+---+------+-----+---+---+------+
| name|age| id|gender| name|age| id|gender|
+-----+---+---+------+-----+---+---+------+
| ldsx| 12|  1|    男|测试1| 12|  1|    男|
| ldsx| 12|  1|    男|测试2| 20|  1|    男|
|test2| 26|  1|    男|测试1| 12|  1|    男|
|test2| 26|  1|    男|测试2| 20|  1|    男|
|test5| 13|  1|    男|测试1| 12|  1|    男|
|test5| 13|  1|    男|测试2| 20|  1|    男|
+-----+---+---+------+-----+---+---+------+d3[['name']].show()
#报错提示
pyspark.errors.exceptions.captured.AnalysisException: [AMBIGUOUS_REFERENCE] Reference `name` is ambiguous, could be: [`ldsx1`.`name`, `ldsx2`.`name`].
# 使用别名前缀获取
d3[['ldsx1.name']].show()
+-----+
| name|
+-----+
| ldsx|
| ldsx|
|test2|
|test2|
|test5|
|test5|
+-----+
>>> d3[['ldsx2.name']].show()
+-----+
| name|
+-----+
|测试1|
|测试2|
|测试1|
|测试2|
|测试1|
|测试2|
+-----+
d3.select('ldsx1.name','ldsx2.name').show()
+-----+-----+
| name| name|
+-----+-----+
| ldsx|测试1|
| ldsx|测试2|
|test2|测试1|
|test2|测试2|
|test5|测试1|
|test5|测试2|
+-----+-----+cache 缓存
dataframe缓存默认缓存级别MEMORY_AND_DISK_DESER
df.cache()
# 查看逻辑计划和物理计划
df.explain()
checkpoint RDD持久化到外部存储
Checkpoint是一种重量级的使用,也就是RDD的重新计算成本很高的时候,我们采用Checkpoint比较合适,或者数据量很大的时候,采用Checkpoint比较合适。如果数据量小,或者RDD重新计算也是非常快的,直接使用缓存即可。
 CheckPoint支持写入HDFS。CheckPoint被认为是安全的
sc = spark.sparkContext
# 设置检查存储目录
sc.setCheckpointDir('hdfs:///ldsx_checkpoint')
d3.count()
# 保存会在hdfs上进行存储
d3.checkpoint()
# 从hdfs读取
d3.count()

coalesce 设置dataframe分区数量
# 设置dataframe分区数量
d3 = d3.coalesce(3)
# 获取分区数量
d3.rdd.getNumPartitions()
collect 拉取数据
当任务提交到集群的时候collect()操作是用来将所有结点中的数据收集到dirver节点,数据量很大慎用防止dirver炸掉。
d3.collect()
[Row(name='ldsx', age='12', id='1', gender='男', name='测试1', age='12', id='1', gender='男'), Row(name='ldsx', age='12', id='1', gender='男', name='测试2', age='20', id='1', gender='男'), Row(name='test2', age='26', id='1', gender='男', name='测试1', age='12', id='1', gender='男'), Row(name='test2', age='26', id='1', gender='男', name='测试2', age='20', id='1', gender='男'), Row(name='test5', age='13', id='1', gender='男', name='测试1', age='12', id='1', gender='男'), Row(name='test5', age='13', id='1', gender='男', name='测试2', age='20', id='1', gender='男')]
columns 获取dataframe列
>>> d3.columns
['name', 'age', 'id', 'gender', 'name', 'age', 'id', 'gender']d3.withColumn('ldsx1.name_1',col('ldsx1.name')).show()
+-----+---+---+------+-----+---+---+------+------------+
| name|age| id|gender| name|age| id|gender|ldsx1.name_1|
+-----+---+---+------+-----+---+---+------+------------+
| ldsx| 12|  1|    男|测试1| 12|  1|    男|        ldsx|
| ldsx| 12|  1|    男|测试2| 20|  1|    男|        ldsx|
|test2| 26|  1|    男|测试1| 12|  1|    男|       test2|
|test2| 26|  1|    男|测试2| 20|  1|    男|       test2|
|test5| 13|  1|    男|测试1| 12|  1|    男|       test5|
|test5| 13|  1|    男|测试2| 20|  1|    男|       test5|
+-----+---+---+------+-----+---+---+------+------------+# 重命名列名
d3.withColumnRenamed('ldsx1.name_1',col('ldsx1.name')).show()
相关文章:
 
Pyspark下操作dataframe方法(1)
文章目录 Pyspark dataframe创建DataFrame使用Row对象使用元组与scheam使用字典与scheam注意 agg 聚合操作alias 设置别名字段设置别名设置dataframe别名 cache 缓存checkpoint RDD持久化到外部存储coalesce 设置dataframe分区数量collect 拉取数据columns 获取dataframe列 Pys…...
注解实现json序列化的时候自动进行数据脱敏
最近在进行开发的时候遇到一个问题,需要对用户信息进行脱敏处理,原有的方式是写一个util类,在需要脱敏的字段查出数据后,显示掉用方法处理后再set回去,觉得这种方式能实现功能,但是不是特别优雅,…...
使用Python下载文件的简易指南
在日常的数据处理、自动化任务或软件开发中,经常需要从网络上下载文件。Python作为一门功能强大的编程语言,提供了多种方法来实现文件的下载。本文将介绍几种常用的方法来使用Python下载文件,包括使用requests库和urllib库。 准备工作 在开…...
 
中秋国庆双节长假,景区迎来客流高峰,如何保障景区安全管理?
一、方案背景 近年来,国内旅游市场持续升温,节假日期间景区游客数量激增,给景区安全管理带来了巨大挑战。然而,景区安全风险意识不足、防护措施不完善、游客安全意识欠缺等问题依然存在,导致景区安全事故频发。随着中秋…...
多维数组转一维数组:探索 JavaScript 中的数组扁平化
在 JavaScript 编程中,我们经常会遇到需要将多维数组转换为一维数组的情况。无论是处理复杂的数据结构还是进行数据的进一步操作,数组扁平化都是一个常见且有用的技术。本文将介绍几种在 JavaScript 中将多维数组转换为一维数组的方法。 什么是数组扁平…...
配环境时的一些记录
连centos:正常连就好(密码验证码)连rocky:需要在centos上连,终端里直接ssh [rocky_ip];在vscode中需要: 修改配置文件:打开命令面板(ctrlshiftp) -> 输入并…...
如何解析域名到网站?
在现代互联网中,域名解析是用户访问网站的关键过程。用户通过输入易于记忆的域名来访问网站,而背后则是复杂的域名解析机制将域名转换为服务器的IP地址,使得浏览器能够找到并加载目标网站。聚名网详细介绍域名解析的过程及其相关技术。 一、…...
 
【F172】基于Springboot+vue实现的智能菜谱系统
作者主页:Java码库 主营内容:SpringBoot、Vue、SSM、HLMT、Jsp、PHP、Nodejs、Python、爬虫、数据可视化、小程序、安卓app等设计与开发。 收藏点赞不迷路 关注作者有好处 文末获取源码 项目描述 近些年,随着中国经济发展,人民的…...
 
Spring-AOP核心源码、原理详解前篇
本文主要分4部分 Aop原理介绍介绍aop相关的一些类通过源码详解aop代理的创建过程通过源码详解aop代理的调用过程Aop代理一些特性的使用案例 Spring AOP原理 原理比较简单,主要就是使用jdk动态代理和cglib代理来创建代理对象,通过代理对象来访问目标对象…...
Reflection反射——Class类
概述 在Java中,除了int等基本类型外,Java的其他类型全部都是class(包括interface)。例如: String、Object、Runnable、Exception…… Java反射机制是Java语言的一个重要特性。在学习Java反射机制前,需要了…...
王朝兴替的因果
天道好轮 回,苍天饶过谁。王朝兴亡,天道无情。 而其因果循环,天道之森严,让人敬畏。 王朝创业帝王造下什么业,后世子孙在兴替之时,往往要承担何种果 报。 中国几千年的王朝史,因 果循环&…...
 
损坏SD数据恢复的8种有效方法
SD卡被用于许多不同的产品来存储重要数据,如图片和重要的商业文件。如果您的SD卡坏了,您需要SD数据恢复来获取您的信息。通过从损坏的SD卡中取回数据,您可以确保重要文件不会永远丢失,这对于工作或个人原因是非常重要的。 有许多…...
 
好评如潮的年度黑马韩剧,惊喜从一上线就开始
韩剧一直以来都以细腻的情感和紧凑的剧情打动观众,而最近播出的一部作品更是掀起了不小的风波-《法官大人》。孙贤周与金明民两大演技派领衔主演,凭借他们的深沉演技和复杂的角色关系,让这部剧集迅速成为热议焦点。故事围绕着一起交通事故展开…...
 
超好用的PC端语音转文字工具CapsWriter-Offline结合内网穿透实现远程使用
文章目录 前言1. 软件与模型下载2. 本地使用测试3. 异地远程使用3.1 内网穿透工具下载安装3.2 配置公网地址3.3 修改config文件3.4 异地远程访问服务端 4. 配置固定公网地址4.1 修改config文件 5. 固定tcp公网地址远程访问服务端 前言 本文主要介绍如何在Windows系统电脑端使用…...
 
1、https的全过程
目录 一、概述二、SSL过程如何获取会话秘钥1、首先认识几个概念:2、没有CA机构的SSL过程:3、没有CA机构下的安全问题4、有CA机构下的SSL过程 一、概述 https是非对称加密和对称加密的过程,首先建立https链接需要经过两轮握手: T…...
 
抢鲜体验 PolarDB PG 15 开源版
unsetunsetPolarDB 商业版unsetunset 8 月,PolarDB PostgreSQL 版兼容 PostgreSQL 15 版本(商业版)正式发布上线。 当前版本主要增强优化了以下方面: 改进排序功能:改进内存和磁盘排序算法。 增强SQL功能:支…...
UEFI——使用标准C库
一、C标准库 C标准库是ANSL C标准为C语言定义的标准库。C标准库包含15个头文件:assert.h ctype.h error.h float.h limits.h locale.h math.h setjmp.h signal.h stdarg.h stddef.h stdio.h stdlib.h string.h time.h。标准库函数与C语言的紧密结合给我们开发程序带…...
 
[全网首发]怎么让国行版iPhone使用苹果Apple Intelligence
全文共分为两个部分:第一让苹果手机接入AI,第二是让苹果手机接入ChatGPT 4o功能。 一、国行版iPhone开通 Apple Intelligence教程 打破限制:让国行版苹果手机也能接入AI 此次发布会上,虽然国行 iPhone16 系列不支持 GPT-4o&…...
 
C语言-综合案例:通讯录
传送门:C语言-第九章-加餐:文件位置指示器与二进制读写 目录 第一节:思路整理 第二节:代码编写 2-1.通讯录初始化 2-2.功能选择 2-3.增加 和 扩容 2-4.查看 2-5.查找 2-6.删除 2-7.修改 2-8.退出 第三节:测试 下期…...
XWiki中添加 html 二次编辑失效
如果直接在 XWiki 中添加 html, 例如 修改颜色, 新窗口打开主页面等功能, 首次保存是生效的. 如果再次编辑, 则失效, 原因是被转换成了 Markdown 的代码, 而 Markdown 不支持. 解决这个问题可以使用 HTML 宏. 在 XWiki 中使用 Markdown 1.2 语法时,默认 Markdown …...
 
观成科技:隐蔽隧道工具Ligolo-ng加密流量分析
1.工具介绍 Ligolo-ng是一款由go编写的高效隧道工具,该工具基于TUN接口实现其功能,利用反向TCP/TLS连接建立一条隐蔽的通信信道,支持使用Let’s Encrypt自动生成证书。Ligolo-ng的通信隐蔽性体现在其支持多种连接方式,适应复杂网…...
 
JavaSec-RCE
简介 RCE(Remote Code Execution),可以分为:命令注入(Command Injection)、代码注入(Code Injection) 代码注入 1.漏洞场景:Groovy代码注入 Groovy是一种基于JVM的动态语言,语法简洁,支持闭包、动态类型和Java互操作性,…...
 
基于FPGA的PID算法学习———实现PID比例控制算法
基于FPGA的PID算法学习 前言一、PID算法分析二、PID仿真分析1. PID代码2.PI代码3.P代码4.顶层5.测试文件6.仿真波形 总结 前言 学习内容:参考网站: PID算法控制 PID即:Proportional(比例)、Integral(积分&…...
 
树莓派超全系列教程文档--(61)树莓派摄像头高级使用方法
树莓派摄像头高级使用方法 配置通过调谐文件来调整相机行为 使用多个摄像头安装 libcam 和 rpicam-apps依赖关系开发包 文章来源: http://raspberry.dns8844.cn/documentation 原文网址 配置 大多数用例自动工作,无需更改相机配置。但是,一…...
FFmpeg 低延迟同屏方案
引言 在实时互动需求激增的当下,无论是在线教育中的师生同屏演示、远程办公的屏幕共享协作,还是游戏直播的画面实时传输,低延迟同屏已成为保障用户体验的核心指标。FFmpeg 作为一款功能强大的多媒体框架,凭借其灵活的编解码、数据…...
 
如何在看板中有效管理突发紧急任务
在看板中有效管理突发紧急任务需要:设立专门的紧急任务通道、重新调整任务优先级、保持适度的WIP(Work-in-Progress)弹性、优化任务处理流程、提高团队应对突发情况的敏捷性。其中,设立专门的紧急任务通道尤为重要,这能…...
postgresql|数据库|只读用户的创建和删除(备忘)
CREATE USER read_only WITH PASSWORD 密码 -- 连接到xxx数据库 \c xxx -- 授予对xxx数据库的只读权限 GRANT CONNECT ON DATABASE xxx TO read_only; GRANT USAGE ON SCHEMA public TO read_only; GRANT SELECT ON ALL TABLES IN SCHEMA public TO read_only; GRANT EXECUTE O…...
 
《通信之道——从微积分到 5G》读书总结
第1章 绪 论 1.1 这是一本什么样的书 通信技术,说到底就是数学。 那些最基础、最本质的部分。 1.2 什么是通信 通信 发送方 接收方 承载信息的信号 解调出其中承载的信息 信息在发送方那里被加工成信号(调制) 把信息从信号中抽取出来&am…...
 
论文浅尝 | 基于判别指令微调生成式大语言模型的知识图谱补全方法(ISWC2024)
笔记整理:刘治强,浙江大学硕士生,研究方向为知识图谱表示学习,大语言模型 论文链接:http://arxiv.org/abs/2407.16127 发表会议:ISWC 2024 1. 动机 传统的知识图谱补全(KGC)模型通过…...
 
Springcloud:Eureka 高可用集群搭建实战(服务注册与发现的底层原理与避坑指南)
引言:为什么 Eureka 依然是存量系统的核心? 尽管 Nacos 等新注册中心崛起,但金融、电力等保守行业仍有大量系统运行在 Eureka 上。理解其高可用设计与自我保护机制,是保障分布式系统稳定的必修课。本文将手把手带你搭建生产级 Eur…...
