4.2、Flink任务怎样读取文件中的数据
目录
1、前言
2、readTextFile(已过时,不推荐使用)
3、readFile(已过时,不推荐使用)
4、fromSource(FileSource) 推荐使用
1、前言
思考: 读取文件时可以设置哪些规则呢?
1. 文件的格式(txt、csv、二进制...)
2. 文件的分隔符(按\n 分割)
3. 是否需要监控文件变化(一次读取、持续读取)
基于以上规则,Flink为我们提供了非常灵活的 读取文件的方法
2、readTextFile(已过时,不推荐使用)
语法说明:
定义:def readTextFile(filePath: String): DataStream[String]def readTextFile(filePath: String, charsetName: String)功能:1.读取文本格式的文件2.按行读取(\n为分隔符),每行数据被封装为 DataStream 的一个元素3.可以指定字符集(默认为UDF-8)4.文件只会读取一次源码分析:public DataStreamSource<String> readTextFile(String filePath, String charsetName) {// 初始化 TextInputFormat对象TextInputFormat format = new TextInputFormat(new Path(filePath)); // 指定路径过滤器(使用默认过滤器)format.setFilesFilter(FilePathFilter.createDefaultFilter()); // 指定Flink中的数据类型 TypeInformation<String> typeInfo = BasicTypeInfo.STRING_TYPE_INFO; // 指定字符集format.setCharsetName(charsetName); // 调用 readFile 方法return readFile(format, filePath, FileProcessingMode.PROCESS_ONCE, -1, typeInfo); }
代码示例:
public static void readTextFile() throws Exception {/** TODO 功能说明* readTextFile(path) - 读取文本文件(一次读取),例如遵守 TextInputFormat 规范的文件,逐行读取并将它们作为字符串返回。* */// 1.获取执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 2.将文本文件作为数据源env.readTextFile("data/1.txt").setParallelism(4).print();// 3.触发程序执行env.execute();}
3、readFile(已过时,不推荐使用)
语法说明:
定义:def readFile[T: TypeInformation](inputFormat: FileInputFormat[T],filePath: String,watchType: FileProcessingMode,interval: Long): DataStream[T] = {val typeInfo = implicitly[TypeInformation[T]] // 隐私转换(将java 数据类型 转换为 Flink数据类型)asScalaStream(javaEnv.readFile(inputFormat, filePath, watchType, interval, typeInfo))}参数:inputFormat : 指定 FileInputFormat 实现类(根据文件类型 选择相适应的实例)filePath : 指定 文件路径watchType : 指定 读取模式(提供了2个枚举值)PROCESS_ONCE :只读取一次PROCESS_CONTINUOUSLY :按照指定周期扫描文件interval : 指定 扫描文件的周期(单位为毫秒)功能:按照 指定的 文件格式 和 读取方式 读取数据
代码示例:
public static void readFile() throws Exception {/** TODO 功能说明* readFile(fileInputFormat, path) - 按照指定的文件输入格式读取(一次)文件。* readFile(fileInputFormat, path, watchType, interval, pathFilter, typeInfo)* 按照指定的文件输入格式读取(持续的读取)文件* */// 1.获取执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 2.将文本文件作为数据源String filePath = "data/1.txt";TextInputFormat textInputFormat = new TextInputFormat(new Path(filePath));textInputFormat.setFilesFilter(FilePathFilter.createDefaultFilter()); // 指定过滤器textInputFormat.setCharsetName("UTF-8"); // 指定编码格式/** readFile(inputFormat: FileInputFormat[OUT], filePath: String, watchType: FileProcessingMode, interval: Long)* 参数说明:* @inputFormat : 指定文件输入格式* @filePath : 指定文件路径* @watchType : 指定监控类型,提供了两种读取策略* PROCESS_ONCE : 只读取一次* PROCESS_CONTINUOUSLY :持续读取,监控新增数据* @interval : 指定连续扫描文件的周期(毫秒)* 重点提示:* 1.如果watchType设置为PROCESS_CONTINUOUSLY时,当一个文件被修改时,将会导致重新读取该* 文件的全部内容,这将会打破`精确一次`的语义* */env.readFile(textInputFormat, filePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 1000).print();// 3.触发程序执行env.execute();}
4、fromSource(FileSource) 推荐使用
public static void FileSource() throws Exception {// 1.获取执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 2.将文本文件作为数据源FileSource<String> fileSource = FileSource.forRecordStreamFormat(new TextLineInputFormat(), new Path("data/1.txt")).build();env.fromSource(fileSource, WatermarkStrategy.noWatermarks(), "read fileSource").print();// 3.触发程序执行env.execute();}
相关文章:
4.2、Flink任务怎样读取文件中的数据
目录 1、前言 2、readTextFile(已过时,不推荐使用) 3、readFile(已过时,不推荐使用) 4、fromSource(FileSource) 推荐使用 1、前言 思考: 读取文件时可以设置哪些规则呢? 1. 文件的格式(tx…...
Effective Java笔记(28)列表优于数组
数组与泛型相比,有两个重要的不同点 。 首先,数组是协变的( covariant ) 。 这个词听起来有点吓人,其实只是表示如果 Sub 为 Super 的子类型,那么数组类型 Sub[ ]就是Super[ ]的子类型。 相反,泛…...
做BI领域的ChatGPT,思迈特升级一站式ABI平台
8月8日,以「指标驱动 智能决策」为主题,2023 Smartbi V11系列新品发布会在广州丽思卡尔顿酒店开幕。 后疫情时代,BI发展趋势的观察与应对 在发布会上,思迈特CEO吴华夫在开场致辞中表示,当前大环境背景下…...
ELFK——ELK结合filebeat日志分析系统(2)
目录 一、filebeat 二、ELFK 1.原理简介 2.在ELK基础上部署filebeat 一、filebeat Filebeat,轻量级的开源日志文件数据搜集器。通常在需要采集数据的客户端安装 Filebeat,并指定目录与日志格式,Filebeat 就能快速收集数据,并…...
webSocket 协议是什么
webSocket 协议是什么,能简述一下吗? websocket 协议 HTML5 带来的新协议,相对于 http,它是一个持久连接的协议,它利用 http 协议完成握手,然后通过 TCP 连接通道发送消息,使用 websocket 协议可…...
CentOS 7迁移Anolis OS 8
背景:生产环境客户要求操作系统国产化 操作系统:Centos7.9 内核:5.4.108 服务器可以联网,进行在线迁移: # 下载迁移工具软件源 wget https://mirrors.openanolis.cn/anolis/migration/anolis-migration.repo -O /etc/y…...
Transformer 立体视觉 Depth Estimation
1. Intro 立体深度估计具有重要的意义,因为它能够重建三维信息。为此,在左右相机图像之间匹配相应的像素;对应像素位置的差异,即视差,可以用来推断深度并重建3D场景。最近基于深度学习的立体深度估计方法已经显示出有希望的结果,但仍然存在一些挑战。 其中一个挑战涉及使…...
vue去掉所有输入框两边空格,封装指令去空格,支持Vue2和Vue3,ElementUI Input去空格
需求背景 就是页面很多表单输入框,期望在提交的时候,都要把用户两边的空格去掉 ❌使用 vue 的指令 .trim 去掉空格 中间会输入不了空格, 比如我想输入 你好啊 中国, 这中间的空格输入不了,只能变成 你好啊中国 ❌在提交的时候使用…...
认识FFMPEG框架
FFMPEG全称: Fast Forward Moving Picture Experts Group (MPEG:动态图像专家组) ffmpeg相关网站: git://source.ffmpeg.org/ffmpeg.git http://git.videolan.org/?pffmpeg.git https://github.com/FFmpeg/FFmpeg FFMPEG框架基本组件: AVFormat , AVCodec, AVDevice, AVFil…...
Vue3 大屏数字滚动效果
父组件: <template> <div class"homePage"> <NumRoll v-for"(v, i) in numberList" :key"i" :number"v"></NumRoll> </div> </template> <script setup> import { onMounted, r…...
【深度学习注意力机制系列】—— SENet注意力机制(附pytorch实现)
深度学习中的注意力机制(Attention Mechanism)是一种模仿人类视觉和认知系统的方法,它允许神经网络在处理输入数据时集中注意力于相关的部分。通过引入注意力机制,神经网络能够自动地学习并选择性地关注输入中的重要信息ÿ…...
go 函数
go 语言函数 go 函数函数定义Go函数的特点如下函数作为参数可变参数相同类型可变参数不同类型可变参数 return语句作用概述空的return语句空白标识符多个返回值命名返回值 defer 语句作用引申出来的面试题for defer下面是一个使用defer的示例代码输出结果 匿名函数定义匿名函数…...
python之正则表达式
目录 正则表达式 python正则表达式方法 match search findall finditer compile 元字符匹配 元字符 量词 贪婪匹配和惰性匹配 正则表达式的group 语法 案例 正则表达式 正则表达式又称规则表达式,是使用单个字符串来描述、匹配某个句法规则的字符串…...
【LeetCode每日一题】——219.存在重复元素II
文章目录 一【题目类别】二【题目难度】三【题目编号】四【题目描述】五【题目示例】六【题目提示】七【解题思路】八【时间频度】九【代码实现】十【提交结果】 一【题目类别】 哈希表 二【题目难度】 简单 三【题目编号】 219.存在重复元素II 四【题目描述】 给你一个…...
篇六:适配器模式:让不兼容变兼容
篇六:“适配器模式:让不兼容变兼容” 开始本篇文章之前先推荐一个好用的学习工具,AIRIght,借助于AI助手工具,学习事半功倍。欢迎访问:http://airight.fun/ 另外有2本不错的关于设计模式的资料,…...
【云原生】Docker-compose中所有模块学习
compose模块 模板文件是使用 Compose 的核心,涉及到的指令关键字也比较多。但大家不用担心,这里面大部分指令跟 docker run 相关参数的含义都是类似的。 默认的模板文件名称为 docker-compose.yml,格式为 YAML 格式。 version: "3&quo…...
广义积分练习
前置知识 无穷限积分瑕积分 练习 计算 ∫ 0 ∞ 1 x ( 1 x ) d x \int_0^{\infty}\dfrac{1}{\sqrt x(1x)}dx ∫0∞x (1x)1dx 解: x 0 \qquad x0 x0为瑕点 \qquad 原式 lim a → 0 lim b → ∞ ∫ a b 1 x ( 1 x ) d x lim a → 0 lim …...
element-ui树形表格,左边勾选,右边显示选中的数据-功能(如动图)
功能如图 功能需求 表格树形表格勾选数据,右边显示对应勾选的数据内容,选中客户,自动勾选所有的店铺(子级),选中其中一个店铺,自动勾选上客户(父级),同时会存在只有客户(下面没有子级的情况&am…...
Android数字价格变化的动画效果的简单实现
原理:使用ValueAnimator属性动画类实现,它通过值的改变手动设置对象的属性值来实现动画效果。直接贴代码: public static void doNumberAnim(TextView tvPrice, float startNumber, float endNumber) {ValueAnimator animator ValueAnimato…...
Win10无法投影关闭3D模式
Win10不小心开启了3D模式,插上投影仪就一闪一闪的,无法正投影 解决办法: 1. 打开注册表工具regedit,删除以下注册表,重启电脑 HKEY_LOCAL_MACHINE\SYSTEM\CurrentControlSet\Control\GraphicsDrivers\Configurat…...
AI-调查研究-01-正念冥想有用吗?对健康的影响及科学指南
点一下关注吧!!!非常感谢!!持续更新!!! 🚀 AI篇持续更新中!(长期更新) 目前2025年06月05日更新到: AI炼丹日志-28 - Aud…...
pam_env.so模块配置解析
在PAM(Pluggable Authentication Modules)配置中, /etc/pam.d/su 文件相关配置含义如下: 配置解析 auth required pam_env.so1. 字段分解 字段值说明模块类型auth认证类模块,负责验证用户身份&am…...
linux arm系统烧录
1、打开瑞芯微程序 2、按住linux arm 的 recover按键 插入电源 3、当瑞芯微检测到有设备 4、松开recover按键 5、选择升级固件 6、点击固件选择本地刷机的linux arm 镜像 7、点击升级 (忘了有没有这步了 估计有) 刷机程序 和 镜像 就不提供了。要刷的时…...
【Go】3、Go语言进阶与依赖管理
前言 本系列文章参考自稀土掘金上的 【字节内部课】公开课,做自我学习总结整理。 Go语言并发编程 Go语言原生支持并发编程,它的核心机制是 Goroutine 协程、Channel 通道,并基于CSP(Communicating Sequential Processes࿰…...
【AI学习】三、AI算法中的向量
在人工智能(AI)算法中,向量(Vector)是一种将现实世界中的数据(如图像、文本、音频等)转化为计算机可处理的数值型特征表示的工具。它是连接人类认知(如语义、视觉特征)与…...
自然语言处理——循环神经网络
自然语言处理——循环神经网络 循环神经网络应用到基于机器学习的自然语言处理任务序列到类别同步的序列到序列模式异步的序列到序列模式 参数学习和长程依赖问题基于门控的循环神经网络门控循环单元(GRU)长短期记忆神经网络(LSTM)…...
第 86 场周赛:矩阵中的幻方、钥匙和房间、将数组拆分成斐波那契序列、猜猜这个单词
Q1、[中等] 矩阵中的幻方 1、题目描述 3 x 3 的幻方是一个填充有 从 1 到 9 的不同数字的 3 x 3 矩阵,其中每行,每列以及两条对角线上的各数之和都相等。 给定一个由整数组成的row x col 的 grid,其中有多少个 3 3 的 “幻方” 子矩阵&am…...
优选算法第十二讲:队列 + 宽搜 优先级队列
优选算法第十二讲:队列 宽搜 && 优先级队列 1.N叉树的层序遍历2.二叉树的锯齿型层序遍历3.二叉树最大宽度4.在每个树行中找最大值5.优先级队列 -- 最后一块石头的重量6.数据流中的第K大元素7.前K个高频单词8.数据流的中位数 1.N叉树的层序遍历 2.二叉树的锯…...
智能分布式爬虫的数据处理流水线优化:基于深度强化学习的数据质量控制
在数字化浪潮席卷全球的今天,数据已成为企业和研究机构的核心资产。智能分布式爬虫作为高效的数据采集工具,在大规模数据获取中发挥着关键作用。然而,传统的数据处理流水线在面对复杂多变的网络环境和海量异构数据时,常出现数据质…...
免费PDF转图片工具
免费PDF转图片工具 一款简单易用的PDF转图片工具,可以将PDF文件快速转换为高质量PNG图片。无需安装复杂的软件,也不需要在线上传文件,保护您的隐私。 工具截图 主要特点 🚀 快速转换:本地转换,无需等待上…...
