当前位置: 首页 > news >正文

大数据-Spark批处理实用广播Broadcast构建一个全局缓存Cache

1、broadcast广播

在这里插入图片描述

在Spark中,broadcast是一种优化技术,它可以将一个只读变量缓存到每个节点上,以便在执行任务时使用。这样可以避免在每个任务中重复传输数据。

2、构建缓存

import org.apache.spark.sql.SparkSession
import org.apache.spark.broadcast.Broadcast
import com.alibaba.fastjson.JSONObject// 定义全局缓存单例对象
object GlobalCache extends Serializable {// 广播变量,用于存储缓存数据private var cacheData: Broadcast[collection.mutable.Map[String, JSONObject]] = _// 设置 SparkSession 和广播变量def setSparkSession(spark: SparkSession): Unit = {cacheData = spark.sparkContext.broadcast(collection.mutable.Map.empty[String, JSONObject])}// 按订单ID和用户ID缓存JSONObject对象def cacheJSONObject(orderId: String, userId: String, jsonObject: JSONObject): Unit = {// 获取广播变量的值并进行修改val data = cacheData.valuedata.synchronized {data.put(generateKey(orderId, userId), jsonObject)}}// 根据订单ID和用户ID删除缓存的JSONObject对象def removeJSONObject(orderId: String, userId: String): Unit = {// 获取广播变量的值并进行修改val data = cacheData.valuedata.synchronized {data.remove(generateKey(orderId, userId))}}// 根据订单ID和用户ID获取缓存的JSONObject对象def getJSONObjet(orderId: String, userId: String): JSONObject = {// 获取广播变量的值并进行访问val data = cacheData.valuedata.synchronized {data.get(generateKey(orderId, userId)).orNull}}// 生成缓存键,使用订单ID和用户ID拼接private def generateKey(orderId: String, userId: String): String = s"$orderId|$userId"
}

3、缓存测试

import org.apache.spark.sql.SparkSession
import org.apache.spark.broadcast.Broadcast
import com.alibaba.fastjson.JSONObject
import org.apache.log4j.{Level, Logger}object CacheTest {Logger.getLogger("org").setLevel(Level.ERROR)Logger.getRootLogger().setLevel(Level.ERROR) // 设置日志级别def addItem(orderId:String, userId:String, name:String): Unit = {val jsonObject = new JSONObject()jsonObject.put("name", name)// 缓存JSONObject对象GlobalCache.cacheJSONObject(orderId, userId, jsonObject)}def getCache(orderId: String, userId: String): JSONObject = {// 获取缓存的JSONObject对象GlobalCache.getJSONObjet(orderId, userId)}def delItem(orderId:String, userId:String): Unit = {// 删除缓存的JSONObject对象GlobalCache.removeJSONObject(orderId, userId)}def getSparkSession(appName: String, localType: Int): SparkSession = {val builder: SparkSession.Builder = SparkSession.builder().appName(appName)if (localType == 1) {builder.master("local[8]") // 本地模式,启用8个核心}val spark = builder.getOrCreate() // 获取或创建一个新的SparkSessionspark.sparkContext.setLogLevel("ERROR") // Spark设置日志级别spark}def main(args: Array[String]): Unit = {println("Start CacheTest")val spark: SparkSession = getSparkSession("CacheTest", 1)GlobalCache.setSparkSession(spark)  // 构造全局缓存addItem("001", "456", "苹果")      // 添加元素addItem("002", "789", "香蕉")      // 添加元素var cachedObject = getCache("001", "456")println(s"Cached Object: $cachedObject")delItem("001", "456")      // 删除元素cachedObject = getCache("001", "456")println(s"Cached Object: $cachedObject")spark.stop()}
}

4、控制台输出

Start CacheTest
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Cached Object: {"name":"苹果"}
Cached Object: nullProcess finished with exit code 0

相关文章:

大数据-Spark批处理实用广播Broadcast构建一个全局缓存Cache

1、broadcast广播 在Spark中,broadcast是一种优化技术,它可以将一个只读变量缓存到每个节点上,以便在执行任务时使用。这样可以避免在每个任务中重复传输数据。 2、构建缓存 import org.apache.spark.sql.SparkSession import org.apache.s…...

Android Service的生命周期,两种启动方法,有什么区别

Android Service的生命周期,两种启动方法,有什么区别 Android Service是一种后台组件,用于在后台执行长时间运行的任务,而无需与用户界面进行交互。Service具有自己的生命周期,其主要包含以下几个状态:创建…...

测试开源C#人脸识别模块ViewFaceCore(5:质量检测和眼睛状态检测)

ViewFaceCore模块中的FaceQuality支持预测人脸质量,最初以为是预测人体体重,实际测试过程中才发现是评估人脸图片质量,主要调用Detect函数执行图片质量检测操作,其函数原型如下所示: //// 摘要:// 人脸质量评估///…...

Go语言网络库net/http

Go语言网络库net/http Http 协议(Hyper Text Transfer Protocol,超文本传输协议)是一个简单的请求-响应协议,它通常运行在 TCP 之 上。超文本传输协议是互联网上应用最为广泛的一种网络传输协议,所有的WWW文件都必须遵守这个标准。 Http 协…...

Acwing.91 最短Hamilton路径(动态规划)

题目 给定一张n个点的带权无向图,点从0~n-1标号,求起点0到终点n-1的最短Hamilton路径。Hamilton路径的定义是从0到n-1不重不漏地经过每个点恰好一次。 输入格式 第—行输入整数n。 接下来n行每行n个整数,其中第i行第j个整数表示点i到j的距…...

[hfut] [important] v4l2 vedio使用总结/opevx/ffpeg/v4l2/opencv/cuda

(158条消息) linux驱动camera//test ok_感知算法工程师的博客-CSDN博客 (158条消息) linux V4L2子系统——v4l2架构(1)之整体架构_感知算法工程师的博客-CSDN博客 (158条消息) linux V4L2子系统——v4l2的结构体(2)之video_devi…...

2023年深圳杯数学建模A题影响城市居民身体健康的因素分析

2023年深圳杯数学建模 A题 影响城市居民身体健康的因素分析 原题再现: 以心脑血管疾病、糖尿病、恶性肿瘤以及慢性阻塞性肺病为代表的慢性非传染性疾病(以下简称慢性病)已经成为影响我国居民身体健康的重要问题。随着人们生活方式的改变&am…...

指令调度(Instruction Scheduling)

指令调度(Instruction Scheduling) 指令调度的约束基本机器模型基本块调度全局调度 指令调度是为了提高指令级并行(ILP),对于超长指令字(VLIW, Very Long Instruction Word)和多发射系统&#x…...

【运维】Linux 跨服务器复制文件文件夹

【运维】Linux 跨服务器复制文件文件夹 如果是云服务 建议用内网ip scp是secure copy的简写,用于在Linux下进行远程拷贝文件的命令,和它类似的命令有cp,不过cp只是在本机进行拷贝不能跨服务器,而且scp传输是加密的。可能会稍微影…...

k8s apiserver如何支持http访问?

原本是可以通过设置api-server的--insecure-port来实现,但是这个参数已经被废弃了,更好的方法则是使用proxy来实现: 在集群任意一个节点上起一个proxy服务,并设置允许所有host访问: kubectl proxy --address0.0.0.0 …...

Safetensors,高效安全易用的深度学习新工具

大家好,本文将介绍一种为深度学习应用提供速度、效率、跨平台兼容性、用户友好性和安全性的新工具。 Safetensors简介 Hugging Face开发了一种名为Safetensors的新序列化格式,旨在简化和精简大型复杂张量的存储和加载。张量是深度学习中使用的主要数据…...

Unity 工具之 NuGetForUnity 包管理器,方便在 Unity 中的进行包管理的简单使用

Unity 工具之 NuGetForUnity 包管理器,方便在 Unity 中的进行包管理的简单使用 目录 Unity 工具之 NuGetForUnity 包管理器,方便在 Unity 中的进行包管理的简单使用 一、简单介绍 二、NuGetForUnity 的下载导入 Unity 三、NuGetForUnity 在 Unity 的…...

运算放大器(二):恒流源

一、实现原理 恒流源的输出电流能够在一定范围内保持稳定,不会随负载的变化而变化。 通过运放,将输入的电压信号转换成满足一定关系的电流信号,转换后的电流相当一个输出可调的简易恒流源。 二、电路结构 常用的恒流源电路如…...

企业选择租用CRM还是一次性买断CRM?分别有哪些优势?

CRM是企业管理客户关系,提升销售业绩,实现业务增长的重要工具。市场上的CRM系统销售方式主要有两种——租用型和买断型。那么,租用CRM好还是一次性买断CRM好?本文将从以下几个方面进行分析: 1、什么是租用型CRM和买断…...

VBA_MF系列技术资料1-133

MF系列VBA技术资料 为了让广大学员在实际VBA编程中有切实可行的思路及有效的提高自己的编程技巧,我参考大量的资料,并结合自己的经验总结了这份MF系列VBA技术综合资料,而且开放源码(MF04除外),其中MF01-04属…...

Android 项目架构

🔥 什么是架构 🔥 在维基百科里是这样定义的: 软件架构是一个系统的轮廓 . 软件架构描述的对象是直接构成系统的抽象组件. 各个组件之间的连接则明确和相对细致地描述组件之间的通讯 . 在实现阶段, 这些抽象组件被细化为实际组件 , 比如具体某个类或者对象 . 面试的过程中…...

【Linux】进程通信 — 管道

文章目录 📖 前言1. 通信背景1.1 进程通信的目的:1.2 管道的引入: 2. 匿名管道2.1 匿名管道的原理:2.2 匿名管道的创建:2.3 父子进程通信:2.3.1 read()阻塞等待 2.4 父进程给子进程派发任务:2.5…...

ROS 2 — 托管(生命周期)节点简介

一、说明 这篇文章是关于理解ROS 2中托管(生命周期)节点的概念。我们描述了概念性的想法以及我们为什么需要它。所以让我们开始吧! 二、托管式节点 — 什么和为什么? 为了理解托管式节点,让我们从一个简单的问题陈述开…...

vue2企业级项目(六)

vue2企业级项目(六) 自定义指令 创建src/directive/index.js const directives require.context("./modules", true, /\.js$/);export default {install: (Vue) > {directives.keys().forEach((key) > {let directive directives(key…...

OSPF的选路原则

域内 --- 1类,2类LSA 域间 --- 3类LSA 域外 --- 5类,7类LSA --- 根据开销值的计算规则不同,还分为类型1和类型2 1.如果学到的路由都是通过1类,2类LSA获取的域内路由 --- 这种情况直接比较开销值,优先选择开销值小的路…...

(LeetCode 每日一题) 3442. 奇偶频次间的最大差值 I (哈希、字符串)

题目:3442. 奇偶频次间的最大差值 I 思路 :哈希,时间复杂度0(n)。 用哈希表来记录每个字符串中字符的分布情况,哈希表这里用数组即可实现。 C版本: class Solution { public:int maxDifference(string s) {int a[26]…...

Qt/C++开发监控GB28181系统/取流协议/同时支持udp/tcp被动/tcp主动

一、前言说明 在2011版本的gb28181协议中,拉取视频流只要求udp方式,从2016开始要求新增支持tcp被动和tcp主动两种方式,udp理论上会丢包的,所以实际使用过程可能会出现画面花屏的情况,而tcp肯定不丢包,起码…...

23-Oracle 23 ai 区块链表(Blockchain Table)

小伙伴有没有在金融强合规的领域中遇见,必须要保持数据不可变,管理员都无法修改和留痕的要求。比如医疗的电子病历中,影像检查检验结果不可篡改行的,药品追溯过程中数据只可插入无法删除的特性需求;登录日志、修改日志…...

PPT|230页| 制造集团企业供应链端到端的数字化解决方案:从需求到结算的全链路业务闭环构建

制造业采购供应链管理是企业运营的核心环节,供应链协同管理在供应链上下游企业之间建立紧密的合作关系,通过信息共享、资源整合、业务协同等方式,实现供应链的全面管理和优化,提高供应链的效率和透明度,降低供应链的成…...

【OSG学习笔记】Day 16: 骨骼动画与蒙皮(osgAnimation)

骨骼动画基础 骨骼动画是 3D 计算机图形中常用的技术,它通过以下两个主要组件实现角色动画。 骨骼系统 (Skeleton):由层级结构的骨头组成,类似于人体骨骼蒙皮 (Mesh Skinning):将模型网格顶点绑定到骨骼上,使骨骼移动…...

今日学习:Spring线程池|并发修改异常|链路丢失|登录续期|VIP过期策略|数值类缓存

文章目录 优雅版线程池ThreadPoolTaskExecutor和ThreadPoolTaskExecutor的装饰器并发修改异常并发修改异常简介实现机制设计原因及意义 使用线程池造成的链路丢失问题线程池导致的链路丢失问题发生原因 常见解决方法更好的解决方法设计精妙之处 登录续期登录续期常见实现方式特…...

python报错No module named ‘tensorflow.keras‘

是由于不同版本的tensorflow下的keras所在的路径不同,结合所安装的tensorflow的目录结构修改from语句即可。 原语句: from tensorflow.keras.layers import Conv1D, MaxPooling1D, LSTM, Dense 修改后: from tensorflow.python.keras.lay…...

【VLNs篇】07:NavRL—在动态环境中学习安全飞行

项目内容论文标题NavRL: 在动态环境中学习安全飞行 (NavRL: Learning Safe Flight in Dynamic Environments)核心问题解决无人机在包含静态和动态障碍物的复杂环境中进行安全、高效自主导航的挑战,克服传统方法和现有强化学习方法的局限性。核心算法基于近端策略优化…...

免费数学几何作图web平台

光锐软件免费数学工具,maths,数学制图,数学作图,几何作图,几何,AR开发,AR教育,增强现实,软件公司,XR,MR,VR,虚拟仿真,虚拟现实,混合现实,教育科技产品,职业模拟培训,高保真VR场景,结构互动课件,元宇宙http://xaglare.c…...

Web中间件--tomcat学习

Web中间件–tomcat Java虚拟机详解 什么是JAVA虚拟机 Java虚拟机是一个抽象的计算机,它可以执行Java字节码。Java虚拟机是Java平台的一部分,Java平台由Java语言、Java API和Java虚拟机组成。Java虚拟机的主要作用是将Java字节码转换为机器代码&#x…...