当前位置: 首页 > news >正文

2.2 如何使用FlinkSQL读取写入到文件系统(HDFS\Local\Hive)

目录

1、文件系统 SQL 连接器

2、如何指定文件系统类型

3、如何指定文件格式

4、读取文件系统

4.1 开启 目录监控 

4.2 可用的 Metadata

5、写出文件系统

5.1 创建分区表

5.2 滚动策略、文件合并、分区提交

5.3 指定 Sink Parallelism

6、示例_通过FlinkSQL读取kafka在写入hive表

6.1、创建 kafka source表用于读取kafka

6.2、创建 hdfs sink表用于写出到hdfs

6.3、insert into 写入到 hdfs_sink_table

6.4、查询 hdfs_sink_table

6.5、创建hive表,指定local


1、文件系统 SQL 连接器

文件系统连接器允许从本地分布式文件系统进行读写数据

官网链接:文件系统 SQL 连接器


2、如何指定文件系统类型

创建表时通过 'path' = '协议名称:///path' 来指定 文件系统类型

参考官网:文件系统类型

CREATE TABLE filesystem_table (id INT,name STRING,ds STRING
) partitioned by (ds) WITH ('connector' = 'filesystem',-- 本地文件系统'path' = 'file:///URI',-- HDFS文件系统'path' = 'hdfs://URI',-- 阿里云对象存储 'path' = 'oss://URI','format' = 'json'
);

3、如何指定文件格式

FlinkSQL 文件系统连接器支持多种format,来读取和写入文件

比如当读取的source格式为 csv、json、Parquet... 可以在建表是指定相应的格式类型

来对数据进行解析后映射到表中的字段中

CREATE TABLE filesystem_table_file_format (id INT,name STRING,ds STRING
) partitioned by (ds) WITH ('connector' = 'filesystem',-- 指定文件格式类型'format' = 'json|csv|orc|raw'
);

4、读取文件系统

FlinkSQL可以将单个文件或整个目录的数据读取到单个表中

注意:

        1、当读取目录时,对目录中的文件进行 无序的读取

        2、默认情况下,读取文件时为批处理模式,只会扫描配置路径一遍后就会停止

             当开启目录监控(source.monitor-interval)时,才是流处理模式

4.1 开启 目录监控 

通过设置 source.monitor-interval 属性来开启目录监控,以便在新文件出现时继续扫描

注意:

        只会对指定目录内新增文件进行读取,不会读取更新后的旧文件

-- 目录监控
drop table filesystem_source_table;
CREATE TABLE filesystem_source_table (id INT,name STRING,`file.name` STRING NOT NULL METADATA
) WITH ('connector' = 'filesystem','path' = 'file:///usr/local/lib/mavne01/FlinkAPI1.17/data/output/1016','format' = 'json','source.monitor-interval' = '3' -- 开启目录监控,设置监控时间间隔
);-- 持续读取
select * from filesystem_source_table;

4.2 可用的 Metadata

使用FLinkSQL读取文件系统中的数据时,支持对 metadata 进行读取

注意: 所有 metadata 都是只读的

-- 可用的Metadata
drop table filesystem_source_table_read_metadata;
CREATE TABLE filesystem_source_table_read_metadata (id INT,name STRING,`file.path` STRING NOT NULL METADATA,`file.name` STRING NOT NULL METADATA,`file.size` BIGINT NOT NULL METADATA,`file.modification-time` TIMESTAMP_LTZ(3) NOT NULL METADATA
) WITH ('connector' = 'filesystem','path' = 'file:///usr/local/lib/mavne01/FlinkAPI1.17/data/output/1012','format' = 'json'
);select * from filesystem_source_table_read_metadata;

运行结果:


5、写出文件系统

5.1 创建分区表

FlinkSQL支持创建分区表,并且通过 insert into(追加) insert overwrite(覆盖) 写入数据

-- 创建分区表
drop table filesystem_source_table_partition;
CREATE TABLE filesystem_source_table_partition (id INT,name STRING,ds STRING
) partitioned by (ds) WITH ('connector' = 'filesystem','path' = 'file:///usr/local/lib/mavne01/FlinkAPI1.17/data/output/1012','partition.default-name' = 'default_partition','format' = 'json'
);-- 动态分区写入
insert into filesystem_source_table_partition
SELECT * FROM (VALUES(1,'a','20231010')
, (2,'b','20231010')
, (3,'c','20231011')
, (4,'d','20231011')
, (5,'e','20231012')
, (6,'f','20231012')
) AS user1 (id,name,ds);-- 静态分区写入
insert into filesystem_source_table_partition partition(ds = '20231010')
SELECT * FROM (VALUES(1,'a')
, (2,'b')
, (3,'c')
, (4,'d')
, (5,'e')
, (6,'f')
) AS user1 (id,name);-- 查询分区表数据
select * from filesystem_source_table_partition where ds = '20231010';

5.2 滚动策略、文件合并、分区提交

可以看之前的博客:flink写入文件时分桶策略

官网链接:官网分桶策略


5.3 指定 Sink Parallelism

当使用FlinkSQL写出到文件系统时,可以通过 sink.parallelism 设置sink算子的并行度

注意:当且仅当上游的 changelog 模式为 INSERT-ONLY 时,才支持配置 sink parallelism。否则,程序将会抛出异常

CREATE TABLE hdfs_sink_table (`log` STRING,`dt` STRING,  -- 分区字段,天`hour` STRING  -- 分区字段,小时
) partitioned by (dt,`hour`) WITH ('connector' = 'filesystem','path' = 'file:///usr/local/lib/mavne01/FlinkAPI1.17/data/output/kafka','sink.parallelism' = '2', -- 指定sink算子并行度'format' = 'raw'
);

6、示例_通过FlinkSQL读取kafka在写入hive表

需求:

        使用FlinkSQL将kafka数据写入到hdfs指定目录中

        根据kafka的timestamp进行分区(按小时分区)

6.1、创建 kafka source表用于读取kafka

-- TODO 创建读取kafka表时,同时读取kafka元数据字段
drop table kafka_source_table;
CREATE TABLE kafka_source_table(`log` STRING,`timestamp` TIMESTAMP(3) METADATA FROM 'timestamp' -- 消息的时间戳
) WITH ('connector' = 'kafka','topic' = '20231017','properties.bootstrap.servers' = 'worker01:9092','properties.group.id' = 'FlinkConsumer','scan.startup.mode' = 'earliest-offset','format' = 'raw'
);

6.2、创建 hdfs sink表用于写出到hdfs

drop table hdfs_sink_table;
CREATE TABLE hdfs_sink_table (`log` STRING,`dt` STRING,  -- 分区字段,天`hour` STRING  -- 分区字段,小时
) partitioned by (dt,`hour`) WITH ('connector' = 'filesystem','path' = 'hdfs://usr/local/lib/mavne01/FlinkAPI1.17/data/output/kafka','sink.parallelism' = '2', -- 指定sink算子并行度'format' = 'raw'
);

6.3、insert into 写入到 hdfs_sink_table

-- 流式 sql,插入文件系统表
insert into hdfs_sink_table
select log,DATE_FORMAT(`timestamp`,'yyyyMMdd') as dt,DATE_FORMAT(`timestamp`,'HH') as `hour`
from kafka_source_table;

6.4、查询 hdfs_sink_table

-- 批式 sql,使用分区修剪进行选择
select * from hdfs_sink_table;

6.5、创建hive表,指定local

create table `kafka_to_hive` (
`log` string comment '日志数据')comment '埋点日志数据' PARTITIONED BY (dt string,`hour` string) 
row format delimited fields terminated by '\t' lines terminated by '\n' stored as orc
LOCATION 'hdfs://usr/local/lib/mavne01/FlinkAPI1.17/data/output/kafka';

相关文章:

2.2 如何使用FlinkSQL读取写入到文件系统(HDFS\Local\Hive)

目录 1、文件系统 SQL 连接器 2、如何指定文件系统类型 3、如何指定文件格式 4、读取文件系统 4.1 开启 目录监控 4.2 可用的 Metadata 5、写出文件系统 5.1 创建分区表 5.2 滚动策略、文件合并、分区提交 5.3 指定 Sink Parallelism 6、示例_通过FlinkSQL读取kafk…...

call函数和apply函数的区别

call和apply是 JavaScript 中的两个函数方法,用于调用函数并指定函数内部的this值以及传递参数。它们的主要区别在于参数的传递方式。 call方法:call方法允许你在调用函数时,显式地指定函数内部的this值和参数列表。它的语法为: …...

JavaCV踩坑之路1——Mac上安装OpenCV

Mac无法安装opencv 更新Homebrew: 打开终端并运行以下命令来更新Homebrew: brew update 移除Taps(仓库): 可能与homebrew-services仓库有关。你可以尝试将它移除: brew untap homebrew/services重新安装OpenCV: 在移除…...

es6(三)——常用es6(函数、数组、对象的扩展)

ES6的系列文章目录 第一章 Python 机器学习入门之pandas的使用 文章目录 ES6的系列文章目录0、数值的扩展一、函数的扩展1、函数的默认值2、函数的reset参数 二、数组的扩展1. 将对象转成数组的Array.from()2. 将对象转成数组的Array.from()3. 实例方法 find(),fin…...

API网关与社保模块

API网关与社保模块 理解zuul网关的作用完成zuul网关的搭建 实现社保模块的代码开发 zuul网关 在学习完前面的知识后,微服务架构已经初具雏形。但还有一些问题:不同的微服务一般会有不同的网 络地址,客户端在访问这些微服务时必须记住几十甚至…...

linux 安装 docker

linux 安装 docker docker及版本一键安装docker(本人使用的是手动安装)Docker手动安装 docker及版本 Docker从17.03版本之后分为CE(Community Edition: 社区版)和EE(Enterprise Edition: 企业版)。相对于社区版本,企业…...

整数转罗马数字

题目: 罗马数字包含以下七种字符: I, V, X, L,C,D 和 M。 字符 数值 I 1 V 5 X 10 L 50 C 100 D 500 …...

利用爬虫采集音频信息完整代码示例

以下是一个使用WWW::RobotRules和duoip.cn/get_proxy的Perl下载器程序: #!/usr/bin/perluse strict; use warnings; use WWW::RobotRules; use LWP::UserAgent; use HTTP::Request; use HTTP::Response;# 创建一个UserAgent对象 my $ua LWP::UserAgent->new();#…...

WebSocket: 实时通信的新维度

介绍: 在现代Web应用程序中,实时通信对于提供即时更新和交互性至关重要。传统的HTTP协议虽然适合请求-响应模式,但对于需要频繁数据交换的场景并不理想。而WebSocket技术的出现填补了这个空白,为Web开发者们带来了一种高效、实时的…...

postgresql(openGauss)模糊匹配参数

被pg系这个show要求精准匹配参数恶心的不轻。 原理是用.psqlrc(openGauss用.gsqlrc)文件set一个select常量进去,需要用:调用这个常量。理论上也可以增强其他的各种功能。 我在openGauss做的一个例子 .gsqlrc(.psqlrc…...

jdk 加密 aes jar包解决

JDK1.8.0_151的无限制强度加密策略文件变动 JDK1.8.0_151无需去官网下载 local_policy.jar US_export_policy.jar这个jar包,只需要修改Java\jdk1.8.0_151\jre\lib\security这目录下的java.security文件配置即可。 随着越来越多的第三方工具只支持 JDK8&#xff0c…...

C++ Primer 第十一章 关联容器 重点解读

1 map自定义排序 #include <map> #include <iostream> #include <functional> using namespace std; int main() {function<bool(pair<int, int>, pair<int, int>)> cmp [&](pair<int, int> p1, pair<int, int> p2) -&g…...

MySQL 8 - 能够成功创建其他用户但无法修改 root 用户的密码

问题&#xff1a; 创建其他用户就可以&#xff0c;为什么修改root 密码不可以&#xff1f; 如果能够成功创建其他用户但无法修改 root 用户的密码&#xff0c;这可能是因为 MySQL 8 及更高版本引入了一个名为"caching_sha2_password"的身份验证插件作为默认设置&…...

k8s kubernetes 1.23.6 + flannel公网环境安装

准备环境&#xff0c;必须是同一个云服务厂商&#xff0c;如&#xff1a;华为&#xff0c;阿里、腾讯等&#xff0c;不要存在跨平台安装K8S&#xff0c;跨平台安装需要处理网络隧道才能实现所有节点在一个网络集群中&#xff0c;这里推荐使用同一家云服务厂商安装即可 这里使用…...

博客系统中的加盐算法

目录 一、为什么要对密码进行加盐加密&#xff1f; 1、明文 2、传统的 MD5 二、加盐加密 1、加盐算法实现思路 2、加盐算法解密思路 3、加盐算法代码实现 三、使用 Spring Security 加盐 1、引入 Spring Security 框架 2、排除 Spring Security 的自动加载 3、调用 S…...

同花顺动态Cookie反爬JS逆向分析

文章目录 1. 写在前面2. 请求分析3. Hook Cookie4. 补环境 1. 写在前面 最近有位朋友在大A失意&#xff0c;突发奇想自己闲来无事想要做一个小工具&#xff0c;监测一下市场行情的数据。自己再分析分析&#xff0c;虽是一名程序员但苦于对爬虫领域相关的技术不是特别熟悉。最后…...

异步加载JS的方法

异步加载 JavaScript (JS) 文件是提高网页性能的一种常用技术&#xff0c;这样可以使页面在等待 JS 文件加载和执行时不会阻塞。以下是一些异步加载 JS 的方法&#xff1a; 使用 <script> 标签的 async 属性 通过将 <script> 标签的 async 属性设为 true&#xf…...

IO/NIO交互模拟及渐进式实现

IO IO Server public class SocketServer {public static void main(String[] args) {//server编号和client编号对应&#xff0c;优缺点注释在server端//server1();//server2();server3();}/*** server1的缺点&#xff1a;* 1、accept()方法阻塞了线程&#xff0c;要等客户端…...

springboot+html实现密码重置功能

目录 登录注册&#xff1a; 前端&#xff1a; chnangePssword.html 后端&#xff1a; controller: Mapper层&#xff1a; 逻辑&#xff1a; 登录注册&#xff1a; https://blog.csdn.net/m0_67930426/article/details/133849132 前端&#xff1a; 通过点击忘记密码跳转…...

LeetCode 2525. 根据规则将箱子分类【模拟】1301

本文属于「征服LeetCode」系列文章之一&#xff0c;这一系列正式开始于2021/08/12。由于LeetCode上部分题目有锁&#xff0c;本系列将至少持续到刷完所有无锁题之日为止&#xff1b;由于LeetCode还在不断地创建新题&#xff0c;本系列的终止日期可能是永远。在这一系列刷题文章…...

深入浅出Asp.Net Core MVC应用开发系列-AspNetCore中的日志记录

ASP.NET Core 是一个跨平台的开源框架&#xff0c;用于在 Windows、macOS 或 Linux 上生成基于云的新式 Web 应用。 ASP.NET Core 中的日志记录 .NET 通过 ILogger API 支持高性能结构化日志记录&#xff0c;以帮助监视应用程序行为和诊断问题。 可以通过配置不同的记录提供程…...

【JavaEE】-- HTTP

1. HTTP是什么&#xff1f; HTTP&#xff08;全称为"超文本传输协议"&#xff09;是一种应用非常广泛的应用层协议&#xff0c;HTTP是基于TCP协议的一种应用层协议。 应用层协议&#xff1a;是计算机网络协议栈中最高层的协议&#xff0c;它定义了运行在不同主机上…...

DAY 47

三、通道注意力 3.1 通道注意力的定义 # 新增&#xff1a;通道注意力模块&#xff08;SE模块&#xff09; class ChannelAttention(nn.Module):"""通道注意力模块(Squeeze-and-Excitation)"""def __init__(self, in_channels, reduction_rat…...

【算法训练营Day07】字符串part1

文章目录 反转字符串反转字符串II替换数字 反转字符串 题目链接&#xff1a;344. 反转字符串 双指针法&#xff0c;两个指针的元素直接调转即可 class Solution {public void reverseString(char[] s) {int head 0;int end s.length - 1;while(head < end) {char temp …...

如何将联系人从 iPhone 转移到 Android

从 iPhone 换到 Android 手机时&#xff0c;你可能需要保留重要的数据&#xff0c;例如通讯录。好在&#xff0c;将通讯录从 iPhone 转移到 Android 手机非常简单&#xff0c;你可以从本文中学习 6 种可靠的方法&#xff0c;确保随时保持连接&#xff0c;不错过任何信息。 第 1…...

2025 后端自学UNIAPP【项目实战:旅游项目】6、我的收藏页面

代码框架视图 1、先添加一个获取收藏景点的列表请求 【在文件my_api.js文件中添加】 // 引入公共的请求封装 import http from ./my_http.js// 登录接口&#xff08;适配服务端返回 Token&#xff09; export const login async (code, avatar) > {const res await http…...

Module Federation 和 Native Federation 的比较

前言 Module Federation 是 Webpack 5 引入的微前端架构方案&#xff0c;允许不同独立构建的应用在运行时动态共享模块。 Native Federation 是 Angular 官方基于 Module Federation 理念实现的专为 Angular 优化的微前端方案。 概念解析 Module Federation (模块联邦) Modul…...

Ascend NPU上适配Step-Audio模型

1 概述 1.1 简述 Step-Audio 是业界首个集语音理解与生成控制一体化的产品级开源实时语音对话系统&#xff0c;支持多语言对话&#xff08;如 中文&#xff0c;英文&#xff0c;日语&#xff09;&#xff0c;语音情感&#xff08;如 开心&#xff0c;悲伤&#xff09;&#x…...

用docker来安装部署freeswitch记录

今天刚才测试一个callcenter的项目&#xff0c;所以尝试安装freeswitch 1、使用轩辕镜像 - 中国开发者首选的专业 Docker 镜像加速服务平台 编辑下面/etc/docker/daemon.json文件为 {"registry-mirrors": ["https://docker.xuanyuan.me"] }同时可以进入轩…...

RNN避坑指南:从数学推导到LSTM/GRU工业级部署实战流程

本文较长&#xff0c;建议点赞收藏&#xff0c;以免遗失。更多AI大模型应用开发学习视频及资料&#xff0c;尽在聚客AI学院。 本文全面剖析RNN核心原理&#xff0c;深入讲解梯度消失/爆炸问题&#xff0c;并通过LSTM/GRU结构实现解决方案&#xff0c;提供时间序列预测和文本生成…...