当前位置: 首页 > news >正文

Flink cdc3.0同步实例(动态变更表结构、分库分表同步)

文章目录

  • 前言
  • 准备
    • flink环境
    • docker构建mysql、doris环境
    • 数据准备
  • 通过 FlinkCDC cli 提交任务
    • 整库同步
    • 同步变更
    • 路由变更
    • 路由表结构不一致无法同步
  • 结尾

前言

最近Flink CDC 3.0发布, 不仅提供基础的数据同步能力。schema 变更自动同步、整库同步、分库分表等增强功能使 Flink CDC 3.0 在更复杂的数据集成与用户业务场景中发挥作用:用户无需在数据源发生 schema 变更时手动介入,大大降低用户的运维成本;只需对同步任务进行简单配置即可将多表、多库同步至下游,并进行合并等逻辑,显著降低用户的开发难度与入门门槛。Flink CDC 3.0 正式发布。
我们今天基于 Flink CDC 3.0 同步 MySQL 到 Doris ,来体验下新上的整库同步、表结构变更同步和分库分表同步的功能。

准备

flink环境

准备 Flink Standalone 集群,下载最新版本 Flink 1.18.0 ,解压后得到 flink-1.18.0 目录。并且设置 FLINK_HOME 为 flink-1.18.0 所在目录。
通过在 conf/flink-conf.yaml 配置文件追加下列参数开启 checkpoint,每隔 3 秒做一次 checkpoint,方便后续观察数据变更。

execution.checkpointing.interval: 3000

使用下面的命令启动 Flink 集群。

./bin/start-cluster.sh

启动成功的话,可以在 http://localhost:8081/ 访问到 Flink Web UI,如下所示:
在这里插入图片描述
多次执行 start-cluster.sh 可以拉起多个 TaskManager,保证Total Task Slots >= 2, 不然提交任务会有资源不足异常,比如我这里执行了3次。 或者是修改 conf/flink-conf.yaml 资源配置。

docker构建mysql、doris环境

如果有安装这两个组件,就可以免去docker,接下来的教程将以 docker-compose 的方式准备所需要的组件。
由于 Doris 的运行需要内存映射支持,需在宿主机执行如下命令:

sysctl -w vm.max_map_count=2000000

docker 镜像启动,使用下面的内容创建一个 docker-compose.yml 文件:

version: '2.1'
services:doris:image: yagagagaga/doris-standaloneports:- "8030:8030"- "8040:8040"- "9030:9030"mysql:image: debezium/example-mysql:1.1ports:- "3306:3306"environment:- MYSQL_ROOT_PASSWORD=123456- MYSQL_USER=mysqluser- MYSQL_PASSWORD=mysqlpw

该 Docker Compose 中包含的容器有:

MySQL: 包含商品信息的数据库 app_db

Doris: 存储从 MySQL 中根据规则映射过来的结果表

docker-compose.yml 所在目录下执行下面的命令来启动本教程需要的组件:

docker-compose up -d

该命令将以 detached 模式自动启动 Docker Compose 配置中定义的所有容器。你可以通过 docker ps 来观察上述的容器是否正常启动了,也可以通过访问 http://localhost:8030/ 来查看 Doris 是否运行正常。

数据准备

进入 MySQL 容器, 或者通过客户端工具连接到mysql

docker-compose exec mysql mysql -uroot -p123456

创建数据库 app_db 和表 orders,products 并插入数据

-- 创建数据库
CREATE DATABASE app_db;USE app_db;-- 创建 orders 表
CREATE TABLE `orders` (
`id` INT NOT NULL,
`price` DECIMAL(10,2) NOT NULL,
PRIMARY KEY (`id`)
);-- 插入数据
INSERT INTO `orders` (`id`, `price`) VALUES (1, 4.00);
INSERT INTO `orders` (`id`, `price`) VALUES (2, 100.00);-- 创建 shipments 表
CREATE TABLE `shipments` (
`id` INT NOT NULL,
`city` VARCHAR(255) NOT NULL,
PRIMARY KEY (`id`)
);-- 插入数据
INSERT INTO `shipments` (`id`, `city`) VALUES (1, 'beijing');
INSERT INTO `shipments` (`id`, `city`) VALUES (2, 'xian');-- 创建 products 表
CREATE TABLE `products` (
`id` INT NOT NULL,
`product` VARCHAR(255) NOT NULL,
PRIMARY KEY (`id`)
);

Doris 暂时不支持自动创建数据库,需要先创建写入表对应的数据库。
进入 Doris Web UI。http://localhost:8030/,默认的用户名为 root,默认密码为空。
通过 Web UI 创建 app_db 数据库

create database if not exists app_db;

在这里插入图片描述

通过 FlinkCDC cli 提交任务

下载下面列出的二进制压缩包,并解压得到目录 flink-cdc-3.0.0
flink-cdc-3.0.0-bin.tar.gz flink-cdc-3.0.0 下会包含 bin、lib、log、conf 四个目录。

下载下面列出的 connector 包,并且移动到 lib 目录下

  • MySQL pipeline connector 3.0.0
  • Apache Doris pipeline connector 3.0.0
    在这里插入图片描述

整库同步

编写任务配置 yaml 文件,下面给出了一个整库同步的示例文件 mysql-to-doris.yaml:

################################################################################
# Description: Sync MySQL all tables to Doris
################################################################################
source:type: mysqlhostname: localhostport: 3306username: rootpassword: 123456tables: app_db.\.*server-id: 5400-5404server-time-zone: UTCsink:type: dorisfenodes: 127.0.0.1:8030username: rootpassword: ""table.create.properties.light_schema_change: truetable.create.properties.replication_num: 1pipeline:name: Sync MySQL Database to Dorisparallelism: 2

其中:
source 中的 tables: app_db.\.* 通过正则匹配同步 app_db 下的所有表。
sink 添加 table.create.properties.replication_num 参数是由于 Docker 镜像中只有一个 Doris BE 节点。

最后,通过命令行提交任务到 Flink Standalone cluster

bash bin/flink-cdc.sh conf/mysql-to-doris.yaml

提交成功后,返回信息如:
在这里插入图片描述
在 Flink Web UI,可以看到一个名为 Sync MySQL Database to Doris 的任务正在运行。job id对应上面的cb049fe4a2112510a77ee46e197054a6
在这里插入图片描述
打开 Doris 的 Web UI,可以看到数据表已经被创建出来,数据能成功写入。
在这里插入图片描述

同步变更

接下来,修改 MySQL 数据库中表的数据,Doris 中显示的订单数据也将实时更新:

INSERT INTO app_db.orders (id, price) VALUES (3, 100.00);
ALTER TABLE app_db.orders ADD amount varchar(100) NULL;
UPDATE app_db.orders SET price=100.00, amount=100.00 WHERE id=1;
DELETE FROM app_db.orders WHERE id=2;
-- 区别于官方再新增一条数据
INSERT INTO app_db.orders VALUES (4, 200, 200.00);

也可以拆开每执行一步,刷新一次 Doris Web UI,可以看到 Doris 中显示的 orders 数据将实时更新,如下所示:
在这里插入图片描述
同样的,去修改 shipments, products 表,也能在 Doris 中实时看到同步变更的结果。

路由变更

Flink CDC 提供了将源表的表结构/数据路由到其他表名的配置,借助这种能力,我们能够实现表名库名替换,整库同步等功能。
下面提供一个配置文件conf/mysql-to-doris-route.yaml说明:

################################################################################
# Description: Sync MySQL all tables to Doris
################################################################################
source:type: mysqlhostname: localhostport: 3306username: rootpassword: 123456tables: app_db1.\.*server-id: 5400-5404server-time-zone: UTCsink:type: dorisfenodes: 127.0.0.1:8030benodes: 127.0.0.1:8040username: rootpassword: ""table.create.properties.light_schema_change: truetable.create.properties.replication_num: 1route:- source-table: app_db1.orders\.*sink-table: app_db1.ods_orderspipeline:name: Sync MySQL Database to Dorisparallelism: 2

通过上面的 route 配置,使用正则表达式,可以将诸如 app_db1.order_01、app_db1.order_02 的表汇总到 app_db1.ods_orders 中。从而实现分库分表同步的功能。注意,目前还不支持多表中存在相同主键数据的场景,将在后续版本支持。
另外官方文档里的写法存在一个问题。
在这里插入图片描述
正则表达式前面加上’\‘转义,app_db1.orders\.*,否则会抛出异常:java.util.regex.PatternSyntaxException: Dangling meta character ‘*’ near index 0 *
在这里插入图片描述
我们在mysql和doris分别创建数据库app_db1, 然后初始化mysql

-- 创建表orders_01
CREATE TABLE `orders_01` (`id` int NOT NULL,`price` decimal(10,2) NOT NULL,`amount` varchar(100) DEFAULT NULL,PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;-- 创建表orders_02
CREATE TABLE `orders_02` (`id` int NOT NULL,`price` decimal(10,2) NOT NULL,`amount` varchar(100) DEFAULT NULL,PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;

启动新的job。
在这里插入图片描述
然后在orders_01,orders_02分别插入数据


INSERT INTO `orders_01` (`id`, `price`) VALUES (11, 4.00);
INSERT INTO `orders_02` (`id`, `price`) VALUES (12, 100.00);

在doris里验证,数据都写入了app_db1.ods_orders
在这里插入图片描述

路由表结构不一致无法同步

看Schema Evolution 设计原理,Flink CDC 3.0 在作业拓扑中引入了 SchemaRegistry,结合 SchemaOperator 协调并控制作业拓扑中的 schema 变更事件处理。当上游数据源发生 schema 变更时,SchemaRegistry 会控制 SchemaOperator 以暂停数据流,并将流水线中的数据从 sink 全部刷出以保证 schema 一致性。当 schema 变更事件在外部系统处理成功后,SchemaOperator 恢复数据流,完成本次 schema 变更的处理。
在这里插入图片描述
所以考虑只修改orders_01,再插入数据看doris同步的变化。

-- 添加sku字段
ALTER TABLE app_db1.orders_01 ADD sku varchar(32) NULL;
-- 向orders_01插入id=13
INSERT INTO `orders_01` VALUES (13, 4.00, 8.00, 'apple01');
-- 向orders_02插入id=14
INSERT INTO `orders_02` VALUES (14, 1.00, 1.00);

可以看到doris中的app_db1.orders表结构发生了变化,但是orders_02的id=14这条数据没有正常写入。flink异常提示:java.lang.IllegalStateException: Column size does not match the data size
在这里插入图片描述
而当修改orders_02的表结构,也会有异常:Caused by: java.util.concurrent.ExecutionException: java.lang.IllegalArgumentException: status of AddColumnEvent is already existed。并且之后写入的数据无法正常同步。

结尾

flink cdc的功能越来越强,也再尝试解决用户的使用痛点。不过放到生产环境使用还需要建立在更多的实践测试之上。

相关文章:

Flink cdc3.0同步实例(动态变更表结构、分库分表同步)

文章目录 前言准备flink环境docker构建mysql、doris环境数据准备 通过 FlinkCDC cli 提交任务整库同步同步变更路由变更路由表结构不一致无法同步 结尾 前言 最近Flink CDC 3.0发布, 不仅提供基础的数据同步能力。schema 变更自动同步、整库同步、分库分表等增强功…...

国产Apple Find My认证芯片哪里找,伦茨科技ST17H6x芯片可以帮到您

深圳市伦茨科技有限公司(以下简称“伦茨科技”)发布ST17H6x Soc平台。成为继Nordic之后全球第二家取得Apple Find My「查找」认证的芯片厂家,该平台提供可通过Apple Find My认证的Apple查找(Find My)功能集成解决方案。…...

肺癌相关知识

写在前面 大概想了解下肺癌相关的知识,开此贴做记录,看看后续有没有相关的生信文章思路。 综述 文章名期刊影响因子Lung cancer immunotherapy: progress, pitfalls, and promisesMol Cancer37.3 常见治疗手段有surgery, radiation therapy, chemoth…...

ChimeraX使用教程-安装及基本操作

ChimeraX使用教程-安装及基本操作 1、访问https://www.cgl.ucsf.edu/chimerax/download.html进行下载,然后安装 安装完成后,显示界面 2、基本操作 1、点击file,导入 .PDB 文件。 (注:在 alphafold在线预测蛋白》点…...

【小黑嵌入式系统第十一课】μC/OS-III程序设计基础(一)——任务设计、任务管理(创建基本状态内部任务)、任务调度、系统函数

上一课: 【小黑嵌入式系统第十课】μC/OS-III概况——实时操作系统的特点、基本概念(内核&任务&中断)、与硬件的关系&实现 文章目录 一、任务设计1.1 任务概述1.2 任务的类型1.2.1 单次执行类任务(运行至完成型&#…...

Redis一些常用的技术

文章目录 第1关:Redis 事务与锁机制第2关:流水线第3关:发布订阅第4关:超时命令第5关:使用Lua语言 第1关:Redis 事务与锁机制 编程要求 根据提示,在右侧编辑器Begin-End补充代码,根据…...

基于QPainter 绘图图片绕绘制设备中心旋转

项目地址:https://gitcode.com/m0_45463480/QPainter/tree/main 获取途径:进入CSDN->GitCode直接下载或者通过git拉取仓库内容。 QPainter是Qt框架中的一个类,用于在QWidget或QPixmap等设备上进行绘图操作。它提供了丰富的绘图功能,可以用于绘制线条、图形、文本等。Q…...

计算机网络(4):网络层

网络层提供的两种服务 虚电路服务(Virtual Circuit Service)和数据报服务(Datagram Service)是在网络层(第三层)提供的两种不同的通信服务。它们主要区别在于建立连接的方式和数据传输的方式。 虚电路服务…...

动态内存分配(malloc和free​、calloc和realloc​)

目录 一、为什么要有动态内存分配​ 二、C/C中程序内存区域划分​ 三、malloc和free​ 2.1、malloc 2.2、free​ 四、calloc和realloc​ 3.1、calloc​ 3.2、realloc​ 3.3realloc在调整内存空间的是存在两种情况: 3.4realloc有malloc的功能 五、常见的动…...

C语言---井字棋(三子棋)

Tic-Tac-Toe 1 游戏介绍和随机数1.1 游戏介绍1.2 随机数的生成1.3 棋盘大小和符号 2 设计游戏2.1 初始化棋盘2.2 打印棋盘2.3 玩家下棋2.4 电脑下棋2.5 判断输赢2.6 game()函数2.7 main()函数 3 完整三子棋代码3.1 Tic_Tac_Toe.h3.2 Tic_Tac_Toe.c3.3 Test.c 4 游戏代码的缺陷 …...

[Kubernetes]3. k8s集群Service详解

在上一节讲解了k8s 的pod,deployment,以及借助pod,deployment来部署项目,但会存在问题: 每次只能访问一个 pod,没有负载均衡自动转发到不同 pod访问还需要端口转发Pod重创后IP变了,名字也变了针对上面的问题,可以借助Service来解决,下面就来看看Service怎么使用 一.Service详…...

C++ 指定范围内递增初始化一个vector<int> | Python: list(range(31, 90))

通过lambda表达式 std::iota()实现&#xff1a; template <typename Tp> inline void print_vec(const std::vector<Tp>& vec) {fmt::print("[{}]\n", fmt::join(vec, ", ")); }// 相当于Python的lst list(range(31, 90))const std::ve…...

【Java之数据结构与算法】

选择排序 package Code01;public class Code01_SelectionSort {public static void selectionSort(int[] arr) {if(arrnull||arr.length<2) {return;}for(int i0;i<arr.length;i) {int minIndex i;for(int ji1;j<arr.length;j) {minIndex arr[minIndex] > arr[j…...

爬虫scrapy中间件的使用

爬虫scrapy中间件的使用 学习目标&#xff1a; 应用 scrapy中使用间件使用随机UA的方法应用 scrapy中使用代理ip的的方法应用 scrapy与selenium配合使用 1. scrapy中间件的分类和作用 1.1 scrapy中间件的分类 根据scrapy运行流程中所在位置不同分为&#xff1a; 下载中间件…...

普冉(PUYA)单片机开发笔记 [完结篇]:使用体会

失败的移植&#xff1a;FreeRTOS 当使用了 PY32F003 的各种接口和功能后&#xff0c;手痒痒想把 FreeRTOS 也搬到这个 MCU 上&#xff0c;参考 STM32 和 GD32 对 FreeRTOS 的移植步骤&#xff0c;把 FreeRTOS v202212.00 版本的源码搬到了 Keil 工程中&#xff0c;编译倒是通过…...

Elasticsearch:生成 AI 中的微调与 RAG

在自然语言处理 (NLP) 领域&#xff0c;出现了两种卓越的技术&#xff0c;每种技术都有其独特的功能&#xff1a;微调大型语言模型 (LLM) 和 RAG&#xff08;检索增强生成&#xff09;。 这些方法极大地影响了我们利用语言模型的方式&#xff0c;使它们更加通用和有效。 在本文…...

ip静态好还是dhcp好?

选择使用静态 IP 还是 DHCP&#xff08;动态主机配置协议&#xff09;取决于您的网络需求和环境。下面是它们的一些特点和适用场景&#xff1a; 静态 IP&#xff1a; 固定的 IP 地址&#xff1a;静态 IP 是手动配置在设备上的固定 IP 地址&#xff0c;不会随时间或网络变化而改…...

PolarDB-X、OceanBase、CockroachDB、TiDB二级索引写入性能测评

为什么要做这个测试 二级索引是关系型数据库相较于NoSQL数据库的一个关键差异。二级索引必须是强一致的&#xff0c;因此索引的写入需要与主键的写入放在一个事务当中&#xff0c;事务的性能是二级索引性能的基础。 目前市面上的分布式数据库中&#xff0c;从使用体验的角度看…...

Convolutional Neural Network(CNN)——卷积神经网络

1.NN的局限性 拓展性差 NN的计算量大性能差&#xff0c;不利于在不同规模的数据集上有效运行若输入维度发生变化&#xff0c;需要修改并重新训练网络容易过拟合 全连接导致参数量特别多&#xff0c;容易过拟合如果增加更多层&#xff0c;参数量会翻倍无法有效利用局部特征 输入…...

鸿蒙开发基本概念

1、开发准备 1.1、UI框架 HarmonyOS提供了一套UI开发框架&#xff0c;即方舟开发框架&#xff08;ArkUI框架&#xff09;。方舟开发框架可为开发者提供应用UI开发所必需的能力&#xff0c;比如多种组件、布局计算、动画能力、UI交互、绘制等。 方舟开发框架针对不同目的和技术…...

XCTF-web-easyupload

试了试php&#xff0c;php7&#xff0c;pht&#xff0c;phtml等&#xff0c;都没有用 尝试.user.ini 抓包修改将.user.ini修改为jpg图片 在上传一个123.jpg 用蚁剑连接&#xff0c;得到flag...

Leetcode 3576. Transform Array to All Equal Elements

Leetcode 3576. Transform Array to All Equal Elements 1. 解题思路2. 代码实现 题目链接&#xff1a;3576. Transform Array to All Equal Elements 1. 解题思路 这一题思路上就是分别考察一下是否能将其转化为全1或者全-1数组即可。 至于每一种情况是否可以达到&#xf…...

ESP32读取DHT11温湿度数据

芯片&#xff1a;ESP32 环境&#xff1a;Arduino 一、安装DHT11传感器库 红框的库&#xff0c;别安装错了 二、代码 注意&#xff0c;DATA口要连接在D15上 #include "DHT.h" // 包含DHT库#define DHTPIN 15 // 定义DHT11数据引脚连接到ESP32的GPIO15 #define D…...

高等数学(下)题型笔记(八)空间解析几何与向量代数

目录 0 前言 1 向量的点乘 1.1 基本公式 1.2 例题 2 向量的叉乘 2.1 基础知识 2.2 例题 3 空间平面方程 3.1 基础知识 3.2 例题 4 空间直线方程 4.1 基础知识 4.2 例题 5 旋转曲面及其方程 5.1 基础知识 5.2 例题 6 空间曲面的法线与切平面 6.1 基础知识 6.2…...

相机Camera日志分析之三十一:高通Camx HAL十种流程基础分析关键字汇总(后续持续更新中)

【关注我,后续持续新增专题博文,谢谢!!!】 上一篇我们讲了:有对最普通的场景进行各个日志注释讲解,但相机场景太多,日志差异也巨大。后面将展示各种场景下的日志。 通过notepad++打开场景下的日志,通过下列分类关键字搜索,即可清晰的分析不同场景的相机运行流程差异…...

#Uniapp篇:chrome调试unapp适配

chrome调试设备----使用Android模拟机开发调试移动端页面 Chrome://inspect/#devices MuMu模拟器Edge浏览器&#xff1a;Android原生APP嵌入的H5页面元素定位 chrome://inspect/#devices uniapp单位适配 根路径下 postcss.config.js 需要装这些插件 “postcss”: “^8.5.…...

认识CMake并使用CMake构建自己的第一个项目

1.CMake的作用和优势 跨平台支持&#xff1a;CMake支持多种操作系统和编译器&#xff0c;使用同一份构建配置可以在不同的环境中使用 简化配置&#xff1a;通过CMakeLists.txt文件&#xff0c;用户可以定义项目结构、依赖项、编译选项等&#xff0c;无需手动编写复杂的构建脚本…...

【Kafka】Kafka从入门到实战:构建高吞吐量分布式消息系统

Kafka从入门到实战:构建高吞吐量分布式消息系统 一、Kafka概述 Apache Kafka是一个分布式流处理平台,最初由LinkedIn开发,后成为Apache顶级项目。它被设计用于高吞吐量、低延迟的消息处理,能够处理来自多个生产者的海量数据,并将这些数据实时传递给消费者。 Kafka核心特…...

链式法则中 复合函数的推导路径 多变量“信息传递路径”

非常好&#xff0c;我们将之前关于偏导数链式法则中不能“约掉”偏导符号的问题&#xff0c;统一使用 二重复合函数&#xff1a; z f ( u ( x , y ) , v ( x , y ) ) \boxed{z f(u(x,y),\ v(x,y))} zf(u(x,y), v(x,y))​ 来全面说明。我们会展示其全微分形式&#xff08;偏导…...

Linux入门课的思维导图

耗时两周&#xff0c;终于把慕课网上的Linux的基础入门课实操、总结完了&#xff01; 第一次以Blog的形式做学习记录&#xff0c;过程很有意思&#xff0c;但也很耗时。 课程时长5h&#xff0c;涉及到很多专有名词&#xff0c;要去逐个查找&#xff0c;以前接触过的概念因为时…...