PySPARK带多组参数和标签的SparkSQL批量数据导出到S3的程序
设计一个基于多个带标签SparkSQL模板作为配置文件和多组参数的PySPARK代码程序,实现根据不同的输入参数自动批量地将数据导出为Parquet、CSV和Excel文件到S3上,标签和多个参数(以“_”分割)为组成导出数据文件名,文件已经存在则覆盖原始文件。
代码如下:
import json
from pyspark.sql import SparkSessiondef load_config(config_path):with open(config_path, 'r') as f:return json.load(f)def main(config_path, base_s3_path):# 初始化SparkSession,配置S3和Excel支持spark = SparkSession.builder \.appName("DataExportJob") \.config("spark.jars.packages", "com.crealytics:spark-excel_2.12:0.13.7,org.apache.hadoop:hadoop-aws:3.3.1") \.getOrCreate()# 配置S3访问(根据实际环境配置)spark.sparkContext._jsc.hadoopConfiguration().set("fs.s3a.access.key", "YOUR_ACCESS_KEY")spark.sparkContext._jsc.hadoopConfiguration().set("fs.s3a.secret.key", "YOUR_SECRET_KEY")spark.sparkContext._jsc.hadoopConfiguration().set("fs.s3a.endpoint", "s3.amazonaws.com")config = load_config(config_path)for template in config['templates']:label = template['label']sql_template = template['sql_template']parameters_list = template['parameters']for params in parameters_list:# 验证参数数量是否匹配placeholders = sql_template.count('{')if len(params) != placeholders:raise ValueError(f"参数数量不匹配,模板需要{placeholders}个参数,但当前参数为{len(params)}个")# 替换SQL中的占位符formatted_sql = sql_template.format(*params)df = spark.sql(formatted_sql)# 生成文件名参数部分param_str = "_".join(params)base_filename = f"{label}_{param_str}"# 定义输出路径output_paths = {'parquet': f"{base_s3_path}/parquet/{base_filename}",'csv': f"{base_s3_path}/csv/{base_filename}",'excel': f"{base_s3_path}/excel/{base_filename}.xlsx"}# 写入Parquetdf.write.mode('overwrite').parquet(output_paths['parquet'])# 写入CSV(自动生成header)df.write.mode('overwrite') \.option("header", "true") \.csv(output_paths['csv'])# 写入Excel(使用spark-excel包)df.write.format("com.crealytics.spark.excel") \.option("header", "true") \.option("inferSchema", "true") \.mode("overwrite") \.save(output_paths['excel'])spark.stop()if __name__ == "__main__":import argparseparser = argparse.ArgumentParser()parser.add_argument('--config', type=str, required=True, help='Path to config JSON file')parser.add_argument('--s3-path', type=str, required=True, help='Base S3 path (e.g., s3a://your-bucket/data)')args = parser.parse_args()main(args.config, args.s3_path)
配置文件示例(config.json)
{"templates": [{"label": "sales_report","sql_template": "SELECT * FROM sales WHERE date = '{0}' AND region = '{1}'","parameters": [["202301", "north"],["202302", "south"]]},{"label": "user_activity","sql_template": "SELECT user_id, COUNT(*) AS cnt FROM activity WHERE day = '{0}' GROUP BY user_id","parameters": [["2023-01-01"],["2023-01-02"]]}]
}
使用说明
-
依赖管理:
- 确保Spark集群已安装Hadoop AWS和Spark Excel依赖:
spark-submit --packages com.crealytics:spark-excel_2.12:0.13.7,org.apache.hadoop:hadoop-aws:3.3.1 your_script.py
- 确保Spark集群已安装Hadoop AWS和Spark Excel依赖:
-
S3配置:
- 替换代码中的
YOUR_ACCESS_KEY和YOUR_SECRET_KEY为实际AWS凭证 - 根据S3兼容存储调整
endpoint(如使用MinIO需特殊配置)
- 替换代码中的
-
执行命令:
spark-submit --packages com.crealytics:spark-excel_2.12:0.13.7,org.apache.hadoop:hadoop-aws:3.3.1 \ data_export.py --config config.json --s3-path s3a://your-bucket/exports
输出结构
s3a://your-bucket/exports
├── parquet
│ ├── sales_report_202301_north
│ ├── sales_report_202302_south
│ └── user_activity_2023-01-01
├── csv
│ ├── sales_report_202301_north
│ ├── sales_report_202302_south
│ └── user_activity_2023-01-01
└── excel├── sales_report_202301_north.xlsx├── sales_report_202302_south.xlsx└── user_activity_2023-01-01.xlsx
相关文章:
PySPARK带多组参数和标签的SparkSQL批量数据导出到S3的程序
设计一个基于多个带标签SparkSQL模板作为配置文件和多组参数的PySPARK代码程序,实现根据不同的输入参数自动批量地将数据导出为Parquet、CSV和Excel文件到S3上,标签和多个参数(以“_”分割)为组成导出数据文件名,文件已…...
蓝桥杯备考:模拟算法之字符串展开
P1098 [NOIP 2007 提高组] 字符串的展开 - 洛谷 | 计算机科学教育新生态 #include <iostream> #include <cctype> #include <algorithm> using namespace std; int p1,p2,p3; string s,ret; void add(char left,char right) {string tmp;for(char ch left1;…...
使用LLaMA-Factory对AI进行认知的微调
使用LLaMA-Factory对AI进行认知的微调 引言1. 安装LLaMA-Factory1.1. 克隆仓库1.2. 创建虚拟环境1.3. 安装LLaMA-Factory1.4. 验证 2. 准备数据2.1. 创建数据集2.2. 更新数据集信息 3. 启动LLaMA-Factory4. 进行微调4.1. 设置模型4.2. 预览数据集4.3. 设置学习率等参数4.4. 预览…...
@Nullable 注解
文章目录 解释 Nullable 注解注解的组成部分:如何使用 Nullable 注解a. 标注方法返回值:b. 标注方法参数:c. 标注字段: 结合其他工具与 Nonnull 配合使用总结 Nullable 注解在 Java 中的使用场景通常与 Nullability(空…...
Arduino大师练成手册 -- 控制 AS608 指纹识别模块
要在 Arduino 上控制 AS608 指纹识别模块,你可以按照以下步骤进行: 硬件连接 连接指纹模块:将 AS608 指纹模块与 Arduino 连接。通常,AS608 使用 UART 接口进行通信。你需要将 AS608 的 TX、RX、VCC 和 GND 引脚分别连接到 Ardu…...
Mask R-CNN与YOLOv8的区别
Mask R-CNN与YOLOv8虽然都是深度学习在计算机视觉领域的应用,但它们属于不同类型的视觉框架,各有特点和优势。 以下是关于 Mask R-CNN 和 YOLOv8 的详细对比分析,涵盖核心原理、性能差异、应用场景和选择建议: 1. 核心原理与功能…...
在Ubuntu上使用Docker部署DeepSeek
在Ubuntu上使用Docker部署DeepSeek,并确保其可以访问公网网址进行对话,可以按照以下步骤进行: 一、安装Docker 更新Ubuntu的软件包索引: sudo apt-get update安装必要的软件包,这些软件包允许apt通过HTTPS使用存储库…...
MySQL的覆盖索引
MySQL的覆盖索引 前言 当一个索引包含了查询所需的全部字段时,就可以提高查询效率,这样的索引又被称之为覆盖索引。 以MySQL常见的三种存储引擎为例:InnoDB、MyISAM、Memory,对于覆盖索引提高查询效率的方式均不同,…...
【Numpy核心编程攻略:Python数据处理、分析详解与科学计算】2.12 连续数组:为什么contiguous这么重要?
2.12 连续数组:为什么contiguous这么重要? 目录 #mermaid-svg-wxhozKbHdFIldAkj {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-wxhozKbHdFIldAkj .error-icon{fill:#552222;}#mermaid-svg-…...
在React中使用redux
一、首先安装两个插件 1.Redux Toolkit 2.react-redux 第一步:创建模块counterStore 第二步:在store的入口文件进行子模块的导入组合 第三步:在index.js中进行store的全局注入 第四步:在组件中进行使用 第五步:在组件中…...
lstm预测
import numpy as np import pandas as pd import tensorflow as tf import math import matplotlib.pyplot as plt from sklearn.preprocessing import MinMaxScaler from keras.layers import LSTM,Activation,Dense,Dropout# 时间序列数据转换为监督学习的格式 def creatXY(d…...
《 C++ 点滴漫谈: 二十五 》空指针,隐秘而危险的杀手:程序崩溃的真凶就在你眼前!
摘要 本博客全面解析了 C 中指针与空值的相关知识,从基础概念到现代 C 的改进展开,涵盖了空指针的定义、表示方式、使用场景以及常见注意事项。同时,深入探讨了 nullptr 的引入及智能指针在提升代码安全性和简化内存管理方面的优势。通过实际…...
【AI】探索自然语言处理(NLP):从基础到前沿技术及代码实践
Hi ! 云边有个稻草人-CSDN博客 必须有为成功付出代价的决心,然后想办法付出这个代价。 目录 引言 1. 什么是自然语言处理(NLP)? 2. NLP的基础技术 2.1 词袋模型(Bag-of-Words,BoWÿ…...
2025年Android开发趋势全景解读
文章目录 一、界面开发:从"手写代码"到"智能拼装"1.1 Jetpack Compose实战进化1.2 淘汰XML布局的三大信号 二、AI融合开发:无需炼丹的普惠智能2.1 设备端AI三大杀手级应用2.2 成本对比:设备端VS云端AI 三、跨平台演进&am…...
C#面试常考随笔11:Dictionary<K, V>、Hashtable的内部实现原理是什么?效率如何?
Dictionary<K, V> 底层数据结构:使用哈希表(Hash Table),由一个数组和链表(或在.NET Core 2.1 及之后版本中,当链表长度达到一定阈值时转换为红黑树)组成。数组中的每个元素称为一个桶&a…...
Linux防火墙基础
一、Linux防火墙的状态机制 1.iptables是可以配置有状态的防火墙,其有状态的特点是能够指定并记住发送或者接收信息包所建立的连接状态,其一共有四种状态,分别为established invalid new related。 established:该信息包已建立连接&#x…...
Qt u盘自动升级软件
Qt u盘自动升级软件 Chapter1 Qt u盘自动升级软件u盘自动升级软件思路:step1. 获取U盘 判断U盘名字是否正确, 升级文件是否存在。step2. 升级step3. 升级界面 Chapter2 Qt 嵌入式设备应用程序,通过U盘升级的一种思路Chapter3 在开发板上运行的…...
【Conda 和 虚拟环境详细指南】
Conda 和 虚拟环境的详细指南 什么是 Conda? Conda 是一个开源的包管理和环境管理系统,支持多种编程语言(如Python、R等),最初由Continuum Analytics开发。 主要功能: 包管理:安装、更新、删…...
Python递归函数深度解析:从原理到实战
Python递归函数深度解析:从原理到实战 递归是计算机科学中重要的编程范式,也是算法设计的核心思想之一。本文将通过20实战案例,带你深入理解Python递归函数的精髓,掌握递归算法的实现技巧。 一、递归函数核心原理 1.1 递归三要…...
OpenGL学习笔记(五):Textures 纹理
文章目录 纹理坐标纹理环绕方式纹理过滤——处理纹理分辨率低的情况多级渐远纹理Mipmap——处理纹理分辨率高的情况加载与创建纹理 ( <stb_image.h> )生成纹理应用纹理纹理单元练习1练习2练习3练习4 通过上一篇着色部分的学习,我们可以…...
调用支付宝接口响应40004 SYSTEM_ERROR问题排查
在对接支付宝API的时候,遇到了一些问题,记录一下排查过程。 Body:{"datadigital_fincloud_generalsaas_face_certify_initialize_response":{"msg":"Business Failed","code":"40004","sub_msg…...
脑机新手指南(八):OpenBCI_GUI:从环境搭建到数据可视化(下)
一、数据处理与分析实战 (一)实时滤波与参数调整 基础滤波操作 60Hz 工频滤波:勾选界面右侧 “60Hz” 复选框,可有效抑制电网干扰(适用于北美地区,欧洲用户可调整为 50Hz)。 平滑处理&…...
Golang dig框架与GraphQL的完美结合
将 Go 的 Dig 依赖注入框架与 GraphQL 结合使用,可以显著提升应用程序的可维护性、可测试性以及灵活性。 Dig 是一个强大的依赖注入容器,能够帮助开发者更好地管理复杂的依赖关系,而 GraphQL 则是一种用于 API 的查询语言,能够提…...
将对透视变换后的图像使用Otsu进行阈值化,来分离黑色和白色像素。这句话中的Otsu是什么意思?
Otsu 是一种自动阈值化方法,用于将图像分割为前景和背景。它通过最小化图像的类内方差或等价地最大化类间方差来选择最佳阈值。这种方法特别适用于图像的二值化处理,能够自动确定一个阈值,将图像中的像素分为黑色和白色两类。 Otsu 方法的原…...
【VLNs篇】07:NavRL—在动态环境中学习安全飞行
项目内容论文标题NavRL: 在动态环境中学习安全飞行 (NavRL: Learning Safe Flight in Dynamic Environments)核心问题解决无人机在包含静态和动态障碍物的复杂环境中进行安全、高效自主导航的挑战,克服传统方法和现有强化学习方法的局限性。核心算法基于近端策略优化…...
mac 安装homebrew (nvm 及git)
mac 安装nvm 及git 万恶之源 mac 安装这些东西离不开Xcode。及homebrew 一、先说安装git步骤 通用: 方法一:使用 Homebrew 安装 Git(推荐) 步骤如下:打开终端(Terminal.app) 1.安装 Homebrew…...
【网络安全】开源系统getshell漏洞挖掘
审计过程: 在入口文件admin/index.php中: 用户可以通过m,c,a等参数控制加载的文件和方法,在app/system/entrance.php中存在重点代码: 当M_TYPE system并且M_MODULE include时,会设置常量PATH_OWN_FILE为PATH_APP.M_T…...
PostgreSQL——环境搭建
一、Linux # 安装 PostgreSQL 15 仓库 sudo dnf install -y https://download.postgresql.org/pub/repos/yum/reporpms/EL-$(rpm -E %{rhel})-x86_64/pgdg-redhat-repo-latest.noarch.rpm# 安装之前先确认是否已经存在PostgreSQL rpm -qa | grep postgres# 如果存在࿰…...
零知开源——STM32F103RBT6驱动 ICM20948 九轴传感器及 vofa + 上位机可视化教程
STM32F1 本教程使用零知标准板(STM32F103RBT6)通过I2C驱动ICM20948九轴传感器,实现姿态解算,并通过串口将数据实时发送至VOFA上位机进行3D可视化。代码基于开源库修改优化,适合嵌入式及物联网开发者。在基础驱动上新增…...
【深度学习新浪潮】什么是credit assignment problem?
Credit Assignment Problem(信用分配问题) 是机器学习,尤其是强化学习(RL)中的核心挑战之一,指的是如何将最终的奖励或惩罚准确地分配给导致该结果的各个中间动作或决策。在序列决策任务中,智能体执行一系列动作后获得一个最终奖励,但每个动作对最终结果的贡献程度往往…...
