4.2、Flink任务怎样读取文件中的数据
目录
1、前言
2、readTextFile(已过时,不推荐使用)
3、readFile(已过时,不推荐使用)
4、fromSource(FileSource) 推荐使用
1、前言
思考: 读取文件时可以设置哪些规则呢?
1. 文件的格式(txt、csv、二进制...)
2. 文件的分隔符(按\n 分割)
3. 是否需要监控文件变化(一次读取、持续读取)
基于以上规则,Flink为我们提供了非常灵活的 读取文件的方法
2、readTextFile(已过时,不推荐使用)
语法说明:
定义:def readTextFile(filePath: String): DataStream[String]def readTextFile(filePath: String, charsetName: String)功能:1.读取文本格式的文件2.按行读取(\n为分隔符),每行数据被封装为 DataStream 的一个元素3.可以指定字符集(默认为UDF-8)4.文件只会读取一次源码分析:public DataStreamSource<String> readTextFile(String filePath, String charsetName) {// 初始化 TextInputFormat对象TextInputFormat format = new TextInputFormat(new Path(filePath)); // 指定路径过滤器(使用默认过滤器)format.setFilesFilter(FilePathFilter.createDefaultFilter()); // 指定Flink中的数据类型 TypeInformation<String> typeInfo = BasicTypeInfo.STRING_TYPE_INFO; // 指定字符集format.setCharsetName(charsetName); // 调用 readFile 方法return readFile(format, filePath, FileProcessingMode.PROCESS_ONCE, -1, typeInfo); }
代码示例:
public static void readTextFile() throws Exception {/** TODO 功能说明* readTextFile(path) - 读取文本文件(一次读取),例如遵守 TextInputFormat 规范的文件,逐行读取并将它们作为字符串返回。* */// 1.获取执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 2.将文本文件作为数据源env.readTextFile("data/1.txt").setParallelism(4).print();// 3.触发程序执行env.execute();}
3、readFile(已过时,不推荐使用)
语法说明:
定义:def readFile[T: TypeInformation](inputFormat: FileInputFormat[T],filePath: String,watchType: FileProcessingMode,interval: Long): DataStream[T] = {val typeInfo = implicitly[TypeInformation[T]] // 隐私转换(将java 数据类型 转换为 Flink数据类型)asScalaStream(javaEnv.readFile(inputFormat, filePath, watchType, interval, typeInfo))}参数:inputFormat : 指定 FileInputFormat 实现类(根据文件类型 选择相适应的实例)filePath : 指定 文件路径watchType : 指定 读取模式(提供了2个枚举值)PROCESS_ONCE :只读取一次PROCESS_CONTINUOUSLY :按照指定周期扫描文件interval : 指定 扫描文件的周期(单位为毫秒)功能:按照 指定的 文件格式 和 读取方式 读取数据
代码示例:
public static void readFile() throws Exception {/** TODO 功能说明* readFile(fileInputFormat, path) - 按照指定的文件输入格式读取(一次)文件。* readFile(fileInputFormat, path, watchType, interval, pathFilter, typeInfo)* 按照指定的文件输入格式读取(持续的读取)文件* */// 1.获取执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 2.将文本文件作为数据源String filePath = "data/1.txt";TextInputFormat textInputFormat = new TextInputFormat(new Path(filePath));textInputFormat.setFilesFilter(FilePathFilter.createDefaultFilter()); // 指定过滤器textInputFormat.setCharsetName("UTF-8"); // 指定编码格式/** readFile(inputFormat: FileInputFormat[OUT], filePath: String, watchType: FileProcessingMode, interval: Long)* 参数说明:* @inputFormat : 指定文件输入格式* @filePath : 指定文件路径* @watchType : 指定监控类型,提供了两种读取策略* PROCESS_ONCE : 只读取一次* PROCESS_CONTINUOUSLY :持续读取,监控新增数据* @interval : 指定连续扫描文件的周期(毫秒)* 重点提示:* 1.如果watchType设置为PROCESS_CONTINUOUSLY时,当一个文件被修改时,将会导致重新读取该* 文件的全部内容,这将会打破`精确一次`的语义* */env.readFile(textInputFormat, filePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 1000).print();// 3.触发程序执行env.execute();}
4、fromSource(FileSource) 推荐使用
public static void FileSource() throws Exception {// 1.获取执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 2.将文本文件作为数据源FileSource<String> fileSource = FileSource.forRecordStreamFormat(new TextLineInputFormat(), new Path("data/1.txt")).build();env.fromSource(fileSource, WatermarkStrategy.noWatermarks(), "read fileSource").print();// 3.触发程序执行env.execute();}
相关文章:
4.2、Flink任务怎样读取文件中的数据
目录 1、前言 2、readTextFile(已过时,不推荐使用) 3、readFile(已过时,不推荐使用) 4、fromSource(FileSource) 推荐使用 1、前言 思考: 读取文件时可以设置哪些规则呢? 1. 文件的格式(tx…...
Effective Java笔记(28)列表优于数组
数组与泛型相比,有两个重要的不同点 。 首先,数组是协变的( covariant ) 。 这个词听起来有点吓人,其实只是表示如果 Sub 为 Super 的子类型,那么数组类型 Sub[ ]就是Super[ ]的子类型。 相反,泛…...
做BI领域的ChatGPT,思迈特升级一站式ABI平台
8月8日,以「指标驱动 智能决策」为主题,2023 Smartbi V11系列新品发布会在广州丽思卡尔顿酒店开幕。 后疫情时代,BI发展趋势的观察与应对 在发布会上,思迈特CEO吴华夫在开场致辞中表示,当前大环境背景下…...
ELFK——ELK结合filebeat日志分析系统(2)
目录 一、filebeat 二、ELFK 1.原理简介 2.在ELK基础上部署filebeat 一、filebeat Filebeat,轻量级的开源日志文件数据搜集器。通常在需要采集数据的客户端安装 Filebeat,并指定目录与日志格式,Filebeat 就能快速收集数据,并…...
webSocket 协议是什么
webSocket 协议是什么,能简述一下吗? websocket 协议 HTML5 带来的新协议,相对于 http,它是一个持久连接的协议,它利用 http 协议完成握手,然后通过 TCP 连接通道发送消息,使用 websocket 协议可…...
CentOS 7迁移Anolis OS 8
背景:生产环境客户要求操作系统国产化 操作系统:Centos7.9 内核:5.4.108 服务器可以联网,进行在线迁移: # 下载迁移工具软件源 wget https://mirrors.openanolis.cn/anolis/migration/anolis-migration.repo -O /etc/y…...
Transformer 立体视觉 Depth Estimation
1. Intro 立体深度估计具有重要的意义,因为它能够重建三维信息。为此,在左右相机图像之间匹配相应的像素;对应像素位置的差异,即视差,可以用来推断深度并重建3D场景。最近基于深度学习的立体深度估计方法已经显示出有希望的结果,但仍然存在一些挑战。 其中一个挑战涉及使…...
vue去掉所有输入框两边空格,封装指令去空格,支持Vue2和Vue3,ElementUI Input去空格
需求背景 就是页面很多表单输入框,期望在提交的时候,都要把用户两边的空格去掉 ❌使用 vue 的指令 .trim 去掉空格 中间会输入不了空格, 比如我想输入 你好啊 中国, 这中间的空格输入不了,只能变成 你好啊中国 ❌在提交的时候使用…...
认识FFMPEG框架
FFMPEG全称: Fast Forward Moving Picture Experts Group (MPEG:动态图像专家组) ffmpeg相关网站: git://source.ffmpeg.org/ffmpeg.git http://git.videolan.org/?pffmpeg.git https://github.com/FFmpeg/FFmpeg FFMPEG框架基本组件: AVFormat , AVCodec, AVDevice, AVFil…...
Vue3 大屏数字滚动效果
父组件: <template> <div class"homePage"> <NumRoll v-for"(v, i) in numberList" :key"i" :number"v"></NumRoll> </div> </template> <script setup> import { onMounted, r…...
【深度学习注意力机制系列】—— SENet注意力机制(附pytorch实现)
深度学习中的注意力机制(Attention Mechanism)是一种模仿人类视觉和认知系统的方法,它允许神经网络在处理输入数据时集中注意力于相关的部分。通过引入注意力机制,神经网络能够自动地学习并选择性地关注输入中的重要信息ÿ…...
go 函数
go 语言函数 go 函数函数定义Go函数的特点如下函数作为参数可变参数相同类型可变参数不同类型可变参数 return语句作用概述空的return语句空白标识符多个返回值命名返回值 defer 语句作用引申出来的面试题for defer下面是一个使用defer的示例代码输出结果 匿名函数定义匿名函数…...
python之正则表达式
目录 正则表达式 python正则表达式方法 match search findall finditer compile 元字符匹配 元字符 量词 贪婪匹配和惰性匹配 正则表达式的group 语法 案例 正则表达式 正则表达式又称规则表达式,是使用单个字符串来描述、匹配某个句法规则的字符串…...
【LeetCode每日一题】——219.存在重复元素II
文章目录 一【题目类别】二【题目难度】三【题目编号】四【题目描述】五【题目示例】六【题目提示】七【解题思路】八【时间频度】九【代码实现】十【提交结果】 一【题目类别】 哈希表 二【题目难度】 简单 三【题目编号】 219.存在重复元素II 四【题目描述】 给你一个…...
篇六:适配器模式:让不兼容变兼容
篇六:“适配器模式:让不兼容变兼容” 开始本篇文章之前先推荐一个好用的学习工具,AIRIght,借助于AI助手工具,学习事半功倍。欢迎访问:http://airight.fun/ 另外有2本不错的关于设计模式的资料,…...
【云原生】Docker-compose中所有模块学习
compose模块 模板文件是使用 Compose 的核心,涉及到的指令关键字也比较多。但大家不用担心,这里面大部分指令跟 docker run 相关参数的含义都是类似的。 默认的模板文件名称为 docker-compose.yml,格式为 YAML 格式。 version: "3&quo…...
广义积分练习
前置知识 无穷限积分瑕积分 练习 计算 ∫ 0 ∞ 1 x ( 1 x ) d x \int_0^{\infty}\dfrac{1}{\sqrt x(1x)}dx ∫0∞x (1x)1dx 解: x 0 \qquad x0 x0为瑕点 \qquad 原式 lim a → 0 lim b → ∞ ∫ a b 1 x ( 1 x ) d x lim a → 0 lim …...
element-ui树形表格,左边勾选,右边显示选中的数据-功能(如动图)
功能如图 功能需求 表格树形表格勾选数据,右边显示对应勾选的数据内容,选中客户,自动勾选所有的店铺(子级),选中其中一个店铺,自动勾选上客户(父级),同时会存在只有客户(下面没有子级的情况&am…...
Android数字价格变化的动画效果的简单实现
原理:使用ValueAnimator属性动画类实现,它通过值的改变手动设置对象的属性值来实现动画效果。直接贴代码: public static void doNumberAnim(TextView tvPrice, float startNumber, float endNumber) {ValueAnimator animator ValueAnimato…...
Win10无法投影关闭3D模式
Win10不小心开启了3D模式,插上投影仪就一闪一闪的,无法正投影 解决办法: 1. 打开注册表工具regedit,删除以下注册表,重启电脑 HKEY_LOCAL_MACHINE\SYSTEM\CurrentControlSet\Control\GraphicsDrivers\Configurat…...
【Oracle APEX开发小技巧12】
有如下需求: 有一个问题反馈页面,要实现在apex页面展示能直观看到反馈时间超过7天未处理的数据,方便管理员及时处理反馈。 我的方法:直接将逻辑写在SQL中,这样可以直接在页面展示 完整代码: SELECTSF.FE…...
MongoDB学习和应用(高效的非关系型数据库)
一丶 MongoDB简介 对于社交类软件的功能,我们需要对它的功能特点进行分析: 数据量会随着用户数增大而增大读多写少价值较低非好友看不到其动态信息地理位置的查询… 针对以上特点进行分析各大存储工具: mysql:关系型数据库&am…...
FastAPI 教程:从入门到实践
FastAPI 是一个现代、快速(高性能)的 Web 框架,用于构建 API,支持 Python 3.6。它基于标准 Python 类型提示,易于学习且功能强大。以下是一个完整的 FastAPI 入门教程,涵盖从环境搭建到创建并运行一个简单的…...
大数据零基础学习day1之环境准备和大数据初步理解
学习大数据会使用到多台Linux服务器。 一、环境准备 1、VMware 基于VMware构建Linux虚拟机 是大数据从业者或者IT从业者的必备技能之一也是成本低廉的方案 所以VMware虚拟机方案是必须要学习的。 (1)设置网关 打开VMware虚拟机,点击编辑…...
测试markdown--肇兴
day1: 1、去程:7:04 --11:32高铁 高铁右转上售票大厅2楼,穿过候车厅下一楼,上大巴车 ¥10/人 **2、到达:**12点多到达寨子,买门票,美团/抖音:¥78人 3、中饭&a…...
对WWDC 2025 Keynote 内容的预测
借助我们以往对苹果公司发展路径的深入研究经验,以及大语言模型的分析能力,我们系统梳理了多年来苹果 WWDC 主题演讲的规律。在 WWDC 2025 即将揭幕之际,我们让 ChatGPT 对今年的 Keynote 内容进行了一个初步预测,聊作存档。等到明…...
linux 错误码总结
1,错误码的概念与作用 在Linux系统中,错误码是系统调用或库函数在执行失败时返回的特定数值,用于指示具体的错误类型。这些错误码通过全局变量errno来存储和传递,errno由操作系统维护,保存最近一次发生的错误信息。值得注意的是,errno的值在每次系统调用或函数调用失败时…...
【单片机期末】单片机系统设计
主要内容:系统状态机,系统时基,系统需求分析,系统构建,系统状态流图 一、题目要求 二、绘制系统状态流图 题目:根据上述描述绘制系统状态流图,注明状态转移条件及方向。 三、利用定时器产生时…...
【OSG学习笔记】Day 16: 骨骼动画与蒙皮(osgAnimation)
骨骼动画基础 骨骼动画是 3D 计算机图形中常用的技术,它通过以下两个主要组件实现角色动画。 骨骼系统 (Skeleton):由层级结构的骨头组成,类似于人体骨骼蒙皮 (Mesh Skinning):将模型网格顶点绑定到骨骼上,使骨骼移动…...
全面解析各类VPN技术:GRE、IPsec、L2TP、SSL与MPLS VPN对比
目录 引言 VPN技术概述 GRE VPN 3.1 GRE封装结构 3.2 GRE的应用场景 GRE over IPsec 4.1 GRE over IPsec封装结构 4.2 为什么使用GRE over IPsec? IPsec VPN 5.1 IPsec传输模式(Transport Mode) 5.2 IPsec隧道模式(Tunne…...
