实时数仓: Hudi 表管理、Flink 性能调优或治理工具脚本
1. Hudi 表管理
1.1 Hudi 表基础管理
创建 Hudi 表
在 HDFS 上创建一个 Hudi 表(以 Merge-on-Read 为例):
CREATE TABLE real_time_dw.dwd_order_fact (order_id STRING,user_id STRING,product_id STRING,amount DOUBLE,order_date STRING,update_time TIMESTAMP
)
PARTITIONED BY (order_date)
STORED AS PARQUET
TBLPROPERTIES ('type'='MERGE_ON_READ','primaryKey'='order_id','preCombineField'='update_time'
);
1.2 数据操作
插入/更新数据
利用 Hudi 写入工具(如 Spark)进行批量或实时插入更新:
from pyspark.sql import SparkSession
from datetime import datetimespark = SparkSession.builder \.appName("Hudi Example") \.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \.getOrCreate()# 加载数据
data = [{"order_id": "1", "user_id": "101", "product_id": "201", "amount": 99.99, "order_date": "2025-01-01", "update_time": datetime.now()},{"order_id": "2", "user_id": "102", "product_id": "202", "amount": 199.99, "order_date": "2025-01-01", "update_time": datetime.now()}
]
df = spark.createDataFrame(data)# 写入 Hudi
hudi_options = {"hoodie.table.name": "dwd_order_fact","hoodie.datasource.write.recordkey.field": "order_id","hoodie.datasource.write.precombine.field": "update_time","hoodie.datasource.write.partitionpath.field": "order_date","hoodie.datasource.write.operation": "upsert","hoodie.datasource.write.table.type": "MERGE_ON_READ","hoodie.datasource.hive.sync.enable": "true","hoodie.datasource.hive.database": "real_time_dw","hoodie.datasource.hive.table": "dwd_order_fact","hoodie.datasource.hive.partition_fields": "order_date"
}df.write.format("hudi").options(**hudi_options).mode("append").save("hdfs://path/to/hudi/dwd_order_fact")
1.3 Hudi 表维护
表清理
- 配置清理策略,清理过期版本:
保留最近 10 个提交版本。hoodie.cleaner.commits.retained=10 hoodie.cleaner.policy=KEEP_LATEST_COMMITS
表压缩
- 针对 MOR 表,定期运行 compaction 任务:
spark-submit --class org.apache.hudi.utilities.HoodieCompactor \--master yarn \--table-path hdfs://path/to/hudi/dwd_order_fact \--table-name dwd_order_fact
元数据管理
- 更新 Hive 元数据:
MSCK REPAIR TABLE real_time_dw.dwd_order_fact;
2. Flink 性能调优
2.1 Checkpoint 性能优化
增量 Checkpoint
启用 RocksDB 增量检查点,减少状态存储大小:
env.getCheckpointConfig().enableIncrementalCheckpoints(true);
异步快照
减少 Checkpoint 对性能的影响:
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setCheckpointTimeout(60000); // 60秒超时
env.getCheckpointConfig().setPreferCheckpointForRecovery(true); // 优先使用Checkpoint恢复
2.2 Watermark 优化
如果数据有延迟,可以允许一定的 out-of-order 数据处理:
WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(5)) // 最大延迟5秒.withTimestampAssigner((event, timestamp) -> event.getEventTime());
2.3 状态管理优化
状态后端选择
- 优先选择 RocksDB 状态后端,支持更大的状态数据:
env.setStateBackend(new RocksDBStateBackend("hdfs://path/to/checkpoints", true));
TTL(Time-to-Live)设置
- 自动清理无用状态:
stateDescriptor.enableTimeToLive(StateTtlConfig.newBuilder(Time.hours(1)).setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite).build());
2.4 Task Slot 配置
根据并发优化 TaskManager:
- 每个 TaskManager 提供更多 slots:
taskmanager.numberOfTaskSlots: 4
3. 治理工具脚本
3.1 数据质量治理(Great Expectations)
脚本自动化
以下 Python 脚本可以实现自动化数据校验(如字段非空和值域校验):
from great_expectations.core.batch import BatchRequest
from great_expectations.data_context import DataContextcontext = DataContext()batch_request = BatchRequest(datasource_name="my_s3_datasource",data_connector_name="default_runtime_data_connector_name",data_asset_name="dwd_order_fact",runtime_parameters={"path": "s3://path/to/hudi/dwd_order_fact/"},batch_identifiers={"default_identifier_name": "2025-01-01"}
)validator = context.get_validator(batch_request=batch_request)# 非空校验
validator.expect_column_values_to_not_be_null("order_id")
# 值域校验
validator.expect_column_values_to_be_in_set("order_status", ["CREATED", "PAID", "SHIPPED", "CANCELLED"])
# 保存结果
validator.save_expectation_suite("order_fact_suite")context.run_validation_operator("action_list_operator",assets_to_validate=[validator]
)
3.2 数据权限管理(Apache Ranger)
策略 JSON 配置
以下为权限策略 JSON 文件的示例,适用于 Ranger API 批量添加策略:
{"policyName": "dwd_order_fact_policy","serviceType": "hive","resources": {"database": {"values": ["real_time_dw"],"isExcludes": false,"isRecursive": false},"table": {"values": ["dwd_order_fact"],"isExcludes": false,"isRecursive": false}},"policyItems": [{"accesses": [{"type": "select", "isAllowed": true}],"users": ["bi_user"],"groups": ["BI_Group"]},{"accesses": [{"type": "select", "isAllowed": true}, {"type": "insert", "isAllowed": true}],"users": ["etl_user"],"groups": ["ETL_Team"]}]
}
通过 Ranger REST API 部署该策略:
curl -u admin:admin -H "Content-Type: application/json" -X POST -d @policy.json http://<RANGER_HOST>:6080/service/public/v2/api/policy
3.3 数据血缘治理(Apache Atlas)
Flink 血缘注册脚本
通过 REST API 自动将 Flink 作业的输入输出血缘关系上传到 Atlas:
curl -X POST http://<ATLAS_HOST>:21000/api/atlas/v2/entity \
-H "Content-Type: application/json" \
-d '{"entity": {"typeName": "process","attributes": {"name": "flink_order_job","inputs": [{"typeName": "kafka_topic", "uniqueAttributes": {"qualifiedName": "order_topic"}}],"outputs": [{"typeName": "hdfs_path", "uniqueAttributes": {"qualifiedName": "hdfs://path/to/hudi/dwd_order_fact"}}]}}
}'
相关文章:
实时数仓: Hudi 表管理、Flink 性能调优或治理工具脚本
1. Hudi 表管理 1.1 Hudi 表基础管理 创建 Hudi 表 在 HDFS 上创建一个 Hudi 表(以 Merge-on-Read 为例): CREATE TABLE real_time_dw.dwd_order_fact (order_id STRING,user_id STRING,product_id STRING,amount DOUBLE,order_date STRIN…...
Kotlin 数据类与密封类
Kotlin 数据类与密封类 引言 在 Kotlin 中,数据类和密封类是两种非常重要的类类型,它们各自具有独特的用途和优势。数据类主要用于存储数据,而密封类则用于表示受限的类层次结构。在本篇文章中,我们将深入探讨 Kotlin 中的数据类…...
大模型推理加速调研(框架、方法)
大模型推理加速调研(框架、方法) 大模型推理框架调研总结推理框架TensorRT-LLMllama.cppmnn-llmfastllmmlc-llm 环境搭建&部署推理环境llama.cppfastllmmnn-llmvllm vllm_openai_completions.pylmdeployTensorRT-LLM 大模型加速技术总结模型压缩量化…...
C语言进阶(3)--字符函数和字符串函数
本章重点 重点介绍处理字符和字符串的库函数的使用和注意事项 目录 0.前言 1.函数介绍 1.1 strlen - 计算字符串长度 1.2 strcpy - 复制字符串 1.3 strcat - 追加字符串 1.4 strcmp - 字符串比较 1.5 strncpy - 受限制复制 1.6 strncat - 受限制追加 1.7 strncmp - 受限制比…...
微服务拆分的艺术:构建高效、灵活的系统架构
目录 一、微服务拆分的重要性 二、微服务拆分的策略 1. 按照业务领域拆分 2. 按照团队结构拆分 3. 按照业务边界拆分 4. 按照数据和数据库拆分 5. 按照用户界面或外部接口拆分 6. 按照功能模块或领域驱动设计拆分 7. 按照性能和可伸缩性需求拆分 三、微服务拆分的实践…...
记录一次电脑被入侵用来挖矿的过程(Trojan、Miner、Hack、turminoob)
文章目录 0、总结1、背景2、端倪3、有个微软的系统更新,就想着更新看看(能否冲掉问题)4、更新没成功,自动重启电脑5、风险文件(好家伙命名还挺规范,一看名字就知道出问题了)6、开机有一些注册表…...
计算机xinput1_4.dll丢失怎么修复?
电脑运行时常见问题及修复指南 作为软件开发从业者,深知电脑在日常使用中难免会遇到各种问题,如文件丢失、文件损坏和系统报错等。这些问题不仅影响工作效率,还可能带来数据丢失的风险。本文将详细介绍一些常见问题及其解决办法,…...
高等数学学习笔记 ☞ 连续函数的运算与性质
1. 连续函数的运算 1. 连续函数的四则运算: (1)若函数在点处连续,则函数在点处也连续。 (2)若函数在区间上连续,则函数在区间上也连续。 2. 反函数的连续性: 若函数在定义域上是单…...
k8s基础(4)—Kubernetes-Service
Service概述 抽象层 k8s的Service是一种抽象层,用于为一组具有相同功能的Pod提供一个统一的入口地址,并通过负载均衡将网络流量分发到这些Pod上。 Service解决了Pod动态变化的问题,例如Pod的IP地址和端口可能会发生变化,通过…...
CAN或者CANFD的Busoff的恢复时间会受到报文周期的影响么?
目录 分析恢复机制角度快恢复和慢恢复策略角度特殊情况分析分析 Busoff的恢复时间通常不会直接受到报文周期的影响,以下是具体分析: 恢复机制角度 CAN总线的节点在Busoff状态下,恢复过程主要是等待总线上出现128个连续的11bit隐性位,与报文周期并无直接关联。无论报文周…...
【DevOps】Jenkins部署
Jenkins部署 文章目录 Jenkins部署资源列表基础环境一、部署Gilab1.1、安装Gitlab1.2、修改配置文件1.3、加载配置文件1.4、访问Gitlab1.5、修改root登录密码1.6、创建demo测试项目1.7、上传代码1.8、验证上传的代码 二、部署Jenkins所需软件2.1、部署JDK2.2、部署Tomcat2.3、部…...
【MATLAB第112期】基于MATLAB的SHAP可解释神经网络回归模型(敏感性分析方法)
【MATLAB第112期】基于MATLAB的SHAP可解释神经网络回归模型(敏感性分析方法) 引言 该文章实现了一个可解释的神经网络回归模型,使用BP神经网络(BPNN)来预测特征输出。该模型利用七个变量参数作为输入特征进行训练。为…...
【Shell编程 / 4】函数定义、脚本执行与输入输出操作
文章目录 函数 与 脚本定义函数示例:简单的 Shell 函数函数参数返回值 脚本执行创建脚本执行脚本 输入输出输出:echo 和 printf输入:read 命令 命令行参数示例:传递参数 函数 与 脚本 在 Shell 编程中,函数和脚本是组…...
RK3588+麒麟国产系统+FPGA+AI在电力和轨道交通视觉与采集系统的应用
工业视觉识别系统厂家提供的功能主要包括: 这些厂家通过先进的视觉识别技术,实现图像的采集、处理与分析。系统能够自动化地完成质量检测、物料分拣、设备监控等任务,显著提升生产效率和产品质量。同时,系统具备高度的灵活性和可扩…...
MySQL 01 02 章——数据库概述与MySQL安装篇
一、数据库概述 (1)为什么要使用数据库 数据库可以实现持久化,什么是持久化:数据持久化意味着将内存中的数据保存到硬盘上加以“固化”持久化的主要作用是:将内存中的数据存储在关系型数据库中,当然也可以…...
运行framework7
安装 framework7 下载地址https://gitcode.com/gh_mirrors/fr/framework7-vue node 下载 https://nodejs.cn/#ionic 配置npm 的镜像源 npm config set registry https://registry.npmmirror.com 下载nvm 进行nvm管理https://www.downza.cn/soft/352547.html 我一开始使用node…...
【Web】软件系统安全赛CachedVisitor——记一次二开工具的经历
明天开始考试周,百无聊赖开了一把CTF,还顺带体验了下二开工具,让无聊的Z3很开心🙂 CachedVisitor这题 大概描述一下:从main.lua加载一段visit.script中被##LUA_START##(.-)##LUA_END##包裹的lua代码 main.lua loca…...
实现自定义集合类:深入理解C#中的IEnumerable<T>接口
文章目录 介绍主要成员示例代码约束常见的约束类型示例代码介绍 在C#中,IEnumerable<T> 是一个泛型接口,用于表示可以被枚举的集合。它定义了用于遍历集合中元素的方法和属性。IEnumerable<T> 是 IEnumerable 的泛型版本,提供了类型安全的枚举功能。 当我们实…...
Compression Techniques for LLMs
Compression Techniques for LLMs 随着大型语言模型(LLMs)的迅速发展,提高其计算效率和存储效率成为研究的重要方向。为了实现这一目标,诸多压缩技术应运而生。本文将深入探讨几种有效的压缩技术,这些技术不仅能够降低…...
Nexus Message Transaction Services(MTS)
Nexus 系列交换机遇到以下情形时,可以尝试查看是否是 MTS 消息卡在缓冲区过多,因为 MTS 负责处理模块内以及跨模块(包括跨管理引擎)的各服务之间的消息路由和排队。 • CPU 高 • 命令行无响应、响应慢 • 控制平面中断 • 流量问…...
从GRID到Common Voice:不同语音语料库到底该怎么选?(附适用场景与优缺点对比)
语音语料库选型指南:从科研到落地的精准匹配策略 语音技术从业者常面临一个关键挑战:如何在众多语料库中找到最适合特定任务的数据资源?本文将深入解析主流语音语料库的核心特性、适用场景与潜在限制,帮助您建立系统化的选型决策框…...
UnrealPakViewer终极指南:5分钟掌握UE4 Pak文件分析的免费神器
UnrealPakViewer终极指南:5分钟掌握UE4 Pak文件分析的免费神器 【免费下载链接】UnrealPakViewer 查看 UE4 Pak 文件的图形化工具,支持 UE4 pak/ucas 文件 项目地址: https://gitcode.com/gh_mirrors/un/UnrealPakViewer 你是否曾被UE4项目中庞大…...
上海会场 | 5-6月学术会议征稿通知
6场会议覆盖图像处理、城市规划、半导体通信、风险管理、低碳能源与区块链经济 5月上海会议 第三届环境工程、城市规划与设计国际学术会议(EEUPD 2026) 开会时间:2026年5月8日-10日 会议亮点:环境工程、城市规划与设计同场讨论…...
D2DX终极指南:如何让经典暗黑破坏神2在现代PC上焕发新生
D2DX终极指南:如何让经典暗黑破坏神2在现代PC上焕发新生 【免费下载链接】d2dx D2DX is a complete solution to make Diablo II run well on modern PCs, with high fps and better resolutions. 项目地址: https://gitcode.com/gh_mirrors/d2/d2dx 你是否曾…...
计算机毕业设计:Python农作物产量智能预估与数据看板 Flask框架 XGBoost 机器学习 数据分析 可视化 大数据 大模型(建议收藏)✅
1、项目介绍 技术栈 采用 Python 语言开发,基于 Flask 框架搭建后端服务,使用 MySQL 数据库进行数据存储,通过 pymysql 连接数据库,运用 XGBoost 机器学习模型实现产量预测,前端结合 HTML、CSS、JavaScript、Echarts 和…...
最后90天窗口期:2026奇点大会确认的AGI算力-数据-对齐三角瓶颈即将被打破,你的团队准备好了吗?
第一章:2026奇点智能技术大会:通用人工智能最新进展 2026奇点智能技术大会(https://ml-summit.org) 本届大会首次披露多项突破性成果,标志着通用人工智能(AGI)正从理论验证迈入系统化工程实践阶段。来自DeepMind、Op…...
终极京东抢购神器:JDspyder自动化脚本完整使用指南
终极京东抢购神器:JDspyder自动化脚本完整使用指南 【免费下载链接】JDspyder 京东预约&抢购脚本,可以自定义商品链接 项目地址: https://gitcode.com/gh_mirrors/jd/JDspyder 还在为抢不到心仪商品而烦恼吗?JDspyder是一款专业的…...
如何用 event.composedPath 获取事件触发经过的所有节点
event.composedPath()用于获取事件在Shadow DOM中的完整传播路径,返回从目标节点到根节点的数组;适用于Web Components中跨Shadow边界精准判断事件来源或委托。event.composedPath() 是一个用于获取事件在 Shadow DOM 中传播路径的方法,它返回…...
终极Typhoeus常见问题解决手册:从超时设置到代理配置的完整指南
终极Typhoeus常见问题解决手册:从超时设置到代理配置的完整指南 【免费下载链接】typhoeus Typhoeus wraps libcurl in order to make fast and reliable requests. 项目地址: https://gitcode.com/gh_mirrors/ty/typhoeus Typhoeus是一个基于libcurl的Ruby…...
云网络架构设计
云网络架构设计:构建数字时代的智能连接 在数字化转型的浪潮中,云网络架构设计成为企业实现高效、弹性与安全的关键。随着云计算、大数据和人工智能技术的普及,传统网络架构已无法满足动态业务需求。云网络通过虚拟化、自动化和分布式技术&a…...
