当前位置: 首页 > news >正文

FlinkCDC快速搭建实现数据监控

引入依赖

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.sand</groupId><artifactId>flinkcdc</artifactId><version>1.0-SNAPSHOT</version><packaging>jar</packaging><name>Flink Quickstart Job</name><properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><flink.version>1.17.1</flink.version><!--        <flink.version>1.14.4</flink.version>--><target.java.version>1.8</target.java.version><scala.binary.version>2.12</scala.binary.version><maven.compiler.source>${target.java.version}</maven.compiler.source><maven.compiler.target>${target.java.version}</maven.compiler.target><log4j.version>2.17.1</log4j.version></properties><repositories><repository><id>apache.snapshots</id><name>Apache Development Snapshot Repository</name><url>https://repository.apache.org/content/repositories/snapshots/</url><releases><enabled>false</enabled></releases><snapshots><enabled>true</enabled></snapshots></repository></repositories><dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><!--            <artifactId>flink-streaming-java_2.12</artifactId>--><artifactId>flink-streaming-java</artifactId><version>${flink.version}</version></dependency><!--        <dependency>--><!--            <groupId>org.apache.flink</groupId>--><!--            <artifactId>flink-scala_2.12</artifactId>--><!--            <version>${flink.version}</version>--><!--        </dependency>--><dependency><groupId>org.apache.flink</groupId><!--            <artifactId>flink-clients_2.12</artifactId>--><artifactId>flink-clients</artifactId><version>${flink.version}</version></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>5.1.49</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-core</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner_2.12</artifactId><version>${flink.version}</version></dependency><!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-elasticsearch7 --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-elasticsearch7</artifactId><version>3.0.1-1.17</version></dependency><dependency><groupId>com.ververica</groupId><artifactId>flink-connector-mysql-cdc</artifactId><!-- The dependency is available only for stable releases, SNAPSHOT dependency need build by yourself. --><version>2.4.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-json</artifactId><version>${flink.version}</version></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.75</version></dependency><!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-jdbc --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-jdbc</artifactId><version>3.1.1-1.17</version><scope>provided</scope></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>2.0.32</version></dependency><!-- 打印日志的jar包 --><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId><version>1.7.30</version></dependency><dependency><groupId>log4j</groupId><artifactId>log4j</artifactId><version>1.2.16</version></dependency><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-core</artifactId><version>2.17.2</version></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.30</version></dependency><dependency><groupId>cn.hutool</groupId><artifactId>hutool-all</artifactId><version>5.7.10</version></dependency></dependencies><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>3.1</version><configuration><source>${target.java.version}</source><target>${target.java.version}</target></configuration></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><version>3.1.1</version><executions><!-- Run shade goal on package phase --><execution><phase>package</phase><goals><goal>shade</goal></goals><configuration><createDependencyReducedPom>false</createDependencyReducedPom><artifactSet><excludes><exclude>org.apache.flink:flink-shaded-force-shading</exclude><exclude>com.google.code.findbugs:jsr305</exclude><exclude>org.slf4j:*</exclude><exclude>org.apache.logging.log4j:*</exclude></excludes></artifactSet><filters><filter><artifact>*:*</artifact><excludes><exclude>META-INF/*.SF</exclude><exclude>META-INF/*.DSA</exclude><exclude>META-INF/*.RSA</exclude></excludes></filter></filters><transformers><transformerimplementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/><transformerimplementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"><mainClass>com.sand.DataStreamJob</mainClass></transformer></transformers></configuration></execution></executions></plugin></plugins><pluginManagement><plugins><!-- This improves the out-of-the-box experience in Eclipse by resolving some warnings. --><plugin><groupId>org.eclipse.m2e</groupId><artifactId>lifecycle-mapping</artifactId><version>1.0.0</version><configuration><lifecycleMappingMetadata><pluginExecutions><pluginExecution><pluginExecutionFilter><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><versionRange>[3.1.1,)</versionRange><goals><goal>shade</goal></goals></pluginExecutionFilter><action><ignore/></action></pluginExecution><pluginExecution><pluginExecutionFilter><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><versionRange>[3.1,)</versionRange><goals><goal>testCompile</goal><goal>compile</goal></goals></pluginExecutionFilter><action><ignore/></action></pluginExecution></pluginExecutions></lifecycleMappingMetadata></configuration></plugin></plugins></pluginManagement></build>
</project>

数据库配置类

package com.sand;import org.apache.commons.collections.CollectionUtils;import java.util.Arrays;
import java.util.List;
import java.util.StringJoiner;/*** @author zdd*/
public class CDCKit {public static void main(String[] args) {String tempDir = System.getProperty("java.io.tmpdir");System.out.println("tempDir = " + tempDir);}/*** 数据库*/private static final String database = "byyy_iowtb_wms_test";/*** 表名*/private static final List<String> tableList = Arrays.asList("inv_tt_stock_info","base_tm_sku","base_tm_third_sku_certificate","base_tm_sku_gsp");/*** ip*/private static final String hostname = "192.168.111.107";/*** 端口*/private static final int port = 3306;/*** 用户名*/private static final String username = "test_cdc";/*** 密码*/private static final String password = "Test_cdc@123";public static String getDatabase() {return database;}public static String getTableList() {if (CollectionUtils.isEmpty(tableList)) {return null;}//,分割StringJoiner stringJoiner = new StringJoiner(",");for (String tableName : tableList) {stringJoiner.add(getDatabase() + "." + tableName);}return stringJoiner.toString();}public static String getHostname() {return hostname;}public static int getPort() {return port;}public static String getUsername() {return username;}public static String getPassword() {return password;}}

监控类

package com.sand;import cn.hutool.core.io.FileUtil;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import java.io.File;
import java.util.Objects;
import java.util.Properties;public class DataStreamJob {public static void main(String[] args) throws Exception {//获取临时文件目录String tempDir = System.getProperty("java.io.tmpdir");String latestCheckpoint = getLatestCheckpoint();System.out.println("latestCheckpoint = " + latestCheckpoint);Configuration configuration = new Configuration();if(StringUtils.isNotBlank(latestCheckpoint)){configuration.setString("execution.savepoint.path", "file:///" + latestCheckpoint);}StreamExecutionEnvironment env =StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);env.setParallelism(1);//2.1 开启 Checkpoint,每隔 60 秒钟做一次 CKenv.enableCheckpointing(1000L * 60);//2.2 指定 CK 的一致性语义env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);//2.3 设置任务关闭的时候保留最后一次 CK 数据env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);//2.4 指定从 CK 自动重启策略env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 2000L));//2.5 设置状态后端env.setStateBackend(new FsStateBackend("file:///" + tempDir + "ck"));// ck 设置env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);Properties properties = new Properties();properties.setProperty("snapshot.locking.mode", "none");properties.setProperty("decimal.handling.mode", "string");MySqlSource<String> sourceFunction = MySqlSource.<String>builder().hostname(CDCKit.getHostname()).port(CDCKit.getPort()).databaseList(CDCKit.getDatabase()).tableList(CDCKit.getTableList()).username(CDCKit.getUsername()).password(CDCKit.getPassword()).scanNewlyAddedTableEnabled(true).deserializer(new JsonDebeziumDeserializationSchema()).startupOptions(StartupOptions.initial()).debeziumProperties(properties).build();//4.使用 CDC Source 从 MySQL 读取数据env.fromSource(sourceFunction, WatermarkStrategy.noWatermarks(), "mysql-source").addSink(new MysqlSink());//5.打印数据
//        mysqlStream.print();//6.执行任务env.execute();}private static String getLatestCheckpoint() {File ckDir = new File(System.getProperty("java.io.tmpdir") + "ck");File[] files = ckDir.listFiles();if (files == null) {return null;}String path = null;long lastModified = 0;for (File file : files) {//获取文件夹下-chk-开头文件夹-最新的文件夹if (file.isDirectory()) {File[] files1 = file.listFiles();if (files1 == null) {continue;}for (File file1 : files1) {if (!file1.isDirectory() || !file1.getName().startsWith("chk-")) {continue;}if (file1.lastModified() > lastModified) {lastModified = file1.lastModified();path = file1.getAbsolutePath();}}}}//删除其余目录if (StringUtils.isEmpty(path)) {return null;}String tempPath = path.substring(0, path.lastIndexOf("\\"));for (File file : files) {if (file.isDirectory() && !Objects.equals(file.getAbsolutePath(), tempPath)) {FileUtil.del(file);}}return path;}
}

数据处理类

package com.sand;/*** @author zdd*/
public class MysqlSink implements org.apache.flink.streaming.api.functions.sink.SinkFunction<String> {@Overridepublic void invoke(String value, org.apache.flink.streaming.api.functions.sink.SinkFunction.Context context) throws Exception {System.out.println("value = " + value);}
}

相关文章:

FlinkCDC快速搭建实现数据监控

引入依赖 <project xmlns"http://maven.apache.org/POM/4.0.0" xmlns:xsi"http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation"http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelV…...

应急布控球远程视频监控方案:视频监控平台EasyCVR+4G/5G应急布控球

随着科技的不断发展&#xff0c;应急布控球远程视频监控方案在公共安全、交通管理、城市管理等领域的应用越来越广泛。这种方案通过在现场部署应急布控球&#xff0c;实现对特定区域的实时监控&#xff0c;有助于及时发现问题、快速响应&#xff0c;提高管理效率。 智慧安防视…...

3.6 C语言和汇编语言混合编程 “每日读书”

在一些嵌入式场合&#xff0c;我们经常看到C程序和汇编程序相互调用&#xff0c;混合编程&#xff0c;如在ARM启动代码中&#xff0c;系统上电首先运行的是汇编代码&#xff0c;等初始化好内存堆栈环境之后&#xff0c;才会跳到C程序中执行&#xff0c;对嵌入式软件进行优化时&…...

利用“定时执行专家”循环执行BAT、VBS、Python脚本——含参数指定功能

目录 一、软件概述 二、VBS脚本执行设置 三、触发器设置 四、功能亮点 五、总结 在自动化办公和日常计算机任务管理中&#xff0c;定时执行脚本是一项非常重要的功能。今天&#xff0c;我将为大家带来一款名为“定时执行专家”的软件的评测&#xff0c;特别是其定时执行VB…...

【算法集训】基础算法:模拟

一、基本理解 顾名思义&#xff0c;就是题目要求做什么&#xff0c;代码中就跟着做就可以。 二、题目练习 1252. 奇数值单元格的数目 根据题目要求列出如下代码。需要注意填充列和行的时候注意下标。 int oddCells(int m, int n, int** indices, int indicesSize, int* in…...

基于SSM的房客源信息管理系统设计与实现

目 录 摘 要 I Abstract II 引 言 1 1 相关技术 3 1.1 SSM框架 3 1.2 Vue框架 3 1.3 ECharts 3 1.4 JQuery技术 3 1.5 本章小结 4 2系统分析 5 2.1 需求分析 5 2.2 非功能需求 8 2.3 本章小节 8 3 系统设计 9 3.1 系统总体设计 9 3.1.1 系统体系结构 9 3.1.2 系统目录结构 9 3…...

常见数据类型

目录 数据类型 字符串 char nchar varchar varchar2 nvarchar 数字 number integer binary_float binary_double float 日期 date timestamp 大文本数据 大对象数据 Oracle从入门到总裁:https://blog.csdn.net/weixin_67859959/article/details/135209645 数…...

基于vue的联通积分商城数据可视化APP设计与实现

目 录 摘 要 I Abstract II 引 言 1 1 前端技术介绍 3 1.1 前端开发语言 3 1.1.1 HTML5 3 1.1.2 CSS3 3 1.1.3 JavaScript 3 1.2 MVVM开发模式 4 1.3 Vue框架 4 1.4 Axios技术 5 1.5 ECharts 5 1.6 数据库技术 5 1.7 本章小结 6 2 前端开发的分析 7 2.1 功能性需求分析 7 2.2 …...

2024年flink面试真题(一)

&#xff08;北京&#xff09;taskManager和slot、task的关系 ? &#xff08;北京&#xff09;flink状态太大怎么解决 ? &#xff08;北京 flink提交方式和运行模式 ? &#xff08;北京&#xff09; 怎么提交的实时任务&#xff0c;有多少Job Manager&#xff1f; &…...

Java面试挂在线程创建后续,不要再被八股文误导了!创建线程的方式只有1种

线程创建之源 OK&#xff01;咱们闲话少叙&#xff0c;直接进入正题&#xff0c;回顾一下通过实现Runnable接口&#xff0c;重写run方法创建线程的方式&#xff0c;真的可以创建一个线程吗&#xff1f;来看下面这段demo。 【代码示例1】 public class Test implements Runnab…...

JavaEE面试题

一、String面试题 1、String s1 "123"; 和 String s2 new String("123");的区别 在Java中&#xff0c;"String s1 "123";"和"String s2 new String("123");"这两行代码有一些重要的区别&#xff1a; "…...

探索macOS上的最佳MySQL客户端工具

在数据库管理和开发的世界里&#xff0c;选择一个高效、功能全面的客户端工具对于提升工作效率至关重要。尤其对于使用 macOS 的开发者来说&#xff0c;一个好的 MySQL 客户端不仅可以简化数据库操作&#xff0c;还能提供强大的数据分析和管理功能。本文将介绍几款适用于 macOS…...

[Android] MediaPlayer SDK API glance

参考&#xff1a; https://developer.android.com/reference/android/media/MediaPlayer 如何使用MediaPlayer SDK&#xff1a; https://developer.android.com/media/platform/mediaplayer 概述&#xff1a; 音视频的 playback。创建 MediaPlayer 的线程必须和调用 SDK 接口…...

原始手写helloworld并打jar包允许

1.创建文件夹test统一在其中操作 2.创建hello.java文件 【hello.txt改属性为hello.java】并在里面添加代码 public class hello {public static void main(String[] args) {System.out.println("hello world");} } 注意&#xff1a;类名与文件名一致 然后运行…...

maven 的安装与配置(Command ‘mvn‘ not found)修改配置文件后新终端依旧无法识别到 mvn 命令

下载 maven 安装包 wget https://dlcdn.apache.org/maven/maven-3/3.9.4/binaries/apache-maven-3.9.4-bin.tar.gz 解压 apache-maven-3.9.4-bin.tar.gz tar -zxvf apache-maven-3.9.4-bin.tar.gz 找到文件解压到的位置&#xff0c;由于解压时我们没有指定路径&#xff0c;因…...

Pycharm无法粘贴外部文本问题

Pycharm无法粘贴外部文本问题 百度找了好多是因为安装了vim&#xff0c;最后发现是因为pycharm粘贴框存在了很多内容导致 操作方法&#xff1a; 1、清理所有缓存的复制内容 ctrlshiftV 可以看到编译器所有缓存下来的复制文本 2、ctrlA然后delete 解决&#xff1a;此时再复…...

学习Java的第四天

目录 一、if选择结构 1、基本if选择结构 语法结构&#xff1a; 流程图&#xff1a; 示例&#xff1a; 2、if-else 选择结构 语法结构&#xff1a; 流程图&#xff1a; 示例&#xff1a; 3、多重if选择结构 语法结构&#xff1a; 流程图&#xff1a; 示例&#xff1a…...

【Javaweb】【瑞吉外卖】登录功能plus--拦截器filterinterceptors实现

上手第二天&#xff0c;做到登录拦截器部分 需求&#xff1a;完成目标是&#xff0c;只有在登录的情况下才想让其访问后端&#xff0c;没有登录禁止访问&#xff0c;并且让其跳转。 这里有一个比较好的思想是&#xff1a;后端程序要主要需要考虑的是拦截接口&#xff0c;不能让…...

关于 Runes 协议及「公开铭刻」发行机制的拓展讨论

撰文&#xff1a;MiX 编辑&#xff1a;Faust&#xff0c;极客 web3 2024 年 3 月 2 日&#xff0c;Runes 生态基础设施项目 Rune alpha 的创始人&#xff0c;在 Github 的公开议题中&#xff0c;与 Runes 协议创始人 Casey 展开了讨论&#xff0c;双方对如何拓展 Runes 协议的…...

chkdsk修复会造成文件丢失吗?chkdsk数据丢失还能恢复吗

在Windows操作系统中&#xff0c;CHKDSK是一个强大的磁盘检查工具&#xff0c;它可以帮助我们诊断并修复硬盘的各种错误。然而&#xff0c;许多用户在运行CHKDSK之前都会担心一个问题&#xff1a;CHKDSK修复会造成文件丢失吗&#xff1f;如果不幸发生了数据丢失&#xff0c;CHK…...

Python|GIF 解析与构建(5):手搓截屏和帧率控制

目录 Python&#xff5c;GIF 解析与构建&#xff08;5&#xff09;&#xff1a;手搓截屏和帧率控制 一、引言 二、技术实现&#xff1a;手搓截屏模块 2.1 核心原理 2.2 代码解析&#xff1a;ScreenshotData类 2.2.1 截图函数&#xff1a;capture_screen 三、技术实现&…...

椭圆曲线密码学(ECC)

一、ECC算法概述 椭圆曲线密码学&#xff08;Elliptic Curve Cryptography&#xff09;是基于椭圆曲线数学理论的公钥密码系统&#xff0c;由Neal Koblitz和Victor Miller在1985年独立提出。相比RSA&#xff0c;ECC在相同安全强度下密钥更短&#xff08;256位ECC ≈ 3072位RSA…...

如何在看板中体现优先级变化

在看板中有效体现优先级变化的关键措施包括&#xff1a;采用颜色或标签标识优先级、设置任务排序规则、使用独立的优先级列或泳道、结合自动化规则同步优先级变化、建立定期的优先级审查流程。其中&#xff0c;设置任务排序规则尤其重要&#xff0c;因为它让看板视觉上直观地体…...

条件运算符

C中的三目运算符&#xff08;也称条件运算符&#xff0c;英文&#xff1a;ternary operator&#xff09;是一种简洁的条件选择语句&#xff0c;语法如下&#xff1a; 条件表达式 ? 表达式1 : 表达式2• 如果“条件表达式”为true&#xff0c;则整个表达式的结果为“表达式1”…...

【HTML-16】深入理解HTML中的块元素与行内元素

HTML元素根据其显示特性可以分为两大类&#xff1a;块元素(Block-level Elements)和行内元素(Inline Elements)。理解这两者的区别对于构建良好的网页布局至关重要。本文将全面解析这两种元素的特性、区别以及实际应用场景。 1. 块元素(Block-level Elements) 1.1 基本特性 …...

Linux C语言网络编程详细入门教程:如何一步步实现TCP服务端与客户端通信

文章目录 Linux C语言网络编程详细入门教程&#xff1a;如何一步步实现TCP服务端与客户端通信前言一、网络通信基础概念二、服务端与客户端的完整流程图解三、每一步的详细讲解和代码示例1. 创建Socket&#xff08;服务端和客户端都要&#xff09;2. 绑定本地地址和端口&#x…...

PHP 8.5 即将发布:管道操作符、强力调试

前不久&#xff0c;PHP宣布了即将在 2025 年 11 月 20 日 正式发布的 PHP 8.5&#xff01;作为 PHP 语言的又一次重要迭代&#xff0c;PHP 8.5 承诺带来一系列旨在提升代码可读性、健壮性以及开发者效率的改进。而更令人兴奋的是&#xff0c;借助强大的本地开发环境 ServBay&am…...

Ubuntu系统复制(U盘-电脑硬盘)

所需环境 电脑自带硬盘&#xff1a;1块 (1T) U盘1&#xff1a;Ubuntu系统引导盘&#xff08;用于“U盘2”复制到“电脑自带硬盘”&#xff09; U盘2&#xff1a;Ubuntu系统盘&#xff08;1T&#xff0c;用于被复制&#xff09; &#xff01;&#xff01;&#xff01;建议“电脑…...

鸿蒙HarmonyOS 5军旗小游戏实现指南

1. 项目概述 本军旗小游戏基于鸿蒙HarmonyOS 5开发&#xff0c;采用DevEco Studio实现&#xff0c;包含完整的游戏逻辑和UI界面。 2. 项目结构 /src/main/java/com/example/militarychess/├── MainAbilitySlice.java // 主界面├── GameView.java // 游戏核…...

文件上传漏洞防御全攻略

要全面防范文件上传漏洞&#xff0c;需构建多层防御体系&#xff0c;结合技术验证、存储隔离与权限控制&#xff1a; &#x1f512; 一、基础防护层 前端校验&#xff08;仅辅助&#xff09; 通过JavaScript限制文件后缀名&#xff08;白名单&#xff09;和大小&#xff0c;提…...