Flink流批一体计算(14):PyFlink Tabel API之SQL查询
举个例子
查询 source 表,同时执行计算
# 通过 Table API 创建一张表:
source_table = table_env.from_path("datagen")
# 或者通过 SQL 查询语句创建一张表:
source_table = table_env.sql_query("SELECT * FROM datagen")
result_table = source_table.select(source_table.id + 1, source_table.data)
Table API 查询
Table 对象有许多方法,可以用于进行关系操作。
这些方法返回新的 Table 对象,表示对输入 Table 应用关系操作之后的结果。
这些关系操作可以由多个方法调用组成,例如 table.group_by(...).select(...)。
Table API 文档描述了流和批处理上所有支持的 Table API 操作。
以下示例展示了一个简单的 Table API 聚合查询:
from pyflink.table import Environmentsettings, TableEnvironment
# 通过 batch table environment 来执行查询
env_settings = Environmentsettings.in_batch_mode()
table_env = TableEnvironment.create(env_settings)
orders = table_env.from_elements([('Jack', 'FRANCE', 10), ('Rose', 'ENGLAND', 30), ('Jack', 'FRANCE', 20)],['name', 'country', 'revenue'])
# 计算所有来自法国客户的收入
revenue = orders \.select(orders.name, orders.country, orders.revenue) \.where(orders.country == 'FRANCE') \.group_by(orders.name) \.select(orders.name, orders.revenue.sum.alias('rev_sum'))
revenue.to_pandas()
Table API 也支持行操作的 API, 这些行操作包括 Map Operation, FlatMap Operation, Aggregate Operation 和 FlatAggregate Operation.
以下示例展示了一个简单的 Table API 基于行操作的查询
from pyflink.table import Environmentsettings, TableEnvironment
from pyflink.table import DataTypes
from pyflink.table.udf import udf
import pandas as pd# 通过 batch table environment 来执行查询
env_settings = Environmentsettings.in_batch_mode()
table_env = TableEnvironment.create(env_settings)
orders = table_env.from_elements([('Jack', 'FRANCE', 10), ('Rose', 'ENGLAND', 30), ('Jack', 'FRANCE', 20)], ['name', 'country', 'revenue'])
map_function = udf(lambda x: pd.concat([x.name, x.revenue * 10], axis=1),result_type=DataTypes.ROW([DataTypes.FIELD("name", DataTypes.STRING()),DataTypes.FIELD("revenue", DataTypes.BIGINT())]),func_type="pandas")
orders.map(map_function).alias('name', 'revenue').to_pandas()
SQL 查询
Flink 的 SQL 基于 Apache Calcite,它实现了标准的 SQL。SQL 查询语句使用字符串来表达。SQL 支持Flink 对流和批处理。
下面示例展示了一个简单的 SQL 聚合查询:
from pyflink.table import Environmentsettings, TableEnvironment# 通过 stream table environment 来执行查询env_settings = Environmentsettings.in_streaming_mode()table_env = TableEnvironment.create(env_settings)table_env.execute_sql("""CREATE TABLE random_source (id BIGINT,data TINYINT) WITH ('connector' = 'datagen','fields.id.kind'='sequence','fields.id.start'='1','fields.id.end'='8','fields.data.kind'='sequence','fields.data.start'='4','fields.data.end'='11')""")table_env.execute_sql("""CREATE TABLE print_sink (id BIGINT,data_sum TINYINT) WITH ('connector' = 'print')""")table_env.execute_sql("""INSERT INTO print_sinkSELECT id, sum(data) as data_sum FROM(SELECT id / 2 as id, data FROM random_source)WHERE id > 1GROUP BY id""").wait()
Table API 和 SQL 的混合使用
Table API 中的 Table 对象和 SQL 中的 Table 可以自由地相互转换。
下面例子展示了如何在 SQL 中使用 Table 对象:
create_temporary_view(view_path, table) 将一个 `Table` 对象注册为一张临时表,类似于 SQL 的临时表。
# 创建一张 sink 表来接收结果数据
table_env.execute_sql("""CREATE TABLE table_sink (id BIGINT,data VARCHAR) WITH ('connector' = 'print')
""")
# 将 Table API 表转换成 SQL 中的视图
table = table_env.from_elements([(1, 'Hi'), (2, 'Hello')], ['id', 'data'])
table_env.create_temporary_view('table_api_table', table)
# 将 Table API 表的数据写入结果表
table_env.execute_sql("INSERT INTO table_sink SELECT * FROM table_api_table").wait()
下面例子展示了如何在 Table API 中使用 SQL 表:
sql_query(query) 执行一条 SQL 查询,并将查询的结果作为一个 `Table` 对象。
# 创建一张 SQL source 表
table_env.execute_sql("""CREATE TABLE sql_source (id BIGINT,data TINYINT) WITH ('connector' = 'datagen','fields.id.kind'='sequence','fields.id.start'='1','fields.id.end'='4','fields.data.kind'='sequence','fields.data.start'='4','fields.data.end'='7')
""")# 将 SQL 表转换成 Table API 表
table = table_env.from_path("sql_source")
# 或者通过 SQL 查询语句创建表
table = table_env.sql_query("SELECT * FROM sql_source")
# 将表中的数据写出
table.to_pandas()
优化
数据倾斜
当数据发生倾斜(某一部分数据量特别大),虽然没有GC(Gabage Collection,垃圾回收),但是task执行时间严重不一致。
- 需要重新设计key,以更小粒度的key使得task大小合理化。
- 修改并行度。
- 调用rebalance操作,使数据分区均匀。
缓冲区超时设置
由于task在执行过程中存在数据通过网络进行交换,数据在不同服务器之间传递的缓冲区超时时间可以通过setBufferTimeout进行设置。
当设置“setBufferTimeout(-1)”,会等待缓冲区满之后才会刷新,使其达到最大吞吐量;当设置“setBufferTimeout(0)”时,可以最小化延迟,数据一旦接收到就会刷新;当设置“setBufferTimeout”大于0时,缓冲区会在该时间之后超时,然后进行缓冲区的刷新。
示例可以参考如下:
env.setBufferTimeout(timeoutMillis);
env.generateSequence(1,10).map(new MyMapper()).setBufferTimeout(timeoutMillis);
相关文章:
Flink流批一体计算(14):PyFlink Tabel API之SQL查询
举个例子 查询 source 表,同时执行计算 # 通过 Table API 创建一张表: source_table table_env.from_path("datagen") # 或者通过 SQL 查询语句创建一张表: source_table table_env.sql_query("SELECT * FROM datagen&quo…...
JRebel插件扩展-mac版
前言 上一篇分享了mac开发环境的搭建,但是欠了博友几个优化的债,今天先还一个,那就是idea里jRebel插件的扩展。 一、场景回眸 这个如果在win环境那扩展是分分钟,一个exe文件点点就行。现在在mac环境就没有这样的dmg可以执行的&…...
C语言中常见的一些语法概念和功能
常用代码: 程序入口:int main() 函数用于定义程序的入口点。 输出:使用 printf() 函数可以在控制台打印输出。 输入:使用 scanf() 函数可以接收用户的输入。 条件判断:使用 if-else 语句可以根据条件执行不同的代码…...
Python土力学与基础工程计算.PDF-钻探泥浆制备
Python 求解代码如下: 1. rho1 2.5 # 黏土密度,单位:t/m 2. rho2 1.0 # 泥浆密度,单位:t/m 3. rho3 1.0 # 水的密度,单位:t/m 4. V 1.0 # 泥浆容积,单位:…...
【机器学习】— 2 图神经网络GNN
一、说明 在本文中,我们探讨了图神经网络(GNN)在推荐系统中的潜力,强调了它们相对于传统矩阵完成方法的优势。GNN为利用图论来改进推荐系统提供了一个强大的框架。在本文中,我们将在推荐系统的背景下概述图论和图神经网…...
QT的布局与间隔器介绍
布局与间隔器 1、概述 QT中使用绝对定位的布局方式,无法适用窗口的变化,但是,也可以通过尺寸策略来进行 调整,使得 可以适用窗口变化。 布局管理器作用最主要用来在qt设计师中进行控件的排列,另外,布局管理…...
深入浅出Pytorch函数——torch.nn.Linear
分类目录:《深入浅出Pytorch函数》总目录 对输入数据做线性变换 y x A T b yxA^Tb yxATb 语法 torch.nn.Linear(in_features, out_features, biasTrue, deviceNone, dtypeNone)参数 in_features:[int] 每个输入样本的大小out_features :…...
Vue3.2+TS的defineExpose的应用
defineExpose通俗来讲,其实就是讲子组件的方法或者数据,暴露给父组件进行使用,这样对组件的封装使用,有很大的帮助,那么defineExpose应该如何使用,下面我来用一些实际的代码,带大家快速学会defi…...
牛客网Python入门103题练习|【08--元组】
⭐NP62 运动会双人项目 描述 牛客运动会上有一项双人项目,因为报名成功以后双人成员不允许被修改,因此请使用元组(tuple)进行记录。先输入两个人的名字,请输出他们报名成功以后的元组。 输入描述: 第一…...
Jenkins改造—nginx配置鉴权
先kill掉8082的端口进程 netstat -natp | grep 8082 kill 10256 1、下载nginx nginx安装 EPEL 仓库中有 Nginx 的安装包。如果你还没有安装过 EPEL,可以通过运行下面的命令来完成安装 sudo yum install epel-release 输入以下命令来安装 Nginx sudo yum inst…...
(二)VisionOS平台概述
2.VisionOS平台概述 1. VisionOS平台概述 Unity 对VisionOS的支持将 Unity 编辑器和运行时引擎的全部功能与RealityKit提供的渲染功能结合起来。Unity 的核心功能(包括脚本、物理、动画混合、AI、场景管理等)无需修改即可支持。这允许游戏和应用程序逻…...
菜单中的类似iOS中开关的样式
背景是我们有需求,做类似ios中开关的按钮。github上有一些开源项目,比如 SwitchButton, 但是这个项目中提供了很多选项,并且实际使用中会出现一些奇怪的问题。 我调整了下代码,把无关的功能都给删了,保留核…...
Vue 2 动态组件和异步组件
先阅读 【Vue 2 组件基础】中的初步了解动态组件。 动态组件与keep-alive 我们知道动态组件使用is属性和component标签结合来切换不同组件。 下面给出一个示例: <!DOCTYPE html> <html><head><title>Vue 动态组件</title><scri…...
MongoDB升级经历(4.0.23至5.0.19)
MongoDB从4.0.23至5.0.19升级经历 引子:为了解决MongoDB的两个漏洞决定把MongoDB升级至最新版本,期间也踩了不少坑,在这里分享出来供大家学习与避坑~ 1、MongoDB的两个漏洞 漏洞1:MongoDB Server 安全漏洞(CVE-2021-20330) 漏洞2…...
iPhone上的个人热点丢失了怎么办?如何修复iPhone上不见的个人热点?
个人热点功能可将我们的iPhone手机转变为 Wi-Fi 热点,有了Wi-Fi 热点后就可以与附近的其他设备共享其互联网连接。 一般情况下,个人热点打开就可以使用,但也有部分用户在升级系统或越狱后发现 iPhone 的个人热点消失了。 iPhone上的个人热点…...
AI 媒人:为什么图形神经网络比 MLP 更好?
一、说明 G拉夫神经网络(GNN)!想象他们是人工智能世界的媒人,通过探索他们的联系,不知疲倦地帮助数据点找到朋友和人气。数字派对上的终极僚机。 现在,为什么这些GNN如此重要,你问?好…...
信息学奥赛一本通 1984:【19CSPJ普及组】纪念品 | 洛谷 P5662 [CSP-J2019] 纪念品
【题目链接】 ybt 1984:【19CSPJ普及组】纪念品 洛谷 P5662 [CSP-J2019] 纪念品 【题目考点】 1. 动态规划:完全背包 【解题思路】 由于小伟每天都可以买卖物品无限次,我们可以假想每天开始时,他把所有的商品都卖出ÿ…...
JVM——JVM参数指南
文章目录 1.概述2.堆内存相关2.1.显式指定堆内存–Xms和-Xmx2.2.显式新生代内存(Young Ceneration)2.3.显示指定永久代/元空间的大小 3.垃圾收集相关3.1.垃圾回收器3.2.GC记录 1.概述 在本篇文章中,你将掌握最常用的 JVM 参数配置。如果对于下面提到了一些概念比如…...
马上七夕到了,用各种编程语言实现10种浪漫表白方式
目录 1. 直接表白:2. 七夕节表白:3. 猜心游戏:4. 浪漫诗句:5. 爱的方程式:6. 爱心Python:7. 心形图案JavaScript 代码:8. 心形并显示表白信息HTML 页面:9. Java七夕快乐:…...
Spring Clould 注册中心 - Eureka,Nacos
视频地址:微服务(SpringCloudRabbitMQDockerRedis搜索分布式) Eureka 微服务技术栈导学(P1、P2) 微服务涉及的的知识 认识微服务-服务架构演变(P3、P4) 总结: 认识微服务-微服务技…...
MPNet:旋转机械轻量化故障诊断模型详解python代码复现
目录 一、问题背景与挑战 二、MPNet核心架构 2.1 多分支特征融合模块(MBFM) 2.2 残差注意力金字塔模块(RAPM) 2.2.1 空间金字塔注意力(SPA) 2.2.2 金字塔残差块(PRBlock) 2.3 分类器设计 三、关键技术突破 3.1 多尺度特征融合 3.2 轻量化设计策略 3.3 抗噪声…...
3.3.1_1 检错编码(奇偶校验码)
从这节课开始,我们会探讨数据链路层的差错控制功能,差错控制功能的主要目标是要发现并且解决一个帧内部的位错误,我们需要使用特殊的编码技术去发现帧内部的位错误,当我们发现位错误之后,通常来说有两种解决方案。第一…...
IGP(Interior Gateway Protocol,内部网关协议)
IGP(Interior Gateway Protocol,内部网关协议) 是一种用于在一个自治系统(AS)内部传递路由信息的路由协议,主要用于在一个组织或机构的内部网络中决定数据包的最佳路径。与用于自治系统之间通信的 EGP&…...
质量体系的重要
质量体系是为确保产品、服务或过程质量满足规定要求,由相互关联的要素构成的有机整体。其核心内容可归纳为以下五个方面: 🏛️ 一、组织架构与职责 质量体系明确组织内各部门、岗位的职责与权限,形成层级清晰的管理网络…...
CocosCreator 之 JavaScript/TypeScript和Java的相互交互
引擎版本: 3.8.1 语言: JavaScript/TypeScript、C、Java 环境:Window 参考:Java原生反射机制 您好,我是鹤九日! 回顾 在上篇文章中:CocosCreator Android项目接入UnityAds 广告SDK。 我们简单讲…...
大模型多显卡多服务器并行计算方法与实践指南
一、分布式训练概述 大规模语言模型的训练通常需要分布式计算技术,以解决单机资源不足的问题。分布式训练主要分为两种模式: 数据并行:将数据分片到不同设备,每个设备拥有完整的模型副本 模型并行:将模型分割到不同设备,每个设备处理部分模型计算 现代大模型训练通常结合…...
Map相关知识
数据结构 二叉树 二叉树,顾名思义,每个节点最多有两个“叉”,也就是两个子节点,分别是左子 节点和右子节点。不过,二叉树并不要求每个节点都有两个子节点,有的节点只 有左子节点,有的节点只有…...
HTML前端开发:JavaScript 获取元素方法详解
作为前端开发者,高效获取 DOM 元素是必备技能。以下是 JS 中核心的获取元素方法,分为两大系列: 一、getElementBy... 系列 传统方法,直接通过 DOM 接口访问,返回动态集合(元素变化会实时更新)。…...
spring Security对RBAC及其ABAC的支持使用
RBAC (基于角色的访问控制) RBAC (Role-Based Access Control) 是 Spring Security 中最常用的权限模型,它将权限分配给角色,再将角色分配给用户。 RBAC 核心实现 1. 数据库设计 users roles permissions ------- ------…...
前端高频面试题2:浏览器/计算机网络
本专栏相关链接 前端高频面试题1:HTML/CSS 前端高频面试题2:浏览器/计算机网络 前端高频面试题3:JavaScript 1.什么是强缓存、协商缓存? 强缓存: 当浏览器请求资源时,首先检查本地缓存是否命中。如果命…...
