当前位置: 首页 > news >正文

大数据组件(四)快速入门实时数据湖存储系统Apache Paimon(3)

Paimon的下载及安装,并且了解了主键表的引擎以及changelog-producer的含义参考:

  • 大数据组件(四)快速入门实时数据湖存储系统Apache Paimon(1)

利用Paimon表做lookup join,集成mysql cdc等参考:

  • 大数据组件(四)快速入门实时数据湖存储系统Apache Paimon(2)

利用Paimon的Tag兼容Hive,Branch管理等参考:

  • 大数据组件(四)快速入门实时数据湖存储系统Apache Paimon(3)

大数据组件(四)快速入门实时数据湖存储系统Apache Paimon(3)

今天,我们继续快速了解下最近比较火的Apache Paimon:

  • 官方文档:https://paimon.apache.org/docs/1.0/
  • 推荐阅读:当流计算邂逅数据湖:Paimon 的前生今世

1 利用Tag兼容Hive

  • Paimon 的每一次写都会生成一个 Immutable 的快照,快照可以被 Time Travel 的读取。
  • 但在大多数情况下,作业会生成过多的快照,所以根据表配置,快照会在合适的时间点被过期。快照过期还会删除旧的数据文件,过期快照的历史数据将无法再查询。
  • 要解决此问题,可以基于快照创建 Tag。Tag 将维护快照的清单和数据文件。
    • 典型的用法是每天创建Tag(如下图所示),然后可以维护每天的历史数据以进行批式查询。
    • 推荐在 ODS 层使用 Tag 来替代 Hive 的分区,但是后续的 DWD 和 DWS 不建议。

图片

1.1 Tag创建

1.1.1 自动创建

-- Flink SQL
CREATE TABLE t (k INT PRIMARY KEY NOT ENFORCED,f0 INT,...
) WITH ('tag.automatic-creation' = 'process-time',  -- 基于process-time自动创建'tag.creation-period' = 'daily',            -- 创建间隔:每天'tag.creation-delay' = '10 m',              -- 延迟10min'tag.num-retained-max' = '90'               -- 最大保存90天
);
  • 上面配置表明每天0点10分钟创建一个 Tag,最大保留3个月的 Tag,Flink 流式写入,自动创建 Tags,自动清理 Tags。

1.1.2 利用Action包创建Tag

<FLINK_HOME>/bin/flink run \/path/to/paimon-flink-action-1.0.0.jar \create_tag \--warehouse <warehouse-path> \--database <database-name> \ --table <table-name> \--tag_name <tag-name> \[--snapshot <snapshot_id>] \[--time_retained <time-retained>] \[--catalog_conf <paimon-catalog-conf> [--catalog_conf <paimon-catalog-conf> ...]]
  • 如果未设置snapshot, 那么默认snapshot_id默认为最新
  • 当然,也能删除Tag、回滚Tag,可以参考官网命令:
    • Manage Tags | Apache Paimon

1.2 利用Tag映射

  • 有了 Tag 后,可以在 Flink SQL 或者 Spark SQL 里使用 Time Travel 来查询 Tags;

  • 但是这给业务带来了一个问题,老的 Hive SQL 如何兼容?老的 Hive 可是一个全量分区表,而 Paimon 表是一个非分区主键表,Hive 数据仓库的传统使用更习惯于使用分区来指定查询的 Tag。

  • paimon引入了 metastore.tag-to-partitionmetastore.tag-to-partition.preview'(配置此参数可以让 Hive SQL 查询到未 Tag 的分区,比如当前最新数据) 来将未分区的主键表映射到 Hive metastore 中的分区表,并映射分区字段为 Tag 查询。

  • Flink 结合 Paimon 打造的入湖架构如下:

    • 通过 Flink CDC 一键全增量一体入湖到 Paimon,此任务可以配置 Tag 的自动创建,然后通过 Paimon 的能力,将 Tag 映射为 Hive 的分区,完全兼容原有 Hive SQL 的用法。
    • 优势如下:
      • 架构链路复杂度低,不再因为各种组件的问题导致链路延时,你只用运维这一个流作业,而且可以完全兼容原有 Hive SQL 用法。
      • 时延低:延时取决于流作业的 Checkpoint Interval,数据最低1分钟实时可见 (建议1-5分钟)。不但如此,Paimon 也提供了流读的能力,让你完成分钟级的 Streaming 计算,也可以写到下游别的存储。
      • 存储成本低:得益于湖格式的 Snapshot 管理,加上 LSM 的文件复用,比如同样是存储 100天的快照,原有 Hive 数仓 100 天需要 100 份的存储,Paimon 在某些增量数据不多的场景只需要 2 份的存储,大幅节省存储资源。
      • 计算成本低:得益于 LSM 的增量合并能力,此条链路只有增量数据的处理,没有全量的合并。可能有用户会担心,常驻的流作业会消耗更多的资源,对 Paimon 来说,你可以打开纯异步 Compaction 的机制,以 Paimon 优异的性能表现,只用少量的资源即可完成同步,Paimon 另有整库同步等能力帮助节省资源。

图片

1.2.1 tag-to-partition

-- 创建映射的paimon表
Flink SQL> drop table if exists mydb_t;
Flink SQL> CREATE TABLE mydb_t (pk INT,col1 STRING,col2 STRING
) WITH ('bucket' = '-1',-- Only Hive Engine can be used to query these upsert-to-partitioned tables.-- 将tag映射为hive分区'metastore.tag-to-partition' = 'dt' 
);-- 插入数据
-- snapshot=1
Flink SQL> INSERT INTO mydb_t VALUES (1, '10', '100'), (2, '20', '200');
-- snapshot=2
Flink SQL> INSERT INTO mydb_t VALUES (3, '30', '300'), (4, '40', '400');
  • 然后,利用action包创建Tag
# 利用action 包创建tag\
# 依旧利用hive元数据做catalog
[root@centos01 ~]# /opt/apps/flink-1.16.0/bin/flink run \/opt/apps/flink-1.16.0/lib/paimon-flink-action-1.0.0.jar \create_tag \--warehouse hdfs://centos01:8020/user/hive/warehouse \--database paimon_db \--table mydb_t \--tag_name '2025-02-18' \--catalog-conf metastore=hive \--catalog-conf uri=thrift://centos01:9083 \--snapshot 1[root@centos01 ~]# /opt/apps/flink-1.16.0/bin/flink run \/opt/apps/flink-1.16.0/lib/paimon-flink-action-1.0.0.jar \create_tag \--warehouse hdfs://centos01:8020/user/hive/warehouse \--database paimon_db \--table mydb_t \--tag_name '2025-02-19' \--catalog-conf metastore=hive \--catalog-conf uri=thrift://centos01:9083 \--snapshot 2 
  • 我们就可以在hive中查看分区,且查询数据
0: jdbc:hive2://192.168.42.101:10000> show partitions mydb_t;
+----------------+
|   partition    |
+----------------+
| dt=2025-02-18  |
| dt=2025-02-19  |
+----------------+
2 rows selected (0.438 seconds)0: jdbc:hive2://192.168.42.101:10000> select * from mydb_t a where dt = '2025-02-18';
+-------+---------+---------+-------------+
| a.pk  | a.col1  | a.col2  |    a.dt     |
+-------+---------+---------+-------------+
| 1     | 10      | 100     | 2025-02-18  |
| 2     | 20      | 200     | 2025-02-18  |
+-------+---------+---------+-------------+
2 rows selected (3.27 seconds)0: jdbc:hive2://192.168.42.101:10000> select * from mydb_t a where dt = '2025-02-19';
+-------+---------+---------+-------------+
| a.pk  | a.col1  | a.col2  |    a.dt     |
+-------+---------+---------+-------------+
| 1     | 10      | 100     | 2025-02-19  |
| 2     | 20      | 200     | 2025-02-19  |
| 3     | 30      | 300     | 2025-02-19  |
| 4     | 40      | 400     | 2025-02-19  |
+-------+---------+---------+-------------+

1.2.2 tag-to-partition.preview

  • 上述示例只能查询已经创建的tag,但Paimon是一个实时数据湖,您还需要查询最新的数据。因此,Paimon提供了一个预览功能
  • 'metastore.tag-to-partition.preview'可选值如下:
    • “none”:不自动创建标签;
    • “process-time”:基于机器时间,当处理时间超过周期时间加上延迟时,创建标签;
    • “watermark”:基于输入的watermark,当watermark超过周期时间加上延迟时,创建标签;
    • “batch”:在批处理场景中,任务完成后生成当前快照对应的标签。
Flink SQL> drop table if exists mydb_preview;
Flink SQL> CREATE TABLE mydb_preview (pk INT,col1 STRING,col2 STRING
) WITH ('bucket' = '-1','metastore.tag-to-partition' = 'dt',-- paimon会基于process-time提前创建partitions'metastore.tag-to-partition.preview' = 'process-time'
);-- snapshot=1
Flink SQL> INSERT INTO mydb_preview VALUES (1, '10', '100'), (2, '20', '200');-- create tag '2025-02-19' for snapshot 1
[root@centos01 ~]# /opt/apps/flink-1.16.0/bin/flink run \/opt/apps/flink-1.16.0/lib/paimon-flink-action-1.0.0.jar \create_tag \--warehouse hdfs://centos01:8020/user/hive/warehouse \--database paimon_db \--table mydb_preview \--tag_name '2025-02-19' \--catalog-conf metastore=hive \--catalog-conf uri=thrift://centos01:9083 \--snapshot 10: jdbc:hive2://192.168.42.101:10000> show partitions mydb_preview;;
+----------------+
|   partition    |
+----------------+
| dt=2025-02-19  |
| dt=2025-02-20  |
+----------------+
2 rows selected (0.085 seconds)
0: jdbc:hive2://192.168.42.101:10000> select * from mydb_preview a where dt = '2025-02-19';
+-------+---------+---------+-------------+
| a.pk  | a.col1  | a.col2  |    a.dt     |
+-------+---------+---------+-------------+
| 1     | 10      | 100     | 2025-02-19  |
| 2     | 20      | 200     | 2025-02-19  |
+-------+---------+---------+-------------+
2 rows selected (0.292 seconds)
0: jdbc:hive2://192.168.42.101:10000> select * from mydb_preview a where dt = '2025-02-20';
+-------+---------+---------+-------------+
| a.pk  | a.col1  | a.col2  |    a.dt     |
+-------+---------+---------+-------------+
| 1     | 10      | 100     | 2025-02-20  |
| 2     | 20      | 200     | 2025-02-20  |
+-------+---------+---------+-------------+
2 rows selected (0.263 seconds)-- new data in '2025-02-20'
Flink SQL> INSERT INTO mydb_preview VALUES (3, '30', '300'), (4, '40', '400');0: jdbc:hive2://192.168.42.101:10000> select * from mydb_preview a where dt = '2025-02-20';
+-------+---------+---------+-------------+
| a.pk  | a.col1  | a.col2  |    a.dt     |
+-------+---------+---------+-------------+
| 1     | 10      | 100     | 2025-02-20  |
| 2     | 20      | 200     | 2025-02-20  |
| 3     | 30      | 300     | 2025-02-20  |
| 4     | 40      | 400     | 2025-02-20  |
+-------+---------+---------+-------------+

2 Branch分支管理

  • 在流式数据处理中,修正数据具有挑战性,因为它可能会影响现有数据,而用户会看到流式的临时结果,这是不期望的。
  • 我们假设现有工作流正在处理的分支是main分支。通过创建自定义数据分支,可以在现有表上对新任务进行实验性测试和数据验证,而无需停止现有的读取/写入工作流,也无需从主分支复制数据。
  • 通过合并或替换分支操作,用户可以完成数据的修正。
-- 1、创建paimon表
Flink SQL> drop table if exists flink_branch_demo;
Flink SQL> CREATE TABLE flink_branch_demo (dt STRING NOT NULL,name STRING NOT NULL,amount BIGINT,PRIMARY KEY (dt, name) NOT ENFORCED 
) PARTITIONED BY (dt)
WITH ('connector' = 'paimon'
);-- 2、创建一个专门用于流写的分支streambranch, 这个分支将负责接收实时流入的数据。
[root@centos01 ~]# /opt/apps/flink-1.16.0/bin/flink run \/opt/apps/flink-1.16.0/lib/paimon-flink-action-1.0.0.jar \create_branch \--warehouse hdfs://centos01:8020/user/hive/warehouse \--database paimon_db \--table flink_branch_demo \--branch_name streambranch \--catalog-conf metastore=hive \--catalog-conf uri=thrift://centos01:9083-- 3、设置流写分支的属性
Flink SQL> ALTER TABLE  `flink_branch_demo$branch_streambranch` SET ('bucket' = '4','changelog-producer' = 'lookup'
);-- 4、设置回滚分支(如果要实现分支回滚必须要设置该参数)
Flink SQL> ALTER TABLE flink_branch_demo SET ( 'scan.fallback-branch' = 'streambranch' );-- 5、写入数据
-- 5-1、主分支写入数据
Flink SQL> insert into flink_branch_demo values  ('20240725', 'apple', 3), ('20240725', 'banana', 5);Flink SQL> select * from flink_branch_demo;
+----------+--------+--------+
|       dt |   name | amount |
+----------+--------+--------+
| 20240725 |  apple |      3 |
| 20240725 | banana |      5 |
+----------+--------+--------+
2 rows in set--  5-2、再往streambranch分支写入数据
Flink SQL> INSERT INTO `flink_branch_demo$branch_streambranch` 
VALUES ('20240725', 'apple', 666), ('20240725', 'peach', 999), ('20240726', 'cherry', 33), ('20240726', 'pear', 88);-- 5-3、查询主分支
-- 20240725分区的新的数据没有生效! 那说明原表已经有的分区的数据,在streambranch写入这些分区的数据,原表是不会更新的,只要是往原表里面写了某个分区的数据,那么这个分区的数据以写入原表主分支的为准。
-- 原表主分支没有的分区的数据,则按照streambranch读取,因为设置了原表的 'scan.fallback-branch' = 'streambranch' ,读取原表可以查到streambranch这部分的数据。
Flink SQL> select * from flink_branch_demo;
+----------+--------+--------+
|       dt |   name | amount |
+----------+--------+--------+
| 20240726 | cherry |     33 | -- 26号分区主表没有,使用了分支表中的数据
| 20240726 |   pear |     88 |
| 20240725 |  apple |      3 | -- 25号的分区使用了主表中的数据
| 20240725 | banana |      5 |
+----------+--------+--------+-- 5-4、查询流分支
Flink SQL> select * from `flink_branch_demo$branch_streambranch` ;
+----------+--------+--------+
|       dt |   name | amount |
+----------+--------+--------+
| 20240726 | cherry |     33 |
| 20240726 |   pear |     88 |
| 20240725 |  apple |    666 |
| 20240725 |  peach |    999 |
+----------+--------+--------+
4 rows in set-- 6、合并分支
-- 合并分支表操作(Fast Forward),即:删除主表的一切数据,并将分支表的一切数据拷贝到主表
[root@centos01 ~]# /opt/apps/flink-1.16.0/bin/flink run \/opt/apps/flink-1.16.0/lib/paimon-flink-action-1.0.0.jar \fast_forward \--warehouse hdfs://centos01:8020/user/hive/warehouse \--database paimon_db \--table flink_branch_demo \--branch_name streambranch \--catalog-conf metastore=hive \--catalog-conf uri=thrift://centos01:9083-- 再次查询主表
Flink SQL> select * from flink_branch_demo;
+----------+--------+--------+
|       dt |   name | amount |
+----------+--------+--------+
| 20240726 | cherry |     33 |
| 20240726 |   pear |     88 |
| 20240725 |  apple |    666 |
| 20240725 |  peach |    999 |
+----------+--------+--------+-- 7、数据回归到主分支版本(注意:不进行上面合并操作)
Flink SQL> ALTER TABLE flink_branch_demo RESET( 'scan.fallback-branch');

3 追加表(Append table)

  • 如果一个表没有定义主键,那它就是一个追加表(Append Table)。与主键表相比,追加表无法直接接收变更日志,也不能直接通过 upsert 更新数据,只能接收追加数据。
使用场景或优势说明
批量写入和批量读取类似于常规的 Hive 分区表,适用于大规模数据的批量处理。
友好的对象存储良好的兼容性和适应性,支持 S3、OSS 等对象存储。
时间穿越和回滚支持数据的时间旅行和回滚功能,方便数据的历史查询和恢复。
低成本的删除和更新在批量数据操作中,能够以较低的计算和资源成本进行删除和更新操作。
流式接收中的小文件自动合并在流式写入过程中,自动处理小文件合并,减少存储碎片。
队列形式的流式读写支持如队列般的流式读写操作,可以像消息队列一样处理数据。
高性能查询通过顺序和索引实现的高效查询性能。

3.1 流式处理

  • Append Table可以通过 Flink 进行非常灵活的流式写入,并可以像队列一样通过 Flink 进行读取。
  • 唯一的区别是其延迟为分钟级别,但其优势在于非常低的成本以及能够进行过滤和投影下推。

3.1.1 小文件自动合并

  • 在流式写入作业中,如果没有定义分桶(bucket),写入器不会进行压缩;
  • 相反,将使用压缩协调器(Compact Coordinator)扫描小文件并将压缩任务传递给压缩工作者(Compact Worker)。
  • 流式模式下,如果在 Flink 中运行插入 SQL,拓扑结构将如下所示:

img

  • 注意:
    • 上面的压缩任务不会引起反压。
    • 如果设置 write-only 为 true,压缩协调器(Compact Coordinator)和压缩工作者(Compact Worker)将在拓扑中被移除。
    • 自动压缩仅在 Flink 引擎的流模式下被支持。可以通过 Paimon 在 Flink 中启动压缩作业,并通过设置 write-only 禁用所有其他压缩。

3.1.2 流式查询

  • 追加表可以像消息队列一样使用,进行流式查询,与主键表类似,有两个选项可以进行流式读取:
    • 默认模式:流式读取在首次启动时生成表的最新快照,并继续读取最新的增量记录。
    • 增量模式:可以指定 scan.mode 或 scan.snapshot-id 或 scan.timestamp-millis 或 scan.file-creation-time-millis 进行增量读取。
  • 追加表的流式查询类似 Flink-Kafka,默认情况下不保证顺序。如果数据需要某种顺序,也需要考虑定义桶键(bucket-key),即Bucketed Append

3.2 查询优化

3.2.1 按照顺序跳过查询

  • Paimon 默认在清单文件中记录每个字段的最大值和最小值。
  • 在查询时,根据查询的 WHERE 条件,通过清单中的统计信息进行文件过滤。如果过滤效果良好,查询时间可以从分钟级别加速到毫秒级别。
  • 然而,数据分布并不总是能有效过滤,因此如果可以根据 WHERE 条件中的字段对数据进行排序,将会更高效。
    • 具体可参考:Flink COMPACT Action or Flink COMPACT Procedure or Spark COMPACT Procedure.

3.2.2 按文件索引跳过数据

  • 如下代码所示,可以使用文件索引,会在读取端通过索引过滤文件

    • 定义 file-index.bloom-filter.columns 后,Paimon 将为每个文件创建相应的索引文件。
    • 如果索引文件太小,它将直接存储在清单中,否则将存储在数据文件的目录中。
    • 每个数据文件对应一个索引文件,该文件有独立的定义,可以包含不同类型的多列索引。
    CREATE TABLE my_table (product_id BIGINT,price DOUBLE,sales BIGINT
    ) WITH ('file-index.bloom-filter.columns' = 'product_id','file-index.bloom-filter.product_id.items' = '200'
    );
    
  • 索引种类如下所示:

    # 布隆过滤器索引
    file-index.bloom-filter.columns:指定需要创建布隆过滤器索引的列。
    file-index.bloom-filter.<column_name>.fpp:配置布隆过滤器的误报率(False Positive Probability)。
    file-index.bloom-filter.<column_name>.items:配置每个数据文件中预期的唯一项数量。# Bitmap(位图索引):
    file-index.bitmap.columns:指定需要创建位图索引的列。# Bit-Slice Index Bitmap(位切片索引位图):
    file-index.bsi.columns:指定需要创建位切片索引(BSI)的列。如果想为现有表添加文件索引,且不进行任何数据重写,可以使用rewrite_file_index过程。
    在使用该过程之前,可以使用ALTER子句来为表配置file-index.<filter-type>.columns。
    可以参考:
    https://paimon.apache.org/docs/1.0/flink/procedures/#procedures
    
    • 布隆过滤器索引和位图索引的区别
    特性布隆过滤器索引(Bloom Filter Index)位图索引(Bitmap Index)
    设计目标快速判断某个值是否可能存在,减少磁盘 I/O精确查询低基数列,支持多条件组合查询
    实现原理基于哈希函数的概率型数据结构基于Bitmap的精确索引结构
    适用数据类型高基数列(如唯一 ID、字符串等)低基数列(如性别、状态等)
    查询类型等值查询(=等值查询(=)和多条件组合查询(ANDOR
    存储效率存储空间小,适合大规模数据集低基数列存储效率高,高基数列存储开销大
    查询效率查询速度快,但存在误报率查询效率高,无误报率
    更新代价较低较高
    适用场景大数据集的快速过滤低基数列的精确查询和多条件组合查询
    • 位图索引和位切片索引的区别
    特性Bitmap Index(位图索引)Bit-Slice Index (BSI)(位切片索引)
    适用数据类型低基数(即列中唯一值的数量较少)的任意类型(如枚举、状态等)高基数的数值型数据(如金额、时间戳等)
    查询类型等值查询、范围查询范围查询、聚合查询(如 SUMMAX 等)
    存储效率低基数列高效,高基数列存储开销大高基数列存储效率高
    实现复杂度简单复杂
    更新代价较高较高

3.3 Bucketed Append

  • 可以指定 bucket 和 bucket-key 以创建一个Bucketed Append表。
  • 在Bucketed Append中,不同桶内的数据是严格有序的,流式读取将按写入顺序准确地传输记录。这样可以优化数据处理和查询性能。
-- 创建Bucketed Append表
CREATE TABLE my_table (product_id BIGINT,price DOUBLE,sales BIGINT
) WITH ('bucket' = '8','bucket-key' = 'product_id'
);

img

3.3.1 有界流

  • 流式来源(Streaming Source)也可以是有界的,可以通过指定 scan.bounded.watermark 来定义有界流模式的结束条件。
  • 例如,指定kafka源并声明watermark 的定义。当使用此kafka源写入Paimon表时,Paimon表的快照将生成相应的watermark,以便流式读取此Paimon表时可以使用有界watermark的功能。
-- 临时表
drop TEMPORARY table if exists order_from_kafka;
CREATE TEMPORARY TABLE order_from_kafka (`user` int,product STRING,order_time TIMESTAMP(3),WATERMARK FOR order_time AS order_time - INTERVAL '8' HOUR - INTERVAL '5' SECOND
) WITH ('connector' = 'kafka','topic' = 'orders_test','properties.bootstrap.servers' = 'localhost:9092','format' = 'json','properties.group.id' = 'tGroup','json.fail-on-missing-field' = 'false','scan.startup.mode' = 'earliest-offset','json.ignore-parse-errors' = 'true'
);-- 创建topic
/opt/apps/kafka_2.12-2.6.2/bin/kafka-topics.sh --create --topic orders_test --replication-factor 1 --partitions 1 --bootstrap-server centos01:9092 -- paimon追加表
drop table if exists paimon_r;
CREATE TABLE paimon_r (`user` int,product STRING,order_time TIMESTAMP(3),WATERMARK FOR order_time AS order_time - INTERVAL '8' HOUR - INTERVAL '5' SECOND
) WITH ('connector' = 'paimon'
);-- 将Kafka表中的数据实时插入到Paimon表中:
INSERT INTO paimon_r SELECT * FROM order_from_kafka;-- 启动有界流任务读取 Paimon 表
-- 1696126500000 2023-10-01 10:15:00-- 当Flink处理过程中遇到第一个水印值大于或等于这个时间点的记录时,
-- 它会停止继续读取后续的数据,即使数据源中还有更晚时间点的数据。
Flink SQL> SELECT * FROM paimon_r /*+ OPTIONS('scan.bounded.watermark'='1696126500000') */;
+----+-------------+--------------------------------+-------------------------+
| op |        user |                        product |              order_time |
+----+-------------+--------------------------------+-------------------------+
| +I |        1001 |                      iPhone 15 | 2023-10-01 10:00:00.000 |
| +I |        1002 |                    MacBook Pro | 2023-10-01 10:05:00.000 |
| +I |        1003 |                    AirPods Pro | 2023-10-01 10:10:00.000 |
| +I |        1004 |                       iPad Air | 2023-10-01 10:15:00.000 |
+----+-------------+--------------------------------+-------------------------+
Received a total of 4 rows-- 启动命令行生产者,模拟数据源源源不断地生产数据(每隔一段时间插入1条数据)
/opt/apps/kafka_2.12-2.6.2/bin/kafka-console-producer.sh --topic orders_test --bootstrap-server centos01:9092
{"user": 1001, "product": "iPhone 15", "order_time": "2023-10-01 10:00:00"}
{"user": 1002, "product": "MacBook Pro", "order_time": "2023-10-01 10:05:00"}
{"user": 1003, "product": "AirPods Pro", "order_time": "2023-10-01 10:10:00"}
-- "2023-10-01 10:15:00" 时候watermark是1696126495000,即:2023-10-01 10:14:55
-- 此时有界流并未结束
{"user": 1004, "product": "iPad Air", "order_time": "2023-10-01 10:15:00"}
-- "2023-10-01 10:20:00" 时候watermark是1696126795000
-- 即:2023-10-01 10:19:55 > 2023-10-01 10:15:00(1696126500000)
-- 停止继续读取后续的数据,即使数据源中还有更晚时间点的数据
{"user": 1005, "product": "Apple Watch", "order_time": "2023-10-01 10:20:00"}
{"user": 1006, "product": "Apple Watch", "order_time": "2023-10-02 08:00:00"}
{"user": 1007, "product": "Apple Watch", "order_time": "2023-10-03 08:20:00"}
{"user": 1008, "product": "Apple Watch", "order_time": "2024-10-03 08:20:00"}
{"user": 1009, "product": "Apple Watch", "order_time": "2025-10-03 08:20:00"}

3.3.2 批处理(Batch)

  • 通过设置 spark.sql.sources.v2.bucketing.enabled 为 true,Spark 将识别 V2 数据源报告的特定分布,并在必要时尝试避免shuffle。
  • 如下代码所示,如果两个表具有相同的分桶策略和相同数量的桶,昂贵的 join shuffle 操作将被避免。
-- 在必要时尝试避免shuffle
SET spark.sql.sources.v2.bucketing.enabled = true;-- 事实表
CREATE TABLE FACT_TABLE (order_id INT, f1 STRING
) TBLPROPERTIES ('bucket'='10', 'bucket-key' = 'order_id');-- 维度表
CREATE TABLE DIM_TABLE (order_id INT, f2 STRING
) TBLPROPERTIES ('bucket'='10', 'primary-key' = 'order_id');SELECT * 
FROM FACT_TABLE 
JOIN DIM_TABLE 
ON FACT_TABLE.order_id = DIM_TABLE.order_id;

注:

  • Paimon还有其他功能,这里就不再介绍,可以参考官网自行了解。例如:

    • Paimon 在 Flink 1.17 及后续版本中支持使用 UPDATE 更新主键表记录、使用DELETE删除change-log数据;

    • 流式读取表时指定consumer-id,防止快照因为过期而被删除;

    • paimon提供了包含有关每个表的元数据和信息的系统表,例如创建的快照和使用的选项。用户可以通过批量查询访问系统表。

      -- 快照表 Snapshots Table
      SELECT * FROM ws_t$snapshots;-- 模式表 Schemas Table
      SELECT * FROM ws_t$schemas;-- 选项表 Options Table
      SELECT * FROM ws_t$options;-- 标签表 Tags Table
      SELECT * FROM ws_t$tags;-- 审计日志表 Audit log Table
      SELECT * FROM ws_t$audit_log;
      ......
      
    • 可以集成其他引擎,如spark引擎等

    • Paimon表支持分区过期配置

    • 缩放Bucket官方示例

      • Rescale Bucket | Apache Paimon

相关文章:

大数据组件(四)快速入门实时数据湖存储系统Apache Paimon(3)

Paimon的下载及安装&#xff0c;并且了解了主键表的引擎以及changelog-producer的含义参考&#xff1a; 大数据组件(四)快速入门实时数据湖存储系统Apache Paimon(1) 利用Paimon表做lookup join&#xff0c;集成mysql cdc等参考&#xff1a; 大数据组件(四)快速入门实时数据…...

SVN把英文换中文

原文链接&#xff1a;SVN设置成中文版本 都是英文&#xff0c;换中文 Tortoise SVN 安装汉化教程(乌龟SVN) https://pan.quark.cn/s/cb6f2eee3f90 下载中文包...

Ubuntu 的RabbitMQ安装

目录 1.安装Erlang 查看erlang版本 退出命令 2. 安装 RabbitMQ 3.确认安装结果 4.安装RabbitMQ管理界面 5.启动服务并访问 1.启动服务 2.查看服务状态 3.通过IP:port 访问界面 4.添加管理员用户 a&#xff09;添加用户名&#xff1a;admin&#xff0c;密码&#xff1…...

基于WebRTC与AI大模型接入EasyRTC:打造轻量级、高实时、强互动的嵌入式音视频解决方案

随着物联网和嵌入式技术的快速发展&#xff0c;嵌入式设备对实时音视频通信的需求日益增长。然而&#xff0c;传统的音视频解决方案往往存在体积庞大、实时性差、互动体验不佳等问题&#xff0c;难以满足嵌入式设备的资源限制和应用场景需求。 针对以上痛点&#xff0c;本文将介…...

QML 实现一个动态的启动界面

QML 实现一个动态的启动界面 一、效果查看二、源码分享三、所用到的资源下载 一、效果查看 二、源码分享 工程结构 main.qml import QtQuick import QtQuick.Controls import QtQuick.Dialogs import Qt.labs.platformWindow {id:windowwidth: 640height: 400visible: truetit…...

智能预警系统标准化处理流程

在当今数字化时代,IT系统的稳定运行对企业的业务连续性至关重要。为了及时发现和响应系统异常,构建智能预警系统已成为许多企业的当务之急。但仅仅拥有预警系统还不够,我们还需要一套标准化的处理流程,确保问题能够高效、有序地得到解决。 © ivwdcwso (ID: u012172506) 一…...

Unity游戏制作中的C#基础(4)数组声明和使用

一、数组的声明 在 C# 中&#xff0c;声明数组有多种方式&#xff0c;每种方式都有其适用的场景&#xff0c;下面为你逐一详细介绍&#xff1a; 1. 直接初始化声明 这种方式直观且便捷&#xff0c;在声明数组的同时就为其赋初值&#xff0c;让数组从诞生之初就拥有了具体的数据…...

tailwindcss学习03

01 入门 02 vue中接入 03 工具类优先 准备 vue.svg <svg viewBox"0 0 40 40" xmlns"http://www.w3.org/2000/svg"> <defs> <linearGradient x1"50%" y1"0%" x2"50%" y2"100%" id"a"&…...

QML Component 与 Loader 结合动态加载组件

在实际项目中&#xff0c;有时候我们写好一个组件&#xff0c;但不是立即加载出来&#xff0c;而是触发某些条件后才动态的加载显示出来&#xff0c;当处理完某些操作后&#xff0c;再次将其关闭掉&#xff1b; 这样的需求&#xff0c;可以使用 Component 包裹着组件&#xff…...

Visual studio 2022 将打开文件的方式由单击改为双击

1. 打开vs2022&#xff0c;选择Tools -> Options打开Options设置页面 2. 在左侧依次展开Environment, 选择Tabs and Windows 3. 在右侧面板往下拖拽滚动条&#xff0c;找到Preview Tab section, unchecked "Preview selected files in Solution Explorer (Altclick t…...

网络工程师 (49)UDP协议

前言 UDP协议&#xff0c;即用户数据报协议&#xff08;User Datagram Protocol&#xff09;&#xff0c;是一种无连接的、不可靠的、面向报文的传输层通信协议。 一、基本特点 无连接性&#xff1a;UDP在发送数据之前不需要与目标设备建立连接&#xff0c;也无需在数据发送结束…...

了解大数据

一、大数据的特点&#xff1a; 1.大量 2.高速 3.多样 结构化数据和非结构化数据 4.低价值密度 二、大数据的应用场景&#xff1a;视频推荐、电商推荐等 三、大数据的技术发展脉络 阶段1&#xff1a;单机时代 阶段2&#xff1a;大数据时代-分布式处理 阶段3&#xff1a;实…...

命令模式

1. 命令模式简介 命令模式(Command Pattern)是一种行为型设计模式,它将一个请求封装为一个对象,从而使您可以用不同的请求对客户进行参数化、对请求排队或记录请求日志,以及支持可撤销的操作。命令模式的核心思想是将操作和操作的执行者解耦,使得操作可以独立于执行者进…...

解放大脑!用DeepSeek自动生成PPT!

DeepSeek应用&#xff08;PPT篇&#xff09; DeepSeek作为当前最好的AI大模型之一&#xff0c;其强大的文本生成能力被广泛的应用于各个领域&#xff0c;本文我们来聊聊用DeepSeek来自动生成PPT。 一、DeepSeek & PPT DeepSeek本身没有直接生成PPT的能力&#xff0c;换个…...

GUI编程(window系统→Linux系统)

最近有个项目需要将windows系统的程序往Linux系统上面移植&#xff0c;由于之前程序没有考虑过多平台兼容的问题&#xff0c;导致部分功能不可用以下是对近期遇到的问题的总结&#xff0c;以及相应的解决方案和经验分享。 1. Python 模块安装与管理 在 Linux 系统中&#xff0…...

互推机制在开源AI智能名片2+1链动模式S2B2C商城小程序源码推广中的应用探索

摘要&#xff1a; 在数字化营销时代&#xff0c;开源AI智能名片21链动模式S2B2C商城小程序源码作为一种创新的技术解决方案&#xff0c;正逐步成为企业数字化转型的重要工具。然而&#xff0c;面对激烈的市场竞争&#xff0c;如何高效推广这一前沿技术产品&#xff0c;成为开发…...

Pytorch实现之特征损失与残差结构稳定GAN训练,并训练自己的数据集

简介 简介:生成器和鉴别器分别采用了4个新颖设计的残差结构实现,同时在损失中结合了鉴别器层的特征损失来提高模型性能。 论文题目:Image Generation by Residual Block Based Generative Adversarial Networks(基于残留块的生成对抗网络产生图像) 会议:2022 IEEE Int…...

ES6相关操作(2)

一.Promise Promise是ES6引入的异步编程工具。 语法上Promise是一个构造函数,用于封装异步操作并可以获取操作成功或失败的结果 Promise构造函数:Promise(excutor){} Promise的常用函数:then,catch 实例化Promise对象(创建Promise工具) let data"请求数据"//该数据为…...

自动化办公|xlwings生成图表

在日常的数据分析和报告生成中&#xff0c;Excel图表是一个非常重要的工具。它能够帮助我们直观地展示数据&#xff0c;发现数据中的规律和趋势。然而&#xff0c;手动创建和调整图表往往耗时且容易出错。幸运的是&#xff0c;借助Python的xlwings库&#xff0c;我们可以自动化…...

大模型知识蒸馏技术(5)——在线蒸馏

版权声明 本文原创作者:谷哥的小弟作者博客地址:http://blog.csdn.net/lfdfhl1. 在线蒸馏概述 在线蒸馏是一种知识蒸馏方式,其核心特点是教师模型和学生模型的参数在训练过程中同时更新,整个蒸馏框架是端到端训练的。这种方式允许教师模型和学生模型相互影响、共同学习,能…...

【ROS】Nav2源码之nav2_behavior_tree-行为树节点列表

1、行为树节点分类 在 Nav2(Navigation2)的行为树框架中,行为树节点插件按照功能分为 Action(动作节点)、Condition(条件节点)、Control(控制节点) 和 Decorator(装饰节点) 四类。 1.1 动作节点 Action 执行具体的机器人操作或任务,直接与硬件、传感器或外部系统…...

select、poll、epoll 与 Reactor 模式

在高并发网络编程领域&#xff0c;高效处理大量连接和 I/O 事件是系统性能的关键。select、poll、epoll 作为 I/O 多路复用技术的代表&#xff0c;以及基于它们实现的 Reactor 模式&#xff0c;为开发者提供了强大的工具。本文将深入探讨这些技术的底层原理、优缺点。​ 一、I…...

使用Matplotlib创建炫酷的3D散点图:数据可视化的新维度

文章目录 基础实现代码代码解析进阶技巧1. 自定义点的大小和颜色2. 添加图例和样式美化3. 真实数据应用示例实用技巧与注意事项完整示例(带样式)应用场景在数据科学和可视化领域,三维图形能为我们提供更丰富的数据洞察。本文将手把手教你如何使用Python的Matplotlib库创建引…...

佰力博科技与您探讨热释电测量的几种方法

热释电的测量主要涉及热释电系数的测定&#xff0c;这是表征热释电材料性能的重要参数。热释电系数的测量方法主要包括静态法、动态法和积分电荷法。其中&#xff0c;积分电荷法最为常用&#xff0c;其原理是通过测量在电容器上积累的热释电电荷&#xff0c;从而确定热释电系数…...

【Go语言基础【12】】指针:声明、取地址、解引用

文章目录 零、概述&#xff1a;指针 vs. 引用&#xff08;类比其他语言&#xff09;一、指针基础概念二、指针声明与初始化三、指针操作符1. &&#xff1a;取地址&#xff08;拿到内存地址&#xff09;2. *&#xff1a;解引用&#xff08;拿到值&#xff09; 四、空指针&am…...

Go 并发编程基础:通道(Channel)的使用

在 Go 中&#xff0c;Channel 是 Goroutine 之间通信的核心机制。它提供了一个线程安全的通信方式&#xff0c;用于在多个 Goroutine 之间传递数据&#xff0c;从而实现高效的并发编程。 本章将介绍 Channel 的基本概念、用法、缓冲、关闭机制以及 select 的使用。 一、Channel…...

三分算法与DeepSeek辅助证明是单峰函数

前置 单峰函数有唯一的最大值&#xff0c;最大值左侧的数值严格单调递增&#xff0c;最大值右侧的数值严格单调递减。 单谷函数有唯一的最小值&#xff0c;最小值左侧的数值严格单调递减&#xff0c;最小值右侧的数值严格单调递增。 三分的本质 三分和二分一样都是通过不断缩…...

Caliper 配置文件解析:fisco-bcos.json

config.yaml 文件 config.yaml 是 Caliper 的主配置文件,通常包含以下内容: test:name: fisco-bcos-test # 测试名称description: Performance test of FISCO-BCOS # 测试描述workers:type: local # 工作进程类型number: 5 # 工作进程数量monitor:type: - docker- pro…...

如何应对敏捷转型中的团队阻力

应对敏捷转型中的团队阻力需要明确沟通敏捷转型目的、提升团队参与感、提供充分的培训与支持、逐步推进敏捷实践、建立清晰的奖励和反馈机制。其中&#xff0c;明确沟通敏捷转型目的尤为关键&#xff0c;团队成员只有清晰理解转型背后的原因和利益&#xff0c;才能降低对变化的…...

系统掌握PyTorch:图解张量、Autograd、DataLoader、nn.Module与实战模型

本文较长&#xff0c;建议点赞收藏&#xff0c;以免遗失。更多AI大模型应用开发学习视频及资料&#xff0c;尽在聚客AI学院。 本文通过代码驱动的方式&#xff0c;系统讲解PyTorch核心概念和实战技巧&#xff0c;涵盖张量操作、自动微分、数据加载、模型构建和训练全流程&#…...