当前位置: 首页 > news >正文

spark shuffle写操作——SortShuffleWriter


写入的简单流程:
1.生成ExternalSorter对象
2.将消息都是插入ExternalSorter对象中
3.获取到mapOutputWriter,将中间产生的临时文件合并到一个临时文件
4.生成最后的data文件和index文件
可以看到写入的重点类是ExternalSorter对象
image.png

ExternalSorter

基本功能:对(k,v)进行排序,中间可能存在合并操作,最后生成(k,c)。

  1. 使用partitioner对key进行分区
  2. 在每个分区中使用Comparator进行排序
  3. 输出一个单独的文件,每个分区对应这个文件中的一段范围。

如果禁用了合并操作,类型C必须等于V
这个类的工作流程如下:

  • 使用数据,反复填充内存缓冲区。如果是可以合并的数据,则使用PartitionedAppendOnlyMap;如果不合并,则使用PartitionedPairBuffer。在这些缓冲区中,我们首先按分区ID对元素进行排序,然后可能还会按键进行排序。为了避免对每个键多次调用分区器,我们将分区ID与每条记录一同存储。
  • 当每个缓冲区达到内存限制时,会将其spill到文件中。这个文件首先按分区ID排序,如果需要做聚合操作,其次可能按键或键的哈希码排序。对于每个文件,跟踪每个分区在内存中的对象数量,因此不必为每个元素都写出分区ID。
  • 当用户请求迭代器或文件输出时,溢写的文件会与任何剩余的内存数据一起被合并,使用上述定义的相同排序顺序(除非排序和聚合都被禁用)。如果需要按键进行聚合,我们或者从ordering参数中使用全序,或者读取具有相同哈希码的键并相互比较它们的相等性来合并值。
  • 用户在结束时应调用stop()方法来删除所有中间文件。

缓存buffer:PartitionedAppendOnlyMap、PartitionedPairBuffer
关键方法:insertAll、maybeSpillCollection、spill、writePartitionedMapOutput
image.png
image.png

PartitionedPairBuffer

capacity 容量
curSize 当前放入的数据量
data 数组,存储的数据,(k,v)占用数组的两个位置
image.png

insert

如果容量达到瓶颈就进行扩容。
先存key,再存value。再调用afterUpdate
image.png

afterUpdate

numUpdates数据插入/更新次数
nextSampleNum下一次采样的次数
更新numUpdates,如果达到采样次数,执行采样takeSample
image.png

takeSample

samples中只存两个样品数据,用来计算每次更新的差值。
采样的时候要移除多余的数据。更新下一次采样的数据量。
image.png

estimateSize

预估大小。
最后一个样品的lastSize+bytesPerUpdate*新增的更新次数。
image.png

resetSamples

重新进行采样。
image.png

growArray

扩容2倍容量,迁移数据,重启采样
image.png

partitionedDestructiveSortedIterator

生成比较器comparator,调用sort对缓存的数据进行排序。
image.png
sorter是使用TimSort进行排序的。
TimSort介绍: https://zhuanlan.zhihu.com/p/695042849
image.png

iterator

用pos计算剩余量。
data(2 * pos)为key,data(2 * pos+1) 为value
image.png

PartitionedAppendOnlyMap

存储数据用的数组data,里面的元素是key0, value0, key1, value1, key2, value2…
image.png

changeValue

PartitionedAppendOnlyMap插入数据不再是追加,而是有一个相同key合并值的过程。

  1. key是null,返回null,不进行存储
  2. key首次插入,更新data中的对应的kv值
  3. key非首次插入,更新data中合并的的新value
  4. key发生哈希冲突,就向后加1,直到不冲突

image.png

update

跟changeValue类似。
image.png

growTable

比较简单,就是容量扩大两倍,将旧的kv值重新计算hash插入到新的数组中,如果发生hash冲突就不断向后移动一位。
image.png

iterator

核心方法是nextValue,在nextValue中,遍历data数组的对应key值,要求不是null,表明这个位置是有值的。
如果有key为null,要求pos=-1且haveNullValue=true
image.png

partitionedDestructiveSortedIterator

调用destructiveSortedIterator方法
image.png

destructiveSortedIterator

data数组中元素是分散的,首先将数组中的元素都集中到数组的前面。后面就跟PartitionedPairBuffer的partitionedDestructiveSortedIterator方法一样使用TimeSort进行排序。
image.png

采样相关方法

跟上面的PartitionedPairBuffer的采样相关方法一样。

spill相关方法

入口方法是maybeSpillCollection
image.png

maybeSpillCollection

不论使用的数据结构是buffer还是map,都是计算消耗的容量,再调用maybeSpill方法,最后重新初始化化对应数据结构。可以想到maybeSpill中就将缓存的数据放到了本地。
image.png

maybeSpill

每32条数据就进行一次内存使用情况判断。如果当前使用内存超过了限制,就先申请新的内存,按照两倍的内存使用量申请,不一定申请到足量的内存。申请后还是内存使用超过了限制,就进行spill,调用spill方法,同时调用releaseMemory释放内存。
image.png

releaseMemory

image.png

spill

调用destructiveSortedWritablePartitionedIterator方法返回排好序的分区迭代器。
调用spillMemoryIteratorToDisk将数据溢写到磁盘上
最后将生成的文件记录到spills中
image.png

destructiveSortedWritablePartitionedIterator

调用对应数据结构的partitionedDestructiveSortedIterator方法返回排序的迭代器。
就是上面的PartitionedPairBuffer和PartitionedAppendOnlyMap的partitionedDestructiveSortedIterator方法。
image.png

spillMemoryIteratorToDisk

创建临时文件,生成对应的writer
image.png
遍历将数据写入的文件中,每10000条进行一次flush。
如果失败了,调用revertPartialWritesAndClose进行回滚。
image.png

revertPartialWritesAndClose

如果这次写入出现问题,使用这个方法。回滚写入,只保留截止到上一次写入的内容。
image.png

writePartitionedMapOutput

将排好序的缓存和文件合并成一个文件输出。
spills为空,即没有产生排序文件。将缓存中数据生成排好序的迭代器,遍历写入到文件中。
image.png
存在排好序的文件。则需要调用partitionedIterator方法将文件数据和缓存的数据进行合并,再遍历输出。
image.png

partitionedIterator

调用merge方法合并内存和文件数据
image.png

merge

merge的第一个参数是spilled文件,第二个参数是内存缓存的数据。
流程是遍历分区,取出对应分区的spilled文件中和缓存中的数据。
根据情况进行聚合或者排序等操作后输出合并后的排好序的文件。
image.png

mergeSort

使用堆排序,但是heap中存放的是已经排好序的iterator。
最小值就是heap中首个iterator中的第一个元素。
image.png

mergeWithAggregation

有总排序,这样相同的key会在一起。
调用mergeSort将iterators合并成一个排好序的iterator。
next方法就是遍历key出来全部的值,进行合并后输出,因为是全局有序,不需要遍历iterator全部数据。
image.png
没有总排序
跟上面流程类似,先得到合并的iterator,但是它不是全局有序的。存在不同的key在comparator比较下相等,如使用hash进行比较,因此存在 aaabaaa 这种情况的key分布。
在获取相同key对应的值的时候需要遍历iterator的使用comparator和equal进行比较数据,再进行合并。返回值是一个comparator相同有可能key不同的key组成的iterator
image.png

相关文章:

spark shuffle写操作——SortShuffleWriter

写入的简单流程: 1.生成ExternalSorter对象 2.将消息都是插入ExternalSorter对象中 3.获取到mapOutputWriter,将中间产生的临时文件合并到一个临时文件 4.生成最后的data文件和index文件 可以看到写入的重点类是ExternalSorter对象 ExternalSorter 基…...

ESP32CAM物联网教学12

ESP32CAM物联网教学12 MicroPython 视频服务 小智希望能在MicroPython中实现摄像头的视频服务,就像官方示例程序CameraWebServer那样。 下载视频服务驱动库 小智通过上网搜索,发现相关的教学材料还不少,并且知道有人已经写出了视频服务的驱…...

【C++精华铺】12.STL list模拟实现

1.序言 STL (Standard Template Library)是C标准库中的一个重要组件,提供了许多通用的数据结构和算法。其中,STL list是一种带头双向链表容器,可以存储任意类型的元素。 list的特点包括: 双向性:list中的元素可以根据需…...

ChatGPT Mac App 发布!

2024 年 6 月,OpenAI 的大语言模型 ChatGPT 的 Mac 客户端与 ChatGPT-4o 一起发布了。ChatGPT Mac 户端可以让用户直接在 Mac 电脑上使用 ChatGPT 进行对话。它提供了一个简单易用的用户界面,用户可以在其中输入文本或语音指令,并接收模型生成…...

ACE之ACE_Time_Value

简介 ACE_Time_Value在ACE中表示时间,集成不同平台的时间 结构 #mermaid-svg-dGoKn1R7GicabUif {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-dGoKn1R7GicabUif .error-icon{fill:#552222;}#mermaid-…...

[论文笔记] 自对齐指令反翻译:SELF-ALIGNMENT WITH INSTRUCTION BACKTRANSLATION

https://arxiv.org/pdf/2308.06259 这篇论文介绍了一种名为“指令反向翻译”(instruction backtranslation)的方法,用于通过自动标记人类书写的文本和相应的指令来构建高质量的指令跟随语言模型。这里是一个通俗易懂的解释: 一、背景 通常,训练一个高质量的指令跟随语言…...

算术运算符. 二

# 表达式 # 操作数和运算符组成 比如 11 # 作用:表达式可以求值,也可以给变量赋值。 # Python算术运算符: # - * / % //(整除:向下取整) ** print(10 4) # 14 print(10 - 4) # 6 print(10 * 4) # 40 …...

代码优化方法记录

每次代码 review 之后,对 review 的情况进行总结记录,产出实际经验,方便组内学习、分享。 1、提取公共内容 公共内容要提取,避免重复编写; 2、css 色值使用变量 css 中的色值、字体,都换成组件库中的变…...

qt 图形、图像、3D相关知识

1.qt 支持3d吗 Qt确实支持3D图形渲染。Qt 3D模块是Qt的一个组成部分,它允许开发者在Qt应用程序中集成3D内容。Qt 3D模块提供了一组类和函数,用于创建和渲染3D场景、处理3D对象、应用光照和纹理等。 Qt 3D模块包括以下几个主要组件: Qt 3D …...

【逆向基础】十、工具分享之DIE(Detect It Easy)

一、简介 DIE(Detect It Easy)是一款可以轻松检测PE文件的程序;其主要作用是查壳,并将pe文件的内容解析出来,包括PE文件中包含的导入函数、导出函数的名称及地址,入口函数地址等,是技术人员分析…...

Netcat:——网络瑞士军刀

Netcat: 网络瑞士军刀 概述 Netcat(通常称为 nc)是一个功能强大的网络工具,广泛用于网络测试和调试。它能够读取和写入网络数据,支持TCP、UDP协议,可以用于端口扫描、端口监听、文件传输等多种用途。 主要用途 获取…...

C++ //练习 14.50 在初始化ex1和ex2的过程中,可能用到哪些类类型的转换序列呢?说明初始化是否正确并解释原因。

C Primer(第5版) 练习 14.50 练习 14.50 在初始化ex1和ex2的过程中,可能用到哪些类类型的转换序列呢?说明初始化是否正确并解释原因。 struct LongDouble{LongDouble(double 0.0);operator double();operator float(); }; Long…...

【开源 Mac 工具推荐之 1】gibMacOS:方便快捷的 macOS 完整包下载 Shell 工具

简介 gibMacOS 是由 GitHub 开发者 corpnewt 编写的一款 Shell 工具。它采用 Python 编程语言,可以让用户打开后在纯文本页面中轻松选择并下载来源于 Apple 官方的 macOS 完整安装包。 Repo 地址:https://github.com/corpnewt/gibMacOS (其…...

pdf文件如何快速英文转中文?

要将 PDF 文件中的英文内容转换为中文,你可以使用以下几种方法: 1、在线翻译工具: 使用网上的免费在线翻译工具,如Google翻译、百度翻译或有道翻译,将整个 PDF 文档粘贴到工具中进行翻译。 2、专业翻译软件&#xf…...

程序的控制结构——if-else语句(双分支结构)【互三互三】

目录 🍁 引言 🍁if-else语句(双分支结构) 👉格式1: 👉功能: 👉程序设计风格提示: 👉例题 👉格式2: 👉…...

[C++]初识C++(命名空间,命名空间使用,函数重载,缺省参数等)

💖💖💖欢迎来到我的博客,我是anmory💖💖💖 又和大家见面了 欢迎来到C探索系列 作为一个程序员你不能不掌握的知识 先来自我推荐一波 个人网站欢迎访问以及捐款 推荐阅读 如何低成本搭建个人网站…...

每天一个数据分析题(四百十六)- 线性回归模型

根据模型假设,线性回归模型中误差项的方差为 A. 常数 B. 函数 C. 随机变量 D. 以上都不是 数据分析认证考试介绍:点击进入 题目来源于CDA模拟题库 点击此处获取答案 数据分析专项练习题库 内容涵盖Python,SQL,统计学&#…...

JupyterNotebook中导出当前环境,并存储为requirements.txt

​使用Anaconda管理Python环境时,可以轻松地导出环境配置,以便在其他机器或环境中重新创建相同的环境。可以通过生成一个environment.yml文件实现的,该文件包含了环境中安装的所有包及其版本。但是,常常在一些课程中JupyterNotebo…...

Java对象复制系列二: 手把手带你写一个Apache BeanUtils

👆🏻👆🏻👆🏻关注博主,让你的代码变得更加优雅。 前言 Apache BeanUtils 是Java中用来复制2个对象属性的一个类型。 上一篇文章我们讲到了 Apache BeanUtils 性能相对比较差,今天…...

一个极简的 Vue 示例

https://andi.cn/page/621516.html...

Spring Boot 实现流式响应(兼容 2.7.x)

在实际开发中,我们可能会遇到一些流式数据处理的场景,比如接收来自上游接口的 Server-Sent Events(SSE) 或 流式 JSON 内容,并将其原样中转给前端页面或客户端。这种情况下,传统的 RestTemplate 缓存机制会…...

《Playwright:微软的自动化测试工具详解》

Playwright 简介:声明内容来自网络,将内容拼接整理出来的文档 Playwright 是微软开发的自动化测试工具,支持 Chrome、Firefox、Safari 等主流浏览器,提供多语言 API(Python、JavaScript、Java、.NET)。它的特点包括&a…...

[ICLR 2022]How Much Can CLIP Benefit Vision-and-Language Tasks?

论文网址:pdf 英文是纯手打的!论文原文的summarizing and paraphrasing。可能会出现难以避免的拼写错误和语法错误,若有发现欢迎评论指正!文章偏向于笔记,谨慎食用 目录 1. 心得 2. 论文逐段精读 2.1. Abstract 2…...

C++中string流知识详解和示例

一、概览与类体系 C 提供三种基于内存字符串的流&#xff0c;定义在 <sstream> 中&#xff1a; std::istringstream&#xff1a;输入流&#xff0c;从已有字符串中读取并解析。std::ostringstream&#xff1a;输出流&#xff0c;向内部缓冲区写入内容&#xff0c;最终取…...

大模型多显卡多服务器并行计算方法与实践指南

一、分布式训练概述 大规模语言模型的训练通常需要分布式计算技术,以解决单机资源不足的问题。分布式训练主要分为两种模式: 数据并行:将数据分片到不同设备,每个设备拥有完整的模型副本 模型并行:将模型分割到不同设备,每个设备处理部分模型计算 现代大模型训练通常结合…...

数据库分批入库

今天在工作中&#xff0c;遇到一个问题&#xff0c;就是分批查询的时候&#xff0c;由于批次过大导致出现了一些问题&#xff0c;一下是问题描述和解决方案&#xff1a; 示例&#xff1a; // 假设已有数据列表 dataList 和 PreparedStatement pstmt int batchSize 1000; // …...

安宝特方案丨船舶智造的“AR+AI+作业标准化管理解决方案”(装配)

船舶制造装配管理现状&#xff1a;装配工作依赖人工经验&#xff0c;装配工人凭借长期实践积累的操作技巧完成零部件组装。企业通常制定了装配作业指导书&#xff0c;但在实际执行中&#xff0c;工人对指导书的理解和遵循程度参差不齐。 船舶装配过程中的挑战与需求 挑战 (1…...

网站指纹识别

网站指纹识别 网站的最基本组成&#xff1a;服务器&#xff08;操作系统&#xff09;、中间件&#xff08;web容器&#xff09;、脚本语言、数据厍 为什么要了解这些&#xff1f;举个例子&#xff1a;发现了一个文件读取漏洞&#xff0c;我们需要读/etc/passwd&#xff0c;如…...

纯 Java 项目(非 SpringBoot)集成 Mybatis-Plus 和 Mybatis-Plus-Join

纯 Java 项目&#xff08;非 SpringBoot&#xff09;集成 Mybatis-Plus 和 Mybatis-Plus-Join 1、依赖1.1、依赖版本1.2、pom.xml 2、代码2.1、SqlSession 构造器2.2、MybatisPlus代码生成器2.3、获取 config.yml 配置2.3.1、config.yml2.3.2、项目配置类 2.4、ftl 模板2.4.1、…...

AI语音助手的Python实现

引言 语音助手(如小爱同学、Siri)通过语音识别、自然语言处理(NLP)和语音合成技术,为用户提供直观、高效的交互体验。随着人工智能的普及,Python开发者可以利用开源库和AI模型,快速构建自定义语音助手。本文由浅入深,详细介绍如何使用Python开发AI语音助手,涵盖基础功…...