当前位置: 首页 > news >正文

大数据组件(四)快速入门实时数据湖存储系统Apache Paimon(2)

Paimon的下载及安装,并且了解了主键表的引擎以及changelog-producer的含义参考:

  • 大数据组件(四)快速入门实时数据湖存储系统Apache Paimon(1)

利用Paimon表做lookup join,集成mysql cdc等参考:

  • 大数据组件(四)快速入门实时数据湖存储系统Apache Paimon(2)

利用Paimon的Tag兼容Hive,Branch管理等参考:

  • 大数据组件(四)快速入门实时数据湖存储系统Apache Paimon(3)

大数据组件(四)快速入门实时数据湖存储系统Apache Paimon(2)

今天,我们继续快速了解下最近比较火的Apache Paimon:

  • 官方文档:https://paimon.apache.org/docs/1.0/
  • 推荐阅读:当流计算邂逅数据湖:Paimon 的前生今世

1 利用Paimon做维表join

  • 在流式处理中,Lookup Join用来从另一个表(Paimon)中“查字典”来补充流的信息。一个表需要有时间标记(处理时间属性),另一个表需要支持快速查找(查找源连接器)
  • Paimon在Flink中支持对带有主键的表和追加表进行Lookup Join操作

1.1 常规Lookup

SET 'execution.runtime-mode' = 'streaming';-- 1、用户维表
CREATE TABLE customers (id INT PRIMARY KEY NOT ENFORCED,name STRING    comment '姓名',country STRING comment '城市',zip STRING     comment '邮编'
) WITH ('connector' = 'paimon'
);-- 插入维表数据
INSERT INTO customers VALUES(1,'tom','伦敦','123'),(2,'hank','纽约','456'),(3,'小明','北京','789');-- 2、模拟订单表(数据流)
drop TEMPORARY TABLE orders_info;
CREATE TEMPORARY TABLE orders_info (order_id INT,total INT,customer_id INT,proc_time AS PROCTIME()
) WITH ('connector' = 'datagen', 'rows-per-second'='1', 'fields.order_id.kind'='sequence', 'fields.order_id.start'='1', 'fields.order_id.end'='1000000', 'fields.total.kind'='random', 'fields.total.min'='1', 'fields.total.max'='1000', 'fields.customer_id.kind'='random', 'fields.customer_id.min'='1', 'fields.customer_id.max'='3'
);-- 3、维表join
SELECT o.order_id, o.total, c.country, c.zip
FROM orders_info AS o
JOIN customers
FOR SYSTEM_TIME AS OF o.proc_time AS c
ON o.customer_id = c.id;-- 可以看到下面的结果
+----+----------+----------+-------------+---------+
| op | order_id |    total |     country |     zip |
+----+----------+----------+-------------+---------+
| +I |        1 |      179 |        纽约 |     456 |
| +I |        2 |       31 |        北京 |     789 |
| +I |        3 |      148 |        北京 |     789 |
| +I |        4 |      774 |        北京 |     789 |

1.2 同步重试Lookup

  • 如果用户维表(查找表)的数据未准备好,导致订单表(主表)的记录无法完成连接,你可以考虑使用Flink的延迟重试策略进行查找。
  • 这个功能仅适用于Flink 1.16及以上的版本。
  • 下面sql的提示(hints)设置了重试策略:
    • 'table'='c': 指定了应用查找重试策略的目标表的别名,在这个例子中是customers表,其别名为c。
    • 'retry-predicate'='lookup_miss': 定义了触发重试的条件。这里的条件是lookup_miss,意味着如果在查找过程中没有找到对应的数据(即查找缺失),就会触发重试机制。
    • 'retry-strategy'='fixed_delay': 设置了重试的策略为固定延迟,即每次重试之间会有固定的等待时间。
    • 'fixed-delay'='1s': 当采用固定延迟重试策略时,这里设定了每次重试前等待的时间长度为1秒。
    • 'max-attempts'='600': 设定最大重试次数为600次。结合上面的fixed-delay设置,这意味着系统会每隔1秒尝试一次数据查找,最多尝试600次。
-- enrich each order with customer information
SELECT /*+ LOOKUP('table'='c', 'retry-predicate'='lookup_miss', 'retry-strategy'='fixed_delay', 'fixed-delay'='1s', 'max-attempts'='600') */o.order_id, o.total, c.country, c.zip
FROM orders AS o
JOIN customers
FOR SYSTEM_TIME AS OF o.proc_time AS c
ON o.customer_id = c.id;

1.3 异步重试Lookup

  • 在同步重试机制下,如果处理某一条记录时遇到了问题(例如查找失败),系统会按照设定的重试策略反复尝试直到成功或者达到最大重试次数。在这期间,这条记录后面的其他记录即使没有问题也无法被处理,因为整个处理流程被阻塞了。这会导致数据处理的整体效率低下,甚至可能造成作业延迟或超时。
  • 使用异步加上allow_unordered,可以在某些记录在查找时缺失也不会再阻塞其他记录。
  • 下面sql配置项解释:
    • 'output-mode'='allow_unordered': 设置输出模式允许无序,意味着当查找失败时不会阻塞其他记录的处理,适合于不需要严格顺序的场景。
    • 'lookup.async'='true': 启用异步查找,提高效率,避免因等待某个查找结果而阻塞其它查找操作。
    • 'lookup.async-thread-number'='16': 设置用于异步查找的线程数为16,这可以加速查找过程,特别是在高并发情况下。
  • 注意:如果主表(orders)是CDC流,allow_unordered会被Flink SQL忽略(只支持追加流),流作业可能会被阻塞。可以尝试使用Paimon的audit_log系统表功能来绕过这个问题(将CDC流转为追加流)。
SELECT /*+ LOOKUP('table'='c', 'retry-predicate'='lookup_miss', 'output-mode'='allow_unordered', 'retry-strategy'='fixed_delay', 'fixed-delay'='1s', 'max-attempts'='600') */o.order_id, o.total, c.country, c.zip
FROM orders AS o
JOIN customers /*+ OPTIONS('lookup.async'='true', 'lookup.async-thread-number'='16') */
FOR SYSTEM_TIME AS OF o.proc_time AS c
ON o.customer_id = c.id;

1.4 max_pt功能

  • 在传统的数据仓库中,每个分区通常维护最新的完整数据,因此这种分区表只需要连接最新的分区即可。Paimon特别为此场景开发了max_pt功能。

  • 通过max_pt,Lookup节点能够自动刷新并查询最新分区的数据,确保所使用的客户信息始终是最新的,而不需要手动干预来确定或切换到最新的数据分区。

  • 这种方法特别适用于需要频繁更新和查询最新数据的场景,可以大大提高数据处理的效率和准确性。

-- 1、分区用户维表
drop table if exists customers;
CREATE TABLE customers (id INT,name STRING,country STRING,zip STRING,dt STRING,PRIMARY KEY (id, dt) NOT ENFORCED
) PARTITIONED BY (dt);-- 插入数据(维表关联时候,只查找'2025-02-19'分区数据)
INSERT INTO customers VALUES
(1, 'Alice', 'USA', '10001', '2025-02-18'),
(2, 'Bob', 'UK', '20002', '2025-02-18'),
(3, 'Charlie', 'Germany', '30003', '2025-02-18'),
(1, 'Alice', 'USA', '10002', '2025-02-19'), -- 更新了 Alice 的邮编
(4, 'David', 'France', '40004', '2025-02-19');-- 2、订单信息表(解析kafka数据)
CREATE TEMPORARY TABLE orders (order_id BIGINT,customer_id INT,total DECIMAL(10, 2),proc_time AS PROCTIME()
) WITH ('connector' = 'kafka','topic' = 'orders_topic','properties.bootstrap.servers' = 'localhost:9092','format' = 'json','properties.group.id' = 'testordersGroup','scan.startup.mode' = 'earliest-offset','json.fail-on-missing-field' = 'false','json.ignore-parse-errors' = 'true'
);-- 创建topic
/opt/apps/kafka_2.12-2.6.2/bin/kafka-topics.sh --create --topic orders_topic --replication-factor 1 --partitions 1 --bootstrap-server centos01:9092 
-- 启动命令行生产者,生产数据
/opt/apps/kafka_2.12-2.6.2/bin/kafka-console-producer.sh --topic orders_topic --bootstrap-server centos01:9092
{"order_id":1001,"customer_id":1,"total":50.0}
{"order_id":1002,"customer_id":2,"total":30.0}
{"order_id":1004,"customer_id":4,"total":20.0}-- 'lookup.dynamic-partition'='max_pt()': 这个选项指示查找节点自动定位并使用最新(最大)的分区。max_pt()函数帮助系统识别最新的分区,确保只查询最新的数据。
-- 'lookup.dynamic-partition.refresh-interval'='1 h': 设置了查找节点刷新最新分区的时间间隔为1小时。这意味着系统会每隔1小时检查一次是否有新的分区可用,并自动更新到最新分区的数据。SELECT o.order_id, o.total, c.country, c.zip
FROM orders AS o
JOIN customers /*+ OPTIONS('lookup.dynamic-partition'='max_pt()', 'lookup.dynamic-partition.refresh-interval'='1 h') */
FOR SYSTEM_TIME AS OF o.proc_time AS c
ON o.customer_id = c.id;+----+----------------------+--------------+--------------------------------+--------------------------------+
| op |             order_id |        total |                        country |                            zip |
+----+----------------------+--------------+--------------------------------+--------------------------------+
| +I |                 1001 |        50.00 |                            USA |                          10002 |
| +I |                 1004 |        20.00 |                         France |                          40004 |

  • 可以运行一个Flink流式作业来启动针对该paimon维表的查询服务。当QueryService存在时,Flink查找连接(Lookup Join)会优先从该服务获取数据,这将有效提高查询性能。

  • 可以通过调用sys.query_service系统函数来实现:
    CALL sys.query_service('paimon_db.customers', 4); -- 设置并行度为4或者通过下面action包开启

    <FLINK_HOME>/bin/flink run \/path/to/paimon-flink-action-1.0.0.jar \query_service \--warehouse <warehouse-path> \--database <database-name> \--table <table-name> \[--parallelism <parallelism>] \[--catalog_conf <paimon-catalog-conf> [--catalog_conf <paimon-catalog-conf> ...]]
    
  • 查询服务能够在内存中缓存频繁访问的数据,并以高并发的方式提供这些数据,减少了磁盘I/O操作和网络延迟的影响。

2 集成Mysql CDC

  • 可以通过 Flink SQL 或者 Flink DataStream API 将 Flink CDC 数据写入 Paimon 中,也可以通过Paimon 提供的 CDC 工具来完成入湖。那这两种方式有什么区别呢?

    在这里插入图片描述

  • 上图是使用 Flink SQL 来完成入湖,简单,但是当源表添加新列后,同步作业不会同步新的列,下游 Paimon 表也不会增加新列。

在这里插入图片描述

  • 上图是使用 Paimon CDC 工具来同步数据,可以看到,当源表发生列的新增后,流作业会自动新增列的同步,并传导到下游的 Paimon 表中,完成 Schema Evolution 的同步

  • Paimon CDC 工具也提供了整库同步:

    • 一个作业同步多张表,以低成本的方式同步大量小表
    • 作业里同时自动进行 Schema Evolution
    • 新表将会被自动进行同步,你不用重启作业,全自动完成
  • 推荐阅读:Flink + Paimon 数据 CDC 入湖最佳实践

2.1 MySQL一张表同步到Paimon一张表

2.1.1 环境准备

# mysql-cdc相关jar包的下载地址,同时还需要mysql-connector
https://repo.maven.apache.org/maven2/org/apache/flink/flink-connector-mysql-cdc/3.1.1/
https://repo.maven.apache.org/maven2/org/apache/flink/flink-cdc-pipeline-connector-mysql/3.1.1/
https://repo.maven.apache.org/maven2/org/apache/flink/flink-cdc-common/3.1.1/# 在flink的lib目录添加下面的jar包
[root@centos01 lib]# ll /opt/apps/flink-1.16.0/lib/
-rw-r--r--. 1 root root     259756 Feb 19 15:13 flink-cdc-common-3.1.1.jar
-rw-r--r--. 1 root root   21286022 Feb 19 11:40 flink-cdc-pipeline-connector-mysql-3.1.1.jar
-rw-r--r--. 1 root root     388680 Feb 19 10:57 flink-connector-mysql-cdc-3.1.1.jar
-rw-r--r--. 1 root root    2475087 Feb 19 10:58 mysql-connector-java-8.0.27.jar
......# 开启MySQL Binlog并重启MySQL
[root@centos01 ~]# vim /etc/my.cnf
#添加如下配置信息,开启`cdc_db`数据库的Binlog
#数据库id
server-id = 1
##启动binlog,该参数的值会作为binlog的文件名
log-bin=mysql-bin
#binlog类型
binlog_format=row
##启用binlog的数据库,需根据实际情况作出修改
binlog-do-db=cdc_db[root@centos01 ~]# systemctl restart mysqld# 启动hadoop和yarn
[root@centos01 ~]# start-all.sh
# 启动Hive
[root@centos01 ~]# nohup hive --service metastore  2>&1 & 
[root@centos01 ~]# nohup  hive --service hiveserver2   2>&1 & 
# yarn-session模式
# http://centos01:8088/cluster中可以看到Flink session cluster的job ID
[root@centos01 ~]# /opt/apps/flink-1.16.0/bin/yarn-session.sh -d
# 启动Flink的sql-client
[root@centos01 ~]# /opt/apps/flink-1.16.0/bin/sql-client.sh -s yarn-session
  • 使用Hive的元数据
Flink SQL> CREATE CATALOG paimon_hive_catalog WITH ('type' = 'paimon','metastore' = 'hive','uri' = 'thrift://centos01:9083'
);Flink SQL> USE CATALOG paimon_hive_catalog;
Flink SQL> create database if not exists paimon_db;
Flink SQL> use paimon_db;# 设置显示模式
Flink SQL> SET 'sql-client.execution.result-mode' = 'tableau';
Flink SQL> SET 'execution.runtime-mode' = 'batch';

2.1.2 案例详解

  • 如果指定的Paimon表不存在,将自动创建表。其schema将从所有指定的MySQL表派生;
  • 如果Paimon 表已存在,则其schema将与所有指定MySQL表的schema进行比较;
  • 仅支持同步具有主键的MySQL表
  • 也可MySQL多张表同步到Paimon一张表,可以查看官网示例。
-- 准备测试数据
DROP TABLE IF EXISTS `user_info`;
CREATE TABLE `user_info` (`id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '编号',`login_name` varchar(200) DEFAULT NULL COMMENT '用户名称',`name` varchar(200) DEFAULT NULL COMMENT '用户姓名',`phone_num` varchar(200) DEFAULT NULL COMMENT '手机号',`birthday` date DEFAULT NULL COMMENT '用户生日',`gender` varchar(1) DEFAULT NULL COMMENT '性别 M男,F女',`create_time` datetime DEFAULT NULL COMMENT '创建时间',`operate_time` datetime DEFAULT NULL COMMENT '修改时间',PRIMARY KEY (`id`) USING BTREE
) ENGINE=InnoDB  DEFAULT CHARSET=utf8 ROW_FORMAT=DYNAMIC;-- ----------------------------
-- Records of user_info
-- ----------------------------
INSERT INTO `user_info`(`login_name`, `name`, `phone_num`, `birthday`, `gender`, `create_time`, `operate_time`) VALUES 
('zhangsan', '张三', '13800001234', '1990-01-01', 'M', NOW(), NOW()),
('lisi', '李四', '13800005678', '1992-02-02', 'F', NOW(), NOW()),
('wangwu', '王五', '13800009012', '1995-03-03', 'M', NOW(), NOW()),
('zhaoliu', '赵六', '13800003456', '1998-04-04', 'F', NOW(), NOW()),
('sunqi', '孙七', '13800007890', '2000-05-05', 'M', NOW(), NOW());CREATE TABLE `orders` (`order_id` bigint(20) NOT NULL  AUTO_INCREMENT COMMENT '订单编号',`user_id` bigint(20) NOT NULL COMMENT '用户编号',`order_date` datetime DEFAULT NULL COMMENT '订单日期',`total_price` decimal(10,2) DEFAULT NULL COMMENT '订单总价',PRIMARY KEY (`order_id`) USING BTREE,KEY `idx_user_id` (`user_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 ROW_FORMAT=DYNAMIC;INSERT INTO `orders`(`user_id`, `order_date`, `total_price`) VALUES 
(1001, '2025-02-18 14:32:10', 199.99),
(1002, '2025-02-19 09:15:30', 299.50),
(1001, '2025-02-19 12:47:05', 79.99),
(1003, '2025-02-19 13:00:00', 499.00),
(1004, '2025-02-20 08:20:25', 159.99);
# 按照天进行分区
[root@centos01 ~]# /opt/apps/flink-1.16.0/bin/flink run \/opt/apps/flink-1.16.0/lib/paimon-flink-action-1.0.0.jar \mysql-sync-table \--warehouse hdfs://centos01:8020/user/hive/warehouse \--database paimon_db \--table ods_user_info_cdc \--primary-keys pt,id \# 指定分区键,分区键是通过函数转换而来--partition_keys pt \--computed_column 'pt=date_format(operate_time, yyyyMMdd)' \--mysql-conf hostname=centos01 \--mysql-conf username=root \--mysql-conf password=123456 \--mysql-conf database-name=cdc_db \--mysql-conf table-name='user_info' \--catalog-conf metastore=hive \--catalog-conf uri=thrift://centos01:9083 \--table-conf bucket=2 \--table-conf changelog-producer=input \--table-conf sink.parallelism=2 
  • 我们可以进行测试了
-- 此时会自动创建paimon表(无需我们手动建表了)
Flink SQL> select * from ods_user_info_cdc;
+----+------------+------+-------------+------------+--------+---------------------+---------------------+----------+
| id | login_name | name |   phone_num |   birthday | gender |         create_time |        operate_time |       pt |
+----+------------+------+-------------+------------+--------+---------------------+---------------------+----------+
|  3 |     wangwu | 王五 | 13800009012 | 1995-03-03 |      M | 2025-02-19 17:28:18 | 2025-02-19 17:28:18 | 20250219 |
|  1 |   zhangsan | 张三 | 13800001234 | 1990-01-01 |      M | 2025-02-19 17:28:18 | 2025-02-19 17:28:18 | 20250219 |
|  2 |       lisi | 李四 | 13800005678 | 1992-02-02 |      F | 2025-02-19 17:28:18 | 2025-02-19 17:28:18 | 20250219 |
|  4 |    zhaoliu | 赵六 | 13800003456 | 1998-04-04 |      F | 2025-02-19 17:28:18 | 2025-02-19 17:28:18 | 20250219 |
|  5 |      sunqi | 孙七 | 13800007890 | 2000-05-05 |      M | 2025-02-19 17:28:18 | 2025-02-19 17:28:18 | 20250219 |
+----+------------+------+-------------+------------+--------+---------------------+---------------------+----------+-- 修改原始表的schema
mysql> ALTER TABLE user_info ADD COLUMN email varchar(255);
mysql> INSERT INTO `user_info`(`login_name`, `name`, `phone_num`, `birthday`, `gender`, `create_time`, `operate_time`, `email`)
values('hank', '汉克', '18600007890', '2000-05-05', 'M', NOW(), NOW(), 'hank@163.com');-- 注意:我们此时在原始sql-client窗口查询,`email`字段并没有加上
Flink SQL> select * from ods_user_info_cdc;
+----+------------+------+-------------+------------+--------+---------------------+---------------------+----------+
| id | login_name | name |   phone_num |   birthday | gender |         create_time |        operate_time |       pt |
+----+------------+------+-------------+------------+--------+---------------------+---------------------+----------+
|  3 |     wangwu | 王五 | 13800009012 | 1995-03-03 |      M | 2025-02-19 18:01:18 | 2025-02-19 18:01:18 | 20250219 |
|  1 |   zhangsan | 张三 | 13800001234 | 1990-01-01 |      M | 2025-02-19 18:01:18 | 2025-02-19 18:01:18 | 20250219 |
|  2 |       lisi | 李四 | 13800005678 | 1992-02-02 |      F | 2025-02-19 18:01:18 | 2025-02-19 18:01:18 | 20250219 |
|  4 |    zhaoliu | 赵六 | 13800003456 | 1998-04-04 |      F | 2025-02-19 18:01:18 | 2025-02-19 18:01:18 | 20250219 |
|  5 |      sunqi | 孙七 | 13800007890 | 2000-05-05 |      M | 2025-02-19 18:01:18 | 2025-02-19 18:01:18 | 20250219 |
|  6 |       hank | 汉克 | 18600007890 | 2000-05-05 |      M | 2025-02-19 18:01:43 | 2025-02-19 18:01:43 | 20250219 |
+----+------------+------+-------------+------------+--------+---------------------+---------------------+----------+-- 其实,此时的schema已经改变,我们查看hdfs上的schema信息会发现`email`字实际上已经添加
[root@centos01 ~]# hdfs dfs -cat /user/hive/warehouse/paimon_db.db/ods_user_info_cdc/schema/schema-1-- 因此,我们需要另外重新启动一个sql-client窗口进行查询,可以发现是正确的
Flink SQL> select * from ods_user_info_cdc;
+----+------------+------+-------------+------------+--------+---------------------+---------------------+----------+--------------+
| id | login_name | name |   phone_num |   birthday | gender |         create_time |        operate_time |       pt |        email |
+----+------------+------+-------------+------------+--------+---------------------+---------------------+----------+--------------+
|  3 |     wangwu | 王五 | 13800009012 | 1995-03-03 |      M | 2025-02-19 18:01:18 | 2025-02-19 18:01:18 | 20250219 |       <NULL> |
|  1 |   zhangsan | 张三 | 13800001234 | 1990-01-01 |      M | 2025-02-19 18:01:18 | 2025-02-19 18:01:18 | 20250219 |       <NULL> |
|  2 |       lisi | 李四 | 13800005678 | 1992-02-02 |      F | 2025-02-19 18:01:18 | 2025-02-19 18:01:18 | 20250219 |       <NULL> |
|  4 |    zhaoliu | 赵六 | 13800003456 | 1998-04-04 |      F | 2025-02-19 18:01:18 | 2025-02-19 18:01:18 | 20250219 |       <NULL> |
|  5 |      sunqi | 孙七 | 13800007890 | 2000-05-05 |      M | 2025-02-19 18:01:18 | 2025-02-19 18:01:18 | 20250219 |       <NULL> |
|  6 |       hank | 汉克 | 18600007890 | 2000-05-05 |      M | 2025-02-19 18:01:43 | 2025-02-19 18:01:43 | 20250219 | hank@163.com |
+----+------------+------+-------------+------------+--------+---------------------+---------------------+----------+--------------+

注:

  • 上面自动创建的paimon表是分区表,但是在Hive中不是分区表
-- 报错
0: jdbc:hive2://192.168.42.101:10000> show partitions ods_user_info_cdc ;
Error: Error while processing statement: FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.DDLTask. Table ods_user_info_cdc is not a partitioned table (state=42000,code=1)
  • 要想在hive中也是分区表,需要在同步时候设置参数metastore.partitioned-table=true
# 同步到另一张paimon表,并设置hive分区参数
[root@centos01 ~]# /opt/apps/flink-1.16.0/bin/flink run \/opt/apps/flink-1.16.0/lib/paimon-flink-action-1.0.0.jar \mysql-sync-table \--warehouse hdfs://centos01:8020/user/hive/warehouse \--database paimon_db \--table ods_user_info_partition_cdc \--primary-keys pt,id \--partition_keys pt \--computed_column 'pt=date_format(operate_time, yyyyMMdd)' \--mysql-conf hostname=centos01 \--mysql-conf username=root \--mysql-conf password=123456 \--mysql-conf database-name=cdc_db \--mysql-conf table-name='user_info' \--catalog-conf metastore=hive \--catalog-conf uri=thrift://centos01:9083 \--table-conf bucket=2 \--table-conf changelog-producer=input \--table-conf metastore.partitioned-table=true \--table-conf sink.parallelism=2 
  • 然后可以从hive中读取相关分区了
0: jdbc:hive2://192.168.42.101:10000> show partitions ods_user_info_partition_cdc ;
+--------------+
|  partition   |
+--------------+
| pt=20250219  |
+--------------+-- 读取相关分区数据
0: jdbc:hive2://192.168.42.101:10000> select * from ods_user_info_partition_cdc a where  pt=20250219;
+-------+---------------+---------+--------------+-------------+-----------+------------------------+------------------------+-----------+---------------+
| a.id  | a.login_name  | a.name  | a.phone_num  | a.birthday  | a.gender  |     a.create_time      |     a.operate_time     |   a.pt    |    a.email    |
+-------+---------------+---------+--------------+-------------+-----------+------------------------+------------------------+-----------+---------------+
| 1     | zhangsan      | 张三      | 13800001234  | 1990-01-01  | M         | 2025-02-19 18:01:18.0  | 2025-02-19 18:01:18.0  | 20250219  | NULL          |
| 2     | lisi          | 李四      | 13800005678  | 1992-02-02  | F         | 2025-02-19 18:01:18.0  | 2025-02-19 18:01:18.0  | 20250219  | lisi@qq.com   |
| 4     | zhaoliu       | 赵六      | 13800003456  | 1998-04-04  | F         | 2025-02-19 18:01:18.0  | 2025-02-19 18:01:18.0  | 20250219  | NULL          |
| 5     | sunqi         | 孙七      | 13800007890  | 2000-05-05  | M         | 2025-02-19 18:01:18.0  | 2025-02-19 18:01:18.0  | 20250219  | NULL          |
| 6     | hank          | 汉克      | 18600007890  | 2000-05-05  | M         | 2025-02-19 18:01:43.0  | 2025-02-19 18:01:43.0  | 20250219  | hank@163.com  |
| 3     | wangwu        | 王五      | 13800009012  | 1995-03-03  | M         | 2025-02-19 18:01:18.0  | 2025-02-19 18:01:18.0  | 20250219  | NULL          |
+-------+---------------+---------+--------------+-------------+-----------+------------------------+------------------------+-----------+---------------+-- 需要注意的是,如果我们修改数据,原始的hive分区原始数据可能会减少
-- 下面示例,我们修改了两条数据
0: jdbc:hive2://192.168.42.101:10000> select * from ods_user_info_partition_cdc a  where pt=20250219;
+-------+---------------+---------+--------------+-------------+-----------+------------------------+------------------------+---------------+-----------+
| a.id  | a.login_name  | a.name  | a.phone_num  | a.birthday  | a.gender  |     a.create_time      |     a.operate_time     |    a.email    |   a.pt    |
+-------+---------------+---------+--------------+-------------+-----------+------------------------+------------------------+---------------+-----------+
| 2     | lisi          | 李四      | 13800005678  | 1992-02-02  | F         | 2025-02-19 18:01:18.0  | 2025-02-19 18:01:18.0  | lisi@qq.com   | 20250219  |
| 4     | zhaoliu       | 赵六      | 13800003456  | 1998-04-04  | F         | 2025-02-19 18:01:18.0  | 2025-02-19 18:01:18.0  | NULL          | 20250219  |
| 5     | sunqi         | 孙七      | 13800007890  | 2000-05-05  | M         | 2025-02-19 18:01:18.0  | 2025-02-19 18:01:18.0  | NULL          | 20250219  |
| 6     | hank          | 汉克      | 18600007890  | 2000-05-05  | M         | 2025-02-19 18:01:43.0  | 2025-02-19 18:01:43.0  | hank@163.com  | 20250219  |
+-------+---------------+---------+--------------+-------------+-----------+------------------------+------------------------+---------------+-----------+
4 rows selected (0.199 seconds)
0: jdbc:hive2://192.168.42.101:10000> select * from ods_user_info_partition_cdc a  where pt=20250220;
+-------+---------------+---------+--------------+-------------+-----------+------------------------+------------------------+-------------+-----------+
| a.id  | a.login_name  | a.name  | a.phone_num  | a.birthday  | a.gender  |     a.create_time      |     a.operate_time     |   a.email   |   a.pt    |
+-------+---------------+---------+--------------+-------------+-----------+------------------------+------------------------+-------------+-----------+
| 3     | wangwu        | 王五      | 13800009012  | 1995-03-03  | M         | 2025-02-19 18:01:18.0  | 2025-02-20 12:01:18.0  | ww@163.com  | 20250220  |
| 1     | zhangsan      | 张三三     | 13800001234  | 1990-01-01  | M         | 2025-02-19 18:01:18.0  | 2025-02-20 18:01:18.0  | NULL        | 20250220  |
+-------+---------------+---------+--------------+-------------+-----------+------------------------+------------------------+-------------+-----------+
2 rows selected (0.189 seconds)
0: jdbc:hive2://192.168.42.101:10000> select * from ods_user_info_partition_cdc a  ;
+-------+---------------+---------+--------------+-------------+-----------+------------------------+------------------------+---------------+-----------+
| a.id  | a.login_name  | a.name  | a.phone_num  | a.birthday  | a.gender  |     a.create_time      |     a.operate_time     |    a.email    |   a.pt    |
+-------+---------------+---------+--------------+-------------+-----------+------------------------+------------------------+---------------+-----------+
| 2     | lisi          | 李四      | 13800005678  | 1992-02-02  | F         | 2025-02-19 18:01:18.0  | 2025-02-19 18:01:18.0  | lisi@qq.com   | 20250219  |
| 4     | zhaoliu       | 赵六      | 13800003456  | 1998-04-04  | F         | 2025-02-19 18:01:18.0  | 2025-02-19 18:01:18.0  | NULL          | 20250219  |
| 5     | sunqi         | 孙七      | 13800007890  | 2000-05-05  | M         | 2025-02-19 18:01:18.0  | 2025-02-19 18:01:18.0  | NULL          | 20250219  |
| 6     | hank          | 汉克      | 18600007890  | 2000-05-05  | M         | 2025-02-19 18:01:43.0  | 2025-02-19 18:01:43.0  | hank@163.com  | 20250219  |
| 3     | wangwu        | 王五      | 13800009012  | 1995-03-03  | M         | 2025-02-19 18:01:18.0  | 2025-02-20 12:01:18.0  | ww@163.com    | 20250220  |
| 1     | zhangsan      | 张三三     | 13800001234  | 1990-01-01  | M         | 2025-02-19 18:01:18.0  | 2025-02-20 18:01:18.0  | NULL          | 20250220  |
+-------+---------------+---------+--------------+-------------+-----------+------------------------+------------------------+---------------+-----------+
  • 当然,我们在建立paimon表的时候,也可以指定参数'metastore.partitioned-table' = 'true'
Flink SQL> drop table if exists flink_p_demo;
Flink SQL> CREATE TABLE flink_p_demo (dt STRING NOT NULL,name STRING NOT NULL,amount BIGINT,PRIMARY KEY (dt, name) NOT ENFORCED 
) PARTITIONED BY (dt)
WITH ('connector' = 'paimon','changelog-producer' = 'lookup','metastore.partitioned-table' = 'true'  -- 设置为true
);
Flink SQL> insert into flink_p_demo values  ('20240725', 'apple', 3), ('20240726', 'banana', 5);0: jdbc:hive2://192.168.42.101:10000> show partitions flink_p_demo;
+--------------+
|  partition   |
+--------------+
| dt=20240725  |
| dt=20240726  |
+--------------+
0: jdbc:hive2://192.168.42.101:10000> select * from flink_p_demo a  where dt = 20240726;
+---------+-----------+-----------+
| a.name  | a.amount  |   a.dt    |
+---------+-----------+-----------+
| banana  | 5         | 20240726  |
+---------+-----------+-----------+

2.2 整库同步

  • 通过官方提供的paimon-action的jar包可以很方便的将 MySQL、Kafka、Mongo等中的数据实时摄入到Paimon中
  • 使用paimon-flink-action,采用mysql-sync-database(整库同步),并通过"–including_tables"参数选择要同步的表
  • 这种同步模式有效地节省了大量资源开销,相比每个表启动一个 Flink 任务而言,避免了资源的大量浪费。
[root@centos01 ~]# /opt/apps/flink-1.16.0/bin/flink run \/opt/apps/flink-1.16.0/lib/paimon-flink-action-1.0.0.jar \mysql-sync-database \--warehouse hdfs://centos01:8020/user/hive/warehouse \--database paimon_db \--table-prefix "ods_" \--table-suffix "_mysql_cdc" \--mysql-conf hostname=centos01 \--mysql-conf username=root \--mysql-conf password=123456 \--mysql-conf database-name=cdc_db \--catalog-conf metastore=hive \--catalog-conf uri=thrift://centos01:9083 \--table-conf bucket=2 \--table-conf changelog-producer=input \--table-conf sink.parallelism=2 \--including-tables 'user_info|orders'-- 在一个flink任务中同步多个mysql表
-- 在paimon中会自动创建多张表
Flink SQL> select * from ods_orders_mysql_cdc;
Flink SQL> select * from ods_user_info_mysql_cdc;

相关文章:

大数据组件(四)快速入门实时数据湖存储系统Apache Paimon(2)

Paimon的下载及安装&#xff0c;并且了解了主键表的引擎以及changelog-producer的含义参考&#xff1a; 大数据组件(四)快速入门实时数据湖存储系统Apache Paimon(1) 利用Paimon表做lookup join&#xff0c;集成mysql cdc等参考&#xff1a; 大数据组件(四)快速入门实时数据…...

PLC通讯

PPI通讯 是西门子公司专为s7-200系列plc开发的通讯协议。内置于s7-200 CPU中。PPI协议物理上基于RS-485口&#xff0c;通过屏蔽双绞线就可以实现PPI通讯。PPI协议是一种主-从协议。主站设备发送要求到从站设备&#xff0c;从站设备响应&#xff0c;从站不能主动发出信息。主站…...

前端js进阶,ES6语法,包详细

进阶ES6 作用域的概念加深对js理解 let、const申明的变量&#xff0c;在花括号中会生成块作用域&#xff0c;而var就不会生成块作用域 作用域链本质上就是底层的变量查找机制 作用域链查找的规则是:优先查找当前作用域先把的变量&#xff0c;再依次逐级找父级作用域直到全局…...

Scrum方法论指导下的Deepseek R1医疗AI部署开发

一、引言 1.1 研究背景与意义 在当今数智化时代&#xff0c;软件开发方法论对于项目的成功实施起着举足轻重的作用。Scrum 作为一种广泛应用的敏捷开发方法论&#xff0c;以其迭代式开发、快速反馈和高效协作的特点&#xff0c;在软件开发领域占据了重要地位。自 20 世纪 90 …...

LINUX安装使用Redis

参考 Install Redis on Linux | Docs 安装命令 sudo apt-get install -y lsb-release curl gpgcurl -fsSL https://packages.redis.io/gpg | sudo gpg --dearmor -o /usr/share/keyrings/redis-archive-keyring.gpgsudo chmod 644 /usr/share/keyrings/redis-archive-keyrin…...

基于java新闻管理系统,推荐一款开源cms内容管理系统ruoyi-fast-cms

一、项目概述 1.1 项目背景 在信息高速流通的当下&#xff0c;新闻媒体行业每天都要处理和传播海量信息。传统的新闻管理模式依赖人工操作&#xff0c;在新闻采集、编辑、发布以及后续管理等环节中&#xff0c;不仅效率低下&#xff0c;而且容易出现人为失误。同时&#xff0…...

054 redisson

文章目录 使用Redisson演示可重入锁读写锁信号量闭锁获取三级分类redisson分布式锁 package com.xd.cubemall.product.config;import org.redisson.Redisson; import org.redisson.api.RedissonClient; import org.redisson.config.Config; import org.springframework.context…...

【数据结构】(12) 反射、枚举、lambda 表达式

一、反射 1、反射机制定义及作用 反射是允许程序在运行时检查和操作类、方法、属性等的机制&#xff0c;能够动态地获取信息、调用方法等。换句话说&#xff0c;在编写程序时&#xff0c;不需要知道要操作的类的具体信息&#xff0c;而是在程序运行时获取和使用。 2、反射机制…...

java实现二维码图片生成和编解码

java实现二维码图片生成和编解码 在wutool中&#xff0c;封装了二维码工具类&#xff0c;基于google的zxing库&#xff0c;实现二维码图片生成、编码和解码。 关于wutool wutool是一个java代码片段收集库&#xff0c;针对特定场景提供轻量解决方案&#xff0c;只要按需选择代…...

Java基础常见的面试题(易错!!)

面试题一&#xff1a;为什么 Java 不支持多继承 Java 不支持多继承主要是为避免 “菱形继承问题”&#xff08;又称 “钻石问题”&#xff09;&#xff0c;即一个子类从多个父类继承到同名方法或属性时&#xff0c;编译器无法确定该调用哪个父类的成员。同时&#xff0c;多继承…...

hugging face---transformers包

一、前言 不同于计算机视觉的百花齐放&#xff0c;不同网络适用不同情况&#xff0c;NLP则由Transformer一统天下。transformer是2017年提出的一种基于自注意力机制的神经网络架构&#xff0c;transformers库是hugging face社区创造的一个py库&#xff0c;通过该库可以实现统一…...

网络安全防护指南:筑牢网络安全防线(510)

一、网络安全的基本概念 &#xff08;一&#xff09;网络的定义 网络是指由计算机或者其他信息终端及相关设备组成的按照一定的规则和程序对信息收集、存储、传输、交换、处理的系统。在当今数字化时代&#xff0c;网络已经成为人们生活和工作中不可或缺的一部分。它连接了世…...

微信小程序实现拉卡拉支付

功能需求&#xff1a;拉卡拉支付&#xff08;通过跳转拉卡拉平台进行支付&#xff09;&#xff0c;他人支付&#xff08;通过链接进行平台跳转支付&#xff09; 1.支付操作 //支付 const onCanStartPay async (obj) > {uni.showLoading({mask: true})// 支付接口获取需要传…...

git从本地其他设备上fetch分支

在 Git 中&#xff0c;如果你想从本地其他设备上获取分支&#xff0c;可以通过以下几种方式实现。不过&#xff0c;需要注意的是&#xff0c;Git 本身是分布式版本控制系统&#xff0c;通常我们是从远程仓库&#xff08;如 GitHub、GitLab 等&#xff09;拉取分支&#xff0c;而…...

【干货教程】Windows电脑本地部署运行DeepSeek R1大模型(基于Ollama和Chatbox)

文章目录 一、环境准备二、安装Ollama2.1 访问Ollama官方网站2.2 下载适用于Windows的安装包2.3 安装Ollama安装包2.4 指定Ollama安装目录2.5 指定Ollama的大模型的存储目录 三、选择DeepSeek R1模型四、下载并运行DeepSeek R1模型五、常见问题解答六、使用Chatbox进行交互6.1 …...

基于 SSM框架 的 “捷邻小程序” 系统的设计与实现

大家好&#xff0c;今天要和大家聊的是一款基于 SSM框架 的 “捷邻小程序” 系统的设计与实现。项目源码以及部署相关事宜请联系我&#xff0c;文末附上联系方式。 项目简介 基于 SSM框架 的 “捷邻小程序” 系统设计与实现的主要使用者分为 管理员 和 用户&#xff0c;没有授…...

基于Springboot医院预约挂号小程序系统【附源码】

基于Springboot医院预约挂号小程序系统 效果如下&#xff1a; 小程序主页面 帖子页面 医生账号页面 留言内容页面 管理员主页面 用户管理页面 我的挂号页面 医生管理页面 研究背景 随着信息技术的飞速发展和互联网医疗的兴起&#xff0c;传统的医疗服务模式正面临着深刻的变…...

基于AVue的二次封装:快速构建后台管理系统的CRUD方案

基于AVue的二次封装&#xff1a;快速构建后台管理系统的CRUD方案 在开发后台管理系统时&#xff0c;表格是常见的组件之一。然而&#xff0c;使用原生的Element Plus实现CRUD&#xff08;增删改查&#xff09;功能往往需要编写大量重复代码&#xff0c;过程繁琐。即使借助类似…...

【含开题报告+文档+PPT+源码】基于springboot加vue 前后端分离的校园新闻审核发布管理系统

开题报告 本研究旨在设计并实现一套基于SpringBoot后端框架结合Vue前端技术的校园新闻发布系统&#xff0c;该系统面向学生用户群体提供了全面的功能服务。学生用户通过身份验证登录后&#xff0c;能够便捷高效地获取校园内的各类新闻资讯&#xff0c;实时了解校内动态。系统不…...

Qt 是一个跨平台的 C++ 应用程序框架

Qt 是一个跨平台的 C++ 应用程序框架,广泛用于开发图形用户界面(GUI)应用程序,也可以用于开发非 GUI 程序,如命令行工具和控制台应用程序。Qt 提供了丰富的类库和工具,支持多种操作系统,包括 Windows、macOS、Linux 等。 主要特点: 跨平台:Qt 支持多种操作系统,开发…...

关于nvm与node.js

1 安装nvm 安装过程中手动修改 nvm的安装路径&#xff0c; 以及修改 通过nvm安装node后正在使用的node的存放目录【这句话可能难以理解&#xff0c;但接着往下看你就了然了】 2 修改nvm中settings.txt文件配置 nvm安装成功后&#xff0c;通常在该文件中会出现以下配置&…...

系统设计 --- MongoDB亿级数据查询优化策略

系统设计 --- MongoDB亿级数据查询分表策略 背景Solution --- 分表 背景 使用audit log实现Audi Trail功能 Audit Trail范围: 六个月数据量: 每秒5-7条audi log&#xff0c;共计7千万 – 1亿条数据需要实现全文检索按照时间倒序因为license问题&#xff0c;不能使用ELK只能使用…...

【机器视觉】单目测距——运动结构恢复

ps&#xff1a;图是随便找的&#xff0c;为了凑个封面 前言 在前面对光流法进行进一步改进&#xff0c;希望将2D光流推广至3D场景流时&#xff0c;发现2D转3D过程中存在尺度歧义问题&#xff0c;需要补全摄像头拍摄图像中缺失的深度信息&#xff0c;否则解空间不收敛&#xf…...

12.找到字符串中所有字母异位词

&#x1f9e0; 题目解析 题目描述&#xff1a; 给定两个字符串 s 和 p&#xff0c;找出 s 中所有 p 的字母异位词的起始索引。 返回的答案以数组形式表示。 字母异位词定义&#xff1a; 若两个字符串包含的字符种类和出现次数完全相同&#xff0c;顺序无所谓&#xff0c;则互为…...

vue3+vite项目中使用.env文件环境变量方法

vue3vite项目中使用.env文件环境变量方法 .env文件作用命名规则常用的配置项示例使用方法注意事项在vite.config.js文件中读取环境变量方法 .env文件作用 .env 文件用于定义环境变量&#xff0c;这些变量可以在项目中通过 import.meta.env 进行访问。Vite 会自动加载这些环境变…...

如何在网页里填写 PDF 表格?

有时候&#xff0c;你可能希望用户能在你的网站上填写 PDF 表单。然而&#xff0c;这件事并不简单&#xff0c;因为 PDF 并不是一种原生的网页格式。虽然浏览器可以显示 PDF 文件&#xff0c;但原生并不支持编辑或填写它们。更糟的是&#xff0c;如果你想收集表单数据&#xff…...

基于Java+VUE+MariaDB实现(Web)仿小米商城

仿小米商城 环境安装 nodejs maven JDK11 运行 mvn clean install -DskipTestscd adminmvn spring-boot:runcd ../webmvn spring-boot:runcd ../xiaomi-store-admin-vuenpm installnpm run servecd ../xiaomi-store-vuenpm installnpm run serve 注意&#xff1a;运行前…...

加密通信 + 行为分析:运营商行业安全防御体系重构

在数字经济蓬勃发展的时代&#xff0c;运营商作为信息通信网络的核心枢纽&#xff0c;承载着海量用户数据与关键业务传输&#xff0c;其安全防御体系的可靠性直接关乎国家安全、社会稳定与企业发展。随着网络攻击手段的不断升级&#xff0c;传统安全防护体系逐渐暴露出局限性&a…...

[拓扑优化] 1.概述

常见的拓扑优化方法有&#xff1a;均匀化法、变密度法、渐进结构优化法、水平集法、移动可变形组件法等。 常见的数值计算方法有&#xff1a;有限元法、有限差分法、边界元法、离散元法、无网格法、扩展有限元法、等几何分析等。 将上述数值计算方法与拓扑优化方法结合&#…...

02-性能方案设计

需求分析与测试设计 根据具体的性能测试需求&#xff0c;确定测试类型&#xff0c;以及压测的模块(web/mysql/redis/系统整体)前期要与相关人员充分沟通&#xff0c;初步确定压测方案及具体的性能指标QA完成性能测试设计后&#xff0c;需产出测试方案文档发送邮件到项目组&…...