Flink cdc3.0同步实例(动态变更表结构、分库分表同步)
文章目录
- 前言
- 准备
- flink环境
- docker构建mysql、doris环境
- 数据准备
- 通过 FlinkCDC cli 提交任务
- 整库同步
- 同步变更
- 路由变更
- 路由表结构不一致无法同步
- 结尾
前言
最近Flink CDC 3.0发布, 不仅提供基础的数据同步能力。schema 变更自动同步、整库同步、分库分表等增强功能使 Flink CDC 3.0 在更复杂的数据集成与用户业务场景中发挥作用:用户无需在数据源发生 schema 变更时手动介入,大大降低用户的运维成本;只需对同步任务进行简单配置即可将多表、多库同步至下游,并进行合并等逻辑,显著降低用户的开发难度与入门门槛。Flink CDC 3.0 正式发布。
我们今天基于 Flink CDC 3.0 同步 MySQL 到 Doris ,来体验下新上的整库同步、表结构变更同步和分库分表同步的功能。
准备
flink环境
准备 Flink Standalone 集群,下载最新版本 Flink 1.18.0 ,解压后得到 flink-1.18.0 目录。并且设置 FLINK_HOME 为 flink-1.18.0 所在目录。
通过在 conf/flink-conf.yaml
配置文件追加下列参数开启 checkpoint,每隔 3 秒做一次 checkpoint,方便后续观察数据变更。
execution.checkpointing.interval: 3000
使用下面的命令启动 Flink 集群。
./bin/start-cluster.sh
启动成功的话,可以在 http://localhost:8081/ 访问到 Flink Web UI,如下所示:
多次执行 start-cluster.sh 可以拉起多个 TaskManager,保证Total Task Slots >= 2, 不然提交任务会有资源不足异常,比如我这里执行了3次。 或者是修改 conf/flink-conf.yaml
资源配置。
docker构建mysql、doris环境
如果有安装这两个组件,就可以免去docker,接下来的教程将以 docker-compose 的方式准备所需要的组件。
由于 Doris 的运行需要内存映射支持,需在宿主机执行如下命令:
sysctl -w vm.max_map_count=2000000
docker 镜像启动,使用下面的内容创建一个 docker-compose.yml
文件:
version: '2.1'
services:doris:image: yagagagaga/doris-standaloneports:- "8030:8030"- "8040:8040"- "9030:9030"mysql:image: debezium/example-mysql:1.1ports:- "3306:3306"environment:- MYSQL_ROOT_PASSWORD=123456- MYSQL_USER=mysqluser- MYSQL_PASSWORD=mysqlpw
该 Docker Compose 中包含的容器有:
MySQL: 包含商品信息的数据库 app_db
Doris: 存储从 MySQL 中根据规则映射过来的结果表
在 docker-compose.yml
所在目录下执行下面的命令来启动本教程需要的组件:
docker-compose up -d
该命令将以 detached 模式自动启动 Docker Compose 配置中定义的所有容器。你可以通过 docker ps 来观察上述的容器是否正常启动了,也可以通过访问 http://localhost:8030/ 来查看 Doris 是否运行正常。
数据准备
进入 MySQL 容器, 或者通过客户端工具连接到mysql
docker-compose exec mysql mysql -uroot -p123456
创建数据库 app_db 和表 orders,products
并插入数据
-- 创建数据库
CREATE DATABASE app_db;USE app_db;-- 创建 orders 表
CREATE TABLE `orders` (
`id` INT NOT NULL,
`price` DECIMAL(10,2) NOT NULL,
PRIMARY KEY (`id`)
);-- 插入数据
INSERT INTO `orders` (`id`, `price`) VALUES (1, 4.00);
INSERT INTO `orders` (`id`, `price`) VALUES (2, 100.00);-- 创建 shipments 表
CREATE TABLE `shipments` (
`id` INT NOT NULL,
`city` VARCHAR(255) NOT NULL,
PRIMARY KEY (`id`)
);-- 插入数据
INSERT INTO `shipments` (`id`, `city`) VALUES (1, 'beijing');
INSERT INTO `shipments` (`id`, `city`) VALUES (2, 'xian');-- 创建 products 表
CREATE TABLE `products` (
`id` INT NOT NULL,
`product` VARCHAR(255) NOT NULL,
PRIMARY KEY (`id`)
);
Doris 暂时不支持自动创建数据库,需要先创建写入表对应的数据库。
进入 Doris Web UI。http://localhost:8030/,默认的用户名为 root,默认密码为空。
通过 Web UI 创建 app_db 数据库
create database if not exists app_db;
通过 FlinkCDC cli 提交任务
下载下面列出的二进制压缩包,并解压得到目录 flink-cdc-3.0.0
:
flink-cdc-3.0.0-bin.tar.gz flink-cdc-3.0.0 下会包含 bin、lib、log、conf 四个目录。
下载下面列出的 connector 包,并且移动到 lib 目录下
- MySQL pipeline connector 3.0.0
- Apache Doris pipeline connector 3.0.0
整库同步
编写任务配置 yaml 文件,下面给出了一个整库同步的示例文件 mysql-to-doris.yaml:
################################################################################
# Description: Sync MySQL all tables to Doris
################################################################################
source:type: mysqlhostname: localhostport: 3306username: rootpassword: 123456tables: app_db.\.*server-id: 5400-5404server-time-zone: UTCsink:type: dorisfenodes: 127.0.0.1:8030username: rootpassword: ""table.create.properties.light_schema_change: truetable.create.properties.replication_num: 1pipeline:name: Sync MySQL Database to Dorisparallelism: 2
其中:
source 中的 tables: app_db.\.*
通过正则匹配同步 app_db
下的所有表。
sink 添加 table.create.properties.replication_num
参数是由于 Docker 镜像中只有一个 Doris BE 节点。
最后,通过命令行提交任务到 Flink Standalone cluster
bash bin/flink-cdc.sh conf/mysql-to-doris.yaml
提交成功后,返回信息如:
在 Flink Web UI,可以看到一个名为 Sync MySQL Database to Doris
的任务正在运行。job id对应上面的cb049fe4a2112510a77ee46e197054a6
打开 Doris 的 Web UI,可以看到数据表已经被创建出来,数据能成功写入。
同步变更
接下来,修改 MySQL 数据库中表的数据,Doris 中显示的订单数据也将实时更新:
INSERT INTO app_db.orders (id, price) VALUES (3, 100.00);
ALTER TABLE app_db.orders ADD amount varchar(100) NULL;
UPDATE app_db.orders SET price=100.00, amount=100.00 WHERE id=1;
DELETE FROM app_db.orders WHERE id=2;
-- 区别于官方再新增一条数据
INSERT INTO app_db.orders VALUES (4, 200, 200.00);
也可以拆开每执行一步,刷新一次 Doris Web UI,可以看到 Doris 中显示的 orders 数据将实时更新,如下所示:
同样的,去修改 shipments, products 表,也能在 Doris 中实时看到同步变更的结果。
路由变更
Flink CDC 提供了将源表的表结构/数据路由到其他表名的配置,借助这种能力,我们能够实现表名库名替换,整库同步等功能。
下面提供一个配置文件conf/mysql-to-doris-route.yaml
说明:
################################################################################
# Description: Sync MySQL all tables to Doris
################################################################################
source:type: mysqlhostname: localhostport: 3306username: rootpassword: 123456tables: app_db1.\.*server-id: 5400-5404server-time-zone: UTCsink:type: dorisfenodes: 127.0.0.1:8030benodes: 127.0.0.1:8040username: rootpassword: ""table.create.properties.light_schema_change: truetable.create.properties.replication_num: 1route:- source-table: app_db1.orders\.*sink-table: app_db1.ods_orderspipeline:name: Sync MySQL Database to Dorisparallelism: 2
通过上面的 route 配置,使用正则表达式,可以将诸如 app_db1.order_01、app_db1.order_02 的表汇总到 app_db1.ods_orders 中。从而实现分库分表同步的功能。注意,目前还不支持多表中存在相同主键数据的场景,将在后续版本支持。
另外官方文档里的写法存在一个问题。
正则表达式前面加上’\‘转义,app_db1.orders\.*
,否则会抛出异常:java.util.regex.PatternSyntaxException: Dangling meta character ‘*’ near index 0 *
我们在mysql和doris分别创建数据库app_db1
, 然后初始化mysql
-- 创建表orders_01
CREATE TABLE `orders_01` (`id` int NOT NULL,`price` decimal(10,2) NOT NULL,`amount` varchar(100) DEFAULT NULL,PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;-- 创建表orders_02
CREATE TABLE `orders_02` (`id` int NOT NULL,`price` decimal(10,2) NOT NULL,`amount` varchar(100) DEFAULT NULL,PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
启动新的job。
然后在orders_01,orders_02分别插入数据
INSERT INTO `orders_01` (`id`, `price`) VALUES (11, 4.00);
INSERT INTO `orders_02` (`id`, `price`) VALUES (12, 100.00);
在doris里验证,数据都写入了app_db1.ods_orders
路由表结构不一致无法同步
看Schema Evolution 设计原理,Flink CDC 3.0 在作业拓扑中引入了 SchemaRegistry,结合 SchemaOperator 协调并控制作业拓扑中的 schema 变更事件处理。当上游数据源发生 schema 变更时,SchemaRegistry 会控制 SchemaOperator 以暂停数据流,并将流水线中的数据从 sink 全部刷出以保证 schema 一致性。当 schema 变更事件在外部系统处理成功后,SchemaOperator 恢复数据流,完成本次 schema 变更的处理。
所以考虑只修改orders_01,再插入数据看doris同步的变化。
-- 添加sku字段
ALTER TABLE app_db1.orders_01 ADD sku varchar(32) NULL;
-- 向orders_01插入id=13
INSERT INTO `orders_01` VALUES (13, 4.00, 8.00, 'apple01');
-- 向orders_02插入id=14
INSERT INTO `orders_02` VALUES (14, 1.00, 1.00);
可以看到doris中的app_db1.orders
表结构发生了变化,但是orders_02
的id=14这条数据没有正常写入。flink异常提示:java.lang.IllegalStateException: Column size does not match the data size
而当修改orders_02的表结构,也会有异常:Caused by: java.util.concurrent.ExecutionException: java.lang.IllegalArgumentException: status of AddColumnEvent is already existed。并且之后写入的数据无法正常同步。
结尾
flink cdc的功能越来越强,也再尝试解决用户的使用痛点。不过放到生产环境使用还需要建立在更多的实践测试之上。
相关文章:

Flink cdc3.0同步实例(动态变更表结构、分库分表同步)
文章目录 前言准备flink环境docker构建mysql、doris环境数据准备 通过 FlinkCDC cli 提交任务整库同步同步变更路由变更路由表结构不一致无法同步 结尾 前言 最近Flink CDC 3.0发布, 不仅提供基础的数据同步能力。schema 变更自动同步、整库同步、分库分表等增强功…...

国产Apple Find My认证芯片哪里找,伦茨科技ST17H6x芯片可以帮到您
深圳市伦茨科技有限公司(以下简称“伦茨科技”)发布ST17H6x Soc平台。成为继Nordic之后全球第二家取得Apple Find My「查找」认证的芯片厂家,该平台提供可通过Apple Find My认证的Apple查找(Find My)功能集成解决方案。…...
肺癌相关知识
写在前面 大概想了解下肺癌相关的知识,开此贴做记录,看看后续有没有相关的生信文章思路。 综述 文章名期刊影响因子Lung cancer immunotherapy: progress, pitfalls, and promisesMol Cancer37.3 常见治疗手段有surgery, radiation therapy, chemoth…...

ChimeraX使用教程-安装及基本操作
ChimeraX使用教程-安装及基本操作 1、访问https://www.cgl.ucsf.edu/chimerax/download.html进行下载,然后安装 安装完成后,显示界面 2、基本操作 1、点击file,导入 .PDB 文件。 (注:在 alphafold在线预测蛋白》点…...

【小黑嵌入式系统第十一课】μC/OS-III程序设计基础(一)——任务设计、任务管理(创建基本状态内部任务)、任务调度、系统函数
上一课: 【小黑嵌入式系统第十课】μC/OS-III概况——实时操作系统的特点、基本概念(内核&任务&中断)、与硬件的关系&实现 文章目录 一、任务设计1.1 任务概述1.2 任务的类型1.2.1 单次执行类任务(运行至完成型&#…...

Redis一些常用的技术
文章目录 第1关:Redis 事务与锁机制第2关:流水线第3关:发布订阅第4关:超时命令第5关:使用Lua语言 第1关:Redis 事务与锁机制 编程要求 根据提示,在右侧编辑器Begin-End补充代码,根据…...
基于QPainter 绘图图片绕绘制设备中心旋转
项目地址:https://gitcode.com/m0_45463480/QPainter/tree/main 获取途径:进入CSDN->GitCode直接下载或者通过git拉取仓库内容。 QPainter是Qt框架中的一个类,用于在QWidget或QPixmap等设备上进行绘图操作。它提供了丰富的绘图功能,可以用于绘制线条、图形、文本等。Q…...

计算机网络(4):网络层
网络层提供的两种服务 虚电路服务(Virtual Circuit Service)和数据报服务(Datagram Service)是在网络层(第三层)提供的两种不同的通信服务。它们主要区别在于建立连接的方式和数据传输的方式。 虚电路服务…...

动态内存分配(malloc和free、calloc和realloc)
目录 一、为什么要有动态内存分配 二、C/C中程序内存区域划分 三、malloc和free 2.1、malloc 2.2、free 四、calloc和realloc 3.1、calloc 3.2、realloc 3.3realloc在调整内存空间的是存在两种情况: 3.4realloc有malloc的功能 五、常见的动…...

C语言---井字棋(三子棋)
Tic-Tac-Toe 1 游戏介绍和随机数1.1 游戏介绍1.2 随机数的生成1.3 棋盘大小和符号 2 设计游戏2.1 初始化棋盘2.2 打印棋盘2.3 玩家下棋2.4 电脑下棋2.5 判断输赢2.6 game()函数2.7 main()函数 3 完整三子棋代码3.1 Tic_Tac_Toe.h3.2 Tic_Tac_Toe.c3.3 Test.c 4 游戏代码的缺陷 …...

[Kubernetes]3. k8s集群Service详解
在上一节讲解了k8s 的pod,deployment,以及借助pod,deployment来部署项目,但会存在问题: 每次只能访问一个 pod,没有负载均衡自动转发到不同 pod访问还需要端口转发Pod重创后IP变了,名字也变了针对上面的问题,可以借助Service来解决,下面就来看看Service怎么使用 一.Service详…...
C++ 指定范围内递增初始化一个vector<int> | Python: list(range(31, 90))
通过lambda表达式 std::iota()实现: template <typename Tp> inline void print_vec(const std::vector<Tp>& vec) {fmt::print("[{}]\n", fmt::join(vec, ", ")); }// 相当于Python的lst list(range(31, 90))const std::ve…...
【Java之数据结构与算法】
选择排序 package Code01;public class Code01_SelectionSort {public static void selectionSort(int[] arr) {if(arrnull||arr.length<2) {return;}for(int i0;i<arr.length;i) {int minIndex i;for(int ji1;j<arr.length;j) {minIndex arr[minIndex] > arr[j…...
爬虫scrapy中间件的使用
爬虫scrapy中间件的使用 学习目标: 应用 scrapy中使用间件使用随机UA的方法应用 scrapy中使用代理ip的的方法应用 scrapy与selenium配合使用 1. scrapy中间件的分类和作用 1.1 scrapy中间件的分类 根据scrapy运行流程中所在位置不同分为: 下载中间件…...
普冉(PUYA)单片机开发笔记 [完结篇]:使用体会
失败的移植:FreeRTOS 当使用了 PY32F003 的各种接口和功能后,手痒痒想把 FreeRTOS 也搬到这个 MCU 上,参考 STM32 和 GD32 对 FreeRTOS 的移植步骤,把 FreeRTOS v202212.00 版本的源码搬到了 Keil 工程中,编译倒是通过…...

Elasticsearch:生成 AI 中的微调与 RAG
在自然语言处理 (NLP) 领域,出现了两种卓越的技术,每种技术都有其独特的功能:微调大型语言模型 (LLM) 和 RAG(检索增强生成)。 这些方法极大地影响了我们利用语言模型的方式,使它们更加通用和有效。 在本文…...
ip静态好还是dhcp好?
选择使用静态 IP 还是 DHCP(动态主机配置协议)取决于您的网络需求和环境。下面是它们的一些特点和适用场景: 静态 IP: 固定的 IP 地址:静态 IP 是手动配置在设备上的固定 IP 地址,不会随时间或网络变化而改…...

PolarDB-X、OceanBase、CockroachDB、TiDB二级索引写入性能测评
为什么要做这个测试 二级索引是关系型数据库相较于NoSQL数据库的一个关键差异。二级索引必须是强一致的,因此索引的写入需要与主键的写入放在一个事务当中,事务的性能是二级索引性能的基础。 目前市面上的分布式数据库中,从使用体验的角度看…...

Convolutional Neural Network(CNN)——卷积神经网络
1.NN的局限性 拓展性差 NN的计算量大性能差,不利于在不同规模的数据集上有效运行若输入维度发生变化,需要修改并重新训练网络容易过拟合 全连接导致参数量特别多,容易过拟合如果增加更多层,参数量会翻倍无法有效利用局部特征 输入…...

鸿蒙开发基本概念
1、开发准备 1.1、UI框架 HarmonyOS提供了一套UI开发框架,即方舟开发框架(ArkUI框架)。方舟开发框架可为开发者提供应用UI开发所必需的能力,比如多种组件、布局计算、动画能力、UI交互、绘制等。 方舟开发框架针对不同目的和技术…...

LBE-LEX系列工业语音播放器|预警播报器|喇叭蜂鸣器的上位机配置操作说明
LBE-LEX系列工业语音播放器|预警播报器|喇叭蜂鸣器专为工业环境精心打造,完美适配AGV和无人叉车。同时,集成以太网与语音合成技术,为各类高级系统(如MES、调度系统、库位管理、立库等)提供高效便捷的语音交互体验。 L…...
生成xcframework
打包 XCFramework 的方法 XCFramework 是苹果推出的一种多平台二进制分发格式,可以包含多个架构和平台的代码。打包 XCFramework 通常用于分发库或框架。 使用 Xcode 命令行工具打包 通过 xcodebuild 命令可以打包 XCFramework。确保项目已经配置好需要支持的平台…...
Java 语言特性(面试系列2)
一、SQL 基础 1. 复杂查询 (1)连接查询(JOIN) 内连接(INNER JOIN):返回两表匹配的记录。 SELECT e.name, d.dept_name FROM employees e INNER JOIN departments d ON e.dept_id d.dept_id; 左…...

第一篇:Agent2Agent (A2A) 协议——协作式人工智能的黎明
AI 领域的快速发展正在催生一个新时代,智能代理(agents)不再是孤立的个体,而是能够像一个数字团队一样协作。然而,当前 AI 生态系统的碎片化阻碍了这一愿景的实现,导致了“AI 巴别塔问题”——不同代理之间…...
Swagger和OpenApi的前世今生
Swagger与OpenAPI的关系演进是API标准化进程中的重要篇章,二者共同塑造了现代RESTful API的开发范式。 本期就扒一扒其技术演进的关键节点与核心逻辑: 🔄 一、起源与初创期:Swagger的诞生(2010-2014) 核心…...
大语言模型(LLM)中的KV缓存压缩与动态稀疏注意力机制设计
随着大语言模型(LLM)参数规模的增长,推理阶段的内存占用和计算复杂度成为核心挑战。传统注意力机制的计算复杂度随序列长度呈二次方增长,而KV缓存的内存消耗可能高达数十GB(例如Llama2-7B处理100K token时需50GB内存&a…...
Pinocchio 库详解及其在足式机器人上的应用
Pinocchio 库详解及其在足式机器人上的应用 Pinocchio (Pinocchio is not only a nose) 是一个开源的 C 库,专门用于快速计算机器人模型的正向运动学、逆向运动学、雅可比矩阵、动力学和动力学导数。它主要关注效率和准确性,并提供了一个通用的框架&…...
python报错No module named ‘tensorflow.keras‘
是由于不同版本的tensorflow下的keras所在的路径不同,结合所安装的tensorflow的目录结构修改from语句即可。 原语句: from tensorflow.keras.layers import Conv1D, MaxPooling1D, LSTM, Dense 修改后: from tensorflow.python.keras.lay…...

用机器学习破解新能源领域的“弃风”难题
音乐发烧友深有体会,玩音乐的本质就是玩电网。火电声音偏暖,水电偏冷,风电偏空旷。至于太阳能发的电,则略显朦胧和单薄。 不知你是否有感觉,近两年家里的音响声音越来越冷,听起来越来越单薄? —…...

Docker 本地安装 mysql 数据库
Docker: Accelerated Container Application Development 下载对应操作系统版本的 docker ;并安装。 基础操作不再赘述。 打开 macOS 终端,开始 docker 安装mysql之旅 第一步 docker search mysql 》〉docker search mysql NAME DE…...