当前位置: 首页 > news >正文

FlinkCDC 实现 MySQL 数据变更实时同步

文章目录

  • 1、基本介绍
  • 2、代码实战
    • 2.1、数据源准备
    • 2.2、代码实战
    • 2.3、数据格式

1、基本介绍

Flink CDC 是 Apache Flink 提供的一个功能强大的组件,用于实时捕获和处理数据库中的数据变更。可以实时地从各种数据库(如MySQL、PostgreSQL、Oracle、MongoDB等)中捕获数据变更并将其转换为流式数据,FlinkCDC 同步数据有两种方式:

  1. FlinkSQL
  2. Flink DataStream 和 Table API(本文使用该方式)
    在这里插入图片描述
    对比其他的CDC开源方案,发现FlinkCDC是绝大多数场景最好的选择方式,别在傻傻的只关注Canal了,如下图所示:
    在这里插入图片描述

2、代码实战

2.1、数据源准备

本次我是用MySQL 8.0版本,并且创建好数据库(库名为quick_chat),本次演示表结构如下:

CREATE TABLE `quick_chat_msg` (`id` bigint NOT NULL COMMENT '主键id',`from_id` varchar(20) CHARACTER SET utf8 COLLATE utf8_unicode_ci DEFAULT NULL COMMENT '账户id(发送人)',`to_id` varchar(20) CHARACTER SET utf8 COLLATE utf8_unicode_ci DEFAULT NULL COMMENT '账户id(接收人)',`relation_id` varchar(50) CHARACTER SET utf8 COLLATE utf8_unicode_ci DEFAULT NULL COMMENT '发送关联',`content` varchar(500) DEFAULT NULL COMMENT '消息内容',`msg_type` tinyint(1) DEFAULT NULL COMMENT '消息类型(1:文字,2:语音,3:表情包,4:文件,5:语音通话,6:视频通话)',`extra_info` varchar(500) DEFAULT NULL COMMENT '额外信息',`create_time` datetime DEFAULT NULL COMMENT '创建时间',`deleted` tinyint(1) DEFAULT NULL COMMENT '删除标识',PRIMARY KEY (`id`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb3;

需要保证MySQL的Binlog格式是ROW,不过MySQL 8.0版本格式默认就是ROW:
在这里插入图片描述
最后,要把数据库时区配置好,否则会出现问题,命令如下:

SET persist time_zone = '+8:00';
SET time_zone = '+8:00';
SHOW VARIABLES LIKE '%time_zone%';

在这里插入图片描述

2.2、代码实战

首先,引入Flink CDC相关依赖,内容如下:

<dependencies><!-- Flink connector连接器基础包 --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-base</artifactId><version>1.14.0</version></dependency><!-- Flink CDC MySQL源 --><dependency><groupId>com.ververica</groupId><artifactId>flink-sql-connector-mysql-cdc</artifactId><version>2.3.0</version></dependency><!-- Flink DataStream数据流API --><dependency><groupId>com.ververica</groupId><artifactId>flink-connector-mysql-cdc</artifactId><version>2.2.0</version><scope>provided</scope></dependency><!-- Flink客户端--><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_2.12</artifactId><version>1.14.0</version></dependency><!--Flink WebUI,端口8081(默认没有开启)--><dependency><groupId>org.apache.flink</groupId><artifactId>flink-runtime-web_2.12</artifactId><version>1.14.0</version></dependency><!--Flink Table API&SQL程序可以连接到其他外部系统,用于读写批处理表和流式表。--><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-runtime_2.12</artifactId><version>1.14.0</version></dependency>
</dependencies>

第二步,开发 Sink 监听类,用于监听 MySQL 数据变化:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;public class MySinkHandler extends RichSinkFunction<String> {@Overridepublic void invoke(String value, Context context) throws Exception {System.out.println(value);}@Overridepublic void open(Configuration parameters) throws Exception {}@Overridepublic void close() throws Exception {}
}

最后,配置好 Flink CDC 监听进程,随着项目启动运行:

import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.springframework.stereotype.Component;import javax.annotation.PostConstruct;@Component
public class MySqlSourceExample {@PostConstructpublic void init() throws Exception {// 配置监听数据源MySqlSource<String> source = MySqlSource.<String>builder().hostname("8.141.28.132").port(3306)// 数据库集合,可以配置多个.databaseList("quick_chat")// 表集合,可以配置多个.tableList("quick_chat.quick_chat_msg").username("root").password("root").deserializer(new JsonDebeziumDeserializationSchema()).includeSchemaChanges(true).build();// 配置 Flink WebUIConfiguration configuration = new Configuration();configuration.setInteger(RestOptions.PORT, 8081);StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(configuration);// 检查点间隔时间// checkpoint的侧重点是“容错”,即Flink作业意外失败并重启之后,能够直接从早先打下的checkpoint恢复运行,且不影响作业逻辑的准确性。env.enableCheckpointing(5000);DataStreamSink<String> sink = env.fromSource(source, WatermarkStrategy.noWatermarks(), "MySQL Source").addSink(new MySinkHandler());env.execute();}
}

项目启动完毕后,可以通过8081端口访问Flink UI页面:
在这里插入图片描述

2.3、数据格式

上述操作完毕后,我对表数据进行了新增、修改、删除操作,控制台可以看到MySQL变更监听日志输出信息:

# 新增
{"before": null,"after": {"id": 3,"from_id": "dog","to_id": "cat","relation_id": "dog:cat","content": "你好啊","msg_type": 1,"extra_info": null,"create_time": 1729164075000,"deleted": 0},"source": {"version": "1.6.4.Final","connector": "mysql","name": "mysql_binlog_source","ts_ms": 1729135279000,"snapshot": "false","db": "quick_chat","sequence": null,"table": "quick_chat_msg","server_id": 1,"gtid": null,"file": "binlog.000002","pos": 2452,"row": 0,"thread": null,"query": null},"op": "c","ts_ms": 1729135278633,"transaction": null
}
# 修改
{"before": {"id": 3,"from_id": "dog","to_id": "cat","relation_id": "dog:cat","content": "你好啊","msg_type": 1,"extra_info": null,"create_time": 1729164075000,"deleted": 0},"after": {"id": 3,"from_id": "dog","to_id": "cat","relation_id": "dog:cat","content": "你好啊,小猫咪","msg_type": 1,"extra_info": null,"create_time": 1729164075000,"deleted": 0},"source": {"version": "1.6.4.Final","connector": "mysql","name": "mysql_binlog_source","ts_ms": 1729135289000,"snapshot": "false","db": "quick_chat","sequence": null,"table": "quick_chat_msg","server_id": 1,"gtid": null,"file": "binlog.000002","pos": 2825,"row": 0,"thread": null,"query": null},"op": "u","ts_ms": 1729135288473,"transaction": null
}
# 删除
{"before": {"id": 3,"from_id": "dog","to_id": "cat","relation_id": "dog:cat","content": "你好啊,小猫咪","msg_type": 1,"extra_info": null,"create_time": 1729164075000,"deleted": 0},"after": null,"source": {"version": "1.6.4.Final","connector": "mysql","name": "mysql_binlog_source","ts_ms": 1729135301000,"snapshot": "false","db": "quick_chat","sequence": null,"table": "quick_chat_msg","server_id": 1,"gtid": null,"file": "binlog.000002","pos": 3247,"row": 0,"thread": null,"query": null},"op": "d","ts_ms": 1729135300692,"transaction": null
}

相关文章:

FlinkCDC 实现 MySQL 数据变更实时同步

文章目录 1、基本介绍2、代码实战2.1、数据源准备2.2、代码实战2.3、数据格式 1、基本介绍 Flink CDC 是 Apache Flink 提供的一个功能强大的组件&#xff0c;用于实时捕获和处理数据库中的数据变更。可以实时地从各种数据库&#xff08;如MySQL、PostgreSQL、Oracle、MongoDB…...

JavaWeb——Maven(4/8):Maven坐标,idea集成-导入maven项目(两种方式)

目录 Maven坐标 导入Maven项目 第一种方式 第二种方式 Maven坐标 Maven 坐标 是 Maven 当中资源的唯一标识。通过这个坐标&#xff0c;我们就能够唯一定位资源的位置。 Maven 坐标主要用在两个地方。第一个地方&#xff1a;我们可以使用坐标来定义项目。第二个地方&#…...

实现uniapp天地图边界范围覆盖

在uniapp中&#xff0c;难免会遇到使用地图展示的功能&#xff0c;但是百度谷歌这些收费的显然对于大部分开源节流的开发者是不愿意接受的&#xff0c;所以天地图则是最佳选择。 此篇文章&#xff0c;详细的实现地图展示功能&#xff0c;并且可以自定义容器宽高&#xff0c;还可…...

思科网络设备命令

一、交换机巡检命令 接口和流量状态 show interface stats&#xff1a;查看所有接口当前流量。show interface summary&#xff1a;查看所有接口当前状态和流量。show interface status&#xff1a;查看接口状态及可能的错误。show interface | include errors | FastEthernet …...

Egg.js使用ejs快速自动生成resetful风格的CRUD接口

目前的插件能够自动生成egg的crud的都不太好用 我们自己写一个吧 ejs模块 也方便定制 安装依赖 npm install ejs --save ejs 是一个简单易用的模板引擎&#xff0c;常用于 Node.js 应用程序中 在项目根目录下创建 template/controller.ejs 模板文件 use strict;const Co…...

自动化抖音点赞取消脚本批量处理

&#x1f31f; 前言 欢迎来到我的技术小宇宙&#xff01;&#x1f30c; 这里不仅是我记录技术点滴的后花园&#xff0c;也是我分享学习心得和项目经验的乐园。&#x1f4da; 无论你是技术小白还是资深大牛&#xff0c;这里总有一些内容能触动你的好奇心。&#x1f50d; &#x…...

基于YOLOv8深度学习的智能车牌检测与识别系统【python源码+Pyqt5界面+数据集+训练代码】目标检测、深度学习实战

背景及意义 智能车牌检测与识别系统通过使用最新的YOLOv8与PaddleOCR算法能够迅速、准确地在多种环境下实现实时车牌的检测和识别。本文基于YOLOv8深度学习框架&#xff0c;通过16770张图片&#xff0c;训练了一个进行车牌检测模型,可以检测蓝牌与绿牌,然后对检测到的车牌使用O…...

qt QGraphicsGridLayout详解

一、概述 QGraphicsGridLayout是Qt框架中用于在QGraphicsScene中布置图形项的一个布局管理器。它类似于QWidget中的QGridLayout&#xff0c;但主要处理的是QGraphicsItem和QGraphicsWidget等图形项。通过合理设置网格位置、伸缩因子和尺寸&#xff0c;可以实现复杂而灵活的布局…...

数字处理系列

&#xff08;1&#xff09;将数字转化成中文的过滤器 <template><div><p>数字转中文&#xff1a;{{ 110 | numberToChinese }}</p></div></template><script>export default {filters: {numberToChinese(num) {const chineseNums …...

基于开源Jetlinks物联网平台协议包-MQTT自定义主题数据的编解码

目录 前言 1.下载官方协议包 2.解压 3.自定义主题 4.重写解码方法 5.以下是我解析后接收到的数据 前言 最近这段时间&#xff0c;一直在用开源的Jetlinks物联网平台在学习&#xff0c;偶尔有一次机会接触到物联网设备对接&#xff0c;在协议对接的时候&#xff0c;遇到了…...

【Python】Python2.7升级Python3

需求背景 服务是跑在docker的容器里的&#xff0c;因此要新建image依赖环境是Ubuntu&#xff0c;老的是16.4。 步骤 先准备环境&#xff0c;因为只有你的环境上去了&#xff0c;运行代码的时候才会报错&#xff0c;这样才会把需要改的代码暴露出来。 python3.5目前也是被遗弃的…...

Python 内置函数 round() 详解

在 Python 编程中&#xff0c;round() 函数是一个非常实用的内置函数&#xff0c;用于对数字进行四舍五入。无论是在数据处理、财务计算还是科学计算中&#xff0c;round() 函数都能帮助我们得到所需的精确值。本文将详细介绍 round() 函数的用法和注意事项。 1. round() 函数…...

JavaScript入门中-流程控制语句

本文转载自&#xff1a;https://fangcaicoding.cn/article/52 大家好&#xff01;我是方才&#xff0c;目前是8人后端研发团队的负责人&#xff0c;拥有6年后端经验&3年团队管理经验&#xff0c;截止目前面试过近200位候选人&#xff0c;主导过单表上10亿、累计上100亿数据…...

kconfig语法(一)

一、安装Kconfiglib python -m pip install windows-curses python -m pip install kconfiglib二、使用样例 ①创建kconfig文件。 ②在kconfig文件添加内容: config KCONFIG_DEMO_ITEM1boolprompt "demonstate item1 for bool learning"config KCONFIG_DEMO_ITE…...

十七、行为型(命令模式)

命令模式&#xff08;Command Pattern&#xff09; 概念 命令模式是一种行为型设计模式&#xff0c;它将请求封装成一个对象&#xff0c;从而使您可以使用不同的请求对客户进行参数化&#xff0c;排队请求&#xff0c;以及支持可撤销操作。通过这种模式&#xff0c;调用操作的…...

原材料供应商的GRS认证证书过期了怎么办?

在全球纺织和时尚产业中&#xff0c;GRS&#xff08;Global Recycle Standard&#xff0c;全球再生标准&#xff09;认证已成为衡量企业环保和可持续发展的重要指标。然而&#xff0c;当原材料供应商的GRS认证证书过期时&#xff0c;企业需迅速采取行动&#xff0c;以确保供应链…...

C++编程:实现一个基于原始指针的环形缓冲区(RingBuffer)缓存串口数据

文章目录 0. 引言1. 使用示例2. 流程图2.1 追加数据流程2.2 获取空闲块流程2.3 处理特殊字符流程2.4 释放块流程2.5 获取下一个使用块流程 3. 代码详解3.1 Block 结构体3.2 RingBuffer 类3.3 主要方法解析append 方法currentUsed 和 currentUsing 方法release 方法nextUsed 方法…...

LangChain 创始人万字科普:手把手教你设计 Agent 用户交互

LangChain 可以算是 LLM 时代做 AI 应用开发必备的框架和平台&#xff0c;从模型选择、数据库链接与各种 Agent 搭建等&#xff0c;AI 应用的搭建、运行和管理都可以在 LangChain 上进行。 某种意义上&#xff0c;LangChain 可能是最了解 Agent&#xff08;智能体&#xff09;…...

Docker 用例:15 种最常见的 Docker 使用方法

容器化应用程序而不是将它们托管在虚拟机上是过去几年一直流行的概念&#xff0c;使容器管理流行起来。Docker 处于这一转变的核心&#xff0c;帮助组织无缝地采用容器化技术。最近&#xff0c;Docker 用例遍布所有行业&#xff0c;无论规模大小和性质如何。 什么是Docker&…...

若依 RuoYi4.6.0 代码审计

环境布置&#xff1a; 到官网下载源码&#xff1a;https://github.com/yangzongzhuan/RuoYi 采用phpstudy集成数据库&#xff0c;5.7版本。JDK1.8。 IDEA打开项目&#xff0c;等待自动加载&#xff0c;修改application-druid.yml配置文件&#xff1a;数据库名&#xff0c;账…...

中南大学无人机智能体的全面评估!BEDI:用于评估无人机上具身智能体的综合性基准测试

作者&#xff1a;Mingning Guo, Mengwei Wu, Jiarun He, Shaoxian Li, Haifeng Li, Chao Tao单位&#xff1a;中南大学地球科学与信息物理学院论文标题&#xff1a;BEDI: A Comprehensive Benchmark for Evaluating Embodied Agents on UAVs论文链接&#xff1a;https://arxiv.…...

循环冗余码校验CRC码 算法步骤+详细实例计算

通信过程&#xff1a;&#xff08;白话解释&#xff09; 我们将原始待发送的消息称为 M M M&#xff0c;依据发送接收消息双方约定的生成多项式 G ( x ) G(x) G(x)&#xff08;意思就是 G &#xff08; x ) G&#xff08;x) G&#xff08;x) 是已知的&#xff09;&#xff0…...

【项目实战】通过多模态+LangGraph实现PPT生成助手

PPT自动生成系统 基于LangGraph的PPT自动生成系统&#xff0c;可以将Markdown文档自动转换为PPT演示文稿。 功能特点 Markdown解析&#xff1a;自动解析Markdown文档结构PPT模板分析&#xff1a;分析PPT模板的布局和风格智能布局决策&#xff1a;匹配内容与合适的PPT布局自动…...

基于数字孪生的水厂可视化平台建设:架构与实践

分享大纲&#xff1a; 1、数字孪生水厂可视化平台建设背景 2、数字孪生水厂可视化平台建设架构 3、数字孪生水厂可视化平台建设成效 近几年&#xff0c;数字孪生水厂的建设开展的如火如荼。作为提升水厂管理效率、优化资源的调度手段&#xff0c;基于数字孪生的水厂可视化平台的…...

DBAPI如何优雅的获取单条数据

API如何优雅的获取单条数据 案例一 对于查询类API&#xff0c;查询的是单条数据&#xff0c;比如根据主键ID查询用户信息&#xff0c;sql如下&#xff1a; select id, name, age from user where id #{id}API默认返回的数据格式是多条的&#xff0c;如下&#xff1a; {&qu…...

tree 树组件大数据卡顿问题优化

问题背景 项目中有用到树组件用来做文件目录&#xff0c;但是由于这个树组件的节点越来越多&#xff0c;导致页面在滚动这个树组件的时候浏览器就很容易卡死。这种问题基本上都是因为dom节点太多&#xff0c;导致的浏览器卡顿&#xff0c;这里很明显就需要用到虚拟列表的技术&…...

【Go语言基础【13】】函数、闭包、方法

文章目录 零、概述一、函数基础1、函数基础概念2、参数传递机制3、返回值特性3.1. 多返回值3.2. 命名返回值3.3. 错误处理 二、函数类型与高阶函数1. 函数类型定义2. 高阶函数&#xff08;函数作为参数、返回值&#xff09; 三、匿名函数与闭包1. 匿名函数&#xff08;Lambda函…...

音视频——I2S 协议详解

I2S 协议详解 I2S (Inter-IC Sound) 协议是一种串行总线协议&#xff0c;专门用于在数字音频设备之间传输数字音频数据。它由飞利浦&#xff08;Philips&#xff09;公司开发&#xff0c;以其简单、高效和广泛的兼容性而闻名。 1. 信号线 I2S 协议通常使用三根或四根信号线&a…...

Selenium常用函数介绍

目录 一&#xff0c;元素定位 1.1 cssSeector 1.2 xpath 二&#xff0c;操作测试对象 三&#xff0c;窗口 3.1 案例 3.2 窗口切换 3.3 窗口大小 3.4 屏幕截图 3.5 关闭窗口 四&#xff0c;弹窗 五&#xff0c;等待 六&#xff0c;导航 七&#xff0c;文件上传 …...

TSN交换机正在重构工业网络,PROFINET和EtherCAT会被取代吗?

在工业自动化持续演进的今天&#xff0c;通信网络的角色正变得愈发关键。 2025年6月6日&#xff0c;为期三天的华南国际工业博览会在深圳国际会展中心&#xff08;宝安&#xff09;圆满落幕。作为国内工业通信领域的技术型企业&#xff0c;光路科技&#xff08;Fiberroad&…...