2.2 如何使用FlinkSQL读取写入到文件系统(HDFS\Local\Hive)
目录
1、文件系统 SQL 连接器
2、如何指定文件系统类型
3、如何指定文件格式
4、读取文件系统
4.1 开启 目录监控
4.2 可用的 Metadata
5、写出文件系统
5.1 创建分区表
5.2 滚动策略、文件合并、分区提交
5.3 指定 Sink Parallelism
6、示例_通过FlinkSQL读取kafka在写入hive表
6.1、创建 kafka source表用于读取kafka
6.2、创建 hdfs sink表用于写出到hdfs
6.3、insert into 写入到 hdfs_sink_table
6.4、查询 hdfs_sink_table
6.5、创建hive表,指定local
1、文件系统 SQL 连接器
文件系统连接器允许从本地或分布式文件系统进行读写数据
官网链接:文件系统 SQL 连接器
2、如何指定文件系统类型
创建表时通过 'path' = '协议名称:///path' 来指定 文件系统类型
参考官网:文件系统类型
CREATE TABLE filesystem_table (id INT,name STRING,ds STRING
) partitioned by (ds) WITH ('connector' = 'filesystem',-- 本地文件系统'path' = 'file:///URI',-- HDFS文件系统'path' = 'hdfs://URI',-- 阿里云对象存储 'path' = 'oss://URI','format' = 'json'
);
3、如何指定文件格式
FlinkSQL 文件系统连接器支持多种format,来读取和写入文件
比如当读取的source格式为 csv、json、Parquet... 可以在建表是指定相应的格式类型
来对数据进行解析后映射到表中的字段中
CREATE TABLE filesystem_table_file_format (id INT,name STRING,ds STRING
) partitioned by (ds) WITH ('connector' = 'filesystem',-- 指定文件格式类型'format' = 'json|csv|orc|raw'
);
4、读取文件系统
FlinkSQL可以将单个文件或整个目录的数据读取到单个表中
注意:
1、当读取目录时,对目录中的文件进行 无序的读取
2、默认情况下,读取文件时为批处理模式,只会扫描配置路径一遍后就会停止
当开启目录监控(source.monitor-interval)时,才是流处理模式
4.1 开启 目录监控
通过设置 source.monitor-interval
属性来开启目录监控,以便在新文件出现时继续扫描
注意:
只会对指定目录内新增文件进行读取,不会读取更新后的旧文件
-- 目录监控
drop table filesystem_source_table;
CREATE TABLE filesystem_source_table (id INT,name STRING,`file.name` STRING NOT NULL METADATA
) WITH ('connector' = 'filesystem','path' = 'file:///usr/local/lib/mavne01/FlinkAPI1.17/data/output/1016','format' = 'json','source.monitor-interval' = '3' -- 开启目录监控,设置监控时间间隔
);-- 持续读取
select * from filesystem_source_table;
4.2 可用的 Metadata
使用FLinkSQL读取文件系统中的数据时,支持对 metadata 进行读取
注意: 所有 metadata 都是只读的
-- 可用的Metadata
drop table filesystem_source_table_read_metadata;
CREATE TABLE filesystem_source_table_read_metadata (id INT,name STRING,`file.path` STRING NOT NULL METADATA,`file.name` STRING NOT NULL METADATA,`file.size` BIGINT NOT NULL METADATA,`file.modification-time` TIMESTAMP_LTZ(3) NOT NULL METADATA
) WITH ('connector' = 'filesystem','path' = 'file:///usr/local/lib/mavne01/FlinkAPI1.17/data/output/1012','format' = 'json'
);select * from filesystem_source_table_read_metadata;
运行结果:
5、写出文件系统
5.1 创建分区表
FlinkSQL支持创建分区表,并且通过 insert into(追加) 和 insert overwrite(覆盖) 写入数据
-- 创建分区表
drop table filesystem_source_table_partition;
CREATE TABLE filesystem_source_table_partition (id INT,name STRING,ds STRING
) partitioned by (ds) WITH ('connector' = 'filesystem','path' = 'file:///usr/local/lib/mavne01/FlinkAPI1.17/data/output/1012','partition.default-name' = 'default_partition','format' = 'json'
);-- 动态分区写入
insert into filesystem_source_table_partition
SELECT * FROM (VALUES(1,'a','20231010')
, (2,'b','20231010')
, (3,'c','20231011')
, (4,'d','20231011')
, (5,'e','20231012')
, (6,'f','20231012')
) AS user1 (id,name,ds);-- 静态分区写入
insert into filesystem_source_table_partition partition(ds = '20231010')
SELECT * FROM (VALUES(1,'a')
, (2,'b')
, (3,'c')
, (4,'d')
, (5,'e')
, (6,'f')
) AS user1 (id,name);-- 查询分区表数据
select * from filesystem_source_table_partition where ds = '20231010';
5.2 滚动策略、文件合并、分区提交
可以看之前的博客:flink写入文件时分桶策略
官网链接:官网分桶策略
5.3 指定 Sink Parallelism
当使用FlinkSQL写出到文件系统时,可以通过 sink.parallelism 设置sink算子的并行度
注意:当且仅当上游的 changelog 模式为 INSERT-ONLY 时,才支持配置 sink parallelism。否则,程序将会抛出异常
CREATE TABLE hdfs_sink_table (`log` STRING,`dt` STRING, -- 分区字段,天`hour` STRING -- 分区字段,小时
) partitioned by (dt,`hour`) WITH ('connector' = 'filesystem','path' = 'file:///usr/local/lib/mavne01/FlinkAPI1.17/data/output/kafka','sink.parallelism' = '2', -- 指定sink算子并行度'format' = 'raw'
);
6、示例_通过FlinkSQL读取kafka在写入hive表
需求:
使用FlinkSQL将kafka数据写入到hdfs指定目录中
根据kafka的timestamp进行分区(按小时分区)
6.1、创建 kafka source表用于读取kafka
-- TODO 创建读取kafka表时,同时读取kafka元数据字段
drop table kafka_source_table;
CREATE TABLE kafka_source_table(`log` STRING,`timestamp` TIMESTAMP(3) METADATA FROM 'timestamp' -- 消息的时间戳
) WITH ('connector' = 'kafka','topic' = '20231017','properties.bootstrap.servers' = 'worker01:9092','properties.group.id' = 'FlinkConsumer','scan.startup.mode' = 'earliest-offset','format' = 'raw'
);
6.2、创建 hdfs sink表用于写出到hdfs
drop table hdfs_sink_table;
CREATE TABLE hdfs_sink_table (`log` STRING,`dt` STRING, -- 分区字段,天`hour` STRING -- 分区字段,小时
) partitioned by (dt,`hour`) WITH ('connector' = 'filesystem','path' = 'hdfs://usr/local/lib/mavne01/FlinkAPI1.17/data/output/kafka','sink.parallelism' = '2', -- 指定sink算子并行度'format' = 'raw'
);
6.3、insert into 写入到 hdfs_sink_table
-- 流式 sql,插入文件系统表
insert into hdfs_sink_table
select log,DATE_FORMAT(`timestamp`,'yyyyMMdd') as dt,DATE_FORMAT(`timestamp`,'HH') as `hour`
from kafka_source_table;
6.4、查询 hdfs_sink_table
-- 批式 sql,使用分区修剪进行选择
select * from hdfs_sink_table;
6.5、创建hive表,指定local
create table `kafka_to_hive` (
`log` string comment '日志数据')comment '埋点日志数据' PARTITIONED BY (dt string,`hour` string)
row format delimited fields terminated by '\t' lines terminated by '\n' stored as orc
LOCATION 'hdfs://usr/local/lib/mavne01/FlinkAPI1.17/data/output/kafka';
相关文章:

2.2 如何使用FlinkSQL读取写入到文件系统(HDFS\Local\Hive)
目录 1、文件系统 SQL 连接器 2、如何指定文件系统类型 3、如何指定文件格式 4、读取文件系统 4.1 开启 目录监控 4.2 可用的 Metadata 5、写出文件系统 5.1 创建分区表 5.2 滚动策略、文件合并、分区提交 5.3 指定 Sink Parallelism 6、示例_通过FlinkSQL读取kafk…...
call函数和apply函数的区别
call和apply是 JavaScript 中的两个函数方法,用于调用函数并指定函数内部的this值以及传递参数。它们的主要区别在于参数的传递方式。 call方法:call方法允许你在调用函数时,显式地指定函数内部的this值和参数列表。它的语法为: …...
JavaCV踩坑之路1——Mac上安装OpenCV
Mac无法安装opencv 更新Homebrew: 打开终端并运行以下命令来更新Homebrew: brew update 移除Taps(仓库): 可能与homebrew-services仓库有关。你可以尝试将它移除: brew untap homebrew/services重新安装OpenCV: 在移除…...

es6(三)——常用es6(函数、数组、对象的扩展)
ES6的系列文章目录 第一章 Python 机器学习入门之pandas的使用 文章目录 ES6的系列文章目录0、数值的扩展一、函数的扩展1、函数的默认值2、函数的reset参数 二、数组的扩展1. 将对象转成数组的Array.from()2. 将对象转成数组的Array.from()3. 实例方法 find(),fin…...

API网关与社保模块
API网关与社保模块 理解zuul网关的作用完成zuul网关的搭建 实现社保模块的代码开发 zuul网关 在学习完前面的知识后,微服务架构已经初具雏形。但还有一些问题:不同的微服务一般会有不同的网 络地址,客户端在访问这些微服务时必须记住几十甚至…...
linux 安装 docker
linux 安装 docker docker及版本一键安装docker(本人使用的是手动安装)Docker手动安装 docker及版本 Docker从17.03版本之后分为CE(Community Edition: 社区版)和EE(Enterprise Edition: 企业版)。相对于社区版本,企业…...
整数转罗马数字
题目: 罗马数字包含以下七种字符: I, V, X, L,C,D 和 M。 字符 数值 I 1 V 5 X 10 L 50 C 100 D 500 …...

利用爬虫采集音频信息完整代码示例
以下是一个使用WWW::RobotRules和duoip.cn/get_proxy的Perl下载器程序: #!/usr/bin/perluse strict; use warnings; use WWW::RobotRules; use LWP::UserAgent; use HTTP::Request; use HTTP::Response;# 创建一个UserAgent对象 my $ua LWP::UserAgent->new();#…...

WebSocket: 实时通信的新维度
介绍: 在现代Web应用程序中,实时通信对于提供即时更新和交互性至关重要。传统的HTTP协议虽然适合请求-响应模式,但对于需要频繁数据交换的场景并不理想。而WebSocket技术的出现填补了这个空白,为Web开发者们带来了一种高效、实时的…...

postgresql(openGauss)模糊匹配参数
被pg系这个show要求精准匹配参数恶心的不轻。 原理是用.psqlrc(openGauss用.gsqlrc)文件set一个select常量进去,需要用:调用这个常量。理论上也可以增强其他的各种功能。 我在openGauss做的一个例子 .gsqlrc(.psqlrc…...
jdk 加密 aes jar包解决
JDK1.8.0_151的无限制强度加密策略文件变动 JDK1.8.0_151无需去官网下载 local_policy.jar US_export_policy.jar这个jar包,只需要修改Java\jdk1.8.0_151\jre\lib\security这目录下的java.security文件配置即可。 随着越来越多的第三方工具只支持 JDK8,…...

C++ Primer 第十一章 关联容器 重点解读
1 map自定义排序 #include <map> #include <iostream> #include <functional> using namespace std; int main() {function<bool(pair<int, int>, pair<int, int>)> cmp [&](pair<int, int> p1, pair<int, int> p2) -&g…...
MySQL 8 - 能够成功创建其他用户但无法修改 root 用户的密码
问题: 创建其他用户就可以,为什么修改root 密码不可以? 如果能够成功创建其他用户但无法修改 root 用户的密码,这可能是因为 MySQL 8 及更高版本引入了一个名为"caching_sha2_password"的身份验证插件作为默认设置&…...

k8s kubernetes 1.23.6 + flannel公网环境安装
准备环境,必须是同一个云服务厂商,如:华为,阿里、腾讯等,不要存在跨平台安装K8S,跨平台安装需要处理网络隧道才能实现所有节点在一个网络集群中,这里推荐使用同一家云服务厂商安装即可 这里使用…...

博客系统中的加盐算法
目录 一、为什么要对密码进行加盐加密? 1、明文 2、传统的 MD5 二、加盐加密 1、加盐算法实现思路 2、加盐算法解密思路 3、加盐算法代码实现 三、使用 Spring Security 加盐 1、引入 Spring Security 框架 2、排除 Spring Security 的自动加载 3、调用 S…...

同花顺动态Cookie反爬JS逆向分析
文章目录 1. 写在前面2. 请求分析3. Hook Cookie4. 补环境 1. 写在前面 最近有位朋友在大A失意,突发奇想自己闲来无事想要做一个小工具,监测一下市场行情的数据。自己再分析分析,虽是一名程序员但苦于对爬虫领域相关的技术不是特别熟悉。最后…...
异步加载JS的方法
异步加载 JavaScript (JS) 文件是提高网页性能的一种常用技术,这样可以使页面在等待 JS 文件加载和执行时不会阻塞。以下是一些异步加载 JS 的方法: 使用 <script> 标签的 async 属性 通过将 <script> 标签的 async 属性设为 true…...
IO/NIO交互模拟及渐进式实现
IO IO Server public class SocketServer {public static void main(String[] args) {//server编号和client编号对应,优缺点注释在server端//server1();//server2();server3();}/*** server1的缺点:* 1、accept()方法阻塞了线程,要等客户端…...

springboot+html实现密码重置功能
目录 登录注册: 前端: chnangePssword.html 后端: controller: Mapper层: 逻辑: 登录注册: https://blog.csdn.net/m0_67930426/article/details/133849132 前端: 通过点击忘记密码跳转…...
LeetCode 2525. 根据规则将箱子分类【模拟】1301
本文属于「征服LeetCode」系列文章之一,这一系列正式开始于2021/08/12。由于LeetCode上部分题目有锁,本系列将至少持续到刷完所有无锁题之日为止;由于LeetCode还在不断地创建新题,本系列的终止日期可能是永远。在这一系列刷题文章…...

K8S认证|CKS题库+答案| 11. AppArmor
目录 11. AppArmor 免费获取并激活 CKA_v1.31_模拟系统 题目 开始操作: 1)、切换集群 2)、切换节点 3)、切换到 apparmor 的目录 4)、执行 apparmor 策略模块 5)、修改 pod 文件 6)、…...

3.3.1_1 检错编码(奇偶校验码)
从这节课开始,我们会探讨数据链路层的差错控制功能,差错控制功能的主要目标是要发现并且解决一个帧内部的位错误,我们需要使用特殊的编码技术去发现帧内部的位错误,当我们发现位错误之后,通常来说有两种解决方案。第一…...
HTML前端开发:JavaScript 常用事件详解
作为前端开发的核心,JavaScript 事件是用户与网页交互的基础。以下是常见事件的详细说明和用法示例: 1. onclick - 点击事件 当元素被单击时触发(左键点击) button.onclick function() {alert("按钮被点击了!&…...

项目部署到Linux上时遇到的错误(Redis,MySQL,无法正确连接,地址占用问题)
Redis无法正确连接 在运行jar包时出现了这样的错误 查询得知问题核心在于Redis连接失败,具体原因是客户端发送了密码认证请求,但Redis服务器未设置密码 1.为Redis设置密码(匹配客户端配置) 步骤: 1).修…...

分布式增量爬虫实现方案
之前我们在讨论的是分布式爬虫如何实现增量爬取。增量爬虫的目标是只爬取新产生或发生变化的页面,避免重复抓取,以节省资源和时间。 在分布式环境下,增量爬虫的实现需要考虑多个爬虫节点之间的协调和去重。 另一种思路:将增量判…...

关键领域软件测试的突围之路:如何破解安全与效率的平衡难题
在数字化浪潮席卷全球的今天,软件系统已成为国家关键领域的核心战斗力。不同于普通商业软件,这些承载着国家安全使命的软件系统面临着前所未有的质量挑战——如何在确保绝对安全的前提下,实现高效测试与快速迭代?这一命题正考验着…...
【生成模型】视频生成论文调研
工作清单 上游应用方向:控制、速度、时长、高动态、多主体驱动 类型工作基础模型WAN / WAN-VACE / HunyuanVideo控制条件轨迹控制ATI~镜头控制ReCamMaster~多主体驱动Phantom~音频驱动Let Them Talk: Audio-Driven Multi-Person Conversational Video Generation速…...

DingDing机器人群消息推送
文章目录 1 新建机器人2 API文档说明3 代码编写 1 新建机器人 点击群设置 下滑到群管理的机器人,点击进入 添加机器人 选择自定义Webhook服务 点击添加 设置安全设置,详见说明文档 成功后,记录Webhook 2 API文档说明 点击设置说明 查看自…...

如何更改默认 Crontab 编辑器 ?
在 Linux 领域中,crontab 是您可能经常遇到的一个术语。这个实用程序在类 unix 操作系统上可用,用于调度在预定义时间和间隔自动执行的任务。这对管理员和高级用户非常有益,允许他们自动执行各种系统任务。 编辑 Crontab 文件通常使用文本编…...

打手机检测算法AI智能分析网关V4守护公共/工业/医疗等多场景安全应用
一、方案背景 在现代生产与生活场景中,如工厂高危作业区、医院手术室、公共场景等,人员违规打手机的行为潜藏着巨大风险。传统依靠人工巡查的监管方式,存在效率低、覆盖面不足、判断主观性强等问题,难以满足对人员打手机行为精…...