当前位置: 首页 > news >正文

使用Flink CDC实时监控MySQL数据库变更

在现代数据架构中,实时数据处理变得越来越重要。Flink CDC(Change Data Capture)是一种强大的工具,可以帮助我们实时捕获数据库的变更,并进行处理。本文将介绍如何使用Flink CDC从MySQL数据库中读取变更数据,并将其打印到控制台。

环境准备

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>1.12.0</version>
</dependency>
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.12</artifactId><version>1.12.0</version>
</dependency>
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_2.12</artifactId><version>1.12.0</version>
</dependency>
<dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-client</artifactId><version>3.1.3</version>
</dependency>
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner-blink_2.12</artifactId><version>1.12.0</version>
</dependency>
<dependency><groupId>com.ververica</groupId><artifactId>flink-connector-mysql-cdc</artifactId><version>2.0.0</version>
</dependency>
<dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.75</version>
</dependency>
<dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.23</version>
</dependency>
  1. 获取Flink执行环境

首先,我们需要获取Flink的执行环境。这是所有Flink作业的起点。

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  1. 启用检查点和设置并行度

为了确保作业的容错性和状态恢复,我们需要启用检查点,并设置作业的并行度。

env.enableCheckpointing(500); // 每500毫秒创建一个检查点
env.setParallelism(1); // 设置作业的并行度为1
  1. 使用Debezium Source读取MySQL的binlog

接下来,我们使用Debezium Source读取MySQL的binlog。我们需要配置MySQL的连接信息、监控的数据库和表、反序列化器以及启动选项。

DebeziumSourceFunction<String> sourceFunction = MySqlSource.<String>builder().serverTimeZone("Asia/Shanghai") // 设置时区为亚洲/上海.hostname("localhost") // MySQL的IP地址.port(3306) // MySQL的端口.username("root") // MySQL的用户名.password("123456") // MySQL的密码.databaseList("my_db") // 监控的数据库.tableList("my_db.user") // 监控的数据库下的表.deserializer(new JsonDebeziumDeserializationSchema()) // 反序列化.startupOptions(StartupOptions.initial()) // 启动选项.build();

这里 JsonDebeziumDeserializationSchema类的代码如下:

import com.alibaba.fastjson.JSONObject;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import io.debezium.data.Envelope;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;import java.util.List;/**
*  自定义DeserializationSchema进行反序列化。
*/public class JsonDebeziumDeserializationSchema implements DebeziumDeserializationSchema<String> {@Overridepublic void deserialize(SourceRecord sourceRecord, Collector collector) throws Exception {//创建JSON对象用于存储最终数据JSONObject result = new JSONObject();String topic = sourceRecord.topic();String[] fields = topic.split("\\.");String database = fields[1];String tableName = fields[2];Struct value  = (Struct)sourceRecord.value();//获取before数据Struct before = value.getStruct("before");JSONObject beforeJson = getJson(before);//获取after数据Struct after = value.getStruct("after");JSONObject afterJson = getJson(after);//获取操作类型Envelope.Operation operation = Envelope.operationFor(sourceRecord);//将字段写入JSON对象result.put("database",database);result.put("tableName",tableName);result.put("type",operation);result.put("before",beforeJson);result.put("after",afterJson);//输出数据collector.collect(result.toJSONString());}/***  获取字段值并写入result对象* @param before* @return*/private JSONObject getJson(Struct before) {JSONObject jsonObject = new JSONObject();if(before != null){Schema beforeSchema = before.schema();List<Field> beforeFields = beforeSchema.fields();for (Field field : beforeFields) {Object beforeValue = before.get(field);jsonObject.put(field.name(), beforeValue);}}return jsonObject;}@Overridepublic TypeInformation getProducedType() {return BasicTypeInfo.STRING_TYPE_INFO;}
}
  1. 添加数据源并打印数据

将Debezium源函数添加到Flink环境中,生成一个数据流,并将数据流中的数据打印到控制台。

DataStream<String> dataStreamSource = env.addSource(sourceFunction, TypeInformation.of(String.class));
DataStreamSink<String> print = dataStreamSource.print();
  1. 启动任务

最后,启动Flink作业,开始处理数据流。

env.execute("Flink-CDC");

6.测试

在这里插入图片描述

总结

通过上述步骤,我们可以使用Flink CDC实时监控MySQL数据库的变更,并将变更数据以JSON格式打印出来。这种方法不仅适用于数据监控,还可以用于实时数据处理和分析。

相关文章:

使用Flink CDC实时监控MySQL数据库变更

在现代数据架构中&#xff0c;实时数据处理变得越来越重要。Flink CDC&#xff08;Change Data Capture&#xff09;是一种强大的工具&#xff0c;可以帮助我们实时捕获数据库的变更&#xff0c;并进行处理。本文将介绍如何使用Flink CDC从MySQL数据库中读取变更数据&#xff0…...

学生课程信息管理系统

摘 要 目前&#xff0c;随着科学经济的不断发展&#xff0c;高校规模不断扩大&#xff0c;所招收的学生人数越来越 多&#xff1b;所开设的课程也越来越多。随之而来的是高校需要管理更多的事务。对于日益增 长的学生相关专业的课程也在不断增多&#xff0c;高校对其管理具有一…...

如何看待鸿蒙HarmonyOS?

鸿蒙系统&#xff0c;自2019年8月9日诞生就一直处于舆论风口浪尖上的系统&#xff0c;从最开始的“套壳”OpenHarmony安卓的说法&#xff0c;到去年的不再兼容安卓的NEXT版本的技术预览版发布&#xff0c;对于鸿蒙到底是什么&#xff0c;以及鸿蒙的应用开发的讨论从来没停止过。…...

【论文复现|智能算法改进】一种基于多策略改进的鲸鱼算法

目录 1.算法原理2.改进点3.结果展示4.参考文献5.代码获取 1.算法原理 SCI二区|鲸鱼优化算法&#xff08;WOA&#xff09;原理及实现【附完整Matlab代码】 2.改进点 混沌反向学习策略 将混沌映射和反向学习策略结合&#xff0c;形成混沌反向学习方法&#xff0c;通过该方 法…...

yarn安装配置及使用教程

Yarn 是一款 JavaScript 的包管理工具&#xff0c;是 Facebook, Google, Exponent 和 Tilde 开发的一款新的 JavaScript 包管理工具&#xff0c;它提供了确定性、依赖关系树扁平化等特性&#xff0c;并且与 npm 完全兼容。以下是 Yarn 的安装及使用教程&#xff1a; Yarn 安装…...

有那么点道理。

...

蔚蓝资源包和数据分析

代码如下 /* * COMPUTER GENERATED -- DO NOT EDIT* */#include <windows.h>static FARPROC __Init_Fun_2__; int __RestartAppIfNecessary__Fun() {return 0; } int Init_Fun() {__Init_Fun_2__();return 1; }static FARPROC __GameServer_BSecure__; static FARPROC _…...

MySQL----利用Mycat配置读写分离

首先确保主从复制是正常的&#xff0c;具体步骤在MySQL----配置主从复制。MySQL----配置主从复制 环境 master(CtenOS7)&#xff1a;192.168.200.131 ----ifconfig查看->ens33->inetslave(win10)&#xff1a;192.168.207.52 ----ipconfig查看->无线局域网适配器 WLA…...

【科学计算与可视化】2. pandas 基础

1. 安装 Pandas 首先&#xff0c;确保你已经安装了 Pandas。你可以使用以下命令安装&#xff1a;pip install pandas 2. 导入 Pandas 在开始使用 Pandas 之前&#xff0c;你需要先导入它&#xff1a;import pandas as pd 3. 创建数据结构 Pandas 主要有两种数据结构&#…...

医学记录 --- 腋下异味

逻辑图地址 症状 病因 汗液分泌旺盛&#xff1a;由于天气炎热、活动出汗、肥胖等因素导致汗液分泌旺盛&#xff0c;可引起腋下有异味表现。在这种情况下&#xff0c;建议保持身体清洁&#xff0c;特别是在炎热和潮湿的环境下。可以使用抗菌洗液、喷雾或霜剂来帮助减少细菌滋…...

【Linux】进程间通信_1

文章目录 七、进程间通信1. 进程间通信分类管道 未完待续 七、进程间通信 进程间由于 进程具有独立性 &#xff0c;所以不可以直接进行数据传递。但是我们通常需要多个进程协同&#xff0c;共同完成一件事&#xff0c;所以我们需要进程间通信的手段。进程间通信的本质就是先让…...

Linux Kernel入门到精通系列讲解(RV-Kernel 篇) 5.6 在kernel 中实现系统复位和系统关机驱动

1. 概述 上一章节Qemu篇我们已经实现了我们SOC的power reset和 power down 寄存器,本章节我们就在Linux driver中去实现它。 2. Linux kernel 访问其他节点 Linux kernel中有一种机制,就是在driver中访问其它设备树节点的信息,了解设备树的应该都知道,每个设备节点都有一…...

如何在Java中进行单元测试?

如何在Java中进行单元测试&#xff1f; 大家好&#xff0c;我是免费搭建查券返利机器人省钱赚佣金就用微赚淘客系统3.0的小编&#xff0c;也是冬天不穿秋裤&#xff0c;天冷也要风度的程序猿&#xff01;今天我们将探讨如何在Java中进行单元测试&#xff0c;这是一项确保代码质…...

代码随想录训练营Day32

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录 前言一、买卖股票的最佳时机2二、跳跃游戏三、跳跃游戏2四、K次取反后最大化的数组和 前言 今天是跟着代码随想录刷题的第32天&#xff0c;主要是学了买卖股票的最…...

代码随想录训练营Day31

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录 前言一、分发饼干二、摆动序列三、最大子树组合 前言 今天是跟着代码随想录刷题的第31天&#xff0c;主要学习了分发饼干&#xff0c;摆动序列和最大子树组合这三个…...

Docker 多阶段构建

多阶段构建 目录 尝试创建 Dockerfile构建容器镜像运行 Spring Boot 应用程序使用多阶段构建额外资源 在传统构建中&#xff0c;所有构建指令都在一个构建容器中顺序执行&#xff1a;下载依赖项、编译代码、打包应用程序。所有这些层最终都在你的最终镜像中。这种方法虽然可行…...

Linux应急响应——知攻善防应急靶场-Linux(1)

文章目录 查看history历史指令查看开机自启动项异常连接和端口异常进程定时任务异常服务日志分析账户排查总结 靶场出处是知攻善防 Linux应急响应靶机 1 前景需要&#xff1a; 小王急匆匆地找到小张&#xff0c;小王说"李哥&#xff0c;我dev服务器被黑了",快救救我&…...

基于CDMA的多用户水下无线光通信(1)——背景介绍

研究生期间做多用户水下无线光通信&#xff08;Underwater Optical Wireless Communication&#xff0c;UOWC&#xff09;&#xff0c;写几篇博客分享一下学的内容。导师给了大方向&#xff0c;让我用直接序列码分多址&#xff08;Direct Sequence Code Division Multiple Acce…...

vlan三层交换技术--交换机--(自作)

...

基于springboot websocket和okhttp实现消息中转

1、业务介绍 消息源服务的消息不能直接推给用户侧&#xff0c;用户与中间服务建立websocket连接&#xff0c;中间服务再与源服务建立websocket连接&#xff0c;源服务的消息推给中间服务&#xff0c;中间服务再将消息推送给用户。流程如下图&#xff1a; 此例中我们定义中间服…...

seo sem公司如何制定营销策略

SEO SEM公司如何制定有效的营销策略 在当今数字化时代&#xff0c;SEO&#xff08;搜索引擎优化&#xff09;和SEM&#xff08;搜索引擎营销&#xff09;已经成为企业推广和品牌建立的关键组成部分。无论是中小企业还是大型跨国公司&#xff0c;它们都需要高效、精准的营销策略…...

突破平台壁垒:探索5种在Windows运行Android应用的实战方案与终极选择

突破平台壁垒&#xff1a;探索5种在Windows运行Android应用的实战方案与终极选择 【免费下载链接】APK-Installer An Android Application Installer for Windows 项目地址: https://gitcode.com/GitHub_Trending/ap/APK-Installer 在数字化办公与娱乐深度融合的今天&am…...

基于西门子PLC的空压机组储气风冷机组自动控制系统:“手动自动切换、多机控制及实时监测报警系统

基于西门子plc的空压机组储气风冷机组自动控制系统 可以实现手动自动切换 三组空压机分别自动控制&#xff0c;自动检测三路压力 风冷机运行实时检测 报警查寻&#xff0c;参数设置等上周刚把车间那套跑了快十年的空压机组控制系统给换了&#xff0c;用的是西门子S7-1200&#…...

5分钟上手Vane容器化部署:从零搭建隐私优先的AI搜索引擎

5分钟上手Vane容器化部署&#xff1a;从零搭建隐私优先的AI搜索引擎 【免费下载链接】Vane Vane is an AI-powered answering engine. 项目地址: https://gitcode.com/GitHub_Trending/pe/Vane 想要在5分钟内搭建一个功能强大的AI搜索引擎吗&#xff1f;Vane是一个专注于…...

Qwen3-14B开源模型实战:跨境电商多平台产品文案批量生成

Qwen3-14B开源模型实战&#xff1a;跨境电商多平台产品文案批量生成 1. 跨境电商文案生成的痛点与解决方案 跨境电商运营面临的最大挑战之一&#xff0c;就是需要为同一款产品在不同平台&#xff08;亚马逊、eBay、速卖通等&#xff09;生成符合各自规范的优质文案。传统人工…...

网络安全的概念与规范:从基础到实践

网络安全的概念与规范&#xff1a;从基础到实践 在数字化浪潮席卷全球的今天&#xff0c;网络安全已成为国家安全的重要组成部分。本文将系统梳理网络安全的核心概念、发展历程、主要威胁、前沿趋势以及标准规范&#xff0c;帮助读者建立完整的网络安全知识体系。 一、网络安全…...

秋招简历模板下载怎么选?6款主流简历模板工具深度测评

秋招季来临&#xff0c;对应届生来说&#xff0c;简历是踏入职场的第一块敲门砖&#xff0c;而一份贴合岗位需求、契合HR筛选思路的简历模板&#xff0c;既能降低简历制作难度&#xff0c;也是提高简历初筛通过率的关键。如今市面上的简历模板工具五花八门&#xff0c;功能定位…...

BilibiliDown:B站视频下载的完整解决方案

BilibiliDown&#xff1a;B站视频下载的完整解决方案 【免费下载链接】BilibiliDown (GUI-多平台支持) B站 哔哩哔哩 视频下载器。支持稍后再看、收藏夹、UP主视频批量下载|Bilibili Video Downloader &#x1f633; 项目地址: https://gitcode.com/gh_mirrors/bi/BilibiliDo…...

GitHub开源项目日报 · 2026年3月30日 · 微软开源VibeVoice语音模型登顶,Claude Code生态项目持续火爆

本期榜单涵盖了语音AI、Claude Code辅助编程工具、换脸技术、金融数据平台、在线教育、数据可视化等多个领域的开源项目。超过10000星以上的项目有9个,其中freeCodeCamp以近44万星稳居榜首,Apache Superset、OpenBB、Deep-Live-Cam等项目也获得广泛关注。微软开源的VibeVoice…...

如何高效一站式解决B站资源下载难题:BiliTools全方位使用指南

如何高效一站式解决B站资源下载难题&#xff1a;BiliTools全方位使用指南 【免费下载链接】BiliTools A cross-platform bilibili toolbox. 跨平台哔哩哔哩工具箱&#xff0c;支持下载视频、番剧等等各类资源 项目地址: https://gitcode.com/GitHub_Trending/bilit/BiliTools…...