FlinkCDC的分析和应用代码
前言:原本想讲如何基于Flink实现定制化计算引擎的开发,并以FlinkCDC为例介绍;发现这两个在表达上不知以谁为主,所以先分析FlinkCDC的应用场景和技术实现原理,下一篇再去分析Flink能在哪些方面,做定制化计算引擎的开发操作。本文将从FlinkCDC应用场景开始,然后讲述其基于Flink的实现原理和代码应用,为下一篇介绍基于Flink开发定制化引擎做铺垫。
一、FlinkCDC应用场景
经常有同事或朋友问,Flink和FlinkCDC有什么区别?
Flink是一个流数据处理计算框架,FlinkCDC是数据采集工具:
Flink应用场景对比的是Storm、Spark;
FlinkCDC应用场景对比的是Sqoop、Canal、Maxwell和KafkaConnectSource、Debezium等;
FlinkCDC是Flink社区伙伴对数据采集需求,开发的一个SDK工具,让Flink在数据捕捉场景,使用起来更方便一些。
1.1 CDC的应用场景分析
CDC的英文名是Change Data Capture (变化数据获取);解决的应用场景,是对存储中间件中数据的采集,比如Mysql、Orcle、PGSql、MongoDB等中间件;
采集的方式分为基于查询和基于BinLog两种;
以mysql的数据采集为例:可以通过jdbc批次查询,也可以通过Binlog解析增量数据采集;
两者的一些特性对比如下:

基于查询的CDC直接获取数据,基于binlog的采集需要开启binlog服务。
1.2 FlinkCDC的应用分析
FlinkCDC和Canal实现的应用场景需求是差不多的,都是通过binlog采集增量数据;
但是可用性上的不同是:
对于cannal类似的采集服务需要三步:
- 1.开启mysql的binlog
- 2.将数据写到kafka
- 3.用flink订阅kafka中的数据进行业务需求处理
对于FlinkCDC:只需要在binlog开启后,直接在一个Flink任务内做业务处理(可以写到kafka处理也行);
所有如cannal的采集功能服务,都需要单独维护一套服务,增加了运维负担,FlinkCDC可以当作任务部署到集群,大幅减轻了数据采集的应用难度;
用一个任务就完成这个应用功能:

二、FlinkCDC技术分析与本地操作
2.1 FlinkCDC的技术架构分析
与Canal这些提供服务能力的服务不同,FlinkCdc只是一个任务,可以简单的开发和部署。
Flink是借用了Debezium的功能,Debezium是一个可轻量级嵌入代码逻辑的服务,将Debezium的采集功能,用Flink的sourceFunction包装,然后打包成SDK提供给Flink开发使用;
借用Flink自己的算子和sink能力,可以将采集到的数据以Flink的特性加工数据,并将数据写入Flink内置的connect组件,sink到服务里,如Kafka、Pulser、ES、RabbitMQ、MongoDB等。
2.2 本地操作
2.2.1准备mysql数据库表和数据
use flink_test;
#检测binlog是否开启
show variables like '%log_bin%'#构建测试表
CREATE TABLE `event_info` (`id` int NOT NULL AUTO_INCREMENT,`name` varchar(255) NOT NULL,`category` varchar(512) DEFAULT NULL,`pv` int DEFAULT 0,`uv` int DEFAULT 0,PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;#写入数据
insert into `event_info`(`id`,`name`,`category`,`pv`,`uv`) values
(1,'aaa','nfh',20,8),
(2,'bbb','dfgf',30,2),
(3,'ccc','fsd',40,4),
(4,'ddd','afs',50,7),
(5,'eee','asfa',60,3)
(6,'aaa','nfh',20,8),
(7,'bbb','dfgf',30,2),
(8,'ccc','fsd',40,4),
(9,'ddd','afs',50,7),
(10,'eee','asfa',60,3);
2.2.2 pom文件
注意Flink和FlinkCDC的版本映射,很多显示的可以关联的版本之间是冲突的,这是一个很繁琐的工作,我调试各个版本之间的映射,花了一天左右的时间[求赞求收藏];
下面这是Flink1.14.5版本和FlinkCDC2.2.1版本已经调好的依赖:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>org.example</groupId><artifactId>changedateDoris</artifactId><version>1.0-SNAPSHOT</version><properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><scala.version>2.12</scala.version><java.version>1.8</java.version><flink.version>1.14.5</flink.version><fastjson.version>1.2.62</fastjson.version><hadoop.version>2.8.3</hadoop.version><scope.mode>compile</scope.mode><slf4j.version>1.7.30</slf4j.version></properties><dependencies><!-- springboot 依赖--><!-- flink --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner_${scala.version}</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-scala_${scala.version}</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_${scala.version}</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-jdbc_${scala.version}</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka_${scala.version}</artifactId><version>${flink.version}</version></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>${fastjson.version}</version></dependency><!-- Add log dependencies when debugging locally --><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId><version>${slf4j.version}</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId><version>${slf4j.version}</version></dependency><!-- mysql-connector --><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.12</version></dependency><dependency><groupId>com.ververica</groupId><artifactId>flink-sql-connector-mysql-cdc</artifactId><version>2.2.1 </version><exclusions><exclusion><artifactId>flink-shaded-guava</artifactId><groupId>org.apache.flink</groupId></exclusion></exclusions></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-runtime-web_${scala.version}</artifactId><version>${flink.version}</version></dependency></dependencies><build>
<!-- <filters>-->
<!-- <filter>${project.basedir}/src/main/resources/env/application-${profileActive}.properties</filter>-->
<!-- </filters>--><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><version>2.1</version><executions><execution><phase>package</phase><goals><goal>shade</goal></goals><configuration><filters><filter><artifact>*:*</artifact><excludes><exclude>META-INF/*.SF</exclude><exclude>META-INF/*.DSA</exclude><exclude>META-INF/*.RSA</exclude></excludes></filter></filters><artifactSet><includes><include>*:*</include></includes><excludes><exclude>org.slf4j:slf4j-api:jar:</exclude></excludes></artifactSet><transformers><transformerimplementation="org.apache.maven.plugins.shade.resource.AppendingTransformer"><resource>META-INF/spring.handlers</resource></transformer><transformerimplementation="org.apache.maven.plugins.shade.resource.AppendingTransformer"><resource>META-INF/spring.schemas</resource></transformer><transformerimplementation="org.apache.maven.plugins.shade.resource.AppendingTransformer"><resource>META-INF/spring.factories</resource></transformer></transformers></configuration></execution></executions></plugin><plugin><artifactId>maven-resources-plugin</artifactId><configuration><encoding>utf-8</encoding><useDefaultDelimiters>true</useDefaultDelimiters><delimiters><delimiter>$[*]</delimiter></delimiters></configuration></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-assembly-plugin</artifactId><executions><execution><phase>package</phase><goals><goal>single</goal></goals><configuration><descriptorRefs><descriptorRef>jar-with-dependencies</descriptorRef></descriptorRefs></configuration></execution></executions></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><configuration><source>8</source><target>8</target></configuration></plugin></plugins></build>
</project>
2.2.3 java代码
package yto.com.net.demo;import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;public class mysqlBinlogRead {private static final Logger log = LoggerFactory.getLogger(mysqlBinlogRead.class);public static void main(String[] args) throws Exception {MySqlSource<String> mySqlSource = MySqlSource.<String>builder().hostname("ip").port(3306).databaseList("flink_test").tableList("flink_test.event_info").username("mysqlUser").password("password").deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
// .startupOptions(StartupOptions.earliest()).build();Configuration configuration = new Configuration();configuration.setInteger(RestOptions.PORT, 8083);StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);
// StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// enable checkpointenv.enableCheckpointing(10000);DataStreamSource<String> cdcSource = env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL CDC Source");SingleOutputStreamOperator<String> process = cdcSource.process(new ProcessFunction<String, String>() {@Overridepublic void processElement(String row, ProcessFunction<String, String>.Context context, Collector<String> collector) throws Exception {JSONObject rowJson = JSON.parseObject(row);String op = rowJson.getString("op");JSONObject source = rowJson.getJSONObject("source");String table = source.getString("table");}});cdcSource.print("message=:");env.execute("flinkCdc Read message");}}
2.2.4 运行结果
如图所示,已经写入库的结果通过connect获取,增量数据通过binlog获取;
注意:表中的历史数据过多,全量读取的时候将会内存溢出。

相关文章:
FlinkCDC的分析和应用代码
前言:原本想讲如何基于Flink实现定制化计算引擎的开发,并以FlinkCDC为例介绍;发现这两个在表达上不知以谁为主,所以先分析FlinkCDC的应用场景和技术实现原理,下一篇再去分析Flink能在哪些方面,做定制化计算…...
序章 搭建环境篇—准备战士的剑和盾
第一步:安装node.js Node.js 内置了npm,只要安装了node.js,就可以直接使用 npm,官网地址: Download | Node.js 在这里不建议安装最新版本的node.js,可以选跟我一样的版本,node版本v16.13.2 链…...
【C++】vector的使用及模拟实现
目录 一、vector的介绍及使用1.1 介绍vector1.2 vector的使用1.2.1 构造1.2.2 遍历访问1.2.3 容量空间1.2.4 增删查改 二、vector的模拟实现2.1 成员变量2.2 迭代器相关函数2.3 构造-析构-赋值重载2.3.1 无参构造2.3.2 有参构造12.3.3 有参构造22.3.4 拷贝构造2.3.5 赋值重载2.…...
【数据库】sql优化有哪些?从query层面和数据库层面分析
目录 归纳sql本身的优化数据库层面的优化 归纳 这类型问题可以称为:Query Optimization,从清华AI4DB的paper list中,该类问题大致可以分为: Query RewriterCardinality EstimationCost EstimationPlan Optimization 从中文的角…...
nginx基本优化
安装nginx隐藏版本号 查看百度web服务器 [rootcjq11 ~]# curl -I http://www.baidu.com 隐藏nginx服务器版本号 [rootcjq11 ~]# cd /usr/local/src/nginx-1.22.0/ [rootcjq11 nginx-1.22.0]# vim src/core/nginx.h第13、14行修改版本号和服务器名称 [rootcjq11 nginx-1.2…...
软件测试|使用selenium处理单选框和多选框
简介 我们在web自动化测试工作中,经常会遇到对单选框(Radio Buttons)或者多选框(Checkboxes)进行操作的场景,单选框和多选框主要是用于我们做出选择或提交数据。本文将主要介绍selenium对于单选框和多选框…...
openssl3.2 - EVP_CIPHER_fetch算法名称字符串(参数2)的有效值列表
文章目录 openssl3.2 - EVP_CIPHER_fetch算法名称字符串(参数2)的有效值列表概述如何找到算法名称字符串列表?openssl-3.2.0\providers\implementations\include\prov\names.h备注END openssl3.2 - EVP_CIPHER_fetch算法名称字符串(参数2)的有效值列表 概述 进行加解密时, 先…...
vue3中的hook公共函数封装及运用
vue3 中的 hooks 就是函数的一种写法,就是将文件的一些单独功能的js代码进行抽离出来,放到单独的js文件中,或者说是一些可以复用的公共方法/功能 使用Vue3的组合API封装的可复用的功能函数自定义hook的作用类似于vue2中的mixin技术自定义Hook…...
广州市工信局、天河区商务金融局及广州专精特新促进会走访思迈特
2024年1月11日下午,广州市工信局、天河区商务金融局及广州专精特新促进会相关负责人莅临广州思迈特软件总部调研指导,思迈特软件总裁兼COO姚诗成代表公司热情接待,并陪同调研。 调研组实地参观了思迈特软件,深入了解了思迈特发展历…...
vi编辑器显示行号和不显示行号切换命令
文章目录 1.临时生效只需要在vi编辑器里面输入1.1.显示行号1.2.不显示行号 2.永久生效 1.临时生效只需要在vi编辑器里面输入 1.1.显示行号 set number 或者 set nu如下图 1.2.不显示行号 set nonumber 或者 set nonu2.永久生效 首先打开配置文件/etc/vim/vimrc,向文件中添…...
使用 LLVM clang C/C++ 编译器编译 boost 基础框架类库
1、下载 boost 1.84 库的源代码放到待编译目录 2、解压并接入 boost 1.84 库源码的根目录 搜索默认的 clang 版本,WSL 2.0/Ubuntu 18.04 LTS 为 clang 6.x 执行命令: ./bootstrap.sh --with-toolsetclang ./b2 toolsetclang 另外一个方法比较麻烦需要…...
推荐一款.NET开发的物联网开源项目
物联网(IoT)是一个正在快速发展的技术领域,它涉及到各种设备、物体和系统的互联。所以各种物联网平台和物联网网关项目层出不穷,在物联网(IoT)领域,.NET平台扮演着重要的角色。作为一款广泛使用…...
正则表达式 (用于灵活匹配文本的表达式)
目录 . * . 用于匹配任意单个字符,除了换行符。 例如使用正则表达式 a.b, 它可以匹配aab、acb、a#b * 用于匹配前一个字符零次或多次。 例如,使用正则表达式 ab*c,它可以匹配 "ac"、"abc"、"abbc"&#…...
基于4G数采终端的供热管网在线监测方案
我国大部地区全面进入到冬季,北方各地已开启冬季供暖,以保障居民生活所需。由于城市化的发展,城市内各供热区域愈发分散、供热管道漫长、供热环境复杂,对于供热管网及换热站点的监测和维护提出了诸多挑战。 方案介绍 针对提高供热…...
OPC UA 开源库编译方法及通过OPC UA连接西门S7-1200 PLC通信并进行数据交换[一]
前言 在现代工业自动化领域,OPC UA(开放性生产控制和统一架构)是一种广泛应用的通信协议。本文将以通俗易懂的方式解释OPC UA的含义和作用,帮助读者更好地理解这一概念。 一、OPC UA的定义 OPC UA全称为“开放性生产控制和统一…...
2019年认证杯SPSSPRO杯数学建模B题(第二阶段)外星语词典全过程文档及程序
2019年认证杯SPSSPRO杯数学建模 基于统计和迭代匹配的未知语言文本片段提取模型 B题 外星语词典 原题再现: 我们发现了一种未知的语言,现只知道其文字是以 20 个字母构成的。我们已经获取了许多段由该语言写成的文本,但每段文本只是由字母…...
NFS的共享与挂载
一、NFS网络文件服务 1.1简介 NFS(Network File System 网络文件服务) 文件系统(软件)文件的权限 NFS 是一种基于 TCP/IP 传输的网络文件系统协议,最初由 Sun 公司开发。 通过使用 NFS 协议,客户机可以像访…...
函数式编程的Java编码实践:利用惰性写出高性能且抽象的代码
转载链接 本文会以惰性加载为例一步步介绍函数式编程中各种概念,所以读者不需要任何函数式编程的基础,只需要对 Java 8 有些许了解即可。 一 抽象一定会导致代码性能降低? 程序员的梦想就是能写出 “高内聚,低耦合”的代码&…...
2024年甘肃省职业院校技能大赛信息安全管理与评估 样题二 模块二
竞赛需要完成三个阶段的任务,分别完成三个模块,总分共计 1000分。三个模块内容和分值分别是: 1.第一阶段:模块一 网络平台搭建与设备安全防护(180 分钟,300 分)。 2.第二阶段:模块二…...
mysql 一对多 合并多个通过 逗号拼接展示
mysql 一对多 合并多个通过 逗号拼接展示 以上内容由chatgpt中文网 动态生成 SELECTuser_id,project_id,GROUP_CONCAT(project_specs_id) AS merged_specs_ids FROMzt_medication_specs_total WHEREspecs_num_total < 5 GROUP BYuser_id, project_id;laravel model 对应写…...
Day131 | 灵神 | 回溯算法 | 子集型 子集
Day131 | 灵神 | 回溯算法 | 子集型 子集 78.子集 78. 子集 - 力扣(LeetCode) 思路: 笔者写过很多次这道题了,不想写题解了,大家看灵神讲解吧 回溯算法套路①子集型回溯【基础算法精讲 14】_哔哩哔哩_bilibili 完…...
【解密LSTM、GRU如何解决传统RNN梯度消失问题】
解密LSTM与GRU:如何让RNN变得更聪明? 在深度学习的世界里,循环神经网络(RNN)以其卓越的序列数据处理能力广泛应用于自然语言处理、时间序列预测等领域。然而,传统RNN存在的一个严重问题——梯度消失&#…...
屋顶变身“发电站” ,中天合创屋面分布式光伏发电项目顺利并网!
5月28日,中天合创屋面分布式光伏发电项目顺利并网发电,该项目位于内蒙古自治区鄂尔多斯市乌审旗,项目利用中天合创聚乙烯、聚丙烯仓库屋面作为场地建设光伏电站,总装机容量为9.96MWp。 项目投运后,每年可节约标煤3670…...
Neo4j 集群管理:原理、技术与最佳实践深度解析
Neo4j 的集群技术是其企业级高可用性、可扩展性和容错能力的核心。通过深入分析官方文档,本文将系统阐述其集群管理的核心原理、关键技术、实用技巧和行业最佳实践。 Neo4j 的 Causal Clustering 架构提供了一个强大而灵活的基石,用于构建高可用、可扩展且一致的图数据库服务…...
云原生玩法三问:构建自定义开发环境
云原生玩法三问:构建自定义开发环境 引言 临时运维一个古董项目,无文档,无环境,无交接人,俗称三无。 运行设备的环境老,本地环境版本高,ssh不过去。正好最近对 腾讯出品的云原生 cnb 感兴趣&…...
代码随想录刷题day30
1、零钱兑换II 给你一个整数数组 coins 表示不同面额的硬币,另给一个整数 amount 表示总金额。 请你计算并返回可以凑成总金额的硬币组合数。如果任何硬币组合都无法凑出总金额,返回 0 。 假设每一种面额的硬币有无限个。 题目数据保证结果符合 32 位带…...
tomcat指定使用的jdk版本
说明 有时候需要对tomcat配置指定的jdk版本号,此时,我们可以通过以下方式进行配置 设置方式 找到tomcat的bin目录中的setclasspath.bat。如果是linux系统则是setclasspath.sh set JAVA_HOMEC:\Program Files\Java\jdk8 set JRE_HOMEC:\Program Files…...
【无标题】湖北理元理律师事务所:债务优化中的生活保障与法律平衡之道
文/法律实务观察组 在债务重组领域,专业机构的核心价值不仅在于减轻债务数字,更在于帮助债务人在履行义务的同时维持基本生活尊严。湖北理元理律师事务所的服务实践表明,合法债务优化需同步实现三重平衡: 法律刚性(债…...
软件工程 期末复习
瀑布模型:计划 螺旋模型:风险低 原型模型: 用户反馈 喷泉模型:代码复用 高内聚 低耦合:模块内部功能紧密 模块之间依赖程度小 高内聚:指的是一个模块内部的功能应该紧密相关。换句话说,一个模块应当只实现单一的功能…...
Neko虚拟浏览器远程协作方案:Docker+内网穿透技术部署实践
前言:本文将向开发者介绍一款创新性协作工具——Neko虚拟浏览器。在数字化协作场景中,跨地域的团队常需面对实时共享屏幕、协同编辑文档等需求。通过本指南,你将掌握在Ubuntu系统中使用容器化技术部署该工具的具体方案,并结合内网…...
