当前位置: 首页 > news >正文

Spark教程5-基本结构化操作

加载csv文件

df = spark.read.format("json").load("/data/flight-data/json/2015-summary.json")

Schema

输出Schema

df.printSchema()

使用Schema读取csv文件,以指定数据类型

from pyspark.sql.types import StructField, StructType, StringType, LongTypemySchema = StructType([StructField("DEST_COUNTRY_NAME", StringType(), True),StructField("ORIGIN_COUNTRY_NAME", StringType(), True),StructField("count", LongType(), False)]
)
df = spark.read.format("json").schema(mySchema).load("/Users/yangyong/dev/learn_spark/2015-summary.json")

获取第一行

df.first()

创建行

from pyspark.sql import RowmyRow = Row("Hello", None, 1, False)

创建DataFrames

加载csv文件为DataFrames

df = spark.read.format("json").load("/data/flight-data/json/2015-summary.json")

合并Schema和Rows为DataFrames

Schema1 = StructType([StructField("id", StringType(), True),StructField("name", StringType(), True),StructField("country", StringType(), True)]
)row1 = Row('1', 'Oscar', 'United States')
row2 = Row('2', 'China', 'England')
myDF = spark.createDataFrame([row1, row2], schema=Schema1)
myDF.show()"""
+---+-----+-------------+
| id| name|      country|
+---+-----+-------------+
|  1|Oscar|United States|
|  2|China|      England|
+---+-----+-------------+
"""

两种查询:select和selectExpr

select

from  pyspark.sql.functions import expr, col, columndf.select('dest_country_name').show(2)
df.select('dest_country_name', 'origin_country_name').show(2)
df.select(expr('dest_country_name'), col('dest_country_name'), column('dest_country_name')).show(2)"""
+-----------------+
|dest_country_name|
+-----------------+
|    United States|
|    United States|
+-----------------+
only showing top 2 rows+-----------------+-------------------+
|dest_country_name|origin_country_name|
+-----------------+-------------------+
|    United States|            Romania|
|    United States|            Croatia|
+-----------------+-------------------+
only showing top 2 rows+-----------------+-----------------+-----------------+
|dest_country_name|dest_country_name|dest_country_name|
+-----------------+-----------------+-----------------+
|    United States|    United States|    United States|
|    United States|    United States|    United States|
+-----------------+-----------------+-----------------+
only showing top 2 rows
"""

列重命名

df.select(expr('dest_country_name as destination')).show(2)
df.select(col('dest_country_name').alias('destination')).show(2)"""
+-------------+
|  destination|
+-------------+
|United States|
|United States|
+-------------+
only showing top 2 rows+-------------+
|  destination|
+-------------+
|United States|
|United States|
+-------------+
only showing top 2 rows
"""

selectExpr

列重命名

df.selectExpr('dest_country_name as destination', 'dest_country_name').show(2)"""
+-------------+-----------------+
|  destination|dest_country_name|
+-------------+-----------------+
|United States|    United States|
|United States|    United States|
+-------------+-----------------+
only showing top 2 rows
"""

新增列

df.selectExpr('*', '(dest_country_name = origin_country_name) as withinCountry').show(2)"""
+-----------------+-------------------+-----+-------------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|withinCountry|
+-----------------+-------------------+-----+-------------+
|    United States|            Romania|   15|        false|
|    United States|            Croatia|    1|        false|
+-----------------+-------------------+-----+-------------+
only showing top 2 rows
"""

相当于SQL

SELECT *, (dest_country_name = origin_country_name) as withinCountry 
FROM dfTable limit 2

使用聚合函数

df.selectExpr('avg(count)', 'count(distinct(dest_country_name))').show(2)"""
+-----------+---------------------------------+
| avg(count)|count(DISTINCT dest_country_name)|
+-----------+---------------------------------+
|1770.765625|                              132|
+-----------+---------------------------------+
"""

添加列 withColumn

from pyspark.sql.functions import litdf.withColumn('numberOne', lit(1)).show(2)
df.withColumn('withinCountry', expr('dest_country_name == origin_country_name')).show(2)"""
+-----------------+-------------------+-----+---------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|numberOne|
+-----------------+-------------------+-----+---------+
|    United States|            Romania|   15|        1|
|    United States|            Croatia|    1|        1|
+-----------------+-------------------+-----+---------+
only showing top 2 rows+-----------------+-------------------+-----+-------------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|withinCountry|
+-----------------+-------------------+-----+-------------+
|    United States|            Romania|   15|        false|
|    United States|            Croatia|    1|        false|
+-----------------+-------------------+-----+-------------+
only showing top 2 rows
"""

列重命名 withColumnRenamed

df.withColumnRenamed('dest_country_name', 'dest').show(2)"""
+-------------+-------------------+-----+
|         dest|ORIGIN_COUNTRY_NAME|count|
+-------------+-------------------+-----+
|United States|            Romania|   15|
|United States|            Croatia|    1|
+-------------+-------------------+-----+
only showing top 2 rows
"""

去掉列

df.drop('origin_country_name').show(2)
"""
+-----------------+-----+
|DEST_COUNTRY_NAME|count|
+-----------------+-----+
|    United States|   15|
|    United States|    1|
+-----------------+-----+
only showing top 2 rows
"""

修改列类型

df.withColumn('count2', col('count').cast('long'))

行过滤 filter/where

这两者是等价的

df.filter('count < 2').show(2)
df.where('count < 2').show(2)
df.where(col('count') < 2).show(2)"""
+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Croatia|    1|
|    United States|          Singapore|    1|
+-----------------+-------------------+-----+
only showing top 2 rows+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Croatia|    1|
|    United States|          Singapore|    1|
+-----------------+-------------------+-----+
only showing top 2 rows+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Croatia|    1|
|    United States|          Singapore|    1|
+-----------------+-------------------+-----+
only showing top 2 rows
"""

多个条件过滤

df.where('count < 2').where('dest_country_name != "United States"').show(2)"""
+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|          Moldova|      United States|    1|
|            Malta|      United States|    1|
+-----------------+-------------------+-----+
only showing top 2 rows
"""

去重

df.select('dest_country_name', 'origin_country_name').distinct().count()"""
equal to SQL:
SELECT COUNT(DISTINCT(dest_country_name, origin_country_name)) FROM dfTable;
"""

合并DataFrames

拥有同样的Schema以及columns才能合并

from pyspark.sql import Row
schema = df.schema
newRows = [Row("New Country", "Other Country", 5),Row("New Country 2", "Other Country 3", 1)
]
newDF = spark.createDataFrame(newRows, schema)# in Python
df.union(newDF)\.where("count = 1")\.where(col("ORIGIN_COUNTRY_NAME") != "United States")\.show()
"""
+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Croatia|    1|
|    United States|          Singapore|    1|
|    United States|          Gibraltar|    1|
|    United States|             Cyprus|    1|
|    United States|            Estonia|    1|
|    United States|          Lithuania|    1|
|    United States|           Bulgaria|    1|
|    United States|            Georgia|    1|
|    United States|            Bahrain|    1|
|    United States|   Papua New Guinea|    1|
|    United States|         Montenegro|    1|
|    United States|            Namibia|    1|
|    New Country 2|    Other Country 3|    1|
+-----------------+-------------------+-----+
"""

行排序 sort/orderBy

两种方式等价


df.sort("count").show(5)
df.orderBy("count", "DEST_COUNTRY_NAME").show(5)
df.orderBy(col("count"), col("DEST_COUNTRY_NAME")).show(5)from pyspark.sql.functions import desc, ascdf.orderBy(expr("count desc")).show(2)
df.orderBy(col("count").desc(), col("DEST_COUNTRY_NAME").asc()).show(2)

Limit

df.limit(5).show()
df.orderBy(expr("count desc")).limit(6).show()

相关文章:

Spark教程5-基本结构化操作

加载csv文件 df spark.read.format("json").load("/data/flight-data/json/2015-summary.json")Schema 输出Schema df.printSchema()使用Schema读取csv文件&#xff0c;以指定数据类型 from pyspark.sql.types import StructField, StructType, Strin…...

内置数据类型、变量名、字符串、数字及其运算、数字的处理、类型转换

内置数据类型 python中的内置数据类型包括&#xff1a;整数、浮点数、布尔类型&#xff08;以大写字母开头&#xff09;、字符串 变量名 命名变量要见名知意&#xff0c;确保变量名称具有描述性和意义&#xff0c;这样可以使得代码更容易维护&#xff0c;使用_可以使得变量名…...

Win/Mac/Android/iOS怎麼刪除代理設置?

設置代理設置的主要構成 IP 地址和端口 這些是代理伺服器配置的最基本組件。代理伺服器的IP地址引導互聯網流量&#xff0c;而端口號指定伺服器上的通信通道。 為什麼要刪除代理設置&#xff1f; 刪除代理設置通常是為了解決網路問題、提高速度、恢復安全性或過渡到新的網路…...

数据结构------手撕顺序表

文章目录 线性表顺序表的使用及其内部方法ArrayList 的扩容机制顺序表的几种遍历方式顺序表的优缺点顺序表的模拟实现洗牌算法 线性表 线性表&#xff08;linear list&#xff09;是n个具有相同特性的数据元素的有限序列。 线性表是一种在实际中广泛使用的数据结构&#xff0c;…...

UDP(用户数据报协议)端口监控

随着网络的扩展&#xff0c;确保高效的设备通信对于优化网络功能变得越来越重要。在这个过程中&#xff0c;端口发挥着重要作用&#xff0c;它是实现外部设备集成的物理连接器。通过实现数据的无缝传输和交互&#xff0c;端口为网络基础设施的顺畅运行提供了保障。端口使数据通…...

【Java小白图文教程】-05-数组和排序算法详解

精品专题&#xff1a; 01.《C语言从不挂科到高绩点》课程详细笔记 https://blog.csdn.net/yueyehuguang/category_12753294.html?spm1001.2014.3001.5482 02. 《SpringBoot详细教程》课程详细笔记 https://blog.csdn.net/yueyehuguang/category_12789841.html?spm1001.20…...

OpenCV视觉分析之目标跟踪(1)计算密集光流的类DISOpticalFlow的介绍

操作系统&#xff1a;ubuntu22.04 OpenCV版本&#xff1a;OpenCV4.9 IDE:Visual Studio Code 编程语言&#xff1a;C11 算法描述 这个类实现了 Dense Inverse Search (DIS) 光流算法。更多关于该算法的细节可以在文献 146中找到。该实现包含了三个预设参数集&#xff0c;以提…...

Lucas带你手撕机器学习——套索回归

好的&#xff0c;下面我将详细介绍套索回归的背景、理论基础、实现细节以及在实践中的应用&#xff0c;同时还会讨论其优缺点和一些常见问题。 套索回归&#xff08;Lasso Regression&#xff09; 1. 背景与动机 在机器学习和统计学中&#xff0c;模型的复杂性通常会影响其在…...

面试中的一个基本问题:如何在数据库中存储密码?

面试中的一个基本问题&#xff1a;如何在数据库中存储密码&#xff1f; 在安全面试中&#xff0c;“如何在数据库中存储密码&#xff1f;”是一个基础问题&#xff0c;但反映了应聘者对安全最佳实践的理解。以下是安全存储密码的最佳实践概述。 了解风险 存储密码必须安全&am…...

XML HTTP Request

XML HTTP Request 简介 XMLHttpRequest(XHR)是一个JavaScript对象,它最初由微软设计,并在IE5中引入,用于在后台与服务器交换数据。它允许网页在不重新加载整个页面的情况下更新部分内容,这使得网页能够实现动态更新,大大提高了用户体验。虽然名字中包含“XML”,但XML…...

TLS协议基本原理与Wireshark分析

01背 景 随着车联网的迅猛发展&#xff0c;汽车已经不再是传统的机械交通工具&#xff0c;而是智能化、互联化的移动终端。然而&#xff0c;随之而来的是对车辆通信安全的日益严峻的威胁。在车联网生态系统中&#xff0c;车辆通过无线网络与其他车辆、基础设施以及云端服务进行…...

当遇到 502 错误(Bad Gateway)怎么办

很多安装雷池社区版的时候&#xff0c;配置完成&#xff0c;访问的时候可能会遇到当前问题&#xff0c;如何解决呢&#xff1f; 客户端&#xff0c;浏览器排查 1.刷新页面和清除缓存 首先尝试刷新页面&#xff0c;因为有时候 502 错误可能是由于网络临时波动导致服务器无法连…...

学习记录:js算法(七十五): 加油站

文章目录 加油站思路一思路二思路三思路四思路五 加油站 在一条环路上有 n 个加油站&#xff0c;其中第 i 个加油站有汽油 gas[i] 升。 你有一辆油箱容量无限的的汽车&#xff0c;从第 i 个加油站开往第 i1 个加油站需要消耗汽油 cost[i] 升。你从其中的一个加油站出发&#xf…...

强心剂!EEMD-MPE-KPCA-LSTM、EEMD-MPE-LSTM、EEMD-PE-LSTM故障识别、诊断

强心剂&#xff01;EEMD-MPE-KPCA-LSTM、EEMD-MPE-LSTM、EEMD-PE-LSTM故障识别、诊断 目录 强心剂&#xff01;EEMD-MPE-KPCA-LSTM、EEMD-MPE-LSTM、EEMD-PE-LSTM故障识别、诊断效果一览基本介绍程序设计参考资料 效果一览 基本介绍 EEMD-MPE-KPCA-LSTM(集合经验模态分解-多尺…...

yarn的安装与使用以及与npm的区别(安装过程中可能会遇到的问题)

一、yarn的安装 使用npm就可以进行安装 但是需要注意的一点是yarn的使用和node版本是有关系的必须是16.0以上的版本。 输入以下代码就可以实现yarn的安装 npm install -g yarn 再通过版本号的检查来确定&#xff0c;yarn是否安装成功 yarn -v二、遇到的问题 1、问题描述…...

大数据行业预测

大数据行业预测 编译 李升伟 和所有预测一样&#xff0c;我们必须谨慎对待这些预测&#xff0c;因为其中一些预测可能成不了事实。当然&#xff0c;真正改变游戏规则的创新往往出乎意料&#xff0c;甚至让最警惕的预言家也措手不及。所以&#xff0c;如果在来年发生了一些惊天…...

可能是NextJs(使用ssr、api route)打包成桌面端(nextron、electron、tauri)的最佳解决方式

可能是NextJs(使用ssr、api route)打包成桌面端(nextron、electron、tauri)的最佳解决方式 前言 在我使用nextron&#xff08;nextelectron&#xff09;写了一个项目后打包发现nextron等一系列桌面端框架在生产环境是不支持next的ssr也就是api route功能的这就导致我非常难受&…...

二百七十、Kettle——ClickHouse中增量导入清洗数据错误表

一、目的 比如原始数据100条&#xff0c;清洗后&#xff0c;90条正确数据在DWD层清洗表&#xff0c;10条错误数据在DWD层清洗数据错误表&#xff0c;所以清洗数据错误表任务一定要放在清洗表任务之后。 更关键的是&#xff0c;Hive中原本的SQL语句&#xff0c;放在ClickHouse…...

CentOS6升级OpenSSH9.2和OpenSSL3

文章目录 1.说明2.下载地址3.升级OpenSSL4.安装telnet 服务4.1.安装 telnet 服务4.2 关闭防火墙4.2.使用 telnet 连接 5.升级OpenSSH5.1.安装相关命令依赖5.2.备份原 ssh 配置5.3.卸载原有的 OpenSSH5.4.安装 OpenSSH5.5.修改 ssh 配置文件5.6关闭 selinux5.7.重启 OpenSSH 1.说…...

2024 年 MathorCup 数学应用挑战赛——大数据竞赛-赛道 A:台风的分类与预测

2024年MathorCup大数据挑战赛-赛道A初赛--思路https://download.csdn.net/download/qq_52590045/89922904↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓…...

【Oracle APEX开发小技巧12】

有如下需求&#xff1a; 有一个问题反馈页面&#xff0c;要实现在apex页面展示能直观看到反馈时间超过7天未处理的数据&#xff0c;方便管理员及时处理反馈。 我的方法&#xff1a;直接将逻辑写在SQL中&#xff0c;这样可以直接在页面展示 完整代码&#xff1a; SELECTSF.FE…...

2025年能源电力系统与流体力学国际会议 (EPSFD 2025)

2025年能源电力系统与流体力学国际会议&#xff08;EPSFD 2025&#xff09;将于本年度在美丽的杭州盛大召开。作为全球能源、电力系统以及流体力学领域的顶级盛会&#xff0c;EPSFD 2025旨在为来自世界各地的科学家、工程师和研究人员提供一个展示最新研究成果、分享实践经验及…...

安宝特方案丨XRSOP人员作业标准化管理平台:AR智慧点检验收套件

在选煤厂、化工厂、钢铁厂等过程生产型企业&#xff0c;其生产设备的运行效率和非计划停机对工业制造效益有较大影响。 随着企业自动化和智能化建设的推进&#xff0c;需提前预防假检、错检、漏检&#xff0c;推动智慧生产运维系统数据的流动和现场赋能应用。同时&#xff0c;…...

高频面试之3Zookeeper

高频面试之3Zookeeper 文章目录 高频面试之3Zookeeper3.1 常用命令3.2 选举机制3.3 Zookeeper符合法则中哪两个&#xff1f;3.4 Zookeeper脑裂3.5 Zookeeper用来干嘛了 3.1 常用命令 ls、get、create、delete、deleteall3.2 选举机制 半数机制&#xff08;过半机制&#xff0…...

学习STC51单片机31(芯片为STC89C52RCRC)OLED显示屏1

每日一言 生活的美好&#xff0c;总是藏在那些你咬牙坚持的日子里。 硬件&#xff1a;OLED 以后要用到OLED的时候找到这个文件 OLED的设备地址 SSD1306"SSD" 是品牌缩写&#xff0c;"1306" 是产品编号。 驱动 OLED 屏幕的 IIC 总线数据传输格式 示意图 …...

html css js网页制作成品——HTML+CSS榴莲商城网页设计(4页)附源码

目录 一、&#x1f468;‍&#x1f393;网站题目 二、✍️网站描述 三、&#x1f4da;网站介绍 四、&#x1f310;网站效果 五、&#x1fa93; 代码实现 &#x1f9f1;HTML 六、&#x1f947; 如何让学习不再盲目 七、&#x1f381;更多干货 一、&#x1f468;‍&#x1f…...

A2A JS SDK 完整教程:快速入门指南

目录 什么是 A2A JS SDK?A2A JS 安装与设置A2A JS 核心概念创建你的第一个 A2A JS 代理A2A JS 服务端开发A2A JS 客户端使用A2A JS 高级特性A2A JS 最佳实践A2A JS 故障排除 什么是 A2A JS SDK? A2A JS SDK 是一个专为 JavaScript/TypeScript 开发者设计的强大库&#xff…...

宇树科技,改名了!

提到国内具身智能和机器人领域的代表企业&#xff0c;那宇树科技&#xff08;Unitree&#xff09;必须名列其榜。 最近&#xff0c;宇树科技的一项新变动消息在业界引发了不少关注和讨论&#xff0c;即&#xff1a; 宇树向其合作伙伴发布了一封公司名称变更函称&#xff0c;因…...

如何在Windows本机安装Python并确保与Python.NET兼容

✅作者简介&#xff1a;2022年博客新星 第八。热爱国学的Java后端开发者&#xff0c;修心和技术同步精进。 &#x1f34e;个人主页&#xff1a;Java Fans的博客 &#x1f34a;个人信条&#xff1a;不迁怒&#xff0c;不贰过。小知识&#xff0c;大智慧。 &#x1f49e;当前专栏…...

Vue 3 + WebSocket 实战:公司通知实时推送功能详解

&#x1f4e2; Vue 3 WebSocket 实战&#xff1a;公司通知实时推送功能详解 &#x1f4cc; 收藏 点赞 关注&#xff0c;项目中要用到推送功能时就不怕找不到了&#xff01; 实时通知是企业系统中常见的功能&#xff0c;比如&#xff1a;管理员发布通知后&#xff0c;所有用户…...