Flink 流式读写文件、文件夹
文章目录
- 一、flink 流式读取文件夹、文件
- 二、flink 写入文件系统——StreamFileSink
- 三、查看完整代码
一、flink 流式读取文件夹、文件
Apache Flink针对文件系统实现了一个可重置的source连接器,将文件看作流来读取数据。如下面的例子所示:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();TextInputFormat textInputFormat = new TextInputFormat(null);DataStreamSource<String> source = env.readFile(textInputFormat, sourcePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 30000L);
StreamExecutionEnvironment.readFile()接收如下参数:
- FileInputFormat参数,负责读取文件中的内容。
- 文件路径。如果文件路径指向单个文件,那么将会读取这个文件。如果路径指向一个文件夹,FileInputFormat将会扫描文件夹中所有的文件。
- PROCESS_CONTINUOUSLY将会周期性的扫描文件,以便扫描到文件新的改变。
- 30000L表示多久扫描一次监听的文件。
FileInputFormat是一个特定的InputFormat,用来从文件系统中读取文件。FileInputFormat分两步读取文件。首先扫描文件系统的路径,然后为所有匹配到的文件创建所谓的input splits。一个input split将会定义文件上的一个范围,一般通过读取的开始偏移量和读取长度来定义。在将一个大的文件分割成一堆小的splits以后,这些splits可以分发到不同的读任务,这样就可以并行的读取文件了。FileInputFormat的第二步会接收一个input split,读取被split定义的文件范围,然后返回对应的数据。
DataStream应用中使用的FileInputFormat需要实现CheckpointableInputFormat接口。这个接口定义了方法来做检查点和重置文件片段的当前的读取位置。
在Flink 1.7中,Flink提供了一些类,这些类继承了FileInputFormat,并实现了CheckpointableInputFormat接口。TextInputFormat一行一行的读取文件,而CsvInputFormat使用逗号分隔符来读取文件。
二、flink 写入文件系统——StreamFileSink
该Sink不但可以将数据写入到各种文件系统中,而且整合了checkpoint机制来保证Exacly Once语义,还可以对文件进行分桶存储,还支持以列式存储的格式写入,功能更强大。
streamFileSink中输出的文件,其生命周期会经历3中状态:
- in-progress Files 当前文件正在写入中
- Pending Files 当处于 In-progress 状态的文件关闭closed了,就变为 Pending 状态
- Finished Files 在成功的 Checkpoint 后,Pending 状态将变为 Finished 状态
下面是一个简答的例子 , 将接收到的数据流 ,写入到文件中保存 !
数据文件格式是行式存储格式
BucketAssigner<String, String> assigner = new DateTimeBucketAssigner<>("yyyy-MM-dd", ZoneId.of("Asia/Shanghai"));StreamingFileSink<String> fileSink = StreamingFileSink.<String>forRowFormat(new Path(savePath),new SimpleStringEncoder<>("UTF-8")).withRollingPolicy(DefaultRollingPolicy.builder().withRolloverInterval(TimeUnit.MINUTES.toMillis(20))//至少包含 20 分钟的数据.withInactivityInterval(TimeUnit.MINUTES.toMillis(20))//最近 20 分钟没有收到新的数据.withMaxPartSize(1024 * 1024 * 1024)//文件大小已达到 1 GB.build()).withBucketAssigner(assigner).build();
其中特别说明了,如果使用 FileSink 在 STREAMING 模式的时候,必须开启 checkpoint,不然的话会导致每个分片文件一直处于 in-progress 或者 pending 状态,不能保证整个写入流程的安全性。
所以在我们上述的示例中,我们并未开启 checkpoint 导致写出文件一直处于 inprogress 状态。如果加上 checkpoint 后:

将数据以列式存储的格式输出到文件中
三、查看完整代码
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;import java.time.ZoneId;
import java.util.concurrent.TimeUnit;public class WordTest {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);//2.设置CK&状态后端env.setStateBackend(new FsStateBackend("hdfs://nameservice1/tmp/kafka_test/data/chatgpt/mnbvc/checkpoint"));env.enableCheckpointing(1000*60*3);// 每 ** ms 开始一次 checkpointenv.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);// 设置模式为精确一次env.getCheckpointConfig().setCheckpointTimeout(1000*60*5);// Checkpoint 必须在** ms内完成,否则就会被抛弃env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);// 同一时间只允许一个 checkpoint 进行env.getCheckpointConfig().setMinPauseBetweenCheckpoints(3000);// 确认 checkpoints 之间的时间会进行 ** msenv.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);env.getCheckpointConfig().setTolerableCheckpointFailureNumber(5);// 允许两个连续的 checkpoint 错误env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.of(10,TimeUnit.SECONDS)));//重启策略:重启3次,间隔10s// 使用 externalized checkpoints,这样 checkpoint 在作业取消后仍就会被保留env.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);String sourcePath = "hdfs://nameservice1/ec/data/chatgpt/mnbvc/mnbvc_website/format_com";String savePath = "hdfs://nameservice1/ec/data/chatgpt/mnbvc/mnbvc_website/format_filter_01";TextInputFormat textInputFormat = new TextInputFormat(null);DataStreamSource<String> source = env.readFile(textInputFormat, sourcePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 30000L);BucketAssigner<String, String> assigner = new DateTimeBucketAssigner<>("yyyy-MM-dd", ZoneId.of("Asia/Shanghai"));StreamingFileSink<String> fileSink = StreamingFileSink.<String>forRowFormat(new Path(savePath),new SimpleStringEncoder<>("UTF-8")).withRollingPolicy(DefaultRollingPolicy.builder().withRolloverInterval(TimeUnit.MINUTES.toMillis(20))//至少包含 20 分钟的数据.withInactivityInterval(TimeUnit.MINUTES.toMillis(20))//最近 20 分钟没有收到新的数据.withMaxPartSize(1024 * 1024 * 1024)//文件大小已达到 1 GB.build()).withBucketAssigner(assigner).build();source.map(line -> JSONObject.parseObject(line)).filter(line -> line.getString("text").length() > 200 && line.getInteger("id") % 7 == 0).map(line -> JSON.toJSONString(line)).addSink(fileSink);env.execute();}
}相关文章:
Flink 流式读写文件、文件夹
文章目录 一、flink 流式读取文件夹、文件二、flink 写入文件系统——StreamFileSink三、查看完整代码 一、flink 流式读取文件夹、文件 Apache Flink针对文件系统实现了一个可重置的source连接器,将文件看作流来读取数据。如下面的例子所示: StreamExe…...
【SA8295P 源码分析】64 - QNX 与 Android GVM 显示 Dump 图片方法汇总
【SA8295P 源码分析】64 - QNX 与 Android GVM 显示 Dump 图片方法汇总 一、QNX侧1.1 surfacedump 功能1.2 screenshot 功能二、Android GVM 侧2.1 screencap -p 导出 PNG 图片2.2 screencap 不加 -p 参数,导出 RGB32 图片2.3 dumpsys SurfaceFlinger --display-id 方法系列文…...
字符串旋转(1)
目录 编辑 题目要求😍: 题目内容❤: 题目分析📚: 主函数部分📕:编辑 方法一🐒: 方法二🐒🐒: 方法三🐒…...
【SA8295P 源码分析】13 - Android GVM 虚拟机 QUPv3 UART / SPI / I2C功能配置及透传配置
【SA8295P 源码分析】13 - Android GVM 虚拟机 QUPv3 UART / SPI / I2C功能配置及透传配置 一、QUP v3 介绍二、QUP v3 UART 功能配置2.1 TrustZone 域 Uart 资源权限配置:以 QUPV3_0_SE2 为例2.2 QNX Host 域关闭 Uart 资源:以 QUPV3_0_SE2 为例2.3 Android Kernel 域使能 U…...
STM32 F103C8T6学习笔记10:OLED显示屏GIF动图取模—简易时钟—动图手表的制作~
今日尝试做一款有动图的OLED实时时钟,本文需要现学一个OLED的GIF动图取模 其余需要的知识点有不会的可以去我 STM32 F103C8T6学习笔记 系列专栏自己查阅把,闲话不多,直接开肝~~~ 文章提供源码,测试工程下载,测试效…...
大数据课程K3——Spark的常用案例
文章作者邮箱:yugongshiye@sina.cn 地址:广东惠州 ▲ 本章节目的 ⚪ 掌握Spark的常用案例——WordCount; ⚪ 掌握Spark的常用案例——求平均值; ⚪ 掌握Spark的常用案例——求最大值和最小值; ⚪ 掌握Spark的常用案例——TopK; ⚪ 掌握Spark的常用案例…...
85-最大矩阵
题目 给定一个仅包含 0 和 1 、大小为 rows x cols 的二维二进制矩阵,找出只包含 1 的最大矩形,并返回其面积。 示例 1: 输入:matrix [[“1”,“0”,“1”,“0”,“0”],[“1”,“0”,“1”,“1”,“1”],[“1”,“1”,“1”,…...
8.3 【C语言】通过指针引用数组
8.3.1 数组元素的指针 所谓数组元素的指针就是数组元素的地址。 可以用一个指针变量指向一个数组元素。例如: int a[10]{1,3,5,7,9,11,13,15,17,19}; int *p; p&a[0]; 引用数组元素可以用下标法,也可以用指针法…...
基于Flink CDC实时同步PostgreSQL与Tidb【Flink SQL Client模式下亲测可行,详细教程】
文章目录 一、PostgreSQL作为数据来源(source),由flink读取1.postgre安装与配置2.flink安装与配置3.flink cdc postgre配置3.1 postgre配置(for flink cdc)3.2 flink cdc postgres的jar包下载 4.flink cdc postgre测试…...
Vue-5.编译器Idea
Vue专栏(帮助你搭建一个优秀的Vue架子) Vue-1.零基础学习Vue Vue-2.Nodejs的介绍和安装 Vue-3.Vue简介 Vue-4.编译器VsCode Vue-5.编译器Idea Vue-6.编译器webstorm Vue-7.命令创建Vue项目 Vue-8.Vue项目配置详解 Vue-9.集成(.editorconfig、…...
qiuzhiji3
本篇想介绍一下慧与,这里的工作氛围和企业文化令人难忘,希望更多人了解它 也想探讨一下不同的文化铸就的不同企业,究竟有哪些差别。 本篇将从我个人角度出发描述慧与。 2022/3/16至2023/7/31 本篇初次写于2023年8月20日 说起来在毕业之前那段…...
JVM——垃圾回收(垃圾回收算法+分代垃圾回收+垃圾回收器)
1.如何判断对象可以回收 1.1引用计数法 只要一个对象被其他对象所引用,就要让该对象的技术加1,某个对象不再引用其,则让它计数减1。当计数变为0时就可以作为垃圾被回收。 有一个弊端叫做循环引用,两个的引用计数都是1ÿ…...
QT TLS initialization failed问题(已解决) QT基础入门【网络编程】openssl
问题: qt.network.ssl: QSslSocket::connectToHostEncrypted: TLS initialization failed 这个问题的出现主要是使用了https请求:HTTPS ≈ HTTP + SSL,即有了加密层的HTTP 所以Qt 组件库需要OpenSSL dll 文件支持HTTPS 解决: 1.加入以下两行代码获取QT是否支持opensll以…...
SpringMVC之获取请求参数
文章目录 前言一、通过ServletAPI获取二、通过控制器方法的形参获取请求参数三、注解1.RequestParam2.RequestHeader3.CookieValue前面的代码总和:4.通过POJO获取请求参数 三、解决获取请求参数的乱码问题总结 前言 下面用到了thymeleaf,不知道的可以看…...
【无标题】QT应用编程: QtCreator配置Git版本控制(码云)
QT应用编程: QtCreator配置Git版本控制(码云) 感谢:DS小龙哥的文章,这篇主要参考小龙哥的内容。 https://cloud.tencent.com/developer/article/1930531?areaSource102001.15&traceIdW2mKALltGu5f8-HOI8fsN Qt Creater 自带了git支持。但是一直没…...
JVM面试题-2
1、有哪几种垃圾回收器,各自的优缺点是什么? 垃圾回收器主要分为以下几种:Serial、ParNew、Parallel Scavenge、Serial Old、Parallel Old、CMS、G1; Serial:单线程的收集器,收集垃圾时,必须stop the worl…...
kafka安装说明以及在项目中使用
一、window 安装 1.1、下载安装包 下载kafka 地址,其中官方版内置zk, kafka_2.12-3.4.0.tgz其中这个名称的意思是 kafka3.4.0 版本 ,所用语言 scala 版本为 2.12 1.2、安装配置 1、解压刚刚下载的配置文件,解压后如下&#x…...
二叉树搜索
✅<1>主页:我的代码爱吃辣📃<2>知识讲解:数据结构——二叉搜索树☂️<3>开发环境 :Visual Studio 2022💬<4>前言:在之前的我们已经学过了普通二叉树,了解了基本的二叉树…...
【先进PID控制算法(ADRC,TD,ESO)加入永磁同步电机发电控制仿真模型研究(Matlab代码实现)
💥💥💞💞欢迎来到本博客❤️❤️💥💥 🏆博主优势:🌞🌞🌞博客内容尽量做到思维缜密,逻辑清晰,为了方便读者。 ⛳️座右铭&a…...
k8s集群生产环境的问题处理
2 k8s上的服务均无法访问 执行命令kubectl get pods -ALL,k8s集群中的服务均是running状态 1 kuboard 网页无法访问 kuboard无法通过浏览器访问,但是查看端口是被占用的...
CORP开源协作框架:从人治到规则驱动的自动化协作协议
1. 项目概述:一个面向未来的开源协作框架最近在折腾一个开源项目,叫CORP,全称是“Collaborative Open-source Resource Platform”。这名字听起来挺唬人,但说白了,它想解决的就是开源世界里一个老生常谈但又一直没被彻…...
AI增强自动化工作流:从规则驱动到意图驱动的智能决策实践
1. 项目概述:当AI遇见自动化工作流最近在GitHub上看到一个挺有意思的项目,叫“NitroRCr/AIaW”。光看名字,可能有点摸不着头脑,但点进去研究一下,你会发现它其实是一个将人工智能(AI)与自动化工…...
OpenClaw:重新定义 AI 智能体,从对话到执行的全能 “龙虾
在 AI 技术飞速迭代的今天,大语言模型已能流畅对话、生成内容,但多数仍停留在 “只说不做” 的层面。OpenClaw(外号 “龙虾”)的出现,打破了这一僵局 —— 它是一款由奥地利工程师 Peter Steinberger 主导开发…...
Azure Quickstart Templates 多区域部署高可用架构设计终极指南:5步构建企业级灾难恢复方案
Azure Quickstart Templates 多区域部署高可用架构设计终极指南:5步构建企业级灾难恢复方案 【免费下载链接】azure-quickstart-templates Azure Quickstart Templates 项目地址: https://gitcode.com/gh_mirrors/az/azure-quickstart-templates 在当今数字化…...
开源项目metabase-mcp-server:用MCP协议连接Metabase与AI智能体,实现对话式数据分析
1. 项目概述:当开源BI工具遇上AI智能体如果你和我一样,在日常工作中既要用Metabase做数据可视化看板,又要和Claude、Cursor这类AI助手打交道,那你肯定也遇到过这样的痛点:想问问AI“上个月华东区的销售额趋势”&#x…...
基于大语言模型的自动化信息处理系统:从RSS聚合到AI摘要的实践
1. 项目概述:一个能帮你“读”新闻的AI助手 在信息爆炸的时代,每天光是处理订阅的RSS、关注的社交媒体动态、收藏的YouTube视频和没读完的长文,就足以让人精疲力尽。我们总想保持对行业趋势的敏感,却又被海量信息淹没,…...
Factool开源框架:构建可信AI的事实核查自动化流水线
1. 项目概述:从“事实核查”到“可信AI”的基石工具在信息爆炸的时代,我们每天都被海量的文本内容包围——新闻稿、分析报告、产品介绍、学术论文,甚至是AI模型自己生成的回答。一个核心的挑战随之而来:如何快速、准确地判断一段文…...
Perplexity无法解析Springer LaTeX公式?2024.06最新MathJax兼容补丁+3类数学文献精准摘要生成术
更多请点击: https://intelliparadigm.com 第一章:Perplexity解析Springer文献的底层机制与失效归因 Perplexity 作为衡量语言模型预测能力的关键指标,在学术文献解析场景中常被误用为“质量代理”,尤其在处理 Springer 出版集团…...
离散数学“黑话”指南:命题、谓词、群论,一次讲清程序员常遇到的术语
离散数学“黑话”指南:程序员视角下的概念破译 刚接触算法优化时,我盯着论文里的"幺半群"概念发愣——这和我在代码里写的if-else有什么关系?直到某天用状态机处理用户权限时突然顿悟:原来离散数学的抽象术语࿰…...
【无人机】基于动态反演和扩展状态观测器的无人机鲁棒姿态控制研究附Matlab代码
✅作者简介:热爱科研的Matlab仿真开发者,擅长数据处理、建模仿真、程序设计、完整代码获取、论文复现及科研仿真。🍎 完整代码获取 定制创新 论文复现点击:Matlab科研工作室🍊个人信条:格物致知,完整Matlab…...
