关于flinkCDC监控mysql binlog时,datetime类型自动转换成时间戳类型问题
flinkCDC监控mysql binlog时,datetime类型自动转换成时间戳类型
- 问题
- 解决
- 1.自定义转换器类
- 2.代码引用
- 结果
问题
flink版本:1.18.1,mysql版本:8.0.40
使用FlinkCDC的MySqlSource 连接mysql,对于datetime 类型字段,Flink CDC 会自动将 datetime 类型的字段转换为时间戳(BIGINT 类型)。如:2020-10-21 18:49:12 变成 1603306152000



解决
1.自定义转换器类
创建 MyDateToStringConverter 类 实现 CustomConverter 接口,并重写
import io.debezium.spi.converter.CustomConverter;
import io.debezium.spi.converter.RelationalColumn;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.sql.Timestamp;
import java.time.*;
import java.time.format.DateTimeFormatter;
import java.util.Properties;
import java.util.function.Consumer;/*** 日期时间类型转换成字符串*/
public class MyDateToStringConverter implements CustomConverter<SchemaBuilder, RelationalColumn> {private static final Logger log = LoggerFactory.getLogger(MyDateToStringConverter.class);private DateTimeFormatter dateFormatter = DateTimeFormatter.ISO_DATE;private DateTimeFormatter timeFormatter = DateTimeFormatter.ISO_TIME;private DateTimeFormatter datetimeFormatter = DateTimeFormatter.ISO_DATE_TIME;private DateTimeFormatter timestampFormatter = DateTimeFormatter.ISO_DATE_TIME;private ZoneId timestampZoneId = ZoneId.systemDefault();public static final String CONVERTERS = "converters";public static final String DATE = "date";public static final String DATE_TYPE = "date.type";public static final String DATE_FORMAT_DATE = "date.format.date";public static final String DATE_FORMAT_DATETIME = "date.format.datetime";public static final String DATE_FORMAT_TIMESTAMP = "date.format.timestamp";public static final String DATE_FORMAT_TIMESTAMP_ZONE = "date.format.timestamp.zone";public static final String YEAR_MONTH_DAY_FORMAT = "yyyy-MM-dd";public static final String DATE_TIME_FORMAT = "yyyy-MM-dd HH:mm:ss";public static final String DATETIME_MICRO_FORMAT = "yyyy-MM-dd HH:mm:ss.SSSSSS";public static final String TIME_ZONE_SHANGHAI = "Asia/Shanghai";public static final String TIME_ZONE_UTC_8 = "UTC+8";public static final String FORMAT_DATE = "format.date";public static final String FORMAT_TIME = "format.time";public static final String FORMAT_DATETIME = "format.datetime";public static final String FORMAT_TIMESTAMP = "format.timestamp";public static final String FORMAT_TIMESTAMP_ZONE = "format.timestamp.zone";public static final String UPPERCASE_DATE = "DATE";public static final String TIME = "TIME";public static final String DATETIME = "DATETIME";public static final String TIMESTAMP = "TIMESTAMP";public static final String SMALLDATETIME = "SMALLDATETIME";public static final String DATETIME2 = "DATETIME2";public static final Properties DEFAULT_PROPS = new Properties();static {DEFAULT_PROPS.setProperty(CONVERTERS, DATE);DEFAULT_PROPS.setProperty(DATE_TYPE, "com.flink.test.MyDateToStringConverter"); // 需要设置本类的全类名引用,具体根据自己的类设置DEFAULT_PROPS.setProperty(DATE_FORMAT_DATE, YEAR_MONTH_DAY_FORMAT);DEFAULT_PROPS.setProperty(DATE_FORMAT_DATETIME, DATE_TIME_FORMAT);DEFAULT_PROPS.setProperty(DATE_FORMAT_TIMESTAMP, DATE_TIME_FORMAT);DEFAULT_PROPS.setProperty(DATE_FORMAT_TIMESTAMP_ZONE, TIME_ZONE_UTC_8);}@Overridepublic void configure(Properties props) {readProps(props, FORMAT_DATE, p -> dateFormatter = DateTimeFormatter.ofPattern(p));readProps(props, FORMAT_TIME, p -> timeFormatter = DateTimeFormatter.ofPattern(p));readProps(props, FORMAT_DATETIME, p -> datetimeFormatter = DateTimeFormatter.ofPattern(p));readProps(props, FORMAT_TIMESTAMP, p -> timestampFormatter = DateTimeFormatter.ofPattern(p));readProps(props, FORMAT_TIMESTAMP_ZONE, z -> timestampZoneId = ZoneId.of(z));}private void readProps(Properties properties, String settingKey, Consumer<String> callback) {String settingValue = (String) properties.get(settingKey);if (settingValue == null || settingValue.length() == 0) {return;}try {callback.accept(settingValue.trim());} catch (IllegalArgumentException | DateTimeException e) {log.error("setting {} is illegal:{}", settingKey, settingValue);throw e;}}@Overridepublic void converterFor(RelationalColumn column, ConverterRegistration<SchemaBuilder> registration) {String sqlType = column.typeName().toUpperCase();SchemaBuilder schemaBuilder = null;Converter converter = null;if (UPPERCASE_DATE.equals(sqlType)) {schemaBuilder = SchemaBuilder.string().optional();converter = this::convertDate;}if (TIME.equals(sqlType)) {schemaBuilder = SchemaBuilder.string().optional();converter = this::convertTime;}if (DATETIME.equals(sqlType)) {schemaBuilder = SchemaBuilder.string().optional();converter = this::convertDateTime;}if (TIMESTAMP.equals(sqlType)) {schemaBuilder = SchemaBuilder.string().optional();converter = this::convertTimestamp;}if (schemaBuilder != null) {registration.register(schemaBuilder, converter);}}private String convertDate(Object input) {if (input instanceof LocalDate) {return dateFormatter.format((LocalDate) input);} else if (input instanceof Integer) {LocalDate date = LocalDate.ofEpochDay((Integer) input);return dateFormatter.format(date);}return null;}private String convertTime(Object input) {if (input instanceof Duration) {Duration duration = (Duration) input;long seconds = duration.getSeconds();int nano = duration.getNano();LocalTime time = LocalTime.ofSecondOfDay(seconds).withNano(nano);return timeFormatter.format(time);}return null;}private String convertDateTime(Object input) {if (input instanceof LocalDateTime) {return datetimeFormatter.format((LocalDateTime) input);} else if (input instanceof Timestamp) {return datetimeFormatter.format(((Timestamp) input).toLocalDateTime());}return null;}private String convertTimestamp(Object input) {if (input instanceof ZonedDateTime) {// MySQL中的TIMESTAMP数据类型会被转换为UTC时间进行存储,//而在程序中处理这个zonedDatetime时,它表示的是UTC时间ZonedDateTime zonedDateTime = (ZonedDateTime) input;LocalDateTime localDateTime =zonedDateTime.withZoneSameInstant(timestampZoneId).toLocalDateTime();return timestampFormatter.format(localDateTime);} else if (input instanceof Timestamp) {return timestampFormatter.format(((Timestamp) input).toInstant().atZone(timestampZoneId).toLocalDateTime());}return null;}
}
2.代码引用
//获取Flink执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(2);// 连接mysqlMySqlSource<String> mySqlSource = MySqlSource.<String>builder().hostname("junbo-bigdata01").port(3306).username("root").password("123456").databaseList("gmall").tableList("gmall.activity_info").startupOptions(StartupOptions.initial()).deserializer(new JsonDebeziumDeserializationSchema()).serverTimeZone("Asia/Shanghai").includeSchemaChanges(true)// 日期时间类型转换成字符串 的设置引用.debeziumProperties(MyDateToStringConverter.DEFAULT_PROPS) .build();//读取数据DataStreamSource<String> mysqlDS = env.fromSource(mySqlSource,WatermarkStrategy.noWatermarks(), "mysql-source");dataStreamSource.print();env.execute();
结果
datetime 类型 成功转换成 字符串 类型

相关文章:
关于flinkCDC监控mysql binlog时,datetime类型自动转换成时间戳类型问题
flinkCDC监控mysql binlog时,datetime类型自动转换成时间戳类型 问题解决1.自定义转换器类2.代码引用 结果 问题 flink版本:1.18.1,mysql版本:8.0.40 使用FlinkCDC的MySqlSource 连接mysql,对于datetime 类型字段&…...
基于Springboot校园失物招领系统【附源码】
基于Springboot校园失物招领系统 效果如下: 系统登陆页面 物品页面 系统首页面 失物招领管理页面 失物认领页面 宣传视频页面 物品挂失留言管理页面 宣传视频类型管理页面 研究背景 在校园环境中,失物招领是一个常见的问题。传统的失物招领方式主要依…...
单片机端口操作和独立引脚操作
单片机端口操作和独立引脚操作 在单片机编程中,控制I/O端口是最基础的操作之一。通过控制端口,我们可以实现对外设(如LED、按键、继电器等)的控制。在51单片机中,有两种常见的端口操作方式:整体控制&#…...
【Vim Masterclass 笔记03】S03L10 + S03L11:Vim 中的文本删除操作以及 Vim 思维习惯的培养(含 DIY 拓展知识点)
文章目录 Section 3:Vim Essentials(Vim 核心知识)S03L10 Vim 核心浏览命令同步练习点评课S03L11 Deleting Text and "Thinking in Vim" 文本的删除及 Vim 思维习惯的培养1 删除单个字符2 删除一个单词2.1 推广1:D HJK…...
ARM200~500部署
前提:数据库已经安装好,并且正常运行 1.修改hostname,将里面的AR-A 改为hzx vi /etc/hostname 2.重启网络服务 sudo systemctl restart NetworkManager 3.修改community-admin.service 文件,更改小区名称和IP,并将文件上传到/…...
word中插入zotero引用
1、参考文献末尾没有文献? 在文献条目要显示的地方点击“refresh” 2、参考文献条目没有悬挂缩进? 把“书目”添加到样式库中,修改样式为悬挂缩进1.5字符 3、交叉引用? 宏 新建一个宏 粘贴下面代码 Public Sub ZoteroLinkCita…...
需求上线,为什么要刷缓存?
在需求上线的过程中,刷缓存主要有以下几个重要原因: 一、保证数据的准确性 旧数据残留问题 缓存是为了加快数据访问速度而存储的数据副本。在需求更新后,之前缓存中的数据可能已经不符合新的业务逻辑。例如,一个电商网站修改了商…...
TVS二极管选型【EMC】
TVS器件并联在电路中,当电路正常工作时,他处于截止状态(高阻态),不影响线路正常工作,当线路处于异常过压并达到其击穿电压时,他迅速由高阻态变为低阻态,给瞬间电流提供一个低阻抗导通…...
《从入门到精通:蓝桥杯编程大赛知识点全攻略》(一)-递归实现指数型枚举、递归实现排列型枚举
本篇博客将聚焦于通过递归来实现两种经典的枚举方法:指数型枚举和排列型枚举。这两种枚举方式在计算机科学和算法竞赛中都有广泛应用,无论是在解题中,还是在实际工作中都极具价值。 目录 前言 斐波那契数列递归 递归实现指数型枚举 算法思…...
C#对线程同步的应用
什么是线程同步?线程同步的应用场景有哪些?在C#中有哪些线程同步方式?下面对这些问题做一个总结,让大家在面试的时候遇到这些问题能够游刃有余。 线程同步是指在多线程环境下,多个线程同时访问共享资源时,确…...
基于微信小程序的面部动作检测系统
引言 本技术文档旨在详细阐述一个基于微信小程序的面部动作检测系统的技术路线、实现方法及关键技术框架。系统的核心功能包括检测用户的左右转头、眨眼和张嘴动作,并根据检测结果逐步引导用户完成任务。为确保系统的安全性和准确性,特别是防止用户通过…...
Activation Functions
Chapter4:Activation Functions 声明:本篇博客笔记来源于《Neural Networks from scratch in Python》,作者的youtube 其实关于神经网络的入门博主已经写过几篇了,这里就不再赘述,附上链接。 1.一文窥见神经网络 2.神经…...
《Vue3实战教程》37:Vue3生产部署
如果您有疑问,请观看视频教程《Vue3实战教程》 生产部署 开发环境 vs. 生产环境 在开发过程中,Vue 提供了许多功能来提升开发体验: 对常见错误和隐患的警告对组件 props / 自定义事件的校验响应性调试钩子开发工具集成 然而ÿ…...
Linux:各发行版及其包管理工具
相关阅读 Linuxhttps://blog.csdn.net/weixin_45791458/category_12234591.html?spm1001.2014.3001.5482 Debian 包管理工具:dpkg(低级包管理器)、apt(高级包管理器,建立在dpkg基础上)包格式:…...
【计算机网络】课程 作业一 搭建连续覆盖的办公网络
作业一 搭建连续覆盖的办公网络 题目:论述题(共1题,100分) 充分利用所学习的数据链路层局域网知识,加上物理层的基础知识,请给一个办公场所(三层,每层约100平方)…...
C++ 设计模式:单例模式(Singleton Pattern)
链接:C 设计模式 链接:C 设计模式 - 享元模式 单例模式(Singleton Pattern)是创建型设计模式,它确保一个类只有一个实例,并提供一个全局访问点来访问这个实例。单例模式在需要全局共享资源或控制实例数量的…...
OpenCV调整图像亮度和对比度
【欢迎关注编码小哥,学习更多实用的编程方法和技巧】 1、基本方法---线性变换 // 亮度和对比度调整 cv::Mat adjustBrightnessContrast(const cv::Mat& src, double alpha, int beta) {cv::Mat dst;src.convertTo(dst, -1, alpha, beta);return dst; }// 使用…...
Kafka Offset explorer使用
Kafka集群配置好以后以后运维这边先用工具测试一下,便于rd展开后续的工作,本地调试时一般使用Offset explorer工具进行连接 使用SASL(Simple Authentication and Security Layer)验证方式 使用SCRAM-SHA-256(Salted Challenge Response Authentication…...
二维码文件在线管理系统-收费版
需求背景 如果大家想要在网上管理自己的文件,而且需要生成二维码,下面推荐【草料二维码】,这个系统很好。特别适合那些制造业,实体业的使用手册,你可以生成一个二维码,贴在设备上,然后这个二维码…...
UE4.27 Android环境下获取手机电量
获取电量方法 使用的方法时FAndroidMisc::GetBatteryLevel(); 出现的问题 但是在电脑上编译时发现,会发现编译无法通过。 因为安卓环境下编译时,包含 #include "Android/AndroidPlatformMisc.h" 头文件是可以正常链接的,但在电…...
JavaScript 中的 ES|QL:利用 Apache Arrow 工具
作者:来自 Elastic Jeffrey Rengifo 学习如何将 ES|QL 与 JavaScript 的 Apache Arrow 客户端工具一起使用。 想获得 Elastic 认证吗?了解下一期 Elasticsearch Engineer 培训的时间吧! Elasticsearch 拥有众多新功能,助你为自己…...
Cilium动手实验室: 精通之旅---20.Isovalent Enterprise for Cilium: Zero Trust Visibility
Cilium动手实验室: 精通之旅---20.Isovalent Enterprise for Cilium: Zero Trust Visibility 1. 实验室环境1.1 实验室环境1.2 小测试 2. The Endor System2.1 部署应用2.2 检查现有策略 3. Cilium 策略实体3.1 创建 allow-all 网络策略3.2 在 Hubble CLI 中验证网络策略源3.3 …...
dedecms 织梦自定义表单留言增加ajax验证码功能
增加ajax功能模块,用户不点击提交按钮,只要输入框失去焦点,就会提前提示验证码是否正确。 一,模板上增加验证码 <input name"vdcode"id"vdcode" placeholder"请输入验证码" type"text&quo…...
《通信之道——从微积分到 5G》读书总结
第1章 绪 论 1.1 这是一本什么样的书 通信技术,说到底就是数学。 那些最基础、最本质的部分。 1.2 什么是通信 通信 发送方 接收方 承载信息的信号 解调出其中承载的信息 信息在发送方那里被加工成信号(调制) 把信息从信号中抽取出来&am…...
UR 协作机器人「三剑客」:精密轻量担当(UR7e)、全能协作主力(UR12e)、重型任务专家(UR15)
UR协作机器人正以其卓越性能在现代制造业自动化中扮演重要角色。UR7e、UR12e和UR15通过创新技术和精准设计满足了不同行业的多样化需求。其中,UR15以其速度、精度及人工智能准备能力成为自动化领域的重要突破。UR7e和UR12e则在负载规格和市场定位上不断优化…...
Element Plus 表单(el-form)中关于正整数输入的校验规则
目录 1 单个正整数输入1.1 模板1.2 校验规则 2 两个正整数输入(联动)2.1 模板2.2 校验规则2.3 CSS 1 单个正整数输入 1.1 模板 <el-formref"formRef":model"formData":rules"formRules"label-width"150px"…...
CSS设置元素的宽度根据其内容自动调整
width: fit-content 是 CSS 中的一个属性值,用于设置元素的宽度根据其内容自动调整,确保宽度刚好容纳内容而不会超出。 效果对比 默认情况(width: auto): 块级元素(如 <div>)会占满父容器…...
在QWebEngineView上实现鼠标、触摸等事件捕获的解决方案
这个问题我看其他博主也写了,要么要会员、要么写的乱七八糟。这里我整理一下,把问题说清楚并且给出代码,拿去用就行,照着葫芦画瓢。 问题 在继承QWebEngineView后,重写mousePressEvent或event函数无法捕获鼠标按下事…...
Docker 本地安装 mysql 数据库
Docker: Accelerated Container Application Development 下载对应操作系统版本的 docker ;并安装。 基础操作不再赘述。 打开 macOS 终端,开始 docker 安装mysql之旅 第一步 docker search mysql 》〉docker search mysql NAME DE…...
Mysql中select查询语句的执行过程
目录 1、介绍 1.1、组件介绍 1.2、Sql执行顺序 2、执行流程 2.1. 连接与认证 2.2. 查询缓存 2.3. 语法解析(Parser) 2.4、执行sql 1. 预处理(Preprocessor) 2. 查询优化器(Optimizer) 3. 执行器…...
