大数据-Spark批处理实用广播Broadcast构建一个全局缓存Cache
1、broadcast广播

在Spark中,broadcast是一种优化技术,它可以将一个只读变量缓存到每个节点上,以便在执行任务时使用。这样可以避免在每个任务中重复传输数据。
2、构建缓存
import org.apache.spark.sql.SparkSession
import org.apache.spark.broadcast.Broadcast
import com.alibaba.fastjson.JSONObject// 定义全局缓存单例对象
object GlobalCache extends Serializable {// 广播变量,用于存储缓存数据private var cacheData: Broadcast[collection.mutable.Map[String, JSONObject]] = _// 设置 SparkSession 和广播变量def setSparkSession(spark: SparkSession): Unit = {cacheData = spark.sparkContext.broadcast(collection.mutable.Map.empty[String, JSONObject])}// 按订单ID和用户ID缓存JSONObject对象def cacheJSONObject(orderId: String, userId: String, jsonObject: JSONObject): Unit = {// 获取广播变量的值并进行修改val data = cacheData.valuedata.synchronized {data.put(generateKey(orderId, userId), jsonObject)}}// 根据订单ID和用户ID删除缓存的JSONObject对象def removeJSONObject(orderId: String, userId: String): Unit = {// 获取广播变量的值并进行修改val data = cacheData.valuedata.synchronized {data.remove(generateKey(orderId, userId))}}// 根据订单ID和用户ID获取缓存的JSONObject对象def getJSONObjet(orderId: String, userId: String): JSONObject = {// 获取广播变量的值并进行访问val data = cacheData.valuedata.synchronized {data.get(generateKey(orderId, userId)).orNull}}// 生成缓存键,使用订单ID和用户ID拼接private def generateKey(orderId: String, userId: String): String = s"$orderId|$userId"
}
3、缓存测试
import org.apache.spark.sql.SparkSession
import org.apache.spark.broadcast.Broadcast
import com.alibaba.fastjson.JSONObject
import org.apache.log4j.{Level, Logger}object CacheTest {Logger.getLogger("org").setLevel(Level.ERROR)Logger.getRootLogger().setLevel(Level.ERROR) // 设置日志级别def addItem(orderId:String, userId:String, name:String): Unit = {val jsonObject = new JSONObject()jsonObject.put("name", name)// 缓存JSONObject对象GlobalCache.cacheJSONObject(orderId, userId, jsonObject)}def getCache(orderId: String, userId: String): JSONObject = {// 获取缓存的JSONObject对象GlobalCache.getJSONObjet(orderId, userId)}def delItem(orderId:String, userId:String): Unit = {// 删除缓存的JSONObject对象GlobalCache.removeJSONObject(orderId, userId)}def getSparkSession(appName: String, localType: Int): SparkSession = {val builder: SparkSession.Builder = SparkSession.builder().appName(appName)if (localType == 1) {builder.master("local[8]") // 本地模式,启用8个核心}val spark = builder.getOrCreate() // 获取或创建一个新的SparkSessionspark.sparkContext.setLogLevel("ERROR") // Spark设置日志级别spark}def main(args: Array[String]): Unit = {println("Start CacheTest")val spark: SparkSession = getSparkSession("CacheTest", 1)GlobalCache.setSparkSession(spark) // 构造全局缓存addItem("001", "456", "苹果") // 添加元素addItem("002", "789", "香蕉") // 添加元素var cachedObject = getCache("001", "456")println(s"Cached Object: $cachedObject")delItem("001", "456") // 删除元素cachedObject = getCache("001", "456")println(s"Cached Object: $cachedObject")spark.stop()}
}
4、控制台输出
Start CacheTest
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Cached Object: {"name":"苹果"}
Cached Object: nullProcess finished with exit code 0
相关文章:
大数据-Spark批处理实用广播Broadcast构建一个全局缓存Cache
1、broadcast广播 在Spark中,broadcast是一种优化技术,它可以将一个只读变量缓存到每个节点上,以便在执行任务时使用。这样可以避免在每个任务中重复传输数据。 2、构建缓存 import org.apache.spark.sql.SparkSession import org.apache.s…...
Android Service的生命周期,两种启动方法,有什么区别
Android Service的生命周期,两种启动方法,有什么区别 Android Service是一种后台组件,用于在后台执行长时间运行的任务,而无需与用户界面进行交互。Service具有自己的生命周期,其主要包含以下几个状态:创建…...
测试开源C#人脸识别模块ViewFaceCore(5:质量检测和眼睛状态检测)
ViewFaceCore模块中的FaceQuality支持预测人脸质量,最初以为是预测人体体重,实际测试过程中才发现是评估人脸图片质量,主要调用Detect函数执行图片质量检测操作,其函数原型如下所示: //// 摘要:// 人脸质量评估///…...
Go语言网络库net/http
Go语言网络库net/http Http 协议(Hyper Text Transfer Protocol,超文本传输协议)是一个简单的请求-响应协议,它通常运行在 TCP 之 上。超文本传输协议是互联网上应用最为广泛的一种网络传输协议,所有的WWW文件都必须遵守这个标准。 Http 协…...
Acwing.91 最短Hamilton路径(动态规划)
题目 给定一张n个点的带权无向图,点从0~n-1标号,求起点0到终点n-1的最短Hamilton路径。Hamilton路径的定义是从0到n-1不重不漏地经过每个点恰好一次。 输入格式 第—行输入整数n。 接下来n行每行n个整数,其中第i行第j个整数表示点i到j的距…...
[hfut] [important] v4l2 vedio使用总结/opevx/ffpeg/v4l2/opencv/cuda
(158条消息) linux驱动camera//test ok_感知算法工程师的博客-CSDN博客 (158条消息) linux V4L2子系统——v4l2架构(1)之整体架构_感知算法工程师的博客-CSDN博客 (158条消息) linux V4L2子系统——v4l2的结构体(2)之video_devi…...
2023年深圳杯数学建模A题影响城市居民身体健康的因素分析
2023年深圳杯数学建模 A题 影响城市居民身体健康的因素分析 原题再现: 以心脑血管疾病、糖尿病、恶性肿瘤以及慢性阻塞性肺病为代表的慢性非传染性疾病(以下简称慢性病)已经成为影响我国居民身体健康的重要问题。随着人们生活方式的改变&am…...
指令调度(Instruction Scheduling)
指令调度(Instruction Scheduling) 指令调度的约束基本机器模型基本块调度全局调度 指令调度是为了提高指令级并行(ILP),对于超长指令字(VLIW, Very Long Instruction Word)和多发射系统&#x…...
【运维】Linux 跨服务器复制文件文件夹
【运维】Linux 跨服务器复制文件文件夹 如果是云服务 建议用内网ip scp是secure copy的简写,用于在Linux下进行远程拷贝文件的命令,和它类似的命令有cp,不过cp只是在本机进行拷贝不能跨服务器,而且scp传输是加密的。可能会稍微影…...
k8s apiserver如何支持http访问?
原本是可以通过设置api-server的--insecure-port来实现,但是这个参数已经被废弃了,更好的方法则是使用proxy来实现: 在集群任意一个节点上起一个proxy服务,并设置允许所有host访问: kubectl proxy --address0.0.0.0 …...
Safetensors,高效安全易用的深度学习新工具
大家好,本文将介绍一种为深度学习应用提供速度、效率、跨平台兼容性、用户友好性和安全性的新工具。 Safetensors简介 Hugging Face开发了一种名为Safetensors的新序列化格式,旨在简化和精简大型复杂张量的存储和加载。张量是深度学习中使用的主要数据…...
Unity 工具之 NuGetForUnity 包管理器,方便在 Unity 中的进行包管理的简单使用
Unity 工具之 NuGetForUnity 包管理器,方便在 Unity 中的进行包管理的简单使用 目录 Unity 工具之 NuGetForUnity 包管理器,方便在 Unity 中的进行包管理的简单使用 一、简单介绍 二、NuGetForUnity 的下载导入 Unity 三、NuGetForUnity 在 Unity 的…...
运算放大器(二):恒流源
一、实现原理 恒流源的输出电流能够在一定范围内保持稳定,不会随负载的变化而变化。 通过运放,将输入的电压信号转换成满足一定关系的电流信号,转换后的电流相当一个输出可调的简易恒流源。 二、电路结构 常用的恒流源电路如…...
企业选择租用CRM还是一次性买断CRM?分别有哪些优势?
CRM是企业管理客户关系,提升销售业绩,实现业务增长的重要工具。市场上的CRM系统销售方式主要有两种——租用型和买断型。那么,租用CRM好还是一次性买断CRM好?本文将从以下几个方面进行分析: 1、什么是租用型CRM和买断…...
VBA_MF系列技术资料1-133
MF系列VBA技术资料 为了让广大学员在实际VBA编程中有切实可行的思路及有效的提高自己的编程技巧,我参考大量的资料,并结合自己的经验总结了这份MF系列VBA技术综合资料,而且开放源码(MF04除外),其中MF01-04属…...
Android 项目架构
🔥 什么是架构 🔥 在维基百科里是这样定义的: 软件架构是一个系统的轮廓 . 软件架构描述的对象是直接构成系统的抽象组件. 各个组件之间的连接则明确和相对细致地描述组件之间的通讯 . 在实现阶段, 这些抽象组件被细化为实际组件 , 比如具体某个类或者对象 . 面试的过程中…...
【Linux】进程通信 — 管道
文章目录 📖 前言1. 通信背景1.1 进程通信的目的:1.2 管道的引入: 2. 匿名管道2.1 匿名管道的原理:2.2 匿名管道的创建:2.3 父子进程通信:2.3.1 read()阻塞等待 2.4 父进程给子进程派发任务:2.5…...
ROS 2 — 托管(生命周期)节点简介
一、说明 这篇文章是关于理解ROS 2中托管(生命周期)节点的概念。我们描述了概念性的想法以及我们为什么需要它。所以让我们开始吧! 二、托管式节点 — 什么和为什么? 为了理解托管式节点,让我们从一个简单的问题陈述开…...
vue2企业级项目(六)
vue2企业级项目(六) 自定义指令 创建src/directive/index.js const directives require.context("./modules", true, /\.js$/);export default {install: (Vue) > {directives.keys().forEach((key) > {let directive directives(key…...
OSPF的选路原则
域内 --- 1类,2类LSA 域间 --- 3类LSA 域外 --- 5类,7类LSA --- 根据开销值的计算规则不同,还分为类型1和类型2 1.如果学到的路由都是通过1类,2类LSA获取的域内路由 --- 这种情况直接比较开销值,优先选择开销值小的路…...
springboot 百货中心供应链管理系统小程序
一、前言 随着我国经济迅速发展,人们对手机的需求越来越大,各种手机软件也都在被广泛应用,但是对于手机进行数据信息管理,对于手机的各种软件也是备受用户的喜爱,百货中心供应链管理系统被用户普遍使用,为方…...
应用升级/灾备测试时使用guarantee 闪回点迅速回退
1.场景 应用要升级,当升级失败时,数据库回退到升级前. 要测试系统,测试完成后,数据库要回退到测试前。 相对于RMAN恢复需要很长时间, 数据库闪回只需要几分钟。 2.技术实现 数据库设置 2个db_recovery参数 创建guarantee闪回点,不需要开启数据库闪回。…...
IGP(Interior Gateway Protocol,内部网关协议)
IGP(Interior Gateway Protocol,内部网关协议) 是一种用于在一个自治系统(AS)内部传递路由信息的路由协议,主要用于在一个组织或机构的内部网络中决定数据包的最佳路径。与用于自治系统之间通信的 EGP&…...
(二)TensorRT-LLM | 模型导出(v0.20.0rc3)
0. 概述 上一节 对安装和使用有个基本介绍。根据这个 issue 的描述,后续 TensorRT-LLM 团队可能更专注于更新和维护 pytorch backend。但 tensorrt backend 作为先前一直开发的工作,其中包含了大量可以学习的地方。本文主要看看它导出模型的部分&#x…...
MMaDA: Multimodal Large Diffusion Language Models
CODE : https://github.com/Gen-Verse/MMaDA Abstract 我们介绍了一种新型的多模态扩散基础模型MMaDA,它被设计用于在文本推理、多模态理解和文本到图像生成等不同领域实现卓越的性能。该方法的特点是三个关键创新:(i) MMaDA采用统一的扩散架构…...
Vue2 第一节_Vue2上手_插值表达式{{}}_访问数据和修改数据_Vue开发者工具
文章目录 1.Vue2上手-如何创建一个Vue实例,进行初始化渲染2. 插值表达式{{}}3. 访问数据和修改数据4. vue响应式5. Vue开发者工具--方便调试 1.Vue2上手-如何创建一个Vue实例,进行初始化渲染 准备容器引包创建Vue实例 new Vue()指定配置项 ->渲染数据 准备一个容器,例如: …...
Nginx server_name 配置说明
Nginx 是一个高性能的反向代理和负载均衡服务器,其核心配置之一是 server 块中的 server_name 指令。server_name 决定了 Nginx 如何根据客户端请求的 Host 头匹配对应的虚拟主机(Virtual Host)。 1. 简介 Nginx 使用 server_name 指令来确定…...
MySQL中【正则表达式】用法
MySQL 中正则表达式通过 REGEXP 或 RLIKE 操作符实现(两者等价),用于在 WHERE 子句中进行复杂的字符串模式匹配。以下是核心用法和示例: 一、基础语法 SELECT column_name FROM table_name WHERE column_name REGEXP pattern; …...
JS手写代码篇----使用Promise封装AJAX请求
15、使用Promise封装AJAX请求 promise就有reject和resolve了,就不必写成功和失败的回调函数了 const BASEURL ./手写ajax/test.jsonfunction promiseAjax() {return new Promise((resolve, reject) > {const xhr new XMLHttpRequest();xhr.open("get&quo…...
免费数学几何作图web平台
光锐软件免费数学工具,maths,数学制图,数学作图,几何作图,几何,AR开发,AR教育,增强现实,软件公司,XR,MR,VR,虚拟仿真,虚拟现实,混合现实,教育科技产品,职业模拟培训,高保真VR场景,结构互动课件,元宇宙http://xaglare.c…...
