使用Flink CDC实时监控MySQL数据库变更
在现代数据架构中,实时数据处理变得越来越重要。Flink CDC(Change Data Capture)是一种强大的工具,可以帮助我们实时捕获数据库的变更,并进行处理。本文将介绍如何使用Flink CDC从MySQL数据库中读取变更数据,并将其打印到控制台。
环境准备
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>1.12.0</version>
</dependency>
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.12</artifactId><version>1.12.0</version>
</dependency>
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_2.12</artifactId><version>1.12.0</version>
</dependency>
<dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-client</artifactId><version>3.1.3</version>
</dependency>
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner-blink_2.12</artifactId><version>1.12.0</version>
</dependency>
<dependency><groupId>com.ververica</groupId><artifactId>flink-connector-mysql-cdc</artifactId><version>2.0.0</version>
</dependency>
<dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.75</version>
</dependency>
<dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.23</version>
</dependency>
- 获取Flink执行环境
首先,我们需要获取Flink的执行环境。这是所有Flink作业的起点。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- 启用检查点和设置并行度
为了确保作业的容错性和状态恢复,我们需要启用检查点,并设置作业的并行度。
env.enableCheckpointing(500); // 每500毫秒创建一个检查点
env.setParallelism(1); // 设置作业的并行度为1
- 使用Debezium Source读取MySQL的binlog
接下来,我们使用Debezium Source读取MySQL的binlog。我们需要配置MySQL的连接信息、监控的数据库和表、反序列化器以及启动选项。
DebeziumSourceFunction<String> sourceFunction = MySqlSource.<String>builder().serverTimeZone("Asia/Shanghai") // 设置时区为亚洲/上海.hostname("localhost") // MySQL的IP地址.port(3306) // MySQL的端口.username("root") // MySQL的用户名.password("123456") // MySQL的密码.databaseList("my_db") // 监控的数据库.tableList("my_db.user") // 监控的数据库下的表.deserializer(new JsonDebeziumDeserializationSchema()) // 反序列化.startupOptions(StartupOptions.initial()) // 启动选项.build();
这里 JsonDebeziumDeserializationSchema类的代码如下:
import com.alibaba.fastjson.JSONObject;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import io.debezium.data.Envelope;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;import java.util.List;/**
* 自定义DeserializationSchema进行反序列化。
*/public class JsonDebeziumDeserializationSchema implements DebeziumDeserializationSchema<String> {@Overridepublic void deserialize(SourceRecord sourceRecord, Collector collector) throws Exception {//创建JSON对象用于存储最终数据JSONObject result = new JSONObject();String topic = sourceRecord.topic();String[] fields = topic.split("\\.");String database = fields[1];String tableName = fields[2];Struct value = (Struct)sourceRecord.value();//获取before数据Struct before = value.getStruct("before");JSONObject beforeJson = getJson(before);//获取after数据Struct after = value.getStruct("after");JSONObject afterJson = getJson(after);//获取操作类型Envelope.Operation operation = Envelope.operationFor(sourceRecord);//将字段写入JSON对象result.put("database",database);result.put("tableName",tableName);result.put("type",operation);result.put("before",beforeJson);result.put("after",afterJson);//输出数据collector.collect(result.toJSONString());}/*** 获取字段值并写入result对象* @param before* @return*/private JSONObject getJson(Struct before) {JSONObject jsonObject = new JSONObject();if(before != null){Schema beforeSchema = before.schema();List<Field> beforeFields = beforeSchema.fields();for (Field field : beforeFields) {Object beforeValue = before.get(field);jsonObject.put(field.name(), beforeValue);}}return jsonObject;}@Overridepublic TypeInformation getProducedType() {return BasicTypeInfo.STRING_TYPE_INFO;}
}
- 添加数据源并打印数据
将Debezium源函数添加到Flink环境中,生成一个数据流,并将数据流中的数据打印到控制台。
DataStream<String> dataStreamSource = env.addSource(sourceFunction, TypeInformation.of(String.class));
DataStreamSink<String> print = dataStreamSource.print();
- 启动任务
最后,启动Flink作业,开始处理数据流。
env.execute("Flink-CDC");
6.测试

总结
通过上述步骤,我们可以使用Flink CDC实时监控MySQL数据库的变更,并将变更数据以JSON格式打印出来。这种方法不仅适用于数据监控,还可以用于实时数据处理和分析。
相关文章:
使用Flink CDC实时监控MySQL数据库变更
在现代数据架构中,实时数据处理变得越来越重要。Flink CDC(Change Data Capture)是一种强大的工具,可以帮助我们实时捕获数据库的变更,并进行处理。本文将介绍如何使用Flink CDC从MySQL数据库中读取变更数据࿰…...
学生课程信息管理系统
摘 要 目前,随着科学经济的不断发展,高校规模不断扩大,所招收的学生人数越来越 多;所开设的课程也越来越多。随之而来的是高校需要管理更多的事务。对于日益增 长的学生相关专业的课程也在不断增多,高校对其管理具有一…...
如何看待鸿蒙HarmonyOS?
鸿蒙系统,自2019年8月9日诞生就一直处于舆论风口浪尖上的系统,从最开始的“套壳”OpenHarmony安卓的说法,到去年的不再兼容安卓的NEXT版本的技术预览版发布,对于鸿蒙到底是什么,以及鸿蒙的应用开发的讨论从来没停止过。…...
【论文复现|智能算法改进】一种基于多策略改进的鲸鱼算法
目录 1.算法原理2.改进点3.结果展示4.参考文献5.代码获取 1.算法原理 SCI二区|鲸鱼优化算法(WOA)原理及实现【附完整Matlab代码】 2.改进点 混沌反向学习策略 将混沌映射和反向学习策略结合,形成混沌反向学习方法,通过该方 法…...
yarn安装配置及使用教程
Yarn 是一款 JavaScript 的包管理工具,是 Facebook, Google, Exponent 和 Tilde 开发的一款新的 JavaScript 包管理工具,它提供了确定性、依赖关系树扁平化等特性,并且与 npm 完全兼容。以下是 Yarn 的安装及使用教程: Yarn 安装…...
有那么点道理。
...
蔚蓝资源包和数据分析
代码如下 /* * COMPUTER GENERATED -- DO NOT EDIT* */#include <windows.h>static FARPROC __Init_Fun_2__; int __RestartAppIfNecessary__Fun() {return 0; } int Init_Fun() {__Init_Fun_2__();return 1; }static FARPROC __GameServer_BSecure__; static FARPROC _…...
MySQL----利用Mycat配置读写分离
首先确保主从复制是正常的,具体步骤在MySQL----配置主从复制。MySQL----配置主从复制 环境 master(CtenOS7):192.168.200.131 ----ifconfig查看->ens33->inetslave(win10):192.168.207.52 ----ipconfig查看->无线局域网适配器 WLA…...
【科学计算与可视化】2. pandas 基础
1. 安装 Pandas 首先,确保你已经安装了 Pandas。你可以使用以下命令安装:pip install pandas 2. 导入 Pandas 在开始使用 Pandas 之前,你需要先导入它:import pandas as pd 3. 创建数据结构 Pandas 主要有两种数据结构&#…...
医学记录 --- 腋下异味
逻辑图地址 症状 病因 汗液分泌旺盛:由于天气炎热、活动出汗、肥胖等因素导致汗液分泌旺盛,可引起腋下有异味表现。在这种情况下,建议保持身体清洁,特别是在炎热和潮湿的环境下。可以使用抗菌洗液、喷雾或霜剂来帮助减少细菌滋…...
【Linux】进程间通信_1
文章目录 七、进程间通信1. 进程间通信分类管道 未完待续 七、进程间通信 进程间由于 进程具有独立性 ,所以不可以直接进行数据传递。但是我们通常需要多个进程协同,共同完成一件事,所以我们需要进程间通信的手段。进程间通信的本质就是先让…...
Linux Kernel入门到精通系列讲解(RV-Kernel 篇) 5.6 在kernel 中实现系统复位和系统关机驱动
1. 概述 上一章节Qemu篇我们已经实现了我们SOC的power reset和 power down 寄存器,本章节我们就在Linux driver中去实现它。 2. Linux kernel 访问其他节点 Linux kernel中有一种机制,就是在driver中访问其它设备树节点的信息,了解设备树的应该都知道,每个设备节点都有一…...
如何在Java中进行单元测试?
如何在Java中进行单元测试? 大家好,我是免费搭建查券返利机器人省钱赚佣金就用微赚淘客系统3.0的小编,也是冬天不穿秋裤,天冷也要风度的程序猿!今天我们将探讨如何在Java中进行单元测试,这是一项确保代码质…...
代码随想录训练营Day32
提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档 文章目录 前言一、买卖股票的最佳时机2二、跳跃游戏三、跳跃游戏2四、K次取反后最大化的数组和 前言 今天是跟着代码随想录刷题的第32天,主要是学了买卖股票的最…...
代码随想录训练营Day31
提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档 文章目录 前言一、分发饼干二、摆动序列三、最大子树组合 前言 今天是跟着代码随想录刷题的第31天,主要学习了分发饼干,摆动序列和最大子树组合这三个…...
Docker 多阶段构建
多阶段构建 目录 尝试创建 Dockerfile构建容器镜像运行 Spring Boot 应用程序使用多阶段构建额外资源 在传统构建中,所有构建指令都在一个构建容器中顺序执行:下载依赖项、编译代码、打包应用程序。所有这些层最终都在你的最终镜像中。这种方法虽然可行…...
Linux应急响应——知攻善防应急靶场-Linux(1)
文章目录 查看history历史指令查看开机自启动项异常连接和端口异常进程定时任务异常服务日志分析账户排查总结 靶场出处是知攻善防 Linux应急响应靶机 1 前景需要: 小王急匆匆地找到小张,小王说"李哥,我dev服务器被黑了",快救救我&…...
基于CDMA的多用户水下无线光通信(1)——背景介绍
研究生期间做多用户水下无线光通信(Underwater Optical Wireless Communication,UOWC),写几篇博客分享一下学的内容。导师给了大方向,让我用直接序列码分多址(Direct Sequence Code Division Multiple Acce…...
基于springboot websocket和okhttp实现消息中转
1、业务介绍 消息源服务的消息不能直接推给用户侧,用户与中间服务建立websocket连接,中间服务再与源服务建立websocket连接,源服务的消息推给中间服务,中间服务再将消息推送给用户。流程如下图: 此例中我们定义中间服…...
PHP和Node.js哪个更爽?
先说结论,rust完胜。 php:laravel,swoole,webman,最开始在苏宁的时候写了几年php,当时觉得php真的是世界上最好的语言,因为当初活在舒适圈里,不愿意跳出来,就好比当初活在…...
深入理解JavaScript设计模式之单例模式
目录 什么是单例模式为什么需要单例模式常见应用场景包括 单例模式实现透明单例模式实现不透明单例模式用代理实现单例模式javaScript中的单例模式使用命名空间使用闭包封装私有变量 惰性单例通用的惰性单例 结语 什么是单例模式 单例模式(Singleton Pattern&#…...
Linux云原生安全:零信任架构与机密计算
Linux云原生安全:零信任架构与机密计算 构建坚不可摧的云原生防御体系 引言:云原生安全的范式革命 随着云原生技术的普及,安全边界正在从传统的网络边界向工作负载内部转移。Gartner预测,到2025年,零信任架构将成为超…...
华为云Flexus+DeepSeek征文|DeepSeek-V3/R1 商用服务开通全流程与本地部署搭建
华为云FlexusDeepSeek征文|DeepSeek-V3/R1 商用服务开通全流程与本地部署搭建 前言 如今大模型其性能出色,华为云 ModelArts Studio_MaaS大模型即服务平台华为云内置了大模型,能助力我们轻松驾驭 DeepSeek-V3/R1,本文中将分享如何…...
GruntJS-前端自动化任务运行器从入门到实战
Grunt 完全指南:从入门到实战 一、Grunt 是什么? Grunt是一个基于 Node.js 的前端自动化任务运行器,主要用于自动化执行项目开发中重复性高的任务,例如文件压缩、代码编译、语法检查、单元测试、文件合并等。通过配置简洁的任务…...
解析奥地利 XARION激光超声检测系统:无膜光学麦克风 + 无耦合剂的技术协同优势及多元应用
在工业制造领域,无损检测(NDT)的精度与效率直接影响产品质量与生产安全。奥地利 XARION开发的激光超声精密检测系统,以非接触式光学麦克风技术为核心,打破传统检测瓶颈,为半导体、航空航天、汽车制造等行业提供了高灵敏…...
Vue ③-生命周期 || 脚手架
生命周期 思考:什么时候可以发送初始化渲染请求?(越早越好) 什么时候可以开始操作dom?(至少dom得渲染出来) Vue生命周期: 一个Vue实例从 创建 到 销毁 的整个过程。 生命周期四个…...
MySQL 主从同步异常处理
阅读原文:https://www.xiaozaoshu.top/articles/mysql-m-s-update-pk MySQL 做双主,遇到的这个错误: Could not execute Update_rows event on table ... Error_code: 1032是 MySQL 主从复制时的经典错误之一,通常表示ÿ…...
【Post-process】【VBA】ETABS VBA FrameObj.GetNameList and write to EXCEL
ETABS API实战:导出框架元素数据到Excel 在结构工程师的日常工作中,经常需要从ETABS模型中提取框架元素信息进行后续分析。手动复制粘贴不仅耗时,还容易出错。今天我们来用简单的VBA代码实现自动化导出。 🎯 我们要实现什么? 一键点击,就能将ETABS中所有框架元素的基…...
Spring Boot + MyBatis 集成支付宝支付流程
Spring Boot MyBatis 集成支付宝支付流程 核心流程 商户系统生成订单调用支付宝创建预支付订单用户跳转支付宝完成支付支付宝异步通知支付结果商户处理支付结果更新订单状态支付宝同步跳转回商户页面 代码实现示例(电脑网站支付) 1. 添加依赖 <!…...
