当前位置: 首页 > news >正文

spark超大数据批量写入redis

利用spark的分布式优势,一次性批量将7000多万的数据写入到redis中。

# 配置spark接口
import os
import findspark
from pyspark import SparkConf
from pyspark.sql import SparkSession
os.environ["JAVA_HOME"] = "/usr/local/jdk1.8.0_192"
findspark.init("/usr/local/hadoop/spark-2.4.4-bin-hadoop2.6/")
# 设置配置信息
conf = SparkConf()
conf.set("spark.driver.memory", "16g")
conf.set("spark.executor.memory", "16g")
conf.set("spark.driver.maxResultSize","3g")
conf.set("spark.executor.maxResultSize", "3g")
conf.set("spark.ui.showConsoleProgress","false") # 取消进度条显示
spark = SparkSession.builder.appName("local_redis_spark").master("local[*]").enableHiveSupport().config(conf=conf).getOrCreate()
spark.sparkContext.setLogLevel("ERROR") # 提升日志级别
import redis
# 初始化一个全局函数来获取Redis连接池
def get_redis_connection_pool():# 配置redis参数host='127.0.0.1' # 替换为redis的服务地址即可port=6379password='123456' # 密码db=1 # db库如果不设置 默认为0max_connections=10  # 设置最大连接数redis_pool = redis.ConnectionPool(host=host, port=port, db=db, password=password, max_connections=max_connections)  return redis_pool# 清空旧数据
with redis.Redis(connection_pool=get_redis_connection_pool()) as r:r.flushdb() # 清空当前库的所有数据 而flushall()则情况所有库数据
%%time
# 并行处理函数serv_id
def servid_pfun(sdf_data):# 定义redis写入函数 以连接池的方式获取链接 及时释放def write_to_redis(data_dict):with redis.Redis(connection_pool=get_redis_connection_pool()) as r:r.mset(data_dict)# 构建一个空字典 批量写入dat = {}for rw in sdf_data:dat[rw.serv_id] = str((rw.r_inst_id, rw.avg_value))# 批量写入write_to_redis(dat)# 并行处理函数one_id
def oneid_pfun(sdf_data):# 定义redis写入函数 以连接池的方式获取链接 及时释放def write_to_redis(data_dict):with redis.Redis(connection_pool=get_redis_connection_pool()) as r:r.mset(data_dict)# 构建一个空字典 批量写入dat = {}for rw in sdf_data:dat[rw.r_inst_id] = str((rw.offer_list,rw.filter_prod_offer_inst_list,rw.fuka_serv_offer_list,rw.filter_list,rw.new_serv_id))# 批量写入write_to_redis(dat)# 加载缓存数据
oneid_sdf = spark.sql("""select * from database.table1""")servid_sdf = spark.sql("""select * from database.table2""")# 设置分区数 如果批量写入的内存大小以及最大链接数有限制
# servid_num_parts = 50000
# oneid_num_parts = 10000 # 使用repartition方法进行重新分区
# servid_sdf_part = servid_sdf.repartition(servid_num_parts)
# oneid_sdf_part = oneid_sdf.repartition(oneid_num_parts)# 分批写入redis
servid_sdf.foreachPartition(servid_pfun)
print(f"servid字典缓存成功")
oneid_sdf.foreachPartition(oneid_pfun)
print(f"oneid字典缓存成功")
# 关闭spark
spark.stop() 
print(f"redis缓存插入成功")

执行时间可能跟资源环境有关,测试整个过程大概只需要5分钟左右,非常快速。

相关文章:

spark超大数据批量写入redis

利用spark的分布式优势,一次性批量将7000多万的数据写入到redis中。 # 配置spark接口 import os import findspark from pyspark import SparkConf from pyspark.sql import SparkSession os.environ["JAVA_HOME"] "/usr/local/jdk1.8.0_192"…...

C# Socket的使用

C# 中的 System.Net.Sockets.Socket 类是 .NET Framework 提供的核心类,用于处理网络套接字编程。Socket 类是用于网络编程的基础类,它位于 System.Net.Sockets 命名空间中。 使用 Socket 类,可以创建客户端和服务器应用程序来进行基于TCP、…...

Spring Cloud + Vue前后端分离-第17章 生产打包与发布

源代码在GitHub - 629y/course: Spring Cloud Vue前后端分离-在线课程 Spring Cloud Vue前后端分离-第17章 生产打包与发布 17-1 注册中心配置中心Nacos 注册中心 Nacos 快速开始 | Nacos 本节内容:使用nacos作注册中心配置中心,不用eureka Nacos…...

力扣热题100_普通数组_56_合并区间

文章目录 题目链接解题思路解题代码 题目链接 56. 合并区间 以数组 intervals 表示若干个区间的集合,其中单个区间为 intervals[i] [starti, endi] 。请你合并所有重叠的区间,并返回 一个不重叠的区间数组,该数组需恰好覆盖输入中的所有区…...

Springcloud OpenFeign 的实现(二)

Springcloud OpenFeign 的实现(一) 一、Feign request/response 压缩 您可以考虑为您的外部请求启用请求或响应GZIP压缩。您可以通过启用以下属性之一来完成此操作: feign.compression.request.enabledtrue feign.compression.response.en…...

[C++]智能指针用法

一、智能指针存在的意义 智能指针主要解决以下问题: (1)内存泄漏:内存手动释放,使用智能指针可以自动释放。 (2)共享所有权指针的传播和释放,比如多线程使用同一个对象时析构问题…...

六、行列式基本知识

目录 1、行列式的特性 2、行列式的计算方法: 2.1 通过行列式的定义去计算:对角法则。 2. 2 利用行列式的性质将行列式转化为上三角行列式: ①行列式的性质 : 性质一: 性质二: 性质三: 性质四:行列式之间的加法...

中断系统(详解与使用)

讲解 简介 中断是指计算机运行过程中,出现某些意外情况需主机干预时,机器能自动停止正在运行的程序并转入处理新情况的程序,处理完毕后又返回原被暂停的程序继续运行。 假设一个人在家看电视,这时候突然门铃响了,这个人此时就要停止看电视去开门,然后关上门后继续回来…...

uniapp开发微信小程序跳转到另一个小程序中

注意:一开始我的云上务工模块是单独的tabbar界面,但是小程序跳转好像不能直接点击tabbar进行,所以我将这里改成了点击首页中的按钮进行跳转 点击这里进行小程序跳转 目录 基础讲解 uniapp小程序跳转的两个方法 调用说明(半屏跳转…...

chatGPT 使用随想

一年前 chatGPT 刚出的时候,我就火速注册试用了。 因为自己就是 AI 行业的,所以想看看国际上最牛的 AI 到底发展到什么程度了. 自从一年前 chatGPT 火出圈之后,国际上的 AI 就一直被 OpenAI 这家公司引领潮流,一直到现在&#x…...

unity Aaimation Rigging使用多个约束导致部分约束失去作用

在应用多个约束时,在Hierarchy的顺序可能会影响最终的效果。例如先应用了Aim Constraint,然后再应用Two Bone Constraint,可能会导致Two Bone Constraint受到Aim Constraint的影响而失效。因此,在使用多个约束时,应该仔…...

什么是ChatGPT

国外有篇文章解释了ChatGPT的开发技术是什么,GPT-3和GPT-4的区别,以及未来的可能性。 截至 2023 年,ChatGPT 等生成式 AI 服务正在全球引起关注,并且正在探索在广泛领域的应用。 您可能想知道 ChatGPT 是使用哪种开发技术制作的&a…...

当我们浪费时我们在浪费什么

世界上的物质和能量不会增加也不会减少,为什么会存在浪费一说呢?是因为人类可以利用和支配的物质和能量是有限的,而且物质和能量的不同组织方式对于人类有着不同的价值。 人类对于世界的事物都有价值评估。例如一个玻璃杯摔碎了,…...

一文搞懂TCP三次握手与四次挥手

什么是TCP协议? TCP(Transmission control protocol)即传输控制协议,是一种面向连接、可靠的数据传输协议,它是为了在不可靠的互联网上提供可靠的端到端字节流而专门设计的一个传输协议。 面向连接:数据传…...

FairyGUI × Cocos Creator 3.7.3 引入报错解决

Cocos Creator 3.7.3引入fgui库 package.json添加这个依赖 "devDependencies": {"fairygui-cc": "latest"}执行npm i 报错解决 使用import引入fairygui-cc,就会有报错和警告,简单处理一下。 鼠标随便点一下也会出警告…...

网络原理 - HTTP/HTTPS(5)

HTTPS HTTPS也是一个应用层协议.在HTTP协议的基础上引入了一个加密层. HTTP协议内容都是按照文本的方式明文传输的. 这就导致了在传输过程中出现了一些被篡改的情况. 臭名昭著的"运营商劫持" 下载一个天天动听. 未被劫持的效果,点击下载按钮,就会弹出天天动听的…...

设计模式——抽象工厂模式

定义: 抽象工厂模式(Abstract Factory Pattern)提供一个创建一系列或相互依赖对象的接口,而无须指定它们具体的类。 概述:一个工厂可以提供创建多种相关产品的接口,而无需像工厂方法一样,为每一个产品都提供一个具体…...

详解编译和链接!

目录 1. 翻译环境和运行环境 2. 翻译环境 2.1 预处理 2.2 编译 2.3 汇编 2.4 链接 3. 运行环境 4.完结散花 悟已往之不谏,知来者犹可追 创作不易,宝子们!如果这篇文章对你们…...

力扣226 翻转二叉树 Java版本

文章目录 题目描述解题思路代码 题目描述 给你一棵二叉树的根节点 root ,翻转这棵二叉树,并返回其根节点。 示例 1: 输入:root [4,2,7,1,3,6,9] 输出:[4,7,2,9,6,3,1] 示例 2: 输入:root…...

免费的数据恢复软件哪个好?这10个数据恢复软件可以试试

遇到电脑、硬盘或U盘等设备中数据丢失,不用着急,数据恢复软件来帮你。 在遇到数据丢失的问题时,很多朋友都会很着急也不知道该怎么办。作为数据恢复小白,我们可以选择使用数据恢复软件进行扫描恢复。现在市面上的数据恢复软件很多…...

别急着升级glibc!解决scikit-learn的libgomp内存错误,我更推荐这个方法

生产环境避坑指南:如何优雅解决scikit-learn的libgomp内存分配错误 当你的AI服务突然抛出cannot allocate memory in static TLS block错误时,第一反应可能是升级系统库——但请先放下这个危险的念头。作为经历过三次生产环境崩溃的运维老兵,…...

当翻译成本趋近于零:AI原生时代,软件工程如何重塑?

当翻译成本趋近于零,软件工程的瓶颈就从“如何写对代码”变成了“如何定义对的事”。 一、两条路线之争:代码约束还是提示约束? 当前AI智能体演进中,出现了一条清晰的分野:以Claude Code为代表的“代码硬约束”路线&am…...

在Aspen Plus中用Linde - Hampson工艺液化CO₂:从燃煤电厂捕获气体的模拟探索

在 Aspen Plus 中使用 Linde-Hampson 工艺液化CO2该模拟使用 Aspen Plus 对从燃煤电厂捕获的富含二氧化碳的气体进行液化。在应对气候变化的征程中,二氧化碳捕获与封存(CCS)技术愈发关键。从燃煤电厂捕获富含二氧化碳的气体并将其液化&#x…...

用Matlab/Simulink手把手教你设计交错式升压DC-DC转换器(附PI参数整定代码)

从零构建交错式升压DC-DC转换器的MATLAB实战指南 交错式升压拓扑正在新能源领域掀起一场静默革命——当电动汽车的电池管理系统需要稳定升压时,当光伏逆变器要处理不稳定的直流输入时,这种能显著降低电流纹波的结构已成为工程师的秘密武器。但理论图纸与…...

TranslateGemma高可用部署:健康检查、监控与自动恢复策略

TranslateGemma高可用部署:健康检查、监控与自动恢复策略 1. 为什么高可用部署对TranslateGemma至关重要 TranslateGemma作为企业级神经机器翻译系统,在生产环境中面临着724小时不间断服务的严苛要求。不同于开发测试环境,生产部署必须考虑…...

GTE中文嵌入模型部署案例:中文新闻聚合平台热点事件发现系统

GTE中文嵌入模型部署案例:中文新闻聚合平台热点事件发现系统 1. 项目背景与需求 在信息爆炸的时代,每天都有海量的新闻内容产生。对于新闻聚合平台来说,如何从成千上万的新闻文章中快速识别出热点事件,成为了一个关键的技术挑战…...

GeoScene Maps避坑指南:从图层闪烁到内存泄漏的7个常见问题解决方案

GeoScene Maps深度调试指南:7个生产环境典型问题解决方案 当你在凌晨三点被警报惊醒,发现线上地图服务出现大面积图层闪烁时,那种头皮发麻的感觉我太熟悉了。作为经历过数十个GeoScene Maps项目的老兵,我想分享那些官方文档不会告…...

别再到处找教程了!Ubuntu 18.04 + Carla 0.9.13 + ROS Melodic 联合仿真环境保姆级搭建实录

Ubuntu 18.04 Carla 0.9.13 ROS Melodic 联合仿真环境实战指南 自动驾驶仿真环境的搭建往往充满挑战,特别是当多个复杂系统需要协同工作时。本文将带你一步步完成Ubuntu 18.04系统下Carla 0.9.13与ROS Melodic的联合仿真环境搭建,避开那些令人头疼的&…...

用Matlab+Yalmip+Gurobi搞定微电网优化配置:从电工杯A题到实战避坑指南

MatlabYalmipGurobi微电网优化实战:从建模到竞赛应用的完整指南 微电网优化配置是能源系统研究中的经典问题,也是数学建模竞赛中的高频考点。去年电工杯A题就曾让参赛者头疼——如何在满足负荷需求的前提下,合理配置风光储系统,实…...

3分钟上手!FrankMocap让普通摄像头变身专业动捕设备

3分钟上手!FrankMocap让普通摄像头变身专业动捕设备 【免费下载链接】frankmocap A Strong and Easy-to-use Single View 3D HandBody Pose Estimator 项目地址: https://gitcode.com/gh_mirrors/fr/frankmocap 在数字内容创作与交互设计领域,3D动…...