当前位置: 首页 > news >正文

TiDB数据库从入门到精通系列之六:使用 TiCDC 将 TiDB 的数据同步到 Apache Kafka

TiDB数据库从入门到精通系列之六:使用 TiCDC 将 TiDB 的数据同步到 Apache Kafka

  • 一、技术流程
  • 二、搭建环境
  • 三、创建Kafka changefeed
  • 四、写入数据以产生变更日志
  • 五、配置 Flink 消费 Kafka 数据

一、技术流程

  • 快速搭建 TiCDC 集群、Kafka 集群和 Flink 集群
  • 创建 changefeed,将 TiDB 增量数据输出至 Kafka
  • 使用 go-tpc 写入数据到上游 TiDB
  • 使用 Kafka console consumer 观察数据被写入到指定的 Topic
  • (可选)配置 Flink 集群消费 Kafka 内数据

二、搭建环境

部署包含 TiCDC 的 TiDB 集群

在实验或测试环境中,可以使用 TiUP Playground 功能,快速部署 TiCDC,命令如下:

tiup playground --host 0.0.0.0 --db 1 --pd 1 --kv 1 --tiflash 0 --ticdc 1
# 查看集群状态
tiup status

三、创建Kafka changefeed

1.创建 changefeed 配置文件

根据 Flink 的要求和规范,每张表的增量数据需要发送到独立的 Topic 中,并且每个事件需要按照主键值分发 Partition。因此,需要创建一个名为 changefeed.conf 的配置文件,填写如下内容:

[sink]
dispatchers = [
{matcher = ['*.*'], topic = "tidb_{schema}_{table}", partition="index-value"},
]

2.创建一个 changefeed,将增量数据输出到 Kafka

tiup ctl:v<CLUSTER_VERSION> cdc changefeed 
create --server="http://127.0.0.1:8300" 
--sink-uri="kafka://127.0.0.1:9092/kafka-topic-name?protocol=canal-json" 
--changefeed-id="kafka-changefeed" 
--config="changefeed.conf"

如果命令执行成功,将会返回被创建的 changefeed 的相关信息,包含被创建的 changefeed 的 ID 以及相关信息,内容如下:

Create changefeed successfully!
ID: kafka-changefeed
Info: {... changfeed info json struct ...}

如果命令长时间没有返回,你需要检查当前执行命令所在服务器到 sink-uri 中指定的 Kafka 机器的网络可达性,保证二者之间的网络连接正常。

生产环境下 Kafka 集群通常有多个 broker 节点,你可以在 sink-uri 中配置多个 broker 的访问地址,这有助于提升 changefeed 到 Kafka 集群访问的稳定性,当部分被配置的 Kafka 节点故障的时候,changefeed 依旧可以正常工作。假设 Kafka 集群中有 3 个 broker 节点,地址分别为 127.0.0.1:9092 / 127.0.0.2:9092 / 127.0.0.3:9092,可以参考如下 sink-uri 创建 changefeed:

tiup ctl:v<CLUSTER_VERSION> cdc changefeed create 
--server="http://127.0.0.1:8300" 
--sink-uri="kafka://127.0.0.1:9092,127.0.0.2:9092,127.0.0.3:9092/kafka-topic-name?protocol=canal-json&partition-num=3&replication-factor=1&max-message-bytes=1048576" 
--config="changefeed.conf"

3.Changefeed 创建成功后,执行如下命令,查看 changefeed 的状态

tiup ctl:v<CLUSTER_VERSION> cdc changefeed list --server="http://127.0.0.1:8300"

四、写入数据以产生变更日志

完成以上步骤后,TiCDC 会将上游 TiDB 的增量数据变更日志发送到 Kafka,下面对 TiDB 写入数据,以产生增量数据变更日志。

1.模拟业务负载

在测试实验环境下,可以使用 go-tpc 向上游 TiDB 集群写入数据,以让 TiDB 产生事件变更数据。如下命令,首先在上游 TiDB 创建名为 tpcc 的数据库,然后使用 TiUP bench 写入数据到这个数据库中。

tiup bench tpcc -H 127.0.0.1 -P 4000 -D tpcc --warehouses 4 prepare
tiup bench tpcc -H 127.0.0.1 -P 4000 -D tpcc --warehouses 4 run --time 300s

2.消费 Kafka Topic 中的数据

changefeed 正常运行时,会向 Kafka Topic 写入数据,你可以通过由 Kafka 提供的 kafka-console-consumer.sh,观测到数据成功被写入到 Kafka Topic 中:

./bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --from-beginning --topic `${topic-name}`

至此,TiDB 的增量数据变更日志就实时地复制到了 Kafka。下一步,你可以使用 Flink 消费 Kafka 数据。当然,你也可以自行开发适用于业务场景的 Kafka 消费端。

五、配置 Flink 消费 Kafka 数据

1.安装 Flink Kafka Connector

在 Flink 生态中,Flink Kafka Connector 用于消费 Kafka 中的数据并输出到 Flink 中。Flink Kafka Connector 并不是内建的,因此在 Flink 安装完毕后,还需要将 Flink Kafka Connector 及其依赖项添加到 Flink 安装目录中。下载下列 jar 文件至 Flink 安装目录下的 lib 目录中,如果你已经运行了 Flink 集群,请重启集群以加载新的插件。

  • flink-connector-kafka-1.17.1.jar
  • flink-sql-connector-kafka-1.17.1.jar
  • kafka-clients-3.5.1.jar

2.创建一个表

可以在 Flink 的安装目录执行如下命令,启动 Flink SQL 交互式客户端:

[root@flink flink-1.15.0]# ./bin/sql-client.sh

随后,执行如下语句创建一个名为 tpcc_orders 的表:

CREATE TABLE tpcc_orders (o_id INTEGER,o_d_id INTEGER,o_w_id INTEGER,o_c_id INTEGER,o_entry_d STRING,o_carrier_id INTEGER,o_ol_cnt INTEGER,o_all_local INTEGER
) WITH (
'connector' = 'kafka',
'topic' = 'tidb_tpcc_orders',
'properties.bootstrap.servers' = '127.0.0.1:9092',
'properties.group.id' = 'testGroup',
'format' = 'canal-json',
'scan.startup.mode' = 'earliest-offset',
'properties.auto.offset.reset' = 'earliest'
)

请将 topic 和 properties.bootstrap.servers 参数替换为环境中的实际值。

3.查询表内容

执行如下命令,查询 tpcc_orders 表中的数据:

SELECT * FROM tpcc_orders;

执行成功后,可以观察到有数据输出,如下图

在这里插入图片描述
至此,就完成了 TiDB 与 Flink 的数据集成。

相关文章:

TiDB数据库从入门到精通系列之六:使用 TiCDC 将 TiDB 的数据同步到 Apache Kafka

TiDB数据库从入门到精通系列之六&#xff1a;使用 TiCDC 将 TiDB 的数据同步到 Apache Kafka 一、技术流程二、搭建环境三、创建Kafka changefeed四、写入数据以产生变更日志五、配置 Flink 消费 Kafka 数据 一、技术流程 快速搭建 TiCDC 集群、Kafka 集群和 Flink 集群创建 c…...

Spring对象装配

在spring中&#xff0c;Bean的执行流程为启动spring容器&#xff0c;实例化bean&#xff0c;将bean注册到spring容器中&#xff0c;将bean装配到需要的类中。 既然我们需要将bea装配到需要的类中&#xff0c;那么如何实现呢&#xff1f;这篇文章&#xff0c;将来阐述一下如何实…...

bigemap如何添加mapbox地图?

第一步 打开浏览器&#xff0c;找到你要访问的地图的URL地址&#xff0c;并且确认可以正常在浏览器中访问&#xff1b;浏览器中不能访问&#xff0c;同样也不能在软件中访问。 以下为常用地图源地址&#xff1a; 天地图&#xff1a; http://map.tianditu.gov.cn 包含&…...

python爬虫6:lxml库

python爬虫6&#xff1a;lxml库 前言 ​ python实现网络爬虫非常简单&#xff0c;只需要掌握一定的基础知识和一定的库使用技巧即可。本系列目标旨在梳理相关知识点&#xff0c;方便以后复习。 申明 ​ 本系列所涉及的代码仅用于个人研究与讨论&#xff0c;并不会对网站产生不好…...

Linux查找命令

find find /dir -name filename 按名字查找 find . -name “*.c” 将当前目录及其子目录下所有文件后缀为 .c 的文件列出来 find . -type f 将当前目录及其子目录中的所有文件列出 find . -ctime 20 将当前目录及其子目录下所有最近 20 天内更新过的文件列出 find / -type f -…...

在 IntelliJ IDEA 中使用 Docker 开发指南

目录 一、IDEA安装Docker插件 二、IDEA连接Docker 1、Docker for Windows 连接 2、SSH 连接 3、Connection successful 连接成功 三、查看Docker面板 四、使用插件生成镜像 一、IDEA安装Docker插件 打开 IntelliJ IDEA&#xff0c;点击菜单栏中的 "File" -&g…...

【并发编程】自研数据同步工具的优化:创建线程池多线程异步去分页调用其他服务接口获取海量数据

文章目录 场景&#xff1a;解决方案 场景&#xff1a; 前段时间在做一个数据同步工具&#xff0c;其中一个服务的任务是调用A服务的接口&#xff0c;将数据库中指定数据请求过来&#xff0c;交给kafka去判断哪些数据是需要新增&#xff0c;哪些数据是需要修改的。 刚开始的设…...

python函数、运算符等简单介绍3(无顺序)

set&#xff08;集合&#xff09; 集合(set) -> 负责存储【不重复的数据】&#xff0c;并且是【无序存储】 的容器&#xff0c;主要用来去重和逻辑比较 set1 {1,2,3,4,58,7,4,1,2,3,5} print(set1) print(type(set1)) # 输出结果&#xff1a; {1, 2, 3, 4, 5, 7, 58} <…...

TCP服务器(套接字通信)

服务器 客户端 结果...

【智慧工地源码】:人工智能、BIM技术、机器学习在智慧工地的应用

智慧工地云平台是专为建筑施工领域所打造的一体化信息管理平台。通过大数据、云计算、人工智能、BIM、物联网和移动互联网等高科技技术手段&#xff0c;将施工区域各系统数据汇总&#xff0c;建立可视化数字工地。同时&#xff0c;围绕人、机、料、法、环等各方面关键因素&…...

使用python读Excel文件并写入另一个xls模版

效果如下&#xff1a; 原文件内容 转化后的内容 大致代码如下&#xff1a; 1. load_it.py #!/usr/bin/env python import re from datetime import datetime from io import BytesIO from pathlib import Path from typing import List, Unionfrom fastapi import HTTPExcep…...

债务人去世,债权人要求其妻女承担还款责任,法院支持吗

债务人去世&#xff0c;债权人要求其妻女承担还款责任&#xff0c;法院支持吗 2019年9月20日&#xff0c;老张以公司资金周转为由向好友任某先后借款合计40万。2022年8月27日老张出具还款承诺书&#xff0c;承诺2022年11月30日前归还本息&#xff08;本息90万元&#xff09;到…...

arcgis pro3.0-3.0.1-3.0.2安装教程大全及安装包下载

一. 产品介绍&#xff1a; ArcGIS Pro 这一功能强大的单桌面 GIS 应用程序是一款功能丰富的软件&#xff0c;采用 ArcGIS Pro 用户社区提供的增强功能和创意进行开发。 ArcGIS Pro 支持 2D、3D 和 4D 模式下的数据可视化、高级分析和权威数据维护。 支持通过 Web GIS 在一系列 …...

@RequestHeader使用

RequestHeader 请求头参数的设置 GetMapping("paramTest/requestHeader")public String requestHeaderTest(RequestHeader("name") String name){return name;} 在Postman的Headers中添加请求头参数&#xff0c;不过貌似不能加中文...

LabVIEW开发图像采集和基于颜色的隔离

LabVIEW开发图像采集和基于颜色的隔离 在当今的工业和工厂中&#xff0c;准确性和精度是决定特定行业生产力的两个重要关键点。为了优化生产力&#xff0c;各行各业正在从手动操作转向自动操作和控制。机器人技术在工业过程中的出现为人类提供了机械辅助。机器视觉在工业机器人…...

站长公益主机,免费主机➕免费域名➕博客申请➕论坛申请

站长公益主机&#xff0c;免费主机➕免费域名➕博客申请➕论坛申请 在出教程之前准备好久&#xff0c;测试搭建轻量论坛无压力 选用稳定免费域名➕免费主机分销给&#xff0c;可以套CDN使用 坚持免费时间是大厂不能媲美&#xff0c;刚开始做网站时用的是这个分销&#xff0c;独…...

【PRO-UPDATE】自动更新程序图形小记

大纲流程 设计流程 v0.1 v1.0...

flume系列之:监控Systemctl托管的flume agent组

flume系列之:监控Systemctl托管的flume agent组 一、需求背景二、相关技术博客三、远程登陆flume机器四、发送飞书告警五、监控flume agent组状态一、需求背景 flume接kafka集群,一个kafka集群对应一个flume agent组,会把一组flume agent用systemctl托管每接一个kafka集群会…...

16.3.1 【Linux】程序的观察

既然程序这么重要&#xff0c;那么我们如何查阅系统上面正在运行当中的程序呢&#xff1f;利用静态的 ps 或者是动态的 top&#xff0c;还能以 pstree 来查阅程序树之间的关系。 ps &#xff1a;将某个时间点的程序运行情况撷取下来 仅观察自己的 bash 相关程序&#xff1a; p…...

HarmonyOS 设置全屏NoTitleBar

这篇很有用&#xff1a;玩转HarmonyOS 状态栏&标题栏&导航栏相关操作方法整理 配置页面全屏显示(在config.json中配置)&#xff1a; "metaData": {"customizeData": [{"name": "hwc-theme","value": "androi…...

变量 varablie 声明- Rust 变量 let mut 声明与 C/C++ 变量声明对比分析

一、变量声明设计&#xff1a;let 与 mut 的哲学解析 Rust 采用 let 声明变量并通过 mut 显式标记可变性&#xff0c;这种设计体现了语言的核心哲学。以下是深度解析&#xff1a; 1.1 设计理念剖析 安全优先原则&#xff1a;默认不可变强制开发者明确声明意图 let x 5; …...

synchronized 学习

学习源&#xff1a; https://www.bilibili.com/video/BV1aJ411V763?spm_id_from333.788.videopod.episodes&vd_source32e1c41a9370911ab06d12fbc36c4ebc 1.应用场景 不超卖&#xff0c;也要考虑性能问题&#xff08;场景&#xff09; 2.常见面试问题&#xff1a; sync出…...

如何将联系人从 iPhone 转移到 Android

从 iPhone 换到 Android 手机时&#xff0c;你可能需要保留重要的数据&#xff0c;例如通讯录。好在&#xff0c;将通讯录从 iPhone 转移到 Android 手机非常简单&#xff0c;你可以从本文中学习 6 种可靠的方法&#xff0c;确保随时保持连接&#xff0c;不错过任何信息。 第 1…...

vue3+vite项目中使用.env文件环境变量方法

vue3vite项目中使用.env文件环境变量方法 .env文件作用命名规则常用的配置项示例使用方法注意事项在vite.config.js文件中读取环境变量方法 .env文件作用 .env 文件用于定义环境变量&#xff0c;这些变量可以在项目中通过 import.meta.env 进行访问。Vite 会自动加载这些环境变…...

selenium学习实战【Python爬虫】

selenium学习实战【Python爬虫】 文章目录 selenium学习实战【Python爬虫】一、声明二、学习目标三、安装依赖3.1 安装selenium库3.2 安装浏览器驱动3.2.1 查看Edge版本3.2.2 驱动安装 四、代码讲解4.1 配置浏览器4.2 加载更多4.3 寻找内容4.4 完整代码 五、报告文件爬取5.1 提…...

【HarmonyOS 5 开发速记】如何获取用户信息(头像/昵称/手机号)

1.获取 authorizationCode&#xff1a; 2.利用 authorizationCode 获取 accessToken&#xff1a;文档中心 3.获取手机&#xff1a;文档中心 4.获取昵称头像&#xff1a;文档中心 首先创建 request 若要获取手机号&#xff0c;scope必填 phone&#xff0c;permissions 必填 …...

虚拟电厂发展三大趋势:市场化、技术主导、车网互联

市场化&#xff1a;从政策驱动到多元盈利 政策全面赋能 2025年4月&#xff0c;国家发改委、能源局发布《关于加快推进虚拟电厂发展的指导意见》&#xff0c;首次明确虚拟电厂为“独立市场主体”&#xff0c;提出硬性目标&#xff1a;2027年全国调节能力≥2000万千瓦&#xff0…...

关于uniapp展示PDF的解决方案

在 UniApp 的 H5 环境中使用 pdf-vue3 组件可以实现完整的 PDF 预览功能。以下是详细实现步骤和注意事项&#xff1a; 一、安装依赖 安装 pdf-vue3 和 PDF.js 核心库&#xff1a; npm install pdf-vue3 pdfjs-dist二、基本使用示例 <template><view class"con…...

系统掌握PyTorch:图解张量、Autograd、DataLoader、nn.Module与实战模型

本文较长&#xff0c;建议点赞收藏&#xff0c;以免遗失。更多AI大模型应用开发学习视频及资料&#xff0c;尽在聚客AI学院。 本文通过代码驱动的方式&#xff0c;系统讲解PyTorch核心概念和实战技巧&#xff0c;涵盖张量操作、自动微分、数据加载、模型构建和训练全流程&#…...

32单片机——基本定时器

STM32F103有众多的定时器&#xff0c;其中包括2个基本定时器&#xff08;TIM6和TIM7&#xff09;、4个通用定时器&#xff08;TIM2~TIM5&#xff09;、2个高级控制定时器&#xff08;TIM1和TIM8&#xff09;&#xff0c;这些定时器彼此完全独立&#xff0c;不共享任何资源 1、定…...