当前位置: 首页 > news >正文

实时数仓: Hudi 表管理、Flink 性能调优或治理工具脚本

1. Hudi 表管理

1.1 Hudi 表基础管理

创建 Hudi 表
在 HDFS 上创建一个 Hudi 表(以 Merge-on-Read 为例):

CREATE TABLE real_time_dw.dwd_order_fact (order_id STRING,user_id STRING,product_id STRING,amount DOUBLE,order_date STRING,update_time TIMESTAMP
)
PARTITIONED BY (order_date)
STORED AS PARQUET
TBLPROPERTIES ('type'='MERGE_ON_READ','primaryKey'='order_id','preCombineField'='update_time'
);
1.2 数据操作

插入/更新数据
利用 Hudi 写入工具(如 Spark)进行批量或实时插入更新:

from pyspark.sql import SparkSession
from datetime import datetimespark = SparkSession.builder \.appName("Hudi Example") \.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \.getOrCreate()# 加载数据
data = [{"order_id": "1", "user_id": "101", "product_id": "201", "amount": 99.99, "order_date": "2025-01-01", "update_time": datetime.now()},{"order_id": "2", "user_id": "102", "product_id": "202", "amount": 199.99, "order_date": "2025-01-01", "update_time": datetime.now()}
]
df = spark.createDataFrame(data)# 写入 Hudi
hudi_options = {"hoodie.table.name": "dwd_order_fact","hoodie.datasource.write.recordkey.field": "order_id","hoodie.datasource.write.precombine.field": "update_time","hoodie.datasource.write.partitionpath.field": "order_date","hoodie.datasource.write.operation": "upsert","hoodie.datasource.write.table.type": "MERGE_ON_READ","hoodie.datasource.hive.sync.enable": "true","hoodie.datasource.hive.database": "real_time_dw","hoodie.datasource.hive.table": "dwd_order_fact","hoodie.datasource.hive.partition_fields": "order_date"
}df.write.format("hudi").options(**hudi_options).mode("append").save("hdfs://path/to/hudi/dwd_order_fact")
1.3 Hudi 表维护

表清理

  • 配置清理策略,清理过期版本:
    hoodie.cleaner.commits.retained=10
    hoodie.cleaner.policy=KEEP_LATEST_COMMITS
    
    保留最近 10 个提交版本。

表压缩

  • 针对 MOR 表,定期运行 compaction 任务:
    spark-submit --class org.apache.hudi.utilities.HoodieCompactor \--master yarn \--table-path hdfs://path/to/hudi/dwd_order_fact \--table-name dwd_order_fact
    

元数据管理

  • 更新 Hive 元数据:
    MSCK REPAIR TABLE real_time_dw.dwd_order_fact;
    

2. Flink 性能调优

2.1 Checkpoint 性能优化

增量 Checkpoint
启用 RocksDB 增量检查点,减少状态存储大小:

env.getCheckpointConfig().enableIncrementalCheckpoints(true);

异步快照
减少 Checkpoint 对性能的影响:

env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setCheckpointTimeout(60000); // 60秒超时
env.getCheckpointConfig().setPreferCheckpointForRecovery(true); // 优先使用Checkpoint恢复
2.2 Watermark 优化

如果数据有延迟,可以允许一定的 out-of-order 数据处理:

WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(5)) // 最大延迟5秒.withTimestampAssigner((event, timestamp) -> event.getEventTime());
2.3 状态管理优化

状态后端选择

  • 优先选择 RocksDB 状态后端,支持更大的状态数据:
    env.setStateBackend(new RocksDBStateBackend("hdfs://path/to/checkpoints", true));
    

TTL(Time-to-Live)设置

  • 自动清理无用状态:
    stateDescriptor.enableTimeToLive(StateTtlConfig.newBuilder(Time.hours(1)).setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite).build());
    
2.4 Task Slot 配置

根据并发优化 TaskManager:

  • 每个 TaskManager 提供更多 slots:
    taskmanager.numberOfTaskSlots: 4
    

3. 治理工具脚本

3.1 数据质量治理(Great Expectations)

脚本自动化
以下 Python 脚本可以实现自动化数据校验(如字段非空和值域校验):

from great_expectations.core.batch import BatchRequest
from great_expectations.data_context import DataContextcontext = DataContext()batch_request = BatchRequest(datasource_name="my_s3_datasource",data_connector_name="default_runtime_data_connector_name",data_asset_name="dwd_order_fact",runtime_parameters={"path": "s3://path/to/hudi/dwd_order_fact/"},batch_identifiers={"default_identifier_name": "2025-01-01"}
)validator = context.get_validator(batch_request=batch_request)# 非空校验
validator.expect_column_values_to_not_be_null("order_id")
# 值域校验
validator.expect_column_values_to_be_in_set("order_status", ["CREATED", "PAID", "SHIPPED", "CANCELLED"])
# 保存结果
validator.save_expectation_suite("order_fact_suite")context.run_validation_operator("action_list_operator",assets_to_validate=[validator]
)
3.2 数据权限管理(Apache Ranger)

策略 JSON 配置
以下为权限策略 JSON 文件的示例,适用于 Ranger API 批量添加策略:

{"policyName": "dwd_order_fact_policy","serviceType": "hive","resources": {"database": {"values": ["real_time_dw"],"isExcludes": false,"isRecursive": false},"table": {"values": ["dwd_order_fact"],"isExcludes": false,"isRecursive": false}},"policyItems": [{"accesses": [{"type": "select", "isAllowed": true}],"users": ["bi_user"],"groups": ["BI_Group"]},{"accesses": [{"type": "select", "isAllowed": true}, {"type": "insert", "isAllowed": true}],"users": ["etl_user"],"groups": ["ETL_Team"]}]
}

通过 Ranger REST API 部署该策略:

curl -u admin:admin -H "Content-Type: application/json" -X POST -d @policy.json http://<RANGER_HOST>:6080/service/public/v2/api/policy
3.3 数据血缘治理(Apache Atlas)

Flink 血缘注册脚本
通过 REST API 自动将 Flink 作业的输入输出血缘关系上传到 Atlas:

curl -X POST http://<ATLAS_HOST>:21000/api/atlas/v2/entity \
-H "Content-Type: application/json" \
-d '{"entity": {"typeName": "process","attributes": {"name": "flink_order_job","inputs": [{"typeName": "kafka_topic", "uniqueAttributes": {"qualifiedName": "order_topic"}}],"outputs": [{"typeName": "hdfs_path", "uniqueAttributes": {"qualifiedName": "hdfs://path/to/hudi/dwd_order_fact"}}]}}
}'

相关文章:

实时数仓: Hudi 表管理、Flink 性能调优或治理工具脚本

1. Hudi 表管理 1.1 Hudi 表基础管理 创建 Hudi 表 在 HDFS 上创建一个 Hudi 表&#xff08;以 Merge-on-Read 为例&#xff09;&#xff1a; CREATE TABLE real_time_dw.dwd_order_fact (order_id STRING,user_id STRING,product_id STRING,amount DOUBLE,order_date STRIN…...

Kotlin 数据类与密封类

Kotlin 数据类与密封类 引言 在 Kotlin 中&#xff0c;数据类和密封类是两种非常重要的类类型&#xff0c;它们各自具有独特的用途和优势。数据类主要用于存储数据&#xff0c;而密封类则用于表示受限的类层次结构。在本篇文章中&#xff0c;我们将深入探讨 Kotlin 中的数据类…...

大模型推理加速调研(框架、方法)

大模型推理加速调研&#xff08;框架、方法&#xff09; 大模型推理框架调研总结推理框架TensorRT-LLMllama.cppmnn-llmfastllmmlc-llm 环境搭建&部署推理环境llama.cppfastllmmnn-llmvllm vllm_openai_completions.pylmdeployTensorRT-LLM 大模型加速技术总结模型压缩量化…...

C语言进阶(3)--字符函数和字符串函数

本章重点 重点介绍处理字符和字符串的库函数的使用和注意事项 目录 0.前言 1.函数介绍 1.1 strlen - 计算字符串长度 1.2 strcpy - 复制字符串 1.3 strcat - 追加字符串 1.4 strcmp - 字符串比较 1.5 strncpy - 受限制复制 1.6 strncat - 受限制追加 1.7 strncmp - 受限制比…...

微服务拆分的艺术:构建高效、灵活的系统架构

目录 一、微服务拆分的重要性 二、微服务拆分的策略 1. 按照业务领域拆分 2. 按照团队结构拆分 3. 按照业务边界拆分 4. 按照数据和数据库拆分 5. 按照用户界面或外部接口拆分 6. 按照功能模块或领域驱动设计拆分 7. 按照性能和可伸缩性需求拆分 三、微服务拆分的实践…...

记录一次电脑被入侵用来挖矿的过程(Trojan、Miner、Hack、turminoob)

文章目录 0、总结1、背景2、端倪3、有个微软的系统更新&#xff0c;就想着更新看看&#xff08;能否冲掉问题&#xff09;4、更新没成功&#xff0c;自动重启电脑5、风险文件&#xff08;好家伙命名还挺规范&#xff0c;一看名字就知道出问题了&#xff09;6、开机有一些注册表…...

计算机xinput1_4.dll丢失怎么修复?

电脑运行时常见问题及修复指南 作为软件开发从业者&#xff0c;深知电脑在日常使用中难免会遇到各种问题&#xff0c;如文件丢失、文件损坏和系统报错等。这些问题不仅影响工作效率&#xff0c;还可能带来数据丢失的风险。本文将详细介绍一些常见问题及其解决办法&#xff0c;…...

高等数学学习笔记 ☞ 连续函数的运算与性质

1. 连续函数的运算 1. 连续函数的四则运算&#xff1a; &#xff08;1&#xff09;若函数在点处连续&#xff0c;则函数在点处也连续。 &#xff08;2&#xff09;若函数在区间上连续&#xff0c;则函数在区间上也连续。 2. 反函数的连续性&#xff1a; 若函数在定义域上是单…...

k8s基础(4)—Kubernetes-Service

Service概述 抽象层 ‌k8s的Service是一种抽象层&#xff0c;用于为一组具有相同功能的Pod提供一个统一的入口地址&#xff0c;并通过负载均衡将网络流量分发到这些Pod上。‌ Service解决了Pod动态变化的问题&#xff0c;例如Pod的IP地址和端口可能会发生变化&#xff0c;通过…...

CAN或者CANFD的Busoff的恢复时间会受到报文周期的影响么?

目录 分析恢复机制角度快恢复和慢恢复策略角度特殊情况分析分析 Busoff的恢复时间通常不会直接受到报文周期的影响,以下是具体分析: 恢复机制角度 CAN总线的节点在Busoff状态下,恢复过程主要是等待总线上出现128个连续的11bit隐性位,与报文周期并无直接关联。无论报文周…...

【DevOps】Jenkins部署

Jenkins部署 文章目录 Jenkins部署资源列表基础环境一、部署Gilab1.1、安装Gitlab1.2、修改配置文件1.3、加载配置文件1.4、访问Gitlab1.5、修改root登录密码1.6、创建demo测试项目1.7、上传代码1.8、验证上传的代码 二、部署Jenkins所需软件2.1、部署JDK2.2、部署Tomcat2.3、部…...

【MATLAB第112期】基于MATLAB的SHAP可解释神经网络回归模型(敏感性分析方法)

【MATLAB第112期】基于MATLAB的SHAP可解释神经网络回归模型&#xff08;敏感性分析方法&#xff09; 引言 该文章实现了一个可解释的神经网络回归模型&#xff0c;使用BP神经网络&#xff08;BPNN&#xff09;来预测特征输出。该模型利用七个变量参数作为输入特征进行训练。为…...

【Shell编程 / 4】函数定义、脚本执行与输入输出操作

文章目录 函数 与 脚本定义函数示例&#xff1a;简单的 Shell 函数函数参数返回值 脚本执行创建脚本执行脚本 输入输出输出&#xff1a;echo 和 printf输入&#xff1a;read 命令 命令行参数示例&#xff1a;传递参数 函数 与 脚本 在 Shell 编程中&#xff0c;函数和脚本是组…...

RK3588+麒麟国产系统+FPGA+AI在电力和轨道交通视觉与采集系统的应用

工业视觉识别系统厂家提供的功能主要包括&#xff1a; 这些厂家通过先进的视觉识别技术&#xff0c;实现图像的采集、处理与分析。系统能够自动化地完成质量检测、物料分拣、设备监控等任务&#xff0c;显著提升生产效率和产品质量。同时&#xff0c;系统具备高度的灵活性和可扩…...

MySQL 01 02 章——数据库概述与MySQL安装篇

一、数据库概述 &#xff08;1&#xff09;为什么要使用数据库 数据库可以实现持久化&#xff0c;什么是持久化&#xff1a;数据持久化意味着将内存中的数据保存到硬盘上加以“固化”持久化的主要作用是&#xff1a;将内存中的数据存储在关系型数据库中&#xff0c;当然也可以…...

运行framework7

安装 framework7 下载地址https://gitcode.com/gh_mirrors/fr/framework7-vue node 下载 https://nodejs.cn/#ionic 配置npm 的镜像源 npm config set registry https://registry.npmmirror.com 下载nvm 进行nvm管理https://www.downza.cn/soft/352547.html 我一开始使用node…...

【Web】软件系统安全赛CachedVisitor——记一次二开工具的经历

明天开始考试周&#xff0c;百无聊赖开了一把CTF&#xff0c;还顺带体验了下二开工具&#xff0c;让无聊的Z3很开心&#x1f642; CachedVisitor这题 大概描述一下&#xff1a;从main.lua加载一段visit.script中被##LUA_START##(.-)##LUA_END##包裹的lua代码 main.lua loca…...

实现自定义集合类:深入理解C#中的IEnumerable<T>接口

文章目录 介绍主要成员示例代码约束常见的约束类型示例代码介绍 在C#中,IEnumerable<T> 是一个泛型接口,用于表示可以被枚举的集合。它定义了用于遍历集合中元素的方法和属性。IEnumerable<T> 是 IEnumerable 的泛型版本,提供了类型安全的枚举功能。 当我们实…...

Compression Techniques for LLMs

Compression Techniques for LLMs 随着大型语言模型&#xff08;LLMs&#xff09;的迅速发展&#xff0c;提高其计算效率和存储效率成为研究的重要方向。为了实现这一目标&#xff0c;诸多压缩技术应运而生。本文将深入探讨几种有效的压缩技术&#xff0c;这些技术不仅能够降低…...

Nexus Message Transaction Services(MTS)

Nexus 系列交换机遇到以下情形时&#xff0c;可以尝试查看是否是 MTS 消息卡在缓冲区过多&#xff0c;因为 MTS 负责处理模块内以及跨模块&#xff08;包括跨管理引擎&#xff09;的各服务之间的消息路由和排队。 • CPU 高 • 命令行无响应、响应慢 • 控制平面中断 • 流量问…...

UE5 学习系列(二)用户操作界面及介绍

这篇博客是 UE5 学习系列博客的第二篇&#xff0c;在第一篇的基础上展开这篇内容。博客参考的 B 站视频资料和第一篇的链接如下&#xff1a; 【Note】&#xff1a;如果你已经完成安装等操作&#xff0c;可以只执行第一篇博客中 2. 新建一个空白游戏项目 章节操作&#xff0c;重…...

装饰模式(Decorator Pattern)重构java邮件发奖系统实战

前言 现在我们有个如下的需求&#xff0c;设计一个邮件发奖的小系统&#xff0c; 需求 1.数据验证 → 2. 敏感信息加密 → 3. 日志记录 → 4. 实际发送邮件 装饰器模式&#xff08;Decorator Pattern&#xff09;允许向一个现有的对象添加新的功能&#xff0c;同时又不改变其…...

python打卡day49

知识点回顾&#xff1a; 通道注意力模块复习空间注意力模块CBAM的定义 作业&#xff1a;尝试对今天的模型检查参数数目&#xff0c;并用tensorboard查看训练过程 import torch import torch.nn as nn# 定义通道注意力 class ChannelAttention(nn.Module):def __init__(self,…...

k8s从入门到放弃之Ingress七层负载

k8s从入门到放弃之Ingress七层负载 在Kubernetes&#xff08;简称K8s&#xff09;中&#xff0c;Ingress是一个API对象&#xff0c;它允许你定义如何从集群外部访问集群内部的服务。Ingress可以提供负载均衡、SSL终结和基于名称的虚拟主机等功能。通过Ingress&#xff0c;你可…...

Module Federation 和 Native Federation 的比较

前言 Module Federation 是 Webpack 5 引入的微前端架构方案&#xff0c;允许不同独立构建的应用在运行时动态共享模块。 Native Federation 是 Angular 官方基于 Module Federation 理念实现的专为 Angular 优化的微前端方案。 概念解析 Module Federation (模块联邦) Modul…...

Redis数据倾斜问题解决

Redis 数据倾斜问题解析与解决方案 什么是 Redis 数据倾斜 Redis 数据倾斜指的是在 Redis 集群中&#xff0c;部分节点存储的数据量或访问量远高于其他节点&#xff0c;导致这些节点负载过高&#xff0c;影响整体性能。 数据倾斜的主要表现 部分节点内存使用率远高于其他节…...

RNN避坑指南:从数学推导到LSTM/GRU工业级部署实战流程

本文较长&#xff0c;建议点赞收藏&#xff0c;以免遗失。更多AI大模型应用开发学习视频及资料&#xff0c;尽在聚客AI学院。 本文全面剖析RNN核心原理&#xff0c;深入讲解梯度消失/爆炸问题&#xff0c;并通过LSTM/GRU结构实现解决方案&#xff0c;提供时间序列预测和文本生成…...

WPF八大法则:告别模态窗口卡顿

⚙️ 核心问题&#xff1a;阻塞式模态窗口的缺陷 原始代码中ShowDialog()会阻塞UI线程&#xff0c;导致后续逻辑无法执行&#xff1a; var result modalWindow.ShowDialog(); // 线程阻塞 ProcessResult(result); // 必须等待窗口关闭根本问题&#xff1a…...

nnUNet V2修改网络——暴力替换网络为UNet++

更换前,要用nnUNet V2跑通所用数据集,证明nnUNet V2、数据集、运行环境等没有问题 阅读nnU-Net V2 的 U-Net结构,初步了解要修改的网络,知己知彼,修改起来才能游刃有余。 U-Net存在两个局限,一是网络的最佳深度因应用场景而异,这取决于任务的难度和可用于训练的标注数…...

springboot 日志类切面,接口成功记录日志,失败不记录

springboot 日志类切面&#xff0c;接口成功记录日志&#xff0c;失败不记录 自定义一个注解方法 import java.lang.annotation.ElementType; import java.lang.annotation.Retention; import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target;/***…...