大数据系列之:Flink Doris Connector,实时同步数据到Doris数据库
大数据系列之:Flink Doris Connector,实时同步数据到Doris数据库
- 一、版本兼容性
- 二、使用
- 三、Flink SQL
- 四、DataStream
- 五、Lookup Join
- 六、配置
- 通用配置项
- 接收器配置项
- 查找Join配置项
- 七、Doris 和 Flink 列类型映射
- 八、使用Flink CDC访问Doris的示例
- 九、使用FlinkSQL通过CDC访问并实现部分列更新的示例
- 十、使用FlinkCDC访问多个表或整个数据库(支持MySQL、Oracle、PostgreSQL、SQLServer)
- 十一、使用FlinkCDC更新Key列
- 十二、使用Flink根据指定的列删除数据
- 十三、最佳实践应用场景
- 十四、常见问题解答
可以通过Flink操作(读取、插入、修改、删除)支持存储在Doris中的数据。本文介绍了如何通过Datastream和Flink操作Doris。
注意:
- 修改和删除仅支持唯一键模型。
- 当前的删除是为了支持Flink CDC访问数据以实现自动删除。如果要删除其他数据访问方法,您需要自行实现。
一、版本兼容性

二、使用
Maven
添加 flink-doris-connector
<!-- flink-doris-connector -->
<dependency><groupId>org.apache.doris</groupId><artifactId>flink-doris-connector-1.16</artifactId><version>1.6.0</version>
</dependency>
- 请根据不同的Flink版本替换相应的Connector和Flink依赖版本。
- 也可以从这里下载相关版本的jar包。
flink-doris-connector下载地址:
- https://repo.maven.apache.org/maven2/org/apache/doris/
编译
- 编译时直接运行sh build.sh即可。
- 编译成功后,会在dist目录下生成目标jar包,如:flink-doris-connector-1.5.0-SNAPSHOT.jar。将此文件复制到 Flink 的类路径中以使用 Flink-Doris-Connector。例如,Flink 运行在 Local 模式,则将此文件放在 lib/ 文件夹中。 Flink运行在Yarn集群模式下,将此文件放入预部署包中。
三、Flink SQL
read
-- doris source
CREATE TABLE flink_doris_source (name STRING,age INT,price DECIMAL(5,2),sale DOUBLE)WITH ('connector' = 'doris','fenodes' = 'FE_IP:HTTP_PORT','table.identifier' = 'database.table','username' = 'root','password' = 'password'
);
write
--enable checkpoint
SET 'execution.checkpointing.interval' = '10s';-- doris sink
CREATE TABLE flink_doris_sink (name STRING,age INT,price DECIMAL(5,2),sale DOUBLE)WITH ('connector' = 'doris','fenodes' = 'FE_IP:HTTP_PORT','table.identifier' = 'db.table','username' = 'root','password' = 'password','sink.label-prefix' = 'doris_label'
);-- submit insert job
INSERT INTO flink_doris_sink select name,age,price,sale from flink_doris_source
四、DataStream
read
DorisOptions.Builder builder = DorisOptions.builder().setFenodes("FE_IP:HTTP_PORT").setTableIdentifier("db.table").setUsername("root").setPassword("password");DorisSource<List<?>> dorisSource = DorisSource.<List<?>>builder().setDorisOptions(builder.build()).setDorisReadOptions(DorisReadOptions.builder().build()).setDeserializer(new SimpleListDeserializationSchema()).build();env.fromSource(dorisSource, WatermarkStrategy.noWatermarks(), "doris source").print();
write
DorisSink通过StreamLoad向Doris写入数据,DataStream写入时支持不同的序列化方式
字符串数据流(SimpleStringSerializer)
// enable checkpoint
env.enableCheckpointing(10000);
// using batch mode for bounded data
env.setRuntimeMode(RuntimeExecutionMode.BATCH);DorisSink.Builder<String> builder = DorisSink.builder();
DorisOptions.Builder dorisBuilder = DorisOptions.builder();
dorisBuilder.setFenodes("FE_IP:HTTP_PORT").setTableIdentifier("db.table").setUsername("root").setPassword("password");Properties properties = new Properties();
// When the upstream is writing json, the configuration needs to be enabled.
//properties.setProperty("format", "json");
//properties.setProperty("read_json_by_line", "true");
DorisExecutionOptions.Builder executionBuilder = DorisExecutionOptions.builder();
executionBuilder.setLabelPrefix("label-doris") //streamload label prefix.setDeletable(false).setStreamLoadProp(properties); ;builder.setDorisReadOptions(DorisReadOptions.builder().build()).setDorisExecutionOptions(executionBuilder.build()).setSerializer(new SimpleStringSerializer()) //serialize according to string.setDorisOptions(dorisBuilder.build());//mock string source
List<Tuple2<String, Integer>> data = new ArrayList<>();
data.add(new Tuple2<>("doris",1));
DataStreamSource<Tuple2<String, Integer>> source = env. fromCollection(data);source.map((MapFunction<Tuple2<String, Integer>, String>) t -> t.f0 + "\t" + t.f1).sinkTo(builder.build());//mock json string source
//env.fromElements("{\"name\":\"zhangsan\",\"age\":1}").sinkTo(builder.build());
RowData数据流(RowDataSerializer)
// enable checkpoint
env.enableCheckpointing(10000);
// using batch mode for bounded data
env.setRuntimeMode(RuntimeExecutionMode.BATCH);//doris sink option
DorisSink.Builder<RowData> builder = DorisSink.builder();
DorisOptions.Builder dorisBuilder = DorisOptions.builder();
dorisBuilder.setFenodes("FE_IP:HTTP_PORT").setTableIdentifier("db.table").setUsername("root").setPassword("password");// json format to streamload
Properties properties = new Properties();
properties.setProperty("format", "json");
properties.setProperty("read_json_by_line", "true");
DorisExecutionOptions.Builder executionBuilder = DorisExecutionOptions.builder();
executionBuilder.setLabelPrefix("label-doris") //streamload label prefix.setDeletable(false).setStreamLoadProp(properties); //streamload params//flink rowdata's schema
String[] fields = {"city", "longitude", "latitude", "destroy_date"};
DataType[] types = {DataTypes.VARCHAR(256), DataTypes.DOUBLE(), DataTypes.DOUBLE(), DataTypes.DATE()};builder.setDorisReadOptions(DorisReadOptions.builder().build()).setDorisExecutionOptions(executionBuilder.build()).setSerializer(RowDataSerializer.builder() //serialize according to rowdata.setFieldNames(fields).setType("json") //json format.setFieldType(types).build()).setDorisOptions(dorisBuilder.build());//mock rowdata source
DataStream<RowData> source = env. fromElements("").map(new MapFunction<String, RowData>() {@Overridepublic RowData map(String value) throws Exception {GenericRowData genericRowData = new GenericRowData(4);genericRowData.setField(0, StringData.fromString("beijing"));genericRowData.setField(1, 116.405419);genericRowData.setField(2, 39.916927);genericRowData.setField(3, LocalDate.now().toEpochDay());return genericRowData;}});source. sinkTo(builder. build());
SchemaChange数据流(JsonDebeziumSchemaSerializer)
// enable checkpoint
env.enableCheckpointing(10000);Properties props = new Properties();
props. setProperty("format", "json");
props.setProperty("read_json_by_line", "true");
DorisOptions dorisOptions = DorisOptions. builder().setFenodes("127.0.0.1:8030").setTableIdentifier("test.t1").setUsername("root").setPassword("").build();DorisExecutionOptions.Builder executionBuilder = DorisExecutionOptions.builder();
executionBuilder.setLabelPrefix("label-prefix").setStreamLoadProp(props).setDeletable(true);DorisSink.Builder<String> builder = DorisSink.builder();
builder.setDorisReadOptions(DorisReadOptions.builder().build()).setDorisExecutionOptions(executionBuilder.build()).setDorisOptions(dorisOptions).setSerializer(JsonDebeziumSchemaSerializer.builder().setDorisOptions(dorisOptions).build());env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source").sinkTo(builder.build());
五、Lookup Join
CREATE TABLE fact_table (`id` BIGINT,`name` STRING,`city` STRING,`process_time` as proctime()
) WITH ('connector' = 'kafka',...
);create table dim_city(`city` STRING,`level` INT ,`province` STRING,`country` STRING
) WITH ('connector' = 'doris','fenodes' = '127.0.0.1:8030','jdbc-url' = 'jdbc:mysql://127.0.0.1:9030','table.identifier' = 'dim.dim_city','username' = 'root','password' = ''
);SELECT a.id, a.name, a.city, c.province, c.country,c.level
FROM fact_table a
LEFT JOIN dim_city FOR SYSTEM_TIME AS OF a.process_time AS c
ON a.city = c.city
- 这个命令是用于创建两个表和一个查询语句。第一个表是名为"fact_table"的表,它有四个列,分别为"id"、“name”、“city"和"process_time”。其中,"id"列是BIGINT类型,"name"和"city"列是STRING类型,"process_time"列是一个基于当前系统时间的计算列,它使用"proctime()"函数实现。
- 第二个表是名为"dim_city"的表,它有四个列,分别为"city"、“level”、“province"和"country”。其中,“city”、“province"和"country"列是STRING类型,“level"列是INT类型。该表使用"Doris"作为存储引擎,连接器为"connector”,并且需要指定连接器的其他参数,如"fenodes”、“jdbc-url”、“table.identifier”、"username"和"password"等。
- 最后一个命令是一个查询语句,它使用"LEFT JOIN"将"fact_table"和"dim_city"两个表进行连接,并使用"FOR SYSTEM_TIME AS OF"来指定连接时的时间戳,这里使用了"process_time"列的值。查询结果包括"id"、“name”、“city”、“province”、"country"和"level"这些列。
六、配置
通用配置项
fenodes:
- Doris FE http地址,支持多个地址,用逗号分隔
benodes:
- Doris BE http地址,支持多个地址,以逗号分隔。
jdbc-url:
- jdbc连接信息,如:jdbc:mysql://127.0.0.1:9030
table.identifier:
- Doris表名,如:db.tbl
auto-redirect:
- 默认值:true
- 是否重定向 StreamLoad 请求。开启后StreamLoad会通过FE写入,不再显示BE信息。
doris.request.retries:
- 默认值:3
- 向 Doris 发送请求的重试次数
doris.request.connect.timeout.ms:
- 默认值:30000
- 向 Doris 发送请求的连接超时
doris.request.read.timeout.ms:
- 默认值:30000
- 读取向 Doris 发送请求的超时
源配置项
doris.request.query.timeout.s:
- 默认值:3600
- 查询Doris的超时时间,默认1小时,-1表示无超时限制
doris.request.tablet.size:
- 默认值:Integer. MAX_VALUE
- 一个Partition对应的Doris Tablet数量。该值设置得越小,生成的Partition就越多。这提高了 Flink 端的并行度,但同时也给 Doris 带来了更大的压力。
doris.batch.size:
- 默认值:1024
- 一次从BE读取数据的最大行数。增加该值会减少 Flink 和 Doris 之间建立的连接数量。从而减少网络延迟带来的额外时间开销。
doris.exec.mem.limit:
- 默认值:2147483648
- 单个查询的内存限制。默认为2GB,以字节为单位
doris.deserialize.arrow.async:
- 默认值:FALSE
- 是否支持flink-doris-connector迭代所需的Arrow格式异步转换为RowBatch
doris.deserialize.queue.size:
- 默认值:64
- Arrow格式的内部处理队列的异步转换,当doris.deserialize.arrow.async为true时有效
doris.read.field:
- 读取Doris表的列名列表,以逗号分隔
doris.filter.query:
- 过滤读取数据的表达式,这个表达式透明传递给Doris。 Doris使用这个表达式来完成源端的数据过滤。例如年龄=18。
接收器配置项
sink.label-prefix:
- Stream加载导入使用的标签前缀。在2pc场景下,需要全局唯一性来保证Flink的EOS语义。
sink.properties.*:
- 导入流负载参数。
例如: ‘sink.properties.column_separator’ = ', ’ 定义列分隔符, ‘sink.properties.escape_delimiters’ = ‘true’ 特殊字符作为分隔符, ‘\x01’ 将转换为二进制 0x01 - JSON格式导入
‘sink.properties.format’ = ‘json’ ‘sink.properties.按行读取 json’ = ‘true’
详细参数请参考这里。
sink.enable-delete:
- 默认值:TRUE
- 是否启用删除。该选项需要Doris表开启批量删除功能(Doris 0.15+版本默认开启),且仅支持Unique模型。
sink.enable-2pc:
- 默认值:TRUE
- 是否启用两阶段提交(2pc),默认为true,以保证Exactly-Once语义。
sink.buffer-size:
- 默认值:1MB
- 写入数据缓存缓冲区的大小,以字节为单位。不建议修改,默认配置即可
sink.buffer-count:
- 默认值:3
- 写入数据缓冲区的数量。不建议修改,默认配置即可
sink.max-retries:
- 默认值:3
- Commit失败后最大重试次数,默认3
sink.use-cache:
- 默认值:false
- 发生异常时,是否使用内存缓存进行恢复。启用后,Checkpoint 期间的数据将保留在缓存中。
sink.enable.batch-mode:
- 默认值:false
- 是否使用批处理模式写入Doris。使能后,写入时序不依赖于Checkpoint。写入是通过sink.buffer-flush.max-rows/sink.buffer-flush.max-bytes/sink.buffer-flush.interval参数控制的。输入机会。
同时开启后,Exactly-once语义将无法保证。 Uniq模型可以用来实现幂等性。
sink.flush.queue-size:
- 默认值:2
- 在批处理模式下,缓存的列大小。
sink.buffer-flush.max-rows:
- 默认值:50000
- 批处理模式下,单批写入的最大数据行数。
sink.buffer-flush.max-bytes:
- 默认值:10MB
- 在批处理模式下,单批写入的最大字节数。
sink.buffer-flush.interval:
- 默认值:10s
- 批处理模式下,异步刷新缓存的时间间隔
sink.ignore.update-before:
- 默认值:true
- 是否忽略update-before事件,默认忽略。
查找Join配置项
lookup.cache.max-rows
- 默认值:-1
- 查找缓存的最大行数,默认值为-1,不启用缓存
lookup.cache.ttl:
- 默认值:10s
- 查找缓存的最大时间,默认10s
lookup.max-retries:
- 默认值:1
- 查找查询失败后重试的次数
lookup.jdbc.async:
- 默认值:false
- 是否启用异步查找,默认为false
lookup.jdbc.read.batch.size:
- 默认值:128
- 异步查找下,每个查询的最大批量大小
lookup.jdbc.read.batch.queue-size:
- 默认值:256
- 异步查找时中间缓冲队列的大小
lookup.jdbc.read.thread-size:
- 默认值:3
- 每个任务中用于查找的jdbc线程数
七、Doris 和 Flink 列类型映射
| Doris类型 | Flink类型 |
|---|---|
| NULL_TYPE | NULL |
| BOOLEAN | BOOLEAN |
| TINYINT | TINYINT |
| SMALLINT | SMALLINT |
| INT | INT |
| BIGINT | BIGINT |
| FLOAT | FLOAT |
| DOUBLE | DOUBLE |
| DATE | DATE |
| DATETIME | TIMESTAMP |
| DECIMAL | DECIMAL |
| CHAR | STRING |
| LARGEINT | STRING |
| VARCHAR | STRING |
| STRING | STRING |
| DECIMALV2 | DECIMAL |
| ARRAY | ARRAY |
| MAP | MAP |
| JSON | STRING |
| VARIANT | STRING |
| IPV4 | STRING |
| IPV6 | STRING |
从connector-1.6.1版本开始,增加了对Variant、IPV6、IPV4三种数据类型读取的支持。读取 IPV6 和 Variant 需要 Doris 2.1.1 或更高版本。
八、使用Flink CDC访问Doris的示例
SET 'execution.checkpointing.interval' = '10s';
CREATE TABLE cdc_mysql_source (id int,name VARCHAR,PRIMARY KEY (id) NOT ENFORCED
) WITH ('connector' = 'mysql-cdc','hostname' = '127.0.0.1','port' = '3306','username' = 'root','password' = 'password','database-name' = 'database','table-name' = 'table'
);-- Support synchronous insert/update/delete events
CREATE TABLE doris_sink (
id INT,
name STRING
)
WITH ('connector' = 'doris','fenodes' = '127.0.0.1:8030','table.identifier' = 'database.table','username' = 'root','password' = '','sink.properties.format' = 'json','sink.properties.read_json_by_line' = 'true','sink.enable-delete' = 'true', -- Synchronize delete events'sink.label-prefix' = 'doris_label'
);insert into doris_sink select id,name from cdc_mysql_source;
九、使用FlinkSQL通过CDC访问并实现部分列更新的示例
-- enable checkpoint
SET 'execution.checkpointing.interval' = '10s';CREATE TABLE cdc_mysql_source (id int,name STRING,bank STRING,age int,PRIMARY KEY (id) NOT ENFORCED
) WITH ('connector' = 'mysql-cdc','hostname' = '127.0.0.1','port' = '3306','username' = 'root','password' = 'password','database-name' = 'database','table-name' = 'table'
);CREATE TABLE doris_sink (id INT,name STRING,bank STRING,age int
)
WITH ('connector' = 'doris','fenodes' = '127.0.0.1:8030','table.identifier' = 'database.table','username' = 'root','password' = '','sink.properties.format' = 'json','sink.properties.read_json_by_line' = 'true','sink.properties.columns' = 'id,name,bank,age','sink.properties.partial_columns' = 'true' --Enable partial column updates
);insert into doris_sink select id,name,bank,age from cdc_mysql_source;
十、使用FlinkCDC访问多个表或整个数据库(支持MySQL、Oracle、PostgreSQL、SQLServer)
MySQL同步示例
<FLINK_HOME>bin/flink run \-Dexecution.checkpointing.interval=10s\-Dparallelism.default=1\-c org.apache.doris.flink.tools.cdc.CdcTools\lib/flink-doris-connector-1.16-1.5.0-SNAPSHOT.jar \mysql-sync-database\--database test_db \--mysql-conf hostname=127.0.0.1 \--mysql-conf port=3306 \--mysql-conf username=root \--mysql-conf password=123456 \--mysql-conf database-name=mysql_db \--including-tables "tbl1|test.*" \--sink-conf fenodes=127.0.0.1:8030 \--sink-conf username=root \--sink-conf password=123456 \--sink-conf jdbc-url=jdbc:mysql://127.0.0.1:9030 \--sink-conf sink.label-prefix=label \--table-conf replication_num=1
Oracle同步示例
<FLINK_HOME>bin/flink run \-Dexecution.checkpointing.interval=10s \-Dparallelism.default=1 \-c org.apache.doris.flink.tools.cdc.CdcTools \./lib/flink-doris-connector-1.16-1.5.0-SNAPSHOT.jar\oracle-sync-database \--database test_db \--oracle-conf hostname=127.0.0.1 \--oracle-conf port=1521 \--oracle-conf username=admin \--oracle-conf password="password" \--oracle-conf database-name=XE \--oracle-conf schema-name=ADMIN \--including-tables "tbl1|tbl2" \--sink-conf fenodes=127.0.0.1:8030 \--sink-conf username=root \--sink-conf password=\--sink-conf jdbc-url=jdbc:mysql://127.0.0.1:9030 \--sink-conf sink.label-prefix=label \--table-conf replication_num=1
PostgreSQL 同步示例
<FLINK_HOME>/bin/flink run \-Dexecution.checkpointing.interval=10s \-Dparallelism.default=1\-c org.apache.doris.flink.tools.cdc.CdcTools \./lib/flink-doris-connector-1.16-1.5.0-SNAPSHOT.jar \postgres-sync-database \--database db1\--postgres-conf hostname=127.0.0.1 \--postgres-conf port=5432 \--postgres-conf username=postgres \--postgres-conf password="123456" \--postgres-conf database-name=postgres \--postgres-conf schema-name=public \--postgres-conf slot.name=test \--postgres-conf decoding.plugin.name=pgoutput \--including-tables "tbl1|tbl2" \--sink-conf fenodes=127.0.0.1:8030 \--sink-conf username=root \--sink-conf password=\--sink-conf jdbc-url=jdbc:mysql://127.0.0.1:9030 \--sink-conf sink.label-prefix=label \--table-conf replication_num=1
SQLServer同步示例
<FLINK_HOME>/bin/flink run \-Dexecution.checkpointing.interval=10s \-Dparallelism.default=1 \-c org.apache.doris.flink.tools.cdc.CdcTools \./lib/flink-doris-connector-1.16-1.5.0-SNAPSHOT.jar \sqlserver-sync-database \--database db1\--sqlserver-conf hostname=127.0.0.1 \--sqlserver-conf port=1433 \--sqlserver-conf username=sa \--sqlserver-conf password="123456" \--sqlserver-conf database-name=CDC_DB \--sqlserver-conf schema-name=dbo \--including-tables "tbl1|tbl2" \--sink-conf fenodes=127.0.0.1:8030 \--sink-conf username=root \--sink-conf password=\--sink-conf jdbc-url=jdbc:mysql://127.0.0.1:9030 \--sink-conf sink.label-prefix=label \--table-conf replication_num=1
十一、使用FlinkCDC更新Key列
一般来说,在业务数据库中,数字被用作表的主键,例如学生表中使用数字(id)作为主键,但随着业务的发展,与数据对应的数字可能会发生变化。在这种情况下,使用FlinkCDC + Doris Connector进行数据同步可以自动更新Doris主键列中的数据。
原理
Flink CDC的底层采集工具是Debezium。Debezium内部使用op字段来识别相应的操作:op字段的值为c、u、d和r,分别对应创建、更新、删除和读取。对于主键列的更新,FlinkCDC将向下游发送DELETE和INSERT事件,在数据同步到Doris后,会自动更新主键列的数据。
示例
Flink程序可以参考上述CDC同步示例。任务成功提交后,在MySQL端执行更新主键列语句(update student set id = ‘1002’ where id = ‘1001’),以修改Doris中的数据。
十二、使用Flink根据指定的列删除数据
通常,Kafka中的消息使用特定的字段来标记操作类型,例如{“op_type”:“delete”,data:{…}}。对于这种类型的数据,希望删除op_type=delete的数据。
默认情况下,DorisSink将根据RowKind来区分事件类型。通常,在cdc的情况下,可以直接获取事件类型,并将隐藏列__DORIS_DELETE_SIGN__赋值以实现删除的目的,而Kafka需要基于业务逻辑进行判断,显示传递给隐藏列的值。
-- Such as upstream data: {"op_type":"delete",data:{"id":1,"name":"zhangsan"}}
CREATE TABLE KAFKA_SOURCE(data STRING,op_type STRING
) WITH ('connector' = 'kafka',...
);CREATE TABLE DORIS_SINK(id INT,name STRING,__DORIS_DELETE_SIGN__ INT
) WITH ('connector' = 'doris','fenodes' = '127.0.0.1:8030','table.identifier' = 'db.table','username' = 'root','password' = '','sink.enable-delete' = 'false', -- false means not to get the event type from RowKind'sink.properties.columns' = 'id, name, __DORIS_DELETE_SIGN__' -- Display the import column of the specified streamload
);INSERT INTO DORIS_SINK
SELECT json_value(data,'$.id') as id,
json_value(data,'$.name') as name,
if(op_type='delete',1,0) as __DORIS_DELETE_SIGN__
from KAFKA_SOURCE;
- 这段代码是一个示例,演示了如何使用Flink从Kafka源表读取数据,并将其写入Doris目标表。具体来说,如果源表中的数据op_type字段的值为"delete",则希望在Doris目标表中删除相应的数据。
- 首先,在Kafka源表的定义中,我们有一个data字段用于存储源数据的JSON字符串,以及一个op_type字段用于标识操作类型。
- 然后,在Doris目标表的定义中,我们有一个id字段和一个name字段来存储数据的具体内容,还有一个名为__DORIS_DELETE_SIGN__的隐藏列,用于标识是否要进行删除操作。
- 在INSERT INTO语句中,我们将从Kafka源表中选择data字段的id和name,并使用json_value函数提取相应的值。同时,我们使用if函数将op_type字段的值与"delete"进行比较,如果相等则将__DORIS_DELETE_SIGN__赋值为1,否则赋值为0。
- 最后,将处理后的数据插入到Doris目标表中。
- 总之,这段代码的作用是根据源表中的op_type字段值,将对应的数据删除或写入到Doris目标表中。
十三、最佳实践应用场景
- 使用Flink Doris Connector最适合的场景是实时/批量将源数据同步到Doris(Mysql、Oracle、PostgreSQL)中,然后使用Flink对Doris和其他数据源中的数据进行联合分析。您还可以使用Flink Doris Connector。
其他注意事项:
- Flink Doris Connector主要依赖于Checkpoint进行流式写入,因此Checkpoint之间的时间间隔就是数据的可见延迟时间。
- 为了确保Flink的Exactly Once语义,Flink Doris Connector默认启用两阶段提交,Doris在1.1版本之后默认启用两阶段提交。1.0可以通过修改BE参数来启用。
十四、常见问题解答
Doris Source读取数据后,为什么流会结束?
- 目前Doris Source是有界流,不支持CDC读取。
Flink能否读取Doris并执行条件下推?
- 通过配置doris.filter.query参数可以实现。
如何写入位图类型?
CREATE TABLE bitmap_sink (
dt int,
page string,
user_id int
)
WITH ('connector' = 'doris','fenodes' = '127.0.0.1:8030','table.identifier' = 'test.bitmap_test','username' = 'root','password' = '','sink.label-prefix' = 'doris_label','sink.properties.columns' = 'dt,page,user_id,user_id=to_bitmap(user_id)'
)
errCode = 2, detailMessage = Label [label_0_1] has already been used, relate to txn [19650]
- 在Exactly-Once场景中,Flink Job必须从最新的Checkpoint/Savepoint重新启动,否则会报告上述错误。当不需要Exactly-Once时,可以通过关闭2PC提交(sink.enable-2pc=false)或更改不同的sink.label-prefix来解决。
errCode = 2, detailMessage = transaction [19650] not found
- 发生在Commit阶段,Checkpoint中记录的事务ID在FE端已过期,在此时再次提交时会出现上述错误。此时无法从Checkpoint启动,可以通过修改fe.conf中的streaming_label_keep_max_second配置来延长过期时间,默认为12小时。
errCode = 2, detailMessage = current running txns on db 10006 is 100, larger than limit 100
- 这是因为同一库的并发导入超过了100,可以通过调整fe.conf的max_running_txn_num_per_db参数来解决。详细信息,请参考max_running_txn_num_per_db。
- 同时,如果一个任务频繁修改标签并重新启动,也可能导致此错误发生。在2pc场景(重复/聚合模型)中,每个任务的标签需要唯一,在从Checkpoint重新启动时,Flink任务将主动中止之前已经成功预提交但未提交的事务。频繁修改标签并重新启动将导致大量已成功预提交的事务无法中止,占用事务。在Unique模型下,也可以关闭2pc,实现幂等写入。
当Flink向Uniq模型写入一批数据时,如何确保数据的顺序?
- 您可以添加序列列的配置来确保顺序。
Flink任务没有报错,但数据无法同步?
- 在Connector1.1.0之前,数据是批量写入的,并且写入是由数据驱动的。需要确定上游是否有数据写入。在1.1.0之后,它依赖于Checkpoint,并且必须启用Checkpoint才能进行写入。
tablet writer write failed, tablet_id=190958, txn_id=3505530, err=-235
- 通常发生在Connector1.1.0之前,这是因为写入频率过快,导致版本过多。可以通过设置sink.batch.size和sink.batch.interval参数来减少Streamload的频率。
源表和Doris表应如何对应?
- 在使用Flink Connector导入数据时,需要注意两个方面。第一,源表的列和类型应与Flink SQL中的列和类型对应;第二,Flink SQL中的列和类型必须与Doris表的列和类型匹配。
TApplicationException: get_next failed: out of sequence response: expected 4 but got 3
- 这是由于 Thrift 中的并发错误造成的。建议您尽可能使用最新的连接器和兼容的 Flink 版本。
DorisRuntimeException: Fail to abort transaction 26153 with url http://192.168.0.1:8040/api/table_name/_stream_load_2pc
- 您可以在TaskManager中搜索日志中止事务响应,并根据HTTP返回码判断是客户端问题还是服务器问题。
org.apache.flink.table.api.SqlParserException when using doris.filter.query: SQL parsing failed. “xx” encountered at row x, column xx
- 这个问题主要是由于条件中的varchar/string类型,需要进行引号转义。正确的写法是xxx = ‘‘xxx’’。这样,Flink SQL解析器会将连续的两个单引号解释为一个单引号字符,而不是字符串的结束,并将拼接的字符串作为属性的值。例如:t1 >= ‘2024-01-01’ 可以写为 ‘doris.filter.query’ = ‘t1 >=’‘2024-01-01’'。
Failed to connect to backend: http://host:webserver_port, and BE is still alive
- 这个问题可能是由于配置了be的IP地址,而该地址无法被外部的Flink集群访问。这主要是因为在连接fe时,be的地址是通过fe进行解析的。例如,如果将be地址添加为’127.0.0.1’,那么Flink集群通过fe获取到的be地址将是’127.0.0.1:webserver_port’,并且Flink将连接到该地址。当出现这种问题时,可以通过将be的实际对应的外部IP地址添加到"with"属性中来解决:‘benodes’=“be_ip:webserver_port,be_ip:webserver_port…”。对于整个数据库的同步,可以使用以下属性:–sink-conf benodes=be_ip:webserver,be_ip:webserver…。
当使用Flink-connector将MySQL数据同步到Doris时,时间戳之间存在几小时的时间差。
- Flink Connector默认使用UTC+8时区从MySQL同步整个数据库。如果您的数据位于不同的时区,您可以使用以下配置进行调整,例如:–mysql-conf debezium.date.format.timestamp.zone=“UTC+3”。
相关文章:
大数据系列之:Flink Doris Connector,实时同步数据到Doris数据库
大数据系列之:Flink Doris Connector,实时同步数据到Doris数据库 一、版本兼容性二、使用三、Flink SQL四、DataStream五、Lookup Join六、配置通用配置项接收器配置项查找Join配置项 七、Doris 和 Flink 列类型映射八、使用Flink CDC访问Doris的示例九、…...
LabVIEW VI 多语言动态加载与运行的实现
在多语言应用程序开发中,确保用户界面能够根据用户的语言偏好动态切换是一个关键需求。本文通过分析一个LabVIEW程序框图,详细说明了如何使用LabVIEW中的属性节点和调用节点来实现VI(虚拟仪器)界面语言的动态加载与运行。此程序允…...
Unity引擎基础知识
目录 Unity基础知识概要 1. 创建工程 2. 工程目录介绍 3. Unity界面和五大面板 4. 游戏物体创建与操作 5. 场景和层管理 6. 组件系统 7. 脚本语言C# 8. 物理引擎和UI系统 学习资源推荐 Unity引擎中如何优化大型游戏项目的性能? Unity C#脚本语言的高级编…...
练习题- 探索正则表达式对象和对象匹配
正则表达式(Regular Expressions)是一种强大而灵活的文本处理工具,它允许我们通过模式匹配来处理字符串。这在数据清理、文本分析等领域有着广泛的应用。在Python中,正则表达式通过re模块提供支持,学习和掌握正则表达式对于处理复杂的文本数据至关重要。 本文将探索如何在…...
Java集合提升
1. 手写ArrayList 1.1. ArrayList底层原理细节 底层结构是一个长度可以动态增长的数组(顺序表)transient Object[] elementData; 特点:在内存中分配连续的空间,只存储数据,不存储地址信息。位置就隐含着地址。优点 节…...
uniapp 微信小程序生成水印图片
效果 源码 <template><view style"overflow: hidden;"><camera device-position"back" flash"auto" class"camera"><cover-view class"text-white padding water-mark"><cover-view class"…...
ElasticSearch相关知识点
ElasticSearch中的倒排索引是如何工作的? 倒排索引是ElasticSearch中用于全文检索的一种数据结构,与正排索引不同的是,正排索引将文档按照词汇顺序组织。而倒排索引是将词汇映射到包含该词汇的文档中。 在ElasticSearch中,倒排索…...
css 文字图片居中及网格布局
以下内容纯自已个人理解,直接上代码: <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><meta name"viewport" content"widthdevice-width, initial-scale1.0"><…...
解决ImportError: DLL load failed while importing _rust: 找不到指定的程序
解决ImportError: DLL load failed while importing _rust: 找不到指定的程序 python使用库cryptography 当 from cryptography.hazmat.bindings._rust import exceptions as rust_exceptions 时,会报错: ImportError: DLL load failed while importin…...
集合-List去重
1.利用Set去重 @Test public void distinctList() {List<String> oldList = new ArrayList<>();oldList.add("a");oldList.add("a");oldList.add("b");oldList.add("c");oldList.add("d");List<String> …...
ST-LINK USB communication error 非常有效的解决方法
文章目录 一、检查确定是ST-LINK USB communication error的问题二、关闭文件,打开keil软件所在文件夹,找到STLink文件夹,找到该应用程序双击 一、检查确定是ST-LINK USB communication error的问题 二、关闭文件,打开keil软件所在…...
探索CSS的:future-link伪类:选择指向未来文档的链接
CSS(层叠样式表)是Web设计中用于描述网页元素样式的语言。随着CSS4的提案,引入了许多新的选择器,其中之一是:future-link伪类。然而,需要注意的是,:future-link伪类目前还处于提议阶段,并没有在…...
【C++】序列与关联容器(三)map与multimap容器
【C】序列与关联容器(三)map与multimap容器 一、map二、multiset / multimap 一、map 树中的每个结点的类型是一个std::pair //pair的类型是<const key,value> pair是一个包含两个指针的结构体,第一个指针指向该节点的key,…...
ActiveMQ、RabbitMQ、Kafka、RocketMQ在优先级队列、延迟队列、死信队列、重试队列、消费模式、广播模式的区别
ActiveMQ、RabbitMQ、Kafka、RocketMQ这四款消息队列在优先级队列、延迟队列、死信队列、重试队列、消费模式、广播模式等方面各有其特点和差异。以下是对这些方面的详细比较: 1. 优先级队列 ActiveMQ:支持优先级队列,可以在发送消息时指定…...
首款会员制区块链 Geist 介绍
今天,Pixelcraft Studios 很高兴地宣布即将推出 Geist,这是一个由 Base、Arbitrum、Alchemy 以及 Aavegotchi 支持的全新 L3。 Geist 之前的代号为 “Gotchichain”,是首个专为游戏打造的会员专用区块链。 为什么选择 Geist? …...
CANoe软件中Trace窗口的筛选栏标题不显示(空白)的解决方法
文章目录 问题描述原因分析解决方案扩展知识总结问题描述 不知道什么情况,CANoe软件中Trace窗口的筛选栏标题突然不显示了,一片空白。现象如下: 虽然不影响CANoe软件的使用,但是观感上非常难受,对于强迫症患者非常不友好。 原因分析 按照常规思路,尝试了: 1、重启CAN…...
日期类代码实现-C++
一、目标 通过前面对类和对象的介绍我们可以自己通过C代码初步实现一个简单的日期类。 实现的主要操作有: 1.日期类的构造函数 2.日期类的拷贝构造函数(在头文件中实现) 3.日期类的比较运算符重载 4.日期类的计算运算符重载 5.流插入运…...
【问题记录+总结】VS Code Tex Live 2024 Latex Workshop Springer模板----更新ing
目录 Summary 道阻且长 少即是多 兵马未动粮草先行 没有万能 和一劳永逸 具体问题具体分析 心态 Detail 1、关于模板[官网] 2、settings.json 3、虫和杀虫剂 4、擦 换成Tex Studio都好了。。。 Summary 道阻且长 某中意期刊,只有Latex。之前只简单用过…...
Linux运维_Bash脚本_源码安装Go-1.21.11
Linux运维_Bash脚本_源码安装Go-1.21.11 Bash (Bourne Again Shell) 是一个解释器,负责处理 Unix 系统命令行上的命令。它是由 Brian Fox 编写的免费软件,并于 1989 年发布的免费软件,作为 Sh (Bourne Shell) 的替代品。 您可以在 Linux 和…...
ShareSDK Twitter
创建应用 1.登录Twitter控制台并通过认证 2.点击Developer Portal进入Twitter后台 3.点击Sign up for Free Account创建应用 4.配置应用信息 以下为创建过程示例,图中信息仅为示例,创建时请按照真实信息填写,否则无法正常使用。 权限申请…...
大数据学习栈记——Neo4j的安装与使用
本文介绍图数据库Neofj的安装与使用,操作系统:Ubuntu24.04,Neofj版本:2025.04.0。 Apt安装 Neofj可以进行官网安装:Neo4j Deployment Center - Graph Database & Analytics 我这里安装是添加软件源的方法 最新版…...
【力扣数据库知识手册笔记】索引
索引 索引的优缺点 优点1. 通过创建唯一性索引,可以保证数据库表中每一行数据的唯一性。2. 可以加快数据的检索速度(创建索引的主要原因)。3. 可以加速表和表之间的连接,实现数据的参考完整性。4. 可以在查询过程中,…...
定时器任务——若依源码分析
分析util包下面的工具类schedule utils: ScheduleUtils 是若依中用于与 Quartz 框架交互的工具类,封装了定时任务的 创建、更新、暂停、删除等核心逻辑。 createScheduleJob createScheduleJob 用于将任务注册到 Quartz,先构建任务的 JobD…...
前端开发面试题总结-JavaScript篇(一)
文章目录 JavaScript高频问答一、作用域与闭包1.什么是闭包(Closure)?闭包有什么应用场景和潜在问题?2.解释 JavaScript 的作用域链(Scope Chain) 二、原型与继承3.原型链是什么?如何实现继承&a…...
UR 协作机器人「三剑客」:精密轻量担当(UR7e)、全能协作主力(UR12e)、重型任务专家(UR15)
UR协作机器人正以其卓越性能在现代制造业自动化中扮演重要角色。UR7e、UR12e和UR15通过创新技术和精准设计满足了不同行业的多样化需求。其中,UR15以其速度、精度及人工智能准备能力成为自动化领域的重要突破。UR7e和UR12e则在负载规格和市场定位上不断优化…...
网站指纹识别
网站指纹识别 网站的最基本组成:服务器(操作系统)、中间件(web容器)、脚本语言、数据厍 为什么要了解这些?举个例子:发现了一个文件读取漏洞,我们需要读/etc/passwd,如…...
【Nginx】使用 Nginx+Lua 实现基于 IP 的访问频率限制
使用 NginxLua 实现基于 IP 的访问频率限制 在高并发场景下,限制某个 IP 的访问频率是非常重要的,可以有效防止恶意攻击或错误配置导致的服务宕机。以下是一个详细的实现方案,使用 Nginx 和 Lua 脚本结合 Redis 来实现基于 IP 的访问频率限制…...
MySQL 8.0 事务全面讲解
以下是一个结合两次回答的 MySQL 8.0 事务全面讲解,涵盖了事务的核心概念、操作示例、失败回滚、隔离级别、事务性 DDL 和 XA 事务等内容,并修正了查看隔离级别的命令。 MySQL 8.0 事务全面讲解 一、事务的核心概念(ACID) 事务是…...
安卓基础(Java 和 Gradle 版本)
1. 设置项目的 JDK 版本 方法1:通过 Project Structure File → Project Structure... (或按 CtrlAltShiftS) 左侧选择 SDK Location 在 Gradle Settings 部分,设置 Gradle JDK 方法2:通过 Settings File → Settings... (或 CtrlAltS)…...
Unity中的transform.up
2025年6月8日,周日下午 在Unity中,transform.up是Transform组件的一个属性,表示游戏对象在世界空间中的“上”方向(Y轴正方向),且会随对象旋转动态变化。以下是关键点解析: 基本定义 transfor…...
