当前位置: 首页 > news >正文

Spark 3:Spark Core RDD持久化

RDD 的数据是过程数据

b8da17ba7f4f4942b11bb5f211111136.png

RDD 的缓存

a8536b8352164f449bcb9d6c550cfe4a.png

# coding:utf8
import timefrom pyspark import SparkConf, SparkContext
from pyspark.storagelevel import StorageLevelif __name__ == '__main__':conf = SparkConf().setAppName("test").setMaster("local[*]")sc = SparkContext(conf=conf)rdd1 = sc.textFile("../data/input/words.txt")rdd2 = rdd1.flatMap(lambda x: x.split(" "))rdd3 = rdd2.map(lambda x: (x, 1))# 缓存到内存中rdd3.cache()# 先放内存,如果不够则放硬盘,2份副本rdd3.persist(StorageLevel.MEMORY_AND_DISK_2)rdd4 = rdd3.reduceByKey(lambda a, b: a + b)print(rdd4.collect())rdd5 = rdd3.groupByKey()rdd6 = rdd5.mapValues(lambda x: sum(x))print(rdd6.collect())# 清理缓存rdd3.unpersist()time.sleep(100000)

06100dee265d4fe587b2abfeb2291e1f.png

e68c6e5996a148c2991e884258f5bc4a.png

103ceba93d6c42daa77820cb985257f8.png

RDD 的CheckPoint

fa83627f868d444db9e8f3b5efc06e8d.png

# coding:utf8
import timefrom pyspark import SparkConf, SparkContext
from pyspark.storagelevel import StorageLevelif __name__ == '__main__':conf = SparkConf().setAppName("test").setMaster("local[*]")sc = SparkContext(conf=conf)# 1. 告知spark, 开启CheckPoint功能sc.setCheckpointDir("hdfs://node1:8020/output/ckp")rdd1 = sc.textFile("../data/input/words.txt")rdd2 = rdd1.flatMap(lambda x: x.split(" "))rdd3 = rdd2.map(lambda x: (x, 1))# 调用checkpoint API 保存数据即可rdd3.checkpoint()rdd4 = rdd3.reduceByKey(lambda a, b: a + b)print(rdd4.collect())rdd5 = rdd3.groupByKey()rdd6 = rdd5.mapValues(lambda x: sum(x))print(rdd6.collect())rdd3.unpersist()time.sleep(100000)

6297822106df4f648b5759fd320c6ca8.png

961fa643346945a1b842bddda7222db3.png

Cache和Checkpoint区别
Cache是轻量化保存RDD数据,可存储在内存和硬盘,是分散存储,设计上数据是不安全的(保留RDD血缘关系)。
CheckPoint是重量级保存RDD数据,是集中存储,只能存储在硬盘(HDFS)上,设计上是安全的(不保留RDD血缘关系)。
Cache 和 CheckPoint的性能对比?
Cache性能更好,因为是分散存储,各个Executor并行执行,效率高,可以保存到内存中(占内存),更快。
CheckPoint比较慢,因为是集中存储,涉及到网络IO,但是存储到HDFS上更加安全(多副本) 。

案例练习:搜索引擎日志分析

0dd1f3dcea24466fa400012cb8541847.png

bcdb47e47d32466aa25847a43dd57614.png

60c99356504f4f02aa731a890bfd9597.png

# coding:utf8# 导入Spark的相关包
import timefrom pyspark import SparkConf, SparkContext
from pyspark.storagelevel import StorageLevel
from defs import context_jieba, filter_words, append_words, extract_user_and_word
from operator import addif __name__ == '__main__':# 0. 初始化执行环境 构建SparkContext对象conf = SparkConf().setAppName("test").setMaster("local[*]")sc = SparkContext(conf=conf)# 1. 读取数据文件file_rdd = sc.textFile("hdfs://node1:8020/input/SogouQ.txt")# 2. 对数据进行切分 \tsplit_rdd = file_rdd.map(lambda x: x.split("\t"))# 3. 因为要做多个需求, split_rdd 作为基础的rdd 会被多次使用.split_rdd.persist(StorageLevel.DISK_ONLY)# TODO: 需求1: 用户搜索的关键`词`分析# 主要分析热点词# 将所有的搜索内容取出# print(split_rdd.takeSample(True, 3))context_rdd = split_rdd.map(lambda x: x[2])# 对搜索的内容进行分词分析words_rdd = context_rdd.flatMap(context_jieba)# print(words_rdd.collect())# 院校 帮 -> 院校帮# 博学 谷 -> 博学谷# 传智播 客 -> 传智播客filtered_rdd = words_rdd.filter(filter_words)# 将关键词转换: 传智播 -> 传智播客final_words_rdd = filtered_rdd.map(append_words)# 对单词进行 分组 聚合 排序 求出前5名result1 = final_words_rdd.reduceByKey(lambda a, b: a + b).\sortBy(lambda x: x[1], ascending=False, numPartitions=1).\take(5)print("需求1结果: ", result1)# TODO: 需求2: 用户和关键词组合分析# 1, 我喜欢传智播客# 1+我  1+喜欢 1+传智播客user_content_rdd = split_rdd.map(lambda x: (x[1], x[2]))# 对用户的搜索内容进行分词, 分词后和用户ID再次组合user_word_with_one_rdd = user_content_rdd.flatMap(extract_user_and_word)# 对内容进行 分组 聚合 排序 求前5result2 = user_word_with_one_rdd.reduceByKey(lambda a, b: a + b).\sortBy(lambda x: x[1], ascending=False, numPartitions=1).\take(5)print("需求2结果: ", result2)# TODO: 需求3: 热门搜索时间段分析# 取出来所有的时间time_rdd = split_rdd.map(lambda x: x[0])# 对时间进行处理, 只保留小时精度即可hour_with_one_rdd = time_rdd.map(lambda x: (x.split(":")[0], 1))# 分组 聚合 排序result3 = hour_with_one_rdd.reduceByKey(add).\sortBy(lambda x: x[1], ascending=False, numPartitions=1).\collect()print("需求3结果: ", result3)time.sleep(100000)import jiebadef context_jieba(data):"""通过jieba分词工具 进行分词操作"""seg = jieba.cut_for_search(data)l = list()for word in seg:l.append(word)return ldef filter_words(data):"""过滤不要的 谷 \ 帮 \ 客"""return data not in ['谷', '帮', '客']def append_words(data):"""修订某些关键词的内容"""if data == '传智播': data = '传智播客'if data == '院校': data = '院校帮'if data == '博学': data = '博学谷'return (data, 1)def extract_user_and_word(data):"""传入数据是 元组 (1, 我喜欢传智播客)"""user_id = data[0]content = data[1]# 对content进行分词words = context_jieba(content)return_list = list()for word in words:# 不要忘记过滤 \谷 \ 帮 \ 客if filter_words(word):return_list.append((user_id + "_" + append_words(word)[0], 1))return return_list

提交到集群运行

4bda8984859e4bdb8760a429a5209708.png

e35dfa2d151241b5a10bd5d21198ad38.png为什么要在全部的服务器安装jieba库?
因为YARN是集群运行,Executor可以在所有服务器上执行,所以每个服务器都需要有jieba库提供支撑。
如何尽量提高任务计算的资源?
计算CPU核心和内存量,通过--executor-memory 指定executor内存,通过--executor-cores 指定
executor的核心数,通过--num-executors 指定总executor数量。

 

相关文章:

Spark 3:Spark Core RDD持久化

RDD 的数据是过程数据 RDD 的缓存 # coding:utf8 import timefrom pyspark import SparkConf, SparkContext from pyspark.storagelevel import StorageLevelif __name__ __main__:conf SparkConf().setAppName("test").setMaster("local[*]")sc SparkC…...

字节跳动五面都过了,结果被刷了,问了hr原因竟说是...

摘要 说在前面,面试时最好不要虚报工资。本来字节跳动是很想去的,几轮面试也通过了,最后没offer,自己只想到几个原因:1、虚报工资,比实际高30%;2、有更好的人选,这个可能性不大&…...

Python日期带时区转换工具类总结

文章目录 1.背景2. 遇到的坑3. 一些小案例3.1 当前日期、日期时间、UTC日期时间3.2 昨天、昨天UTC日期、昨天现在这个时间点的时间戳3.3 日期转时间戳3.4 时间戳转日期3.5 日期加减、小时的加减 4. 总结5. 完整的编码 1.背景 最近项目是国际项目,所以需要经常需要用…...

视频会议产品对比分析

内网视频会议系统如何选择?有很多单位为了保密,只能使用内部网络,无法连接互联网,那些SaaS视频会议就无法使用。在内网的优秀视频会议也有很多可供选择,以下是几个常用的: 1. 宝利通:它支持多种…...

每日一练 | 华为认证真题练习Day47

1、某台路由器输出信息如下,下列说法错误的是?(多选) A. 本路由器开启了区域认证 B. 本设备出现故障,配置的Router Id和实际生效的Router ID不一致 C. 本设备生效的Router Id为10.0.12.1 D. 本设备生效的Router Id为…...

ChatIE(LLM大模型用于信息抽取)

Zero-Shot Information Extraction via Chatting with ChatGPT paper:https://arxiv.org/abs/2302.10205 利用ChatGPT实现零样本信息抽取(Information Extraction,IE),看到零样本就能大概明白这篇文章将以ChatGPT作为…...

提升企业管理效率的利器——ADManager Plus

在当今信息时代,企业的规模和复杂性不断增长,管理各个方面变得愈发具有挑战性。而在企业管理中,活跃目录(Active Directory)起着至关重要的作用。它是一种用于组织内部的用户、计算机、组和其他对象进行集中管理的目录…...

《入侵的艺术》读书心得:第六章:渗透测试中的智慧与愚昧

第六章:渗透测试中的智慧与愚昧 这些想法是愚昧的 1.任何期待渗透测试结果是“毫无破绽”、“无懈可击”…都是极其愚昧的: 第一层含义:测试的不可穷尽性原理(同软件测试) 第二层含义:作为优秀甚至只是合…...

SAP-MM-采购申请-价值特性

采购申请审批在维护价值特性时要注意是抬头价值还是行价值,要确定选择哪个,配置时对应配置。 1、创建价值特性CT04 字段名称:CEBAN-GSWRT,和CEBAN-GFWRT 抬头总价值:CEBAN-GFWRT;如果选择的是抬头审批&am…...

设计模式 - 代理模式

基本介绍: 代理模式:为一个对象提供一个替身,以控制对这个对象的访问。即通过代理 对象访问目标对象.这样做的好处是:可以在目标对象实现的基础上,增强额外的 功能操作,即扩展目标对象的功能。被代理的对象可以是远程对象、创建开销大的对象或需要安全控…...

IOC初始化 IOC启动阶段 (Spring容器的启动流程)

[toc](IOC初始化 IOC启动阶段 (Spring容器的启动流程)) IOC初始化 IOC启动阶段 (Spring容器的启动流程) Resource定位过程:这个过程是指定位BeanDefinition的资源,也就是配置文件(如xml)的位置,并将其封装成Resource对…...

Java后端入职第四天,就被要求代码回退(Git回退实战)

一、需求背景 初入职场,由于自己的失误或者对git不熟悉,把被人的代码给冲突掉了,然后需要立马回滚,对于新手开发,应该比较常见吧!或者,比较多一种情况,错误把工程add了到了暂存区,比如一些本地配置,本来就不应该提交的,又或者,开发中只提交部分代码,又想最新的提…...

【swing】SplitPanel

当使用Java的Swing库来实现一个左右风格的SplitPanel时,可以使用JSplitPane作为容器,并在左边的面板中放置三个按钮,以及在右边的面板中显示图片。以下是一个示例代码: import javax.swing.*; import java.awt.*; import java.aw…...

网络货运平台源码 管理平台端+司机端APP+货主端APP源码

网络货运平台系统源码,网络货运平台源码 管理平台端司机端APP货主端APP 遵循政策要求的八项基本功能,结合货主、实际承运人、监管方等多方业务场景,构建人、车、货、企一体的标准化网络货运平台系统。具有信息发布、线上交易、全程监控、金融…...

Yarn学习笔记

Apache Hadoop YARN (Yet AnotherResource Negotiator,另一种资源协调者)是一种新的 Hadoop 资源管理器,它是一个通用资源管理系统,可为上层应用提供统一的资源管理和调度,它的引入为集群在利用率、资源统一…...

智能路由器开发之OpenWrt简介

智能路由器开发之OpenWrt简介 1. 引言 1.1 智能路由器的重要性和应用场景 智能路由器作为网络通信的核心设备,具有重要的地位和广泛的应用场景。传统的路由器主要提供基本的网络连接功能,但随着智能家居、物联网和大数据应用的快速发展,对于…...

Linux音频和视频命令速查表

在Linux系统中,有许多命令可以帮助我们处理音频和视频文件,从基本的播放和转码,到编辑和处理音频、视频流。 本文将提供一个Linux音频和视频命令速查表,帮助您快速查找并了解各种常用的命令及其用法。 音频命令 播放音频文件 a…...

脉蜂:Django + Flutter 开发的进销存管理系统【已开源】

项目说明 小规模零售(包括电商)跟大规模零售企业的差别在哪里呢? 以我当前的认知来看,小规模零售跟大规模零售企业的差别更多的是在供应链管理、进销存管控上面产生的。如果有一个工具,能够帮他们减少这方面的差异&…...

树的先序,中序,后序递归遍历

//树的先序、中序、后序遍历递归 #include<bits/stdc.h> typedef struct node { char data; struct node *lchild,*rchild; }BTNode; void Greate(BTNode *&T) { char ch; scanf("%c",&ch); if(ch#) TNULL; else { T(BTNode*)malloc(sizeof(BT…...

如何在Linux中更改SSH端口?

SSH&#xff08;Secure Shell&#xff09;是一种安全的远程登录协议&#xff0c;它允许您通过网络远程连接到Linux系统并进行管理操作。默认情况下&#xff0c;SSH使用22端口进行通信。然而&#xff0c;为了增强系统的安全性&#xff0c;有时候我们需要更改SSH端口&#xff0c;…...

SpringBoot-17-MyBatis动态SQL标签之常用标签

文章目录 1 代码1.1 实体User.java1.2 接口UserMapper.java1.3 映射UserMapper.xml1.3.1 标签if1.3.2 标签if和where1.3.3 标签choose和when和otherwise1.4 UserController.java2 常用动态SQL标签2.1 标签set2.1.1 UserMapper.java2.1.2 UserMapper.xml2.1.3 UserController.ja…...

基于距离变化能量开销动态调整的WSN低功耗拓扑控制开销算法matlab仿真

目录 1.程序功能描述 2.测试软件版本以及运行结果展示 3.核心程序 4.算法仿真参数 5.算法理论概述 6.参考文献 7.完整程序 1.程序功能描述 通过动态调整节点通信的能量开销&#xff0c;平衡网络负载&#xff0c;延长WSN生命周期。具体通过建立基于距离的能量消耗模型&am…...

【Java学习笔记】Arrays类

Arrays 类 1. 导入包&#xff1a;import java.util.Arrays 2. 常用方法一览表 方法描述Arrays.toString()返回数组的字符串形式Arrays.sort()排序&#xff08;自然排序和定制排序&#xff09;Arrays.binarySearch()通过二分搜索法进行查找&#xff08;前提&#xff1a;数组是…...

Linux简单的操作

ls ls 查看当前目录 ll 查看详细内容 ls -a 查看所有的内容 ls --help 查看方法文档 pwd pwd 查看当前路径 cd cd 转路径 cd .. 转上一级路径 cd 名 转换路径 …...

HTML前端开发:JavaScript 常用事件详解

作为前端开发的核心&#xff0c;JavaScript 事件是用户与网页交互的基础。以下是常见事件的详细说明和用法示例&#xff1a; 1. onclick - 点击事件 当元素被单击时触发&#xff08;左键点击&#xff09; button.onclick function() {alert("按钮被点击了&#xff01;&…...

Java入门学习详细版(一)

大家好&#xff0c;Java 学习是一个系统学习的过程&#xff0c;核心原则就是“理论 实践 坚持”&#xff0c;并且需循序渐进&#xff0c;不可过于着急&#xff0c;本篇文章推出的这份详细入门学习资料将带大家从零基础开始&#xff0c;逐步掌握 Java 的核心概念和编程技能。 …...

tree 树组件大数据卡顿问题优化

问题背景 项目中有用到树组件用来做文件目录&#xff0c;但是由于这个树组件的节点越来越多&#xff0c;导致页面在滚动这个树组件的时候浏览器就很容易卡死。这种问题基本上都是因为dom节点太多&#xff0c;导致的浏览器卡顿&#xff0c;这里很明显就需要用到虚拟列表的技术&…...

SiFli 52把Imagie图片,Font字体资源放在指定位置,编译成指定img.bin和font.bin的问题

分区配置 (ptab.json) img 属性介绍&#xff1a; img 属性指定分区存放的 image 名称&#xff0c;指定的 image 名称必须是当前工程生成的 binary 。 如果 binary 有多个文件&#xff0c;则以 proj_name:binary_name 格式指定文件名&#xff0c; proj_name 为工程 名&…...

C++.OpenGL (14/64)多光源(Multiple Lights)

多光源(Multiple Lights) 多光源渲染技术概览 #mermaid-svg-3L5e5gGn76TNh7Lq {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-3L5e5gGn76TNh7Lq .error-icon{fill:#552222;}#mermaid-svg-3L5e5gGn76TNh7Lq .erro…...

深度学习水论文:mamba+图像增强

&#x1f9c0;当前视觉领域对高效长序列建模需求激增&#xff0c;对Mamba图像增强这方向的研究自然也逐渐火热。原因在于其高效长程建模&#xff0c;以及动态计算优势&#xff0c;在图像质量提升和细节恢复方面有难以替代的作用。 &#x1f9c0;因此短时间内&#xff0c;就有不…...