flink-connector-mysql-cdc:01 mysql-cdc础配置代码演示
flink-connector-mysql-cdc:
- 01 mysql-cdc基础配置代码演示
- 02 mysql-cdc高级扩展
- 03 mysql-cdc常见问题汇总
- 04 mysql-cdc-kafka生产级代码分享
- 05 flink-kafka-doris生产级代码分享
- 06 flink-kafka-hudi生产级代码分享
flink-cdc版本:3.2.0
flink版本:flink-1.18.0
mysql版本:8.0.26
java版本:1.8
maven版本:3.8.4
目录
1. Mysql数据库设置
1.1 开启binlog日志
1.2 创建用户
1.3 准备测试数据
2. 编写测试代码
2.1 maven 依赖
2.2 测试代码
3. mysql-cdc扩展
3.1 时区设置
3.2 为每个读取器设置不同的 SERVER ID
1. Mysql数据库设置
1.1 开启binlog日志
编辑 MySQL 配置文件
-
在 Unix/Linux 系统中,通常是
/etc/my.cnf或/etc/mysql/my.cnf。 -
在 Windows 上,可能位于
C:\ProgramData\MySQL\MySQL Server X.Y\my.ini。
# 在 mysqld 部分下添加以下内容(如果已经存在,请确认其值):
[mysqld]
log-bin=mysql-bin # 二进制日志文件前缀,MySQL将生成名为 mysql-bin.000001, mysql-bin.000002 等的文件。
binlog-format=row # 设置二进制日志格式为行级(row),可选值为 STATEMENT、ROW 和 MIXED;这里推荐使用行级。
expire_logs_days=7 # 设置二进制日志的过期时间,单位为天,超过这个天数后的日志将被自动删除,这里以 7 天为例。
max-binlog-size=100M # 设置单个二进制日志文件的最大大小,超出后将自动创建一个新的日志文件(可以根据需要调整)。
1.2 创建用户
以 “flinkcdc”用户为例
# 创建 MySQL 用户:
CREATE USER 'flinkcdc'@'localhost' IDENTIFIED BY '123456';
# 向用户授予所需的权限:
GRANT SELECT, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'flinkcdc' IDENTIFIED BY '123456';
# 授权所有权限
GRANT ALL PRIVILEGES ON *.* TO 'flinkcdc'@'localhost';
GRANT ALL PRIVILEGES ON *.* TO 'flinkcdc'@'%';
# 完成用户的权限:
FLUSH PRIVILEGES;
1.3 准备测试数据
# 使用flinkcdc用户登录数据库# 创建测试数据库
create database cdc_demo;
# 创建测试表
CREATE TABLE cdc_demo.flink_cdc_test (id INT AUTO_INCREMENT PRIMARY KEY,name VARCHAR(50) NOT NULL,description TEXT,age INT,balance DECIMAL(10, 2),is_active BOOLEAN DEFAULT TRUE,created_at DATETIME DEFAULT CURRENT_TIMESTAMP,birth_date DATE,long_value BIGINT,last_login TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
);
# 插入测试数据
INSERT INTO cdc_demo.flink_cdc_test (name, description, age, balance, is_active, created_at, birth_date, long_value, last_login) VALUES
('Alice Smith', 'Alice is a software engineer with 5 years of experience.', 30, 2500.50, TRUE, '2023-01-01 10:00:00', '1992-05-15', 12345678901234, '2023-05-20 10:00:00'),
('Bob Johnson', 'Bob enjoys hiking and outdoor activities.', 25, 1500.00, TRUE, '2023-02-15 12:30:00', '1998-08-22', 987654321054321, '2023-05-18 14:00:00'),
('Charlie Brown', 'Charlie is an avid reader and coffee lover.', 35, 3200.75, FALSE, '2023-03-22 14:45:00', '1988-01-11', 135792468012345, '2023-05-19 09:20:00'),
('Daisy Miller', 'Daisy loves painting and traveling.', 28, 1800.25, TRUE, '2023-04-05 09:15:00', '1994-11-03', 24681357901234, '2023-05-21 12:30:00'),
('Ethan White', 'Ethan enjoys playing guitar and writing songs.', 40, 5000.00, TRUE, '2023-05-18 16:20:00', '1983-07-30', 98765432102468, '2023-05-22 15:00:00');
2. 编写测试代码
2.1 maven 依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.toroidal</groupId><artifactId>flink-connector-mysql-cdc-demo</artifactId><name>flink-connector-mysql-cdc-demo</name><version>1.0-SNAPSHOT</version><repositories><repository><id>aliyunmaven</id><name>阿里云公共仓库</name><url>https://maven.aliyun.com/repository/public/</url></repository><repository><id>mirrorId</id><name>Human Readable Name for this Mirror.</name><url>http://my.repository.com/repo/path</url></repository></repositories><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target><flink.version>1.18.0</flink.version><scala.binary.version>2.12</scala.binary.version><flinkcdc.version>3.2.0</flinkcdc.version><mysql.version>8.0.26</mysql.version><log4j.version>2.17.1</log4j.version><lombok.version>1.18.24</lombok.version><fastjson.version>1.2.83</fastjson.version></properties><dependencies><!-- flink Dependency --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-core</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-common</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java-bridge</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-json</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-base</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-files</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-statebackend-rocksdb</artifactId><version>${flink.version}</version></dependency><!-- mysql-cdc --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-mysql-cdc</artifactId><version>${flinkcdc.version}</version></dependency><!-- mysql --><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>${mysql.version}</version></dependency><!-- json --><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>${fastjson.version}</version></dependency><!-- log --><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>${lombok.version}</version></dependency><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-slf4j-impl</artifactId><version>${log4j.version}</version></dependency></dependencies><build><sourceDirectory>src/main/java</sourceDirectory><plugins><!-- 编译插件 --><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>3.9.0</version><configuration><source>1.8</source><target>1.8</target></configuration></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-surefire-plugin</artifactId><version>2.18.1</version><configuration><useFile>false</useFile><disableXmlReport>true</disableXmlReport><includes><include>**/*Test.*</include><include>**/*Suite.*</include></includes></configuration></plugin><!-- 打包插件(会包含所有依赖) --><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><version>2.3</version><executions><execution><phase>package</phase><goals><goal>shade</goal></goals><configuration><filters><filter><artifact>*:*</artifact><excludes><!--zip -d learn_spark.jar META-INF/*.RSA META-INF/*.DSA META-INF/*.SF --><exclude>META-INF/*.SF</exclude><exclude>META-INF/*.DSA</exclude><exclude>META-INF/*.RSA</exclude></excludes></filter></filters><transformers><transformerimplementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"><!-- 设置jar包的入口类(可选) --><mainClass>com.toroidal.mysql.MysqlCdcStreamApp</mainClass></transformer></transformers></configuration></execution></executions></plugin></plugins></build>
</project>
2.2 测试代码
package com.toroidal.mysql;import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.cdc.connectors.mysql.source.MySqlSource;/*** @Author Toroidal* @Date 2024/12/04 14:42* @Version 1.0*/
public class MysqlCdcStreamApp {public static void main(String[] args) throws Exception {MySqlSource<String> mySqlSource = MySqlSource.<String>builder().hostname("localhost").port(3306).username("flinkcdc").password("123456")// 表所在的数据库库名,如果需要捕获整个库的表,配置为:".*".;如果需要捕获多个数据库配置为:.databaseList("cdc01", "cdc02").databaseList("cdc_demo")// 设置需要捕获日志的表名,注意需要配置库名,大小敏感.tableList("cdc_demo.flink_cdc_test")// 将 SourceRecord 转换为 JSON 字符串。.deserializer(new JsonDebeziumDeserializationSchema()).build();StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// enable checkpointenv.enableCheckpointing(3000);env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source").setParallelism(4).print().setParallelism(1);env.execute("Print MySQL Snapshot + Binlog");}
}
运行结果

3. mysql-cdc扩展
3.1 时区设置
mysql-cdc读取出来的 timestamp 字段时区相差8小时,将时区和MySQL服务器时区设置一致即可:
查询当前数据库时区:
SELECT * FROM mysql.time_zone_name;
设置时区为东八时区
.serverTimeZone("Asia/Shanghai")
MySqlSource<String> mySqlSource = MySqlSource.<String>builder().hostname("localhost").port(3306).username("flinkcdc").password("123456")// 表所在的数据库库名,如果需要捕获整个库的表,配置为:".*".;如果需要捕获多个数据库配置为:.databaseList("cdc01", "cdc02").databaseList("cdc_demo")// 设置需要捕获日志的表名,注意需要配置库名,大小敏感.tableList("cdc_demo.flink_cdc_test").serverTimeZone("Asia/Shanghai")// 将 SourceRecord 转换为 JSON 字符串。.deserializer(new JsonDebeziumDeserializationSchema()).build();
3.2 为每个读取器设置不同的 SERVER ID
每个用于读取 binlog 的 MySQL 数据库客户端都应该有一个唯一的 ID,称为 server id。MySQL 服务器将使用此 id 来维护网络连接和 binlog 位置。因此,如果不同的作业共享相同的服务器 ID,则可能会导致从错误的 binlog 位置读取。
.serverId("flink-cdc-01")
MySqlSource<String> mySqlSource = MySqlSource.<String>builder().hostname("localhost").port(3306)// 表所在的数据库库名,如果需要捕获整个库的表,配置为:".*".;如果需要捕获多个数据库配置为:.databaseList("cdc01", "cdc02").databaseList("cdc")// 设置需要捕获日志的表名,注意需要配置库名,大小敏感.tableList("cdc.flink_cdc_test").username("flinkcdc").serverTimeZone("Asia/Shanghai").serverId("flink-cdc-01").password("123456")// 将 SourceRecord 转换为 JSON 字符串。.deserializer(new JsonDebeziumDeserializationSchema()).build();
相关文章:
flink-connector-mysql-cdc:01 mysql-cdc础配置代码演示
flink-connector-mysql-cdc: 01 mysql-cdc基础配置代码演示02 mysql-cdc高级扩展03 mysql-cdc常见问题汇总04 mysql-cdc-kafka生产级代码分享05 flink-kafka-doris生产级代码分享06 flink-kafka-hudi生产级代码分享 flink-cdc版本:3.2.0 flink版本&…...
java计算机毕设课设—进销存管理系统(附源码、文章、相关截图、部署视频)
这是什么系统? 资源获取方式再最下方 java计算机毕设课设—进销存管理系统(附源码、文章、相关截图、部署视频) 一、项目简介 项目名称: 基于Java的进销存管理系统 开发背景: 在现代企业管理中,库存管理是核心环节之一&#…...
鸿蒙UI开发——渐变色效果
1、概 述 ArkTs可以通过颜色渐变接口,设置组件的背景颜色渐变效果,实现在两个或多个指定的颜色之间进行平稳的过渡。 目前提供三种渐变类型:线性渐变、角度渐变、径向渐变。 我们在鸿蒙UI布局实战 —— 个人中心页面开发中,默认…...
嵌入式硬件设计 — 智能设备背后的隐形架构大师
目录 引言 一、嵌入式硬件设计概述 (一)需求分析 (二)硬件选型 (三)电路设计 (四)PCB 制作与焊接 (五)硬件调试与测试 (六)软…...
QNX的系统资源访问机制
资料参考: QNX官网文档 在QNX中,一些系统的资源默认是无法访问的,或者可访问的范围过大,导致产生不可控的危险,此时便需要对系统资源进行访问限制 接口如下 #include <sys/rsrcdbmgr.h> #include <sys/rsrcdbmsg.h>int rsrcdbmgr_create(...
高校数字化运营平台解决方案:构建统一的服务大厅、业务平台、办公平台,助力打造智慧校园
教育数字化是建设教育强国的重要基础,利用技术和数据助推高校管理转型,从而更好地支撑教学业务开展。 近年来,国家多次发布政策,驱动教育行业的数字化转型。《“十四五”国家信息化规划》,推进信息技术、智能技术与教育…...
多模态大型语言模型MM-1.5采用数据驱动的方法,通过不断优化数据组合提高模型性能
多模态大型语言模型MM-1.5采用数据驱动的方法,通过不断优化数据组合提高模型性能 MM-1.5模型的设计核心在于其数据驱动的方法,这意味着模型的性能在很大程度上取决于所使用的数据类型和组合。这种方法的实施细节可以从以下几个方面来展开: …...
16 设计模式之适配器模式(充电器转换案例)
一、适配器模式的定义 适配器模式(Adapter Pattern)是一种结构型设计模式,常用于解决接口不兼容的问题。适配器模式通过引入一个“适配器”类,将一个接口转化为客户端期望的另一种接口,使得原本因接口不兼容而无法交互…...
基于Java Springboot在线招聘APP且微信小程序
一、作品包含 源码数据库设计文档万字PPT全套环境和工具资源部署教程 二、项目技术 前端技术:Html、Css、Js、Vue、Element-ui 数据库:MySQL 后端技术:Java、Spring Boot、MyBatis 三、运行环境 开发工具:IDEA/eclipse 微信…...
多组学数据如何发表高分SCI论文,以RNA-Seq数据为例
随着高通量测序以及生物信息学的发展,R语言在生物大数据分析以及数据挖掘中发挥着越来越重要的作用。想要成为一名优秀的生物数据分析者与科研团队不可或缺的人才,除了掌握对生物大数据挖掘与分析技能之外,还要具备一定的统计分析能力与SCI论…...
Qt Designer Ui设计 功能增加
效果展示 输入密码,密码错误,弹出提示 密码正确,弹出提示并且关闭原窗口 代码(只提供重要关键主代码)lxh_log.py代码: import sysfrom PySide6.QtWidgets import QApplication, QWidget, QPushButtonfrom …...
【Android学习】2024最新版Android Studio安装与配置
准备工作 Windows系统的要求 一、下载 Android Studio官网:https://developer.android.google.cn/studio?hlen 今天是2024年9月27日,Android Studio已经更新到了Koala版本 直接下载 二、安装 笔者当前环境变量中配置的JDK版本为1.8 双击.exe文件运行…...
RabbitMQ延时队列
RabbitMQ延时队列 什么是延时队列 延时队列顾名思义,即放置在该队列里面的消息是不需要立即消费的,而是等待一段时间之后取出消费。 应用场景 场景一:在订单系统中,一个用户下单之后通常有30分钟的时间进行支付,如…...
a8204 基于微信小程序的音乐播放器微信小程序的研究与实现 服务器端Java+Mysql+Servlet 文档 源码
音乐播放微信小程序 1.项目描述2. 绪论3.项目功能4.界面展示5.源码获取 1.项目描述 随着科技的发展,手机在我们生活中起到了重要的作用。软件作为手机重要的一部分,用户体验显得尤为重要。微信小程序一起操作便捷、用户基数大、分享便利、既用即走等特点…...
游戏新纪元:用栈记录数据,轻松实现悔棋功能
游戏介绍 嘿,各位游戏爱好者们!今天我要给大家介绍一款颠覆传统、创新十足的游戏项目。这款游戏不仅让你沉浸在紧张刺激的游戏世界中,还引入了前所未有的两大特色功能:记录游戏数据和轻松实现悔棋。 首先,让我们来聊…...
C/C++基础知识复习(36)
函数重载是指在同一作用域内,定义多个同名但参数列表不同的函数。通过函数重载,程序员可以使用相同的函数名称处理不同类型或数量的参数,而不需要为每种情况创建不同的函数名称。编译器根据函数调用时传递的参数类型和数量来决定调用哪个版本…...
JAVA |日常开发中连接Sqlite数据库详解
JAVA |日常开发中连接Sqlite数据库详解 前言一、SQLite 数据库概述1.1 定义与特点1.2 适用场景 二、Java 连接 SQLite 数据库的准备工作2.1 添加 SQLite JDBC 驱动依赖2.2 了解 JDBC 基础概念 三、建立数据库连接3.1 代码示例3.2 步骤解析 四、执行 SQL 语句4.1 创建…...
Java项目实战II基于微信小程序的消防隐患在线举报系统(开发文档+数据库+源码)
目录 一、前言 二、技术介绍 三、系统实现 四、核心代码 五、源码获取 全栈码农以及毕业设计实战开发,CSDN平台Java领域新星创作者,专注于大学生项目实战开发、讲解和毕业答疑辅导。获取源码联系方式请查看文末 一、前言 随着城市化进程的加快&…...
python编程Day12-属性和方法的分类
私有和公有 在python中 定义类的时候,可以给 属性和方法设置 访问权限,即规定在什么地方可以使用。 权限一般分为两种:公有权限、私有权限 公有权限 定义:直接定义的属性和方法就是公有的特点: 可以在任何地方访问和使…...
【unity小技巧】在 Unity 中,Application获取各种文件路径或访问不同类型的存储路径
文章目录 前言1. **Application.persistentDataPath**2. **Application.dataPath**3. **Application.streamingAssetsPath**4. **Application.temporaryCachePath**5. **Application.consoleLogPath**6. **Application.userDataPath**7. **Application.streamingAssetsPath 与 …...
如何用Sunshine打造个人游戏串流中心:跨设备畅玩的终极指南
如何用Sunshine打造个人游戏串流中心:跨设备畅玩的终极指南 【免费下载链接】Sunshine Sunshine: Sunshine是一个自托管的游戏流媒体服务器,支持通过Moonlight在各种设备上进行低延迟的游戏串流。 项目地址: https://gitcode.com/GitHub_Trending/su/S…...
从‘偏差-方差’到一行代码:用NumPy/PyTorch五步实现GAE,附PPO实战避坑点
从‘偏差-方差’到一行代码:用NumPy/PyTorch五步实现GAE,附PPO实战避坑点 强化学习中的策略优化常常面临一个核心挑战:如何准确评估动作的价值?广义优势估计(GAE)通过巧妙平衡偏差与方差,成为PP…...
Phi-4-Reasoning-Vision基础教程:双卡4090环境安装、镜像拉取与端口映射
Phi-4-Reasoning-Vision基础教程:双卡4090环境安装、镜像拉取与端口映射 1. 环境准备与快速部署 在开始之前,请确保您的系统满足以下要求: 硬件配置:至少两张NVIDIA RTX 4090显卡(24GB显存)软件环境&…...
IPD实战指南:CBB模块化设计如何加速产品创新与资源整合
1. CBB模块化设计的本质与价值 第一次接触CBB这个概念时,我正负责一款智能家居产品的研发。当时团队为了赶进度,每个新产品都从零开始设计电路板,结果发现80%的功能模块都是重复的。这种低效的开发方式让我开始思考:能不能像搭积木…...
开发提效新组合:用Cursor生成代码片段,在快马一键集成与部署
最近在做一个数据整理的小工具时,发现了一个特别高效的工作流组合:先用Cursor快速生成核心代码片段,再用InsCode(快马)平台一键整合部署。整个过程就像搭积木一样顺畅,特别适合需要快速实现功能模块的场景。 需求分析 我们经常要处…...
LazyLLM架构设计揭秘:低代码如何支撑复杂多Agent系统
LazyLLM架构设计揭秘:低代码如何支撑复杂多Agent系统 【免费下载链接】LazyLLM 项目地址: https://gitcode.com/gh_mirrors/la/LazyLLM 在当今AI应用开发领域,构建复杂的多Agent系统往往需要大量的工程投入和专业知识。然而,LazyLLM框…...
2026年上海网站建设市场分析:企业官网从展示到增长的演进路径
2026年,上海企业数字化服务市场迎来结构性变革。据2026年上半年上海企业数字化服务市场调研数据显示,上海地区企业官网新建与升级需求同比增长45%,中大型企业对官网的核心诉求已从基础信息展示转向AI智能赋能、全球化跨境适配、全链路营销转化…...
3步轻松上手BepInEx:Unity插件框架新手必备指南
3步轻松上手BepInEx:Unity插件框架新手必备指南 【免费下载链接】BepInEx Unity / XNA game patcher and plugin framework 项目地址: https://gitcode.com/GitHub_Trending/be/BepInEx BepInEx是一款专为Unity游戏设计的插件框架,能帮助开发者轻…...
Virtuoso ADE仿真避坑指南:你的时钟占空比测对了吗?详解dutyCycle函数threshold参数设置
Virtuoso ADE仿真避坑指南:时钟占空比测量的关键参数解析 在模拟电路设计中,时钟信号的占空比精度往往直接影响系统性能。许多工程师虽然熟悉Virtuoso ADE的基础操作,却在自动测量占空比时遭遇"数据看起来合理但实际存在偏差"的困境…...
AI巨头集体“铸Token”:从ChatGPT到“数字员工工厂”,程序员的狂欢还是危机?
想象一下:你早上醒来,打开电脑,不是自己敲代码,而是对着一只“龙虾”说:“帮我把昨天的Bug修了,顺便给老板发份周报。” 这不是科幻——2026年3月,这事儿正在发生。 全球头部科技公司突然集体“…...
