当前位置: 首页 > news >正文

大数据-Spark批处理实用广播Broadcast构建一个全局缓存Cache

1、broadcast广播

在这里插入图片描述

在Spark中,broadcast是一种优化技术,它可以将一个只读变量缓存到每个节点上,以便在执行任务时使用。这样可以避免在每个任务中重复传输数据。

2、构建缓存

import org.apache.spark.sql.SparkSession
import org.apache.spark.broadcast.Broadcast
import com.alibaba.fastjson.JSONObject// 定义全局缓存单例对象
object GlobalCache extends Serializable {// 广播变量,用于存储缓存数据private var cacheData: Broadcast[collection.mutable.Map[String, JSONObject]] = _// 设置 SparkSession 和广播变量def setSparkSession(spark: SparkSession): Unit = {cacheData = spark.sparkContext.broadcast(collection.mutable.Map.empty[String, JSONObject])}// 按订单ID和用户ID缓存JSONObject对象def cacheJSONObject(orderId: String, userId: String, jsonObject: JSONObject): Unit = {// 获取广播变量的值并进行修改val data = cacheData.valuedata.synchronized {data.put(generateKey(orderId, userId), jsonObject)}}// 根据订单ID和用户ID删除缓存的JSONObject对象def removeJSONObject(orderId: String, userId: String): Unit = {// 获取广播变量的值并进行修改val data = cacheData.valuedata.synchronized {data.remove(generateKey(orderId, userId))}}// 根据订单ID和用户ID获取缓存的JSONObject对象def getJSONObjet(orderId: String, userId: String): JSONObject = {// 获取广播变量的值并进行访问val data = cacheData.valuedata.synchronized {data.get(generateKey(orderId, userId)).orNull}}// 生成缓存键,使用订单ID和用户ID拼接private def generateKey(orderId: String, userId: String): String = s"$orderId|$userId"
}

3、缓存测试

import org.apache.spark.sql.SparkSession
import org.apache.spark.broadcast.Broadcast
import com.alibaba.fastjson.JSONObject
import org.apache.log4j.{Level, Logger}object CacheTest {Logger.getLogger("org").setLevel(Level.ERROR)Logger.getRootLogger().setLevel(Level.ERROR) // 设置日志级别def addItem(orderId:String, userId:String, name:String): Unit = {val jsonObject = new JSONObject()jsonObject.put("name", name)// 缓存JSONObject对象GlobalCache.cacheJSONObject(orderId, userId, jsonObject)}def getCache(orderId: String, userId: String): JSONObject = {// 获取缓存的JSONObject对象GlobalCache.getJSONObjet(orderId, userId)}def delItem(orderId:String, userId:String): Unit = {// 删除缓存的JSONObject对象GlobalCache.removeJSONObject(orderId, userId)}def getSparkSession(appName: String, localType: Int): SparkSession = {val builder: SparkSession.Builder = SparkSession.builder().appName(appName)if (localType == 1) {builder.master("local[8]") // 本地模式,启用8个核心}val spark = builder.getOrCreate() // 获取或创建一个新的SparkSessionspark.sparkContext.setLogLevel("ERROR") // Spark设置日志级别spark}def main(args: Array[String]): Unit = {println("Start CacheTest")val spark: SparkSession = getSparkSession("CacheTest", 1)GlobalCache.setSparkSession(spark)  // 构造全局缓存addItem("001", "456", "苹果")      // 添加元素addItem("002", "789", "香蕉")      // 添加元素var cachedObject = getCache("001", "456")println(s"Cached Object: $cachedObject")delItem("001", "456")      // 删除元素cachedObject = getCache("001", "456")println(s"Cached Object: $cachedObject")spark.stop()}
}

4、控制台输出

Start CacheTest
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Cached Object: {"name":"苹果"}
Cached Object: nullProcess finished with exit code 0

相关文章:

大数据-Spark批处理实用广播Broadcast构建一个全局缓存Cache

1、broadcast广播 在Spark中,broadcast是一种优化技术,它可以将一个只读变量缓存到每个节点上,以便在执行任务时使用。这样可以避免在每个任务中重复传输数据。 2、构建缓存 import org.apache.spark.sql.SparkSession import org.apache.s…...

Android Service的生命周期,两种启动方法,有什么区别

Android Service的生命周期,两种启动方法,有什么区别 Android Service是一种后台组件,用于在后台执行长时间运行的任务,而无需与用户界面进行交互。Service具有自己的生命周期,其主要包含以下几个状态:创建…...

测试开源C#人脸识别模块ViewFaceCore(5:质量检测和眼睛状态检测)

ViewFaceCore模块中的FaceQuality支持预测人脸质量,最初以为是预测人体体重,实际测试过程中才发现是评估人脸图片质量,主要调用Detect函数执行图片质量检测操作,其函数原型如下所示: //// 摘要:// 人脸质量评估///…...

Go语言网络库net/http

Go语言网络库net/http Http 协议(Hyper Text Transfer Protocol,超文本传输协议)是一个简单的请求-响应协议,它通常运行在 TCP 之 上。超文本传输协议是互联网上应用最为广泛的一种网络传输协议,所有的WWW文件都必须遵守这个标准。 Http 协…...

Acwing.91 最短Hamilton路径(动态规划)

题目 给定一张n个点的带权无向图,点从0~n-1标号,求起点0到终点n-1的最短Hamilton路径。Hamilton路径的定义是从0到n-1不重不漏地经过每个点恰好一次。 输入格式 第—行输入整数n。 接下来n行每行n个整数,其中第i行第j个整数表示点i到j的距…...

[hfut] [important] v4l2 vedio使用总结/opevx/ffpeg/v4l2/opencv/cuda

(158条消息) linux驱动camera//test ok_感知算法工程师的博客-CSDN博客 (158条消息) linux V4L2子系统——v4l2架构(1)之整体架构_感知算法工程师的博客-CSDN博客 (158条消息) linux V4L2子系统——v4l2的结构体(2)之video_devi…...

2023年深圳杯数学建模A题影响城市居民身体健康的因素分析

2023年深圳杯数学建模 A题 影响城市居民身体健康的因素分析 原题再现: 以心脑血管疾病、糖尿病、恶性肿瘤以及慢性阻塞性肺病为代表的慢性非传染性疾病(以下简称慢性病)已经成为影响我国居民身体健康的重要问题。随着人们生活方式的改变&am…...

指令调度(Instruction Scheduling)

指令调度(Instruction Scheduling) 指令调度的约束基本机器模型基本块调度全局调度 指令调度是为了提高指令级并行(ILP),对于超长指令字(VLIW, Very Long Instruction Word)和多发射系统&#x…...

【运维】Linux 跨服务器复制文件文件夹

【运维】Linux 跨服务器复制文件文件夹 如果是云服务 建议用内网ip scp是secure copy的简写,用于在Linux下进行远程拷贝文件的命令,和它类似的命令有cp,不过cp只是在本机进行拷贝不能跨服务器,而且scp传输是加密的。可能会稍微影…...

k8s apiserver如何支持http访问?

原本是可以通过设置api-server的--insecure-port来实现,但是这个参数已经被废弃了,更好的方法则是使用proxy来实现: 在集群任意一个节点上起一个proxy服务,并设置允许所有host访问: kubectl proxy --address0.0.0.0 …...

Safetensors,高效安全易用的深度学习新工具

大家好,本文将介绍一种为深度学习应用提供速度、效率、跨平台兼容性、用户友好性和安全性的新工具。 Safetensors简介 Hugging Face开发了一种名为Safetensors的新序列化格式,旨在简化和精简大型复杂张量的存储和加载。张量是深度学习中使用的主要数据…...

Unity 工具之 NuGetForUnity 包管理器,方便在 Unity 中的进行包管理的简单使用

Unity 工具之 NuGetForUnity 包管理器,方便在 Unity 中的进行包管理的简单使用 目录 Unity 工具之 NuGetForUnity 包管理器,方便在 Unity 中的进行包管理的简单使用 一、简单介绍 二、NuGetForUnity 的下载导入 Unity 三、NuGetForUnity 在 Unity 的…...

运算放大器(二):恒流源

一、实现原理 恒流源的输出电流能够在一定范围内保持稳定,不会随负载的变化而变化。 通过运放,将输入的电压信号转换成满足一定关系的电流信号,转换后的电流相当一个输出可调的简易恒流源。 二、电路结构 常用的恒流源电路如…...

企业选择租用CRM还是一次性买断CRM?分别有哪些优势?

CRM是企业管理客户关系,提升销售业绩,实现业务增长的重要工具。市场上的CRM系统销售方式主要有两种——租用型和买断型。那么,租用CRM好还是一次性买断CRM好?本文将从以下几个方面进行分析: 1、什么是租用型CRM和买断…...

VBA_MF系列技术资料1-133

MF系列VBA技术资料 为了让广大学员在实际VBA编程中有切实可行的思路及有效的提高自己的编程技巧,我参考大量的资料,并结合自己的经验总结了这份MF系列VBA技术综合资料,而且开放源码(MF04除外),其中MF01-04属…...

Android 项目架构

🔥 什么是架构 🔥 在维基百科里是这样定义的: 软件架构是一个系统的轮廓 . 软件架构描述的对象是直接构成系统的抽象组件. 各个组件之间的连接则明确和相对细致地描述组件之间的通讯 . 在实现阶段, 这些抽象组件被细化为实际组件 , 比如具体某个类或者对象 . 面试的过程中…...

【Linux】进程通信 — 管道

文章目录 📖 前言1. 通信背景1.1 进程通信的目的:1.2 管道的引入: 2. 匿名管道2.1 匿名管道的原理:2.2 匿名管道的创建:2.3 父子进程通信:2.3.1 read()阻塞等待 2.4 父进程给子进程派发任务:2.5…...

ROS 2 — 托管(生命周期)节点简介

一、说明 这篇文章是关于理解ROS 2中托管(生命周期)节点的概念。我们描述了概念性的想法以及我们为什么需要它。所以让我们开始吧! 二、托管式节点 — 什么和为什么? 为了理解托管式节点,让我们从一个简单的问题陈述开…...

vue2企业级项目(六)

vue2企业级项目(六) 自定义指令 创建src/directive/index.js const directives require.context("./modules", true, /\.js$/);export default {install: (Vue) > {directives.keys().forEach((key) > {let directive directives(key…...

OSPF的选路原则

域内 --- 1类,2类LSA 域间 --- 3类LSA 域外 --- 5类,7类LSA --- 根据开销值的计算规则不同,还分为类型1和类型2 1.如果学到的路由都是通过1类,2类LSA获取的域内路由 --- 这种情况直接比较开销值,优先选择开销值小的路…...

基于Flask实现的医疗保险欺诈识别监测模型

基于Flask实现的医疗保险欺诈识别监测模型 项目截图 项目简介 社会医疗保险是国家通过立法形式强制实施,由雇主和个人按一定比例缴纳保险费,建立社会医疗保险基金,支付雇员医疗费用的一种医疗保险制度, 它是促进社会文明和进步的…...

蓝牙 BLE 扫描面试题大全(2):进阶面试题与实战演练

前文覆盖了 BLE 扫描的基础概念与经典问题蓝牙 BLE 扫描面试题大全(1):从基础到实战的深度解析-CSDN博客,但实际面试中,企业更关注候选人对复杂场景的应对能力(如多设备并发扫描、低功耗与高发现率的平衡)和前沿技术的…...

postgresql|数据库|只读用户的创建和删除(备忘)

CREATE USER read_only WITH PASSWORD 密码 -- 连接到xxx数据库 \c xxx -- 授予对xxx数据库的只读权限 GRANT CONNECT ON DATABASE xxx TO read_only; GRANT USAGE ON SCHEMA public TO read_only; GRANT SELECT ON ALL TABLES IN SCHEMA public TO read_only; GRANT EXECUTE O…...

linux 错误码总结

1,错误码的概念与作用 在Linux系统中,错误码是系统调用或库函数在执行失败时返回的特定数值,用于指示具体的错误类型。这些错误码通过全局变量errno来存储和传递,errno由操作系统维护,保存最近一次发生的错误信息。值得注意的是,errno的值在每次系统调用或函数调用失败时…...

Ascend NPU上适配Step-Audio模型

1 概述 1.1 简述 Step-Audio 是业界首个集语音理解与生成控制一体化的产品级开源实时语音对话系统,支持多语言对话(如 中文,英文,日语),语音情感(如 开心,悲伤)&#x…...

Linux离线(zip方式)安装docker

目录 基础信息操作系统信息docker信息 安装实例安装步骤示例 遇到的问题问题1:修改默认工作路径启动失败问题2 找不到对应组 基础信息 操作系统信息 OS版本:CentOS 7 64位 内核版本:3.10.0 相关命令: uname -rcat /etc/os-rele…...

Golang——6、指针和结构体

指针和结构体 1、指针1.1、指针地址和指针类型1.2、指针取值1.3、new和make 2、结构体2.1、type关键字的使用2.2、结构体的定义和初始化2.3、结构体方法和接收者2.4、给任意类型添加方法2.5、结构体的匿名字段2.6、嵌套结构体2.7、嵌套匿名结构体2.8、结构体的继承 3、结构体与…...

Web后端基础(基础知识)

BS架构:Browser/Server,浏览器/服务器架构模式。客户端只需要浏览器,应用程序的逻辑和数据都存储在服务端。 优点:维护方便缺点:体验一般 CS架构:Client/Server,客户端/服务器架构模式。需要单独…...

Spring AI Chat Memory 实战指南:Local 与 JDBC 存储集成

一个面向 Java 开发者的 Sring-Ai 示例工程项目,该项目是一个 Spring AI 快速入门的样例工程项目,旨在通过一些小的案例展示 Spring AI 框架的核心功能和使用方法。 项目采用模块化设计,每个模块都专注于特定的功能领域,便于学习和…...

【安全篇】金刚不坏之身:整合 Spring Security + JWT 实现无状态认证与授权

摘要 本文是《Spring Boot 实战派》系列的第四篇。我们将直面所有 Web 应用都无法回避的核心问题:安全。文章将详细阐述认证(Authentication) 与授权(Authorization的核心概念,对比传统 Session-Cookie 与现代 JWT(JS…...