FlinkCDC的分析和应用代码
前言:原本想讲如何基于Flink实现定制化计算引擎的开发,并以FlinkCDC为例介绍;发现这两个在表达上不知以谁为主,所以先分析FlinkCDC的应用场景和技术实现原理,下一篇再去分析Flink能在哪些方面,做定制化计算引擎的开发操作。本文将从FlinkCDC应用场景开始,然后讲述其基于Flink的实现原理和代码应用,为下一篇介绍基于Flink开发定制化引擎做铺垫。
一、FlinkCDC应用场景
经常有同事或朋友问,Flink和FlinkCDC有什么区别?
Flink是一个流数据处理计算框架,FlinkCDC是数据采集工具:
Flink应用场景对比的是Storm、Spark;
FlinkCDC应用场景对比的是Sqoop、Canal、Maxwell和KafkaConnectSource、Debezium等;
FlinkCDC是Flink社区伙伴对数据采集需求,开发的一个SDK工具,让Flink在数据捕捉场景,使用起来更方便一些。
1.1 CDC的应用场景分析
CDC的英文名是Change Data Capture (变化数据获取);解决的应用场景,是对存储中间件中数据的采集,比如Mysql、Orcle、PGSql、MongoDB等中间件;
采集的方式分为基于查询和基于BinLog两种;
以mysql的数据采集为例:可以通过jdbc批次查询,也可以通过Binlog解析增量数据采集;
两者的一些特性对比如下:

基于查询的CDC直接获取数据,基于binlog的采集需要开启binlog服务。
1.2 FlinkCDC的应用分析
FlinkCDC和Canal实现的应用场景需求是差不多的,都是通过binlog采集增量数据;
但是可用性上的不同是:
对于cannal类似的采集服务需要三步:
- 1.开启mysql的binlog
- 2.将数据写到kafka
- 3.用flink订阅kafka中的数据进行业务需求处理
对于FlinkCDC:只需要在binlog开启后,直接在一个Flink任务内做业务处理(可以写到kafka处理也行);
所有如cannal的采集功能服务,都需要单独维护一套服务,增加了运维负担,FlinkCDC可以当作任务部署到集群,大幅减轻了数据采集的应用难度;
用一个任务就完成这个应用功能:

二、FlinkCDC技术分析与本地操作
2.1 FlinkCDC的技术架构分析
与Canal这些提供服务能力的服务不同,FlinkCdc只是一个任务,可以简单的开发和部署。
Flink是借用了Debezium的功能,Debezium是一个可轻量级嵌入代码逻辑的服务,将Debezium的采集功能,用Flink的sourceFunction包装,然后打包成SDK提供给Flink开发使用;
借用Flink自己的算子和sink能力,可以将采集到的数据以Flink的特性加工数据,并将数据写入Flink内置的connect组件,sink到服务里,如Kafka、Pulser、ES、RabbitMQ、MongoDB等。
2.2 本地操作
2.2.1准备mysql数据库表和数据
use flink_test;
#检测binlog是否开启
show variables like '%log_bin%'#构建测试表
CREATE TABLE `event_info` (`id` int NOT NULL AUTO_INCREMENT,`name` varchar(255) NOT NULL,`category` varchar(512) DEFAULT NULL,`pv` int DEFAULT 0,`uv` int DEFAULT 0,PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;#写入数据
insert into `event_info`(`id`,`name`,`category`,`pv`,`uv`) values
(1,'aaa','nfh',20,8),
(2,'bbb','dfgf',30,2),
(3,'ccc','fsd',40,4),
(4,'ddd','afs',50,7),
(5,'eee','asfa',60,3)
(6,'aaa','nfh',20,8),
(7,'bbb','dfgf',30,2),
(8,'ccc','fsd',40,4),
(9,'ddd','afs',50,7),
(10,'eee','asfa',60,3);
2.2.2 pom文件
注意Flink和FlinkCDC的版本映射,很多显示的可以关联的版本之间是冲突的,这是一个很繁琐的工作,我调试各个版本之间的映射,花了一天左右的时间[求赞求收藏];
下面这是Flink1.14.5版本和FlinkCDC2.2.1版本已经调好的依赖:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>org.example</groupId><artifactId>changedateDoris</artifactId><version>1.0-SNAPSHOT</version><properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><scala.version>2.12</scala.version><java.version>1.8</java.version><flink.version>1.14.5</flink.version><fastjson.version>1.2.62</fastjson.version><hadoop.version>2.8.3</hadoop.version><scope.mode>compile</scope.mode><slf4j.version>1.7.30</slf4j.version></properties><dependencies><!-- springboot 依赖--><!-- flink --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner_${scala.version}</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-scala_${scala.version}</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_${scala.version}</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-jdbc_${scala.version}</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka_${scala.version}</artifactId><version>${flink.version}</version></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>${fastjson.version}</version></dependency><!-- Add log dependencies when debugging locally --><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId><version>${slf4j.version}</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId><version>${slf4j.version}</version></dependency><!-- mysql-connector --><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.12</version></dependency><dependency><groupId>com.ververica</groupId><artifactId>flink-sql-connector-mysql-cdc</artifactId><version>2.2.1 </version><exclusions><exclusion><artifactId>flink-shaded-guava</artifactId><groupId>org.apache.flink</groupId></exclusion></exclusions></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-runtime-web_${scala.version}</artifactId><version>${flink.version}</version></dependency></dependencies><build>
<!-- <filters>-->
<!-- <filter>${project.basedir}/src/main/resources/env/application-${profileActive}.properties</filter>-->
<!-- </filters>--><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><version>2.1</version><executions><execution><phase>package</phase><goals><goal>shade</goal></goals><configuration><filters><filter><artifact>*:*</artifact><excludes><exclude>META-INF/*.SF</exclude><exclude>META-INF/*.DSA</exclude><exclude>META-INF/*.RSA</exclude></excludes></filter></filters><artifactSet><includes><include>*:*</include></includes><excludes><exclude>org.slf4j:slf4j-api:jar:</exclude></excludes></artifactSet><transformers><transformerimplementation="org.apache.maven.plugins.shade.resource.AppendingTransformer"><resource>META-INF/spring.handlers</resource></transformer><transformerimplementation="org.apache.maven.plugins.shade.resource.AppendingTransformer"><resource>META-INF/spring.schemas</resource></transformer><transformerimplementation="org.apache.maven.plugins.shade.resource.AppendingTransformer"><resource>META-INF/spring.factories</resource></transformer></transformers></configuration></execution></executions></plugin><plugin><artifactId>maven-resources-plugin</artifactId><configuration><encoding>utf-8</encoding><useDefaultDelimiters>true</useDefaultDelimiters><delimiters><delimiter>$[*]</delimiter></delimiters></configuration></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-assembly-plugin</artifactId><executions><execution><phase>package</phase><goals><goal>single</goal></goals><configuration><descriptorRefs><descriptorRef>jar-with-dependencies</descriptorRef></descriptorRefs></configuration></execution></executions></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><configuration><source>8</source><target>8</target></configuration></plugin></plugins></build>
</project>
2.2.3 java代码
package yto.com.net.demo;import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;public class mysqlBinlogRead {private static final Logger log = LoggerFactory.getLogger(mysqlBinlogRead.class);public static void main(String[] args) throws Exception {MySqlSource<String> mySqlSource = MySqlSource.<String>builder().hostname("ip").port(3306).databaseList("flink_test").tableList("flink_test.event_info").username("mysqlUser").password("password").deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
// .startupOptions(StartupOptions.earliest()).build();Configuration configuration = new Configuration();configuration.setInteger(RestOptions.PORT, 8083);StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);
// StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// enable checkpointenv.enableCheckpointing(10000);DataStreamSource<String> cdcSource = env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL CDC Source");SingleOutputStreamOperator<String> process = cdcSource.process(new ProcessFunction<String, String>() {@Overridepublic void processElement(String row, ProcessFunction<String, String>.Context context, Collector<String> collector) throws Exception {JSONObject rowJson = JSON.parseObject(row);String op = rowJson.getString("op");JSONObject source = rowJson.getJSONObject("source");String table = source.getString("table");}});cdcSource.print("message=:");env.execute("flinkCdc Read message");}}
2.2.4 运行结果
如图所示,已经写入库的结果通过connect获取,增量数据通过binlog获取;
注意:表中的历史数据过多,全量读取的时候将会内存溢出。

相关文章:
FlinkCDC的分析和应用代码
前言:原本想讲如何基于Flink实现定制化计算引擎的开发,并以FlinkCDC为例介绍;发现这两个在表达上不知以谁为主,所以先分析FlinkCDC的应用场景和技术实现原理,下一篇再去分析Flink能在哪些方面,做定制化计算…...
序章 搭建环境篇—准备战士的剑和盾
第一步:安装node.js Node.js 内置了npm,只要安装了node.js,就可以直接使用 npm,官网地址: Download | Node.js 在这里不建议安装最新版本的node.js,可以选跟我一样的版本,node版本v16.13.2 链…...
【C++】vector的使用及模拟实现
目录 一、vector的介绍及使用1.1 介绍vector1.2 vector的使用1.2.1 构造1.2.2 遍历访问1.2.3 容量空间1.2.4 增删查改 二、vector的模拟实现2.1 成员变量2.2 迭代器相关函数2.3 构造-析构-赋值重载2.3.1 无参构造2.3.2 有参构造12.3.3 有参构造22.3.4 拷贝构造2.3.5 赋值重载2.…...
【数据库】sql优化有哪些?从query层面和数据库层面分析
目录 归纳sql本身的优化数据库层面的优化 归纳 这类型问题可以称为:Query Optimization,从清华AI4DB的paper list中,该类问题大致可以分为: Query RewriterCardinality EstimationCost EstimationPlan Optimization 从中文的角…...
nginx基本优化
安装nginx隐藏版本号 查看百度web服务器 [rootcjq11 ~]# curl -I http://www.baidu.com 隐藏nginx服务器版本号 [rootcjq11 ~]# cd /usr/local/src/nginx-1.22.0/ [rootcjq11 nginx-1.22.0]# vim src/core/nginx.h第13、14行修改版本号和服务器名称 [rootcjq11 nginx-1.2…...
软件测试|使用selenium处理单选框和多选框
简介 我们在web自动化测试工作中,经常会遇到对单选框(Radio Buttons)或者多选框(Checkboxes)进行操作的场景,单选框和多选框主要是用于我们做出选择或提交数据。本文将主要介绍selenium对于单选框和多选框…...
openssl3.2 - EVP_CIPHER_fetch算法名称字符串(参数2)的有效值列表
文章目录 openssl3.2 - EVP_CIPHER_fetch算法名称字符串(参数2)的有效值列表概述如何找到算法名称字符串列表?openssl-3.2.0\providers\implementations\include\prov\names.h备注END openssl3.2 - EVP_CIPHER_fetch算法名称字符串(参数2)的有效值列表 概述 进行加解密时, 先…...
vue3中的hook公共函数封装及运用
vue3 中的 hooks 就是函数的一种写法,就是将文件的一些单独功能的js代码进行抽离出来,放到单独的js文件中,或者说是一些可以复用的公共方法/功能 使用Vue3的组合API封装的可复用的功能函数自定义hook的作用类似于vue2中的mixin技术自定义Hook…...
广州市工信局、天河区商务金融局及广州专精特新促进会走访思迈特
2024年1月11日下午,广州市工信局、天河区商务金融局及广州专精特新促进会相关负责人莅临广州思迈特软件总部调研指导,思迈特软件总裁兼COO姚诗成代表公司热情接待,并陪同调研。 调研组实地参观了思迈特软件,深入了解了思迈特发展历…...
vi编辑器显示行号和不显示行号切换命令
文章目录 1.临时生效只需要在vi编辑器里面输入1.1.显示行号1.2.不显示行号 2.永久生效 1.临时生效只需要在vi编辑器里面输入 1.1.显示行号 set number 或者 set nu如下图 1.2.不显示行号 set nonumber 或者 set nonu2.永久生效 首先打开配置文件/etc/vim/vimrc,向文件中添…...
使用 LLVM clang C/C++ 编译器编译 boost 基础框架类库
1、下载 boost 1.84 库的源代码放到待编译目录 2、解压并接入 boost 1.84 库源码的根目录 搜索默认的 clang 版本,WSL 2.0/Ubuntu 18.04 LTS 为 clang 6.x 执行命令: ./bootstrap.sh --with-toolsetclang ./b2 toolsetclang 另外一个方法比较麻烦需要…...
推荐一款.NET开发的物联网开源项目
物联网(IoT)是一个正在快速发展的技术领域,它涉及到各种设备、物体和系统的互联。所以各种物联网平台和物联网网关项目层出不穷,在物联网(IoT)领域,.NET平台扮演着重要的角色。作为一款广泛使用…...
正则表达式 (用于灵活匹配文本的表达式)
目录 . * . 用于匹配任意单个字符,除了换行符。 例如使用正则表达式 a.b, 它可以匹配aab、acb、a#b * 用于匹配前一个字符零次或多次。 例如,使用正则表达式 ab*c,它可以匹配 "ac"、"abc"、"abbc"&#…...
基于4G数采终端的供热管网在线监测方案
我国大部地区全面进入到冬季,北方各地已开启冬季供暖,以保障居民生活所需。由于城市化的发展,城市内各供热区域愈发分散、供热管道漫长、供热环境复杂,对于供热管网及换热站点的监测和维护提出了诸多挑战。 方案介绍 针对提高供热…...
OPC UA 开源库编译方法及通过OPC UA连接西门S7-1200 PLC通信并进行数据交换[一]
前言 在现代工业自动化领域,OPC UA(开放性生产控制和统一架构)是一种广泛应用的通信协议。本文将以通俗易懂的方式解释OPC UA的含义和作用,帮助读者更好地理解这一概念。 一、OPC UA的定义 OPC UA全称为“开放性生产控制和统一…...
2019年认证杯SPSSPRO杯数学建模B题(第二阶段)外星语词典全过程文档及程序
2019年认证杯SPSSPRO杯数学建模 基于统计和迭代匹配的未知语言文本片段提取模型 B题 外星语词典 原题再现: 我们发现了一种未知的语言,现只知道其文字是以 20 个字母构成的。我们已经获取了许多段由该语言写成的文本,但每段文本只是由字母…...
NFS的共享与挂载
一、NFS网络文件服务 1.1简介 NFS(Network File System 网络文件服务) 文件系统(软件)文件的权限 NFS 是一种基于 TCP/IP 传输的网络文件系统协议,最初由 Sun 公司开发。 通过使用 NFS 协议,客户机可以像访…...
函数式编程的Java编码实践:利用惰性写出高性能且抽象的代码
转载链接 本文会以惰性加载为例一步步介绍函数式编程中各种概念,所以读者不需要任何函数式编程的基础,只需要对 Java 8 有些许了解即可。 一 抽象一定会导致代码性能降低? 程序员的梦想就是能写出 “高内聚,低耦合”的代码&…...
2024年甘肃省职业院校技能大赛信息安全管理与评估 样题二 模块二
竞赛需要完成三个阶段的任务,分别完成三个模块,总分共计 1000分。三个模块内容和分值分别是: 1.第一阶段:模块一 网络平台搭建与设备安全防护(180 分钟,300 分)。 2.第二阶段:模块二…...
mysql 一对多 合并多个通过 逗号拼接展示
mysql 一对多 合并多个通过 逗号拼接展示 以上内容由chatgpt中文网 动态生成 SELECTuser_id,project_id,GROUP_CONCAT(project_specs_id) AS merged_specs_ids FROMzt_medication_specs_total WHEREspecs_num_total < 5 GROUP BYuser_id, project_id;laravel model 对应写…...
量子软件不稳定测试检测:基于机器学习的自动化解决方案
1. 量子软件测试中的“幽灵”:不稳定测试的挑战与机遇在量子软件开发的日常工作中,最让人头疼的莫过于那些“薛定谔的测试”——你永远不知道下一次运行它会通过还是失败。这就是不稳定测试(Flaky Tests),它们像幽灵一…...
3D层析SAR与AutoML融合:实现高精度森林树种自动识别
1. 项目概述:当3D雷达“透视”森林,机器学习如何识别每一棵树?在森林资源管理与生态研究中,准确识别树种一直是个既基础又棘手的难题。传统的野外调查方法,依赖人力跋山涉水,不仅成本高昂、效率低下&#x…...
基于流形学习与kNN的稀疏传感风场估计:无人机安全起降新思路
1. 项目概述与核心挑战在无人机城市空中交通(UAM)和垂直起降场(Vertiport)的运营中,起降阶段的安全性是重中之重。这个阶段,无人机对风场的变化极为敏感,突如其来的阵风或复杂涡流都可能导致姿态…...
ZygiskFrida:安卓逆向的Zygote层动态插桩新范式
1. 这不是“又一个 Frida 模块”,而是安卓逆向工作流的物理层重构你有没有过这样的经历:在一台已 root 的测试机上,想用 Frida hook 一个刚启动的系统服务,结果发现frida-server启动失败,报错Permission denied&#x…...
ZygiskFrida:安卓逆向中基于Zygote的零感知Frida注入方案
1. 这不是“又一个 Frida 注入工具”,而是安卓逆向工作流的物理层重构你有没有过这样的经历:在一台已 root 的测试机上调试某个金融类 App,想 hook 它的 SSL Pinning 检查逻辑,结果 Frida Server 启动失败;换用 frida-…...
分布式机器学习中的精度与效率权衡:从近似计算到自动驾驶实践
1. 项目概述:当“算得准”遇上“算得快”在分布式机器学习的世界里,我们每天都在面对一个看似简单、实则深刻的抉择:是要一个“算得准”但慢吞吞的模型,还是要一个“算得快”但偶尔会出点小错的系统?这个抉择ÿ…...
CoQMoE:面向FPGA的MoE-ViT量化与硬件协同设计实践
1. 项目概述:当视觉Transformer遇上FPGA,为何需要“协同设计”?最近几年,视觉Transformer(ViT)在图像识别、目标检测等任务上展现出了不输甚至超越传统卷积神经网络(CNN)的性能。但随…...
机器学习优化算法在激光等离子体加速实验中的应用与选型指南
1. 项目概述:当机器学习算法遇见激光等离子体加速在激光等离子体加速(Laser Wakefield Acceleration, LWFA)这类前沿物理实验中,我们常常面临一个经典难题:如何从一堆相互耦合、影响复杂的实验参数中,快速、…...
模块化AI:从大脑启示到工程实践,构建高效智能系统的核心范式
1. 引言:为什么我们需要重新审视“模块化”?在人工智能领域,我们正处在一个看似矛盾的时代。一方面,以大型语言模型(LLM)和深度神经网络(DNN)为代表的“单体巨兽”展现出了前所未有的…...
AI Agent记忆系统工程:从短期记忆到长期知识的完整架构
为什么"记忆"是Agent工程化的核心难题 在2026年,构建一个能在单次对话中完成复杂任务的AI Agent已经相对成熟——LangGraph、AutoGen等框架提供了完善的工具链。但当我们试图构建一个能够跨会话学习、记住用户偏好、积累领域知识的AI应用时,挑…...
