FlinkCDC 实现 MySQL 数据变更实时同步
文章目录
- 1、基本介绍
- 2、代码实战
- 2.1、数据源准备
- 2.2、代码实战
- 2.3、数据格式
1、基本介绍
Flink CDC 是 Apache Flink 提供的一个功能强大的组件,用于实时捕获和处理数据库中的数据变更。可以实时地从各种数据库(如MySQL、PostgreSQL、Oracle、MongoDB等)中捕获数据变更并将其转换为流式数据,FlinkCDC 同步数据有两种方式:
- FlinkSQL
- Flink DataStream 和 Table API(本文使用该方式)

对比其他的CDC开源方案,发现FlinkCDC是绝大多数场景最好的选择方式,别在傻傻的只关注Canal了,如下图所示:

2、代码实战
2.1、数据源准备
本次我是用MySQL 8.0版本,并且创建好数据库(库名为quick_chat),本次演示表结构如下:
CREATE TABLE `quick_chat_msg` (`id` bigint NOT NULL COMMENT '主键id',`from_id` varchar(20) CHARACTER SET utf8 COLLATE utf8_unicode_ci DEFAULT NULL COMMENT '账户id(发送人)',`to_id` varchar(20) CHARACTER SET utf8 COLLATE utf8_unicode_ci DEFAULT NULL COMMENT '账户id(接收人)',`relation_id` varchar(50) CHARACTER SET utf8 COLLATE utf8_unicode_ci DEFAULT NULL COMMENT '发送关联',`content` varchar(500) DEFAULT NULL COMMENT '消息内容',`msg_type` tinyint(1) DEFAULT NULL COMMENT '消息类型(1:文字,2:语音,3:表情包,4:文件,5:语音通话,6:视频通话)',`extra_info` varchar(500) DEFAULT NULL COMMENT '额外信息',`create_time` datetime DEFAULT NULL COMMENT '创建时间',`deleted` tinyint(1) DEFAULT NULL COMMENT '删除标识',PRIMARY KEY (`id`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb3;
需要保证MySQL的Binlog格式是ROW,不过MySQL 8.0版本格式默认就是ROW:

最后,要把数据库时区配置好,否则会出现问题,命令如下:
SET persist time_zone = '+8:00';
SET time_zone = '+8:00';
SHOW VARIABLES LIKE '%time_zone%';

2.2、代码实战
首先,引入Flink CDC相关依赖,内容如下:
<dependencies><!-- Flink connector连接器基础包 --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-base</artifactId><version>1.14.0</version></dependency><!-- Flink CDC MySQL源 --><dependency><groupId>com.ververica</groupId><artifactId>flink-sql-connector-mysql-cdc</artifactId><version>2.3.0</version></dependency><!-- Flink DataStream数据流API --><dependency><groupId>com.ververica</groupId><artifactId>flink-connector-mysql-cdc</artifactId><version>2.2.0</version><scope>provided</scope></dependency><!-- Flink客户端--><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_2.12</artifactId><version>1.14.0</version></dependency><!--Flink WebUI,端口8081(默认没有开启)--><dependency><groupId>org.apache.flink</groupId><artifactId>flink-runtime-web_2.12</artifactId><version>1.14.0</version></dependency><!--Flink Table API&SQL程序可以连接到其他外部系统,用于读写批处理表和流式表。--><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-runtime_2.12</artifactId><version>1.14.0</version></dependency>
</dependencies>
第二步,开发 Sink 监听类,用于监听 MySQL 数据变化:
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;public class MySinkHandler extends RichSinkFunction<String> {@Overridepublic void invoke(String value, Context context) throws Exception {System.out.println(value);}@Overridepublic void open(Configuration parameters) throws Exception {}@Overridepublic void close() throws Exception {}
}
最后,配置好 Flink CDC 监听进程,随着项目启动运行:
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.springframework.stereotype.Component;import javax.annotation.PostConstruct;@Component
public class MySqlSourceExample {@PostConstructpublic void init() throws Exception {// 配置监听数据源MySqlSource<String> source = MySqlSource.<String>builder().hostname("8.141.28.132").port(3306)// 数据库集合,可以配置多个.databaseList("quick_chat")// 表集合,可以配置多个.tableList("quick_chat.quick_chat_msg").username("root").password("root").deserializer(new JsonDebeziumDeserializationSchema()).includeSchemaChanges(true).build();// 配置 Flink WebUIConfiguration configuration = new Configuration();configuration.setInteger(RestOptions.PORT, 8081);StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(configuration);// 检查点间隔时间// checkpoint的侧重点是“容错”,即Flink作业意外失败并重启之后,能够直接从早先打下的checkpoint恢复运行,且不影响作业逻辑的准确性。env.enableCheckpointing(5000);DataStreamSink<String> sink = env.fromSource(source, WatermarkStrategy.noWatermarks(), "MySQL Source").addSink(new MySinkHandler());env.execute();}
}
项目启动完毕后,可以通过8081端口访问Flink UI页面:

2.3、数据格式
上述操作完毕后,我对表数据进行了新增、修改、删除操作,控制台可以看到MySQL变更监听日志输出信息:
# 新增
{"before": null,"after": {"id": 3,"from_id": "dog","to_id": "cat","relation_id": "dog:cat","content": "你好啊","msg_type": 1,"extra_info": null,"create_time": 1729164075000,"deleted": 0},"source": {"version": "1.6.4.Final","connector": "mysql","name": "mysql_binlog_source","ts_ms": 1729135279000,"snapshot": "false","db": "quick_chat","sequence": null,"table": "quick_chat_msg","server_id": 1,"gtid": null,"file": "binlog.000002","pos": 2452,"row": 0,"thread": null,"query": null},"op": "c","ts_ms": 1729135278633,"transaction": null
}
# 修改
{"before": {"id": 3,"from_id": "dog","to_id": "cat","relation_id": "dog:cat","content": "你好啊","msg_type": 1,"extra_info": null,"create_time": 1729164075000,"deleted": 0},"after": {"id": 3,"from_id": "dog","to_id": "cat","relation_id": "dog:cat","content": "你好啊,小猫咪","msg_type": 1,"extra_info": null,"create_time": 1729164075000,"deleted": 0},"source": {"version": "1.6.4.Final","connector": "mysql","name": "mysql_binlog_source","ts_ms": 1729135289000,"snapshot": "false","db": "quick_chat","sequence": null,"table": "quick_chat_msg","server_id": 1,"gtid": null,"file": "binlog.000002","pos": 2825,"row": 0,"thread": null,"query": null},"op": "u","ts_ms": 1729135288473,"transaction": null
}
# 删除
{"before": {"id": 3,"from_id": "dog","to_id": "cat","relation_id": "dog:cat","content": "你好啊,小猫咪","msg_type": 1,"extra_info": null,"create_time": 1729164075000,"deleted": 0},"after": null,"source": {"version": "1.6.4.Final","connector": "mysql","name": "mysql_binlog_source","ts_ms": 1729135301000,"snapshot": "false","db": "quick_chat","sequence": null,"table": "quick_chat_msg","server_id": 1,"gtid": null,"file": "binlog.000002","pos": 3247,"row": 0,"thread": null,"query": null},"op": "d","ts_ms": 1729135300692,"transaction": null
}
相关文章:
FlinkCDC 实现 MySQL 数据变更实时同步
文章目录 1、基本介绍2、代码实战2.1、数据源准备2.2、代码实战2.3、数据格式 1、基本介绍 Flink CDC 是 Apache Flink 提供的一个功能强大的组件,用于实时捕获和处理数据库中的数据变更。可以实时地从各种数据库(如MySQL、PostgreSQL、Oracle、MongoDB…...
JavaWeb——Maven(4/8):Maven坐标,idea集成-导入maven项目(两种方式)
目录 Maven坐标 导入Maven项目 第一种方式 第二种方式 Maven坐标 Maven 坐标 是 Maven 当中资源的唯一标识。通过这个坐标,我们就能够唯一定位资源的位置。 Maven 坐标主要用在两个地方。第一个地方:我们可以使用坐标来定义项目。第二个地方&#…...
实现uniapp天地图边界范围覆盖
在uniapp中,难免会遇到使用地图展示的功能,但是百度谷歌这些收费的显然对于大部分开源节流的开发者是不愿意接受的,所以天地图则是最佳选择。 此篇文章,详细的实现地图展示功能,并且可以自定义容器宽高,还可…...
思科网络设备命令
一、交换机巡检命令 接口和流量状态 show interface stats:查看所有接口当前流量。show interface summary:查看所有接口当前状态和流量。show interface status:查看接口状态及可能的错误。show interface | include errors | FastEthernet …...
Egg.js使用ejs快速自动生成resetful风格的CRUD接口
目前的插件能够自动生成egg的crud的都不太好用 我们自己写一个吧 ejs模块 也方便定制 安装依赖 npm install ejs --save ejs 是一个简单易用的模板引擎,常用于 Node.js 应用程序中 在项目根目录下创建 template/controller.ejs 模板文件 use strict;const Co…...
自动化抖音点赞取消脚本批量处理
🌟 前言 欢迎来到我的技术小宇宙!🌌 这里不仅是我记录技术点滴的后花园,也是我分享学习心得和项目经验的乐园。📚 无论你是技术小白还是资深大牛,这里总有一些内容能触动你的好奇心。🔍 &#x…...
基于YOLOv8深度学习的智能车牌检测与识别系统【python源码+Pyqt5界面+数据集+训练代码】目标检测、深度学习实战
背景及意义 智能车牌检测与识别系统通过使用最新的YOLOv8与PaddleOCR算法能够迅速、准确地在多种环境下实现实时车牌的检测和识别。本文基于YOLOv8深度学习框架,通过16770张图片,训练了一个进行车牌检测模型,可以检测蓝牌与绿牌,然后对检测到的车牌使用O…...
qt QGraphicsGridLayout详解
一、概述 QGraphicsGridLayout是Qt框架中用于在QGraphicsScene中布置图形项的一个布局管理器。它类似于QWidget中的QGridLayout,但主要处理的是QGraphicsItem和QGraphicsWidget等图形项。通过合理设置网格位置、伸缩因子和尺寸,可以实现复杂而灵活的布局…...
数字处理系列
(1)将数字转化成中文的过滤器 <template><div><p>数字转中文:{{ 110 | numberToChinese }}</p></div></template><script>export default {filters: {numberToChinese(num) {const chineseNums …...
基于开源Jetlinks物联网平台协议包-MQTT自定义主题数据的编解码
目录 前言 1.下载官方协议包 2.解压 3.自定义主题 4.重写解码方法 5.以下是我解析后接收到的数据 前言 最近这段时间,一直在用开源的Jetlinks物联网平台在学习,偶尔有一次机会接触到物联网设备对接,在协议对接的时候,遇到了…...
【Python】Python2.7升级Python3
需求背景 服务是跑在docker的容器里的,因此要新建image依赖环境是Ubuntu,老的是16.4。 步骤 先准备环境,因为只有你的环境上去了,运行代码的时候才会报错,这样才会把需要改的代码暴露出来。 python3.5目前也是被遗弃的…...
Python 内置函数 round() 详解
在 Python 编程中,round() 函数是一个非常实用的内置函数,用于对数字进行四舍五入。无论是在数据处理、财务计算还是科学计算中,round() 函数都能帮助我们得到所需的精确值。本文将详细介绍 round() 函数的用法和注意事项。 1. round() 函数…...
JavaScript入门中-流程控制语句
本文转载自:https://fangcaicoding.cn/article/52 大家好!我是方才,目前是8人后端研发团队的负责人,拥有6年后端经验&3年团队管理经验,截止目前面试过近200位候选人,主导过单表上10亿、累计上100亿数据…...
kconfig语法(一)
一、安装Kconfiglib python -m pip install windows-curses python -m pip install kconfiglib二、使用样例 ①创建kconfig文件。 ②在kconfig文件添加内容: config KCONFIG_DEMO_ITEM1boolprompt "demonstate item1 for bool learning"config KCONFIG_DEMO_ITE…...
十七、行为型(命令模式)
命令模式(Command Pattern) 概念 命令模式是一种行为型设计模式,它将请求封装成一个对象,从而使您可以使用不同的请求对客户进行参数化,排队请求,以及支持可撤销操作。通过这种模式,调用操作的…...
原材料供应商的GRS认证证书过期了怎么办?
在全球纺织和时尚产业中,GRS(Global Recycle Standard,全球再生标准)认证已成为衡量企业环保和可持续发展的重要指标。然而,当原材料供应商的GRS认证证书过期时,企业需迅速采取行动,以确保供应链…...
C++编程:实现一个基于原始指针的环形缓冲区(RingBuffer)缓存串口数据
文章目录 0. 引言1. 使用示例2. 流程图2.1 追加数据流程2.2 获取空闲块流程2.3 处理特殊字符流程2.4 释放块流程2.5 获取下一个使用块流程 3. 代码详解3.1 Block 结构体3.2 RingBuffer 类3.3 主要方法解析append 方法currentUsed 和 currentUsing 方法release 方法nextUsed 方法…...
LangChain 创始人万字科普:手把手教你设计 Agent 用户交互
LangChain 可以算是 LLM 时代做 AI 应用开发必备的框架和平台,从模型选择、数据库链接与各种 Agent 搭建等,AI 应用的搭建、运行和管理都可以在 LangChain 上进行。 某种意义上,LangChain 可能是最了解 Agent(智能体)…...
Docker 用例:15 种最常见的 Docker 使用方法
容器化应用程序而不是将它们托管在虚拟机上是过去几年一直流行的概念,使容器管理流行起来。Docker 处于这一转变的核心,帮助组织无缝地采用容器化技术。最近,Docker 用例遍布所有行业,无论规模大小和性质如何。 什么是Docker&…...
若依 RuoYi4.6.0 代码审计
环境布置: 到官网下载源码:https://github.com/yangzongzhuan/RuoYi 采用phpstudy集成数据库,5.7版本。JDK1.8。 IDEA打开项目,等待自动加载,修改application-druid.yml配置文件:数据库名,账…...
测试微信模版消息推送
进入“开发接口管理”--“公众平台测试账号”,无需申请公众账号、可在测试账号中体验并测试微信公众平台所有高级接口。 获取access_token: 自定义模版消息: 关注测试号:扫二维码关注测试号。 发送模版消息: import requests da…...
云计算——弹性云计算器(ECS)
弹性云服务器:ECS 概述 云计算重构了ICT系统,云计算平台厂商推出使得厂家能够主要关注应用管理而非平台管理的云平台,包含如下主要概念。 ECS(Elastic Cloud Server):即弹性云服务器,是云计算…...
【Oracle APEX开发小技巧12】
有如下需求: 有一个问题反馈页面,要实现在apex页面展示能直观看到反馈时间超过7天未处理的数据,方便管理员及时处理反馈。 我的方法:直接将逻辑写在SQL中,这样可以直接在页面展示 完整代码: SELECTSF.FE…...
以下是对华为 HarmonyOS NETX 5属性动画(ArkTS)文档的结构化整理,通过层级标题、表格和代码块提升可读性:
一、属性动画概述NETX 作用:实现组件通用属性的渐变过渡效果,提升用户体验。支持属性:width、height、backgroundColor、opacity、scale、rotate、translate等。注意事项: 布局类属性(如宽高)变化时&#…...
1688商品列表API与其他数据源的对接思路
将1688商品列表API与其他数据源对接时,需结合业务场景设计数据流转链路,重点关注数据格式兼容性、接口调用频率控制及数据一致性维护。以下是具体对接思路及关键技术点: 一、核心对接场景与目标 商品数据同步 场景:将1688商品信息…...
如何为服务器生成TLS证书
TLS(Transport Layer Security)证书是确保网络通信安全的重要手段,它通过加密技术保护传输的数据不被窃听和篡改。在服务器上配置TLS证书,可以使用户通过HTTPS协议安全地访问您的网站。本文将详细介绍如何在服务器上生成一个TLS证…...
新能源汽车智慧充电桩管理方案:新能源充电桩散热问题及消防安全监管方案
随着新能源汽车的快速普及,充电桩作为核心配套设施,其安全性与可靠性备受关注。然而,在高温、高负荷运行环境下,充电桩的散热问题与消防安全隐患日益凸显,成为制约行业发展的关键瓶颈。 如何通过智慧化管理手段优化散…...
论文解读:交大港大上海AI Lab开源论文 | 宇树机器人多姿态起立控制强化学习框架(一)
宇树机器人多姿态起立控制强化学习框架论文解析 论文解读:交大&港大&上海AI Lab开源论文 | 宇树机器人多姿态起立控制强化学习框架(一) 论文解读:交大&港大&上海AI Lab开源论文 | 宇树机器人多姿态起立控制强化…...
C++.OpenGL (10/64)基础光照(Basic Lighting)
基础光照(Basic Lighting) 冯氏光照模型(Phong Lighting Model) #mermaid-svg-GLdskXwWINxNGHso {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-GLdskXwWINxNGHso .error-icon{fill:#552222;}#mermaid-svg-GLd…...
HTML前端开发:JavaScript 常用事件详解
作为前端开发的核心,JavaScript 事件是用户与网页交互的基础。以下是常见事件的详细说明和用法示例: 1. onclick - 点击事件 当元素被单击时触发(左键点击) button.onclick function() {alert("按钮被点击了!&…...
