当前位置: 首页 > news >正文

Spark-06:Spark 共享变量

目录

1.广播变量(broadcast variables)

2.累加器(accumulators)


      在分布式计算中,当在集群的多个节点上并行运行函数时,默认情况下,每个任务都会获得函数中使用到的变量的一个副本。如果变量很大,这会导致网络传输占用大量带宽,并且在每个节点上都占用大量内存空间。为了解决这个问题,Spark引入了共享变量的概念。

        共享变量允许在多个任务之间共享数据,而不是为每个任务分别复制一份变量。这样可以显著降低网络传输的开销和内存占用。Spark提供了两种类型的共享变量:广播变量(broadcast variables)和累加器(accumulators)。

1.广播变量(broadcast variables)

        通常情况下,Spark程序运行时,通常会将数据以副本的形式分发到每个执行器(Executor)的任务(Task)中,但当变量较大时,这会导致大量的内存和网络开销。通过使用广播变量,Spark将变量只发送一次到每个节点,并在多个任务之间共享这个副本,从而显著降低了内存占用和网络传输的开销。

Scala 实现:

scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))
broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0)scala> broadcastVar.value
res0: Array[Int] = Array(1, 2, 3)

Java 实现:

Broadcast<int[]> broadcastVar = sc.broadcast(new int[] {1, 2, 3});broadcastVar.value();
// returns [1, 2, 3]

2.累加器(accumulators)

        累加器是Spark中的一种特殊类型的共享变量,主要用来把Executor端变量信息聚合到Driver端。在Driver程序中定义的变量,在Executor端的每个task都会得到这个变量的一份新的副本,每个task更新这些副本的值后,传回Driver端进行merge。累加器支持的数据类型仅限于数值类型,包括整数和浮点数等。

Scala 实现:

scala> val accum = sc.longAccumulator("My Accumulator")
accum: org.apache.spark.util.LongAccumulator = LongAccumulator(id: 0, name: Some(My Accumulator), value: 0)scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum.add(x))
...
10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 sscala> accum.value
res2: Long = 10

Java 实现:

LongAccumulator accum = jsc.sc().longAccumulator();sc.parallelize(Arrays.asList(1, 2, 3, 4)).foreach(x -> accum.add(x));
// ...
// 10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 saccum.value();
// returns 10

        内置累加器功能有限,但可以通过继承AccumulatorV2来创建自己的类型。AccumulatorV2抽象类有几个方法必须重写:reset用于将累加器重置为零,add用于向累加器中添加另一个值,merge用于将另一个相同类型的累加器合并到此累加器。

自定义累加器Scala实现:

package com.yichenkeji.demo.sparkscalaimport org.apache.spark.util.AccumulatorV2class CustomAccumulator extends AccumulatorV2[Int, Int]{//初始化累加器的值private var sum = 0override def isZero: Boolean = sum == 0override def copy(): AccumulatorV2[Int, Int] = {val newAcc = new CustomAccumulator()newAcc.sum = sumnewAcc}override def reset(): Unit = sum = 0override def add(v: Int): Unit = sum += voverride def merge(other: AccumulatorV2[Int, Int]): Unit = sum += other.valueoverride def value: Int = sum
}

自定义累加器Java实现:

package com.yichenkeji.demo.sparkjava;import org.apache.spark.util.AccumulatorV2;public class CustomAccumulator extends AccumulatorV2<Integer, Integer> {// 初始化累加器的值private Integer sum = 0;@Overridepublic boolean isZero() {return sum == 0;}@Overridepublic AccumulatorV2<Integer, Integer> copy() {CustomAccumulator customAccumulator = new CustomAccumulator();customAccumulator.sum = this.sum;return customAccumulator;}@Overridepublic void reset() {this.sum = 0;}@Overridepublic void add(Integer v) {this.sum += v;}@Overridepublic void merge(AccumulatorV2<Integer, Integer> other) {this.sum += ((CustomAccumulator) other).sum;}@Overridepublic Integer value() {return sum;}
}

自定义累加器的使用:

package com.yichenkeji.demo.sparkjava;import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;import java.util.Arrays;
import java.util.List;public class AccumulatorTest {public static void main(String[] args) {//1.初始化SparkContext对象SparkConf sparkConf = new SparkConf().setAppName("Spark Java").setMaster("local[*]");JavaSparkContext sc = new JavaSparkContext(sparkConf);CustomAccumulator customAccumulator = new CustomAccumulator();//注册自定义累加器才能使用sc.sc().register(customAccumulator);sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)).foreach(x -> customAccumulator.add(x));System.out.println(customAccumulator.value());//5.停止SparkContextsc.stop();}
}

相关文章:

Spark-06:Spark 共享变量

目录 1.广播变量&#xff08;broadcast variables&#xff09; 2.累加器&#xff08;accumulators&#xff09; 在分布式计算中&#xff0c;当在集群的多个节点上并行运行函数时&#xff0c;默认情况下&#xff0c;每个任务都会获得函数中使用到的变量的一个副本。如果变量很…...

Spring整合web环境

目录 Javaweb三大组件及环境特点 Spring整合web环境的思路及实现 Spring的web开发组件spring-web MVC框架思想及其设计思路 Javaweb三大组件及环境特点 Spring整合web环境的思路及实现 package com.xfy.listener;import com.xfy.config.SpringConfig; import org.springfra…...

分享从零开始学习网络设备配置--任务4.3 使用动态路由RIPng实现网络连通

任务描述 某公司使用IPv6技术搭建企业网络&#xff0c;由于静态路由需要管理员手工配置&#xff0c;在网络拓扑发生变化时&#xff0c;也不会自动生成新的路由&#xff0c;因此采用IPv6动态路由协议RIPng实现网络连通&#xff0c;实现任意两个节点之间的通信&#xff0c;并降低…...

vue2.0+elementui集成file-loader之后图标失效问题

背景 跑vue2elementUI项目时&#xff0c;由于前端这边需要在本地存放xlsx模板文件&#xff0c;供用户下载模板文件&#xff0c;所以需要在webpack构建的时候增加file-loader进行解析xlsx文件打包。 vue版本2.x element-ui 版本 2.13.x 注意 npm i -D file-loader版本号给vue项…...

C# 文件帮助类(FileHelper)

引言 在研究程序反射的时候我们往往需要获取当前运行程序所引用的dll文件,按照传统的方式我们可以维护一个这样的列表,但是这样维护成本实在是太高,而且不利于团队合作开发,在高版本的.net 4.6.2之后官方出了专门的dll帮我们做这个事情Microsoft.Extensions.DependencyMod…...

WordPress 外链跳转插件

WordPress 外链跳转插件是本站开发的一款WordPress插件&#xff0c;能对文中外链添加一层过滤&#xff0c;有效防止追踪&#xff0c;以及提醒用户。 类似于知乎、CSDN打开其他链接的提示。 后台可以设置白名单 学习资料源代码&#xff1a;百度网盘 密码&#xff1a;123...

算法的10大排序

10大排序算法--python 一颗星--选择排序一颗星--冒泡排序一颗星--插入排序两颗星--归并排序&#xff08;递归-难&#xff09;三颗星--桶排序三颗星--计数排序四颗星--基数排序四颗星--快速排序&#xff0c;寻找标志位&#xff08;递归-难&#xff09;四颗星--又是比较难的希尔排…...

“十道机器学习问题,帮助你了解基础知识和常见算法“

目录 简介&#xff1a; 1. 什么是机器学习&#xff1f;它与传统编程有什么不同之处&#xff1f;2. 请解释监督学习和无监督学习的区别。3. 什么是过拟合和欠拟合&#xff1f;如何解决这些问题&#xff1f;4. 请解释交叉验证在机器学习中的作用。5. 什么是特征选择&#xff1f;为…...

部署WAF安全应用防火墙(openresty部署)

使用NGINX+Openresty实现WAF功能 一、了解WAF 1.1 什么是WAF Web应用防护系统(也称:网站应用级入侵防御系统 。英文:Web Application Firewall,简称: WAF)。利用国际上公认的一种说法:Web应用 防火墙 是通过执行一系列针对HTTP/HTTPS的 安全策略 来专门为Web应用提供保…...

yml转properties工具

目前搜索到的大部分代码都存在以下问题&#xff1a; 复杂结构解析丢失解析后顺序错乱 所以自己写了一个&#xff0c;经过不充分测试&#xff0c;基本满足使用。可以直接在线使用 在线地址 除了yml和properties互转之外&#xff0c;还可以生成代码、sql转json等&#xff0c;可…...

zerotier 搭建 moon中转服务器 及 自建planet

搭建moon 服务器 环境准备 # 安装依赖 yum install wget gcc gcc-c git -y yum install json-devel -y# 下载及安装 curl -s https://install.zerotier.com/ | sudo bash节点ID 配置 配置moon.json文件 cd /var/lib/zerotier-one/# 导出依赖 zerotier-idtool initmoon ide…...

深入了解Rabbit加密技术:原理、实现与应用

一、引言 在信息时代&#xff0c;数据安全愈发受到重视&#xff0c;加密技术作为保障信息安全的核心手段&#xff0c;得到了广泛的研究与应用。Rabbit加密技术作为一种新型加密方法&#xff0c;具有较高的安全性和便捷性。本文将对Rabbit加密技术进行深入探讨&#xff0c;分析…...

Linux常用命令——mv命令

文章目录 1. 简介2. 命令格式3. 主要参数4. 常见用法及示例4.1 移动文件4.2 重命名文件4.3 交互式移动文件4.4 强制移动文件4.5 移动多个文件4.6 使用通配符移动文件 5. 注意事项6. 结论 1. 简介 mv 命令在Linux系统中用于移动文件或目录&#xff0c;同时也可以用于重命名文件…...

Panalog 日志审计系统 前台RCE漏洞复现

0x01 产品简介 Panalog是一款日志审计系统&#xff0c;方便用户统一集中监控、管理在网的海量设备。 0x02 漏洞概述 Panalog日志审计系统 sy_query.php接口处存在远程命令执行漏洞&#xff0c;攻击者可执行任意命令&#xff0c;接管服务器权限。 0x03 复现环境 FOFA&#xf…...

Android设置文字颜色渐变

项目中用到了很多文字颜色渐变的设计&#xff0c;因此做一下记录。 核心代码如下&#xff1a; /*** 统一文字渐变色设置* param colors 渐变色字符串数组* param positions 渐变色位置数组&#xff0c;可为空* param start 渐变起始点&#xff0c;可为空* param end 渐变结束…...

java基础面试题(二)

java后端面试题大全 3.JVM3.1 对象实例、类信息、常量、静态变量分别在运行时数据区的哪个位置?3.2 java类的加载流程3.3 java内存溢出什么时候会发生以及解决方法 3.JVM 3.1 对象实例、类信息、常量、静态变量分别在运行时数据区的哪个位置? 堆 对象实例、String常量池、基…...

php爬虫实现把目标页面变成自己的网站页面

最近又被烦的不行&#xff0c;琐事不断&#xff0c;要是比起懒来一个人比一个人懒&#xff0c;但是懒要转换成动力啊&#xff0c;能让自己真正的偷懒&#xff0c;而不是浪费时间。每天还是需要不断的学习的&#xff0c;才能更好的提高效率&#xff0c;把之前做的简单小功能爬虫…...

[c语言c++]手写你自己的swap交换函数

函数传参有按值传递&#xff0c;指针传递&#xff0c;引用传递&#xff0c;分别看一下三种情况下的交换函数如何书写&#xff0c;应该使用哪种最方便。 当书写一个交换两个值的 swap 函数时&#xff0c;我们可以分别使用按值传参、指针传参和引用传参的方式来实现。下面是示例和…...

技术类知识汇总(二)

在自己日常学习javaweb的过程中&#xff0c;做的一些笔记和总结&#xff0c;汇总如下&#xff1a; Springboot项目的静态资源(html&#xff0c;css&#xff0c;js等前端资源)默认存放目录为&#xff1a;classpath:/static classpath:/public classpath:/resources"三层架…...

简单好用!日常写给 ChatGPT 的几个提示词技巧

ChatGPT 很强&#xff0c;但是有时候又显得很蠢&#xff0c;下面是使用 GPT4 的一个实例&#xff1a; 技巧一&#xff1a;三重冒号 """ 引用内容使用三重冒号 """&#xff0c;让 ChatGPT 清晰引用的内容&#xff1a; 技巧二&#xff1a;角色设定…...

pytorch分布式训练

1 基本概念 rank&#xff1a;进程号&#xff0c;在多进程上下文中&#xff0c;我们通常假定rank 0是第一个进程或者主进程&#xff0c;其它进程分别具有1&#xff0c;2&#xff0c;3不同rank号&#xff0c;这样总共具有4个进程 node&#xff1a;物理节点&#xff0c;可以是一个…...

【PyTorch】(三)模型的创建、参数初始化、保存和加载

文章目录 1. 模型的创建1.1. 创建方法1.1.1. 通过使用模型组件1.1.2. 通过继承nn.Module类 1.2. 模型组件1.2.1. 网络层1.2.2. 函数包1.2.3. 容器 1.3. 将模型转移到GPU 2. 模型参数初始化3. 模型的保存与加载3.1. 只保存参数3.2. 保存模型和参数 1. 模型的创建 1.1. 创建方法…...

高效开发之:判断复杂list中的对象属性是否包含某个值

技术使用&#xff1a;使用了Java 8引入的Stream API以及Optional类。这些特性用于简化集合的处理和减少空指针异常。 List<ResourceInfoDto> authData chatBase.getData();String baseName dto.getBaseName();Optional<ResourceInfoDto> authWithResourceCode a…...

MacOS + Android Studio 通过 USB 数据线真机调试

环境&#xff1a;Apple M1 MacOS Sonoma 14.1.1 软件&#xff1a;Android Studio Giraffe | 2022.3.1 Patch 3 设备&#xff1a;小米10 Android 13 一、创建测试项目 安卓 HelloWorld 项目: 安卓 HelloWorld 项目 二、数据线连接手机 1. 手机开启开发者模式 参考&#xff1…...

部署jekins遇到的问题

jdk问题 我用的jdk版本是21的结果版本太新了&#xff0c;启动jekins服务的时候总是报错最后在jekins的安装目录下面的jekinsErr.log查看日志发现是jdk问题最后换了一个17版本的就解决了。 unity和jekins jekins和Git源码管理 jekins和Git联动使用 我想让jekins每次打包的时…...

SQLY优化

insert优化 1.批量插入 手动事务提交 主键顺序插入&#xff0c;主键顺序插入性能高于乱序插入 2.大批量插入数据 如果一次性需要插入大批量数据&#xff0c;使用Insert语句插入性能较低&#xff0c;此时可以使用MYSQL数据库提供的load指令进行插入 主键优化 主键设计原则 …...

设计模式——行为型模式(一)

行为型模式用于描述程序在运行时复杂的流程控制,即描述多个类或对象之间怎样相互协作共同完成单个对象都无法单独完成的任务,它涉及算法与对象间职责的分配。 行为型模式分为类行为模式和对象行为模式,前者采用继承机制来在类间分派行为,后者采用组合或聚合在对象间分配行…...

Rust语言入门教程(六) - 字符串类型

在Rust中&#xff0c; 字符串类型其实是一个比较复杂的话题。在Rust的标准库中&#xff0c;至少都提供了6种字符串类型&#xff0c;我们平常使用的最多的是其中的两种。这两种类型互相之间也有所关联&#xff1a; str&#xff1a; 字符串切片String 字符串 其中&#xff0c; 字…...

【MATLAB源码-第92期】基于simulink的QPSK调制解调仿真,采用相干解调对比原始信号和解调信号。

操作环境&#xff1a; MATLAB 2022a 1、算法描述 QPSK&#xff0c;有时也称作四位元PSK、四相位PSK、4-PSK&#xff0c;在坐标图上看是圆上四个对称的点。通过四个相位&#xff0c;QPSK可以编码2位元符号。图中采用格雷码来达到最小位元错误率&#xff08;BER&#xff09; —…...

关于C语言控制浮点数输出精度问题

众所周知 C语言在控制一个浮点数输出精度的时候是在%和f之间加上一个.(想要控制的精度) 如&#xff1a;printf("%.2f", num); 问&#xff0c;试问&#xff1a;&#xff08;你就是我的御主吗&#xff1f;&#xff09;如果输出的精度是根据输入的数字变化的怎么办&am…...