当前位置: 首页 > news >正文

关于flinkCDC监控mysql binlog时,datetime类型自动转换成时间戳类型问题

flinkCDC监控mysql binlog时,datetime类型自动转换成时间戳类型

  • 问题
  • 解决
    • 1.自定义转换器类
    • 2.代码引用
  • 结果

问题

flink版本:1.18.1,mysql版本:8.0.40
使用FlinkCDC的MySqlSource 连接mysql,对于datetime 类型字段,Flink CDC 会自动将 datetime 类型的字段转换为时间戳(BIGINT 类型)。如:2020-10-21 18:49:12 变成 1603306152000
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

解决

1.自定义转换器类

创建 MyDateToStringConverter 类 实现 CustomConverter 接口,并重写


import io.debezium.spi.converter.CustomConverter;
import io.debezium.spi.converter.RelationalColumn;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.sql.Timestamp;
import java.time.*;
import java.time.format.DateTimeFormatter;
import java.util.Properties;
import java.util.function.Consumer;/*** 日期时间类型转换成字符串*/
public class MyDateToStringConverter implements CustomConverter<SchemaBuilder, RelationalColumn> {private static final Logger log = LoggerFactory.getLogger(MyDateToStringConverter.class);private DateTimeFormatter dateFormatter = DateTimeFormatter.ISO_DATE;private DateTimeFormatter timeFormatter = DateTimeFormatter.ISO_TIME;private DateTimeFormatter datetimeFormatter = DateTimeFormatter.ISO_DATE_TIME;private DateTimeFormatter timestampFormatter = DateTimeFormatter.ISO_DATE_TIME;private ZoneId timestampZoneId = ZoneId.systemDefault();public static final String CONVERTERS = "converters";public static final String DATE = "date";public static final String DATE_TYPE = "date.type";public static final String DATE_FORMAT_DATE = "date.format.date";public static final String DATE_FORMAT_DATETIME = "date.format.datetime";public static final String DATE_FORMAT_TIMESTAMP = "date.format.timestamp";public static final String DATE_FORMAT_TIMESTAMP_ZONE = "date.format.timestamp.zone";public static final String YEAR_MONTH_DAY_FORMAT = "yyyy-MM-dd";public static final String DATE_TIME_FORMAT = "yyyy-MM-dd HH:mm:ss";public static final String DATETIME_MICRO_FORMAT = "yyyy-MM-dd HH:mm:ss.SSSSSS";public static final String TIME_ZONE_SHANGHAI = "Asia/Shanghai";public static final String TIME_ZONE_UTC_8 = "UTC+8";public static final String FORMAT_DATE = "format.date";public static final String FORMAT_TIME = "format.time";public static final String FORMAT_DATETIME = "format.datetime";public static final String FORMAT_TIMESTAMP = "format.timestamp";public static final String FORMAT_TIMESTAMP_ZONE = "format.timestamp.zone";public static final String UPPERCASE_DATE = "DATE";public static final String TIME = "TIME";public static final String DATETIME = "DATETIME";public static final String TIMESTAMP = "TIMESTAMP";public static final String SMALLDATETIME = "SMALLDATETIME";public static final String DATETIME2 = "DATETIME2";public static final Properties DEFAULT_PROPS = new Properties();static {DEFAULT_PROPS.setProperty(CONVERTERS, DATE);DEFAULT_PROPS.setProperty(DATE_TYPE, "com.flink.test.MyDateToStringConverter"); // 需要设置本类的全类名引用,具体根据自己的类设置DEFAULT_PROPS.setProperty(DATE_FORMAT_DATE, YEAR_MONTH_DAY_FORMAT);DEFAULT_PROPS.setProperty(DATE_FORMAT_DATETIME, DATE_TIME_FORMAT);DEFAULT_PROPS.setProperty(DATE_FORMAT_TIMESTAMP, DATE_TIME_FORMAT);DEFAULT_PROPS.setProperty(DATE_FORMAT_TIMESTAMP_ZONE, TIME_ZONE_UTC_8);}@Overridepublic void configure(Properties props) {readProps(props, FORMAT_DATE, p -> dateFormatter = DateTimeFormatter.ofPattern(p));readProps(props, FORMAT_TIME, p -> timeFormatter = DateTimeFormatter.ofPattern(p));readProps(props, FORMAT_DATETIME, p -> datetimeFormatter = DateTimeFormatter.ofPattern(p));readProps(props, FORMAT_TIMESTAMP, p -> timestampFormatter = DateTimeFormatter.ofPattern(p));readProps(props, FORMAT_TIMESTAMP_ZONE, z -> timestampZoneId = ZoneId.of(z));}private void readProps(Properties properties, String settingKey, Consumer<String> callback) {String settingValue = (String) properties.get(settingKey);if (settingValue == null || settingValue.length() == 0) {return;}try {callback.accept(settingValue.trim());} catch (IllegalArgumentException | DateTimeException e) {log.error("setting {} is illegal:{}", settingKey, settingValue);throw e;}}@Overridepublic void converterFor(RelationalColumn column, ConverterRegistration<SchemaBuilder> registration) {String sqlType = column.typeName().toUpperCase();SchemaBuilder schemaBuilder = null;Converter converter = null;if (UPPERCASE_DATE.equals(sqlType)) {schemaBuilder = SchemaBuilder.string().optional();converter = this::convertDate;}if (TIME.equals(sqlType)) {schemaBuilder = SchemaBuilder.string().optional();converter = this::convertTime;}if (DATETIME.equals(sqlType)) {schemaBuilder = SchemaBuilder.string().optional();converter = this::convertDateTime;}if (TIMESTAMP.equals(sqlType)) {schemaBuilder = SchemaBuilder.string().optional();converter = this::convertTimestamp;}if (schemaBuilder != null) {registration.register(schemaBuilder, converter);}}private String convertDate(Object input) {if (input instanceof LocalDate) {return dateFormatter.format((LocalDate) input);} else if (input instanceof Integer) {LocalDate date = LocalDate.ofEpochDay((Integer) input);return dateFormatter.format(date);}return null;}private String convertTime(Object input) {if (input instanceof Duration) {Duration duration = (Duration) input;long seconds = duration.getSeconds();int nano = duration.getNano();LocalTime time = LocalTime.ofSecondOfDay(seconds).withNano(nano);return timeFormatter.format(time);}return null;}private String convertDateTime(Object input) {if (input instanceof LocalDateTime) {return datetimeFormatter.format((LocalDateTime) input);} else if (input instanceof Timestamp) {return datetimeFormatter.format(((Timestamp) input).toLocalDateTime());}return null;}private String convertTimestamp(Object input) {if (input instanceof ZonedDateTime) {// MySQL中的TIMESTAMP数据类型会被转换为UTC时间进行存储,//而在程序中处理这个zonedDatetime时,它表示的是UTC时间ZonedDateTime zonedDateTime = (ZonedDateTime) input;LocalDateTime localDateTime =zonedDateTime.withZoneSameInstant(timestampZoneId).toLocalDateTime();return timestampFormatter.format(localDateTime);} else if (input instanceof Timestamp) {return timestampFormatter.format(((Timestamp) input).toInstant().atZone(timestampZoneId).toLocalDateTime());}return null;}
}

2.代码引用

//获取Flink执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(2);// 连接mysqlMySqlSource<String> mySqlSource = MySqlSource.<String>builder().hostname("junbo-bigdata01").port(3306).username("root").password("123456").databaseList("gmall").tableList("gmall.activity_info").startupOptions(StartupOptions.initial()).deserializer(new JsonDebeziumDeserializationSchema()).serverTimeZone("Asia/Shanghai").includeSchemaChanges(true)// 日期时间类型转换成字符串 的设置引用.debeziumProperties(MyDateToStringConverter.DEFAULT_PROPS) .build();//读取数据DataStreamSource<String> mysqlDS = env.fromSource(mySqlSource,WatermarkStrategy.noWatermarks(), "mysql-source");dataStreamSource.print();env.execute();

结果

datetime 类型 成功转换成 字符串 类型
在这里插入图片描述

相关文章:

关于flinkCDC监控mysql binlog时,datetime类型自动转换成时间戳类型问题

flinkCDC监控mysql binlog时&#xff0c;datetime类型自动转换成时间戳类型 问题解决1.自定义转换器类2.代码引用 结果 问题 flink版本&#xff1a;1.18.1&#xff0c;mysql版本&#xff1a;8.0.40 使用FlinkCDC的MySqlSource 连接mysql&#xff0c;对于datetime 类型字段&…...

基于Springboot校园失物招领系统【附源码】

基于Springboot校园失物招领系统 效果如下&#xff1a; 系统登陆页面 物品页面 系统首页面 失物招领管理页面 失物认领页面 宣传视频页面 物品挂失留言管理页面 宣传视频类型管理页面 研究背景 在校园环境中&#xff0c;失物招领是一个常见的问题。传统的失物招领方式主要依…...

单片机端口操作和独立引脚操作

单片机端口操作和独立引脚操作 在单片机编程中&#xff0c;控制I/O端口是最基础的操作之一。通过控制端口&#xff0c;我们可以实现对外设&#xff08;如LED、按键、继电器等&#xff09;的控制。在51单片机中&#xff0c;有两种常见的端口操作方式&#xff1a;整体控制&#…...

【Vim Masterclass 笔记03】S03L10 + S03L11:Vim 中的文本删除操作以及 Vim 思维习惯的培养(含 DIY 拓展知识点)

文章目录 Section 3&#xff1a;Vim Essentials&#xff08;Vim 核心知识&#xff09;S03L10 Vim 核心浏览命令同步练习点评课S03L11 Deleting Text and "Thinking in Vim" 文本的删除及 Vim 思维习惯的培养1 删除单个字符2 删除一个单词2.1 推广1&#xff1a;D HJK…...

ARM200~500部署

前提&#xff1a;数据库已经安装好&#xff0c;并且正常运行 1.修改hostname,将里面的AR-A 改为hzx vi /etc/hostname 2.重启网络服务 sudo systemctl restart NetworkManager 3.修改community-admin.service 文件&#xff0c;更改小区名称和IP&#xff0c;并将文件上传到/…...

word中插入zotero引用

1、参考文献末尾没有文献&#xff1f; 在文献条目要显示的地方点击“refresh” 2、参考文献条目没有悬挂缩进&#xff1f; 把“书目”添加到样式库中&#xff0c;修改样式为悬挂缩进1.5字符 3、交叉引用&#xff1f; 宏 新建一个宏 粘贴下面代码 Public Sub ZoteroLinkCita…...

需求上线,为什么要刷缓存?

在需求上线的过程中&#xff0c;刷缓存主要有以下几个重要原因&#xff1a; 一、保证数据的准确性 旧数据残留问题 缓存是为了加快数据访问速度而存储的数据副本。在需求更新后&#xff0c;之前缓存中的数据可能已经不符合新的业务逻辑。例如&#xff0c;一个电商网站修改了商…...

TVS二极管选型【EMC】

TVS器件并联在电路中&#xff0c;当电路正常工作时&#xff0c;他处于截止状态&#xff08;高阻态&#xff09;&#xff0c;不影响线路正常工作&#xff0c;当线路处于异常过压并达到其击穿电压时&#xff0c;他迅速由高阻态变为低阻态&#xff0c;给瞬间电流提供一个低阻抗导通…...

《从入门到精通:蓝桥杯编程大赛知识点全攻略》(一)-递归实现指数型枚举、递归实现排列型枚举

本篇博客将聚焦于通过递归来实现两种经典的枚举方法&#xff1a;指数型枚举和排列型枚举。这两种枚举方式在计算机科学和算法竞赛中都有广泛应用&#xff0c;无论是在解题中&#xff0c;还是在实际工作中都极具价值。 目录 前言 斐波那契数列递归 递归实现指数型枚举 算法思…...

C#对线程同步的应用

什么是线程同步&#xff1f;线程同步的应用场景有哪些&#xff1f;在C#中有哪些线程同步方式&#xff1f;下面对这些问题做一个总结&#xff0c;让大家在面试的时候遇到这些问题能够游刃有余。 线程同步是指在多线程环境下&#xff0c;多个线程同时访问共享资源时&#xff0c;确…...

基于微信小程序的面部动作检测系统

引言 本技术文档旨在详细阐述一个基于微信小程序的面部动作检测系统的技术路线、实现方法及关键技术框架。系统的核心功能包括检测用户的左右转头、眨眼和张嘴动作&#xff0c;并根据检测结果逐步引导用户完成任务。为确保系统的安全性和准确性&#xff0c;特别是防止用户通过…...

Activation Functions

Chapter4&#xff1a;Activation Functions 声明&#xff1a;本篇博客笔记来源于《Neural Networks from scratch in Python》&#xff0c;作者的youtube 其实关于神经网络的入门博主已经写过几篇了&#xff0c;这里就不再赘述&#xff0c;附上链接。 1.一文窥见神经网络 2.神经…...

《Vue3实战教程》37:Vue3生产部署

如果您有疑问&#xff0c;请观看视频教程《Vue3实战教程》 生产部署​ 开发环境 vs. 生产环境​ 在开发过程中&#xff0c;Vue 提供了许多功能来提升开发体验&#xff1a; 对常见错误和隐患的警告对组件 props / 自定义事件的校验响应性调试钩子开发工具集成 然而&#xff…...

Linux:各发行版及其包管理工具

相关阅读 Linuxhttps://blog.csdn.net/weixin_45791458/category_12234591.html?spm1001.2014.3001.5482 Debian 包管理工具&#xff1a;dpkg&#xff08;低级包管理器&#xff09;、apt&#xff08;高级包管理器&#xff0c;建立在dpkg基础上&#xff09;包格式&#xff1a;…...

【计算机网络】课程 作业一 搭建连续覆盖的办公网络

作业一 搭建连续覆盖的办公网络 题目&#xff1a;论述题&#xff08;共1题&#xff0c;100分&#xff09; 充分利用所学习的数据链路层局域网知识&#xff0c;加上物理层的基础知识&#xff0c;请给一个办公场所&#xff08;三层&#xff0c;每层约100平方&#xff09;&#xf…...

C++ 设计模式:单例模式(Singleton Pattern)

链接&#xff1a;C 设计模式 链接&#xff1a;C 设计模式 - 享元模式 单例模式&#xff08;Singleton Pattern&#xff09;是创建型设计模式&#xff0c;它确保一个类只有一个实例&#xff0c;并提供一个全局访问点来访问这个实例。单例模式在需要全局共享资源或控制实例数量的…...

OpenCV调整图像亮度和对比度

【欢迎关注编码小哥&#xff0c;学习更多实用的编程方法和技巧】 1、基本方法---线性变换 // 亮度和对比度调整 cv::Mat adjustBrightnessContrast(const cv::Mat& src, double alpha, int beta) {cv::Mat dst;src.convertTo(dst, -1, alpha, beta);return dst; }// 使用…...

Kafka Offset explorer使用

Kafka集群配置好以后以后运维这边先用工具测试一下&#xff0c;便于rd展开后续的工作&#xff0c;本地调试时一般使用Offset explorer工具进行连接 使用SASL(Simple Authentication and Security Layer)验证方式 使用SCRAM-SHA-256(Salted Challenge Response Authentication…...

二维码文件在线管理系统-收费版

需求背景 如果大家想要在网上管理自己的文件&#xff0c;而且需要生成二维码&#xff0c;下面推荐【草料二维码】&#xff0c;这个系统很好。特别适合那些制造业&#xff0c;实体业的使用手册&#xff0c;你可以生成一个二维码&#xff0c;贴在设备上&#xff0c;然后这个二维码…...

UE4.27 Android环境下获取手机电量

获取电量方法 使用的方法时FAndroidMisc::GetBatteryLevel(); 出现的问题 但是在电脑上编译时发现&#xff0c;会发现编译无法通过。 因为安卓环境下编译时&#xff0c;包含 #include "Android/AndroidPlatformMisc.h" 头文件是可以正常链接的&#xff0c;但在电…...

实测对比:Faster-Whisper不同模型(Tiny到Large-V3)的识别精度与速度,你的电脑该选哪个?

Faster-Whisper模型选型实战指南&#xff1a;从Tiny到Large-V3的精准决策 去年在为一个跨国会议系统做语音转写方案时&#xff0c;我花了整整两周时间反复测试不同规模的Faster-Whisper模型。当客户要求既要实时转写又要高准确率时&#xff0c;我才真正理解模型选型就像在走钢丝…...

终极免费暗黑2存档编辑器:5分钟掌握游戏角色定制与装备管理

终极免费暗黑2存档编辑器&#xff1a;5分钟掌握游戏角色定制与装备管理 【免费下载链接】d2s-editor 项目地址: https://gitcode.com/gh_mirrors/d2/d2s-editor 还在为《暗黑破坏神2》中重复刷装备而烦恼吗&#xff1f;想要快速体验不同角色build却不想花数百小时&…...

Scala 2安全编程终极指南:7个代码审计与漏洞防范实践

Scala 2安全编程终极指南&#xff1a;7个代码审计与漏洞防范实践 【免费下载链接】scala Scala 2 compiler and standard library. Scala 2 bugs at https://github.com/scala/bug; Scala 3 at https://github.com/scala/scala3 项目地址: https://gitcode.com/gh_mirrors/sc…...

终极小说下载神器:一键保存200+网站小说的完整离线阅读方案

终极小说下载神器&#xff1a;一键保存200网站小说的完整离线阅读方案 【免费下载链接】novel-downloader 一个可扩展的通用型小说下载器。 项目地址: https://gitcode.com/gh_mirrors/no/novel-downloader 在数字阅读时代&#xff0c;小说爱好者常常面临一个令人沮丧的…...

3步学会在Windows上安装Android应用:APK Installer完整指南

3步学会在Windows上安装Android应用&#xff1a;APK Installer完整指南 【免费下载链接】APK-Installer An Android Application Installer for Windows 项目地址: https://gitcode.com/GitHub_Trending/ap/APK-Installer APK Installer是一款专为Windows用户设计的Andr…...

2026年5月阿里云集成Hermes Agent/OpenClaw步骤,百炼token Plan配置教程

2026年5月阿里云集成Hermes Agent/OpenClaw步骤&#xff0c;百炼token Plan配置教程。本文面向零基础用户&#xff0c;完整说明在轻量服务器与本地Windows11、macOS、Linux系统中部署OpenClaw&#xff08;Clawdbot&#xff09;的流程&#xff0c;包含环境配置、服务启动、Skill…...

为开源agent框架hermes配置taotoken作为自定义模型供应商

为开源 Agent 框架 Hermes 配置 Taotoken 作为自定义模型供应商 1. 准备工作 在开始配置前&#xff0c;请确保已安装 Hermes Agent 框架并完成基础环境搭建。同时需要准备好 Taotoken 的 API Key&#xff0c;可在 Taotoken 控制台的 API 密钥管理页面创建。模型 ID 可在模型广…...

从Powergui到阻抗曲线:Simulink电力仿真中‘阻抗依频特性测量’功能的保姆级使用指南与结果解读

从Powergui到阻抗曲线&#xff1a;Simulink电力仿真中‘阻抗依频特性测量’功能的保姆级使用指南与结果解读 在电力系统仿真领域&#xff0c;阻抗频率特性分析是评估输电线路动态行为的关键技术。对于300km以上的高压输电线路&#xff0c;传统的集总参数模型已无法准确反映高频…...

拯救你的图表审美:用Matplotlib内置色彩映射(cmap)让散点图瞬间高级

拯救你的图表审美&#xff1a;用Matplotlib内置色彩映射&#xff08;cmap&#xff09;让散点图瞬间高级 在科研论文、商业报告或数据分析项目中&#xff0c;一张精心设计的图表往往比千言万语更能清晰传达信息。然而&#xff0c;许多人在使用Matplotlib绘制散点图时&#xff0c…...

Cursor Free VIP终极指南:一键破解AI编程助手试用限制的完整解决方案

Cursor Free VIP终极指南&#xff1a;一键破解AI编程助手试用限制的完整解决方案 【免费下载链接】cursor-free-vip [Support 0.45]&#xff08;Multi Language 多语言&#xff09;自动注册 Cursor Ai &#xff0c;自动重置机器ID &#xff0c; 免费升级使用Pro 功能: Youve re…...