当前位置: 首页 > news >正文

PySpark大数据处理详细教程

在这里插入图片描述
欢迎各位数据爱好者!今天,我很高兴与您分享我的最新博客,专注于探索 PySpark DataFrame 的强大功能。无论您是刚入门的数据分析师,还是寻求深入了解大数据技术的专业人士,这里都有丰富的知识和实用的技巧等着您。让我们一起潜入 PySpark 的世界,解锁数据处理和分析的无限可能!

基础操作

基础操作涵盖了数据的创建、加载、查看、选择、过滤、转换、聚合、排序、合并和导出等基本操作。

1.数据创建和加载

# 读取 CSV 文件
df = spark.read.csv("path/to/file.csv", header=True, inferSchema=True)# 读取 HIVE 表
hive_sql = f"select * from {DATABASE}.{TABLE_NAME} {CONDITION}"
df = spark.sql(hive_sql)# 读取 Parquet 文件
parquet_file = "path/to/parquet/file"
df = spark.read.parquet(parquet_file)

2.数据查看和检查

df.show(2,truncate=False)
df.printSchema()

3.查看分位数

quantiles = df.approxQuantile("salary", [0.25, 0.5, 0.75], 0)
# col:要计算分位数的列名,为字符串类型。
# probabilities:一个介于 0 和 1 之间的数字列表,表示要计算的分位数。例如,0.5 表示中位数。
# relativeError:相对误差。这是一个非负浮点数,用于控制计算精度。
# 值为 0 表示计算精确的分位数(可能非常耗时)。
# 随着该值的增加,计算速度会提高,但精度会降低。例如,如果 relativeError 为 0.01,则计算结果与真实分位数的差距在真实分位数的 1% 范围内。

4.数据选择和过滤

df.select("column1").show()
df.filter(df["column1"] > 100).show()# 或者
df.filter(F.col("column1") > 100).show()
5.数据转换和操作
df.withColumn("new_column", F.col("column1").cast("int"))).show()df.withColumn("new_column", df["column1"] + F.lit(100)).show()
df.withColumn("new_column", F.col("column1") + F.lit(100)).show()df.drop("column1").show()

6.数据聚合和分组

df.groupBy("column1").count().show()df.groupBy("column1")agg.(F.count(F.col("id"))).show()

7.排序和排名取TopN

df.orderBy(df["column1"].desc()).show()
df.orderBy(F.col("column1").desc()).show()

8.数据合并和连接

df1.join(df2, df1["column"] == df2["column"]).show()# 或者
from functools import reduce
from pyspark.sql import DataFrame
dataframes = [df1,df2,df3]
union_df = reduce(DataFrame.union, dataframes)

9.缺失值和异常值处理

df.na.fill({"column1": 0}).show()

10.数据转换和类型转换

df.withColumn("column_casted", df["column1"].cast("int")).show()

11.数据导出和写入

# 存储 DataFrame 为CSV
df.write.csv("path/to/output.csv")
# 存储 DataFrame 为HIVE
df.write.format("orc").mode("overwrite").saveAsTable(f"test.sample")
# 存储 DataFrame 为 Parquet 文件
output_path = "path/to/output/directory"
df.write.parquet(output_path)

高级操作

高级操作包括更复杂的数据处理技术、特征工程、文本处理和高级 SQL 查询。

1.数据分区和优化

df.repartition(10).write.parquet("path/to/output")

2.数据探索和分析

df.describe().show()
# 或者
df.summary().show())

3.复杂数据类型处理

from pyspark.sql.functions import explode
df.withColumn("exploded_col", explode(df["array_col"])).show()

4.特征工程

from pyspark.ml.feature import StringIndexer
indexer = StringIndexer(inputCol="category", outputCol="category_index")
df_indexed = indexer.fit(df).transform(df)

5.文本数据处理

from pyspark.ml.feature import Tokenizer
tokenizer = Tokenizer(inputCol="text", outputCol="words")
df_words = tokenizer.transform(df)

6.高级 SQL 查询

df.createOrReplaceTempView("table")
spark.sql("SELECT * FROM table WHERE column1 > 100").show()

进阶操作

进阶操作涵盖了性能调优、与其他数据源的集成和数据流处理,这些通常需要更深入的理解和经验。

1.性能调优和监控

df.explain()

2.与其他数据源集成

df_jdbc = spark.read \.format("jdbc") \.option("url", "jdbc:mysql://your-db-url") \.option("dbtable", "tablename") \.option("user", "username") \.option("password", "password") \.load()

3.数据流处理

df_stream = spark.readStream \.schema(df_schema) \.option("maxFilesPerTrigger", 1) \.json("/path/to/directory/")

4.使用 Structured Streaming

stream_query = df_stream.writeStream \.outputMode("append") \.format("console") \.start()
stream_query.awaitTermination()

这些示例提供了对 PySpark 操作的广泛了解,从基础到进阶,涵盖了数据处理和分析的多个方面。对于更复杂的场景和高级功能,强烈建议查阅 PySpark 的官方文档和相关教程。
在这里插入图片描述

相关文章:

PySpark大数据处理详细教程

欢迎各位数据爱好者!今天,我很高兴与您分享我的最新博客,专注于探索 PySpark DataFrame 的强大功能。无论您是刚入门的数据分析师,还是寻求深入了解大数据技术的专业人士,这里都有丰富的知识和实用的技巧等着您。让我们…...

三(五)ts非基础类型(对象)

在ts里面定义对象的方式也有很多。 普通定义 let obj1:{} {} // obj1.name fufu 报错,只能定义为空对象且不能修改 // 但是可以在赋初始值的时候直接添加属性,这是ts在类型推断时,它会宽容地匹配对象的结构。 let obj2:{} {name: fufu}…...

HeartBeat监控Redis状态

目录 一、概述 二、 安装部署 三、配置 四、启动服务 五、查看数据 一、概述 使用heartbeat可以实现在kibana界面对redis服务存活状态进行观察,如有必要,也可在服务宕机后立即向相关人员发送邮件通知 二、 安装部署 参照文章:HeartBeat监…...

FairGuard无缝兼容小米澎湃OS、ColorOS 14 、鸿蒙4!

随着移动互联网时代的发展,各大手机厂商为打造生态系统、构建自身的技术壁垒,纷纷投身自研操作系统。 而对于一款游戏安全产品,在不同操作系统下,是否能够无缝兼容并且提供稳定的、高强度的加密保护,成了行业的一大痛…...

【Copilot】Edge浏览器的copilot消失了怎么办

这种原因,可能是因为你的ip地址的不在这个服务的允许范围内。你需要重新使用之前出现copilot的ip地址,然后退出edge的账号,重新登录一遍,最后重启edge,就能够使得copilot侧边栏重新出现了。...

C++入门【6-C++ 修饰符类型】

C 修饰符类型 C 允许在 char、int 和 double 数据类型前放置修饰符。 修饰符是用于改变变量类型的行为的关键字,它更能满足各种情境的需求。 下面列出了数据类型修饰符: signed:表示变量可以存储负数。对于整型变量来说,signe…...

STP笔记总结

STP --- 生成树协议 STP(Spanning Tree Protocol,生成树协议)是根据 IEEE802.1D标准建立的,用于在局域网中消除数据链路层环路的协议。运行STP协议的设备通过彼此交互信息发现网络中的环路,并有选择地对某些端口进行阻…...

Qt开发 之 记一次安装 Qt5.12.12 安卓环境的失败案例

文章目录 1、安装Qt2、安卓开发的组合套件2.1、CSDN地址2.2、官网地址2.3、发现老方法不适用了 3、尝试用新方法解决3.1、先安装JDK,搞定JDK环境变量3.1.1、安装jdk3.1.2、确定jdk安装路径3.1.3、打开系统环境变量配置3.1.4、配置系统环境变量3.1.5、验证JDK环境变量…...

基于SpringBoot的就业信息管理系统设计与实现(源码+数据库+文档)

摘 要 在新冠肺炎疫情的影响下,大学生的就业问题已经变成了一个引起人们普遍重视的社会焦点问题。在这次疫情的冲击之下,大学生的就业市场的供求双方都受到了不同程度的影响,大学生的就业情况并不十分乐观。目前,各种招聘平台上…...

Java面试整理(四)Java IO流

我记得自己刚开始学Java的时候,都听过师兄的分享,说IO流是很重要,而且很难。 自己正式接触之后,其实IO流这块知识并不是特别难,而且随着IT的发展,IO流这块反而用得不是很多。特别是在应用开发这个层面,用得更少。 当然,可能会有朋友跳出来说“这怎么可能?你不懂Java吧…...

《安富莱嵌入式周报》第328期:自主微型机器人,火星探测器发射前失误故障分析,微软推出12周24期免费AI课程,炫酷3D LED点阵设计,MDK5.39发布

周报汇总地址:嵌入式周报 - uCOS & uCGUI & emWin & embOS & TouchGFX & ThreadX - 硬汉嵌入式论坛 - Powered by Discuz! 更新一期视频教程: 【实战技能】 单步运行源码分析,一期视频整明白FreeRTOS内核源码框架和运行…...

产品经理在项目周期中扮演的角色Axure的安装与基本使用

目录 一.项目周期流程 二.Axure是什么 三.Axure安装 3.1 一键式安装 3.2 汉化 3.3 授权登录 四.Axure的界面介绍及基本使用 4.1 菜单栏的使用 4.2 工具栏的使用 4.3 页面概要的使用及组件的使用 4.4 组件的样式设计 一.项目周期流程 在一般的项目周期中包含的工作内容有&…...

Dockerfile创建镜像介绍

1.介绍 Docker 提供了一种更便捷的方式&#xff0c;叫作 Dockerfile&#xff0c;docker build命令用于根据给定的Dockerfile构建Docker镜像。 docker build语法&#xff1a; # docker build [OPTIONS] <PATH | URL | -> 常用选项说明 --build-arg&#xff0c;设置构建时的…...

Android 滥用 SharedPreference 导致 ANR 问题

SharedPreference 是 Android 平台提供的一种轻量级的数据存储方式&#xff0c;它用于存储应用的配置信息或者一些简单的数据。SharedPreference 基于键值对的存储&#xff0c;并且支持基本的数据类型&#xff0c;如整型、字符串、布尔值等。它的使用非常简单方便&#xff0c;适…...

虚幻商城 道具汇总

文章目录 载具Vehicle Variety Pack(车辆品种包)Vehicle Variety Pack Volume 2(车辆品种包第 2 卷)家具Free Furniture Pack(免费家具包)Old West - VOL 1 - Interior Furniture(旧西部 - 第1卷 - 家具包)Old West VOL.3 - Travel Supplies and Goods(旧西部 - 第3卷…...

docker: Error response from daemon: failed to create shim task: OCI runtime

1 概述 在解决"Docker: Error response from daemon: failed to create shim task: OCI runtime"问题之前&#xff0c;我们先来了解一下Docker和OCI runtime的基本概念。 Docker是一个开源的应用容器引擎&#xff0c;可以帮助开发者将应用程序和其依赖打包到一个可…...

SpringBoot+线程池实现高频调用http接口并多线程解析json数据

场景 SpringbootFastJson实现解析第三方http接口json数据为实体类(时间格式化转换、字段包含中文)&#xff1a; SpringbootFastJson实现解析第三方http接口json数据为实体类(时间格式化转换、字段包含中文)-CSDN博客 Java中ExecutorService线程池的使用(Runnable和Callable多…...

java实现局域网内视频投屏播放(一)背景/需求

一 背景 我们在用电视上投屏电影或者电视剧时&#xff0c;如果没有vip&#xff0c;用盗版电影网站投屏的话会有两个问题&#xff0c;1:他们网站没有投屏功能。2:卡&#xff01;&#xff01;&#xff01;。还有就是不能随心所欲的设置自己先要自动播放的视频列表&#xff08;如…...

【Spring】手写一个简易starter

需求&#xff1a; 自定义一个starter&#xff0c;里面包含一个TimeLog注解和一个TimeLogAspect切面类&#xff0c;用于统计接口耗时。要求在其它项目引入starter依赖后&#xff0c;启动springboot项目时能进行自动装配。 步骤&#xff1a; &#xff08;1&#xff09;引入pom依赖…...

Spring Cloud Alibaba实践 --Sentinel

sentinel介绍 Sentinel的官方标题是&#xff1a;分布式系统的流量防卫兵。从名字上来看&#xff0c;很容易就能猜到它是用来作服务稳定性保障的。对于服务稳定性保障组件&#xff0c;如果熟悉Spring Cloud的用户&#xff0c;第一反应应该就是Hystrix。但是比较可惜的是Netflix…...

Ubuntu系统下交叉编译openssl

一、参考资料 OpenSSL&&libcurl库的交叉编译 - hesetone - 博客园 二、准备工作 1. 编译环境 宿主机&#xff1a;Ubuntu 20.04.6 LTSHost&#xff1a;ARM32位交叉编译器&#xff1a;arm-linux-gnueabihf-gcc-11.1.0 2. 设置交叉编译工具链 在交叉编译之前&#x…...

UDP(Echoserver)

网络命令 Ping 命令 检测网络是否连通 使用方法: ping -c 次数 网址ping -c 3 www.baidu.comnetstat 命令 netstat 是一个用来查看网络状态的重要工具. 语法&#xff1a;netstat [选项] 功能&#xff1a;查看网络状态 常用选项&#xff1a; n 拒绝显示别名&#…...

如何将联系人从 iPhone 转移到 Android

从 iPhone 换到 Android 手机时&#xff0c;你可能需要保留重要的数据&#xff0c;例如通讯录。好在&#xff0c;将通讯录从 iPhone 转移到 Android 手机非常简单&#xff0c;你可以从本文中学习 6 种可靠的方法&#xff0c;确保随时保持连接&#xff0c;不错过任何信息。 第 1…...

【C语言练习】080. 使用C语言实现简单的数据库操作

080. 使用C语言实现简单的数据库操作 080. 使用C语言实现简单的数据库操作使用原生APIODBC接口第三方库ORM框架文件模拟1. 安装SQLite2. 示例代码:使用SQLite创建数据库、表和插入数据3. 编译和运行4. 示例运行输出:5. 注意事项6. 总结080. 使用C语言实现简单的数据库操作 在…...

Mac下Android Studio扫描根目录卡死问题记录

环境信息 操作系统: macOS 15.5 (Apple M2芯片)Android Studio版本: Meerkat Feature Drop | 2024.3.2 Patch 1 (Build #AI-243.26053.27.2432.13536105, 2025年5月22日构建) 问题现象 在项目开发过程中&#xff0c;提示一个依赖外部头文件的cpp源文件需要同步&#xff0c;点…...

docker 部署发现spring.profiles.active 问题

报错&#xff1a; org.springframework.boot.context.config.InvalidConfigDataPropertyException: Property spring.profiles.active imported from location class path resource [application-test.yml] is invalid in a profile specific resource [origin: class path re…...

协议转换利器,profinet转ethercat网关的两大派系,各有千秋

随着工业以太网的发展&#xff0c;其高效、便捷、协议开放、易于冗余等诸多优点&#xff0c;被越来越多的工业现场所采用。西门子SIMATIC S7-1200/1500系列PLC集成有Profinet接口&#xff0c;具有实时性、开放性&#xff0c;使用TCP/IP和IT标准&#xff0c;符合基于工业以太网的…...

写一个shell脚本,把局域网内,把能ping通的IP和不能ping通的IP分类,并保存到两个文本文件里

写一个shell脚本&#xff0c;把局域网内&#xff0c;把能ping通的IP和不能ping通的IP分类&#xff0c;并保存到两个文本文件里 脚本1 #!/bin/bash #定义变量 ip10.1.1 #循环去ping主机的IP for ((i1;i<10;i)) doping -c1 $ip.$i &>/dev/null[ $? -eq 0 ] &&am…...

网页端 js 读取发票里的二维码信息(图片和PDF格式)

起因 为了实现在报销流程中&#xff0c;发票不能重用的限制&#xff0c;发票上传后&#xff0c;希望能读出发票号&#xff0c;并记录发票号已用&#xff0c;下次不再可用于报销。 基于上面的需求&#xff0c;研究了OCR 的方式和读PDF的方式&#xff0c;实际是可行的&#xff…...

Android屏幕刷新率与FPS(Frames Per Second) 120hz

Android屏幕刷新率与FPS(Frames Per Second) 120hz 屏幕刷新率是屏幕每秒钟刷新显示内容的次数&#xff0c;单位是赫兹&#xff08;Hz&#xff09;。 60Hz 屏幕&#xff1a;每秒刷新 60 次&#xff0c;每次刷新间隔约 16.67ms 90Hz 屏幕&#xff1a;每秒刷新 90 次&#xff0c;…...