当前位置: 首页 > news >正文

Flink 流式读写文件、文件夹

文章目录

  • 一、flink 流式读取文件夹、文件
  • 二、flink 写入文件系统——StreamFileSink
  • 三、查看完整代码

一、flink 流式读取文件夹、文件

Apache Flink针对文件系统实现了一个可重置的source连接器,将文件看作流来读取数据。如下面的例子所示:

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();TextInputFormat textInputFormat = new TextInputFormat(null);DataStreamSource<String> source = env.readFile(textInputFormat, sourcePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 30000L);

StreamExecutionEnvironment.readFile()接收如下参数:

  • FileInputFormat参数,负责读取文件中的内容。
  • 文件路径。如果文件路径指向单个文件,那么将会读取这个文件。如果路径指向一个文件夹,FileInputFormat将会扫描文件夹中所有的文件。
  • PROCESS_CONTINUOUSLY将会周期性的扫描文件,以便扫描到文件新的改变。
  • 30000L表示多久扫描一次监听的文件。

FileInputFormat是一个特定的InputFormat,用来从文件系统中读取文件。FileInputFormat分两步读取文件。首先扫描文件系统的路径,然后为所有匹配到的文件创建所谓的input splits。一个input split将会定义文件上的一个范围,一般通过读取的开始偏移量和读取长度来定义。在将一个大的文件分割成一堆小的splits以后,这些splits可以分发到不同的读任务,这样就可以并行的读取文件了。FileInputFormat的第二步会接收一个input split,读取被split定义的文件范围,然后返回对应的数据。

DataStream应用中使用的FileInputFormat需要实现CheckpointableInputFormat接口。这个接口定义了方法来做检查点和重置文件片段的当前的读取位置。

在Flink 1.7中,Flink提供了一些类,这些类继承了FileInputFormat,并实现了CheckpointableInputFormat接口。TextInputFormat一行一行的读取文件,而CsvInputFormat使用逗号分隔符来读取文件。

二、flink 写入文件系统——StreamFileSink

该Sink不但可以将数据写入到各种文件系统中,而且整合了checkpoint机制来保证Exacly Once语义,还可以对文件进行分桶存储,还支持以列式存储的格式写入,功能更强大。

streamFileSink中输出的文件,其生命周期会经历3中状态:

  • in-progress Files 当前文件正在写入中
  • Pending Files 当处于 In-progress 状态的文件关闭closed了,就变为 Pending 状态
  • Finished Files 在成功的 Checkpoint 后,Pending 状态将变为 Finished 状态
    下面是一个简答的例子 , 将接收到的数据流 ,写入到文件中保存 !

数据文件格式是行式存储格式

        BucketAssigner<String, String> assigner = new DateTimeBucketAssigner<>("yyyy-MM-dd", ZoneId.of("Asia/Shanghai"));StreamingFileSink<String> fileSink = StreamingFileSink.<String>forRowFormat(new Path(savePath),new SimpleStringEncoder<>("UTF-8")).withRollingPolicy(DefaultRollingPolicy.builder().withRolloverInterval(TimeUnit.MINUTES.toMillis(20))//至少包含 20 分钟的数据.withInactivityInterval(TimeUnit.MINUTES.toMillis(20))//最近 20 分钟没有收到新的数据.withMaxPartSize(1024 * 1024 * 1024)//文件大小已达到 1 GB.build()).withBucketAssigner(assigner).build();

其中特别说明了,如果使用 FileSink 在 STREAMING 模式的时候,必须开启 checkpoint,不然的话会导致每个分片文件一直处于 in-progress 或者 pending 状态,不能保证整个写入流程的安全性。

所以在我们上述的示例中,我们并未开启 checkpoint 导致写出文件一直处于 inprogress 状态。如果加上 checkpoint 后:
在这里插入图片描述
将数据以列式存储的格式输出到文件中

三、查看完整代码

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;import java.time.ZoneId;
import java.util.concurrent.TimeUnit;public class WordTest {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);//2.设置CK&状态后端env.setStateBackend(new FsStateBackend("hdfs://nameservice1/tmp/kafka_test/data/chatgpt/mnbvc/checkpoint"));env.enableCheckpointing(1000*60*3);// 每 ** ms 开始一次 checkpointenv.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);// 设置模式为精确一次env.getCheckpointConfig().setCheckpointTimeout(1000*60*5);// Checkpoint 必须在** ms内完成,否则就会被抛弃env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);// 同一时间只允许一个 checkpoint 进行env.getCheckpointConfig().setMinPauseBetweenCheckpoints(3000);// 确认 checkpoints 之间的时间会进行 ** msenv.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);env.getCheckpointConfig().setTolerableCheckpointFailureNumber(5);// 允许两个连续的 checkpoint 错误env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.of(10,TimeUnit.SECONDS)));//重启策略:重启3次,间隔10s// 使用 externalized checkpoints,这样 checkpoint 在作业取消后仍就会被保留env.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);String sourcePath = "hdfs://nameservice1/ec/data/chatgpt/mnbvc/mnbvc_website/format_com";String savePath = "hdfs://nameservice1/ec/data/chatgpt/mnbvc/mnbvc_website/format_filter_01";TextInputFormat textInputFormat = new TextInputFormat(null);DataStreamSource<String> source = env.readFile(textInputFormat, sourcePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 30000L);BucketAssigner<String, String> assigner = new DateTimeBucketAssigner<>("yyyy-MM-dd", ZoneId.of("Asia/Shanghai"));StreamingFileSink<String> fileSink = StreamingFileSink.<String>forRowFormat(new Path(savePath),new SimpleStringEncoder<>("UTF-8")).withRollingPolicy(DefaultRollingPolicy.builder().withRolloverInterval(TimeUnit.MINUTES.toMillis(20))//至少包含 20 分钟的数据.withInactivityInterval(TimeUnit.MINUTES.toMillis(20))//最近 20 分钟没有收到新的数据.withMaxPartSize(1024 * 1024 * 1024)//文件大小已达到 1 GB.build()).withBucketAssigner(assigner).build();source.map(line -> JSONObject.parseObject(line)).filter(line -> line.getString("text").length() > 200 && line.getInteger("id") % 7 == 0).map(line -> JSON.toJSONString(line)).addSink(fileSink);env.execute();}
}

相关文章:

Flink 流式读写文件、文件夹

文章目录 一、flink 流式读取文件夹、文件二、flink 写入文件系统——StreamFileSink三、查看完整代码 一、flink 流式读取文件夹、文件 Apache Flink针对文件系统实现了一个可重置的source连接器&#xff0c;将文件看作流来读取数据。如下面的例子所示&#xff1a; StreamExe…...

【SA8295P 源码分析】64 - QNX 与 Android GVM 显示 Dump 图片方法汇总

【SA8295P 源码分析】64 - QNX 与 Android GVM 显示 Dump 图片方法汇总 一、QNX侧1.1 surfacedump 功能1.2 screenshot 功能二、Android GVM 侧2.1 screencap -p 导出 PNG 图片2.2 screencap 不加 -p 参数,导出 RGB32 图片2.3 dumpsys SurfaceFlinger --display-id 方法系列文…...

字符串旋转(1)

目录 ​编辑 题目要求&#x1f60d;&#xff1a; 题目内容❤&#xff1a; 题目分析&#x1f4da;&#xff1a; 主函数部分&#x1f4d5;&#xff1a;​编辑 方法一&#x1f412;&#xff1a; 方法二&#x1f412;&#x1f412;&#xff1a; 方法三&#x1f412;&#x1f…...

【SA8295P 源码分析】13 - Android GVM 虚拟机 QUPv3 UART / SPI / I2C功能配置及透传配置

【SA8295P 源码分析】13 - Android GVM 虚拟机 QUPv3 UART / SPI / I2C功能配置及透传配置 一、QUP v3 介绍二、QUP v3 UART 功能配置2.1 TrustZone 域 Uart 资源权限配置:以 QUPV3_0_SE2 为例2.2 QNX Host 域关闭 Uart 资源:以 QUPV3_0_SE2 为例2.3 Android Kernel 域使能 U…...

STM32 F103C8T6学习笔记10:OLED显示屏GIF动图取模—简易时钟—动图手表的制作~

今日尝试做一款有动图的OLED实时时钟&#xff0c;本文需要现学一个OLED的GIF动图取模 其余需要的知识点有不会的可以去我 STM32 F103C8T6学习笔记 系列专栏自己查阅把&#xff0c;闲话不多&#xff0c;直接开肝~~~ 文章提供源码&#xff0c;测试工程下载&#xff0c;测试效…...

大数据课程K3——Spark的常用案例

文章作者邮箱:yugongshiye@sina.cn 地址:广东惠州 ▲ 本章节目的 ⚪ 掌握Spark的常用案例——WordCount; ⚪ 掌握Spark的常用案例——求平均值; ⚪ 掌握Spark的常用案例——求最大值和最小值; ⚪ 掌握Spark的常用案例——TopK; ⚪ 掌握Spark的常用案例…...

85-最大矩阵

题目 给定一个仅包含 0 和 1 、大小为 rows x cols 的二维二进制矩阵&#xff0c;找出只包含 1 的最大矩形&#xff0c;并返回其面积。 示例 1&#xff1a; 输入&#xff1a;matrix [[“1”,“0”,“1”,“0”,“0”],[“1”,“0”,“1”,“1”,“1”],[“1”,“1”,“1”,…...

8.3 【C语言】通过指针引用数组

8.3.1 数组元素的指针 所谓数组元素的指针就是数组元素的地址。 可以用一个指针变量指向一个数组元素。例如&#xff1a; int a[10]{1,3,5,7,9,11,13,15,17,19}&#xff1b; int *p; p&a[0]&#xff1b; 引用数组元素可以用下标法&#xff0c;也可以用指针法&#xf…...

基于Flink CDC实时同步PostgreSQL与Tidb【Flink SQL Client模式下亲测可行,详细教程】

文章目录 一、PostgreSQL作为数据来源&#xff08;source&#xff09;&#xff0c;由flink读取1.postgre安装与配置2.flink安装与配置3.flink cdc postgre配置3.1 postgre配置&#xff08;for flink cdc&#xff09;3.2 flink cdc postgres的jar包下载 4.flink cdc postgre测试…...

Vue-5.编译器Idea

Vue专栏&#xff08;帮助你搭建一个优秀的Vue架子&#xff09; Vue-1.零基础学习Vue Vue-2.Nodejs的介绍和安装 Vue-3.Vue简介 Vue-4.编译器VsCode Vue-5.编译器Idea Vue-6.编译器webstorm Vue-7.命令创建Vue项目 Vue-8.Vue项目配置详解 Vue-9.集成&#xff08;.editorconfig、…...

qiuzhiji3

本篇想介绍一下慧与&#xff0c;这里的工作氛围和企业文化令人难忘&#xff0c;希望更多人了解它 也想探讨一下不同的文化铸就的不同企业&#xff0c;究竟有哪些差别。 本篇将从我个人角度出发描述慧与。 2022/3/16至2023/7/31 本篇初次写于2023年8月20日 说起来在毕业之前那段…...

JVM——垃圾回收(垃圾回收算法+分代垃圾回收+垃圾回收器)

1.如何判断对象可以回收 1.1引用计数法 只要一个对象被其他对象所引用&#xff0c;就要让该对象的技术加1&#xff0c;某个对象不再引用其&#xff0c;则让它计数减1。当计数变为0时就可以作为垃圾被回收。 有一个弊端叫做循环引用&#xff0c;两个的引用计数都是1&#xff…...

QT TLS initialization failed问题(已解决) QT基础入门【网络编程】openssl

问题: qt.network.ssl: QSslSocket::connectToHostEncrypted: TLS initialization failed 这个问题的出现主要是使用了https请求:HTTPS ≈ HTTP + SSL,即有了加密层的HTTP 所以Qt 组件库需要OpenSSL dll 文件支持HTTPS 解决: 1.加入以下两行代码获取QT是否支持opensll以…...

SpringMVC之获取请求参数

文章目录 前言一、通过ServletAPI获取二、通过控制器方法的形参获取请求参数三、注解1.RequestParam2.RequestHeader3.CookieValue前面的代码总和&#xff1a;4.通过POJO获取请求参数 三、解决获取请求参数的乱码问题总结 前言 下面用到了thymeleaf&#xff0c;不知道的可以看…...

【无标题】QT应用编程: QtCreator配置Git版本控制(码云)

QT应用编程: QtCreator配置Git版本控制(码云) 感谢&#xff1a;DS小龙哥的文章&#xff0c;这篇主要参考小龙哥的内容。 https://cloud.tencent.com/developer/article/1930531?areaSource102001.15&traceIdW2mKALltGu5f8-HOI8fsN Qt Creater 自带了git支持。但是一直没…...

JVM面试题-2

1、有哪几种垃圾回收器&#xff0c;各自的优缺点是什么&#xff1f; 垃圾回收器主要分为以下几种&#xff1a;Serial、ParNew、Parallel Scavenge、Serial Old、Parallel Old、CMS、G1&#xff1b; Serial:单线程的收集器&#xff0c;收集垃圾时&#xff0c;必须stop the worl…...

kafka安装说明以及在项目中使用

一、window 安装 1.1、下载安装包 下载kafka 地址&#xff0c;其中官方版内置zk&#xff0c; kafka_2.12-3.4.0.tgz其中这个名称的意思是 kafka3.4.0 版本 &#xff0c;所用语言 scala 版本为 2.12 1.2、安装配置 1、解压刚刚下载的配置文件&#xff0c;解压后如下&#x…...

二叉树搜索

✅<1>主页&#xff1a;我的代码爱吃辣&#x1f4c3;<2>知识讲解&#xff1a;数据结构——二叉搜索树☂️<3>开发环境 &#xff1a;Visual Studio 2022&#x1f4ac;<4>前言&#xff1a;在之前的我们已经学过了普通二叉树&#xff0c;了解了基本的二叉树…...

【先进PID控制算法(ADRC,TD,ESO)加入永磁同步电机发电控制仿真模型研究(Matlab代码实现)

&#x1f4a5;&#x1f4a5;&#x1f49e;&#x1f49e;欢迎来到本博客❤️❤️&#x1f4a5;&#x1f4a5; &#x1f3c6;博主优势&#xff1a;&#x1f31e;&#x1f31e;&#x1f31e;博客内容尽量做到思维缜密&#xff0c;逻辑清晰&#xff0c;为了方便读者。 ⛳️座右铭&a…...

k8s集群生产环境的问题处理

2 k8s上的服务均无法访问 执行命令kubectl get pods -ALL,k8s集群中的服务均是running状态 1 kuboard 网页无法访问 kuboard无法通过浏览器访问&#xff0c;但是查看端口是被占用的...

KubeSphere 容器平台高可用:环境搭建与可视化操作指南

Linux_k8s篇 欢迎来到Linux的世界&#xff0c;看笔记好好学多敲多打&#xff0c;每个人都是大神&#xff01; 题目&#xff1a;KubeSphere 容器平台高可用&#xff1a;环境搭建与可视化操作指南 版本号: 1.0,0 作者: 老王要学习 日期: 2025.06.05 适用环境: Ubuntu22 文档说…...

Chapter03-Authentication vulnerabilities

文章目录 1. 身份验证简介1.1 What is authentication1.2 difference between authentication and authorization1.3 身份验证机制失效的原因1.4 身份验证机制失效的影响 2. 基于登录功能的漏洞2.1 密码爆破2.2 用户名枚举2.3 有缺陷的暴力破解防护2.3.1 如果用户登录尝试失败次…...

Linux链表操作全解析

Linux C语言链表深度解析与实战技巧 一、链表基础概念与内核链表优势1.1 为什么使用链表&#xff1f;1.2 Linux 内核链表与用户态链表的区别 二、内核链表结构与宏解析常用宏/函数 三、内核链表的优点四、用户态链表示例五、双向循环链表在内核中的实现优势5.1 插入效率5.2 安全…...

基于服务器使用 apt 安装、配置 Nginx

&#x1f9fe; 一、查看可安装的 Nginx 版本 首先&#xff0c;你可以运行以下命令查看可用版本&#xff1a; apt-cache madison nginx-core输出示例&#xff1a; nginx-core | 1.18.0-6ubuntu14.6 | http://archive.ubuntu.com/ubuntu focal-updates/main amd64 Packages ng…...

[ICLR 2022]How Much Can CLIP Benefit Vision-and-Language Tasks?

论文网址&#xff1a;pdf 英文是纯手打的&#xff01;论文原文的summarizing and paraphrasing。可能会出现难以避免的拼写错误和语法错误&#xff0c;若有发现欢迎评论指正&#xff01;文章偏向于笔记&#xff0c;谨慎食用 目录 1. 心得 2. 论文逐段精读 2.1. Abstract 2…...

React19源码系列之 事件插件系统

事件类别 事件类型 定义 文档 Event Event 接口表示在 EventTarget 上出现的事件。 Event - Web API | MDN UIEvent UIEvent 接口表示简单的用户界面事件。 UIEvent - Web API | MDN KeyboardEvent KeyboardEvent 对象描述了用户与键盘的交互。 KeyboardEvent - Web…...

【算法训练营Day07】字符串part1

文章目录 反转字符串反转字符串II替换数字 反转字符串 题目链接&#xff1a;344. 反转字符串 双指针法&#xff0c;两个指针的元素直接调转即可 class Solution {public void reverseString(char[] s) {int head 0;int end s.length - 1;while(head < end) {char temp …...

【Oracle】分区表

个人主页&#xff1a;Guiat 归属专栏&#xff1a;Oracle 文章目录 1. 分区表基础概述1.1 分区表的概念与优势1.2 分区类型概览1.3 分区表的工作原理 2. 范围分区 (RANGE Partitioning)2.1 基础范围分区2.1.1 按日期范围分区2.1.2 按数值范围分区 2.2 间隔分区 (INTERVAL Partit…...

Spring是如何解决Bean的循环依赖:三级缓存机制

1、什么是 Bean 的循环依赖 在 Spring框架中,Bean 的循环依赖是指多个 Bean 之间‌互相持有对方引用‌,形成闭环依赖关系的现象。 多个 Bean 的依赖关系构成环形链路,例如: 双向依赖:Bean A 依赖 Bean B,同时 Bean B 也依赖 Bean A(A↔B)。链条循环: Bean A → Bean…...

JVM虚拟机:内存结构、垃圾回收、性能优化

1、JVM虚拟机的简介 Java 虚拟机(Java Virtual Machine 简称:JVM)是运行所有 Java 程序的抽象计算机,是 Java 语言的运行环境,实现了 Java 程序的跨平台特性。JVM 屏蔽了与具体操作系统平台相关的信息,使得 Java 程序只需生成在 JVM 上运行的目标代码(字节码),就可以…...