CDH6.3.2 的pyspark读取excel表格数据写入hive中的问题汇总
需求:内网通过Excel文件将数据同步到外网的CDH服务器中,将CDH中的文件数据写入hive中。
CDH版本为:6.3.2
spark版本为:2.4
python版本:2.7.5
操作系统:CentOS Linux 7
集群方式:yarn-cluster
一、在linux中将excel文件转换成CSV文件,然后上传到hdfs中。
为何要先转csv呢?主要原因是pyspark直接读取excel的话,涉及到版本的冲突问题。commons-collections-3.2.2.jar 在CDH6.3.2中的版本是3.2.2.但是pyspark直接读取excel要求collections4以上的版本,虽然也尝试将4以上的版本下载放进去,但是也没效果,因为时间成本的问题,所以没有做过多的尝试了,直接转为csv后再读吧。
spark引用第三方包
1.1 转csv的python代码(python脚本)
#-*- coding:utf-8 -*-
import pandas as pd
import os, xlrd ,sysdef xlsx_to_csv_pd(fn):path1="/home/lzl/datax/"+fn+".xlsx"path2="/home/lzl/datax/"+fn+".csv"data_xls = pd.read_excel(path1, index_col=0)data_xls.to_csv(path2, encoding='utf-8')if __name__ == '__main__':fn=sys.argv[1]print(fn)try:xlsx_to_csv_pd(fn)print("转成成功!")except Exception as e:print("转成失败!")
1.2 数据中台上的代码(shell脚本):
#!/bin/bash
#@description:这是一句描述
#@author: admin(admin)
#@email:
#@date: 2023-09-26 14:44:3# 文件名称
fn="项目投运计划"# xlsx转换成csv格式
ssh root@cdh02 " cd /home/lzl/shell; python xlsx2csv.py $fn" # 将文件上传到hfds上
ssh root@cdh02 "cd /home/lzl/datax; hdfs dfs -put $fn.csv /origin_data/sgd/excel/"
echo "上传成功~!"# 删除csv文件
ssh root@cdh02 "cd /home/lzl/datax; rm -rf $fn.csv"
echo "删除成功~!"
二、pyspark写入hive中
2.1 写入过程中遇到的问题点
2.1.1 每列的前后空格、以及存在换行符等问题。采取的措施是:循环列,采用trim函数、regexp_replace函数处理。
# 循环对每列去掉前后空格,以及删除换行符
import pyspark.sql.functions as F
from pyspark.sql.functions import col, regexp_replacefor name in df.columns:df = df.withColumn(name, F.trim(df[name]))df = df.withColumn(name, regexp_replace(col(name), "\n", ""))
2.1.2 个别字段存在科学计数法,需要用cast转换
from pyspark.sql.types import *# 取消销售订单号的科学记数法
col="销售订单号"
df= df.withColumn(col,df[col].cast(DecimalType(10, 0)))
去掉换行符另一种方法:换行符问题也可以参照这个
2.2 数据中台代码(pyspark)
# -*- coding:utf-8
# coding=UTF-8# 引入sys,方便输出到控制台时不是乱码
import sys
reload(sys)
sys.setdefaultencoding( "utf-8" )# 引入模块
from pyspark.sql.types import IntegerType, DoubleType, StringType, StructType, StructField
from pyspark.sql import SparkSession
from pyspark import SparkContext, SparkConf, SQLContext
import pandas as pd
import pyspark.sql.functions as F
from pyspark.sql.functions import col, regexp_replace
from pyspark.sql.types import *# 设定资源大小
conf=SparkConf()\.set("spark.jars.packages","com.crealytics:spark-excel_2.11:0.11.1")\.set("spark.sql.shuffle.partitions", "4")\.set("spark.sql.execution.arrow.enabled", "true")\.set("spark.driver.maxResultSize","6G")\.set('spark.driver.memory','6G')\.set('spark.executor.memory','6G')# 建立SparkSession
spark = SparkSession \.builder\.config(conf=conf)\.master("local[*]")\.appName("dataFrameApply") \.enableHiveSupport() \.getOrCreate()# 读取cvs文件
# 文件名称和文件位置
fp= r"/origin_data/sgd/excel/项目投运计划.csv"
df = spark.read \.option("header", "true") \.option("inferSchema", "true") \.option("multiLine", "true") \.option("delimiter", ",") \.format("csv") \.load(fp)# 查看数据类型
# df.printSchema()# 循环对每列去掉前后空格,以及删除换行符
for name in df.columns:df = df.withColumn(name, F.trim(df[name]))df = df.withColumn(name, regexp_replace(col(name), "\n", ""))# 取消销售订单号的科学记数法
col="销售订单号"
df= df.withColumn(col,df[col].cast(DecimalType(10, 0)))df.show(25,truncate = False) # 查看数据,允许输出25行# 设置日志级别 (这两个没用)
sc = spark.sparkContext
sc.setLogLevel("ERROR")# 写入hive中
spark.sql("use sgd_dev") # 指定数据库# 创建临时表格 ,注意建表时不能用'/'和''空格分隔,否则会影响2023/9/4和2023-07-31 00:00:00这样的数据
spark.sql("""
CREATE TABLE IF NOT EXISTS ods_sgd_project_operating_plan_info_tmp (project_no string ,sale_order_no string ,customer_name string ,unoperating_amt decimal(19,2) , expected_operating_time string ,operating_amt decimal(19,2) , operating_progress_track string ,is_Supplied string ,operating_submit_time string ,Signing_contract_time string ,remake string )ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
""")# 注册临时表
df.createOrReplaceTempView("hdfs_df")
# spark.sql("select * from hdfs_df limit 5").show() #查看前5行数据# 将数据插入hive临时表中
spark.sql("""insert overwrite table ods_sgd_project_operating_plan_info_tmp select * from hdfs_df
""")# 将数据导入正式环境的hive中
spark.sql("""insert overwrite table ods_sgd_project_operating_plan_info select * from ods_sgd_project_operating_plan_info_tmp
""")# 查看导入后的数据
spark.sql("select * from ods_sgd_project_operating_plan_info limit 20").show(20,truncate = False)# 删除注册的临时表
spark.sql("""drop table hdfs_df
""")# 删除临时表
spark.sql("""drop table ods_sgd_project_operating_plan_info_tmp
""")
关于spark的更多知识,可以参看Spark SQL总结
相关文章:
CDH6.3.2 的pyspark读取excel表格数据写入hive中的问题汇总
需求:内网通过Excel文件将数据同步到外网的CDH服务器中,将CDH中的文件数据写入hive中。 CDH版本为:6.3.2 spark版本为:2.4 python版本:2.7.5 操作系统:CentOS Linux 7 集群方式:yarn-cluster …...
2120 -- 预警系统题解
Description OiersOiers 国的预警系统是一棵树,树中有 �n 个结点,编号 1∼�1∼n,树中每条边的长度均为 11。预警系统中只有一个预警信号发射站,就是树的根结点 11 号结点,其它 �−1…...
C++入门-day01
一、认识C C融合了三种不同的编程方式 C代表的过程性语言在C基础上添加的类、结构体puls代表的面向对象语言C模板支持泛型编程 C完全兼容C的特性 Tips:侯捷老师提倡的Modren C是指C11、C14、C17和C20这些新标准所引入的一系列新特性和改进。在我们练习的时候也应当去…...
Android开源 Skeleton 骨架屏 V1.3.0
目录 一、简介 二、效果图 三、引用 Skeleton 添加jitpack 仓库 添加依赖: 四、新增 “块”骨架屏 1、bind方法更改和变化: 2、load方法更改和变化: 五、关于上一个版本 一、简介 骨架屏的作用是在网络请求较慢时,提供基础占位&…...
网络资料搬运(2)
(1) Ubuntu 22.04: 为 Ubuntu22.04 系统添加中文输入法 linux解压gz文件的命令 Ubuntu20.04出现Unit ssh.service could not be found 详解使用SSH远程连接Ubuntu服务器系统 Configuring networks(配置网络) (2) Python && OpenCV: …...
SEO搜索引擎
利用搜索引擎的规则提高网站在有关搜索引擎内的自然排名,吸引更多的用户访问网站,提高网站的访问量,提高网站的销售能力和宣传能力,从而提升网站的品牌效应 搜索引擎优化的技术手段 黑帽SEO 通过欺骗技术和滥用搜索算法来推销毫不…...
动态规划-状态机(188. 买卖股票的最佳时机 IV)
状态分类: f[i,j,0]考虑前i只股票,进行了j笔交易,目前未持有股票 所能获得最大利润 f[i,j,1]考虑前i只股票,进行了j笔交易,目前持有股票 所能获得最大利润 状态转移: f[i][j][0] Math.max(f[i-1][j][0],f[…...
银行业务队列简单模拟(队列应用)
设某银行有A、B两个业务窗口,且处理业务的速度不一样,其中A窗口处理速度是B窗口的2倍 —— 即当A窗口每处理完2个顾客时,B窗口处理完1个顾客。给定到达银行的顾客序列,请按业务完成的顺序输出顾客序列。假定不考虑顾客先后到达的时…...
2023/8/8 下午10:42:04 objectarx
2023/8/8 下午10:42:04 objectarx 2023/8/8 下午10:42:16 ObjectARX(AutoCAD Runtime Extension)是用于开发和自定义AutoCAD软件的编程接口。ObjectARX允许开发者使用C++、.NET等编程语言来创建插件、扩展功能和定制化AutoCAD的行为。 通过ObjectARX,开发者可以访问Auto…...
Day-06 基于 Docker安装 Nginx 镜像
1.去官方公有仓库查询nginx镜像 docker search nginx 2.拉取该镜像 docker pull nginx 3. 启动镜像,使用nginx服务,代理本机8080端口(测试是不是好使) docker run -d -p 8080:80 --name nginx-8080 nginx docker ps curl 127.0.0.1:8080...
linux入门---信号的保存和捕捉
目录标题 信号的一些概念信号的保存pending表block表handler表 信号的捕捉内核态和用户态信号的捕捉 信号的一些概念 1.进程会收到各种各样的信号,那么程序对该信号进行实际处理的动作叫做信号的递达。 2.我们之前说过当进程收到信号的时候可能并不会立即处理这个信…...
5.外部中断
中断初始化配置步骤: IO口初始化配置 开启中断总允许EA 打开某个IO口的中断允许 打开IO口的某一位的中断允许 配置该位的中断触发方式 中断函数: #pragma vector PxINT_VECTOR __interrupt void 函数名(void){}#pragma vector PxINT_VECTOR __int…...
Mydb数据库问题
1、请简要介绍一下这个基于 Java 的简易数据库管理系统。它的主要功能是什么? TM(Transaction Manager):事务管理器,用于维护事务的状态,并提供接口供其他模块查询某个事务的状态。DM(Data Man…...
部署并应用ByteTrack实现目标跟踪
尽管YOLOv8已经集成了ByteTrack算法,但在这里我还是想利用ByteTrack官网的代码,自己实现目标跟踪。 要想应用ByteTrack算法,首先就要从ByteTrack官网上下载并安装。虽然官网上介绍得很简单,只需要区区6行代码,但对于国…...
MacOS怎么配置JDK环境变量
1 输入命令看是否配置了JDk 的环境变量:echo $JAVA_HOME 要是什么也没输出 证明是没配置 2 输入命令编辑 sudo vim ~/.bash_profile 然后按 i ,进入编辑模式,粘贴下面的代码,注意:JAVA_HOME后面路径需要改成自己的版…...
Spring Boot 开发16个实用的技巧
当涉及到使用Spring Boot开发应用程序时,以下是16个实用的技巧: 1. **使用Spring Initializr**:Spring Initializr是一个快速创建Spring Boot项目的工具,可以帮助您选择项目依赖和生成项目骨架。 2. **自动配置**:Sp…...
《机器学习实战》学习记录-ch2
PS: 个人笔记,建议不看 原书资料:https://github.com/ageron/handson-ml2 2.1数据获取 import pandas as pd data pd.read_csv(r"C:\Users\cyan\Desktop\AI\ML\handson-ml2\datasets\housing\housing.csv")data.head() data.info()<clas…...
lv7 嵌入式开发-网络编程开发 07 TCP服务器实现
目录 1 函数介绍 1.1 socket函数 与 通信域 1.2 bind函数 与 通信结构体 1.3 listen函数 与 accept函数 2 TCP服务端代码实现 3 TCP客户端代码实现 4 代码优化 5 练习 1 函数介绍 其中read、write、close在IO中已经介绍过,只需了解socket、bind、listen、acc…...
mysql技术文档--阿里巴巴java准则《Mysql数据库建表规约》--结合阿丹理解尝试解读--国庆开卷
阿丹: 国庆快乐呀大家! 在项目开始前一个好的设计、一个健康的表关系,不仅会让开发变的有趣舒服,也会在后期的维护和升级迭代中让系统不断的成长。那么今天就认识和解读一下阿里的准则!! 建表规约 表达是…...
Qt+openCV学习笔记(十六)Qt6.6.0rc+openCV4.8.1+emsdk3.1.37编译静态库
前言: 有段时间没来写文章了,趁编译库的空闲,再写一篇记录文档 WebAssembly的发展逐渐成熟,即便不了解相关技术,web前端也在不经意中使用了相关技术的库,本篇文档记录下如何编译WebAssembly版本的openCV&…...
浏览器访问 AWS ECS 上部署的 Docker 容器(监听 80 端口)
✅ 一、ECS 服务配置 Dockerfile 确保监听 80 端口 EXPOSE 80 CMD ["nginx", "-g", "daemon off;"]或 EXPOSE 80 CMD ["python3", "-m", "http.server", "80"]任务定义(Task Definition&…...
生成xcframework
打包 XCFramework 的方法 XCFramework 是苹果推出的一种多平台二进制分发格式,可以包含多个架构和平台的代码。打包 XCFramework 通常用于分发库或框架。 使用 Xcode 命令行工具打包 通过 xcodebuild 命令可以打包 XCFramework。确保项目已经配置好需要支持的平台…...
TDengine 快速体验(Docker 镜像方式)
简介 TDengine 可以通过安装包、Docker 镜像 及云服务快速体验 TDengine 的功能,本节首先介绍如何通过 Docker 快速体验 TDengine,然后介绍如何在 Docker 环境下体验 TDengine 的写入和查询功能。如果你不熟悉 Docker,请使用 安装包的方式快…...
PHP和Node.js哪个更爽?
先说结论,rust完胜。 php:laravel,swoole,webman,最开始在苏宁的时候写了几年php,当时觉得php真的是世界上最好的语言,因为当初活在舒适圈里,不愿意跳出来,就好比当初活在…...
AI Agent与Agentic AI:原理、应用、挑战与未来展望
文章目录 一、引言二、AI Agent与Agentic AI的兴起2.1 技术契机与生态成熟2.2 Agent的定义与特征2.3 Agent的发展历程 三、AI Agent的核心技术栈解密3.1 感知模块代码示例:使用Python和OpenCV进行图像识别 3.2 认知与决策模块代码示例:使用OpenAI GPT-3进…...
为什么需要建设工程项目管理?工程项目管理有哪些亮点功能?
在建筑行业,项目管理的重要性不言而喻。随着工程规模的扩大、技术复杂度的提升,传统的管理模式已经难以满足现代工程的需求。过去,许多企业依赖手工记录、口头沟通和分散的信息管理,导致效率低下、成本失控、风险频发。例如&#…...
React Native在HarmonyOS 5.0阅读类应用开发中的实践
一、技术选型背景 随着HarmonyOS 5.0对Web兼容层的增强,React Native作为跨平台框架可通过重新编译ArkTS组件实现85%以上的代码复用率。阅读类应用具有UI复杂度低、数据流清晰的特点。 二、核心实现方案 1. 环境配置 (1)使用React Native…...
[ICLR 2022]How Much Can CLIP Benefit Vision-and-Language Tasks?
论文网址:pdf 英文是纯手打的!论文原文的summarizing and paraphrasing。可能会出现难以避免的拼写错误和语法错误,若有发现欢迎评论指正!文章偏向于笔记,谨慎食用 目录 1. 心得 2. 论文逐段精读 2.1. Abstract 2…...
【HTML-16】深入理解HTML中的块元素与行内元素
HTML元素根据其显示特性可以分为两大类:块元素(Block-level Elements)和行内元素(Inline Elements)。理解这两者的区别对于构建良好的网页布局至关重要。本文将全面解析这两种元素的特性、区别以及实际应用场景。 1. 块元素(Block-level Elements) 1.1 基本特性 …...
七、数据库的完整性
七、数据库的完整性 主要内容 7.1 数据库的完整性概述 7.2 实体完整性 7.3 参照完整性 7.4 用户定义的完整性 7.5 触发器 7.6 SQL Server中数据库完整性的实现 7.7 小结 7.1 数据库的完整性概述 数据库完整性的含义 正确性 指数据的合法性 有效性 指数据是否属于所定…...
