当前位置: 首页 > news >正文

Flink 流式读写文件、文件夹

文章目录

  • 一、flink 流式读取文件夹、文件
  • 二、flink 写入文件系统——StreamFileSink
  • 三、查看完整代码

一、flink 流式读取文件夹、文件

Apache Flink针对文件系统实现了一个可重置的source连接器,将文件看作流来读取数据。如下面的例子所示:

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();TextInputFormat textInputFormat = new TextInputFormat(null);DataStreamSource<String> source = env.readFile(textInputFormat, sourcePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 30000L);

StreamExecutionEnvironment.readFile()接收如下参数:

  • FileInputFormat参数,负责读取文件中的内容。
  • 文件路径。如果文件路径指向单个文件,那么将会读取这个文件。如果路径指向一个文件夹,FileInputFormat将会扫描文件夹中所有的文件。
  • PROCESS_CONTINUOUSLY将会周期性的扫描文件,以便扫描到文件新的改变。
  • 30000L表示多久扫描一次监听的文件。

FileInputFormat是一个特定的InputFormat,用来从文件系统中读取文件。FileInputFormat分两步读取文件。首先扫描文件系统的路径,然后为所有匹配到的文件创建所谓的input splits。一个input split将会定义文件上的一个范围,一般通过读取的开始偏移量和读取长度来定义。在将一个大的文件分割成一堆小的splits以后,这些splits可以分发到不同的读任务,这样就可以并行的读取文件了。FileInputFormat的第二步会接收一个input split,读取被split定义的文件范围,然后返回对应的数据。

DataStream应用中使用的FileInputFormat需要实现CheckpointableInputFormat接口。这个接口定义了方法来做检查点和重置文件片段的当前的读取位置。

在Flink 1.7中,Flink提供了一些类,这些类继承了FileInputFormat,并实现了CheckpointableInputFormat接口。TextInputFormat一行一行的读取文件,而CsvInputFormat使用逗号分隔符来读取文件。

二、flink 写入文件系统——StreamFileSink

该Sink不但可以将数据写入到各种文件系统中,而且整合了checkpoint机制来保证Exacly Once语义,还可以对文件进行分桶存储,还支持以列式存储的格式写入,功能更强大。

streamFileSink中输出的文件,其生命周期会经历3中状态:

  • in-progress Files 当前文件正在写入中
  • Pending Files 当处于 In-progress 状态的文件关闭closed了,就变为 Pending 状态
  • Finished Files 在成功的 Checkpoint 后,Pending 状态将变为 Finished 状态
    下面是一个简答的例子 , 将接收到的数据流 ,写入到文件中保存 !

数据文件格式是行式存储格式

        BucketAssigner<String, String> assigner = new DateTimeBucketAssigner<>("yyyy-MM-dd", ZoneId.of("Asia/Shanghai"));StreamingFileSink<String> fileSink = StreamingFileSink.<String>forRowFormat(new Path(savePath),new SimpleStringEncoder<>("UTF-8")).withRollingPolicy(DefaultRollingPolicy.builder().withRolloverInterval(TimeUnit.MINUTES.toMillis(20))//至少包含 20 分钟的数据.withInactivityInterval(TimeUnit.MINUTES.toMillis(20))//最近 20 分钟没有收到新的数据.withMaxPartSize(1024 * 1024 * 1024)//文件大小已达到 1 GB.build()).withBucketAssigner(assigner).build();

其中特别说明了,如果使用 FileSink 在 STREAMING 模式的时候,必须开启 checkpoint,不然的话会导致每个分片文件一直处于 in-progress 或者 pending 状态,不能保证整个写入流程的安全性。

所以在我们上述的示例中,我们并未开启 checkpoint 导致写出文件一直处于 inprogress 状态。如果加上 checkpoint 后:
在这里插入图片描述
将数据以列式存储的格式输出到文件中

三、查看完整代码

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;import java.time.ZoneId;
import java.util.concurrent.TimeUnit;public class WordTest {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);//2.设置CK&状态后端env.setStateBackend(new FsStateBackend("hdfs://nameservice1/tmp/kafka_test/data/chatgpt/mnbvc/checkpoint"));env.enableCheckpointing(1000*60*3);// 每 ** ms 开始一次 checkpointenv.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);// 设置模式为精确一次env.getCheckpointConfig().setCheckpointTimeout(1000*60*5);// Checkpoint 必须在** ms内完成,否则就会被抛弃env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);// 同一时间只允许一个 checkpoint 进行env.getCheckpointConfig().setMinPauseBetweenCheckpoints(3000);// 确认 checkpoints 之间的时间会进行 ** msenv.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);env.getCheckpointConfig().setTolerableCheckpointFailureNumber(5);// 允许两个连续的 checkpoint 错误env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.of(10,TimeUnit.SECONDS)));//重启策略:重启3次,间隔10s// 使用 externalized checkpoints,这样 checkpoint 在作业取消后仍就会被保留env.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);String sourcePath = "hdfs://nameservice1/ec/data/chatgpt/mnbvc/mnbvc_website/format_com";String savePath = "hdfs://nameservice1/ec/data/chatgpt/mnbvc/mnbvc_website/format_filter_01";TextInputFormat textInputFormat = new TextInputFormat(null);DataStreamSource<String> source = env.readFile(textInputFormat, sourcePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 30000L);BucketAssigner<String, String> assigner = new DateTimeBucketAssigner<>("yyyy-MM-dd", ZoneId.of("Asia/Shanghai"));StreamingFileSink<String> fileSink = StreamingFileSink.<String>forRowFormat(new Path(savePath),new SimpleStringEncoder<>("UTF-8")).withRollingPolicy(DefaultRollingPolicy.builder().withRolloverInterval(TimeUnit.MINUTES.toMillis(20))//至少包含 20 分钟的数据.withInactivityInterval(TimeUnit.MINUTES.toMillis(20))//最近 20 分钟没有收到新的数据.withMaxPartSize(1024 * 1024 * 1024)//文件大小已达到 1 GB.build()).withBucketAssigner(assigner).build();source.map(line -> JSONObject.parseObject(line)).filter(line -> line.getString("text").length() > 200 && line.getInteger("id") % 7 == 0).map(line -> JSON.toJSONString(line)).addSink(fileSink);env.execute();}
}

相关文章:

Flink 流式读写文件、文件夹

文章目录 一、flink 流式读取文件夹、文件二、flink 写入文件系统——StreamFileSink三、查看完整代码 一、flink 流式读取文件夹、文件 Apache Flink针对文件系统实现了一个可重置的source连接器&#xff0c;将文件看作流来读取数据。如下面的例子所示&#xff1a; StreamExe…...

【SA8295P 源码分析】64 - QNX 与 Android GVM 显示 Dump 图片方法汇总

【SA8295P 源码分析】64 - QNX 与 Android GVM 显示 Dump 图片方法汇总 一、QNX侧1.1 surfacedump 功能1.2 screenshot 功能二、Android GVM 侧2.1 screencap -p 导出 PNG 图片2.2 screencap 不加 -p 参数,导出 RGB32 图片2.3 dumpsys SurfaceFlinger --display-id 方法系列文…...

字符串旋转(1)

目录 ​编辑 题目要求&#x1f60d;&#xff1a; 题目内容❤&#xff1a; 题目分析&#x1f4da;&#xff1a; 主函数部分&#x1f4d5;&#xff1a;​编辑 方法一&#x1f412;&#xff1a; 方法二&#x1f412;&#x1f412;&#xff1a; 方法三&#x1f412;&#x1f…...

【SA8295P 源码分析】13 - Android GVM 虚拟机 QUPv3 UART / SPI / I2C功能配置及透传配置

【SA8295P 源码分析】13 - Android GVM 虚拟机 QUPv3 UART / SPI / I2C功能配置及透传配置 一、QUP v3 介绍二、QUP v3 UART 功能配置2.1 TrustZone 域 Uart 资源权限配置:以 QUPV3_0_SE2 为例2.2 QNX Host 域关闭 Uart 资源:以 QUPV3_0_SE2 为例2.3 Android Kernel 域使能 U…...

STM32 F103C8T6学习笔记10:OLED显示屏GIF动图取模—简易时钟—动图手表的制作~

今日尝试做一款有动图的OLED实时时钟&#xff0c;本文需要现学一个OLED的GIF动图取模 其余需要的知识点有不会的可以去我 STM32 F103C8T6学习笔记 系列专栏自己查阅把&#xff0c;闲话不多&#xff0c;直接开肝~~~ 文章提供源码&#xff0c;测试工程下载&#xff0c;测试效…...

大数据课程K3——Spark的常用案例

文章作者邮箱:yugongshiye@sina.cn 地址:广东惠州 ▲ 本章节目的 ⚪ 掌握Spark的常用案例——WordCount; ⚪ 掌握Spark的常用案例——求平均值; ⚪ 掌握Spark的常用案例——求最大值和最小值; ⚪ 掌握Spark的常用案例——TopK; ⚪ 掌握Spark的常用案例…...

85-最大矩阵

题目 给定一个仅包含 0 和 1 、大小为 rows x cols 的二维二进制矩阵&#xff0c;找出只包含 1 的最大矩形&#xff0c;并返回其面积。 示例 1&#xff1a; 输入&#xff1a;matrix [[“1”,“0”,“1”,“0”,“0”],[“1”,“0”,“1”,“1”,“1”],[“1”,“1”,“1”,…...

8.3 【C语言】通过指针引用数组

8.3.1 数组元素的指针 所谓数组元素的指针就是数组元素的地址。 可以用一个指针变量指向一个数组元素。例如&#xff1a; int a[10]{1,3,5,7,9,11,13,15,17,19}&#xff1b; int *p; p&a[0]&#xff1b; 引用数组元素可以用下标法&#xff0c;也可以用指针法&#xf…...

基于Flink CDC实时同步PostgreSQL与Tidb【Flink SQL Client模式下亲测可行,详细教程】

文章目录 一、PostgreSQL作为数据来源&#xff08;source&#xff09;&#xff0c;由flink读取1.postgre安装与配置2.flink安装与配置3.flink cdc postgre配置3.1 postgre配置&#xff08;for flink cdc&#xff09;3.2 flink cdc postgres的jar包下载 4.flink cdc postgre测试…...

Vue-5.编译器Idea

Vue专栏&#xff08;帮助你搭建一个优秀的Vue架子&#xff09; Vue-1.零基础学习Vue Vue-2.Nodejs的介绍和安装 Vue-3.Vue简介 Vue-4.编译器VsCode Vue-5.编译器Idea Vue-6.编译器webstorm Vue-7.命令创建Vue项目 Vue-8.Vue项目配置详解 Vue-9.集成&#xff08;.editorconfig、…...

qiuzhiji3

本篇想介绍一下慧与&#xff0c;这里的工作氛围和企业文化令人难忘&#xff0c;希望更多人了解它 也想探讨一下不同的文化铸就的不同企业&#xff0c;究竟有哪些差别。 本篇将从我个人角度出发描述慧与。 2022/3/16至2023/7/31 本篇初次写于2023年8月20日 说起来在毕业之前那段…...

JVM——垃圾回收(垃圾回收算法+分代垃圾回收+垃圾回收器)

1.如何判断对象可以回收 1.1引用计数法 只要一个对象被其他对象所引用&#xff0c;就要让该对象的技术加1&#xff0c;某个对象不再引用其&#xff0c;则让它计数减1。当计数变为0时就可以作为垃圾被回收。 有一个弊端叫做循环引用&#xff0c;两个的引用计数都是1&#xff…...

QT TLS initialization failed问题(已解决) QT基础入门【网络编程】openssl

问题: qt.network.ssl: QSslSocket::connectToHostEncrypted: TLS initialization failed 这个问题的出现主要是使用了https请求:HTTPS ≈ HTTP + SSL,即有了加密层的HTTP 所以Qt 组件库需要OpenSSL dll 文件支持HTTPS 解决: 1.加入以下两行代码获取QT是否支持opensll以…...

SpringMVC之获取请求参数

文章目录 前言一、通过ServletAPI获取二、通过控制器方法的形参获取请求参数三、注解1.RequestParam2.RequestHeader3.CookieValue前面的代码总和&#xff1a;4.通过POJO获取请求参数 三、解决获取请求参数的乱码问题总结 前言 下面用到了thymeleaf&#xff0c;不知道的可以看…...

【无标题】QT应用编程: QtCreator配置Git版本控制(码云)

QT应用编程: QtCreator配置Git版本控制(码云) 感谢&#xff1a;DS小龙哥的文章&#xff0c;这篇主要参考小龙哥的内容。 https://cloud.tencent.com/developer/article/1930531?areaSource102001.15&traceIdW2mKALltGu5f8-HOI8fsN Qt Creater 自带了git支持。但是一直没…...

JVM面试题-2

1、有哪几种垃圾回收器&#xff0c;各自的优缺点是什么&#xff1f; 垃圾回收器主要分为以下几种&#xff1a;Serial、ParNew、Parallel Scavenge、Serial Old、Parallel Old、CMS、G1&#xff1b; Serial:单线程的收集器&#xff0c;收集垃圾时&#xff0c;必须stop the worl…...

kafka安装说明以及在项目中使用

一、window 安装 1.1、下载安装包 下载kafka 地址&#xff0c;其中官方版内置zk&#xff0c; kafka_2.12-3.4.0.tgz其中这个名称的意思是 kafka3.4.0 版本 &#xff0c;所用语言 scala 版本为 2.12 1.2、安装配置 1、解压刚刚下载的配置文件&#xff0c;解压后如下&#x…...

二叉树搜索

✅<1>主页&#xff1a;我的代码爱吃辣&#x1f4c3;<2>知识讲解&#xff1a;数据结构——二叉搜索树☂️<3>开发环境 &#xff1a;Visual Studio 2022&#x1f4ac;<4>前言&#xff1a;在之前的我们已经学过了普通二叉树&#xff0c;了解了基本的二叉树…...

【先进PID控制算法(ADRC,TD,ESO)加入永磁同步电机发电控制仿真模型研究(Matlab代码实现)

&#x1f4a5;&#x1f4a5;&#x1f49e;&#x1f49e;欢迎来到本博客❤️❤️&#x1f4a5;&#x1f4a5; &#x1f3c6;博主优势&#xff1a;&#x1f31e;&#x1f31e;&#x1f31e;博客内容尽量做到思维缜密&#xff0c;逻辑清晰&#xff0c;为了方便读者。 ⛳️座右铭&a…...

k8s集群生产环境的问题处理

2 k8s上的服务均无法访问 执行命令kubectl get pods -ALL,k8s集群中的服务均是running状态 1 kuboard 网页无法访问 kuboard无法通过浏览器访问&#xff0c;但是查看端口是被占用的...

docker详细操作--未完待续

docker介绍 docker官网: Docker&#xff1a;加速容器应用程序开发 harbor官网&#xff1a;Harbor - Harbor 中文 使用docker加速器: Docker镜像极速下载服务 - 毫秒镜像 是什么 Docker 是一种开源的容器化平台&#xff0c;用于将应用程序及其依赖项&#xff08;如库、运行时环…...

基于服务器使用 apt 安装、配置 Nginx

&#x1f9fe; 一、查看可安装的 Nginx 版本 首先&#xff0c;你可以运行以下命令查看可用版本&#xff1a; apt-cache madison nginx-core输出示例&#xff1a; nginx-core | 1.18.0-6ubuntu14.6 | http://archive.ubuntu.com/ubuntu focal-updates/main amd64 Packages ng…...

视频字幕质量评估的大规模细粒度基准

大家读完觉得有帮助记得关注和点赞&#xff01;&#xff01;&#xff01; 摘要 视频字幕在文本到视频生成任务中起着至关重要的作用&#xff0c;因为它们的质量直接影响所生成视频的语义连贯性和视觉保真度。尽管大型视觉-语言模型&#xff08;VLMs&#xff09;在字幕生成方面…...

CRMEB 框架中 PHP 上传扩展开发:涵盖本地上传及阿里云 OSS、腾讯云 COS、七牛云

目前已有本地上传、阿里云OSS上传、腾讯云COS上传、七牛云上传扩展 扩展入口文件 文件目录 crmeb\services\upload\Upload.php namespace crmeb\services\upload;use crmeb\basic\BaseManager; use think\facade\Config;/*** Class Upload* package crmeb\services\upload* …...

Spring是如何解决Bean的循环依赖:三级缓存机制

1、什么是 Bean 的循环依赖 在 Spring框架中,Bean 的循环依赖是指多个 Bean 之间‌互相持有对方引用‌,形成闭环依赖关系的现象。 多个 Bean 的依赖关系构成环形链路,例如: 双向依赖:Bean A 依赖 Bean B,同时 Bean B 也依赖 Bean A(A↔B)。链条循环: Bean A → Bean…...

Python+ZeroMQ实战:智能车辆状态监控与模拟模式自动切换

目录 关键点 技术实现1 技术实现2 摘要&#xff1a; 本文将介绍如何利用Python和ZeroMQ消息队列构建一个智能车辆状态监控系统。系统能够根据时间策略自动切换驾驶模式&#xff08;自动驾驶、人工驾驶、远程驾驶、主动安全&#xff09;&#xff0c;并通过实时消息推送更新车…...

小木的算法日记-多叉树的递归/层序遍历

&#x1f332; 从二叉树到森林&#xff1a;一文彻底搞懂多叉树遍历的艺术 &#x1f680; 引言 你好&#xff0c;未来的算法大神&#xff01; 在数据结构的世界里&#xff0c;“树”无疑是最核心、最迷人的概念之一。我们中的大多数人都是从 二叉树 开始入门的&#xff0c;它…...

论文阅读:Matting by Generation

今天介绍一篇关于 matting 抠图的文章&#xff0c;抠图也算是计算机视觉里面非常经典的一个任务了。从早期的经典算法到如今的深度学习算法&#xff0c;已经有很多的工作和这个任务相关。这两年 diffusion 模型很火&#xff0c;大家又开始用 diffusion 模型做各种 CV 任务了&am…...

Linux 下 DMA 内存映射浅析

序 系统 I/O 设备驱动程序通常调用其特定子系统的接口为 DMA 分配内存&#xff0c;但最终会调到 DMA 子系统的dma_alloc_coherent()/dma_alloc_attrs() 等接口。 关于 dma_alloc_coherent 接口详细的代码讲解、调用流程&#xff0c;可以参考这篇文章&#xff0c;我觉得写的非常…...

小智AI+MCP

什么是小智AI和MCP 如果还不清楚的先看往期文章 手搓小智AI聊天机器人 MCP 深度解析&#xff1a;AI 的USB接口 如何使用小智MCP 1.刷支持mcp的小智固件 2.下载官方MCP的示例代码 Github&#xff1a;https://github.com/78/mcp-calculator 安这个步骤执行 其中MCP_ENDPOI…...