当前位置: 首页 > news >正文

10 Flink CDC

10 Flink CDC

  • 1. CDC是什么
  • 2. CDC 的种类
  • 3. 传统CDC与Flink CDC对比
  • 4. Flink-CDC 案例
  • 5. Flink SQL 方式的案例

1. CDC是什么

CDC 是 Change Data Capture(变更数据获取)的简称。核心思想是,监测并捕获数据库的变动(包括数据或数据表的插入、更新以及删除等),将这些变更按发生的顺序完整记录下来,写入到消息中间件中以供其他服务进行订阅及消费。
在广义的概念上,只要能捕获数据变更的技术,我们都可以称为 CDC 。通常我们说的 CDC 技术主要面向数据库的变更,是一种用于捕获数据库中数据变更的技术。
CDC 技术应用场景非常广泛:
数据同步,用于备份,容灾;
数据分发,一个数据源分发给多个下游;
数据采集(E),面向数据仓库/数据湖的 ETL 数据集成。

2. CDC 的种类

CDC 主要分为基于查询和基于 Binlog 两种方式,我们主要了解一下这两种之间的区别:
在这里插入图片描述

3. 传统CDC与Flink CDC对比

  1. 传统 CDC ETL 分析
    在这里插入图片描述

  2. 基于 Flink CDC 的 ETL 分析
    在这里插入图片描述

  3. 基于 Flink CDC 的聚合分析
    在这里插入图片描述

  4. 基于 Flink CDC 的数据打宽
    在这里插入图片描述

4. Flink-CDC 案例

Flink 社区开发了 flink-cdc-connectors 组件,这是一个可以直接从 MySQL、PostgreSQL 等数据库直接读取全量数据和增量变更数据的 source 组件。
开源地址:https://github.com/ververica/flink-cdc-connectors。
示例代码:

import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction;
import com.alibaba.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.Properties;public class FlinkCDC {public static void main(String[] args) throws Exception {//1.创建执行环境StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);//2.Flink-CDC 将读取 binlog 的位置信息以状态的方式保存在 CK,如果想要做到断点
续传,需要从 Checkpoint 或者 Savepoint 启动程序//2.1 开启 Checkpoint,每隔 5 秒钟做一次 CKenv.enableCheckpointing(5000L);//2.2 指定 CK 的一致性语义
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);//2.3 设置任务关闭的时候保留最后一次 CK 数据
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckp
ointCleanup.RETAIN_ON_CANCELLATION);//2.4 指定从 CK 自动重启策略env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 2000L));//2.5 设置状态后端env.setStateBackend(new FsStateBackend("hdfs://hadoop102:8020/flinkCDC"));//2.6 设置访问 HDFS 的用户名System.setProperty("HADOOP_USER_NAME", "atguigu");//3.创建 Flink-MySQL-CDC 的 Source//initial (default): Performs an initial snapshot on the monitored database tables upon 
first startup, and continue to read the latest binlog.//latest-offset: Never to perform snapshot on the monitored database tables upon first 
startup, just read from the end of the binlog which means only have the changes since the 
connector was started.//timestamp: Never to perform snapshot on the monitored database tables upon first 
startup, and directly read binlog from the specified timestamp. The consumer will traverse the 
binlog from the beginning and ignore change events whose timestamp is smaller than the 
specified timestamp.//specific-offset: Never to perform snapshot on the monitored database tables upon 
first startup, and directly read binlog from the specified offset.DebeziumSourceFunction<String> mysqlSource = MySQLSource.<String>builder().hostname("hadoop01").port(3306).username("root").password("000000").databaseList("gmall-flink").tableList("gmall-flink.z_user_info") //可选配置项,如果不指定该参数,则会
读取上一个配置下的所有表的数据,注意:指定的时候需要使用"db.table"的方式.startupOptions(StartupOptions.initial()).deserializer(new StringDebeziumDeserializationSchema()).build();//4.使用 CDC Source 从 MySQL 读取数据DataStreamSource<String> mysqlDS = env.addSource(mysqlSource);//5.打印数据mysqlDS.print();//6.执行任务env.execute();} 
}

5. Flink SQL 方式的案例

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
public class FlinkSQL_CDC {public static void main(String[] args) throws Exception {//1.创建执行环境StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);//2.创建 Flink-MySQL-CDC 的 SourcetableEnv.executeSql("CREATE TABLE user_info (" +" id INT," +" name STRING," +" phone_num STRING" +") WITH (" +" 'connector' = 'mysql-cdc'," +" 'hostname' = 'hadoop01'," +" 'port' = '3306'," +" 'username' = 'root'," +" 'password' = '000000'," +" 'database-name' = 'gmall-flink'," +" 'table-name' = 'z_user_info'" +")");tableEnv.executeSql("select * from user_info").print();env.execute();}
}

相关文章:

10 Flink CDC

10 Flink CDC 1. CDC是什么2. CDC 的种类3. 传统CDC与Flink CDC对比4. Flink-CDC 案例5. Flink SQL 方式的案例 1. CDC是什么 CDC 是 Change Data Capture&#xff08;变更数据获取&#xff09;的简称。核心思想是&#xff0c;监测并捕获数据库的变动&#xff08;包括数据或数…...

【LeetCode 刷题】回溯算法-子集问题

此博客为《代码随想录》二叉树章节的学习笔记&#xff0c;主要内容为回溯算法子集问题相关的题目解析。 文章目录 78.子集90.子集II 78.子集 题目链接 class Solution:def subsets(self, nums: List[int]) -> List[List[int]]:res, path [], []def dfs(start: int) ->…...

OpenCV 版本不兼容导致的问题

问题和解决方案 今天运行如下代码&#xff0c;发生了意外的错误&#xff0c;代码如下&#xff0c;其中输入的 frame 来自于 OpenCV 开启数据流的读取 """ cap cv2.VideoCapture(RTSP_URL) print("链接视频流完成") while True:ret, frame cap.rea…...

低成本、高附加值,具有较强的可扩展性和流通便利性的行业

目录 虚拟资源类 1. 网课教程 2. 设计素材 3. 软件工具 服务类 1. 写作服务 2. 咨询顾问 3. 在线教育 4. 社交媒体管理 虚拟资源类 1. 网课教程 特点&#xff1a;高附加值&#xff0c;可复制性强&#xff0c;市场需求大。 执行流程&#xff1a; 选择领域&#xff1a…...

DirectShow过滤器开发-读视频文件过滤器(再写)

下载本过滤器DLL 本过滤器读取视频文件输出视频流和音频流。流类型由文件决定。已知可读取的文件格式有&#xff1a;AVI&#xff0c;ASF&#xff0c;MOV&#xff0c;MP4&#xff0c;MPG&#xff0c;WMV。 过滤器信息 过滤器名称&#xff1a;读视频文件 过滤器GUID&#xff1a…...

代码练习2.3

终端输入10个学生成绩&#xff0c;使用冒泡排序对学生成绩从低到高排序 #include <stdio.h>void bubbleSort(int arr[], int n) {for (int i 0; i < n-1; i) {for (int j 0; j < n-i-1; j) {if (arr[j] > arr[j1]) {// 交换 arr[j] 和 arr[j1]int temp arr[…...

基于 Redis GEO 实现条件分页查询用户附近的场馆列表

&#x1f3af; 本文档详细介绍了如何使用Redis GEO模块实现场馆位置的存储与查询&#xff0c;以支持“附近场馆”搜索功能。首先&#xff0c;通过微信小程序获取用户当前位置&#xff0c;并将该位置信息与场馆的经纬度数据一同存储至Redis中。利用Redis GEO高效的地理空间索引能…...

【大数据技术】案例01:词频统计样例(hadoop+mapreduce+yarn)

词频统计(hadoop+mapreduce+yarn) 搭建完全分布式高可用大数据集群(VMware+CentOS+FinalShell) 搭建完全分布式高可用大数据集群(Hadoop+MapReduce+Yarn) 在阅读本文前,请确保已经阅读过以上两篇文章,成功搭建了Hadoop+MapReduce+Yarn的大数据集群环境。 写在前面 Wo…...

Selenium 使用指南:从入门到精通

Selenium 使用指南&#xff1a;从入门到精通 Selenium 是一个用于自动化 Web 浏览器操作的强大工具&#xff0c;广泛应用于自动化测试和 Web 数据爬取中。本文将带你从入门到精通地掌握 Selenium&#xff0c;涵盖其基本操作、常用用法以及一个完整的图片爬取示例。 1. 环境配…...

笔试-排列组合

应用 一个长度为[1, 50]、元素都是字符串的非空数组&#xff0c;每个字符串的长度为[1, 30]&#xff0c;代表非负整数&#xff0c;元素可以以“0”开头。例如&#xff1a;[“13”, “045”&#xff0c;“09”&#xff0c;“56”]。 将所有字符串排列组合&#xff0c;拼起来组成…...

Java序列化详解

1 什么是序列化、反序列化 在Java编程实践中&#xff0c;当我们需要持久化Java对象&#xff0c;比如把Java对象保存到文件里&#xff0c;或是在网络中传输Java对象时&#xff0c;序列化机制就发挥着关键作用。 序列化&#xff1a;指的是把数据结构或对象转变为可存储、可传输的…...

ChatGPT与GPT的区别与联系

ChatGPT 和 GPT 都是基于 Transformer 架构的语言模型&#xff0c;但它们有不同的侧重点和应用。下面我们来探讨一下它们的区别与联系。 1. GPT&#xff08;Generative Pre-trained Transformer&#xff09; GPT 是一类由 OpenAI 开发的语言模型&#xff0c;基于 Transformer…...

MySQL入门 – CRUD基本操作

MySQL入门 – CRUD基本操作 Essential CRUD Manipulation to MySQL Database By JacksonML 本文简要介绍操作MySQL数据库的基本操作&#xff0c;即创建(Create), 读取&#xff08;Read&#xff09;, 更新(Update)和删除&#xff08;Delete&#xff09;。 基于数据表的关系型…...

Redis背景介绍

⭐️前言⭐️ 本文主要做Redis相关背景介绍&#xff0c;包括核心能力、重要特性和使用场景。 &#x1f349;欢迎点赞 &#x1f44d; 收藏 ⭐留言评论 &#x1f349;博主将持续更新学习记录收获&#xff0c;友友们有任何问题可以在评论区留言 &#x1f349;博客中涉及源码及博主…...

PPT演示设置:插入音频同步切换播放时长计算

PPT中插入音频&同步切换&放时长计算 一、 插入音频及音频设置二、设置页面切换和音频同步三、播放时长计算 一、 插入音频及音频设置 1.插入音频&#xff1a;点击菜单栏插入-音频-选择PC上的音频&#xff08;已存在的音频&#xff09;或者录制音频&#xff08;现场录制…...

DIFY源码解析

偶然发现Github上某位大佬开源的DIFY源码注释和解析&#xff0c;目前还处于陆续不断更新地更新过程中&#xff0c;为大佬的专业和开源贡献精神点赞。先收藏链接&#xff0c;后续慢慢学习。 相关链接如下&#xff1a; DIFY源码解析...

[权限提升] Wdinwos 提权 维持 — 系统错误配置提权 - Trusted Service Paths 提权

关注这个专栏的其他相关笔记&#xff1a;[内网安全] 内网渗透 - 学习手册-CSDN博客 0x01&#xff1a;Trusted Service Paths 提权原理 Windows 的服务通常都是以 System 权限运行的&#xff0c;所以系统在解析服务的可执行文件路径中的空格的时候也会以 System 权限进行解析&a…...

【算法】回溯算法专题② ——组合型回溯 + 剪枝 python

目录 前置知识进入正题小试牛刀实战演练总结 前置知识 【算法】回溯算法专题① ——子集型回溯 python 进入正题 组合https://leetcode.cn/problems/combinations/submissions/596357179/ 给定两个整数 n 和 k&#xff0c;返回范围 [1, n] 中所有可能的 k 个数的组合。 你可以…...

LeetCode:121.买卖股票的最佳时机1

跟着carl学算法&#xff0c;本系列博客仅做个人记录&#xff0c;建议大家都去看carl本人的博客&#xff0c;写的真的很好的&#xff01; 代码随想录 LeetCode&#xff1a;121.买卖股票的最佳时机1 给定一个数组 prices &#xff0c;它的第 i 个元素 prices[i] 表示一支给定股票…...

pytorch生成对抗网络

人工智能例子汇总&#xff1a;AI常见的算法和例子-CSDN博客 生成对抗网络&#xff08;GAN&#xff0c;Generative Adversarial Network&#xff09;是一种深度学习模型&#xff0c;由两个神经网络组成&#xff1a;生成器&#xff08;Generator&#xff09;和判别器&#xff0…...

Visual Studio Code应用本地部署的deepseek

1.打开Visual Studio Code&#xff0c;在插件中搜索continue&#xff0c;安装插件。 2.添加新的大语言模型&#xff0c;我们选择ollama. 3.直接点connect&#xff0c;会链接本地下载好的deepseek模型。 参看上篇文章&#xff1a;deepseek本地部署-CSDN博客 4.输入需求生成可用…...

用 HTML、CSS 和 JavaScript 实现抽奖转盘效果

顺序抽奖 前言 这段代码实现了一个简单的抽奖转盘效果。页面上有一个九宫格布局的抽奖区域&#xff0c;周围八个格子分别放置了不同的奖品名称&#xff0c;中间是一个 “开始抽奖” 的按钮。点击按钮后&#xff0c;抽奖区域的格子会快速滚动&#xff0c;颜色不断变化&#xf…...

Skewer v0.2.2安装与使用-生信工具43

01 Skewer 介绍 Skewer&#xff08;来自于 SourceForge&#xff09;实现了一种基于位掩码的 k-差异匹配算法&#xff0c;专门用于接头修剪&#xff0c;特别设计用于处理下一代测序&#xff08;NGS&#xff09;双端序列。 fastp安装及使用-fastp v0.23.4&#xff08;bioinfoma…...

C语言:链表排序与插入的实现

好的!以下是一篇关于这段代码的博客文章: 从零开始:链表排序与插入的实现 在数据结构的学习中,链表是一种非常基础且重要的数据结构。今天,我们将通过一个简单的 C 语言程序,来探讨如何实现一个从小到大排序的链表,并在其中插入一个新的节点。这个过程不仅涉及链表的基…...

【Elasticsearch】doc_values 可以用于查询操作

确实&#xff0c;doc values 可以用于查询操作&#xff0c;尽管它们的主要用途是支持排序、聚合和脚本中的字段访问。在某些情况下&#xff0c;Elasticsearch 也会利用 doc values 来执行特定类型的查询。以下是关于 doc values 在查询操作中的使用及其影响的详细解释&#xff…...

深度学习深度解析:从基础到前沿

引言 深度学习作为人工智能的一个重要分支&#xff0c;通过模拟人脑的神经网络结构来进行数据分析和模式识别。它在图像识别、自然语言处理、语音识别等领域取得了显著成果。本文将深入探讨深度学习的基础知识、主要模型架构以及当前的研究热点和发展趋势。 基础概念与数学原理…...

JVM的GC详解

获取GC日志方式大抵有两种 第一种就是设定JVM参数在程序启动时查看&#xff0c;具体的命令参数为: -XX:PrintGCDetails # 打印GC日志 -XX:PrintGCTimeStamps # 打印每一次触发GC时发生的时间第二种则是在服务器上监控:使用jstat查看,如下所示&#xff0c;命令格式为jstat -gc…...

【开源免费】基于Vue和SpringBoot的校园网上店铺系统(附论文)

本文项目编号 T 187 &#xff0c;文末自助获取源码 \color{red}{T187&#xff0c;文末自助获取源码} T187&#xff0c;文末自助获取源码 目录 一、系统介绍二、数据库设计三、配套教程3.1 启动教程3.2 讲解视频3.3 二次开发教程 四、功能截图五、文案资料5.1 选题背景5.2 国内…...

测压表压力表计量表针头针尾检测数据集VOC+YOLO格式4862张4类别

数据集格式&#xff1a;Pascal VOC格式YOLO格式(不包含分割路径的txt文件&#xff0c;仅仅包含jpg图片以及对应的VOC格式xml文件和yolo格式txt文件) 图片数量(jpg文件个数)&#xff1a;4862 标注数量(xml文件个数)&#xff1a;4862 标注数量(txt文件个数)&#xff1a;4862 …...

Vue 3 30天精进之旅:Day 12 - 异步操作

在现代前端开发中&#xff0c;异步操作是一个非常常见的需求&#xff0c;例如从后端API获取数据、进行文件上传等任务。Vue 3 结合组合式API和Vuex可以方便地处理这些异步操作。今天我们将重点学习如何在Vue应用中进行异步操作&#xff0c;包括以下几个主题&#xff1a; 异步操…...