实时数仓: Hudi 表管理、Flink 性能调优或治理工具脚本
1. Hudi 表管理
1.1 Hudi 表基础管理
创建 Hudi 表
在 HDFS 上创建一个 Hudi 表(以 Merge-on-Read 为例):
CREATE TABLE real_time_dw.dwd_order_fact (order_id STRING,user_id STRING,product_id STRING,amount DOUBLE,order_date STRING,update_time TIMESTAMP
)
PARTITIONED BY (order_date)
STORED AS PARQUET
TBLPROPERTIES ('type'='MERGE_ON_READ','primaryKey'='order_id','preCombineField'='update_time'
);
1.2 数据操作
插入/更新数据
利用 Hudi 写入工具(如 Spark)进行批量或实时插入更新:
from pyspark.sql import SparkSession
from datetime import datetimespark = SparkSession.builder \.appName("Hudi Example") \.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \.getOrCreate()# 加载数据
data = [{"order_id": "1", "user_id": "101", "product_id": "201", "amount": 99.99, "order_date": "2025-01-01", "update_time": datetime.now()},{"order_id": "2", "user_id": "102", "product_id": "202", "amount": 199.99, "order_date": "2025-01-01", "update_time": datetime.now()}
]
df = spark.createDataFrame(data)# 写入 Hudi
hudi_options = {"hoodie.table.name": "dwd_order_fact","hoodie.datasource.write.recordkey.field": "order_id","hoodie.datasource.write.precombine.field": "update_time","hoodie.datasource.write.partitionpath.field": "order_date","hoodie.datasource.write.operation": "upsert","hoodie.datasource.write.table.type": "MERGE_ON_READ","hoodie.datasource.hive.sync.enable": "true","hoodie.datasource.hive.database": "real_time_dw","hoodie.datasource.hive.table": "dwd_order_fact","hoodie.datasource.hive.partition_fields": "order_date"
}df.write.format("hudi").options(**hudi_options).mode("append").save("hdfs://path/to/hudi/dwd_order_fact")
1.3 Hudi 表维护
表清理
- 配置清理策略,清理过期版本:
保留最近 10 个提交版本。hoodie.cleaner.commits.retained=10 hoodie.cleaner.policy=KEEP_LATEST_COMMITS
表压缩
- 针对 MOR 表,定期运行 compaction 任务:
spark-submit --class org.apache.hudi.utilities.HoodieCompactor \--master yarn \--table-path hdfs://path/to/hudi/dwd_order_fact \--table-name dwd_order_fact
元数据管理
- 更新 Hive 元数据:
MSCK REPAIR TABLE real_time_dw.dwd_order_fact;
2. Flink 性能调优
2.1 Checkpoint 性能优化
增量 Checkpoint
启用 RocksDB 增量检查点,减少状态存储大小:
env.getCheckpointConfig().enableIncrementalCheckpoints(true);
异步快照
减少 Checkpoint 对性能的影响:
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setCheckpointTimeout(60000); // 60秒超时
env.getCheckpointConfig().setPreferCheckpointForRecovery(true); // 优先使用Checkpoint恢复
2.2 Watermark 优化
如果数据有延迟,可以允许一定的 out-of-order 数据处理:
WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(5)) // 最大延迟5秒.withTimestampAssigner((event, timestamp) -> event.getEventTime());
2.3 状态管理优化
状态后端选择
- 优先选择 RocksDB 状态后端,支持更大的状态数据:
env.setStateBackend(new RocksDBStateBackend("hdfs://path/to/checkpoints", true));
TTL(Time-to-Live)设置
- 自动清理无用状态:
stateDescriptor.enableTimeToLive(StateTtlConfig.newBuilder(Time.hours(1)).setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite).build());
2.4 Task Slot 配置
根据并发优化 TaskManager:
- 每个 TaskManager 提供更多 slots:
taskmanager.numberOfTaskSlots: 4
3. 治理工具脚本
3.1 数据质量治理(Great Expectations)
脚本自动化
以下 Python 脚本可以实现自动化数据校验(如字段非空和值域校验):
from great_expectations.core.batch import BatchRequest
from great_expectations.data_context import DataContextcontext = DataContext()batch_request = BatchRequest(datasource_name="my_s3_datasource",data_connector_name="default_runtime_data_connector_name",data_asset_name="dwd_order_fact",runtime_parameters={"path": "s3://path/to/hudi/dwd_order_fact/"},batch_identifiers={"default_identifier_name": "2025-01-01"}
)validator = context.get_validator(batch_request=batch_request)# 非空校验
validator.expect_column_values_to_not_be_null("order_id")
# 值域校验
validator.expect_column_values_to_be_in_set("order_status", ["CREATED", "PAID", "SHIPPED", "CANCELLED"])
# 保存结果
validator.save_expectation_suite("order_fact_suite")context.run_validation_operator("action_list_operator",assets_to_validate=[validator]
)
3.2 数据权限管理(Apache Ranger)
策略 JSON 配置
以下为权限策略 JSON 文件的示例,适用于 Ranger API 批量添加策略:
{"policyName": "dwd_order_fact_policy","serviceType": "hive","resources": {"database": {"values": ["real_time_dw"],"isExcludes": false,"isRecursive": false},"table": {"values": ["dwd_order_fact"],"isExcludes": false,"isRecursive": false}},"policyItems": [{"accesses": [{"type": "select", "isAllowed": true}],"users": ["bi_user"],"groups": ["BI_Group"]},{"accesses": [{"type": "select", "isAllowed": true}, {"type": "insert", "isAllowed": true}],"users": ["etl_user"],"groups": ["ETL_Team"]}]
}
通过 Ranger REST API 部署该策略:
curl -u admin:admin -H "Content-Type: application/json" -X POST -d @policy.json http://<RANGER_HOST>:6080/service/public/v2/api/policy
3.3 数据血缘治理(Apache Atlas)
Flink 血缘注册脚本
通过 REST API 自动将 Flink 作业的输入输出血缘关系上传到 Atlas:
curl -X POST http://<ATLAS_HOST>:21000/api/atlas/v2/entity \
-H "Content-Type: application/json" \
-d '{"entity": {"typeName": "process","attributes": {"name": "flink_order_job","inputs": [{"typeName": "kafka_topic", "uniqueAttributes": {"qualifiedName": "order_topic"}}],"outputs": [{"typeName": "hdfs_path", "uniqueAttributes": {"qualifiedName": "hdfs://path/to/hudi/dwd_order_fact"}}]}}
}'
相关文章:
实时数仓: Hudi 表管理、Flink 性能调优或治理工具脚本
1. Hudi 表管理 1.1 Hudi 表基础管理 创建 Hudi 表 在 HDFS 上创建一个 Hudi 表(以 Merge-on-Read 为例): CREATE TABLE real_time_dw.dwd_order_fact (order_id STRING,user_id STRING,product_id STRING,amount DOUBLE,order_date STRIN…...
Kotlin 数据类与密封类
Kotlin 数据类与密封类 引言 在 Kotlin 中,数据类和密封类是两种非常重要的类类型,它们各自具有独特的用途和优势。数据类主要用于存储数据,而密封类则用于表示受限的类层次结构。在本篇文章中,我们将深入探讨 Kotlin 中的数据类…...
大模型推理加速调研(框架、方法)
大模型推理加速调研(框架、方法) 大模型推理框架调研总结推理框架TensorRT-LLMllama.cppmnn-llmfastllmmlc-llm 环境搭建&部署推理环境llama.cppfastllmmnn-llmvllm vllm_openai_completions.pylmdeployTensorRT-LLM 大模型加速技术总结模型压缩量化…...
C语言进阶(3)--字符函数和字符串函数
本章重点 重点介绍处理字符和字符串的库函数的使用和注意事项 目录 0.前言 1.函数介绍 1.1 strlen - 计算字符串长度 1.2 strcpy - 复制字符串 1.3 strcat - 追加字符串 1.4 strcmp - 字符串比较 1.5 strncpy - 受限制复制 1.6 strncat - 受限制追加 1.7 strncmp - 受限制比…...
微服务拆分的艺术:构建高效、灵活的系统架构
目录 一、微服务拆分的重要性 二、微服务拆分的策略 1. 按照业务领域拆分 2. 按照团队结构拆分 3. 按照业务边界拆分 4. 按照数据和数据库拆分 5. 按照用户界面或外部接口拆分 6. 按照功能模块或领域驱动设计拆分 7. 按照性能和可伸缩性需求拆分 三、微服务拆分的实践…...
记录一次电脑被入侵用来挖矿的过程(Trojan、Miner、Hack、turminoob)
文章目录 0、总结1、背景2、端倪3、有个微软的系统更新,就想着更新看看(能否冲掉问题)4、更新没成功,自动重启电脑5、风险文件(好家伙命名还挺规范,一看名字就知道出问题了)6、开机有一些注册表…...
计算机xinput1_4.dll丢失怎么修复?
电脑运行时常见问题及修复指南 作为软件开发从业者,深知电脑在日常使用中难免会遇到各种问题,如文件丢失、文件损坏和系统报错等。这些问题不仅影响工作效率,还可能带来数据丢失的风险。本文将详细介绍一些常见问题及其解决办法,…...
高等数学学习笔记 ☞ 连续函数的运算与性质
1. 连续函数的运算 1. 连续函数的四则运算: (1)若函数在点处连续,则函数在点处也连续。 (2)若函数在区间上连续,则函数在区间上也连续。 2. 反函数的连续性: 若函数在定义域上是单…...
k8s基础(4)—Kubernetes-Service
Service概述 抽象层 k8s的Service是一种抽象层,用于为一组具有相同功能的Pod提供一个统一的入口地址,并通过负载均衡将网络流量分发到这些Pod上。 Service解决了Pod动态变化的问题,例如Pod的IP地址和端口可能会发生变化,通过…...
CAN或者CANFD的Busoff的恢复时间会受到报文周期的影响么?
目录 分析恢复机制角度快恢复和慢恢复策略角度特殊情况分析分析 Busoff的恢复时间通常不会直接受到报文周期的影响,以下是具体分析: 恢复机制角度 CAN总线的节点在Busoff状态下,恢复过程主要是等待总线上出现128个连续的11bit隐性位,与报文周期并无直接关联。无论报文周…...
【DevOps】Jenkins部署
Jenkins部署 文章目录 Jenkins部署资源列表基础环境一、部署Gilab1.1、安装Gitlab1.2、修改配置文件1.3、加载配置文件1.4、访问Gitlab1.5、修改root登录密码1.6、创建demo测试项目1.7、上传代码1.8、验证上传的代码 二、部署Jenkins所需软件2.1、部署JDK2.2、部署Tomcat2.3、部…...
【MATLAB第112期】基于MATLAB的SHAP可解释神经网络回归模型(敏感性分析方法)
【MATLAB第112期】基于MATLAB的SHAP可解释神经网络回归模型(敏感性分析方法) 引言 该文章实现了一个可解释的神经网络回归模型,使用BP神经网络(BPNN)来预测特征输出。该模型利用七个变量参数作为输入特征进行训练。为…...
【Shell编程 / 4】函数定义、脚本执行与输入输出操作
文章目录 函数 与 脚本定义函数示例:简单的 Shell 函数函数参数返回值 脚本执行创建脚本执行脚本 输入输出输出:echo 和 printf输入:read 命令 命令行参数示例:传递参数 函数 与 脚本 在 Shell 编程中,函数和脚本是组…...
RK3588+麒麟国产系统+FPGA+AI在电力和轨道交通视觉与采集系统的应用
工业视觉识别系统厂家提供的功能主要包括: 这些厂家通过先进的视觉识别技术,实现图像的采集、处理与分析。系统能够自动化地完成质量检测、物料分拣、设备监控等任务,显著提升生产效率和产品质量。同时,系统具备高度的灵活性和可扩…...
MySQL 01 02 章——数据库概述与MySQL安装篇
一、数据库概述 (1)为什么要使用数据库 数据库可以实现持久化,什么是持久化:数据持久化意味着将内存中的数据保存到硬盘上加以“固化”持久化的主要作用是:将内存中的数据存储在关系型数据库中,当然也可以…...
运行framework7
安装 framework7 下载地址https://gitcode.com/gh_mirrors/fr/framework7-vue node 下载 https://nodejs.cn/#ionic 配置npm 的镜像源 npm config set registry https://registry.npmmirror.com 下载nvm 进行nvm管理https://www.downza.cn/soft/352547.html 我一开始使用node…...
【Web】软件系统安全赛CachedVisitor——记一次二开工具的经历
明天开始考试周,百无聊赖开了一把CTF,还顺带体验了下二开工具,让无聊的Z3很开心🙂 CachedVisitor这题 大概描述一下:从main.lua加载一段visit.script中被##LUA_START##(.-)##LUA_END##包裹的lua代码 main.lua loca…...
实现自定义集合类:深入理解C#中的IEnumerable<T>接口
文章目录 介绍主要成员示例代码约束常见的约束类型示例代码介绍 在C#中,IEnumerable<T> 是一个泛型接口,用于表示可以被枚举的集合。它定义了用于遍历集合中元素的方法和属性。IEnumerable<T> 是 IEnumerable 的泛型版本,提供了类型安全的枚举功能。 当我们实…...
Compression Techniques for LLMs
Compression Techniques for LLMs 随着大型语言模型(LLMs)的迅速发展,提高其计算效率和存储效率成为研究的重要方向。为了实现这一目标,诸多压缩技术应运而生。本文将深入探讨几种有效的压缩技术,这些技术不仅能够降低…...
Nexus Message Transaction Services(MTS)
Nexus 系列交换机遇到以下情形时,可以尝试查看是否是 MTS 消息卡在缓冲区过多,因为 MTS 负责处理模块内以及跨模块(包括跨管理引擎)的各服务之间的消息路由和排队。 • CPU 高 • 命令行无响应、响应慢 • 控制平面中断 • 流量问…...
相机Camera日志实例分析之二:相机Camx【专业模式开启直方图拍照】单帧流程日志详解
【关注我,后续持续新增专题博文,谢谢!!!】 上一篇我们讲了: 这一篇我们开始讲: 目录 一、场景操作步骤 二、日志基础关键字分级如下 三、场景日志如下: 一、场景操作步骤 操作步…...
mongodb源码分析session执行handleRequest命令find过程
mongo/transport/service_state_machine.cpp已经分析startSession创建ASIOSession过程,并且验证connection是否超过限制ASIOSession和connection是循环接受客户端命令,把数据流转换成Message,状态转变流程是:State::Created 》 St…...
(二)TensorRT-LLM | 模型导出(v0.20.0rc3)
0. 概述 上一节 对安装和使用有个基本介绍。根据这个 issue 的描述,后续 TensorRT-LLM 团队可能更专注于更新和维护 pytorch backend。但 tensorrt backend 作为先前一直开发的工作,其中包含了大量可以学习的地方。本文主要看看它导出模型的部分&#x…...
STM32F4基本定时器使用和原理详解
STM32F4基本定时器使用和原理详解 前言如何确定定时器挂载在哪条时钟线上配置及使用方法参数配置PrescalerCounter ModeCounter Periodauto-reload preloadTrigger Event Selection 中断配置生成的代码及使用方法初始化代码基本定时器触发DCA或者ADC的代码讲解中断代码定时启动…...
ServerTrust 并非唯一
NSURLAuthenticationMethodServerTrust 只是 authenticationMethod 的冰山一角 要理解 NSURLAuthenticationMethodServerTrust, 首先要明白它只是 authenticationMethod 的选项之一, 并非唯一 1 先厘清概念 点说明authenticationMethodURLAuthenticationChallenge.protectionS…...
今日学习:Spring线程池|并发修改异常|链路丢失|登录续期|VIP过期策略|数值类缓存
文章目录 优雅版线程池ThreadPoolTaskExecutor和ThreadPoolTaskExecutor的装饰器并发修改异常并发修改异常简介实现机制设计原因及意义 使用线程池造成的链路丢失问题线程池导致的链路丢失问题发生原因 常见解决方法更好的解决方法设计精妙之处 登录续期登录续期常见实现方式特…...
云原生玩法三问:构建自定义开发环境
云原生玩法三问:构建自定义开发环境 引言 临时运维一个古董项目,无文档,无环境,无交接人,俗称三无。 运行设备的环境老,本地环境版本高,ssh不过去。正好最近对 腾讯出品的云原生 cnb 感兴趣&…...
短视频矩阵系统文案创作功能开发实践,定制化开发
在短视频行业迅猛发展的当下,企业和个人创作者为了扩大影响力、提升传播效果,纷纷采用短视频矩阵运营策略,同时管理多个平台、多个账号的内容发布。然而,频繁的文案创作需求让运营者疲于应对,如何高效产出高质量文案成…...
Selenium常用函数介绍
目录 一,元素定位 1.1 cssSeector 1.2 xpath 二,操作测试对象 三,窗口 3.1 案例 3.2 窗口切换 3.3 窗口大小 3.4 屏幕截图 3.5 关闭窗口 四,弹窗 五,等待 六,导航 七,文件上传 …...
mac 安装homebrew (nvm 及git)
mac 安装nvm 及git 万恶之源 mac 安装这些东西离不开Xcode。及homebrew 一、先说安装git步骤 通用: 方法一:使用 Homebrew 安装 Git(推荐) 步骤如下:打开终端(Terminal.app) 1.安装 Homebrew…...
