Flink CDC 提取记录变更时间作为事件时间和 Hudi 表的 precombine.field 以及1970-01-01 取值问题
![]() | 博主历时三年精心创作的《大数据平台架构与原型实现:数据中台建设实战》一书现已由知名IT图书品牌电子工业出版社博文视点出版发行,点击《重磅推荐:建大数据平台太难了!给我发个工程原型吧!》了解图书详情,京东购书链接:https://item.jd.com/12677623.html,扫描左侧二维码进入京东手机购书页面。 |
CDC 数据中的记录变更时间标记着这条记录在数据库中执行对应操作(创建/更新/删除)的时间,可以说是天然的“事件时间”,特别是对于那些本身没有记录时间字段的表来说就更加合适了。Flink 官方文档 也建议在使用 CDC 的情况下,优先使用 CDC 中的这个时间字段,这个时间更加精准。
与此同时,在定义 Hudi 表时,precombine.field 也是一个非常重要的配置,显然 CDC 数据中的记录变更时间是最合适的,没有之一。
CDC 数据中的记录变更时间属于元数据范畴,以 Flink CDC 的 MySQL 数据库为例,它提供四种元数据的抽取:
| Key | DataType | Description |
|---|---|---|
| table_name | STRING NOT NULL | Name of the table that contain the row. |
| database_name | STRING NOT NULL | Name of the database that contain the row. |
| op_ts | TIMESTAMP_LTZ(3) NOT NULL | It indicates the time that the change was made in the database. If the record is read from snapshot of the table instead of the binlog, the value is always 0. |
| row_kind | STRING NOT NULL | It indicates the row kind of the changelog,Note: The downstream SQL operator may fail to compare due to this new added column when processing the row retraction if the source operator chooses to output the ‘row_kind’ column for each record. It is recommended to use this metadata column only in simple synchronization jobs. ‘+I’ means INSERT message, ‘-D’ means DELETE message, ‘-U’ means UPDATE_BEFORE message and ‘+U’ means UPDATE_AFTER message. |
其中的 op_ts 就是我们想要的,也就是:CDC 数据中的记录变更时间。我们可以在定义数据表时声明这个列,Flink CDC 可以将其提取出来作为普通字段供下游使用,就像下表中这样:
CREATE TABLE IF NOT EXISTS orders_mysql_cdc (`order_number` INT NOT NULL,`order_date` DATE NOT NULL,`purchaser` INT NOT NULL,`quantity` INT NOT NULL,`product_id` INT NOT NULL,`op_ts` TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,PRIMARY KEY (`order_number`) NOT ENFORCED
) WITH ('connector' = 'mysql-cdc',...
);
注意,在定义 Flink CDC 源表时,op_ts 的数据类型是 TIMESTAMP_LTZ(3),不是 TIMESTAMP(3),写入下游表时,可以是 TIMESTAMP(3)。
当我们初次使用这个 op_ts 字段时,你会发现,写入到的数据库的数据全部都是 1970-01-01 00:00:00.000,就像下面这样:

你可能会认为是哪里出错了,实际上,这是 Flink CDC 特别设计的,也是合理的,Flink CDC 官方文档的解释是:
If the record is read from snapshot of the table instead of the binlog, the value is always 0.
我们知道,Flink CDC ( 2.0+ ) 的一个显著特征是:它是全量 + 增量的一体化读取!全量就是经常说的历史数据,增量就是实时的数据,控制 Flink CDC 是从全部历史数据开始同步整个数据库还是从只当下的 binlog 中同步近期增量数据的配置项是:scan.startup.mode ( 官方文档 ),该配置项支持 5 种配置,而默认配置(initial)就是以当前分界点,数据中的现有数据使用全量方式读取(也叫快照读取),此后的数据从 binlog 中读取,这样就和上面描述的 op_ts 字段的取值吻合上了:
当 Flink CDC 使用全量方式读取表中的历史数据时,op_ts 字段全部取值为 0,即 1970-01-01 00:00:00.000,当 Flink CDC 使用增量方式读取 binlog 数据时,op_ts 字段的取值为数据发生变更的实际时间。
这种设计还是非常合理的,因为,Flink CDC 本身在使用快照方式读取时,就没有任何变更时间可以读取,这个时间只在 binlog 中才有,而这对下游也不会造成太大的影响,因为此时的数据都是 insert-only 的数据,同一主键也不会出现两条记录,至少对 Hudi 表是没有影响的。
此外,作为一个“额外收获”,你会发现:op_ts 这个字段本身恰好标记了一条记录是通过全量同步进来的,还是增量同步进来的!
补充:以下是 Flink CDC 官方文档对 scan.startup.mode 5 种同步模式的解释:
The config option scan.startup.mode specifies the startup mode for MySQL CDC consumer. The valid enumerations are:
initial(default): Performs an initial snapshot on the monitored database tables upon first startup, and continue to read the latest binlog.earliest-offset: Skip snapshot phase and start reading binlog events from the earliest accessible binlog offset.latest-offset: Never to perform snapshot on the monitored database tables upon first startup, just read from the end of the binlog which means only have the changes since the connector was started.specific-offset: Skip snapshot phase and start reading binlog events from a specific offset. The offset could be specified with binlog filename and position, or a GTID set if GTID is enabled on server.timestamp: Skip snapshot phase and start reading binlog events from a specific timestamp.
相关文章:
Flink CDC 提取记录变更时间作为事件时间和 Hudi 表的 precombine.field 以及1970-01-01 取值问题
博主历时三年精心创作的《大数据平台架构与原型实现:数据中台建设实战》一书现已由知名IT图书品牌电子工业出版社博文视点出版发行,点击《重磅推荐:建大数据平台太难了!给我发个工程原型吧!》了解图书详情,…...
【网络安全】网络安全意识教育实用指南
随着科技的不断发展和数字世界的变革,我们不仅从中获得前所未有的力量,也同时面临着前所未有的风险挑战。多数CISO(首席信息安全官)时刻致力于协助企业抵御各种安全威胁。在“武器库”中有一件珍贵的法宝:网络安全意识…...
wordpress模板购买网站推荐
简站wordpress主题 老牌wordpress开发团队,开发过数百款wordpress主题,作品是最好的简历,靠作品说话,看作品喜欢不喜欢就可以了。 https://www.jianzhanpress.com WP模板牛 免费wordpress下载网站,上面有上百款免费…...
LeetCode 刷题 [C++] 第240题.搜索二维矩阵 II
题目描述 编写一个高效的算法来搜索 m x n 矩阵 matrix 中的一个目标值 target 。该矩阵具有以下特性: 每行的元素从左到右升序排列。 每列的元素从上到下升序排列。 题目分析 通过分析矩阵的特点发现,其左下角和右上角可以看作一个“二叉搜索树的根节…...
HP笔记本电脑如何恢复出厂设置?这里提供几种方法
要恢复出厂设置Windows 11或10的HP笔记本电脑,你可以使用操作系统的标准方法。如果你运行的是早期版本,你可以使用HP提供的单独程序清除计算机并重新安装操作系统。 恢复出厂设置运行Windows 11的HP笔记本电脑 所有Windows 11计算机都有一个名为“重置此电脑”的功能,可…...
Elasticsearch:了解人工智能搜索算法
作者:来自 Elastic Jessica Taylor, Aditya Tripathi 人工智能工具无处不在,其原因并不神秘。 他们可以执行各种各样的任务并找到许多日常问题的解决方案。 但这些应用程序的好坏取决于它们的人工智能搜索算法。 简单来说,人工智能搜索算法是…...
(HAL)STM32F103C6T8——软件模拟I2C驱动0.96寸OLED屏幕
一、电路接法 电路接法参照江科大视频。 二、相关代码及文件 说明:代码采用hal库,通过修改江科大代码实现。仅OLED.c文件关于引脚定义作了hal库修改,并将宏定义OLED_W_SCL(x)、OLED_W_SDA(x)作了相关修改。 1、OLED.c void OLED_I2C_Init(voi…...
分享便携式血氧仪单片机方案
血氧仪主要测量指标分别为脉率、血氧饱和度、灌注指数。血氧饱和度是临床医疗上重要的基础数据之一。以家用指压式血氧仪为例,一个血氧仪一般由MCU、存储芯片、两个控制LED的数模转换器、两个发光二极管驱动等组成。 灵动微电子的MM32MCU产品已被广泛地应用在了一些…...
【Java设计模式】四、适配器模式
文章目录 1、适配器模式2、举例 1、适配器模式 适配器模式Adapter Pattern,是做为两个不兼容的接口之间的桥梁目的是将一个类的接口转换成客户希望的另外一个接口适配器模式可以使得原本由于接口不兼容而不能一起工作的那些类可以一起工作 最后,适配器…...
RV32/64 特权架构 - 特权模式与指令
RV32/64 特权架构 - 特权模式与指令 1 特权模式2 特权指令2.1 mret(从机器模式返回到先前的模式)2.2 sret(从监管模式返回到先前的模式)2.3 wfi(等待中断)2.4 sfence.vma(内存屏障) …...
多微服务合并为一个服务
公司微服务细分太多,最近跟我提说需要将几个微服务合为单体,经过几天的查阅,决定用二次打包的方式进行合并,然后部署的时候在nginx改下合并的微服务转发路劲即可,不需要前端修改路劲了。 方案 采用二次打包的方式进行…...
Springboot企业级开发--开发入门01
目录 目录 一.Spring Boot的主要特点和优势包括: 二.Spring Boot的核心功能可以归纳为以下几点: 三.Springboot是如何解决问题? Spring Boot 是一个开源的Java框架,其设计目标是为了简化新Spring应用的初始搭建以及开发过程。…...
bash和sh和./的区别
bash和sh和./的区别 今天在执行一个脚本的时候,用的是sh script.sh,执行报错,使用bash script.sh执行时就能成功,才知道sh和bash是不一样的 sh sh表示 Bourne Shell,是 Unix 系统上的一种基本的命令解释器。它也可以…...
LeetCode 3:寻找最长不含重复字符的子串长度
LeetCode 3:寻找最长不含重复字符的子串长度 在字符串处理中,寻找最长不含重复字符的子串长度是一个经典问题。 问题描述 给定一个字符串 s ,我们需要找出其中不含有重复字符的最长子串的长度。 解决方案 我们可以使用滑动窗口的方法来解…...
【自然语言处理四-从矩阵操作角度看 自注意self attention】
自然语言处理四-从矩阵操作角度看 自注意self attention 从矩阵角度看self attention获取Q K V矩阵注意力分数softmax注意力的输出再来分析整体的attention的矩阵操作过程从矩阵操作角度看,self attention如何解决问题的?W^q^ W^k^ W^v^这三个矩阵怎么获…...
Unity脚本,串行端口的握手协议(流控制)
在Unity的SerialPort构造函数中,流控制并没有被直接包含。流控制,也被称为握手,是一种过程,它管理数据的传输速度,以防止接收方被发送方发送的数据量所淹没。 在.NET的SerialPort类中,流控制是通过Handshak…...
2023 re:Invent 用 Amazon Q 打造你的知识库
前言 随着 ChatGPT 的问世,我们迎来了许多创新和变革的机会。一年一度的亚马逊云科技大会 re:Invent 也带来了许多前言的技术,其中 Amazon CEO Adam Selipsky 在 2023 re:Invent 大会中介绍 Amazon Q 让我印象深刻,这预示着生成式 AI 的又一…...
ChatGPT 国内快速上手指南
ChatGPT简介 ChatGPT是由OpenAI团队研发的自然语言处理模型,该模型在大量的互联网文本数据上进行了预训练,使其具备了深刻的语言理解和生成能力。 GPT拥有上亿个参数,这使得ChatGPT在处理各种语言任务时表现卓越。它的训练使得模型能够理解上…...
Docker 常用操作命令备忘
Docker 一旦设置好了环境,日常就只要使用简单命令就可以运行和停止。 于是,我每次用的时候,都想不起来一些关键性的命令到底怎么用,特此记录。 一、镜像管理 从公有仓库拉取镜像 (对于使用苹果电脑 M1/M2/M3 芯片的 …...
BUU [CISCN2019 华东南赛区]Web4
BUU [CISCN2019 华东南赛区]Web4 题目描述:Click to launch instance. 开题: 点击链接,有点像SSRF 使用local_file://协议读到本地文件,无法使用file://协议读取,有过滤。 local_file://协议: local_file…...
synchronized 学习
学习源: https://www.bilibili.com/video/BV1aJ411V763?spm_id_from333.788.videopod.episodes&vd_source32e1c41a9370911ab06d12fbc36c4ebc 1.应用场景 不超卖,也要考虑性能问题(场景) 2.常见面试问题: sync出…...
Go 语言接口详解
Go 语言接口详解 核心概念 接口定义 在 Go 语言中,接口是一种抽象类型,它定义了一组方法的集合: // 定义接口 type Shape interface {Area() float64Perimeter() float64 } 接口实现 Go 接口的实现是隐式的: // 矩形结构体…...
Nginx server_name 配置说明
Nginx 是一个高性能的反向代理和负载均衡服务器,其核心配置之一是 server 块中的 server_name 指令。server_name 决定了 Nginx 如何根据客户端请求的 Host 头匹配对应的虚拟主机(Virtual Host)。 1. 简介 Nginx 使用 server_name 指令来确定…...
什么是EULA和DPA
文章目录 EULA(End User License Agreement)DPA(Data Protection Agreement)一、定义与背景二、核心内容三、法律效力与责任四、实际应用与意义 EULA(End User License Agreement) 定义: EULA即…...
unix/linux,sudo,其发展历程详细时间线、由来、历史背景
sudo 的诞生和演化,本身就是一部 Unix/Linux 系统管理哲学变迁的微缩史。来,让我们拨开时间的迷雾,一同探寻 sudo 那波澜壮阔(也颇为实用主义)的发展历程。 历史背景:su的时代与困境 ( 20 世纪 70 年代 - 80 年代初) 在 sudo 出现之前,Unix 系统管理员和需要特权操作的…...
【服务器压力测试】本地PC电脑作为服务器运行时出现卡顿和资源紧张(Windows/Linux)
要让本地PC电脑作为服务器运行时出现卡顿和资源紧张的情况,可以通过以下几种方式模拟或触发: 1. 增加CPU负载 运行大量计算密集型任务,例如: 使用多线程循环执行复杂计算(如数学运算、加密解密等)。运行图…...
项目部署到Linux上时遇到的错误(Redis,MySQL,无法正确连接,地址占用问题)
Redis无法正确连接 在运行jar包时出现了这样的错误 查询得知问题核心在于Redis连接失败,具体原因是客户端发送了密码认证请求,但Redis服务器未设置密码 1.为Redis设置密码(匹配客户端配置) 步骤: 1).修…...
STM32HAL库USART源代码解析及应用
STM32HAL库USART源代码解析 前言STM32CubeIDE配置串口USART和UART的选择使用模式参数设置GPIO配置DMA配置中断配置硬件流控制使能生成代码解析和使用方法串口初始化__UART_HandleTypeDef结构体浅析HAL库代码实际使用方法使用轮询方式发送使用轮询方式接收使用中断方式发送使用中…...
解读《网络安全法》最新修订,把握网络安全新趋势
《网络安全法》自2017年施行以来,在维护网络空间安全方面发挥了重要作用。但随着网络环境的日益复杂,网络攻击、数据泄露等事件频发,现行法律已难以完全适应新的风险挑战。 2025年3月28日,国家网信办会同相关部门起草了《网络安全…...
Python网页自动化Selenium中文文档
1. 安装 1.1. 安装 Selenium Python bindings 提供了一个简单的API,让你使用Selenium WebDriver来编写功能/校验测试。 通过Selenium Python的API,你可以非常直观的使用Selenium WebDriver的所有功能。 Selenium Python bindings 使用非常简洁方便的A…...

