关于flinkCDC监控mysql binlog时,datetime类型自动转换成时间戳类型问题
flinkCDC监控mysql binlog时,datetime类型自动转换成时间戳类型
- 问题
- 解决
- 1.自定义转换器类
- 2.代码引用
- 结果
问题
flink版本:1.18.1,mysql版本:8.0.40
使用FlinkCDC的MySqlSource 连接mysql,对于datetime 类型字段,Flink CDC 会自动将 datetime 类型的字段转换为时间戳(BIGINT 类型)。如:2020-10-21 18:49:12 变成 1603306152000
解决
1.自定义转换器类
创建 MyDateToStringConverter 类 实现 CustomConverter 接口,并重写
import io.debezium.spi.converter.CustomConverter;
import io.debezium.spi.converter.RelationalColumn;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.sql.Timestamp;
import java.time.*;
import java.time.format.DateTimeFormatter;
import java.util.Properties;
import java.util.function.Consumer;/*** 日期时间类型转换成字符串*/
public class MyDateToStringConverter implements CustomConverter<SchemaBuilder, RelationalColumn> {private static final Logger log = LoggerFactory.getLogger(MyDateToStringConverter.class);private DateTimeFormatter dateFormatter = DateTimeFormatter.ISO_DATE;private DateTimeFormatter timeFormatter = DateTimeFormatter.ISO_TIME;private DateTimeFormatter datetimeFormatter = DateTimeFormatter.ISO_DATE_TIME;private DateTimeFormatter timestampFormatter = DateTimeFormatter.ISO_DATE_TIME;private ZoneId timestampZoneId = ZoneId.systemDefault();public static final String CONVERTERS = "converters";public static final String DATE = "date";public static final String DATE_TYPE = "date.type";public static final String DATE_FORMAT_DATE = "date.format.date";public static final String DATE_FORMAT_DATETIME = "date.format.datetime";public static final String DATE_FORMAT_TIMESTAMP = "date.format.timestamp";public static final String DATE_FORMAT_TIMESTAMP_ZONE = "date.format.timestamp.zone";public static final String YEAR_MONTH_DAY_FORMAT = "yyyy-MM-dd";public static final String DATE_TIME_FORMAT = "yyyy-MM-dd HH:mm:ss";public static final String DATETIME_MICRO_FORMAT = "yyyy-MM-dd HH:mm:ss.SSSSSS";public static final String TIME_ZONE_SHANGHAI = "Asia/Shanghai";public static final String TIME_ZONE_UTC_8 = "UTC+8";public static final String FORMAT_DATE = "format.date";public static final String FORMAT_TIME = "format.time";public static final String FORMAT_DATETIME = "format.datetime";public static final String FORMAT_TIMESTAMP = "format.timestamp";public static final String FORMAT_TIMESTAMP_ZONE = "format.timestamp.zone";public static final String UPPERCASE_DATE = "DATE";public static final String TIME = "TIME";public static final String DATETIME = "DATETIME";public static final String TIMESTAMP = "TIMESTAMP";public static final String SMALLDATETIME = "SMALLDATETIME";public static final String DATETIME2 = "DATETIME2";public static final Properties DEFAULT_PROPS = new Properties();static {DEFAULT_PROPS.setProperty(CONVERTERS, DATE);DEFAULT_PROPS.setProperty(DATE_TYPE, "com.flink.test.MyDateToStringConverter"); // 需要设置本类的全类名引用,具体根据自己的类设置DEFAULT_PROPS.setProperty(DATE_FORMAT_DATE, YEAR_MONTH_DAY_FORMAT);DEFAULT_PROPS.setProperty(DATE_FORMAT_DATETIME, DATE_TIME_FORMAT);DEFAULT_PROPS.setProperty(DATE_FORMAT_TIMESTAMP, DATE_TIME_FORMAT);DEFAULT_PROPS.setProperty(DATE_FORMAT_TIMESTAMP_ZONE, TIME_ZONE_UTC_8);}@Overridepublic void configure(Properties props) {readProps(props, FORMAT_DATE, p -> dateFormatter = DateTimeFormatter.ofPattern(p));readProps(props, FORMAT_TIME, p -> timeFormatter = DateTimeFormatter.ofPattern(p));readProps(props, FORMAT_DATETIME, p -> datetimeFormatter = DateTimeFormatter.ofPattern(p));readProps(props, FORMAT_TIMESTAMP, p -> timestampFormatter = DateTimeFormatter.ofPattern(p));readProps(props, FORMAT_TIMESTAMP_ZONE, z -> timestampZoneId = ZoneId.of(z));}private void readProps(Properties properties, String settingKey, Consumer<String> callback) {String settingValue = (String) properties.get(settingKey);if (settingValue == null || settingValue.length() == 0) {return;}try {callback.accept(settingValue.trim());} catch (IllegalArgumentException | DateTimeException e) {log.error("setting {} is illegal:{}", settingKey, settingValue);throw e;}}@Overridepublic void converterFor(RelationalColumn column, ConverterRegistration<SchemaBuilder> registration) {String sqlType = column.typeName().toUpperCase();SchemaBuilder schemaBuilder = null;Converter converter = null;if (UPPERCASE_DATE.equals(sqlType)) {schemaBuilder = SchemaBuilder.string().optional();converter = this::convertDate;}if (TIME.equals(sqlType)) {schemaBuilder = SchemaBuilder.string().optional();converter = this::convertTime;}if (DATETIME.equals(sqlType)) {schemaBuilder = SchemaBuilder.string().optional();converter = this::convertDateTime;}if (TIMESTAMP.equals(sqlType)) {schemaBuilder = SchemaBuilder.string().optional();converter = this::convertTimestamp;}if (schemaBuilder != null) {registration.register(schemaBuilder, converter);}}private String convertDate(Object input) {if (input instanceof LocalDate) {return dateFormatter.format((LocalDate) input);} else if (input instanceof Integer) {LocalDate date = LocalDate.ofEpochDay((Integer) input);return dateFormatter.format(date);}return null;}private String convertTime(Object input) {if (input instanceof Duration) {Duration duration = (Duration) input;long seconds = duration.getSeconds();int nano = duration.getNano();LocalTime time = LocalTime.ofSecondOfDay(seconds).withNano(nano);return timeFormatter.format(time);}return null;}private String convertDateTime(Object input) {if (input instanceof LocalDateTime) {return datetimeFormatter.format((LocalDateTime) input);} else if (input instanceof Timestamp) {return datetimeFormatter.format(((Timestamp) input).toLocalDateTime());}return null;}private String convertTimestamp(Object input) {if (input instanceof ZonedDateTime) {// MySQL中的TIMESTAMP数据类型会被转换为UTC时间进行存储,//而在程序中处理这个zonedDatetime时,它表示的是UTC时间ZonedDateTime zonedDateTime = (ZonedDateTime) input;LocalDateTime localDateTime =zonedDateTime.withZoneSameInstant(timestampZoneId).toLocalDateTime();return timestampFormatter.format(localDateTime);} else if (input instanceof Timestamp) {return timestampFormatter.format(((Timestamp) input).toInstant().atZone(timestampZoneId).toLocalDateTime());}return null;}
}
2.代码引用
//获取Flink执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(2);// 连接mysqlMySqlSource<String> mySqlSource = MySqlSource.<String>builder().hostname("junbo-bigdata01").port(3306).username("root").password("123456").databaseList("gmall").tableList("gmall.activity_info").startupOptions(StartupOptions.initial()).deserializer(new JsonDebeziumDeserializationSchema()).serverTimeZone("Asia/Shanghai").includeSchemaChanges(true)// 日期时间类型转换成字符串 的设置引用.debeziumProperties(MyDateToStringConverter.DEFAULT_PROPS) .build();//读取数据DataStreamSource<String> mysqlDS = env.fromSource(mySqlSource,WatermarkStrategy.noWatermarks(), "mysql-source");dataStreamSource.print();env.execute();
结果
datetime 类型 成功转换成 字符串 类型
相关文章:

关于flinkCDC监控mysql binlog时,datetime类型自动转换成时间戳类型问题
flinkCDC监控mysql binlog时,datetime类型自动转换成时间戳类型 问题解决1.自定义转换器类2.代码引用 结果 问题 flink版本:1.18.1,mysql版本:8.0.40 使用FlinkCDC的MySqlSource 连接mysql,对于datetime 类型字段&…...

基于Springboot校园失物招领系统【附源码】
基于Springboot校园失物招领系统 效果如下: 系统登陆页面 物品页面 系统首页面 失物招领管理页面 失物认领页面 宣传视频页面 物品挂失留言管理页面 宣传视频类型管理页面 研究背景 在校园环境中,失物招领是一个常见的问题。传统的失物招领方式主要依…...

单片机端口操作和独立引脚操作
单片机端口操作和独立引脚操作 在单片机编程中,控制I/O端口是最基础的操作之一。通过控制端口,我们可以实现对外设(如LED、按键、继电器等)的控制。在51单片机中,有两种常见的端口操作方式:整体控制&#…...

【Vim Masterclass 笔记03】S03L10 + S03L11:Vim 中的文本删除操作以及 Vim 思维习惯的培养(含 DIY 拓展知识点)
文章目录 Section 3:Vim Essentials(Vim 核心知识)S03L10 Vim 核心浏览命令同步练习点评课S03L11 Deleting Text and "Thinking in Vim" 文本的删除及 Vim 思维习惯的培养1 删除单个字符2 删除一个单词2.1 推广1:D HJK…...

ARM200~500部署
前提:数据库已经安装好,并且正常运行 1.修改hostname,将里面的AR-A 改为hzx vi /etc/hostname 2.重启网络服务 sudo systemctl restart NetworkManager 3.修改community-admin.service 文件,更改小区名称和IP,并将文件上传到/…...

word中插入zotero引用
1、参考文献末尾没有文献? 在文献条目要显示的地方点击“refresh” 2、参考文献条目没有悬挂缩进? 把“书目”添加到样式库中,修改样式为悬挂缩进1.5字符 3、交叉引用? 宏 新建一个宏 粘贴下面代码 Public Sub ZoteroLinkCita…...

需求上线,为什么要刷缓存?
在需求上线的过程中,刷缓存主要有以下几个重要原因: 一、保证数据的准确性 旧数据残留问题 缓存是为了加快数据访问速度而存储的数据副本。在需求更新后,之前缓存中的数据可能已经不符合新的业务逻辑。例如,一个电商网站修改了商…...

TVS二极管选型【EMC】
TVS器件并联在电路中,当电路正常工作时,他处于截止状态(高阻态),不影响线路正常工作,当线路处于异常过压并达到其击穿电压时,他迅速由高阻态变为低阻态,给瞬间电流提供一个低阻抗导通…...

《从入门到精通:蓝桥杯编程大赛知识点全攻略》(一)-递归实现指数型枚举、递归实现排列型枚举
本篇博客将聚焦于通过递归来实现两种经典的枚举方法:指数型枚举和排列型枚举。这两种枚举方式在计算机科学和算法竞赛中都有广泛应用,无论是在解题中,还是在实际工作中都极具价值。 目录 前言 斐波那契数列递归 递归实现指数型枚举 算法思…...

C#对线程同步的应用
什么是线程同步?线程同步的应用场景有哪些?在C#中有哪些线程同步方式?下面对这些问题做一个总结,让大家在面试的时候遇到这些问题能够游刃有余。 线程同步是指在多线程环境下,多个线程同时访问共享资源时,确…...

基于微信小程序的面部动作检测系统
引言 本技术文档旨在详细阐述一个基于微信小程序的面部动作检测系统的技术路线、实现方法及关键技术框架。系统的核心功能包括检测用户的左右转头、眨眼和张嘴动作,并根据检测结果逐步引导用户完成任务。为确保系统的安全性和准确性,特别是防止用户通过…...

Activation Functions
Chapter4:Activation Functions 声明:本篇博客笔记来源于《Neural Networks from scratch in Python》,作者的youtube 其实关于神经网络的入门博主已经写过几篇了,这里就不再赘述,附上链接。 1.一文窥见神经网络 2.神经…...

《Vue3实战教程》37:Vue3生产部署
如果您有疑问,请观看视频教程《Vue3实战教程》 生产部署 开发环境 vs. 生产环境 在开发过程中,Vue 提供了许多功能来提升开发体验: 对常见错误和隐患的警告对组件 props / 自定义事件的校验响应性调试钩子开发工具集成 然而ÿ…...

Linux:各发行版及其包管理工具
相关阅读 Linuxhttps://blog.csdn.net/weixin_45791458/category_12234591.html?spm1001.2014.3001.5482 Debian 包管理工具:dpkg(低级包管理器)、apt(高级包管理器,建立在dpkg基础上)包格式:…...

【计算机网络】课程 作业一 搭建连续覆盖的办公网络
作业一 搭建连续覆盖的办公网络 题目:论述题(共1题,100分) 充分利用所学习的数据链路层局域网知识,加上物理层的基础知识,请给一个办公场所(三层,每层约100平方)…...

C++ 设计模式:单例模式(Singleton Pattern)
链接:C 设计模式 链接:C 设计模式 - 享元模式 单例模式(Singleton Pattern)是创建型设计模式,它确保一个类只有一个实例,并提供一个全局访问点来访问这个实例。单例模式在需要全局共享资源或控制实例数量的…...

OpenCV调整图像亮度和对比度
【欢迎关注编码小哥,学习更多实用的编程方法和技巧】 1、基本方法---线性变换 // 亮度和对比度调整 cv::Mat adjustBrightnessContrast(const cv::Mat& src, double alpha, int beta) {cv::Mat dst;src.convertTo(dst, -1, alpha, beta);return dst; }// 使用…...

Kafka Offset explorer使用
Kafka集群配置好以后以后运维这边先用工具测试一下,便于rd展开后续的工作,本地调试时一般使用Offset explorer工具进行连接 使用SASL(Simple Authentication and Security Layer)验证方式 使用SCRAM-SHA-256(Salted Challenge Response Authentication…...

二维码文件在线管理系统-收费版
需求背景 如果大家想要在网上管理自己的文件,而且需要生成二维码,下面推荐【草料二维码】,这个系统很好。特别适合那些制造业,实体业的使用手册,你可以生成一个二维码,贴在设备上,然后这个二维码…...

UE4.27 Android环境下获取手机电量
获取电量方法 使用的方法时FAndroidMisc::GetBatteryLevel(); 出现的问题 但是在电脑上编译时发现,会发现编译无法通过。 因为安卓环境下编译时,包含 #include "Android/AndroidPlatformMisc.h" 头文件是可以正常链接的,但在电…...

vue-i18n报错
1. 开发环境报错Uncaught (in promise) TypeError: ‘set’ on proxy: trap returned falsish for property ‘$t’ legacy需要设置为false const i18n createI18n({legacy: false,// 默认语言locale: lang,// 设置语言环境messages, })2. 打包配置tsc --noEmit时报错&#…...

Docker新手:在tencent云上实现Python服务打包到容器
1 使用docker的原因 一致性和可移植性:Docker 容器可以在任何支持 Docker 的环境中运行,无论是开发者的笔记本电脑、测试服务器还是生产环境。这确保了应用在不同环境中的行为一致,减少了“在我的机器上可以运行”的问题。 隔离性ÿ…...

React基础知识学习
学习React前端框架是一个系统而深入的过程,以下是一份详细的学习指南: 一、React基础知识 React简介 React是一个用于构建用户界面的JavaScript库,由Facebook开发和维护。它强调组件化和声明式编程,使得构建复杂的用户界面变得更…...

ES IK分词器插件
前言 ES中默认了许多分词器,但是对中文的支持并不友好,IK分词器是一个专门为中文文本设计的分词工具,它不是ES的内置组件,而是一个需要单独安装和配置的插件。 Ik分词器的下载安装(Winows 版本) 下载地址:…...

二十三种设计模式-抽象工厂模式
抽象工厂模式(Abstract Factory Pattern)是一种创建型设计模式,它提供了一种方式,用于创建一系列相关或相互依赖的对象,而不需要指定它们具体的类。这种模式主要用于系统需要独立于其产品的创建逻辑时,并且…...

python opencv的orb特征检测(Oriented FAST and Rotated BRIEF)
官方文档:https://docs.opencv.org/4.10.0/d1/d89/tutorial_py_orb.html SIFT/SURF/ORB对比 https://www.bilibili.com/video/BV1Yw411S7hH?spm_id_from333.788.player.switch&vd_source26bb43d70f463acac2b0cce092be2eaa&p80 ORB代码 import numpy a…...

高阶数据结构----布隆过滤器和位图
(一)位图 位图是用来存放某种状态的,因为一个bit上只能存0和1所以一般只有两种状态的情况下适合用位图,所以非常适合判断数据在或者不在,而且位图十分节省空间,很适合于海量数据,且容易存储&…...

VScode使用密钥进行ssh连接服务器方法
如何正常连接ssh的方式可以看我原来那篇文章:Windows上使用VSCode连接远程服务器ssh 1.连接 点击ssh加号,然后关键点在第2步的书写上 2.命令 2的位置写命令: ssh -i "C:\Users\userName\.ssh\id_rsa" usernameIP -p 端口号 端…...

艾体宝产品丨加速开发:Redis 首款 VS Code 扩展上线!
Redis 宣布推出其首款专为 VS Code 设计的 Redis 扩展。这一扩展将 Redis 功能直接整合进您的集成开发环境(IDE),旨在简化您的工作流程,提升工作效率。 我们一直致力于构建强大的开发者生态系统,并在您工作的每一步提…...

应用架构模式
设计模式 设计模式是指根据通用需求来设计解决方案的模板或蓝图,使用设计模式能够更加有效地解决设计过程中的常见问题。设计模式针对不同的问题域有不同的内涵,主要涉及业务、架构、程序设计等问题域,本文主要讨论架构设计模式。 业务设计模…...