当前位置: 首页 > news >正文

FlinkCDC快速搭建实现数据监控

引入依赖

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.sand</groupId><artifactId>flinkcdc</artifactId><version>1.0-SNAPSHOT</version><packaging>jar</packaging><name>Flink Quickstart Job</name><properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><flink.version>1.17.1</flink.version><!--        <flink.version>1.14.4</flink.version>--><target.java.version>1.8</target.java.version><scala.binary.version>2.12</scala.binary.version><maven.compiler.source>${target.java.version}</maven.compiler.source><maven.compiler.target>${target.java.version}</maven.compiler.target><log4j.version>2.17.1</log4j.version></properties><repositories><repository><id>apache.snapshots</id><name>Apache Development Snapshot Repository</name><url>https://repository.apache.org/content/repositories/snapshots/</url><releases><enabled>false</enabled></releases><snapshots><enabled>true</enabled></snapshots></repository></repositories><dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><!--            <artifactId>flink-streaming-java_2.12</artifactId>--><artifactId>flink-streaming-java</artifactId><version>${flink.version}</version></dependency><!--        <dependency>--><!--            <groupId>org.apache.flink</groupId>--><!--            <artifactId>flink-scala_2.12</artifactId>--><!--            <version>${flink.version}</version>--><!--        </dependency>--><dependency><groupId>org.apache.flink</groupId><!--            <artifactId>flink-clients_2.12</artifactId>--><artifactId>flink-clients</artifactId><version>${flink.version}</version></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>5.1.49</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-core</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner_2.12</artifactId><version>${flink.version}</version></dependency><!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-elasticsearch7 --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-elasticsearch7</artifactId><version>3.0.1-1.17</version></dependency><dependency><groupId>com.ververica</groupId><artifactId>flink-connector-mysql-cdc</artifactId><!-- The dependency is available only for stable releases, SNAPSHOT dependency need build by yourself. --><version>2.4.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-json</artifactId><version>${flink.version}</version></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.75</version></dependency><!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-jdbc --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-jdbc</artifactId><version>3.1.1-1.17</version><scope>provided</scope></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>2.0.32</version></dependency><!-- 打印日志的jar包 --><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId><version>1.7.30</version></dependency><dependency><groupId>log4j</groupId><artifactId>log4j</artifactId><version>1.2.16</version></dependency><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-core</artifactId><version>2.17.2</version></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.30</version></dependency><dependency><groupId>cn.hutool</groupId><artifactId>hutool-all</artifactId><version>5.7.10</version></dependency></dependencies><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>3.1</version><configuration><source>${target.java.version}</source><target>${target.java.version}</target></configuration></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><version>3.1.1</version><executions><!-- Run shade goal on package phase --><execution><phase>package</phase><goals><goal>shade</goal></goals><configuration><createDependencyReducedPom>false</createDependencyReducedPom><artifactSet><excludes><exclude>org.apache.flink:flink-shaded-force-shading</exclude><exclude>com.google.code.findbugs:jsr305</exclude><exclude>org.slf4j:*</exclude><exclude>org.apache.logging.log4j:*</exclude></excludes></artifactSet><filters><filter><artifact>*:*</artifact><excludes><exclude>META-INF/*.SF</exclude><exclude>META-INF/*.DSA</exclude><exclude>META-INF/*.RSA</exclude></excludes></filter></filters><transformers><transformerimplementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/><transformerimplementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"><mainClass>com.sand.DataStreamJob</mainClass></transformer></transformers></configuration></execution></executions></plugin></plugins><pluginManagement><plugins><!-- This improves the out-of-the-box experience in Eclipse by resolving some warnings. --><plugin><groupId>org.eclipse.m2e</groupId><artifactId>lifecycle-mapping</artifactId><version>1.0.0</version><configuration><lifecycleMappingMetadata><pluginExecutions><pluginExecution><pluginExecutionFilter><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><versionRange>[3.1.1,)</versionRange><goals><goal>shade</goal></goals></pluginExecutionFilter><action><ignore/></action></pluginExecution><pluginExecution><pluginExecutionFilter><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><versionRange>[3.1,)</versionRange><goals><goal>testCompile</goal><goal>compile</goal></goals></pluginExecutionFilter><action><ignore/></action></pluginExecution></pluginExecutions></lifecycleMappingMetadata></configuration></plugin></plugins></pluginManagement></build>
</project>

数据库配置类

package com.sand;import org.apache.commons.collections.CollectionUtils;import java.util.Arrays;
import java.util.List;
import java.util.StringJoiner;/*** @author zdd*/
public class CDCKit {public static void main(String[] args) {String tempDir = System.getProperty("java.io.tmpdir");System.out.println("tempDir = " + tempDir);}/*** 数据库*/private static final String database = "byyy_iowtb_wms_test";/*** 表名*/private static final List<String> tableList = Arrays.asList("inv_tt_stock_info","base_tm_sku","base_tm_third_sku_certificate","base_tm_sku_gsp");/*** ip*/private static final String hostname = "192.168.111.107";/*** 端口*/private static final int port = 3306;/*** 用户名*/private static final String username = "test_cdc";/*** 密码*/private static final String password = "Test_cdc@123";public static String getDatabase() {return database;}public static String getTableList() {if (CollectionUtils.isEmpty(tableList)) {return null;}//,分割StringJoiner stringJoiner = new StringJoiner(",");for (String tableName : tableList) {stringJoiner.add(getDatabase() + "." + tableName);}return stringJoiner.toString();}public static String getHostname() {return hostname;}public static int getPort() {return port;}public static String getUsername() {return username;}public static String getPassword() {return password;}}

监控类

package com.sand;import cn.hutool.core.io.FileUtil;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import java.io.File;
import java.util.Objects;
import java.util.Properties;public class DataStreamJob {public static void main(String[] args) throws Exception {//获取临时文件目录String tempDir = System.getProperty("java.io.tmpdir");String latestCheckpoint = getLatestCheckpoint();System.out.println("latestCheckpoint = " + latestCheckpoint);Configuration configuration = new Configuration();if(StringUtils.isNotBlank(latestCheckpoint)){configuration.setString("execution.savepoint.path", "file:///" + latestCheckpoint);}StreamExecutionEnvironment env =StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);env.setParallelism(1);//2.1 开启 Checkpoint,每隔 60 秒钟做一次 CKenv.enableCheckpointing(1000L * 60);//2.2 指定 CK 的一致性语义env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);//2.3 设置任务关闭的时候保留最后一次 CK 数据env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);//2.4 指定从 CK 自动重启策略env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 2000L));//2.5 设置状态后端env.setStateBackend(new FsStateBackend("file:///" + tempDir + "ck"));// ck 设置env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);Properties properties = new Properties();properties.setProperty("snapshot.locking.mode", "none");properties.setProperty("decimal.handling.mode", "string");MySqlSource<String> sourceFunction = MySqlSource.<String>builder().hostname(CDCKit.getHostname()).port(CDCKit.getPort()).databaseList(CDCKit.getDatabase()).tableList(CDCKit.getTableList()).username(CDCKit.getUsername()).password(CDCKit.getPassword()).scanNewlyAddedTableEnabled(true).deserializer(new JsonDebeziumDeserializationSchema()).startupOptions(StartupOptions.initial()).debeziumProperties(properties).build();//4.使用 CDC Source 从 MySQL 读取数据env.fromSource(sourceFunction, WatermarkStrategy.noWatermarks(), "mysql-source").addSink(new MysqlSink());//5.打印数据
//        mysqlStream.print();//6.执行任务env.execute();}private static String getLatestCheckpoint() {File ckDir = new File(System.getProperty("java.io.tmpdir") + "ck");File[] files = ckDir.listFiles();if (files == null) {return null;}String path = null;long lastModified = 0;for (File file : files) {//获取文件夹下-chk-开头文件夹-最新的文件夹if (file.isDirectory()) {File[] files1 = file.listFiles();if (files1 == null) {continue;}for (File file1 : files1) {if (!file1.isDirectory() || !file1.getName().startsWith("chk-")) {continue;}if (file1.lastModified() > lastModified) {lastModified = file1.lastModified();path = file1.getAbsolutePath();}}}}//删除其余目录if (StringUtils.isEmpty(path)) {return null;}String tempPath = path.substring(0, path.lastIndexOf("\\"));for (File file : files) {if (file.isDirectory() && !Objects.equals(file.getAbsolutePath(), tempPath)) {FileUtil.del(file);}}return path;}
}

数据处理类

package com.sand;/*** @author zdd*/
public class MysqlSink implements org.apache.flink.streaming.api.functions.sink.SinkFunction<String> {@Overridepublic void invoke(String value, org.apache.flink.streaming.api.functions.sink.SinkFunction.Context context) throws Exception {System.out.println("value = " + value);}
}

相关文章:

FlinkCDC快速搭建实现数据监控

引入依赖 <project xmlns"http://maven.apache.org/POM/4.0.0" xmlns:xsi"http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation"http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelV…...

应急布控球远程视频监控方案:视频监控平台EasyCVR+4G/5G应急布控球

随着科技的不断发展&#xff0c;应急布控球远程视频监控方案在公共安全、交通管理、城市管理等领域的应用越来越广泛。这种方案通过在现场部署应急布控球&#xff0c;实现对特定区域的实时监控&#xff0c;有助于及时发现问题、快速响应&#xff0c;提高管理效率。 智慧安防视…...

3.6 C语言和汇编语言混合编程 “每日读书”

在一些嵌入式场合&#xff0c;我们经常看到C程序和汇编程序相互调用&#xff0c;混合编程&#xff0c;如在ARM启动代码中&#xff0c;系统上电首先运行的是汇编代码&#xff0c;等初始化好内存堆栈环境之后&#xff0c;才会跳到C程序中执行&#xff0c;对嵌入式软件进行优化时&…...

利用“定时执行专家”循环执行BAT、VBS、Python脚本——含参数指定功能

目录 一、软件概述 二、VBS脚本执行设置 三、触发器设置 四、功能亮点 五、总结 在自动化办公和日常计算机任务管理中&#xff0c;定时执行脚本是一项非常重要的功能。今天&#xff0c;我将为大家带来一款名为“定时执行专家”的软件的评测&#xff0c;特别是其定时执行VB…...

【算法集训】基础算法:模拟

一、基本理解 顾名思义&#xff0c;就是题目要求做什么&#xff0c;代码中就跟着做就可以。 二、题目练习 1252. 奇数值单元格的数目 根据题目要求列出如下代码。需要注意填充列和行的时候注意下标。 int oddCells(int m, int n, int** indices, int indicesSize, int* in…...

基于SSM的房客源信息管理系统设计与实现

目 录 摘 要 I Abstract II 引 言 1 1 相关技术 3 1.1 SSM框架 3 1.2 Vue框架 3 1.3 ECharts 3 1.4 JQuery技术 3 1.5 本章小结 4 2系统分析 5 2.1 需求分析 5 2.2 非功能需求 8 2.3 本章小节 8 3 系统设计 9 3.1 系统总体设计 9 3.1.1 系统体系结构 9 3.1.2 系统目录结构 9 3…...

常见数据类型

目录 数据类型 字符串 char nchar varchar varchar2 nvarchar 数字 number integer binary_float binary_double float 日期 date timestamp 大文本数据 大对象数据 Oracle从入门到总裁:https://blog.csdn.net/weixin_67859959/article/details/135209645 数…...

基于vue的联通积分商城数据可视化APP设计与实现

目 录 摘 要 I Abstract II 引 言 1 1 前端技术介绍 3 1.1 前端开发语言 3 1.1.1 HTML5 3 1.1.2 CSS3 3 1.1.3 JavaScript 3 1.2 MVVM开发模式 4 1.3 Vue框架 4 1.4 Axios技术 5 1.5 ECharts 5 1.6 数据库技术 5 1.7 本章小结 6 2 前端开发的分析 7 2.1 功能性需求分析 7 2.2 …...

2024年flink面试真题(一)

&#xff08;北京&#xff09;taskManager和slot、task的关系 ? &#xff08;北京&#xff09;flink状态太大怎么解决 ? &#xff08;北京 flink提交方式和运行模式 ? &#xff08;北京&#xff09; 怎么提交的实时任务&#xff0c;有多少Job Manager&#xff1f; &…...

Java面试挂在线程创建后续,不要再被八股文误导了!创建线程的方式只有1种

线程创建之源 OK&#xff01;咱们闲话少叙&#xff0c;直接进入正题&#xff0c;回顾一下通过实现Runnable接口&#xff0c;重写run方法创建线程的方式&#xff0c;真的可以创建一个线程吗&#xff1f;来看下面这段demo。 【代码示例1】 public class Test implements Runnab…...

JavaEE面试题

一、String面试题 1、String s1 "123"; 和 String s2 new String("123");的区别 在Java中&#xff0c;"String s1 "123";"和"String s2 new String("123");"这两行代码有一些重要的区别&#xff1a; "…...

探索macOS上的最佳MySQL客户端工具

在数据库管理和开发的世界里&#xff0c;选择一个高效、功能全面的客户端工具对于提升工作效率至关重要。尤其对于使用 macOS 的开发者来说&#xff0c;一个好的 MySQL 客户端不仅可以简化数据库操作&#xff0c;还能提供强大的数据分析和管理功能。本文将介绍几款适用于 macOS…...

[Android] MediaPlayer SDK API glance

参考&#xff1a; https://developer.android.com/reference/android/media/MediaPlayer 如何使用MediaPlayer SDK&#xff1a; https://developer.android.com/media/platform/mediaplayer 概述&#xff1a; 音视频的 playback。创建 MediaPlayer 的线程必须和调用 SDK 接口…...

原始手写helloworld并打jar包允许

1.创建文件夹test统一在其中操作 2.创建hello.java文件 【hello.txt改属性为hello.java】并在里面添加代码 public class hello {public static void main(String[] args) {System.out.println("hello world");} } 注意&#xff1a;类名与文件名一致 然后运行…...

maven 的安装与配置(Command ‘mvn‘ not found)修改配置文件后新终端依旧无法识别到 mvn 命令

下载 maven 安装包 wget https://dlcdn.apache.org/maven/maven-3/3.9.4/binaries/apache-maven-3.9.4-bin.tar.gz 解压 apache-maven-3.9.4-bin.tar.gz tar -zxvf apache-maven-3.9.4-bin.tar.gz 找到文件解压到的位置&#xff0c;由于解压时我们没有指定路径&#xff0c;因…...

Pycharm无法粘贴外部文本问题

Pycharm无法粘贴外部文本问题 百度找了好多是因为安装了vim&#xff0c;最后发现是因为pycharm粘贴框存在了很多内容导致 操作方法&#xff1a; 1、清理所有缓存的复制内容 ctrlshiftV 可以看到编译器所有缓存下来的复制文本 2、ctrlA然后delete 解决&#xff1a;此时再复…...

学习Java的第四天

目录 一、if选择结构 1、基本if选择结构 语法结构&#xff1a; 流程图&#xff1a; 示例&#xff1a; 2、if-else 选择结构 语法结构&#xff1a; 流程图&#xff1a; 示例&#xff1a; 3、多重if选择结构 语法结构&#xff1a; 流程图&#xff1a; 示例&#xff1a…...

【Javaweb】【瑞吉外卖】登录功能plus--拦截器filterinterceptors实现

上手第二天&#xff0c;做到登录拦截器部分 需求&#xff1a;完成目标是&#xff0c;只有在登录的情况下才想让其访问后端&#xff0c;没有登录禁止访问&#xff0c;并且让其跳转。 这里有一个比较好的思想是&#xff1a;后端程序要主要需要考虑的是拦截接口&#xff0c;不能让…...

关于 Runes 协议及「公开铭刻」发行机制的拓展讨论

撰文&#xff1a;MiX 编辑&#xff1a;Faust&#xff0c;极客 web3 2024 年 3 月 2 日&#xff0c;Runes 生态基础设施项目 Rune alpha 的创始人&#xff0c;在 Github 的公开议题中&#xff0c;与 Runes 协议创始人 Casey 展开了讨论&#xff0c;双方对如何拓展 Runes 协议的…...

chkdsk修复会造成文件丢失吗?chkdsk数据丢失还能恢复吗

在Windows操作系统中&#xff0c;CHKDSK是一个强大的磁盘检查工具&#xff0c;它可以帮助我们诊断并修复硬盘的各种错误。然而&#xff0c;许多用户在运行CHKDSK之前都会担心一个问题&#xff1a;CHKDSK修复会造成文件丢失吗&#xff1f;如果不幸发生了数据丢失&#xff0c;CHK…...

在软件开发中正确使用MySQL日期时间类型的深度解析

在日常软件开发场景中&#xff0c;时间信息的存储是底层且核心的需求。从金融交易的精确记账时间、用户操作的行为日志&#xff0c;到供应链系统的物流节点时间戳&#xff0c;时间数据的准确性直接决定业务逻辑的可靠性。MySQL作为主流关系型数据库&#xff0c;其日期时间类型的…...

2025年能源电力系统与流体力学国际会议 (EPSFD 2025)

2025年能源电力系统与流体力学国际会议&#xff08;EPSFD 2025&#xff09;将于本年度在美丽的杭州盛大召开。作为全球能源、电力系统以及流体力学领域的顶级盛会&#xff0c;EPSFD 2025旨在为来自世界各地的科学家、工程师和研究人员提供一个展示最新研究成果、分享实践经验及…...

Debian系统简介

目录 Debian系统介绍 Debian版本介绍 Debian软件源介绍 软件包管理工具dpkg dpkg核心指令详解 安装软件包 卸载软件包 查询软件包状态 验证软件包完整性 手动处理依赖关系 dpkg vs apt Debian系统介绍 Debian 和 Ubuntu 都是基于 Debian内核 的 Linux 发行版&#xff…...

解锁数据库简洁之道:FastAPI与SQLModel实战指南

在构建现代Web应用程序时&#xff0c;与数据库的交互无疑是核心环节。虽然传统的数据库操作方式&#xff08;如直接编写SQL语句与psycopg2交互&#xff09;赋予了我们精细的控制权&#xff0c;但在面对日益复杂的业务逻辑和快速迭代的需求时&#xff0c;这种方式的开发效率和可…...

Objective-C常用命名规范总结

【OC】常用命名规范总结 文章目录 【OC】常用命名规范总结1.类名&#xff08;Class Name)2.协议名&#xff08;Protocol Name)3.方法名&#xff08;Method Name)4.属性名&#xff08;Property Name&#xff09;5.局部变量/实例变量&#xff08;Local / Instance Variables&…...

Cloudflare 从 Nginx 到 Pingora:性能、效率与安全的全面升级

在互联网的快速发展中&#xff0c;高性能、高效率和高安全性的网络服务成为了各大互联网基础设施提供商的核心追求。Cloudflare 作为全球领先的互联网安全和基础设施公司&#xff0c;近期做出了一个重大技术决策&#xff1a;弃用长期使用的 Nginx&#xff0c;转而采用其内部开发…...

【OSG学习笔记】Day 16: 骨骼动画与蒙皮(osgAnimation)

骨骼动画基础 骨骼动画是 3D 计算机图形中常用的技术&#xff0c;它通过以下两个主要组件实现角色动画。 骨骼系统 (Skeleton)&#xff1a;由层级结构的骨头组成&#xff0c;类似于人体骨骼蒙皮 (Mesh Skinning)&#xff1a;将模型网格顶点绑定到骨骼上&#xff0c;使骨骼移动…...

Android 之 kotlin 语言学习笔记三(Kotlin-Java 互操作)

参考官方文档&#xff1a;https://developer.android.google.cn/kotlin/interop?hlzh-cn 一、Java&#xff08;供 Kotlin 使用&#xff09; 1、不得使用硬关键字 不要使用 Kotlin 的任何硬关键字作为方法的名称 或字段。允许使用 Kotlin 的软关键字、修饰符关键字和特殊标识…...

【碎碎念】宝可梦 Mesh GO : 基于MESH网络的口袋妖怪 宝可梦GO游戏自组网系统

目录 游戏说明《宝可梦 Mesh GO》 —— 局域宝可梦探索Pokmon GO 类游戏核心理念应用场景Mesh 特性 宝可梦玩法融合设计游戏构想要素1. 地图探索&#xff08;基于物理空间 广播范围&#xff09;2. 野生宝可梦生成与广播3. 对战系统4. 道具与通信5. 延伸玩法 安全性设计 技术选…...

安宝特方案丨船舶智造的“AR+AI+作业标准化管理解决方案”(装配)

船舶制造装配管理现状&#xff1a;装配工作依赖人工经验&#xff0c;装配工人凭借长期实践积累的操作技巧完成零部件组装。企业通常制定了装配作业指导书&#xff0c;但在实际执行中&#xff0c;工人对指导书的理解和遵循程度参差不齐。 船舶装配过程中的挑战与需求 挑战 (1…...