当前位置: 首页 > news >正文

Spark---转换算子、行动算子、持久化算子

一、转换算子和行动算子

1、Transformations转换算子

1)、概念

Transformations类算子是一类算子(函数)叫做转换算子,如map、flatMap、reduceByKey等。Transformations算子是延迟执行,也叫懒加载执行。

2)、Transformation类算子

filter :过滤符合条件的记录数,true保留,false过滤掉

map:将一个RDD中的每个数据项,通过map中的函数映射变为一个新的元素。特点:输入一条,输出一条数据。

flatMap:先map后flat。与map类似,每个输入项可以映射为0到多个输出项。

sample:随机抽样算子,根据传进去的小数按比例进行又放回或者无放回的抽样。

reduceByKey:将相同的Key根据相应的逻辑进行处理。

sortByKey/sortBy:作用在K,V格式的RDD上,对Key进行升序或者降序排序。

2、Action行动算子

1)、概念:

Action类算子也是一类算子(函数)叫做行动算子,如foreach,collect,count等。Transformations类算子是延迟执行,Action类算子是触发执行。一个application应用程序中有几个Action类算子执行,就有几个job运行。

2)、Action类算子

count:返回数据集中的元素数。会在结果计算完成后回收到Driver端。

take(n):返回一个包含数据集前n个元素的集合。

first:first=take(1),返回数据集中的第一个元素。

foreach:循环遍历数据集中的每个元素,运行相应的逻辑。

collect:将计算结果回收到Driver端。

3)、demo:动态统计出现次数最多的单词个数,过滤掉。

  • 一千万条数据量的文件,过滤掉出现次数多的记录,并且其余记录按照出现次数降序排序。

假设有一个records.txt文件

hello Spark
hello HDFS
hello hadoop
hello linux
hello Spark
hello Spark
hello Spark1
hello Spark
hello Spark
hello Spark2
hello Spark
hello Spark
hello Spark
hello Spark3
hello Spark
hello HDFS
hello hadoop
hello linux
hello Spark
hello Spark
hello Spark4
hello Spark
hello Spark
hello Spark5
hello Spark
hello Spark

代码处理:

package com.bjsxt.demo;import java.util.Arrays;
import java.util.List;import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;import scala.Tuple2;
/*** 动态统计出现次数最多的单词个数,过滤掉。* @author root**/
public class Demo1 {public static void main(String[] args) {SparkConf conf = new SparkConf();conf.setMaster("local").setAppName("demo1");JavaSparkContext jsc = new JavaSparkContext(conf);JavaRDD<String> lines = jsc.textFile("./records.txt");JavaRDD<String> flatMap = lines.flatMap(new FlatMapFunction<String, String>() {/*** */private static final long serialVersionUID = 1L;@Overridepublic Iterable<String> call(String t) throws Exception {return Arrays.asList(t.split(" "));}});JavaPairRDD<String, Integer> mapToPair = flatMap.mapToPair(new PairFunction<String,String, Integer>() {/*** */private static final long serialVersionUID = 1L;@Overridepublic Tuple2<String, Integer> call(String t) throws Exception {return new Tuple2<String, Integer>(t, 1);}});JavaPairRDD<String, Integer> sample = mapToPair.sample(true, 0.5);final List<Tuple2<String, Integer>> take = sample.reduceByKey(new Function2<Integer,Integer,Integer>(){/*** */private static final long serialVersionUID = 1L;@Overridepublic Integer call(Integer v1, Integer v2) throws Exception {return v1+v2;}}).mapToPair(new PairFunction<Tuple2<String,Integer>, Integer, String>() {/*** */private static final long serialVersionUID = 1L;@Overridepublic Tuple2<Integer, String> call(Tuple2<String, Integer> t)throws Exception {return new Tuple2<Integer, String>(t._2, t._1);}}).sortByKey(false).mapToPair(new PairFunction<Tuple2<Integer,String>, String, Integer>() {/*** */private static final long serialVersionUID = 1L;@Overridepublic Tuple2<String, Integer> call(Tuple2<Integer, String> t)throws Exception {return new Tuple2<String, Integer>(t._2, t._1);}}).take(1);System.out.println("take--------"+take);JavaPairRDD<String, Integer> result = mapToPair.filter(new Function<Tuple2<String,Integer>, Boolean>() {/*** */private static final long serialVersionUID = 1L;@Overridepublic Boolean call(Tuple2<String, Integer> v1) throws Exception {return !v1._1.equals(take.get(0)._1);}}).reduceByKey(new Function2<Integer,Integer,Integer>(){/*** */private static final long serialVersionUID = 1L;@Overridepublic Integer call(Integer v1, Integer v2) throws Exception {return v1+v2;}}).mapToPair(new PairFunction<Tuple2<String,Integer>, Integer, String>() {/*** */private static final long serialVersionUID = 1L;@Overridepublic Tuple2<Integer, String> call(Tuple2<String, Integer> t)throws Exception {return new Tuple2<Integer, String>(t._2, t._1);}}).sortByKey(false).mapToPair(new PairFunction<Tuple2<Integer,String>, String, Integer>() {/*** */private static final long serialVersionUID = 1L;@Overridepublic Tuple2<String, Integer> call(Tuple2<Integer, String> t)throws Exception {return new Tuple2<String, Integer>(t._2, t._1);}});result.foreach(new VoidFunction<Tuple2<String,Integer>>() {/*** */private static final long serialVersionUID = 1L;@Overridepublic void call(Tuple2<String, Integer> t) throws Exception {System.out.println(t);}});jsc.stop();}
}

3、Spark代码流程

1)、创建SparkConf对象

可以设置Application name。

可以设置运行模式。

可以设置Spark application的资源需求。

2)、创建SparkContext对象

3)、基于Spark的上下文创建一个RDD,对RDD进行处理。

4)、应用程序中要有Action类算子来触发Transformation类算子执行。

5)、关闭Spark上下文对象SparkContext。

二、Spark持久化算子

1、控制算子

1)、概念

控制算子有三种,cache,persist,checkpoint,以上算子都可以将RDD持久化,持久化单位是partition。cache和persist都是懒执行的。必须有一个action类算子触发执行。checkpoint算子不仅能将RDD持久化到磁盘,还能切断RDD之间的依赖关系。

2)、cache

默认将RDD的数据持久化到内存中。cache是懒执行。

注意:chche()=persist()=persist(StorageLevel.Memory_Only)

测试cache文件:

测试代码:

1.SparkConf conf = new SparkConf();
2.conf.setMaster("local").setAppName("CacheTest");
3.JavaSparkContext jsc = new JavaSparkContext(conf);
4.JavaRDD<String> lines = jsc.textFile("persistData.txt");
5.
6.lines = lines.cache();
7.long startTime = System.currentTimeMillis();
8.long count = lines.count();
9.long endTime = System.currentTimeMillis();
10.System.out.println("共"+count+ "条数据,"+"初始化时间+cache时间+计算时间="+ 
11.(endTime-startTime));
12.
13.long countStartTime = System.currentTimeMillis();
14.long countrResult = lines.count();
15.long countEndTime = System.currentTimeMillis();
16.System.out.println("共"+countrResult+ "条数据,"+"计算时间="+ (countEndTime-
17.countStartTime));
18.
19.jsc.stop();

persist:

可以指定持久化的级别。最常用的是MEMORY_ONLY和MEMORY_AND_DISK。”_2“表示有副本数。

持久化级别如下:

2、cache和persist的注意事项

1)、cache和persist都是懒执行,必须有一个action类算子触发执行。

2)、cache和persist算子的返回值可以赋值给一个变量,在其他job中直接使用这个变量就是使用持久化的数据了。持久化的单位是partition。

3)、cache和persist算子后不能立即紧跟action算子。

4)、cache和persist算子持久化的数据当applilcation执行完成之后会被清除。

错误:rdd.cache().count() 返回的不是持久化的RDD,而是一个数值了。

3、checkpoint

checkpoint将RDD持久化到磁盘,还可以切断RDD之间的依赖关系。checkpoint目录数据当application执行完之后不会被清除。
  • persist(StorageLevel.DISK_ONLY)与Checkpoint的区别?

1)、checkpoint需要指定额外的目录存储数据,checkpoint数据是由外部的存储系统管理,不是Spark框架管理,当application完成之后,不会被清空。

2)、cache() 和persist() 持久化的数据是由Spark框架管理,当application完成之后,会被清空。

3)、checkpoint多用于保存状态。

  • checkpoint 的执行原理:

1)、当RDD的job执行完毕后,会从finalRDD从后往前回溯。

2)、当回溯到某一个RDD调用了checkpoint方法,会对当前的RDD做一个标记。

3)、Spark框架会自动启动一个新的job,重新计算这个RDD的数据,将数据持久化到HDFS上。

  • 优化:对RDD执行checkpoint之前,最好对这个RDD先执行cache,这样新启动的job只需要将内存中的数据拷贝到HDFS上就可以,省去了重新计算这一步。
  • 使用:
1.SparkConf conf = new SparkConf();
2.conf.setMaster("local").setAppName("checkpoint");
3.JavaSparkContext sc = new JavaSparkContext(conf);
4.sc.setCheckpointDir("./checkpoint");
5.JavaRDD<Integer> parallelize = sc.parallelize(Arrays.asList(1,2,3));
6.parallelize.checkpoint();
7.parallelize.count();
8.sc.stop();

相关文章:

Spark---转换算子、行动算子、持久化算子

一、转换算子和行动算子 1、Transformations转换算子 1&#xff09;、概念 Transformations类算子是一类算子&#xff08;函数&#xff09;叫做转换算子&#xff0c;如map、flatMap、reduceByKey等。Transformations算子是延迟执行&#xff0c;也叫懒加载执行。 2)、Transf…...

什么是关系型数据库?

什么是关系型数据库&#xff1f; 关系型数据库&#xff08;RDBMS&#xff09;是建立在关系模型基础上的数据库系统。关系模型是一种数据模型&#xff0c;它表示数据之间的联系&#xff0c;包括一对一、一对多和多对多的关系。在关系型数据库中&#xff0c;数据以表格的形式存储…...

【LeetCode】挑战100天 Day12(热题+面试经典150题)

【LeetCode】挑战100天 Day12&#xff08;热题面试经典150题&#xff09; 一、LeetCode介绍二、LeetCode 热题 HOT 100-142.1 题目2.2 题解 三、面试经典 150 题-143.1 题目3.2 题解 一、LeetCode介绍 LeetCode是一个在线编程网站&#xff0c;提供各种算法和数据结构的题目&…...

ArcGIS10.x系列 Python工具箱教程

ArcGIS10.x系列 Python工具箱教程 目录 1.前提 2.需要了解的资料 3.Python工具箱制作教程 4. Python工具箱具体样例代码&#xff08;DEM流域分析-河网等级矢量化&#xff09; 1.前提 如果你想自己写Python工具箱&#xff0c;那么假定你已经会ArcPy&#xff0c;如果只是自己…...

【蓝桥杯】刷题

刷题网站 记录总结刷题过程中遇到的一些问题 1、最大公约数与最小公倍数 a,bmap(int,input().split())sa*bwhile a%b:a,bb,a%bprint(b,s//b)2.迭代法求平方根(题号1021) #include<stdio.h> #include<math.h> int main() {double x11.0,x2;int a;scanf("%d&…...

软件产品登记的材料条件

(1&#xff09;申请双软认证前应该要获得信息产业部授权的软件检测机构出具的检测证明&#xff0c;这份检测证明可以到软件行业协会申请&#xff0c;然后协会会派专家到公司进行“检测”&#xff0c;检测通过后出具证明&#xff0c;这份证明的申请与软件著作权等无关&#xff0…...

春节后跟进客户开发信模板?外贸邮件模板?

适合新年的客户开发信模板&#xff1f;年后给客户的邮件怎么写&#xff1f; 在春节这一传统的中国节日结束后&#xff0c;跟进客户对于维持和发展业务至关重要。客户开发信模板是一种有效的工具。蜂邮将介绍一些春节后跟进客户开发信模板的关键技巧&#xff0c;以确保您的业务…...

个人财务管理软件CheckBook Pro mac中文版特点介绍

CheckBook Pro mac是一款Mac平台的个人财务管理软件&#xff0c;主要用于跟踪个人收入、支出和账户余额等信息。 CheckBook Pro mac 软件特点 简单易用&#xff1a;该软件的用户界面非常简洁明了&#xff0c;即使您是初学者也可以轻松上手。 多账户管理&#xff1a;该软件支持…...

rfc4301- IP 安全架构

1. 引言 1.1. 文档内容摘要 本文档规定了符合IPsec标准的系统的基本架构。它描述了如何为IP层的流量提供一组安全服务&#xff0c;同时适用于IPv4 [Pos81a] 和 IPv6 [DH98] 环境。本文档描述了实现IPsec的系统的要求&#xff0c;这些系统的基本元素以及如何将这些元素结合起来…...

【数据结构/C++】线性表_双链表基本操作

#include <iostream> using namespace std; typedef int ElemType; // 3. 双链表 typedef struct DNode {ElemType data;struct DNode *prior, *next; } DNode, *DLinkList; // 初始化带头结点 bool InitDNodeList(DLinkList &L) {L (DNode *)malloc(sizeof(DNode))…...

前端已死?看看我的秋招上岸历程

背景 求职方向&#xff1a;web前端 技术栈&#xff1a;vue2、springboot&#xff08;学校开过课&#xff0c;简单的学习过&#xff09; 实习经历&#xff1a;两段&#xff0c;但都是实训类的&#xff0c;说白了就是类似培训&#xff0c;每次面试官问起时我也会坦诚交代&…...

Flink Flink中的合流

一、Flink中的基本合流操作 在实际应用中&#xff0c;我们经常会遇到来源不同的多条流&#xff0c;需要将它们的数据进行联合处理。所以 Flink 中合流的操作会更加普遍&#xff0c;对应的 API 也更加丰富。 二、联合&#xff08;Union&#xff09; 最简单的合流操作&#xf…...

工业园区重金属废水深度处理工程项目,稳定出水0.1mg/l

随着环保要求不断提高&#xff0c;工业废水处理已成为众多企业的必修课。然而在工业生产中&#xff0c;如何有效处理含有重金属的废水成为了一个关键的挑战。 重金属废水是指含有汞、铅、铜、镉、锌、镍等有毒有害物质的废水&#xff0c;来源于矿山开采、金属冶炼、电镀、印刷线…...

element table滚动条失效

问题描述:给el-table限制高度之后滚动条没了 给看看咋设置的&#xff1a; <el-table:data"tableData"style"width: 100%;"ref"table"max-height"400"sort-change"changeSort">对比了老半天找不出问题&#xff0c;最后…...

代码随想录算法训练营 ---第四十六天

第一题&#xff1a; 简介&#xff1a; 本题的重点在于确定背包容量和物品数量 确定dp数组以及下标的含义 dp[i] : 字符串长度为i的话&#xff0c;dp[i]为true&#xff0c;表示可以拆分为一个或多个在字典中出现的单词。 2.确定递推公式 如果确定dp[j] 是true&#xff0c;且…...

MySQL-02-InnoDB存储引擎

实际的业务系统开发中&#xff0c;使用MySQL数据库&#xff0c;我们使用最多的当然是支持事务并发的InnoDB存储引擎的这种表结构&#xff0c;下面我们介绍下InnoDB存储引擎相关的知识点。 1-Innodb体系架构 InnoDB存储引擎有多个内存块&#xff0c;可以认为这些内存块组成了一…...

Qt路径和Anaconda中QT路径冲突(ubuntu系统)

最近做一个项目需要配置QT库&#xff0c;本项目配置环境如下&#xff1a; Qt version 5 Operating system, version and so on ubuntu 20.04 Description 之前使用过anaconda环境安装过QT5&#xff0c;所以在项目中CMakeLists文件中使用find_package时候&#xff0c;默认使用An…...

vue2.js添加水印

通过canvas生成水印图片 function addWaterMark(str) {let ctx document.createElement("canvas");ctx.width 900;ctx.height 450;ctx.style.display "none";let cans ctx.getContext("2d");cans.rotate((-20 * Math.PI) / 180);cans.font…...

Eureka简单使用做微服务模块之间动态请求

创建一个eureka模块,引入eureka 为启动项加上EnableEurekaServer注解 配置信息 orderService和userService的操作是一样的 这里以orderService为例: 引入eureka客户端 加上 LoadBalanced注解 配置 orderService和userService都配置好了之后 启动 这样我们在http://localhos…...

竞赛选题 题目:基于深度学习卷积神经网络的花卉识别 - 深度学习 机器视觉

文章目录 0 前言1 项目背景2 花卉识别的基本原理3 算法实现3.1 预处理3.2 特征提取和选择3.3 分类器设计和决策3.4 卷积神经网络基本原理 4 算法实现4.1 花卉图像数据4.2 模块组成 5 项目执行结果6 最后 0 前言 &#x1f525; 优质竞赛项目系列&#xff0c;今天要分享的是 基…...

服务器硬防的应用场景都有哪些?

服务器硬防是指一种通过硬件设备层面的安全措施来防御服务器系统受到网络攻击的方式&#xff0c;避免服务器受到各种恶意攻击和网络威胁&#xff0c;那么&#xff0c;服务器硬防通常都会应用在哪些场景当中呢&#xff1f; 硬防服务器中一般会配备入侵检测系统和预防系统&#x…...

TRS收益互换:跨境资本流动的金融创新工具与系统化解决方案

一、TRS收益互换的本质与业务逻辑 &#xff08;一&#xff09;概念解析 TRS&#xff08;Total Return Swap&#xff09;收益互换是一种金融衍生工具&#xff0c;指交易双方约定在未来一定期限内&#xff0c;基于特定资产或指数的表现进行现金流交换的协议。其核心特征包括&am…...

实现弹窗随键盘上移居中

实现弹窗随键盘上移的核心思路 在Android中&#xff0c;可以通过监听键盘的显示和隐藏事件&#xff0c;动态调整弹窗的位置。关键点在于获取键盘高度&#xff0c;并计算剩余屏幕空间以重新定位弹窗。 // 在Activity或Fragment中设置键盘监听 val rootView findViewById<V…...

现有的 Redis 分布式锁库(如 Redisson)提供了哪些便利?

现有的 Redis 分布式锁库&#xff08;如 Redisson&#xff09;相比于开发者自己基于 Redis 命令&#xff08;如 SETNX, EXPIRE, DEL&#xff09;手动实现分布式锁&#xff0c;提供了巨大的便利性和健壮性。主要体现在以下几个方面&#xff1a; 原子性保证 (Atomicity)&#xff…...

MyBatis中关于缓存的理解

MyBatis缓存 MyBatis系统当中默认定义两级缓存&#xff1a;一级缓存、二级缓存 默认情况下&#xff0c;只有一级缓存开启&#xff08;sqlSession级别的缓存&#xff09;二级缓存需要手动开启配置&#xff0c;需要局域namespace级别的缓存 一级缓存&#xff08;本地缓存&#…...

Vue 模板语句的数据来源

&#x1f9e9; Vue 模板语句的数据来源&#xff1a;全方位解析 Vue 模板&#xff08;<template> 部分&#xff09;中的表达式、指令绑定&#xff08;如 v-bind, v-on&#xff09;和插值&#xff08;{{ }}&#xff09;都在一个特定的作用域内求值。这个作用域由当前 组件…...

【安全篇】金刚不坏之身:整合 Spring Security + JWT 实现无状态认证与授权

摘要 本文是《Spring Boot 实战派》系列的第四篇。我们将直面所有 Web 应用都无法回避的核心问题&#xff1a;安全。文章将详细阐述认证&#xff08;Authentication) 与授权&#xff08;Authorization的核心概念&#xff0c;对比传统 Session-Cookie 与现代 JWT&#xff08;JS…...

负载均衡器》》LVS、Nginx、HAproxy 区别

虚拟主机 先4&#xff0c;后7...

GAN模式奔溃的探讨论文综述(一)

简介 简介:今天带来一篇关于GAN的,对于模式奔溃的一个探讨的一个问题,帮助大家更好的解决训练中遇到的一个难题。 论文题目:An in-depth review and analysis of mode collapse in GAN 期刊:Machine Learning 链接:...

运行vue项目报错 errors and 0 warnings potentially fixable with the `--fix` option.

报错 找到package.json文件 找到这个修改成 "lint": "eslint --fix --ext .js,.vue src" 为elsint有配置结尾换行符&#xff0c;最后运行&#xff1a;npm run lint --fix...