当前位置: 首页 > news >正文

FlinkCDC快速搭建实现数据监控

引入依赖

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.sand</groupId><artifactId>flinkcdc</artifactId><version>1.0-SNAPSHOT</version><packaging>jar</packaging><name>Flink Quickstart Job</name><properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><flink.version>1.17.1</flink.version><!--        <flink.version>1.14.4</flink.version>--><target.java.version>1.8</target.java.version><scala.binary.version>2.12</scala.binary.version><maven.compiler.source>${target.java.version}</maven.compiler.source><maven.compiler.target>${target.java.version}</maven.compiler.target><log4j.version>2.17.1</log4j.version></properties><repositories><repository><id>apache.snapshots</id><name>Apache Development Snapshot Repository</name><url>https://repository.apache.org/content/repositories/snapshots/</url><releases><enabled>false</enabled></releases><snapshots><enabled>true</enabled></snapshots></repository></repositories><dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><!--            <artifactId>flink-streaming-java_2.12</artifactId>--><artifactId>flink-streaming-java</artifactId><version>${flink.version}</version></dependency><!--        <dependency>--><!--            <groupId>org.apache.flink</groupId>--><!--            <artifactId>flink-scala_2.12</artifactId>--><!--            <version>${flink.version}</version>--><!--        </dependency>--><dependency><groupId>org.apache.flink</groupId><!--            <artifactId>flink-clients_2.12</artifactId>--><artifactId>flink-clients</artifactId><version>${flink.version}</version></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>5.1.49</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-core</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner_2.12</artifactId><version>${flink.version}</version></dependency><!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-elasticsearch7 --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-elasticsearch7</artifactId><version>3.0.1-1.17</version></dependency><dependency><groupId>com.ververica</groupId><artifactId>flink-connector-mysql-cdc</artifactId><!-- The dependency is available only for stable releases, SNAPSHOT dependency need build by yourself. --><version>2.4.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-json</artifactId><version>${flink.version}</version></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.75</version></dependency><!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-jdbc --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-jdbc</artifactId><version>3.1.1-1.17</version><scope>provided</scope></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>2.0.32</version></dependency><!-- 打印日志的jar包 --><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId><version>1.7.30</version></dependency><dependency><groupId>log4j</groupId><artifactId>log4j</artifactId><version>1.2.16</version></dependency><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-core</artifactId><version>2.17.2</version></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.30</version></dependency><dependency><groupId>cn.hutool</groupId><artifactId>hutool-all</artifactId><version>5.7.10</version></dependency></dependencies><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>3.1</version><configuration><source>${target.java.version}</source><target>${target.java.version}</target></configuration></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><version>3.1.1</version><executions><!-- Run shade goal on package phase --><execution><phase>package</phase><goals><goal>shade</goal></goals><configuration><createDependencyReducedPom>false</createDependencyReducedPom><artifactSet><excludes><exclude>org.apache.flink:flink-shaded-force-shading</exclude><exclude>com.google.code.findbugs:jsr305</exclude><exclude>org.slf4j:*</exclude><exclude>org.apache.logging.log4j:*</exclude></excludes></artifactSet><filters><filter><artifact>*:*</artifact><excludes><exclude>META-INF/*.SF</exclude><exclude>META-INF/*.DSA</exclude><exclude>META-INF/*.RSA</exclude></excludes></filter></filters><transformers><transformerimplementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/><transformerimplementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"><mainClass>com.sand.DataStreamJob</mainClass></transformer></transformers></configuration></execution></executions></plugin></plugins><pluginManagement><plugins><!-- This improves the out-of-the-box experience in Eclipse by resolving some warnings. --><plugin><groupId>org.eclipse.m2e</groupId><artifactId>lifecycle-mapping</artifactId><version>1.0.0</version><configuration><lifecycleMappingMetadata><pluginExecutions><pluginExecution><pluginExecutionFilter><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><versionRange>[3.1.1,)</versionRange><goals><goal>shade</goal></goals></pluginExecutionFilter><action><ignore/></action></pluginExecution><pluginExecution><pluginExecutionFilter><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><versionRange>[3.1,)</versionRange><goals><goal>testCompile</goal><goal>compile</goal></goals></pluginExecutionFilter><action><ignore/></action></pluginExecution></pluginExecutions></lifecycleMappingMetadata></configuration></plugin></plugins></pluginManagement></build>
</project>

数据库配置类

package com.sand;import org.apache.commons.collections.CollectionUtils;import java.util.Arrays;
import java.util.List;
import java.util.StringJoiner;/*** @author zdd*/
public class CDCKit {public static void main(String[] args) {String tempDir = System.getProperty("java.io.tmpdir");System.out.println("tempDir = " + tempDir);}/*** 数据库*/private static final String database = "byyy_iowtb_wms_test";/*** 表名*/private static final List<String> tableList = Arrays.asList("inv_tt_stock_info","base_tm_sku","base_tm_third_sku_certificate","base_tm_sku_gsp");/*** ip*/private static final String hostname = "192.168.111.107";/*** 端口*/private static final int port = 3306;/*** 用户名*/private static final String username = "test_cdc";/*** 密码*/private static final String password = "Test_cdc@123";public static String getDatabase() {return database;}public static String getTableList() {if (CollectionUtils.isEmpty(tableList)) {return null;}//,分割StringJoiner stringJoiner = new StringJoiner(",");for (String tableName : tableList) {stringJoiner.add(getDatabase() + "." + tableName);}return stringJoiner.toString();}public static String getHostname() {return hostname;}public static int getPort() {return port;}public static String getUsername() {return username;}public static String getPassword() {return password;}}

监控类

package com.sand;import cn.hutool.core.io.FileUtil;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import java.io.File;
import java.util.Objects;
import java.util.Properties;public class DataStreamJob {public static void main(String[] args) throws Exception {//获取临时文件目录String tempDir = System.getProperty("java.io.tmpdir");String latestCheckpoint = getLatestCheckpoint();System.out.println("latestCheckpoint = " + latestCheckpoint);Configuration configuration = new Configuration();if(StringUtils.isNotBlank(latestCheckpoint)){configuration.setString("execution.savepoint.path", "file:///" + latestCheckpoint);}StreamExecutionEnvironment env =StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);env.setParallelism(1);//2.1 开启 Checkpoint,每隔 60 秒钟做一次 CKenv.enableCheckpointing(1000L * 60);//2.2 指定 CK 的一致性语义env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);//2.3 设置任务关闭的时候保留最后一次 CK 数据env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);//2.4 指定从 CK 自动重启策略env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 2000L));//2.5 设置状态后端env.setStateBackend(new FsStateBackend("file:///" + tempDir + "ck"));// ck 设置env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);Properties properties = new Properties();properties.setProperty("snapshot.locking.mode", "none");properties.setProperty("decimal.handling.mode", "string");MySqlSource<String> sourceFunction = MySqlSource.<String>builder().hostname(CDCKit.getHostname()).port(CDCKit.getPort()).databaseList(CDCKit.getDatabase()).tableList(CDCKit.getTableList()).username(CDCKit.getUsername()).password(CDCKit.getPassword()).scanNewlyAddedTableEnabled(true).deserializer(new JsonDebeziumDeserializationSchema()).startupOptions(StartupOptions.initial()).debeziumProperties(properties).build();//4.使用 CDC Source 从 MySQL 读取数据env.fromSource(sourceFunction, WatermarkStrategy.noWatermarks(), "mysql-source").addSink(new MysqlSink());//5.打印数据
//        mysqlStream.print();//6.执行任务env.execute();}private static String getLatestCheckpoint() {File ckDir = new File(System.getProperty("java.io.tmpdir") + "ck");File[] files = ckDir.listFiles();if (files == null) {return null;}String path = null;long lastModified = 0;for (File file : files) {//获取文件夹下-chk-开头文件夹-最新的文件夹if (file.isDirectory()) {File[] files1 = file.listFiles();if (files1 == null) {continue;}for (File file1 : files1) {if (!file1.isDirectory() || !file1.getName().startsWith("chk-")) {continue;}if (file1.lastModified() > lastModified) {lastModified = file1.lastModified();path = file1.getAbsolutePath();}}}}//删除其余目录if (StringUtils.isEmpty(path)) {return null;}String tempPath = path.substring(0, path.lastIndexOf("\\"));for (File file : files) {if (file.isDirectory() && !Objects.equals(file.getAbsolutePath(), tempPath)) {FileUtil.del(file);}}return path;}
}

数据处理类

package com.sand;/*** @author zdd*/
public class MysqlSink implements org.apache.flink.streaming.api.functions.sink.SinkFunction<String> {@Overridepublic void invoke(String value, org.apache.flink.streaming.api.functions.sink.SinkFunction.Context context) throws Exception {System.out.println("value = " + value);}
}

相关文章:

FlinkCDC快速搭建实现数据监控

引入依赖 <project xmlns"http://maven.apache.org/POM/4.0.0" xmlns:xsi"http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation"http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelV…...

应急布控球远程视频监控方案:视频监控平台EasyCVR+4G/5G应急布控球

随着科技的不断发展&#xff0c;应急布控球远程视频监控方案在公共安全、交通管理、城市管理等领域的应用越来越广泛。这种方案通过在现场部署应急布控球&#xff0c;实现对特定区域的实时监控&#xff0c;有助于及时发现问题、快速响应&#xff0c;提高管理效率。 智慧安防视…...

3.6 C语言和汇编语言混合编程 “每日读书”

在一些嵌入式场合&#xff0c;我们经常看到C程序和汇编程序相互调用&#xff0c;混合编程&#xff0c;如在ARM启动代码中&#xff0c;系统上电首先运行的是汇编代码&#xff0c;等初始化好内存堆栈环境之后&#xff0c;才会跳到C程序中执行&#xff0c;对嵌入式软件进行优化时&…...

利用“定时执行专家”循环执行BAT、VBS、Python脚本——含参数指定功能

目录 一、软件概述 二、VBS脚本执行设置 三、触发器设置 四、功能亮点 五、总结 在自动化办公和日常计算机任务管理中&#xff0c;定时执行脚本是一项非常重要的功能。今天&#xff0c;我将为大家带来一款名为“定时执行专家”的软件的评测&#xff0c;特别是其定时执行VB…...

【算法集训】基础算法:模拟

一、基本理解 顾名思义&#xff0c;就是题目要求做什么&#xff0c;代码中就跟着做就可以。 二、题目练习 1252. 奇数值单元格的数目 根据题目要求列出如下代码。需要注意填充列和行的时候注意下标。 int oddCells(int m, int n, int** indices, int indicesSize, int* in…...

基于SSM的房客源信息管理系统设计与实现

目 录 摘 要 I Abstract II 引 言 1 1 相关技术 3 1.1 SSM框架 3 1.2 Vue框架 3 1.3 ECharts 3 1.4 JQuery技术 3 1.5 本章小结 4 2系统分析 5 2.1 需求分析 5 2.2 非功能需求 8 2.3 本章小节 8 3 系统设计 9 3.1 系统总体设计 9 3.1.1 系统体系结构 9 3.1.2 系统目录结构 9 3…...

常见数据类型

目录 数据类型 字符串 char nchar varchar varchar2 nvarchar 数字 number integer binary_float binary_double float 日期 date timestamp 大文本数据 大对象数据 Oracle从入门到总裁:https://blog.csdn.net/weixin_67859959/article/details/135209645 数…...

基于vue的联通积分商城数据可视化APP设计与实现

目 录 摘 要 I Abstract II 引 言 1 1 前端技术介绍 3 1.1 前端开发语言 3 1.1.1 HTML5 3 1.1.2 CSS3 3 1.1.3 JavaScript 3 1.2 MVVM开发模式 4 1.3 Vue框架 4 1.4 Axios技术 5 1.5 ECharts 5 1.6 数据库技术 5 1.7 本章小结 6 2 前端开发的分析 7 2.1 功能性需求分析 7 2.2 …...

2024年flink面试真题(一)

&#xff08;北京&#xff09;taskManager和slot、task的关系 ? &#xff08;北京&#xff09;flink状态太大怎么解决 ? &#xff08;北京 flink提交方式和运行模式 ? &#xff08;北京&#xff09; 怎么提交的实时任务&#xff0c;有多少Job Manager&#xff1f; &…...

Java面试挂在线程创建后续,不要再被八股文误导了!创建线程的方式只有1种

线程创建之源 OK&#xff01;咱们闲话少叙&#xff0c;直接进入正题&#xff0c;回顾一下通过实现Runnable接口&#xff0c;重写run方法创建线程的方式&#xff0c;真的可以创建一个线程吗&#xff1f;来看下面这段demo。 【代码示例1】 public class Test implements Runnab…...

JavaEE面试题

一、String面试题 1、String s1 "123"; 和 String s2 new String("123");的区别 在Java中&#xff0c;"String s1 "123";"和"String s2 new String("123");"这两行代码有一些重要的区别&#xff1a; "…...

探索macOS上的最佳MySQL客户端工具

在数据库管理和开发的世界里&#xff0c;选择一个高效、功能全面的客户端工具对于提升工作效率至关重要。尤其对于使用 macOS 的开发者来说&#xff0c;一个好的 MySQL 客户端不仅可以简化数据库操作&#xff0c;还能提供强大的数据分析和管理功能。本文将介绍几款适用于 macOS…...

[Android] MediaPlayer SDK API glance

参考&#xff1a; https://developer.android.com/reference/android/media/MediaPlayer 如何使用MediaPlayer SDK&#xff1a; https://developer.android.com/media/platform/mediaplayer 概述&#xff1a; 音视频的 playback。创建 MediaPlayer 的线程必须和调用 SDK 接口…...

原始手写helloworld并打jar包允许

1.创建文件夹test统一在其中操作 2.创建hello.java文件 【hello.txt改属性为hello.java】并在里面添加代码 public class hello {public static void main(String[] args) {System.out.println("hello world");} } 注意&#xff1a;类名与文件名一致 然后运行…...

maven 的安装与配置(Command ‘mvn‘ not found)修改配置文件后新终端依旧无法识别到 mvn 命令

下载 maven 安装包 wget https://dlcdn.apache.org/maven/maven-3/3.9.4/binaries/apache-maven-3.9.4-bin.tar.gz 解压 apache-maven-3.9.4-bin.tar.gz tar -zxvf apache-maven-3.9.4-bin.tar.gz 找到文件解压到的位置&#xff0c;由于解压时我们没有指定路径&#xff0c;因…...

Pycharm无法粘贴外部文本问题

Pycharm无法粘贴外部文本问题 百度找了好多是因为安装了vim&#xff0c;最后发现是因为pycharm粘贴框存在了很多内容导致 操作方法&#xff1a; 1、清理所有缓存的复制内容 ctrlshiftV 可以看到编译器所有缓存下来的复制文本 2、ctrlA然后delete 解决&#xff1a;此时再复…...

学习Java的第四天

目录 一、if选择结构 1、基本if选择结构 语法结构&#xff1a; 流程图&#xff1a; 示例&#xff1a; 2、if-else 选择结构 语法结构&#xff1a; 流程图&#xff1a; 示例&#xff1a; 3、多重if选择结构 语法结构&#xff1a; 流程图&#xff1a; 示例&#xff1a…...

【Javaweb】【瑞吉外卖】登录功能plus--拦截器filterinterceptors实现

上手第二天&#xff0c;做到登录拦截器部分 需求&#xff1a;完成目标是&#xff0c;只有在登录的情况下才想让其访问后端&#xff0c;没有登录禁止访问&#xff0c;并且让其跳转。 这里有一个比较好的思想是&#xff1a;后端程序要主要需要考虑的是拦截接口&#xff0c;不能让…...

关于 Runes 协议及「公开铭刻」发行机制的拓展讨论

撰文&#xff1a;MiX 编辑&#xff1a;Faust&#xff0c;极客 web3 2024 年 3 月 2 日&#xff0c;Runes 生态基础设施项目 Rune alpha 的创始人&#xff0c;在 Github 的公开议题中&#xff0c;与 Runes 协议创始人 Casey 展开了讨论&#xff0c;双方对如何拓展 Runes 协议的…...

chkdsk修复会造成文件丢失吗?chkdsk数据丢失还能恢复吗

在Windows操作系统中&#xff0c;CHKDSK是一个强大的磁盘检查工具&#xff0c;它可以帮助我们诊断并修复硬盘的各种错误。然而&#xff0c;许多用户在运行CHKDSK之前都会担心一个问题&#xff1a;CHKDSK修复会造成文件丢失吗&#xff1f;如果不幸发生了数据丢失&#xff0c;CHK…...

接口测试中缓存处理策略

在接口测试中&#xff0c;缓存处理策略是一个关键环节&#xff0c;直接影响测试结果的准确性和可靠性。合理的缓存处理策略能够确保测试环境的一致性&#xff0c;避免因缓存数据导致的测试偏差。以下是接口测试中常见的缓存处理策略及其详细说明&#xff1a; 一、缓存处理的核…...

Docker 离线安装指南

参考文章 1、确认操作系统类型及内核版本 Docker依赖于Linux内核的一些特性&#xff0c;不同版本的Docker对内核版本有不同要求。例如&#xff0c;Docker 17.06及之后的版本通常需要Linux内核3.10及以上版本&#xff0c;Docker17.09及更高版本对应Linux内核4.9.x及更高版本。…...

【Oracle APEX开发小技巧12】

有如下需求&#xff1a; 有一个问题反馈页面&#xff0c;要实现在apex页面展示能直观看到反馈时间超过7天未处理的数据&#xff0c;方便管理员及时处理反馈。 我的方法&#xff1a;直接将逻辑写在SQL中&#xff0c;这样可以直接在页面展示 完整代码&#xff1a; SELECTSF.FE…...

Zustand 状态管理库:极简而强大的解决方案

Zustand 是一个轻量级、快速和可扩展的状态管理库&#xff0c;特别适合 React 应用。它以简洁的 API 和高效的性能解决了 Redux 等状态管理方案中的繁琐问题。 核心优势对比 基本使用指南 1. 创建 Store // store.js import create from zustandconst useStore create((set)…...

MongoDB学习和应用(高效的非关系型数据库)

一丶 MongoDB简介 对于社交类软件的功能&#xff0c;我们需要对它的功能特点进行分析&#xff1a; 数据量会随着用户数增大而增大读多写少价值较低非好友看不到其动态信息地理位置的查询… 针对以上特点进行分析各大存储工具&#xff1a; mysql&#xff1a;关系型数据库&am…...

在HarmonyOS ArkTS ArkUI-X 5.0及以上版本中,手势开发全攻略:

在 HarmonyOS 应用开发中&#xff0c;手势交互是连接用户与设备的核心纽带。ArkTS 框架提供了丰富的手势处理能力&#xff0c;既支持点击、长按、拖拽等基础单一手势的精细控制&#xff0c;也能通过多种绑定策略解决父子组件的手势竞争问题。本文将结合官方开发文档&#xff0c…...

定时器任务——若依源码分析

分析util包下面的工具类schedule utils&#xff1a; ScheduleUtils 是若依中用于与 Quartz 框架交互的工具类&#xff0c;封装了定时任务的 创建、更新、暂停、删除等核心逻辑。 createScheduleJob createScheduleJob 用于将任务注册到 Quartz&#xff0c;先构建任务的 JobD…...

oracle与MySQL数据库之间数据同步的技术要点

Oracle与MySQL数据库之间的数据同步是一个涉及多个技术要点的复杂任务。由于Oracle和MySQL的架构差异&#xff0c;它们的数据同步要求既要保持数据的准确性和一致性&#xff0c;又要处理好性能问题。以下是一些主要的技术要点&#xff1a; 数据结构差异 数据类型差异&#xff…...

成都鼎讯硬核科技!雷达目标与干扰模拟器,以卓越性能制胜电磁频谱战

在现代战争中&#xff0c;电磁频谱已成为继陆、海、空、天之后的 “第五维战场”&#xff0c;雷达作为电磁频谱领域的关键装备&#xff0c;其干扰与抗干扰能力的较量&#xff0c;直接影响着战争的胜负走向。由成都鼎讯科技匠心打造的雷达目标与干扰模拟器&#xff0c;凭借数字射…...

代理篇12|深入理解 Vite中的Proxy接口代理配置

在前端开发中,常常会遇到 跨域请求接口 的情况。为了解决这个问题,Vite 和 Webpack 都提供了 proxy 代理功能,用于将本地开发请求转发到后端服务器。 什么是代理(proxy)? 代理是在开发过程中,前端项目通过开发服务器,将指定的请求“转发”到真实的后端服务器,从而绕…...