当前位置: 首页 > news >正文

【ClickHouse源码】物化视图的写入过程

本文对 ClickHouse 物化视图的写入流程源码做个详细说明,基于 v22.8.14.53-lts 版本。

StorageMaterializedView

首先来看物化视图的构造函数:

StorageMaterializedView::StorageMaterializedView(const StorageID & table_id_,ContextPtr local_context,const ASTCreateQuery & query,const ColumnsDescription & columns_,bool attach_,const String & comment): IStorage(table_id_), WithMutableContext(local_context->getGlobalContext())
{StorageInMemoryMetadata storage_metadata;storage_metadata.setColumns(columns_);......if (!has_inner_table){target_table_id = query.to_table_id;}else if (attach_){/// If there is an ATTACH request, then the internal table must already be created.target_table_id = StorageID(getStorageID().database_name, generateInnerTableName(getStorageID()), query.to_inner_uuid);}else{/// We will create a query to create an internal table.auto create_context = Context::createCopy(local_context);auto manual_create_query = std::make_shared<ASTCreateQuery>();manual_create_query->setDatabase(getStorageID().database_name);manual_create_query->setTable(generateInnerTableName(getStorageID()));manual_create_query->uuid = query.to_inner_uuid;auto new_columns_list = std::make_shared<ASTColumns>();new_columns_list->set(new_columns_list->columns, query.columns_list->columns->ptr());manual_create_query->set(manual_create_query->columns_list, new_columns_list);manual_create_query->set(manual_create_query->storage, query.storage->ptr());InterpreterCreateQuery create_interpreter(manual_create_query, create_context);create_interpreter.setInternal(true);create_interpreter.execute();target_table_id = DatabaseCatalog::instance().getTable({manual_create_query->getDatabase(), manual_create_query->getTable()}, getContext())->getStorageID();}
}

通过以上代码可以发现物化视图支持几种创建语法,总的来说可以归为 3 类:

  1. 指定了目的表的情况:

    create table src(id Int32) Engine=Memory();
    create table dest(id Int32) Engine=Memory();create materialized view mv to dest as select * from src;
    

    使用以上形式时,target_table_id 会选择 dest 表的 table_id

  2. 不指定目的表的情况:

    create table src(id Int32) Engine=Memory();create materialized view mv Engine=Memory() as select * from src;
    

    使用以上形式时,首先会根据源表的 table_id 生成一个以 .inner. 开头的目的表名,如 .inner.5ef4ec2c-efb1-4918-bf6c-59de2edb54cf,然后在生成一个随机的 uuid 作为目的表的 table_id 并同时作为 target_table_id

  3. 第 3 种其实不是创建语法,而是在 ClickHouse 启动或者物化视图被 detach 掉后,执行 attach 的实现。

StorageMaterializedView::read

void StorageMaterializedView::read(QueryPlan & query_plan,const Names & column_names,const StorageSnapshotPtr & storage_snapshot,SelectQueryInfo & query_info,ContextPtr local_context,QueryProcessingStage::Enum processed_stage,const size_t max_block_size,const size_t num_streams)
{/// 获取目的表实例auto storage = getTargetTable();auto lock = storage->lockForShare(local_context->getCurrentQueryId(), local_context->getSettingsRef().lock_acquire_timeout);auto target_metadata_snapshot = storage->getInMemoryMetadataPtr();auto target_storage_snapshot = storage->getStorageSnapshot(target_metadata_snapshot, local_context);if (query_info.order_optimizer)query_info.input_order_info = query_info.order_optimizer->getInputOrder(target_metadata_snapshot, local_context);storage->read(query_plan, column_names, target_storage_snapshot, query_info, local_context, processed_stage, max_block_size, num_streams);if (query_plan.isInitialized()){/// 获取物化视图 stream 中对应的 block 结构auto mv_header = getHeaderForProcessingStage(column_names, storage_snapshot, query_info, local_context, processed_stage);/// 获取查询语句中所需的列对应的 block 结构auto target_header = query_plan.getCurrentDataStream().header;/// 从查询的列中去除那些mv不存在的列removeNonCommonColumns(mv_header, target_header);/// 分布式表引擎在查询处理到指定阶段,header 中可能不包含物化视图中的所有列,例如 group by/// 所以从 mv_header 中去除那些查询不需要的列removeNonCommonColumns(target_header, mv_header);/// 当查询中得到的 mv_header 和 target_header 有不同结构时,会通过在 pipeline 中添加表达式计算来进行转换/// 比如 Decimal(38, 6) -> Decimal(16, 6),或者一些聚合运算,如 sum 等if (!blocksHaveEqualStructure(mv_header, target_header)){auto converting_actions = ActionsDAG::makeConvertingActions(target_header.getColumnsWithTypeAndName(),mv_header.getColumnsWithTypeAndName(),ActionsDAG::MatchColumnsMode::Name);auto converting_step = std::make_unique<ExpressionStep>(query_plan.getCurrentDataStream(), converting_actions);converting_step->setStepDescription("Convert target table structure to MaterializedView structure");query_plan.addStep(std::move(converting_step));}query_plan.addStorageHolder(storage);query_plan.addTableLock(std::move(lock));}
}

通过以上代码可以看出,物化视图是一种逻辑描述,数据都是存储在目的表中,读取时实际操作的目的表,并且在在查询过程中还会涉及到多阶段 block 的转换,以及表达式的计算。

StorageMaterializedView::write

SinkToStoragePtr StorageMaterializedView::write(const ASTPtr & query, const StorageMetadataPtr & /*metadata_snapshot*/, ContextPtr local_context)
{auto storage = getTargetTable();auto lock = storage->lockForShare(local_context->getCurrentQueryId(), local_context->getSettingsRef().lock_acquire_timeout);auto metadata_snapshot = storage->getInMemoryMetadataPtr();auto sink = storage->write(query, metadata_snapshot, local_context);sink->addTableLock(lock);return sink;
}

同样写也是将数据存入了目的表。


我们都知道数据写源表时会触发写物化视图,从而将数据写入目的表,下面就看一下是如何实现的。SQL 的执行都是通过 IInterpreterInterpreterXxx 的,这里就不再多说,一个写入操作最中会调用 InterpreterInsertQuery,所以从 InterpreterInsertQuery::execute() 开始跟踪。

InterpreterInsertQuery::execute()

BlockIO InterpreterInsertQuery::execute()
{......std::vector<Chain> out_chains;if (!distributed_pipeline || query.watch){size_t out_streams_size = 1;......for (size_t i = 0; i < out_streams_size; ++i){auto out = buildChainImpl(table, metadata_snapshot, query_sample_block, nullptr, nullptr);out_chains.emplace_back(std::move(out));}}......
}

execute() 中通过 buildChainImpl() 来构建输出链, buildChainImpl() 会判断当前表是否有物化视图关联,如果有就会调用 buildPushingToViewsChain()

buildPushingToViewsChain()

这个方法非常长,这里只展示和本文想说明的问题相关的部分。

Chain buildPushingToViewsChain(const StoragePtr & storage,const StorageMetadataPtr & metadata_snapshot,ContextPtr context,const ASTPtr & query_ptr,bool no_destination,ThreadStatusesHolderPtr thread_status_holder,std::atomic_uint64_t * elapsed_counter_ms,const Block & live_view_header)
{......auto table_id = storage->getStorageID();auto views = DatabaseCatalog::instance().getDependentViews(table_id);......std::vector<Chain> chains;for (const auto & view_id : views){auto view = DatabaseCatalog::instance().tryGetTable(view_id, context);......if (auto * materialized_view = dynamic_cast<StorageMaterializedView *>(view.get())){......StoragePtr inner_table = materialized_view->getTargetTable();auto inner_table_id = inner_table->getStorageID();auto inner_metadata_snapshot = inner_table->getInMemoryMetadataPtr();query = view_metadata_snapshot->getSelectQuery().inner_query;target_name = inner_table_id.getFullTableName();Block header;/// Get list of columns we get from select query.if (select_context->getSettingsRef().allow_experimental_analyzer)header = InterpreterSelectQueryAnalyzer::getSampleBlock(query, select_context);elseheader = InterpreterSelectQuery(query, select_context, SelectQueryOptions().analyze()).getSampleBlock();/// Insert only columns returned by select.Names insert_columns;const auto & inner_table_columns = inner_metadata_snapshot->getColumns();for (const auto & column : header){/// But skip columns which storage doesn't have.if (inner_table_columns.hasPhysical(column.name))insert_columns.emplace_back(column.name);}InterpreterInsertQuery interpreter(nullptr, insert_context, false, false, false);out = interpreter.buildChain(inner_table, inner_metadata_snapshot, insert_columns, thread_status_holder, view_counter_ms);out.addStorageHolder(view);out.addStorageHolder(inner_table);}else if (auto * live_view = dynamic_cast<StorageLiveView *>(view.get())){runtime_stats->type = QueryViewsLogElement::ViewType::LIVE;query = live_view->getInnerQuery(); // Used only to log in system.query_views_logout = buildPushingToViewsChain(view, view_metadata_snapshot, insert_context, ASTPtr(), true, thread_status_holder, view_counter_ms, storage_header);}else if (auto * window_view = dynamic_cast<StorageWindowView *>(view.get())){runtime_stats->type = QueryViewsLogElement::ViewType::WINDOW;query = window_view->getMergeableQuery(); // Used only to log in system.query_views_logout = buildPushingToViewsChain(view, view_metadata_snapshot, insert_context, ASTPtr(), true, thread_status_holder, view_counter_ms);}elseout = buildPushingToViewsChain(view, view_metadata_snapshot, insert_context, ASTPtr(), false, thread_status_holder, view_counter_ms);......
}

buildPushingToViewsChain() 会检查当前表是否有视图依赖,通过几个判断可以看出视图分为三种:物化视图、实时视图和窗口视图,最后的 else 是指当前表是个普通表。如果当前表是源表且有物化视图依赖,就会调用 buildPushingToViewsChain() 来构建链,这是个递归调用,首次进入当前表是普通表,其依赖的物化视图会再次调用该方法,再次进入就会物化视图的 if 逻辑,最终是通过 buildChain() 来构建链。

buildChainImpl

buildChain() 中是调用了 buildChainImpl() 这个实现类。

Chain InterpreterInsertQuery::buildChainImpl(const StoragePtr & table,const StorageMetadataPtr & metadata_snapshot,const Block & query_sample_block,ThreadStatusesHolderPtr thread_status_holder,std::atomic_uint64_t * elapsed_counter_ms)
{....../// We create a pipeline of several streams, into which we will write data.Chain out;/// Keep a reference to the context to make sure it stays alive until the chain is executed and destroyedout.addInterpreterContext(context_ptr);/// NOTE: we explicitly ignore bound materialized views when inserting into Kafka Storage.///       Otherwise we'll get duplicates when MV reads same rows again from Kafka.if (table->noPushingToViews() && !no_destination){auto sink = table->write(query_ptr, metadata_snapshot, context_ptr);sink->setRuntimeData(thread_status, elapsed_counter_ms);out.addSource(std::move(sink));}else{out = buildPushingToViewsChain(table, metadata_snapshot, context_ptr, query_ptr, no_destination, thread_status_holder, elapsed_counter_ms);}......
}

buildChainImpl() 会根据当前表(或视图)是否有依赖的视图或目的表,来做不同的操作,这里就可以处理视图级连视图的情况,会不断递归构造相应的链节点,使之连接起来。

Chain InterpreterInsertQuery::buildChainImpl(const StoragePtr & table,const StorageMetadataPtr & metadata_snapshot,const Block & query_sample_block,ThreadStatusesHolderPtr thread_status_holder,std::atomic_uint64_t * elapsed_counter_ms)
{.../// We create a pipeline of several streams, into which we will write data.Chain out;/// Keep a reference to the context to make sure it stays alive until the chain is executed and destroyedout.addInterpreterContext(context_ptr);/// NOTE: we explicitly ignore bound materialized views when inserting into Kafka Storage.///       Otherwise we'll get duplicates when MV reads same rows again from Kafka.if (table->noPushingToViews() && !no_destination)  // table->noPushingToViews() 用于禁止物化视图插入数据到 KafkaEngine{auto sink = table->write(query_ptr, metadata_snapshot, context_ptr);sink->setRuntimeData(thread_status, elapsed_counter_ms);out.addSource(std::move(sink));}else  // 构建物化视图插入 pushingToViewChain,重点!!!{out = buildPushingToViewsChain(table, metadata_snapshot, context_ptr, query_ptr, no_destination, thread_status_holder, elapsed_counter_ms);}...return out;
}

小结

所以源表和物化视图在写入时是构造了多个输出链,数据也是只能对当前写入的数据做操作,不会影响源表现有数据。而且写入源表和目的表的过程是一个 pipeline,需要全部完成才算写入成功,当然 pipeline 可以并行处理,可以加快写入速度。


欢迎添加微信:xiedeyantu,讨论技术问题。

相关文章:

【ClickHouse源码】物化视图的写入过程

本文对 ClickHouse 物化视图的写入流程源码做个详细说明&#xff0c;基于 v22.8.14.53-lts 版本。 StorageMaterializedView 首先来看物化视图的构造函数&#xff1a; StorageMaterializedView::StorageMaterializedView(const StorageID & table_id_,ContextPtr local_…...

.NET 使用NLog增强日志输出

引言 不管你是开发单体应用还是微服务应用&#xff0c;在实际的软件的开发、测试和运行阶段&#xff0c;开发者都需要借助日志来定位问题。因此一款好的日志组件将至关重要&#xff0c;在.NET 的开源生态中&#xff0c;目前主要有Serilog、Log4Net和NLog三款优秀的日志组件&…...

一道阿里类的初始化顺序笔试题

问题很简单&#xff0c;就是下面的代码打印出什么&#xff1f; public class InitializeDemo {private static int k 1;private static InitializeDemo t1 new InitializeDemo("t1" );private static InitializeDemo t2 new InitializeDemo("t2");priv…...

cuda找不到路径报错

编译C文件时出现&#xff1a;error: [Errno 2] No such file or directory: :/usr/local/cuda:/usr/local/cuda/bin/nvcc 在终端输入&#xff1a; export CUDA_HOME/usr/local/cuda...

Elasticsearch进阶之(核心概念、系统架构、路由计算、倒排索引、分词、Kibana)

Elasticsearch进阶之&#xff08;核心概念、系统架构、路由计算、倒排索引、分词、Kibana&#xff09; 1、核心概念&#xff1a; 1.1、索引&#xff08;Index&#xff09; 一个索引就是一个拥有几分相似特征的文档的集合。比如说&#xff0c;你可以有一个客户数据的索引&…...

Android包体积缩减

关于减小包体积的方案&#xff1a; 一、所有的图片压缩&#xff0c;采用webp 格式。 &#xff08;当然有些图片采用webp格式反而变大了&#xff0c;可以仍采用png格式&#xff09; 二、语音资源过滤 只保留中文 resConfigs "zh-rCN", "zh” 可以减少resourc…...

【华为OD机试】 网上商城优惠活动(C++ Java Javascript Python)

文章目录 题目描述输入描述输出描述备注用例题目解析C++JavaScriptJavaPython题目描述 某网上商场举办优惠活动,发布了满减、打折、无门槛3种优惠券,分别为: 每满100元优惠10元,无使用数限制,如100199元可以使用1张减10元,200299可使用2张减20元,以此类推;92折券,1次…...

GWT安装过程

1:安装前准备 &#xff08;可以问我要&#xff09; appengine-java-sdk-1.9.8 com.google.gdt.eclipse.suite.4.3.update.site_3.8.0 gwt-2.5.1 eclipse-jee-kepler-SR2-win32-x86_64.zip 2&#xff1a;安装环境上 打开eclipse Help –Install New Software… 选择Add –…...

代码随想录算法训练营第一天| 704. 二分查找、27. 移除元素

Leetcode 704 二分查找题目链接&#xff1a;704二分查找介绍给定一个 n 个元素有序的&#xff08;升序&#xff09;整型数组 nums 和一个目标值 target &#xff0c;写一个函数搜索 nums 中的 target&#xff0c;如果目标值存在返回下标&#xff0c;否则返回 -1。思路先看看一个…...

office@word@ppt启用mathtype组件方法整理

文章目录将mathtype添加到word中ref查看office安装路径文件操作法Note附PPT中使用mathtype将mathtype添加到word中 先安装office,再安装mathtype,那么这个过程是自动的如果是先安装mathtype,再安装office,那么有以下选择: 重新安装一遍mathtype(比较简单,不需要说明)执行文件操…...

计算机大小端

我们先假定内存结构为上下型的&#xff0c;上代表内存高地址&#xff0c;下代表内存低地址。 电脑读取内存数据时&#xff0c;是从低位地址到高位地址进行读取&#xff08;从下到上&#xff09;。 1、何为大小端 大端&#xff1a;数据的高位字节存放在低地址&#xff0c;数据…...

Matplotlib绘图从零入门到实践(含各类用法详解)

一、引入 Matplotlib 是一个Python的综合库&#xff0c;用于在 Python 中创建静态、动画和交互式可视化。 本教程包含笔者在使用Matplotlib库过程中遇到的各类完整实例与用法还有遇到的库理论问题&#xff0c;可以根据自己的需要在目录中查询对应的用法、实例以及第四部分关于…...

C语言 入门教程||C语言 指针||C语言 字符串

C语言 指针 学习 C 语言的指针既简单又有趣。通过指针&#xff0c;可以简化一些 C 编程任务的执行&#xff0c;还有一些任务&#xff0c;如动态内存分配&#xff0c;没有指针是无法执行的。所以&#xff0c;想要成为一名优秀的 C 程序员&#xff0c;学习指针是很有必要的。 …...

Nacos2.x+Nginx集群配置

一、配置 nacos 集群 注意&#xff1a;需要先配置好 nacos 连接本地数据库 1、拷贝三份 nacos 2、修改配置文件&#xff08;cluster.conf&#xff09; 修改启动端口&#xff1a; nacos1&#xff1a;8818 nacos2&#xff1a;8828 nacos3&#xff1a;8838 当nacos客户端升级为…...

Android源码分析 - InputManagerService与触摸事件

0. 前言 有人问到&#xff1a;“通过TouchEvent&#xff0c;你可以获得到当前的触点&#xff0c;它更新的频率和屏幕刷新的频率一样吗&#xff1f;”。听到这个问题的时候我感到很诧异&#xff0c;我们知道Android是事件驱动机制的设计&#xff0c;可以从多种服务中通过IPC通信…...

python库--urllib

目录 一.urllib导入 二.urllib爬取网页 三.Headers属性 1.使用build_opener()修改报头 2.使用add_header()添加报头 四.超时设置 五.get和post请求 1.get请求 2.post请求 urllib库和request库作用差不多&#xff0c;但比较起来request库更加容易上手&#xff0c;但该了…...

美团前端二面常考react面试题及答案

什么原因会促使你脱离 create-react-app 的依赖 当你想去配置 webpack 或 babel presets。 React 16中新生命周期有哪些 关于 React16 开始应用的新生命周期&#xff1a; 可以看出&#xff0c;React16 自上而下地对生命周期做了另一种维度的解读&#xff1a; Render 阶段&a…...

环境搭建04-Ubuntu16.04更改conda,pip的镜像源

我常用的pipy国内镜像源&#xff1a; https://pypi.tuna.tsinghua.edu.cn/simple # 清华 http://mirrors.aliyun.com/pypi/simple/ # 阿里云 https://pypi.mirrors.ustc.edu.cn/simple/ #中国科技大学1、将conda的镜像源修改为国内的镜像源 先查看conda安装的信息…...

【C++进阶】四、STL---set和map的介绍和使用

目录 一、关联式容器 二、键值对 三、树形结构的关联式容器 四、set的介绍及使用 4.1 set的介绍 4.2 set的使用 五、multiset的介绍及使用 六、map的介绍和使用 6.1 map的介绍 6.2 map的使用 七、multimap的介绍和使用 一、关联式容器 前面已经接触过 STL 中的部分…...

JavaSE学习进阶 day1_01 static关键字和静态代码块的使用

好的现在我们进入进阶部分的学习&#xff0c;看一张版图&#xff1a; 前面我们已经学习完基础班的内容了&#xff0c;现在我们已经来到了第二板块——基础进阶&#xff0c;这部分内容就不是那么容易了。学完第二板块&#xff0c;慢慢就在向java程序员靠拢了。 面向对象进阶部分…...

手游刚开服就被攻击怎么办?如何防御DDoS?

开服初期是手游最脆弱的阶段&#xff0c;极易成为DDoS攻击的目标。一旦遭遇攻击&#xff0c;可能导致服务器瘫痪、玩家流失&#xff0c;甚至造成巨大经济损失。本文为开发者提供一套简洁有效的应急与防御方案&#xff0c;帮助快速应对并构建长期防护体系。 一、遭遇攻击的紧急应…...

docker详细操作--未完待续

docker介绍 docker官网: Docker&#xff1a;加速容器应用程序开发 harbor官网&#xff1a;Harbor - Harbor 中文 使用docker加速器: Docker镜像极速下载服务 - 毫秒镜像 是什么 Docker 是一种开源的容器化平台&#xff0c;用于将应用程序及其依赖项&#xff08;如库、运行时环…...

R语言AI模型部署方案:精准离线运行详解

R语言AI模型部署方案:精准离线运行详解 一、项目概述 本文将构建一个完整的R语言AI部署解决方案,实现鸢尾花分类模型的训练、保存、离线部署和预测功能。核心特点: 100%离线运行能力自包含环境依赖生产级错误处理跨平台兼容性模型版本管理# 文件结构说明 Iris_AI_Deployme…...

Java 8 Stream API 入门到实践详解

一、告别 for 循环&#xff01; 传统痛点&#xff1a; Java 8 之前&#xff0c;集合操作离不开冗长的 for 循环和匿名类。例如&#xff0c;过滤列表中的偶数&#xff1a; List<Integer> list Arrays.asList(1, 2, 3, 4, 5); List<Integer> evens new ArrayList…...

工程地质软件市场:发展现状、趋势与策略建议

一、引言 在工程建设领域&#xff0c;准确把握地质条件是确保项目顺利推进和安全运营的关键。工程地质软件作为处理、分析、模拟和展示工程地质数据的重要工具&#xff0c;正发挥着日益重要的作用。它凭借强大的数据处理能力、三维建模功能、空间分析工具和可视化展示手段&…...

Vue2 第一节_Vue2上手_插值表达式{{}}_访问数据和修改数据_Vue开发者工具

文章目录 1.Vue2上手-如何创建一个Vue实例,进行初始化渲染2. 插值表达式{{}}3. 访问数据和修改数据4. vue响应式5. Vue开发者工具--方便调试 1.Vue2上手-如何创建一个Vue实例,进行初始化渲染 准备容器引包创建Vue实例 new Vue()指定配置项 ->渲染数据 准备一个容器,例如: …...

第一篇:Agent2Agent (A2A) 协议——协作式人工智能的黎明

AI 领域的快速发展正在催生一个新时代&#xff0c;智能代理&#xff08;agents&#xff09;不再是孤立的个体&#xff0c;而是能够像一个数字团队一样协作。然而&#xff0c;当前 AI 生态系统的碎片化阻碍了这一愿景的实现&#xff0c;导致了“AI 巴别塔问题”——不同代理之间…...

如何在最短时间内提升打ctf(web)的水平?

刚刚刷完2遍 bugku 的 web 题&#xff0c;前来答题。 每个人对刷题理解是不同&#xff0c;有的人是看了writeup就等于刷了&#xff0c;有的人是收藏了writeup就等于刷了&#xff0c;有的人是跟着writeup做了一遍就等于刷了&#xff0c;还有的人是独立思考做了一遍就等于刷了。…...

以光量子为例,详解量子获取方式

光量子技术获取量子比特可在室温下进行。该方式有望通过与名为硅光子学&#xff08;silicon photonics&#xff09;的光波导&#xff08;optical waveguide&#xff09;芯片制造技术和光纤等光通信技术相结合来实现量子计算机。量子力学中&#xff0c;光既是波又是粒子。光子本…...

A2A JS SDK 完整教程:快速入门指南

目录 什么是 A2A JS SDK?A2A JS 安装与设置A2A JS 核心概念创建你的第一个 A2A JS 代理A2A JS 服务端开发A2A JS 客户端使用A2A JS 高级特性A2A JS 最佳实践A2A JS 故障排除 什么是 A2A JS SDK? A2A JS SDK 是一个专为 JavaScript/TypeScript 开发者设计的强大库&#xff…...