实时数仓: Hudi 表管理、Flink 性能调优或治理工具脚本
1. Hudi 表管理
1.1 Hudi 表基础管理
创建 Hudi 表
在 HDFS 上创建一个 Hudi 表(以 Merge-on-Read 为例):
CREATE TABLE real_time_dw.dwd_order_fact (order_id STRING,user_id STRING,product_id STRING,amount DOUBLE,order_date STRING,update_time TIMESTAMP
)
PARTITIONED BY (order_date)
STORED AS PARQUET
TBLPROPERTIES ('type'='MERGE_ON_READ','primaryKey'='order_id','preCombineField'='update_time'
);
1.2 数据操作
插入/更新数据
利用 Hudi 写入工具(如 Spark)进行批量或实时插入更新:
from pyspark.sql import SparkSession
from datetime import datetimespark = SparkSession.builder \.appName("Hudi Example") \.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \.getOrCreate()# 加载数据
data = [{"order_id": "1", "user_id": "101", "product_id": "201", "amount": 99.99, "order_date": "2025-01-01", "update_time": datetime.now()},{"order_id": "2", "user_id": "102", "product_id": "202", "amount": 199.99, "order_date": "2025-01-01", "update_time": datetime.now()}
]
df = spark.createDataFrame(data)# 写入 Hudi
hudi_options = {"hoodie.table.name": "dwd_order_fact","hoodie.datasource.write.recordkey.field": "order_id","hoodie.datasource.write.precombine.field": "update_time","hoodie.datasource.write.partitionpath.field": "order_date","hoodie.datasource.write.operation": "upsert","hoodie.datasource.write.table.type": "MERGE_ON_READ","hoodie.datasource.hive.sync.enable": "true","hoodie.datasource.hive.database": "real_time_dw","hoodie.datasource.hive.table": "dwd_order_fact","hoodie.datasource.hive.partition_fields": "order_date"
}df.write.format("hudi").options(**hudi_options).mode("append").save("hdfs://path/to/hudi/dwd_order_fact")
1.3 Hudi 表维护
表清理
- 配置清理策略,清理过期版本:
保留最近 10 个提交版本。hoodie.cleaner.commits.retained=10 hoodie.cleaner.policy=KEEP_LATEST_COMMITS
表压缩
- 针对 MOR 表,定期运行 compaction 任务:
spark-submit --class org.apache.hudi.utilities.HoodieCompactor \--master yarn \--table-path hdfs://path/to/hudi/dwd_order_fact \--table-name dwd_order_fact
元数据管理
- 更新 Hive 元数据:
MSCK REPAIR TABLE real_time_dw.dwd_order_fact;
2. Flink 性能调优
2.1 Checkpoint 性能优化
增量 Checkpoint
启用 RocksDB 增量检查点,减少状态存储大小:
env.getCheckpointConfig().enableIncrementalCheckpoints(true);
异步快照
减少 Checkpoint 对性能的影响:
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setCheckpointTimeout(60000); // 60秒超时
env.getCheckpointConfig().setPreferCheckpointForRecovery(true); // 优先使用Checkpoint恢复
2.2 Watermark 优化
如果数据有延迟,可以允许一定的 out-of-order 数据处理:
WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(5)) // 最大延迟5秒.withTimestampAssigner((event, timestamp) -> event.getEventTime());
2.3 状态管理优化
状态后端选择
- 优先选择 RocksDB 状态后端,支持更大的状态数据:
env.setStateBackend(new RocksDBStateBackend("hdfs://path/to/checkpoints", true));
TTL(Time-to-Live)设置
- 自动清理无用状态:
stateDescriptor.enableTimeToLive(StateTtlConfig.newBuilder(Time.hours(1)).setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite).build());
2.4 Task Slot 配置
根据并发优化 TaskManager:
- 每个 TaskManager 提供更多 slots:
taskmanager.numberOfTaskSlots: 4
3. 治理工具脚本
3.1 数据质量治理(Great Expectations)
脚本自动化
以下 Python 脚本可以实现自动化数据校验(如字段非空和值域校验):
from great_expectations.core.batch import BatchRequest
from great_expectations.data_context import DataContextcontext = DataContext()batch_request = BatchRequest(datasource_name="my_s3_datasource",data_connector_name="default_runtime_data_connector_name",data_asset_name="dwd_order_fact",runtime_parameters={"path": "s3://path/to/hudi/dwd_order_fact/"},batch_identifiers={"default_identifier_name": "2025-01-01"}
)validator = context.get_validator(batch_request=batch_request)# 非空校验
validator.expect_column_values_to_not_be_null("order_id")
# 值域校验
validator.expect_column_values_to_be_in_set("order_status", ["CREATED", "PAID", "SHIPPED", "CANCELLED"])
# 保存结果
validator.save_expectation_suite("order_fact_suite")context.run_validation_operator("action_list_operator",assets_to_validate=[validator]
)
3.2 数据权限管理(Apache Ranger)
策略 JSON 配置
以下为权限策略 JSON 文件的示例,适用于 Ranger API 批量添加策略:
{"policyName": "dwd_order_fact_policy","serviceType": "hive","resources": {"database": {"values": ["real_time_dw"],"isExcludes": false,"isRecursive": false},"table": {"values": ["dwd_order_fact"],"isExcludes": false,"isRecursive": false}},"policyItems": [{"accesses": [{"type": "select", "isAllowed": true}],"users": ["bi_user"],"groups": ["BI_Group"]},{"accesses": [{"type": "select", "isAllowed": true}, {"type": "insert", "isAllowed": true}],"users": ["etl_user"],"groups": ["ETL_Team"]}]
}
通过 Ranger REST API 部署该策略:
curl -u admin:admin -H "Content-Type: application/json" -X POST -d @policy.json http://<RANGER_HOST>:6080/service/public/v2/api/policy
3.3 数据血缘治理(Apache Atlas)
Flink 血缘注册脚本
通过 REST API 自动将 Flink 作业的输入输出血缘关系上传到 Atlas:
curl -X POST http://<ATLAS_HOST>:21000/api/atlas/v2/entity \
-H "Content-Type: application/json" \
-d '{"entity": {"typeName": "process","attributes": {"name": "flink_order_job","inputs": [{"typeName": "kafka_topic", "uniqueAttributes": {"qualifiedName": "order_topic"}}],"outputs": [{"typeName": "hdfs_path", "uniqueAttributes": {"qualifiedName": "hdfs://path/to/hudi/dwd_order_fact"}}]}}
}'
相关文章:
实时数仓: Hudi 表管理、Flink 性能调优或治理工具脚本
1. Hudi 表管理 1.1 Hudi 表基础管理 创建 Hudi 表 在 HDFS 上创建一个 Hudi 表(以 Merge-on-Read 为例): CREATE TABLE real_time_dw.dwd_order_fact (order_id STRING,user_id STRING,product_id STRING,amount DOUBLE,order_date STRIN…...
Kotlin 数据类与密封类
Kotlin 数据类与密封类 引言 在 Kotlin 中,数据类和密封类是两种非常重要的类类型,它们各自具有独特的用途和优势。数据类主要用于存储数据,而密封类则用于表示受限的类层次结构。在本篇文章中,我们将深入探讨 Kotlin 中的数据类…...

大模型推理加速调研(框架、方法)
大模型推理加速调研(框架、方法) 大模型推理框架调研总结推理框架TensorRT-LLMllama.cppmnn-llmfastllmmlc-llm 环境搭建&部署推理环境llama.cppfastllmmnn-llmvllm vllm_openai_completions.pylmdeployTensorRT-LLM 大模型加速技术总结模型压缩量化…...

C语言进阶(3)--字符函数和字符串函数
本章重点 重点介绍处理字符和字符串的库函数的使用和注意事项 目录 0.前言 1.函数介绍 1.1 strlen - 计算字符串长度 1.2 strcpy - 复制字符串 1.3 strcat - 追加字符串 1.4 strcmp - 字符串比较 1.5 strncpy - 受限制复制 1.6 strncat - 受限制追加 1.7 strncmp - 受限制比…...
微服务拆分的艺术:构建高效、灵活的系统架构
目录 一、微服务拆分的重要性 二、微服务拆分的策略 1. 按照业务领域拆分 2. 按照团队结构拆分 3. 按照业务边界拆分 4. 按照数据和数据库拆分 5. 按照用户界面或外部接口拆分 6. 按照功能模块或领域驱动设计拆分 7. 按照性能和可伸缩性需求拆分 三、微服务拆分的实践…...

记录一次电脑被入侵用来挖矿的过程(Trojan、Miner、Hack、turminoob)
文章目录 0、总结1、背景2、端倪3、有个微软的系统更新,就想着更新看看(能否冲掉问题)4、更新没成功,自动重启电脑5、风险文件(好家伙命名还挺规范,一看名字就知道出问题了)6、开机有一些注册表…...

计算机xinput1_4.dll丢失怎么修复?
电脑运行时常见问题及修复指南 作为软件开发从业者,深知电脑在日常使用中难免会遇到各种问题,如文件丢失、文件损坏和系统报错等。这些问题不仅影响工作效率,还可能带来数据丢失的风险。本文将详细介绍一些常见问题及其解决办法,…...
高等数学学习笔记 ☞ 连续函数的运算与性质
1. 连续函数的运算 1. 连续函数的四则运算: (1)若函数在点处连续,则函数在点处也连续。 (2)若函数在区间上连续,则函数在区间上也连续。 2. 反函数的连续性: 若函数在定义域上是单…...

k8s基础(4)—Kubernetes-Service
Service概述 抽象层 k8s的Service是一种抽象层,用于为一组具有相同功能的Pod提供一个统一的入口地址,并通过负载均衡将网络流量分发到这些Pod上。 Service解决了Pod动态变化的问题,例如Pod的IP地址和端口可能会发生变化,通过…...
CAN或者CANFD的Busoff的恢复时间会受到报文周期的影响么?
目录 分析恢复机制角度快恢复和慢恢复策略角度特殊情况分析分析 Busoff的恢复时间通常不会直接受到报文周期的影响,以下是具体分析: 恢复机制角度 CAN总线的节点在Busoff状态下,恢复过程主要是等待总线上出现128个连续的11bit隐性位,与报文周期并无直接关联。无论报文周…...

【DevOps】Jenkins部署
Jenkins部署 文章目录 Jenkins部署资源列表基础环境一、部署Gilab1.1、安装Gitlab1.2、修改配置文件1.3、加载配置文件1.4、访问Gitlab1.5、修改root登录密码1.6、创建demo测试项目1.7、上传代码1.8、验证上传的代码 二、部署Jenkins所需软件2.1、部署JDK2.2、部署Tomcat2.3、部…...

【MATLAB第112期】基于MATLAB的SHAP可解释神经网络回归模型(敏感性分析方法)
【MATLAB第112期】基于MATLAB的SHAP可解释神经网络回归模型(敏感性分析方法) 引言 该文章实现了一个可解释的神经网络回归模型,使用BP神经网络(BPNN)来预测特征输出。该模型利用七个变量参数作为输入特征进行训练。为…...
【Shell编程 / 4】函数定义、脚本执行与输入输出操作
文章目录 函数 与 脚本定义函数示例:简单的 Shell 函数函数参数返回值 脚本执行创建脚本执行脚本 输入输出输出:echo 和 printf输入:read 命令 命令行参数示例:传递参数 函数 与 脚本 在 Shell 编程中,函数和脚本是组…...

RK3588+麒麟国产系统+FPGA+AI在电力和轨道交通视觉与采集系统的应用
工业视觉识别系统厂家提供的功能主要包括: 这些厂家通过先进的视觉识别技术,实现图像的采集、处理与分析。系统能够自动化地完成质量检测、物料分拣、设备监控等任务,显著提升生产效率和产品质量。同时,系统具备高度的灵活性和可扩…...

MySQL 01 02 章——数据库概述与MySQL安装篇
一、数据库概述 (1)为什么要使用数据库 数据库可以实现持久化,什么是持久化:数据持久化意味着将内存中的数据保存到硬盘上加以“固化”持久化的主要作用是:将内存中的数据存储在关系型数据库中,当然也可以…...

运行framework7
安装 framework7 下载地址https://gitcode.com/gh_mirrors/fr/framework7-vue node 下载 https://nodejs.cn/#ionic 配置npm 的镜像源 npm config set registry https://registry.npmmirror.com 下载nvm 进行nvm管理https://www.downza.cn/soft/352547.html 我一开始使用node…...

【Web】软件系统安全赛CachedVisitor——记一次二开工具的经历
明天开始考试周,百无聊赖开了一把CTF,还顺带体验了下二开工具,让无聊的Z3很开心🙂 CachedVisitor这题 大概描述一下:从main.lua加载一段visit.script中被##LUA_START##(.-)##LUA_END##包裹的lua代码 main.lua loca…...
实现自定义集合类:深入理解C#中的IEnumerable<T>接口
文章目录 介绍主要成员示例代码约束常见的约束类型示例代码介绍 在C#中,IEnumerable<T> 是一个泛型接口,用于表示可以被枚举的集合。它定义了用于遍历集合中元素的方法和属性。IEnumerable<T> 是 IEnumerable 的泛型版本,提供了类型安全的枚举功能。 当我们实…...
Compression Techniques for LLMs
Compression Techniques for LLMs 随着大型语言模型(LLMs)的迅速发展,提高其计算效率和存储效率成为研究的重要方向。为了实现这一目标,诸多压缩技术应运而生。本文将深入探讨几种有效的压缩技术,这些技术不仅能够降低…...
Nexus Message Transaction Services(MTS)
Nexus 系列交换机遇到以下情形时,可以尝试查看是否是 MTS 消息卡在缓冲区过多,因为 MTS 负责处理模块内以及跨模块(包括跨管理引擎)的各服务之间的消息路由和排队。 • CPU 高 • 命令行无响应、响应慢 • 控制平面中断 • 流量问…...

Linux应用开发之网络套接字编程(实例篇)
服务端与客户端单连接 服务端代码 #include <sys/socket.h> #include <sys/types.h> #include <netinet/in.h> #include <stdio.h> #include <stdlib.h> #include <string.h> #include <arpa/inet.h> #include <pthread.h> …...
synchronized 学习
学习源: https://www.bilibili.com/video/BV1aJ411V763?spm_id_from333.788.videopod.episodes&vd_source32e1c41a9370911ab06d12fbc36c4ebc 1.应用场景 不超卖,也要考虑性能问题(场景) 2.常见面试问题: sync出…...

【kafka】Golang实现分布式Masscan任务调度系统
要求: 输出两个程序,一个命令行程序(命令行参数用flag)和一个服务端程序。 命令行程序支持通过命令行参数配置下发IP或IP段、端口、扫描带宽,然后将消息推送到kafka里面。 服务端程序: 从kafka消费者接收…...
树莓派超全系列教程文档--(62)使用rpicam-app通过网络流式传输视频
使用rpicam-app通过网络流式传输视频 使用 rpicam-app 通过网络流式传输视频UDPTCPRTSPlibavGStreamerRTPlibcamerasrc GStreamer 元素 文章来源: http://raspberry.dns8844.cn/documentation 原文网址 使用 rpicam-app 通过网络流式传输视频 本节介绍来自 rpica…...

基于距离变化能量开销动态调整的WSN低功耗拓扑控制开销算法matlab仿真
目录 1.程序功能描述 2.测试软件版本以及运行结果展示 3.核心程序 4.算法仿真参数 5.算法理论概述 6.参考文献 7.完整程序 1.程序功能描述 通过动态调整节点通信的能量开销,平衡网络负载,延长WSN生命周期。具体通过建立基于距离的能量消耗模型&am…...

Day131 | 灵神 | 回溯算法 | 子集型 子集
Day131 | 灵神 | 回溯算法 | 子集型 子集 78.子集 78. 子集 - 力扣(LeetCode) 思路: 笔者写过很多次这道题了,不想写题解了,大家看灵神讲解吧 回溯算法套路①子集型回溯【基础算法精讲 14】_哔哩哔哩_bilibili 完…...
java 实现excel文件转pdf | 无水印 | 无限制
文章目录 目录 文章目录 前言 1.项目远程仓库配置 2.pom文件引入相关依赖 3.代码破解 二、Excel转PDF 1.代码实现 2.Aspose.License.xml 授权文件 总结 前言 java处理excel转pdf一直没找到什么好用的免费jar包工具,自己手写的难度,恐怕高级程序员花费一年的事件,也…...
FastAPI 教程:从入门到实践
FastAPI 是一个现代、快速(高性能)的 Web 框架,用于构建 API,支持 Python 3.6。它基于标准 Python 类型提示,易于学习且功能强大。以下是一个完整的 FastAPI 入门教程,涵盖从环境搭建到创建并运行一个简单的…...

(二)原型模式
原型的功能是将一个已经存在的对象作为源目标,其余对象都是通过这个源目标创建。发挥复制的作用就是原型模式的核心思想。 一、源型模式的定义 原型模式是指第二次创建对象可以通过复制已经存在的原型对象来实现,忽略对象创建过程中的其它细节。 📌 核心特点: 避免重复初…...

Cinnamon修改面板小工具图标
Cinnamon开始菜单-CSDN博客 设置模块都是做好的,比GNOME简单得多! 在 applet.js 里增加 const Settings imports.ui.settings;this.settings new Settings.AppletSettings(this, HTYMenusonichy, instance_id); this.settings.bind(menu-icon, menu…...