Spark写PGSQL分区表
这里写目录标题
- 需求
- 碰到的问题
- 格式问题
- 分区问题(重点)
- 解决
- 完整代码
- 效果
需求
spark程序计算后的数据需要往PGSQL中的分区表进行写入。
碰到的问题
格式问题
使用了字符串格式,导致插入报错。
val frame = df.withColumn("insert_time",current_timestamp()))
Batch entry 0 INSERT INTO t ("a","insert_time") VALUES
(1,'2023-08-01 10:00:00') was aborted: ERROR: column
"insert_time" is of type timestamp without time zone but
expression is of type character varying
分区问题(重点)
一直都是spark计算完后写单表或者hive的表,都需要去手动去维护分区。但是写PGSQL空表(只有表字段,还没有数据,没有创建分区),需要手动先创建分区,否则会报错。
报错信息
Partition key of the failing row contains (insert_time) =
(2023-08-04 21:14:09.641). Call getNextException to see other
errors in the batch.
插入失败的行的分区键包含的时间戳值 2023-08-04 21:14:09.641 在分区表中找不到对应的分区范围。
解决
最终的解决方案是在插入数据之前,通过代码去添加分区,添加好分区后再写入数据即可。
object WritePgSQL {def main(args: Array[String]): Unit = {val spark = SparkSession.builder().appName("SparkPostgreSQLPartitionedTable").config("spark.master", "local").getOrCreate()// 设置PostgreSQL连接信息val postgresUrl = "jdbc:postgresql://192.168.160.123:5432/test"val connectionProperties = new java.util.Properties()connectionProperties.setProperty("user", "test")connectionProperties.setProperty("password", "123456")// 创建测试数据val data = Seq((1, "2023-08-01 10:00:00"),(2, "2023-08-02 12:00:00"),(3, "2023-08-03 15:00:00"))val columns = Seq("a", "insert_time1")val df = spark.createDataFrame(data).toDF(columns: _*)val frame = df.drop("insert_time1").withColumn("insert_time", current_timestamp().cast("timestamp"))// 动态创建分区范围// p1 可以换成p20230804这样的分区格式// t为表名// (TIMESTAMP '2023-08-04 00:00:00') 分区开始范围,一般通过代码生成,为计算时间的零点// (TIMESTAMP '2023-08-05 00:00:00') 分区结束范围,一般通过代码生成,为计算时间的下一天零点val createPartitionSql =s"""CREATE TABLE "p1" PARTITION OF t FOR VALUES FROM (TIMESTAMP '2023-08-04 00:00:00') TO (TIMESTAMP '2023-08-05 00:00:00') ;"""println(createPartitionSql)// 执行创建分区 SQLval connection = java.sql.DriverManager.getConnection(postgresUrl, connectionProperties)val statement = connection.createStatement()statement.executeUpdate(createPartitionSql)connection.close()// 将数据写入PostgreSQL分区表frame.write.mode("append").jdbc(postgresUrl, "t", connectionProperties)}
}
完整代码
自动生成当天日期和分区名称
object WritePgSQL {def main(args: Array[String]): Unit = {val spark = SparkSession.builder().appName("SparkPostgreSQLPartitionedTable").config("spark.master", "local").getOrCreate()// 设置PostgreSQL连接信息val postgresUrl = "jdbc:postgresql://192.168.160.123:5432/test"val connectionProperties = new java.util.Properties()connectionProperties.setProperty("user", "test")connectionProperties.setProperty("password", "123456")// 创建测试数据val data = Seq((1, "2023-08-01 10:00:00"),(2, "2023-08-02 12:00:00"),(3, "2023-08-03 15:00:00"))val columns = Seq("a", "insert_time1")val df = spark.createDataFrame(data).toDF(columns: _*)val frame = df.drop("insert_time1").withColumn("insert_time", current_timestamp().cast("timestamp"))// 获取今天和明天的时间范围// 获取当前日期val currentDate = LocalDate.now()// 获取下一天的日期val nextDayDate = currentDate.plusDays(1)// 创建固定的时间部分(00:00:00)val startTime = LocalTime.of(0, 0, 0)// 组合日期和时间来得到完整的日期时间,并格式化为字符串val currentDateTimeString = LocalDateTime.of(currentDate, startTime).format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))val nextDayDateTimeString = LocalDateTime.of(nextDayDate, startTime).format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))// 格式化为yyyyMMdd字符串val dateFormatter = DateTimeFormatter.ofPattern("yyyyMMdd")val currentDateString = currentDate.format(dateFormatter)// 动态创建分区范围val createPartitionSql =s"""CREATE TABLE "p$currentDateString" PARTITION OF tFOR VALUES FROM (TIMESTAMP '$currentDateTimeString') TO (TIMESTAMP '$nextDayDateTimeString') ;"""// 执行创建分区 SQLval connection = java.sql.DriverManager.getConnection(postgresUrl, connectionProperties)val statement = connection.createStatement()statement.executeUpdate(createPartitionSql)connection.close()// 将数据写入PostgreSQL分区表frame.write.mode("append").jdbc(postgresUrl, "t", connectionProperties)}
}
效果


相关文章:
Spark写PGSQL分区表
这里写目录标题 需求碰到的问题格式问题分区问题(重点) 解决完整代码效果 需求 spark程序计算后的数据需要往PGSQL中的分区表进行写入。 碰到的问题 格式问题 使用了字符串格式,导致插入报错。 val frame df.withColumn("insert_t…...
Git 命令行登录
有时候登录命令行版本的git会出现这个错误 1remote: Support for password authentication was removed on August 13, 2021. 2remote: Please see https://docs.github.com/en/get-started/getting-started-with-git/about-remote-repositories#cloning-with-https-urls for …...
性能分析记录
4实例压测TPS浮动在200-300 1.TPS浮动200-300,ART浮动的可能性是10-20ms,链路复杂是可接受的,链路简单则需要分析原因。 1)缓存没命中,对某些账号缓存没命中,或缓存失效后导致隔段时间耗时升高。 2&…...
Java反射学习(大综合)
第一天 Java反射及动态代理... 2 一、 Java反射... 2 1、什么是反射:... 2 2、反射的原理... 2 3、反射的优缺点:... 2 4、反射的用途:... 3 5、反射机制常用的类:... 3 1、获得Class:主要有三…...
Vite+Vue3 开发UI组件库并发布到npm
一直对开源UI组件库比较感兴趣,摸索着开发了一套,虽然还只是开始,但是从搭建到发布这套流程基本弄明白了,现在分享给大家,希望对同样感兴趣的同学有所帮助。 目前我的这套名为hasaki-ui的组件库仅有两个组件࿰…...
vue- form动态表单验证规则-表单验证
前言 以element官网的form表单的-动态增减表单项为例讲解表单验证规则 动态的功能就是v-model配合push v-for 便利来实现的 我们需要熟知2个知识点prop表单验证需要跟v-model绑定的值是一样的, 如果是一个数组便利的表单,那就需要绑定这个数组每一项…...
FPGA学习—通过数码管实现电子秒表模拟
文章目录 一、数码管简介二、项目分析三、项目源码及分析四、实现效果五、总结 一、数码管简介 请参阅博主以前写过的一篇电子时钟模拟,在此不再赘述。 https://blog.csdn.net/qq_54347584/article/details/130402287 二、项目分析 项目说明:本次项目…...
区块链媒体发稿:区块链媒体宣发常见问题解析
据统计,由于区块链应用和虚拟货币的兴起,越来越多媒体对区块链领域开展报导,特别是世界各国媒体宣发全是热火朝天。但是,随着推卸责任媒体宣发的五花八门,让很多人因而上当受骗,乃至伤害一大笔资产。身为投…...
openGauss学习笔记-28 openGauss 高级数据管理-NULL值
文章目录 openGauss学习笔记-28 openGauss 高级数据管理-NULL值28.1 IS NOT NULL28.2 IS NULL openGauss学习笔记-28 openGauss 高级数据管理-NULL值 NULL值代表未知数据。无法比较NULL和0,因为它们是不等价的。 创建表时,可以指定列可以存放或者不能存…...
DAO和XML文件参数和返回值
①MyBatis中resultType和resultMap的区别 1.使用MyBatis查询数据库记录时,返回类型常用的有两种:resultType和resultMap。那么两者之间有什么区别呢? 如果只是返回一个值,简单类型,比如说String或者int,那…...
vue 浏览器右侧可拖拽小组件
目录 0. 使用场景 1. 动图示例 2. 实现方式 2.1 创建drag.js 2.2 使用v-drag 3. 结尾 0. 使用场景 很多网页在浏览器右侧有"导航"或者“智能助手”的悬浮小气泡框,比如我们的csdn☞ 作为页面友好型的引导标注,某些场景下这些小气泡可以…...
SpringMvc学习笔记五
Restful 风格路由 1. 配置类 1.1、SpringMvcConfig配置类 Configuration ComponentScan({"com.itheima.controller", "com.itheima.config"}) 方式1.2 添加com.itheima.config 扫描目录 EnableWebMvc public class SpringMvcConfig { } 1.2、ServletCo…...
ORACLE-DG总结
述 当主库的某些日志没有成功传送到备库,那么这时候就发生了归档裂缝(Archive Gap)。目前Oracle提供了两种日志GAP的检测和处理机制,分别是自动GAP处理(Automatic Gap Resolution)和FAL进程GAP处理(FAL Gap Resolution)。自动GAP处理即主库上的ARCn进程会每分钟检查备库…...
机器学习中的 K-均值聚类算法及其优缺点
K-均值聚类算法是一种常用的无监督学习算法,用于将相似的数据点分组为聚类。 其步骤如下: 1. 初始化:选择聚类数K,随机选取K个聚类中心。 2. 计算距离:计算每个数据点与K个聚类中心的距离,将其分配到距离最…...
【数据化分析和建模】一般步骤(个人工作经验总结)
近期关于【数据化分析和建模】一般步骤的思考如下。 以终为始,要解决什么问题,实现什么效果? 数据可视化分析的首要目标是通过将数据以可视化图表的形式真实、完整地呈现业务现状,为发现业务问题打好基础,包括实时的…...
视频安防监控EasyCVR平台海康大华设备国标GB28181告警布防的报文说明
TSINGSEE青犀视频监控综合管理平台EasyCVR基于云边端协同,可支持海量视频的轻量化接入与汇聚管理。平台既具备传统安防视频监控的能力,比如:视频监控直播、云端录像、云存储、录像检索与回看、告警上报、平台级联、云台控制、语音对讲等&…...
T31开发笔记:librtmp拉流测试
若该文为原创文章,转载请注明原文出处。 T31使用librtmp拉流并保存成FLV文件或H264和AAC文件。 librtmp编译在前面有教程,自行编译。 实现的目的是想要获取获取rtmp的AAC流并播放,实时双向对讲功能。 一、硬件和开发环境 1、硬件࿱…...
2308C++概念化
原文 库 //概念化(需要C20) struct 可画 {void 画(小出流 &out) const {te::call([](auto const &s, auto &out)-> decltype(s.画(out)) { s.画(out); }, *this, out);} }; struct 方形 {void 画(小出流 &out) const { out << "方形"; } }…...
flutter开发实战-实现自定义按钮类似UIButton效果
flutter开发实战-实现自定义按钮类似UIButton效果 最近开发过程中需要实现一下UIButton效果的flutter按钮,这里使用的是监听手势点击事件。 一、GestureDetector GestureDetector属性定义 GestureDetector({super.key,this.child,this.onTapDown,this.onTapUp,t…...
深度优先搜索|1034, 1020, 1254
深度优先搜索|1034. 边界着色, 机器人的运动范围,529. 扫雷游戏 边界着色机器人的运动范围扫雷问题 边界着色 把这个题分段了,先找到包括 (row, col) 的连通分量,然后再去找符合条件的边界,找到以后涂上颜色就行。 c…...
浏览器访问 AWS ECS 上部署的 Docker 容器(监听 80 端口)
✅ 一、ECS 服务配置 Dockerfile 确保监听 80 端口 EXPOSE 80 CMD ["nginx", "-g", "daemon off;"]或 EXPOSE 80 CMD ["python3", "-m", "http.server", "80"]任务定义(Task Definition&…...
【Linux】C语言执行shell指令
在C语言中执行Shell指令 在C语言中,有几种方法可以执行Shell指令: 1. 使用system()函数 这是最简单的方法,包含在stdlib.h头文件中: #include <stdlib.h>int main() {system("ls -l"); // 执行ls -l命令retu…...
理解 MCP 工作流:使用 Ollama 和 LangChain 构建本地 MCP 客户端
🌟 什么是 MCP? 模型控制协议 (MCP) 是一种创新的协议,旨在无缝连接 AI 模型与应用程序。 MCP 是一个开源协议,它标准化了我们的 LLM 应用程序连接所需工具和数据源并与之协作的方式。 可以把它想象成你的 AI 模型 和想要使用它…...
Python爬虫实战:研究feedparser库相关技术
1. 引言 1.1 研究背景与意义 在当今信息爆炸的时代,互联网上存在着海量的信息资源。RSS(Really Simple Syndication)作为一种标准化的信息聚合技术,被广泛用于网站内容的发布和订阅。通过 RSS,用户可以方便地获取网站更新的内容,而无需频繁访问各个网站。 然而,互联网…...
高危文件识别的常用算法:原理、应用与企业场景
高危文件识别的常用算法:原理、应用与企业场景 高危文件识别旨在检测可能导致安全威胁的文件,如包含恶意代码、敏感数据或欺诈内容的文档,在企业协同办公环境中(如Teams、Google Workspace)尤为重要。结合大模型技术&…...
有限自动机到正规文法转换器v1.0
1 项目简介 这是一个功能强大的有限自动机(Finite Automaton, FA)到正规文法(Regular Grammar)转换器,它配备了一个直观且完整的图形用户界面,使用户能够轻松地进行操作和观察。该程序基于编译原理中的经典…...
USB Over IP专用硬件的5个特点
USB over IP技术通过将USB协议数据封装在标准TCP/IP网络数据包中,从根本上改变了USB连接。这允许客户端通过局域网或广域网远程访问和控制物理连接到服务器的USB设备(如专用硬件设备),从而消除了直接物理连接的需要。USB over IP的…...
重启Eureka集群中的节点,对已经注册的服务有什么影响
先看答案,如果正确地操作,重启Eureka集群中的节点,对已经注册的服务影响非常小,甚至可以做到无感知。 但如果操作不当,可能会引发短暂的服务发现问题。 下面我们从Eureka的核心工作原理来详细分析这个问题。 Eureka的…...
九天毕昇深度学习平台 | 如何安装库?
pip install 库名 -i https://pypi.tuna.tsinghua.edu.cn/simple --user 举个例子: 报错 ModuleNotFoundError: No module named torch 那么我需要安装 torch pip install torch -i https://pypi.tuna.tsinghua.edu.cn/simple --user pip install 库名&#x…...
蓝桥杯 冶炼金属
原题目链接 🔧 冶炼金属转换率推测题解 📜 原题描述 小蓝有一个神奇的炉子用于将普通金属 O O O 冶炼成为一种特殊金属 X X X。这个炉子有一个属性叫转换率 V V V,是一个正整数,表示每 V V V 个普通金属 O O O 可以冶炼出 …...
