Flink CDC将MySQL数据同步到数据湖
此项目可以理解为MySQL数据迁移,由Flink Stream监听MySQL的Binlog日志写入Kafka,在Kafka消费端将消息写入Doris或其他外部对象存储。
涉及的环境与版本
组件 | 版本 |
---|---|
flink | 1.20.1 |
flink-cdc | 3.4.0 |
kafka | 2.13-4.0.0 |
Dragonwell | 17 |
引入相关依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>org.example</groupId><artifactId>etl</artifactId><version>1.0-SNAPSHOT</version><properties><maven.compiler.source>17</maven.compiler.source><maven.compiler.target>17</maven.compiler.target><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><flink.version>1.20.1</flink.version><flink-cdc.version>3.4.0</flink-cdc.version><kafka-clients.version>3.3.1</kafka-clients.version><fastjson.version>1.2.83</fastjson.version><aliyun-sdk-oss.version>3.18.2</aliyun-sdk-oss.version><lombok.version>1.18.30</lombok.version><hadoop.version>3.3.6</hadoop.version></properties><dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-core</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-runtime</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-common</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-base</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-mysql-cdc</artifactId><version>${flink-cdc.version}</version></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>${fastjson.version}</version></dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>${kafka-clients.version}</version></dependency><dependency><groupId>com.aliyun.oss</groupId><artifactId>aliyun-sdk-oss</artifactId><version>${aliyun-sdk-oss.version}</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>${lombok.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-common</artifactId><version>${hadoop.version}</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-client</artifactId><version>${hadoop.version}</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-mapreduce-client-core</artifactId><version>${hadoop.version}</version></dependency><dependency><groupId>org.apache.parquet</groupId><artifactId>parquet-avro</artifactId><version>1.13.1</version></dependency><dependency><groupId>org.apache.avro</groupId><artifactId>avro</artifactId><version>1.11.1</version></dependency><dependency><groupId>org.apache.parquet</groupId><artifactId>parquet-hadoop</artifactId><version>1.12.3</version></dependency></dependencies>
</project>
主程序入口,flinck cdc监听mysql binlog
package org.example;import com.google.common.collect.ImmutableMap;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.cdc.connectors.mysql.source.MySqlSource;
import org.apache.flink.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;import java.time.Duration;public class ConditionalEventSync {public static void main(String[] args) throws Exception {// 配置源mysql连接信息MySqlSource<String> source =MySqlSource.<String>builder().hostname("xxx").port(3306).databaseList("xx").tableList("xx").username("xx").password("xx").deserializer(new JsonDebeziumDeserializationSchema())// 优化项.splitSize(50) // 表快照分片数(默认30).fetchSize(1024) // 每次fetch行数(默认1024).connectTimeout(Duration.ofSeconds(30)).connectionPoolSize(5) // 连接池大小(默认3).build();StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.enableCheckpointing(3000);env.fromSource(source, WatermarkStrategy.noWatermarks(), "MySQL Source").flatMap(new EventFilterFunction())// 将监听到的数据写入kafka.addSink(new KafkaSink("event_tracking"));env.setParallelism(4);env.enableCheckpointing(10000);// 避免因大数据量写入状态后端(如 RocksDB)导致 Checkpoint 超时env.getCheckpointConfig().setCheckpointTimeout(60000);// todo 大表快照可能导致频繁 Full GC,启动参数增大堆内存env.execute("Conditional Event Tracking Sync");}public static class EventFilterFunction implements FlatMapFunction<String, String> {@Overridepublic void flatMap(String json, Collector<String> out) {// JSONObject event = JSONObject.parseObject(json);// // 条件1:只同步特定类型// if (event.getIntValue("type") == 2) return;// // 条件2:过滤测试IP段// if (event.getString("ip").startsWith("192.168.")) return;out.collect(json);}}
}
将监听到的binlog日志写入kafka
kafka需要先创建对应的topic,UI客户端可以使用https://github.com/obsidiandynamics/kafdrop
package org.example;
import java.util.Properties;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
public class KafkaSink implements SinkFunction<String> {private transient KafkaProducer<String, String> producer;private final String topic;public KafkaSink(String topic) {this.topic = topic;}@Overridepublic void invoke(String value, Context context) {if (producer == null) {producer = createKafkaProducer();}System.out.println("【KafkaSink】Sending event to Kafka.topic: "+topic+",body:" + value);producer.send(new ProducerRecord<>(topic, value));}private KafkaProducer<String, String> createKafkaProducer() {Properties props = new Properties();props.put("bootstrap.servers", "127.0.0.1:9092"); // Kafka broker 地址props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");return new KafkaProducer<>(props);}public void close() throws Exception {if (producer != null) {producer.close();}}
}
监听到的binlog数据如下,根据op字段判断监听到的数据变更是新增、更新还是删除,消费端需要区分做对应的处理。
{"before": null,"after": {"id": 3,"type": 1,"tag": "pay_enter","user_id": 23,"ip": null,"client": null,"create_time": 1744045915000},"source": {"version": "1.9.8.Final","connector": "mysql","name": "mysql_binlog_source","ts_ms": 0,"snapshot": "false","db": "linda_source","sequence": null,"table": "event_tracking","server_id": 0,"gtid": null,"file": "","pos": 0,"row": 0,"thread": null,"query": null},"op": "r","ts_ms": 1745309434361,"transaction": null
}
kafka消费端可以单独起个项目部署在其他服务器
package org.example;import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.Base64;
import java.util.Collections;
import java.util.List;
import java.util.Properties;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.http.HttpHeaders;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.example.data.simple.SimpleGroupFactory;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.example.GroupWriteSupport;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Types;import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.google.common.collect.Lists;public class KafkaConsumer {public static void main(String[] args) {// ===================kafka消费==================Properties props = new Properties();props.put("bootstrap.servers", "127.0.0.1:9092"); // Kafka broker 地址props.put("group.id", "test-group"); // 消费者组 IDprops.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");org.apache.kafka.clients.consumer.KafkaConsumer<String, String> consumer =new org.apache.kafka.clients.consumer.KafkaConsumer<>(props);consumer.subscribe(Collections.singletonList("tool_event_tracking")); // 订阅 topictry {while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));for (ConsumerRecord<String, String> record : records) {System.out.printf("【KafkaConsumer】Received message: topic = %s, partition = %d, offset = %d, key = %s, value = %s%n",record.topic(), record.partition(), record.offset(), record.key(), record.value());handleMqEvent(record.value());}}} finally {consumer.close();}}public static void handleMqEvent(String event) {System.out.println("handleMqEvent接收内容:" + event);JSONObject value = JSONObject.parseObject(event);String op = value.getString("op");// u:更新,r:新增,d:删除JSONObject before = value.getJSONObject("before");JSONObject after = value.getJSONObject("after");String userId = null;String path = null;switch (op) {case "c":// 新增saveToDoris(Lists.newArrayList(after.toJavaObject(EventTrackingEntity.class)));break;case "d":userId = before.getString("user_id");// 删除// todobreak;case "u":userId = after.getString("user_id");// 更新// todobreak;}}public static String saveToDoris(List<EventTrackingEntity> dataList) {String jdbcUrl = "jdbc:mysql://172.20.89.65:9030/devops";String username = "root";String password = "";String insertSQL ="INSERT INTO event_tracking (id, type, tag, user_id, ip, client, create_time) VALUES (?, ?, ?, ?, ?, ?, ?)";try (Connection conn = DriverManager.getConnection(jdbcUrl, username, password);PreparedStatement ps = conn.prepareStatement(insertSQL)) {// 设置自动提交为 false,提高性能conn.setAutoCommit(false);for (EventTrackingEntity item : dataList) {ps.setLong(1, item.getId() != null ? item.getId() : 0);ps.setInt(2, item.getType() != null ? item.getType() : 0);ps.setString(3, item.getTag());ps.setLong(4, item.getUserId() != null ? item.getUserId() : 0);ps.setString(5, item.getIp());ps.setString(6, item.getClient());ps.setLong(7, item.getCreateTime().toEpochSecond(ZoneOffset.UTC));ps.addBatch();}int[] result = ps.executeBatch();conn.commit();System.out.println("批量插入完成,影响记录数:" + result.length);return "Success";} catch (SQLException e) {throw new RuntimeException("JDBC 写入 Doris 出错", e);}}}
相关文章:
Flink CDC将MySQL数据同步到数据湖
此项目可以理解为MySQL数据迁移,由Flink Stream监听MySQL的Binlog日志写入Kafka,在Kafka消费端将消息写入Doris或其他外部对象存储。 涉及的环境与版本 组件版本flink1.20.1flink-cdc3.4.0kafka2.13-4.0.0Dragonwell17 引入相关依赖 <?xml versio…...

使用Mathematica观察多形式根的分布随参数的变化
有两种方式观察多项式的根随着参数变化:(1)直接制作一个小的动态视频;(2)绘制所有根形成的痕迹(locus)。 制作动态视频: (*Arg-plane plotting routine with plotting …...

【C++高级主题】转换与多个基类
目录 一、多重继承的虚函数表结构:每个基类一个虚表 1.1 单继承与多重继承的虚表差异 1.2 代码示例:多重继承的虚函数覆盖 1.3 虚表结构示意图 二、指针与引用的类型转换:地址调整的底层逻辑 2.1 派生类指针转基类指针的地址偏移 2.2 …...
C++.双指针算法(1.1目录修正)
C.双指针算法 1. 双指针算法概述1.1 双指针算法的定义1.2 双指针算法的应用场景1.2.1 数组中的两数之和问题1.2.2 链表中的环检测问题1.2.3 滑动窗口问题1.2.4 有序数组的合并问题 2. 双指针算法的实现基础2.1 指针的基本概念2.2 指针的运算操作 3. 双指针算法的常见类型及示例…...

『uniapp』添加桌面长按快捷操作 shortcuts(详细图文注释)
目录 手机环境适配说明安卓效果图代码 iOS(暂未实测,没有水果开发者)总结 欢迎关注 『uniapp』 专栏,持续更新中 欢迎关注 『uniapp』 专栏,持续更新中 手机环境适配说明 个别手机系统可能需要进行特别的权限设置,否则会无法使用 桌面快捷方式: 已知的有…...

【LLM vs Agent】从语言模型到智能体,人工智能迈出的关键一步
目录 一、什么是 LLM?语言的天才,思维的起点 ✅ 特点小结: 二、什么是 Agent?智能的执行者,自主的决策者 ✅ 特点小结: 三、LLM 与 Agent 的关系:是工具,更是大脑 四、案例实战…...
【看到哪里写到哪里】C的指针-3(函数指针)
//定义四个函数 加减乘数 int add(int a, int b) {return a b; } int subtract(int a, int b) {return a - b; } int multiply(int a, int b) {return a * b; } int divide(int a, int b) {if (b 0){printf("Error: devision by ZERO!");return 0;}return a / b; }…...

麦克风和电脑内播放声音实时识别转文字软件FunASR整合包V5下载
我基于FunASR制作的实时语音识别转文字软件当前更新到V5版本。软件可以实时识别麦克风声音和电脑内播放声音转为文字。 FunASR软件介绍 FunASR 是一款基础语音识别工具包和开源 SOTA 预训练模型,支持语音识别、语音活动检测、文本后处理等。 我使用FunASR制作了一…...

PyTorch——卷积层(3)
conv_arithmetic/README.md at master vdumoulin/conv_arithmetic GitHub out_channel1 out_channel2...
(面试)OkHttp实现原理
OkHttp 是一个高效的 HTTP 客户端,被广泛应用于 Android 和 Java 应用中。它提供了许多强大的特性,例如连接池、透明的 GZIP 压缩、HTTP/2 支持等。理解 OkHttp 的实现原理有助于更好地使用和调试它。 以下是 OkHttp 的一些核心实现原理: 1…...

从 PyTorch 到 TensorFlow Lite:模型训练与推理
一、方案介绍 研发阶段:利用 PyTorch 的动态图特性进行快速原型验证,快速迭代模型设计。 灵活性与易用性:PyTorch 是一个非常灵活且易于使用的深度学习框架,特别适合研究和实验。其动态计算图特性使得模型的构建和调试变得更加直…...
C++ 17 正则表达式
正则表达式不是C语言的一部分,这里仅做简单的介绍。 将这项技术引进,在 』的讨论 正则表达式描述了一种字符串匹配的模式。一般使用正则表达式主要是实现下面三个需求: 1,检查一个串是否包含某种形式的子串; 2,将匹配的子串替换&a…...

【存储基础】存储设备和服务器的关系和区别
文章目录 1. 存储设备和服务器的区别2. 客户端访问数据路径场景1:经过服务器处理场景2:客户端直连 3. 服务器作为"中转站"的作用 刚开始接触存储的时候,以为数据都是存放在服务器上的,服务器和存储设备是一个东西&#…...
kernel内核和driver驱动的区别
“kernel”和“driver”虽然都跟操作系统和硬件有关,但它们指的是不同的东西。 1. Kernel(内核) 定义:操作系统的核心组件,是操作系统中负责管理系统资源和硬件的最底层软件。 职责: 管理CPU调度ÿ…...

5.29打卡
浙大疏锦行 DAY 38 Dataset和Dataloader类 知识点回顾: 1. Dataset类的__getitem__和__len__方法(本质是python的特殊方法) 2. Dataloader类 3. minist手写数据集的了解 作业:了解下cifar数据集,尝试获取其中一张图…...

【黑马程序员uniapp】项目配置、请求函数封装
黑马程序员前端项目uniapp小兔鲜儿微信小程序项目视频教程,基于Vue3TsPiniauni-app的最新组合技术栈开发的电商业务全流程_哔哩哔哩_bilibili 参考 有代码,还有app、h5页面、小程序的演示 小兔鲜儿-vue3ts-uniapp-一套代码多端部署: 小兔鲜儿-vue3ts-un…...
ios tableview吸顶
由于项目需要实现一个上滑吸顶的效果,网上也看到有很多种方式实现,但是如果加上下拉刷新的功能会导致界面异常,还有第三方库实现方式库,太繁琐了,下面是我的实现方式,效果如下: tablevie滑动吸顶…...

PyTorch——DataLoader的使用
batch_size, drop_last 的用法 shuffle shuffleTrue 各批次训练的图像不一样 shuffleFalse 在第156step顺序一致...
【Python 进阶2】抽象方法和实例调用方法
抽象方法和实例调用方法 对比表格: 特性抽象方法 (forward)实例调用方法 (call)定义方式abc.abstractmethod 装饰器特殊方法名 __call__调用方式不能直接调用,必须通过子类实现可以直接调用对象:controller(attn, ...)实现要求必须由子类实…...
第1章:走进Golang
第1章:走进Golang 一、Golang简介 Go语言(又称Golang)是由Google的Robert Griesemer、Rob Pike及Ken Thompson开发的一种开源编程语言。它诞生于2007年,2009年11月正式开源。Go语言的设计初衷是为了在不损失应用程序性能的情况下…...

Predixy的docker化
概述 当前已有一套redis cluster的集群,但是fs中的hiredis只能配置单实例redis。 AI了一下方案,可以使用redis的proxy组件来实现从hiredis到redis cluster的互通。 代码地址:https://github.com/joyieldInc/predixy Predixy特性介绍&…...

C++ 之 多态 【虚函数表、多态的原理、动态绑定与静态绑定】
目录 前言 1.多态的原理 1.1虚函数表 1.2派生类中的虚表 1.3虚函数、虚表存放位置 1.4多态的原理 1.5多态条件的思考 2.动态绑定与静态绑定 3.单继承和虚继承中的虚函数表 3.1单继承中的虚函数表 3.2多继承(非菱形继承)中的虚函数表 4.问答题 前言 需要声明的&#x…...

【JavaWeb】Maven、Servlet、cookie/session
目录 5. Maven6. Servlet6.1 Servlet 简介6.2 HelloServlet6.3 Servlet原理6.4 Mapping( **<font style"color:rgb(44, 44, 54);">映射 ** )问题6.5 ServletContext6.6 HttpServletResponse<font style"color:rgb(232, 62, 140);background-color:rgb(…...
[蓝桥杯]阶乘求值【省模拟赛】
问题描述 给定 nn,求 n!n! 除以 10000000071000000007 的余数。 其中 n!n! 表示 nn 的阶乘,值为从 11 连乘到 nn 的积,即 n!123…nn!123…n。 输入格式 输入一行包含一个整数 nn。 输出格式 输出一行,包含一个整数ÿ…...
鸿蒙OSUniApp微服务架构实践:从设计到鸿蒙部署#三方框架 #Uniapp
UniApp微服务架构实践:从设计到鸿蒙部署 引言 在最近的一个大型跨平台项目中,我们面临着一个有趣的挑战:如何在UniApp框架下构建一个可扩展的微服务架构,并确保其在包括鸿蒙在内的多个操作系统上流畅运行。本文将分享我们的实践…...

Rust 编程实现猜数字游戏
文章目录 编程实现猜数字游戏游戏规则创建新项目默认代码处理用户输入代码解析 生成随机数添加依赖生成逻辑 比较猜测值与目标值类型转换 循环与错误处理优化添加循环优雅处理非法输入 最终完整代码核心概念总结 编程实现猜数字游戏 我们使用cargo和rust实现一个经典编程练习…...

关于神经网络中的激活函数
这篇博客主要介绍一下神经网络中的激活函数以及为什么要存在激活函数。 首先,我先做一个简单的类比:激活函数的作用就像给神经网络里的 “数字信号” 加了一个 “智能阀门”,让机器能学会像人类一样思考复杂问题。 没有激活i函数的神经网络…...

CentOS_7.9 2U物理服务器上部署系统简易操作步骤
近期单位网站革新,鉴于安全加固,计划将原有Windows环境更新到Linux-CentOS 7.9,这版本也没的说(绝)了(版)官方停止更新,但无论如何还是被sisi的牵挂着这一大批人,毕竟从接…...
第十三篇:MySQL 运维自动化与可观测性建设实践指南
本篇重点介绍 MySQL 运维自动化的关键工具与流程,深入实践如何构建高效可观测体系,实现数据库系统的持续稳定运行与故障快速响应。 一、为什么需要 MySQL 运维自动化与可观测性? 运维挑战: 手动备份容易遗漏或失败; …...

短视频平台差异视角下开源AI智能名片链动2+1模式S2B2C商城小程序的适配性研究——以抖音与快手为例
摘要 本文以抖音与快手两大短视频平台为研究对象,从用户群体、内容生态、推荐逻辑三维度分析其差异化特征,并探讨开源AI智能名片链动21模式与S2B2C商城小程序在平台适配中的创新价值。研究发现,抖音的流量中心化机制与优质内容导向适合品牌化…...