Flink流批一体计算(14):PyFlink Tabel API之SQL查询
举个例子
查询 source 表,同时执行计算
# 通过 Table API 创建一张表:
source_table = table_env.from_path("datagen")
# 或者通过 SQL 查询语句创建一张表:
source_table = table_env.sql_query("SELECT * FROM datagen")
result_table = source_table.select(source_table.id + 1, source_table.data)
Table API 查询
Table 对象有许多方法,可以用于进行关系操作。
这些方法返回新的 Table 对象,表示对输入 Table 应用关系操作之后的结果。
这些关系操作可以由多个方法调用组成,例如 table.group_by(...).select(...)。
Table API 文档描述了流和批处理上所有支持的 Table API 操作。
以下示例展示了一个简单的 Table API 聚合查询:
from pyflink.table import Environmentsettings, TableEnvironment
# 通过 batch table environment 来执行查询
env_settings = Environmentsettings.in_batch_mode()
table_env = TableEnvironment.create(env_settings)
orders = table_env.from_elements([('Jack', 'FRANCE', 10), ('Rose', 'ENGLAND', 30), ('Jack', 'FRANCE', 20)],['name', 'country', 'revenue'])
# 计算所有来自法国客户的收入
revenue = orders \.select(orders.name, orders.country, orders.revenue) \.where(orders.country == 'FRANCE') \.group_by(orders.name) \.select(orders.name, orders.revenue.sum.alias('rev_sum'))
revenue.to_pandas()
Table API 也支持行操作的 API, 这些行操作包括 Map Operation, FlatMap Operation, Aggregate Operation 和 FlatAggregate Operation.
以下示例展示了一个简单的 Table API 基于行操作的查询
from pyflink.table import Environmentsettings, TableEnvironment
from pyflink.table import DataTypes
from pyflink.table.udf import udf
import pandas as pd# 通过 batch table environment 来执行查询
env_settings = Environmentsettings.in_batch_mode()
table_env = TableEnvironment.create(env_settings)
orders = table_env.from_elements([('Jack', 'FRANCE', 10), ('Rose', 'ENGLAND', 30), ('Jack', 'FRANCE', 20)], ['name', 'country', 'revenue'])
map_function = udf(lambda x: pd.concat([x.name, x.revenue * 10], axis=1),result_type=DataTypes.ROW([DataTypes.FIELD("name", DataTypes.STRING()),DataTypes.FIELD("revenue", DataTypes.BIGINT())]),func_type="pandas")
orders.map(map_function).alias('name', 'revenue').to_pandas()
SQL 查询
Flink 的 SQL 基于 Apache Calcite,它实现了标准的 SQL。SQL 查询语句使用字符串来表达。SQL 支持Flink 对流和批处理。
下面示例展示了一个简单的 SQL 聚合查询:
from pyflink.table import Environmentsettings, TableEnvironment# 通过 stream table environment 来执行查询env_settings = Environmentsettings.in_streaming_mode()table_env = TableEnvironment.create(env_settings)table_env.execute_sql("""CREATE TABLE random_source (id BIGINT,data TINYINT) WITH ('connector' = 'datagen','fields.id.kind'='sequence','fields.id.start'='1','fields.id.end'='8','fields.data.kind'='sequence','fields.data.start'='4','fields.data.end'='11')""")table_env.execute_sql("""CREATE TABLE print_sink (id BIGINT,data_sum TINYINT) WITH ('connector' = 'print')""")table_env.execute_sql("""INSERT INTO print_sinkSELECT id, sum(data) as data_sum FROM(SELECT id / 2 as id, data FROM random_source)WHERE id > 1GROUP BY id""").wait()
Table API 和 SQL 的混合使用
Table API 中的 Table 对象和 SQL 中的 Table 可以自由地相互转换。
下面例子展示了如何在 SQL 中使用 Table 对象:
create_temporary_view(view_path, table) 将一个 `Table` 对象注册为一张临时表,类似于 SQL 的临时表。
# 创建一张 sink 表来接收结果数据
table_env.execute_sql("""CREATE TABLE table_sink (id BIGINT,data VARCHAR) WITH ('connector' = 'print')
""")
# 将 Table API 表转换成 SQL 中的视图
table = table_env.from_elements([(1, 'Hi'), (2, 'Hello')], ['id', 'data'])
table_env.create_temporary_view('table_api_table', table)
# 将 Table API 表的数据写入结果表
table_env.execute_sql("INSERT INTO table_sink SELECT * FROM table_api_table").wait()
下面例子展示了如何在 Table API 中使用 SQL 表:
sql_query(query) 执行一条 SQL 查询,并将查询的结果作为一个 `Table` 对象。
# 创建一张 SQL source 表
table_env.execute_sql("""CREATE TABLE sql_source (id BIGINT,data TINYINT) WITH ('connector' = 'datagen','fields.id.kind'='sequence','fields.id.start'='1','fields.id.end'='4','fields.data.kind'='sequence','fields.data.start'='4','fields.data.end'='7')
""")# 将 SQL 表转换成 Table API 表
table = table_env.from_path("sql_source")
# 或者通过 SQL 查询语句创建表
table = table_env.sql_query("SELECT * FROM sql_source")
# 将表中的数据写出
table.to_pandas()
优化
数据倾斜
当数据发生倾斜(某一部分数据量特别大),虽然没有GC(Gabage Collection,垃圾回收),但是task执行时间严重不一致。
- 需要重新设计key,以更小粒度的key使得task大小合理化。
- 修改并行度。
- 调用rebalance操作,使数据分区均匀。
缓冲区超时设置
由于task在执行过程中存在数据通过网络进行交换,数据在不同服务器之间传递的缓冲区超时时间可以通过setBufferTimeout进行设置。
当设置“setBufferTimeout(-1)”,会等待缓冲区满之后才会刷新,使其达到最大吞吐量;当设置“setBufferTimeout(0)”时,可以最小化延迟,数据一旦接收到就会刷新;当设置“setBufferTimeout”大于0时,缓冲区会在该时间之后超时,然后进行缓冲区的刷新。
示例可以参考如下:
env.setBufferTimeout(timeoutMillis);
env.generateSequence(1,10).map(new MyMapper()).setBufferTimeout(timeoutMillis);
相关文章:
Flink流批一体计算(14):PyFlink Tabel API之SQL查询
举个例子 查询 source 表,同时执行计算 # 通过 Table API 创建一张表: source_table table_env.from_path("datagen") # 或者通过 SQL 查询语句创建一张表: source_table table_env.sql_query("SELECT * FROM datagen&quo…...
JRebel插件扩展-mac版
前言 上一篇分享了mac开发环境的搭建,但是欠了博友几个优化的债,今天先还一个,那就是idea里jRebel插件的扩展。 一、场景回眸 这个如果在win环境那扩展是分分钟,一个exe文件点点就行。现在在mac环境就没有这样的dmg可以执行的&…...
C语言中常见的一些语法概念和功能
常用代码: 程序入口:int main() 函数用于定义程序的入口点。 输出:使用 printf() 函数可以在控制台打印输出。 输入:使用 scanf() 函数可以接收用户的输入。 条件判断:使用 if-else 语句可以根据条件执行不同的代码…...
Python土力学与基础工程计算.PDF-钻探泥浆制备
Python 求解代码如下: 1. rho1 2.5 # 黏土密度,单位:t/m 2. rho2 1.0 # 泥浆密度,单位:t/m 3. rho3 1.0 # 水的密度,单位:t/m 4. V 1.0 # 泥浆容积,单位:…...
【机器学习】— 2 图神经网络GNN
一、说明 在本文中,我们探讨了图神经网络(GNN)在推荐系统中的潜力,强调了它们相对于传统矩阵完成方法的优势。GNN为利用图论来改进推荐系统提供了一个强大的框架。在本文中,我们将在推荐系统的背景下概述图论和图神经网…...
QT的布局与间隔器介绍
布局与间隔器 1、概述 QT中使用绝对定位的布局方式,无法适用窗口的变化,但是,也可以通过尺寸策略来进行 调整,使得 可以适用窗口变化。 布局管理器作用最主要用来在qt设计师中进行控件的排列,另外,布局管理…...
深入浅出Pytorch函数——torch.nn.Linear
分类目录:《深入浅出Pytorch函数》总目录 对输入数据做线性变换 y x A T b yxA^Tb yxATb 语法 torch.nn.Linear(in_features, out_features, biasTrue, deviceNone, dtypeNone)参数 in_features:[int] 每个输入样本的大小out_features :…...
Vue3.2+TS的defineExpose的应用
defineExpose通俗来讲,其实就是讲子组件的方法或者数据,暴露给父组件进行使用,这样对组件的封装使用,有很大的帮助,那么defineExpose应该如何使用,下面我来用一些实际的代码,带大家快速学会defi…...
牛客网Python入门103题练习|【08--元组】
⭐NP62 运动会双人项目 描述 牛客运动会上有一项双人项目,因为报名成功以后双人成员不允许被修改,因此请使用元组(tuple)进行记录。先输入两个人的名字,请输出他们报名成功以后的元组。 输入描述: 第一…...
Jenkins改造—nginx配置鉴权
先kill掉8082的端口进程 netstat -natp | grep 8082 kill 10256 1、下载nginx nginx安装 EPEL 仓库中有 Nginx 的安装包。如果你还没有安装过 EPEL,可以通过运行下面的命令来完成安装 sudo yum install epel-release 输入以下命令来安装 Nginx sudo yum inst…...
(二)VisionOS平台概述
2.VisionOS平台概述 1. VisionOS平台概述 Unity 对VisionOS的支持将 Unity 编辑器和运行时引擎的全部功能与RealityKit提供的渲染功能结合起来。Unity 的核心功能(包括脚本、物理、动画混合、AI、场景管理等)无需修改即可支持。这允许游戏和应用程序逻…...
菜单中的类似iOS中开关的样式
背景是我们有需求,做类似ios中开关的按钮。github上有一些开源项目,比如 SwitchButton, 但是这个项目中提供了很多选项,并且实际使用中会出现一些奇怪的问题。 我调整了下代码,把无关的功能都给删了,保留核…...
Vue 2 动态组件和异步组件
先阅读 【Vue 2 组件基础】中的初步了解动态组件。 动态组件与keep-alive 我们知道动态组件使用is属性和component标签结合来切换不同组件。 下面给出一个示例: <!DOCTYPE html> <html><head><title>Vue 动态组件</title><scri…...
MongoDB升级经历(4.0.23至5.0.19)
MongoDB从4.0.23至5.0.19升级经历 引子:为了解决MongoDB的两个漏洞决定把MongoDB升级至最新版本,期间也踩了不少坑,在这里分享出来供大家学习与避坑~ 1、MongoDB的两个漏洞 漏洞1:MongoDB Server 安全漏洞(CVE-2021-20330) 漏洞2…...
iPhone上的个人热点丢失了怎么办?如何修复iPhone上不见的个人热点?
个人热点功能可将我们的iPhone手机转变为 Wi-Fi 热点,有了Wi-Fi 热点后就可以与附近的其他设备共享其互联网连接。 一般情况下,个人热点打开就可以使用,但也有部分用户在升级系统或越狱后发现 iPhone 的个人热点消失了。 iPhone上的个人热点…...
AI 媒人:为什么图形神经网络比 MLP 更好?
一、说明 G拉夫神经网络(GNN)!想象他们是人工智能世界的媒人,通过探索他们的联系,不知疲倦地帮助数据点找到朋友和人气。数字派对上的终极僚机。 现在,为什么这些GNN如此重要,你问?好…...
信息学奥赛一本通 1984:【19CSPJ普及组】纪念品 | 洛谷 P5662 [CSP-J2019] 纪念品
【题目链接】 ybt 1984:【19CSPJ普及组】纪念品 洛谷 P5662 [CSP-J2019] 纪念品 【题目考点】 1. 动态规划:完全背包 【解题思路】 由于小伟每天都可以买卖物品无限次,我们可以假想每天开始时,他把所有的商品都卖出ÿ…...
JVM——JVM参数指南
文章目录 1.概述2.堆内存相关2.1.显式指定堆内存–Xms和-Xmx2.2.显式新生代内存(Young Ceneration)2.3.显示指定永久代/元空间的大小 3.垃圾收集相关3.1.垃圾回收器3.2.GC记录 1.概述 在本篇文章中,你将掌握最常用的 JVM 参数配置。如果对于下面提到了一些概念比如…...
马上七夕到了,用各种编程语言实现10种浪漫表白方式
目录 1. 直接表白:2. 七夕节表白:3. 猜心游戏:4. 浪漫诗句:5. 爱的方程式:6. 爱心Python:7. 心形图案JavaScript 代码:8. 心形并显示表白信息HTML 页面:9. Java七夕快乐:…...
Spring Clould 注册中心 - Eureka,Nacos
视频地址:微服务(SpringCloudRabbitMQDockerRedis搜索分布式) Eureka 微服务技术栈导学(P1、P2) 微服务涉及的的知识 认识微服务-服务架构演变(P3、P4) 总结: 认识微服务-微服务技…...
国防科技大学计算机基础课程笔记02信息编码
1.机内码和国标码 国标码就是我们非常熟悉的这个GB2312,但是因为都是16进制,因此这个了16进制的数据既可以翻译成为这个机器码,也可以翻译成为这个国标码,所以这个时候很容易会出现这个歧义的情况; 因此,我们的这个国…...
Vue2 第一节_Vue2上手_插值表达式{{}}_访问数据和修改数据_Vue开发者工具
文章目录 1.Vue2上手-如何创建一个Vue实例,进行初始化渲染2. 插值表达式{{}}3. 访问数据和修改数据4. vue响应式5. Vue开发者工具--方便调试 1.Vue2上手-如何创建一个Vue实例,进行初始化渲染 准备容器引包创建Vue实例 new Vue()指定配置项 ->渲染数据 准备一个容器,例如: …...
跨链模式:多链互操作架构与性能扩展方案
跨链模式:多链互操作架构与性能扩展方案 ——构建下一代区块链互联网的技术基石 一、跨链架构的核心范式演进 1. 分层协议栈:模块化解耦设计 现代跨链系统采用分层协议栈实现灵活扩展(H2Cross架构): 适配层…...
苍穹外卖--缓存菜品
1.问题说明 用户端小程序展示的菜品数据都是通过查询数据库获得,如果用户端访问量比较大,数据库访问压力随之增大 2.实现思路 通过Redis来缓存菜品数据,减少数据库查询操作。 缓存逻辑分析: ①每个分类下的菜品保持一份缓存数据…...
Java线上CPU飙高问题排查全指南
一、引言 在Java应用的线上运行环境中,CPU飙高是一个常见且棘手的性能问题。当系统出现CPU飙高时,通常会导致应用响应缓慢,甚至服务不可用,严重影响用户体验和业务运行。因此,掌握一套科学有效的CPU飙高问题排查方法&…...
高效线程安全的单例模式:Python 中的懒加载与自定义初始化参数
高效线程安全的单例模式:Python 中的懒加载与自定义初始化参数 在软件开发中,单例模式(Singleton Pattern)是一种常见的设计模式,确保一个类仅有一个实例,并提供一个全局访问点。在多线程环境下,实现单例模式时需要注意线程安全问题,以防止多个线程同时创建实例,导致…...
基于Java Swing的电子通讯录设计与实现:附系统托盘功能代码详解
JAVASQL电子通讯录带系统托盘 一、系统概述 本电子通讯录系统采用Java Swing开发桌面应用,结合SQLite数据库实现联系人管理功能,并集成系统托盘功能提升用户体验。系统支持联系人的增删改查、分组管理、搜索过滤等功能,同时可以最小化到系统…...
springboot整合VUE之在线教育管理系统简介
可以学习到的技能 学会常用技术栈的使用 独立开发项目 学会前端的开发流程 学会后端的开发流程 学会数据库的设计 学会前后端接口调用方式 学会多模块之间的关联 学会数据的处理 适用人群 在校学生,小白用户,想学习知识的 有点基础,想要通过项…...
A2A JS SDK 完整教程:快速入门指南
目录 什么是 A2A JS SDK?A2A JS 安装与设置A2A JS 核心概念创建你的第一个 A2A JS 代理A2A JS 服务端开发A2A JS 客户端使用A2A JS 高级特性A2A JS 最佳实践A2A JS 故障排除 什么是 A2A JS SDK? A2A JS SDK 是一个专为 JavaScript/TypeScript 开发者设计的强大库ÿ…...
FFmpeg:Windows系统小白安装及其使用
一、安装 1.访问官网 Download FFmpeg 2.点击版本目录 3.选择版本点击安装 注意这里选择的是【release buids】,注意左上角标题 例如我安装在目录 F:\FFmpeg 4.解压 5.添加环境变量 把你解压后的bin目录(即exe所在文件夹)加入系统变量…...
