当前位置: 首页 > news >正文

FlinkCDC的分析和应用代码

前言:原本想讲如何基于Flink实现定制化计算引擎的开发,并以FlinkCDC为例介绍;发现这两个在表达上不知以谁为主,所以先分析FlinkCDC的应用场景和技术实现原理,下一篇再去分析Flink能在哪些方面,做定制化计算引擎的开发操作。本文将从FlinkCDC应用场景开始,然后讲述其基于Flink的实现原理和代码应用,为下一篇介绍基于Flink开发定制化引擎做铺垫。

一、FlinkCDC应用场景

经常有同事或朋友问,Flink和FlinkCDC有什么区别?

Flink是一个流数据处理计算框架,FlinkCDC是数据采集工具:

Flink应用场景对比的是Storm、Spark;

FlinkCDC应用场景对比的是Sqoop、Canal、Maxwell和KafkaConnectSource、Debezium等;

FlinkCDC是Flink社区伙伴对数据采集需求,开发的一个SDK工具,让Flink在数据捕捉场景,使用起来更方便一些。

1.1 CDC的应用场景分析

       CDC的英文名是Change Data Capture (变化数据获取);解决的应用场景,是对存储中间件中数据的采集,比如Mysql、Orcle、PGSql、MongoDB等中间件;

采集的方式分为基于查询和基于BinLog两种;

以mysql的数据采集为例:可以通过jdbc批次查询,也可以通过Binlog解析增量数据采集;

两者的一些特性对比如下:

基于查询的CDC直接获取数据,基于binlog的采集需要开启binlog服务。

1.2 FlinkCDC的应用分析

       FlinkCDC和Canal实现的应用场景需求是差不多的,都是通过binlog采集增量数据;

但是可用性上的不同是:  

对于cannal类似的采集服务需要三步:

  • 1.开启mysql的binlog
  • 2.将数据写到kafka
  • 3.用flink订阅kafka中的数据进行业务需求处理

对于FlinkCDC:只需要在binlog开启后,直接在一个Flink任务内做业务处理(可以写到kafka处理也行);

       所有如cannal的采集功能服务,都需要单独维护一套服务,增加了运维负担,FlinkCDC可以当作任务部署到集群,大幅减轻了数据采集的应用难度;

用一个任务就完成这个应用功能:

二、FlinkCDC技术分析与本地操作

2.1 FlinkCDC的技术架构分析

       与Canal这些提供服务能力的服务不同,FlinkCdc只是一个任务,可以简单的开发和部署。

       Flink是借用了Debezium的功能,Debezium是一个可轻量级嵌入代码逻辑的服务,将Debezium的采集功能,用Flink的sourceFunction包装,然后打包成SDK提供给Flink开发使用;

       借用Flink自己的算子和sink能力,可以将采集到的数据以Flink的特性加工数据,并将数据写入Flink内置的connect组件,sink到服务里,如Kafka、Pulser、ES、RabbitMQ、MongoDB等。

2.2 本地操作
2.2.1准备mysql数据库表和数据
use flink_test;
#检测binlog是否开启
show variables like '%log_bin%'#构建测试表
CREATE TABLE `event_info` (`id` int NOT NULL AUTO_INCREMENT,`name` varchar(255) NOT NULL,`category` varchar(512) DEFAULT NULL,`pv` int DEFAULT 0,`uv` int DEFAULT 0,PRIMARY KEY (`id`)
) ENGINE=InnoDB  DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;#写入数据
insert  into `event_info`(`id`,`name`,`category`,`pv`,`uv`) values 
(1,'aaa','nfh',20,8),
(2,'bbb','dfgf',30,2),
(3,'ccc','fsd',40,4),
(4,'ddd','afs',50,7),
(5,'eee','asfa',60,3)
(6,'aaa','nfh',20,8),
(7,'bbb','dfgf',30,2),
(8,'ccc','fsd',40,4),
(9,'ddd','afs',50,7),
(10,'eee','asfa',60,3);
2.2.2 pom文件

       注意Flink和FlinkCDC的版本映射,很多显示的可以关联的版本之间是冲突的,这是一个很繁琐的工作,我调试各个版本之间的映射,花了一天左右的时间[求赞求收藏];

下面这是Flink1.14.5版本和FlinkCDC2.2.1版本已经调好的依赖:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>org.example</groupId><artifactId>changedateDoris</artifactId><version>1.0-SNAPSHOT</version><properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><scala.version>2.12</scala.version><java.version>1.8</java.version><flink.version>1.14.5</flink.version><fastjson.version>1.2.62</fastjson.version><hadoop.version>2.8.3</hadoop.version><scope.mode>compile</scope.mode><slf4j.version>1.7.30</slf4j.version></properties><dependencies><!--        springboot 依赖--><!-- flink  --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner_${scala.version}</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-scala_${scala.version}</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_${scala.version}</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-jdbc_${scala.version}</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka_${scala.version}</artifactId><version>${flink.version}</version></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>${fastjson.version}</version></dependency><!-- Add log dependencies when debugging locally --><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId><version>${slf4j.version}</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId><version>${slf4j.version}</version></dependency><!-- mysql-connector --><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.12</version></dependency><dependency><groupId>com.ververica</groupId><artifactId>flink-sql-connector-mysql-cdc</artifactId><version>2.2.1 </version><exclusions><exclusion><artifactId>flink-shaded-guava</artifactId><groupId>org.apache.flink</groupId></exclusion></exclusions></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-runtime-web_${scala.version}</artifactId><version>${flink.version}</version></dependency></dependencies><build>
<!--        <filters>-->
<!--            <filter>${project.basedir}/src/main/resources/env/application-${profileActive}.properties</filter>-->
<!--        </filters>--><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><version>2.1</version><executions><execution><phase>package</phase><goals><goal>shade</goal></goals><configuration><filters><filter><artifact>*:*</artifact><excludes><exclude>META-INF/*.SF</exclude><exclude>META-INF/*.DSA</exclude><exclude>META-INF/*.RSA</exclude></excludes></filter></filters><artifactSet><includes><include>*:*</include></includes><excludes><exclude>org.slf4j:slf4j-api:jar:</exclude></excludes></artifactSet><transformers><transformerimplementation="org.apache.maven.plugins.shade.resource.AppendingTransformer"><resource>META-INF/spring.handlers</resource></transformer><transformerimplementation="org.apache.maven.plugins.shade.resource.AppendingTransformer"><resource>META-INF/spring.schemas</resource></transformer><transformerimplementation="org.apache.maven.plugins.shade.resource.AppendingTransformer"><resource>META-INF/spring.factories</resource></transformer></transformers></configuration></execution></executions></plugin><plugin><artifactId>maven-resources-plugin</artifactId><configuration><encoding>utf-8</encoding><useDefaultDelimiters>true</useDefaultDelimiters><delimiters><delimiter>$[*]</delimiter></delimiters></configuration></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-assembly-plugin</artifactId><executions><execution><phase>package</phase><goals><goal>single</goal></goals><configuration><descriptorRefs><descriptorRef>jar-with-dependencies</descriptorRef></descriptorRefs></configuration></execution></executions></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><configuration><source>8</source><target>8</target></configuration></plugin></plugins></build>
</project>
2.2.3 java代码
package yto.com.net.demo;import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;public class mysqlBinlogRead {private static final Logger log = LoggerFactory.getLogger(mysqlBinlogRead.class);public static void main(String[] args) throws Exception {MySqlSource<String> mySqlSource = MySqlSource.<String>builder().hostname("ip").port(3306).databaseList("flink_test").tableList("flink_test.event_info").username("mysqlUser").password("password").deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
//                .startupOptions(StartupOptions.earliest()).build();Configuration configuration = new Configuration();configuration.setInteger(RestOptions.PORT, 8083);StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);
//        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// enable checkpointenv.enableCheckpointing(10000);DataStreamSource<String> cdcSource = env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL CDC Source");SingleOutputStreamOperator<String> process = cdcSource.process(new ProcessFunction<String, String>() {@Overridepublic void processElement(String row, ProcessFunction<String, String>.Context context, Collector<String> collector) throws Exception {JSONObject rowJson = JSON.parseObject(row);String op = rowJson.getString("op");JSONObject source = rowJson.getJSONObject("source");String table = source.getString("table");}});cdcSource.print("message=:");env.execute("flinkCdc Read message");}}
2.2.4 运行结果

如图所示,已经写入库的结果通过connect获取,增量数据通过binlog获取;

注意:表中的历史数据过多,全量读取的时候将会内存溢出。

相关文章:

FlinkCDC的分析和应用代码

前言&#xff1a;原本想讲如何基于Flink实现定制化计算引擎的开发&#xff0c;并以FlinkCDC为例介绍&#xff1b;发现这两个在表达上不知以谁为主&#xff0c;所以先分析FlinkCDC的应用场景和技术实现原理&#xff0c;下一篇再去分析Flink能在哪些方面&#xff0c;做定制化计算…...

序章 搭建环境篇—准备战士的剑和盾

第一步&#xff1a;安装node.js Node.js 内置了npm&#xff0c;只要安装了node.js&#xff0c;就可以直接使用 npm&#xff0c;官网地址&#xff1a; Download | Node.js 在这里不建议安装最新版本的node.js&#xff0c;可以选跟我一样的版本&#xff0c;node版本v16.13.2 链…...

【C++】vector的使用及模拟实现

目录 一、vector的介绍及使用1.1 介绍vector1.2 vector的使用1.2.1 构造1.2.2 遍历访问1.2.3 容量空间1.2.4 增删查改 二、vector的模拟实现2.1 成员变量2.2 迭代器相关函数2.3 构造-析构-赋值重载2.3.1 无参构造2.3.2 有参构造12.3.3 有参构造22.3.4 拷贝构造2.3.5 赋值重载2.…...

【数据库】sql优化有哪些?从query层面和数据库层面分析

目录 归纳sql本身的优化数据库层面的优化 归纳 这类型问题可以称为&#xff1a;Query Optimization&#xff0c;从清华AI4DB的paper list中&#xff0c;该类问题大致可以分为&#xff1a; Query RewriterCardinality EstimationCost EstimationPlan Optimization 从中文的角…...

nginx基本优化

安装nginx隐藏版本号 查看百度web服务器 [rootcjq11 ~]# curl -I http://www.baidu.com 隐藏nginx服务器版本号 [rootcjq11 ~]# cd /usr/local/src/nginx-1.22.0/ [rootcjq11 nginx-1.22.0]# vim src/core/nginx.h第13、14行修改版本号和服务器名称 [rootcjq11 nginx-1.2…...

软件测试|使用selenium处理单选框和多选框

简介 我们在web自动化测试工作中&#xff0c;经常会遇到对单选框&#xff08;Radio Buttons&#xff09;或者多选框&#xff08;Checkboxes&#xff09;进行操作的场景&#xff0c;单选框和多选框主要是用于我们做出选择或提交数据。本文将主要介绍selenium对于单选框和多选框…...

openssl3.2 - EVP_CIPHER_fetch算法名称字符串(参数2)的有效值列表

文章目录 openssl3.2 - EVP_CIPHER_fetch算法名称字符串(参数2)的有效值列表概述如何找到算法名称字符串列表?openssl-3.2.0\providers\implementations\include\prov\names.h备注END openssl3.2 - EVP_CIPHER_fetch算法名称字符串(参数2)的有效值列表 概述 进行加解密时, 先…...

vue3中的hook公共函数封装及运用

vue3 中的 hooks 就是函数的一种写法&#xff0c;就是将文件的一些单独功能的js代码进行抽离出来&#xff0c;放到单独的js文件中&#xff0c;或者说是一些可以复用的公共方法/功能 使用Vue3的组合API封装的可复用的功能函数自定义hook的作用类似于vue2中的mixin技术自定义Hook…...

广州市工信局、天河区商务金融局及广州专精特新促进会走访思迈特

2024年1月11日下午&#xff0c;广州市工信局、天河区商务金融局及广州专精特新促进会相关负责人莅临广州思迈特软件总部调研指导&#xff0c;思迈特软件总裁兼COO姚诗成代表公司热情接待&#xff0c;并陪同调研。 调研组实地参观了思迈特软件&#xff0c;深入了解了思迈特发展历…...

vi编辑器显示行号和不显示行号切换命令

文章目录 1.临时生效只需要在vi编辑器里面输入1.1.显示行号1.2.不显示行号 2.永久生效 1.临时生效只需要在vi编辑器里面输入 1.1.显示行号 set number 或者 set nu如下图 1.2.不显示行号 set nonumber 或者 set nonu2.永久生效 首先打开配置文件/etc/vim/vimrc,向文件中添…...

使用 LLVM clang C/C++ 编译器编译 boost 基础框架类库

1、下载 boost 1.84 库的源代码放到待编译目录 2、解压并接入 boost 1.84 库源码的根目录 搜索默认的 clang 版本&#xff0c;WSL 2.0/Ubuntu 18.04 LTS 为 clang 6.x 执行命令&#xff1a; ./bootstrap.sh --with-toolsetclang ./b2 toolsetclang 另外一个方法比较麻烦需要…...

推荐一款.NET开发的物联网开源项目

物联网&#xff08;IoT&#xff09;是一个正在快速发展的技术领域&#xff0c;它涉及到各种设备、物体和系统的互联。所以各种物联网平台和物联网网关项目层出不穷&#xff0c;在物联网&#xff08;IoT&#xff09;领域&#xff0c;.NET平台扮演着重要的角色。作为一款广泛使用…...

正则表达式 (用于灵活匹配文本的表达式)

目录 . * . 用于匹配任意单个字符&#xff0c;除了换行符。 例如使用正则表达式 a.b, 它可以匹配aab、acb、a#b * 用于匹配前一个字符零次或多次。 例如&#xff0c;使用正则表达式 ab*c&#xff0c;它可以匹配 "ac"、"abc"、"abbc"&#…...

基于4G数采终端的供热管网在线监测方案

我国大部地区全面进入到冬季&#xff0c;北方各地已开启冬季供暖&#xff0c;以保障居民生活所需。由于城市化的发展&#xff0c;城市内各供热区域愈发分散、供热管道漫长、供热环境复杂&#xff0c;对于供热管网及换热站点的监测和维护提出了诸多挑战。 方案介绍 针对提高供热…...

OPC UA 开源库编译方法及通过OPC UA连接西门S7-1200 PLC通信并进行数据交换[一]

前言 在现代工业自动化领域&#xff0c;OPC UA&#xff08;开放性生产控制和统一架构&#xff09;是一种广泛应用的通信协议。本文将以通俗易懂的方式解释OPC UA的含义和作用&#xff0c;帮助读者更好地理解这一概念。 一、OPC UA的定义 OPC UA全称为“开放性生产控制和统一…...

2019年认证杯SPSSPRO杯数学建模B题(第二阶段)外星语词典全过程文档及程序

2019年认证杯SPSSPRO杯数学建模 基于统计和迭代匹配的未知语言文本片段提取模型 B题 外星语词典 原题再现&#xff1a; 我们发现了一种未知的语言&#xff0c;现只知道其文字是以 20 个字母构成的。我们已经获取了许多段由该语言写成的文本&#xff0c;但每段文本只是由字母…...

NFS的共享与挂载

一、NFS网络文件服务 1.1简介 NFS&#xff08;Network File System 网络文件服务&#xff09; 文件系统&#xff08;软件&#xff09;文件的权限 NFS 是一种基于 TCP/IP 传输的网络文件系统协议&#xff0c;最初由 Sun 公司开发。 通过使用 NFS 协议&#xff0c;客户机可以像访…...

函数式编程的Java编码实践:利用惰性写出高性能且抽象的代码

转载链接 本文会以惰性加载为例一步步介绍函数式编程中各种概念&#xff0c;所以读者不需要任何函数式编程的基础&#xff0c;只需要对 Java 8 有些许了解即可。 一 抽象一定会导致代码性能降低&#xff1f; 程序员的梦想就是能写出 “高内聚&#xff0c;低耦合”的代码&…...

2024年甘肃省职业院校技能大赛信息安全管理与评估 样题二 模块二

竞赛需要完成三个阶段的任务&#xff0c;分别完成三个模块&#xff0c;总分共计 1000分。三个模块内容和分值分别是&#xff1a; 1.第一阶段&#xff1a;模块一 网络平台搭建与设备安全防护&#xff08;180 分钟&#xff0c;300 分&#xff09;。 2.第二阶段&#xff1a;模块二…...

mysql 一对多 合并多个通过 逗号拼接展示

mysql 一对多 合并多个通过 逗号拼接展示 以上内容由chatgpt中文网 动态生成 SELECTuser_id,project_id,GROUP_CONCAT(project_specs_id) AS merged_specs_ids FROMzt_medication_specs_total WHEREspecs_num_total < 5 GROUP BYuser_id, project_id;laravel model 对应写…...

网络六边形受到攻击

大家读完觉得有帮助记得关注和点赞&#xff01;&#xff01;&#xff01; 抽象 现代智能交通系统 &#xff08;ITS&#xff09; 的一个关键要求是能够以安全、可靠和匿名的方式从互联车辆和移动设备收集地理参考数据。Nexagon 协议建立在 IETF 定位器/ID 分离协议 &#xff08;…...

C++初阶-list的底层

目录 1.std::list实现的所有代码 2.list的简单介绍 2.1实现list的类 2.2_list_iterator的实现 2.2.1_list_iterator实现的原因和好处 2.2.2_list_iterator实现 2.3_list_node的实现 2.3.1. 避免递归的模板依赖 2.3.2. 内存布局一致性 2.3.3. 类型安全的替代方案 2.3.…...

(十)学生端搭建

本次旨在将之前的已完成的部分功能进行拼装到学生端&#xff0c;同时完善学生端的构建。本次工作主要包括&#xff1a; 1.学生端整体界面布局 2.模拟考场与部分个人画像流程的串联 3.整体学生端逻辑 一、学生端 在主界面可以选择自己的用户角色 选择学生则进入学生登录界面…...

MongoDB学习和应用(高效的非关系型数据库)

一丶 MongoDB简介 对于社交类软件的功能&#xff0c;我们需要对它的功能特点进行分析&#xff1a; 数据量会随着用户数增大而增大读多写少价值较低非好友看不到其动态信息地理位置的查询… 针对以上特点进行分析各大存储工具&#xff1a; mysql&#xff1a;关系型数据库&am…...

转转集团旗下首家二手多品类循环仓店“超级转转”开业

6月9日&#xff0c;国内领先的循环经济企业转转集团旗下首家二手多品类循环仓店“超级转转”正式开业。 转转集团创始人兼CEO黄炜、转转循环时尚发起人朱珠、转转集团COO兼红布林CEO胡伟琨、王府井集团副总裁祝捷等出席了开业剪彩仪式。 据「TMT星球」了解&#xff0c;“超级…...

多种风格导航菜单 HTML 实现(附源码)

下面我将为您展示 6 种不同风格的导航菜单实现&#xff0c;每种都包含完整 HTML、CSS 和 JavaScript 代码。 1. 简约水平导航栏 <!DOCTYPE html> <html lang"zh-CN"> <head><meta charset"UTF-8"><meta name"viewport&qu…...

多模态大语言模型arxiv论文略读(108)

CROME: Cross-Modal Adapters for Efficient Multimodal LLM ➡️ 论文标题&#xff1a;CROME: Cross-Modal Adapters for Efficient Multimodal LLM ➡️ 论文作者&#xff1a;Sayna Ebrahimi, Sercan O. Arik, Tejas Nama, Tomas Pfister ➡️ 研究机构: Google Cloud AI Re…...

jmeter聚合报告中参数详解

sample、average、min、max、90%line、95%line,99%line、Error错误率、吞吐量Thoughput、KB/sec每秒传输的数据量 sample&#xff08;样本数&#xff09; 表示测试中发送的请求数量&#xff0c;即测试执行了多少次请求。 单位&#xff0c;以个或者次数表示。 示例&#xff1a;…...

0x-3-Oracle 23 ai-sqlcl 25.1 集成安装-配置和优化

是不是受够了安装了oracle database之后sqlplus的简陋&#xff0c;无法删除无法上下翻页的苦恼。 可以安装readline和rlwrap插件的话&#xff0c;配置.bahs_profile后也能解决上下翻页这些&#xff0c;但是很多生产环境无法安装rpm包。 oracle提供了sqlcl免费许可&#xff0c…...

土建施工员考试:建筑施工技术重点知识有哪些?

《管理实务》是土建施工员考试中侧重实操应用与管理能力的科目&#xff0c;核心考查施工组织、质量安全、进度成本等现场管理要点。以下是结合考试大纲与高频考点整理的重点内容&#xff0c;附学习方向和应试技巧&#xff1a; 一、施工组织与进度管理 核心目标&#xff1a; 规…...