当前位置: 首页 > news >正文

详解flink sql, calcite logical转flink logical

文章目录

    • 背景
    • 示例
    • FlinkLogicalCalcConverter
    • BatchPhysicalCalcRule
    • StreamPhysicalCalcRule
    • 其它算子
      • FlinkLogicalAggregate
      • FlinkLogicalCorrelate
      • FlinkLogicalDataStreamTableScan
      • FlinkLogicalDistribution
      • FlinkLogicalExpand
      • FlinkLogicalIntermediateTableScan
      • FlinkLogicalIntersect
      • FlinkLogicalJoin
      • FlinkLogicalLegacySink
      • FlinkLogicalLegacyTableSourceScan
      • FlinkLogicalMatch
      • FlinkLogicalMinus
      • FlinkLogicalOverAggregate
      • FlinkLogicalRank
      • FlinkLogicalSink
      • FlinkLogicalSnapshot
      • FlinkLogicalSort
      • FlinkLogicalUnion
      • FlinkLogicalValues

背景

本文主要介绍calcite 如何转成自定义的relnode

在这里插入图片描述

示例

FlinkLogicalCalcConverter

检查是不是calcite 的LogicalCalc 算子,是的话,重写带RelTrait 为FlinkConventions.LOGICA
的rel,类型FlinkLogicalCalc

private class FlinkLogicalCalcConverter(config: Config) extends ConverterRule(config) {override def convert(rel: RelNode): RelNode = {val calc = rel.asInstanceOf[LogicalCalc]val newInput = RelOptRule.convert(calc.getInput, FlinkConventions.LOGICAL)FlinkLogicalCalc.create(newInput, calc.getProgram)}
}

BatchPhysicalCalcRule

检查是不是FlinkLogicalCalc 的relnode

class BatchPhysicalCalcRule(config: Config) extends ConverterRule(config) {override def matches(call: RelOptRuleCall): Boolean = {val calc: FlinkLogicalCalc = call.rel(0)val program = calc.getProgram!program.getExprList.asScala.exists(containsPythonCall(_))}def convert(rel: RelNode): RelNode = {val calc = rel.asInstanceOf[FlinkLogicalCalc]val newTrait = rel.getTraitSet.replace(FlinkConventions.BATCH_PHYSICAL)val newInput = RelOptRule.convert(calc.getInput, FlinkConventions.BATCH_PHYSICAL)new BatchPhysicalCalc(rel.getCluster, newTrait, newInput, calc.getProgram, rel.getRowType)}
}

StreamPhysicalCalcRule

检查是不是FlinkLogicalCalc 的relnode

class StreamPhysicalCalcRule(config: Config) extends ConverterRule(config) {override def matches(call: RelOptRuleCall): Boolean = {val calc: FlinkLogicalCalc = call.rel(0)val program = calc.getProgram!program.getExprList.asScala.exists(containsPythonCall(_))}def convert(rel: RelNode): RelNode = {val calc: FlinkLogicalCalc = rel.asInstanceOf[FlinkLogicalCalc]val traitSet: RelTraitSet = rel.getTraitSet.replace(FlinkConventions.STREAM_PHYSICAL)val newInput = RelOptRule.convert(calc.getInput, FlinkConventions.STREAM_PHYSICAL)new StreamPhysicalCalc(rel.getCluster, traitSet, newInput, calc.getProgram, rel.getRowType)}
}

其它算子

介绍下算子的匹配条件

FlinkLogicalAggregate

对应的SQL语义是聚合函数
FlinkLogicalAggregateBatchConverter
不存在准确的distinct调用并且支持聚合函数,则返回true

override def matches(call: RelOptRuleCall): Boolean = {val agg = call.rel(0).asInstanceOf[LogicalAggregate]// we do not support these functions natively// they have to be converted using the FlinkAggregateReduceFunctionsRuleval supported = agg.getAggCallList.map(_.getAggregation.getKind).forall {// we support AVGcase SqlKind.AVG => true// but none of the other AVG agg functionscase k if SqlKind.AVG_AGG_FUNCTIONS.contains(k) => falsecase _ => true}val hasAccurateDistinctCall = AggregateUtil.containsAccurateDistinctCall(agg.getAggCallList)!hasAccurateDistinctCall && supported}

FlinkLogicalAggregateStreamConverter

SqlKind.STDDEV_POP | SqlKind.STDDEV_SAMP | SqlKind.VAR_POP | SqlKind.VAR_SAMP
非这几种,都支持转换

override def matches(call: RelOptRuleCall): Boolean = {val agg = call.rel(0).asInstanceOf[LogicalAggregate]// we do not support these functions natively// they have to be converted using the FlinkAggregateReduceFunctionsRuleagg.getAggCallList.map(_.getAggregation.getKind).forall {case SqlKind.STDDEV_POP | SqlKind.STDDEV_SAMP | SqlKind.VAR_POP | SqlKind.VAR_SAMP => falsecase _ => true}}

FlinkLogicalCorrelate

对应的SQL语义是,LogicalCorrelate 用于处理关联子查询和某些特殊的连接操作
检查relnode 是不是LogicalCorrelate,重写relnode

默认的onMatch 函数

FlinkLogicalDataStreamTableScan

对应的SQL语义是,检查数据源是不是流式的
检查relnode 是不是LogicalCorrelate,重写relnode

  override def matches(call: RelOptRuleCall): Boolean = {val scan: TableScan = call.rel(0)val dataStreamTable = scan.getTable.unwrap(classOf[DataStreamTable[_]])dataStreamTable != null}def convert(rel: RelNode): RelNode = {val scan = rel.asInstanceOf[TableScan]FlinkLogicalDataStreamTableScan.create(rel.getCluster, scan.getHints, scan.getTable)}

FlinkLogicalDistribution

描述数据是不是打散的

  override def convert(rel: RelNode): RelNode = {val distribution = rel.asInstanceOf[LogicalDistribution]val newInput = RelOptRule.convert(distribution.getInput, FlinkConventions.LOGICAL)FlinkLogicalDistribution.create(newInput, distribution.getCollation, distribution.getDistKeys)}

FlinkLogicalExpand

支持复杂聚合操作(如 ROLLUP 和 CUBE)的逻辑运算符

 override def convert(rel: RelNode): RelNode = {val expand = rel.asInstanceOf[LogicalExpand]val newInput = RelOptRule.convert(expand.getInput, FlinkConventions.LOGICAL)FlinkLogicalExpand.create(newInput, expand.projects, expand.expandIdIndex)}

FlinkLogicalIntermediateTableScan

FlinkLogicalIntermediateTableScan 用于表示对这些中间结果表进行扫描的逻辑操作

override def matches(call: RelOptRuleCall): Boolean = {val scan: TableScan = call.rel(0)val intermediateTable = scan.getTable.unwrap(classOf[IntermediateRelTable])intermediateTable != null}def convert(rel: RelNode): RelNode = {val scan = rel.asInstanceOf[TableScan]FlinkLogicalIntermediateTableScan.create(rel.getCluster, scan.getTable)}

FlinkLogicalIntersect

用于表示 SQL 中 INTERSECT 操作的逻辑运算符

override def convert(rel: RelNode): RelNode = {val intersect = rel.asInstanceOf[LogicalIntersect]val newInputs = intersect.getInputs.map {input => RelOptRule.convert(input, FlinkConventions.LOGICAL)}FlinkLogicalIntersect.create(newInputs, intersect.all)}

FlinkLogicalJoin

用于表示 SQL 中 JOIN 操作的逻辑运算符

 override def convert(rel: RelNode): RelNode = {val join = rel.asInstanceOf[LogicalJoin]val newLeft = RelOptRule.convert(join.getLeft, FlinkConventions.LOGICAL)val newRight = RelOptRule.convert(join.getRight, FlinkConventions.LOGICAL)FlinkLogicalJoin.create(newLeft, newRight, join.getCondition, join.getHints, join.getJoinType)}

FlinkLogicalLegacySink

写数据到传统的数据源

override def convert(rel: RelNode): RelNode = {val sink = rel.asInstanceOf[LogicalLegacySink]val newInput = RelOptRule.convert(sink.getInput, FlinkConventions.LOGICAL)FlinkLogicalLegacySink.create(newInput,sink.hints,sink.sink,sink.sinkName,sink.catalogTable,sink.staticPartitions)}

FlinkLogicalLegacyTableSourceScan

读传统的数据源

override def matches(call: RelOptRuleCall): Boolean = {val scan: TableScan = call.rel(0)isTableSourceScan(scan)}def convert(rel: RelNode): RelNode = {val scan = rel.asInstanceOf[TableScan]val table = scan.getTable.asInstanceOf[FlinkPreparingTableBase]FlinkLogicalLegacyTableSourceScan.create(rel.getCluster, scan.getHints, table)}

FlinkLogicalMatch

MATCH_RECOGNIZE 语句的逻辑运算符。MATCH_RECOGNIZE 语句允许用户在流数据中进行复杂的事件模式匹配,这对于实时数据处理和复杂事件处理(CEP)非常有用。

override def convert(rel: RelNode): RelNode = {val logicalMatch = rel.asInstanceOf[LogicalMatch]val traitSet = rel.getTraitSet.replace(FlinkConventions.LOGICAL)val newInput = RelOptRule.convert(logicalMatch.getInput, FlinkConventions.LOGICAL)new FlinkLogicalMatch(rel.getCluster,traitSet,newInput,logicalMatch.getRowType,logicalMatch.getPattern,logicalMatch.isStrictStart,logicalMatch.isStrictEnd,logicalMatch.getPatternDefinitions,logicalMatch.getMeasures,logicalMatch.getAfter,logicalMatch.getSubsets,logicalMatch.isAllRows,logicalMatch.getPartitionKeys,logicalMatch.getOrderKeys,logicalMatch.getInterval)}

FlinkLogicalMinus

用于表示 SQL 中 minus 操作的逻辑运算符

 override def convert(rel: RelNode): RelNode = {val minus = rel.asInstanceOf[LogicalMinus]val newInputs = minus.getInputs.map {input => RelOptRule.convert(input, FlinkConventions.LOGICAL)}FlinkLogicalMinus.create(newInputs, minus.all)}

FlinkLogicalOverAggregate

用于表示 SQL 中 窗口函数操作的逻辑运算符

FlinkLogicalRank

SQL 中 RANK 或 DENSE_RANK 函数的逻辑运算符。这些函数通常用于对数据进行排序和排名

override def convert(rel: RelNode): RelNode = {val rank = rel.asInstanceOf[LogicalRank]val newInput = RelOptRule.convert(rank.getInput, FlinkConventions.LOGICAL)FlinkLogicalRank.create(newInput,rank.partitionKey,rank.orderKey,rank.rankType,rank.rankRange,rank.rankNumberType,rank.outputRankNumber)}

FlinkLogicalSink

表示SQL里的写

FlinkLogicalSnapshot

SQL 语句中的 AS OF 子句的逻辑运算符。AS OF 子句用于对流数据进行快照操作,从而在处理数据时可以引用特定时间点的数据快照

def convert(rel: RelNode): RelNode = {val snapshot = rel.asInstanceOf[LogicalSnapshot]val newInput = RelOptRule.convert(snapshot.getInput, FlinkConventions.LOGICAL)snapshot.getPeriod match {case _: RexFieldAccess =>FlinkLogicalSnapshot.create(newInput, snapshot.getPeriod)case _: RexLiteral =>newInput}}

FlinkLogicalSort

表示SQL里的排序

FlinkLogicalUnion

表示SQL里的union 操作

 override def matches(call: RelOptRuleCall): Boolean = {val union: LogicalUnion = call.rel(0)union.all}override def convert(rel: RelNode): RelNode = {val union = rel.asInstanceOf[LogicalUnion]val newInputs = union.getInputs.map {input => RelOptRule.convert(input, FlinkConventions.LOGICAL)}FlinkLogicalUnion.create(newInputs, union.all)}

FlinkLogicalValues

SQL 中 VALUES 表达式的逻辑运算符。VALUES 表达式允许在查询中直接定义一组值,这在需要构造临时数据或进行简单的数据输入时非常有用。

相关文章:

详解flink sql, calcite logical转flink logical

文章目录 背景示例FlinkLogicalCalcConverterBatchPhysicalCalcRuleStreamPhysicalCalcRule其它算子FlinkLogicalAggregateFlinkLogicalCorrelateFlinkLogicalDataStreamTableScanFlinkLogicalDistributionFlinkLogicalExpandFlinkLogicalIntermediateTableScanFlinkLogicalInt…...

PostgreSQL的系统视图pg_statio_all_indexes

PostgreSQL的系统视图pg_statio_all_indexes 在 PostgreSQL 数据库中,pg_statio_all_indexes 视图提供了有关所有索引的 I/O 活动的统计信息。这些统计信息对于了解索引的使用情况和性能调优非常有帮助。 pg_statio_all_indexes 视图的结构 以下是 pg_statio_all…...

【C++ Primer Plus学习记录】函数和C-风格字符串

将字符串作为参数时意味着传递的是地址,但可以使用const来禁止对字符串参数进行修改。 假设要将字符串作为参数传递给函数,则表示字符串的方式有三种: (1)char数组 (2)用引号括起来的字符串常…...

力扣双指针算法题目:移动零

1.题目 . - 力扣&#xff08;LeetCode&#xff09; 2.思路解析 这个题目的思路和“使用递归排序快速排序解决数组的排序问题”相同 class solution { public:void QuickSort(vector<int>& nums, int left, int right){if (left > right) return;int key left…...

day60---面试专题(微服务面试题-参考回答)

微服务面试题 **面试官&#xff1a;**Spring Cloud 5大组件有哪些&#xff1f; 候选人&#xff1a; 早期我们一般认为的Spring Cloud五大组件是 Eureka : 注册中心Ribbon : 负载均衡Feign : 远程调用Hystrix : 服务熔断Zuul/Gateway : 网关 随着SpringCloudAlibba在国内兴起 , …...

laravel+phpoffice+easyexcel实现导入

资源包下载地址 https://download.csdn.net/download/QiZong__BK/89503486 easy-excel下载&#xff1a; "dcat/easy-excel": "^1.0", 命令行&#xff1a; composer require dcat/easy-excel 前端代码 <!doctype html> <html lang"en&…...

Spring Boot集成多数据源的最佳实践

Spring Boot集成多数据源的最佳实践 大家好&#xff0c;我是免费搭建查券返利机器人省钱赚佣金就用微赚淘客系统3.0的小编&#xff0c;也是冬天不穿秋裤&#xff0c;天冷也要风度的程序猿&#xff01; 为什么需要多数据源&#xff1f; 在实际的应用开发中&#xff0c;有时候…...

Java项目:基于SSM框架实现的班主任助理管理系统【ssm+B/S架构+源码+数据库+开题报告+毕业论文】

一、项目简介 本项目是一套基于SSM框架实现的班主任助理管理系统 包含&#xff1a;项目源码、数据库脚本等&#xff0c;该项目附带全部源码可作为毕设使用。 项目都经过严格调试&#xff0c;eclipse或者idea 确保可以运行&#xff01; 该系统功能完善、界面美观、操作简单、功…...

数据在内存中的存储方式

&#x1f31f;&#x1f31f;作者主页&#xff1a;ephemerals__ &#x1f31f;&#x1f31f;所属专栏&#xff1a;C语言 目录 前言 一、整数的存储 二、大小端字节序及其判断 1.什么是大小端 2.为什么有大小端 3.用c语言编写程序判断大小端 三、浮点数的存储 1.浮点数…...

Selenium 监视数据收发

实际上&#xff0c;在我提供的示例中&#xff0c;确实使用了浏览器实例。webdriver.Chrome()这行代码正是创建了一个Chrome浏览器的WebDriver实例。Selenium Wire扩展了标准的Selenium WebDriver&#xff0c;允许你通过这个浏览器实例来监听网络请求。 当你运行类似这样的代码…...

基于 STM32 的智能睡眠呼吸监测系统设计

本设计的硬件构成&#xff1a; STM32F103C8T6单片机最小系统板&#xff08;包含3.3V稳压电路时钟晶振电路复位电路&#xff08;上电自复位&#xff0c;手动复位&#xff09;&#xff09;&#xff0c;心率传感器、气压传感器、液晶显示、按键、蜂鸣器、LED灯、蓝牙模块组合而成…...

Spring的事务管理、AOP实现底层

目录 spring的事务管理是如何实现的&#xff1f; Spring的AOP的底层实现原理 spring的事务管理是如何实现的&#xff1f; 首先&#xff0c;spring的事务是由aop来实现的&#xff0c;首先要生成具体的代理对象&#xff0c;然后按照aop的整套流程来执行具体的操作逻辑&#xff…...

基于SpringBoot的篮球竞赛预约平台

你好&#xff0c;我是计算机学姐码农小野&#xff01;如果你对篮球竞赛预约平台感兴趣或有相关需求&#xff0c;欢迎私信联系我。 开发语言&#xff1a; Java 数据库&#xff1a; MySQL 技术&#xff1a; SpringBootMySql 工具&#xff1a; MyEclipse、Tomcat 系统展示…...

学生用小台灯什么牌子的好?列举出几款学生用台灯推荐

眼睛是我们感知世界的窗口&#xff0c;但近年来&#xff0c;儿童青少年的视力健康却受到了严重困扰。数据显示&#xff0c;近视问题在儿童群体中呈现出明显的增长趋势&#xff0c;这给他们的学习和生活带来了诸多不便。虽然现代科技的快速发展使得电子产品成为了我们生活中不可…...

软件测试面试题:项目中的MQ是如何测试的?

通常&#xff0c;咱们会从两个方面来考虑&#xff1a;正常情况和异常情况。 首先&#xff0c;咱们得确保消息队列在正常工作时结果正确。比如&#xff0c;消息发送出去的时候&#xff0c;所有的字段都得齐全&#xff0c;接收方收到的消息也得一样。咱们得确保系统能够正确无误…...

Python爬取国家医保平台公开数据

国家医保服务平台数据爬取python爬虫数据爬取医疗公开数据 定点医疗机构查询定点零售药店查询医保机构查询药品分类与代码查询 等等&#xff0c;数据都能爬 接口地址&#xff1a;/ebus/fuwu/api/nthl/api/CommQuery/queryFixedHospital 签名参数&#xff1a;signData {dat…...

B站大课堂-自动化精品视频(个人存档)

基础知识 工业通信协议 Modbus 施耐德研发&#xff0c;有基于以太网的 ModbusTCP 协议和使用 485/232 串口通信的 ModbusRTU/ASCII。 Modbus 协议面世较早、协议简洁高效、商用免费、功能灵活、实现简单&#xff0c;是目前应用最广泛的现场总线协议。 我的笔记里边有一些推荐…...

C++_STL---priority_queue

priority_queue的相关介绍 优先级队列是一种容器适配器&#xff0c;根据严格的排序标准&#xff0c;它的第一个元素总是它所包含的元素中最大(小)的。该容器适配器类似于堆&#xff0c;在堆中可以随时插入元素&#xff0c;并且可以检索最大(小)堆元素(优先级队列中位于顶部的元…...

可移动天线辅助宽带通信的性能分析和优化

可移动天线辅助宽带通信的性能分析和优化 可移动天线 (MA) 已成为一种很有前景的技术&#xff0c;通过在发射器 (Tx) 和/或接收器 (Rx) 处实现天线的本地移动来实现更有利的信道条件&#xff0c;从而增强无线通信性能。 由于现有的MA辅助无线通信研究主要考虑平坦衰落信道中的…...

h5兼容table ,如何实现h5在app内使用h5渲染table表格而且实现横屏预览?

压图地址 横屏div 通过css 实现 transform: rotate(90deg); transformOrigin: 50vw 50vw ; height: 100vw; width: 100vh;<divclass"popup-box":style"{transform: originSet 0 ? rotate(90deg) : ,transformOrigin: originSet 0 ? 50vw 50vw : ,height…...

【根据当天日期输出明天的日期(需对闰年做判定)。】2022-5-15

缘由根据当天日期输出明天的日期(需对闰年做判定)。日期类型结构体如下&#xff1a; struct data{ int year; int month; int day;};-编程语言-CSDN问答 struct mdata{ int year; int month; int day; }mdata; int 天数(int year, int month) {switch (month){case 1: case 3:…...

【Linux】shell脚本忽略错误继续执行

在 shell 脚本中&#xff0c;可以使用 set -e 命令来设置脚本在遇到错误时退出执行。如果你希望脚本忽略错误并继续执行&#xff0c;可以在脚本开头添加 set e 命令来取消该设置。 举例1 #!/bin/bash# 取消 set -e 的设置 set e# 执行命令&#xff0c;并忽略错误 rm somefile…...

练习(含atoi的模拟实现,自定义类型等练习)

一、结构体大小的计算及位段 &#xff08;结构体大小计算及位段 详解请看&#xff1a;自定义类型&#xff1a;结构体进阶-CSDN博客&#xff09; 1.在32位系统环境&#xff0c;编译选项为4字节对齐&#xff0c;那么sizeof(A)和sizeof(B)是多少&#xff1f; #pragma pack(4)st…...

23-Oracle 23 ai 区块链表(Blockchain Table)

小伙伴有没有在金融强合规的领域中遇见&#xff0c;必须要保持数据不可变&#xff0c;管理员都无法修改和留痕的要求。比如医疗的电子病历中&#xff0c;影像检查检验结果不可篡改行的&#xff0c;药品追溯过程中数据只可插入无法删除的特性需求&#xff1b;登录日志、修改日志…...

Java - Mysql数据类型对应

Mysql数据类型java数据类型备注整型INT/INTEGERint / java.lang.Integer–BIGINTlong/java.lang.Long–––浮点型FLOATfloat/java.lang.FloatDOUBLEdouble/java.lang.Double–DECIMAL/NUMERICjava.math.BigDecimal字符串型CHARjava.lang.String固定长度字符串VARCHARjava.lang…...

linux arm系统烧录

1、打开瑞芯微程序 2、按住linux arm 的 recover按键 插入电源 3、当瑞芯微检测到有设备 4、松开recover按键 5、选择升级固件 6、点击固件选择本地刷机的linux arm 镜像 7、点击升级 &#xff08;忘了有没有这步了 估计有&#xff09; 刷机程序 和 镜像 就不提供了。要刷的时…...

第 86 场周赛:矩阵中的幻方、钥匙和房间、将数组拆分成斐波那契序列、猜猜这个单词

Q1、[中等] 矩阵中的幻方 1、题目描述 3 x 3 的幻方是一个填充有 从 1 到 9 的不同数字的 3 x 3 矩阵&#xff0c;其中每行&#xff0c;每列以及两条对角线上的各数之和都相等。 给定一个由整数组成的row x col 的 grid&#xff0c;其中有多少个 3 3 的 “幻方” 子矩阵&am…...

Unsafe Fileupload篇补充-木马的详细教程与木马分享(中国蚁剑方式)

在之前的皮卡丘靶场第九期Unsafe Fileupload篇中我们学习了木马的原理并且学了一个简单的木马文件 本期内容是为了更好的为大家解释木马&#xff08;服务器方面的&#xff09;的原理&#xff0c;连接&#xff0c;以及各种木马及连接工具的分享 文件木马&#xff1a;https://w…...

HDFS分布式存储 zookeeper

hadoop介绍 狭义上hadoop是指apache的一款开源软件 用java语言实现开源框架&#xff0c;允许使用简单的变成模型跨计算机对大型集群进行分布式处理&#xff08;1.海量的数据存储 2.海量数据的计算&#xff09;Hadoop核心组件 hdfs&#xff08;分布式文件存储系统&#xff09;&a…...

短视频矩阵系统文案创作功能开发实践,定制化开发

在短视频行业迅猛发展的当下&#xff0c;企业和个人创作者为了扩大影响力、提升传播效果&#xff0c;纷纷采用短视频矩阵运营策略&#xff0c;同时管理多个平台、多个账号的内容发布。然而&#xff0c;频繁的文案创作需求让运营者疲于应对&#xff0c;如何高效产出高质量文案成…...