当前位置: 首页 > news >正文

Apache Hudi初探(二)(与flink的结合)--flink写hudi的操作(JobManager端的提交操作)

背景

在Apache Hudi初探(一)(与flink的结合)中,我们提到了Pipelines.hoodieStreamWrite 写hudi文件,这个操作真正写hudi是在Pipelines.hoodieStreamWrite方法下的transform(opName("stream_write", conf), TypeInformation.of(Object.class), operatorFactory),具体分析一下写入的过程。

分析

对于transform(opName("stream_write", conf), TypeInformation.of(Object.class), operatorFactory)这个代码片段,我们主要看operatorFactory 这个对象(transform这个操作是Flink框架的操作):

public class StreamWriteOperator<I> extends AbstractWriteOperator<I> {public StreamWriteOperator(Configuration conf) {super(new StreamWriteFunction<>(conf));}public static <I> WriteOperatorFactory<I> getFactory(Configuration conf) {return WriteOperatorFactory.instance(conf, new StreamWriteOperator<>(conf));}
}

最主要的hudi算子为StreamWriteOperator,其中最主要的操作是由StreamWriteFunction来完成的:

// StreamWriteFunction@Overridepublic void initializeState(FunctionInitializationContext context) throws Exception {this.taskID = getRuntimeContext().getIndexOfThisSubtask();this.metaClient = StreamerUtil.createMetaClient(this.config);this.writeClient = FlinkWriteClients.createWriteClient(this.config, getRuntimeContext());this.writeStatuses = new ArrayList<>();this.writeMetadataState = context.getOperatorStateStore().getListState(new ListStateDescriptor<>("write-metadata-state",TypeInformation.of(WriteMetadataEvent.class)));this.ckpMetadata = CkpMetadata.getInstance(this.metaClient.getFs(), this.metaClient.getBasePath());this.currentInstant = lastPendingInstant();if (context.isRestored()) {restoreWriteMetadata();} else {sendBootstrapEvent();}// blocks flushing until the coordinator starts a new instantthis.confirming = true;}@Overridepublic void open(Configuration parameters) throws IOException {this.tracer = new TotalSizeTracer(this.config);initBuffer();initWriteFunction();}@Overridepublic void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {if (inputEnded) {return;}snapshotState();// Reload the snapshot state as the current state.reloadWriteMetaState();}@Overridepublic void snapshotState() {// Based on the fact that the coordinator starts the checkpoint first,// it would check the validity.// wait for the buffer data flush out and request a new instantflushRemaining(false);}@Overridepublic void processElement(I value, ProcessFunction<I, Object>.Context ctx, Collector<Object> out) throws Exception {bufferRecord((HoodieRecord<?>) value);}
  • initializeState操作,主要是做一些初始化的操作

    • this.taskID = getRuntimeContext().getIndexOfThisSubtask();
      获取当前的task的索引下标,用来向operator coordinator发送event给operator coordinator,之后 StreamWriteOperatorCoordinator(operator coordinator) 进行处理,后续会说到StreamWriteOperatorCoordinator

    • metaClient = StreamerUtil.createMetaClient(this.config)
      writeClient = FlinkWriteClients.createWriteClient
      初始化hudi的元数据客户端(这里是HoodieTableMetaClient)和写入客户端(这里是HoodieFlinkWriteClient)

    • writeStatuses = new ArrayList<>()
      记录后续的写入hudi文件的信息

    • writeMetadataState = context.getOperatorStateStore().getListState
      记录写入hudi的元数据事件,会在后续的操作中,会包装成event发送给operator coordinator(StreamWriteOperatorCoordinator)

    • ckpMetadata = CkpMetadata.getInstance
      Flink的checkpoint的元数据信息路径,默认的路径是/${hoodie.basePath}/.hoodie/.aux/ckp_meta

    • currentInstant = lastPendingInstant()
      获取上次还没有完成的commit

    • restoreWriteMetadata或者sendBootstrapEvent,根据是否是从checkpoint恢复过来的进行不同消息的发送,
      这里的operator coordinator(StreamWriteOperatorCoordinator)会进行统一的处理,并初始化一个commit

  • open操作
    写入hudi前的前置操作,比如说 初始化TotalSizeTracer记录maxBufferSize便于flush操作
    根据write.operation的值(默认是upsert)选择后续的操作是insert或upsert或overwrite,这里是upsert

  • processElement操作
    这里对传入的HoodieRecord进行缓存,主要是bufferRecord做的事情,

    • 首先会获取bucketID,之后再往对应的bucket中插入数据
    • 如果超出write.batch.size(默认是128MB),则会进行flushBucket操作,该操作主要是写入hudi操作 //TODO: 具体的写入hudi操作
      • 首先会获取新的需要提交的commit
      • 再进行写入的实际操作
      • 写入的文件元数据信息回传到operator coordinator进行统一处理
  • snapshotState 操作

    • 调用flushRemaining 写入剩下的数据到hudi存储中
    • 重新加载当前写入的hudi文件元数据信息到当前flink的state中

hudi StreamWriteOperatorCoordinator作用

总的来说,StreamWriteOperatorCoordinator扮演的角色和在Spark中driver的角色一样,都是来最后来提交 元数据信息到huid中。
具体的作用还是得从具体的方法来看:

  @Overridepublic void handleEventFromOperator(int i, OperatorEvent operatorEvent) {ValidationUtils.checkState(operatorEvent instanceof WriteMetadataEvent,"The coordinator can only handle WriteMetaEvent");WriteMetadataEvent event = (WriteMetadataEvent) operatorEvent;if (event.isEndInput()) {// handle end input event synchronously// wrap handleEndInputEvent in executeSync to preserve the order of eventsexecutor.executeSync(() -> handleEndInputEvent(event), "handle end input event for instant %s", this.instant);} else {executor.execute(() -> {if (event.isBootstrap()) {handleBootstrapEvent(event);} else {handleWriteMetaEvent(event);}}, "handle write metadata event for instant %s", this.instant);}}...@Overridepublic void notifyCheckpointComplete(long checkpointId) {executor.execute(() -> {// The executor thread inherits the classloader of the #notifyCheckpointComplete// caller, which is a AppClassLoader.Thread.currentThread().setContextClassLoader(getClass().getClassLoader());// for streaming mode, commits the ever received events anyway,// the stream write task snapshot and flush the data buffer synchronously in sequence,// so a successful checkpoint subsumes the old one(follows the checkpoint subsuming contract)final boolean committed = commitInstant(this.instant, checkpointId);if (tableState.scheduleCompaction) {// if async compaction is on, schedule the compactionCompactionUtil.scheduleCompaction(metaClient, writeClient, tableState.isDeltaTimeCompaction, committed);}if (tableState.scheduleClustering) {// if async clustering is on, schedule the clusteringClusteringUtil.scheduleClustering(conf, writeClient, committed);}if (committed) {// start new instant.startInstant();// sync Hive if is enabledsyncHiveAsync();}}, "commits the instant %s", this.instant);}
  • handleEventFromOperator方法用来接受task发送的消息

    • 对于BootStrap类型的WriteMetadataEvent(在StreamWriteFunction方法initializeState中),相当于函数初始化也就会触发
      该类型的消息由handleBootstrapEvent来处理(我们这里假设每个任务operator都完成了初始化的操作),对应的数据流如下:

      initInstant||\/
      reset => startInstant
      

      startInstant 这里就会初始化一个hudi写操作的commit信息

    • 对于一般的write的信息的event,(比如说在processElement的flushBucket函数中),由handleWriteMetaEvent来处理:

       if (this.eventBuffer[event.getTaskID()] != null) {this.eventBuffer[event.getTaskID()].mergeWith(event);} else {this.eventBuffer[event.getTaskID()] = event;}
      

      这里只是加到变量名为eventBuffer 的WriteMetadataEvent类型的数组中,后续中会进行处理

    • 对于isEndInputtrue的event,这种一般source是基于文件的这种,这里先不讨论

  • notifyCheckpointComplete 当对应的checkpointId完成以后,该方法会被调用

    • commitInstant 提交hudi元数据,如果如果有发生异常,则回滚当前hudi对应的commit
    • scheduleCompaction && scheduleClustering 进行hui的CompcationClustering
    • 如果成功的提交了,则会开启一个新的commit,如果开了hive同步(hive_sync.enabled默认为false),则会同步元数据信息到hive

总结

用一张图总结一下交互方式,如下:
在这里插入图片描述

相关文章:

Apache Hudi初探(二)(与flink的结合)--flink写hudi的操作(JobManager端的提交操作)

背景 在Apache Hudi初探(一)(与flink的结合)中&#xff0c;我们提到了Pipelines.hoodieStreamWrite 写hudi文件,这个操作真正写hudi是在Pipelines.hoodieStreamWrite方法下的transform(opName("stream_write", conf), TypeInformation.of(Object.class), operatorFa…...

Office ---- excel ---- 怎么批量设置行高

解决方法&#xff1a; 调整行高即可...

Wlan——STA上线流程与802.11MAC帧讲解

目录 802.11MAC帧基本概念 802.11帧结构 802.11MAC帧的分类 管理帧 控制帧 数据帧 STA接入无线网络流程 信号扫描—管理帧 链路认证—管理帧 用户关联—管理帧 用户上线 802.11MAC帧基本概念 802.11协议在802家族中的角色位置 其中802.3标准属于以太网的一种帧格式…...

HTTP的并发连接限制和连接线程池

为什么有并发连接限制和连接线程池 大量的客户端连接到服务器&#xff0c;会导致服务器端需要大量的维护连接资源&#xff0c;同时需要处理客户端的请求&#xff0c;这是如何高效的执行任务成了一个关键的问题&#xff0c;所以&#xff0c;并发连接限制和连接线程池的出现就是…...

【从零学习python 】45.Python中的类方法和静态方法

文章目录 类方法、静态方法类方法静态方法使用场景 进阶案例 类方法、静态方法 类方法 类方法是以类对象作为第一个参数的方法。需要使用装饰器classmethod来标识其为类方法。对于类方法&#xff0c;第一个参数必须是类对象&#xff0c;一般以cls作为第一个参数。 class Dog…...

基于 VisualFoxPro 环境开发应用程序的过程

应用程序开发前开发者要与用户之间广泛沟通&#xff0c;作大量的调查研究和分析工 作&#xff0c;从而明确用户的要求、程序应具备的功能及可以完成的任务。为此要进行两方 面的分析&#xff0c;数据分析和功能分析。数据分析的目的是收集系统应包含的数据、数据 的真实性、…...

SpringBoot整合Quartz,实现数据库方式执行定时任务

springboot整合quartz&#xff0c;实现数据库方式执行定时任务。把定时任务信息存进数据库&#xff0c;项目启动后自动执行定时任务。 1.引入依赖包&#xff1a; <dependency> <groupId>org.springframework.boot</groupId> <ar…...

java中多个list怎么用List表示?

如果你有多个List对象&#xff0c;想要将它们合并成一个List对象&#xff0c;可以使用addAll()方法来实现。addAll()方法将会把一个List中的元素逐个添加到另一个List中。 以下是一个示例&#xff0c;展示了如何将多个List对象合并为一个List对象&#xff1a; import java.ut…...

postgresql 数据排序

postgresql 常见操作 排序总结 排序 -- 排序的时候null是最大的值(看一下) select employee_id,manager_id from employeesorder by manager_id desc;-- nulls first使null值排在第一位 select employee_id,manager_id from employeesorder by manager_id nulls first;-- null…...

虚拟机 net、桥接、主机三种网络模式寻根问底

虚拟机使用物理主机上的网络适配器直接连接到物理网络中。 这意味着虚拟机就像是通过网线直接连接到路由器一样,成为物理网络中的一个独立设备。 虚拟机可以获取一个永久的IP地址,通过DHCP或手动设置。 虚拟机和物理主机都可以访问对方以及公共网络中的其他设备,比如文件服务…...

python代码——批量将PPT转换成长图

语言&#xff1a;python 3 用法&#xff1a;点击运行后&#xff0c;弹出窗口&#xff0c;选择文件夹&#xff0c;程序运行会将文件夹内的所有PPT文件全部转换成PPT长图&#xff0c;图片名称与PPT文件名称相同&#xff0c;保存位置相同。 如运行中报错&#xff0c;需要自行根据…...

C++信息学奥赛2046:【例5.15】替换字母

这段代码的功能是对输入的字符串进行处理&#xff0c;将字符串中的字符 a 替换为字符 b 后输出结果。 #include<bits/stdc.h> using namespace std; int main() {string s; // 定义字符串变量s&#xff0c;用来存储输入的字符串char a, b; // 定义字符变量a和b&#xff…...

每天一道leetcode:1306. 跳跃游戏 III(图论中等广度优先遍历)

今日份题目&#xff1a; 这里有一个非负整数数组 arr&#xff0c;你最开始位于该数组的起始下标 start 处。当你位于下标 i 处时&#xff0c;你可以跳到 i arr[i] 或者 i - arr[i]。 请你判断自己是否能够跳到对应元素值为 0 的 **任一** 下标处。 注意&#xff0c;不管是什…...

76参考链接

参考链接 官方文件综合介绍[let 和 const](https://es6.ruanyifeng.com/#docs/reference#let 和 const)解构赋值字符串正则数值数组函数对象Symbol[Set 和 Map](https://es6.ruanyifeng.com/#docs/reference#Set 和 Map)[Proxy 和 Reflect](https://es6.ruanyifeng.com/#docs/…...

浅析Linux SCSI子系统:调试方法

文章目录 SCSI日志调试功能scsi_logging_level调整SCSI日志等级 SCSI trace events使能SCSI trace events方式一&#xff1a;通过set_event接口方式二&#xff1a;通过enable 跟踪trace信息 相关参考 SCSI日志调试功能 SCSI子系统支持内核选项CONFIG_SCSI_LOGGING配置日志调试…...

【Unity3D】水面特效

1 前言 水波特效 中通过屏幕后处理实现了环形水波效果&#xff0c;本文通过 Shader Graph 实现了模拟水面特效&#xff0c;包含以下特效细节。 深水区和浅水区颜色差异&#xff1b;水面有波纹&#xff0c;并且在移动&#xff1b;水面起伏波动&#xff1b;水面边缘有水泡&#…...

CSS中的flex布局详细讲解

Flex 布局 Flex 布局是一种现代的 CSS 布局模型&#xff0c;用于实现灵活的盒子布局。它提供了强大的布局能力&#xff0c;使得元素可以自动调整大小、对齐和分布&#xff0c;适用于构建响应式和可伸缩的布局。 Flex 布局使用 flex 容器和 flex 项目的概念。容器是一个父元素…...

Python功能制作之简单的音乐播放器

需要导入的库&#xff1a; pip install PyQt5 源码&#xff1a; import os from PyQt5.QtCore import Qt, QUrl from PyQt5.QtGui import QIcon, QPixmap from PyQt5.QtMultimedia import QMediaPlayer, QMediaContent from PyQt5.QtWidgets import QApplication, QMainWind…...

GAN生成对抗模型根据minist数据集生成手写数字图片

文章目录 1.项目介绍2相关网站3具体的代码及结果导入工具包设置超参数定义优化器&#xff0c;以及损失函数训练时的迭代过程训练结果的展示 1.项目介绍 通过用minist数据集进行训练&#xff0c;得到一个GAN模型&#xff0c;可以生成与minist数据集类似的图片。 GAN是一种生成模…...

【K8S源码之Pod漂移】整体概况分析 controller-manager 中的 nodelifecycle controller(Pod的驱逐)

参考 k8s 污点驱逐详解-源码分析 - 掘金 k8s驱逐篇(5)-kube-controller-manager驱逐 - 良凯尔 - 博客园 k8s驱逐篇(6)-kube-controller-manager驱逐-NodeLifecycleController源码分析 - 良凯尔 - 博客园 k8s驱逐篇(7)-kube-controller-manager驱逐-taintManager源码分析 - 良…...

同花顺期货通指标编写指南:从零开始构建趋势波段共振系统(含避坑技巧)

同花顺期货通指标编写指南&#xff1a;从零开始构建趋势波段共振系统&#xff08;含避坑技巧&#xff09; 在期货交易中&#xff0c;技术指标是交易者不可或缺的分析工具。同花顺期货通作为国内主流的期货交易软件&#xff0c;其内置的指标编写功能为交易者提供了强大的自定义能…...

ComfyUI-WanVideoWrapper视频生成工具零基础快速部署实战教程

ComfyUI-WanVideoWrapper视频生成工具零基础快速部署实战教程 【免费下载链接】ComfyUI-WanVideoWrapper 项目地址: https://gitcode.com/GitHub_Trending/co/ComfyUI-WanVideoWrapper ComfyUI-WanVideoWrapper是一款功能强大的视频生成工具&#xff0c;它能让用户在Co…...

MIT6.S081 Lab11实战:手把手教你实现E1000网卡驱动的关键函数(附避坑指南)

MIT6.S081 Lab11实战&#xff1a;从零实现E1000网卡驱动的核心逻辑 在操作系统开发领域&#xff0c;网络驱动是连接内核与物理世界的关键桥梁。MIT6.S081课程的Lab11将带领我们深入xv6内核&#xff0c;亲手实现Intel E1000网卡驱动的核心功能。这个实验不仅考验我们对DMA、环形…...

【windows】VirtualBox网络配置及实战-Host Only 仅主机模式

1.概述 仅 主 机 网 络 &#xff1a; 用 来 创 建 一 个 包 含 主 日 一 组 虚拟机的 网 络 &#xff0c; 而 不 需 要 主 机 的 物 理 网 络 接 口 &#xff0e;相反 &#xff0c;在虚拟机上创建了一个类似于环回接口的虚拟网络接口。提 供 虚 似 机 和 主 机 之 间 的 连 接 …...

3大增强型功能体系:重新定义设计师工作方式

3大增强型功能体系&#xff1a;重新定义设计师工作方式 【免费下载链接】illustrator-scripts Adobe Illustrator scripts 项目地址: https://gitcode.com/gh_mirrors/il/illustrator-scripts 在当今快节奏的设计行业中&#xff0c;效率就是竞争力。这款开源Illustrator…...

小米Pad 5 Windows驱动完整配置指南:解锁平板的桌面级生产力

小米Pad 5 Windows驱动完整配置指南&#xff1a;解锁平板的桌面级生产力 【免费下载链接】MiPad5-Drivers Based on Surface Duo Drivers. 项目地址: https://gitcode.com/gh_mirrors/mi/MiPad5-Drivers 想要让小米Pad 5变身真正的生产力工具吗&#xff1f;这款基于高通…...

手把手教你给RK3588开发板添加RTL8188EUS USB无线网卡驱动(附完整配置流程)

RK3588开发板实战&#xff1a;RTL8188EUS无线网卡驱动移植全指南 在嵌入式开发领域&#xff0c;为特定硬件平台添加第三方外设驱动是开发者常遇到的挑战。本文将详细介绍如何在Rockchip RK3588开发板上为RTL8188EUS USB无线网卡移植驱动&#xff0c;从环境准备到功能验证&#…...

炉石传说脚本Hearthstone-Script:三步从零到精通的自动化游戏指南 [特殊字符]

炉石传说脚本Hearthstone-Script&#xff1a;三步从零到精通的自动化游戏指南 &#x1f3ae; 【免费下载链接】Hearthstone-Script Hearthstone script&#xff08;炉石传说脚本&#xff09;&#xff08;2024.01.25停更至国服回归&#xff09; 项目地址: https://gitcode.com…...

故障诊断指南:用STFT在5分钟内定位工业设备异常时间点(MATLAB版)

故障诊断实战&#xff1a;STFT在工业设备异常定位中的高效应用&#xff08;MATLAB实现&#xff09; 工业设备的异常检测如同医生听诊&#xff0c;需要精准捕捉故障的"心跳节律"。传统方法往往只能告诉我们"设备病了"&#xff0c;却难以定位"何时发病…...

Blazor开发中的高效筛选技术:MudBlazor数据表格优化指南

Blazor开发中的高效筛选技术&#xff1a;MudBlazor数据表格优化指南 【免费下载链接】MudBlazor Blazor Component Library based on Material design with an emphasis on ease of use. Mainly written in C# with Javascript kept to a bare minimum it empowers .NET develo…...