一文速通calcite结合flink理解SQL从文本变成执行计划详细过程
文章目录
- 你可以学到啥
- 测试代码
- 背景知识
- SQL转变流程图
- 问题
你可以学到啥
- SQL如何一步步变成执行计划的
- 有哪些优化器,哪些优化规则
- calcite 和flink 如何结合的
测试代码
EnvironmentSettings settings = EnvironmentSettings.inBatchMode();
TableEnvironment tableEnvironment = TableEnvironment.create(settings);Schema schema = Schema.newBuilder().column("count", DataTypes.INT()).column("word", DataTypes.STRING()).build();Schema schema1 = Schema.newBuilder().column("id", DataTypes.INT()).column("name", DataTypes.STRING()).build();tableEnvironment.createTemporaryTable("aa_user", TableDescriptor.forConnector("filesystem").schema(schema).option("path", "/Users/xx/IdeaProjects/flink-demo/data/order.csv").format("csv").build());tableEnvironment.createTemporaryTable("bb_order", TableDescriptor.forConnector("filesystem").schema(schema1).option("path", "/Users/xx/IdeaProjects/flink-demo/data/user.csv").format("csv").build());String cost = tableEnvironment.explainSql("select * from aa_user inner join bb_order on `aa_user`.`count`=`bb_order`.`id`", ExplainDetail.ESTIMATED_COST);System.out.println(cost);
背景知识
需要了解calcite 里的基本知识,如AST,RelNode ,hepPlanner等等。
需要了解Flink 和Flink SQL里的一些知识
SQL转变流程图
SQL经过flink 里注册的每一个优化器,优化后,就能变成物理计划了,不过要变成执行代码,还要再经过代码生成。

问题
- 问题1,FlinkBatchProgram
所有flink优化器都是在这个类里添加的
object FlinkBatchProgram {val SUBQUERY_REWRITE = "subquery_rewrite"val TEMPORAL_JOIN_REWRITE = "temporal_join_rewrite"val DECORRELATE = "decorrelate"val DEFAULT_REWRITE = "default_rewrite"val PREDICATE_PUSHDOWN = "predicate_pushdown"val JOIN_REORDER = "join_reorder"val JOIN_REWRITE = "join_rewrite"val PROJECT_REWRITE = "project_rewrite"val WINDOW = "window"val LOGICAL = "logical"val LOGICAL_REWRITE = "logical_rewrite"val TIME_INDICATOR = "time_indicator"val PHYSICAL = "physical"val PHYSICAL_REWRITE = "physical_rewrite"val DYNAMIC_PARTITION_PRUNING = "dynamic_partition_pruning"val RUNTIME_FILTER = "runtime_filter}
-
问题2,calcite 优化器和flink 如何结合的
logical,physical 这两个优化器都是用的VolcanoPlanner,结合规则和代价。
剩下的优化器HepPlanner,HepPlanner 完全使用规则。 -
问题3,project_rewrite 后,为啥少了LogicalProject ReNode ?
因为最后一个操作,logicalproject 这里就是把所有的字段查出来了,所有这一步实际上是不用的 -
问题4,物理计划如何生成执行代码的?
BatchPhysicalTableSourceScan 类
class BatchPhysicalTableSourceScan(cluster: RelOptCluster,traitSet: RelTraitSet,hints: util.List[RelHint],tableSourceTable: TableSourceTable)extends CommonPhysicalTableSourceScan(cluster, traitSet, hints, tableSourceTable)with BatchPhysicalRel {override def computeSelfCost(planner: RelOptPlanner, mq: RelMetadataQuery): RelOptCost = {val rowCnt = mq.getRowCount(this)if (rowCnt == null) {return null}val cpu = 0val rowSize = mq.getAverageRowSize(this)val size = rowCnt * rowSizeplanner.getCostFactory.makeCost(rowCnt, cpu, size)}// 这里生成的执行代码override def translateToExecNode(): ExecNode[_] = {val tableSourceSpec = new DynamicTableSourceSpec(tableSourceTable.contextResolvedTable,util.Arrays.asList(tableSourceTable.abilitySpecs: _*))tableSourceSpec.setTableSource(tableSourceTable.tableSource)new BatchExecTableSourceScan(unwrapTableConfig(this),tableSourceSpec,FlinkTypeFactory.toLogicalRowType(getRowType),getRelDetailedDescription)}
}
- 问题5,为啥aa_user 表被广播,哪里实现的?
BatchPhysicalHashJoinRule 规则实现的
核心代码
val leftSize = JoinUtil.binaryRowRelNodeSize(join.getLeft)val rightSize = JoinUtil.binaryRowRelNodeSize(join.getRight)// if it is not with hint, just check size of left and right side by statistic and config// if leftSize or rightSize is unknown, cannot use broadcastif (leftSize == null || rightSize == null) {return (false, false)}val threshold =tableConfig.get(OptimizerConfigOptions.TABLE_OPTIMIZER_BROADCAST_JOIN_THRESHOLD)val rightSizeSmallerThanThreshold = rightSize <= thresholdval leftSizeSmallerThanThreshold = leftSize <= thresholdval leftSmallerThanRight = leftSize < rightSizejoin.getJoinType match {case JoinRelType.LEFT => (rightSizeSmallerThanThreshold, false)case JoinRelType.RIGHT => (leftSizeSmallerThanThreshold, true)case JoinRelType.FULL => (false, false)case JoinRelType.INNER =>(leftSizeSmallerThanThreshold|| rightSizeSmallerThanThreshold,leftSmallerThanRight)// left side cannot be used as build side in SEMI/ANTI join.case JoinRelType.SEMI | JoinRelType.ANTI =>(rightSizeSmallerThanThreshold, false)}
主要就是实现
def binaryRowRelNodeSize(relNode: RelNode): JDouble = {val mq = relNode.getCluster.getMetadataQueryval rowCount = mq.getRowCount(relNode)if (rowCount == null) {null} else {rowCount * FlinkRelMdUtil.binaryRowAverageSize(relNode)}}
最后还是到了FlinkRelMdColumnNullCount 这个类
从这个ts: TableScan 对象里取出来
那ts 对象又是在哪里赋值的,看这个FlinkRecomputeStatisticsProgram 类
class FlinkRelMdColumnNullCount private extends MetadataHandler[ColumnNullCount] {override def getDef: MetadataDef[ColumnNullCount] = FlinkMetadata.ColumnNullCount.DEF/*** Gets the null count of the given column in TableScan.** @param ts* TableScan RelNode* @param mq* RelMetadataQuery instance* @param index* the index of the given column* @return* the null count of the given column in TableScan*/def getColumnNullCount(ts: TableScan, mq: RelMetadataQuery, index: Int): JDouble = {Preconditions.checkArgument(mq.isInstanceOf[FlinkRelMetadataQuery])val relOptTable = ts.getTable.asInstanceOf[FlinkPreparingTableBase]val fieldNames = relOptTable.getRowType.getFieldNamesPreconditions.checkArgument(index >= 0 && index < fieldNames.size())val fieldName = fieldNames.get(index)val statistic = relOptTable.getStatisticval colStats = statistic.getColumnStats(fieldName)if (colStats != null && colStats.getNullCount != null) {colStats.getNullCount.toDouble} else {null}}}
ts是在这里赋值,这里最后会用调用具体的文件系统,找到文件行数
private LogicalTableScan recomputeStatistics(LogicalTableScan scan) {final RelOptTable scanTable = scan.getTable();if (!(scanTable instanceof TableSourceTable)) {return scan;}FlinkContext context = ShortcutUtils.unwrapContext(scan);TableSourceTable table = (TableSourceTable) scanTable;boolean reportStatEnabled =context.getTableConfig().get(TABLE_OPTIMIZER_SOURCE_REPORT_STATISTICS_ENABLED)&& table.tableSource() instanceof SupportsStatisticReport;SourceAbilitySpec[] specs = table.abilitySpecs();PartitionPushDownSpec partitionPushDownSpec = getSpec(specs, PartitionPushDownSpec.class);FilterPushDownSpec filterPushDownSpec = getSpec(specs, FilterPushDownSpec.class);TableStats newTableStat =recomputeStatistics(table, partitionPushDownSpec, filterPushDownSpec, reportStatEnabled);FlinkStatistic newStatistic =FlinkStatistic.builder().statistic(table.getStatistic()).tableStats(newTableStat).build();TableSourceTable newTable = table.copy(newStatistic);return new LogicalTableScan(scan.getCluster(), scan.getTraitSet(), scan.getHints(), newTable);}
相关文章:
一文速通calcite结合flink理解SQL从文本变成执行计划详细过程
文章目录 你可以学到啥测试代码背景知识SQL转变流程图问题 你可以学到啥 SQL如何一步步变成执行计划的有哪些优化器,哪些优化规则calcite 和flink 如何结合的 测试代码 EnvironmentSettings settings EnvironmentSettings.inBatchMode(); TableEnvironment tabl…...
spring-TransactionTemplate 编程式事务
TransactionTemplate 是 Spring 框架提供的用于管理事务的一种方式。它提供了一种编程式的事务管理方法,允许开发者在代码中显式地控制事务的开始、提交或回滚。与使用 Transactional 注解相比,TransactionTemplate 提供了更多的灵活性和控制力。 为什么…...
中考全国45套(全国教育发达地区中考试卷)
文章目录 获取方式 为什么选择这份资源? 权威性与全面性:我们精心搜集了全国教育发达地区的最新中考试卷,确保每一套试卷都代表了该地区的教学水平和考试趋势。这不仅涵盖了丰富的知识点,还融入了各地独特的命题风格,让…...
嵌入式Linux学习笔记(5)-进程间常见通讯方式(c语言实现)
一、概述 进程间通信(IPC,InterProcess Communication)是指在多个进程之间进行数据传输和共享的机制。在操作系统中,进程是运行中的程序的实例,每个进程都有自己的内存空间和资源。 进程间通信可以用于在不同的进程之间…...
【移动端】菜单的自动展开与收回
前言 为了满足手机上菜单栏随用户移动,菜单的自动展示与隐藏,特此记录 基本原理 实现逻辑 window.addEventListener(‘scroll’, debouncedScrollHandler) – 监听文档视图滚动事件 document.querySelector(‘.header’) – 选择器匹配元素 创建show和h…...
Java获取Object中Value的方法
在Java中,获取对象(Object)中的值通常依赖于对象的类型以及我们希望访问的属性。由于Java是一种静态类型语言,直接从一个Object类型中访问属性是不可能的,因为Object是所有类的超类,但它本身不包含任何特定…...
集群聊天服务器项目【C++】(二)Json的简单使用
在上一章中,简单介绍了本项目的内容、技术栈、需求和目标等,详细介绍了环境配置,如果还没有配置成功,请参考我的上一篇博客环境配置 今天主要介绍Json库是什么以及简单的使用。 1.为什么要使用Json 我们在网络传输数据时&#…...
班迪录屏和这三款录屏工具,一键操作,太方便了!
嘿,小伙伴们!今天我要跟大家分享几款超棒的录屏工具,它们绝对是我们在工作和学习中不可或缺的好帮;这些工具功能强大且操作简单,下面就让我来详细介绍一下它们的使用体验和好用之处吧! 班迪录屏工具使用体…...
DAY60Bellman_ford 算法
队列优化算法 请找出从城市 1 到城市 n 的所有可能路径中,综合政府补贴后的最低运输成本。 如果能够从城市 1 到连通到城市 n, 请输出一个整数,表示运输成本。如果该整数是负数,则表示实现了盈利。如果从城市 1 没有路径可达城市…...
Dubbo SPI源码
文章目录 Dubbo SPI使用方式AOP功能源码剖析SPI注解1.获取加载器2.获取拓展实例对象3.创建拓展类的实例对象 Dubbo SPI Dubbo 的 SPI(Service Provider Interface)机制是一种强大的扩展机制,它允许开发者在运行时动态地替换或增加框架的功能。…...
《C++代码高度优化之双刃剑:避免过度优化引发的“暗雷”》
在 C编程的世界里,追求高效性能的代码是每个开发者的目标之一。高度优化的 C代码可以带来显著的性能提升,让程序在运行速度、内存占用等方面表现出色。然而,正如一把双刃剑,过度优化可能会引入难以察觉的错误,给程序带…...
javascript网页设计案例
设计一个具有良好用户体验的 JavaScript 网页涉及多个方面,如用户界面(UI)、用户体验(UX)、交互设计等。以下是一些示例案例,展示了如何使用 JavaScript 创建功能丰富且吸引人的网页设计。 1. 响应式导航菜…...
初阶数据结构【TOP】- 11.普通二叉树的介绍 - 1. (细致,保姆~~!)
文章目录 前言一、普通二叉树的链式结构二、 造树三、普通二叉树的遍历四、遍历完整代码五、总结 前言 本篇文章笔者将会对普通二叉树部分进行细致的讲解 , 本篇主要包括以下内容: 二叉树链式结构的介绍 ,二叉树的遍历. 笔者会一步一步分析带学者领略递归的美好~~ 一、普通二叉…...
【pyenv】pyenv安装版本超时的解决方案
目录 1、现象 2、分析现象 3、手动下载所需版本 4、存放到指定路径 5、重新安装 6、pip失败(做个记录,未找到原因) 7、方法二修改环境变量方法 7.1 设置环境变量 7.2 更新 7.3 安装即可 8、方法三修改XML文件 前言:研…...
【新片场-注册安全分析报告-无验证方式导致安全隐患】
前言 由于网站注册入口容易被黑客攻击,存在如下安全问题: 1. 暴力破解密码,造成用户信息泄露 2. 短信盗刷的安全问题,影响业务及导致用户投诉 3. 带来经济损失,尤其是后付费客户,风险巨大,造…...
新160个crackme - 057-bbbs-crackme04
运行分析 因软件版本老旧,需使用windows XP虚拟机运行有个SystemID,值为12345678需破解User ID和Password PE分析 yC壳,32位 OD手动脱壳 使用windows XP虚拟机,将程序拖入OD按一下F8,ESP变红,根据ESP定律设…...
车机中 Android Audio 音频常见问题分析方法实践小结
文章目录 前言1. 无声2. 断音3. 杂音4. 延迟播放5. 焦点问题6. 无声问题(连上 BT )其他完善中…… 前言 本文主要总结了一下车机开发中遇到的 Audio 有关的问题,同时参考网上的一案例,由于Audio 模块出现音频问题的场景很多,对每一个出现的问…...
湘大 OJ 代码仓库
有时候不需要上传一些题解,想要上传一些纯代码就行,傻傻把代码上传到文章里面,感觉效率不是很高,还是建立一个代码仓库比较方便 需要会使用魔法可能才能访问,github代码仓库地址...
Ruoyi Cloud K8s 部署
本文视频版本:https://www.bilibili.com/video/BV1xF4Se3Esv 参考 https://blog.csdn.net/Equent/article/details/137779505 https://blog.csdn.net/weixin_48711696/article/details/138117392 https://zhuanlan.zhihu.com/p/470647732 https://gitee.com/y_project/Ruo…...
OpenGL Texture C++ Camera Filter滤镜
基于OpenGL Texture纹理的强大功能,在片段着色器(Shader)中编写GLSL代码,对YUV的数据进行数据转换从而实现视频编辑软件中的相机滤镜功能。 接上一篇OpenGL Texture C 预览Camera视频的功能实现,本篇来实现Camera滤镜效…...
Linux链表操作全解析
Linux C语言链表深度解析与实战技巧 一、链表基础概念与内核链表优势1.1 为什么使用链表?1.2 Linux 内核链表与用户态链表的区别 二、内核链表结构与宏解析常用宏/函数 三、内核链表的优点四、用户态链表示例五、双向循环链表在内核中的实现优势5.1 插入效率5.2 安全…...
【Redis技术进阶之路】「原理分析系列开篇」分析客户端和服务端网络诵信交互实现(服务端执行命令请求的过程 - 初始化服务器)
服务端执行命令请求的过程 【专栏简介】【技术大纲】【专栏目标】【目标人群】1. Redis爱好者与社区成员2. 后端开发和系统架构师3. 计算机专业的本科生及研究生 初始化服务器1. 初始化服务器状态结构初始化RedisServer变量 2. 加载相关系统配置和用户配置参数定制化配置参数案…...
【git】把本地更改提交远程新分支feature_g
创建并切换新分支 git checkout -b feature_g 添加并提交更改 git add . git commit -m “实现图片上传功能” 推送到远程 git push -u origin feature_g...
华为云Flexus+DeepSeek征文|DeepSeek-V3/R1 商用服务开通全流程与本地部署搭建
华为云FlexusDeepSeek征文|DeepSeek-V3/R1 商用服务开通全流程与本地部署搭建 前言 如今大模型其性能出色,华为云 ModelArts Studio_MaaS大模型即服务平台华为云内置了大模型,能助力我们轻松驾驭 DeepSeek-V3/R1,本文中将分享如何…...
HDFS分布式存储 zookeeper
hadoop介绍 狭义上hadoop是指apache的一款开源软件 用java语言实现开源框架,允许使用简单的变成模型跨计算机对大型集群进行分布式处理(1.海量的数据存储 2.海量数据的计算)Hadoop核心组件 hdfs(分布式文件存储系统)&a…...
在Ubuntu24上采用Wine打开SourceInsight
1. 安装wine sudo apt install wine 2. 安装32位库支持,SourceInsight是32位程序 sudo dpkg --add-architecture i386 sudo apt update sudo apt install wine32:i386 3. 验证安装 wine --version 4. 安装必要的字体和库(解决显示问题) sudo apt install fonts-wqy…...
Linux 内存管理实战精讲:核心原理与面试常考点全解析
Linux 内存管理实战精讲:核心原理与面试常考点全解析 Linux 内核内存管理是系统设计中最复杂但也最核心的模块之一。它不仅支撑着虚拟内存机制、物理内存分配、进程隔离与资源复用,还直接决定系统运行的性能与稳定性。无论你是嵌入式开发者、内核调试工…...
SQL慢可能是触发了ring buffer
简介 最近在进行 postgresql 性能排查的时候,发现 PG 在某一个时间并行执行的 SQL 变得特别慢。最后通过监控监观察到并行发起得时间 buffers_alloc 就急速上升,且低水位伴随在整个慢 SQL,一直是 buferIO 的等待事件,此时也没有其他会话的争抢。SQL 虽然不是高效 SQL ,但…...
Go 并发编程基础:通道(Channel)的使用
在 Go 中,Channel 是 Goroutine 之间通信的核心机制。它提供了一个线程安全的通信方式,用于在多个 Goroutine 之间传递数据,从而实现高效的并发编程。 本章将介绍 Channel 的基本概念、用法、缓冲、关闭机制以及 select 的使用。 一、Channel…...
并发编程 - go版
1.并发编程基础概念 进程和线程 A. 进程是程序在操作系统中的一次执行过程,系统进行资源分配和调度的一个独立单位。B. 线程是进程的一个执行实体,是CPU调度和分派的基本单位,它是比进程更小的能独立运行的基本单位。C.一个进程可以创建和撤销多个线程;同一个进程中…...
