4.2、Flink任务怎样读取文件中的数据
目录
1、前言
2、readTextFile(已过时,不推荐使用)
3、readFile(已过时,不推荐使用)
4、fromSource(FileSource) 推荐使用
1、前言
思考: 读取文件时可以设置哪些规则呢?
1. 文件的格式(txt、csv、二进制...)
2. 文件的分隔符(按\n 分割)
3. 是否需要监控文件变化(一次读取、持续读取)
基于以上规则,Flink为我们提供了非常灵活的 读取文件的方法
2、readTextFile(已过时,不推荐使用)
语法说明:
定义:def readTextFile(filePath: String): DataStream[String]def readTextFile(filePath: String, charsetName: String)功能:1.读取文本格式的文件2.按行读取(\n为分隔符),每行数据被封装为 DataStream 的一个元素3.可以指定字符集(默认为UDF-8)4.文件只会读取一次源码分析:public DataStreamSource<String> readTextFile(String filePath, String charsetName) {// 初始化 TextInputFormat对象TextInputFormat format = new TextInputFormat(new Path(filePath)); // 指定路径过滤器(使用默认过滤器)format.setFilesFilter(FilePathFilter.createDefaultFilter()); // 指定Flink中的数据类型 TypeInformation<String> typeInfo = BasicTypeInfo.STRING_TYPE_INFO; // 指定字符集format.setCharsetName(charsetName); // 调用 readFile 方法return readFile(format, filePath, FileProcessingMode.PROCESS_ONCE, -1, typeInfo); }
代码示例:
public static void readTextFile() throws Exception {/** TODO 功能说明* readTextFile(path) - 读取文本文件(一次读取),例如遵守 TextInputFormat 规范的文件,逐行读取并将它们作为字符串返回。* */// 1.获取执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 2.将文本文件作为数据源env.readTextFile("data/1.txt").setParallelism(4).print();// 3.触发程序执行env.execute();}
3、readFile(已过时,不推荐使用)
语法说明:
定义:def readFile[T: TypeInformation](inputFormat: FileInputFormat[T],filePath: String,watchType: FileProcessingMode,interval: Long): DataStream[T] = {val typeInfo = implicitly[TypeInformation[T]] // 隐私转换(将java 数据类型 转换为 Flink数据类型)asScalaStream(javaEnv.readFile(inputFormat, filePath, watchType, interval, typeInfo))}参数:inputFormat : 指定 FileInputFormat 实现类(根据文件类型 选择相适应的实例)filePath : 指定 文件路径watchType : 指定 读取模式(提供了2个枚举值)PROCESS_ONCE :只读取一次PROCESS_CONTINUOUSLY :按照指定周期扫描文件interval : 指定 扫描文件的周期(单位为毫秒)功能:按照 指定的 文件格式 和 读取方式 读取数据

代码示例:
public static void readFile() throws Exception {/** TODO 功能说明* readFile(fileInputFormat, path) - 按照指定的文件输入格式读取(一次)文件。* readFile(fileInputFormat, path, watchType, interval, pathFilter, typeInfo)* 按照指定的文件输入格式读取(持续的读取)文件* */// 1.获取执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 2.将文本文件作为数据源String filePath = "data/1.txt";TextInputFormat textInputFormat = new TextInputFormat(new Path(filePath));textInputFormat.setFilesFilter(FilePathFilter.createDefaultFilter()); // 指定过滤器textInputFormat.setCharsetName("UTF-8"); // 指定编码格式/** readFile(inputFormat: FileInputFormat[OUT], filePath: String, watchType: FileProcessingMode, interval: Long)* 参数说明:* @inputFormat : 指定文件输入格式* @filePath : 指定文件路径* @watchType : 指定监控类型,提供了两种读取策略* PROCESS_ONCE : 只读取一次* PROCESS_CONTINUOUSLY :持续读取,监控新增数据* @interval : 指定连续扫描文件的周期(毫秒)* 重点提示:* 1.如果watchType设置为PROCESS_CONTINUOUSLY时,当一个文件被修改时,将会导致重新读取该* 文件的全部内容,这将会打破`精确一次`的语义* */env.readFile(textInputFormat, filePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 1000).print();// 3.触发程序执行env.execute();}
4、fromSource(FileSource) 推荐使用
public static void FileSource() throws Exception {// 1.获取执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 2.将文本文件作为数据源FileSource<String> fileSource = FileSource.forRecordStreamFormat(new TextLineInputFormat(), new Path("data/1.txt")).build();env.fromSource(fileSource, WatermarkStrategy.noWatermarks(), "read fileSource").print();// 3.触发程序执行env.execute();}
相关文章:

4.2、Flink任务怎样读取文件中的数据
目录 1、前言 2、readTextFile(已过时,不推荐使用) 3、readFile(已过时,不推荐使用) 4、fromSource(FileSource) 推荐使用 1、前言 思考: 读取文件时可以设置哪些规则呢? 1. 文件的格式(tx…...

Effective Java笔记(28)列表优于数组
数组与泛型相比,有两个重要的不同点 。 首先,数组是协变的( covariant ) 。 这个词听起来有点吓人,其实只是表示如果 Sub 为 Super 的子类型,那么数组类型 Sub[ ]就是Super[ ]的子类型。 相反,泛…...

做BI领域的ChatGPT,思迈特升级一站式ABI平台
8月8日,以「指标驱动 智能决策」为主题,2023 Smartbi V11系列新品发布会在广州丽思卡尔顿酒店开幕。 后疫情时代,BI发展趋势的观察与应对 在发布会上,思迈特CEO吴华夫在开场致辞中表示,当前大环境背景下…...

ELFK——ELK结合filebeat日志分析系统(2)
目录 一、filebeat 二、ELFK 1.原理简介 2.在ELK基础上部署filebeat 一、filebeat Filebeat,轻量级的开源日志文件数据搜集器。通常在需要采集数据的客户端安装 Filebeat,并指定目录与日志格式,Filebeat 就能快速收集数据,并…...
webSocket 协议是什么
webSocket 协议是什么,能简述一下吗? websocket 协议 HTML5 带来的新协议,相对于 http,它是一个持久连接的协议,它利用 http 协议完成握手,然后通过 TCP 连接通道发送消息,使用 websocket 协议可…...
CentOS 7迁移Anolis OS 8
背景:生产环境客户要求操作系统国产化 操作系统:Centos7.9 内核:5.4.108 服务器可以联网,进行在线迁移: # 下载迁移工具软件源 wget https://mirrors.openanolis.cn/anolis/migration/anolis-migration.repo -O /etc/y…...
Transformer 立体视觉 Depth Estimation
1. Intro 立体深度估计具有重要的意义,因为它能够重建三维信息。为此,在左右相机图像之间匹配相应的像素;对应像素位置的差异,即视差,可以用来推断深度并重建3D场景。最近基于深度学习的立体深度估计方法已经显示出有希望的结果,但仍然存在一些挑战。 其中一个挑战涉及使…...

vue去掉所有输入框两边空格,封装指令去空格,支持Vue2和Vue3,ElementUI Input去空格
需求背景 就是页面很多表单输入框,期望在提交的时候,都要把用户两边的空格去掉 ❌使用 vue 的指令 .trim 去掉空格 中间会输入不了空格, 比如我想输入 你好啊 中国, 这中间的空格输入不了,只能变成 你好啊中国 ❌在提交的时候使用…...

认识FFMPEG框架
FFMPEG全称: Fast Forward Moving Picture Experts Group (MPEG:动态图像专家组) ffmpeg相关网站: git://source.ffmpeg.org/ffmpeg.git http://git.videolan.org/?pffmpeg.git https://github.com/FFmpeg/FFmpeg FFMPEG框架基本组件: AVFormat , AVCodec, AVDevice, AVFil…...

Vue3 大屏数字滚动效果
父组件: <template> <div class"homePage"> <NumRoll v-for"(v, i) in numberList" :key"i" :number"v"></NumRoll> </div> </template> <script setup> import { onMounted, r…...

【深度学习注意力机制系列】—— SENet注意力机制(附pytorch实现)
深度学习中的注意力机制(Attention Mechanism)是一种模仿人类视觉和认知系统的方法,它允许神经网络在处理输入数据时集中注意力于相关的部分。通过引入注意力机制,神经网络能够自动地学习并选择性地关注输入中的重要信息ÿ…...
go 函数
go 语言函数 go 函数函数定义Go函数的特点如下函数作为参数可变参数相同类型可变参数不同类型可变参数 return语句作用概述空的return语句空白标识符多个返回值命名返回值 defer 语句作用引申出来的面试题for defer下面是一个使用defer的示例代码输出结果 匿名函数定义匿名函数…...
python之正则表达式
目录 正则表达式 python正则表达式方法 match search findall finditer compile 元字符匹配 元字符 量词 贪婪匹配和惰性匹配 正则表达式的group 语法 案例 正则表达式 正则表达式又称规则表达式,是使用单个字符串来描述、匹配某个句法规则的字符串…...

【LeetCode每日一题】——219.存在重复元素II
文章目录 一【题目类别】二【题目难度】三【题目编号】四【题目描述】五【题目示例】六【题目提示】七【解题思路】八【时间频度】九【代码实现】十【提交结果】 一【题目类别】 哈希表 二【题目难度】 简单 三【题目编号】 219.存在重复元素II 四【题目描述】 给你一个…...
篇六:适配器模式:让不兼容变兼容
篇六:“适配器模式:让不兼容变兼容” 开始本篇文章之前先推荐一个好用的学习工具,AIRIght,借助于AI助手工具,学习事半功倍。欢迎访问:http://airight.fun/ 另外有2本不错的关于设计模式的资料,…...

【云原生】Docker-compose中所有模块学习
compose模块 模板文件是使用 Compose 的核心,涉及到的指令关键字也比较多。但大家不用担心,这里面大部分指令跟 docker run 相关参数的含义都是类似的。 默认的模板文件名称为 docker-compose.yml,格式为 YAML 格式。 version: "3&quo…...
广义积分练习
前置知识 无穷限积分瑕积分 练习 计算 ∫ 0 ∞ 1 x ( 1 x ) d x \int_0^{\infty}\dfrac{1}{\sqrt x(1x)}dx ∫0∞x (1x)1dx 解: x 0 \qquad x0 x0为瑕点 \qquad 原式 lim a → 0 lim b → ∞ ∫ a b 1 x ( 1 x ) d x lim a → 0 lim …...

element-ui树形表格,左边勾选,右边显示选中的数据-功能(如动图)
功能如图 功能需求 表格树形表格勾选数据,右边显示对应勾选的数据内容,选中客户,自动勾选所有的店铺(子级),选中其中一个店铺,自动勾选上客户(父级),同时会存在只有客户(下面没有子级的情况&am…...
Android数字价格变化的动画效果的简单实现
原理:使用ValueAnimator属性动画类实现,它通过值的改变手动设置对象的属性值来实现动画效果。直接贴代码: public static void doNumberAnim(TextView tvPrice, float startNumber, float endNumber) {ValueAnimator animator ValueAnimato…...

Win10无法投影关闭3D模式
Win10不小心开启了3D模式,插上投影仪就一闪一闪的,无法正投影 解决办法: 1. 打开注册表工具regedit,删除以下注册表,重启电脑 HKEY_LOCAL_MACHINE\SYSTEM\CurrentControlSet\Control\GraphicsDrivers\Configurat…...
Clickhouse统计指定表中各字段的空值、空字符串或零值比例
下面是一段Clickhouse SQL代码,用于统计指定数据库中多张表的字段空值情况。代码通过动态生成查询语句实现自动化统计,处理逻辑如下: 从系统表获取指定数据库(替换your_database)中所有表的字段元数据根据字段类型动态…...

基于 Transformer robert的情感分类任务实践总结之二——R-Drop
基于 Transformer robert的情感分类任务实践总结之一 核心改进点 1. R-Drop正则化 原理:通过在同一个输入上两次前向传播(利用Dropout的随机性),强制模型对相同输入生成相似的输出分布,避免过拟合。实现:…...

1.springmvc基础入门(一)
1.Spring MVC概念 Spring MVC 是 Spring Framework 提供的 Web 组件,全称是 Spring Web MVC,是⽬前主流的实现 MVC 设计模式的框架,提供前端路由映射、视图解析等功能。 Java Web 开发者必须要掌握的技术框架。 2.Spring MVC 功能 MVC&am…...

HarmonyOS开发:显示图片功能详解
目录 前言 Image组件基础 1、Image组件概述 2、加载图片资源 3、存档图类型数据源 (1)本地资源 (2)网络资源 (3)Resource资源 (4)媒体库file://data/storage (…...
前端(vue)学习笔记(CLASS 7):vuex
vuex概述 vuex是一个vue的状态管理工具,状态就是数据 大白话:vuex是一个插件,可以帮我们管理vue通用的数据(多组件共享的数据) 场景 1、某个状态在很多个组件来使用(个人信息) 2、多个组件…...
基于深度学习(Unet和SwinUnet)的医学图像分割系统设计与实现:超声心脏分割
基于深度学习的医学图像分割系统设计与实现 摘要 本文提出了一种基于深度学习的医学图像分割系统,该系统采用U-Net和Swin-Unet作为核心网络架构,实现了高效的医学图像分割功能。系统包含完整的训练、验证和推理流程,并提供了用户友好的图形界面。实验结果表明,该系统在医…...

【工作记录】接口功能测试总结
如何对1个接口进行接口测试 一、单接口功能测试 1、接口文档信息 理解接口文档的内容: 请求URL: https://[ip]:[port]/xxxserviceValidation 请求方法: POST 请求参数: serviceCode(必填), servicePsw(必填) 响应参数: status, token 2、编写测试用例 2.1 正…...
DDPM优化目标公式推导
DDPM优化目标公式推导 DDPM优化目标公式推导**1. 问题定义****2. 优化目标:最大化对数似然****3. 变分下界的分解****4. 关键步骤:简化 KL 散度项****(a) 后验分布 q ( x t − 1 ∣ x t , x 0 ) q(\mathbf{x}_{t-1} | \mathbf{x}_t, \mathbf{x}_0) q(xt…...

免费插件集-illustrator插件-Ai插件-随机填色
文章目录 1.介绍2.安装3.通过窗口>扩展>知了插件4.功能解释5.总结 1.介绍 本文介绍一款免费插件,加强illustrator使用人员工作效率,实现路径随机填色。首先从下载网址下载这款插件https://download.csdn.net/download/m0_67316550/87890501&#…...

计算机组成与体系结构:补码数制二(Complementary Number Systems)
目录 4位二进制的减法 补码系统 🧠减基补码 名字解释: 减基补码有什么用? 计算方法 ❓为什么这样就能计算减基补码 💡 原理揭示:按位减法,模拟总减法! 那对于二进制呢?&…...