源码解析flink文件连接源TextInputFormat
背景:
kafka的文件系统数据源可以支持精准一次的一致性,本文就从源码看下如何TextInputFormat如何支持状态的精准一致性
TextInputFormat源码解析
首先flink会把输入的文件进行切分,分成多个数据块的形式,每个数据源算子任务会被分配以读取其中的数据块,但是不是所有的文件都能进行分块,判断文件是否可以进行分块的代码如下:
protected boolean testForUnsplittable(FileStatus pathFile) {if (getInflaterInputStreamFactory(pathFile.getPath()) != null) {unsplittable = true;return true;}return false;
}private InflaterInputStreamFactory<?> getInflaterInputStreamFactory(Path path) {String fileExtension = extractFileExtension(path.getName());if (fileExtension != null) {return getInflaterInputStreamFactory(fileExtension);} else {return null;}
}

后缀名称是.gz,.bzip2等的文件都没法切分,如果可以切分,切分的具体代码如下所示:
while (samplesTaken < numSamples && fileNum < allFiles.size()) {// make a split for the sample and use it to read a recordFileStatus file = allFiles.get(fileNum);
// 根据偏移量进行切分FileInputSplit split = new FileInputSplit(0, file.getPath(), offset, file.getLen() - offset, null);// we open the split, read one line, and take its lengthtry {open(split);if (readLine()) {totalNumBytes += this.currLen + this.delimiter.length;samplesTaken++;}} finally {// close the file stream, do not release the bufferssuper.close();}
// 偏移量迁移offset += stepSize;// skip to the next file, if necessarywhile (fileNum < allFiles.size()&& offset >= (file = allFiles.get(fileNum)).getLen()) {offset -= file.getLen();fileNum++;}
}
再来看一下TextInputFormat如何支持checkpoint操作,保存文件的偏移量的代码:
@Override
public void snapshotState(StateSnapshotContext context) throws Exception {super.snapshotState(context);checkState(checkpointedState != null, "The operator state has not been properly initialized.");int subtaskIdx = getRuntimeContext().getIndexOfThisSubtask();// 算子列表状态checkpointedState.clear();// 获取文件的当前读取的偏移List<T> readerState = getReaderState();try {for (T split : readerState) {//保存到检查点路径中checkpointedState.add(split);}} catch (Exception e) {checkpointedState.clear();throw new Exception("Could not add timestamped file input splits to to operator "+ "state backend of operator "+ getOperatorName()+ '.',e);}if (LOG.isDebugEnabled()) {LOG.debug("{} (taskIdx={}) checkpointed {} splits: {}.",getClass().getSimpleName(),subtaskIdx,readerState.size(),readerState);}
}
从检查点中恢复状态的代码如下:
public void initializeState(StateInitializationContext context) throws Exception {super.initializeState(context);checkState(checkpointedState == null, "The reader state has already been initialized.");// 初始化算子操作状态checkpointedState =context.getOperatorStateStore().getListState(new ListStateDescriptor<>("splits", new JavaSerializer<>()));int subtaskIdx = getRuntimeContext().getIndexOfThisSubtask();LOG.info("Restoring state for the {} (taskIdx={}).", getClass().getSimpleName(), subtaskIdx);splits = splits == null ? new PriorityQueue<>() : splits;for (T split : checkpointedState.get()) {//从检查点状态中恢复各个切分的分块splits.add(split);}
}
相关文章:
源码解析flink文件连接源TextInputFormat
背景: kafka的文件系统数据源可以支持精准一次的一致性,本文就从源码看下如何TextInputFormat如何支持状态的精准一致性 TextInputFormat源码解析 首先flink会把输入的文件进行切分,分成多个数据块的形式,每个数据源算子任务会被分配以读取…...
SQL ORDER BY Keyword(按关键字排序)
SQL ORDER BY 关键字 ORDER BY 关键字用于按升序或降序对结果集进行排序。 ORDER BY 关键字默认情况下按升序排序记录。 如果需要按降序对记录进行排序,可以使用DESC关键字。 SQL ORDER BY 语法 SELECT column1, column2, ... FROM table_name ORDER BY column1, …...
光伏三相并网逆变器的控制策略与性能分析(Simulink仿真实现)
💥💥💞💞欢迎来到本博客❤️❤️💥💥 🏆博主优势:🌞🌞🌞博客内容尽量做到思维缜密,逻辑清晰,为了方便读者。 ⛳️座右铭&a…...
【网络安全 --- xss-labs靶场】xss-labs靶场安装详细教程,让你巩固对xss漏洞的理解及绕过技巧和方法(提供资源)
一,资源下载准备 1-1 VMware 16.0 安装请参考以下博客,若已经安装请忽略: 【网络安全 --- 工具安装】VMware 16.0 详细安装过程(提供资源)-CSDN博客【网络安全 --- 工具安装】VMware 16.0 详细安装过程(…...
蓝桥每日一题(day 3: 蓝桥587.约数个数)--数学--easy
题目 解题核心: 分解质因数,每个质因数的次方1的累乘积就是anscode #include <iostream> #include<algorithm> #include<unordered_map> //# #include<> typedef long long LL; const int N 110, MOD 1e9 7;using namespac…...
深入剖析Java类加载过程:探寻类加载器的奥秘
摘要: 一个java文件从被加载到被卸载这个生命过程,总共要经历4个阶段: 加载->链接(验证准备解析)->初始化(使用前的准备)->使用->卸载 其中类加载过程包括加载、验证、准备、解析和初始化五个阶…...
PHP yield
概念: Generator:带 yield的function yield:Generator或task的中断关键字,执行到yield时一次调度周期执行完即阻塞,并返回右侧表达式结果,等待下一次调度器运行next()或迭代遍历才会继续往下执行࿰…...
react antd实现upload上传文件前form校验,同时请求带data
最近的需求,两个下拉框是必填项,点击上传按钮,如果有下拉框没选要有提示,如图 如果直接使用antd的Upload组件,一点击文件选择的窗口就打开了,哪怕在Button里再加点击事件,也只是(几乎…...
echars 设置滚动条演示,
dataZoom: [// 滑动条{zoomLock:true,xAxisIndex: 0, // 这里是从X轴的0刻度开始type: "slider", // 这个 dataZoom 组件是 slider 型 dataZoom 组件startValue: 0, // 从头开始。endValue: 20, // 一次性展示几个。// fillerColor: "#023661", // 选中范围…...
代码随想录算法训练营第五十八天|583.两个字符串的删除操作 、72. 编辑距离
代码随想录算法训练营第五十八天|583.两个字符串的删除操作 、72. 编辑距离 文章目录 代码随想录算法训练营第五十八天|583.两个字符串的删除操作 、72. 编辑距离[toc]583.两个字符串的删除操作求公共部分长度:即最长公共子串 72. 编辑距离 583.两个字符串的删除操作…...
1024网络技术命令汇总(第54课)
1024网络技术命令汇总(第54课) 1 查询命令 display ? display current-configuration //查看全部的配置信息 display interface brief //查看接口的信 display ip interface brief //查看IP地址的接口信息状态 display arp all …...
智慧河湖方案:AI赋能水利水务,构建河湖智能可视化监管大数据平台
一、方案背景 我国江河湖泊众多,水系发达。伴随着经济社会快速发展,水生态水环境问题成为群众最关注的民生议题之一。一些河流开发利用已接近甚至超出水环境承载能力,一些地区废污水排放量居高不下,一些地方侵占河道、围垦湖泊等…...
界面组件DevExpress WPF v23.1 - 全面升级文档处理功能
DevExpress WPF拥有120个控件和库,将帮助您交付满足甚至超出企业需求的高性能业务应用程序。通过DevExpress WPF能创建有着强大互动功能的XAML基础应用程序,这些应用程序专注于当代客户的需求和构建未来新一代支持触摸的解决方案。 无论是Office办公软件…...
【C语言必知必会 | 第八篇】一文带你精通循环结构
引言 C语言是一门面向过程的、抽象化的通用程序设计语言,广泛应用于底层开发。它在编程语言中具有举足轻重的地位。 此文为【C语言必知必会】系列第八篇,进行C语言循环结构的专项练习,结合专题优质题目,带领读者从0开始࿰…...
同一个线程池执行不同类型的任务
1、同一个线程池可以执行不同的任务类型,也可以带返回值,也可以不带返回值的 import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.vip.vman.result.BasicResult; import lombok.extern.slf4j.Slf4j; import org.springframewor…...
GEO生信数据挖掘(八)富集分析(GO 、KEGG、 GSEA 打包带走)
第六节,我们使用结核病基因数据,做了一个数据预处理的实操案例。例子中结核类型,包括结核,潜隐进展,对照和潜隐,四个类别。第七节延续上个数据,进行了差异分析。 本节对差异基因进行富集分析。 …...
高校教务系统登录页面JS分析——华南理工大学
高校教务系统密码加密逻辑及JS逆向 本文将介绍高校教务系统的密码加密逻辑以及使用JavaScript进行逆向分析的过程。通过本文,你将了解到密码加密的基本概念、常用加密算法以及如何通过逆向分析来破解密码。 本文仅供交流学习,勿用于非法用途。 一、密码加…...
人工智能之PyTorch数据操作-Python版
PyTorch数据操作 # 导入PyTorch import torch [张量表示一个由数值组成的数组,这个数组可能有多个维度]。 具有一个轴的张量对应数学上的向量(); 具有两个轴的张量对应数学上的矩阵(matrix);…...
星环科技向量数据库Transwarp Hippo1.1发布:一库搞定向量+全文联合检索,提升大模型准确率
星环科技向量数据库Transwarp Hippo自发布已来,受到了众多用户的欢迎,帮助用户实现向量数据的存储、管理和检索,探索和实践大模型场景。在与用户不断地深入交流以及实践中,Hippo迎来了V1.1版本,一套系统即可支持向量与全文联合检索,提高文本数据的召回精度,从而提升大语…...
理解LoadRunner,基于此工具进行后端性能测试的详细过程(下)
5、录制并增强虚拟用户脚本 从整体角度看,用LoadRunner 开发虚拟用户脚本主要包括下面四步骤: 识别测试应用使用的协议 录制脚本 完善录制得到的脚本 验证脚本的正确性 识别被测应用使用的协议 如果明确知道了被测系统所采用的协议,可…...
终极PHP导航菜单指南:从KnpMenu到Spatie Menu的完整实现方案
终极PHP导航菜单指南:从KnpMenu到Spatie Menu的完整实现方案 【免费下载链接】awesome-php A curated list of amazingly awesome PHP libraries, resources and shiny things. 项目地址: https://gitcode.com/gh_mirrors/aw/awesome-php PHP导航菜单是Web应…...
SpringBoot+Vue实验室开放管理系统源码+论文
代码可以查看文章末尾⬇️联系方式获取,记得注明来意哦~🌹 分享万套开题报告任务书答辩PPT模板 作者完整代码目录供你选择: 《SpringBoot网站项目》1800套 《SSM网站项目》1500套 《小程序项目》1600套 《APP项目》1500套 《Python网站项目》…...
KMS_VL_ALL_AIO:Windows和Office批量激活的终极解决方案
KMS_VL_ALL_AIO:Windows和Office批量激活的终极解决方案 【免费下载链接】KMS_VL_ALL_AIO Smart Activation Script 项目地址: https://gitcode.com/gh_mirrors/km/KMS_VL_ALL_AIO KMS_VL_ALL_AIO 是一款功能强大的智能激活脚本,专为解决Windows操…...
企业数仓揭秘:数据决策背后的核心引擎
公司里人人都在提的“数仓”,到底是什么? 目录 公司里人人都在提的“数仓”,到底是什么? 一、一句话讲透:数仓到底是什么? 二、关键区分:数仓 vs 业务数据库,90%的人都搞混了 三、为什么现在几乎所有公司,都必须建自己的数仓? 四、企业数仓的核心架构:分层设计,到…...
量子-经典混合编排难题全解析,基于MCP 2026标准的4类典型故障诊断与容错加固指南
更多请点击: https://intelliparadigm.com 第一章:量子-经典混合编排的MCP 2026标准演进与核心约束 MCP 2026(Mixed Classical-Quantum Orchestration Protocol)标志着量子计算基础设施从实验性调度迈向生产级协同编排的关键转折…...
故障分级标准(Incident Severity)P级别 / SEV级别介绍(P0 / SEV1)
文章目录一文讲透故障分级标准(P0 / SEV1 等)一、为什么需要分级?二、两种主流命名体系1️⃣ 国内常见:P0 / P1 / P22️⃣ 国外常见:SEV1 / SEV2 / SEV33️⃣ 本质区别三、标准分级模型(推荐实践࿰…...
【独家首发】MCP 2026适配Checklist V2.3(工信部智能网联汽车准入预审备案专用版)
更多请点击: https://intelliparadigm.com 第一章:MCP 2026车载系统适配合规性总览 MCP 2026(Mobile Computing Platform 2026)是新一代车载智能计算平台,其适配需同步满足功能安全(ISO 26262 ASIL-B&…...
如何用evernote-backup三步实现Evernote数据完整备份与永久掌控
如何用evernote-backup三步实现Evernote数据完整备份与永久掌控 【免费下载链接】evernote-backup Backup & export all Evernote notes and notebooks 项目地址: https://gitcode.com/gh_mirrors/ev/evernote-backup 你是否曾担心Evernote中的珍贵笔记突然消失&…...
别再瞎学AI了!这张路线图,帮你从入门到落地,少走90% 的弯路
AI时代的红利,从来不属于 “跟风学” 的人。有人学了半年 Python,还是只会写 “Hello World”;有人刷了一堆算法课,面试时连项目都拿不出手;有人跟风报了 LLM 班,学完依然不知道怎么把模型部署到服务器上……...
孤能子视角:“Anthropic招STEM研究员驻场补齐Claude判断力短板“解读,以及“异质大模型耦合“
(这次Kimi回答,信兄再分析。姑且当科幻小说看)我的问题:Anthropic招募STEM研究员,驻场补齐Claude判断力短板这不是三线模型吗?想自动消除那些"幻觉",一般方法难。要异质大模型耦合,应该会好些。Kimi回答信兄…...
