当前位置: 首页 > news >正文

关于flinkCDC监控mysql binlog时,datetime类型自动转换成时间戳类型问题

flinkCDC监控mysql binlog时,datetime类型自动转换成时间戳类型

  • 问题
  • 解决
    • 1.自定义转换器类
    • 2.代码引用
  • 结果

问题

flink版本:1.18.1,mysql版本:8.0.40
使用FlinkCDC的MySqlSource 连接mysql,对于datetime 类型字段,Flink CDC 会自动将 datetime 类型的字段转换为时间戳(BIGINT 类型)。如:2020-10-21 18:49:12 变成 1603306152000
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

解决

1.自定义转换器类

创建 MyDateToStringConverter 类 实现 CustomConverter 接口,并重写


import io.debezium.spi.converter.CustomConverter;
import io.debezium.spi.converter.RelationalColumn;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.sql.Timestamp;
import java.time.*;
import java.time.format.DateTimeFormatter;
import java.util.Properties;
import java.util.function.Consumer;/*** 日期时间类型转换成字符串*/
public class MyDateToStringConverter implements CustomConverter<SchemaBuilder, RelationalColumn> {private static final Logger log = LoggerFactory.getLogger(MyDateToStringConverter.class);private DateTimeFormatter dateFormatter = DateTimeFormatter.ISO_DATE;private DateTimeFormatter timeFormatter = DateTimeFormatter.ISO_TIME;private DateTimeFormatter datetimeFormatter = DateTimeFormatter.ISO_DATE_TIME;private DateTimeFormatter timestampFormatter = DateTimeFormatter.ISO_DATE_TIME;private ZoneId timestampZoneId = ZoneId.systemDefault();public static final String CONVERTERS = "converters";public static final String DATE = "date";public static final String DATE_TYPE = "date.type";public static final String DATE_FORMAT_DATE = "date.format.date";public static final String DATE_FORMAT_DATETIME = "date.format.datetime";public static final String DATE_FORMAT_TIMESTAMP = "date.format.timestamp";public static final String DATE_FORMAT_TIMESTAMP_ZONE = "date.format.timestamp.zone";public static final String YEAR_MONTH_DAY_FORMAT = "yyyy-MM-dd";public static final String DATE_TIME_FORMAT = "yyyy-MM-dd HH:mm:ss";public static final String DATETIME_MICRO_FORMAT = "yyyy-MM-dd HH:mm:ss.SSSSSS";public static final String TIME_ZONE_SHANGHAI = "Asia/Shanghai";public static final String TIME_ZONE_UTC_8 = "UTC+8";public static final String FORMAT_DATE = "format.date";public static final String FORMAT_TIME = "format.time";public static final String FORMAT_DATETIME = "format.datetime";public static final String FORMAT_TIMESTAMP = "format.timestamp";public static final String FORMAT_TIMESTAMP_ZONE = "format.timestamp.zone";public static final String UPPERCASE_DATE = "DATE";public static final String TIME = "TIME";public static final String DATETIME = "DATETIME";public static final String TIMESTAMP = "TIMESTAMP";public static final String SMALLDATETIME = "SMALLDATETIME";public static final String DATETIME2 = "DATETIME2";public static final Properties DEFAULT_PROPS = new Properties();static {DEFAULT_PROPS.setProperty(CONVERTERS, DATE);DEFAULT_PROPS.setProperty(DATE_TYPE, "com.flink.test.MyDateToStringConverter"); // 需要设置本类的全类名引用,具体根据自己的类设置DEFAULT_PROPS.setProperty(DATE_FORMAT_DATE, YEAR_MONTH_DAY_FORMAT);DEFAULT_PROPS.setProperty(DATE_FORMAT_DATETIME, DATE_TIME_FORMAT);DEFAULT_PROPS.setProperty(DATE_FORMAT_TIMESTAMP, DATE_TIME_FORMAT);DEFAULT_PROPS.setProperty(DATE_FORMAT_TIMESTAMP_ZONE, TIME_ZONE_UTC_8);}@Overridepublic void configure(Properties props) {readProps(props, FORMAT_DATE, p -> dateFormatter = DateTimeFormatter.ofPattern(p));readProps(props, FORMAT_TIME, p -> timeFormatter = DateTimeFormatter.ofPattern(p));readProps(props, FORMAT_DATETIME, p -> datetimeFormatter = DateTimeFormatter.ofPattern(p));readProps(props, FORMAT_TIMESTAMP, p -> timestampFormatter = DateTimeFormatter.ofPattern(p));readProps(props, FORMAT_TIMESTAMP_ZONE, z -> timestampZoneId = ZoneId.of(z));}private void readProps(Properties properties, String settingKey, Consumer<String> callback) {String settingValue = (String) properties.get(settingKey);if (settingValue == null || settingValue.length() == 0) {return;}try {callback.accept(settingValue.trim());} catch (IllegalArgumentException | DateTimeException e) {log.error("setting {} is illegal:{}", settingKey, settingValue);throw e;}}@Overridepublic void converterFor(RelationalColumn column, ConverterRegistration<SchemaBuilder> registration) {String sqlType = column.typeName().toUpperCase();SchemaBuilder schemaBuilder = null;Converter converter = null;if (UPPERCASE_DATE.equals(sqlType)) {schemaBuilder = SchemaBuilder.string().optional();converter = this::convertDate;}if (TIME.equals(sqlType)) {schemaBuilder = SchemaBuilder.string().optional();converter = this::convertTime;}if (DATETIME.equals(sqlType)) {schemaBuilder = SchemaBuilder.string().optional();converter = this::convertDateTime;}if (TIMESTAMP.equals(sqlType)) {schemaBuilder = SchemaBuilder.string().optional();converter = this::convertTimestamp;}if (schemaBuilder != null) {registration.register(schemaBuilder, converter);}}private String convertDate(Object input) {if (input instanceof LocalDate) {return dateFormatter.format((LocalDate) input);} else if (input instanceof Integer) {LocalDate date = LocalDate.ofEpochDay((Integer) input);return dateFormatter.format(date);}return null;}private String convertTime(Object input) {if (input instanceof Duration) {Duration duration = (Duration) input;long seconds = duration.getSeconds();int nano = duration.getNano();LocalTime time = LocalTime.ofSecondOfDay(seconds).withNano(nano);return timeFormatter.format(time);}return null;}private String convertDateTime(Object input) {if (input instanceof LocalDateTime) {return datetimeFormatter.format((LocalDateTime) input);} else if (input instanceof Timestamp) {return datetimeFormatter.format(((Timestamp) input).toLocalDateTime());}return null;}private String convertTimestamp(Object input) {if (input instanceof ZonedDateTime) {// MySQL中的TIMESTAMP数据类型会被转换为UTC时间进行存储,//而在程序中处理这个zonedDatetime时,它表示的是UTC时间ZonedDateTime zonedDateTime = (ZonedDateTime) input;LocalDateTime localDateTime =zonedDateTime.withZoneSameInstant(timestampZoneId).toLocalDateTime();return timestampFormatter.format(localDateTime);} else if (input instanceof Timestamp) {return timestampFormatter.format(((Timestamp) input).toInstant().atZone(timestampZoneId).toLocalDateTime());}return null;}
}

2.代码引用

//获取Flink执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(2);// 连接mysqlMySqlSource<String> mySqlSource = MySqlSource.<String>builder().hostname("junbo-bigdata01").port(3306).username("root").password("123456").databaseList("gmall").tableList("gmall.activity_info").startupOptions(StartupOptions.initial()).deserializer(new JsonDebeziumDeserializationSchema()).serverTimeZone("Asia/Shanghai").includeSchemaChanges(true)// 日期时间类型转换成字符串 的设置引用.debeziumProperties(MyDateToStringConverter.DEFAULT_PROPS) .build();//读取数据DataStreamSource<String> mysqlDS = env.fromSource(mySqlSource,WatermarkStrategy.noWatermarks(), "mysql-source");dataStreamSource.print();env.execute();

结果

datetime 类型 成功转换成 字符串 类型
在这里插入图片描述

相关文章:

关于flinkCDC监控mysql binlog时,datetime类型自动转换成时间戳类型问题

flinkCDC监控mysql binlog时&#xff0c;datetime类型自动转换成时间戳类型 问题解决1.自定义转换器类2.代码引用 结果 问题 flink版本&#xff1a;1.18.1&#xff0c;mysql版本&#xff1a;8.0.40 使用FlinkCDC的MySqlSource 连接mysql&#xff0c;对于datetime 类型字段&…...

基于Springboot校园失物招领系统【附源码】

基于Springboot校园失物招领系统 效果如下&#xff1a; 系统登陆页面 物品页面 系统首页面 失物招领管理页面 失物认领页面 宣传视频页面 物品挂失留言管理页面 宣传视频类型管理页面 研究背景 在校园环境中&#xff0c;失物招领是一个常见的问题。传统的失物招领方式主要依…...

单片机端口操作和独立引脚操作

单片机端口操作和独立引脚操作 在单片机编程中&#xff0c;控制I/O端口是最基础的操作之一。通过控制端口&#xff0c;我们可以实现对外设&#xff08;如LED、按键、继电器等&#xff09;的控制。在51单片机中&#xff0c;有两种常见的端口操作方式&#xff1a;整体控制&#…...

【Vim Masterclass 笔记03】S03L10 + S03L11:Vim 中的文本删除操作以及 Vim 思维习惯的培养(含 DIY 拓展知识点)

文章目录 Section 3&#xff1a;Vim Essentials&#xff08;Vim 核心知识&#xff09;S03L10 Vim 核心浏览命令同步练习点评课S03L11 Deleting Text and "Thinking in Vim" 文本的删除及 Vim 思维习惯的培养1 删除单个字符2 删除一个单词2.1 推广1&#xff1a;D HJK…...

ARM200~500部署

前提&#xff1a;数据库已经安装好&#xff0c;并且正常运行 1.修改hostname,将里面的AR-A 改为hzx vi /etc/hostname 2.重启网络服务 sudo systemctl restart NetworkManager 3.修改community-admin.service 文件&#xff0c;更改小区名称和IP&#xff0c;并将文件上传到/…...

word中插入zotero引用

1、参考文献末尾没有文献&#xff1f; 在文献条目要显示的地方点击“refresh” 2、参考文献条目没有悬挂缩进&#xff1f; 把“书目”添加到样式库中&#xff0c;修改样式为悬挂缩进1.5字符 3、交叉引用&#xff1f; 宏 新建一个宏 粘贴下面代码 Public Sub ZoteroLinkCita…...

需求上线,为什么要刷缓存?

在需求上线的过程中&#xff0c;刷缓存主要有以下几个重要原因&#xff1a; 一、保证数据的准确性 旧数据残留问题 缓存是为了加快数据访问速度而存储的数据副本。在需求更新后&#xff0c;之前缓存中的数据可能已经不符合新的业务逻辑。例如&#xff0c;一个电商网站修改了商…...

TVS二极管选型【EMC】

TVS器件并联在电路中&#xff0c;当电路正常工作时&#xff0c;他处于截止状态&#xff08;高阻态&#xff09;&#xff0c;不影响线路正常工作&#xff0c;当线路处于异常过压并达到其击穿电压时&#xff0c;他迅速由高阻态变为低阻态&#xff0c;给瞬间电流提供一个低阻抗导通…...

《从入门到精通:蓝桥杯编程大赛知识点全攻略》(一)-递归实现指数型枚举、递归实现排列型枚举

本篇博客将聚焦于通过递归来实现两种经典的枚举方法&#xff1a;指数型枚举和排列型枚举。这两种枚举方式在计算机科学和算法竞赛中都有广泛应用&#xff0c;无论是在解题中&#xff0c;还是在实际工作中都极具价值。 目录 前言 斐波那契数列递归 递归实现指数型枚举 算法思…...

C#对线程同步的应用

什么是线程同步&#xff1f;线程同步的应用场景有哪些&#xff1f;在C#中有哪些线程同步方式&#xff1f;下面对这些问题做一个总结&#xff0c;让大家在面试的时候遇到这些问题能够游刃有余。 线程同步是指在多线程环境下&#xff0c;多个线程同时访问共享资源时&#xff0c;确…...

基于微信小程序的面部动作检测系统

引言 本技术文档旨在详细阐述一个基于微信小程序的面部动作检测系统的技术路线、实现方法及关键技术框架。系统的核心功能包括检测用户的左右转头、眨眼和张嘴动作&#xff0c;并根据检测结果逐步引导用户完成任务。为确保系统的安全性和准确性&#xff0c;特别是防止用户通过…...

Activation Functions

Chapter4&#xff1a;Activation Functions 声明&#xff1a;本篇博客笔记来源于《Neural Networks from scratch in Python》&#xff0c;作者的youtube 其实关于神经网络的入门博主已经写过几篇了&#xff0c;这里就不再赘述&#xff0c;附上链接。 1.一文窥见神经网络 2.神经…...

《Vue3实战教程》37:Vue3生产部署

如果您有疑问&#xff0c;请观看视频教程《Vue3实战教程》 生产部署​ 开发环境 vs. 生产环境​ 在开发过程中&#xff0c;Vue 提供了许多功能来提升开发体验&#xff1a; 对常见错误和隐患的警告对组件 props / 自定义事件的校验响应性调试钩子开发工具集成 然而&#xff…...

Linux:各发行版及其包管理工具

相关阅读 Linuxhttps://blog.csdn.net/weixin_45791458/category_12234591.html?spm1001.2014.3001.5482 Debian 包管理工具&#xff1a;dpkg&#xff08;低级包管理器&#xff09;、apt&#xff08;高级包管理器&#xff0c;建立在dpkg基础上&#xff09;包格式&#xff1a;…...

【计算机网络】课程 作业一 搭建连续覆盖的办公网络

作业一 搭建连续覆盖的办公网络 题目&#xff1a;论述题&#xff08;共1题&#xff0c;100分&#xff09; 充分利用所学习的数据链路层局域网知识&#xff0c;加上物理层的基础知识&#xff0c;请给一个办公场所&#xff08;三层&#xff0c;每层约100平方&#xff09;&#xf…...

C++ 设计模式:单例模式(Singleton Pattern)

链接&#xff1a;C 设计模式 链接&#xff1a;C 设计模式 - 享元模式 单例模式&#xff08;Singleton Pattern&#xff09;是创建型设计模式&#xff0c;它确保一个类只有一个实例&#xff0c;并提供一个全局访问点来访问这个实例。单例模式在需要全局共享资源或控制实例数量的…...

OpenCV调整图像亮度和对比度

【欢迎关注编码小哥&#xff0c;学习更多实用的编程方法和技巧】 1、基本方法---线性变换 // 亮度和对比度调整 cv::Mat adjustBrightnessContrast(const cv::Mat& src, double alpha, int beta) {cv::Mat dst;src.convertTo(dst, -1, alpha, beta);return dst; }// 使用…...

Kafka Offset explorer使用

Kafka集群配置好以后以后运维这边先用工具测试一下&#xff0c;便于rd展开后续的工作&#xff0c;本地调试时一般使用Offset explorer工具进行连接 使用SASL(Simple Authentication and Security Layer)验证方式 使用SCRAM-SHA-256(Salted Challenge Response Authentication…...

二维码文件在线管理系统-收费版

需求背景 如果大家想要在网上管理自己的文件&#xff0c;而且需要生成二维码&#xff0c;下面推荐【草料二维码】&#xff0c;这个系统很好。特别适合那些制造业&#xff0c;实体业的使用手册&#xff0c;你可以生成一个二维码&#xff0c;贴在设备上&#xff0c;然后这个二维码…...

UE4.27 Android环境下获取手机电量

获取电量方法 使用的方法时FAndroidMisc::GetBatteryLevel(); 出现的问题 但是在电脑上编译时发现&#xff0c;会发现编译无法通过。 因为安卓环境下编译时&#xff0c;包含 #include "Android/AndroidPlatformMisc.h" 头文件是可以正常链接的&#xff0c;但在电…...

手游刚开服就被攻击怎么办?如何防御DDoS?

开服初期是手游最脆弱的阶段&#xff0c;极易成为DDoS攻击的目标。一旦遭遇攻击&#xff0c;可能导致服务器瘫痪、玩家流失&#xff0c;甚至造成巨大经济损失。本文为开发者提供一套简洁有效的应急与防御方案&#xff0c;帮助快速应对并构建长期防护体系。 一、遭遇攻击的紧急应…...

使用VSCode开发Django指南

使用VSCode开发Django指南 一、概述 Django 是一个高级 Python 框架&#xff0c;专为快速、安全和可扩展的 Web 开发而设计。Django 包含对 URL 路由、页面模板和数据处理的丰富支持。 本文将创建一个简单的 Django 应用&#xff0c;其中包含三个使用通用基本模板的页面。在此…...

java_网络服务相关_gateway_nacos_feign区别联系

1. spring-cloud-starter-gateway 作用&#xff1a;作为微服务架构的网关&#xff0c;统一入口&#xff0c;处理所有外部请求。 核心能力&#xff1a; 路由转发&#xff08;基于路径、服务名等&#xff09;过滤器&#xff08;鉴权、限流、日志、Header 处理&#xff09;支持负…...

DAY 47

三、通道注意力 3.1 通道注意力的定义 # 新增&#xff1a;通道注意力模块&#xff08;SE模块&#xff09; class ChannelAttention(nn.Module):"""通道注意力模块(Squeeze-and-Excitation)"""def __init__(self, in_channels, reduction_rat…...

基于Java Swing的电子通讯录设计与实现:附系统托盘功能代码详解

JAVASQL电子通讯录带系统托盘 一、系统概述 本电子通讯录系统采用Java Swing开发桌面应用&#xff0c;结合SQLite数据库实现联系人管理功能&#xff0c;并集成系统托盘功能提升用户体验。系统支持联系人的增删改查、分组管理、搜索过滤等功能&#xff0c;同时可以最小化到系统…...

C#学习第29天:表达式树(Expression Trees)

目录 什么是表达式树&#xff1f; 核心概念 1.表达式树的构建 2. 表达式树与Lambda表达式 3.解析和访问表达式树 4.动态条件查询 表达式树的优势 1.动态构建查询 2.LINQ 提供程序支持&#xff1a; 3.性能优化 4.元数据处理 5.代码转换和重写 适用场景 代码复杂性…...

Python+ZeroMQ实战:智能车辆状态监控与模拟模式自动切换

目录 关键点 技术实现1 技术实现2 摘要&#xff1a; 本文将介绍如何利用Python和ZeroMQ消息队列构建一个智能车辆状态监控系统。系统能够根据时间策略自动切换驾驶模式&#xff08;自动驾驶、人工驾驶、远程驾驶、主动安全&#xff09;&#xff0c;并通过实时消息推送更新车…...

三分算法与DeepSeek辅助证明是单峰函数

前置 单峰函数有唯一的最大值&#xff0c;最大值左侧的数值严格单调递增&#xff0c;最大值右侧的数值严格单调递减。 单谷函数有唯一的最小值&#xff0c;最小值左侧的数值严格单调递减&#xff0c;最小值右侧的数值严格单调递增。 三分的本质 三分和二分一样都是通过不断缩…...

[ACTF2020 新生赛]Include 1(php://filter伪协议)

题目 做法 启动靶机&#xff0c;点进去 点进去 查看URL&#xff0c;有 ?fileflag.php说明存在文件包含&#xff0c;原理是php://filter 协议 当它与包含函数结合时&#xff0c;php://filter流会被当作php文件执行。 用php://filter加编码&#xff0c;能让PHP把文件内容…...

WebRTC从入门到实践 - 零基础教程

WebRTC从入门到实践 - 零基础教程 目录 WebRTC简介 基础概念 工作原理 开发环境搭建 基础实践 三个实战案例 常见问题解答 1. WebRTC简介 1.1 什么是WebRTC&#xff1f; WebRTC&#xff08;Web Real-Time Communication&#xff09;是一个支持网页浏览器进行实时语音…...