详解flink sql, calcite logical转flink logical
文章目录
- 背景
- 示例
- FlinkLogicalCalcConverter
- BatchPhysicalCalcRule
- StreamPhysicalCalcRule
- 其它算子
- FlinkLogicalAggregate
- FlinkLogicalCorrelate
- FlinkLogicalDataStreamTableScan
- FlinkLogicalDistribution
- FlinkLogicalExpand
- FlinkLogicalIntermediateTableScan
- FlinkLogicalIntersect
- FlinkLogicalJoin
- FlinkLogicalLegacySink
- FlinkLogicalLegacyTableSourceScan
- FlinkLogicalMatch
- FlinkLogicalMinus
- FlinkLogicalOverAggregate
- FlinkLogicalRank
- FlinkLogicalSink
- FlinkLogicalSnapshot
- FlinkLogicalSort
- FlinkLogicalUnion
- FlinkLogicalValues
背景
本文主要介绍calcite 如何转成自定义的relnode

示例
FlinkLogicalCalcConverter
检查是不是calcite 的LogicalCalc 算子,是的话,重写带RelTrait 为FlinkConventions.LOGICA
的rel,类型FlinkLogicalCalc
private class FlinkLogicalCalcConverter(config: Config) extends ConverterRule(config) {override def convert(rel: RelNode): RelNode = {val calc = rel.asInstanceOf[LogicalCalc]val newInput = RelOptRule.convert(calc.getInput, FlinkConventions.LOGICAL)FlinkLogicalCalc.create(newInput, calc.getProgram)}
}
BatchPhysicalCalcRule
检查是不是FlinkLogicalCalc 的relnode
class BatchPhysicalCalcRule(config: Config) extends ConverterRule(config) {override def matches(call: RelOptRuleCall): Boolean = {val calc: FlinkLogicalCalc = call.rel(0)val program = calc.getProgram!program.getExprList.asScala.exists(containsPythonCall(_))}def convert(rel: RelNode): RelNode = {val calc = rel.asInstanceOf[FlinkLogicalCalc]val newTrait = rel.getTraitSet.replace(FlinkConventions.BATCH_PHYSICAL)val newInput = RelOptRule.convert(calc.getInput, FlinkConventions.BATCH_PHYSICAL)new BatchPhysicalCalc(rel.getCluster, newTrait, newInput, calc.getProgram, rel.getRowType)}
}
StreamPhysicalCalcRule
检查是不是FlinkLogicalCalc 的relnode
class StreamPhysicalCalcRule(config: Config) extends ConverterRule(config) {override def matches(call: RelOptRuleCall): Boolean = {val calc: FlinkLogicalCalc = call.rel(0)val program = calc.getProgram!program.getExprList.asScala.exists(containsPythonCall(_))}def convert(rel: RelNode): RelNode = {val calc: FlinkLogicalCalc = rel.asInstanceOf[FlinkLogicalCalc]val traitSet: RelTraitSet = rel.getTraitSet.replace(FlinkConventions.STREAM_PHYSICAL)val newInput = RelOptRule.convert(calc.getInput, FlinkConventions.STREAM_PHYSICAL)new StreamPhysicalCalc(rel.getCluster, traitSet, newInput, calc.getProgram, rel.getRowType)}
}
其它算子
介绍下算子的匹配条件
FlinkLogicalAggregate
对应的SQL语义是聚合函数
FlinkLogicalAggregateBatchConverter
不存在准确的distinct调用并且支持聚合函数,则返回true
override def matches(call: RelOptRuleCall): Boolean = {val agg = call.rel(0).asInstanceOf[LogicalAggregate]// we do not support these functions natively// they have to be converted using the FlinkAggregateReduceFunctionsRuleval supported = agg.getAggCallList.map(_.getAggregation.getKind).forall {// we support AVGcase SqlKind.AVG => true// but none of the other AVG agg functionscase k if SqlKind.AVG_AGG_FUNCTIONS.contains(k) => falsecase _ => true}val hasAccurateDistinctCall = AggregateUtil.containsAccurateDistinctCall(agg.getAggCallList)!hasAccurateDistinctCall && supported}
FlinkLogicalAggregateStreamConverter
SqlKind.STDDEV_POP | SqlKind.STDDEV_SAMP | SqlKind.VAR_POP | SqlKind.VAR_SAMP
非这几种,都支持转换
override def matches(call: RelOptRuleCall): Boolean = {val agg = call.rel(0).asInstanceOf[LogicalAggregate]// we do not support these functions natively// they have to be converted using the FlinkAggregateReduceFunctionsRuleagg.getAggCallList.map(_.getAggregation.getKind).forall {case SqlKind.STDDEV_POP | SqlKind.STDDEV_SAMP | SqlKind.VAR_POP | SqlKind.VAR_SAMP => falsecase _ => true}}
FlinkLogicalCorrelate
对应的SQL语义是,LogicalCorrelate 用于处理关联子查询和某些特殊的连接操作
检查relnode 是不是LogicalCorrelate,重写relnode
默认的onMatch 函数
FlinkLogicalDataStreamTableScan
对应的SQL语义是,检查数据源是不是流式的
检查relnode 是不是LogicalCorrelate,重写relnode
override def matches(call: RelOptRuleCall): Boolean = {val scan: TableScan = call.rel(0)val dataStreamTable = scan.getTable.unwrap(classOf[DataStreamTable[_]])dataStreamTable != null}def convert(rel: RelNode): RelNode = {val scan = rel.asInstanceOf[TableScan]FlinkLogicalDataStreamTableScan.create(rel.getCluster, scan.getHints, scan.getTable)}
FlinkLogicalDistribution
描述数据是不是打散的
override def convert(rel: RelNode): RelNode = {val distribution = rel.asInstanceOf[LogicalDistribution]val newInput = RelOptRule.convert(distribution.getInput, FlinkConventions.LOGICAL)FlinkLogicalDistribution.create(newInput, distribution.getCollation, distribution.getDistKeys)}
FlinkLogicalExpand
支持复杂聚合操作(如 ROLLUP 和 CUBE)的逻辑运算符
override def convert(rel: RelNode): RelNode = {val expand = rel.asInstanceOf[LogicalExpand]val newInput = RelOptRule.convert(expand.getInput, FlinkConventions.LOGICAL)FlinkLogicalExpand.create(newInput, expand.projects, expand.expandIdIndex)}
FlinkLogicalIntermediateTableScan
FlinkLogicalIntermediateTableScan 用于表示对这些中间结果表进行扫描的逻辑操作
override def matches(call: RelOptRuleCall): Boolean = {val scan: TableScan = call.rel(0)val intermediateTable = scan.getTable.unwrap(classOf[IntermediateRelTable])intermediateTable != null}def convert(rel: RelNode): RelNode = {val scan = rel.asInstanceOf[TableScan]FlinkLogicalIntermediateTableScan.create(rel.getCluster, scan.getTable)}
FlinkLogicalIntersect
用于表示 SQL 中 INTERSECT 操作的逻辑运算符
override def convert(rel: RelNode): RelNode = {val intersect = rel.asInstanceOf[LogicalIntersect]val newInputs = intersect.getInputs.map {input => RelOptRule.convert(input, FlinkConventions.LOGICAL)}FlinkLogicalIntersect.create(newInputs, intersect.all)}
FlinkLogicalJoin
用于表示 SQL 中 JOIN 操作的逻辑运算符
override def convert(rel: RelNode): RelNode = {val join = rel.asInstanceOf[LogicalJoin]val newLeft = RelOptRule.convert(join.getLeft, FlinkConventions.LOGICAL)val newRight = RelOptRule.convert(join.getRight, FlinkConventions.LOGICAL)FlinkLogicalJoin.create(newLeft, newRight, join.getCondition, join.getHints, join.getJoinType)}
FlinkLogicalLegacySink
写数据到传统的数据源
override def convert(rel: RelNode): RelNode = {val sink = rel.asInstanceOf[LogicalLegacySink]val newInput = RelOptRule.convert(sink.getInput, FlinkConventions.LOGICAL)FlinkLogicalLegacySink.create(newInput,sink.hints,sink.sink,sink.sinkName,sink.catalogTable,sink.staticPartitions)}
FlinkLogicalLegacyTableSourceScan
读传统的数据源
override def matches(call: RelOptRuleCall): Boolean = {val scan: TableScan = call.rel(0)isTableSourceScan(scan)}def convert(rel: RelNode): RelNode = {val scan = rel.asInstanceOf[TableScan]val table = scan.getTable.asInstanceOf[FlinkPreparingTableBase]FlinkLogicalLegacyTableSourceScan.create(rel.getCluster, scan.getHints, table)}
FlinkLogicalMatch
MATCH_RECOGNIZE 语句的逻辑运算符。MATCH_RECOGNIZE 语句允许用户在流数据中进行复杂的事件模式匹配,这对于实时数据处理和复杂事件处理(CEP)非常有用。
override def convert(rel: RelNode): RelNode = {val logicalMatch = rel.asInstanceOf[LogicalMatch]val traitSet = rel.getTraitSet.replace(FlinkConventions.LOGICAL)val newInput = RelOptRule.convert(logicalMatch.getInput, FlinkConventions.LOGICAL)new FlinkLogicalMatch(rel.getCluster,traitSet,newInput,logicalMatch.getRowType,logicalMatch.getPattern,logicalMatch.isStrictStart,logicalMatch.isStrictEnd,logicalMatch.getPatternDefinitions,logicalMatch.getMeasures,logicalMatch.getAfter,logicalMatch.getSubsets,logicalMatch.isAllRows,logicalMatch.getPartitionKeys,logicalMatch.getOrderKeys,logicalMatch.getInterval)}
FlinkLogicalMinus
用于表示 SQL 中 minus 操作的逻辑运算符
override def convert(rel: RelNode): RelNode = {val minus = rel.asInstanceOf[LogicalMinus]val newInputs = minus.getInputs.map {input => RelOptRule.convert(input, FlinkConventions.LOGICAL)}FlinkLogicalMinus.create(newInputs, minus.all)}
FlinkLogicalOverAggregate
用于表示 SQL 中 窗口函数操作的逻辑运算符
FlinkLogicalRank
SQL 中 RANK 或 DENSE_RANK 函数的逻辑运算符。这些函数通常用于对数据进行排序和排名
override def convert(rel: RelNode): RelNode = {val rank = rel.asInstanceOf[LogicalRank]val newInput = RelOptRule.convert(rank.getInput, FlinkConventions.LOGICAL)FlinkLogicalRank.create(newInput,rank.partitionKey,rank.orderKey,rank.rankType,rank.rankRange,rank.rankNumberType,rank.outputRankNumber)}
FlinkLogicalSink
表示SQL里的写
FlinkLogicalSnapshot
SQL 语句中的 AS OF 子句的逻辑运算符。AS OF 子句用于对流数据进行快照操作,从而在处理数据时可以引用特定时间点的数据快照
def convert(rel: RelNode): RelNode = {val snapshot = rel.asInstanceOf[LogicalSnapshot]val newInput = RelOptRule.convert(snapshot.getInput, FlinkConventions.LOGICAL)snapshot.getPeriod match {case _: RexFieldAccess =>FlinkLogicalSnapshot.create(newInput, snapshot.getPeriod)case _: RexLiteral =>newInput}}
FlinkLogicalSort
表示SQL里的排序
FlinkLogicalUnion
表示SQL里的union 操作
override def matches(call: RelOptRuleCall): Boolean = {val union: LogicalUnion = call.rel(0)union.all}override def convert(rel: RelNode): RelNode = {val union = rel.asInstanceOf[LogicalUnion]val newInputs = union.getInputs.map {input => RelOptRule.convert(input, FlinkConventions.LOGICAL)}FlinkLogicalUnion.create(newInputs, union.all)}
FlinkLogicalValues
SQL 中 VALUES 表达式的逻辑运算符。VALUES 表达式允许在查询中直接定义一组值,这在需要构造临时数据或进行简单的数据输入时非常有用。
相关文章:
详解flink sql, calcite logical转flink logical
文章目录 背景示例FlinkLogicalCalcConverterBatchPhysicalCalcRuleStreamPhysicalCalcRule其它算子FlinkLogicalAggregateFlinkLogicalCorrelateFlinkLogicalDataStreamTableScanFlinkLogicalDistributionFlinkLogicalExpandFlinkLogicalIntermediateTableScanFlinkLogicalInt…...
PostgreSQL的系统视图pg_statio_all_indexes
PostgreSQL的系统视图pg_statio_all_indexes 在 PostgreSQL 数据库中,pg_statio_all_indexes 视图提供了有关所有索引的 I/O 活动的统计信息。这些统计信息对于了解索引的使用情况和性能调优非常有帮助。 pg_statio_all_indexes 视图的结构 以下是 pg_statio_all…...
【C++ Primer Plus学习记录】函数和C-风格字符串
将字符串作为参数时意味着传递的是地址,但可以使用const来禁止对字符串参数进行修改。 假设要将字符串作为参数传递给函数,则表示字符串的方式有三种: (1)char数组 (2)用引号括起来的字符串常…...
力扣双指针算法题目:移动零
1.题目 . - 力扣(LeetCode) 2.思路解析 这个题目的思路和“使用递归排序快速排序解决数组的排序问题”相同 class solution { public:void QuickSort(vector<int>& nums, int left, int right){if (left > right) return;int key left…...
day60---面试专题(微服务面试题-参考回答)
微服务面试题 **面试官:**Spring Cloud 5大组件有哪些? 候选人: 早期我们一般认为的Spring Cloud五大组件是 Eureka : 注册中心Ribbon : 负载均衡Feign : 远程调用Hystrix : 服务熔断Zuul/Gateway : 网关 随着SpringCloudAlibba在国内兴起 , …...
laravel+phpoffice+easyexcel实现导入
资源包下载地址 https://download.csdn.net/download/QiZong__BK/89503486 easy-excel下载: "dcat/easy-excel": "^1.0", 命令行: composer require dcat/easy-excel 前端代码 <!doctype html> <html lang"en&…...
Spring Boot集成多数据源的最佳实践
Spring Boot集成多数据源的最佳实践 大家好,我是免费搭建查券返利机器人省钱赚佣金就用微赚淘客系统3.0的小编,也是冬天不穿秋裤,天冷也要风度的程序猿! 为什么需要多数据源? 在实际的应用开发中,有时候…...
Java项目:基于SSM框架实现的班主任助理管理系统【ssm+B/S架构+源码+数据库+开题报告+毕业论文】
一、项目简介 本项目是一套基于SSM框架实现的班主任助理管理系统 包含:项目源码、数据库脚本等,该项目附带全部源码可作为毕设使用。 项目都经过严格调试,eclipse或者idea 确保可以运行! 该系统功能完善、界面美观、操作简单、功…...
数据在内存中的存储方式
🌟🌟作者主页:ephemerals__ 🌟🌟所属专栏:C语言 目录 前言 一、整数的存储 二、大小端字节序及其判断 1.什么是大小端 2.为什么有大小端 3.用c语言编写程序判断大小端 三、浮点数的存储 1.浮点数…...
Selenium 监视数据收发
实际上,在我提供的示例中,确实使用了浏览器实例。webdriver.Chrome()这行代码正是创建了一个Chrome浏览器的WebDriver实例。Selenium Wire扩展了标准的Selenium WebDriver,允许你通过这个浏览器实例来监听网络请求。 当你运行类似这样的代码…...
基于 STM32 的智能睡眠呼吸监测系统设计
本设计的硬件构成: STM32F103C8T6单片机最小系统板(包含3.3V稳压电路时钟晶振电路复位电路(上电自复位,手动复位)),心率传感器、气压传感器、液晶显示、按键、蜂鸣器、LED灯、蓝牙模块组合而成…...
Spring的事务管理、AOP实现底层
目录 spring的事务管理是如何实现的? Spring的AOP的底层实现原理 spring的事务管理是如何实现的? 首先,spring的事务是由aop来实现的,首先要生成具体的代理对象,然后按照aop的整套流程来执行具体的操作逻辑ÿ…...
基于SpringBoot的篮球竞赛预约平台
你好,我是计算机学姐码农小野!如果你对篮球竞赛预约平台感兴趣或有相关需求,欢迎私信联系我。 开发语言: Java 数据库: MySQL 技术: SpringBootMySql 工具: MyEclipse、Tomcat 系统展示…...
学生用小台灯什么牌子的好?列举出几款学生用台灯推荐
眼睛是我们感知世界的窗口,但近年来,儿童青少年的视力健康却受到了严重困扰。数据显示,近视问题在儿童群体中呈现出明显的增长趋势,这给他们的学习和生活带来了诸多不便。虽然现代科技的快速发展使得电子产品成为了我们生活中不可…...
软件测试面试题:项目中的MQ是如何测试的?
通常,咱们会从两个方面来考虑:正常情况和异常情况。 首先,咱们得确保消息队列在正常工作时结果正确。比如,消息发送出去的时候,所有的字段都得齐全,接收方收到的消息也得一样。咱们得确保系统能够正确无误…...
Python爬取国家医保平台公开数据
国家医保服务平台数据爬取python爬虫数据爬取医疗公开数据 定点医疗机构查询定点零售药店查询医保机构查询药品分类与代码查询 等等,数据都能爬 接口地址:/ebus/fuwu/api/nthl/api/CommQuery/queryFixedHospital 签名参数:signData {dat…...
B站大课堂-自动化精品视频(个人存档)
基础知识 工业通信协议 Modbus 施耐德研发,有基于以太网的 ModbusTCP 协议和使用 485/232 串口通信的 ModbusRTU/ASCII。 Modbus 协议面世较早、协议简洁高效、商用免费、功能灵活、实现简单,是目前应用最广泛的现场总线协议。 我的笔记里边有一些推荐…...
C++_STL---priority_queue
priority_queue的相关介绍 优先级队列是一种容器适配器,根据严格的排序标准,它的第一个元素总是它所包含的元素中最大(小)的。该容器适配器类似于堆,在堆中可以随时插入元素,并且可以检索最大(小)堆元素(优先级队列中位于顶部的元…...
可移动天线辅助宽带通信的性能分析和优化
可移动天线辅助宽带通信的性能分析和优化 可移动天线 (MA) 已成为一种很有前景的技术,通过在发射器 (Tx) 和/或接收器 (Rx) 处实现天线的本地移动来实现更有利的信道条件,从而增强无线通信性能。 由于现有的MA辅助无线通信研究主要考虑平坦衰落信道中的…...
h5兼容table ,如何实现h5在app内使用h5渲染table表格而且实现横屏预览?
压图地址 横屏div 通过css 实现 transform: rotate(90deg); transformOrigin: 50vw 50vw ; height: 100vw; width: 100vh;<divclass"popup-box":style"{transform: originSet 0 ? rotate(90deg) : ,transformOrigin: originSet 0 ? 50vw 50vw : ,height…...
Golang 面试经典题:map 的 key 可以是什么类型?哪些不可以?
Golang 面试经典题:map 的 key 可以是什么类型?哪些不可以? 在 Golang 的面试中,map 类型的使用是一个常见的考点,其中对 key 类型的合法性 是一道常被提及的基础却很容易被忽视的问题。本文将带你深入理解 Golang 中…...
解决Ubuntu22.04 VMware失败的问题 ubuntu入门之二十八
现象1 打开VMware失败 Ubuntu升级之后打开VMware上报需要安装vmmon和vmnet,点击确认后如下提示 最终上报fail 解决方法 内核升级导致,需要在新内核下重新下载编译安装 查看版本 $ vmware -v VMware Workstation 17.5.1 build-23298084$ lsb_release…...
对WWDC 2025 Keynote 内容的预测
借助我们以往对苹果公司发展路径的深入研究经验,以及大语言模型的分析能力,我们系统梳理了多年来苹果 WWDC 主题演讲的规律。在 WWDC 2025 即将揭幕之际,我们让 ChatGPT 对今年的 Keynote 内容进行了一个初步预测,聊作存档。等到明…...
Spring Boot面试题精选汇总
🤟致敬读者 🟩感谢阅读🟦笑口常开🟪生日快乐⬛早点睡觉 📘博主相关 🟧博主信息🟨博客首页🟫专栏推荐🟥活动信息 文章目录 Spring Boot面试题精选汇总⚙️ **一、核心概…...
基于matlab策略迭代和值迭代法的动态规划
经典的基于策略迭代和值迭代法的动态规划matlab代码,实现机器人的最优运输 Dynamic-Programming-master/Environment.pdf , 104724 Dynamic-Programming-master/README.md , 506 Dynamic-Programming-master/generalizedPolicyIteration.m , 1970 Dynamic-Programm…...
鸿蒙DevEco Studio HarmonyOS 5跑酷小游戏实现指南
1. 项目概述 本跑酷小游戏基于鸿蒙HarmonyOS 5开发,使用DevEco Studio作为开发工具,采用Java语言实现,包含角色控制、障碍物生成和分数计算系统。 2. 项目结构 /src/main/java/com/example/runner/├── MainAbilitySlice.java // 主界…...
高效线程安全的单例模式:Python 中的懒加载与自定义初始化参数
高效线程安全的单例模式:Python 中的懒加载与自定义初始化参数 在软件开发中,单例模式(Singleton Pattern)是一种常见的设计模式,确保一个类仅有一个实例,并提供一个全局访问点。在多线程环境下,实现单例模式时需要注意线程安全问题,以防止多个线程同时创建实例,导致…...
回溯算法学习
一、电话号码的字母组合 import java.util.ArrayList; import java.util.List;import javax.management.loading.PrivateClassLoader;public class letterCombinations {private static final String[] KEYPAD {"", //0"", //1"abc", //2"…...
A2A JS SDK 完整教程:快速入门指南
目录 什么是 A2A JS SDK?A2A JS 安装与设置A2A JS 核心概念创建你的第一个 A2A JS 代理A2A JS 服务端开发A2A JS 客户端使用A2A JS 高级特性A2A JS 最佳实践A2A JS 故障排除 什么是 A2A JS SDK? A2A JS SDK 是一个专为 JavaScript/TypeScript 开发者设计的强大库ÿ…...
音视频——I2S 协议详解
I2S 协议详解 I2S (Inter-IC Sound) 协议是一种串行总线协议,专门用于在数字音频设备之间传输数字音频数据。它由飞利浦(Philips)公司开发,以其简单、高效和广泛的兼容性而闻名。 1. 信号线 I2S 协议通常使用三根或四根信号线&a…...
