当前位置: 首页 > news >正文

一文速通calcite结合flink理解SQL从文本变成执行计划详细过程

文章目录

      • 你可以学到啥
      • 测试代码
      • 背景知识
      • SQL转变流程图
      • 问题

你可以学到啥

  • SQL如何一步步变成执行计划的
  • 有哪些优化器,哪些优化规则
  • calcite 和flink 如何结合的

测试代码

EnvironmentSettings settings = EnvironmentSettings.inBatchMode();
TableEnvironment tableEnvironment = TableEnvironment.create(settings);Schema schema = Schema.newBuilder().column("count", DataTypes.INT()).column("word", DataTypes.STRING()).build();Schema schema1 = Schema.newBuilder().column("id", DataTypes.INT()).column("name", DataTypes.STRING()).build();tableEnvironment.createTemporaryTable("aa_user", TableDescriptor.forConnector("filesystem").schema(schema).option("path", "/Users/xx/IdeaProjects/flink-demo/data/order.csv").format("csv").build());tableEnvironment.createTemporaryTable("bb_order", TableDescriptor.forConnector("filesystem").schema(schema1).option("path", "/Users/xx/IdeaProjects/flink-demo/data/user.csv").format("csv").build());String cost = tableEnvironment.explainSql("select * from aa_user inner join bb_order on `aa_user`.`count`=`bb_order`.`id`", ExplainDetail.ESTIMATED_COST);System.out.println(cost);

背景知识

需要了解calcite 里的基本知识,如AST,RelNode ,hepPlanner等等。
需要了解Flink 和Flink SQL里的一些知识

SQL转变流程图

SQL经过flink 里注册的每一个优化器,优化后,就能变成物理计划了,不过要变成执行代码,还要再经过代码生成。
在这里插入图片描述

问题

  • 问题1,FlinkBatchProgram
    所有flink优化器都是在这个类里添加的
object FlinkBatchProgram {val SUBQUERY_REWRITE = "subquery_rewrite"val TEMPORAL_JOIN_REWRITE = "temporal_join_rewrite"val DECORRELATE = "decorrelate"val DEFAULT_REWRITE = "default_rewrite"val PREDICATE_PUSHDOWN = "predicate_pushdown"val JOIN_REORDER = "join_reorder"val JOIN_REWRITE = "join_rewrite"val PROJECT_REWRITE = "project_rewrite"val WINDOW = "window"val LOGICAL = "logical"val LOGICAL_REWRITE = "logical_rewrite"val TIME_INDICATOR = "time_indicator"val PHYSICAL = "physical"val PHYSICAL_REWRITE = "physical_rewrite"val DYNAMIC_PARTITION_PRUNING = "dynamic_partition_pruning"val RUNTIME_FILTER = "runtime_filter}
  • 问题2,calcite 优化器和flink 如何结合的
    logical,physical 这两个优化器都是用的VolcanoPlanner,结合规则和代价。
    剩下的优化器HepPlanner,HepPlanner 完全使用规则。

  • 问题3,project_rewrite 后,为啥少了LogicalProject ReNode ?
    因为最后一个操作,logicalproject 这里就是把所有的字段查出来了,所有这一步实际上是不用的

  • 问题4,物理计划如何生成执行代码的?
    BatchPhysicalTableSourceScan 类

class BatchPhysicalTableSourceScan(cluster: RelOptCluster,traitSet: RelTraitSet,hints: util.List[RelHint],tableSourceTable: TableSourceTable)extends CommonPhysicalTableSourceScan(cluster, traitSet, hints, tableSourceTable)with BatchPhysicalRel {override def computeSelfCost(planner: RelOptPlanner, mq: RelMetadataQuery): RelOptCost = {val rowCnt = mq.getRowCount(this)if (rowCnt == null) {return null}val cpu = 0val rowSize = mq.getAverageRowSize(this)val size = rowCnt * rowSizeplanner.getCostFactory.makeCost(rowCnt, cpu, size)}// 这里生成的执行代码override def translateToExecNode(): ExecNode[_] = {val tableSourceSpec = new DynamicTableSourceSpec(tableSourceTable.contextResolvedTable,util.Arrays.asList(tableSourceTable.abilitySpecs: _*))tableSourceSpec.setTableSource(tableSourceTable.tableSource)new BatchExecTableSourceScan(unwrapTableConfig(this),tableSourceSpec,FlinkTypeFactory.toLogicalRowType(getRowType),getRelDetailedDescription)}
}
  • 问题5,为啥aa_user 表被广播,哪里实现的?

BatchPhysicalHashJoinRule 规则实现的

核心代码

 val leftSize = JoinUtil.binaryRowRelNodeSize(join.getLeft)val rightSize = JoinUtil.binaryRowRelNodeSize(join.getRight)// if it is not with hint, just check size of left and right side by statistic and config// if leftSize or rightSize is unknown, cannot use broadcastif (leftSize == null || rightSize == null) {return (false, false)}val threshold =tableConfig.get(OptimizerConfigOptions.TABLE_OPTIMIZER_BROADCAST_JOIN_THRESHOLD)val rightSizeSmallerThanThreshold = rightSize <= thresholdval leftSizeSmallerThanThreshold = leftSize <= thresholdval leftSmallerThanRight = leftSize < rightSizejoin.getJoinType match {case JoinRelType.LEFT => (rightSizeSmallerThanThreshold, false)case JoinRelType.RIGHT => (leftSizeSmallerThanThreshold, true)case JoinRelType.FULL => (false, false)case JoinRelType.INNER =>(leftSizeSmallerThanThreshold|| rightSizeSmallerThanThreshold,leftSmallerThanRight)// left side cannot be used as build side in SEMI/ANTI join.case JoinRelType.SEMI | JoinRelType.ANTI =>(rightSizeSmallerThanThreshold, false)}

主要就是实现

  def binaryRowRelNodeSize(relNode: RelNode): JDouble = {val mq = relNode.getCluster.getMetadataQueryval rowCount = mq.getRowCount(relNode)if (rowCount == null) {null} else {rowCount * FlinkRelMdUtil.binaryRowAverageSize(relNode)}}

最后还是到了FlinkRelMdColumnNullCount 这个类
从这个ts: TableScan 对象里取出来
那ts 对象又是在哪里赋值的,看这个FlinkRecomputeStatisticsProgram 类

class FlinkRelMdColumnNullCount private extends MetadataHandler[ColumnNullCount] {override def getDef: MetadataDef[ColumnNullCount] = FlinkMetadata.ColumnNullCount.DEF/*** Gets the null count of the given column in TableScan.** @param ts*   TableScan RelNode* @param mq*   RelMetadataQuery instance* @param index*   the index of the given column* @return*   the null count of the given column in TableScan*/def getColumnNullCount(ts: TableScan, mq: RelMetadataQuery, index: Int): JDouble = {Preconditions.checkArgument(mq.isInstanceOf[FlinkRelMetadataQuery])val relOptTable = ts.getTable.asInstanceOf[FlinkPreparingTableBase]val fieldNames = relOptTable.getRowType.getFieldNamesPreconditions.checkArgument(index >= 0 && index < fieldNames.size())val fieldName = fieldNames.get(index)val statistic = relOptTable.getStatisticval colStats = statistic.getColumnStats(fieldName)if (colStats != null && colStats.getNullCount != null) {colStats.getNullCount.toDouble} else {null}}}

ts是在这里赋值,这里最后会用调用具体的文件系统,找到文件行数

 private LogicalTableScan recomputeStatistics(LogicalTableScan scan) {final RelOptTable scanTable = scan.getTable();if (!(scanTable instanceof TableSourceTable)) {return scan;}FlinkContext context = ShortcutUtils.unwrapContext(scan);TableSourceTable table = (TableSourceTable) scanTable;boolean reportStatEnabled =context.getTableConfig().get(TABLE_OPTIMIZER_SOURCE_REPORT_STATISTICS_ENABLED)&& table.tableSource() instanceof SupportsStatisticReport;SourceAbilitySpec[] specs = table.abilitySpecs();PartitionPushDownSpec partitionPushDownSpec = getSpec(specs, PartitionPushDownSpec.class);FilterPushDownSpec filterPushDownSpec = getSpec(specs, FilterPushDownSpec.class);TableStats newTableStat =recomputeStatistics(table, partitionPushDownSpec, filterPushDownSpec, reportStatEnabled);FlinkStatistic newStatistic =FlinkStatistic.builder().statistic(table.getStatistic()).tableStats(newTableStat).build();TableSourceTable newTable = table.copy(newStatistic);return new LogicalTableScan(scan.getCluster(), scan.getTraitSet(), scan.getHints(), newTable);}

相关文章:

一文速通calcite结合flink理解SQL从文本变成执行计划详细过程

文章目录 你可以学到啥测试代码背景知识SQL转变流程图问题 你可以学到啥 SQL如何一步步变成执行计划的有哪些优化器&#xff0c;哪些优化规则calcite 和flink 如何结合的 测试代码 EnvironmentSettings settings EnvironmentSettings.inBatchMode(); TableEnvironment tabl…...

spring-TransactionTemplate 编程式事务

TransactionTemplate 是 Spring 框架提供的用于管理事务的一种方式。它提供了一种编程式的事务管理方法&#xff0c;允许开发者在代码中显式地控制事务的开始、提交或回滚。与使用 Transactional 注解相比&#xff0c;TransactionTemplate 提供了更多的灵活性和控制力。 为什么…...

中考全国45套(全国教育发达地区中考试卷)

文章目录 获取方式 为什么选择这份资源&#xff1f; 权威性与全面性&#xff1a;我们精心搜集了全国教育发达地区的最新中考试卷&#xff0c;确保每一套试卷都代表了该地区的教学水平和考试趋势。这不仅涵盖了丰富的知识点&#xff0c;还融入了各地独特的命题风格&#xff0c;让…...

嵌入式Linux学习笔记(5)-进程间常见通讯方式(c语言实现)

一、概述 进程间通信&#xff08;IPC&#xff0c;InterProcess Communication&#xff09;是指在多个进程之间进行数据传输和共享的机制。在操作系统中&#xff0c;进程是运行中的程序的实例&#xff0c;每个进程都有自己的内存空间和资源。 进程间通信可以用于在不同的进程之间…...

【移动端】菜单的自动展开与收回

前言 为了满足手机上菜单栏随用户移动&#xff0c;菜单的自动展示与隐藏&#xff0c;特此记录 基本原理 实现逻辑 window.addEventListener(‘scroll’, debouncedScrollHandler) – 监听文档视图滚动事件 document.querySelector(‘.header’) – 选择器匹配元素 创建show和h…...

Java获取Object中Value的方法

在Java中&#xff0c;获取对象&#xff08;Object&#xff09;中的值通常依赖于对象的类型以及我们希望访问的属性。由于Java是一种静态类型语言&#xff0c;直接从一个Object类型中访问属性是不可能的&#xff0c;因为Object是所有类的超类&#xff0c;但它本身不包含任何特定…...

集群聊天服务器项目【C++】(二)Json的简单使用

在上一章中&#xff0c;简单介绍了本项目的内容、技术栈、需求和目标等&#xff0c;详细介绍了环境配置&#xff0c;如果还没有配置成功&#xff0c;请参考我的上一篇博客环境配置 今天主要介绍Json库是什么以及简单的使用。 1.为什么要使用Json 我们在网络传输数据时&#…...

班迪录屏和这三款录屏工具,一键操作,太方便了!

嘿&#xff0c;小伙伴们&#xff01;今天我要跟大家分享几款超棒的录屏工具&#xff0c;它们绝对是我们在工作和学习中不可或缺的好帮&#xff1b;这些工具功能强大且操作简单&#xff0c;下面就让我来详细介绍一下它们的使用体验和好用之处吧&#xff01; 班迪录屏工具使用体…...

DAY60Bellman_ford 算法

队列优化算法 请找出从城市 1 到城市 n 的所有可能路径中&#xff0c;综合政府补贴后的最低运输成本。 如果能够从城市 1 到连通到城市 n&#xff0c; 请输出一个整数&#xff0c;表示运输成本。如果该整数是负数&#xff0c;则表示实现了盈利。如果从城市 1 没有路径可达城市…...

Dubbo SPI源码

文章目录 Dubbo SPI使用方式AOP功能源码剖析SPI注解1.获取加载器2.获取拓展实例对象3.创建拓展类的实例对象 Dubbo SPI Dubbo 的 SPI&#xff08;Service Provider Interface&#xff09;机制是一种强大的扩展机制&#xff0c;它允许开发者在运行时动态地替换或增加框架的功能。…...

《C++代码高度优化之双刃剑:避免过度优化引发的“暗雷”》

在 C编程的世界里&#xff0c;追求高效性能的代码是每个开发者的目标之一。高度优化的 C代码可以带来显著的性能提升&#xff0c;让程序在运行速度、内存占用等方面表现出色。然而&#xff0c;正如一把双刃剑&#xff0c;过度优化可能会引入难以察觉的错误&#xff0c;给程序带…...

javascript网页设计案例

设计一个具有良好用户体验的 JavaScript 网页涉及多个方面&#xff0c;如用户界面&#xff08;UI&#xff09;、用户体验&#xff08;UX&#xff09;、交互设计等。以下是一些示例案例&#xff0c;展示了如何使用 JavaScript 创建功能丰富且吸引人的网页设计。 1. 响应式导航菜…...

初阶数据结构【TOP】- 11.普通二叉树的介绍 - 1. (细致,保姆~~!)

文章目录 前言一、普通二叉树的链式结构二、 造树三、普通二叉树的遍历四、遍历完整代码五、总结 前言 本篇文章笔者将会对普通二叉树部分进行细致的讲解 , 本篇主要包括以下内容: 二叉树链式结构的介绍 ,二叉树的遍历. 笔者会一步一步分析带学者领略递归的美好~~ 一、普通二叉…...

【pyenv】pyenv安装版本超时的解决方案

目录 1、现象 2、分析现象 3、手动下载所需版本 4、存放到指定路径 5、重新安装 6、pip失败&#xff08;做个记录&#xff0c;未找到原因&#xff09; 7、方法二修改环境变量方法 7.1 设置环境变量 7.2 更新 7.3 安装即可 8、方法三修改XML文件 前言&#xff1a;研…...

【新片场-注册安全分析报告-无验证方式导致安全隐患】

前言 由于网站注册入口容易被黑客攻击&#xff0c;存在如下安全问题&#xff1a; 1. 暴力破解密码&#xff0c;造成用户信息泄露 2. 短信盗刷的安全问题&#xff0c;影响业务及导致用户投诉 3. 带来经济损失&#xff0c;尤其是后付费客户&#xff0c;风险巨大&#xff0c;造…...

新160个crackme - 057-bbbs-crackme04

运行分析 因软件版本老旧&#xff0c;需使用windows XP虚拟机运行有个SystemID&#xff0c;值为12345678需破解User ID和Password PE分析 yC壳&#xff0c;32位 OD手动脱壳 使用windows XP虚拟机&#xff0c;将程序拖入OD按一下F8&#xff0c;ESP变红&#xff0c;根据ESP定律设…...

车机中 Android Audio 音频常见问题分析方法实践小结

文章目录 前言1. 无声2. 断音3. 杂音4. 延迟播放5. 焦点问题6. 无声问题(连上 BT )其他完善中…… 前言 本文主要总结了一下车机开发中遇到的 Audio 有关的问题&#xff0c;同时参考网上的一案例&#xff0c;由于Audio 模块出现音频问题的场景很多&#xff0c;对每一个出现的问…...

湘大 OJ 代码仓库

有时候不需要上传一些题解&#xff0c;想要上传一些纯代码就行&#xff0c;傻傻把代码上传到文章里面&#xff0c;感觉效率不是很高&#xff0c;还是建立一个代码仓库比较方便 需要会使用魔法可能才能访问&#xff0c;github代码仓库地址...

Ruoyi Cloud K8s 部署

本文视频版本:https://www.bilibili.com/video/BV1xF4Se3Esv 参考 https://blog.csdn.net/Equent/article/details/137779505 https://blog.csdn.net/weixin_48711696/article/details/138117392 https://zhuanlan.zhihu.com/p/470647732 https://gitee.com/y_project/Ruo…...

OpenGL Texture C++ Camera Filter滤镜

基于OpenGL Texture纹理的强大功能&#xff0c;在片段着色器&#xff08;Shader&#xff09;中编写GLSL代码&#xff0c;对YUV的数据进行数据转换从而实现视频编辑软件中的相机滤镜功能。 接上一篇OpenGL Texture C 预览Camera视频的功能实现&#xff0c;本篇来实现Camera滤镜效…...

[2025CVPR]DeepVideo-R1:基于难度感知回归GRPO的视频强化微调框架详解

突破视频大语言模型推理瓶颈,在多个视频基准上实现SOTA性能 一、核心问题与创新亮点 1.1 GRPO在视频任务中的两大挑战 ​安全措施依赖问题​ GRPO使用min和clip函数限制策略更新幅度,导致: 梯度抑制:当新旧策略差异过大时梯度消失收敛困难:策略无法充分优化# 传统GRPO的梯…...

React 第五十五节 Router 中 useAsyncError的使用详解

前言 useAsyncError 是 React Router v6.4 引入的一个钩子&#xff0c;用于处理异步操作&#xff08;如数据加载&#xff09;中的错误。下面我将详细解释其用途并提供代码示例。 一、useAsyncError 用途 处理异步错误&#xff1a;捕获在 loader 或 action 中发生的异步错误替…...

Cinnamon修改面板小工具图标

Cinnamon开始菜单-CSDN博客 设置模块都是做好的&#xff0c;比GNOME简单得多&#xff01; 在 applet.js 里增加 const Settings imports.ui.settings;this.settings new Settings.AppletSettings(this, HTYMenusonichy, instance_id); this.settings.bind(menu-icon, menu…...

【2025年】解决Burpsuite抓不到https包的问题

环境&#xff1a;windows11 burpsuite:2025.5 在抓取https网站时&#xff0c;burpsuite抓取不到https数据包&#xff0c;只显示&#xff1a; 解决该问题只需如下三个步骤&#xff1a; 1、浏览器中访问 http://burp 2、下载 CA certificate 证书 3、在设置--隐私与安全--…...

Axios请求超时重发机制

Axios 超时重新请求实现方案 在 Axios 中实现超时重新请求可以通过以下几种方式&#xff1a; 1. 使用拦截器实现自动重试 import axios from axios;// 创建axios实例 const instance axios.create();// 设置超时时间 instance.defaults.timeout 5000;// 最大重试次数 cons…...

「全栈技术解析」推客小程序系统开发:从架构设计到裂变增长的完整解决方案

在移动互联网营销竞争白热化的当下&#xff0c;推客小程序系统凭借其裂变传播、精准营销等特性&#xff0c;成为企业抢占市场的利器。本文将深度解析推客小程序系统开发的核心技术与实现路径&#xff0c;助力开发者打造具有市场竞争力的营销工具。​ 一、系统核心功能架构&…...

uni-app学习笔记三十五--扩展组件的安装和使用

由于内置组件不能满足日常开发需要&#xff0c;uniapp官方也提供了众多的扩展组件供我们使用。由于不是内置组件&#xff0c;需要安装才能使用。 一、安装扩展插件 安装方法&#xff1a; 1.访问uniapp官方文档组件部分&#xff1a;组件使用的入门教程 | uni-app官网 点击左侧…...

Python常用模块:time、os、shutil与flask初探

一、Flask初探 & PyCharm终端配置 目的: 快速搭建小型Web服务器以提供数据。 工具: 第三方Web框架 Flask (需 pip install flask 安装)。 安装 Flask: 建议: 使用 PyCharm 内置的 Terminal (模拟命令行) 进行安装,避免频繁切换。 PyCharm Terminal 配置建议: 打开 Py…...

ThreadLocal 源码

ThreadLocal 源码 此类提供线程局部变量。这些变量不同于它们的普通对应物&#xff0c;因为每个访问一个线程局部变量的线程&#xff08;通过其 get 或 set 方法&#xff09;都有自己独立初始化的变量副本。ThreadLocal 实例通常是类中的私有静态字段&#xff0c;这些类希望将…...

【Ftrace 专栏】Ftrace 参考博文

ftrace、perf、bcc、bpftrace、ply、simple_perf的使用Ftrace 基本用法Linux 利用 ftrace 分析内核调用如何利用ftrace精确跟踪特定进程调度信息使用 ftrace 进行追踪延迟Linux-培训笔记-ftracehttps://www.kernel.org/doc/html/v4.18/trace/events.htmlhttps://blog.csdn.net/…...