当前位置: 首页 > news >正文

PySpark用sort-merge join解决数据倾斜的完整案例

假设有两个大表 table1 和 table2 ,并通过 sort-merge join 来解决可能的数据倾斜问题。

from pyspark.sql import SparkSession
from pyspark.sql.functions import col# 初始化SparkSession
spark = SparkSession.builder.appName("SortMergeJoinExample").getOrCreate()# 加载数据,假设数据来自parquet文件
table1 = spark.read.parquet("path/to/table1.parquet")
table2 = spark.read.parquet("path/to/table2.parquet")# 查看表的大小
print("table1 size: ", table1.count())
print("table2 size: ", table2.count())# 为了演示数据倾斜,假设我们直接使用join,这里用inner join举例
joined = table1.join(table2, table1["id"] == table2["id"], "inner")# 先对连接键进行排序,为sort-merge join做准备sorted_table1 = table1.sortWithinPartitions("id")
sorted_table2 = table2.sortWithinPartitions("id")# 使用sort-merge join进行连接
joined = sorted_table1.join(sorted_table2, sorted_table1["id"] == sorted_table2["id"], "inner")# 触发Action,查看执行计划,此时可以去Spark WebUI查看任务执行情况
joined.count()# 停止SparkSession
spark.stop()

代码解释

初始化SparkSession:创建一个SparkSession对象,这是与Spark交互的入口。

spark = SparkSession.builder.appName("SortMergeJoinExample").getOrCreate()

加载数据并查看表大小:从Parquet文件加载两张表,并打印出它们的行数,以此来了解表的规模。

table1 = spark.read.parquet("path/to/table1.parquet")
table2 = spark.read.parquet("path/to/table2.parquet")print("table1 size: ", table1.count())
print("table2 size: ", table2.count())

数据预处理:在进行 sort-merge join 之前,对两个表按照连接键 id 在每个分区内进行排序。

sorted_table1 = table1.sortWithinPartitions("id")
sorted_table2 = table2.sortWithinPartitions("id")

执行sort-merge join:利用排序后的表,执行 sort-merge join 操作,这里选择的是内连接。

joined = sorted_table1.join(sorted_table2, sorted_table1["id"] == sorted_table2["id"], "inner")

触发Action并查看执行情况:调用 count() 方法触发一个Action,此时Spark会真正执行整个计算流程。与此同时,可以打开Spark WebUI(通常是 http://your-spark-master:4040 ),在 Stages 页面查看任务执行计划,尤其是查看各个阶段的数据分布情况,确认数据倾斜是否得到解决。

joined.count()

停止SparkSession:任务完成后,关闭SparkSession释放资源。

spark.stop()

要在Spark WebUI中查看数据倾斜:

  • 在执行 joined.count() 后,迅速打开浏览器访问Spark WebUI。进入 Stages 标签页,找到正在执行的 join 相关阶段。查看每个任务的处理数据量,如果之前存在数据倾斜,经过 sort-merge join 处理后,各个任务处理的数据量应该相对均匀。

相关文章:

PySpark用sort-merge join解决数据倾斜的完整案例

假设有两个大表 table1 和 table2 ,并通过 sort-merge join 来解决可能的数据倾斜问题。 from pyspark.sql import SparkSession from pyspark.sql.functions import col# 初始化SparkSession spark SparkSession.builder.appName("SortMergeJoinExample&quo…...

sklearn-逻辑回归-制作评分卡

目录 数据集处理 分箱 分多少个箱子合适 分箱要达成什么样的效果 对一个特征进行分箱的步骤 分箱的实现 封装计算 WOE 值和 IV值函数 画IV曲线,判断最佳分箱数量 结论 pd.qcut 执行报错 功能函数封装 判断分箱个数 在银行借贷场景中,评分卡是…...

scrapy爬取图片

scrapy 爬取图片 环境准备 python3.10scrapy pillowpycharm 简要介绍scrapy Scrapy 是一个开源的 Python 爬虫框架,专为爬取网页数据和进行 Web 抓取而设计。它的主要特点包括: 高效的抓取性能:Scrapy 采用了异步机制,能够高效…...

在 Vue 项目中使用地区级联选

在 Vue 项目中使用地区级联选择的完整流程: 1.安装依赖包,这个包提供了中国省市区的完整数据。 npm install element-china-area-data --save 2.导入数据 import { regionData } from element-china-area-data 这个包提供了几种不同的数据格式&#…...

【简博士统计学习方法】第1章:1. 统计学习的定义与分类

自用笔记 1. 统计学习的定义与分类 1.1 统计学习的概念 统计学习(Statistical Machine Learning)是关于计算机基于数据构建概率统计模型并运用模型对数据进行预测与分析的一门学科。 以计算机和网络为平台;以数据为研究对象;以…...

利用 Python 脚本批量创建空白 Markdown 笔记

文章目录 利用 Python 脚本批量创建空白 Markdown 笔记1 背景介绍2 需求描述3 明确思路4 具体实现4.1. 遍历 toc.md 文件,收集文件名和对应的文件内容4.2. 实现文件批量生成逻辑4.3. 补全缺失的工具函数4.4. 进一步补全工具函数中的工具函数 5 脚本运行6 注意事项 利…...

【Qt】C++11 Lambda表达式

1. 举例 connect(ui->pushButton, &QPushButton::clicked, [](bool checked){//具体代码qDebug() << "Hello" << checked;}); 2. 详情 //完整形式 [ capture ] ( params ) opt -> ret { body; }; capture 是捕获列表params 是参数表opt 是函数…...

怎样提高服务器中的数据传输速度?

服务器中的数据传输速度会影响着用户的体验感&#xff0c;当企业中的数据传输速度出现卡顿或者是过慢时&#xff0c;用户不能及时浏览到所需的内容&#xff0c;给用户造成不好的体验感&#xff0c;那么企业该怎样才能提高服务器中的数据传输速度呢&#xff1f; 服务器之间如何传…...

Vue 封装公告滚动

文章目录 需求分析1. 创建公告组件Notice.vue2. 注册全局组件3. 使用 需求 系统中需要有一个公告展示&#xff0c;且这个公告位于页面上方&#xff0c;每个页面都要看到 分析 1. 创建公告组件Notice.vue 第一种 在你的项目的合适组件目录下&#xff08;比如components目录&a…...

JVM实战—12.OOM的定位和解决

大纲 1.如何对系统的OOM异常进行监控和报警 2.如何在JVM内存溢出时自动dump内存快照 3.Metaspace区域内存溢出时应如何解决(OutOfMemoryError: Metaspace) 4.JVM栈内存溢出时应如何解决(StackOverflowError) 5.JVM堆内存溢出时应该如何解决(OutOfMemoryError: Java heap s…...

【python翻译软件V1.0】

如果不想使用密钥的形式&#xff0c;且需要一个直接可用的中英文翻译功能&#xff0c;可以使用一些免费的公共 API&#xff0c;如 opencc 或其他无需密钥的库&#xff0c;或直接用 requests 获取翻译结果。 其中&#xff0c;我可以给你一个简单的代码示例&#xff0c;使用 tra…...

Spring Boot中的依赖注入是如何工作

Spring Boot 中的依赖注入&#xff08;Dependency Injection&#xff0c;简称 DI&#xff09;是通过 Spring 框架的核心机制——控制反转&#xff08;Inversion of Control&#xff0c;IOC&#xff09;容器来实现的。Spring Boot 基于 Spring Framework&#xff0c;在应用中自动…...

ubuntu22.04 编译安装libvirt 10.x

环境安装 sudo apt-get update -y sudo apt-get install qemu-system-x86 bridge-utils libyajl-dev -y sudo apt-get install build-essential autoconf automake libtool -y sudo apt-get install libxml2-dev libxslt1-dev libgnutls28-dev libpciaccess-dev libnl-3-de…...

[fastadmin] 第三十四篇 FastAdmin 商城模块标签使用详解

FastAdmin 商城模块标签使用详解 一、标签基本语法 1.1 基础语法格式 {shop:goodslist flag"参数值" id"变量名" row"数量"}<!-- 循环内容 --> {/shop:goodslist}1.2 常用参数说明 flag: 商品标记筛选id: 循环变量名row: 显示数量 1.…...

(2024,LLaVA-Bench (Wilder),LLaVA-NeXT,LLaMA3,Qwen-1.5,语言模型扩展)

LLaVA-NeXT: Stronger LLMs Supercharge Multimodal Capabilities in the Wild 目录 1. 简介 2. 探索大规模语言模型的能力极限 3. LLaVA-Bench (Wilder)&#xff1a;日常生活视觉聊天基准 4. Benchmark 结果 1. 简介 我们通过引入近期更强大的开源大语言模型&#xff08;…...

IPEX-LLM开发项目过程中的技术总结和心得

IPEX-LLM开发项目过程中的技术总结和心得 在人工智能快速发展的时代&#xff0c;高效地开发和部署大语言模型&#xff08;LLM&#xff09;已成为技术人员的必备技能。在我们的项目中&#xff0c;我们采用了 Intel Extension for PyTorch&#xff08;简称 IPEX&#xff09;和 L…...

HTTP/HTTPS ②-Cookie || Session || HTTP报头

这里是Themberfue 上篇文章介绍了HTTP报头的首行信息 本篇我们将更进一步讲解HTTP报头键值对的含义~~~ ❤️❤️❤️❤️ 报头Header ✨再上一篇的学习中&#xff0c;我们了解了HTTP的报头主要是通过键值对的结构存储和表达信息的&#xff1b;我们已经了解了首行的HTTP方法和UR…...

【软考】软件设计师

「学习路线」&#xff08;推荐该顺序学习&#xff0c;按照先易后难排序&#xff09; 1、上午题—计算机系统&#xff08;5~6分&#xff09;[1.8; ] 2、上午题—程序设计语言&#xff08;固定6分&#xff09;[1.9; ] 3、下午题—试题一&#xff08;15分&#xff09; 4、上午题—…...

K8s Pod OOMKilled,监控却显示内存资源并未打满

1. 问题现象 pod一直重启&#xff0c;通过grafana查看&#xff0c;发现内存使用率并没有100%。 2. 排查过程 2.1 describe查看pod最新一次的状态 可以明显看到&#xff0c;最近一次的重启就是因为内存不足导致的。 2.2 describe 查看node节点状态 找到原因了&#xff0c;原来…...

C++ 原子变量

C 原子变量 文章目录 C 原子变量1. 原子变量是什么&#xff1f;2. 原子操作的特点3. 原子变量的作用1. 多线程安全的共享数据访问2. 替代锁机制3. 实现低级同步算法 4. 原子变量的常见操作5. 内存顺序&#xff08;Memory Ordering&#xff09;内存顺序控制在原子变量中的作用如…...

Linux应用开发之网络套接字编程(实例篇)

服务端与客户端单连接 服务端代码 #include <sys/socket.h> #include <sys/types.h> #include <netinet/in.h> #include <stdio.h> #include <stdlib.h> #include <string.h> #include <arpa/inet.h> #include <pthread.h> …...

Docker 离线安装指南

参考文章 1、确认操作系统类型及内核版本 Docker依赖于Linux内核的一些特性&#xff0c;不同版本的Docker对内核版本有不同要求。例如&#xff0c;Docker 17.06及之后的版本通常需要Linux内核3.10及以上版本&#xff0c;Docker17.09及更高版本对应Linux内核4.9.x及更高版本。…...

零门槛NAS搭建:WinNAS如何让普通电脑秒变私有云?

一、核心优势&#xff1a;专为Windows用户设计的极简NAS WinNAS由深圳耘想存储科技开发&#xff0c;是一款收费低廉但功能全面的Windows NAS工具&#xff0c;主打“无学习成本部署” 。与其他NAS软件相比&#xff0c;其优势在于&#xff1a; 无需硬件改造&#xff1a;将任意W…...

shell脚本--常见案例

1、自动备份文件或目录 2、批量重命名文件 3、查找并删除指定名称的文件&#xff1a; 4、批量删除文件 5、查找并替换文件内容 6、批量创建文件 7、创建文件夹并移动文件 8、在文件夹中查找文件...

Objective-C常用命名规范总结

【OC】常用命名规范总结 文章目录 【OC】常用命名规范总结1.类名&#xff08;Class Name)2.协议名&#xff08;Protocol Name)3.方法名&#xff08;Method Name)4.属性名&#xff08;Property Name&#xff09;5.局部变量/实例变量&#xff08;Local / Instance Variables&…...

在 Nginx Stream 层“改写”MQTT ngx_stream_mqtt_filter_module

1、为什么要修改 CONNECT 报文&#xff1f; 多租户隔离&#xff1a;自动为接入设备追加租户前缀&#xff0c;后端按 ClientID 拆分队列。零代码鉴权&#xff1a;将入站用户名替换为 OAuth Access-Token&#xff0c;后端 Broker 统一校验。灰度发布&#xff1a;根据 IP/地理位写…...

Keil 中设置 STM32 Flash 和 RAM 地址详解

文章目录 Keil 中设置 STM32 Flash 和 RAM 地址详解一、Flash 和 RAM 配置界面(Target 选项卡)1. IROM1(用于配置 Flash)2. IRAM1(用于配置 RAM)二、链接器设置界面(Linker 选项卡)1. 勾选“Use Memory Layout from Target Dialog”2. 查看链接器参数(如果没有勾选上面…...

NLP学习路线图(二十三):长短期记忆网络(LSTM)

在自然语言处理(NLP)领域,我们时刻面临着处理序列数据的核心挑战。无论是理解句子的结构、分析文本的情感,还是实现语言的翻译,都需要模型能够捕捉词语之间依时序产生的复杂依赖关系。传统的神经网络结构在处理这种序列依赖时显得力不从心,而循环神经网络(RNN) 曾被视为…...

【HarmonyOS 5 开发速记】如何获取用户信息(头像/昵称/手机号)

1.获取 authorizationCode&#xff1a; 2.利用 authorizationCode 获取 accessToken&#xff1a;文档中心 3.获取手机&#xff1a;文档中心 4.获取昵称头像&#xff1a;文档中心 首先创建 request 若要获取手机号&#xff0c;scope必填 phone&#xff0c;permissions 必填 …...

Unity | AmplifyShaderEditor插件基础(第七集:平面波动shader)

目录 一、&#x1f44b;&#x1f3fb;前言 二、&#x1f608;sinx波动的基本原理 三、&#x1f608;波动起来 1.sinx节点介绍 2.vertexPosition 3.集成Vector3 a.节点Append b.连起来 4.波动起来 a.波动的原理 b.时间节点 c.sinx的处理 四、&#x1f30a;波动优化…...