当前位置: 首页 > news >正文

TiDB系列之:使用Flink TiDB CDC Connector采集数据

TiDB系列之:使用Flink TiDB CDC Connector采集数据

  • 一、依赖项
  • 二、Maven依赖
  • 三、SQL Client JAR
  • 四、如何创建 TiDB CDC 表
  • 五、连接器选项
  • 六、可用元数据
  • 七、特征
    • 一次性处理
    • 启动阅读位置
    • 多线程读取
    • DataStream Source
  • 八、数据类型映射

TiDB CDC 连接器允许从 TiDB 数据库读取快照数据和增量数据。本文档介绍如何设置 TiDB CDC 连接器以对 TiDB 数据库运行 SQL 查询。

  • TiDB系列之:使用TiCDC增量同步TiDB数据库数据
  • TiDB系列之:TiCDC同步数据到Kafka集群使用Debezium数据格式
  • TiDB系列之:TiCDC同步TiDB数据库数据到Kafka集群Topic

一、依赖项

为了设置 TiDB CDC 连接器,下表提供了使用构建自动化工具(例如 Maven 或 SBT)的项目和带有 SQL JAR 包的 SQL Client 的依赖信息。

二、Maven依赖

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-tidb-cdc</artifactId><version>3.0.1</version>
</dependency>

三、SQL Client JAR

下载链接仅适用于稳定版本。

下载 flink-sql-connector-tidb-cdc-3.0.1.jar 并将其放在 <FLINK_HOME>/lib/ 下。

四、如何创建 TiDB CDC 表

TiDB CDC 表可以定义如下:

-- checkpoint every 3000 milliseconds                       
Flink SQL> SET 'execution.checkpointing.interval' = '3s';   -- register a TiDB table 'orders' in Flink SQL
Flink SQL> CREATE TABLE orders (order_id INT,order_date TIMESTAMP(3),customer_name STRING,price DECIMAL(10, 5),product_id INT,order_status BOOLEAN,PRIMARY KEY(order_id) NOT ENFORCED) WITH ('connector' = 'tidb-cdc','tikv.grpc.timeout_in_ms' = '20000', 'pd-addresses' = 'localhost:2379','database-name' = 'mydb','table-name' = 'orders'
);-- read snapshot and binlogs from orders table
Flink SQL> SELECT * FROM orders;

五、连接器选项

参数是否必须默认值类型描述
connectorrequired(none)String指定使用什么连接器,这里应该是“tidb-cdc”。
database-namerequired(none)String要监控的 TiDB 服务器的数据库名称。
table-namerequired(none)String要监控的 TiDB 数据库的表名。
scan.startup.modeoptionalinitialStringTiDB CDC Consumer 可选的启动模式,有效枚举为“initial”和“latest-offset”。
pd-addressesrequired(none)StringTiKV 集群的 PD 地址。
tikv.grpc.timeout_in_msoptional(none)LongTiKV GRPC 超时(以毫秒为单位)。
tikv.grpc.scan_timeout_in_msoptional(none)LongTiKV GRPC 扫描超时(以毫秒为单位)。
tikv.batch_get_concurrencyoptional20IntegerTiKV GRPC 批量获取并发。
tikv.*optional(none)String传递 TiDB 客户端的属性。

六、可用元数据

以下格式元数据可以在表定义中公开为只读(虚拟)列。

keyDataType描述
table_nameSTRING NOT NULL包含该行的表的名称。
database_nameSTRING NOT NULL包含该行的数据库的名称。
op_tsTIMESTAMP_LTZ(3) NOT NULL它指示在数据库中进行更改的时间。
如果记录是从表的快照而不是binlog中读取的,则该值始终为0。

扩展的 CREATE TABLE 示例演示了公开这些元数据字段的语法:

CREATE TABLE products (db_name STRING METADATA FROM 'database_name' VIRTUAL,table_name STRING METADATA  FROM 'table_name' VIRTUAL,operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,order_id INT,order_date TIMESTAMP(0),customer_name STRING,price DECIMAL(10, 5),product_id INT,order_status BOOLEAN,PRIMARY KEY(order_id) NOT ENFORCED
) WITH ('connector' = 'tidb-cdc','tikv.grpc.timeout_in_ms' = '20000','pd-addresses' = 'localhost:2379','database-name' = 'mydb','table-name' = 'orders'
);

七、特征

一次性处理

TiDB CDC 连接器是一个 Flink Source 连接器,它会先读取数据库快照,然后继续读取更改事件,即使发生故障也只处理一次。

启动阅读位置

配置选项 scan.startup.mode 指定 TiDB CDC Consumer 的启动模式。有效的枚举是:

  • initial(默认):拍摄捕获表的结构和数据的快照;如果您想从捕获的表中获取数据的完整表示,则很有用。
  • latest-offset:仅对捕获的表的结构进行快照;如果只需要获取从现在开始发生的更改,则很有用。

多线程读取

TiDB CDC 源可以并行读取工作,因为有多个任务可以接收更改事件。

DataStream Source

TiDB CDC 连接器也可以是 DataStream 源。您可以创建一个 SourceFunction,如下所示:

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Collector;import org.apache.flink.cdc.connectors.tidb.TDBSourceOptions;
import org.apache.flink.cdc.connectors.tidb.TiDBSource;
import org.apache.flink.cdc.connectors.tidb.TiKVChangeEventDeserializationSchema;
import org.apache.flink.cdc.connectors.tidb.TiKVSnapshotEventDeserializationSchema;
import org.tikv.kvproto.Cdcpb;
import org.tikv.kvproto.Kvrpcpb;import java.util.HashMap;public class TiDBSourceExample {public static void main(String[] args) throws Exception {SourceFunction<String> tidbSource =TiDBSource.<String>builder().database("mydb") // set captured database.tableName("products") // set captured table.tiConf(TDBSourceOptions.getTiConfiguration("localhost:2399", new HashMap<>())).snapshotEventDeserializer(new TiKVSnapshotEventDeserializationSchema<String>() {@Overridepublic void deserialize(Kvrpcpb.KvPair record, Collector<String> out)throws Exception {out.collect(record.toString());}@Overridepublic TypeInformation<String> getProducedType() {return BasicTypeInfo.STRING_TYPE_INFO;}}).changeEventDeserializer(new TiKVChangeEventDeserializationSchema<String>() {@Overridepublic void deserialize(Cdcpb.Event.Row record, Collector<String> out)throws Exception {out.collect(record.toString());}@Overridepublic TypeInformation<String> getProducedType() {return BasicTypeInfo.STRING_TYPE_INFO;}}).build();StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// enable checkpointenv.enableCheckpointing(3000);env.addSource(tidbSource).print().setParallelism(1);env.execute("Print TiDB Snapshot + Binlog");}
}

八、数据类型映射

TiDB typeFlink SQL typeNOTE
TINYINTTINYINT
SMALLINT、TINYINT UNSIGNEDSMALLINT
INT、MEDIUMINT、SMALLINT UNSIGNEDINT
BIGINT、INT UNSIGNEDBIGINT
BIGINT UNSIGNEDDECIMAL(20, 0)
FLOATFLOAT
REAL、DOUBLEDOUBLE
NUMERIC(p, s) DECIMAL(p, s) where p <= 38DECIMAL(p, s)
NUMERIC(p, s) DECIMAL(p, s) where 38 < p <= 65STRING在 TiDB 中 DECIMAL 数据类型的精度最高为 65,但在 Flink 中 DECIMAL 的精度限制为 38。因此,如果定义精度大于 38 的十进制列,则应将其映射到 STRING 以避免精度损失。
BOOLEAN、TINYINT(1)、BIT(1)BOOLEAN
DATEDATE
TIME [§]TIME [§]
TIMESTAMP [§]TIMESTAMP_LTZ [§]
DATETIME [§]TIMESTAMP [§]
CHAR(n)CHAR(n)
VARCHAR(n)VARCHAR(n)
BIT(n)BINARY(⌈n/8⌉)
BINARY(n)BINARY(n)
TINYTEXT、TEXT、MEDIUMTEXT、LONGTEXTSTRING
TINYBLOB、BLOB、MEDIUMBLOB、LONGBLOBBYTES目前,TiDB 中的 BLOB 数据类型仅支持长度不大于 2,147,483,647(2 ** 31 - 1) 的 Blob。
YEARINT
ENUMSTRING
JSONSTRINGJSON 数据类型在 Flink 中会被转换为 JSON 格式的 STRING。
SETARRAY由于 TiDB 中的 SET 数据类型是一个字符串对象,可以有零个或多个值,因此它应该始终映射到字符串数组

相关文章:

TiDB系列之:使用Flink TiDB CDC Connector采集数据

TiDB系列之&#xff1a;使用Flink TiDB CDC Connector采集数据 一、依赖项二、Maven依赖三、SQL Client JAR四、如何创建 TiDB CDC 表五、连接器选项六、可用元数据七、特征一次性处理启动阅读位置多线程读取DataStream Source 八、数据类型映射 TiDB CDC 连接器允许从 TiDB 数…...

每日一道算法题 最接近的三数之和

题目 16. 最接近的三数之和 - 力扣&#xff08;LeetCode&#xff09; Python class Solution:def threeSumClosest(self, nums: List[int], target: int) -> int:nums.sort()nlen(nums)ans0min_diffinf # infinite 无穷for i in range(n-2):tmpnums[i]li1rn-1while l<…...

搭建自己的金融数据源和量化分析平台(六):下载并存储沪深两市上市公司财报

基于不依赖wind、某花顺等第三方平台数据的考虑&#xff0c;尝试直接从财报中解析三大报表进而计算ROE等财务指标&#xff0c;因此需要下载沪深两市的上市公司财报数据&#xff0c;便于后续从pdf中解析三大报表。 深市爬虫好做&#xff0c;先放深市爬虫&#xff1a; 根据时间段…...

C语言-常见关键字详解

一、const 关键字const用于声明常量&#xff0c;赋值后&#xff0c;其值不能再被修改。 示例&#xff1a; const int MAX_COUNT 100; 二、static static关键字在不同情境下有不同作用&#xff1a; 1.函数中的静态变量&#xff1a;保留变量状态&#xff0c;仅初始化一次&a…...

异步编程之std::future(一): 使用

目录 1.概述 2.std::future的基本用法 3.使用 std::shared_future 4.std::future的使用场景 5.总结 1.概述 在编程实践中&#xff0c;我们常常需要使用异步调用。通过异步调用&#xff0c;我们可以将一些耗时、阻塞的任务交给其他线程来执行&#xff0c;从而保证当前线程的…...

Vue3 + JS项目配置ESLint Pretter

前言 如果在开发大型项目 同时为多人协作开发 那么 ESLint 在项目中极为重要 在使用 ESLint 的同时 也需要使用 Pretter插件 统一对代码进行格式化 二者相辅相成 缺一不可 1. 安装 VsCode 插件 在 VsCode 插件市场搜索安装 ESLint 和 Pretter 2. 安装依赖 这里直接在 pac…...

JavaScript (十四)——JavaScript typeof和类型转换

目录 JavaScript typeof, null, 和 undefined typeof 操作符 null undefined undefined 和 null 的区别 JavaScript 类型转换 JavaScript 数据类型 JavaScript 类型转换 将数字转换为字符串 将布尔值转换为字符串 将日期转换为字符串 将字符串转换为数字 一元运算符…...

CTF-web 基础

网络协议 OSI七层参考模型&#xff1a;一个标准的参考模型 物理层 网线&#xff0c;网线接口等。 数据链路层 可以处理物理层传入的信息。 网络层 比如IP地址 传输层 控制传输的内容的传输&#xff0c;在传输的过程中将要传输的信息分块传输完成之后再进行合并。 应用…...

CP AUTOSAR标准之ChineseV2XNetwork(AUTOSAR_SWS_ChineseV2XNetwork)(更新中……)

1 简介和功能概述 本文档指定了AUTOSAR基础软件模块中国车辆对接网络(CnV2xNet)的功能、API和配置。   中国车联网网络(CnV2xNet)与中国车联网消息(CnV2xMsg)、中国车联网管理(CnV2xMgt)、中国车联网安全(CnV2xSec)以及AUTOSAR BSW模块以太网接口(EthIf)共同构成了AUTOSAR架构…...

【hloc】 项目流程

hloc 项目流程 1. 数据集准备2. 特征提取3. 匹配特征4. 三维重建5. 定位6. 结果评估7. 示例脚本 这个项目涉及到了视觉定位和三维重建的一系列步骤&#xff0c;从特征提取、匹配、三维重建到定位和结果评估。通过提供的脚本文件&#xff0c;用户可以方便地运行整个流程。 1. 数…...

鸿蒙系统开发【应用接续】基本功能

应用接续 介绍 基于ArkTS扩展的声明式开发范式编程语言编写的一个分布式视频播放器&#xff0c;主要包括一个直播视频播放界面&#xff0c;实现视频播放时可以从一台设备迁移到另一台设备继续运行&#xff0c;来选择更合适的设备继续执行播放功能以及PAD视频播放时协同调用手…...

nextTick方法的作用是什么?什么时候会用到

nextTick 方法在 Vue.js 中扮演着重要的角色&#xff0c;它用于在下次 DOM 更新循环结束之后执行延迟回调。这主要用于确保在 Vue 完成 DOM 更新后执行依赖于 DOM 的操作。 作用 确保 DOM 更新完成&#xff1a;Vue 的 DOM 更新是异步的&#xff0c;当你修改了数据后&#xff0…...

多 NodeJS 环境管理

前言 对于某个项目依赖特定版本的 NodeJS&#xff0c;或几个项目的 NodeJS 版本冲突时&#xff0c;需要在系统中安装多个版本的 NodeJS&#xff0c;这时可以使用一些工具来进行多个 NodeJS 的管理。 有很多类似的 NodeJS 管理工具&#xff0c;如 nvm, nvs, n 等&#xff0c;接…...

解决网站被植入跳转木马病毒

概述 网站被植入跳转木马病毒是一种常见的安全威胁&#xff0c;它可能导致网站用户被重定向到恶意站点。本文将指导您如何检测、清除这类木马病毒以及采取预防措施。 步骤1&#xff1a;确认感染 首先&#xff0c;需要确认您的网站确实受到了跳转木马的影响。 示例&#xff…...

Node.js(6)——npm软件包管理

npm npm是Node.js标准的软件包管理器。 使用&#xff1a; 初始化清单文件&#xff1a;npm init-y(得到package.json文件&#xff0c;有则略过此命令)下载软件包&#xff1a;npm i 软件包名称使用软件包 示例&#xff1a; 初始状态下npm文件夹下只有server.js,下载软件包前看…...

区块链核心概念与技术架构简介

引言 区块链&#xff0c;一种分布式账本技术&#xff0c;不仅为数字货币提供了基础设施&#xff0c;更在金融、供应链、物联网等多个领域展现出广泛的应用前景。区块链技术被认为是继蒸汽机、电力、互联网之后&#xff0c;下一代颠覆性的核心技术。 如果说蒸汽机释放了人们的…...

≌图概念凸显包含射线V的直线W是比V长的线

黄小宁 x轴中&#xff1a;各非负数点xh≥0都变回自己即都作恒等变换&#xff0c;其余点x-h都变号为xh就使x轴失去负数点而变为射线V{xh≥0}。这x轴变为射线V⊂x轴是不保距变换即不是x轴的刚体运动使x轴不≌V⊂x轴&#xff08;小学生都知道x轴不≌射线V&#xff09;。据≌图概念…...

子路由的配置方法?

子路由的配置方法主要涉及到在Vue-router中定义嵌套路由&#xff0c;即一个路由内部包含多个子路由。以下是配置子路由的基本步骤&#xff1a; 1. 定义父路由 首先&#xff0c;在Vue Router中定义父路由。父路由可以像其他普通路由一样定义&#xff0c;但通常会有一个组件与之…...

【大模型从入门到精通2】openAI api的入门介绍2

互动对话界面的搭建 让我们来看看如何建立一个互动对话界面&#xff0c;用户可以在此输入查询&#xff0c;系统实时处理并显示响应。 import panel as pn # 用于构建图形用户界面# 初始化对话历史记录和GUI组件 conversation_history [] input_widget pn.widgets.TextInpu…...

【前端编程小白】的HTML从零入门到实战

之前有高中毕业生读了博客&#xff0c;想让我帮他找一些前端入门的内容&#xff0c;他们报的计算机专业&#xff0c;想利用开学前夕学习一下&#xff0c;我给他推荐了一些菜鸟教程呀什么的。后来想&#xff0c;看来还是很多人需要一些更加入门的可成的&#xff0c;而且很多教程…...

BGE-Reranker-v2-m3企业部署:高并发请求压力测试案例

BGE-Reranker-v2-m3企业部署&#xff1a;高并发请求压力测试案例 1. 项目背景与价值 在企业级RAG&#xff08;检索增强生成&#xff09;系统中&#xff0c;检索精度直接影响最终的回答质量。传统向量检索虽然快速&#xff0c;但容易受到关键词相似性的干扰&#xff0c;返回大…...

TypeScript实战:手把手教你实现4种不依赖第三方库的UUID生成器(附完整代码)

TypeScript实战&#xff1a;4种零依赖UUID生成器的实现与优化 在小程序开发或特殊环境下&#xff0c;我们常常面临无法使用第三方库的困境。UUID作为分布式系统中唯一标识符的核心组件&#xff0c;其生成逻辑却往往被封装在uuid这样的第三方库中。本文将带你从零实现四种不同格…...

Reset Windows Update Tool:开源工具解决Windows更新问题的3个高效方案

Reset Windows Update Tool&#xff1a;开源工具解决Windows更新问题的3个高效方案 【免费下载链接】Reset-Windows-Update-Tool Troubleshooting Tool with Windows Updates (Developed in Dev-C). 项目地址: https://gitcode.com/gh_mirrors/re/Reset-Windows-Update-Tool …...

ECharts地图标注避坑指南:解决区域地图显示不全、标注错位等常见问题

ECharts地图标注避坑指南&#xff1a;解决区域地图显示不全、标注错位等常见问题 当你在使用ECharts绘制区域地图时&#xff0c;是否遇到过地图显示不全、标注点位置偏移、JSON数据格式错误等问题&#xff1f;这些问题看似简单&#xff0c;却可能耗费开发者大量时间排查。本文将…...

别再死记硬背公式了!图解OpenCV相机标定:从像素到世界的坐标变换到底在干啥?

图解OpenCV相机标定&#xff1a;从像素到世界的坐标变换全解析 当你第一次看到相机标定的数学公式时&#xff0c;是不是感觉像在看天书&#xff1f;旋转矩阵、平移向量、内参矩阵...这些抽象的概念到底对应着现实世界中的什么&#xff1f;本文将用最直观的方式&#xff0c;带你…...

病床前尽孝心,脊柱 “被折得濒临损伤”!

长期弯腰照顾卧床病人、喂饭、翻身、擦洗&#xff0c;颈腰椎损伤风险显著。弯腰时腰椎弯曲角度过大&#xff0c;椎间盘承受压力剧增&#xff1b;反复弯腰起身照顾病人&#xff0c;肌肉与椎间盘反复冲击&#xff1b;低头专注护理时&#xff0c;颈椎前伸与腰椎受力形成双重负担。…...

location-to-phone-number:如何将电话号码转化为商业智能的地理信息平台

location-to-phone-number&#xff1a;如何将电话号码转化为商业智能的地理信息平台 【免费下载链接】location-to-phone-number This a project to search a location of a specified phone number, and locate the map to the phone number location. 项目地址: https://gi…...

DexGraspNet与多指手抓取算法详解:从理论到工程实现

目录 DexGraspNet与多指手抓取算法详解:从理论到工程实现 第一部分:原理详解 第一章 绪论与灵巧抓取的挑战 1.1 机器人抓取技术演进 1.1.1 从平行夹爪到多指灵巧手 1.1.2 灵巧抓取的独特挑战 1.2 DexGraspNet的研究背景与意义 1.2.1 大规模数据驱动的必要性 1.2.2 D…...

PDF-Parser-1.0保姆级教程:5分钟搞定PDF文档智能解析,小白也能快速上手

PDF-Parser-1.0保姆级教程&#xff1a;5分钟搞定PDF文档智能解析&#xff0c;小白也能快速上手 1. 为什么选择PDF-Parser-1.0&#xff1f; 你是否遇到过这些烦恼&#xff1a; 从PDF复制文字到Word后格式全乱表格数据粘贴后变成一堆乱码论文里的数学公式无法编辑双栏排版的文…...

美图靠AI一年收入38亿,不靠免费大模型API,靠的是什么?

财报数据显示&#xff0c;美图2025年全年实现营业收入38.6亿元&#xff0c;同比大幅增长28.8%&#xff0c;整体营收规模再创新高&#xff0c;展现出核心业务的强劲增长韧性。不过公司常规账面净利润为7亿元&#xff0c;同比下降12.7%&#xff0c;看似利润下滑的背后&#xff0c…...