使用Flink CDC实时监控MySQL数据库变更
在现代数据架构中,实时数据处理变得越来越重要。Flink CDC(Change Data Capture)是一种强大的工具,可以帮助我们实时捕获数据库的变更,并进行处理。本文将介绍如何使用Flink CDC从MySQL数据库中读取变更数据,并将其打印到控制台。
环境准备
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>1.12.0</version>
</dependency>
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.12</artifactId><version>1.12.0</version>
</dependency>
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_2.12</artifactId><version>1.12.0</version>
</dependency>
<dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-client</artifactId><version>3.1.3</version>
</dependency>
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner-blink_2.12</artifactId><version>1.12.0</version>
</dependency>
<dependency><groupId>com.ververica</groupId><artifactId>flink-connector-mysql-cdc</artifactId><version>2.0.0</version>
</dependency>
<dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.75</version>
</dependency>
<dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.23</version>
</dependency>
- 获取Flink执行环境
首先,我们需要获取Flink的执行环境。这是所有Flink作业的起点。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- 启用检查点和设置并行度
为了确保作业的容错性和状态恢复,我们需要启用检查点,并设置作业的并行度。
env.enableCheckpointing(500); // 每500毫秒创建一个检查点
env.setParallelism(1); // 设置作业的并行度为1
- 使用Debezium Source读取MySQL的binlog
接下来,我们使用Debezium Source读取MySQL的binlog。我们需要配置MySQL的连接信息、监控的数据库和表、反序列化器以及启动选项。
DebeziumSourceFunction<String> sourceFunction = MySqlSource.<String>builder().serverTimeZone("Asia/Shanghai") // 设置时区为亚洲/上海.hostname("localhost") // MySQL的IP地址.port(3306) // MySQL的端口.username("root") // MySQL的用户名.password("123456") // MySQL的密码.databaseList("my_db") // 监控的数据库.tableList("my_db.user") // 监控的数据库下的表.deserializer(new JsonDebeziumDeserializationSchema()) // 反序列化.startupOptions(StartupOptions.initial()) // 启动选项.build();
这里 JsonDebeziumDeserializationSchema类的代码如下:
import com.alibaba.fastjson.JSONObject;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import io.debezium.data.Envelope;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;import java.util.List;/**
* 自定义DeserializationSchema进行反序列化。
*/public class JsonDebeziumDeserializationSchema implements DebeziumDeserializationSchema<String> {@Overridepublic void deserialize(SourceRecord sourceRecord, Collector collector) throws Exception {//创建JSON对象用于存储最终数据JSONObject result = new JSONObject();String topic = sourceRecord.topic();String[] fields = topic.split("\\.");String database = fields[1];String tableName = fields[2];Struct value = (Struct)sourceRecord.value();//获取before数据Struct before = value.getStruct("before");JSONObject beforeJson = getJson(before);//获取after数据Struct after = value.getStruct("after");JSONObject afterJson = getJson(after);//获取操作类型Envelope.Operation operation = Envelope.operationFor(sourceRecord);//将字段写入JSON对象result.put("database",database);result.put("tableName",tableName);result.put("type",operation);result.put("before",beforeJson);result.put("after",afterJson);//输出数据collector.collect(result.toJSONString());}/*** 获取字段值并写入result对象* @param before* @return*/private JSONObject getJson(Struct before) {JSONObject jsonObject = new JSONObject();if(before != null){Schema beforeSchema = before.schema();List<Field> beforeFields = beforeSchema.fields();for (Field field : beforeFields) {Object beforeValue = before.get(field);jsonObject.put(field.name(), beforeValue);}}return jsonObject;}@Overridepublic TypeInformation getProducedType() {return BasicTypeInfo.STRING_TYPE_INFO;}
}
- 添加数据源并打印数据
将Debezium源函数添加到Flink环境中,生成一个数据流,并将数据流中的数据打印到控制台。
DataStream<String> dataStreamSource = env.addSource(sourceFunction, TypeInformation.of(String.class));
DataStreamSink<String> print = dataStreamSource.print();
- 启动任务
最后,启动Flink作业,开始处理数据流。
env.execute("Flink-CDC");
6.测试
总结
通过上述步骤,我们可以使用Flink CDC实时监控MySQL数据库的变更,并将变更数据以JSON格式打印出来。这种方法不仅适用于数据监控,还可以用于实时数据处理和分析。
相关文章:

使用Flink CDC实时监控MySQL数据库变更
在现代数据架构中,实时数据处理变得越来越重要。Flink CDC(Change Data Capture)是一种强大的工具,可以帮助我们实时捕获数据库的变更,并进行处理。本文将介绍如何使用Flink CDC从MySQL数据库中读取变更数据࿰…...

学生课程信息管理系统
摘 要 目前,随着科学经济的不断发展,高校规模不断扩大,所招收的学生人数越来越 多;所开设的课程也越来越多。随之而来的是高校需要管理更多的事务。对于日益增 长的学生相关专业的课程也在不断增多,高校对其管理具有一…...

如何看待鸿蒙HarmonyOS?
鸿蒙系统,自2019年8月9日诞生就一直处于舆论风口浪尖上的系统,从最开始的“套壳”OpenHarmony安卓的说法,到去年的不再兼容安卓的NEXT版本的技术预览版发布,对于鸿蒙到底是什么,以及鸿蒙的应用开发的讨论从来没停止过。…...

【论文复现|智能算法改进】一种基于多策略改进的鲸鱼算法
目录 1.算法原理2.改进点3.结果展示4.参考文献5.代码获取 1.算法原理 SCI二区|鲸鱼优化算法(WOA)原理及实现【附完整Matlab代码】 2.改进点 混沌反向学习策略 将混沌映射和反向学习策略结合,形成混沌反向学习方法,通过该方 法…...
yarn安装配置及使用教程
Yarn 是一款 JavaScript 的包管理工具,是 Facebook, Google, Exponent 和 Tilde 开发的一款新的 JavaScript 包管理工具,它提供了确定性、依赖关系树扁平化等特性,并且与 npm 完全兼容。以下是 Yarn 的安装及使用教程: Yarn 安装…...

有那么点道理。
...
蔚蓝资源包和数据分析
代码如下 /* * COMPUTER GENERATED -- DO NOT EDIT* */#include <windows.h>static FARPROC __Init_Fun_2__; int __RestartAppIfNecessary__Fun() {return 0; } int Init_Fun() {__Init_Fun_2__();return 1; }static FARPROC __GameServer_BSecure__; static FARPROC _…...

MySQL----利用Mycat配置读写分离
首先确保主从复制是正常的,具体步骤在MySQL----配置主从复制。MySQL----配置主从复制 环境 master(CtenOS7):192.168.200.131 ----ifconfig查看->ens33->inetslave(win10):192.168.207.52 ----ipconfig查看->无线局域网适配器 WLA…...
【科学计算与可视化】2. pandas 基础
1. 安装 Pandas 首先,确保你已经安装了 Pandas。你可以使用以下命令安装:pip install pandas 2. 导入 Pandas 在开始使用 Pandas 之前,你需要先导入它:import pandas as pd 3. 创建数据结构 Pandas 主要有两种数据结构&#…...
医学记录 --- 腋下异味
逻辑图地址 症状 病因 汗液分泌旺盛:由于天气炎热、活动出汗、肥胖等因素导致汗液分泌旺盛,可引起腋下有异味表现。在这种情况下,建议保持身体清洁,特别是在炎热和潮湿的环境下。可以使用抗菌洗液、喷雾或霜剂来帮助减少细菌滋…...

【Linux】进程间通信_1
文章目录 七、进程间通信1. 进程间通信分类管道 未完待续 七、进程间通信 进程间由于 进程具有独立性 ,所以不可以直接进行数据传递。但是我们通常需要多个进程协同,共同完成一件事,所以我们需要进程间通信的手段。进程间通信的本质就是先让…...
Linux Kernel入门到精通系列讲解(RV-Kernel 篇) 5.6 在kernel 中实现系统复位和系统关机驱动
1. 概述 上一章节Qemu篇我们已经实现了我们SOC的power reset和 power down 寄存器,本章节我们就在Linux driver中去实现它。 2. Linux kernel 访问其他节点 Linux kernel中有一种机制,就是在driver中访问其它设备树节点的信息,了解设备树的应该都知道,每个设备节点都有一…...
如何在Java中进行单元测试?
如何在Java中进行单元测试? 大家好,我是免费搭建查券返利机器人省钱赚佣金就用微赚淘客系统3.0的小编,也是冬天不穿秋裤,天冷也要风度的程序猿!今天我们将探讨如何在Java中进行单元测试,这是一项确保代码质…...
代码随想录训练营Day32
提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档 文章目录 前言一、买卖股票的最佳时机2二、跳跃游戏三、跳跃游戏2四、K次取反后最大化的数组和 前言 今天是跟着代码随想录刷题的第32天,主要是学了买卖股票的最…...
代码随想录训练营Day31
提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档 文章目录 前言一、分发饼干二、摆动序列三、最大子树组合 前言 今天是跟着代码随想录刷题的第31天,主要学习了分发饼干,摆动序列和最大子树组合这三个…...
Docker 多阶段构建
多阶段构建 目录 尝试创建 Dockerfile构建容器镜像运行 Spring Boot 应用程序使用多阶段构建额外资源 在传统构建中,所有构建指令都在一个构建容器中顺序执行:下载依赖项、编译代码、打包应用程序。所有这些层最终都在你的最终镜像中。这种方法虽然可行…...

Linux应急响应——知攻善防应急靶场-Linux(1)
文章目录 查看history历史指令查看开机自启动项异常连接和端口异常进程定时任务异常服务日志分析账户排查总结 靶场出处是知攻善防 Linux应急响应靶机 1 前景需要: 小王急匆匆地找到小张,小王说"李哥,我dev服务器被黑了",快救救我&…...

基于CDMA的多用户水下无线光通信(1)——背景介绍
研究生期间做多用户水下无线光通信(Underwater Optical Wireless Communication,UOWC),写几篇博客分享一下学的内容。导师给了大方向,让我用直接序列码分多址(Direct Sequence Code Division Multiple Acce…...

基于springboot websocket和okhttp实现消息中转
1、业务介绍 消息源服务的消息不能直接推给用户侧,用户与中间服务建立websocket连接,中间服务再与源服务建立websocket连接,源服务的消息推给中间服务,中间服务再将消息推送给用户。流程如下图: 此例中我们定义中间服…...

智慧工地云平台源码,基于微服务架构+Java+Spring Cloud +UniApp +MySql
智慧工地管理云平台系统,智慧工地全套源码,java版智慧工地源码,支持PC端、大屏端、移动端。 智慧工地聚焦建筑行业的市场需求,提供“平台网络终端”的整体解决方案,提供劳务管理、视频管理、智能监测、绿色施工、安全管…...
ssc377d修改flash分区大小
1、flash的分区默认分配16M、 / # df -h Filesystem Size Used Available Use% Mounted on /dev/root 1.9M 1.9M 0 100% / /dev/mtdblock4 3.0M...

网络编程(UDP编程)
思维导图 UDP基础编程(单播) 1.流程图 服务器:短信的接收方 创建套接字 (socket)-----------------------------------------》有手机指定网络信息-----------------------------------------------》有号码绑定套接字 (bind)--------------…...

零基础在实践中学习网络安全-皮卡丘靶场(第九期-Unsafe Fileupload模块)(yakit方式)
本期内容并不是很难,相信大家会学的很愉快,当然对于有后端基础的朋友来说,本期内容更加容易了解,当然没有基础的也别担心,本期内容会详细解释有关内容 本期用到的软件:yakit(因为经过之前好多期…...

SAP学习笔记 - 开发26 - 前端Fiori开发 OData V2 和 V4 的差异 (Deepseek整理)
上一章用到了V2 的概念,其实 Fiori当中还有 V4,咱们这一章来总结一下 V2 和 V4。 SAP学习笔记 - 开发25 - 前端Fiori开发 Remote OData Service(使用远端Odata服务),代理中间件(ui5-middleware-simpleproxy)-CSDN博客…...

AI,如何重构理解、匹配与决策?
AI 时代,我们如何理解消费? 作者|王彬 封面|Unplash 人们通过信息理解世界。 曾几何时,PC 与移动互联网重塑了人们的购物路径:信息变得唾手可得,商品决策变得高度依赖内容。 但 AI 时代的来…...

群晖NAS如何在虚拟机创建飞牛NAS
套件中心下载安装Virtual Machine Manager 创建虚拟机 配置虚拟机 飞牛官网下载 https://iso.liveupdate.fnnas.com/x86_64/trim/fnos-0.9.2-863.iso 群晖NAS如何在虚拟机创建飞牛NAS - 个人信息分享...
Bean 作用域有哪些?如何答出技术深度?
导语: Spring 面试绕不开 Bean 的作用域问题,这是面试官考察候选人对 Spring 框架理解深度的常见方式。本文将围绕“Spring 中的 Bean 作用域”展开,结合典型面试题及实战场景,帮你厘清重点,打破模板式回答,…...

Rust 开发环境搭建
环境搭建 1、开发工具RustRover 或者vs code 2、Cygwin64 安装 https://cygwin.com/install.html 在工具终端执行: rustup toolchain install stable-x86_64-pc-windows-gnu rustup default stable-x86_64-pc-windows-gnu 2、Hello World fn main() { println…...

基于开源AI智能名片链动2 + 1模式S2B2C商城小程序的沉浸式体验营销研究
摘要:在消费市场竞争日益激烈的当下,传统体验营销方式存在诸多局限。本文聚焦开源AI智能名片链动2 1模式S2B2C商城小程序,探讨其在沉浸式体验营销中的应用。通过对比传统品鉴、工厂参观等初级体验方式,分析沉浸式体验的优势与价值…...