当前位置: 首页 > news >正文

FlinkCDC的分析和应用代码

前言:原本想讲如何基于Flink实现定制化计算引擎的开发,并以FlinkCDC为例介绍;发现这两个在表达上不知以谁为主,所以先分析FlinkCDC的应用场景和技术实现原理,下一篇再去分析Flink能在哪些方面,做定制化计算引擎的开发操作。本文将从FlinkCDC应用场景开始,然后讲述其基于Flink的实现原理和代码应用,为下一篇介绍基于Flink开发定制化引擎做铺垫。

一、FlinkCDC应用场景

经常有同事或朋友问,Flink和FlinkCDC有什么区别?

Flink是一个流数据处理计算框架,FlinkCDC是数据采集工具:

Flink应用场景对比的是Storm、Spark;

FlinkCDC应用场景对比的是Sqoop、Canal、Maxwell和KafkaConnectSource、Debezium等;

FlinkCDC是Flink社区伙伴对数据采集需求,开发的一个SDK工具,让Flink在数据捕捉场景,使用起来更方便一些。

1.1 CDC的应用场景分析

       CDC的英文名是Change Data Capture (变化数据获取);解决的应用场景,是对存储中间件中数据的采集,比如Mysql、Orcle、PGSql、MongoDB等中间件;

采集的方式分为基于查询和基于BinLog两种;

以mysql的数据采集为例:可以通过jdbc批次查询,也可以通过Binlog解析增量数据采集;

两者的一些特性对比如下:

基于查询的CDC直接获取数据,基于binlog的采集需要开启binlog服务。

1.2 FlinkCDC的应用分析

       FlinkCDC和Canal实现的应用场景需求是差不多的,都是通过binlog采集增量数据;

但是可用性上的不同是:  

对于cannal类似的采集服务需要三步:

  • 1.开启mysql的binlog
  • 2.将数据写到kafka
  • 3.用flink订阅kafka中的数据进行业务需求处理

对于FlinkCDC:只需要在binlog开启后,直接在一个Flink任务内做业务处理(可以写到kafka处理也行);

       所有如cannal的采集功能服务,都需要单独维护一套服务,增加了运维负担,FlinkCDC可以当作任务部署到集群,大幅减轻了数据采集的应用难度;

用一个任务就完成这个应用功能:

二、FlinkCDC技术分析与本地操作

2.1 FlinkCDC的技术架构分析

       与Canal这些提供服务能力的服务不同,FlinkCdc只是一个任务,可以简单的开发和部署。

       Flink是借用了Debezium的功能,Debezium是一个可轻量级嵌入代码逻辑的服务,将Debezium的采集功能,用Flink的sourceFunction包装,然后打包成SDK提供给Flink开发使用;

       借用Flink自己的算子和sink能力,可以将采集到的数据以Flink的特性加工数据,并将数据写入Flink内置的connect组件,sink到服务里,如Kafka、Pulser、ES、RabbitMQ、MongoDB等。

2.2 本地操作
2.2.1准备mysql数据库表和数据
use flink_test;
#检测binlog是否开启
show variables like '%log_bin%'#构建测试表
CREATE TABLE `event_info` (`id` int NOT NULL AUTO_INCREMENT,`name` varchar(255) NOT NULL,`category` varchar(512) DEFAULT NULL,`pv` int DEFAULT 0,`uv` int DEFAULT 0,PRIMARY KEY (`id`)
) ENGINE=InnoDB  DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;#写入数据
insert  into `event_info`(`id`,`name`,`category`,`pv`,`uv`) values 
(1,'aaa','nfh',20,8),
(2,'bbb','dfgf',30,2),
(3,'ccc','fsd',40,4),
(4,'ddd','afs',50,7),
(5,'eee','asfa',60,3)
(6,'aaa','nfh',20,8),
(7,'bbb','dfgf',30,2),
(8,'ccc','fsd',40,4),
(9,'ddd','afs',50,7),
(10,'eee','asfa',60,3);
2.2.2 pom文件

       注意Flink和FlinkCDC的版本映射,很多显示的可以关联的版本之间是冲突的,这是一个很繁琐的工作,我调试各个版本之间的映射,花了一天左右的时间[求赞求收藏];

下面这是Flink1.14.5版本和FlinkCDC2.2.1版本已经调好的依赖:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>org.example</groupId><artifactId>changedateDoris</artifactId><version>1.0-SNAPSHOT</version><properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><scala.version>2.12</scala.version><java.version>1.8</java.version><flink.version>1.14.5</flink.version><fastjson.version>1.2.62</fastjson.version><hadoop.version>2.8.3</hadoop.version><scope.mode>compile</scope.mode><slf4j.version>1.7.30</slf4j.version></properties><dependencies><!--        springboot 依赖--><!-- flink  --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner_${scala.version}</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-scala_${scala.version}</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_${scala.version}</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-jdbc_${scala.version}</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka_${scala.version}</artifactId><version>${flink.version}</version></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>${fastjson.version}</version></dependency><!-- Add log dependencies when debugging locally --><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId><version>${slf4j.version}</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId><version>${slf4j.version}</version></dependency><!-- mysql-connector --><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.12</version></dependency><dependency><groupId>com.ververica</groupId><artifactId>flink-sql-connector-mysql-cdc</artifactId><version>2.2.1 </version><exclusions><exclusion><artifactId>flink-shaded-guava</artifactId><groupId>org.apache.flink</groupId></exclusion></exclusions></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-runtime-web_${scala.version}</artifactId><version>${flink.version}</version></dependency></dependencies><build>
<!--        <filters>-->
<!--            <filter>${project.basedir}/src/main/resources/env/application-${profileActive}.properties</filter>-->
<!--        </filters>--><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><version>2.1</version><executions><execution><phase>package</phase><goals><goal>shade</goal></goals><configuration><filters><filter><artifact>*:*</artifact><excludes><exclude>META-INF/*.SF</exclude><exclude>META-INF/*.DSA</exclude><exclude>META-INF/*.RSA</exclude></excludes></filter></filters><artifactSet><includes><include>*:*</include></includes><excludes><exclude>org.slf4j:slf4j-api:jar:</exclude></excludes></artifactSet><transformers><transformerimplementation="org.apache.maven.plugins.shade.resource.AppendingTransformer"><resource>META-INF/spring.handlers</resource></transformer><transformerimplementation="org.apache.maven.plugins.shade.resource.AppendingTransformer"><resource>META-INF/spring.schemas</resource></transformer><transformerimplementation="org.apache.maven.plugins.shade.resource.AppendingTransformer"><resource>META-INF/spring.factories</resource></transformer></transformers></configuration></execution></executions></plugin><plugin><artifactId>maven-resources-plugin</artifactId><configuration><encoding>utf-8</encoding><useDefaultDelimiters>true</useDefaultDelimiters><delimiters><delimiter>$[*]</delimiter></delimiters></configuration></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-assembly-plugin</artifactId><executions><execution><phase>package</phase><goals><goal>single</goal></goals><configuration><descriptorRefs><descriptorRef>jar-with-dependencies</descriptorRef></descriptorRefs></configuration></execution></executions></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><configuration><source>8</source><target>8</target></configuration></plugin></plugins></build>
</project>
2.2.3 java代码
package yto.com.net.demo;import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;public class mysqlBinlogRead {private static final Logger log = LoggerFactory.getLogger(mysqlBinlogRead.class);public static void main(String[] args) throws Exception {MySqlSource<String> mySqlSource = MySqlSource.<String>builder().hostname("ip").port(3306).databaseList("flink_test").tableList("flink_test.event_info").username("mysqlUser").password("password").deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
//                .startupOptions(StartupOptions.earliest()).build();Configuration configuration = new Configuration();configuration.setInteger(RestOptions.PORT, 8083);StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);
//        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// enable checkpointenv.enableCheckpointing(10000);DataStreamSource<String> cdcSource = env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL CDC Source");SingleOutputStreamOperator<String> process = cdcSource.process(new ProcessFunction<String, String>() {@Overridepublic void processElement(String row, ProcessFunction<String, String>.Context context, Collector<String> collector) throws Exception {JSONObject rowJson = JSON.parseObject(row);String op = rowJson.getString("op");JSONObject source = rowJson.getJSONObject("source");String table = source.getString("table");}});cdcSource.print("message=:");env.execute("flinkCdc Read message");}}
2.2.4 运行结果

如图所示,已经写入库的结果通过connect获取,增量数据通过binlog获取;

注意:表中的历史数据过多,全量读取的时候将会内存溢出。

相关文章:

FlinkCDC的分析和应用代码

前言&#xff1a;原本想讲如何基于Flink实现定制化计算引擎的开发&#xff0c;并以FlinkCDC为例介绍&#xff1b;发现这两个在表达上不知以谁为主&#xff0c;所以先分析FlinkCDC的应用场景和技术实现原理&#xff0c;下一篇再去分析Flink能在哪些方面&#xff0c;做定制化计算…...

序章 搭建环境篇—准备战士的剑和盾

第一步&#xff1a;安装node.js Node.js 内置了npm&#xff0c;只要安装了node.js&#xff0c;就可以直接使用 npm&#xff0c;官网地址&#xff1a; Download | Node.js 在这里不建议安装最新版本的node.js&#xff0c;可以选跟我一样的版本&#xff0c;node版本v16.13.2 链…...

【C++】vector的使用及模拟实现

目录 一、vector的介绍及使用1.1 介绍vector1.2 vector的使用1.2.1 构造1.2.2 遍历访问1.2.3 容量空间1.2.4 增删查改 二、vector的模拟实现2.1 成员变量2.2 迭代器相关函数2.3 构造-析构-赋值重载2.3.1 无参构造2.3.2 有参构造12.3.3 有参构造22.3.4 拷贝构造2.3.5 赋值重载2.…...

【数据库】sql优化有哪些?从query层面和数据库层面分析

目录 归纳sql本身的优化数据库层面的优化 归纳 这类型问题可以称为&#xff1a;Query Optimization&#xff0c;从清华AI4DB的paper list中&#xff0c;该类问题大致可以分为&#xff1a; Query RewriterCardinality EstimationCost EstimationPlan Optimization 从中文的角…...

nginx基本优化

安装nginx隐藏版本号 查看百度web服务器 [rootcjq11 ~]# curl -I http://www.baidu.com 隐藏nginx服务器版本号 [rootcjq11 ~]# cd /usr/local/src/nginx-1.22.0/ [rootcjq11 nginx-1.22.0]# vim src/core/nginx.h第13、14行修改版本号和服务器名称 [rootcjq11 nginx-1.2…...

软件测试|使用selenium处理单选框和多选框

简介 我们在web自动化测试工作中&#xff0c;经常会遇到对单选框&#xff08;Radio Buttons&#xff09;或者多选框&#xff08;Checkboxes&#xff09;进行操作的场景&#xff0c;单选框和多选框主要是用于我们做出选择或提交数据。本文将主要介绍selenium对于单选框和多选框…...

openssl3.2 - EVP_CIPHER_fetch算法名称字符串(参数2)的有效值列表

文章目录 openssl3.2 - EVP_CIPHER_fetch算法名称字符串(参数2)的有效值列表概述如何找到算法名称字符串列表?openssl-3.2.0\providers\implementations\include\prov\names.h备注END openssl3.2 - EVP_CIPHER_fetch算法名称字符串(参数2)的有效值列表 概述 进行加解密时, 先…...

vue3中的hook公共函数封装及运用

vue3 中的 hooks 就是函数的一种写法&#xff0c;就是将文件的一些单独功能的js代码进行抽离出来&#xff0c;放到单独的js文件中&#xff0c;或者说是一些可以复用的公共方法/功能 使用Vue3的组合API封装的可复用的功能函数自定义hook的作用类似于vue2中的mixin技术自定义Hook…...

广州市工信局、天河区商务金融局及广州专精特新促进会走访思迈特

2024年1月11日下午&#xff0c;广州市工信局、天河区商务金融局及广州专精特新促进会相关负责人莅临广州思迈特软件总部调研指导&#xff0c;思迈特软件总裁兼COO姚诗成代表公司热情接待&#xff0c;并陪同调研。 调研组实地参观了思迈特软件&#xff0c;深入了解了思迈特发展历…...

vi编辑器显示行号和不显示行号切换命令

文章目录 1.临时生效只需要在vi编辑器里面输入1.1.显示行号1.2.不显示行号 2.永久生效 1.临时生效只需要在vi编辑器里面输入 1.1.显示行号 set number 或者 set nu如下图 1.2.不显示行号 set nonumber 或者 set nonu2.永久生效 首先打开配置文件/etc/vim/vimrc,向文件中添…...

使用 LLVM clang C/C++ 编译器编译 boost 基础框架类库

1、下载 boost 1.84 库的源代码放到待编译目录 2、解压并接入 boost 1.84 库源码的根目录 搜索默认的 clang 版本&#xff0c;WSL 2.0/Ubuntu 18.04 LTS 为 clang 6.x 执行命令&#xff1a; ./bootstrap.sh --with-toolsetclang ./b2 toolsetclang 另外一个方法比较麻烦需要…...

推荐一款.NET开发的物联网开源项目

物联网&#xff08;IoT&#xff09;是一个正在快速发展的技术领域&#xff0c;它涉及到各种设备、物体和系统的互联。所以各种物联网平台和物联网网关项目层出不穷&#xff0c;在物联网&#xff08;IoT&#xff09;领域&#xff0c;.NET平台扮演着重要的角色。作为一款广泛使用…...

正则表达式 (用于灵活匹配文本的表达式)

目录 . * . 用于匹配任意单个字符&#xff0c;除了换行符。 例如使用正则表达式 a.b, 它可以匹配aab、acb、a#b * 用于匹配前一个字符零次或多次。 例如&#xff0c;使用正则表达式 ab*c&#xff0c;它可以匹配 "ac"、"abc"、"abbc"&#…...

基于4G数采终端的供热管网在线监测方案

我国大部地区全面进入到冬季&#xff0c;北方各地已开启冬季供暖&#xff0c;以保障居民生活所需。由于城市化的发展&#xff0c;城市内各供热区域愈发分散、供热管道漫长、供热环境复杂&#xff0c;对于供热管网及换热站点的监测和维护提出了诸多挑战。 方案介绍 针对提高供热…...

OPC UA 开源库编译方法及通过OPC UA连接西门S7-1200 PLC通信并进行数据交换[一]

前言 在现代工业自动化领域&#xff0c;OPC UA&#xff08;开放性生产控制和统一架构&#xff09;是一种广泛应用的通信协议。本文将以通俗易懂的方式解释OPC UA的含义和作用&#xff0c;帮助读者更好地理解这一概念。 一、OPC UA的定义 OPC UA全称为“开放性生产控制和统一…...

2019年认证杯SPSSPRO杯数学建模B题(第二阶段)外星语词典全过程文档及程序

2019年认证杯SPSSPRO杯数学建模 基于统计和迭代匹配的未知语言文本片段提取模型 B题 外星语词典 原题再现&#xff1a; 我们发现了一种未知的语言&#xff0c;现只知道其文字是以 20 个字母构成的。我们已经获取了许多段由该语言写成的文本&#xff0c;但每段文本只是由字母…...

NFS的共享与挂载

一、NFS网络文件服务 1.1简介 NFS&#xff08;Network File System 网络文件服务&#xff09; 文件系统&#xff08;软件&#xff09;文件的权限 NFS 是一种基于 TCP/IP 传输的网络文件系统协议&#xff0c;最初由 Sun 公司开发。 通过使用 NFS 协议&#xff0c;客户机可以像访…...

函数式编程的Java编码实践:利用惰性写出高性能且抽象的代码

转载链接 本文会以惰性加载为例一步步介绍函数式编程中各种概念&#xff0c;所以读者不需要任何函数式编程的基础&#xff0c;只需要对 Java 8 有些许了解即可。 一 抽象一定会导致代码性能降低&#xff1f; 程序员的梦想就是能写出 “高内聚&#xff0c;低耦合”的代码&…...

2024年甘肃省职业院校技能大赛信息安全管理与评估 样题二 模块二

竞赛需要完成三个阶段的任务&#xff0c;分别完成三个模块&#xff0c;总分共计 1000分。三个模块内容和分值分别是&#xff1a; 1.第一阶段&#xff1a;模块一 网络平台搭建与设备安全防护&#xff08;180 分钟&#xff0c;300 分&#xff09;。 2.第二阶段&#xff1a;模块二…...

mysql 一对多 合并多个通过 逗号拼接展示

mysql 一对多 合并多个通过 逗号拼接展示 以上内容由chatgpt中文网 动态生成 SELECTuser_id,project_id,GROUP_CONCAT(project_specs_id) AS merged_specs_ids FROMzt_medication_specs_total WHEREspecs_num_total < 5 GROUP BYuser_id, project_id;laravel model 对应写…...

网络六边形受到攻击

大家读完觉得有帮助记得关注和点赞&#xff01;&#xff01;&#xff01; 抽象 现代智能交通系统 &#xff08;ITS&#xff09; 的一个关键要求是能够以安全、可靠和匿名的方式从互联车辆和移动设备收集地理参考数据。Nexagon 协议建立在 IETF 定位器/ID 分离协议 &#xff08;…...

AI-调查研究-01-正念冥想有用吗?对健康的影响及科学指南

点一下关注吧&#xff01;&#xff01;&#xff01;非常感谢&#xff01;&#xff01;持续更新&#xff01;&#xff01;&#xff01; &#x1f680; AI篇持续更新中&#xff01;&#xff08;长期更新&#xff09; 目前2025年06月05日更新到&#xff1a; AI炼丹日志-28 - Aud…...

【Oracle APEX开发小技巧12】

有如下需求&#xff1a; 有一个问题反馈页面&#xff0c;要实现在apex页面展示能直观看到反馈时间超过7天未处理的数据&#xff0c;方便管理员及时处理反馈。 我的方法&#xff1a;直接将逻辑写在SQL中&#xff0c;这样可以直接在页面展示 完整代码&#xff1a; SELECTSF.FE…...

【C语言练习】080. 使用C语言实现简单的数据库操作

080. 使用C语言实现简单的数据库操作 080. 使用C语言实现简单的数据库操作使用原生APIODBC接口第三方库ORM框架文件模拟1. 安装SQLite2. 示例代码:使用SQLite创建数据库、表和插入数据3. 编译和运行4. 示例运行输出:5. 注意事项6. 总结080. 使用C语言实现简单的数据库操作 在…...

dify打造数据可视化图表

一、概述 在日常工作和学习中&#xff0c;我们经常需要和数据打交道。无论是分析报告、项目展示&#xff0c;还是简单的数据洞察&#xff0c;一个清晰直观的图表&#xff0c;往往能胜过千言万语。 一款能让数据可视化变得超级简单的 MCP Server&#xff0c;由蚂蚁集团 AntV 团队…...

ABAP设计模式之---“简单设计原则(Simple Design)”

“Simple Design”&#xff08;简单设计&#xff09;是软件开发中的一个重要理念&#xff0c;倡导以最简单的方式实现软件功能&#xff0c;以确保代码清晰易懂、易维护&#xff0c;并在项目需求变化时能够快速适应。 其核心目标是避免复杂和过度设计&#xff0c;遵循“让事情保…...

重启Eureka集群中的节点,对已经注册的服务有什么影响

先看答案&#xff0c;如果正确地操作&#xff0c;重启Eureka集群中的节点&#xff0c;对已经注册的服务影响非常小&#xff0c;甚至可以做到无感知。 但如果操作不当&#xff0c;可能会引发短暂的服务发现问题。 下面我们从Eureka的核心工作原理来详细分析这个问题。 Eureka的…...

用机器学习破解新能源领域的“弃风”难题

音乐发烧友深有体会&#xff0c;玩音乐的本质就是玩电网。火电声音偏暖&#xff0c;水电偏冷&#xff0c;风电偏空旷。至于太阳能发的电&#xff0c;则略显朦胧和单薄。 不知你是否有感觉&#xff0c;近两年家里的音响声音越来越冷&#xff0c;听起来越来越单薄&#xff1f; —…...

排序算法总结(C++)

目录 一、稳定性二、排序算法选择、冒泡、插入排序归并排序随机快速排序堆排序基数排序计数排序 三、总结 一、稳定性 排序算法的稳定性是指&#xff1a;同样大小的样本 **&#xff08;同样大小的数据&#xff09;**在排序之后不会改变原始的相对次序。 稳定性对基础类型对象…...

华为OD机考-机房布局

import java.util.*;public class DemoTest5 {public static void main(String[] args) {Scanner in new Scanner(System.in);// 注意 hasNext 和 hasNextLine 的区别while (in.hasNextLine()) { // 注意 while 处理多个 caseSystem.out.println(solve(in.nextLine()));}}priv…...