当前位置: 首页 > news >正文

flink-connector-mysql-cdc:02 mysql-cdc高级扩展

  • flink-connector-mysql-cdc:
  • 01 mysql-cdc基础配置代码演示
  • 02 mysql-cdc高级扩展
  • 03 mysql-cdc常见问题汇总
  • 04 mysql-cdc-kafka生产级代码分享
  • 05 flink-kafka-doris生产级代码分享
  • 06 flink-kafka-hudi生产级代码分享

  • flink-cdc版本:3.2.0
  • flink版本:flink-1.18.0
  • mysql版本:8.0.26
  • java版本:1.8
  • maven版本:3.8.4

  • mysql-cdc同步从库数据

  • 从库需要配置 log-slave-updates = 1 使从实例也能将从主实例同步的数据写入从库的 binlog 文件中,如果主库开启了gtid mode,从库也需要开启。
log-slave-updates = 1
gtid_mode = on 
enforce_gtid_consistency = on 

mysql-cdc同步分库分表的表

mysql cdc 的表名和库名均支持正则配置,比如 ’.tableList("cdc_demo.flink_cdc_.*")’ 可以匹配表名 cdc_demo.flink_cdc_01, cdc_demo.flink_cdc_02,cdc_demo.flink_cdc_a表.

注意正则匹配任意字符是’.’ 而不是 ‘*’, 其中点号表示任意字符,星号表示0个或多个,databaseList也如此。

MySqlSource<String> mySqlSource = MySqlSource.<String>builder().hostname("localhost").port(3306).username("flinkcdc").password("123456")// 表所在的数据库库名,如果需要捕获整个库的表,配置为:".*".;如果需要捕获多个数据库配置为:.databaseList("cdc01", "cdc02").databaseList("cdc_.*")// 设置需要捕获日志的表名,注意需要配置库名,大小敏感.tableList("cdc_demo.flink_cdc_.*").serverTimeZone("Asia/Shanghai")// 将 SourceRecord 转换为 JSON 字符串。.deserializer(new JsonDebeziumDeserializationSchema()).build();

 自定义类型转器

3.3.1 实现类型转换类

package com.toroidal.utils;import io.debezium.spi.converter.CustomConverter;
import io.debezium.spi.converter.RelationalColumn;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.sql.Date;
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.Properties;public class DebeziumConverter implements CustomConverter<SchemaBuilder, RelationalColumn> {private static final Logger log = LoggerFactory.getLogger(DebeziumConverter.class);private static final String DATE_FORMAT = "yyyy-MM-dd";private static final String TIME_FORMAT = "HH:mm:ss";private static final String DATETIME_FORMAT = "yyyy-MM-dd HH:mm:ss";private DateTimeFormatter dateFormatter = DateTimeFormatter.ISO_DATE;private DateTimeFormatter timeFormatter;private DateTimeFormatter datetimeFormatter;private SchemaBuilder schemaBuilder;private String databaseType;private String schemaNamePrefix;// 获取默认时区private final ZoneId zoneId = ZoneOffset.systemDefault();@Overridepublic void configure(Properties properties) {// 必填参数:database.type。获取数据库的类型,暂时支持mysql、sqlserverthis.databaseType = properties.getProperty("database.type");// 如果未设置,或者设置的不是mysql、sqlserver,则抛出异常。if (this.databaseType == null || (!this.databaseType.equals("mysql") && !this.databaseType.equals("oracle") && !this.databaseType.equals("sqlserver"))) {throw new IllegalArgumentException("database.type 必须设置为: mysql、sqlserver或oracle");}// 选填参数:format.date、format.time、format.datetime。获取时间格式化的格式String dateFormat = properties.getProperty("format.date", DATE_FORMAT);String timeFormat = properties.getProperty("format.time", TIME_FORMAT);String datetimeFormat = properties.getProperty("format.datetime", DATETIME_FORMAT);// 获取自身类的包名+数据库类型为默认schema.nameString className = this.getClass().getName();// 查看是否设置schema.name.prefixthis.schemaNamePrefix = properties.getProperty("schema.name.prefix", className + "." + this.databaseType);// 初始化时间格式化器dateFormatter = DateTimeFormatter.ofPattern(dateFormat);timeFormatter = DateTimeFormatter.ofPattern(timeFormat);datetimeFormatter = DateTimeFormatter.ofPattern(datetimeFormat);}// mysql的转换器public void registerMysqlConverter(String columnType, ConverterRegistration<SchemaBuilder> converterRegistration) {String schemaName = this.schemaNamePrefix + "." + columnType.toLowerCase();schemaBuilder = SchemaBuilder.string().name(schemaName);switch (columnType) {case "DATE":converterRegistration.register(schemaBuilder, value -> {if (value == null) {return null;} else if (value instanceof LocalDate) {return dateFormatter.format((LocalDate) value);} else if (value instanceof Integer) {LocalDate date = LocalDate.ofEpochDay((Integer) value);return dateFormatter.format(date);} else {return String.valueOf(value);}});break;case "TIME":converterRegistration.register(schemaBuilder, value -> {if (value == null) {return null;} else if (value instanceof java.time.Duration) {return timeFormatter.format(java.time.LocalTime.ofNanoOfDay(((java.time.Duration) value).toNanos()));} else if (value instanceof Integer) {LocalDate date = LocalDate.ofEpochDay((Integer) value);return dateFormatter.format(date);} else {return String.valueOf(value);}});break;case "DATETIME":case "TIMESTAMP":converterRegistration.register(schemaBuilder, value -> {if (value == null) {return null;} else if (value instanceof java.time.LocalDateTime) {return datetimeFormatter.format((java.time.LocalDateTime) value);} else if (value instanceof java.time.ZonedDateTime) {// 获取系统默认时区
//                        ZoneOffset zoneOffset = java.time.ZoneId.systemDefault().getRules().getOffset(java.time.Instant.now());
//                        return datetimeFormatter.format(((java.time.ZonedDateTime) value).withZoneSameInstant(zoneOffset).toLocalDateTime());return datetimeFormatter.format(((java.time.ZonedDateTime) value).withZoneSameInstant(zoneId).toLocalDateTime());} else if (value instanceof java.sql.Timestamp) {return datetimeFormatter.format(((java.sql.Timestamp) value).toLocalDateTime());} else if (value instanceof String) {// 初始化出现1970-01-01T00:00:00Zd的值,需要转换Instant instant = Instant.parse((String) value);java.time.LocalDateTime dateTime = java.time.LocalDateTime.ofInstant(instant, ZoneOffset.UTC);return datetimeFormatter.format(dateTime);} else {return String.valueOf(value);}});break;default:schemaBuilder = null;break;}}// oracle的转换器public void registerSqlserverConverter(String columnType, ConverterRegistration<SchemaBuilder> converterRegistration) {String schemaName = this.schemaNamePrefix + "." + columnType.toLowerCase();schemaBuilder = SchemaBuilder.string().name(schemaName);switch (columnType) {case "DATE":converterRegistration.register(schemaBuilder, value -> {System.out.println("120 value: " + value + " columnType: " + columnType);if (value == null) {return null;} else if (value instanceof Date) {return dateFormatter.format(((Date) value).toLocalDate());} else if (value instanceof Integer) {LocalDate date = LocalDate.ofEpochDay((Integer) value);return dateFormatter.format(date);} else {return String.valueOf(value).replace("TO_DATE('", "").replace("', 'YYYY-MM-DD HH24:MI:SS')", "");}});break;case "TIMESTAMP":case "TIMESTAMP(3)":case "TIMESTAMP(6)":case "TIMESTAMP(9)":converterRegistration.register(schemaBuilder, value -> {System.out.println("137 value: " + value + " columnType: " + columnType);if (value == null) {return null;} else if (value instanceof java.sql.Timestamp) {return timeFormatter.format(((java.sql.Timestamp) value).toLocalDateTime().toLocalTime());} else {return String.valueOf(value).replace("TO_TIMESTAMP('", "").replace("')", "");}});break;default:schemaBuilder = null;break;}}@Overridepublic void converterFor(RelationalColumn relationalColumn, ConverterRegistration<SchemaBuilder> converterRegistration) {// 获取字段类型String columnType = relationalColumn.typeName().toUpperCase();log.info("数据库:{},字段名称:{},字段类型:{},jdbc type :{}", this.databaseType, relationalColumn.name(), columnType, relationalColumn.jdbcType());// 根据数据库类型调用不同的转换器if (this.databaseType.equals("mysql")) {this.registerMysqlConverter(columnType, converterRegistration);} else if (this.databaseType.equals("oracle")) {this.registerSqlserverConverter(columnType, converterRegistration);} else {log.warn("===failed 不支持的数据库类型: {}", this.databaseType);schemaBuilder = null;}}private String getClassName(Object value) {if (value == null) {return null;}return value.getClass().getName();}
}

3.3.2 配置自定义类型转换器

Properties prop = new Properties();prop.setProperty("converters", "dateConverters");prop.setProperty("dateConverters.type", "com.toroidal.utils.DebeziumConverter");prop.setProperty("dateConverters.database.type", "mysql");MySqlSource<String> mySqlSource = MySqlSource.<String>builder().hostname("localhost").port(3306)// 表所在的数据库库名,如果需要捕获整个库的表,配置为:".*".;如果需要捕获多个数据库配置为:.databaseList("cdc01", "cdc02").databaseList("cdc")// 设置需要捕获日志的表名,注意需要配置库名,大小敏感.tableList("cdc.flink_cdc_test").username("flinkcdc").serverTimeZone("Asia/Shanghai").serverId("flink-cdc-01").password("123456").debeziumProperties(prop)// 将 SourceRecord 转换为 JSON 字符串。.deserializer(new JsonDebeziumDeserializationSchema()).build();

相关文章:

flink-connector-mysql-cdc:02 mysql-cdc高级扩展

flink-connector-mysql-cdc&#xff1a;01 mysql-cdc基础配置代码演示02 mysql-cdc高级扩展03 mysql-cdc常见问题汇总04 mysql-cdc-kafka生产级代码分享05 flink-kafka-doris生产级代码分享06 flink-kafka-hudi生产级代码分享 flink-cdc版本&#xff1a;3.2.0flink版本&#xf…...

Couchbase 简介

Couchbase 是一款分布式 NoSQL 数据库&#xff0c;主要用于现代应用程序中高性能、高可扩展性和灵活的数据存储需求。它结合了文档存储和键值存储的特性&#xff0c;为开发者提供了一种高效的数据库解决方案。 Couchbase 的特点 高性能&#xff1a; 支持内存优先的架构&#x…...

我们来学mysql -- 事务并发之幻读(原理篇)

事务并发之幻读 题记幻读系列文章 题记 在《事务之概念》提到事务对应现实世界的状态转换&#xff0c;这个过程要满足4个特性这世界&#xff0c;真理只在大炮射程之类&#xff0c;通往和平的道路&#xff0c;非“常人”可以驾驭一个人生活按部就班&#xff0c;人多起来&#x…...

Ubuntu Linux 图形界面工具管理磁盘分区和文件系统(八)

本文为Ubuntu Linux操作系统- 第八弹~~ 今天接着上文的内容&#xff0c;讲Linux磁盘分区存储的相关知识~ 上期回顾&#xff1a;命令行-管理磁盘分区和文件系统 今天看酷酷的雪獒铠甲&#xff01;&#xff01;雪獒铠甲合体~ 文章目录 磁盘管理器GNOME Disks主要功能安装命令 磁盘…...

Eclipse IDE 各个版本的用途和区别

Eclipse官方下载地址:https://www.eclipse.org/downloads/packages/ 会出现很多个Eclipse版本,初学者可能会感觉到很迷惑,不知道下载哪个版本。 Eclipse IDE for Enterprise Java and Web Developers (544 MB) 专为 Java 和 Web 应用开发者设计 包含 Java IDE、JavaScript、…...

国产GPU中,VLLM0.5.0发布Qwen2.5-14B-Instruct-GPTQ-Int8模型,请求返回结果乱码

概述 国产GPU: DCU Z100 推理框架&#xff1a; vllm0.5.0 docker容器化部署 运行如下代码&#xff1a; python -m vllm.entrypoints.openai.api_server --model /app/models/Qwen2.5-14B-Instruct-GPTQ-Int8 --served-model-name qwen-gptq --trust-remote-code --enforce…...

在 Vue 3 中实现点击按钮后禁止浏览器前进或后退

在 Vue 3 中实现点击按钮后禁止浏览器前进或后退&#xff0c;我们可以通过 ref 和 watch 来管理状态&#xff0c;同时使用 onBeforeUnmount 来清理事件监听。 使用 Vue 3 实现&#xff1a; <template><div><button click"disableNavigation">点击…...

Linux:软硬链接

目录 一、概念 软链接 硬链接 二、原理 硬链接 软链接 三、使用场景 硬链接 软链接 一、概念 软链接 在当前目录下&#xff0c;有一个普通文件a.txt。 ln -s a.txt a_soft.link结论&#xff1a; 软链接是一个文件。 观察inode_id&#xff0c;发现软链接有着独立…...

Delphi XE 安卓Web开发 错误:net::ERR_CLEARTEXT_NOT_PERMITTED

解决方法&#xff1a; 1、确保已经申明权限&#xff08;AndroidManifest.xml 文件&#xff09; 1 <uses-permission android:name"android.permission.INTERNET" /> 2、开启 usesCleartextTraffic 1 2 <application android:usesCleartextTraffic&qu…...

深入理解malloc与vector:内存管理的对比

引言‌ 在编程中&#xff0c;内存管理是一个至关重要的环节。无论是C语言中的malloc函数&#xff0c;还是C标准库中的vector容器&#xff0c;它们都在内存分配和释放上扮演着关键角色。然而&#xff0c;它们的设计理念和用法有着显著的不同。本文将深入探讨malloc和vector的区…...

多个输入框联合搜索

如果你有多个输入框&#xff0c;并希望进行联合精准搜索&#xff0c;可以通过组合多个输入框的值来过滤数据。在JavaScript中&#xff0c;常见的做法是先收集每个输入框的值&#xff0c;然后使用这些值过滤数据。 示例&#xff1a;多个输入框联合精准搜索 假设有多个输入框用…...

笔记03----NeurIPS2024 涨点!SSA:用于语义分割的语义和空间自适应像素级分类器(即插即用)

前言 文章标题&#xff1a;《SSA-Seg: Semantic and Spatial Adaptive Pixel-level Classiffer for Semantic Segmentation》 助力语义分割涨点!SSA:一种新颖的语义和空间自适应分类器&#xff0c;显著提高了基线模型的分割性能&#xff0c;比如SegNeXt、OCRNet和UperNet等模型…...

自定义比较函数 down 作为 sort 函数的参数实现数组元素从大到小排序

【自定义比较函数 down 作为 sort 函数的参数实现数组元素从大到小排序】 #include <bits/stdc.h> using namespace std;const int maxn1e35; int a[maxn];bool down(int u,int v) {return u>v; }int main() {int n;cin>>n;for(int i0; i<n; i) cin>>…...

在 Spring Boot 中使用 JPA(Java Persistence API)进行数据库操作

步骤 1: 添加依赖 在 pom.xml 文件中添加相关依赖&#xff1a; <dependencies><!-- Spring Boot Starter Web --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId><…...

简单聊聊PLT和GOT

在 Linux 的动态链接中&#xff0c;PLT&#xff08;Procedure Linkage Table&#xff09; 和 GOT&#xff08;Global Offset Table&#xff09; 是动态链接机制中的两个关键组件&#xff0c;它们一起支持程序动态加载共享库以及在运行时解析符号地址。下面是它们的作用和原理&a…...

FaRM译文

No compromises: distributed transactions with consistency, availability, and performance Aleksandar Dragojevic, Dushyanth Narayanan, Edmund B. Nightingale, Matthew Renzelmann, Alex Shamis, Anirudh Badam, Miguel Castro Microsoft Research 目录 摘要 1. 引…...

用vue框架写一个时钟的页面

你可以使用Vue框架来创建一个简单的时钟页面。首先&#xff0c;你需要在HTML文件中引入Vue框架的CDN&#xff1a; <script src"https://cdn.jsdelivr.net/npm/vue"></script>然后&#xff0c;创建一个包含时钟功能的Vue实例&#xff1a; <div id&qu…...

HTML表单-第二部分

HTML表单 表单元素是允许用户在表单中输入内容&#xff0c;比如&#xff1a;文本域&#xff0c;下拉列表&#xff0c;单选框&#xff0c;复选框等等‘ 使用<from>标签创建 例如 <from> . input . </from> HTML表单-输入元素 <input>标签创建&#xff…...

PyQt5:一个逗号引发的闪退血案

【日常小计】 在开发PyQt5程序时&#xff0c;调用了一个写入excel表格的后端方法&#xff0c;但是每次打开页面点击对应的动作&#xff0c;窗口就会闪退&#xff0c;而且Python后台也没有提示出任何的异常堆栈&#xff0c;后来经过在后端一点一点的单点测试&#xff0c;终于发…...

AI智能体Prompt预设词指令大全+GPTs应用使用

AI智能体使用指南 直接复制在AI工具助手中使用&#xff08;提问前&#xff09; 可前往SparkAi系统用户官网进行直接使用 SparkAI系统介绍文档&#xff1a;Docs 常见AI智能体GPTs应用大全在线使用 自定义添加制作AI智能体进行使用&#xff1a; 文章润色器 你是一位具有敏锐洞察…...

揭秘新篇!AI应用架构师的数据安全服务AI防护新思路

揭秘新篇&#xff01;AI应用架构师的数据安全服务AI防护新思路 一、引言&#xff1a;AI时代的数据安全困局 当我们谈论AI应用时&#xff0c;数据是一切的核心——它是模型训练的“燃料”&#xff0c;是推理决策的“依据”&#xff0c;更是企业的核心资产。但随着AI技术的普及&a…...

Video2X:用AI突破视频质量瓶颈的全栈解决方案

Video2X&#xff1a;用AI突破视频质量瓶颈的全栈解决方案 【免费下载链接】video2x A lossless video/GIF/image upscaler achieved with waifu2x, Anime4K, SRMD and RealSR. Started in Hack the Valley II, 2018. 项目地址: https://gitcode.com/GitHub_Trending/vi/video…...

效率提升神器:快马AI自动生成安装脚本,告别重复配置工作

效率提升神器&#xff1a;快马AI自动生成安装脚本&#xff0c;告别重复配置工作 每次给团队批量安装正版软件时&#xff0c;最头疼的就是重复配置。记得上个月部署开发环境&#xff0c;光是手动点下一步、选路径、勾选组件就花了整整一上午&#xff0c;还因为手滑选错选项导致…...

实战应用:从零到一,使用快马构建资料更新内容管理系统的完整案例

实战应用&#xff1a;从零到一&#xff0c;使用快马构建资料更新内容管理系统的完整案例 最近接手了一个资料大全的版本更新管理需求&#xff0c;需要搭建一个简单高效的内容管理系统。经过一番摸索&#xff0c;我发现用InsCode(快马)平台可以快速实现这个功能&#xff0c;整个…...

3大实战技巧:专业级Python通达信数据接口深度应用指南

3大实战技巧&#xff1a;专业级Python通达信数据接口深度应用指南 【免费下载链接】mootdx 通达信数据读取的一个简便使用封装 项目地址: https://gitcode.com/GitHub_Trending/mo/mootdx 在量化投资和金融数据分析领域&#xff0c;获取稳定、全面且经济的数据是开展工作…...

IE浏览器已成过去式?Win10用户必看的IE性能优化与安全设置

IE浏览器性能优化与安全设置指南&#xff1a;告别卡顿与劫持困扰 微软宣布放弃IE浏览器已经过去多年&#xff0c;但这款"古董级"浏览器依然顽固地存在于我们的Windows系统中。对于许多企业用户和特定行业从业者来说&#xff0c;完全卸载IE并非可行选项——某些老旧的…...

告别collect2.exe和ld报错:VSCode C语言环境从配置到避坑的完整指南

从零构建VSCode C语言开发环境&#xff1a;编译错误诊断与高效配置指南 当你在VSCode中按下F5期待看到第一个"C语言Hello World"程序运行时&#xff0c;却迎面撞上"undefined reference to WinMain"和"collect2.exe: error: ld returned 1 exit statu…...

计算机毕设 java 基于 Javaweb 的家教管理系统 智能家教匹配管理系统 家教服务综合平台

计算机毕设 java 基于 Javaweb 的家教管理系统 f7xm39&#xff08;配套有源码 程序 mysql 数据库 论文&#xff09;本套源码可以先看具体功能演示视频领取&#xff0c;文末有联 xi 可分享随着家庭教育需求的不断增长&#xff0c;家教市场规模持续扩大&#xff0c;但传统家教模式…...

Go Routine 调度器任务分配策略

Go语言凭借其轻量级线程——Goroutine和高性能调度器&#xff0c;成为高并发编程的热门选择。Goroutine调度器的任务分配策略直接影响程序性能&#xff0c;其核心在于如何高效利用CPU资源&#xff0c;平衡负载并减少上下文切换开销。本文将深入解析调度器的核心机制&#xff0c…...

从字节码到机器码的终极跨越,Python AOT编译面试核心链路全解析,含LLVM IR生成、符号剥离与冷启动优化

第一章&#xff1a;Python 原生 AOT 编译方案 2026 面试题汇总Python 原生 AOT&#xff08;Ahead-of-Time&#xff09;编译在 2026 年已进入工程落地深水区&#xff0c;CPython 官方 3.14 版本正式集成 pyc-compile --aot 工具链&#xff0c;同时第三方方案如 nuitka 15.x、cod…...