flink cdc mysql整理与总结
文章目录
- 一、业务中常见的需要数据同步的场景
- CDC是什么
- FlinkCDC是什么
- CDC原理
- 为什么是FlinkCDC
- 业务场景
- flink cdc对应flink的版本
- 二、模拟案例
- 1.阿里云flink sql
- 2.开源flink sql(单机模式)
- flink 安装
- 安装mysql
- 3.flink datastream
- 三、总结
提示:以下是本篇文章正文内容,下面案例可供参考
一、业务中常见的需要数据同步的场景
1、多个库的表合并到一张表。不同的业务线或者微服务在不同的数据库里开发,但是此时有些报表需要将多个库的类似的数据合并后做查询统计。或者,某些历史原因,类似刚开始的商业模式不清晰,导致一些业务线分分合合。或者某些边缘业务逐步融合到了主业务。早起的数据是分开的,业务运营也是分开,后来又合并成了一个大块业务。
2、某个数据需要写到多个存储中。业务数据需要写入到多个中间件或者存储中,比如业务的数据存储再Mysql的数据中,后来为了方便检索需要写入到ES,或者为了缓存需要写入到Redis,或者是Mysql分表的数据合并写入到Doris中。
3、数据仓库的场景。比如将表里的数据实时写入到DWS数据仓库的宽表中。
4、应急场景。如果不采专用CDC的方案,那么要达到实时查询的效果,只能在BFF层的代码调用多个中心层的查询API,然后再BFF层做各种聚合,运算。这种方式开发效率低下,万一有的中心层没有提供合适的查询API,临时开发的话,会让开发进度不可控。
总之,不管是数据多写、还是多表合并、还是建立数据仓库,都属于数据同步任务。
示例:pandas 是基于NumPy 的一种工具,该工具是为了解决数据分析任务而创建的。
CDC是什么
CDC 是变更数据捕获(Change Data Capture)技术的缩写,它可以将源数据库(Source)的增量变动记录,同步到一个或多个数据目的(Sink)。在同步过程中,还可以对数据进行一定的处理,例如过滤、关联、分组、统计等。
目前专业做数据库事件接受和解析的中间件是Debezium,如果是捕获Mysql,还有Canal。
FlinkCDC是什么
官网地址:官网FlinkCDC
官方定义:This project provides a set of source connectors for Apache Flink® directly ingesting changes coming from different databases using Change Data Capture(CDC)。根据FlinkCDC官方给出的定义,FlinkCDC提供一组源数据的连接器,使用变更数据捕获的方式,直接吸收来自不同数据库的变更数据。
CDC原理
CDC的原理是,当数据源表发生变动时,会通过附加在表上的触发器或者 binlog 等途径,将操作记录下来。下游可以通过数据库底层的协议,订阅并消费这些事件,然后对数据库变动记录做重放,从而实现同步。这种方式的优点是实时性高,可以精确捕捉上游的各种变动。
为什么是FlinkCDC
1、FlinkCDC 提供了对 Debezium 连接器的封装和集成,简化了配置和使用的过程,并提供了更高级的 API 和功能,例如数据格式转换、事件时间处理等。Flink CDC 使用 Debezium 连接器作为底层的实现,将其与 Flink 的数据处理能力结合起来。通过配置和使用 Flink CDC,您可以轻松地将数据库中的变化数据流转化为 Flink 的 DataStream 或 Table,并进行实时的数据处理、转换和分析。
2、Flink的DataStream和SQL比较成熟和易用
3、Flink支持状态后端(State Backends),允许存储海量的数据状态
4、Flink有更好的生态,更多的Source和Sink的支持
业务场景
- 数据合并流向:
- 数据多写流向:
- 单数据源写单表流向:
- 数据链路对比
通过下图,我们可以看到Canal处理数据的链路比FlinkCDC更长,数据链路一旦变长意味着,出错的可能性更高。
flink cdc对应flink的版本
二、模拟案例
1.阿里云flink sql
- 验证mysql开启binlog
flink sql 定义binlog源数据,拿到数据处理(业务逻辑)再写表,后面很简单了
2.开源flink sql(单机模式)
背景:win10电脑安装vmware(虚拟化)软件,虚拟机中安装
linux节点一个,flink,mysql(yum默认安装的最新版本:8.0.37),java环境(此处安装java环境略,网上有)
flink 安装
###在linux下载flink包
[root@slave2 ~]# wget https://dlcdn.apache.org/flink/flink-1.18.1/flink-1.18.1-bin-scala_2.12.tgz
### 加压包到当前目录下
[root@slave2 ~]# tar zxvf flink-1.16.3-bin-scala_2.12.tgz
[root@slave2 ~]# cd flink-1.16.3
[root@slave2 lib]# wget https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mysql-cdc/2.4.0/flink-sql-connector-mysql-cdc-2.4.0.jar修改flink 文件
[root@slave2 flink-1.16.3]# cat conf/flink-conf.yaml 修改内容如下(如果不修改则win10本地电脑无法访问flink web UI,这里浪费很多时间):
taskmanager.host: localhost
rest.bind-address: 0.0.0.0###启动flink
[root@slave2 flink-1.16.3]# bin/start-cluster.sh
###启动客户端
root@slave2 flink-1.16.3]# ./bin/sql-client.sh embedded
安装mysql
安装mysql遇到很多小问题
[root@slave2 ~]# yum -y install mysql-community-server
“MySQL 8.0 Community Server” 的 GPG 密钥已安装,但是不适用于此软件包。请检查源的公钥 URL 是否配置正确。
失败的软件包是:mysql-community-libs-8.0.37-1.el7.x86_64
###用于跳过GPG签名检查 可以安装成功
[root@slave2 ~]# yum -y install mysql-server --nogpgcheck
###验证myqsl是否可用
[root@slave2 ~]# systemctl start mysqld
[root@slave2 ~]# mysql -u root -p
Enter password:
Welcome to the MySQL monitor. Commands end with ; or \g.
Your MySQL connection id is 63
Server version: 8.0.37 MySQL Community Server - GPL
Copyright (c) 2000, 2024, Oracle and/or its affiliates.
Type 'help;' or '\h' for help. Type '\c' to clear the current input statement.
mysql> ### 配置开启binlog
[root@slave2 ~]# vim /etc/my.cnf
log_bin=mysql_bin
binlog-format=Row
server-id=1
###重启mysql
[root@slave2 ~]# systemctl restart mysqld
###重新登陆mysql并查看binlog开启情况
mysql> SHOW VARIABLES LIKE 'log_bin';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| log_bin | ON |
+---------------+-------+
1 row in set (0.01 sec)
mysql创建新用户等
###创建新用户时发现一直报错。查看mysql建密码时要求比较高(新版本mysql对密码要求高)
mysql> SHOW VARIABLES LIKE 'validate_password%';
+-------------------------------------------------+--------+
| Variable_name | Value |
+-------------------------------------------------+--------+
| validate_password.changed_characters_percentage | 0 |
| validate_password.check_user_name | ON |
| validate_password.dictionary_file | |
| validate_password.length | 10 |
| validate_password.mixed_case_count | 1 |
| validate_password.number_count | 1 |
| validate_password.policy | MEDIUM |
| validate_password.special_char_count | 1 |
+-------------------------------------------------+--------+
8 rows in set (0.00 sec)
###修改密码要求(在工作生产环境不建议这么做)
mysql> SET GLOBAL validate_password.length = 3;
Query OK, 0 rows affected (0.03 sec)
mysql> SET GLOBAL validate_password.policy = LOW;
Query OK, 0 rows affected (0.00 sec)
###再次查看对新建用户密码要求
mysql> SHOW VARIABLES LIKE 'validate_password%';
+-------------------------------------------------+-------+
| Variable_name | Value |
+-------------------------------------------------+-------+
| validate_password.changed_characters_percentage | 0 |
| validate_password.check_user_name | ON |
| validate_password.dictionary_file | |
| validate_password.length | 4 |
| validate_password.mixed_case_count | 1 |
| validate_password.number_count | 1 |
| validate_password.policy | LOW |
| validate_password.special_char_count | 1 |
+-------------------------------------------------+-------+
8 rows in set (0.00 sec)
###新建用户:root1234
mysql> CREATE USER 'root1234'@'localhost' IDENTIFIED BY 'root1234';
Query OK, 0 rows affected (0.01 sec)mysql> GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'root1234'@'localhost';
Query OK, 0 rows affected (0.02 sec)mysql> GRANT SELECT ON *.* TO 'root1234'@'localhost';
Query OK, 0 rows affected (0.01 sec)mysql> SELECT User, Host FROM mysql.user;
+------------------+-----------+
| User | Host |
+------------------+-----------+
| mysql.infoschema | localhost |
| mysql.session | localhost |
| mysql.sys | localhost |
| root | localhost |
| root1234 | localhost |
+------------------+-----------+
5 rows in set (0.00 sec)mysql> FLUSH PRIVILEGES;
Query OK, 0 rows affected (0.00 sec)
建mysql库表(数据源头库表)
Flink SQL>
select * from test_flink_cdc5;
[ERROR] Could not execute SQL statement. Reason:
java.lang.IllegalArgumentException: Can’t find any matched tables, please check your configured database-name: [mysql] and table-name: [mysql.test_cdc]
###mysql 默认只有这4个库(当时直接用默认库mysql建表导致flink 报一些上面奇诡的错)
mysql> show databases;
+--------------------+
| Database |
+--------------------+
| information_schema |
| mysql |
| performance_schema |
| sys |
+--------------------+
###新建一个库:test
mysql> CREATE DATABASE test CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;
Query OK, 1 row affected (0.02 sec)mysql> use test;
Database changed
###建表
mysql> CREATE TABLE `test_cdc` (-> `id` int NOT NULL AUTO_INCREMENT,-> `name` varchar(255) DEFAULT NULL,-> PRIMARY KEY (`id`)-> ) ENGINE=InnoDB ;
Query OK, 0 rows affected (0.04 sec)mysql> show tables;
+----------------+
| Tables_in_test |
+----------------+
| test_cdc |
+----------------+
1 row in set (0.00 sec)
##开始验证
##mysql 使用上面创建的用户密码登录mysql
[root@slave2 ~]# mysql -u root -p'root1234'
mysql> use test
##flink登录
[root@slave2 flink-1.16.3]# bin/stop-cluster.sh
[root@slave2 flink-1.16.3]# ./bin/sql-client.sh embedded
##在flink sql中定义mysql源
CREATE TABLE test_flink_cdc ( id INT, name STRING,primary key(id) NOT ENFORCED
) WITH ('connector' = 'mysql-cdc', 'hostname' = '127.0.0.1', 'port' = '3306', 'username'='root1234', 'password'='root1234', 'database-name'='test', 'table-name'='test_cdc'
);
###查询flink 接收到的binlog数据
Flink SQL> select * from test_flink_cdc;###到mysql sql界面向test_cdc表插入数据
mysql> INSERT INTO test_cdc VALUES (001, 'test01');
Query OK, 1 row affected (0.02 sec)mysql> INSERT INTO test_cdc VALUES (002, 'test02');
Query OK, 1 row affected (0.01 sec)mysql> INSERT INTO test_cdc VALUES (003, 'test03');
Query OK, 1 row affected (0.04 sec)mysql> INSERT INTO test_cdc VALUES (004, 'test04');
Query OK, 1 row affected (0.01 sec)mysql> INSERT INTO test_cdc VALUES (005, 'test05');
Query OK, 1 row affected (0.01 sec)mysql> INSERT INTO test_cdc VALUES (006, 'test06');
Query OK, 1 row affected (0.00 sec)
向mysql表中插入数据
flink sql这时可以接到binlog数据
查看flink UI job情况
小结:当flink可以拿到mysql binlog源头数据,下面就好做了,根据自己的业务处理sink到任何数据库或组件中(例如sink到mysql,hbase,hive,pg,kafka等等),后面sink就不演示了。
下载链接:
1.mysql jdbc jar包驱动下载
2.flink cdc驱动下载
3.flink下载
3.flink datastream
datastream 比较灵活简单,下面是举例代码片段(datastream的CDC比flink sql还简单,打个jar包在flink web UI界面上传运行即可,此处不做举例)
public class MySqlSourceExample {public static void main(String[] args) throws Exception {MySqlSource<String> mySqlSource = MySqlSource.<String>builder().hostname("yourHostname").port(yourPort).databaseList("yourDatabaseName") // set captured database.tableList("yourDatabaseName.yourTableName") // set captured table.username("yourUsername").password("yourPassword").deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String.build();StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// enable checkpointenv.enableCheckpointing(3000);env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")// set 4 parallel source tasks.setParallelism(4).print().setParallelism(1); // use parallelism 1 for sink to keep message orderingenv.execute("Print MySQL Snapshot + Binlog");}
}
三、总结
- 公司用的cdc不能分享出来,所以搭建上面案例时遇到很多问题,遇到很多坑。
- 工作中使用的就是如下截图流程,没有使用canal和kafka,使用的是
logtail和SLS(阿里云的组件,类似kafka,但要比kafka等功能强大)
- 上面说了很多关于flink cdc的优点。我结合工作中使用的flink cdc说一些缺点(自己结合业务场景)。
1.使用flink cdc不适合直观观察binlog的数据(例如脏数据,数据断流,不能直观看到最近的binglog情况,表的更新频率不高等造成的困扰)。
2.使用阿里云SLS收集mysql binlog数据,将数据保存近1个月(保持时间可设置),同时可以在logstore查看数据结构样式(sls支持sql语法查询日志数据),方便后续flink代码开发,也方便flink sql和datastream代码debug。
3.使用flink cdc做数据质量监控比较后知后觉(只能监控已表数据),我这边做法是直接监控sls原始数据质量,发现有数据质量问题会报出来。sls也支持实时断流提醒。
4.一般对于简单的不太重要的业务适合使用flink cdc,这样开发快,数据流不用咋校验。对于复杂数据或复杂业务或重要数据,需要观察binlog数据结果(不信任上游其他部门数据)还是使用类似sls比较好。方便查询数据变化与汇总,方便做数据报警等。
5.总之还是要根据自己的业务场景和自己公司现有技术组件组合着使用比较好。
相关文章:

flink cdc mysql整理与总结
文章目录 一、业务中常见的需要数据同步的场景CDC是什么FlinkCDC是什么CDC原理为什么是FlinkCDC业务场景flink cdc对应flink的版本 二、模拟案例1.阿里云flink sql2.开源flink sql(单机模式)flink 安装安装mysql3.flink datastream 三、总结 提示:以下是本篇文章正文…...

【三维重建】ePnP
PnP问题应用与一下场景: 已知三维点和对应二维点以及相机相机内参数,可以获取相机外参。 我们介绍其中的一种算法:ePnP 算法流程 1、ePnP算法首先在世界坐标系内寻找4个控制点,记作 C 1 w , C 2 w , C 3 w , C 4 w C_1^w,C_2^w,…...

C++进阶之路:何为运算符重载、赋值运算符重载与前后置++重载(类与对象_中篇)
✨✨ 欢迎大家来访Srlua的博文(づ ̄3 ̄)づ╭❤~✨✨ 🌟🌟 欢迎各位亲爱的读者,感谢你们抽出宝贵的时间来阅读我的文章。 我是Srlua小谢,在这里我会分享我的知识和经验。&am…...

8、python基础知识图谱
...

智慧校园建设规划方案
在信息化浪潮的推动下,智慧校园的建设已成为教育现代化的必然趋势。以创新科技赋能教育,打造智慧校园,旨在提升教学品质,优化管理流程,增强学生体验。构建智慧校园需要具有前瞻性的规划方案,它将以教育为核…...

【深度学习实战—8】:基于MediaPipe的人脸检测
✨博客主页:王乐予🎈 ✨年轻人要:Living for the moment(活在当下)!💪 🏆推荐专栏:【图像处理】【千锤百炼Python】【深度学习】【排序算法】 目录 😺一、Med…...

OSCP学习,布置你的Kali Linux
为什么要写这篇文章? 我是一个OSCP学习者,以教促学。同时也能让各位入门的师傅们更好的了解OSCP这门课程。本人文笔不太好,如果有什么写的不对的地方,师傅们多多指正。 参考资料: OSCP 考试电子书 Linux Basics for…...
PWA离线优先策略:提升用户体验的关键步骤
Progressive Web Apps (PWA) 的离线优先策略是通过Service Worker和Cache API实现的,它允许在没有网络连接时仍然可以访问网站的部分或全部内容。 2500G计算机入门到高级架构师开发资料超级大礼包免费送! 1. 创建Service Worker注册文件(se…...

网页提示“非私密连接”是为什么?
网页提示“非私密连接”(英文提示可能是 "Your connection is not private" 或 "Your connection is not secure")主要是因为浏览器无法验证你正试图访问的网站的SSL/TLS证书,或者是证书存在问题,从而无法建立…...

[自动驾驶技术]-8 Tesla自动驾驶方案之硬件(AI Day 2022)
特斯拉在AI Day 2022先介绍了AI编译器,后面又介绍了Dojo的硬件软件,软件部分和AI编译器有部分重叠,本文介绍还是延用AI Day的思路,分为三部分:AI编译和推理,Dojo硬件,Dojo软件。 特斯拉车道检测…...

人力资源管理信息化系统如何支持企业开展管理诊断?
华恒智信人力资源顾问有限公司致力于帮助企业开展人力资源管理方面的各项提升改进工作,在长期的咨询工作中,最常听到企业提到的问题莫过于管理诊断方面的问题,事实上,很多企业在日常工作中,都意识到企业内部存在管理方…...

Cohere继Command-R+之后发布大模型Aya-23,性能超越 Gemma、Mistral 等,支持中文
前言 近年来,多语言大模型(MLLM)发展迅速,但大多数模型的性能依然存在显著差距,尤其是在非英语语言方面表现不佳。为了推动多语言自然语言处理技术的发展,Cohere团队发布了新的多语言指令微调模型家族——…...

身为UI设计老鸟,不学点3D,好像要被潮流抛弃啦,卷起来吧。
当前3D原则在UI设计中运用的越来越多,在UI设计中,使用3D元素可以为界面带来以下几个价值: 增强视觉冲击力:3D元素可以通过立体感和逼真的效果,为界面增添视觉冲击力,使得设计更加生动、吸引人,并…...

【C语言】实现贪吃蛇--项目实践(超详细)
前言: 贪吃蛇游戏大家都玩过吧?这次我们要用C语言来亲手制作一个!这个项目不仅能让我们复习C语言的知识,还能了解游戏是怎么一步步做出来的。我们会一起完成蛇的移动、食物的生成,还有碰撞检测等有趣的部分。准备好了…...

Elasticsearch 分析器的高级用法一(同义词,高亮搜索)
Elasticsearch 分析器的高级用法一(同义词,高亮搜索) 同义词简介分析使用同义词案例 高亮搜索高亮搜索策略unifiedplainvh 同义词 简介 在搜索场景中,同义词用来处理不同的查询词,有可能是想表达相同的搜索目标。 例…...

Python 开心消消乐
💝💝💝欢迎莅临我的博客,很高兴能够在这里和您见面!希望您在这里可以感受到一份轻松愉快的氛围,不仅可以获得有趣的内容和知识,也可以畅所欲言、分享您的想法和见解。 推荐:「stormsha的主页」…...
mysql - 索引基本知识梳理
mysql索引基本知识梳理 索引介绍 官方介绍索引是帮助MySQL高效获取数据的数据结构, 原理为以空间换时间, mysql的索引采用的是B树的结构 索引的优缺点 优点: 提高查询效率降低数据库IO成本通过索引对数据进行排序, 降低排序成本, 降低CPU消耗 缺点:…...
Nginx SSL/TLS配置:搭建安全的HTTPS网站
随着互联网安全性的日益提升,HTTPS已经成为网站安全通信的标配。Nginx作为一款高性能的HTTP和反向代理服务器,支持SSL/TLS协议,使得我们可以轻松地搭建安全的HTTPS网站。下面,我们将详细介绍如何在Nginx上配置SSL/TLS,…...
echarts 折线图流光效果偏移或不显示
x轴数据需要字符串数组...

基于距离变化能量开销动态调整的WSN低功耗拓扑控制开销算法matlab仿真
目录 1.程序功能描述 2.测试软件版本以及运行结果展示 3.核心程序 4.算法仿真参数 5.算法理论概述 6.参考文献 7.完整程序 1.程序功能描述 通过动态调整节点通信的能量开销,平衡网络负载,延长WSN生命周期。具体通过建立基于距离的能量消耗模型&am…...
Admin.Net中的消息通信SignalR解释
定义集线器接口 IOnlineUserHub public interface IOnlineUserHub {/// 在线用户列表Task OnlineUserList(OnlineUserList context);/// 强制下线Task ForceOffline(object context);/// 发布站内消息Task PublicNotice(SysNotice context);/// 接收消息Task ReceiveMessage(…...
线程同步:确保多线程程序的安全与高效!
全文目录: 开篇语前序前言第一部分:线程同步的概念与问题1.1 线程同步的概念1.2 线程同步的问题1.3 线程同步的解决方案 第二部分:synchronized关键字的使用2.1 使用 synchronized修饰方法2.2 使用 synchronized修饰代码块 第三部分ÿ…...

dedecms 织梦自定义表单留言增加ajax验证码功能
增加ajax功能模块,用户不点击提交按钮,只要输入框失去焦点,就会提前提示验证码是否正确。 一,模板上增加验证码 <input name"vdcode"id"vdcode" placeholder"请输入验证码" type"text&quo…...

抖音增长新引擎:品融电商,一站式全案代运营领跑者
抖音增长新引擎:品融电商,一站式全案代运营领跑者 在抖音这个日活超7亿的流量汪洋中,品牌如何破浪前行?自建团队成本高、效果难控;碎片化运营又难成合力——这正是许多企业面临的增长困局。品融电商以「抖音全案代运营…...

【单片机期末】单片机系统设计
主要内容:系统状态机,系统时基,系统需求分析,系统构建,系统状态流图 一、题目要求 二、绘制系统状态流图 题目:根据上述描述绘制系统状态流图,注明状态转移条件及方向。 三、利用定时器产生时…...

Linux-07 ubuntu 的 chrome 启动不了
文章目录 问题原因解决步骤一、卸载旧版chrome二、重新安装chorme三、启动不了,报错如下四、启动不了,解决如下 总结 问题原因 在应用中可以看到chrome,但是打不开(说明:原来的ubuntu系统出问题了,这个是备用的硬盘&a…...

20个超级好用的 CSS 动画库
分享 20 个最佳 CSS 动画库。 它们中的大多数将生成纯 CSS 代码,而不需要任何外部库。 1.Animate.css 一个开箱即用型的跨浏览器动画库,可供你在项目中使用。 2.Magic Animations CSS3 一组简单的动画,可以包含在你的网页或应用项目中。 3.An…...
第7篇:中间件全链路监控与 SQL 性能分析实践
7.1 章节导读 在构建数据库中间件的过程中,可观测性 和 性能分析 是保障系统稳定性与可维护性的核心能力。 特别是在复杂分布式场景中,必须做到: 🔍 追踪每一条 SQL 的生命周期(从入口到数据库执行)&#…...
Kubernetes 网络模型深度解析:Pod IP 与 Service 的负载均衡机制,Service到底是什么?
Pod IP 的本质与特性 Pod IP 的定位 纯端点地址:Pod IP 是分配给 Pod 网络命名空间的真实 IP 地址(如 10.244.1.2)无特殊名称:在 Kubernetes 中,它通常被称为 “Pod IP” 或 “容器 IP”生命周期:与 Pod …...