使用Flink CDC实时监控MySQL数据库变更
在现代数据架构中,实时数据处理变得越来越重要。Flink CDC(Change Data Capture)是一种强大的工具,可以帮助我们实时捕获数据库的变更,并进行处理。本文将介绍如何使用Flink CDC从MySQL数据库中读取变更数据,并将其打印到控制台。
环境准备
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>1.12.0</version>
</dependency>
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.12</artifactId><version>1.12.0</version>
</dependency>
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_2.12</artifactId><version>1.12.0</version>
</dependency>
<dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-client</artifactId><version>3.1.3</version>
</dependency>
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner-blink_2.12</artifactId><version>1.12.0</version>
</dependency>
<dependency><groupId>com.ververica</groupId><artifactId>flink-connector-mysql-cdc</artifactId><version>2.0.0</version>
</dependency>
<dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.75</version>
</dependency>
<dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.23</version>
</dependency>
- 获取Flink执行环境
首先,我们需要获取Flink的执行环境。这是所有Flink作业的起点。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- 启用检查点和设置并行度
为了确保作业的容错性和状态恢复,我们需要启用检查点,并设置作业的并行度。
env.enableCheckpointing(500); // 每500毫秒创建一个检查点
env.setParallelism(1); // 设置作业的并行度为1
- 使用Debezium Source读取MySQL的binlog
接下来,我们使用Debezium Source读取MySQL的binlog。我们需要配置MySQL的连接信息、监控的数据库和表、反序列化器以及启动选项。
DebeziumSourceFunction<String> sourceFunction = MySqlSource.<String>builder().serverTimeZone("Asia/Shanghai") // 设置时区为亚洲/上海.hostname("localhost") // MySQL的IP地址.port(3306) // MySQL的端口.username("root") // MySQL的用户名.password("123456") // MySQL的密码.databaseList("my_db") // 监控的数据库.tableList("my_db.user") // 监控的数据库下的表.deserializer(new JsonDebeziumDeserializationSchema()) // 反序列化.startupOptions(StartupOptions.initial()) // 启动选项.build();
这里 JsonDebeziumDeserializationSchema类的代码如下:
import com.alibaba.fastjson.JSONObject;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import io.debezium.data.Envelope;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;import java.util.List;/**
* 自定义DeserializationSchema进行反序列化。
*/public class JsonDebeziumDeserializationSchema implements DebeziumDeserializationSchema<String> {@Overridepublic void deserialize(SourceRecord sourceRecord, Collector collector) throws Exception {//创建JSON对象用于存储最终数据JSONObject result = new JSONObject();String topic = sourceRecord.topic();String[] fields = topic.split("\\.");String database = fields[1];String tableName = fields[2];Struct value = (Struct)sourceRecord.value();//获取before数据Struct before = value.getStruct("before");JSONObject beforeJson = getJson(before);//获取after数据Struct after = value.getStruct("after");JSONObject afterJson = getJson(after);//获取操作类型Envelope.Operation operation = Envelope.operationFor(sourceRecord);//将字段写入JSON对象result.put("database",database);result.put("tableName",tableName);result.put("type",operation);result.put("before",beforeJson);result.put("after",afterJson);//输出数据collector.collect(result.toJSONString());}/*** 获取字段值并写入result对象* @param before* @return*/private JSONObject getJson(Struct before) {JSONObject jsonObject = new JSONObject();if(before != null){Schema beforeSchema = before.schema();List<Field> beforeFields = beforeSchema.fields();for (Field field : beforeFields) {Object beforeValue = before.get(field);jsonObject.put(field.name(), beforeValue);}}return jsonObject;}@Overridepublic TypeInformation getProducedType() {return BasicTypeInfo.STRING_TYPE_INFO;}
}
- 添加数据源并打印数据
将Debezium源函数添加到Flink环境中,生成一个数据流,并将数据流中的数据打印到控制台。
DataStream<String> dataStreamSource = env.addSource(sourceFunction, TypeInformation.of(String.class));
DataStreamSink<String> print = dataStreamSource.print();
- 启动任务
最后,启动Flink作业,开始处理数据流。
env.execute("Flink-CDC");
6.测试

总结
通过上述步骤,我们可以使用Flink CDC实时监控MySQL数据库的变更,并将变更数据以JSON格式打印出来。这种方法不仅适用于数据监控,还可以用于实时数据处理和分析。
相关文章:
使用Flink CDC实时监控MySQL数据库变更
在现代数据架构中,实时数据处理变得越来越重要。Flink CDC(Change Data Capture)是一种强大的工具,可以帮助我们实时捕获数据库的变更,并进行处理。本文将介绍如何使用Flink CDC从MySQL数据库中读取变更数据࿰…...
学生课程信息管理系统
摘 要 目前,随着科学经济的不断发展,高校规模不断扩大,所招收的学生人数越来越 多;所开设的课程也越来越多。随之而来的是高校需要管理更多的事务。对于日益增 长的学生相关专业的课程也在不断增多,高校对其管理具有一…...
如何看待鸿蒙HarmonyOS?
鸿蒙系统,自2019年8月9日诞生就一直处于舆论风口浪尖上的系统,从最开始的“套壳”OpenHarmony安卓的说法,到去年的不再兼容安卓的NEXT版本的技术预览版发布,对于鸿蒙到底是什么,以及鸿蒙的应用开发的讨论从来没停止过。…...
【论文复现|智能算法改进】一种基于多策略改进的鲸鱼算法
目录 1.算法原理2.改进点3.结果展示4.参考文献5.代码获取 1.算法原理 SCI二区|鲸鱼优化算法(WOA)原理及实现【附完整Matlab代码】 2.改进点 混沌反向学习策略 将混沌映射和反向学习策略结合,形成混沌反向学习方法,通过该方 法…...
yarn安装配置及使用教程
Yarn 是一款 JavaScript 的包管理工具,是 Facebook, Google, Exponent 和 Tilde 开发的一款新的 JavaScript 包管理工具,它提供了确定性、依赖关系树扁平化等特性,并且与 npm 完全兼容。以下是 Yarn 的安装及使用教程: Yarn 安装…...
有那么点道理。
...
蔚蓝资源包和数据分析
代码如下 /* * COMPUTER GENERATED -- DO NOT EDIT* */#include <windows.h>static FARPROC __Init_Fun_2__; int __RestartAppIfNecessary__Fun() {return 0; } int Init_Fun() {__Init_Fun_2__();return 1; }static FARPROC __GameServer_BSecure__; static FARPROC _…...
MySQL----利用Mycat配置读写分离
首先确保主从复制是正常的,具体步骤在MySQL----配置主从复制。MySQL----配置主从复制 环境 master(CtenOS7):192.168.200.131 ----ifconfig查看->ens33->inetslave(win10):192.168.207.52 ----ipconfig查看->无线局域网适配器 WLA…...
【科学计算与可视化】2. pandas 基础
1. 安装 Pandas 首先,确保你已经安装了 Pandas。你可以使用以下命令安装:pip install pandas 2. 导入 Pandas 在开始使用 Pandas 之前,你需要先导入它:import pandas as pd 3. 创建数据结构 Pandas 主要有两种数据结构&#…...
医学记录 --- 腋下异味
逻辑图地址 症状 病因 汗液分泌旺盛:由于天气炎热、活动出汗、肥胖等因素导致汗液分泌旺盛,可引起腋下有异味表现。在这种情况下,建议保持身体清洁,特别是在炎热和潮湿的环境下。可以使用抗菌洗液、喷雾或霜剂来帮助减少细菌滋…...
【Linux】进程间通信_1
文章目录 七、进程间通信1. 进程间通信分类管道 未完待续 七、进程间通信 进程间由于 进程具有独立性 ,所以不可以直接进行数据传递。但是我们通常需要多个进程协同,共同完成一件事,所以我们需要进程间通信的手段。进程间通信的本质就是先让…...
Linux Kernel入门到精通系列讲解(RV-Kernel 篇) 5.6 在kernel 中实现系统复位和系统关机驱动
1. 概述 上一章节Qemu篇我们已经实现了我们SOC的power reset和 power down 寄存器,本章节我们就在Linux driver中去实现它。 2. Linux kernel 访问其他节点 Linux kernel中有一种机制,就是在driver中访问其它设备树节点的信息,了解设备树的应该都知道,每个设备节点都有一…...
如何在Java中进行单元测试?
如何在Java中进行单元测试? 大家好,我是免费搭建查券返利机器人省钱赚佣金就用微赚淘客系统3.0的小编,也是冬天不穿秋裤,天冷也要风度的程序猿!今天我们将探讨如何在Java中进行单元测试,这是一项确保代码质…...
代码随想录训练营Day32
提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档 文章目录 前言一、买卖股票的最佳时机2二、跳跃游戏三、跳跃游戏2四、K次取反后最大化的数组和 前言 今天是跟着代码随想录刷题的第32天,主要是学了买卖股票的最…...
代码随想录训练营Day31
提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档 文章目录 前言一、分发饼干二、摆动序列三、最大子树组合 前言 今天是跟着代码随想录刷题的第31天,主要学习了分发饼干,摆动序列和最大子树组合这三个…...
Docker 多阶段构建
多阶段构建 目录 尝试创建 Dockerfile构建容器镜像运行 Spring Boot 应用程序使用多阶段构建额外资源 在传统构建中,所有构建指令都在一个构建容器中顺序执行:下载依赖项、编译代码、打包应用程序。所有这些层最终都在你的最终镜像中。这种方法虽然可行…...
Linux应急响应——知攻善防应急靶场-Linux(1)
文章目录 查看history历史指令查看开机自启动项异常连接和端口异常进程定时任务异常服务日志分析账户排查总结 靶场出处是知攻善防 Linux应急响应靶机 1 前景需要: 小王急匆匆地找到小张,小王说"李哥,我dev服务器被黑了",快救救我&…...
基于CDMA的多用户水下无线光通信(1)——背景介绍
研究生期间做多用户水下无线光通信(Underwater Optical Wireless Communication,UOWC),写几篇博客分享一下学的内容。导师给了大方向,让我用直接序列码分多址(Direct Sequence Code Division Multiple Acce…...
基于springboot websocket和okhttp实现消息中转
1、业务介绍 消息源服务的消息不能直接推给用户侧,用户与中间服务建立websocket连接,中间服务再与源服务建立websocket连接,源服务的消息推给中间服务,中间服务再将消息推送给用户。流程如下图: 此例中我们定义中间服…...
利用最小二乘法找圆心和半径
#include <iostream> #include <vector> #include <cmath> #include <Eigen/Dense> // 需安装Eigen库用于矩阵运算 // 定义点结构 struct Point { double x, y; Point(double x_, double y_) : x(x_), y(y_) {} }; // 最小二乘法求圆心和半径 …...
vscode里如何用git
打开vs终端执行如下: 1 初始化 Git 仓库(如果尚未初始化) git init 2 添加文件到 Git 仓库 git add . 3 使用 git commit 命令来提交你的更改。确保在提交时加上一个有用的消息。 git commit -m "备注信息" 4 …...
C++_核心编程_多态案例二-制作饮品
#include <iostream> #include <string> using namespace std;/*制作饮品的大致流程为:煮水 - 冲泡 - 倒入杯中 - 加入辅料 利用多态技术实现本案例,提供抽象制作饮品基类,提供子类制作咖啡和茶叶*//*基类*/ class AbstractDr…...
脑机新手指南(八):OpenBCI_GUI:从环境搭建到数据可视化(下)
一、数据处理与分析实战 (一)实时滤波与参数调整 基础滤波操作 60Hz 工频滤波:勾选界面右侧 “60Hz” 复选框,可有效抑制电网干扰(适用于北美地区,欧洲用户可调整为 50Hz)。 平滑处理&…...
日语学习-日语知识点小记-构建基础-JLPT-N4阶段(33):にする
日语学习-日语知识点小记-构建基础-JLPT-N4阶段(33):にする 1、前言(1)情况说明(2)工程师的信仰2、知识点(1) にする1,接续:名词+にする2,接续:疑问词+にする3,(A)は(B)にする。(2)復習:(1)复习句子(2)ために & ように(3)そう(4)にする3、…...
蓝桥杯 2024 15届国赛 A组 儿童节快乐
P10576 [蓝桥杯 2024 国 A] 儿童节快乐 题目描述 五彩斑斓的气球在蓝天下悠然飘荡,轻快的音乐在耳边持续回荡,小朋友们手牵着手一同畅快欢笑。在这样一片安乐祥和的氛围下,六一来了。 今天是六一儿童节,小蓝老师为了让大家在节…...
测试markdown--肇兴
day1: 1、去程:7:04 --11:32高铁 高铁右转上售票大厅2楼,穿过候车厅下一楼,上大巴车 ¥10/人 **2、到达:**12点多到达寨子,买门票,美团/抖音:¥78人 3、中饭&a…...
cf2117E
原题链接:https://codeforces.com/contest/2117/problem/E 题目背景: 给定两个数组a,b,可以执行多次以下操作:选择 i (1 < i < n - 1),并设置 或,也可以在执行上述操作前执行一次删除任意 和 。求…...
【JavaSE】绘图与事件入门学习笔记
-Java绘图坐标体系 坐标体系-介绍 坐标原点位于左上角,以像素为单位。 在Java坐标系中,第一个是x坐标,表示当前位置为水平方向,距离坐标原点x个像素;第二个是y坐标,表示当前位置为垂直方向,距离坐标原点y个像素。 坐标体系-像素 …...
AI书签管理工具开发全记录(十九):嵌入资源处理
1.前言 📝 在上一篇文章中,我们完成了书签的导入导出功能。本篇文章我们研究如何处理嵌入资源,方便后续将资源打包到一个可执行文件中。 2.embed介绍 🎯 Go 1.16 引入了革命性的 embed 包,彻底改变了静态资源管理的…...
