当前位置: 首页 > news >正文

一文速通calcite结合flink理解SQL从文本变成执行计划详细过程

文章目录

      • 你可以学到啥
      • 测试代码
      • 背景知识
      • SQL转变流程图
      • 问题

你可以学到啥

  • SQL如何一步步变成执行计划的
  • 有哪些优化器,哪些优化规则
  • calcite 和flink 如何结合的

测试代码

EnvironmentSettings settings = EnvironmentSettings.inBatchMode();
TableEnvironment tableEnvironment = TableEnvironment.create(settings);Schema schema = Schema.newBuilder().column("count", DataTypes.INT()).column("word", DataTypes.STRING()).build();Schema schema1 = Schema.newBuilder().column("id", DataTypes.INT()).column("name", DataTypes.STRING()).build();tableEnvironment.createTemporaryTable("aa_user", TableDescriptor.forConnector("filesystem").schema(schema).option("path", "/Users/xx/IdeaProjects/flink-demo/data/order.csv").format("csv").build());tableEnvironment.createTemporaryTable("bb_order", TableDescriptor.forConnector("filesystem").schema(schema1).option("path", "/Users/xx/IdeaProjects/flink-demo/data/user.csv").format("csv").build());String cost = tableEnvironment.explainSql("select * from aa_user inner join bb_order on `aa_user`.`count`=`bb_order`.`id`", ExplainDetail.ESTIMATED_COST);System.out.println(cost);

背景知识

需要了解calcite 里的基本知识,如AST,RelNode ,hepPlanner等等。
需要了解Flink 和Flink SQL里的一些知识

SQL转变流程图

SQL经过flink 里注册的每一个优化器,优化后,就能变成物理计划了,不过要变成执行代码,还要再经过代码生成。
在这里插入图片描述

问题

  • 问题1,FlinkBatchProgram
    所有flink优化器都是在这个类里添加的
object FlinkBatchProgram {val SUBQUERY_REWRITE = "subquery_rewrite"val TEMPORAL_JOIN_REWRITE = "temporal_join_rewrite"val DECORRELATE = "decorrelate"val DEFAULT_REWRITE = "default_rewrite"val PREDICATE_PUSHDOWN = "predicate_pushdown"val JOIN_REORDER = "join_reorder"val JOIN_REWRITE = "join_rewrite"val PROJECT_REWRITE = "project_rewrite"val WINDOW = "window"val LOGICAL = "logical"val LOGICAL_REWRITE = "logical_rewrite"val TIME_INDICATOR = "time_indicator"val PHYSICAL = "physical"val PHYSICAL_REWRITE = "physical_rewrite"val DYNAMIC_PARTITION_PRUNING = "dynamic_partition_pruning"val RUNTIME_FILTER = "runtime_filter}
  • 问题2,calcite 优化器和flink 如何结合的
    logical,physical 这两个优化器都是用的VolcanoPlanner,结合规则和代价。
    剩下的优化器HepPlanner,HepPlanner 完全使用规则。

  • 问题3,project_rewrite 后,为啥少了LogicalProject ReNode ?
    因为最后一个操作,logicalproject 这里就是把所有的字段查出来了,所有这一步实际上是不用的

  • 问题4,物理计划如何生成执行代码的?
    BatchPhysicalTableSourceScan 类

class BatchPhysicalTableSourceScan(cluster: RelOptCluster,traitSet: RelTraitSet,hints: util.List[RelHint],tableSourceTable: TableSourceTable)extends CommonPhysicalTableSourceScan(cluster, traitSet, hints, tableSourceTable)with BatchPhysicalRel {override def computeSelfCost(planner: RelOptPlanner, mq: RelMetadataQuery): RelOptCost = {val rowCnt = mq.getRowCount(this)if (rowCnt == null) {return null}val cpu = 0val rowSize = mq.getAverageRowSize(this)val size = rowCnt * rowSizeplanner.getCostFactory.makeCost(rowCnt, cpu, size)}// 这里生成的执行代码override def translateToExecNode(): ExecNode[_] = {val tableSourceSpec = new DynamicTableSourceSpec(tableSourceTable.contextResolvedTable,util.Arrays.asList(tableSourceTable.abilitySpecs: _*))tableSourceSpec.setTableSource(tableSourceTable.tableSource)new BatchExecTableSourceScan(unwrapTableConfig(this),tableSourceSpec,FlinkTypeFactory.toLogicalRowType(getRowType),getRelDetailedDescription)}
}
  • 问题5,为啥aa_user 表被广播,哪里实现的?

BatchPhysicalHashJoinRule 规则实现的

核心代码

 val leftSize = JoinUtil.binaryRowRelNodeSize(join.getLeft)val rightSize = JoinUtil.binaryRowRelNodeSize(join.getRight)// if it is not with hint, just check size of left and right side by statistic and config// if leftSize or rightSize is unknown, cannot use broadcastif (leftSize == null || rightSize == null) {return (false, false)}val threshold =tableConfig.get(OptimizerConfigOptions.TABLE_OPTIMIZER_BROADCAST_JOIN_THRESHOLD)val rightSizeSmallerThanThreshold = rightSize <= thresholdval leftSizeSmallerThanThreshold = leftSize <= thresholdval leftSmallerThanRight = leftSize < rightSizejoin.getJoinType match {case JoinRelType.LEFT => (rightSizeSmallerThanThreshold, false)case JoinRelType.RIGHT => (leftSizeSmallerThanThreshold, true)case JoinRelType.FULL => (false, false)case JoinRelType.INNER =>(leftSizeSmallerThanThreshold|| rightSizeSmallerThanThreshold,leftSmallerThanRight)// left side cannot be used as build side in SEMI/ANTI join.case JoinRelType.SEMI | JoinRelType.ANTI =>(rightSizeSmallerThanThreshold, false)}

主要就是实现

  def binaryRowRelNodeSize(relNode: RelNode): JDouble = {val mq = relNode.getCluster.getMetadataQueryval rowCount = mq.getRowCount(relNode)if (rowCount == null) {null} else {rowCount * FlinkRelMdUtil.binaryRowAverageSize(relNode)}}

最后还是到了FlinkRelMdColumnNullCount 这个类
从这个ts: TableScan 对象里取出来
那ts 对象又是在哪里赋值的,看这个FlinkRecomputeStatisticsProgram 类

class FlinkRelMdColumnNullCount private extends MetadataHandler[ColumnNullCount] {override def getDef: MetadataDef[ColumnNullCount] = FlinkMetadata.ColumnNullCount.DEF/*** Gets the null count of the given column in TableScan.** @param ts*   TableScan RelNode* @param mq*   RelMetadataQuery instance* @param index*   the index of the given column* @return*   the null count of the given column in TableScan*/def getColumnNullCount(ts: TableScan, mq: RelMetadataQuery, index: Int): JDouble = {Preconditions.checkArgument(mq.isInstanceOf[FlinkRelMetadataQuery])val relOptTable = ts.getTable.asInstanceOf[FlinkPreparingTableBase]val fieldNames = relOptTable.getRowType.getFieldNamesPreconditions.checkArgument(index >= 0 && index < fieldNames.size())val fieldName = fieldNames.get(index)val statistic = relOptTable.getStatisticval colStats = statistic.getColumnStats(fieldName)if (colStats != null && colStats.getNullCount != null) {colStats.getNullCount.toDouble} else {null}}}

ts是在这里赋值,这里最后会用调用具体的文件系统,找到文件行数

 private LogicalTableScan recomputeStatistics(LogicalTableScan scan) {final RelOptTable scanTable = scan.getTable();if (!(scanTable instanceof TableSourceTable)) {return scan;}FlinkContext context = ShortcutUtils.unwrapContext(scan);TableSourceTable table = (TableSourceTable) scanTable;boolean reportStatEnabled =context.getTableConfig().get(TABLE_OPTIMIZER_SOURCE_REPORT_STATISTICS_ENABLED)&& table.tableSource() instanceof SupportsStatisticReport;SourceAbilitySpec[] specs = table.abilitySpecs();PartitionPushDownSpec partitionPushDownSpec = getSpec(specs, PartitionPushDownSpec.class);FilterPushDownSpec filterPushDownSpec = getSpec(specs, FilterPushDownSpec.class);TableStats newTableStat =recomputeStatistics(table, partitionPushDownSpec, filterPushDownSpec, reportStatEnabled);FlinkStatistic newStatistic =FlinkStatistic.builder().statistic(table.getStatistic()).tableStats(newTableStat).build();TableSourceTable newTable = table.copy(newStatistic);return new LogicalTableScan(scan.getCluster(), scan.getTraitSet(), scan.getHints(), newTable);}

相关文章:

一文速通calcite结合flink理解SQL从文本变成执行计划详细过程

文章目录 你可以学到啥测试代码背景知识SQL转变流程图问题 你可以学到啥 SQL如何一步步变成执行计划的有哪些优化器&#xff0c;哪些优化规则calcite 和flink 如何结合的 测试代码 EnvironmentSettings settings EnvironmentSettings.inBatchMode(); TableEnvironment tabl…...

spring-TransactionTemplate 编程式事务

TransactionTemplate 是 Spring 框架提供的用于管理事务的一种方式。它提供了一种编程式的事务管理方法&#xff0c;允许开发者在代码中显式地控制事务的开始、提交或回滚。与使用 Transactional 注解相比&#xff0c;TransactionTemplate 提供了更多的灵活性和控制力。 为什么…...

中考全国45套(全国教育发达地区中考试卷)

文章目录 获取方式 为什么选择这份资源&#xff1f; 权威性与全面性&#xff1a;我们精心搜集了全国教育发达地区的最新中考试卷&#xff0c;确保每一套试卷都代表了该地区的教学水平和考试趋势。这不仅涵盖了丰富的知识点&#xff0c;还融入了各地独特的命题风格&#xff0c;让…...

嵌入式Linux学习笔记(5)-进程间常见通讯方式(c语言实现)

一、概述 进程间通信&#xff08;IPC&#xff0c;InterProcess Communication&#xff09;是指在多个进程之间进行数据传输和共享的机制。在操作系统中&#xff0c;进程是运行中的程序的实例&#xff0c;每个进程都有自己的内存空间和资源。 进程间通信可以用于在不同的进程之间…...

【移动端】菜单的自动展开与收回

前言 为了满足手机上菜单栏随用户移动&#xff0c;菜单的自动展示与隐藏&#xff0c;特此记录 基本原理 实现逻辑 window.addEventListener(‘scroll’, debouncedScrollHandler) – 监听文档视图滚动事件 document.querySelector(‘.header’) – 选择器匹配元素 创建show和h…...

Java获取Object中Value的方法

在Java中&#xff0c;获取对象&#xff08;Object&#xff09;中的值通常依赖于对象的类型以及我们希望访问的属性。由于Java是一种静态类型语言&#xff0c;直接从一个Object类型中访问属性是不可能的&#xff0c;因为Object是所有类的超类&#xff0c;但它本身不包含任何特定…...

集群聊天服务器项目【C++】(二)Json的简单使用

在上一章中&#xff0c;简单介绍了本项目的内容、技术栈、需求和目标等&#xff0c;详细介绍了环境配置&#xff0c;如果还没有配置成功&#xff0c;请参考我的上一篇博客环境配置 今天主要介绍Json库是什么以及简单的使用。 1.为什么要使用Json 我们在网络传输数据时&#…...

班迪录屏和这三款录屏工具,一键操作,太方便了!

嘿&#xff0c;小伙伴们&#xff01;今天我要跟大家分享几款超棒的录屏工具&#xff0c;它们绝对是我们在工作和学习中不可或缺的好帮&#xff1b;这些工具功能强大且操作简单&#xff0c;下面就让我来详细介绍一下它们的使用体验和好用之处吧&#xff01; 班迪录屏工具使用体…...

DAY60Bellman_ford 算法

队列优化算法 请找出从城市 1 到城市 n 的所有可能路径中&#xff0c;综合政府补贴后的最低运输成本。 如果能够从城市 1 到连通到城市 n&#xff0c; 请输出一个整数&#xff0c;表示运输成本。如果该整数是负数&#xff0c;则表示实现了盈利。如果从城市 1 没有路径可达城市…...

Dubbo SPI源码

文章目录 Dubbo SPI使用方式AOP功能源码剖析SPI注解1.获取加载器2.获取拓展实例对象3.创建拓展类的实例对象 Dubbo SPI Dubbo 的 SPI&#xff08;Service Provider Interface&#xff09;机制是一种强大的扩展机制&#xff0c;它允许开发者在运行时动态地替换或增加框架的功能。…...

《C++代码高度优化之双刃剑:避免过度优化引发的“暗雷”》

在 C编程的世界里&#xff0c;追求高效性能的代码是每个开发者的目标之一。高度优化的 C代码可以带来显著的性能提升&#xff0c;让程序在运行速度、内存占用等方面表现出色。然而&#xff0c;正如一把双刃剑&#xff0c;过度优化可能会引入难以察觉的错误&#xff0c;给程序带…...

javascript网页设计案例

设计一个具有良好用户体验的 JavaScript 网页涉及多个方面&#xff0c;如用户界面&#xff08;UI&#xff09;、用户体验&#xff08;UX&#xff09;、交互设计等。以下是一些示例案例&#xff0c;展示了如何使用 JavaScript 创建功能丰富且吸引人的网页设计。 1. 响应式导航菜…...

初阶数据结构【TOP】- 11.普通二叉树的介绍 - 1. (细致,保姆~~!)

文章目录 前言一、普通二叉树的链式结构二、 造树三、普通二叉树的遍历四、遍历完整代码五、总结 前言 本篇文章笔者将会对普通二叉树部分进行细致的讲解 , 本篇主要包括以下内容: 二叉树链式结构的介绍 ,二叉树的遍历. 笔者会一步一步分析带学者领略递归的美好~~ 一、普通二叉…...

【pyenv】pyenv安装版本超时的解决方案

目录 1、现象 2、分析现象 3、手动下载所需版本 4、存放到指定路径 5、重新安装 6、pip失败&#xff08;做个记录&#xff0c;未找到原因&#xff09; 7、方法二修改环境变量方法 7.1 设置环境变量 7.2 更新 7.3 安装即可 8、方法三修改XML文件 前言&#xff1a;研…...

【新片场-注册安全分析报告-无验证方式导致安全隐患】

前言 由于网站注册入口容易被黑客攻击&#xff0c;存在如下安全问题&#xff1a; 1. 暴力破解密码&#xff0c;造成用户信息泄露 2. 短信盗刷的安全问题&#xff0c;影响业务及导致用户投诉 3. 带来经济损失&#xff0c;尤其是后付费客户&#xff0c;风险巨大&#xff0c;造…...

新160个crackme - 057-bbbs-crackme04

运行分析 因软件版本老旧&#xff0c;需使用windows XP虚拟机运行有个SystemID&#xff0c;值为12345678需破解User ID和Password PE分析 yC壳&#xff0c;32位 OD手动脱壳 使用windows XP虚拟机&#xff0c;将程序拖入OD按一下F8&#xff0c;ESP变红&#xff0c;根据ESP定律设…...

车机中 Android Audio 音频常见问题分析方法实践小结

文章目录 前言1. 无声2. 断音3. 杂音4. 延迟播放5. 焦点问题6. 无声问题(连上 BT )其他完善中…… 前言 本文主要总结了一下车机开发中遇到的 Audio 有关的问题&#xff0c;同时参考网上的一案例&#xff0c;由于Audio 模块出现音频问题的场景很多&#xff0c;对每一个出现的问…...

湘大 OJ 代码仓库

有时候不需要上传一些题解&#xff0c;想要上传一些纯代码就行&#xff0c;傻傻把代码上传到文章里面&#xff0c;感觉效率不是很高&#xff0c;还是建立一个代码仓库比较方便 需要会使用魔法可能才能访问&#xff0c;github代码仓库地址...

Ruoyi Cloud K8s 部署

本文视频版本:https://www.bilibili.com/video/BV1xF4Se3Esv 参考 https://blog.csdn.net/Equent/article/details/137779505 https://blog.csdn.net/weixin_48711696/article/details/138117392 https://zhuanlan.zhihu.com/p/470647732 https://gitee.com/y_project/Ruo…...

OpenGL Texture C++ Camera Filter滤镜

基于OpenGL Texture纹理的强大功能&#xff0c;在片段着色器&#xff08;Shader&#xff09;中编写GLSL代码&#xff0c;对YUV的数据进行数据转换从而实现视频编辑软件中的相机滤镜功能。 接上一篇OpenGL Texture C 预览Camera视频的功能实现&#xff0c;本篇来实现Camera滤镜效…...

ai辅助开发:让快马智能生成win11安装openclaw的交互式诊断助手

最近在折腾Win11系统上安装OpenClaw这个工具时&#xff0c;发现手动安装过程特别容易踩坑。从依赖版本冲突到权限问题&#xff0c;稍不注意就会卡住。后来尝试用InsCode(快马)平台的AI辅助功能&#xff0c;意外发现它能生成一个智能安装助手&#xff0c;把整个流程变得特别顺畅…...

mysql如何限制查询结果_mysqllimit语句使用示例

LIMIT 必须放在整个 SELECT 语句的最后&#xff0c;严格位于 ORDER BY 和 GROUP BY 之后、WHERE 之后&#xff1b;写在 WHERE 或 ORDER BY 中间会报错。MySQL 的 LIMIT 用在 WHERE 之后还是 ORDER BY 之后&#xff1f;LIMIT 必须放在整个 SELECT 语句的最后&#xff0c;且严格位…...

Settingator:嵌入式参数管理库的轻量级设计与实践

1. Settingator 库概述&#xff1a;嵌入式设备与移动端配置协同的工程实践Settingator 是一个面向嵌入式系统的轻量级 Arduino 兼容库&#xff0c;其核心目标并非提供通用通信协议栈&#xff0c;而是构建一套可验证、可回滚、低侵入的运行时参数管理机制&#xff0c;专为配合同…...

网络信息安全技术术语对照表

类别术语中文术语英文术语说明基础技术类加密encryption将明文数据通过特定算法和密钥转换为密文数据的过程&#xff0c;目的是确保数据在存储、传输过程中不被未授权方获取和理解。基础技术类解密decryption将加密后的密文数据&#xff0c;通过对应的算法和密钥还原为原始明文…...

OpenClaw云端体验:星图平台千问3.5-9B镜像快速验证

OpenClaw云端体验&#xff1a;星图平台千问3.5-9B镜像快速验证 1. 为什么选择云端沙盒验证OpenClaw&#xff1f; 第一次接触OpenClaw时&#xff0c;我被它的本地自动化能力吸引&#xff0c;但看到复杂的本地部署文档就打了退堂鼓。直到发现星图平台提供的OpenClaw千问3.5-9B组…...

仅限核心架构师查阅:Python无锁GIL环境下的并发成本熔断机制(含实时监控脚本+自动降级策略)

第一章&#xff1a;Python无锁GIL环境下的并发模型成本控制策略全景概览在标准 CPython 解释器中&#xff0c;全局解释器锁&#xff08;GIL&#xff09;本质限制了多线程对 CPU 密集型任务的并行执行能力。然而&#xff0c;“无锁 GIL 环境”并非指移除 GIL 本身&#xff0c;而…...

C++ 与 异步流调度:在 C++ AI 框架中利用多个 CUDA Stream 重叠计算与数据传输的掩盖性能分析

C 与 异步流调度&#xff1a;在 C AI 框架中利用多个 CUDA Stream 重叠计算与数据传输的掩盖性能分析引言在现代人工智能领域&#xff0c;尤其是深度学习的应用中&#xff0c;GPU 已成为不可或缺的计算引擎。然而&#xff0c;即使拥有强大的 GPU 算力&#xff0c;系统整体性能也…...

人形机器人核心部件揭秘:减速器、传感器如何撑起宇树和智元的未来?

人形机器人核心部件揭秘&#xff1a;减速器与传感器的技术革命 当波士顿动力的Atlas完成后空翻&#xff0c;当特斯拉Optimus在工厂灵活抓取零件&#xff0c;这些看似科幻的场景背后&#xff0c;是无数精密部件协同工作的结果。人形机器人的核心部件——减速器和传感器&#xff…...

效率提升秘籍:用快马AI自动生成openclaw一键部署与依赖管理脚本

最近在折腾openclaw框架时&#xff0c;发现环境配置真是个效率黑洞。每次在新设备上部署&#xff0c;光是查文档、解决依赖冲突就要花掉大半天。于是琢磨着用自动化工具来优化这个流程&#xff0c;没想到效果出奇的好&#xff0c;今天就把这套方案分享给大家。 环境配置分析器&…...

人大金仓Kingbase数据库PostGIS插件部署实战:从零到一解锁空间数据能力

1. 为什么你的Kingbase数据库需要PostGIS&#xff1f; 刚接触空间数据处理的开发者经常会遇到这样的困惑&#xff1a;明明数据库里存了经纬度坐标&#xff0c;却无法计算两点距离&#xff1b;明明有行政区划边界数据&#xff0c;却做不了区域叠加分析。这就是典型的"有数据…...