Flink Hive Catalog操作案例
在此对Flink读写Hive表操作进行逐步记录,需要指出的是,其中操作Hive分区表和非分区表的DDL有所不同,以下分别记录。
基础环境
Hive-3.1.3
Flink-1.17.1
基本操作与准备
1、上传依赖jar包到flink/lib目录下
cp flink-sql-connector-hive-3.1.3_2.12-1.17.1.jar
cp mysql-connector-j-8.1.0.jar
2、更换planner依赖(Hive集成的推荐设置)
mv /usr/sft/flink-1.17.1/opt/flink-table-planner_2.12-1.17.1.jar /usr/sft/flink-1.17.1/lib/
mv /usr/sft/flink-1.17.1/lib/flink-table-planner-loader-1.17.1.jar /usr/sft/flink-1.17.1/opt/
3、启动Hive MetaStore
nohup hive --service metastore 2>&1 &
4、启动flink集群和sql-client
yarn-session.sh -d -nm flink-cluster
sql-client.sh embedded -s yarn-session
5、在flink sql-client中创建hive catalog
CREATE CATALOG hive WITH ('type' = 'hive','default-database' = 'sty','hive-conf-dir' = '/usr/sft/hive-3.1.3/conf'
);
非分区表读写
1、Hive中建表并插入数据
create table behavior(
username string,
behavior string
);
insert into behavior values('lisi','buy'),('zhangsan','read');
2、使用hive catalog
use catalog hive;
2、flink sql-client中执行数据插入与数据查询(和常规sql一致)
insert into behavior values('sisi','buy'),('tracy','read');
select *from behavior;

分区表读写
这里和非分区表有所不同,主要体现在建表层面,参考博客:https://www.jianshu.com/p/295066a24092
写入到hive分区表
streamEnv需要开启checkpoint,保证flink写入hive分区表的写入一致性
hive表ddl中需要指定以下TBLPROPERTIES:
sink.partition-commit.trigger:分区提交触发器,单选,可选值为partition-time、process-time(默认), 其中partition-time需要根据当前数据的watermark来判断分区是否需要提交,当watermark + delay大于等于分区上的时间时就会提交该分区元数据;process-time的话根据当前系统处理时间来判断分区是否需要提交,当系统处理时间大于等于分区上的时间就会提交该分区元数据
partition.time-extractor.timestamp-pattern:使用partition-time触发器时使用该配置项。表示从表字段中提取出表达某个分区的时间的格式,需要提取到的时间必须为yyyy-MM-dd HH:mm:ss的格式。比如字段dt的格式为yyyy-MM-dd,则配置为$dt 00:00:00则表示分区时间取值为dt的value的0点0分0秒,可以选择多个表字段组合。当表字段无法抽取出符合的格式时,则使用自定义提取器partition.time-extractor.class。
sink.partition-commit.delay: 表示watermark允许event time的最大乱序时间,使用partition-time触发器时可以使用,默认为0s
sink.partition-commit.policy.kind:分区提交方式,多选,可选值为metastore、success-file、custom,metastore表示写入元数据库,success-file表示往hdfs分区目录写入一个标志文件,custom表示使用自定义提交方式,通常使用metastore,success-file组合
partition.time-extractor.kind:当要使用自定义分区时间提取器时需要配置此项,值配置为custom
partition.time-extractor.class:当要使用自定义分区时间提取器时需要配置此项,值配置为自定义提取器的类路径。在集群中运行时,需要把该类打成jar包放到flink lib目录下。
某个分区触发提交后,后续再有此分区的数据进来,仍然会写入hive该分区。
作者:spongebobZ
链接:https://www.jianshu.com/p/295066a24092
来源:简书
1、hive创建分区表并插入数据
create table userinfo(
name string,
age int
)
partitioned by (dt string)
stored as orc
tblproperties('sink.partition-commit.trigger' = 'partition-time','sink.partition-commit.policy.kind'='metastore,success-file','partition.time-extractor.timestamp-pattern' ='yyyy-MM-dd HH:mm:ss','sink.partition-commit.delay' = '10'
);insert into table userInfo partition(dt='2023-10-26') values('zhangsan',23);
insert into table userInfo partition(dt='2023-10-26') values('lisi',26),('wangwu',27);
注意:若建表时未在tblproperties中配置恰当的sink.partition-commit.policy.kind,flink sql-client插入数据时将遇到如下报错:
Could not execute SQL statement. Reason:
org.apache.flink.connectors.hive.FlinkHiveException: Streaming write to partitioned hive table `hive`.`sty`.`userInfo` without providing a commit policy. Make sure to set a proper value for sink.partition-commit.policy.kind
2、flink sql-client插入与查询数据
insert into userinfo partition(dt='2023-10-24') values('tracy',26),('lily',27);
select *from userinfo;

相关文章:
Flink Hive Catalog操作案例
在此对Flink读写Hive表操作进行逐步记录,需要指出的是,其中操作Hive分区表和非分区表的DDL有所不同,以下分别记录。 基础环境 Hive-3.1.3 Flink-1.17.1 基本操作与准备 1、上传依赖jar包到flink/lib目录下 cp flink-sql-connector-hive-…...
NSSCTF做题第9页(3)
[GKCTF 2020]CheckIN 代码审计 这段代码定义了一个名为ClassName的类,并在脚本的最后创建了一个ClassName类的实例。 在ClassName类的构造函数中,首先通过调用$this->x()方法获取了请求参数$_REQUEST中的值,并将其赋值给$this->code属性…...
从瀑布模式到水母模式:ChatGPT如何赋能软件研发全流程【文末送书五本】
从瀑布模式到水母模式:ChatGPT如何赋能软件研发全流程 前言内容简介购买链接作者简介专家推荐读者对象参与方式往期赠书回 🏘️🏘️个人简介:以山河作礼。 🎖️🎖️:Python领域新星创作者,CSDN实…...
设置使用LibreOffice作为默认程序打开word、excel等文档
以win7为例。打开控制面板,点击程序: 点击“设置默认程序”: 左侧选中LibreOffice,然后在右下方点击“选择此程序的默认值”: 然后根据自己的需要勾选就行了:...
创新领航 | 竹云参编《基于区块链的数据资产评估实施指南》正式发布!
10月25日,由深圳数宝数据服务股份有限公司和深圳职业技术大学提出,中国科学院深圳先进技术研究院、中国电子技术标准化研究院、中国(天津)自由贸易试验区政策与产业创新发展局、网络空间治理与数字经济法治(长三角&…...
【Docker】Linux网桥连接多个命名空间
veth实现了点对点的虚拟连接,可以通过veth连接两个namespace,如果我们需要将3个或者多个namespace接入同一个二层网络时,就不能只使用veth了。 在物理网络中,如果需要连接多个主机,我们会使用bridge(网桥&…...
ES6新特性:let关键字详解
文章目录 1 声明提升2 作用域3 重复声明 在JavaScript中,let 和 var 都是声明变量的关键字,但在用法和作用域方面有一些区别。 let 是ES6引入的新的声明变量的关键字,它与 var 相比,更加严格,语法更加规范,…...
鸿运主动安全监控云平台任意文件下载漏洞复现 [附POC]
文章目录 鸿运主动安全监控云平台任意文件下载漏洞复现 [附POC]0x01 前言0x02 漏洞描述0x03 影响版本0x04 漏洞环境0x05 漏洞复现1.访问漏洞环境2.构造POC3.复现 鸿运主动安全监控云平台任意文件下载漏洞复现 [附POC] 0x01 前言 免责声明:请勿利用文章内的相关技术…...
使用pycharm远程连接到Linux服务器进行开发
预计达到的效果 本地的 PyCharm 能达到和远程服务器之间的文件同步;本地的 PyCharm 能够使用远程服务器的开发环境; 环境配置 PyCharm:PyCharm 2021.3 (Professional Edition)Linux服务器:Ubuntu20.04 步骤 1.进入配置项 配…...
JavaScript 中 BOM 基础知识有哪些?
浏览器对象模型(Browser Object Model,简称 BOM)是 JavaScript 的组成部分之一,BOM 赋予了 JavaScript 程序与浏览器交互的能力。 window 对象是 BOM 的核心,用来表示当前浏览器窗口,其中提供了一系列用来…...
【PointNet—论文笔记分享】
第一个直接基于原始点云数据进行分割、分类的模型,之前都是基于多视图或者体素的方式。 论文: PointNet: Deep Learning on Point Sets for 3D Classification and Segmentation代码: TensorFlow版 Pytorch版 基本模型架构: 分别对每个点进行特征提取…...
Mysql8.1.0 windows 绿色版安装
Mysql8.1.0 windows 绿色版安装 目录 Mysql8.1.0 windows 绿色版安装1、下载mysql8.1.0_windows(mysql-8.1.0-winx64.zip)2、解压到安装目录3、添加环境变量4、新建mysql配置文件5、安装mysql服务6、初始化数据文件7、启动mysql服务8、进入mysql管理模式…...
何为自制力?如何提高自制力?
什么是自制力? 自制力也即是自我控制能力,是一个人如何去抵御外部诱惑力,从而坚持自己的原本计划,坚定去完成目标。除了外部诱惑力,也可以指的是面对困境,不良情绪等外部因素。 自制力是自我管理能力的体…...
第1篇 目标检测概述 —(3)目标检测评价指标
前言:Hello大家好,我是小哥谈。目标检测评价指标是用来衡量目标检测算法性能的指标,主要包括几个指标:精确率(Precision)、召回率(Recall)、交并比(IoU)、平均…...
剑指JUC原理-3.线程常用方法及状态
常用方法 start和run 调用run public static void main(String[] args) {Thread t1 new Thread("t1") {Overridepublic void run() {log.debug(Thread.currentThread().getName());FileReader.read(Constants.MP4_FULL_PATH);}};t1.run();log.debug("do othe…...
MYSQL8-sql语句使用集合。MYCAT-sql语法使用集合
MYSQL 1.MYSQL事务与锁问题处理 SELECT * FROM information_schema.INNODB_LOCKs; -- 查询锁select * from information_schema.INNODB_LOCK_WAITS; -- 查询等待锁SELECT * FROM information_schema.INNODB_TRX; -- 查询事务select * from information_schema.processlist wh…...
UNIX 域协议(本地通信协议)
概述 Unix 域协议并不是一个实际的协议族,而是在单个主机上执行客户/服务通信的一种方式。是进程间通信(IPC)的一种方式。 它提供了两类套接字:字节流套接字 SOCK_STREAM(有点像 TCP)和数据报套接字 SOCK_…...
分类预测 | MATLAB实现SSA-CNN-BiGRU-Attention数据分类预测(SE注意力机制)
分类预测 | MATLAB实现SSA-CNN-BiGRU-Attention数据分类预测(SE注意力机制) 目录 分类预测 | MATLAB实现SSA-CNN-BiGRU-Attention数据分类预测(SE注意力机制)分类效果基本描述模型描述程序设计参考资料 分类效果 基本描述 1.MATLA…...
基于FPGA的图像PSNR质量评估计算实现,包含testbench和MATLAB辅助验证程序
目录 1.算法运行效果图预览 2.算法运行软件版本 3.部分核心程序 4.算法理论概述 5.算法完整程序工程 1.算法运行效果图预览 设置较大的干扰,PSNR15。 设置较小的干扰,PSNR25。 2.算法运行软件版本 matlab2022a vivado2019.2 3.部分核心程序 ti…...
算法进修Day-38
算法进修Day-38 77. 组合 难度:中等 题目要求: 给定两个整数 n 和 k,返回范围 [1, n] 中所有可能的 k 个数的组合。 示例1 输入:n 4, k 2 输出: [ [2,4], [3,4], [2,3], [1,2], [1,3], [1,4], ] 示例2 输入&#…...
FFmpeg 低延迟同屏方案
引言 在实时互动需求激增的当下,无论是在线教育中的师生同屏演示、远程办公的屏幕共享协作,还是游戏直播的画面实时传输,低延迟同屏已成为保障用户体验的核心指标。FFmpeg 作为一款功能强大的多媒体框架,凭借其灵活的编解码、数据…...
Mybatis逆向工程,动态创建实体类、条件扩展类、Mapper接口、Mapper.xml映射文件
今天呢,博主的学习进度也是步入了Java Mybatis 框架,目前正在逐步杨帆旗航。 那么接下来就给大家出一期有关 Mybatis 逆向工程的教学,希望能对大家有所帮助,也特别欢迎大家指点不足之处,小生很乐意接受正确的建议&…...
解锁数据库简洁之道:FastAPI与SQLModel实战指南
在构建现代Web应用程序时,与数据库的交互无疑是核心环节。虽然传统的数据库操作方式(如直接编写SQL语句与psycopg2交互)赋予了我们精细的控制权,但在面对日益复杂的业务逻辑和快速迭代的需求时,这种方式的开发效率和可…...
ESP32读取DHT11温湿度数据
芯片:ESP32 环境:Arduino 一、安装DHT11传感器库 红框的库,别安装错了 二、代码 注意,DATA口要连接在D15上 #include "DHT.h" // 包含DHT库#define DHTPIN 15 // 定义DHT11数据引脚连接到ESP32的GPIO15 #define D…...
MVC 数据库
MVC 数据库 引言 在软件开发领域,Model-View-Controller(MVC)是一种流行的软件架构模式,它将应用程序分为三个核心组件:模型(Model)、视图(View)和控制器(Controller)。这种模式有助于提高代码的可维护性和可扩展性。本文将深入探讨MVC架构与数据库之间的关系,以…...
1.3 VSCode安装与环境配置
进入网址Visual Studio Code - Code Editing. Redefined下载.deb文件,然后打开终端,进入下载文件夹,键入命令 sudo dpkg -i code_1.100.3-1748872405_amd64.deb 在终端键入命令code即启动vscode 需要安装插件列表 1.Chinese简化 2.ros …...
屋顶变身“发电站” ,中天合创屋面分布式光伏发电项目顺利并网!
5月28日,中天合创屋面分布式光伏发电项目顺利并网发电,该项目位于内蒙古自治区鄂尔多斯市乌审旗,项目利用中天合创聚乙烯、聚丙烯仓库屋面作为场地建设光伏电站,总装机容量为9.96MWp。 项目投运后,每年可节约标煤3670…...
如何为服务器生成TLS证书
TLS(Transport Layer Security)证书是确保网络通信安全的重要手段,它通过加密技术保护传输的数据不被窃听和篡改。在服务器上配置TLS证书,可以使用户通过HTTPS协议安全地访问您的网站。本文将详细介绍如何在服务器上生成一个TLS证…...
Psychopy音频的使用
Psychopy音频的使用 本文主要解决以下问题: 指定音频引擎与设备;播放音频文件 本文所使用的环境: Python3.10 numpy2.2.6 psychopy2025.1.1 psychtoolbox3.0.19.14 一、音频配置 Psychopy文档链接为Sound - for audio playback — Psy…...
从零实现STL哈希容器:unordered_map/unordered_set封装详解
本篇文章是对C学习的STL哈希容器自主实现部分的学习分享 希望也能为你带来些帮助~ 那咱们废话不多说,直接开始吧! 一、源码结构分析 1. SGISTL30实现剖析 // hash_set核心结构 template <class Value, class HashFcn, ...> class hash_set {ty…...
