当前位置: 首页 > news >正文

Doris-Routine Load(二十七)

例行导入(Routine Load)功能为用户提供了一种自动从指定数据源进行数据导入的功能。

适用场景

当前仅支持从 Kafka 系统进行例行导入,使用限制:

(1)支持无认证的 Kafka 访问,以及通过 SSL 方式认证的 Kafka 集群。

(2)支持的消息格式为 csv, json 文本格式。csv 每一个 message 为一行,且行尾不包含换行符。

(3)默认支持 Kafka 0.10.0.0(含)以上版本。如果要使用 Kafka 0.10.0.0 以下版本(0.9.0, 0.8.2, 0.8.1, 0.8.0),需要修改 be 的配置,将 kafka_broker_version_fallback 的值设置为要兼容的旧版本,或者在创建 routine load 的时候直接设置property.broker.version.fallback的值为要兼容的旧版本,使用旧版本的代价是 routine load 的部分新特性可能无法使用,如根据时间设置 kafka 分区的 offset。

基本原理

如上图,Client 向 FE 提交一个例行导入作业。

(1)FE 通过 JobScheduler 将一个导入作业拆分成若干个 Task。每个 Task 负责导入指定的一部分数据。Task 被 TaskScheduler 分配到指定的 BE 上执行。

(2)在 BE 上,一个 Task 被视为一个普通的导入任务,通过 Stream Load 的导入机制进行导入。导入完成后,向 FE 汇报。

(3)FE 中的 JobScheduler 根据汇报结果,继续生成后续新的 Task,或者对失败的Task 进行重试。

(4)整个例行导入作业通过不断的产生新的 Task,来完成数据不间断的导入。

基本语法

CREATE ROUTINE LOAD [db.]job_name ON tbl_name
[merge_type]
[load_properties]
[job_properties]
FROM data_source
[data_source_properties]

执行 HELP ROUTINE LOAD 可以查看语法帮助,下面是参数说明

1)[db.]job_name

导入作业的名称,在同一个 database 内,相同名称只能有一个 job 在运行。

2)tbl_name

指定需要导入的表的名称。

3)merge_type

数据的合并类型,一共支持三种类型 APPEND、DELETE、MERGE 其中,APPEND 是默认值,表示这批数据全部需要追加到现有数据中,DELETE 表示删除与这批数据 key 相同的所有行,MERGE 语义 需要与 delete on 条件联合使用,表示满足 delete 条件的数据按照 DELETE 语义处理其余的按照 APPEND 语义处理 , 语法为

[WITHMERGE|APPEND|DELETE]
4)load_properties

用于描述导入数据。语法:

[column_separator], [columns_mapping], [where_predicates], [delete_on_predicates], [source_sequence], [partitions], [preceding_predicates]

(1)column_separator:

指定列分隔符,如: COLUMNS TERMINATED BY "," 这个只在文本数据导入的时候需要指定,JSON 格式的数据导入不需要指定这个参数。

默认为:\t

(2)columns_mapping:

指定源数据中列的映射关系,以及定义衍生列的生成方式。

映射列:

按顺序指定,源数据中各个列,对应目的表中的哪些列。对于希望跳过的列,可以指定一个不存在的列名。假设目的表有三列 k1, k2, v1。源数据有 4 列,其中第 1、2、4 列分别对应 k2, k1, v1。则书写如下:

COLUMNS (k2, k1, xxx, v1)

其中 xxx 为不存在的一列,用于跳过源数据中的第三列。

衍生列:

以 col_name = expr 的形式表示的列,我们称为衍生列。即支持通过 expr 计算得出目的表中对应列的值。 衍生列通常排列在映射列之后,虽然这不是强制的规定,但是 Doris 总是先解析映射列,再解析衍生列。 接上一个示例,假设目的表还有第 4 列 v2,v2 由 k1 和 k2 的和产生。则可以书写如下:

COLUMNS (k2, k1, xxx, v1, v2 = k1 + k2);

再举例,假设用户需要导入只包含 k1 一列的表,列类型为 int。并且需要将源文件中的对应列进行处理:将负数转换为正数,而将正数乘以 100。这个功能可以通过 case when 函数实现,正确写法应如下:

COLUMNS (xx, k1 = case when xx < 0 then cast(-xx as varchar) else cast((xx + '100') as varchar) end)

(3)where_predicates

用于指定过滤条件,以过滤掉不需要的列。过滤列可以是映射列或衍生列。 例如我们只希望导入 k1 大于 100 并且 k2 等于 1000 的列,则书写如下:

WHERE k1 > 100 and k2 = 1000

(4)partitions

指定导入目的表的哪些 partition 中。如果不指定,则会自动导入到对应的 partition 中。

示例:

PARTITION(p1, p2, p3)

(5)delete_on_predicates

表示删除条件,仅在 merge type 为 MERGE 时有意义,语法与 where 相同

(6)source_sequence:

只适用于 UNIQUE_KEYS,相同 key 列下,保证 value 列按照 source_sequence 列进行REPLACE, source_sequence 可以是数据源中的列,也可以是表结构中的一列。

(7)preceding_predicates

PRECEDING FILTER predicate用于过滤原始数据。原始数据是未经列映射、转换的数据。用户可以在对转换前的数据前进行一次过滤,选取期望的数据,再进行转换。

5)job_properties

用于指定例行导入作业的通用参数。 语法:

PROPERTIES ("key1" = "val1","key2" = "val2"
)

目前支持以下参数:

(1)desired_concurrent_number

期望的并发度。一个例行导入作业会被分成多个子任务执行。这个参数指定一个作业最多有多少任务可以同时执行。必须大于 0。默认为 3。 这个并发度并不是实际的并发度,实际的并发度,会通过集群的节点数、负载情况,以及数据源的情况综合考虑。

一个作业,最多有多少 task 同时在执行。对于 Kafka 导入而言,当前的实际并发度计算如下:

Min(partition num, desired_concurrent_number, alive_backend_num, 
Config.max_routine_load_task_concurrrent_num)

其中 Config.max_routine_load_task_concurrrent_num 是系统的一个默认的最大并发数限制。这是一个 FE 配置,可以通过改配置调整。默认为 5。

其中 partition num 指订阅的 Kafka topic 的 partition 数量。alive_backend_num 是当前正常的 BE 节点数。

(2)max_batch_interval/max_batch_rows/max_batch_size这三个参数分别表示:

  • ① 每个子任务最大执行时间,单位是秒。范围为 5 到 60。默认为 10。

  • ② 每个子任务最多读取的行数。必须大于等于 200000。默认是 200000。

  • ③ 每个子任务最多读取的字节数。单位是字节,范围是 100MB 到 1GB。默认是100MB。

这三个参数,用于控制一个子任务的执行时间和处理量。当任意一个达到阈值,则任务结束。 例如:

"max_batch_interval" = "20",
"max_batch_rows" = "300000",
"max_batch_size" = "209715200"

(3)max_error_number

采样窗口内,允许的最大错误行数。必须大于等于 0。默认是 0,即不允许有错误行。

采样窗口为 max_batch_rows * 10。即如果在采样窗口内,错误行数大于 max_error_number,则会导致例行作业被暂停,需要人工介入检查数据质量问题。 被 where 条件过滤掉的行不算错误行

(4)strict_mode

是否开启严格模式,默认为关闭。如果开启后,非空原始数据的列类型变换如果结果为NULL,则会被过滤。指定方式为 "strict_mode" = "true"

(5)timezone

指定导入作业所使用的时区。默认为使用 Session 的 timezone 参数。该参数会影响所有导入涉及的和时区有关的函数结果

(6)format

指定导入数据格式,默认是 csv,支持 json 格式

(7)jsonpaths

jsonpaths: 导入 json 方式分为:简单模式和匹配模式。如果设置了jsonpath 则为匹配模式导入,否则为简单模式导入,具体可参考示例

(8)strip_outer_array

布尔类型,为 true 表示 json 数据以数组对象开始且将数组对象中进行展平,默认值是false

(9)json_root

json_root 为合法的 jsonpath 字符串,用于指定 json document 的根节点,默认值为""

(10)send_batch_parallelism

整型,用于设置发送批处理数据的并行度,如果并行度的值超过BE配置中的max_send_batch_parallelism_per_job,那么作为协调点的BE将使用max_send_batch_parallelism_per_job 的值

6)data_source_properties

数据源的类型。当前支持:Kafka

("key1" = "val1","key2" = "val2"
)

相关文章:

Doris-Routine Load(二十七)

例行导入&#xff08;Routine Load&#xff09;功能为用户提供了一种自动从指定数据源进行数据导入的功能。 适用场景 当前仅支持从 Kafka 系统进行例行导入&#xff0c;使用限制&#xff1a; &#xff08;1&#xff09;支持无认证的 Kafka 访问&#xff0c;以及通过 SSL 方…...

linux驱动.之 网络udp应用层测试工具demon(一)

绑定vlan&#xff0c;网卡的demon&#xff0c;如果有多个网卡&#xff0c;多个vlan&#xff0c;网卡的ip设置成一致&#xff0c;那就不能只简单绑定ip来创建socket&#xff0c; 需要绑定网卡设备 客户端udp_client.c #include <stdio.h> #include <string.h> #inc…...

【Flutter】graphic图表的快速上手

简介 graphic是一个数据可视化语法和Flutter图表库。 官方github示例 网上可用资源很少,只有作者的几篇文章,并且没有特别详细的文档,使用的话还是需要一定的时间去调研,在此简单记录。 示例 以折线图为例(因为我只用到了折线图,但其他的图大差不差) 创建一个两个文…...

DeepMind 推出 OPRO 技术,可用于优化 ChatGPT 提示

本心、输入输出、结果 文章目录 DeepMind 推出 OPRO 技术&#xff0c;可用于优化 ChatGPT 提示前言消息摘要OPRO的工作原理DeepMind的研究相关链接花有重开日&#xff0c;人无再少年实践是检验真理的唯一标准 DeepMind 推出 OPRO 技术&#xff0c;可用于优化 ChatGPT 提示 编辑…...

企业网络中的身份安全

随着近年来数字化转型的快速发展&#xff0c;企业使用的数字身份数量急剧增长。身份不再仅仅局限于用户。它们现在扩展到设备、应用程序、机器人、第三方供应商和组织中员工以外的其他实体。即使在用户之间&#xff0c;也存在不同类型的身份&#xff0c;例如属于IT管理员、远程…...

智能优化算法应用:基于正余弦算法无线传感器网络(WSN)覆盖优化 - 附代码

智能优化算法应用&#xff1a;基于正余弦算法无线传感器网络(WSN)覆盖优化 - 附代码 文章目录 智能优化算法应用&#xff1a;基于正余弦算法无线传感器网络(WSN)覆盖优化 - 附代码1.无线传感网络节点模型2.覆盖数学模型及分析3.正余弦算法4.实验参数设定5.算法结果6.参考文献7.…...

创建一个带有背景图层和前景图层的渲染窗口

开发环境&#xff1a; Windows 11 家庭中文版Microsoft Visual Studio Community 2019VTK-9.3.0.rc0vtk-example demo解决问题&#xff1a; 创建一个带有背景图层和前景图层的渲染窗口&#xff0c;知识点&#xff1a;1. 画布转image&#xff1b;2. 渲染图层设置&#xff1b;3.…...

Docker 运行 Oracle Autonomous Database Free Container

​ Docker 运行 Oracle Autonomous Database Free Container Oracle Autonomous Database Free Container Image 介绍通过 Docker 运行 Oracle Autonomous Database Free ContainerWallet 配置可用的 TNS 别名MY_ATP TNS 别名MY_ADW TNS 别名连接到 Oracle Autonomous Databas…...

《2023全球隐私计算报告》正式发布!

2023全球隐私计算报告 1、2023全球隐私计算图谱2、国内外隐私计算相关政策3、隐私计算技术的最新发展4、隐私计算技术的合规挑战5、隐私计算的应用市场动态6、隐私计算开源整体趋势7、隐私计算的未来趋势 11月23日&#xff0c;由浙江省人民政府、商务部共同主办&#xff0c;杭州…...

JAVA sql 查询2

SELECT * FROM employees order by salayr DESC SELECT employee_id,first_name,salary from employees ORDER BY salary,employee_id desc -- 最大值 最小值 总和 平均值 SELECT max(salary),MIN(salary),sum(salary),AVG(salary) FROM employees -- 总共有多少员工 select…...

为第一个原生Spring5应用程序添加上Log4J日志框架!

&#x1f609;&#x1f609; 学习交流群&#xff1a; ✅✅1&#xff1a;这是孙哥suns给大家的福利&#xff01; ✨✨2&#xff1a;我们免费分享Netty、Dubbo、k8s、Mybatis、Spring...应用和源码级别的视频资料 &#x1f96d;&#x1f96d;3&#xff1a;QQ群&#xff1a;583783…...

单片机复位电路

有时候我们的代码会跑飞,这个时候基本上是一切推到重来.”推倒重来”在计算机术语上称为复位.复位需要硬件的支持,复位电路就是在单片机的复位管脚上产生一个信号&#xff0c;俗称复位信号.这个信号需要持续一定的时间,单片机收到该信号之后就会复位,从头执行。 复位原理: 那么…...

11.28 知识回顾(Web框架、路由控制、视图层)

一、 web 框架 1.1 web框架是什么&#xff1f; 别人帮咱们写了一些基础代码------》我们只需要在固定的位置写固定的代码--》就能实现一个web应用 Web框架&#xff08;Web framework&#xff09;是一种开发框架&#xff0c;用来支持动态网站、网络应用和网络服务的开发。这大多…...

osgFX扩展库-异性光照、贴图、卡通特效(1)

本章将简单介绍 osgFX扩展库及osgSim 扩展库。osgFX库用得比较多,osgSim库不常用&#xff0c;因此&#xff0c;这里只对这个库作简单的说明。 osgFX扩展库 osgFX是一个OpenSceneGraph 的附加库&#xff0c;是一个用于实现一致、完备、可重用的特殊效果的构架工具&#xff0c;其…...

SELinux零知识学习三十一、SELinux策略语言之角色和用户(2)

接前一篇文章:SELinux零知识学习三十、SELinux策略语言之角色和用户(1) 三、SELinux策略语言之角色和用户 SELinux提供了一种依赖于类型强制(类型增强,TE)的基于角色的访问控制(Role-Based Access Control),角色用于组域类型和限制域类型与用户之间的关系,SELinux中…...

Unity UGUI的自动布局-LayoutGroup(水平布局)组件

Horizontal Layout Group | Unity UI | 1.0.0 1. 什么是HorizontalLayoutGroup组件&#xff1f; HorizontalLayoutGroup是Unity UGUI中的一种布局组件&#xff0c;用于在水平方向上对子物体进行排列和布局。它可以根据一定的规则自动调整子物体的位置和大小&#xff0c;使它们…...

【SpringCloud】设计原则之分层架构与统一通信协议

一、设计原则之分层架构 应用分层看起来很简单&#xff0c;但每个程序员都有自己的一套方法&#xff0c;哪怕是初学者&#xff0c;所以实施起来并非易事 最早接触的分层架构应该是最熟悉的 MVC&#xff08;Model - View - Controller&#xff09;架构&#xff0c;其将应用分成…...

在Linux环境如何启动和redis数据库?

Linux中连接redis数据库&#xff1a; 前台启动&#xff1a; 第一步&#xff1a;redis-server:服务器启动命令 当我们启动改窗口后&#xff0c;出现如下所示&#xff1a; 该窗口就不能关闭&#xff0c;否则会出现redis无法使用的情况&#xff0c;重新打开一个窗口&#xff0c…...

selenium判断元素是否存在的方法

文章目录 快捷方法完整示例程序 快捷方法 selenium没有exist_xxx相关的方法&#xff0c;无法直接判断元素存在。但是锁定元素时使用的browser.find_elements(By.CSS_SELECTOR, "css元素")会返回一个列表list&#xff0c;如果不存在这个元素就会返回一个空列表。因此…...

后端真批量新增的使用

1,添加真批量新增抽象接口 public interface EasyBaseMapper extends BaseMapper { /** * 批量插入 仅适用于mysql * * return 影响行数 */ Integer insertBatchSomeColumn(Collection entityList); } 2,新增类,添加真批量新增的方法 public class InsertBatchSqlInjector ext…...

[2025CVPR]DeepVideo-R1:基于难度感知回归GRPO的视频强化微调框架详解

突破视频大语言模型推理瓶颈,在多个视频基准上实现SOTA性能 一、核心问题与创新亮点 1.1 GRPO在视频任务中的两大挑战 ​安全措施依赖问题​ GRPO使用min和clip函数限制策略更新幅度,导致: 梯度抑制:当新旧策略差异过大时梯度消失收敛困难:策略无法充分优化# 传统GRPO的梯…...

RocketMQ延迟消息机制

两种延迟消息 RocketMQ中提供了两种延迟消息机制 指定固定的延迟级别 通过在Message中设定一个MessageDelayLevel参数&#xff0c;对应18个预设的延迟级别指定时间点的延迟级别 通过在Message中设定一个DeliverTimeMS指定一个Long类型表示的具体时间点。到了时间点后&#xf…...

【位运算】消失的两个数字(hard)

消失的两个数字&#xff08;hard&#xff09; 题⽬描述&#xff1a;解法&#xff08;位运算&#xff09;&#xff1a;Java 算法代码&#xff1a;更简便代码 题⽬链接&#xff1a;⾯试题 17.19. 消失的两个数字 题⽬描述&#xff1a; 给定⼀个数组&#xff0c;包含从 1 到 N 所有…...

如何将联系人从 iPhone 转移到 Android

从 iPhone 换到 Android 手机时&#xff0c;你可能需要保留重要的数据&#xff0c;例如通讯录。好在&#xff0c;将通讯录从 iPhone 转移到 Android 手机非常简单&#xff0c;你可以从本文中学习 6 种可靠的方法&#xff0c;确保随时保持连接&#xff0c;不错过任何信息。 第 1…...

Linux云原生安全:零信任架构与机密计算

Linux云原生安全&#xff1a;零信任架构与机密计算 构建坚不可摧的云原生防御体系 引言&#xff1a;云原生安全的范式革命 随着云原生技术的普及&#xff0c;安全边界正在从传统的网络边界向工作负载内部转移。Gartner预测&#xff0c;到2025年&#xff0c;零信任架构将成为超…...

C++ 求圆面积的程序(Program to find area of a circle)

给定半径r&#xff0c;求圆的面积。圆的面积应精确到小数点后5位。 例子&#xff1a; 输入&#xff1a;r 5 输出&#xff1a;78.53982 解释&#xff1a;由于面积 PI * r * r 3.14159265358979323846 * 5 * 5 78.53982&#xff0c;因为我们只保留小数点后 5 位数字。 输…...

大数据学习(132)-HIve数据分析

​​​​&#x1f34b;&#x1f34b;大数据学习&#x1f34b;&#x1f34b; &#x1f525;系列专栏&#xff1a; &#x1f451;哲学语录: 用力所能及&#xff0c;改变世界。 &#x1f496;如果觉得博主的文章还不错的话&#xff0c;请点赞&#x1f44d;收藏⭐️留言&#x1f4…...

C#中的CLR属性、依赖属性与附加属性

CLR属性的主要特征 封装性&#xff1a; 隐藏字段的实现细节 提供对字段的受控访问 访问控制&#xff1a; 可单独设置get/set访问器的可见性 可创建只读或只写属性 计算属性&#xff1a; 可以在getter中执行计算逻辑 不需要直接对应一个字段 验证逻辑&#xff1a; 可以…...

毫米波雷达基础理论(3D+4D)

3D、4D毫米波雷达基础知识及厂商选型 PreView : https://mp.weixin.qq.com/s/bQkju4r6med7I3TBGJI_bQ 1. FMCW毫米波雷达基础知识 主要参考博文&#xff1a; 一文入门汽车毫米波雷达基本原理 &#xff1a;https://mp.weixin.qq.com/s/_EN7A5lKcz2Eh8dLnjE19w 毫米波雷达基础…...

Vue 模板语句的数据来源

&#x1f9e9; Vue 模板语句的数据来源&#xff1a;全方位解析 Vue 模板&#xff08;<template> 部分&#xff09;中的表达式、指令绑定&#xff08;如 v-bind, v-on&#xff09;和插值&#xff08;{{ }}&#xff09;都在一个特定的作用域内求值。这个作用域由当前 组件…...