实时数仓: Hudi 表管理、Flink 性能调优或治理工具脚本
1. Hudi 表管理
1.1 Hudi 表基础管理
创建 Hudi 表
在 HDFS 上创建一个 Hudi 表(以 Merge-on-Read 为例):
CREATE TABLE real_time_dw.dwd_order_fact (order_id STRING,user_id STRING,product_id STRING,amount DOUBLE,order_date STRING,update_time TIMESTAMP
)
PARTITIONED BY (order_date)
STORED AS PARQUET
TBLPROPERTIES ('type'='MERGE_ON_READ','primaryKey'='order_id','preCombineField'='update_time'
);
1.2 数据操作
插入/更新数据
利用 Hudi 写入工具(如 Spark)进行批量或实时插入更新:
from pyspark.sql import SparkSession
from datetime import datetimespark = SparkSession.builder \.appName("Hudi Example") \.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \.getOrCreate()# 加载数据
data = [{"order_id": "1", "user_id": "101", "product_id": "201", "amount": 99.99, "order_date": "2025-01-01", "update_time": datetime.now()},{"order_id": "2", "user_id": "102", "product_id": "202", "amount": 199.99, "order_date": "2025-01-01", "update_time": datetime.now()}
]
df = spark.createDataFrame(data)# 写入 Hudi
hudi_options = {"hoodie.table.name": "dwd_order_fact","hoodie.datasource.write.recordkey.field": "order_id","hoodie.datasource.write.precombine.field": "update_time","hoodie.datasource.write.partitionpath.field": "order_date","hoodie.datasource.write.operation": "upsert","hoodie.datasource.write.table.type": "MERGE_ON_READ","hoodie.datasource.hive.sync.enable": "true","hoodie.datasource.hive.database": "real_time_dw","hoodie.datasource.hive.table": "dwd_order_fact","hoodie.datasource.hive.partition_fields": "order_date"
}df.write.format("hudi").options(**hudi_options).mode("append").save("hdfs://path/to/hudi/dwd_order_fact")
1.3 Hudi 表维护
表清理
- 配置清理策略,清理过期版本:
保留最近 10 个提交版本。hoodie.cleaner.commits.retained=10 hoodie.cleaner.policy=KEEP_LATEST_COMMITS
表压缩
- 针对 MOR 表,定期运行 compaction 任务:
spark-submit --class org.apache.hudi.utilities.HoodieCompactor \--master yarn \--table-path hdfs://path/to/hudi/dwd_order_fact \--table-name dwd_order_fact
元数据管理
- 更新 Hive 元数据:
MSCK REPAIR TABLE real_time_dw.dwd_order_fact;
2. Flink 性能调优
2.1 Checkpoint 性能优化
增量 Checkpoint
启用 RocksDB 增量检查点,减少状态存储大小:
env.getCheckpointConfig().enableIncrementalCheckpoints(true);
异步快照
减少 Checkpoint 对性能的影响:
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setCheckpointTimeout(60000); // 60秒超时
env.getCheckpointConfig().setPreferCheckpointForRecovery(true); // 优先使用Checkpoint恢复
2.2 Watermark 优化
如果数据有延迟,可以允许一定的 out-of-order 数据处理:
WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(5)) // 最大延迟5秒.withTimestampAssigner((event, timestamp) -> event.getEventTime());
2.3 状态管理优化
状态后端选择
- 优先选择 RocksDB 状态后端,支持更大的状态数据:
env.setStateBackend(new RocksDBStateBackend("hdfs://path/to/checkpoints", true));
TTL(Time-to-Live)设置
- 自动清理无用状态:
stateDescriptor.enableTimeToLive(StateTtlConfig.newBuilder(Time.hours(1)).setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite).build());
2.4 Task Slot 配置
根据并发优化 TaskManager:
- 每个 TaskManager 提供更多 slots:
taskmanager.numberOfTaskSlots: 4
3. 治理工具脚本
3.1 数据质量治理(Great Expectations)
脚本自动化
以下 Python 脚本可以实现自动化数据校验(如字段非空和值域校验):
from great_expectations.core.batch import BatchRequest
from great_expectations.data_context import DataContextcontext = DataContext()batch_request = BatchRequest(datasource_name="my_s3_datasource",data_connector_name="default_runtime_data_connector_name",data_asset_name="dwd_order_fact",runtime_parameters={"path": "s3://path/to/hudi/dwd_order_fact/"},batch_identifiers={"default_identifier_name": "2025-01-01"}
)validator = context.get_validator(batch_request=batch_request)# 非空校验
validator.expect_column_values_to_not_be_null("order_id")
# 值域校验
validator.expect_column_values_to_be_in_set("order_status", ["CREATED", "PAID", "SHIPPED", "CANCELLED"])
# 保存结果
validator.save_expectation_suite("order_fact_suite")context.run_validation_operator("action_list_operator",assets_to_validate=[validator]
)
3.2 数据权限管理(Apache Ranger)
策略 JSON 配置
以下为权限策略 JSON 文件的示例,适用于 Ranger API 批量添加策略:
{"policyName": "dwd_order_fact_policy","serviceType": "hive","resources": {"database": {"values": ["real_time_dw"],"isExcludes": false,"isRecursive": false},"table": {"values": ["dwd_order_fact"],"isExcludes": false,"isRecursive": false}},"policyItems": [{"accesses": [{"type": "select", "isAllowed": true}],"users": ["bi_user"],"groups": ["BI_Group"]},{"accesses": [{"type": "select", "isAllowed": true}, {"type": "insert", "isAllowed": true}],"users": ["etl_user"],"groups": ["ETL_Team"]}]
}
通过 Ranger REST API 部署该策略:
curl -u admin:admin -H "Content-Type: application/json" -X POST -d @policy.json http://<RANGER_HOST>:6080/service/public/v2/api/policy
3.3 数据血缘治理(Apache Atlas)
Flink 血缘注册脚本
通过 REST API 自动将 Flink 作业的输入输出血缘关系上传到 Atlas:
curl -X POST http://<ATLAS_HOST>:21000/api/atlas/v2/entity \
-H "Content-Type: application/json" \
-d '{"entity": {"typeName": "process","attributes": {"name": "flink_order_job","inputs": [{"typeName": "kafka_topic", "uniqueAttributes": {"qualifiedName": "order_topic"}}],"outputs": [{"typeName": "hdfs_path", "uniqueAttributes": {"qualifiedName": "hdfs://path/to/hudi/dwd_order_fact"}}]}}
}'
相关文章:
实时数仓: Hudi 表管理、Flink 性能调优或治理工具脚本
1. Hudi 表管理 1.1 Hudi 表基础管理 创建 Hudi 表 在 HDFS 上创建一个 Hudi 表(以 Merge-on-Read 为例): CREATE TABLE real_time_dw.dwd_order_fact (order_id STRING,user_id STRING,product_id STRING,amount DOUBLE,order_date STRIN…...
Kotlin 数据类与密封类
Kotlin 数据类与密封类 引言 在 Kotlin 中,数据类和密封类是两种非常重要的类类型,它们各自具有独特的用途和优势。数据类主要用于存储数据,而密封类则用于表示受限的类层次结构。在本篇文章中,我们将深入探讨 Kotlin 中的数据类…...
大模型推理加速调研(框架、方法)
大模型推理加速调研(框架、方法) 大模型推理框架调研总结推理框架TensorRT-LLMllama.cppmnn-llmfastllmmlc-llm 环境搭建&部署推理环境llama.cppfastllmmnn-llmvllm vllm_openai_completions.pylmdeployTensorRT-LLM 大模型加速技术总结模型压缩量化…...
C语言进阶(3)--字符函数和字符串函数
本章重点 重点介绍处理字符和字符串的库函数的使用和注意事项 目录 0.前言 1.函数介绍 1.1 strlen - 计算字符串长度 1.2 strcpy - 复制字符串 1.3 strcat - 追加字符串 1.4 strcmp - 字符串比较 1.5 strncpy - 受限制复制 1.6 strncat - 受限制追加 1.7 strncmp - 受限制比…...
微服务拆分的艺术:构建高效、灵活的系统架构
目录 一、微服务拆分的重要性 二、微服务拆分的策略 1. 按照业务领域拆分 2. 按照团队结构拆分 3. 按照业务边界拆分 4. 按照数据和数据库拆分 5. 按照用户界面或外部接口拆分 6. 按照功能模块或领域驱动设计拆分 7. 按照性能和可伸缩性需求拆分 三、微服务拆分的实践…...
记录一次电脑被入侵用来挖矿的过程(Trojan、Miner、Hack、turminoob)
文章目录 0、总结1、背景2、端倪3、有个微软的系统更新,就想着更新看看(能否冲掉问题)4、更新没成功,自动重启电脑5、风险文件(好家伙命名还挺规范,一看名字就知道出问题了)6、开机有一些注册表…...
计算机xinput1_4.dll丢失怎么修复?
电脑运行时常见问题及修复指南 作为软件开发从业者,深知电脑在日常使用中难免会遇到各种问题,如文件丢失、文件损坏和系统报错等。这些问题不仅影响工作效率,还可能带来数据丢失的风险。本文将详细介绍一些常见问题及其解决办法,…...
高等数学学习笔记 ☞ 连续函数的运算与性质
1. 连续函数的运算 1. 连续函数的四则运算: (1)若函数在点处连续,则函数在点处也连续。 (2)若函数在区间上连续,则函数在区间上也连续。 2. 反函数的连续性: 若函数在定义域上是单…...
k8s基础(4)—Kubernetes-Service
Service概述 抽象层 k8s的Service是一种抽象层,用于为一组具有相同功能的Pod提供一个统一的入口地址,并通过负载均衡将网络流量分发到这些Pod上。 Service解决了Pod动态变化的问题,例如Pod的IP地址和端口可能会发生变化,通过…...
CAN或者CANFD的Busoff的恢复时间会受到报文周期的影响么?
目录 分析恢复机制角度快恢复和慢恢复策略角度特殊情况分析分析 Busoff的恢复时间通常不会直接受到报文周期的影响,以下是具体分析: 恢复机制角度 CAN总线的节点在Busoff状态下,恢复过程主要是等待总线上出现128个连续的11bit隐性位,与报文周期并无直接关联。无论报文周…...
【DevOps】Jenkins部署
Jenkins部署 文章目录 Jenkins部署资源列表基础环境一、部署Gilab1.1、安装Gitlab1.2、修改配置文件1.3、加载配置文件1.4、访问Gitlab1.5、修改root登录密码1.6、创建demo测试项目1.7、上传代码1.8、验证上传的代码 二、部署Jenkins所需软件2.1、部署JDK2.2、部署Tomcat2.3、部…...
【MATLAB第112期】基于MATLAB的SHAP可解释神经网络回归模型(敏感性分析方法)
【MATLAB第112期】基于MATLAB的SHAP可解释神经网络回归模型(敏感性分析方法) 引言 该文章实现了一个可解释的神经网络回归模型,使用BP神经网络(BPNN)来预测特征输出。该模型利用七个变量参数作为输入特征进行训练。为…...
【Shell编程 / 4】函数定义、脚本执行与输入输出操作
文章目录 函数 与 脚本定义函数示例:简单的 Shell 函数函数参数返回值 脚本执行创建脚本执行脚本 输入输出输出:echo 和 printf输入:read 命令 命令行参数示例:传递参数 函数 与 脚本 在 Shell 编程中,函数和脚本是组…...
RK3588+麒麟国产系统+FPGA+AI在电力和轨道交通视觉与采集系统的应用
工业视觉识别系统厂家提供的功能主要包括: 这些厂家通过先进的视觉识别技术,实现图像的采集、处理与分析。系统能够自动化地完成质量检测、物料分拣、设备监控等任务,显著提升生产效率和产品质量。同时,系统具备高度的灵活性和可扩…...
MySQL 01 02 章——数据库概述与MySQL安装篇
一、数据库概述 (1)为什么要使用数据库 数据库可以实现持久化,什么是持久化:数据持久化意味着将内存中的数据保存到硬盘上加以“固化”持久化的主要作用是:将内存中的数据存储在关系型数据库中,当然也可以…...
运行framework7
安装 framework7 下载地址https://gitcode.com/gh_mirrors/fr/framework7-vue node 下载 https://nodejs.cn/#ionic 配置npm 的镜像源 npm config set registry https://registry.npmmirror.com 下载nvm 进行nvm管理https://www.downza.cn/soft/352547.html 我一开始使用node…...
【Web】软件系统安全赛CachedVisitor——记一次二开工具的经历
明天开始考试周,百无聊赖开了一把CTF,还顺带体验了下二开工具,让无聊的Z3很开心🙂 CachedVisitor这题 大概描述一下:从main.lua加载一段visit.script中被##LUA_START##(.-)##LUA_END##包裹的lua代码 main.lua loca…...
实现自定义集合类:深入理解C#中的IEnumerable<T>接口
文章目录 介绍主要成员示例代码约束常见的约束类型示例代码介绍 在C#中,IEnumerable<T> 是一个泛型接口,用于表示可以被枚举的集合。它定义了用于遍历集合中元素的方法和属性。IEnumerable<T> 是 IEnumerable 的泛型版本,提供了类型安全的枚举功能。 当我们实…...
Compression Techniques for LLMs
Compression Techniques for LLMs 随着大型语言模型(LLMs)的迅速发展,提高其计算效率和存储效率成为研究的重要方向。为了实现这一目标,诸多压缩技术应运而生。本文将深入探讨几种有效的压缩技术,这些技术不仅能够降低…...
Nexus Message Transaction Services(MTS)
Nexus 系列交换机遇到以下情形时,可以尝试查看是否是 MTS 消息卡在缓冲区过多,因为 MTS 负责处理模块内以及跨模块(包括跨管理引擎)的各服务之间的消息路由和排队。 • CPU 高 • 命令行无响应、响应慢 • 控制平面中断 • 流量问…...
React hook之useRef
React useRef 详解 useRef 是 React 提供的一个 Hook,用于在函数组件中创建可变的引用对象。它在 React 开发中有多种重要用途,下面我将全面详细地介绍它的特性和用法。 基本概念 1. 创建 ref const refContainer useRef(initialValue);initialValu…...
Zustand 状态管理库:极简而强大的解决方案
Zustand 是一个轻量级、快速和可扩展的状态管理库,特别适合 React 应用。它以简洁的 API 和高效的性能解决了 Redux 等状态管理方案中的繁琐问题。 核心优势对比 基本使用指南 1. 创建 Store // store.js import create from zustandconst useStore create((set)…...
前端倒计时误差!
提示:记录工作中遇到的需求及解决办法 文章目录 前言一、误差从何而来?二、五大解决方案1. 动态校准法(基础版)2. Web Worker 计时3. 服务器时间同步4. Performance API 高精度计时5. 页面可见性API优化三、生产环境最佳实践四、终极解决方案架构前言 前几天听说公司某个项…...
FastAPI 教程:从入门到实践
FastAPI 是一个现代、快速(高性能)的 Web 框架,用于构建 API,支持 Python 3.6。它基于标准 Python 类型提示,易于学习且功能强大。以下是一个完整的 FastAPI 入门教程,涵盖从环境搭建到创建并运行一个简单的…...
pam_env.so模块配置解析
在PAM(Pluggable Authentication Modules)配置中, /etc/pam.d/su 文件相关配置含义如下: 配置解析 auth required pam_env.so1. 字段分解 字段值说明模块类型auth认证类模块,负责验证用户身份&am…...
江苏艾立泰跨国资源接力:废料变黄金的绿色供应链革命
在华东塑料包装行业面临限塑令深度调整的背景下,江苏艾立泰以一场跨国资源接力的创新实践,重新定义了绿色供应链的边界。 跨国回收网络:废料变黄金的全球棋局 艾立泰在欧洲、东南亚建立再生塑料回收点,将海外废弃包装箱通过标准…...
Nuxt.js 中的路由配置详解
Nuxt.js 通过其内置的路由系统简化了应用的路由配置,使得开发者可以轻松地管理页面导航和 URL 结构。路由配置主要涉及页面组件的组织、动态路由的设置以及路由元信息的配置。 自动路由生成 Nuxt.js 会根据 pages 目录下的文件结构自动生成路由配置。每个文件都会对…...
[10-3]软件I2C读写MPU6050 江协科技学习笔记(16个知识点)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16...
如何理解 IP 数据报中的 TTL?
目录 前言理解 前言 面试灵魂一问:说说对 IP 数据报中 TTL 的理解?我们都知道,IP 数据报由首部和数据两部分组成,首部又分为两部分:固定部分和可变部分,共占 20 字节,而即将讨论的 TTL 就位于首…...
保姆级教程:在无网络无显卡的Windows电脑的vscode本地部署deepseek
文章目录 1 前言2 部署流程2.1 准备工作2.2 Ollama2.2.1 使用有网络的电脑下载Ollama2.2.2 安装Ollama(有网络的电脑)2.2.3 安装Ollama(无网络的电脑)2.2.4 安装验证2.2.5 修改大模型安装位置2.2.6 下载Deepseek模型 2.3 将deepse…...
