当前位置: 首页 > news >正文

spark超大数据批量写入redis

利用spark的分布式优势,一次性批量将7000多万的数据写入到redis中。

# 配置spark接口
import os
import findspark
from pyspark import SparkConf
from pyspark.sql import SparkSession
os.environ["JAVA_HOME"] = "/usr/local/jdk1.8.0_192"
findspark.init("/usr/local/hadoop/spark-2.4.4-bin-hadoop2.6/")
# 设置配置信息
conf = SparkConf()
conf.set("spark.driver.memory", "16g")
conf.set("spark.executor.memory", "16g")
conf.set("spark.driver.maxResultSize","3g")
conf.set("spark.executor.maxResultSize", "3g")
conf.set("spark.ui.showConsoleProgress","false") # 取消进度条显示
spark = SparkSession.builder.appName("local_redis_spark").master("local[*]").enableHiveSupport().config(conf=conf).getOrCreate()
spark.sparkContext.setLogLevel("ERROR") # 提升日志级别
import redis
# 初始化一个全局函数来获取Redis连接池
def get_redis_connection_pool():# 配置redis参数host='127.0.0.1' # 替换为redis的服务地址即可port=6379password='123456' # 密码db=1 # db库如果不设置 默认为0max_connections=10  # 设置最大连接数redis_pool = redis.ConnectionPool(host=host, port=port, db=db, password=password, max_connections=max_connections)  return redis_pool# 清空旧数据
with redis.Redis(connection_pool=get_redis_connection_pool()) as r:r.flushdb() # 清空当前库的所有数据 而flushall()则情况所有库数据
%%time
# 并行处理函数serv_id
def servid_pfun(sdf_data):# 定义redis写入函数 以连接池的方式获取链接 及时释放def write_to_redis(data_dict):with redis.Redis(connection_pool=get_redis_connection_pool()) as r:r.mset(data_dict)# 构建一个空字典 批量写入dat = {}for rw in sdf_data:dat[rw.serv_id] = str((rw.r_inst_id, rw.avg_value))# 批量写入write_to_redis(dat)# 并行处理函数one_id
def oneid_pfun(sdf_data):# 定义redis写入函数 以连接池的方式获取链接 及时释放def write_to_redis(data_dict):with redis.Redis(connection_pool=get_redis_connection_pool()) as r:r.mset(data_dict)# 构建一个空字典 批量写入dat = {}for rw in sdf_data:dat[rw.r_inst_id] = str((rw.offer_list,rw.filter_prod_offer_inst_list,rw.fuka_serv_offer_list,rw.filter_list,rw.new_serv_id))# 批量写入write_to_redis(dat)# 加载缓存数据
oneid_sdf = spark.sql("""select * from database.table1""")servid_sdf = spark.sql("""select * from database.table2""")# 设置分区数 如果批量写入的内存大小以及最大链接数有限制
# servid_num_parts = 50000
# oneid_num_parts = 10000 # 使用repartition方法进行重新分区
# servid_sdf_part = servid_sdf.repartition(servid_num_parts)
# oneid_sdf_part = oneid_sdf.repartition(oneid_num_parts)# 分批写入redis
servid_sdf.foreachPartition(servid_pfun)
print(f"servid字典缓存成功")
oneid_sdf.foreachPartition(oneid_pfun)
print(f"oneid字典缓存成功")
# 关闭spark
spark.stop() 
print(f"redis缓存插入成功")

执行时间可能跟资源环境有关,测试整个过程大概只需要5分钟左右,非常快速。

相关文章:

spark超大数据批量写入redis

利用spark的分布式优势,一次性批量将7000多万的数据写入到redis中。 # 配置spark接口 import os import findspark from pyspark import SparkConf from pyspark.sql import SparkSession os.environ["JAVA_HOME"] "/usr/local/jdk1.8.0_192"…...

C# Socket的使用

C# 中的 System.Net.Sockets.Socket 类是 .NET Framework 提供的核心类,用于处理网络套接字编程。Socket 类是用于网络编程的基础类,它位于 System.Net.Sockets 命名空间中。 使用 Socket 类,可以创建客户端和服务器应用程序来进行基于TCP、…...

Spring Cloud + Vue前后端分离-第17章 生产打包与发布

源代码在GitHub - 629y/course: Spring Cloud Vue前后端分离-在线课程 Spring Cloud Vue前后端分离-第17章 生产打包与发布 17-1 注册中心配置中心Nacos 注册中心 Nacos 快速开始 | Nacos 本节内容:使用nacos作注册中心配置中心,不用eureka Nacos…...

力扣热题100_普通数组_56_合并区间

文章目录 题目链接解题思路解题代码 题目链接 56. 合并区间 以数组 intervals 表示若干个区间的集合,其中单个区间为 intervals[i] [starti, endi] 。请你合并所有重叠的区间,并返回 一个不重叠的区间数组,该数组需恰好覆盖输入中的所有区…...

Springcloud OpenFeign 的实现(二)

Springcloud OpenFeign 的实现(一) 一、Feign request/response 压缩 您可以考虑为您的外部请求启用请求或响应GZIP压缩。您可以通过启用以下属性之一来完成此操作: feign.compression.request.enabledtrue feign.compression.response.en…...

[C++]智能指针用法

一、智能指针存在的意义 智能指针主要解决以下问题: (1)内存泄漏:内存手动释放,使用智能指针可以自动释放。 (2)共享所有权指针的传播和释放,比如多线程使用同一个对象时析构问题…...

六、行列式基本知识

目录 1、行列式的特性 2、行列式的计算方法: 2.1 通过行列式的定义去计算:对角法则。 2. 2 利用行列式的性质将行列式转化为上三角行列式: ①行列式的性质 : 性质一: 性质二: 性质三: 性质四:行列式之间的加法...

中断系统(详解与使用)

讲解 简介 中断是指计算机运行过程中,出现某些意外情况需主机干预时,机器能自动停止正在运行的程序并转入处理新情况的程序,处理完毕后又返回原被暂停的程序继续运行。 假设一个人在家看电视,这时候突然门铃响了,这个人此时就要停止看电视去开门,然后关上门后继续回来…...

uniapp开发微信小程序跳转到另一个小程序中

注意:一开始我的云上务工模块是单独的tabbar界面,但是小程序跳转好像不能直接点击tabbar进行,所以我将这里改成了点击首页中的按钮进行跳转 点击这里进行小程序跳转 目录 基础讲解 uniapp小程序跳转的两个方法 调用说明(半屏跳转…...

chatGPT 使用随想

一年前 chatGPT 刚出的时候,我就火速注册试用了。 因为自己就是 AI 行业的,所以想看看国际上最牛的 AI 到底发展到什么程度了. 自从一年前 chatGPT 火出圈之后,国际上的 AI 就一直被 OpenAI 这家公司引领潮流,一直到现在&#x…...

unity Aaimation Rigging使用多个约束导致部分约束失去作用

在应用多个约束时,在Hierarchy的顺序可能会影响最终的效果。例如先应用了Aim Constraint,然后再应用Two Bone Constraint,可能会导致Two Bone Constraint受到Aim Constraint的影响而失效。因此,在使用多个约束时,应该仔…...

什么是ChatGPT

国外有篇文章解释了ChatGPT的开发技术是什么,GPT-3和GPT-4的区别,以及未来的可能性。 截至 2023 年,ChatGPT 等生成式 AI 服务正在全球引起关注,并且正在探索在广泛领域的应用。 您可能想知道 ChatGPT 是使用哪种开发技术制作的&a…...

当我们浪费时我们在浪费什么

世界上的物质和能量不会增加也不会减少,为什么会存在浪费一说呢?是因为人类可以利用和支配的物质和能量是有限的,而且物质和能量的不同组织方式对于人类有着不同的价值。 人类对于世界的事物都有价值评估。例如一个玻璃杯摔碎了,…...

一文搞懂TCP三次握手与四次挥手

什么是TCP协议? TCP(Transmission control protocol)即传输控制协议,是一种面向连接、可靠的数据传输协议,它是为了在不可靠的互联网上提供可靠的端到端字节流而专门设计的一个传输协议。 面向连接:数据传…...

FairyGUI × Cocos Creator 3.7.3 引入报错解决

Cocos Creator 3.7.3引入fgui库 package.json添加这个依赖 "devDependencies": {"fairygui-cc": "latest"}执行npm i 报错解决 使用import引入fairygui-cc,就会有报错和警告,简单处理一下。 鼠标随便点一下也会出警告…...

网络原理 - HTTP/HTTPS(5)

HTTPS HTTPS也是一个应用层协议.在HTTP协议的基础上引入了一个加密层. HTTP协议内容都是按照文本的方式明文传输的. 这就导致了在传输过程中出现了一些被篡改的情况. 臭名昭著的"运营商劫持" 下载一个天天动听. 未被劫持的效果,点击下载按钮,就会弹出天天动听的…...

设计模式——抽象工厂模式

定义: 抽象工厂模式(Abstract Factory Pattern)提供一个创建一系列或相互依赖对象的接口,而无须指定它们具体的类。 概述:一个工厂可以提供创建多种相关产品的接口,而无需像工厂方法一样,为每一个产品都提供一个具体…...

详解编译和链接!

目录 1. 翻译环境和运行环境 2. 翻译环境 2.1 预处理 2.2 编译 2.3 汇编 2.4 链接 3. 运行环境 4.完结散花 悟已往之不谏,知来者犹可追 创作不易,宝子们!如果这篇文章对你们…...

力扣226 翻转二叉树 Java版本

文章目录 题目描述解题思路代码 题目描述 给你一棵二叉树的根节点 root ,翻转这棵二叉树,并返回其根节点。 示例 1: 输入:root [4,2,7,1,3,6,9] 输出:[4,7,2,9,6,3,1] 示例 2: 输入:root…...

免费的数据恢复软件哪个好?这10个数据恢复软件可以试试

遇到电脑、硬盘或U盘等设备中数据丢失,不用着急,数据恢复软件来帮你。 在遇到数据丢失的问题时,很多朋友都会很着急也不知道该怎么办。作为数据恢复小白,我们可以选择使用数据恢复软件进行扫描恢复。现在市面上的数据恢复软件很多…...

自然语言处理——循环神经网络

自然语言处理——循环神经网络 循环神经网络应用到基于机器学习的自然语言处理任务序列到类别同步的序列到序列模式异步的序列到序列模式 参数学习和长程依赖问题基于门控的循环神经网络门控循环单元(GRU)长短期记忆神经网络(LSTM&#xff09…...

力扣-35.搜索插入位置

题目描述 给定一个排序数组和一个目标值,在数组中找到目标值,并返回其索引。如果目标值不存在于数组中,返回它将会被按顺序插入的位置。 请必须使用时间复杂度为 O(log n) 的算法。 class Solution {public int searchInsert(int[] nums, …...

C++.OpenGL (20/64)混合(Blending)

混合(Blending) 透明效果核心原理 #mermaid-svg-SWG0UzVfJms7Sm3e {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-SWG0UzVfJms7Sm3e .error-icon{fill:#552222;}#mermaid-svg-SWG0UzVfJms7Sm3e .error-text{fill…...

(一)单例模式

一、前言 单例模式属于六大创建型模式,即在软件设计过程中,主要关注创建对象的结果,并不关心创建对象的过程及细节。创建型设计模式将类对象的实例化过程进行抽象化接口设计,从而隐藏了类对象的实例是如何被创建的,封装了软件系统使用的具体对象类型。 六大创建型模式包括…...

解析奥地利 XARION激光超声检测系统:无膜光学麦克风 + 无耦合剂的技术协同优势及多元应用

在工业制造领域,无损检测(NDT)的精度与效率直接影响产品质量与生产安全。奥地利 XARION开发的激光超声精密检测系统,以非接触式光学麦克风技术为核心,打破传统检测瓶颈,为半导体、航空航天、汽车制造等行业提供了高灵敏…...

人工智能 - 在Dify、Coze、n8n、FastGPT和RAGFlow之间做出技术选型

在Dify、Coze、n8n、FastGPT和RAGFlow之间做出技术选型。这些平台各有侧重,适用场景差异显著。下面我将从核心功能定位、典型应用场景、真实体验痛点、选型决策关键点进行拆解,并提供具体场景下的推荐方案。 一、核心功能定位速览 平台核心定位技术栈亮…...

高效的后台管理系统——可进行二次开发

随着互联网技术的迅猛发展,企业的数字化管理变得愈加重要。后台管理系统作为数据存储与业务管理的核心,成为了现代企业不可或缺的一部分。今天我们要介绍的是一款名为 若依后台管理框架 的系统,它不仅支持跨平台应用,还能提供丰富…...

FOPLP vs CoWoS

以下是 FOPLP(Fan-out panel-level packaging 扇出型面板级封装)与 CoWoS(Chip on Wafer on Substrate)两种先进封装技术的详细对比分析,涵盖技术原理、性能、成本、应用场景及市场趋势等维度: 一、技术原…...

Yii2项目自动向GitLab上报Bug

Yii2 项目自动上报Bug 原理 yii2在程序报错时, 会执行指定action, 通过重写ErrorAction, 实现Bug自动提交至GitLab的issue 步骤 配置SiteController中的actions方法 public function actions(){return [error > [class > app\helpers\web\ErrorAction,],];}重写Error…...

信息系统分析与设计复习

2024试卷 单选题(20) 1、在一个聊天系统(类似ChatGPT)中,属于控制类的是()。 A. 话语者类 B.聊天文字输入界面类 C. 聊天主题辨别类 D. 聊天历史类 ​解析 B-C-E备选架构中分析类分为边界类、控制类和实体类。 边界…...