当前位置: 首页 > news >正文

spark超大数据批量写入redis

利用spark的分布式优势,一次性批量将7000多万的数据写入到redis中。

# 配置spark接口
import os
import findspark
from pyspark import SparkConf
from pyspark.sql import SparkSession
os.environ["JAVA_HOME"] = "/usr/local/jdk1.8.0_192"
findspark.init("/usr/local/hadoop/spark-2.4.4-bin-hadoop2.6/")
# 设置配置信息
conf = SparkConf()
conf.set("spark.driver.memory", "16g")
conf.set("spark.executor.memory", "16g")
conf.set("spark.driver.maxResultSize","3g")
conf.set("spark.executor.maxResultSize", "3g")
conf.set("spark.ui.showConsoleProgress","false") # 取消进度条显示
spark = SparkSession.builder.appName("local_redis_spark").master("local[*]").enableHiveSupport().config(conf=conf).getOrCreate()
spark.sparkContext.setLogLevel("ERROR") # 提升日志级别
import redis
# 初始化一个全局函数来获取Redis连接池
def get_redis_connection_pool():# 配置redis参数host='127.0.0.1' # 替换为redis的服务地址即可port=6379password='123456' # 密码db=1 # db库如果不设置 默认为0max_connections=10  # 设置最大连接数redis_pool = redis.ConnectionPool(host=host, port=port, db=db, password=password, max_connections=max_connections)  return redis_pool# 清空旧数据
with redis.Redis(connection_pool=get_redis_connection_pool()) as r:r.flushdb() # 清空当前库的所有数据 而flushall()则情况所有库数据
%%time
# 并行处理函数serv_id
def servid_pfun(sdf_data):# 定义redis写入函数 以连接池的方式获取链接 及时释放def write_to_redis(data_dict):with redis.Redis(connection_pool=get_redis_connection_pool()) as r:r.mset(data_dict)# 构建一个空字典 批量写入dat = {}for rw in sdf_data:dat[rw.serv_id] = str((rw.r_inst_id, rw.avg_value))# 批量写入write_to_redis(dat)# 并行处理函数one_id
def oneid_pfun(sdf_data):# 定义redis写入函数 以连接池的方式获取链接 及时释放def write_to_redis(data_dict):with redis.Redis(connection_pool=get_redis_connection_pool()) as r:r.mset(data_dict)# 构建一个空字典 批量写入dat = {}for rw in sdf_data:dat[rw.r_inst_id] = str((rw.offer_list,rw.filter_prod_offer_inst_list,rw.fuka_serv_offer_list,rw.filter_list,rw.new_serv_id))# 批量写入write_to_redis(dat)# 加载缓存数据
oneid_sdf = spark.sql("""select * from database.table1""")servid_sdf = spark.sql("""select * from database.table2""")# 设置分区数 如果批量写入的内存大小以及最大链接数有限制
# servid_num_parts = 50000
# oneid_num_parts = 10000 # 使用repartition方法进行重新分区
# servid_sdf_part = servid_sdf.repartition(servid_num_parts)
# oneid_sdf_part = oneid_sdf.repartition(oneid_num_parts)# 分批写入redis
servid_sdf.foreachPartition(servid_pfun)
print(f"servid字典缓存成功")
oneid_sdf.foreachPartition(oneid_pfun)
print(f"oneid字典缓存成功")
# 关闭spark
spark.stop() 
print(f"redis缓存插入成功")

执行时间可能跟资源环境有关,测试整个过程大概只需要5分钟左右,非常快速。

相关文章:

spark超大数据批量写入redis

利用spark的分布式优势,一次性批量将7000多万的数据写入到redis中。 # 配置spark接口 import os import findspark from pyspark import SparkConf from pyspark.sql import SparkSession os.environ["JAVA_HOME"] "/usr/local/jdk1.8.0_192"…...

C# Socket的使用

C# 中的 System.Net.Sockets.Socket 类是 .NET Framework 提供的核心类,用于处理网络套接字编程。Socket 类是用于网络编程的基础类,它位于 System.Net.Sockets 命名空间中。 使用 Socket 类,可以创建客户端和服务器应用程序来进行基于TCP、…...

Spring Cloud + Vue前后端分离-第17章 生产打包与发布

源代码在GitHub - 629y/course: Spring Cloud Vue前后端分离-在线课程 Spring Cloud Vue前后端分离-第17章 生产打包与发布 17-1 注册中心配置中心Nacos 注册中心 Nacos 快速开始 | Nacos 本节内容:使用nacos作注册中心配置中心,不用eureka Nacos…...

力扣热题100_普通数组_56_合并区间

文章目录 题目链接解题思路解题代码 题目链接 56. 合并区间 以数组 intervals 表示若干个区间的集合,其中单个区间为 intervals[i] [starti, endi] 。请你合并所有重叠的区间,并返回 一个不重叠的区间数组,该数组需恰好覆盖输入中的所有区…...

Springcloud OpenFeign 的实现(二)

Springcloud OpenFeign 的实现(一) 一、Feign request/response 压缩 您可以考虑为您的外部请求启用请求或响应GZIP压缩。您可以通过启用以下属性之一来完成此操作: feign.compression.request.enabledtrue feign.compression.response.en…...

[C++]智能指针用法

一、智能指针存在的意义 智能指针主要解决以下问题: (1)内存泄漏:内存手动释放,使用智能指针可以自动释放。 (2)共享所有权指针的传播和释放,比如多线程使用同一个对象时析构问题…...

六、行列式基本知识

目录 1、行列式的特性 2、行列式的计算方法: 2.1 通过行列式的定义去计算:对角法则。 2. 2 利用行列式的性质将行列式转化为上三角行列式: ①行列式的性质 : 性质一: 性质二: 性质三: 性质四:行列式之间的加法...

中断系统(详解与使用)

讲解 简介 中断是指计算机运行过程中,出现某些意外情况需主机干预时,机器能自动停止正在运行的程序并转入处理新情况的程序,处理完毕后又返回原被暂停的程序继续运行。 假设一个人在家看电视,这时候突然门铃响了,这个人此时就要停止看电视去开门,然后关上门后继续回来…...

uniapp开发微信小程序跳转到另一个小程序中

注意:一开始我的云上务工模块是单独的tabbar界面,但是小程序跳转好像不能直接点击tabbar进行,所以我将这里改成了点击首页中的按钮进行跳转 点击这里进行小程序跳转 目录 基础讲解 uniapp小程序跳转的两个方法 调用说明(半屏跳转…...

chatGPT 使用随想

一年前 chatGPT 刚出的时候,我就火速注册试用了。 因为自己就是 AI 行业的,所以想看看国际上最牛的 AI 到底发展到什么程度了. 自从一年前 chatGPT 火出圈之后,国际上的 AI 就一直被 OpenAI 这家公司引领潮流,一直到现在&#x…...

unity Aaimation Rigging使用多个约束导致部分约束失去作用

在应用多个约束时,在Hierarchy的顺序可能会影响最终的效果。例如先应用了Aim Constraint,然后再应用Two Bone Constraint,可能会导致Two Bone Constraint受到Aim Constraint的影响而失效。因此,在使用多个约束时,应该仔…...

什么是ChatGPT

国外有篇文章解释了ChatGPT的开发技术是什么,GPT-3和GPT-4的区别,以及未来的可能性。 截至 2023 年,ChatGPT 等生成式 AI 服务正在全球引起关注,并且正在探索在广泛领域的应用。 您可能想知道 ChatGPT 是使用哪种开发技术制作的&a…...

当我们浪费时我们在浪费什么

世界上的物质和能量不会增加也不会减少,为什么会存在浪费一说呢?是因为人类可以利用和支配的物质和能量是有限的,而且物质和能量的不同组织方式对于人类有着不同的价值。 人类对于世界的事物都有价值评估。例如一个玻璃杯摔碎了,…...

一文搞懂TCP三次握手与四次挥手

什么是TCP协议? TCP(Transmission control protocol)即传输控制协议,是一种面向连接、可靠的数据传输协议,它是为了在不可靠的互联网上提供可靠的端到端字节流而专门设计的一个传输协议。 面向连接:数据传…...

FairyGUI × Cocos Creator 3.7.3 引入报错解决

Cocos Creator 3.7.3引入fgui库 package.json添加这个依赖 "devDependencies": {"fairygui-cc": "latest"}执行npm i 报错解决 使用import引入fairygui-cc,就会有报错和警告,简单处理一下。 鼠标随便点一下也会出警告…...

网络原理 - HTTP/HTTPS(5)

HTTPS HTTPS也是一个应用层协议.在HTTP协议的基础上引入了一个加密层. HTTP协议内容都是按照文本的方式明文传输的. 这就导致了在传输过程中出现了一些被篡改的情况. 臭名昭著的"运营商劫持" 下载一个天天动听. 未被劫持的效果,点击下载按钮,就会弹出天天动听的…...

设计模式——抽象工厂模式

定义: 抽象工厂模式(Abstract Factory Pattern)提供一个创建一系列或相互依赖对象的接口,而无须指定它们具体的类。 概述:一个工厂可以提供创建多种相关产品的接口,而无需像工厂方法一样,为每一个产品都提供一个具体…...

详解编译和链接!

目录 1. 翻译环境和运行环境 2. 翻译环境 2.1 预处理 2.2 编译 2.3 汇编 2.4 链接 3. 运行环境 4.完结散花 悟已往之不谏,知来者犹可追 创作不易,宝子们!如果这篇文章对你们…...

力扣226 翻转二叉树 Java版本

文章目录 题目描述解题思路代码 题目描述 给你一棵二叉树的根节点 root ,翻转这棵二叉树,并返回其根节点。 示例 1: 输入:root [4,2,7,1,3,6,9] 输出:[4,7,2,9,6,3,1] 示例 2: 输入:root…...

免费的数据恢复软件哪个好?这10个数据恢复软件可以试试

遇到电脑、硬盘或U盘等设备中数据丢失,不用着急,数据恢复软件来帮你。 在遇到数据丢失的问题时,很多朋友都会很着急也不知道该怎么办。作为数据恢复小白,我们可以选择使用数据恢复软件进行扫描恢复。现在市面上的数据恢复软件很多…...

wordpress后台更新后 前端没变化的解决方法

使用siteground主机的wordpress网站,会出现更新了网站内容和修改了php模板文件、js文件、css文件、图片文件后,网站没有变化的情况。 不熟悉siteground主机的新手,遇到这个问题,就很抓狂,明明是哪都没操作错误&#x…...

华为云AI开发平台ModelArts

华为云ModelArts:重塑AI开发流程的“智能引擎”与“创新加速器”! 在人工智能浪潮席卷全球的2025年,企业拥抱AI的意愿空前高涨,但技术门槛高、流程复杂、资源投入巨大的现实,却让许多创新构想止步于实验室。数据科学家…...

HTML 语义化

目录 HTML 语义化HTML5 新特性HTML 语义化的好处语义化标签的使用场景最佳实践 HTML 语义化 HTML5 新特性 标准答案&#xff1a; 语义化标签&#xff1a; <header>&#xff1a;页头<nav>&#xff1a;导航<main>&#xff1a;主要内容<article>&#x…...

Spring Boot 实现流式响应(兼容 2.7.x)

在实际开发中&#xff0c;我们可能会遇到一些流式数据处理的场景&#xff0c;比如接收来自上游接口的 Server-Sent Events&#xff08;SSE&#xff09; 或 流式 JSON 内容&#xff0c;并将其原样中转给前端页面或客户端。这种情况下&#xff0c;传统的 RestTemplate 缓存机制会…...

JVM垃圾回收机制全解析

Java虚拟机&#xff08;JVM&#xff09;中的垃圾收集器&#xff08;Garbage Collector&#xff0c;简称GC&#xff09;是用于自动管理内存的机制。它负责识别和清除不再被程序使用的对象&#xff0c;从而释放内存空间&#xff0c;避免内存泄漏和内存溢出等问题。垃圾收集器在Ja…...

定时器任务——若依源码分析

分析util包下面的工具类schedule utils&#xff1a; ScheduleUtils 是若依中用于与 Quartz 框架交互的工具类&#xff0c;封装了定时任务的 创建、更新、暂停、删除等核心逻辑。 createScheduleJob createScheduleJob 用于将任务注册到 Quartz&#xff0c;先构建任务的 JobD…...

Matlab | matlab常用命令总结

常用命令 一、 基础操作与环境二、 矩阵与数组操作(核心)三、 绘图与可视化四、 编程与控制流五、 符号计算 (Symbolic Math Toolbox)六、 文件与数据 I/O七、 常用函数类别重要提示这是一份 MATLAB 常用命令和功能的总结,涵盖了基础操作、矩阵运算、绘图、编程和文件处理等…...

Spring AI 入门:Java 开发者的生成式 AI 实践之路

一、Spring AI 简介 在人工智能技术快速迭代的今天&#xff0c;Spring AI 作为 Spring 生态系统的新生力量&#xff0c;正在成为 Java 开发者拥抱生成式 AI 的最佳选择。该框架通过模块化设计实现了与主流 AI 服务&#xff08;如 OpenAI、Anthropic&#xff09;的无缝对接&…...

OpenLayers 分屏对比(地图联动)

注&#xff1a;当前使用的是 ol 5.3.0 版本&#xff0c;天地图使用的key请到天地图官网申请&#xff0c;并替换为自己的key 地图分屏对比在WebGIS开发中是很常见的功能&#xff0c;和卷帘图层不一样的是&#xff0c;分屏对比是在各个地图中添加相同或者不同的图层进行对比查看。…...

第 86 场周赛:矩阵中的幻方、钥匙和房间、将数组拆分成斐波那契序列、猜猜这个单词

Q1、[中等] 矩阵中的幻方 1、题目描述 3 x 3 的幻方是一个填充有 从 1 到 9 的不同数字的 3 x 3 矩阵&#xff0c;其中每行&#xff0c;每列以及两条对角线上的各数之和都相等。 给定一个由整数组成的row x col 的 grid&#xff0c;其中有多少个 3 3 的 “幻方” 子矩阵&am…...