当前位置: 首页 > news >正文

Flink CDC实时同步mysql数据

官方参考资料:

https://nightlies.apache.org/flink/flink-cdc-docs-master/zh/docs/connectors/flink-sources/mysql-cdc/

Apache Flink 的 Change Data Capture (CDC) 是一种用于捕获数据库变化(如插入、更新和删除操作)的技术。Flink CDC Connector 允许你使用 Flink 从 MySQL 等数据库中读取变化数据,并处理这些流式数据。以下是如何在 Flink 中配置和使用 CDC Connector 读取 MySQL 数据的步骤:

前提条件

  1. MySQL 数据库:确保你已经有一个 MySQL 数据库,并且知道数据库的连接信息(如主机名、端口、用户名、密码、数据库名)。
  2. Flink 环境:你需要在本地或集群上配置好 Flink 环境。
  3. MySQL Binlog:确保 MySQL 数据库启用了 Binlog(Binary Logging),因为 Flink CDC 依赖于 Binlog 来捕获数据变化。

支持的数据库

依赖

Maven dependency

<dependency>

   <groupId>org.apache.flink</groupId>

   <artifactId>flink-connector-mysql-cdc</artifactId>

   <!--  请使用已发布的版本依赖,snapshot 版本的依赖需要本地自行编译。 -->

   <version>3.3-SNAPSHOT</version>

</dependency>

SQL Client JAR

下载链接仅在已发布版本可用,请在文档网站左下角选择浏览已发布的版本。

下载 flink-sql-connector-mysql-cdc-3.3-SNAPSHOT.jar 到 <FLINK_HOME>/lib/ 目录下。

由于 MySQL Connector 采用的 GPLv2 协议与 Flink CDC 项目不兼容,我们无法在 jar 包中提供 MySQL 连接器。 您可能需要手动配置以下依赖:

配置 MySQL Binlog

修改MySQL 配置文件(my.cnf或my.ini):

[mysqld]

server-id = 1

log-bin = mysql-bin

binlog-format = ROW

配置 MySQL 服务器

先创建一个 MySQL 用户,并授权。

  1. 创建 MySQL 用户:

mysql> CREATE USER 'user'@'localhost' IDENTIFIED BY 'password';

  1. 向用户授予所需的权限:

mysql> GRANT SELECT, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'user' IDENTIFIED BY 'password';

注意: 在 scan.incremental.snapshot.enabled 参数已启用时(默认情况下已启用)时,不再需要授予 reload 权限。

  1. 刷新用户权限:

mysql> FLUSH PRIVILEGES;

创建 MySQL CDC 表

MySQL CDC 表可以定义如下:

-- 每 3 秒做一次 checkpoint,用于测试,生产配置建议5到10分钟                     

Flink SQL> SET 'execution.checkpointing.interval' = '3s';  

-- 在 Flink SQL中注册 MySQL 表 'orders'

Flink SQL> CREATE TABLE orders (

     order_id INT,

     order_date TIMESTAMP(0),

     customer_name STRING,

     price DECIMAL(10, 5),

     product_id INT,

     order_status BOOLEAN,

     PRIMARY KEY(order_id) NOT ENFORCED

     ) WITH (

     'connector' = 'mysql-cdc',

     'hostname' = 'localhost',

     'port' = '3306',

     'username' = 'root',

     'password' = '123456',

     'database-name' = 'mydb',

     'table-name' = 'orders');

-- 从订单表读取全量数据(快照)和增量数据(binlog)

Flink SQL> SELECT * FROM orders;

关于无主键表

从2.4.0 版本开始支持无主键表,使用无主键表必须设置 scan.incremental.snapshot.chunk.key-column,且只能选择非空类型的一个字段。

在使用无主键表时,需要注意以下两种情况。

  1. 配置 scan.incremental.snapshot.chunk.key-column 时,如果表中存在索引,请尽量使用索引中的列来加快 select 速度。
  2. 无主键表的处理语义由 scan.incremental.snapshot.chunk.key-column 指定的列的行为决定:
  • 如果指定的列不存在更新操作,此时可以保证 Exactly once 语义。
  • 如果指定的列存在更新操作,此时只能保证 At least once 语义。但可以结合下游,通过指定下游主键,结合幂等性操作来保证数据的正确性。

相关文章:

Flink CDC实时同步mysql数据

官方参考资料&#xff1a; https://nightlies.apache.org/flink/flink-cdc-docs-master/zh/docs/connectors/flink-sources/mysql-cdc/ Apache Flink 的 Change Data Capture (CDC) 是一种用于捕获数据库变化&#xff08;如插入、更新和删除操作&#xff09;的技术。Flink CDC…...

题解 - 自然数无序拆分

题目描述 美羊羊给喜羊羊和沸羊羊出了一道难题&#xff0c;说谁能先做出来&#xff0c;我就奖励给他我自己做的一样礼物。沸羊羊这下可乐了&#xff0c;于是马上答应立刻做出来&#xff0c;喜羊羊见状&#xff0c;当然也不甘示弱&#xff0c;向沸羊羊发起了挑战。 可是这道题目…...

dfs_bool_void 两种写法感悟

dfs 的两种写法 在看之前实现图的遍历 dfs 和拓扑排序 dfs 实现的代码的时候的感悟 图的遍历 dfs 和拓扑排序 dfs 的区别 0 → 1 ↓ ↓ 2 → 3图的邻接表表示&#xff1a; adjList[0] {1, 2}; adjList[1] {3}; adjList[2] {3}; adjList[3] {};正常的 DFS 遍历&#x…...

MySQL 主从复制与 Binlog 深度解析

目录 1. Binlog的工作原理与配置2. 主从复制的设置与故障排除3. 数据一致性与同步延迟的处理 小结 MySQL的binlog&#xff08;二进制日志&#xff09;和主从复制是实现数据备份、容灾、负载均衡以及数据同步的重要机制。在高可用性架构和分布式数据库设计中&#xff0c;binlog同…...

大连理工大学《2024年845自动控制原理真题》 (完整版)

本文内容&#xff0c;全部选自自动化考研联盟的&#xff1a;《大连理工大学845自控考研资料》的真题篇。后续会持续更新更多学校&#xff0c;更多年份的真题&#xff0c;记得关注哦 目录 2024年真题 Part1&#xff1a;2024年完整版真题 2024年真题...

Java性能调优 - 多线程性能调优

锁优化 Synchronized 在JDK1.6中引入了分级锁机制来优化Synchronized。当一个线程获取锁时 首先对象锁将成为一个偏向锁&#xff0c;这样做是为了优化同一线程重复获取锁&#xff0c;导致的用户态与内核态的切换问题&#xff1b;其次如果有多个线程竞争锁资源&#xff0c;锁…...

行为树详解(4)——节点参数配置化

【分析】 行为树是否足够灵活强大依赖于足够丰富的各类条件节点和动作节点&#xff0c;在实现这些节点时&#xff0c;不可避免的&#xff0c;节点本身需要有一些参数供配置。 这些参数可以分为静态的固定值的参数以及动态读取设置的参数。 静态参数直接设置为Public即可&…...

计算机网络中的三大交换技术详解与实现

目录 计算机网络中的三大交换技术详解与实现1. 计算机网络中的交换技术概述1.1 交换技术的意义1.2 三大交换技术简介 2. 电路交换技术2.1 理论介绍2.2 Python实现及代码详解2.3 案例分析 3. 分组交换技术3.1 理论介绍3.2 Python实现及代码详解3.3 案例分析 4. 报文交换技术4.1 …...

《杨辉三角》

题目描述 给出 n(1≤n≤20)n(1≤n≤20)&#xff0c;输出杨辉三角的前 nn 行。 如果你不知道什么是杨辉三角&#xff0c;可以观察样例找找规律。 输入格式 无 输出格式 无 输入输出样例 输入 #1复制 6 输出 #1复制 1 1 1 1 2 1 1 3 3 1 1 4 6 4 1 1 5 10 10 5 1 C语言…...

ARM学习(35)单元测试框架以及MinGW GCC覆盖率报告

单元测试框架以及MinGW GCC覆盖率报告 1、单元测试与覆盖率简介 随着代码越写越多,越来越需要注意自测的重要性,基本可以提前解决90%的问题,所以就来介绍一下单元测试,单元测试是否测试充分,需要进行评价,覆盖率就是单元测试是否充分的评估工具。 例如跑过单元测试后,…...

边缘计算+人工智能:让设备更聪明的秘密

引言&#xff1a;日常生活中的“智能”设备 你是否发现&#xff0c;身边的设备正变得越来越“聪明”&#xff1f; 早上醒来时&#xff0c;智能音箱已经根据你的日程播放舒缓音乐&#xff1b;走进厨房&#xff0c;智能冰箱提醒你今天的食材库存&#xff1b;而在城市道路上&…...

neo4j知识图谱AOPC的安装方法

AOPC下载链接&#xff1a;aopc全版本github下载 APOC&#xff0c;全称为Awesome Procedures On Cypher&#xff0c;是Neo4j图数据库的一个非常强大和流行的扩展库。它极大地丰富了Cypher查询语言的功能&#xff0c;提供了超过450个过程&#xff08;procedures&#xff09;和函数…...

图像分割数据集植物图像叶片健康状态分割数据集labelme格式180张3类别

数据集格式&#xff1a;labelme格式(不包含mask文件&#xff0c;仅仅包含jpg图片和对应的json文件) 图片数量(jpg文件个数)&#xff1a;180 标注数量(json文件个数)&#xff1a;180 标注类别数&#xff1a;3 标注类别名称:["Healthy","nitrogen deficiency"…...

Python学习(二)—— 基础语法(上)

目录 一&#xff0c;表达式和常量和变量 1.1 表达式 1.2 变量 1.3 动态类型特性 1.4 输入 二&#xff0c;运算符 2.1 算术运算符 2.2 关系运算符 2.3 逻辑运算符 2.4 赋值运算符 2.5 练习 三&#xff0c;语句 3.1 条件语句 3.2 while循环 3.3 for循环 四&#…...

Cesium-(Primitive)-(CircleOutlineGeometry)

CircleOutlineGeometry 效果: CircleOutlineGeometry 是 CesiumJS 中的一个类,它用来描述在椭球体上圆的轮廓。以下是 CircleOutlineGeometry 的构造函数属性,以表格形式展示: 属性名类型默认值描述centerCartesian3圆心点在固定坐标系中的坐标。radiusnumber圆的半径,…...

计算机网络技术基础:2.计算机网络的组成

计算机网络从逻辑上可以分为两个子网&#xff1a;资源子网和通信子网。 一、资源子网 资源子网主要负责全网的数据处理业务&#xff0c;为全网用户提供各种网络资源与网络服务。资源子网由主机、终端、各种软件资源与信息资源等组成。 1&#xff09;主机 主机是资源子网的主要…...

EasyExcel使用管道流连接InputStream和OutputStream

前言 Java中的InputSteam 是程序从其中读取数据&#xff0c; OutputSteam是程序可以往里面写入数据。 如果我们有在项目中读取数据库的记录&#xff0c; 在转存成Excel文件, 再把文件转存到OSS中。 生成Excel使用的是阿里的EasyExcel 。 他支持Output的方式写出文件内容。 而…...

OpenWebUI连接不上Ollama模型,Ubuntu24.04

这里写自定义目录标题 问题介绍解决方法 问题介绍 操作系统 Ubuntu24.04Ollama 使用默认安装方法&#xff08;官网https://github.com/ollama/ollama&#xff09; curl -fsSL https://ollama.com/install.sh | sh 安装在本机OpenWebUI 使用默认docker安装方法&#xff08;官网…...

C#C++获取当前应用程序的安装目录和工作目录

很多时候&#xff0c;用户自己点击打开read.exe加载的时候都没有问题&#xff0c;读取ini配置文件也没有问题。但是如果应用程序是开机启动呢&#xff1f;32位Windows系统当前目录是C盘的windows\system32&#xff1b;而64位系统软件启动后默认的当前目录是&#xff1a;C:\Wind…...

Linux中vi和vim的区别详解

文章目录 Linux中vi和vim的区别详解一、引言二、vi和vim的起源与发展三、功能和特性1、语法高亮2、显示行号3、编辑模式4、可视化界面5、功能扩展6、插件支持 四、使用示例1、启动编辑器2、基本操作 五、总结 Linux中vi和vim的区别详解 一、引言 在Linux系统中&#xff0c;vi和…...

网络编程(Modbus进阶)

思维导图 Modbus RTU&#xff08;先学一点理论&#xff09; 概念 Modbus RTU 是工业自动化领域 最广泛应用的串行通信协议&#xff0c;由 Modicon 公司&#xff08;现施耐德电气&#xff09;于 1979 年推出。它以 高效率、强健性、易实现的特点成为工业控制系统的通信标准。 包…...

基于距离变化能量开销动态调整的WSN低功耗拓扑控制开销算法matlab仿真

目录 1.程序功能描述 2.测试软件版本以及运行结果展示 3.核心程序 4.算法仿真参数 5.算法理论概述 6.参考文献 7.完整程序 1.程序功能描述 通过动态调整节点通信的能量开销&#xff0c;平衡网络负载&#xff0c;延长WSN生命周期。具体通过建立基于距离的能量消耗模型&am…...

Redis相关知识总结(缓存雪崩,缓存穿透,缓存击穿,Redis实现分布式锁,如何保持数据库和缓存一致)

文章目录 1.什么是Redis&#xff1f;2.为什么要使用redis作为mysql的缓存&#xff1f;3.什么是缓存雪崩、缓存穿透、缓存击穿&#xff1f;3.1缓存雪崩3.1.1 大量缓存同时过期3.1.2 Redis宕机 3.2 缓存击穿3.3 缓存穿透3.4 总结 4. 数据库和缓存如何保持一致性5. Redis实现分布式…...

基于Flask实现的医疗保险欺诈识别监测模型

基于Flask实现的医疗保险欺诈识别监测模型 项目截图 项目简介 社会医疗保险是国家通过立法形式强制实施&#xff0c;由雇主和个人按一定比例缴纳保险费&#xff0c;建立社会医疗保险基金&#xff0c;支付雇员医疗费用的一种医疗保险制度&#xff0c; 它是促进社会文明和进步的…...

最新SpringBoot+SpringCloud+Nacos微服务框架分享

文章目录 前言一、服务规划二、架构核心1.cloud的pom2.gateway的异常handler3.gateway的filter4、admin的pom5、admin的登录核心 三、code-helper分享总结 前言 最近有个活蛮赶的&#xff0c;根据Excel列的需求预估的工时直接打骨折&#xff0c;不要问我为什么&#xff0c;主要…...

JUC笔记(上)-复习 涉及死锁 volatile synchronized CAS 原子操作

一、上下文切换 即使单核CPU也可以进行多线程执行代码&#xff0c;CPU会给每个线程分配CPU时间片来实现这个机制。时间片非常短&#xff0c;所以CPU会不断地切换线程执行&#xff0c;从而让我们感觉多个线程是同时执行的。时间片一般是十几毫秒(ms)。通过时间片分配算法执行。…...

C# 求圆面积的程序(Program to find area of a circle)

给定半径r&#xff0c;求圆的面积。圆的面积应精确到小数点后5位。 例子&#xff1a; 输入&#xff1a;r 5 输出&#xff1a;78.53982 解释&#xff1a;由于面积 PI * r * r 3.14159265358979323846 * 5 * 5 78.53982&#xff0c;因为我们只保留小数点后 5 位数字。 输…...

深度学习水论文:mamba+图像增强

&#x1f9c0;当前视觉领域对高效长序列建模需求激增&#xff0c;对Mamba图像增强这方向的研究自然也逐渐火热。原因在于其高效长程建模&#xff0c;以及动态计算优势&#xff0c;在图像质量提升和细节恢复方面有难以替代的作用。 &#x1f9c0;因此短时间内&#xff0c;就有不…...

DingDing机器人群消息推送

文章目录 1 新建机器人2 API文档说明3 代码编写 1 新建机器人 点击群设置 下滑到群管理的机器人&#xff0c;点击进入 添加机器人 选择自定义Webhook服务 点击添加 设置安全设置&#xff0c;详见说明文档 成功后&#xff0c;记录Webhook 2 API文档说明 点击设置说明 查看自…...

站群服务器的应用场景都有哪些?

站群服务器主要是为了多个网站的托管和管理所设计的&#xff0c;可以通过集中管理和高效资源的分配&#xff0c;来支持多个独立的网站同时运行&#xff0c;让每一个网站都可以分配到独立的IP地址&#xff0c;避免出现IP关联的风险&#xff0c;用户还可以通过控制面板进行管理功…...