flink cdc mysql整理与总结
文章目录
- 一、业务中常见的需要数据同步的场景
- CDC是什么
- FlinkCDC是什么
- CDC原理
- 为什么是FlinkCDC
- 业务场景
- flink cdc对应flink的版本
- 二、模拟案例
- 1.阿里云flink sql
- 2.开源flink sql(单机模式)
- flink 安装
- 安装mysql
- 3.flink datastream
- 三、总结
提示:以下是本篇文章正文内容,下面案例可供参考
一、业务中常见的需要数据同步的场景
1、多个库的表合并到一张表。不同的业务线或者微服务在不同的数据库里开发,但是此时有些报表需要将多个库的类似的数据合并后做查询统计。或者,某些历史原因,类似刚开始的商业模式不清晰,导致一些业务线分分合合。或者某些边缘业务逐步融合到了主业务。早起的数据是分开的,业务运营也是分开,后来又合并成了一个大块业务。
2、某个数据需要写到多个存储中。业务数据需要写入到多个中间件或者存储中,比如业务的数据存储再Mysql的数据中,后来为了方便检索需要写入到ES,或者为了缓存需要写入到Redis,或者是Mysql分表的数据合并写入到Doris中。
3、数据仓库的场景。比如将表里的数据实时写入到DWS数据仓库的宽表中。
4、应急场景。如果不采专用CDC的方案,那么要达到实时查询的效果,只能在BFF层的代码调用多个中心层的查询API,然后再BFF层做各种聚合,运算。这种方式开发效率低下,万一有的中心层没有提供合适的查询API,临时开发的话,会让开发进度不可控。
总之,不管是数据多写、还是多表合并、还是建立数据仓库,都属于数据同步任务。
示例:pandas 是基于NumPy 的一种工具,该工具是为了解决数据分析任务而创建的。
CDC是什么
CDC 是变更数据捕获(Change Data Capture)技术的缩写,它可以将源数据库(Source)的增量变动记录,同步到一个或多个数据目的(Sink)。在同步过程中,还可以对数据进行一定的处理,例如过滤、关联、分组、统计等。
目前专业做数据库事件接受和解析的中间件是Debezium,如果是捕获Mysql,还有Canal。
FlinkCDC是什么
官网地址:官网FlinkCDC
官方定义:This project provides a set of source connectors for Apache Flink® directly ingesting changes coming from different databases using Change Data Capture(CDC)。根据FlinkCDC官方给出的定义,FlinkCDC提供一组源数据的连接器,使用变更数据捕获的方式,直接吸收来自不同数据库的变更数据。
CDC原理
CDC的原理是,当数据源表发生变动时,会通过附加在表上的触发器或者 binlog 等途径,将操作记录下来。下游可以通过数据库底层的协议,订阅并消费这些事件,然后对数据库变动记录做重放,从而实现同步。这种方式的优点是实时性高,可以精确捕捉上游的各种变动。
为什么是FlinkCDC
1、FlinkCDC 提供了对 Debezium 连接器的封装和集成,简化了配置和使用的过程,并提供了更高级的 API 和功能,例如数据格式转换、事件时间处理等。Flink CDC 使用 Debezium 连接器作为底层的实现,将其与 Flink 的数据处理能力结合起来。通过配置和使用 Flink CDC,您可以轻松地将数据库中的变化数据流转化为 Flink 的 DataStream 或 Table,并进行实时的数据处理、转换和分析。
2、Flink的DataStream和SQL比较成熟和易用
3、Flink支持状态后端(State Backends),允许存储海量的数据状态
4、Flink有更好的生态,更多的Source和Sink的支持
业务场景
- 数据合并流向:

- 数据多写流向:

- 单数据源写单表流向:

- 数据链路对比
通过下图,我们可以看到Canal处理数据的链路比FlinkCDC更长,数据链路一旦变长意味着,出错的可能性更高。


flink cdc对应flink的版本

二、模拟案例
1.阿里云flink sql
- 验证mysql开启binlog

flink sql 定义binlog源数据,拿到数据处理(业务逻辑)再写表,后面很简单了

2.开源flink sql(单机模式)
背景:win10电脑安装vmware(虚拟化)软件,虚拟机中安装
linux节点一个,flink,mysql(yum默认安装的最新版本:8.0.37),java环境(此处安装java环境略,网上有)
flink 安装
###在linux下载flink包
[root@slave2 ~]# wget https://dlcdn.apache.org/flink/flink-1.18.1/flink-1.18.1-bin-scala_2.12.tgz
### 加压包到当前目录下
[root@slave2 ~]# tar zxvf flink-1.16.3-bin-scala_2.12.tgz
[root@slave2 ~]# cd flink-1.16.3
[root@slave2 lib]# wget https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mysql-cdc/2.4.0/flink-sql-connector-mysql-cdc-2.4.0.jar修改flink 文件
[root@slave2 flink-1.16.3]# cat conf/flink-conf.yaml 修改内容如下(如果不修改则win10本地电脑无法访问flink web UI,这里浪费很多时间):
taskmanager.host: localhost
rest.bind-address: 0.0.0.0###启动flink
[root@slave2 flink-1.16.3]# bin/start-cluster.sh
###启动客户端
root@slave2 flink-1.16.3]# ./bin/sql-client.sh embedded

安装mysql
安装mysql遇到很多小问题
[root@slave2 ~]# yum -y install mysql-community-server
“MySQL 8.0 Community Server” 的 GPG 密钥已安装,但是不适用于此软件包。请检查源的公钥 URL 是否配置正确。
失败的软件包是:mysql-community-libs-8.0.37-1.el7.x86_64

###用于跳过GPG签名检查 可以安装成功
[root@slave2 ~]# yum -y install mysql-server --nogpgcheck
###验证myqsl是否可用
[root@slave2 ~]# systemctl start mysqld
[root@slave2 ~]# mysql -u root -p
Enter password:
Welcome to the MySQL monitor. Commands end with ; or \g.
Your MySQL connection id is 63
Server version: 8.0.37 MySQL Community Server - GPL
Copyright (c) 2000, 2024, Oracle and/or its affiliates.
Type 'help;' or '\h' for help. Type '\c' to clear the current input statement.
mysql> ### 配置开启binlog
[root@slave2 ~]# vim /etc/my.cnf
log_bin=mysql_bin
binlog-format=Row
server-id=1
###重启mysql
[root@slave2 ~]# systemctl restart mysqld
###重新登陆mysql并查看binlog开启情况
mysql> SHOW VARIABLES LIKE 'log_bin';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| log_bin | ON |
+---------------+-------+
1 row in set (0.01 sec)

mysql创建新用户等
###创建新用户时发现一直报错。查看mysql建密码时要求比较高(新版本mysql对密码要求高)
mysql> SHOW VARIABLES LIKE 'validate_password%';
+-------------------------------------------------+--------+
| Variable_name | Value |
+-------------------------------------------------+--------+
| validate_password.changed_characters_percentage | 0 |
| validate_password.check_user_name | ON |
| validate_password.dictionary_file | |
| validate_password.length | 10 |
| validate_password.mixed_case_count | 1 |
| validate_password.number_count | 1 |
| validate_password.policy | MEDIUM |
| validate_password.special_char_count | 1 |
+-------------------------------------------------+--------+
8 rows in set (0.00 sec)
###修改密码要求(在工作生产环境不建议这么做)
mysql> SET GLOBAL validate_password.length = 3;
Query OK, 0 rows affected (0.03 sec)
mysql> SET GLOBAL validate_password.policy = LOW;
Query OK, 0 rows affected (0.00 sec)
###再次查看对新建用户密码要求
mysql> SHOW VARIABLES LIKE 'validate_password%';
+-------------------------------------------------+-------+
| Variable_name | Value |
+-------------------------------------------------+-------+
| validate_password.changed_characters_percentage | 0 |
| validate_password.check_user_name | ON |
| validate_password.dictionary_file | |
| validate_password.length | 4 |
| validate_password.mixed_case_count | 1 |
| validate_password.number_count | 1 |
| validate_password.policy | LOW |
| validate_password.special_char_count | 1 |
+-------------------------------------------------+-------+
8 rows in set (0.00 sec)
###新建用户:root1234
mysql> CREATE USER 'root1234'@'localhost' IDENTIFIED BY 'root1234';
Query OK, 0 rows affected (0.01 sec)mysql> GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'root1234'@'localhost';
Query OK, 0 rows affected (0.02 sec)mysql> GRANT SELECT ON *.* TO 'root1234'@'localhost';
Query OK, 0 rows affected (0.01 sec)mysql> SELECT User, Host FROM mysql.user;
+------------------+-----------+
| User | Host |
+------------------+-----------+
| mysql.infoschema | localhost |
| mysql.session | localhost |
| mysql.sys | localhost |
| root | localhost |
| root1234 | localhost |
+------------------+-----------+
5 rows in set (0.00 sec)mysql> FLUSH PRIVILEGES;
Query OK, 0 rows affected (0.00 sec)
建mysql库表(数据源头库表)
Flink SQL>
select * from test_flink_cdc5;
[ERROR] Could not execute SQL statement. Reason:
java.lang.IllegalArgumentException: Can’t find any matched tables, please check your configured database-name: [mysql] and table-name: [mysql.test_cdc]
###mysql 默认只有这4个库(当时直接用默认库mysql建表导致flink 报一些上面奇诡的错)
mysql> show databases;
+--------------------+
| Database |
+--------------------+
| information_schema |
| mysql |
| performance_schema |
| sys |
+--------------------+
###新建一个库:test
mysql> CREATE DATABASE test CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;
Query OK, 1 row affected (0.02 sec)mysql> use test;
Database changed
###建表
mysql> CREATE TABLE `test_cdc` (-> `id` int NOT NULL AUTO_INCREMENT,-> `name` varchar(255) DEFAULT NULL,-> PRIMARY KEY (`id`)-> ) ENGINE=InnoDB ;
Query OK, 0 rows affected (0.04 sec)mysql> show tables;
+----------------+
| Tables_in_test |
+----------------+
| test_cdc |
+----------------+
1 row in set (0.00 sec)
##开始验证
##mysql 使用上面创建的用户密码登录mysql
[root@slave2 ~]# mysql -u root -p'root1234'
mysql> use test
##flink登录
[root@slave2 flink-1.16.3]# bin/stop-cluster.sh
[root@slave2 flink-1.16.3]# ./bin/sql-client.sh embedded
##在flink sql中定义mysql源
CREATE TABLE test_flink_cdc ( id INT, name STRING,primary key(id) NOT ENFORCED
) WITH ('connector' = 'mysql-cdc', 'hostname' = '127.0.0.1', 'port' = '3306', 'username'='root1234', 'password'='root1234', 'database-name'='test', 'table-name'='test_cdc'
);
###查询flink 接收到的binlog数据
Flink SQL> select * from test_flink_cdc;###到mysql sql界面向test_cdc表插入数据
mysql> INSERT INTO test_cdc VALUES (001, 'test01');
Query OK, 1 row affected (0.02 sec)mysql> INSERT INTO test_cdc VALUES (002, 'test02');
Query OK, 1 row affected (0.01 sec)mysql> INSERT INTO test_cdc VALUES (003, 'test03');
Query OK, 1 row affected (0.04 sec)mysql> INSERT INTO test_cdc VALUES (004, 'test04');
Query OK, 1 row affected (0.01 sec)mysql> INSERT INTO test_cdc VALUES (005, 'test05');
Query OK, 1 row affected (0.01 sec)mysql> INSERT INTO test_cdc VALUES (006, 'test06');
Query OK, 1 row affected (0.00 sec)

向mysql表中插入数据

flink sql这时可以接到binlog数据

查看flink UI job情况


小结:当flink可以拿到mysql binlog源头数据,下面就好做了,根据自己的业务处理sink到任何数据库或组件中(例如sink到mysql,hbase,hive,pg,kafka等等),后面sink就不演示了。
下载链接:
1.mysql jdbc jar包驱动下载
2.flink cdc驱动下载
3.flink下载
3.flink datastream
datastream 比较灵活简单,下面是举例代码片段(datastream的CDC比flink sql还简单,打个jar包在flink web UI界面上传运行即可,此处不做举例)
public class MySqlSourceExample {public static void main(String[] args) throws Exception {MySqlSource<String> mySqlSource = MySqlSource.<String>builder().hostname("yourHostname").port(yourPort).databaseList("yourDatabaseName") // set captured database.tableList("yourDatabaseName.yourTableName") // set captured table.username("yourUsername").password("yourPassword").deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String.build();StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// enable checkpointenv.enableCheckpointing(3000);env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")// set 4 parallel source tasks.setParallelism(4).print().setParallelism(1); // use parallelism 1 for sink to keep message orderingenv.execute("Print MySQL Snapshot + Binlog");}
}
三、总结
- 公司用的cdc不能分享出来,所以搭建上面案例时遇到很多问题,遇到很多坑。
- 工作中使用的就是如下截图流程,没有使用canal和kafka,使用的是
logtail和SLS(阿里云的组件,类似kafka,但要比kafka等功能强大) - 上面说了很多关于flink cdc的优点。我结合工作中使用的flink cdc说一些缺点(自己结合业务场景)。
1.使用flink cdc不适合直观观察binlog的数据(例如脏数据,数据断流,不能直观看到最近的binglog情况,表的更新频率不高等造成的困扰)。
2.使用阿里云SLS收集mysql binlog数据,将数据保存近1个月(保持时间可设置),同时可以在logstore查看数据结构样式(sls支持sql语法查询日志数据),方便后续flink代码开发,也方便flink sql和datastream代码debug。
3.使用flink cdc做数据质量监控比较后知后觉(只能监控已表数据),我这边做法是直接监控sls原始数据质量,发现有数据质量问题会报出来。sls也支持实时断流提醒。
4.一般对于简单的不太重要的业务适合使用flink cdc,这样开发快,数据流不用咋校验。对于复杂数据或复杂业务或重要数据,需要观察binlog数据结果(不信任上游其他部门数据)还是使用类似sls比较好。方便查询数据变化与汇总,方便做数据报警等。
5.总之还是要根据自己的业务场景和自己公司现有技术组件组合着使用比较好。

相关文章:
flink cdc mysql整理与总结
文章目录 一、业务中常见的需要数据同步的场景CDC是什么FlinkCDC是什么CDC原理为什么是FlinkCDC业务场景flink cdc对应flink的版本 二、模拟案例1.阿里云flink sql2.开源flink sql(单机模式)flink 安装安装mysql3.flink datastream 三、总结 提示:以下是本篇文章正文…...
【三维重建】ePnP
PnP问题应用与一下场景: 已知三维点和对应二维点以及相机相机内参数,可以获取相机外参。 我们介绍其中的一种算法:ePnP 算法流程 1、ePnP算法首先在世界坐标系内寻找4个控制点,记作 C 1 w , C 2 w , C 3 w , C 4 w C_1^w,C_2^w,…...
C++进阶之路:何为运算符重载、赋值运算符重载与前后置++重载(类与对象_中篇)
✨✨ 欢迎大家来访Srlua的博文(づ ̄3 ̄)づ╭❤~✨✨ 🌟🌟 欢迎各位亲爱的读者,感谢你们抽出宝贵的时间来阅读我的文章。 我是Srlua小谢,在这里我会分享我的知识和经验。&am…...
8、python基础知识图谱
...
智慧校园建设规划方案
在信息化浪潮的推动下,智慧校园的建设已成为教育现代化的必然趋势。以创新科技赋能教育,打造智慧校园,旨在提升教学品质,优化管理流程,增强学生体验。构建智慧校园需要具有前瞻性的规划方案,它将以教育为核…...
【深度学习实战—8】:基于MediaPipe的人脸检测
✨博客主页:王乐予🎈 ✨年轻人要:Living for the moment(活在当下)!💪 🏆推荐专栏:【图像处理】【千锤百炼Python】【深度学习】【排序算法】 目录 😺一、Med…...
OSCP学习,布置你的Kali Linux
为什么要写这篇文章? 我是一个OSCP学习者,以教促学。同时也能让各位入门的师傅们更好的了解OSCP这门课程。本人文笔不太好,如果有什么写的不对的地方,师傅们多多指正。 参考资料: OSCP 考试电子书 Linux Basics for…...
PWA离线优先策略:提升用户体验的关键步骤
Progressive Web Apps (PWA) 的离线优先策略是通过Service Worker和Cache API实现的,它允许在没有网络连接时仍然可以访问网站的部分或全部内容。 2500G计算机入门到高级架构师开发资料超级大礼包免费送! 1. 创建Service Worker注册文件(se…...
网页提示“非私密连接”是为什么?
网页提示“非私密连接”(英文提示可能是 "Your connection is not private" 或 "Your connection is not secure")主要是因为浏览器无法验证你正试图访问的网站的SSL/TLS证书,或者是证书存在问题,从而无法建立…...
[自动驾驶技术]-8 Tesla自动驾驶方案之硬件(AI Day 2022)
特斯拉在AI Day 2022先介绍了AI编译器,后面又介绍了Dojo的硬件软件,软件部分和AI编译器有部分重叠,本文介绍还是延用AI Day的思路,分为三部分:AI编译和推理,Dojo硬件,Dojo软件。 特斯拉车道检测…...
人力资源管理信息化系统如何支持企业开展管理诊断?
华恒智信人力资源顾问有限公司致力于帮助企业开展人力资源管理方面的各项提升改进工作,在长期的咨询工作中,最常听到企业提到的问题莫过于管理诊断方面的问题,事实上,很多企业在日常工作中,都意识到企业内部存在管理方…...
Cohere继Command-R+之后发布大模型Aya-23,性能超越 Gemma、Mistral 等,支持中文
前言 近年来,多语言大模型(MLLM)发展迅速,但大多数模型的性能依然存在显著差距,尤其是在非英语语言方面表现不佳。为了推动多语言自然语言处理技术的发展,Cohere团队发布了新的多语言指令微调模型家族——…...
身为UI设计老鸟,不学点3D,好像要被潮流抛弃啦,卷起来吧。
当前3D原则在UI设计中运用的越来越多,在UI设计中,使用3D元素可以为界面带来以下几个价值: 增强视觉冲击力:3D元素可以通过立体感和逼真的效果,为界面增添视觉冲击力,使得设计更加生动、吸引人,并…...
【C语言】实现贪吃蛇--项目实践(超详细)
前言: 贪吃蛇游戏大家都玩过吧?这次我们要用C语言来亲手制作一个!这个项目不仅能让我们复习C语言的知识,还能了解游戏是怎么一步步做出来的。我们会一起完成蛇的移动、食物的生成,还有碰撞检测等有趣的部分。准备好了…...
Elasticsearch 分析器的高级用法一(同义词,高亮搜索)
Elasticsearch 分析器的高级用法一(同义词,高亮搜索) 同义词简介分析使用同义词案例 高亮搜索高亮搜索策略unifiedplainvh 同义词 简介 在搜索场景中,同义词用来处理不同的查询词,有可能是想表达相同的搜索目标。 例…...
Python 开心消消乐
💝💝💝欢迎莅临我的博客,很高兴能够在这里和您见面!希望您在这里可以感受到一份轻松愉快的氛围,不仅可以获得有趣的内容和知识,也可以畅所欲言、分享您的想法和见解。 推荐:「stormsha的主页」…...
mysql - 索引基本知识梳理
mysql索引基本知识梳理 索引介绍 官方介绍索引是帮助MySQL高效获取数据的数据结构, 原理为以空间换时间, mysql的索引采用的是B树的结构 索引的优缺点 优点: 提高查询效率降低数据库IO成本通过索引对数据进行排序, 降低排序成本, 降低CPU消耗 缺点:…...
Nginx SSL/TLS配置:搭建安全的HTTPS网站
随着互联网安全性的日益提升,HTTPS已经成为网站安全通信的标配。Nginx作为一款高性能的HTTP和反向代理服务器,支持SSL/TLS协议,使得我们可以轻松地搭建安全的HTTPS网站。下面,我们将详细介绍如何在Nginx上配置SSL/TLS,…...
echarts 折线图流光效果偏移或不显示
x轴数据需要字符串数组...
【Linux】shell脚本忽略错误继续执行
在 shell 脚本中,可以使用 set -e 命令来设置脚本在遇到错误时退出执行。如果你希望脚本忽略错误并继续执行,可以在脚本开头添加 set e 命令来取消该设置。 举例1 #!/bin/bash# 取消 set -e 的设置 set e# 执行命令,并忽略错误 rm somefile…...
python打卡day49
知识点回顾: 通道注意力模块复习空间注意力模块CBAM的定义 作业:尝试对今天的模型检查参数数目,并用tensorboard查看训练过程 import torch import torch.nn as nn# 定义通道注意力 class ChannelAttention(nn.Module):def __init__(self,…...
云计算——弹性云计算器(ECS)
弹性云服务器:ECS 概述 云计算重构了ICT系统,云计算平台厂商推出使得厂家能够主要关注应用管理而非平台管理的云平台,包含如下主要概念。 ECS(Elastic Cloud Server):即弹性云服务器,是云计算…...
进程地址空间(比特课总结)
一、进程地址空间 1. 环境变量 1 )⽤户级环境变量与系统级环境变量 全局属性:环境变量具有全局属性,会被⼦进程继承。例如当bash启动⼦进程时,环 境变量会⾃动传递给⼦进程。 本地变量限制:本地变量只在当前进程(ba…...
在 Nginx Stream 层“改写”MQTT ngx_stream_mqtt_filter_module
1、为什么要修改 CONNECT 报文? 多租户隔离:自动为接入设备追加租户前缀,后端按 ClientID 拆分队列。零代码鉴权:将入站用户名替换为 OAuth Access-Token,后端 Broker 统一校验。灰度发布:根据 IP/地理位写…...
视频字幕质量评估的大规模细粒度基准
大家读完觉得有帮助记得关注和点赞!!! 摘要 视频字幕在文本到视频生成任务中起着至关重要的作用,因为它们的质量直接影响所生成视频的语义连贯性和视觉保真度。尽管大型视觉-语言模型(VLMs)在字幕生成方面…...
【AI学习】三、AI算法中的向量
在人工智能(AI)算法中,向量(Vector)是一种将现实世界中的数据(如图像、文本、音频等)转化为计算机可处理的数值型特征表示的工具。它是连接人类认知(如语义、视觉特征)与…...
【生成模型】视频生成论文调研
工作清单 上游应用方向:控制、速度、时长、高动态、多主体驱动 类型工作基础模型WAN / WAN-VACE / HunyuanVideo控制条件轨迹控制ATI~镜头控制ReCamMaster~多主体驱动Phantom~音频驱动Let Them Talk: Audio-Driven Multi-Person Conversational Video Generation速…...
【分享】推荐一些办公小工具
1、PDF 在线转换 https://smallpdf.com/cn/pdf-tools 推荐理由:大部分的转换软件需要收费,要么功能不齐全,而开会员又用不了几次浪费钱,借用别人的又不安全。 这个网站它不需要登录或下载安装。而且提供的免费功能就能满足日常…...
A2A JS SDK 完整教程:快速入门指南
目录 什么是 A2A JS SDK?A2A JS 安装与设置A2A JS 核心概念创建你的第一个 A2A JS 代理A2A JS 服务端开发A2A JS 客户端使用A2A JS 高级特性A2A JS 最佳实践A2A JS 故障排除 什么是 A2A JS SDK? A2A JS SDK 是一个专为 JavaScript/TypeScript 开发者设计的强大库ÿ…...
