当前位置: 首页 > news >正文

拉链表-spark版本

采用spark实现的拉链表

拉链表初始化

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.lit/*** 拉链表初始化*/
object table_zip_initial {val lastDay = "9999-12-31"def main(args: Array[String]): Unit = {var table_base = "t_uac_organization" //基表var table_zip = "ods_uac_org_zip" //拉链表/*** 基于该天的t_uac_organization*/var dt = "2023-01-31"System.setProperty("HADOOP_USER_NAME", "root")val builder = SparkUtils.getBuilderif (System.getProperties.getProperty("os.name").contains("Windows")) {builder.master("local[*]")} else {table_base = args(0)table_zip = args(1)dt = args(2)}val spark = builder.appName(this.getClass.getName).getOrCreate()val hive_db = "common"spark.sql(s"use $hive_db")/*** 初始化,一次*/if (!TableUtils.tableExists(spark, hive_db, table_zip)) {println(s"$table_zip not exists,初始化")init(dt, spark, hive_db, table_base, table_zip)} else {val t_zip = spark.sql(s"""||select * from $table_zip where dt='$lastDay'||""".stripMargin)if (t_zip.isEmpty) {//initprintln(s"$table_zip isEmpty 初始化")init(dt, spark, hive_db, table_base, table_zip)} else {println(s"$table_zip exist and not empty,无需初始化!!!")}}spark.stop()}private def init(dt: String, spark: SparkSession, hive_db: String, table_base: String, table_zip: String): Unit = {val t_base = spark.sql(s"""||select * from $table_base where dt='${dt}'|""".stripMargin)println(s"$table_base show")t_base.show(false)val ods_zip = t_base.drop("dt").withColumn("t_start", lit(dt)).withColumn("t_end", lit(lastDay)).withColumn("dt", lit(lastDay))if (!ods_zip.isEmpty) {println(s"$table_zip show")ods_zip.show(false)println(s"$table_zip 初始化...")SinkUtil.sink_to_hive(lastDay, spark, ods_zip, hive_db, hive_table = s"$table_zip", "parquet", MySaveMode.OverWriteByDt)} else {println(s"$table_zip is empty,初始化失败...")}}
}

拉链表每日滚动计算

import org.apache.spark.sql.functions.{count, lit}
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
import org.apache.spark.storage.StorageLevel/*** 拉链表只能从装载首日起,一天一天滚动计算*/
object ods_uac_org_zip {val lastDay = "9999-12-31"def main(args: Array[String]): Unit = {var dt = "2023-02-01"var dt1 = "2023-02-02"System.setProperty("HADOOP_USER_NAME", "root")val builder = SparkUtils.getBuilderif (System.getProperties.getProperty("os.name").contains("Windows")) {builder.master("local[*]")} else {dt = args(0)dt1 = args(1)}val spark = builder.appName(this.getClass.getName).getOrCreate()val hive_db = "common"spark.sql(s"use $hive_db")new IDate {override def onDate(dt: String): Unit = {processByDt(spark, dt, hive_db)}}.invoke(dt, dt1)spark.stop()}/*** 滚动计算每个dt的对应的过期数据*/def processByDt(spark: SparkSession, dt: String, hive_db: String): Unit = {val theDayBeforeDt = DateUtil.back1Day(dt + " 00:00:00").split(" ")(0)/*** 一定需要先缓存* 否则重算则fileNotFoundException* 因此需要借助临时表处理或者设置ck*/var ods_uac_org_zip = spark.sql(s"""||select * from ods_uac_org_zip where dt='$lastDay'|""".stripMargin).persist(StorageLevel.MEMORY_ONLY_SER_2)/*** 持久化为临时表*/ods_uac_org_zip.repartition(3).write.format("parquet").mode(SaveMode.Overwrite).saveAsTable(s"${hive_db}.ods_uac_org_zip_tmp")/*** 已经指向临时表* 后续方便对源表(ods_uac_org_zip)进行更新*/ods_uac_org_zip = spark.sql(s"""||select * from ods_uac_org_zip_tmp|""".stripMargin)/*** old,已经存在的拉链表的最新全量*/val f_old_9999 = ods_uac_org_zip.drop("dt")println("f_old_9999 show")f_old_9999.show(false)/*** dt该天的新增和变化*/val f_new = spark.sql(s"""||select * from new_change_t_uac_organization where dt='${dt}'|""".stripMargin).drop("dt").withColumnRenamed("id", "id2").withColumnRenamed("org_name", "org_name2").withColumnRenamed("parent_id", "parent_id2").withColumnRenamed("sort", "sort2").withColumnRenamed("org_type", "org_type2").withColumnRenamed("org_level", "org_level2").withColumnRenamed("is_auth_scope", "is_auth_scope2").withColumnRenamed("parent_auth_scope_id", "parent_auth_scope_id2").withColumnRenamed("status", "status2").withColumnRenamed("icon_class", "icon_class2").withColumnRenamed("create_id", "create_id2").withColumnRenamed("create_time", "create_time2").withColumnRenamed("update_id", "update_id2").withColumnRenamed("update_time", "update_time2").withColumnRenamed("version", "version2").withColumn("t_start2", lit(dt)).withColumn("t_end2", lit(lastDay))println("f_new show")f_new.show(false)val f1 = f_old_9999.join(f_new, f_old_9999.col("id") === f_new.col("id2"), "full_outer")f1.createOrReplaceTempView("v1")println("v1 temp show")f1.show(false)f1.filter(s"id='1008'").show(false)/*** 这是所有dt=9999的*/val f_9999: DataFrame = spark.sql("""||select|nvl(id2,id) as id|,nvl(org_name2,org_name) as org_name|,nvl(parent_id2,parent_id) as parent_id|,nvl(sort2,sort) as sort|,nvl(org_type2,org_type) as org_type|,nvl(org_level2,org_level) as org_level|,nvl(is_auth_scope2,is_auth_scope) as is_auth_scope|,nvl(parent_auth_scope_id2,parent_auth_scope_id) as parent_auth_scope_id|,nvl(status2,status) as status|,nvl(icon_class2,icon_class) as icon_class|,nvl(create_id2,create_id) as create_id|,nvl(create_time2,create_time) as create_time|,nvl(update_id2,update_id) as update_id|,nvl(update_time2,update_time) as update_time|,nvl(version2,version) as version|,nvl(t_start2,t_start) as t_start|,nvl(t_end2,t_end) as t_end|,nvl(t_end2,t_end) as dt||from v1|||""".stripMargin)/*** +----+--------------------------------------------+---------+----+--------+---------+-------------+--------------------+------+-------------------------+---------+-------------------+---------+-------------------+-------+----------+----------+----------+* |id  |org_name                                    |parent_id|sort|org_type|org_level|is_auth_scope|parent_auth_scope_id|status|icon_class               |create_id|create_time        |update_id|update_time        |version|t_start   |t_end     |dt        |* +----+--------------------------------------------+---------+----+--------+---------+-------------+--------------------+------+-------------------------+---------+-------------------+---------+-------------------+-------+----------+----------+----------+* |1   |运营系统                                    |0        |0   |4       |1        |N            |null                |1     |iconfont icon-xitong     |655      |2019-05-20 17:58:11|null     |null               |null   |2023-01-31|9999-12-31|9999-12-31|*/println("f_9999 show")f_9999.show(false)println(s"在${dt}的发生状态变化的,新的有效区间[$dt,$lastDay]...")f_9999.filter(s"t_start='$dt'").show()f_9999.groupBy("dt").agg(count("id")).show()/*** 过期的数据* 需要闭合t_end* dt天发现有变化,那么则在dt-1天过期* 过期的数据:上一次的起始时间,必然小于dt(这个条件很重要,否则幂等计算会有问题,会把计算过的历史分区的起始时间给覆盖掉)*/val f_expire = spark.sql(s"""||select|id,|org_name,|parent_id,|sort,|org_type,|org_level,|is_auth_scope,|parent_auth_scope_id,|status,|icon_class,|create_id,|create_time,|update_id,|update_time,|version,|t_start,|cast(date_add('${dt}',-1) as string) as t_end,|cast(date_add('${dt}',-1) as string) as dt||from v1|where id2 is not null and id is not null and t_start<'$dt'||""".stripMargin)println("f_expire show")f_expire.show(false)/*** 没有动态分区,那就分别各自持久化*/if (!f_9999.isEmpty) {SinkUtil.sink_to_hive(lastDay, spark, f_9999, hive_db, hive_table = "ods_uac_org_zip", "parquet", MySaveMode.OverWriteByDt)}if (!f_expire.isEmpty) {SinkUtil.sink_to_hive(theDayBeforeDt, spark, f_expire, hive_db, hive_table = "ods_uac_org_zip", "parquet", MySaveMode.OverWriteByDt)}}
}

相关文章:

拉链表-spark版本

采用spark实现的拉链表 拉链表初始化 import org.apache.spark.sql.SparkSession import org.apache.spark.sql.functions.lit/*** 拉链表初始化*/ object table_zip_initial {val lastDay "9999-12-31"def main(args: Array[String]): Unit {var table_base &q…...

【笔记1-2】Qt系列:QkeyEvent 键盘事件 设定快捷键

参考文献 QKeyEvent 类用来描述一个键盘事件。当键盘按键被按下或者被释放时&#xff0c;键盘事件便会被发送给拥有键盘输人焦点的部件。QKeyEvent 的 key() 函数可以获取具体的按键关键字。需要特别说明的是&#xff0c;回车键在这里是 Qt::Key_Return&#xff1b;键盘上的一…...

adb突然获取不到华为/荣耀手机。。。

手机一开始都是好好的&#xff0c;adb获取正常&#xff0c;adb执行命令正常。突然有一天不好使了。。。。。 重启、换usb线都试过。。。。。。 看到hisuite模式和adb冲突这篇帖子&#xff0c;尝试下载华为手机助手去链接&#xff0c;但一直连接不上。 最后我的处理方法是&#…...

layui的layer.confirm获取按钮焦点

因为ayer.confirm的按钮并非采用button&#xff0c;而是a标签&#xff0c;所以获取按钮焦点获取不到&#xff0c;要采用别的方法&#xff0c;下面介绍在ie11中和ie8中不同的写法 在ie11中 layer.confirm(确定取消这个弹窗吗&#xff1f;,{btn: [确定, 取消],success:function…...

【HarmonyOS】鸿蒙应用开发基础认证题目

系列文章目录 【HarmonyOS】鸿蒙应用开发基础认证题目&#xff1b; 文章目录 系列文章目录前言一、判断题二、单选题三、多选题总结 前言 随着鸿蒙系统的不断发展&#xff0c;前不久&#xff0c;华为宣布了重磅消息&#xff0c;HarmonyOS next 开发者版本会在明年&#xff08;…...

Mocha

Mocha介绍 介绍 Cypress 底层依赖于很多优秀的开源测试框架&#xff0c;其中就有 MochaMocha 是一个适用于 Node.js 和浏览器的测试框架&#xff0c;它使得异步测试变得简单 JS 语言带来的问题 JS 是单线程异步执行的&#xff0c;这使得测试变得复杂&#xff0c;因为无法像…...

Java详解I/O

前言&#xff1a; 小弟能力不足&#xff0c;认知有限&#xff0c;难免考虑不全面&#xff0c;希望大佬能给出更好的建议&#xff0c;指出存在的问题和不足&#xff0c;在此跪谢。 IO发展史 Java中对于I/O能力的支持主要分为三个比较关键的阶段&#xff1a; BIO 第一个阶段…...

数据处理生产环境_spark获取df列当前日期的前一天日期

需求描述&#xff1a; 我现在有一个dataframe,名为dfin,样例数据如下 a1_id_lxha2_PHtime比亚迪_汉1232023-11-15 12:12:23比亚迪_汉1252023-11-15 13:14:51比亚迪_汉1232023-11-15 12:13:23比亚迪_汉1262023-11-16 14:12:34比亚迪_秦2312023-11-15 14:12:28比亚迪_秦2342023…...

第四代智能井盖传感器,实时守护井盖位安全

城市管理中井盖的安全问题始终是一个不容忽视的方面。传统的巡检方式不仅效率低下&#xff0c;无法实现实时监测&#xff0c;而且很难准确掌握井盖的异动状态。因此智能井盖传感器的应用具有重要意义。这种智能传感器可以帮助政府实时掌握井盖的状态&#xff0c;一旦发现异常情…...

【前端知识】Node——文件流的读写操作

四种基本流类型: 1.Writable: 可以向其写入数据的流 2.Readable: 可以从中读取数据的流 3.Duplex&#xff1a;同时为Readable 和 Writable 4.Transform: Duplex可以在写入和读取数据时修改或转换数据的流 一、Readable const fs require(fs);// 创建文件的Readable const rea…...

解决证书加密问题:OpenSSL与urllib3的兼容性与优化

在使用客户端证书进行加密通信时&#xff0c;用户可能会遇到一些问题。特别是当客户端证书被加密并需要密码保护时&#xff0c;OpenSSL会要求用户输入密码。这对于包含多个调用的大型会话来说并不方便&#xff0c;因为密码无法在连接的多个调用之间进行缓存和重复使用。用户希望…...

#gStore-weekly | gAnswer源码解析 调用NE模块流程

简介 gAnswer系统的主要思想&#xff0c;是将自然语言问题转化为语义查询图&#xff0c;再和RDF图做子图匹配。在转换成查询图的第一步就是确定查询图的节点&#xff0c;即节点提取&#xff08;Node Extraction, NE&#xff09;。 查询图中的节点由实体&#xff08;entity&am…...

vscode 配置 lua

https://luabinaries.sourceforge.net/ 官网链接 主要分为4个步骤 下载压缩包&#xff0c;然后解压配置系统环境变量配置vscode的插件测试 这里你可以选择用户变量或者系统环境变量都行。 不推荐空格的原因是 再配置插件的时候含空格的路径 会出错&#xff0c;原因是空格会断…...

vscode设置代码模板

一键生成vue3模板代码 效果演示 输入vue3 显示快捷键 按回车键 一键生成自定义模板 实现方法 进入用户代码片段设置 选择片段语言 vue.json输入自定义的代码片段 prefix是触发的内容&#xff0c;按自己的喜好来就行&#xff1b; body是模板代码&#xff0c;写入自己需要的…...

用css实现原生form中radio单选框和input的hover已经focus的样式

一.问题描述&#xff1a;用css实现原生form中radio单选框和input的hover已经focus的样式 在实际的开发中&#xff0c;一般公司ui都会给效果图&#xff0c;比如单选按钮radio样式&#xff0c;input输入框hover的时候样式&#xff0c;以及focus的时候样式&#xff0c;等等&#…...

uniapp:录音权限检查,录音功能

1.可以使用&#xff1a;plus.navigator.checkPermission检查运行环境的权限 2.如果是"undetermined"表示程序未确定是否可使用此权限&#xff0c;此时调用对应的API时系统会弹出提示框让用户确认&#xff1a;plus.audio.getRecorder() <template><view cla…...

Rust开发——切片(slice)类型

1、什么是切片 在 Rust 中&#xff0c;切片&#xff08;slice&#xff09;是一种基本类型和序列类型。在 Rust 官方文档中&#xff0c;切片被定义为“对连续序列的动态大小视图”。 但在rust的Github 源码中切片被定义如下&#xff1a; 切片是对一块内存的视图&#xff0c;表…...

如何给shopify motion主题的产品系列添加description

一、Description是什么 Description是一种HTML标签类型&#xff0c;通过指定Description的内容&#xff0c;可以帮助搜索引擎以及用户更好的理解当前网页包含的主要了内容。 二、Description有什么作用 1、基本作用&#xff0c;对于网站和网页做一个简单的说明。 2、吸引点击&…...

力扣刷题-二叉树-二叉树最小深度

给定一个二叉树&#xff0c;找出其最小深度。 最小深度是从根节点到最近叶子节点的最短路径上的节点数量。 说明&#xff1a;叶子节点是指没有子节点的节点。&#xff08;注意题意&#xff09; 示例 1&#xff1a; 输入&#xff1a;root [3,9,20,null,null,15,7] 输出&#x…...

注解方式优雅的实现 Redisson 分布式锁

1前言 日常开发中&#xff0c;难免遇到一些并发的场景&#xff0c;为了保证接口执行的一致性&#xff0c;通常采用加锁的方式&#xff0c;因为服务是分布式部署模式&#xff0c;本地锁Reentrantlock和Synchnorized这些就先放到一边了&#xff0c;Redis的setnx锁存在无法抱保证…...

应用升级/灾备测试时使用guarantee 闪回点迅速回退

1.场景 应用要升级,当升级失败时,数据库回退到升级前. 要测试系统,测试完成后,数据库要回退到测试前。 相对于RMAN恢复需要很长时间&#xff0c; 数据库闪回只需要几分钟。 2.技术实现 数据库设置 2个db_recovery参数 创建guarantee闪回点&#xff0c;不需要开启数据库闪回。…...

PHP和Node.js哪个更爽?

先说结论&#xff0c;rust完胜。 php&#xff1a;laravel&#xff0c;swoole&#xff0c;webman&#xff0c;最开始在苏宁的时候写了几年php&#xff0c;当时觉得php真的是世界上最好的语言&#xff0c;因为当初活在舒适圈里&#xff0c;不愿意跳出来&#xff0c;就好比当初活在…...

Day131 | 灵神 | 回溯算法 | 子集型 子集

Day131 | 灵神 | 回溯算法 | 子集型 子集 78.子集 78. 子集 - 力扣&#xff08;LeetCode&#xff09; 思路&#xff1a; 笔者写过很多次这道题了&#xff0c;不想写题解了&#xff0c;大家看灵神讲解吧 回溯算法套路①子集型回溯【基础算法精讲 14】_哔哩哔哩_bilibili 完…...

c++ 面试题(1)-----深度优先搜索(DFS)实现

操作系统&#xff1a;ubuntu22.04 IDE:Visual Studio Code 编程语言&#xff1a;C11 题目描述 地上有一个 m 行 n 列的方格&#xff0c;从坐标 [0,0] 起始。一个机器人可以从某一格移动到上下左右四个格子&#xff0c;但不能进入行坐标和列坐标的数位之和大于 k 的格子。 例…...

2021-03-15 iview一些问题

1.iview 在使用tree组件时&#xff0c;发现没有set类的方法&#xff0c;只有get&#xff0c;那么要改变tree值&#xff0c;只能遍历treeData&#xff0c;递归修改treeData的checked&#xff0c;发现无法更改&#xff0c;原因在于check模式下&#xff0c;子元素的勾选状态跟父节…...

【单片机期末】单片机系统设计

主要内容&#xff1a;系统状态机&#xff0c;系统时基&#xff0c;系统需求分析&#xff0c;系统构建&#xff0c;系统状态流图 一、题目要求 二、绘制系统状态流图 题目&#xff1a;根据上述描述绘制系统状态流图&#xff0c;注明状态转移条件及方向。 三、利用定时器产生时…...

《基于Apache Flink的流处理》笔记

思维导图 1-3 章 4-7章 8-11 章 参考资料 源码&#xff1a; https://github.com/streaming-with-flink 博客 https://flink.apache.org/bloghttps://www.ververica.com/blog 聚会及会议 https://flink-forward.orghttps://www.meetup.com/topics/apache-flink https://n…...

零基础设计模式——行为型模式 - 责任链模式

第四部分&#xff1a;行为型模式 - 责任链模式 (Chain of Responsibility Pattern) 欢迎来到行为型模式的学习&#xff01;行为型模式关注对象之间的职责分配、算法封装和对象间的交互。我们将学习的第一个行为型模式是责任链模式。 核心思想&#xff1a;使多个对象都有机会处…...

iOS性能调优实战:借助克魔(KeyMob)与常用工具深度洞察App瓶颈

在日常iOS开发过程中&#xff0c;性能问题往往是最令人头疼的一类Bug。尤其是在App上线前的压测阶段或是处理用户反馈的高发期&#xff0c;开发者往往需要面对卡顿、崩溃、能耗异常、日志混乱等一系列问题。这些问题表面上看似偶发&#xff0c;但背后往往隐藏着系统资源调度不当…...

SQL慢可能是触发了ring buffer

简介 最近在进行 postgresql 性能排查的时候,发现 PG 在某一个时间并行执行的 SQL 变得特别慢。最后通过监控监观察到并行发起得时间 buffers_alloc 就急速上升,且低水位伴随在整个慢 SQL,一直是 buferIO 的等待事件,此时也没有其他会话的争抢。SQL 虽然不是高效 SQL ,但…...