当前位置: 首页 > news >正文

测试PySpark

文章最前: 我是Octopus,这个名字来源于我的中文名--章鱼;我热爱编程、热爱算法、热爱开源。所有源码在我的个人github ;这博客是记录我学习的点点滴滴,如果您对 Python、Java、AI、算法有兴趣,可以关注我的动态,一起学习,共同进步。

这篇文章旨在帮你写出健壮的pyspark 代码。

在这里,通过它写pyspark单元测试,看这个代码通过PySpark built,下载该目录代码,查看JIRA 看板票的pyspark测试

创建PySpark应用

这边一个例子是怎么创建pyspark应用,如果你的应用已经测试,你可以跳过这一段,测试你的pyspark程序。

现在,开始测试你的spark session

from pyspark.sql import SparkSession
from pyspark.sql.functions import col# Create a SparkSession
spark = SparkSession.builder.appName("Testing PySpark Example").getOrCreate()

接下来,创建一个DataFrame

sample_data = [{"name": "John    D.", "age": 30},{"name": "Alice   G.", "age": 25},{"name": "Bob  T.", "age": 35},{"name": "Eve   A.", "age": 28}]df = spark.createDataFrame(sample_data)

现在,我们对我们的DataFrame来定义转换算子

from pyspark.sql.functions import col, regexp_replace# Remove additional spaces in name
def remove_extra_spaces(df, column_name):# Remove extra spaces from the specified columndf_transformed = df.withColumn(column_name, regexp_replace(col(column_name), "\\s+", " "))return df_transformedtransformed_df = remove_extra_spaces(df, "name")transformed_df.show()
+---+--------+
|age|    name|
+---+--------+
| 30| John D.|
| 25|Alice G.|
| 35|  Bob T.|
| 28|  Eve A.|
+---+--------+

测试你的pyspark应用

现在来测试你的pyspark转换算子。一个选择简化DataFrame测试结果,可以简化数据或者输入数据。更好的方式写测试例子,这里有一些例子怎么去测试我们的代码,这些代码是基于spark 3.5以下版本。对于这些例子做笔记是非常值得的,可以通过测试框架,不管你是使用unittest or pytest; built-in PySpark 测试是单机的,意味着他兼容测试框架和CI测试

选项1: 仅仅使用PySpark Built-in 测试方法

import pyspark.testing
from pyspark.testing.utils import assertDataFrameEqual# Example 1
df1 = spark.createDataFrame(data=[("1", 1000), ("2", 3000)], schema=["id", "amount"])
df2 = spark.createDataFrame(data=[("1", 1000), ("2", 3000)], schema=["id", "amount"])
assertDataFrameEqual(df1, df2)  # pass, DataFrames are identical
# Example 2
df1 = spark.createDataFrame(data=[("1", 0.1), ("2", 3.23)], schema=["id", "amount"])
df2 = spark.createDataFrame(data=[("1", 0.109), ("2", 3.23)], schema=["id", "amount"])
assertDataFrameEqual(df1, df2, rtol=1e-1)  # pass, DataFrames are approx equal by rtol

 您还可以简单地比较两个 DataFrame 模式:

from pyspark.testing.utils import assertSchemaEqual
from pyspark.sql.types import StructType, StructField, ArrayType, DoubleTypes1 = StructType([StructField("names", ArrayType(DoubleType(), True), True)])
s2 = StructType([StructField("names", ArrayType(DoubleType(), True), True)])assertSchemaEqual(s1, s2)  # pass, schemas are identical

选项 2:使用单元测试

对于更复杂的测试场景,您可能需要使用测试框架。

最流行的测试框架选项之一是单元测试。让我们逐步了解如何使用内置 Pythonunittest库来编写 PySpark 测试。有关该unittest库的更多信息,请参阅此处: https: //docs.python.org/3/library/unittest.html。

首先,您需要一个 Spark 会话。您可以使用包@classmethod中的装饰器unittest来负责设置和拆除 Spark 会话。

import unittestclass PySparkTestCase(unittest.TestCase):@classmethoddef setUpClass(cls):cls.spark = SparkSession.builder.appName("Testing PySpark Example").getOrCreate()@classmethoddef tearDownClass(cls):cls.spark.stop()

 现在我们来写一个unittest类。

from pyspark.testing.utils import assertDataFrameEqualclass TestTranformation(PySparkTestCase):def test_single_space(self):sample_data = [{"name": "John    D.", "age": 30},{"name": "Alice   G.", "age": 25},{"name": "Bob  T.", "age": 35},{"name": "Eve   A.", "age": 28}]# Create a Spark DataFrameoriginal_df = spark.createDataFrame(sample_data)# Apply the transformation function from beforetransformed_df = remove_extra_spaces(original_df, "name")expected_data = [{"name": "John D.", "age": 30},{"name": "Alice G.", "age": 25},{"name": "Bob T.", "age": 35},{"name": "Eve A.", "age": 28}]expected_df = spark.createDataFrame(expected_data)assertDataFrameEqual(transformed_df, expected_df)
运行时,unittest将选取名称以“test”开头的所有函数。

选项 3:使用Pytest

pytest我们还可以使用最流行的 Python 测试框架之一来编写测试。有关 的更多信息pytest,请参阅此处的文档: https: //docs.pytest.org/en/7.1.x/contents.html。

使用pytest固定装置允许我们在测试之间共享 Spark 会话,并在测试完成时将其拆除。

import pytest@pytest.fixture
def spark_fixture():spark = SparkSession.builder.appName("Testing PySpark Example").getOrCreate()yield spark

然后我们可以这样定义我们的测试:

import pytest
from pyspark.testing.utils import assertDataFrameEqualdef test_single_space(spark_fixture):sample_data = [{"name": "John    D.", "age": 30},{"name": "Alice   G.", "age": 25},{"name": "Bob  T.", "age": 35},{"name": "Eve   A.", "age": 28}]# Create a Spark DataFrameoriginal_df = spark.createDataFrame(sample_data)# Apply the transformation function from beforetransformed_df = remove_extra_spaces(original_df, "name")expected_data = [{"name": "John D.", "age": 30},{"name": "Alice G.", "age": 25},{"name": "Bob T.", "age": 35},{"name": "Eve A.", "age": 28}]expected_df = spark.createDataFrame(expected_data)assertDataFrameEqual(transformed_df, expected_df)

当您使用该pytest命令运行测试文件时,它将选取名称以“test”开头的所有函数。

把它们放在一起!

让我们在单元测试示例中一起查看所有步骤。

# pkg/etl.py
import unittestfrom pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.functions import regexp_replace
from pyspark.testing.utils import assertDataFrameEqual# Create a SparkSession
spark = SparkSession.builder.appName("Sample PySpark ETL").getOrCreate()sample_data = [{"name": "John    D.", "age": 30},{"name": "Alice   G.", "age": 25},{"name": "Bob  T.", "age": 35},{"name": "Eve   A.", "age": 28}]df = spark.createDataFrame(sample_data)# Define DataFrame transformation function
def remove_extra_spaces(df, column_name):# Remove extra spaces from the specified column using regexp_replacedf_transformed = df.withColumn(column_name, regexp_replace(col(column_name), "\\s+", " "))return df_transformed
# pkg/test_etl.py
import unittestfrom pyspark.sql import SparkSession# Define unit test base class
class PySparkTestCase(unittest.TestCase):@classmethoddef setUpClass(cls):cls.spark = SparkSession.builder.appName("Sample PySpark ETL").getOrCreate()@classmethoddef tearDownClass(cls):cls.spark.stop()# Define unit test
class TestTranformation(PySparkTestCase):def test_single_space(self):sample_data = [{"name": "John    D.", "age": 30},{"name": "Alice   G.", "age": 25},{"name": "Bob  T.", "age": 35},{"name": "Eve   A.", "age": 28}]# Create a Spark DataFrameoriginal_df = spark.createDataFrame(sample_data)# Apply the transformation function from beforetransformed_df = remove_extra_spaces(original_df, "name")expected_data = [{"name": "John D.", "age": 30},{"name": "Alice G.", "age": 25},{"name": "Bob T.", "age": 35},{"name": "Eve A.", "age": 28}]expected_df = spark.createDataFrame(expected_data)assertDataFrameEqual(transformed_df, expected_df)
unittest.main(argv=[''], verbosity=0, exit=False)
在 1.734 秒内完成 1 次测试
<unittest.main.TestProgram 位于 0x174539db0>

相关文章:

测试PySpark

文章最前&#xff1a; 我是Octopus&#xff0c;这个名字来源于我的中文名--章鱼&#xff1b;我热爱编程、热爱算法、热爱开源。所有源码在我的个人github &#xff1b;这博客是记录我学习的点点滴滴&#xff0c;如果您对 Python、Java、AI、算法有兴趣&#xff0c;可以关注我的…...

C语言- 原子操作

基本概念 在C语言(尤其是C11标准之后)中,原子操作提供了一种机制,使得程序员可以在并发环境中,不使用互斥或其他同步原语,而直接对数据进行操作,同时确保数据的完整性和一致性。 原子变量和原子操作的核心思想是:无论什么时候,只有一个线程能够看到变量的修改操作。…...

设置hadoop+安装java环境

上一篇 http://t.csdnimg.cn/K3MFS 基本操作 接着上一篇 先导入之前导出的虚拟机 选择导出到对应的文件夹中 这里修改一下保存虚拟机的位置&#xff08;当然你默认也可以&#xff09; 改一个名字 新建一个share文件夹用来存放共享软件的文件夹 在虚拟机的设置中找到这个设置…...

阿里云新加坡主机服务器选择

阿里云新加坡主机有哪些选择&#xff1f;可以选择云服务器ECS或轻量应用服务器&#xff0c;都有新加坡地域可以选择&#xff0c;东南亚地区可以选择新加坡、韩国首尔、日本东京等地域&#xff0c;阿里云新加坡主机测试IP地址&#xff1a;161.117.118.93 可以测试下本地到新加坡…...

21天打卡掌握java基础操作

Java安装环境变量配置-day1 参考&#xff1a; https://www.runoob.com/w3cnote/windows10-java-setup.html 生成class文件 java21天打卡-day2 输入和输出 题目&#xff1a;设计一个程序&#xff0c;输入上次考试成绩&#xff08;int&#xff09;和本次考试成绩&#xff0…...

SQL题目记录

1.商品推荐题目 1.思路&#xff1a; 通过取差集 得出要推荐的商品差集的选取&#xff1a;except直接取差集 或者a left join b on where b null 2.知识点 1.except selectfriendship_info.user1_id as user_id,sku_id fromfriendship_infojoin favor_info on friendship_in…...

Linux程序调试器——gdb的使用

gdb的概述 GDB 全称“GNU symbolic debugger”&#xff0c;从名称上不难看出&#xff0c;它诞生于 GNU 计划&#xff08;同时诞生的还有 GCC、Emacs 等&#xff09;&#xff0c;是 Linux 下常用的程序调试器。发展至今&#xff0c;GDB 已经迭代了诸多个版本&#xff0c;当下的…...

前端打包项目上线-nginx

第一步&#xff1a;下载nginx。 直接下载 nginx/Windows-1.25.2 pgp 第二步&#xff1a;解压zip包 第三步&#xff1a;打开文件夹&#xff0c;把http里的路径打开cmd 第四步&#xff1a;打开你的http-server服务&#xff0c;没有下载去下一次就ok了 打开后就可以访问了 第五步…...

创龙瑞芯微RK3568参数修改(调试口波特率和rootfs文件)

前言 前面写了基本的文件编译、系统编译和系统烧写&#xff0c;差不多前期工作就准备的差不多了。目前的东西能解决大部分入门级的需求。当然如果需要开发的话&#xff0c;还需要修改其他东西&#xff0c;下面一步一步的给小伙伴介绍关键参数怎么修改。 给定波特率 拿到开发板…...

VMware——VMware17安装WindowServer2012R2环境(图解版)

目录 一、WindowServer2012R2镜像百度云下载二、安装 一、WindowServer2012R2镜像百度云下载 下载链接&#xff1a;https://pan.baidu.com/s/1TWnSRJTk0ruGNn4YinzIgA 提取码&#xff1a;e7u0 二、安装 打开虚拟机&#xff0c;点击【创建新的虚拟机】&#xff0c;如下图&…...

ModuleNotFoundError: No module named ‘torch‘

目录 情况1,真的没有安装pytorch情况2(安装了与CUDA不对应的pytorch版本导致无法识别出torch) 情况1,真的没有安装pytorch 虚拟环境里面真的是没有torch,这种情况就easy job了,点击此链接直接安装与CUDA对应的pytorch版本,CTRLF直接搜索对应CUDA版本即可查找到对应的命令.按图…...

采用Spring Boot框架开发的医院预约挂号系统3e3g0+vue+java

本医院预约挂号系统有管理员&#xff0c;医生和用户。管理员功能有个人中心&#xff0c;用户管理&#xff0c;医生管理&#xff0c;科室信息管理&#xff0c;预约挂号管理&#xff0c;用户投诉管理&#xff0c;投诉处理管理&#xff0c;通知公告管理&#xff0c;科室分类管理。…...

Jmeter性能测试(压力测试)

1.先保存 2.添加请求&#xff08;即添加一个线程组&#xff09; 3.添加取样器&#xff08;在线程组下面添加一个http请求&#xff09; 场景1&#xff1a;模拟半小时之内1000个用户访问服务器资源&#xff0c;要求平均响应时间在3000毫秒内&#xff0c;且错误率为0&#xff0…...

NetCore/Net8下使用Redis的分布式锁实现秒杀功能

目的 本文主要是使用NetCore/Net8加上Redis来实现一个简单的秒杀功能&#xff0c;学习Redis的分布式锁功能。 准备工作 1.Visual Studio 2022开发工具 2.Redis集群&#xff08;6个Redis实例&#xff0c;3主3从&#xff09;或者单个Redis实例也可以。 实现思路 1.秒杀开始…...

openGauss学习笔记-102 openGauss 数据库管理-管理数据库安全-客户端接入之查看数据库连接数

文章目录 openGauss学习笔记-102 openGauss 数据库管理-管理数据库安全-客户端接入之查看数据库连接数102.1 背景信息102.2 操作步骤 openGauss学习笔记-102 openGauss 数据库管理-管理数据库安全-客户端接入之查看数据库连接数 102.1 背景信息 当用户连接数达到上限后&#…...

lspci源码

lspci 显示Linux系统的pci设备最简单的方法就是使用lspci命令&#xff0c;前提是要安装pciutils包&#xff08;centos在最小化安装时不会自带该包&#xff0c;需要自己下载安装&#xff09; pciutils包的源码github地址为&#xff1a; https://github.com/pciutils/pciutils …...

CMake教程-第 8 步:添加自定义命令和生成文件

CMake教程-第 8 步&#xff1a;添加自定义命令和生成文件 1 CMake教程介绍2 学习步骤Step 1: A Basic Starting PointStep 2: Adding a LibraryStep 3: Adding Usage Requirements for a LibraryStep 4: Adding Generator ExpressionsStep 5: Installing and TestingStep 6: Ad…...

快速入门:Spring Cache

目录 一:Spring Cache简介 二:Spring Cache常用注解 2.1:EnableCaching 2.2: Cacheable 2.3:CachePut 2.4:CacheEvict 三:Spring Cache案例 3.1:先在pom.xml中引入两个依赖 3.2:案例 3.2.1:构建数据库表 3.2.2:构建User类 3.2.3:构建Controller mapper层代码 3.…...

探索音频传输系统:数字声音的无限可能 | 百能云芯

音频传输系统是一项关键的技术&#xff0c;已经在数字时代的各个领域中广泛应用&#xff0c;从音乐流媒体到电话通信&#xff0c;再到多媒体制作。本文将深入探讨音频传输系统的定义、工作原理以及在现代生活中的各种应用&#xff0c;以帮助您更好地了解这一重要技术。 音频传输…...

【C++】-c++的类型转换

&#x1f496;作者&#xff1a;小树苗渴望变成参天大树&#x1f388; &#x1f389;作者宣言&#xff1a;认真写好每一篇博客&#x1f4a4; &#x1f38a;作者gitee:gitee✨ &#x1f49e;作者专栏&#xff1a;C语言,数据结构初阶,Linux,C 动态规划算法&#x1f384; 如 果 你 …...

观成科技:隐蔽隧道工具Ligolo-ng加密流量分析

1.工具介绍 Ligolo-ng是一款由go编写的高效隧道工具&#xff0c;该工具基于TUN接口实现其功能&#xff0c;利用反向TCP/TLS连接建立一条隐蔽的通信信道&#xff0c;支持使用Let’s Encrypt自动生成证书。Ligolo-ng的通信隐蔽性体现在其支持多种连接方式&#xff0c;适应复杂网…...

Chapter03-Authentication vulnerabilities

文章目录 1. 身份验证简介1.1 What is authentication1.2 difference between authentication and authorization1.3 身份验证机制失效的原因1.4 身份验证机制失效的影响 2. 基于登录功能的漏洞2.1 密码爆破2.2 用户名枚举2.3 有缺陷的暴力破解防护2.3.1 如果用户登录尝试失败次…...

C++初阶-list的底层

目录 1.std::list实现的所有代码 2.list的简单介绍 2.1实现list的类 2.2_list_iterator的实现 2.2.1_list_iterator实现的原因和好处 2.2.2_list_iterator实现 2.3_list_node的实现 2.3.1. 避免递归的模板依赖 2.3.2. 内存布局一致性 2.3.3. 类型安全的替代方案 2.3.…...

智慧医疗能源事业线深度画像分析(上)

引言 医疗行业作为现代社会的关键基础设施,其能源消耗与环境影响正日益受到关注。随着全球"双碳"目标的推进和可持续发展理念的深入,智慧医疗能源事业线应运而生,致力于通过创新技术与管理方案,重构医疗领域的能源使用模式。这一事业线融合了能源管理、可持续发…...

shell脚本--常见案例

1、自动备份文件或目录 2、批量重命名文件 3、查找并删除指定名称的文件&#xff1a; 4、批量删除文件 5、查找并替换文件内容 6、批量创建文件 7、创建文件夹并移动文件 8、在文件夹中查找文件...

涂鸦T5AI手搓语音、emoji、otto机器人从入门到实战

“&#x1f916;手搓TuyaAI语音指令 &#x1f60d;秒变表情包大师&#xff0c;让萌系Otto机器人&#x1f525;玩出智能新花样&#xff01;开整&#xff01;” &#x1f916; Otto机器人 → 直接点明主体 手搓TuyaAI语音 → 强调 自主编程/自定义 语音控制&#xff08;TuyaAI…...

Java入门学习详细版(一)

大家好&#xff0c;Java 学习是一个系统学习的过程&#xff0c;核心原则就是“理论 实践 坚持”&#xff0c;并且需循序渐进&#xff0c;不可过于着急&#xff0c;本篇文章推出的这份详细入门学习资料将带大家从零基础开始&#xff0c;逐步掌握 Java 的核心概念和编程技能。 …...

Java面试专项一-准备篇

一、企业简历筛选规则 一般企业的简历筛选流程&#xff1a;首先由HR先筛选一部分简历后&#xff0c;在将简历给到对应的项目负责人后再进行下一步的操作。 HR如何筛选简历 例如&#xff1a;Boss直聘&#xff08;招聘方平台&#xff09; 直接按照条件进行筛选 例如&#xff1a…...

华硕a豆14 Air香氛版,美学与科技的馨香融合

在快节奏的现代生活中&#xff0c;我们渴望一个能激发创想、愉悦感官的工作与生活伙伴&#xff0c;它不仅是冰冷的科技工具&#xff0c;更能触动我们内心深处的细腻情感。正是在这样的期许下&#xff0c;华硕a豆14 Air香氛版翩然而至&#xff0c;它以一种前所未有的方式&#x…...

[免费]微信小程序问卷调查系统(SpringBoot后端+Vue管理端)【论文+源码+SQL脚本】

大家好&#xff0c;我是java1234_小锋老师&#xff0c;看到一个不错的微信小程序问卷调查系统(SpringBoot后端Vue管理端)【论文源码SQL脚本】&#xff0c;分享下哈。 项目视频演示 【免费】微信小程序问卷调查系统(SpringBoot后端Vue管理端) Java毕业设计_哔哩哔哩_bilibili 项…...