一文速通calcite结合flink理解SQL从文本变成执行计划详细过程
文章目录
- 你可以学到啥
- 测试代码
- 背景知识
- SQL转变流程图
- 问题
你可以学到啥
- SQL如何一步步变成执行计划的
- 有哪些优化器,哪些优化规则
- calcite 和flink 如何结合的
测试代码
EnvironmentSettings settings = EnvironmentSettings.inBatchMode();
TableEnvironment tableEnvironment = TableEnvironment.create(settings);Schema schema = Schema.newBuilder().column("count", DataTypes.INT()).column("word", DataTypes.STRING()).build();Schema schema1 = Schema.newBuilder().column("id", DataTypes.INT()).column("name", DataTypes.STRING()).build();tableEnvironment.createTemporaryTable("aa_user", TableDescriptor.forConnector("filesystem").schema(schema).option("path", "/Users/xx/IdeaProjects/flink-demo/data/order.csv").format("csv").build());tableEnvironment.createTemporaryTable("bb_order", TableDescriptor.forConnector("filesystem").schema(schema1).option("path", "/Users/xx/IdeaProjects/flink-demo/data/user.csv").format("csv").build());String cost = tableEnvironment.explainSql("select * from aa_user inner join bb_order on `aa_user`.`count`=`bb_order`.`id`", ExplainDetail.ESTIMATED_COST);System.out.println(cost);
背景知识
需要了解calcite 里的基本知识,如AST,RelNode ,hepPlanner等等。
需要了解Flink 和Flink SQL里的一些知识
SQL转变流程图
SQL经过flink 里注册的每一个优化器,优化后,就能变成物理计划了,不过要变成执行代码,还要再经过代码生成。

问题
- 问题1,FlinkBatchProgram
所有flink优化器都是在这个类里添加的
object FlinkBatchProgram {val SUBQUERY_REWRITE = "subquery_rewrite"val TEMPORAL_JOIN_REWRITE = "temporal_join_rewrite"val DECORRELATE = "decorrelate"val DEFAULT_REWRITE = "default_rewrite"val PREDICATE_PUSHDOWN = "predicate_pushdown"val JOIN_REORDER = "join_reorder"val JOIN_REWRITE = "join_rewrite"val PROJECT_REWRITE = "project_rewrite"val WINDOW = "window"val LOGICAL = "logical"val LOGICAL_REWRITE = "logical_rewrite"val TIME_INDICATOR = "time_indicator"val PHYSICAL = "physical"val PHYSICAL_REWRITE = "physical_rewrite"val DYNAMIC_PARTITION_PRUNING = "dynamic_partition_pruning"val RUNTIME_FILTER = "runtime_filter}
-
问题2,calcite 优化器和flink 如何结合的
logical,physical 这两个优化器都是用的VolcanoPlanner,结合规则和代价。
剩下的优化器HepPlanner,HepPlanner 完全使用规则。 -
问题3,project_rewrite 后,为啥少了LogicalProject ReNode ?
因为最后一个操作,logicalproject 这里就是把所有的字段查出来了,所有这一步实际上是不用的 -
问题4,物理计划如何生成执行代码的?
BatchPhysicalTableSourceScan 类
class BatchPhysicalTableSourceScan(cluster: RelOptCluster,traitSet: RelTraitSet,hints: util.List[RelHint],tableSourceTable: TableSourceTable)extends CommonPhysicalTableSourceScan(cluster, traitSet, hints, tableSourceTable)with BatchPhysicalRel {override def computeSelfCost(planner: RelOptPlanner, mq: RelMetadataQuery): RelOptCost = {val rowCnt = mq.getRowCount(this)if (rowCnt == null) {return null}val cpu = 0val rowSize = mq.getAverageRowSize(this)val size = rowCnt * rowSizeplanner.getCostFactory.makeCost(rowCnt, cpu, size)}// 这里生成的执行代码override def translateToExecNode(): ExecNode[_] = {val tableSourceSpec = new DynamicTableSourceSpec(tableSourceTable.contextResolvedTable,util.Arrays.asList(tableSourceTable.abilitySpecs: _*))tableSourceSpec.setTableSource(tableSourceTable.tableSource)new BatchExecTableSourceScan(unwrapTableConfig(this),tableSourceSpec,FlinkTypeFactory.toLogicalRowType(getRowType),getRelDetailedDescription)}
}
- 问题5,为啥aa_user 表被广播,哪里实现的?
BatchPhysicalHashJoinRule 规则实现的
核心代码
val leftSize = JoinUtil.binaryRowRelNodeSize(join.getLeft)val rightSize = JoinUtil.binaryRowRelNodeSize(join.getRight)// if it is not with hint, just check size of left and right side by statistic and config// if leftSize or rightSize is unknown, cannot use broadcastif (leftSize == null || rightSize == null) {return (false, false)}val threshold =tableConfig.get(OptimizerConfigOptions.TABLE_OPTIMIZER_BROADCAST_JOIN_THRESHOLD)val rightSizeSmallerThanThreshold = rightSize <= thresholdval leftSizeSmallerThanThreshold = leftSize <= thresholdval leftSmallerThanRight = leftSize < rightSizejoin.getJoinType match {case JoinRelType.LEFT => (rightSizeSmallerThanThreshold, false)case JoinRelType.RIGHT => (leftSizeSmallerThanThreshold, true)case JoinRelType.FULL => (false, false)case JoinRelType.INNER =>(leftSizeSmallerThanThreshold|| rightSizeSmallerThanThreshold,leftSmallerThanRight)// left side cannot be used as build side in SEMI/ANTI join.case JoinRelType.SEMI | JoinRelType.ANTI =>(rightSizeSmallerThanThreshold, false)}
主要就是实现
def binaryRowRelNodeSize(relNode: RelNode): JDouble = {val mq = relNode.getCluster.getMetadataQueryval rowCount = mq.getRowCount(relNode)if (rowCount == null) {null} else {rowCount * FlinkRelMdUtil.binaryRowAverageSize(relNode)}}
最后还是到了FlinkRelMdColumnNullCount 这个类
从这个ts: TableScan 对象里取出来
那ts 对象又是在哪里赋值的,看这个FlinkRecomputeStatisticsProgram 类
class FlinkRelMdColumnNullCount private extends MetadataHandler[ColumnNullCount] {override def getDef: MetadataDef[ColumnNullCount] = FlinkMetadata.ColumnNullCount.DEF/*** Gets the null count of the given column in TableScan.** @param ts* TableScan RelNode* @param mq* RelMetadataQuery instance* @param index* the index of the given column* @return* the null count of the given column in TableScan*/def getColumnNullCount(ts: TableScan, mq: RelMetadataQuery, index: Int): JDouble = {Preconditions.checkArgument(mq.isInstanceOf[FlinkRelMetadataQuery])val relOptTable = ts.getTable.asInstanceOf[FlinkPreparingTableBase]val fieldNames = relOptTable.getRowType.getFieldNamesPreconditions.checkArgument(index >= 0 && index < fieldNames.size())val fieldName = fieldNames.get(index)val statistic = relOptTable.getStatisticval colStats = statistic.getColumnStats(fieldName)if (colStats != null && colStats.getNullCount != null) {colStats.getNullCount.toDouble} else {null}}}
ts是在这里赋值,这里最后会用调用具体的文件系统,找到文件行数
private LogicalTableScan recomputeStatistics(LogicalTableScan scan) {final RelOptTable scanTable = scan.getTable();if (!(scanTable instanceof TableSourceTable)) {return scan;}FlinkContext context = ShortcutUtils.unwrapContext(scan);TableSourceTable table = (TableSourceTable) scanTable;boolean reportStatEnabled =context.getTableConfig().get(TABLE_OPTIMIZER_SOURCE_REPORT_STATISTICS_ENABLED)&& table.tableSource() instanceof SupportsStatisticReport;SourceAbilitySpec[] specs = table.abilitySpecs();PartitionPushDownSpec partitionPushDownSpec = getSpec(specs, PartitionPushDownSpec.class);FilterPushDownSpec filterPushDownSpec = getSpec(specs, FilterPushDownSpec.class);TableStats newTableStat =recomputeStatistics(table, partitionPushDownSpec, filterPushDownSpec, reportStatEnabled);FlinkStatistic newStatistic =FlinkStatistic.builder().statistic(table.getStatistic()).tableStats(newTableStat).build();TableSourceTable newTable = table.copy(newStatistic);return new LogicalTableScan(scan.getCluster(), scan.getTraitSet(), scan.getHints(), newTable);}
相关文章:
一文速通calcite结合flink理解SQL从文本变成执行计划详细过程
文章目录 你可以学到啥测试代码背景知识SQL转变流程图问题 你可以学到啥 SQL如何一步步变成执行计划的有哪些优化器,哪些优化规则calcite 和flink 如何结合的 测试代码 EnvironmentSettings settings EnvironmentSettings.inBatchMode(); TableEnvironment tabl…...
spring-TransactionTemplate 编程式事务
TransactionTemplate 是 Spring 框架提供的用于管理事务的一种方式。它提供了一种编程式的事务管理方法,允许开发者在代码中显式地控制事务的开始、提交或回滚。与使用 Transactional 注解相比,TransactionTemplate 提供了更多的灵活性和控制力。 为什么…...
中考全国45套(全国教育发达地区中考试卷)
文章目录 获取方式 为什么选择这份资源? 权威性与全面性:我们精心搜集了全国教育发达地区的最新中考试卷,确保每一套试卷都代表了该地区的教学水平和考试趋势。这不仅涵盖了丰富的知识点,还融入了各地独特的命题风格,让…...
嵌入式Linux学习笔记(5)-进程间常见通讯方式(c语言实现)
一、概述 进程间通信(IPC,InterProcess Communication)是指在多个进程之间进行数据传输和共享的机制。在操作系统中,进程是运行中的程序的实例,每个进程都有自己的内存空间和资源。 进程间通信可以用于在不同的进程之间…...
【移动端】菜单的自动展开与收回
前言 为了满足手机上菜单栏随用户移动,菜单的自动展示与隐藏,特此记录 基本原理 实现逻辑 window.addEventListener(‘scroll’, debouncedScrollHandler) – 监听文档视图滚动事件 document.querySelector(‘.header’) – 选择器匹配元素 创建show和h…...
Java获取Object中Value的方法
在Java中,获取对象(Object)中的值通常依赖于对象的类型以及我们希望访问的属性。由于Java是一种静态类型语言,直接从一个Object类型中访问属性是不可能的,因为Object是所有类的超类,但它本身不包含任何特定…...
集群聊天服务器项目【C++】(二)Json的简单使用
在上一章中,简单介绍了本项目的内容、技术栈、需求和目标等,详细介绍了环境配置,如果还没有配置成功,请参考我的上一篇博客环境配置 今天主要介绍Json库是什么以及简单的使用。 1.为什么要使用Json 我们在网络传输数据时&#…...
班迪录屏和这三款录屏工具,一键操作,太方便了!
嘿,小伙伴们!今天我要跟大家分享几款超棒的录屏工具,它们绝对是我们在工作和学习中不可或缺的好帮;这些工具功能强大且操作简单,下面就让我来详细介绍一下它们的使用体验和好用之处吧! 班迪录屏工具使用体…...
DAY60Bellman_ford 算法
队列优化算法 请找出从城市 1 到城市 n 的所有可能路径中,综合政府补贴后的最低运输成本。 如果能够从城市 1 到连通到城市 n, 请输出一个整数,表示运输成本。如果该整数是负数,则表示实现了盈利。如果从城市 1 没有路径可达城市…...
Dubbo SPI源码
文章目录 Dubbo SPI使用方式AOP功能源码剖析SPI注解1.获取加载器2.获取拓展实例对象3.创建拓展类的实例对象 Dubbo SPI Dubbo 的 SPI(Service Provider Interface)机制是一种强大的扩展机制,它允许开发者在运行时动态地替换或增加框架的功能。…...
《C++代码高度优化之双刃剑:避免过度优化引发的“暗雷”》
在 C编程的世界里,追求高效性能的代码是每个开发者的目标之一。高度优化的 C代码可以带来显著的性能提升,让程序在运行速度、内存占用等方面表现出色。然而,正如一把双刃剑,过度优化可能会引入难以察觉的错误,给程序带…...
javascript网页设计案例
设计一个具有良好用户体验的 JavaScript 网页涉及多个方面,如用户界面(UI)、用户体验(UX)、交互设计等。以下是一些示例案例,展示了如何使用 JavaScript 创建功能丰富且吸引人的网页设计。 1. 响应式导航菜…...
初阶数据结构【TOP】- 11.普通二叉树的介绍 - 1. (细致,保姆~~!)
文章目录 前言一、普通二叉树的链式结构二、 造树三、普通二叉树的遍历四、遍历完整代码五、总结 前言 本篇文章笔者将会对普通二叉树部分进行细致的讲解 , 本篇主要包括以下内容: 二叉树链式结构的介绍 ,二叉树的遍历. 笔者会一步一步分析带学者领略递归的美好~~ 一、普通二叉…...
【pyenv】pyenv安装版本超时的解决方案
目录 1、现象 2、分析现象 3、手动下载所需版本 4、存放到指定路径 5、重新安装 6、pip失败(做个记录,未找到原因) 7、方法二修改环境变量方法 7.1 设置环境变量 7.2 更新 7.3 安装即可 8、方法三修改XML文件 前言:研…...
【新片场-注册安全分析报告-无验证方式导致安全隐患】
前言 由于网站注册入口容易被黑客攻击,存在如下安全问题: 1. 暴力破解密码,造成用户信息泄露 2. 短信盗刷的安全问题,影响业务及导致用户投诉 3. 带来经济损失,尤其是后付费客户,风险巨大,造…...
新160个crackme - 057-bbbs-crackme04
运行分析 因软件版本老旧,需使用windows XP虚拟机运行有个SystemID,值为12345678需破解User ID和Password PE分析 yC壳,32位 OD手动脱壳 使用windows XP虚拟机,将程序拖入OD按一下F8,ESP变红,根据ESP定律设…...
车机中 Android Audio 音频常见问题分析方法实践小结
文章目录 前言1. 无声2. 断音3. 杂音4. 延迟播放5. 焦点问题6. 无声问题(连上 BT )其他完善中…… 前言 本文主要总结了一下车机开发中遇到的 Audio 有关的问题,同时参考网上的一案例,由于Audio 模块出现音频问题的场景很多,对每一个出现的问…...
湘大 OJ 代码仓库
有时候不需要上传一些题解,想要上传一些纯代码就行,傻傻把代码上传到文章里面,感觉效率不是很高,还是建立一个代码仓库比较方便 需要会使用魔法可能才能访问,github代码仓库地址...
Ruoyi Cloud K8s 部署
本文视频版本:https://www.bilibili.com/video/BV1xF4Se3Esv 参考 https://blog.csdn.net/Equent/article/details/137779505 https://blog.csdn.net/weixin_48711696/article/details/138117392 https://zhuanlan.zhihu.com/p/470647732 https://gitee.com/y_project/Ruo…...
OpenGL Texture C++ Camera Filter滤镜
基于OpenGL Texture纹理的强大功能,在片段着色器(Shader)中编写GLSL代码,对YUV的数据进行数据转换从而实现视频编辑软件中的相机滤镜功能。 接上一篇OpenGL Texture C 预览Camera视频的功能实现,本篇来实现Camera滤镜效…...
ai辅助开发:让快马智能生成win11安装openclaw的交互式诊断助手
最近在折腾Win11系统上安装OpenClaw这个工具时,发现手动安装过程特别容易踩坑。从依赖版本冲突到权限问题,稍不注意就会卡住。后来尝试用InsCode(快马)平台的AI辅助功能,意外发现它能生成一个智能安装助手,把整个流程变得特别顺畅…...
mysql如何限制查询结果_mysqllimit语句使用示例
LIMIT 必须放在整个 SELECT 语句的最后,严格位于 ORDER BY 和 GROUP BY 之后、WHERE 之后;写在 WHERE 或 ORDER BY 中间会报错。MySQL 的 LIMIT 用在 WHERE 之后还是 ORDER BY 之后?LIMIT 必须放在整个 SELECT 语句的最后,且严格位…...
Settingator:嵌入式参数管理库的轻量级设计与实践
1. Settingator 库概述:嵌入式设备与移动端配置协同的工程实践Settingator 是一个面向嵌入式系统的轻量级 Arduino 兼容库,其核心目标并非提供通用通信协议栈,而是构建一套可验证、可回滚、低侵入的运行时参数管理机制,专为配合同…...
网络信息安全技术术语对照表
类别术语中文术语英文术语说明基础技术类加密encryption将明文数据通过特定算法和密钥转换为密文数据的过程,目的是确保数据在存储、传输过程中不被未授权方获取和理解。基础技术类解密decryption将加密后的密文数据,通过对应的算法和密钥还原为原始明文…...
OpenClaw云端体验:星图平台千问3.5-9B镜像快速验证
OpenClaw云端体验:星图平台千问3.5-9B镜像快速验证 1. 为什么选择云端沙盒验证OpenClaw? 第一次接触OpenClaw时,我被它的本地自动化能力吸引,但看到复杂的本地部署文档就打了退堂鼓。直到发现星图平台提供的OpenClaw千问3.5-9B组…...
仅限核心架构师查阅:Python无锁GIL环境下的并发成本熔断机制(含实时监控脚本+自动降级策略)
第一章:Python无锁GIL环境下的并发模型成本控制策略全景概览在标准 CPython 解释器中,全局解释器锁(GIL)本质限制了多线程对 CPU 密集型任务的并行执行能力。然而,“无锁 GIL 环境”并非指移除 GIL 本身,而…...
C++ 与 异步流调度:在 C++ AI 框架中利用多个 CUDA Stream 重叠计算与数据传输的掩盖性能分析
C 与 异步流调度:在 C AI 框架中利用多个 CUDA Stream 重叠计算与数据传输的掩盖性能分析引言在现代人工智能领域,尤其是深度学习的应用中,GPU 已成为不可或缺的计算引擎。然而,即使拥有强大的 GPU 算力,系统整体性能也…...
人形机器人核心部件揭秘:减速器、传感器如何撑起宇树和智元的未来?
人形机器人核心部件揭秘:减速器与传感器的技术革命 当波士顿动力的Atlas完成后空翻,当特斯拉Optimus在工厂灵活抓取零件,这些看似科幻的场景背后,是无数精密部件协同工作的结果。人形机器人的核心部件——减速器和传感器ÿ…...
效率提升秘籍:用快马AI自动生成openclaw一键部署与依赖管理脚本
最近在折腾openclaw框架时,发现环境配置真是个效率黑洞。每次在新设备上部署,光是查文档、解决依赖冲突就要花掉大半天。于是琢磨着用自动化工具来优化这个流程,没想到效果出奇的好,今天就把这套方案分享给大家。 环境配置分析器&…...
人大金仓Kingbase数据库PostGIS插件部署实战:从零到一解锁空间数据能力
1. 为什么你的Kingbase数据库需要PostGIS? 刚接触空间数据处理的开发者经常会遇到这样的困惑:明明数据库里存了经纬度坐标,却无法计算两点距离;明明有行政区划边界数据,却做不了区域叠加分析。这就是典型的"有数据…...
