当前位置: 首页 > news >正文

Spark 3:Spark Core RDD持久化

RDD 的数据是过程数据

b8da17ba7f4f4942b11bb5f211111136.png

RDD 的缓存

a8536b8352164f449bcb9d6c550cfe4a.png

# coding:utf8
import timefrom pyspark import SparkConf, SparkContext
from pyspark.storagelevel import StorageLevelif __name__ == '__main__':conf = SparkConf().setAppName("test").setMaster("local[*]")sc = SparkContext(conf=conf)rdd1 = sc.textFile("../data/input/words.txt")rdd2 = rdd1.flatMap(lambda x: x.split(" "))rdd3 = rdd2.map(lambda x: (x, 1))# 缓存到内存中rdd3.cache()# 先放内存,如果不够则放硬盘,2份副本rdd3.persist(StorageLevel.MEMORY_AND_DISK_2)rdd4 = rdd3.reduceByKey(lambda a, b: a + b)print(rdd4.collect())rdd5 = rdd3.groupByKey()rdd6 = rdd5.mapValues(lambda x: sum(x))print(rdd6.collect())# 清理缓存rdd3.unpersist()time.sleep(100000)

06100dee265d4fe587b2abfeb2291e1f.png

e68c6e5996a148c2991e884258f5bc4a.png

103ceba93d6c42daa77820cb985257f8.png

RDD 的CheckPoint

fa83627f868d444db9e8f3b5efc06e8d.png

# coding:utf8
import timefrom pyspark import SparkConf, SparkContext
from pyspark.storagelevel import StorageLevelif __name__ == '__main__':conf = SparkConf().setAppName("test").setMaster("local[*]")sc = SparkContext(conf=conf)# 1. 告知spark, 开启CheckPoint功能sc.setCheckpointDir("hdfs://node1:8020/output/ckp")rdd1 = sc.textFile("../data/input/words.txt")rdd2 = rdd1.flatMap(lambda x: x.split(" "))rdd3 = rdd2.map(lambda x: (x, 1))# 调用checkpoint API 保存数据即可rdd3.checkpoint()rdd4 = rdd3.reduceByKey(lambda a, b: a + b)print(rdd4.collect())rdd5 = rdd3.groupByKey()rdd6 = rdd5.mapValues(lambda x: sum(x))print(rdd6.collect())rdd3.unpersist()time.sleep(100000)

6297822106df4f648b5759fd320c6ca8.png

961fa643346945a1b842bddda7222db3.png

Cache和Checkpoint区别
Cache是轻量化保存RDD数据,可存储在内存和硬盘,是分散存储,设计上数据是不安全的(保留RDD血缘关系)。
CheckPoint是重量级保存RDD数据,是集中存储,只能存储在硬盘(HDFS)上,设计上是安全的(不保留RDD血缘关系)。
Cache 和 CheckPoint的性能对比?
Cache性能更好,因为是分散存储,各个Executor并行执行,效率高,可以保存到内存中(占内存),更快。
CheckPoint比较慢,因为是集中存储,涉及到网络IO,但是存储到HDFS上更加安全(多副本) 。

案例练习:搜索引擎日志分析

0dd1f3dcea24466fa400012cb8541847.png

bcdb47e47d32466aa25847a43dd57614.png

60c99356504f4f02aa731a890bfd9597.png

# coding:utf8# 导入Spark的相关包
import timefrom pyspark import SparkConf, SparkContext
from pyspark.storagelevel import StorageLevel
from defs import context_jieba, filter_words, append_words, extract_user_and_word
from operator import addif __name__ == '__main__':# 0. 初始化执行环境 构建SparkContext对象conf = SparkConf().setAppName("test").setMaster("local[*]")sc = SparkContext(conf=conf)# 1. 读取数据文件file_rdd = sc.textFile("hdfs://node1:8020/input/SogouQ.txt")# 2. 对数据进行切分 \tsplit_rdd = file_rdd.map(lambda x: x.split("\t"))# 3. 因为要做多个需求, split_rdd 作为基础的rdd 会被多次使用.split_rdd.persist(StorageLevel.DISK_ONLY)# TODO: 需求1: 用户搜索的关键`词`分析# 主要分析热点词# 将所有的搜索内容取出# print(split_rdd.takeSample(True, 3))context_rdd = split_rdd.map(lambda x: x[2])# 对搜索的内容进行分词分析words_rdd = context_rdd.flatMap(context_jieba)# print(words_rdd.collect())# 院校 帮 -> 院校帮# 博学 谷 -> 博学谷# 传智播 客 -> 传智播客filtered_rdd = words_rdd.filter(filter_words)# 将关键词转换: 传智播 -> 传智播客final_words_rdd = filtered_rdd.map(append_words)# 对单词进行 分组 聚合 排序 求出前5名result1 = final_words_rdd.reduceByKey(lambda a, b: a + b).\sortBy(lambda x: x[1], ascending=False, numPartitions=1).\take(5)print("需求1结果: ", result1)# TODO: 需求2: 用户和关键词组合分析# 1, 我喜欢传智播客# 1+我  1+喜欢 1+传智播客user_content_rdd = split_rdd.map(lambda x: (x[1], x[2]))# 对用户的搜索内容进行分词, 分词后和用户ID再次组合user_word_with_one_rdd = user_content_rdd.flatMap(extract_user_and_word)# 对内容进行 分组 聚合 排序 求前5result2 = user_word_with_one_rdd.reduceByKey(lambda a, b: a + b).\sortBy(lambda x: x[1], ascending=False, numPartitions=1).\take(5)print("需求2结果: ", result2)# TODO: 需求3: 热门搜索时间段分析# 取出来所有的时间time_rdd = split_rdd.map(lambda x: x[0])# 对时间进行处理, 只保留小时精度即可hour_with_one_rdd = time_rdd.map(lambda x: (x.split(":")[0], 1))# 分组 聚合 排序result3 = hour_with_one_rdd.reduceByKey(add).\sortBy(lambda x: x[1], ascending=False, numPartitions=1).\collect()print("需求3结果: ", result3)time.sleep(100000)import jiebadef context_jieba(data):"""通过jieba分词工具 进行分词操作"""seg = jieba.cut_for_search(data)l = list()for word in seg:l.append(word)return ldef filter_words(data):"""过滤不要的 谷 \ 帮 \ 客"""return data not in ['谷', '帮', '客']def append_words(data):"""修订某些关键词的内容"""if data == '传智播': data = '传智播客'if data == '院校': data = '院校帮'if data == '博学': data = '博学谷'return (data, 1)def extract_user_and_word(data):"""传入数据是 元组 (1, 我喜欢传智播客)"""user_id = data[0]content = data[1]# 对content进行分词words = context_jieba(content)return_list = list()for word in words:# 不要忘记过滤 \谷 \ 帮 \ 客if filter_words(word):return_list.append((user_id + "_" + append_words(word)[0], 1))return return_list

提交到集群运行

4bda8984859e4bdb8760a429a5209708.png

e35dfa2d151241b5a10bd5d21198ad38.png为什么要在全部的服务器安装jieba库?
因为YARN是集群运行,Executor可以在所有服务器上执行,所以每个服务器都需要有jieba库提供支撑。
如何尽量提高任务计算的资源?
计算CPU核心和内存量,通过--executor-memory 指定executor内存,通过--executor-cores 指定
executor的核心数,通过--num-executors 指定总executor数量。

 

相关文章:

Spark 3:Spark Core RDD持久化

RDD 的数据是过程数据 RDD 的缓存 # coding:utf8 import timefrom pyspark import SparkConf, SparkContext from pyspark.storagelevel import StorageLevelif __name__ __main__:conf SparkConf().setAppName("test").setMaster("local[*]")sc SparkC…...

字节跳动五面都过了,结果被刷了,问了hr原因竟说是...

摘要 说在前面,面试时最好不要虚报工资。本来字节跳动是很想去的,几轮面试也通过了,最后没offer,自己只想到几个原因:1、虚报工资,比实际高30%;2、有更好的人选,这个可能性不大&…...

Python日期带时区转换工具类总结

文章目录 1.背景2. 遇到的坑3. 一些小案例3.1 当前日期、日期时间、UTC日期时间3.2 昨天、昨天UTC日期、昨天现在这个时间点的时间戳3.3 日期转时间戳3.4 时间戳转日期3.5 日期加减、小时的加减 4. 总结5. 完整的编码 1.背景 最近项目是国际项目,所以需要经常需要用…...

视频会议产品对比分析

内网视频会议系统如何选择?有很多单位为了保密,只能使用内部网络,无法连接互联网,那些SaaS视频会议就无法使用。在内网的优秀视频会议也有很多可供选择,以下是几个常用的: 1. 宝利通:它支持多种…...

每日一练 | 华为认证真题练习Day47

1、某台路由器输出信息如下,下列说法错误的是?(多选) A. 本路由器开启了区域认证 B. 本设备出现故障,配置的Router Id和实际生效的Router ID不一致 C. 本设备生效的Router Id为10.0.12.1 D. 本设备生效的Router Id为…...

ChatIE(LLM大模型用于信息抽取)

Zero-Shot Information Extraction via Chatting with ChatGPT paper:https://arxiv.org/abs/2302.10205 利用ChatGPT实现零样本信息抽取(Information Extraction,IE),看到零样本就能大概明白这篇文章将以ChatGPT作为…...

提升企业管理效率的利器——ADManager Plus

在当今信息时代,企业的规模和复杂性不断增长,管理各个方面变得愈发具有挑战性。而在企业管理中,活跃目录(Active Directory)起着至关重要的作用。它是一种用于组织内部的用户、计算机、组和其他对象进行集中管理的目录…...

《入侵的艺术》读书心得:第六章:渗透测试中的智慧与愚昧

第六章:渗透测试中的智慧与愚昧 这些想法是愚昧的 1.任何期待渗透测试结果是“毫无破绽”、“无懈可击”…都是极其愚昧的: 第一层含义:测试的不可穷尽性原理(同软件测试) 第二层含义:作为优秀甚至只是合…...

SAP-MM-采购申请-价值特性

采购申请审批在维护价值特性时要注意是抬头价值还是行价值,要确定选择哪个,配置时对应配置。 1、创建价值特性CT04 字段名称:CEBAN-GSWRT,和CEBAN-GFWRT 抬头总价值:CEBAN-GFWRT;如果选择的是抬头审批&am…...

设计模式 - 代理模式

基本介绍: 代理模式:为一个对象提供一个替身,以控制对这个对象的访问。即通过代理 对象访问目标对象.这样做的好处是:可以在目标对象实现的基础上,增强额外的 功能操作,即扩展目标对象的功能。被代理的对象可以是远程对象、创建开销大的对象或需要安全控…...

IOC初始化 IOC启动阶段 (Spring容器的启动流程)

[toc](IOC初始化 IOC启动阶段 (Spring容器的启动流程)) IOC初始化 IOC启动阶段 (Spring容器的启动流程) Resource定位过程:这个过程是指定位BeanDefinition的资源,也就是配置文件(如xml)的位置,并将其封装成Resource对…...

Java后端入职第四天,就被要求代码回退(Git回退实战)

一、需求背景 初入职场,由于自己的失误或者对git不熟悉,把被人的代码给冲突掉了,然后需要立马回滚,对于新手开发,应该比较常见吧!或者,比较多一种情况,错误把工程add了到了暂存区,比如一些本地配置,本来就不应该提交的,又或者,开发中只提交部分代码,又想最新的提…...

【swing】SplitPanel

当使用Java的Swing库来实现一个左右风格的SplitPanel时,可以使用JSplitPane作为容器,并在左边的面板中放置三个按钮,以及在右边的面板中显示图片。以下是一个示例代码: import javax.swing.*; import java.awt.*; import java.aw…...

网络货运平台源码 管理平台端+司机端APP+货主端APP源码

网络货运平台系统源码,网络货运平台源码 管理平台端司机端APP货主端APP 遵循政策要求的八项基本功能,结合货主、实际承运人、监管方等多方业务场景,构建人、车、货、企一体的标准化网络货运平台系统。具有信息发布、线上交易、全程监控、金融…...

Yarn学习笔记

Apache Hadoop YARN (Yet AnotherResource Negotiator,另一种资源协调者)是一种新的 Hadoop 资源管理器,它是一个通用资源管理系统,可为上层应用提供统一的资源管理和调度,它的引入为集群在利用率、资源统一…...

智能路由器开发之OpenWrt简介

智能路由器开发之OpenWrt简介 1. 引言 1.1 智能路由器的重要性和应用场景 智能路由器作为网络通信的核心设备,具有重要的地位和广泛的应用场景。传统的路由器主要提供基本的网络连接功能,但随着智能家居、物联网和大数据应用的快速发展,对于…...

Linux音频和视频命令速查表

在Linux系统中,有许多命令可以帮助我们处理音频和视频文件,从基本的播放和转码,到编辑和处理音频、视频流。 本文将提供一个Linux音频和视频命令速查表,帮助您快速查找并了解各种常用的命令及其用法。 音频命令 播放音频文件 a…...

脉蜂:Django + Flutter 开发的进销存管理系统【已开源】

项目说明 小规模零售(包括电商)跟大规模零售企业的差别在哪里呢? 以我当前的认知来看,小规模零售跟大规模零售企业的差别更多的是在供应链管理、进销存管控上面产生的。如果有一个工具,能够帮他们减少这方面的差异&…...

树的先序,中序,后序递归遍历

//树的先序、中序、后序遍历递归 #include<bits/stdc.h> typedef struct node { char data; struct node *lchild,*rchild; }BTNode; void Greate(BTNode *&T) { char ch; scanf("%c",&ch); if(ch#) TNULL; else { T(BTNode*)malloc(sizeof(BT…...

如何在Linux中更改SSH端口?

SSH&#xff08;Secure Shell&#xff09;是一种安全的远程登录协议&#xff0c;它允许您通过网络远程连接到Linux系统并进行管理操作。默认情况下&#xff0c;SSH使用22端口进行通信。然而&#xff0c;为了增强系统的安全性&#xff0c;有时候我们需要更改SSH端口&#xff0c;…...

AI-调查研究-01-正念冥想有用吗?对健康的影响及科学指南

点一下关注吧&#xff01;&#xff01;&#xff01;非常感谢&#xff01;&#xff01;持续更新&#xff01;&#xff01;&#xff01; &#x1f680; AI篇持续更新中&#xff01;&#xff08;长期更新&#xff09; 目前2025年06月05日更新到&#xff1a; AI炼丹日志-28 - Aud…...

eNSP-Cloud(实现本地电脑与eNSP内设备之间通信)

说明&#xff1a; 想象一下&#xff0c;你正在用eNSP搭建一个虚拟的网络世界&#xff0c;里面有虚拟的路由器、交换机、电脑&#xff08;PC&#xff09;等等。这些设备都在你的电脑里面“运行”&#xff0c;它们之间可以互相通信&#xff0c;就像一个封闭的小王国。 但是&#…...

【HarmonyOS 5.0】DevEco Testing:鸿蒙应用质量保障的终极武器

——全方位测试解决方案与代码实战 一、工具定位与核心能力 DevEco Testing是HarmonyOS官方推出的​​一体化测试平台​​&#xff0c;覆盖应用全生命周期测试需求&#xff0c;主要提供五大核心能力&#xff1a; ​​测试类型​​​​检测目标​​​​关键指标​​功能体验基…...

Day131 | 灵神 | 回溯算法 | 子集型 子集

Day131 | 灵神 | 回溯算法 | 子集型 子集 78.子集 78. 子集 - 力扣&#xff08;LeetCode&#xff09; 思路&#xff1a; 笔者写过很多次这道题了&#xff0c;不想写题解了&#xff0c;大家看灵神讲解吧 回溯算法套路①子集型回溯【基础算法精讲 14】_哔哩哔哩_bilibili 完…...

Go 语言接口详解

Go 语言接口详解 核心概念 接口定义 在 Go 语言中&#xff0c;接口是一种抽象类型&#xff0c;它定义了一组方法的集合&#xff1a; // 定义接口 type Shape interface {Area() float64Perimeter() float64 } 接口实现 Go 接口的实现是隐式的&#xff1a; // 矩形结构体…...

工程地质软件市场:发展现状、趋势与策略建议

一、引言 在工程建设领域&#xff0c;准确把握地质条件是确保项目顺利推进和安全运营的关键。工程地质软件作为处理、分析、模拟和展示工程地质数据的重要工具&#xff0c;正发挥着日益重要的作用。它凭借强大的数据处理能力、三维建模功能、空间分析工具和可视化展示手段&…...

k8s业务程序联调工具-KtConnect

概述 原理 工具作用是建立了一个从本地到集群的单向VPN&#xff0c;根据VPN原理&#xff0c;打通两个内网必然需要借助一个公共中继节点&#xff0c;ktconnect工具巧妙的利用k8s原生的portforward能力&#xff0c;简化了建立连接的过程&#xff0c;apiserver间接起到了中继节…...

Caliper 配置文件解析:config.yaml

Caliper 是一个区块链性能基准测试工具,用于评估不同区块链平台的性能。下面我将详细解释你提供的 fisco-bcos.json 文件结构,并说明它与 config.yaml 文件的关系。 fisco-bcos.json 文件解析 这个文件是针对 FISCO-BCOS 区块链网络的 Caliper 配置文件,主要包含以下几个部…...

如何理解 IP 数据报中的 TTL?

目录 前言理解 前言 面试灵魂一问&#xff1a;说说对 IP 数据报中 TTL 的理解&#xff1f;我们都知道&#xff0c;IP 数据报由首部和数据两部分组成&#xff0c;首部又分为两部分&#xff1a;固定部分和可变部分&#xff0c;共占 20 字节&#xff0c;而即将讨论的 TTL 就位于首…...

是否存在路径(FIFOBB算法)

题目描述 一个具有 n 个顶点e条边的无向图&#xff0c;该图顶点的编号依次为0到n-1且不存在顶点与自身相连的边。请使用FIFOBB算法编写程序&#xff0c;确定是否存在从顶点 source到顶点 destination的路径。 输入 第一行两个整数&#xff0c;分别表示n 和 e 的值&#xff08;1…...