当前位置: 首页 > news >正文

0基础学习PyFlink——使用Table API实现SQL功能

大纲

  • Souce
    • schema
    • descriptor
  • Sink
    • schema
    • descriptor
  • Execute
  • 完整代码
  • 参考资料

《0基础学习PyFlink——使用PyFlink的Sink将结果输出到Mysql》一文中,我们讲到如何通过定义Souce、Sink和Execute三个SQL,来实现数据读取、清洗、计算和入库。
如下图所示SQL是最高层级的抽象,在它之下是Table API。本文我们会将例子中的SQL翻译成Table API来实现等价的功能。
在这里插入图片描述

Souce

    # """create table source (#         word STRING#     ) with (#         'connector' = 'filesystem',#         'format' = 'csv',#         'path' = '{}'#     )# """.format(input_path)

下面的SQL分为两部分:

  • Table结构:该表只有一个名字为word,类型为string的字段。
  • 连接器:是“文件系统”(filesystem)类型,格式是csv的文件。这样输入就会按csv格式进行解析。

SQL中的Table对应于Table API中的schema。它用于定义表的结构,比如有哪些类型的字段和主键等。
上述整个SQL整体对应于descriptor。即我们可以认为descriptor是表结构+连接器。
我们可以让不同的表和不同的连接器结合,形成不同的descriptor。这是一个组合关系,我们将在下面看到它们的组合方式。

schema

    # define the source schemasource_schema = Schema.new_builder() \.column("word", DataTypes.STRING()) \.build()

new_builder()会返回一个Schema.Builder对象;
column(self, column_name: str, data_type: Union[str, DataType])方法用于声明该表存在哪些类型、哪些名字的字段,同时返回之前的Builder对象;
最后的build(self)方法返回Schema.Builder对象构造的Schema对象。

descriptor

    # Create a source descriptorsource_descriptor= TableDescriptor.for_connector("filesystem") \.schema(source_schema) \.option('path', input_path) \.format("csv") \.build()

for_connector(connector: str)方法返回一个TableDescriptor.Builder对象;
schema(self, schema: Schema)将上面生成的source_schema 对象和descriptor关联;
option(self, key: Union[str, ConfigOption], value)用于指定一些参数,比如本例用于指定输入文件的路径;
format(self, format: Union[str, ‘FormatDescriptor’], format_option: ConfigOption[str] = None)用于指定内容的格式,这将指导怎么解析和入库;
build(self)方法返回TableDescriptor.Builder对象构造的TableDescriptor对象。

Sink

    # """CREATE TABLE WordsCountTableSink (#         `word` STRING,#         `count` BIGINT,#         PRIMARY KEY (`word`) NOT ENFORCED#     ) WITH (#         'connector' = 'jdbc',#         'url' = 'jdbc:mysql://127.0.0.1:3306/words_count_db?useSSL=false',#         'table-name' = 'WordsCountTable',#         'driver'='com.mysql.jdbc.Driver',#         'username'='admin',#         'password'='pwd123'#     );# """

schema

    sink_schema = Schema.new_builder() \.column("word", DataTypes.STRING().not_null()) \.column("count", DataTypes.BIGINT()) \.primary_key("word") \.build()

大部分代码在之前已经解释过了。我们主要关注于区别点:

  • primary_key(self, *column_names: str) 用于指定表的主键。
  • 主键的类型需要使用调用not_null(),以表明其非空。

descriptor

    # Create a sink descriptorsink_descriptor = TableDescriptor.for_connector("jdbc") \.schema(sink_schema) \.option("url", "jdbc:mysql://127.0.0.1:3306/words_count_db?useSSL=false") \.option("table-name", "WordsCountTable") \.option("driver", "com.mysql.jdbc.Driver") \.option("username", "admin") \.option("password", "pwd123") \.build()

这块代码主要是通过option来设置一些连接器相关的设置。可以看到这是用KV形式设计的,这样就可以让option方法有很大的灵活性以应对不同连接器千奇百怪的设置。

Execute

使用下面的代码将表创建出来,以供后续使用。

t_env.create_table("source", source_descriptor)
tab = t_env.from_path('source')
t_env.create_temporary_table("WordsCountTableSink", sink_descriptor)
    # execute insert# """insert into WordsCountTableSink#     select word, count(1) as `count`#     from source#     group by word# """
    tab.group_by(col('word')) \.select(col('word'), lit(1).count) \.execute_insert("WordsCountTableSink") \.wait()

这儿需要介绍的就是lit。它用于生成一个表达式,诸如sum、max、avg和count等。
execute_insert(self, table_path_or_descriptor: Union[str, TableDescriptor], overwrite: bool = False)用于将之前的计算结果插入到Sink表中

完整代码

import argparse
import logging
import sysfrom pyflink.common import Configuration
from pyflink.table import (EnvironmentSettings, TableEnvironment, Schema)
from pyflink.table.types import DataTypes
from pyflink.table.table_descriptor import TableDescriptor
from pyflink.table.expressions import lit, coldef word_count(input_path):config = Configuration()# write all the data to one fileconfig.set_string('parallelism.default', '1')env_settings = EnvironmentSettings \.new_instance() \.in_batch_mode() \.with_configuration(config) \.build()t_env = TableEnvironment.create(env_settings)# """create table source (#         word STRING#     ) with (#         'connector' = 'filesystem',#         'format' = 'csv',#         'path' = '{}'#     )# """# define the source schemasource_schema = Schema.new_builder() \.column("word", DataTypes.STRING()) \.build()# Create a source descriptorsource_descriptor = TableDescriptor.for_connector("filesystem") \.schema(source_schema) \.option('path', input_path) \.format("csv") \.build()t_env.create_table("source", source_descriptor)# """CREATE TABLE WordsCountTableSink (#         `word` STRING,#         `count` BIGINT,#         PRIMARY KEY (`word`) NOT ENFORCED#     ) WITH (#         'connector' = 'jdbc',#         'url' = 'jdbc:mysql://127.0.0.1:3306/words_count_db?useSSL=false',#         'table-name' = 'WordsCountTable',#         'driver'='com.mysql.jdbc.Driver',#         'username'='admin',#         'password'='pwd123'#     );# """# define the sink schemasink_schema = Schema.new_builder() \.column("word", DataTypes.STRING().not_null()) \.column("count", DataTypes.BIGINT()) \.primary_key("word") \.build()# Create a sink descriptorsink_descriptor = TableDescriptor.for_connector("jdbc") \.schema(sink_schema) \.option("url", "jdbc:mysql://127.0.0.1:3306/words_count_db?useSSL=false") \.option("table-name", "WordsCountTable") \.option("driver", "com.mysql.jdbc.Driver") \.option("username", "admin") \.option("password", "pwd123") \.build()t_env.create_temporary_table("WordsCountTableSink", sink_descriptor)# execute insert# """insert into WordsCountTableSink#     select word, count(1) as `count`#     from source#     group by word# """tab = t_env.from_path('source')tab.group_by(col('word')) \.select(col('word'), lit(1).count) \.execute_insert("WordsCountTableSink") \.wait()if __name__ == '__main__':logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")parser = argparse.ArgumentParser()parser.add_argument('--input',dest='input',required=False,help='Input file to process.')argv = sys.argv[1:]known_args, _ = parser.parse_known_args(argv)word_count(known_args.input)

参考资料

  • https://nightlies.apache.org/flink/flink-docs-master/zh/docs/concepts/overview/
  • https://nightlies.apache.org/flink/flink-docs-release-1.17/api/python//reference/pyflink.table/descriptors.html

相关文章:

0基础学习PyFlink——使用Table API实现SQL功能

大纲 Souceschemadescriptor Sinkschemadescriptor Execute完整代码参考资料 《0基础学习PyFlink——使用PyFlink的Sink将结果输出到Mysql》一文中,我们讲到如何通过定义Souce、Sink和Execute三个SQL,来实现数据读取、清洗、计算和入库。 如下图所示SQL是最高层级的…...

文心一言 VS 讯飞星火 VS chatgpt (123)-- 算法导论10.4 4题

四、用go语言,对于一个含 n个结点的任意有根树,写出一个 O(n)时间的过程,输出其所有关键字该树以左孩子右兄弟表示法存储。 文心一言: 在计算机科学中,左孩子右兄弟表示法是一种用于表示树状结构的方法,其…...

[读论文] On Joint Learning for Solving Placement and Routing in Chip Design

0. Abstract 由于 GPU 在加速计算方面的优势和对人类专家的依赖较少,机器学习已成为解决布局和布线问题的新兴工具,这是现代芯片设计流程中的两个关键步骤。它仍处于早期阶段,存在一些基本问题:可扩展性、奖励设计和端到端学习范…...

L2-1 插松枝

L2-1 插松枝 分数 25 全屏浏览题目 切换布局 作者 陈越 单位 浙江大学 人造松枝加工场的工人需要将各种尺寸的塑料松针插到松枝干上,做成大大小小的松枝。他们的工作流程(并不)是这样的: 每人手边有一只小盒子,初始…...

Android 使用ContentObserver监听SettingsProvider值的变化

1、Settings原理 Settings 设置、保存的一些值,最终是存储到 SettingsProvider 的数据库 例如: Settings.Global.putInt(getContentResolver(), "SwitchLaunch", 0); Settings.System.putInt(getContentResolver(), "SwitchLaunch&quo…...

二进制安装部署k8s

概要 常见的K8S按照部署方式 minikube 是一个工具,可以在本地快速运行一个单节点微型K8S,仅用于学习,预习K8S的一些特性使用。 Kubeadmin kubeadmin也是一个工具,特工kubeadm init 和kubedm join,用于快速部署k8s…...

多输入多输出 | Matlab实现k-means-ELM(k均值聚类结合极限学习机)多输入多输出组合预测

多输入多输出 | Matlab实现k-means-ELM(k均值聚类结合极限学习机)多输入多输出组合预测 目录 多输入多输出 | Matlab实现k-means-ELM(k均值聚类结合极限学习机)多输入多输出组合预测预测效果基本描述程序设计参考资料 预测效果 基…...

ITSource 分享 第5期【校园信息墙系统】

项目介绍 本期给大家介绍一个 校园信息墙 系统,可以发布信息,表白墙,分享墙,校园二手买卖,咨询分享等墙信息。整个项目还是比较系统的,分为服务端,管理后台,用户Web端,小…...

记 : CTF2023羊城杯 - Reverse 方向 Blast 题目复现and学习记录

文章目录 前言题目分析and复习过程exp 前言 羊城杯题目复现: 第一题 知识点 :DES算法 : 链接:Ez加密器 第二题 知识点 :动态调试 : 链接:CSGO 这一题的查缺补漏: 虚假控制流的去除…...

【数据结构练习题】删除有序数组中的重复项

✨博客主页:小钱编程成长记 🎈博客专栏:数据结构练习题 🎈相关博文:消失的数字 — 三种解法超详解 删除有序数组中的重复项 1.🎈题目2. 🎈解题思路3. 🎈具体代码🎇总结 1…...

leetcode-链表

链表是一个用指针串联起来的线性结构,每个结点由数据域和指针域构成,指针域存放的是指向下一个节点的指针,最后一个节点指向NULL,第一个结点称为头节点head。 常见的链表有单链表、双向链表、循环链表。双向链表就是多了一个pre指…...

CV计算机视觉每日开源代码Paper with code速览-2023.10.27

精华置顶 墙裂推荐!小白如何1个月系统学习CV核心知识:链接 点击CV计算机视觉,关注更多CV干货 论文已打包,点击进入—>下载界面 点击加入—>CV计算机视觉交流群 1.【基础网络架构:Transformer】(Ne…...

“赋能信创,物联未来” AntDB数据库携高可用解决方案亮相2023世界数字经济大会

10月14日,在2023世界数字经济大会暨京甬信创物联网产融对接会上,AntDB数据库技术总监北陌应邀发表《AntDB国产分布式数据库创新演进与高可用解决方案》主题演讲,就AntDB数据库助力客户数智化升级的高可用信创解决方案进行了详实、真挚地分享&…...

Kitex踩坑 [Error] KITEX: processing request error,i/o timeout

报错问题 2023/010/28 17:20:10.250768 default_server_handler.go:234: [Error] KITEX: processing request error, remoteService, remoteAddr127.0.0.1:65425, errordefault codec read failed: read tcp 127.0.0.1:8888->127.0.0.1:65425: i/o timeout 分析原因 Hert…...

前端移动web高级详细解析二

移动 Web 第二天 01-空间转换 空间转换简介 空间:是从坐标轴角度定义的 X 、Y 和 Z 三条坐标轴构成了一个立体空间,Z 轴位置与视线方向相同。 空间转换也叫 3D转换 属性:transform 平移 transform: translate3d(x, y, z); transform…...

Cesium 展示——对每段线、点、label做分组实体管理

文章目录 需求分析需求 对多组实体的管理,每组实体中包含多个点和一条线,并可对该组进行删除操作 分析 删除操作中用到了 viewer.entities.remove(radarEntity); 根据ID获取实体var radar = viewer.entities.getById(radar); viewer.entities.remove(radar );...

前端学习之Babel转码器

前言 Babel转码器可以将ES6转为ES5代码,从而在老版本的浏览器运行。这说明你可以用ES6的方式编码,又不用担心现有环境是否支持。 浏览器支持性查看:https://caniuse.com/ Babel官网:https://babeljs.io/ Babel安装流程 安装Babe…...

智能井盖监测系统功能,万宾科技传感器效果

智能井盖传感器的出现是高科技产品的更新换代,同时也是智慧城市建设中的需求。在智慧城市建设过程之中,高科技产品的应用数不胜数,智能井盖传感器的出现,解决了城市道路安全保护着城市地下生命线,改善着传统井盖带来的…...

LangChain+LLM实战---BERT主要的创新之处和注意力机制中的QKV

BERT主要的创新之处 BERT(Bidirectional Encoder Representations from Transformers)是一种基于Transformer架构的预训练语言模型,由Google在2018年提出。它的创新之处主要包括以下几个方面: 双向性(Bidirectional&…...

使用 @antfu/eslint-config 配置 eslint (包含兼容uniapp方法)

安装 pnpm i -D eslint antfu/eslint-config创建 eslint.config.js 文件 // 如果没有在 page.json 配置 "type": "module" const antfu require(antfu/eslint-config).default module.exports antfu()// 配置了 "type": "module" …...

nli-distilroberta-base入门教程:零基础理解自然语言推理任务

nli-distilroberta-base入门教程:零基础理解自然语言推理任务 1. 什么是自然语言推理? 自然语言推理(Natural Language Inference,简称NLI)是让计算机理解两段文本之间逻辑关系的任务。想象一下老师批改作业的场景&a…...

ContextMenuManager:让Windows交互回归高效本质

ContextMenuManager:让Windows交互回归高效本质 【免费下载链接】ContextMenuManager 🖱️ 纯粹的Windows右键菜单管理程序 项目地址: https://gitcode.com/gh_mirrors/co/ContextMenuManager 当你在Windows系统中右键点击文件时,是否…...

Python打包神器大PK:Nuitka vs PyInstaller,谁才是你的菜?(附实测数据)

Python打包工具深度评测:Nuitka与PyInstaller的终极对决 当开发者需要将Python项目分发给没有Python环境的用户时,打包工具的选择往往成为关键决策。本文将深入分析两大主流工具Nuitka和PyInstaller在多个维度的表现,帮助开发者根据项目需求做…...

CCC数字钥匙Release 3实战:如何用BLE/UWB实现无钥匙进入(附避坑指南)

CCC数字钥匙Release 3实战:BLE/UWB无钥匙进入系统开发全解析 当你的手机靠近车辆时,车门自动解锁——这种科幻般的体验正通过CCC数字钥匙Release 3标准变为现实。作为汽车电子工程师,我曾用nRF5340开发板搭配UWB模块完整实现了这套系统&#…...

Pixel Couplet Gen实战案例:某AI开发者大会现场扫码生成像素春联纪念品

Pixel Couplet Gen实战案例:某AI开发者大会现场扫码生成像素春联纪念品 1. 项目背景与创意来源 1.1 传统与创新的碰撞 在2024年某AI开发者大会现场,我们推出了一款名为"Pixel Couplet Gen"的互动装置。这款产品将中国传统春节文化与现代AI技…...

[Python3高阶编程] - 异步编程深度学习指南二: 同步原语

概述在 Python 异步编程中,虽然协程(coroutine)天然避免了线程切换开销,但多个协程仍可能同时访问共享资源(如全局变量、文件、数据库连接),从而引发竞态条件(Race Condition&#x…...

别再数据线了!用FastAPI 分钟搭个局域网文件+剪贴板神器

AI Agent 时代的沙箱需求 从 Copilot 到 Agent:执行能力的质变 在生成式 AI 的早期阶段,应用主要以“Copilot”形式存在,AI 仅作为辅助生成建议。然而,随着 AutoGPT、BabyAGI 以及 OpenAI Code Interpreter(现为 Advan…...

STM32开发方式对比与HAL库实战指南

1. STM32开发方式概述作为一名嵌入式开发者,我亲历了STM32开发方式的变迁。从早期的寄存器操作到标准库,再到如今主流的HAL库,每种方式都有其独特的优势和适用场景。对于刚接触STM32的新手来说,选择合适的开发方式往往是个令人困惑…...

保姆级教程:在Ubuntu 22.04上手动编译FFmpeg+OpenCV,搞定昇腾CANN C++推理环境

昇腾NPU开发实战:从零构建FFmpegOpenCV的C推理环境 在昇腾NPU上进行C开发时,环境配置往往是第一个拦路虎。不同于常见的x86架构,昇腾平台的异构计算特性要求开发者对底层依赖有更深入的理解。本文将手把手带你完成FFmpeg和OpenCV的源码编译&a…...

别再为MoveIt安装发愁了!Ubuntu 20.04 + ROS Noetic 保姆级配置全流程

别再为MoveIt安装发愁了!Ubuntu 20.04 ROS Noetic 保姆级配置全流程 刚接触ROS和机械臂控制时,MoveIt的安装过程就像一道难以逾越的门槛。记得我第一次尝试配置时,整整两天都卡在依赖报错和环境变量设置上。本文将带你用最稳妥的方式&#x…...