FlinkCDC的分析和应用代码
前言:原本想讲如何基于Flink实现定制化计算引擎的开发,并以FlinkCDC为例介绍;发现这两个在表达上不知以谁为主,所以先分析FlinkCDC的应用场景和技术实现原理,下一篇再去分析Flink能在哪些方面,做定制化计算引擎的开发操作。本文将从FlinkCDC应用场景开始,然后讲述其基于Flink的实现原理和代码应用,为下一篇介绍基于Flink开发定制化引擎做铺垫。
一、FlinkCDC应用场景
经常有同事或朋友问,Flink和FlinkCDC有什么区别?
Flink是一个流数据处理计算框架,FlinkCDC是数据采集工具:
Flink应用场景对比的是Storm、Spark;
FlinkCDC应用场景对比的是Sqoop、Canal、Maxwell和KafkaConnectSource、Debezium等;
FlinkCDC是Flink社区伙伴对数据采集需求,开发的一个SDK工具,让Flink在数据捕捉场景,使用起来更方便一些。
1.1 CDC的应用场景分析
CDC的英文名是Change Data Capture (变化数据获取);解决的应用场景,是对存储中间件中数据的采集,比如Mysql、Orcle、PGSql、MongoDB等中间件;
采集的方式分为基于查询和基于BinLog两种;
以mysql的数据采集为例:可以通过jdbc批次查询,也可以通过Binlog解析增量数据采集;
两者的一些特性对比如下:

基于查询的CDC直接获取数据,基于binlog的采集需要开启binlog服务。
1.2 FlinkCDC的应用分析
FlinkCDC和Canal实现的应用场景需求是差不多的,都是通过binlog采集增量数据;
但是可用性上的不同是:
对于cannal类似的采集服务需要三步:
- 1.开启mysql的binlog
- 2.将数据写到kafka
- 3.用flink订阅kafka中的数据进行业务需求处理
对于FlinkCDC:只需要在binlog开启后,直接在一个Flink任务内做业务处理(可以写到kafka处理也行);
所有如cannal的采集功能服务,都需要单独维护一套服务,增加了运维负担,FlinkCDC可以当作任务部署到集群,大幅减轻了数据采集的应用难度;
用一个任务就完成这个应用功能:

二、FlinkCDC技术分析与本地操作
2.1 FlinkCDC的技术架构分析
与Canal这些提供服务能力的服务不同,FlinkCdc只是一个任务,可以简单的开发和部署。
Flink是借用了Debezium的功能,Debezium是一个可轻量级嵌入代码逻辑的服务,将Debezium的采集功能,用Flink的sourceFunction包装,然后打包成SDK提供给Flink开发使用;
借用Flink自己的算子和sink能力,可以将采集到的数据以Flink的特性加工数据,并将数据写入Flink内置的connect组件,sink到服务里,如Kafka、Pulser、ES、RabbitMQ、MongoDB等。
2.2 本地操作
2.2.1准备mysql数据库表和数据
use flink_test;
#检测binlog是否开启
show variables like '%log_bin%'#构建测试表
CREATE TABLE `event_info` (`id` int NOT NULL AUTO_INCREMENT,`name` varchar(255) NOT NULL,`category` varchar(512) DEFAULT NULL,`pv` int DEFAULT 0,`uv` int DEFAULT 0,PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;#写入数据
insert into `event_info`(`id`,`name`,`category`,`pv`,`uv`) values
(1,'aaa','nfh',20,8),
(2,'bbb','dfgf',30,2),
(3,'ccc','fsd',40,4),
(4,'ddd','afs',50,7),
(5,'eee','asfa',60,3)
(6,'aaa','nfh',20,8),
(7,'bbb','dfgf',30,2),
(8,'ccc','fsd',40,4),
(9,'ddd','afs',50,7),
(10,'eee','asfa',60,3);
2.2.2 pom文件
注意Flink和FlinkCDC的版本映射,很多显示的可以关联的版本之间是冲突的,这是一个很繁琐的工作,我调试各个版本之间的映射,花了一天左右的时间[求赞求收藏];
下面这是Flink1.14.5版本和FlinkCDC2.2.1版本已经调好的依赖:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>org.example</groupId><artifactId>changedateDoris</artifactId><version>1.0-SNAPSHOT</version><properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><scala.version>2.12</scala.version><java.version>1.8</java.version><flink.version>1.14.5</flink.version><fastjson.version>1.2.62</fastjson.version><hadoop.version>2.8.3</hadoop.version><scope.mode>compile</scope.mode><slf4j.version>1.7.30</slf4j.version></properties><dependencies><!-- springboot 依赖--><!-- flink --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner_${scala.version}</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-scala_${scala.version}</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_${scala.version}</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-jdbc_${scala.version}</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka_${scala.version}</artifactId><version>${flink.version}</version></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>${fastjson.version}</version></dependency><!-- Add log dependencies when debugging locally --><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId><version>${slf4j.version}</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId><version>${slf4j.version}</version></dependency><!-- mysql-connector --><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.12</version></dependency><dependency><groupId>com.ververica</groupId><artifactId>flink-sql-connector-mysql-cdc</artifactId><version>2.2.1 </version><exclusions><exclusion><artifactId>flink-shaded-guava</artifactId><groupId>org.apache.flink</groupId></exclusion></exclusions></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-runtime-web_${scala.version}</artifactId><version>${flink.version}</version></dependency></dependencies><build>
<!-- <filters>-->
<!-- <filter>${project.basedir}/src/main/resources/env/application-${profileActive}.properties</filter>-->
<!-- </filters>--><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><version>2.1</version><executions><execution><phase>package</phase><goals><goal>shade</goal></goals><configuration><filters><filter><artifact>*:*</artifact><excludes><exclude>META-INF/*.SF</exclude><exclude>META-INF/*.DSA</exclude><exclude>META-INF/*.RSA</exclude></excludes></filter></filters><artifactSet><includes><include>*:*</include></includes><excludes><exclude>org.slf4j:slf4j-api:jar:</exclude></excludes></artifactSet><transformers><transformerimplementation="org.apache.maven.plugins.shade.resource.AppendingTransformer"><resource>META-INF/spring.handlers</resource></transformer><transformerimplementation="org.apache.maven.plugins.shade.resource.AppendingTransformer"><resource>META-INF/spring.schemas</resource></transformer><transformerimplementation="org.apache.maven.plugins.shade.resource.AppendingTransformer"><resource>META-INF/spring.factories</resource></transformer></transformers></configuration></execution></executions></plugin><plugin><artifactId>maven-resources-plugin</artifactId><configuration><encoding>utf-8</encoding><useDefaultDelimiters>true</useDefaultDelimiters><delimiters><delimiter>$[*]</delimiter></delimiters></configuration></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-assembly-plugin</artifactId><executions><execution><phase>package</phase><goals><goal>single</goal></goals><configuration><descriptorRefs><descriptorRef>jar-with-dependencies</descriptorRef></descriptorRefs></configuration></execution></executions></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><configuration><source>8</source><target>8</target></configuration></plugin></plugins></build>
</project>
2.2.3 java代码
package yto.com.net.demo;import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;public class mysqlBinlogRead {private static final Logger log = LoggerFactory.getLogger(mysqlBinlogRead.class);public static void main(String[] args) throws Exception {MySqlSource<String> mySqlSource = MySqlSource.<String>builder().hostname("ip").port(3306).databaseList("flink_test").tableList("flink_test.event_info").username("mysqlUser").password("password").deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
// .startupOptions(StartupOptions.earliest()).build();Configuration configuration = new Configuration();configuration.setInteger(RestOptions.PORT, 8083);StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);
// StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// enable checkpointenv.enableCheckpointing(10000);DataStreamSource<String> cdcSource = env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL CDC Source");SingleOutputStreamOperator<String> process = cdcSource.process(new ProcessFunction<String, String>() {@Overridepublic void processElement(String row, ProcessFunction<String, String>.Context context, Collector<String> collector) throws Exception {JSONObject rowJson = JSON.parseObject(row);String op = rowJson.getString("op");JSONObject source = rowJson.getJSONObject("source");String table = source.getString("table");}});cdcSource.print("message=:");env.execute("flinkCdc Read message");}}
2.2.4 运行结果
如图所示,已经写入库的结果通过connect获取,增量数据通过binlog获取;
注意:表中的历史数据过多,全量读取的时候将会内存溢出。

相关文章:
FlinkCDC的分析和应用代码
前言:原本想讲如何基于Flink实现定制化计算引擎的开发,并以FlinkCDC为例介绍;发现这两个在表达上不知以谁为主,所以先分析FlinkCDC的应用场景和技术实现原理,下一篇再去分析Flink能在哪些方面,做定制化计算…...
序章 搭建环境篇—准备战士的剑和盾
第一步:安装node.js Node.js 内置了npm,只要安装了node.js,就可以直接使用 npm,官网地址: Download | Node.js 在这里不建议安装最新版本的node.js,可以选跟我一样的版本,node版本v16.13.2 链…...
【C++】vector的使用及模拟实现
目录 一、vector的介绍及使用1.1 介绍vector1.2 vector的使用1.2.1 构造1.2.2 遍历访问1.2.3 容量空间1.2.4 增删查改 二、vector的模拟实现2.1 成员变量2.2 迭代器相关函数2.3 构造-析构-赋值重载2.3.1 无参构造2.3.2 有参构造12.3.3 有参构造22.3.4 拷贝构造2.3.5 赋值重载2.…...
【数据库】sql优化有哪些?从query层面和数据库层面分析
目录 归纳sql本身的优化数据库层面的优化 归纳 这类型问题可以称为:Query Optimization,从清华AI4DB的paper list中,该类问题大致可以分为: Query RewriterCardinality EstimationCost EstimationPlan Optimization 从中文的角…...
nginx基本优化
安装nginx隐藏版本号 查看百度web服务器 [rootcjq11 ~]# curl -I http://www.baidu.com 隐藏nginx服务器版本号 [rootcjq11 ~]# cd /usr/local/src/nginx-1.22.0/ [rootcjq11 nginx-1.22.0]# vim src/core/nginx.h第13、14行修改版本号和服务器名称 [rootcjq11 nginx-1.2…...
软件测试|使用selenium处理单选框和多选框
简介 我们在web自动化测试工作中,经常会遇到对单选框(Radio Buttons)或者多选框(Checkboxes)进行操作的场景,单选框和多选框主要是用于我们做出选择或提交数据。本文将主要介绍selenium对于单选框和多选框…...
openssl3.2 - EVP_CIPHER_fetch算法名称字符串(参数2)的有效值列表
文章目录 openssl3.2 - EVP_CIPHER_fetch算法名称字符串(参数2)的有效值列表概述如何找到算法名称字符串列表?openssl-3.2.0\providers\implementations\include\prov\names.h备注END openssl3.2 - EVP_CIPHER_fetch算法名称字符串(参数2)的有效值列表 概述 进行加解密时, 先…...
vue3中的hook公共函数封装及运用
vue3 中的 hooks 就是函数的一种写法,就是将文件的一些单独功能的js代码进行抽离出来,放到单独的js文件中,或者说是一些可以复用的公共方法/功能 使用Vue3的组合API封装的可复用的功能函数自定义hook的作用类似于vue2中的mixin技术自定义Hook…...
广州市工信局、天河区商务金融局及广州专精特新促进会走访思迈特
2024年1月11日下午,广州市工信局、天河区商务金融局及广州专精特新促进会相关负责人莅临广州思迈特软件总部调研指导,思迈特软件总裁兼COO姚诗成代表公司热情接待,并陪同调研。 调研组实地参观了思迈特软件,深入了解了思迈特发展历…...
vi编辑器显示行号和不显示行号切换命令
文章目录 1.临时生效只需要在vi编辑器里面输入1.1.显示行号1.2.不显示行号 2.永久生效 1.临时生效只需要在vi编辑器里面输入 1.1.显示行号 set number 或者 set nu如下图 1.2.不显示行号 set nonumber 或者 set nonu2.永久生效 首先打开配置文件/etc/vim/vimrc,向文件中添…...
使用 LLVM clang C/C++ 编译器编译 boost 基础框架类库
1、下载 boost 1.84 库的源代码放到待编译目录 2、解压并接入 boost 1.84 库源码的根目录 搜索默认的 clang 版本,WSL 2.0/Ubuntu 18.04 LTS 为 clang 6.x 执行命令: ./bootstrap.sh --with-toolsetclang ./b2 toolsetclang 另外一个方法比较麻烦需要…...
推荐一款.NET开发的物联网开源项目
物联网(IoT)是一个正在快速发展的技术领域,它涉及到各种设备、物体和系统的互联。所以各种物联网平台和物联网网关项目层出不穷,在物联网(IoT)领域,.NET平台扮演着重要的角色。作为一款广泛使用…...
正则表达式 (用于灵活匹配文本的表达式)
目录 . * . 用于匹配任意单个字符,除了换行符。 例如使用正则表达式 a.b, 它可以匹配aab、acb、a#b * 用于匹配前一个字符零次或多次。 例如,使用正则表达式 ab*c,它可以匹配 "ac"、"abc"、"abbc"&#…...
基于4G数采终端的供热管网在线监测方案
我国大部地区全面进入到冬季,北方各地已开启冬季供暖,以保障居民生活所需。由于城市化的发展,城市内各供热区域愈发分散、供热管道漫长、供热环境复杂,对于供热管网及换热站点的监测和维护提出了诸多挑战。 方案介绍 针对提高供热…...
OPC UA 开源库编译方法及通过OPC UA连接西门S7-1200 PLC通信并进行数据交换[一]
前言 在现代工业自动化领域,OPC UA(开放性生产控制和统一架构)是一种广泛应用的通信协议。本文将以通俗易懂的方式解释OPC UA的含义和作用,帮助读者更好地理解这一概念。 一、OPC UA的定义 OPC UA全称为“开放性生产控制和统一…...
2019年认证杯SPSSPRO杯数学建模B题(第二阶段)外星语词典全过程文档及程序
2019年认证杯SPSSPRO杯数学建模 基于统计和迭代匹配的未知语言文本片段提取模型 B题 外星语词典 原题再现: 我们发现了一种未知的语言,现只知道其文字是以 20 个字母构成的。我们已经获取了许多段由该语言写成的文本,但每段文本只是由字母…...
NFS的共享与挂载
一、NFS网络文件服务 1.1简介 NFS(Network File System 网络文件服务) 文件系统(软件)文件的权限 NFS 是一种基于 TCP/IP 传输的网络文件系统协议,最初由 Sun 公司开发。 通过使用 NFS 协议,客户机可以像访…...
函数式编程的Java编码实践:利用惰性写出高性能且抽象的代码
转载链接 本文会以惰性加载为例一步步介绍函数式编程中各种概念,所以读者不需要任何函数式编程的基础,只需要对 Java 8 有些许了解即可。 一 抽象一定会导致代码性能降低? 程序员的梦想就是能写出 “高内聚,低耦合”的代码&…...
2024年甘肃省职业院校技能大赛信息安全管理与评估 样题二 模块二
竞赛需要完成三个阶段的任务,分别完成三个模块,总分共计 1000分。三个模块内容和分值分别是: 1.第一阶段:模块一 网络平台搭建与设备安全防护(180 分钟,300 分)。 2.第二阶段:模块二…...
mysql 一对多 合并多个通过 逗号拼接展示
mysql 一对多 合并多个通过 逗号拼接展示 以上内容由chatgpt中文网 动态生成 SELECTuser_id,project_id,GROUP_CONCAT(project_specs_id) AS merged_specs_ids FROMzt_medication_specs_total WHEREspecs_num_total < 5 GROUP BYuser_id, project_id;laravel model 对应写…...
【杂谈】-递归进化:人工智能的自我改进与监管挑战
递归进化:人工智能的自我改进与监管挑战 文章目录 递归进化:人工智能的自我改进与监管挑战1、自我改进型人工智能的崛起2、人工智能如何挑战人类监管?3、确保人工智能受控的策略4、人类在人工智能发展中的角色5、平衡自主性与控制力6、总结与…...
【OSG学习笔记】Day 18: 碰撞检测与物理交互
物理引擎(Physics Engine) 物理引擎 是一种通过计算机模拟物理规律(如力学、碰撞、重力、流体动力学等)的软件工具或库。 它的核心目标是在虚拟环境中逼真地模拟物体的运动和交互,广泛应用于 游戏开发、动画制作、虚…...
论文解读:交大港大上海AI Lab开源论文 | 宇树机器人多姿态起立控制强化学习框架(一)
宇树机器人多姿态起立控制强化学习框架论文解析 论文解读:交大&港大&上海AI Lab开源论文 | 宇树机器人多姿态起立控制强化学习框架(一) 论文解读:交大&港大&上海AI Lab开源论文 | 宇树机器人多姿态起立控制强化…...
【OSG学习笔记】Day 16: 骨骼动画与蒙皮(osgAnimation)
骨骼动画基础 骨骼动画是 3D 计算机图形中常用的技术,它通过以下两个主要组件实现角色动画。 骨骼系统 (Skeleton):由层级结构的骨头组成,类似于人体骨骼蒙皮 (Mesh Skinning):将模型网格顶点绑定到骨骼上,使骨骼移动…...
html-<abbr> 缩写或首字母缩略词
定义与作用 <abbr> 标签用于表示缩写或首字母缩略词,它可以帮助用户更好地理解缩写的含义,尤其是对于那些不熟悉该缩写的用户。 title 属性的内容提供了缩写的详细说明。当用户将鼠标悬停在缩写上时,会显示一个提示框。 示例&#x…...
return this;返回的是谁
一个审批系统的示例来演示责任链模式的实现。假设公司需要处理不同金额的采购申请,不同级别的经理有不同的审批权限: // 抽象处理者:审批者 abstract class Approver {protected Approver successor; // 下一个处理者// 设置下一个处理者pub…...
Mysql8 忘记密码重置,以及问题解决
1.使用免密登录 找到配置MySQL文件,我的文件路径是/etc/mysql/my.cnf,有的人的是/etc/mysql/mysql.cnf 在里最后加入 skip-grant-tables重启MySQL服务 service mysql restartShutting down MySQL… SUCCESS! Starting MySQL… SUCCESS! 重启成功 2.登…...
08. C#入门系列【类的基本概念】:开启编程世界的奇妙冒险
C#入门系列【类的基本概念】:开启编程世界的奇妙冒险 嘿,各位编程小白探险家!欢迎来到 C# 的奇幻大陆!今天咱们要深入探索这片大陆上至关重要的 “建筑”—— 类!别害怕,跟着我,保准让你轻松搞…...
9-Oracle 23 ai Vector Search 特性 知识准备
很多小伙伴是不是参加了 免费认证课程(限时至2025/5/15) Oracle AI Vector Search 1Z0-184-25考试,都顺利拿到certified了没。 各行各业的AI 大模型的到来,传统的数据库中的SQL还能不能打,结构化和非结构的话数据如何和…...
Docker拉取MySQL后数据库连接失败的解决方案
在使用Docker部署MySQL时,拉取并启动容器后,有时可能会遇到数据库连接失败的问题。这种问题可能由多种原因导致,包括配置错误、网络设置问题、权限问题等。本文将分析可能的原因,并提供解决方案。 一、确认MySQL容器的运行状态 …...
