Spark 3:Spark Core RDD持久化
RDD 的数据是过程数据
RDD 的缓存
# coding:utf8
import timefrom pyspark import SparkConf, SparkContext
from pyspark.storagelevel import StorageLevelif __name__ == '__main__':conf = SparkConf().setAppName("test").setMaster("local[*]")sc = SparkContext(conf=conf)rdd1 = sc.textFile("../data/input/words.txt")rdd2 = rdd1.flatMap(lambda x: x.split(" "))rdd3 = rdd2.map(lambda x: (x, 1))# 缓存到内存中rdd3.cache()# 先放内存,如果不够则放硬盘,2份副本rdd3.persist(StorageLevel.MEMORY_AND_DISK_2)rdd4 = rdd3.reduceByKey(lambda a, b: a + b)print(rdd4.collect())rdd5 = rdd3.groupByKey()rdd6 = rdd5.mapValues(lambda x: sum(x))print(rdd6.collect())# 清理缓存rdd3.unpersist()time.sleep(100000)
RDD 的CheckPoint
# coding:utf8
import timefrom pyspark import SparkConf, SparkContext
from pyspark.storagelevel import StorageLevelif __name__ == '__main__':conf = SparkConf().setAppName("test").setMaster("local[*]")sc = SparkContext(conf=conf)# 1. 告知spark, 开启CheckPoint功能sc.setCheckpointDir("hdfs://node1:8020/output/ckp")rdd1 = sc.textFile("../data/input/words.txt")rdd2 = rdd1.flatMap(lambda x: x.split(" "))rdd3 = rdd2.map(lambda x: (x, 1))# 调用checkpoint API 保存数据即可rdd3.checkpoint()rdd4 = rdd3.reduceByKey(lambda a, b: a + b)print(rdd4.collect())rdd5 = rdd3.groupByKey()rdd6 = rdd5.mapValues(lambda x: sum(x))print(rdd6.collect())rdd3.unpersist()time.sleep(100000)
Cache和Checkpoint区别
Cache是轻量化保存RDD数据,可存储在内存和硬盘,是分散存储,设计上数据是不安全的(保留RDD血缘关系)。
CheckPoint是重量级保存RDD数据,是集中存储,只能存储在硬盘(HDFS)上,设计上是安全的(不保留RDD血缘关系)。
Cache 和 CheckPoint的性能对比?
Cache性能更好,因为是分散存储,各个Executor并行执行,效率高,可以保存到内存中(占内存),更快。
CheckPoint比较慢,因为是集中存储,涉及到网络IO,但是存储到HDFS上更加安全(多副本) 。
案例练习:搜索引擎日志分析
# coding:utf8# 导入Spark的相关包
import timefrom pyspark import SparkConf, SparkContext
from pyspark.storagelevel import StorageLevel
from defs import context_jieba, filter_words, append_words, extract_user_and_word
from operator import addif __name__ == '__main__':# 0. 初始化执行环境 构建SparkContext对象conf = SparkConf().setAppName("test").setMaster("local[*]")sc = SparkContext(conf=conf)# 1. 读取数据文件file_rdd = sc.textFile("hdfs://node1:8020/input/SogouQ.txt")# 2. 对数据进行切分 \tsplit_rdd = file_rdd.map(lambda x: x.split("\t"))# 3. 因为要做多个需求, split_rdd 作为基础的rdd 会被多次使用.split_rdd.persist(StorageLevel.DISK_ONLY)# TODO: 需求1: 用户搜索的关键`词`分析# 主要分析热点词# 将所有的搜索内容取出# print(split_rdd.takeSample(True, 3))context_rdd = split_rdd.map(lambda x: x[2])# 对搜索的内容进行分词分析words_rdd = context_rdd.flatMap(context_jieba)# print(words_rdd.collect())# 院校 帮 -> 院校帮# 博学 谷 -> 博学谷# 传智播 客 -> 传智播客filtered_rdd = words_rdd.filter(filter_words)# 将关键词转换: 传智播 -> 传智播客final_words_rdd = filtered_rdd.map(append_words)# 对单词进行 分组 聚合 排序 求出前5名result1 = final_words_rdd.reduceByKey(lambda a, b: a + b).\sortBy(lambda x: x[1], ascending=False, numPartitions=1).\take(5)print("需求1结果: ", result1)# TODO: 需求2: 用户和关键词组合分析# 1, 我喜欢传智播客# 1+我 1+喜欢 1+传智播客user_content_rdd = split_rdd.map(lambda x: (x[1], x[2]))# 对用户的搜索内容进行分词, 分词后和用户ID再次组合user_word_with_one_rdd = user_content_rdd.flatMap(extract_user_and_word)# 对内容进行 分组 聚合 排序 求前5result2 = user_word_with_one_rdd.reduceByKey(lambda a, b: a + b).\sortBy(lambda x: x[1], ascending=False, numPartitions=1).\take(5)print("需求2结果: ", result2)# TODO: 需求3: 热门搜索时间段分析# 取出来所有的时间time_rdd = split_rdd.map(lambda x: x[0])# 对时间进行处理, 只保留小时精度即可hour_with_one_rdd = time_rdd.map(lambda x: (x.split(":")[0], 1))# 分组 聚合 排序result3 = hour_with_one_rdd.reduceByKey(add).\sortBy(lambda x: x[1], ascending=False, numPartitions=1).\collect()print("需求3结果: ", result3)time.sleep(100000)import jiebadef context_jieba(data):"""通过jieba分词工具 进行分词操作"""seg = jieba.cut_for_search(data)l = list()for word in seg:l.append(word)return ldef filter_words(data):"""过滤不要的 谷 \ 帮 \ 客"""return data not in ['谷', '帮', '客']def append_words(data):"""修订某些关键词的内容"""if data == '传智播': data = '传智播客'if data == '院校': data = '院校帮'if data == '博学': data = '博学谷'return (data, 1)def extract_user_and_word(data):"""传入数据是 元组 (1, 我喜欢传智播客)"""user_id = data[0]content = data[1]# 对content进行分词words = context_jieba(content)return_list = list()for word in words:# 不要忘记过滤 \谷 \ 帮 \ 客if filter_words(word):return_list.append((user_id + "_" + append_words(word)[0], 1))return return_list
提交到集群运行
为什么要在全部的服务器安装jieba库?
因为YARN是集群运行,Executor可以在所有服务器上执行,所以每个服务器都需要有jieba库提供支撑。
如何尽量提高任务计算的资源?
计算CPU核心和内存量,通过--executor-memory 指定executor内存,通过--executor-cores 指定
executor的核心数,通过--num-executors 指定总executor数量。
相关文章:

Spark 3:Spark Core RDD持久化
RDD 的数据是过程数据 RDD 的缓存 # coding:utf8 import timefrom pyspark import SparkConf, SparkContext from pyspark.storagelevel import StorageLevelif __name__ __main__:conf SparkConf().setAppName("test").setMaster("local[*]")sc SparkC…...

字节跳动五面都过了,结果被刷了,问了hr原因竟说是...
摘要 说在前面,面试时最好不要虚报工资。本来字节跳动是很想去的,几轮面试也通过了,最后没offer,自己只想到几个原因:1、虚报工资,比实际高30%;2、有更好的人选,这个可能性不大&…...

Python日期带时区转换工具类总结
文章目录 1.背景2. 遇到的坑3. 一些小案例3.1 当前日期、日期时间、UTC日期时间3.2 昨天、昨天UTC日期、昨天现在这个时间点的时间戳3.3 日期转时间戳3.4 时间戳转日期3.5 日期加减、小时的加减 4. 总结5. 完整的编码 1.背景 最近项目是国际项目,所以需要经常需要用…...

视频会议产品对比分析
内网视频会议系统如何选择?有很多单位为了保密,只能使用内部网络,无法连接互联网,那些SaaS视频会议就无法使用。在内网的优秀视频会议也有很多可供选择,以下是几个常用的: 1. 宝利通:它支持多种…...

每日一练 | 华为认证真题练习Day47
1、某台路由器输出信息如下,下列说法错误的是?(多选) A. 本路由器开启了区域认证 B. 本设备出现故障,配置的Router Id和实际生效的Router ID不一致 C. 本设备生效的Router Id为10.0.12.1 D. 本设备生效的Router Id为…...

ChatIE(LLM大模型用于信息抽取)
Zero-Shot Information Extraction via Chatting with ChatGPT paper:https://arxiv.org/abs/2302.10205 利用ChatGPT实现零样本信息抽取(Information Extraction,IE),看到零样本就能大概明白这篇文章将以ChatGPT作为…...

提升企业管理效率的利器——ADManager Plus
在当今信息时代,企业的规模和复杂性不断增长,管理各个方面变得愈发具有挑战性。而在企业管理中,活跃目录(Active Directory)起着至关重要的作用。它是一种用于组织内部的用户、计算机、组和其他对象进行集中管理的目录…...
《入侵的艺术》读书心得:第六章:渗透测试中的智慧与愚昧
第六章:渗透测试中的智慧与愚昧 这些想法是愚昧的 1.任何期待渗透测试结果是“毫无破绽”、“无懈可击”…都是极其愚昧的: 第一层含义:测试的不可穷尽性原理(同软件测试) 第二层含义:作为优秀甚至只是合…...

SAP-MM-采购申请-价值特性
采购申请审批在维护价值特性时要注意是抬头价值还是行价值,要确定选择哪个,配置时对应配置。 1、创建价值特性CT04 字段名称:CEBAN-GSWRT,和CEBAN-GFWRT 抬头总价值:CEBAN-GFWRT;如果选择的是抬头审批&am…...

设计模式 - 代理模式
基本介绍: 代理模式:为一个对象提供一个替身,以控制对这个对象的访问。即通过代理 对象访问目标对象.这样做的好处是:可以在目标对象实现的基础上,增强额外的 功能操作,即扩展目标对象的功能。被代理的对象可以是远程对象、创建开销大的对象或需要安全控…...

IOC初始化 IOC启动阶段 (Spring容器的启动流程)
[toc](IOC初始化 IOC启动阶段 (Spring容器的启动流程)) IOC初始化 IOC启动阶段 (Spring容器的启动流程) Resource定位过程:这个过程是指定位BeanDefinition的资源,也就是配置文件(如xml)的位置,并将其封装成Resource对…...
Java后端入职第四天,就被要求代码回退(Git回退实战)
一、需求背景 初入职场,由于自己的失误或者对git不熟悉,把被人的代码给冲突掉了,然后需要立马回滚,对于新手开发,应该比较常见吧!或者,比较多一种情况,错误把工程add了到了暂存区,比如一些本地配置,本来就不应该提交的,又或者,开发中只提交部分代码,又想最新的提…...
【swing】SplitPanel
当使用Java的Swing库来实现一个左右风格的SplitPanel时,可以使用JSplitPane作为容器,并在左边的面板中放置三个按钮,以及在右边的面板中显示图片。以下是一个示例代码: import javax.swing.*; import java.awt.*; import java.aw…...

网络货运平台源码 管理平台端+司机端APP+货主端APP源码
网络货运平台系统源码,网络货运平台源码 管理平台端司机端APP货主端APP 遵循政策要求的八项基本功能,结合货主、实际承运人、监管方等多方业务场景,构建人、车、货、企一体的标准化网络货运平台系统。具有信息发布、线上交易、全程监控、金融…...

Yarn学习笔记
Apache Hadoop YARN (Yet AnotherResource Negotiator,另一种资源协调者)是一种新的 Hadoop 资源管理器,它是一个通用资源管理系统,可为上层应用提供统一的资源管理和调度,它的引入为集群在利用率、资源统一…...

智能路由器开发之OpenWrt简介
智能路由器开发之OpenWrt简介 1. 引言 1.1 智能路由器的重要性和应用场景 智能路由器作为网络通信的核心设备,具有重要的地位和广泛的应用场景。传统的路由器主要提供基本的网络连接功能,但随着智能家居、物联网和大数据应用的快速发展,对于…...

Linux音频和视频命令速查表
在Linux系统中,有许多命令可以帮助我们处理音频和视频文件,从基本的播放和转码,到编辑和处理音频、视频流。 本文将提供一个Linux音频和视频命令速查表,帮助您快速查找并了解各种常用的命令及其用法。 音频命令 播放音频文件 a…...

脉蜂:Django + Flutter 开发的进销存管理系统【已开源】
项目说明 小规模零售(包括电商)跟大规模零售企业的差别在哪里呢? 以我当前的认知来看,小规模零售跟大规模零售企业的差别更多的是在供应链管理、进销存管控上面产生的。如果有一个工具,能够帮他们减少这方面的差异&…...
树的先序,中序,后序递归遍历
//树的先序、中序、后序遍历递归 #include<bits/stdc.h> typedef struct node { char data; struct node *lchild,*rchild; }BTNode; void Greate(BTNode *&T) { char ch; scanf("%c",&ch); if(ch#) TNULL; else { T(BTNode*)malloc(sizeof(BT…...

如何在Linux中更改SSH端口?
SSH(Secure Shell)是一种安全的远程登录协议,它允许您通过网络远程连接到Linux系统并进行管理操作。默认情况下,SSH使用22端口进行通信。然而,为了增强系统的安全性,有时候我们需要更改SSH端口,…...

回溯算法学习
一、电话号码的字母组合 import java.util.ArrayList; import java.util.List;import javax.management.loading.PrivateClassLoader;public class letterCombinations {private static final String[] KEYPAD {"", //0"", //1"abc", //2"…...

深度学习水论文:mamba+图像增强
🧀当前视觉领域对高效长序列建模需求激增,对Mamba图像增强这方向的研究自然也逐渐火热。原因在于其高效长程建模,以及动态计算优势,在图像质量提升和细节恢复方面有难以替代的作用。 🧀因此短时间内,就有不…...
CRMEB 中 PHP 短信扩展开发:涵盖一号通、阿里云、腾讯云、创蓝
目前已有一号通短信、阿里云短信、腾讯云短信扩展 扩展入口文件 文件目录 crmeb\services\sms\Sms.php 默认驱动类型为:一号通 namespace crmeb\services\sms;use crmeb\basic\BaseManager; use crmeb\services\AccessTokenServeService; use crmeb\services\sms\…...
Oracle11g安装包
Oracle 11g安装包 适用于windows系统,64位 下载路径 oracle 11g 安装包...

mac:大模型系列测试
0 MAC 前几天经过学生优惠以及国补17K入手了mac studio,然后这两天亲自测试其模型行运用能力如何,是否支持微调、推理速度等能力。下面进入正文。 1 mac 与 unsloth 按照下面的进行安装以及测试,是可以跑通文章里面的代码。训练速度也是很快的。 注意…...
PostgreSQL 与 SQL 基础:为 Fast API 打下数据基础
在构建任何动态、数据驱动的Web API时,一个稳定高效的数据存储方案是不可或缺的。对于使用Python FastAPI的开发者来说,深入理解关系型数据库的工作原理、掌握SQL这门与数据库“对话”的语言,以及学会如何在Python中操作数据库,是…...

Ubuntu 安装 Mysql 数据库
首先更新apt-get工具,执行命令如下: apt-get upgrade安装Mysql,执行如下命令: apt-get install mysql-server 开启Mysql 服务,执行命令如下: service mysql start并确认是否成功开启mysql,执行命令如下&am…...

实现p2p的webrtc-srs版本
1. 基本知识 1.1 webrtc 一、WebRTC的本质:实时通信的“网络协议栈”类比 将WebRTC类比为Linux网络协议栈极具洞察力,二者在架构设计和功能定位上高度相似: 分层协议栈架构 Linux网络协议栈:从底层物理层到应用层(如…...
【中间件】Web服务、消息队列、缓存与微服务治理:Nginx、Kafka、Redis、Nacos 详解
Nginx 是什么:高性能的HTTP和反向代理Web服务器。怎么用:通过配置文件定义代理规则、负载均衡、静态资源服务等。为什么用:提升Web服务性能、高并发处理、负载均衡和反向代理。优缺点:轻量高效,但动态处理能力较弱&am…...
Amazon RDS on AWS Outposts:解锁本地化云数据库的混合云新体验
在混合云架构成为企业数字化转型标配的今天,如何在本地数据中心享受云数据库的强大能力,同时满足数据本地化、低延迟访问的严苛需求?Amazon RDS on AWS Outposts 给出了完美答案——将AWS完全托管的云数据库服务无缝延伸至您的机房࿰…...