当前位置: 首页 > news >正文

16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及Apache Hive示例(6)

Flink 系列文章

1、Flink 部署、概念介绍、source、transformation、sink使用示例、四大基石介绍和示例等系列综合文章链接

13、Flink 的table api与sql的基本概念、通用api介绍及入门示例
14、Flink 的table api与sql之数据类型: 内置数据类型以及它们的属性
15、Flink 的table api与sql之流式概念-详解的介绍了动态表、时间属性配置(如何处理更新结果)、时态表、流上的join、流上的确定性以及查询配置
16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及FileSystem示例(1)
16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及Elasticsearch示例(2)
16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及Apache Kafka示例(3)
16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及JDBC示例(4)

16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及Apache Hive示例(6)

20、Flink SQL之SQL Client: 不用编写代码就可以尝试 Flink SQL,可以直接提交 SQL 任务到集群上

22、Flink 的table api与sql之创建表的DDL
24、Flink 的table api与sql之Catalogs

30、Flink SQL之SQL 客户端(通过kafka和filesystem的例子介绍了配置文件使用-表、视图等)


文章目录

  • Flink 系列文章
  • 一、Table & SQL Connectors 示例: Apache Hive
    • 1、支持的Hive版本
    • 2、依赖项
      • 1)、使用 Flink 提供的 Hive jar
      • 2)、用户定义的依赖项
      • 3)、移动 planner jar 包
    • 3、Maven 依赖
    • 4、连接到Hive
    • 5、DDL&DML


本文介绍了Apache Hive连接器的使用,以具体的示例演示了通过java和flink sql cli创建catalog。
本文依赖环境是hadoop、zookeeper、hive、flink环境好用,本文内容以flink1.17版本进行介绍的,具体示例是在1.13版本中运行的(因为hadoop集群环境是基于jdk8的,flink1.17版本需要jdk11)。
更多的内容详见后续关于hive的介绍。

一、Table & SQL Connectors 示例: Apache Hive

Apache Hive 已经成为了数据仓库生态系统中的核心。 它不仅仅是一个用于大数据分析和ETL场景的SQL引擎,同样它也是一个数据管理平台,可用于发现,定义,和演化数据。

Flink 与 Hive 的集成包含两个层面。

一是利用了 Hive 的 MetaStore 作为持久化的 Catalog,用户可通过HiveCatalog将不同会话中的 Flink 元数据存储到 Hive Metastore 中。 例如,用户可以使用HiveCatalog将其 Kafka 表或 Elasticsearch 表存储在 Hive Metastore 中,并后续在 SQL 查询中重新使用它们。

二是利用 Flink 来读写 Hive 的表。

HiveCatalog的设计提供了与 Hive 良好的兼容性,用户可以"开箱即用"的访问其已有的 Hive 数仓。 您不需要修改现有的 Hive Metastore,也不需要更改表的数据位置或分区。

1、支持的Hive版本

Flink 支持以下的 Hive 版本。

  • 2.3
    2.3.0
    2.3.1
    2.3.2
    2.3.3
    2.3.4
    2.3.5
    2.3.6
    2.3.7
    2.3.8
    2.3.9
  • 3.1
    3.1.0
    3.1.1
    3.1.2
    3.1.3

某些功能是否可用取决于您使用的 Hive 版本,这些限制不是由 Flink 所引起的:

  • Hive 内置函数在使用 Hive-2.3.0 及更高版本时支持。
  • 列约束,也就是 PRIMARY KEY 和 NOT NULL,在使用 Hive-3.1.0 及更高版本时支持。
  • 更改表的统计信息,在使用 Hive-2.3.0 及更高版本时支持。
  • DATE列统计信息,在使用 Hive-2.3.0 及更高版时支持。

2、依赖项

要与 Hive 集成,您需要在 Flink 下的/lib/目录中添加一些额外的依赖包, 以便通过 Table API 或 SQL Client 与 Hive 进行交互。 或者,您可以将这些依赖项放在专用文件夹中,并分别使用 Table API 程序或 SQL Client 的-C或-l选项将它们添加到 classpath 中。

Apache Hive 是基于 Hadoop 之上构建的, 首先您需要 Hadoop 的依赖,请参考 Providing Hadoop classes:

export HADOOP_CLASSPATH=`hadoop classpath`

有两种添加 Hive 依赖项的方法。第一种是使用 Flink 提供的 Hive Jar包。您可以根据使用的 Metastore 的版本来选择对应的 Hive jar。第二个方式是分别添加每个所需的 jar 包。如果您使用的 Hive 版本尚未在此处列出,则第二种方法会更适合。

注意:建议您优先使用 Flink 提供的 Hive jar 包。仅在 Flink 提供的 Hive jar 不满足您的需求时,再考虑使用分开添加 jar 包的方式。

1)、使用 Flink 提供的 Hive jar

下表列出了所有可用的 Hive jar。您可以选择一个并放在 Flink 发行版的/lib/ 目录中。
在这里插入图片描述

2)、用户定义的依赖项

您可以在下方找到不同Hive主版本所需要的依赖项。

  • Hive 2.3.4
/flink-1.17.1/lib// Flink's Hive connector.Contains flink-hadoop-compatibility and flink-orc jarsflink-connector-hive_2.12-1.17.1.jar// Hive dependencieshive-exec-2.3.4.jar// add antlr-runtime if you need to use hive dialectantlr-runtime-3.5.2.jar
  • Hive 3.1.0
/flink-1.17.1/lib// Flink's Hive connectorflink-connector-hive_2.12-1.17.1.jar// Hive dependencieshive-exec-3.1.0.jarlibfb303-0.9.3.jar // libfb303 is not packed into hive-exec in some versions, need to add it separately// add antlr-runtime if you need to use hive dialectantlr-runtime-3.5.2.jar

3)、移动 planner jar 包

把 FLINK_HOME/opt 下的 jar 包 flink-table-planner_2.12-1.17.1.jar 移动到 FLINK_HOME/lib 下,并且将 FLINK_HOME/lib 下的 jar 包 flink-table-planner-loader-1.17.1.jar 移出去。 具体原因请参见 FLINK-25128。你可以使用如下命令来完成移动 planner jar 包的工作:

mv $FLINK_HOME/opt/flink-table-planner_2.12-1.17.1.jar $FLINK_HOME/lib/flink-table-planner_2.12-1.17.1.jar
mv $FLINK_HOME/lib/flink-table-planner-loader-1.17.1.jar $FLINK_HOME/opt/flink-table-planner-loader-1.17.1.jar

只有当要使用 Hive 语法 或者 HiveServer2 endpoint, 你才需要做上述的 jar 包移动。 但是在集成 Hive 的时候,推荐进行上述的操作。

3、Maven 依赖

如果您在构建自己的应用程序,则需要在 mvn 文件中添加以下依赖项。 您应该在运行时添加以上的这些依赖项,而不要在已生成的 jar 文件中去包含它们。

<!-- Flink Dependency -->
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-hive_2.12</artifactId><version>1.17.1</version><scope>provided</scope>
</dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java-bridge_2.12</artifactId><version>1.17.1</version><scope>provided</scope>
</dependency><!-- Hive Dependency -->
<dependency><groupId>org.apache.hive</groupId><artifactId>hive-exec</artifactId><version>${hive.version}</version><scope>provided</scope>
</dependency>

4、连接到Hive

通过 TableEnvironment 或者 YAML 配置,使用 Catalog 接口 和 HiveCatalog连接到现有的 Hive 集群。

以下是如何连接到 Hive 的示例:

  • java
EnvironmentSettings settings = EnvironmentSettings.inStreamingMode();
TableEnvironment tableEnv = TableEnvironment.create(settings);String name            = "myhive";
String defaultDatabase = "mydatabase";
String hiveConfDir     = "/opt/hive-conf";HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir);
tableEnv.registerCatalog("myhive", hive);// set the HiveCatalog as the current catalog of the session
tableEnv.useCatalog("myhive");----------------------示例----------------------------
import java.util.List;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
import org.apache.flink.table.catalog.hive.HiveCatalog;/*** @author alanchan**/
public class TestHiveCatalogDemo {/*** @param args* @throws DatabaseNotExistException * @throws CatalogException */public static void main(String[] args) throws CatalogException, DatabaseNotExistException {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();StreamTableEnvironment tenv = StreamTableEnvironment.create(env);String name = "alan_hive";// testhive 数据库名称String defaultDatabase = "testhive";String hiveConfDir = "/usr/local/bigdata/apache-hive-3.1.2-bin/conf";HiveCatalog hiveCatalog = new HiveCatalog(name, defaultDatabase, hiveConfDir);tenv.registerCatalog("alan_hive", hiveCatalog);// 使用注册的catalogtenv.useCatalog("alan_hive");List<String> tables = hiveCatalog.listTables(defaultDatabase); for (String table : tables) {System.out.println("Database:testhive  tables:" + table);}}}
  • sql
CREATE CATALOG myhive WITH ('type' = 'hive','default-database' = 'mydatabase','hive-conf-dir' = '/opt/hive-conf'
);
-- set the HiveCatalog as the current catalog of the session
USE CATALOG myhive;------------------具体示例如下----------------------------
Flink SQL> show catalogs;
+-----------------+
|    catalog name |
+-----------------+
| default_catalog |
+-----------------+
1 row in setFlink SQL> CREATE CATALOG alan_hivecatalog WITH (
>     'type' = 'hive',
>     'default-database' = 'testhive',
>     'hive-conf-dir' = '/usr/local/bigdata/apache-hive-3.1.2-bin/conf'
> );
[INFO] Execute statement succeed.Flink SQL> show catalogs;
+------------------+
|     catalog name |
+------------------+
| alan_hivecatalog |
|  default_catalog |
+------------------+
2 rows in setFlink SQL> use alan_hivecatalog;
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.catalog.exceptions.CatalogException: A database with name [alan_hivecatalog] does not exist in the catalog: [default_catalog].Flink SQL> use catalog alan_hivecatalog;
[INFO] Execute statement succeed.Flink SQL> show tables;
+-----------------------------------+
|                        table name |
+-----------------------------------+
| alan_hivecatalog_hivedb_testtable |
|                         apachelog |
|                          col2row1 |
|                          col2row2 |
|                       cookie_info |
|                              dual |
|                         dw_zipper |
|                               emp |
|                          employee |
|                  employee_address |
|               employee_connection |
|                 ods_zipper_update |
|                          row2col1 |
|                          row2col2 |
|                            singer |
|                           singer2 |
|                           student |
|                      student_dept |
|               student_from_insert |
|                      student_hdfs |
|                    student_hdfs_p |
|                      student_info |
|                     student_local |
|                 student_partition |
|              t_all_hero_part_msck |
|                     t_usa_covid19 |
|                   t_usa_covid19_p |
|                              tab1 |
|                         tb_dept01 |
|                    tb_dept_bucket |
|                            tb_emp |
|                          tb_emp01 |
|                     tb_emp_bucket |
|                     tb_json_test1 |
|                     tb_json_test2 |
|                          tb_login |
|                      tb_login_tmp |
|                          tb_money |
|                      tb_money_mtn |
|                            tb_url |
|              the_nba_championship |
|                             tmp_1 |
|                        tmp_zipper |
|                         user_dept |
|                     user_dept_sex |
|                             users |
|                 users_bucket_sort |
|                   website_pv_info |
|                  website_url_info |
+-----------------------------------+
49 rows in set
  • ymal
execution:...current-catalog: alan_hivecatalog  # set the HiveCatalog as the current catalog of the sessioncurrent-database: testhivecatalogs:- name: alan_hivecatalog  type: hivehive-conf-dir: /usr/local/bigdata/apache-hive-3.1.2-bin/conf

下表列出了通过 YAML 文件或 DDL 定义 HiveCatalog 时所支持的参数。

在这里插入图片描述

5、DDL&DML

在 Flink 中执行 DDL 操作 Hive 的表、视图、分区、函数等元数据时,参考:33、Flink之hive
Flink 支持 DML 写入 Hive 表,请参考:33、Flink之hive
以上,介绍了Apache Hive连接器的使用,以具体的示例演示了通过java和flink sql cli创建catalog。

相关文章:

16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及Apache Hive示例(6)

Flink 系列文章 1、Flink 部署、概念介绍、source、transformation、sink使用示例、四大基石介绍和示例等系列综合文章链接 13、Flink 的table api与sql的基本概念、通用api介绍及入门示例 14、Flink 的table api与sql之数据类型: 内置数据类型以及它们的属性 15、Flink 的ta…...

6.Redis-hash

hash 哈希类型中的映射关系通常称为field-value&#xff0c;⽤于区分 Redis 整体的键值对&#xff08;key-value&#xff09;&#xff0c;注意这⾥的value是指field对应的值&#xff0c;不是键&#xff08;key&#xff09;对应的值&#xff0c;请注意 value 在不同上下⽂的作⽤…...

点云从入门到精通技术详解100篇-多时相机载激光雷达人工林点云匹配及生长监测(续)

目录 多时相机载激光雷达人工林点云匹配及变化监测 3.1 技术路线 3.2 数据准备 3.3 方法...

【Vue3 知识第七讲】reactive、shallowReactive、toRef、toRefs 等系列方法应用与对比

文章目录 一、reactive()二、readonly()三、shallowReactive()四、shallowReadonly()五、isReactive() 和 isReadonly()六、toRef()七、toRefs()八、toRaw()九、ref、toRef、toRefs 异同点 一、reactive() reactive() 函数用于返回一个对象的响应式代理。与 ref() 函数定义响应…...

Docker 摸门级简易手册

Docker 摸门级简易手册 文章目录 Docker 摸门级简易手册使用 Docker 构建 Java 项目镜像Docker 安装Install on MacInstall on WindowsInstall on Linux Dockerfile 说明FROMLABELENVWORKDIRCOPYADDRUNCMDEXPOSEENTRYPOINTVOLUMEUSER 使用 Docker 构建 Java 项目镜像 假设有个…...

Java类加载机制

简介 在Java的世界里&#xff0c;每一个类或者接口&#xff0c;在经历编译器后&#xff0c;都会生成一个个.class文件。 类加载机制指的是将这些.class文件中的二进制数据读入到内存中&#xff0c;并对数据进行校验&#xff0c;解析和初始化。最终&#xff0c;每一个类都会在…...

vue 自定义指令简单记录

自定义指令例子 // src/main.js import { createApp } from vue; import App from ./App.vue;const app = createApp(App);// 全局自定义指令 app.directive(color-directive, {mounted(el, binding) {// 当指令绑定到元素上时触发// el 是绑定的元素// binding 包含了指令的信…...

算法通关村-----快速排序的原理和实现

快速排序介绍 快速排序是一种经典高效的排序方法&#xff0c;是分治策略在排序上的具体体现。将一个大的待排序列分割成若干个小的有序序列&#xff0c;最终将各个小的有序序列合并成一个大的有序序列。 快速排序的实现原理 选择一个基准值&#xff0c;将小于基准值的元素放…...

百度抓取香港服务器抓取超时是什么情况?

​ 网络延迟导致抓取超时 网络延迟是指从发送请求到接收响应之间的时间延迟。如果网络延迟过高&#xff0c;服务器可能无法及时响应请求&#xff0c;导致超时。在香港服务器上抓取数据时&#xff0c;如果网络延迟过高&#xff0c;可能会出现抓取超时的情况。 服务器负载过高可能…...

Springboot上传文件

上传文件示例代码&#xff1a; ApiOperation("上传文件") PostMapping(value "/uploadFile", consumes MediaType.MULTIPART_FORM_DATA_VALUE) public ApiResult<String> uploadFile(RequestPart("file") MultipartFile file) { //调用七…...

kafka教程

kafka教程 Kafka是一个分布式、分区的、多副本的、多订阅者&#xff0c;基于zookeeper协调的分布式日志系统&#xff0c;其主要特点为&#xff1a; 以时间复杂度为O(1)的方式提供消息持久化能力&#xff0c;即使对TB级以上数据也能保证常数时间的访问性能高吞吐率。即使在非常…...

JVM的故事—— 内存分配策略

内存分配策略 文章目录 内存分配策略一、对象优先在Eden分配二、大对象直接进入老年代三、长期存活的对象将进入老年代四、动态对象年龄判定五、空间分配担保 一、对象优先在Eden分配 堆内存有新生代和老年代&#xff0c;新生代中有一个Eden区和一个Survivor区(from space或者…...

21.CSS的动态圆形进度条

效果 源码 <!doctype html> <html><head><meta charset="utf-8"><title>Animated Circular Progress | CSS Only</title><link rel="stylesheet" href="style.css"></head><body><di…...

Linux_VMware_虚拟机磁盘扩容

来源文章 &#xff1a;VMware教学-虚拟机扩容篇_vmware虚拟机扩容_系统免驱动的博客-CSDN博客 由于项目逐步的完善&#xff0c;需要搭建的中间件&#xff0c;软件越来越多&#xff0c;导致以前虚拟机配置20G的内存不够用了&#xff0c;又不想重新创建新的虚拟机&#xff0c;退…...

中欧财富:分布式数据库的应用历程和 TiDB 7.1 新特性探索

原文来源&#xff1a; https://tidb.net/blog/ccbaeda2 作者&#xff1a;张政俊&#xff0c; 中欧财富数据库负责人 导读 中欧财富是中欧基金控股的销售子公司&#xff0c;旗下 APP 实现业内基金品种全覆盖&#xff0c;提供基金交易、大数据选基、智慧定投、理财师咨询等…...

树莓 LUMA-OLED.EXAMPLE使用

详细介绍在文件目录下的README.rst中 第一步 $ sudo usermod -a -G i2c,spi,gpio pi //好像没什么用 $ sudo apt install python3-dev python3-pip python3-numpy libfreetype6-dev libjpeg-dev build-essential //安装依赖包&#xff0c;树莓派中好像已经有了 $ sudo a…...

C#,《小白学程序》第十一课:双向链表(Linked-List)其二,链表的插入与删除的方法(函数)与代码

1 文本格式 /// <summary> /// 改进的车站信息类 class /// 增加了 链表 需要的两个属性 Last Next /// </summary> public class StationAdvanced { /// <summary> /// 编号 /// </summary> public int Id { get; set; } 0; ///…...

java IDEA文件路径分层级

如下图这样 在设置里找到Compact Middle Packages&#xff0c;去掉勾选就行了...

Spring AOP+Redis实现接口访问限制

目录 一、需求二、实现思路三、代码实现3.1 导入依赖3.2 配置redis3.3 自定义注解3.4 定义切面类3.5 自定义异常类3.6 全局异常处理器 一、需求 在我们程序中&#xff0c;有时候需要对一些接口做访问控制&#xff0c;使程序更稳定&#xff0c;最常用的一种是通过ip限制&#x…...

互联网后端技术大全!

互联网后端技术大全&#xff01; 一. 系统开发 高内聚/低耦合 高内聚 高内聚指一个软件模块是由相关性很强的代码组成&#xff0c;只负责一项任务&#xff0c;也就是常说的单一责任原则。模块的内聚反映模块内部联系的紧密程度。 低耦合 模块之间联系越紧密&#xff0c;其…...

Android SDK 上手指南||第九章 Manifest文件

第九章 Manifest文件 到目前为止&#xff0c;我们已经熟悉了Android项目中的各个组成部分&#xff0c;包括其资源。在今天的文章中&#xff0c;我们将以项目Manifest文件作为核心内容。 对于一个项目来说&#xff0c;Manifest既可以很简单、也可以很复杂&#xff0c;其具体情…...

CVE-2023-3450:锐捷 RG-BCR860 命令执行漏洞复现

锐捷 RG-BCR860 命令执行漏洞(CVE-2023-3450)复现 0x01 前言 本次测试仅供学习使用&#xff0c;如若非法他用&#xff0c;与本文作者无关&#xff0c;需自行负责&#xff01;&#xff01;&#xff01; 0x02 漏洞描述 Ruijie Networks RG-BCR860是中国锐捷网络&#xff08;R…...

【ES】elasticsearch8.3.3

这里仅实践操作并根据实际问题进行记录笔记。 运行 ES8 我们需要在自己的电脑上安装好 Docker Desktop。接着我们运行如下的命令&#xff1a;出现两个异常&#xff0c;一个是需要使用winpty因为我使用win的docker desktop&#xff0c;另外一个问题是docker启动elasticsearchE…...

2023年下半年广州/深圳软考(中/高级)认证报名,当然弘博创新

软考是全国计算机技术与软件专业技术资格&#xff08;水平&#xff09;考试&#xff08;简称软考&#xff09;项目&#xff0c;是由国家人力资源和社会保障部、工业和信息化部共同组织的国家级考试&#xff0c;既属于国家职业资格考试&#xff0c;又是职称资格考试。 系统集成…...

2017. 网格游戏;2397. 被列覆盖的最多行数;2202. K 次操作后最大化顶端元素

2017. 网格游戏 核心思想&#xff1a;前缀和枚举。读完题后可以发现&#xff0c;第一个机器人走的路线就像一条分割线&#xff0c;第二个机器人只能获得上面白色部分或者下面白色部分的最大值。这个最大值怎么求&#xff0c;我们可以通过前缀和来求&#xff0c;然后通过枚举转…...

专访远航汽车远勤山:踏踏实实做好产品 直面挑战乘风远航

8月25日&#xff0c;第二十六届成都国际汽车展览会在中国西部国际博览城隆重开幕。车展举办期间&#xff0c;远航汽车董事长远勤山先生、产品研发总监王震先生向媒体分享了远航汽车品牌发展、产品研发、技术创新以及市场布局等内容。 “通过我们的付出和努力&#xff0c;让我们…...

Redis基本了解

Redis 基于内存进⾏存储&#xff0c;⽀持 key-value 的存储形式&#xff0c;底层是⽤ C 语⾔编写的。 基于 key-value 形式的数据字典&#xff0c;结构⾮常简单&#xff0c;没有数据表的概念&#xff0c;直接⽤键值对的形式完成数据的管理&#xff0c;Redis ⽀持 5 种数据类型…...

Seata处理分布式事务之1.7.0

https://blog.csdn.net/zhang33565417/article/details/122768300 1.5.0之后版本发生了很大改变 1.seata安装 1.1官网地址 http://seata.io/zh-cn/ 1.2下载地址 https://github.com/seata/seata/releases 下载的是seata-server-1.7.0.zip 1.3seata相关配置的修改 seata-…...

在k8s中用label控制Pod部署到指定的node上

案例-标注k8s-node1是配置了SSD的节点 kubectl label node k8s-node1 disktypessd 查看标记 测试 将pod部署到disktypessd的节点上&#xff08;这里设置了k8s-node1为ssd&#xff09; 部署后查看结果-副本全都运行在了k8s-node1上—符合预期 删除标记 kubectl label node k8…...

vue3 搭配ElementPlus做基础表单校验 自定义表单校验

<script setup> import { ref, reactive } from vue// 表单元素 const dom ref(null) // 校验规则 const rules {name: [{ required: true, message: 请输入活动名称, trigger: blur }],//校验手机号格式phone: [{ required: true, message: "请输入电话", t…...