当前位置: 首页 > news >正文

Flink流批一体计算(13):PyFlink Tabel API之SQL DDL

1. TableEnvironment

创建 TableEnvironment

from pyflink.table import Environmentsettings, TableEnvironment# create a streaming TableEnvironmentenv_settings = Environmentsettings.in_streaming_mode()table_env = TableEnvironment.create(env_settings)# or create a batch TableEnvironmentenv_settings = Environmentsettings.in_batch_mode()table_env = TableEnvironment.create(env_settings)

TableEnvironment 是 Table API 和 SQL 集成的核心概念。

TableEnvironment 可以用来:

  • ·创建 Table
  • ·将 Table 注册成临时表
  • ·执行 SQL 查询
  • ·注册用户自定义的 (标量,表值,或者聚合) 函数
  • ·配置作业
  • ·管理 Python 依赖
  • ·提交作业执行

创建 source 表

table_env.execute_sql("""CREATE TABLE datagen (id INT,data STRING) WITH ('connector' = 'datagen','fields.id.kind' = 'sequence','fields.id.start' = '1','fields.id.end' = '10')""")

创建 sink 表

table_env.execute_sql("""CREATE TABLE print (id INT,data STRING) WITH ('connector' = 'print')""")

2. Table

Table 是 Python Table API 的核心组件。Table 是 Table API 作业中间结果的逻辑表示。

一个 Table 实例总是与一个特定的 TableEnvironment 相绑定。

不支持在同一个查询中合并来自不同 TableEnvironments 的表,例如 join 或者 union 它们。

通过列表类型的对象创建

你可以使用一个列表对象创建一张表:

from pyflink.table import Environmentsettings, TableEnvironment# 创建 批 TableEnvironmentenv_settings = Environmentsettings.in_batch_mode()table_env = TableEnvironment.create(env_settings)table = table_env.from_elements([(1, 'Hi'), (2, 'Hello')])table.to_pandas()==>print(table.to_pandas())table = table_env.from_elements([(1, 'Hi'), (2, 'Hello')], ['id', 'data'])print(table.to_pandas())

通过 DDL 创建

你可以通过 DDL 创建一张表,execute_sql(stmt) 执行指定的语句并返回执行结果。

执行语句可以是 DDL/DML/DQL/SHOW/DESCRIBE/EXPLAIN/USE。

注意,对于 "INSERT INTO" 语句,这是一个异步操作,通常在向远程集群提交作业时才需要使用。

但是,如果在本地集群或者 IDE 中执行作业时,你需要等待作业执行完成。

from pyflink.table import Environmentsettings, TableEnvironment# 创建流 TableEnvironmentenv_settings = Environmentsettings.in_streaming_mode()table_env = TableEnvironment.create(env_settings)table_env.execute_sql("""CREATE TABLE random_source (id BIGINT,data TINYINT) WITH ('connector' = 'datagen','fields.id.kind'='sequence','fields.id.start'='1','fields.id.end'='3','fields.data.kind'='sequence','fields.data.start'='4','fields.data.end'='6')""")table = table_env.from_path("random_source")table.to_pandas()

通过 Catalog 创建

Catalog

Catalog提供了元数据信息,例如数据库、表、分区、视图以及数据库或其他外部系统中存储的函数和信息。

数据处理最关键的方面之一是管理元数据。

元数据可以是临时的,例如临时表、或者通过 TableEnvironment 注册的 UDF。

元数据也可以是持久化的,例如 Hive Metastore 中的元数据。

Catalog 提供了一个统一的API,用于管理元数据,并使其可以从 Table API 和 SQL 查询语句中来访问。

Catalog类型

GenericInMemoryCatalog

基于内存实现的 Catalog,所有元数据只在 session 的生命周期内可用。

JdbcCatalog

JdbcCatalog使得用户可以将 Flink 通过 JDBC 协议连接到关系数据库。

PostgresCatalog 是当前实现的唯一一种 JDBC Catalog。

HiveCatalog

HiveCatalog 有两个用途:作为原生 Flink 元数据的持久化存储,以及作为读写现有 Hive 元数据的接口。

警告 Hive Metastore 以小写形式存储所有元数据对象名称,GenericInMemoryCatalog 区分大小写。

用户自定义 Catalog

Catalog 是可扩展的,用户可以通过实现 Catalog 接口来开发自定义 Catalog。

想要在 SQL CLI 中使用自定义 Catalog,用户除了需要实现自定义的 Catalog 之外,还需要为这个 Catalog 实现对应的 CatalogFactory 接口。

CatalogFactory 定义了一组属性,用于 SQL CLI 启动时配置 Catalog。

这组属性集将传递给发现服务,在该服务中,服务会尝试将属性关联到 CatalogFactory 并初始化相应的 Catalog 实例。

创建 Flink 表并将其注册到 Catalog

使用 SQL DDL

用户可以使用 DDL 通过 Table API 或者 SQL Client 在 Catalog 中创建表。

from pyflink.table.catalog import HiveCatalog# Create a HiveCatalogcatalog = HiveCatalog("myhive", None, "<path_of_hive_conf>")# Register the catalogt_env.register_catalog("myhive", catalog)# Create a catalog databaset_env.execute_sql("CREATE DATABASE mydb WITH (...)")# Create a catalog tablet_env.execute_sql("CREATE TABLE mytable (name STRING, age INT) WITH (...)")# should return the tables in current catalog and database.t_env.list_tables()

通过 SQL DDL 创建的表和视图, 例如 “create table …” 和 “create view …",都存储在 catalog 中。

你可以通过 SQL 直接访问 catalog 中的表。

使用 Java/Scala

用户可以用编程的方式使用Java 或者 Scala 来创建 Catalog 表。

from pyflink.table import *
from pyflink.table.catalog import HiveCatalog, CatalogDatabase, ObjectPath, CatalogBaseTable
from pyflink.table.descriptors import Kafka
settings = Environmentsettings.in_batch_mode()
t_env = TableEnvironment.create(settings)
# Create a HiveCatalog
catalog = HiveCatalog("myhive", None, "<path_of_hive_conf>")
# Register the catalog
t_env.register_catalog("myhive", catalog)
# Create a catalog database
database = CatalogDatabase.create_instance({"k1": "v1"}, None)
catalog.create_database("mydb", database)
# Create a catalog table
schema = Schema.new_builder() \.column("name", DataTypes.STRING()) \.column("age", DataTypes.INT()) \.build()  
catalog_table = t_env.create_table("myhive.mydb.mytable", TableDescriptor.for_connector("kafka").schema(schema)// ….build())
# tables should contain "mytable"
tables = catalog.list_tables("mydb")

TableEnvironment 维护了一个使用标识符创建的表的 catalogs 映射。

Catalog 中的表既可以是临时的,并与单个 Flink 会话生命周期相关联,也可以是永久的,跨多个 Flink 会话可见。

如果你要用 Table API 来使用 catalog 中的表,可以使用 “from_path” 方法来创建 Table API 对象:

from_path(path)   通过指定路径下已注册的表来创建一个表,例如通过 create_temporary_view 注册表。

相关文章:

Flink流批一体计算(13):PyFlink Tabel API之SQL DDL

1. TableEnvironment 创建 TableEnvironment from pyflink.table import Environmentsettings, TableEnvironment# create a streaming TableEnvironmentenv_settings Environmentsettings.in_streaming_mode()table_env TableEnvironment.create(env_settings)# or create…...

java笔试手写算法面试题大全含答案

1.统计一篇英文文章单词个数。 public class WordCounting { public static void main(String[] args) { try(FileReader fr new FileReader("a.txt")) { int counter 0; boolean state false; int currentChar; while((currentChar fr.read()) ! -1) { i…...

点云平面拟合和球面拟合

一、介绍 In this tutorial we learn how to use a RandomSampleConsensus with a plane model to obtain the cloud fitting to this model. 二、代码 #include <iostream> #include <thread> #include <pcl/point_types.h> #include <pcl/common/io.…...

部署问题集合(十九)linux设置Tomcat、Docker,以及使用脚本开机自启(亲测)

前言 因为不想每次启动虚拟机都要手动启动一遍这些东西&#xff0c;所以想要设置成开机自启的状态 设置Tomcat开机自启 创建service文件 vi /etc/systemd/system/tomcat.service添加如下内容&#xff0c;注意修改启动脚本和关闭脚本的地址 [Unit] DescriptionTomcat9068 A…...

视觉SLAM:一直在入门,如何能精通,CV领域的绝境长城,

目录 前言 福利&#xff1a;文末有chat-gpt纯分享&#xff0c;无魔法&#xff0c;无限制 1 什么是SLAM&#xff1f; 2 为什么用SLAM&#xff1f; 3 视觉SLAM怎么实现&#xff1f; 4 前端视觉里程计 5 后端优化 6 回环检测 7 地图构建 8 结语 前言 上周的组会上&…...

【报错】yarn --version Unrecognized option: --version Error...

文章目录 问题分析解决问题 在使用 npm install -g yarn 全局安装 yarn 后,查看yarn 的版本号,报错如下 PS D:\global-data-display> yarn --version Unrecognized option: --version Error: Could...

二叉搜索树的(查找、插入、删除)

一、二叉搜索树的概念 二叉搜索树又称二叉排序树&#xff0c;它或者是一棵空树&#xff0c;或者是具有以下性质的二叉树: 1、若它的左子树不为空&#xff0c;则左子树上所有节点的值都小于根节点的值&#xff1b; 2、若它的右子树不为空&#xff0c;则右子树上所有节点的值都…...

电力虚拟仿真 | 高压电气试验VR教学系统

在科技进步的推动下&#xff0c;我们的教育方式也在发生着翻天覆地的变化。其中&#xff0c;虚拟现实&#xff08;VR&#xff09;技术的出现&#xff0c;为我们提供了一种全新的、富有沉浸感的学习和培训方式。特别是在电力行业领域&#xff0c;例如&#xff0c;电力系统的维护…...

innovus如何设置size only

我正在「拾陆楼」和朋友们讨论有趣的话题&#xff0c;你⼀起来吧&#xff1f; 拾陆楼知识星球入口 给instance设置size only属性命令如下: dbset [dbGet top.inst.name aa/bb -p] .dontTouch sizeOk 给一个module设置size only需要foreach循环一下: foreach inst [dbGet top.…...

Java之继承详解二

3.7 方法重写 3.7.1 概念 方法重写 &#xff1a;子类中出现与父类一模一样的方法时&#xff08;返回值类型&#xff0c;方法名和参数列表都相同&#xff09;&#xff0c;会出现覆盖效果&#xff0c;也称为重写或者复写。声明不变&#xff0c;重新实现。 3.7.2 使用场景与案例…...

国内常见的几款可视化Web组态软件

组态软件是一种用于控制和监控各种设备的软件&#xff0c;也是指在自动控制系统监控层一级的软件平台和开发环境。这类软件实际上也是一种通过灵活的组态方式&#xff0c;为用户提供快速构建工业自动控制系统监控功能的、通用层次的软件工具。通常用于工业控制&#xff0c;自动…...

通过 git上传到 gitee 仓库

介绍 Git是目前世界上最先进的分布式版本控制系统&#xff0c;有这么几个特点&#xff1a; 分布式 &#xff1a;是用来保存工程源代码历史状态的命令行工具。保存点 &#xff1a;保存点可以追溯源码中的文件&#xff0c;并能得到某个时间点上的整个工程项目额状态&#xff1b;…...

设置Windows主机的浏览器为wls2的默认浏览器

1. 准备工作 wsl是可以使用Windows主机上安装的exe程序&#xff0c;出于安全考虑&#xff0c;默认情况下改功能是无法使用。要使用的话&#xff0c;终端需要以管理员权限启动。 我这里以Windows Terminal为例&#xff0c;介绍如何默认使用管理员权限打开终端&#xff0c;具体…...

森林生物量(蓄积量)估算全流程

python森林生物量&#xff08;蓄积量&#xff09;估算全流程 一.哨兵2号获取/去云处理/提取参数1.1 影像处理与下载1.2 导入2A级产品1.3导入我们在第1步生成的云掩膜文件1.4.SNAP掩膜操作1.5采用gdal计算各类植被指数1.6 纹理特征参数提取 二.哨兵1号获取/处理/提取数据2.1 纹理…...

MySQL数据库概述

MySQL数据库概述 1 SQL SQL语句大小写不敏感。 SQL语句末尾应该使用分号结束。 1.1 SQL语句及相关操作示例 DDL&#xff1a;数据定义语言&#xff0c;负责数据库定义、数据库对象定义&#xff0c;由CREATE、ALTER与DROP三个语法所组成DML&#xff1a;数据操作语言&#xff…...

2023年国赛数学建模思路 - 案例:退火算法

文章目录 1 退火算法原理1.1 物理背景1.2 背后的数学模型 2 退火算法实现2.1 算法流程2.2算法实现 建模资料 ## 0 赛题思路 &#xff08;赛题出来以后第一时间在CSDN分享&#xff09; https://blog.csdn.net/dc_sinor?typeblog 1 退火算法原理 1.1 物理背景 在热力学上&a…...

怎么借助ChatGPT处理数据结构的问题

目录 使用ChatGPT进行数据格式化转换 代码示例 ChatGPT格式化数据提示语 代码示例 批量格式化数据提示语 代码示例 ChatGPT生成的格式化批处理代码 使用ChatGPT合并不同数据源的数据 合并数据提示语 自动合并数据提示语 ChatGPT生成的自动合并代码 结论 数据合并是…...

Docker容器无法启动 Cannot find /usr/local/tomcat/bin/setclasspath.sh

报错信息如下 解决办法 权限不够 加上--privileged 获取最大权限 docker run --privileged --name lenglianerqi -p 9266:8080 -v /opt/docker/lenglianerqi/webapps:/usr/local/tomcat/webapps/ -v /opt/docker/lenglianerqi/webapps/userfile:/usr/local/tomcat/webapps/u…...

Pytorch-day08-模型进阶训练技巧-checkpoint

PyTorch 模型进阶训练技巧 自定义损失函数动态调整学习率 典型案例&#xff1a;loss上下震荡 [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-BndMyRX0-1692613806232)(attachment:image-2.png)] 1、自定义损失函数 1、PyTorch已经提供了很多常用…...

【ArcGIS Pro二次开发】(61):样式(Style)和符号(Symbol)

在 ArcGIS Pro SDK 中&#xff0c;地图要素符号&#xff08;Symbol&#xff09;和符号样式&#xff08;Style&#xff09;是2个很重要的概念。 【Symbol】是用于表示地图上不同类型的要素&#xff08;如点、线、面&#xff09;的图形化表示。 在地图中&#xff0c;各种要素都…...

基于FPGA的PID算法学习———实现PID比例控制算法

基于FPGA的PID算法学习 前言一、PID算法分析二、PID仿真分析1. PID代码2.PI代码3.P代码4.顶层5.测试文件6.仿真波形 总结 前言 学习内容&#xff1a;参考网站&#xff1a; PID算法控制 PID即&#xff1a;Proportional&#xff08;比例&#xff09;、Integral&#xff08;积分&…...

树莓派超全系列教程文档--(61)树莓派摄像头高级使用方法

树莓派摄像头高级使用方法 配置通过调谐文件来调整相机行为 使用多个摄像头安装 libcam 和 rpicam-apps依赖关系开发包 文章来源&#xff1a; http://raspberry.dns8844.cn/documentation 原文网址 配置 大多数用例自动工作&#xff0c;无需更改相机配置。但是&#xff0c;一…...

spring:实例工厂方法获取bean

spring处理使用静态工厂方法获取bean实例&#xff0c;也可以通过实例工厂方法获取bean实例。 实例工厂方法步骤如下&#xff1a; 定义实例工厂类&#xff08;Java代码&#xff09;&#xff0c;定义实例工厂&#xff08;xml&#xff09;&#xff0c;定义调用实例工厂&#xff…...

解决本地部署 SmolVLM2 大语言模型运行 flash-attn 报错

出现的问题 安装 flash-attn 会一直卡在 build 那一步或者运行报错 解决办法 是因为你安装的 flash-attn 版本没有对应上&#xff0c;所以报错&#xff0c;到 https://github.com/Dao-AILab/flash-attention/releases 下载对应版本&#xff0c;cu、torch、cp 的版本一定要对…...

【Java_EE】Spring MVC

目录 Spring Web MVC ​编辑注解 RestController RequestMapping RequestParam RequestParam RequestBody PathVariable RequestPart 参数传递 注意事项 ​编辑参数重命名 RequestParam ​编辑​编辑传递集合 RequestParam 传递JSON数据 ​编辑RequestBody ​…...

工业自动化时代的精准装配革新:迁移科技3D视觉系统如何重塑机器人定位装配

AI3D视觉的工业赋能者 迁移科技成立于2017年&#xff0c;作为行业领先的3D工业相机及视觉系统供应商&#xff0c;累计完成数亿元融资。其核心技术覆盖硬件设计、算法优化及软件集成&#xff0c;通过稳定、易用、高回报的AI3D视觉系统&#xff0c;为汽车、新能源、金属制造等行…...

今日科技热点速览

&#x1f525; 今日科技热点速览 &#x1f3ae; 任天堂Switch 2 正式发售 任天堂新一代游戏主机 Switch 2 今日正式上线发售&#xff0c;主打更强图形性能与沉浸式体验&#xff0c;支持多模态交互&#xff0c;受到全球玩家热捧 。 &#x1f916; 人工智能持续突破 DeepSeek-R1&…...

dify打造数据可视化图表

一、概述 在日常工作和学习中&#xff0c;我们经常需要和数据打交道。无论是分析报告、项目展示&#xff0c;还是简单的数据洞察&#xff0c;一个清晰直观的图表&#xff0c;往往能胜过千言万语。 一款能让数据可视化变得超级简单的 MCP Server&#xff0c;由蚂蚁集团 AntV 团队…...

VM虚拟机网络配置(ubuntu24桥接模式):配置静态IP

编辑-虚拟网络编辑器-更改设置 选择桥接模式&#xff0c;然后找到相应的网卡&#xff08;可以查看自己本机的网络连接&#xff09; windows连接的网络点击查看属性 编辑虚拟机设置更改网络配置&#xff0c;选择刚才配置的桥接模式 静态ip设置&#xff1a; 我用的ubuntu24桌…...

动态 Web 开发技术入门篇

一、HTTP 协议核心 1.1 HTTP 基础 协议全称 &#xff1a;HyperText Transfer Protocol&#xff08;超文本传输协议&#xff09; 默认端口 &#xff1a;HTTP 使用 80 端口&#xff0c;HTTPS 使用 443 端口。 请求方法 &#xff1a; GET &#xff1a;用于获取资源&#xff0c;…...