当前位置: 首页 > news >正文

FlinkCDC快速搭建实现数据监控

引入依赖

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.sand</groupId><artifactId>flinkcdc</artifactId><version>1.0-SNAPSHOT</version><packaging>jar</packaging><name>Flink Quickstart Job</name><properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><flink.version>1.17.1</flink.version><!--        <flink.version>1.14.4</flink.version>--><target.java.version>1.8</target.java.version><scala.binary.version>2.12</scala.binary.version><maven.compiler.source>${target.java.version}</maven.compiler.source><maven.compiler.target>${target.java.version}</maven.compiler.target><log4j.version>2.17.1</log4j.version></properties><repositories><repository><id>apache.snapshots</id><name>Apache Development Snapshot Repository</name><url>https://repository.apache.org/content/repositories/snapshots/</url><releases><enabled>false</enabled></releases><snapshots><enabled>true</enabled></snapshots></repository></repositories><dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><!--            <artifactId>flink-streaming-java_2.12</artifactId>--><artifactId>flink-streaming-java</artifactId><version>${flink.version}</version></dependency><!--        <dependency>--><!--            <groupId>org.apache.flink</groupId>--><!--            <artifactId>flink-scala_2.12</artifactId>--><!--            <version>${flink.version}</version>--><!--        </dependency>--><dependency><groupId>org.apache.flink</groupId><!--            <artifactId>flink-clients_2.12</artifactId>--><artifactId>flink-clients</artifactId><version>${flink.version}</version></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>5.1.49</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-core</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner_2.12</artifactId><version>${flink.version}</version></dependency><!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-elasticsearch7 --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-elasticsearch7</artifactId><version>3.0.1-1.17</version></dependency><dependency><groupId>com.ververica</groupId><artifactId>flink-connector-mysql-cdc</artifactId><!-- The dependency is available only for stable releases, SNAPSHOT dependency need build by yourself. --><version>2.4.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-json</artifactId><version>${flink.version}</version></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.75</version></dependency><!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-jdbc --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-jdbc</artifactId><version>3.1.1-1.17</version><scope>provided</scope></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>2.0.32</version></dependency><!-- 打印日志的jar包 --><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId><version>1.7.30</version></dependency><dependency><groupId>log4j</groupId><artifactId>log4j</artifactId><version>1.2.16</version></dependency><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-core</artifactId><version>2.17.2</version></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.30</version></dependency><dependency><groupId>cn.hutool</groupId><artifactId>hutool-all</artifactId><version>5.7.10</version></dependency></dependencies><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>3.1</version><configuration><source>${target.java.version}</source><target>${target.java.version}</target></configuration></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><version>3.1.1</version><executions><!-- Run shade goal on package phase --><execution><phase>package</phase><goals><goal>shade</goal></goals><configuration><createDependencyReducedPom>false</createDependencyReducedPom><artifactSet><excludes><exclude>org.apache.flink:flink-shaded-force-shading</exclude><exclude>com.google.code.findbugs:jsr305</exclude><exclude>org.slf4j:*</exclude><exclude>org.apache.logging.log4j:*</exclude></excludes></artifactSet><filters><filter><artifact>*:*</artifact><excludes><exclude>META-INF/*.SF</exclude><exclude>META-INF/*.DSA</exclude><exclude>META-INF/*.RSA</exclude></excludes></filter></filters><transformers><transformerimplementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/><transformerimplementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"><mainClass>com.sand.DataStreamJob</mainClass></transformer></transformers></configuration></execution></executions></plugin></plugins><pluginManagement><plugins><!-- This improves the out-of-the-box experience in Eclipse by resolving some warnings. --><plugin><groupId>org.eclipse.m2e</groupId><artifactId>lifecycle-mapping</artifactId><version>1.0.0</version><configuration><lifecycleMappingMetadata><pluginExecutions><pluginExecution><pluginExecutionFilter><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><versionRange>[3.1.1,)</versionRange><goals><goal>shade</goal></goals></pluginExecutionFilter><action><ignore/></action></pluginExecution><pluginExecution><pluginExecutionFilter><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><versionRange>[3.1,)</versionRange><goals><goal>testCompile</goal><goal>compile</goal></goals></pluginExecutionFilter><action><ignore/></action></pluginExecution></pluginExecutions></lifecycleMappingMetadata></configuration></plugin></plugins></pluginManagement></build>
</project>

数据库配置类

package com.sand;import org.apache.commons.collections.CollectionUtils;import java.util.Arrays;
import java.util.List;
import java.util.StringJoiner;/*** @author zdd*/
public class CDCKit {public static void main(String[] args) {String tempDir = System.getProperty("java.io.tmpdir");System.out.println("tempDir = " + tempDir);}/*** 数据库*/private static final String database = "byyy_iowtb_wms_test";/*** 表名*/private static final List<String> tableList = Arrays.asList("inv_tt_stock_info","base_tm_sku","base_tm_third_sku_certificate","base_tm_sku_gsp");/*** ip*/private static final String hostname = "192.168.111.107";/*** 端口*/private static final int port = 3306;/*** 用户名*/private static final String username = "test_cdc";/*** 密码*/private static final String password = "Test_cdc@123";public static String getDatabase() {return database;}public static String getTableList() {if (CollectionUtils.isEmpty(tableList)) {return null;}//,分割StringJoiner stringJoiner = new StringJoiner(",");for (String tableName : tableList) {stringJoiner.add(getDatabase() + "." + tableName);}return stringJoiner.toString();}public static String getHostname() {return hostname;}public static int getPort() {return port;}public static String getUsername() {return username;}public static String getPassword() {return password;}}

监控类

package com.sand;import cn.hutool.core.io.FileUtil;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import java.io.File;
import java.util.Objects;
import java.util.Properties;public class DataStreamJob {public static void main(String[] args) throws Exception {//获取临时文件目录String tempDir = System.getProperty("java.io.tmpdir");String latestCheckpoint = getLatestCheckpoint();System.out.println("latestCheckpoint = " + latestCheckpoint);Configuration configuration = new Configuration();if(StringUtils.isNotBlank(latestCheckpoint)){configuration.setString("execution.savepoint.path", "file:///" + latestCheckpoint);}StreamExecutionEnvironment env =StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);env.setParallelism(1);//2.1 开启 Checkpoint,每隔 60 秒钟做一次 CKenv.enableCheckpointing(1000L * 60);//2.2 指定 CK 的一致性语义env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);//2.3 设置任务关闭的时候保留最后一次 CK 数据env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);//2.4 指定从 CK 自动重启策略env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 2000L));//2.5 设置状态后端env.setStateBackend(new FsStateBackend("file:///" + tempDir + "ck"));// ck 设置env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);Properties properties = new Properties();properties.setProperty("snapshot.locking.mode", "none");properties.setProperty("decimal.handling.mode", "string");MySqlSource<String> sourceFunction = MySqlSource.<String>builder().hostname(CDCKit.getHostname()).port(CDCKit.getPort()).databaseList(CDCKit.getDatabase()).tableList(CDCKit.getTableList()).username(CDCKit.getUsername()).password(CDCKit.getPassword()).scanNewlyAddedTableEnabled(true).deserializer(new JsonDebeziumDeserializationSchema()).startupOptions(StartupOptions.initial()).debeziumProperties(properties).build();//4.使用 CDC Source 从 MySQL 读取数据env.fromSource(sourceFunction, WatermarkStrategy.noWatermarks(), "mysql-source").addSink(new MysqlSink());//5.打印数据
//        mysqlStream.print();//6.执行任务env.execute();}private static String getLatestCheckpoint() {File ckDir = new File(System.getProperty("java.io.tmpdir") + "ck");File[] files = ckDir.listFiles();if (files == null) {return null;}String path = null;long lastModified = 0;for (File file : files) {//获取文件夹下-chk-开头文件夹-最新的文件夹if (file.isDirectory()) {File[] files1 = file.listFiles();if (files1 == null) {continue;}for (File file1 : files1) {if (!file1.isDirectory() || !file1.getName().startsWith("chk-")) {continue;}if (file1.lastModified() > lastModified) {lastModified = file1.lastModified();path = file1.getAbsolutePath();}}}}//删除其余目录if (StringUtils.isEmpty(path)) {return null;}String tempPath = path.substring(0, path.lastIndexOf("\\"));for (File file : files) {if (file.isDirectory() && !Objects.equals(file.getAbsolutePath(), tempPath)) {FileUtil.del(file);}}return path;}
}

数据处理类

package com.sand;/*** @author zdd*/
public class MysqlSink implements org.apache.flink.streaming.api.functions.sink.SinkFunction<String> {@Overridepublic void invoke(String value, org.apache.flink.streaming.api.functions.sink.SinkFunction.Context context) throws Exception {System.out.println("value = " + value);}
}

相关文章:

FlinkCDC快速搭建实现数据监控

引入依赖 <project xmlns"http://maven.apache.org/POM/4.0.0" xmlns:xsi"http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation"http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelV…...

应急布控球远程视频监控方案:视频监控平台EasyCVR+4G/5G应急布控球

随着科技的不断发展&#xff0c;应急布控球远程视频监控方案在公共安全、交通管理、城市管理等领域的应用越来越广泛。这种方案通过在现场部署应急布控球&#xff0c;实现对特定区域的实时监控&#xff0c;有助于及时发现问题、快速响应&#xff0c;提高管理效率。 智慧安防视…...

3.6 C语言和汇编语言混合编程 “每日读书”

在一些嵌入式场合&#xff0c;我们经常看到C程序和汇编程序相互调用&#xff0c;混合编程&#xff0c;如在ARM启动代码中&#xff0c;系统上电首先运行的是汇编代码&#xff0c;等初始化好内存堆栈环境之后&#xff0c;才会跳到C程序中执行&#xff0c;对嵌入式软件进行优化时&…...

利用“定时执行专家”循环执行BAT、VBS、Python脚本——含参数指定功能

目录 一、软件概述 二、VBS脚本执行设置 三、触发器设置 四、功能亮点 五、总结 在自动化办公和日常计算机任务管理中&#xff0c;定时执行脚本是一项非常重要的功能。今天&#xff0c;我将为大家带来一款名为“定时执行专家”的软件的评测&#xff0c;特别是其定时执行VB…...

【算法集训】基础算法:模拟

一、基本理解 顾名思义&#xff0c;就是题目要求做什么&#xff0c;代码中就跟着做就可以。 二、题目练习 1252. 奇数值单元格的数目 根据题目要求列出如下代码。需要注意填充列和行的时候注意下标。 int oddCells(int m, int n, int** indices, int indicesSize, int* in…...

基于SSM的房客源信息管理系统设计与实现

目 录 摘 要 I Abstract II 引 言 1 1 相关技术 3 1.1 SSM框架 3 1.2 Vue框架 3 1.3 ECharts 3 1.4 JQuery技术 3 1.5 本章小结 4 2系统分析 5 2.1 需求分析 5 2.2 非功能需求 8 2.3 本章小节 8 3 系统设计 9 3.1 系统总体设计 9 3.1.1 系统体系结构 9 3.1.2 系统目录结构 9 3…...

常见数据类型

目录 数据类型 字符串 char nchar varchar varchar2 nvarchar 数字 number integer binary_float binary_double float 日期 date timestamp 大文本数据 大对象数据 Oracle从入门到总裁:https://blog.csdn.net/weixin_67859959/article/details/135209645 数…...

基于vue的联通积分商城数据可视化APP设计与实现

目 录 摘 要 I Abstract II 引 言 1 1 前端技术介绍 3 1.1 前端开发语言 3 1.1.1 HTML5 3 1.1.2 CSS3 3 1.1.3 JavaScript 3 1.2 MVVM开发模式 4 1.3 Vue框架 4 1.4 Axios技术 5 1.5 ECharts 5 1.6 数据库技术 5 1.7 本章小结 6 2 前端开发的分析 7 2.1 功能性需求分析 7 2.2 …...

2024年flink面试真题(一)

&#xff08;北京&#xff09;taskManager和slot、task的关系 ? &#xff08;北京&#xff09;flink状态太大怎么解决 ? &#xff08;北京 flink提交方式和运行模式 ? &#xff08;北京&#xff09; 怎么提交的实时任务&#xff0c;有多少Job Manager&#xff1f; &…...

Java面试挂在线程创建后续,不要再被八股文误导了!创建线程的方式只有1种

线程创建之源 OK&#xff01;咱们闲话少叙&#xff0c;直接进入正题&#xff0c;回顾一下通过实现Runnable接口&#xff0c;重写run方法创建线程的方式&#xff0c;真的可以创建一个线程吗&#xff1f;来看下面这段demo。 【代码示例1】 public class Test implements Runnab…...

JavaEE面试题

一、String面试题 1、String s1 "123"; 和 String s2 new String("123");的区别 在Java中&#xff0c;"String s1 "123";"和"String s2 new String("123");"这两行代码有一些重要的区别&#xff1a; "…...

探索macOS上的最佳MySQL客户端工具

在数据库管理和开发的世界里&#xff0c;选择一个高效、功能全面的客户端工具对于提升工作效率至关重要。尤其对于使用 macOS 的开发者来说&#xff0c;一个好的 MySQL 客户端不仅可以简化数据库操作&#xff0c;还能提供强大的数据分析和管理功能。本文将介绍几款适用于 macOS…...

[Android] MediaPlayer SDK API glance

参考&#xff1a; https://developer.android.com/reference/android/media/MediaPlayer 如何使用MediaPlayer SDK&#xff1a; https://developer.android.com/media/platform/mediaplayer 概述&#xff1a; 音视频的 playback。创建 MediaPlayer 的线程必须和调用 SDK 接口…...

原始手写helloworld并打jar包允许

1.创建文件夹test统一在其中操作 2.创建hello.java文件 【hello.txt改属性为hello.java】并在里面添加代码 public class hello {public static void main(String[] args) {System.out.println("hello world");} } 注意&#xff1a;类名与文件名一致 然后运行…...

maven 的安装与配置(Command ‘mvn‘ not found)修改配置文件后新终端依旧无法识别到 mvn 命令

下载 maven 安装包 wget https://dlcdn.apache.org/maven/maven-3/3.9.4/binaries/apache-maven-3.9.4-bin.tar.gz 解压 apache-maven-3.9.4-bin.tar.gz tar -zxvf apache-maven-3.9.4-bin.tar.gz 找到文件解压到的位置&#xff0c;由于解压时我们没有指定路径&#xff0c;因…...

Pycharm无法粘贴外部文本问题

Pycharm无法粘贴外部文本问题 百度找了好多是因为安装了vim&#xff0c;最后发现是因为pycharm粘贴框存在了很多内容导致 操作方法&#xff1a; 1、清理所有缓存的复制内容 ctrlshiftV 可以看到编译器所有缓存下来的复制文本 2、ctrlA然后delete 解决&#xff1a;此时再复…...

学习Java的第四天

目录 一、if选择结构 1、基本if选择结构 语法结构&#xff1a; 流程图&#xff1a; 示例&#xff1a; 2、if-else 选择结构 语法结构&#xff1a; 流程图&#xff1a; 示例&#xff1a; 3、多重if选择结构 语法结构&#xff1a; 流程图&#xff1a; 示例&#xff1a…...

【Javaweb】【瑞吉外卖】登录功能plus--拦截器filterinterceptors实现

上手第二天&#xff0c;做到登录拦截器部分 需求&#xff1a;完成目标是&#xff0c;只有在登录的情况下才想让其访问后端&#xff0c;没有登录禁止访问&#xff0c;并且让其跳转。 这里有一个比较好的思想是&#xff1a;后端程序要主要需要考虑的是拦截接口&#xff0c;不能让…...

关于 Runes 协议及「公开铭刻」发行机制的拓展讨论

撰文&#xff1a;MiX 编辑&#xff1a;Faust&#xff0c;极客 web3 2024 年 3 月 2 日&#xff0c;Runes 生态基础设施项目 Rune alpha 的创始人&#xff0c;在 Github 的公开议题中&#xff0c;与 Runes 协议创始人 Casey 展开了讨论&#xff0c;双方对如何拓展 Runes 协议的…...

chkdsk修复会造成文件丢失吗?chkdsk数据丢失还能恢复吗

在Windows操作系统中&#xff0c;CHKDSK是一个强大的磁盘检查工具&#xff0c;它可以帮助我们诊断并修复硬盘的各种错误。然而&#xff0c;许多用户在运行CHKDSK之前都会担心一个问题&#xff1a;CHKDSK修复会造成文件丢失吗&#xff1f;如果不幸发生了数据丢失&#xff0c;CHK…...

5分钟搞定老旧电脑的Windows 11安装:WinDiskWriter让你的Mac变身万能启动盘制作器

5分钟搞定老旧电脑的Windows 11安装&#xff1a;WinDiskWriter让你的Mac变身万能启动盘制作器 【免费下载链接】windiskwriter &#x1f5a5; Windows Bootable USB creator for macOS. &#x1f6e0; Patches Windows 11 to bypass TPM and Secure Boot requirements. &#x…...

Linux环境下高效获取SRA数据的四种方法及实战技巧

1. SRA数据库基础与数据获取逻辑 在生物信息学研究中&#xff0c;SRA&#xff08;Sequence Read Archive&#xff09;数据库堪称原始测序数据的宝库。这个由NCBI维护的数据库&#xff0c;就像是一个全球共享的测序数据图书馆&#xff0c;里面存放着来自各种测序平台&#xff08…...

逆向工程实战:3步打造Windows微信/QQ防撤回终极方案

逆向工程实战&#xff1a;3步打造Windows微信/QQ防撤回终极方案 【免费下载链接】RevokeMsgPatcher :trollface: A hex editor for WeChat/QQ/TIM - PC版微信/QQ/TIM防撤回补丁&#xff08;我已经看到了&#xff0c;撤回也没用了&#xff09; 项目地址: https://gitcode.com/…...

故事力:软件测试工程师的技术汇报破局之道

在充斥着数据图表与缺陷统计的测试领域&#xff0c;一位资深测试工程师的汇报常陷入这样的困境&#xff1a;“本迭代发现缺陷127个&#xff0c;阻塞级3个&#xff0c;严重级15个...自动化覆盖率提升至72%...” 台下产品经理开始刷手机&#xff0c;技术总监皱眉打断&#xff1a;…...

番茄小说下载器:构建你的个人数字图书馆

番茄小说下载器&#xff1a;构建你的个人数字图书馆 【免费下载链接】fanqienovel-downloader 下载番茄小说 项目地址: https://gitcode.com/gh_mirrors/fa/fanqienovel-downloader 在数字阅读时代&#xff0c;我们常常面临一个困境&#xff1a;今天还在追更的热门小说&…...

打造你的专属数字书房:ReadCat开源小说阅读器深度体验指南

打造你的专属数字书房&#xff1a;ReadCat开源小说阅读器深度体验指南 【免费下载链接】read-cat 一款免费、开源、简洁、纯净、无广告的小说阅读器 项目地址: https://gitcode.com/gh_mirrors/re/read-cat 你是否厌倦了各大阅读平台层出不穷的广告弹窗&#xff1f;是否…...

【2024最危险的Agent设计陷阱】:CoT被高估?ReAct在长流程中失效率超63%?ToT的分支爆炸问题如何用动态剪枝破解

第一章&#xff1a;AIAgent架构模式&#xff1a;ReAct、CoT、ToT对比分析 2026奇点智能技术大会(https://ml-summit.org) AI Agent 的推理与决策能力高度依赖底层架构范式。ReAct&#xff08;Reasoning Acting&#xff09;、Chain-of-Thought&#xff08;CoT&#xff09;和Tr…...

Aeneas终极指南:3步搞定音频文本自动对齐,准确率超95% [特殊字符]

Aeneas终极指南&#xff1a;3步搞定音频文本自动对齐&#xff0c;准确率超95% &#x1f50a; 【免费下载链接】aeneas aeneas is a Python/C library and a set of tools to automagically synchronize audio and text (aka forced alignment) 项目地址: https://gitcode.com…...

PyTorch 2.8镜像实战案例:为电商客户定制文生视频营销内容的端到端流程

PyTorch 2.8镜像实战案例&#xff1a;为电商客户定制文生视频营销内容的端到端流程 1. 项目背景与需求分析 电商行业正面临内容生产的巨大挑战。根据行业调研&#xff0c;头部电商平台平均每个商品需要15-30秒的短视频素材&#xff0c;而传统制作方式每支视频成本高达500-200…...

Chrome文本替换插件:3步解决网页内容编辑难题

Chrome文本替换插件&#xff1a;3步解决网页内容编辑难题 【免费下载链接】chrome-extensions-searchReplace 项目地址: https://gitcode.com/gh_mirrors/ch/chrome-extensions-searchReplace 你是否曾为网页中的错别字烦恼&#xff1f;是否需要对产品页面进行批量修改…...