当前位置: 首页 > article >正文

10 Flink CDC

10 Flink CDC

  • 1. CDC是什么
  • 2. CDC 的种类
  • 3. 传统CDC与Flink CDC对比
  • 4. Flink-CDC 案例
  • 5. Flink SQL 方式的案例

1. CDC是什么

CDC 是 Change Data Capture(变更数据获取)的简称。核心思想是,监测并捕获数据库的变动(包括数据或数据表的插入、更新以及删除等),将这些变更按发生的顺序完整记录下来,写入到消息中间件中以供其他服务进行订阅及消费。
在广义的概念上,只要能捕获数据变更的技术,我们都可以称为 CDC 。通常我们说的 CDC 技术主要面向数据库的变更,是一种用于捕获数据库中数据变更的技术。
CDC 技术应用场景非常广泛:
数据同步,用于备份,容灾;
数据分发,一个数据源分发给多个下游;
数据采集(E),面向数据仓库/数据湖的 ETL 数据集成。

2. CDC 的种类

CDC 主要分为基于查询和基于 Binlog 两种方式,我们主要了解一下这两种之间的区别:
在这里插入图片描述

3. 传统CDC与Flink CDC对比

  1. 传统 CDC ETL 分析
    在这里插入图片描述

  2. 基于 Flink CDC 的 ETL 分析
    在这里插入图片描述

  3. 基于 Flink CDC 的聚合分析
    在这里插入图片描述

  4. 基于 Flink CDC 的数据打宽
    在这里插入图片描述

4. Flink-CDC 案例

Flink 社区开发了 flink-cdc-connectors 组件,这是一个可以直接从 MySQL、PostgreSQL 等数据库直接读取全量数据和增量变更数据的 source 组件。
开源地址:https://github.com/ververica/flink-cdc-connectors。
示例代码:

import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction;
import com.alibaba.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.Properties;public class FlinkCDC {public static void main(String[] args) throws Exception {//1.创建执行环境StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);//2.Flink-CDC 将读取 binlog 的位置信息以状态的方式保存在 CK,如果想要做到断点
续传,需要从 Checkpoint 或者 Savepoint 启动程序//2.1 开启 Checkpoint,每隔 5 秒钟做一次 CKenv.enableCheckpointing(5000L);//2.2 指定 CK 的一致性语义
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);//2.3 设置任务关闭的时候保留最后一次 CK 数据
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckp
ointCleanup.RETAIN_ON_CANCELLATION);//2.4 指定从 CK 自动重启策略env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 2000L));//2.5 设置状态后端env.setStateBackend(new FsStateBackend("hdfs://hadoop102:8020/flinkCDC"));//2.6 设置访问 HDFS 的用户名System.setProperty("HADOOP_USER_NAME", "atguigu");//3.创建 Flink-MySQL-CDC 的 Source//initial (default): Performs an initial snapshot on the monitored database tables upon 
first startup, and continue to read the latest binlog.//latest-offset: Never to perform snapshot on the monitored database tables upon first 
startup, just read from the end of the binlog which means only have the changes since the 
connector was started.//timestamp: Never to perform snapshot on the monitored database tables upon first 
startup, and directly read binlog from the specified timestamp. The consumer will traverse the 
binlog from the beginning and ignore change events whose timestamp is smaller than the 
specified timestamp.//specific-offset: Never to perform snapshot on the monitored database tables upon 
first startup, and directly read binlog from the specified offset.DebeziumSourceFunction<String> mysqlSource = MySQLSource.<String>builder().hostname("hadoop01").port(3306).username("root").password("000000").databaseList("gmall-flink").tableList("gmall-flink.z_user_info") //可选配置项,如果不指定该参数,则会
读取上一个配置下的所有表的数据,注意:指定的时候需要使用"db.table"的方式.startupOptions(StartupOptions.initial()).deserializer(new StringDebeziumDeserializationSchema()).build();//4.使用 CDC Source 从 MySQL 读取数据DataStreamSource<String> mysqlDS = env.addSource(mysqlSource);//5.打印数据mysqlDS.print();//6.执行任务env.execute();} 
}

5. Flink SQL 方式的案例

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
public class FlinkSQL_CDC {public static void main(String[] args) throws Exception {//1.创建执行环境StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);//2.创建 Flink-MySQL-CDC 的 SourcetableEnv.executeSql("CREATE TABLE user_info (" +" id INT," +" name STRING," +" phone_num STRING" +") WITH (" +" 'connector' = 'mysql-cdc'," +" 'hostname' = 'hadoop01'," +" 'port' = '3306'," +" 'username' = 'root'," +" 'password' = '000000'," +" 'database-name' = 'gmall-flink'," +" 'table-name' = 'z_user_info'" +")");tableEnv.executeSql("select * from user_info").print();env.execute();}
}

相关文章:

10 Flink CDC

10 Flink CDC 1. CDC是什么2. CDC 的种类3. 传统CDC与Flink CDC对比4. Flink-CDC 案例5. Flink SQL 方式的案例 1. CDC是什么 CDC 是 Change Data Capture&#xff08;变更数据获取&#xff09;的简称。核心思想是&#xff0c;监测并捕获数据库的变动&#xff08;包括数据或数…...

【含文档+PPT+源码】基于微信小程序连锁药店商城

项目介绍 本课程演示的是一款基于微信小程序连锁药店商城&#xff0c;主要针对计算机相关专业的正在做毕设的学生与需要项目实战练习的 Java 学习者。 1.包含&#xff1a;项目源码、项目文档、数据库脚本、软件工具等所有资料 2.带你从零开始部署运行本套系统 3.该项目附带的…...

使用 EXISTS 解决 SQL 中 IN 查询数量过多的问题

在 SQL 查询中&#xff0c;当我们面对需要在 IN 子句中列举大量数据的场景时&#xff0c;查询的性能往往会受到显著影响。这时候&#xff0c;使用 EXISTS 可以成为一种优化的良方。 问题的来源 假设我们有两个表&#xff0c;orders 和 customers&#xff0c;我们需要查询所有…...

[免费]微信小程序智能商城系统(uniapp+Springboot后端+vue管理端)【论文+源码+SQL脚本】

大家好&#xff0c;我是java1234_小锋老师&#xff0c;看到一个不错的微信小程序智能商城系统(uniappSpringboot后端vue管理端)&#xff0c;分享下哈。 项目视频演示 【免费】微信小程序智能商城系统(uniappSpringboot后端vue管理端) Java毕业设计_哔哩哔哩_bilibili 项目介绍…...

UE学习日志#19 C++笔记#5 基础复习5 引用1

C中的引用&#xff08;reference&#xff09;是另一个变量的别名。对引用的所有修改都会更改其引用的变量的值。可以将引用视为隐式指针&#xff0c;它省去了获取变量地址和解引用指针的麻烦。另外&#xff0c;可以将引用视为原始变量的另一个名称。可以创建独立的引用变量&…...

PHP代码审计学习02

目录 代码审计一般思路 Beescms代码审计&#xff08;upload&#xff09; Finecms基于前台MVC任意文件上传挖掘思路 CLTPHP基于thinkphp5框架的文件上传挖掘思路 今天来看PHP有框架MVC类&#xff0c;文件上传&#xff0c;断点调试挖掘。 同样还是有关键字搜索和功能点抓包两…...

2025年02月02日Github流行趋势

项目名称&#xff1a;oumi 项目地址url&#xff1a;https://github.com/oumi-ai/oumi 项目语言&#xff1a;Python 历史star数&#xff1a;1416 今日star数&#xff1a;205 项目维护者&#xff1a;xrdaukar, oelachqar, taenin, wizeng23, kaisopos 项目简介&#xff1a;构建最…...

vue入门到实战 三

目录 3.1 v-bind 3.1.1 v-bind指令用法 ​编辑3.1.2 使用v-bind绑定class 3.1.3 使用v-bind绑定style 3.2.1 v-if指令 3.2.1 v-if指令 3.2.2 v-show指令 ​3.3 列表渲染指令v-for 3.3.1 基本用法 3.3.2 数组更新 3.3.3 过滤与排序 3.4 事件处理 3.4.1 使用v-on指令…...

[创业之路-271]:站在管理的角度看计算机操作系统,OS是技术思想与管理思想的完美融合,是物理世界和数字化虚拟世界的桥梁

一、前言&#xff1a; Operating System&#xff0c;站在终端用户的角度&#xff0c;翻译成了"操作系统"&#xff0c;但站在计算机系统本身角度&#xff0c;翻译成"运营系统"更合适&#xff0c;是指它是负责运营计算机所有的硬件&#xff08;物理&#xf…...

实验六 项目二 简易信号发生器的设计与实现 (HEU)

声明&#xff1a;代码部分使用了AI工具 实验六 综合考核 Quartus 18.0 FPGA 5CSXFC6D6F31C6N 1. 实验项目 要求利用硬件描述语言Verilog&#xff08;或VHDL&#xff09;、图形描述方式、IP核&#xff0c;结合数字系统设计方法&#xff0c;在Quartus开发环境下&#xff…...

java SSM框架 商城系统源码(含数据库脚本)

商城购物功能&#xff0c;项目代码&#xff0c;mysql脚本&#xff0c;html等静态资源在压缩包里面 注册界面 登陆界面 商城首页 文件列表 shop/.classpath , 1768 shop/.project , 1440 shop/.settings/.jsdtscope , 639 shop/.settings/org.eclipse.core.resources.prefs , …...

Unet 改进:在encoder和decoder间加入TransformerBlock

目录 1. TransformerBlock 2. Unet 改进 3. 完整代码 Tips:融入模块后的网络经过测试,可以直接使用,设置好输入和输出的图片维度即可 1. TransformerBlock TransformerBlock是Transformer模型架构的基本组件,广泛应用于机器翻译、文本摘要和情感分析等自然语言处理任务…...

【黄啊码】DeepSeek提示词大道至简版

1.1 有效提问的五个黄金法 法则一&#xff1a;明确需求 错误示例&#xff1a; Γ帮我写点东西」 正确姿势&#xff1a; Γ我需要一封求职邮件&#xff0c;应聘新媒体运营岗位&#xff0c;强调B年公众号运营经验 法则二&#xff1a;提供背景 错误示例 &#xff1a; Γ分析这个…...

【Linux系统】信号:认识信号 与 信号的产生

信号快速认识 1、生活角度的信号 异步&#xff1a;你是老师正在上课&#xff0c;突然有个电话过来资料到了&#xff0c;你安排小明过去取资料&#xff0c;然后继续上课&#xff0c;则小明取资料这个过程就是异步的 同步&#xff1a;小明取快递&#xff0c;你停下等待小明回来再…...

一、html笔记

(一)前端概述 1、定义 前端是Web应用程序的前台部分,运行在PC端、移动端等浏览器上,展现给用户浏览的网页。通过HTML、CSS、JavaScript等技术实现,是用户能够直接看到和操作的界面部分。上网就是下载html文档,浏览器是一个解释器,运行从服务器下载的html文件,解析html、…...

C# 数组和列表的基本知识及 LINQ 查询

数组和列表的基本知识及 LINQ 查询 一、基本知识二、引用命名空间声明三、数组3.1、一维数组3.2、二维数组3.3、不规则数组 Jagged Array 四、列表 List4.1、一维列表4.2、二维列表 五、数组和列表使用 LINQ的操作和运算5.1、一维 LIST 删除所有含 double.NaN 的行5.2、一维 LI…...

PyQt5超详细教程终篇

PyQt5超详细教程 前言 接&#xff1a; [【Python篇】PyQt5 超详细教程——由入门到精通&#xff08;序篇&#xff09;](【Python篇】PyQt5 超详细教程——由入门到精通&#xff08;序篇&#xff09;-CSDN博客) 建议把代码复制到pycahrm等IDE上面看实际效果&#xff0c;方便理…...

【llm对话系统】大模型 Llama 源码分析之归一化方法 RMS Norm

1. 引言 在深度学习中&#xff0c;归一化 (Normalization) 是一种常用的技术&#xff0c;它可以加速模型的训练并提高模型的性能。常见的归一化方法包括 Batch Normalization (BatchNorm)、Layer Normalization (LayerNorm) 等。Llama 模型采用了一种称为 RMS Norm 的归一化方…...

Ubuntu 24.04 安装 NVIDIA Container Toolkit 全指南:让Docker拥抱GPU

Ubuntu 24.04 安装 NVIDIA Container Toolkit 全指南&#xff1a;让Docker拥抱GPU 前言一、环境准备1.1 验证驱动状态 二、安装NVIDIA Container Toolkit2.1 添加官方仓库2.2 执行安装 三、配置Docker运行时3.1 更新Docker配置 四、验证安装结果4.1 运行测试容器 五、实战应用 …...

青少年编程与数学 02-008 Pyhon语言编程基础 15课题、运用函数

青少年编程与数学 02-008 Pyhon语言编程基础 15课题、运用函数 一、函数的运用1. 问题分解2. 定义函数接口3. 实现函数4. 测试函数5. 组合函数6. 处理错误和异常7. 优化和重构示例&#xff1a;使用函数解决复杂数学问题 二、递归三、递归函数1. 确定基本情况&#xff08;Base C…...

ARM TEE

在ARM的语境中&#xff0c;TEE是Trusted Execution Environment&#xff08;可信执行环境&#xff09;的缩写。ARM TEE就是基于ARM架构实现的可信执行环境&#xff0c;以下是具体介绍&#xff1a; 定义与原理 定义&#xff1a;ARM TEE是基于独立硬件&#xff0c;和主操作系统…...

洛谷 P8724 [蓝桥杯 2020 省 AB3] 限高杆

洛谷题目传送门 题目描述 某市有 n 个路口&#xff0c;有 m 段道路连接这些路口&#xff0c;组成了该市的公路系统。其中一段道路两端一定连接两个不同的路口。道路中间不会穿过路口。 由于各种原因&#xff0c;在一部分道路的中间设置了一些限高杆&#xff0c;有限高杆的路…...

虚幻UE5手机安卓Android Studio开发设置2025

一、下载Android Studio历史版本 步骤1&#xff1a;虚幻4.27、5.0、5.1、5.2官方要求Andrd Studio 4.0版本&#xff1b; 5.3、5.4、5.5官方要求的版本为Android Studio Flamingo | 2022.2.1 Patch 2 May 24, 2023 虚幻官网查看对应Andrd Studiob下载版本&#xff1a; https:/…...

JavaWeb入门-请求响应(Day3)

(一)请求响应概述 请求(HttpServletRequest):获取请求数据 响应(HttpServletResponse):设置响应数据 BS架构:Browser/Server,浏览器/服务器架构模式。客户端只需要浏览器就可访问,应用程序的逻辑和数据都存储在服务端(维护方便,响应速度一般) CS架构:Client/ser…...

【Rust】18.2. 可辩驳性:模式是否会无法匹配

喜欢的话别忘了点赞、收藏加关注哦&#xff08;加关注即可阅读全文&#xff09;&#xff0c;对接下来的教程有兴趣的可以关注专栏。谢谢喵&#xff01;(&#xff65;ω&#xff65;) 18.2.1. 模式的两种形式 模式有两种形式&#xff1a; 可辩驳的&#xff08;可失败的&…...

【SLAM】于AutoDL云上GPU运行GCNv2_SLAM的记录

配置GCNv2_SLAM所需环境并实现AutoDL云端运行项目的全过程记录。 1. 引子 前几天写了一篇在本地虚拟机里面CPU运行GCNv2_SLAM项目的博客&#xff1a;链接&#xff0c;关于GCNv2_SLAM项目相关的介绍请移步此文章&#xff0c;本文不再重复说明。 GCNv2: Efficient Corresponde…...

SQL进阶实战技巧:某芯片工厂设备任务排产调度分析 | 间隙分析技术应用

目录 0 技术定义与核心原理 1 场景描述 2 数据准备 3 间隙分析法 步骤1:原始时间线可视化...

【自然语言处理(NLP)】基于Transformer架构的预训练语言模型:BERT 训练之数据集处理、训练代码实现

文章目录 介绍BERT 训练之数据集处理BERT 原理及模型代码实现数据集处理导包加载数据生成下一句预测任务的数据从段落中获取nsp数据生成遮蔽语言模型任务的数据从token中获取mlm数据将文本转换为预训练数据集创建Dataset加载WikiText-2数据集 BERT 训练代码实现导包加载数据构建…...

Kotlin判空辅助工具

1&#xff09;?.操作符 //执行逻辑 if (person ! null) {person.doSomething() } //表达式 person?.doSomething() 2&#xff09;?:操作符 //执行逻辑 val c if (a ! null) {a } else {b } //表达式 val c a ?: b 3&#xff09;!!表达式 var message: String? &qu…...

41【文件名的编码规则】

我们在学习的过程中&#xff0c;写出数据或读取数据时需要考虑编码类型 火山采用&#xff1a;UTF-16 易语言采用&#xff1a;GBK php采用&#xff1a;UTF-8 那么我们写出的文件名应该是何种编码的&#xff1f;比如火山程序向本地写出一个“测试.txt”&#xff0c;理论上这个“测…...