当前位置: 首页 > news >正文

Doris-Routine Load(二十七)

例行导入(Routine Load)功能为用户提供了一种自动从指定数据源进行数据导入的功能。

适用场景

当前仅支持从 Kafka 系统进行例行导入,使用限制:

(1)支持无认证的 Kafka 访问,以及通过 SSL 方式认证的 Kafka 集群。

(2)支持的消息格式为 csv, json 文本格式。csv 每一个 message 为一行,且行尾不包含换行符。

(3)默认支持 Kafka 0.10.0.0(含)以上版本。如果要使用 Kafka 0.10.0.0 以下版本(0.9.0, 0.8.2, 0.8.1, 0.8.0),需要修改 be 的配置,将 kafka_broker_version_fallback 的值设置为要兼容的旧版本,或者在创建 routine load 的时候直接设置property.broker.version.fallback的值为要兼容的旧版本,使用旧版本的代价是 routine load 的部分新特性可能无法使用,如根据时间设置 kafka 分区的 offset。

基本原理

如上图,Client 向 FE 提交一个例行导入作业。

(1)FE 通过 JobScheduler 将一个导入作业拆分成若干个 Task。每个 Task 负责导入指定的一部分数据。Task 被 TaskScheduler 分配到指定的 BE 上执行。

(2)在 BE 上,一个 Task 被视为一个普通的导入任务,通过 Stream Load 的导入机制进行导入。导入完成后,向 FE 汇报。

(3)FE 中的 JobScheduler 根据汇报结果,继续生成后续新的 Task,或者对失败的Task 进行重试。

(4)整个例行导入作业通过不断的产生新的 Task,来完成数据不间断的导入。

基本语法

CREATE ROUTINE LOAD [db.]job_name ON tbl_name
[merge_type]
[load_properties]
[job_properties]
FROM data_source
[data_source_properties]

执行 HELP ROUTINE LOAD 可以查看语法帮助,下面是参数说明

1)[db.]job_name

导入作业的名称,在同一个 database 内,相同名称只能有一个 job 在运行。

2)tbl_name

指定需要导入的表的名称。

3)merge_type

数据的合并类型,一共支持三种类型 APPEND、DELETE、MERGE 其中,APPEND 是默认值,表示这批数据全部需要追加到现有数据中,DELETE 表示删除与这批数据 key 相同的所有行,MERGE 语义 需要与 delete on 条件联合使用,表示满足 delete 条件的数据按照 DELETE 语义处理其余的按照 APPEND 语义处理 , 语法为

[WITHMERGE|APPEND|DELETE]
4)load_properties

用于描述导入数据。语法:

[column_separator], [columns_mapping], [where_predicates], [delete_on_predicates], [source_sequence], [partitions], [preceding_predicates]

(1)column_separator:

指定列分隔符,如: COLUMNS TERMINATED BY "," 这个只在文本数据导入的时候需要指定,JSON 格式的数据导入不需要指定这个参数。

默认为:\t

(2)columns_mapping:

指定源数据中列的映射关系,以及定义衍生列的生成方式。

映射列:

按顺序指定,源数据中各个列,对应目的表中的哪些列。对于希望跳过的列,可以指定一个不存在的列名。假设目的表有三列 k1, k2, v1。源数据有 4 列,其中第 1、2、4 列分别对应 k2, k1, v1。则书写如下:

COLUMNS (k2, k1, xxx, v1)

其中 xxx 为不存在的一列,用于跳过源数据中的第三列。

衍生列:

以 col_name = expr 的形式表示的列,我们称为衍生列。即支持通过 expr 计算得出目的表中对应列的值。 衍生列通常排列在映射列之后,虽然这不是强制的规定,但是 Doris 总是先解析映射列,再解析衍生列。 接上一个示例,假设目的表还有第 4 列 v2,v2 由 k1 和 k2 的和产生。则可以书写如下:

COLUMNS (k2, k1, xxx, v1, v2 = k1 + k2);

再举例,假设用户需要导入只包含 k1 一列的表,列类型为 int。并且需要将源文件中的对应列进行处理:将负数转换为正数,而将正数乘以 100。这个功能可以通过 case when 函数实现,正确写法应如下:

COLUMNS (xx, k1 = case when xx < 0 then cast(-xx as varchar) else cast((xx + '100') as varchar) end)

(3)where_predicates

用于指定过滤条件,以过滤掉不需要的列。过滤列可以是映射列或衍生列。 例如我们只希望导入 k1 大于 100 并且 k2 等于 1000 的列,则书写如下:

WHERE k1 > 100 and k2 = 1000

(4)partitions

指定导入目的表的哪些 partition 中。如果不指定,则会自动导入到对应的 partition 中。

示例:

PARTITION(p1, p2, p3)

(5)delete_on_predicates

表示删除条件,仅在 merge type 为 MERGE 时有意义,语法与 where 相同

(6)source_sequence:

只适用于 UNIQUE_KEYS,相同 key 列下,保证 value 列按照 source_sequence 列进行REPLACE, source_sequence 可以是数据源中的列,也可以是表结构中的一列。

(7)preceding_predicates

PRECEDING FILTER predicate用于过滤原始数据。原始数据是未经列映射、转换的数据。用户可以在对转换前的数据前进行一次过滤,选取期望的数据,再进行转换。

5)job_properties

用于指定例行导入作业的通用参数。 语法:

PROPERTIES ("key1" = "val1","key2" = "val2"
)

目前支持以下参数:

(1)desired_concurrent_number

期望的并发度。一个例行导入作业会被分成多个子任务执行。这个参数指定一个作业最多有多少任务可以同时执行。必须大于 0。默认为 3。 这个并发度并不是实际的并发度,实际的并发度,会通过集群的节点数、负载情况,以及数据源的情况综合考虑。

一个作业,最多有多少 task 同时在执行。对于 Kafka 导入而言,当前的实际并发度计算如下:

Min(partition num, desired_concurrent_number, alive_backend_num, 
Config.max_routine_load_task_concurrrent_num)

其中 Config.max_routine_load_task_concurrrent_num 是系统的一个默认的最大并发数限制。这是一个 FE 配置,可以通过改配置调整。默认为 5。

其中 partition num 指订阅的 Kafka topic 的 partition 数量。alive_backend_num 是当前正常的 BE 节点数。

(2)max_batch_interval/max_batch_rows/max_batch_size这三个参数分别表示:

  • ① 每个子任务最大执行时间,单位是秒。范围为 5 到 60。默认为 10。

  • ② 每个子任务最多读取的行数。必须大于等于 200000。默认是 200000。

  • ③ 每个子任务最多读取的字节数。单位是字节,范围是 100MB 到 1GB。默认是100MB。

这三个参数,用于控制一个子任务的执行时间和处理量。当任意一个达到阈值,则任务结束。 例如:

"max_batch_interval" = "20",
"max_batch_rows" = "300000",
"max_batch_size" = "209715200"

(3)max_error_number

采样窗口内,允许的最大错误行数。必须大于等于 0。默认是 0,即不允许有错误行。

采样窗口为 max_batch_rows * 10。即如果在采样窗口内,错误行数大于 max_error_number,则会导致例行作业被暂停,需要人工介入检查数据质量问题。 被 where 条件过滤掉的行不算错误行

(4)strict_mode

是否开启严格模式,默认为关闭。如果开启后,非空原始数据的列类型变换如果结果为NULL,则会被过滤。指定方式为 "strict_mode" = "true"

(5)timezone

指定导入作业所使用的时区。默认为使用 Session 的 timezone 参数。该参数会影响所有导入涉及的和时区有关的函数结果

(6)format

指定导入数据格式,默认是 csv,支持 json 格式

(7)jsonpaths

jsonpaths: 导入 json 方式分为:简单模式和匹配模式。如果设置了jsonpath 则为匹配模式导入,否则为简单模式导入,具体可参考示例

(8)strip_outer_array

布尔类型,为 true 表示 json 数据以数组对象开始且将数组对象中进行展平,默认值是false

(9)json_root

json_root 为合法的 jsonpath 字符串,用于指定 json document 的根节点,默认值为""

(10)send_batch_parallelism

整型,用于设置发送批处理数据的并行度,如果并行度的值超过BE配置中的max_send_batch_parallelism_per_job,那么作为协调点的BE将使用max_send_batch_parallelism_per_job 的值

6)data_source_properties

数据源的类型。当前支持:Kafka

("key1" = "val1","key2" = "val2"
)

相关文章:

Doris-Routine Load(二十七)

例行导入&#xff08;Routine Load&#xff09;功能为用户提供了一种自动从指定数据源进行数据导入的功能。 适用场景 当前仅支持从 Kafka 系统进行例行导入&#xff0c;使用限制&#xff1a; &#xff08;1&#xff09;支持无认证的 Kafka 访问&#xff0c;以及通过 SSL 方…...

linux驱动.之 网络udp应用层测试工具demon(一)

绑定vlan&#xff0c;网卡的demon&#xff0c;如果有多个网卡&#xff0c;多个vlan&#xff0c;网卡的ip设置成一致&#xff0c;那就不能只简单绑定ip来创建socket&#xff0c; 需要绑定网卡设备 客户端udp_client.c #include <stdio.h> #include <string.h> #inc…...

【Flutter】graphic图表的快速上手

简介 graphic是一个数据可视化语法和Flutter图表库。 官方github示例 网上可用资源很少,只有作者的几篇文章,并且没有特别详细的文档,使用的话还是需要一定的时间去调研,在此简单记录。 示例 以折线图为例(因为我只用到了折线图,但其他的图大差不差) 创建一个两个文…...

DeepMind 推出 OPRO 技术,可用于优化 ChatGPT 提示

本心、输入输出、结果 文章目录 DeepMind 推出 OPRO 技术&#xff0c;可用于优化 ChatGPT 提示前言消息摘要OPRO的工作原理DeepMind的研究相关链接花有重开日&#xff0c;人无再少年实践是检验真理的唯一标准 DeepMind 推出 OPRO 技术&#xff0c;可用于优化 ChatGPT 提示 编辑…...

企业网络中的身份安全

随着近年来数字化转型的快速发展&#xff0c;企业使用的数字身份数量急剧增长。身份不再仅仅局限于用户。它们现在扩展到设备、应用程序、机器人、第三方供应商和组织中员工以外的其他实体。即使在用户之间&#xff0c;也存在不同类型的身份&#xff0c;例如属于IT管理员、远程…...

智能优化算法应用:基于正余弦算法无线传感器网络(WSN)覆盖优化 - 附代码

智能优化算法应用&#xff1a;基于正余弦算法无线传感器网络(WSN)覆盖优化 - 附代码 文章目录 智能优化算法应用&#xff1a;基于正余弦算法无线传感器网络(WSN)覆盖优化 - 附代码1.无线传感网络节点模型2.覆盖数学模型及分析3.正余弦算法4.实验参数设定5.算法结果6.参考文献7.…...

创建一个带有背景图层和前景图层的渲染窗口

开发环境&#xff1a; Windows 11 家庭中文版Microsoft Visual Studio Community 2019VTK-9.3.0.rc0vtk-example demo解决问题&#xff1a; 创建一个带有背景图层和前景图层的渲染窗口&#xff0c;知识点&#xff1a;1. 画布转image&#xff1b;2. 渲染图层设置&#xff1b;3.…...

Docker 运行 Oracle Autonomous Database Free Container

​ Docker 运行 Oracle Autonomous Database Free Container Oracle Autonomous Database Free Container Image 介绍通过 Docker 运行 Oracle Autonomous Database Free ContainerWallet 配置可用的 TNS 别名MY_ATP TNS 别名MY_ADW TNS 别名连接到 Oracle Autonomous Databas…...

《2023全球隐私计算报告》正式发布!

2023全球隐私计算报告 1、2023全球隐私计算图谱2、国内外隐私计算相关政策3、隐私计算技术的最新发展4、隐私计算技术的合规挑战5、隐私计算的应用市场动态6、隐私计算开源整体趋势7、隐私计算的未来趋势 11月23日&#xff0c;由浙江省人民政府、商务部共同主办&#xff0c;杭州…...

JAVA sql 查询2

SELECT * FROM employees order by salayr DESC SELECT employee_id,first_name,salary from employees ORDER BY salary,employee_id desc -- 最大值 最小值 总和 平均值 SELECT max(salary),MIN(salary),sum(salary),AVG(salary) FROM employees -- 总共有多少员工 select…...

为第一个原生Spring5应用程序添加上Log4J日志框架!

&#x1f609;&#x1f609; 学习交流群&#xff1a; ✅✅1&#xff1a;这是孙哥suns给大家的福利&#xff01; ✨✨2&#xff1a;我们免费分享Netty、Dubbo、k8s、Mybatis、Spring...应用和源码级别的视频资料 &#x1f96d;&#x1f96d;3&#xff1a;QQ群&#xff1a;583783…...

单片机复位电路

有时候我们的代码会跑飞,这个时候基本上是一切推到重来.”推倒重来”在计算机术语上称为复位.复位需要硬件的支持,复位电路就是在单片机的复位管脚上产生一个信号&#xff0c;俗称复位信号.这个信号需要持续一定的时间,单片机收到该信号之后就会复位,从头执行。 复位原理: 那么…...

11.28 知识回顾(Web框架、路由控制、视图层)

一、 web 框架 1.1 web框架是什么&#xff1f; 别人帮咱们写了一些基础代码------》我们只需要在固定的位置写固定的代码--》就能实现一个web应用 Web框架&#xff08;Web framework&#xff09;是一种开发框架&#xff0c;用来支持动态网站、网络应用和网络服务的开发。这大多…...

osgFX扩展库-异性光照、贴图、卡通特效(1)

本章将简单介绍 osgFX扩展库及osgSim 扩展库。osgFX库用得比较多,osgSim库不常用&#xff0c;因此&#xff0c;这里只对这个库作简单的说明。 osgFX扩展库 osgFX是一个OpenSceneGraph 的附加库&#xff0c;是一个用于实现一致、完备、可重用的特殊效果的构架工具&#xff0c;其…...

SELinux零知识学习三十一、SELinux策略语言之角色和用户(2)

接前一篇文章:SELinux零知识学习三十、SELinux策略语言之角色和用户(1) 三、SELinux策略语言之角色和用户 SELinux提供了一种依赖于类型强制(类型增强,TE)的基于角色的访问控制(Role-Based Access Control),角色用于组域类型和限制域类型与用户之间的关系,SELinux中…...

Unity UGUI的自动布局-LayoutGroup(水平布局)组件

Horizontal Layout Group | Unity UI | 1.0.0 1. 什么是HorizontalLayoutGroup组件&#xff1f; HorizontalLayoutGroup是Unity UGUI中的一种布局组件&#xff0c;用于在水平方向上对子物体进行排列和布局。它可以根据一定的规则自动调整子物体的位置和大小&#xff0c;使它们…...

【SpringCloud】设计原则之分层架构与统一通信协议

一、设计原则之分层架构 应用分层看起来很简单&#xff0c;但每个程序员都有自己的一套方法&#xff0c;哪怕是初学者&#xff0c;所以实施起来并非易事 最早接触的分层架构应该是最熟悉的 MVC&#xff08;Model - View - Controller&#xff09;架构&#xff0c;其将应用分成…...

在Linux环境如何启动和redis数据库?

Linux中连接redis数据库&#xff1a; 前台启动&#xff1a; 第一步&#xff1a;redis-server:服务器启动命令 当我们启动改窗口后&#xff0c;出现如下所示&#xff1a; 该窗口就不能关闭&#xff0c;否则会出现redis无法使用的情况&#xff0c;重新打开一个窗口&#xff0c…...

selenium判断元素是否存在的方法

文章目录 快捷方法完整示例程序 快捷方法 selenium没有exist_xxx相关的方法&#xff0c;无法直接判断元素存在。但是锁定元素时使用的browser.find_elements(By.CSS_SELECTOR, "css元素")会返回一个列表list&#xff0c;如果不存在这个元素就会返回一个空列表。因此…...

后端真批量新增的使用

1,添加真批量新增抽象接口 public interface EasyBaseMapper extends BaseMapper { /** * 批量插入 仅适用于mysql * * return 影响行数 */ Integer insertBatchSomeColumn(Collection entityList); } 2,新增类,添加真批量新增的方法 public class InsertBatchSqlInjector ext…...

wordpress后台更新后 前端没变化的解决方法

使用siteground主机的wordpress网站&#xff0c;会出现更新了网站内容和修改了php模板文件、js文件、css文件、图片文件后&#xff0c;网站没有变化的情况。 不熟悉siteground主机的新手&#xff0c;遇到这个问题&#xff0c;就很抓狂&#xff0c;明明是哪都没操作错误&#x…...

Qt/C++开发监控GB28181系统/取流协议/同时支持udp/tcp被动/tcp主动

一、前言说明 在2011版本的gb28181协议中&#xff0c;拉取视频流只要求udp方式&#xff0c;从2016开始要求新增支持tcp被动和tcp主动两种方式&#xff0c;udp理论上会丢包的&#xff0c;所以实际使用过程可能会出现画面花屏的情况&#xff0c;而tcp肯定不丢包&#xff0c;起码…...

为什么需要建设工程项目管理?工程项目管理有哪些亮点功能?

在建筑行业&#xff0c;项目管理的重要性不言而喻。随着工程规模的扩大、技术复杂度的提升&#xff0c;传统的管理模式已经难以满足现代工程的需求。过去&#xff0c;许多企业依赖手工记录、口头沟通和分散的信息管理&#xff0c;导致效率低下、成本失控、风险频发。例如&#…...

五年级数学知识边界总结思考-下册

目录 一、背景二、过程1.观察物体小学五年级下册“观察物体”知识点详解&#xff1a;由来、作用与意义**一、知识点核心内容****二、知识点的由来&#xff1a;从生活实践到数学抽象****三、知识的作用&#xff1a;解决实际问题的工具****四、学习的意义&#xff1a;培养核心素养…...

spring:实例工厂方法获取bean

spring处理使用静态工厂方法获取bean实例&#xff0c;也可以通过实例工厂方法获取bean实例。 实例工厂方法步骤如下&#xff1a; 定义实例工厂类&#xff08;Java代码&#xff09;&#xff0c;定义实例工厂&#xff08;xml&#xff09;&#xff0c;定义调用实例工厂&#xff…...

JAVA后端开发——多租户

数据隔离是多租户系统中的核心概念&#xff0c;确保一个租户&#xff08;在这个系统中可能是一个公司或一个独立的客户&#xff09;的数据对其他租户是不可见的。在 RuoYi 框架&#xff08;您当前项目所使用的基础框架&#xff09;中&#xff0c;这通常是通过在数据表中增加一个…...

Vite中定义@软链接

在webpack中可以直接通过符号表示src路径&#xff0c;但是vite中默认不可以。 如何实现&#xff1a; vite中提供了resolve.alias&#xff1a;通过别名在指向一个具体的路径 在vite.config.js中 import { join } from pathexport default defineConfig({plugins: [vue()],//…...

Chromium 136 编译指南 Windows篇:depot_tools 配置与源码获取(二)

引言 工欲善其事&#xff0c;必先利其器。在完成了 Visual Studio 2022 和 Windows SDK 的安装后&#xff0c;我们即将接触到 Chromium 开发生态中最核心的工具——depot_tools。这个由 Google 精心打造的工具集&#xff0c;就像是连接开发者与 Chromium 庞大代码库的智能桥梁…...

Scrapy-Redis分布式爬虫架构的可扩展性与容错性增强:基于微服务与容器化的解决方案

在大数据时代&#xff0c;海量数据的采集与处理成为企业和研究机构获取信息的关键环节。Scrapy-Redis作为一种经典的分布式爬虫架构&#xff0c;在处理大规模数据抓取任务时展现出强大的能力。然而&#xff0c;随着业务规模的不断扩大和数据抓取需求的日益复杂&#xff0c;传统…...

AI语音助手的Python实现

引言 语音助手(如小爱同学、Siri)通过语音识别、自然语言处理(NLP)和语音合成技术,为用户提供直观、高效的交互体验。随着人工智能的普及,Python开发者可以利用开源库和AI模型,快速构建自定义语音助手。本文由浅入深,详细介绍如何使用Python开发AI语音助手,涵盖基础功…...