Apache SeaTunnel MongoDB CDC 使用指南
随着数据驱动决策的重要性日益凸显,实时数据处理成为企业竞争力的关键。SeaTunnel MongoDB CDC(Change Data Capture) 源连接器的推出,为开发者提供了一个高效、灵活的工具,以实现对 MongoDB 数据库变更的实时捕获和处理。
本文将深入探讨该连接器的主要特性、支持的数据源信息、配置选项以及如何创建数据同步作业,助力开发者更好地利用 SeaTunnel 进行数据集成和实时数据分析。这些更新旨在为开发者提供更为丰富的数据处理能力,帮助他们更有效地捕获和处理来自 MongoDB 的变更数据。
支持的引擎
SeaTunnel Zeta
Flink
主要特性
- 批处理
- 流处理
- 精确一次
- 列投影
- 并行度
- 支持用户定义分片
功能描述
MongoDB CDC 源连接器允许从 MongoDB 数据库读取快照数据和增量数据。
支持的数据源信息
要使用 MongoDB CDC 连接器,需要以下依赖。它们可以通过 install-plugin.sh 脚本或从 Maven 中央仓库下载。
数据源 | 支持的版本 | 依赖 |
---|---|---|
MongoDB | 通用 | 下载 |
可用性设置
- MongoDB版本:MongoDB 版本 >= 4.0。
- 集群部署:副本集或分片集群。
- 存储引擎:WiredTiger 存储引擎。
- 权限:changeStream 和 read
use admin;
db.createRole({role: "strole",privileges: [{resource: { db: "", collection: "" },actions: ["splitVector","listDatabases","listCollections","collStats","find","changeStream" ]}],roles: [{ role: 'read', db: 'config' }]}
);db.createUser({user: 'stuser',pwd: 'stpw',roles: [{ role: 'strole', db: 'admin' }]}
);
数据类型映射
以下表格列出了从 MongoDB BSON 类型到 SeaTunnel 数据类型的字段数据类型映射。
MongoDB BSON 类型 | SeaTunnel 数据类型 |
---|---|
ObjectId | STRING |
String | STRING |
Boolean | BOOLEAN |
Binary | BINARY |
Int32 | INTEGER |
Int64 | BIGINT |
Double | DOUBLE |
Decimal128 | DECIMAL |
Date | DATE |
Timestamp | TIMESTAMP |
Object | ROW |
Array | ARRAY |
对于 MongoDB 中的特定类型,我们使用扩展 JSON 格式将它们映射到 SeaTunnel STRING 类型。
MongoDB BSON 类型 | SeaTunnel STRING 表示 |
---|---|
Symbol | {"_value": {"$symbol": "12"}} |
RegularExpression | {"_value": {"$regularExpression": {"pattern": "^9$", "options": "i"}}} |
JavaScript | {"_value": {"$code": "function() { return 10; }"}} |
DbPointer | {"_value": {"$dbPointer": {"$ref": "db.coll", "$id": {"$oid": "63932a00da01604af329e33c"}}}} |
提示
在 SeaTunnel 中使用 DECIMAL 类型时,请注意最大范围不能超过 34 位数字,这意味着你应该使用 decimal(34, 18)。
名称 | 类型 | 必须 | 默认值 | 描述 |
---|---|---|---|---|
hosts | String | 是 | - | MongoDB 服务器的主机名和端口对的逗号分隔列表。例如:localhost:27017,localhost:27018 |
username | String | 否 | - | 连接 MongoDB 时使用的数据库用户名。 |
password | String | 否 | - | 连接 MongoDB 时使用的密码。 |
database | List | 是 | - | 要监视更改的数据库名称。如果未设置,则会捕获所有数据库。数据库还支持正则表达式,以监视与正则表达式匹配的多个数据库。例如:db1,db2。 |
collection | List | 是 | - | 数据库中要监视更改的集合名称。如果未设置,则会捕获所有集合。集合也支持正则表达式,以监视与完全限定的集合标识符匹配的多个集合。例如:db1.coll1,db2.coll2。 |
connection.options | String | 否 | - | MongoDB 的连接选项的和号分隔列表。例如:replicaSet=test&connectTimeoutMS=300000。 |
batch.size | Long | 否 | 1024 | 游标批大小。 |
poll.max.batch.size | Enum | 否 | 1024 | 轮询新数据时包含在单个批次中的更改流文档的最大数量。 |
poll.await.time.ms | Long | 否 | 1000 | 等待检查更改流上的新结果之前的时间量。 |
heartbeat.interval.ms | String | 否 | 0 | 发送心跳消息之间的时间长度(以毫秒为单位)。使用 0 禁用。 |
incremental.snapshot.chunk.size.mb | Long | 否 | 64 | 增量快照的块大小(MB)。 |
common-options | 否 | - | 源插件通用参数,请参考源通用选项获取详情。 |
提示:
- 如果集合变更速度较慢,强烈建议为 heartbeat.interval.ms 参数设置大于 0 的适当值。当我们从检查点或保存点恢复 SeaTunnel 作业时,心跳事件可以将 resumeToken 推进以避免其过期。
- MongoDB 对单个文档有 16MB 的限制。更改文档包括附加信息,因此即使原始文档不大于 15MB,更改文档也可能超过 16MB 限制,导致更改流操作终止。
- 建议使用不可变的分片键。在 MongoDB 中,分片键在启用事务后允许修改,但更改分片键可能导致频繁的分片迁移,造成额外的性能开销。此外,修改分片键还可能导致更新查找功能变得无效,在 CDC(更改数据捕获)场景中导致不一致的结果。
如何创建 MongoDB CDC 数据同步作业
将 CDC 数据打印到客户端
以下示例演示如何创建一个从 MongoDB 读取 CDC 数据并在本地客户端打印的数据同步作业:
env {# 您可以在此处设置引擎配置parallelism = 1job.mode = "STREAMING"checkpoint.interval = 5000
}source {MongoDB-CDC {hosts = "mongo0:27017"database = ["inventory"]collection = ["inventory.products"]username = stuserpassword = stpwschema = {fields {"_id" : string,"name" : string,"description" : string,"weight" : string}}}
}# 在本地客户端打印读取的 MongoDB 数据
sink {Console {parallelism = 1}
}
将 CDC 数据写入 MysqlDB
以下示例演示如何创建一个从 MongoDB 读取 CDC 数据并写入 mysql 数据库的数据同步作业:
env {# You can set engine configuration hereparallelism = 1job.mode = "STREAMING"checkpoint.interval = 5000
}source {MongoDB-CDC {hosts = "mongo0:27017"database = ["inventory"]collection = ["inventory.products"]username = stuserpassword = stpw}
}sink {jdbc {url = "jdbc:mysql://mysql_cdc_e2e:3306"driver = "com.mysql.cj.jdbc.Driver"user = "st_user"password = "seatunnel"generate_sink_sql = true# You need to configure both database and tabledatabase = mongodb_cdctable = productsprimary_keys = ["_id"]}
}
多表同步
以下示例演示如何创建一个读取 mongodb 多库表 CDC 数据并在本地客户端打印的数据同步作业:
env {# You can set engine configuration hereparallelism = 1job.mode = "STREAMING"checkpoint.interval = 5000
}source {MongoDB-CDC {hosts = "mongo0:27017"database = ["inventory","crm"]collection = ["inventory.products","crm.test"]username = stuserpassword = stpw}
}# Console printing of the read Mongodb data
sink {Console {parallelism = 1}
}
提示: 多库表 CDC 同步不能指定 schema,只能下游输出 json 数据。这是因为 MongoDB 不提供查询元数据信息,所以如果想支持多表,所有表只能作为一个结构读取。
使用正则表达式匹配多表
以下示例演示如何创建一个通过正则表达式读取 mongodb 多库表数据并在本地客户端打印的数据同步作业:
匹配示例 | 表达式 | 描述 |
---|---|---|
前缀匹配 | ^(test).* | 匹配数据库名或表名以 test 为前缀的,如 test1, test2 等。 |
后缀匹配 | .*[p$] | 匹配数据库名或表名以 p 为后缀的,如 cdcp, edcp 等。 |
``` | ||
env { | ||
# You can set engine configuration here | ||
parallelism = 1 | ||
job.mode = "STREAMING" | ||
checkpoint.interval = 5000 | ||
} |
source { MongoDB-CDC { hosts = "mongo0:27017" # So this example is used (^(test).|^(tpc).|txc|.[p$]|t{2}).(t[5-8]|tt),matching txc.tt、test2.test5. database = ["(^(test).|^(tpc).|txc|.[p$]|t{2})"] collection = ["(t[5-8]|tt)"] username = stuser password = stpw } }
Console printing of the read Mongodb data
sink { Console { parallelism = 1 } }
### 实时流数据格式
{ _id : { }, // Identifier of the open change stream, can be assigned to the 'resumeAfter' parameter for subsequent resumption of this change stream "operationType" : " ", // The type of change operation that occurred, such as: insert, delete, update, etc. "fullDocument" : { }, // The full document data involved in the change operation. This field does not exist in delete operations "ns" : {
"db" : " ", // The database where the change operation occurred "coll" : " " // The collection where the change operation occurred }, "to" : { // These fields are displayed only when the operation type is 'rename' "db" : " ", // The new database name after the change "coll" : " " // The new collection name after the change }, "source":{ "ts_ms":" ", // The timestamp when the change operation occurred "table":" " // The collection where the change operation occurred "db":" ", // The database where the change operation occurred "snapshot":"false" // Identify the current stage of data synchronization }, "documentKey" : { "_id" : }, // The _id field value of the document involved in the change operation "updateDescription" : { // Description of the update operation "updatedFields" : { }, // The fields and values that the update operation modified "removedFields" : [ " ", ... ] // The fields and values that the update operation removed } "clusterTime" : , // The timestamp of the Oplog log entry corresponding to the change operation "txnNumber" : , // If the change operation is executed in a multi-document transaction, this field and value are displayed, representing the transaction number "lsid" : { // Represents information related to the Session in which the transaction is located "id" : , "uid" : } }
```
到此本指南就结束了,MongoDB CDC Sink连接器的发布,不仅强化了 Apache SeaTunnel 在数据集成领域的地位,也为开发者提供了更多的可能性。
Apache SeaTunnel 社区也期待您的参与和贡献,共同迈向更广阔的数据处理未来,让我们携手共建一个更加强大、开放、互助的社区!
本文由 白鲸开源科技 提供发布支持!
相关文章:

Apache SeaTunnel MongoDB CDC 使用指南
随着数据驱动决策的重要性日益凸显,实时数据处理成为企业竞争力的关键。SeaTunnel MongoDB CDC(Change Data Capture) 源连接器的推出,为开发者提供了一个高效、灵活的工具,以实现对 MongoDB 数据库变更的实时捕获和处理。 本文将深入探讨该连…...

智能合约 之 部署ERC-20
Remix介绍 Remix是一个由以太坊社区开发的在线集成开发环境(IDE),旨在帮助开发者编写、测试和部署以太坊智能合约。它提供了一个简单易用的界面,使得开发者可以在浏览器中直接进行智能合约的开发,而无需安装任何额外的…...

【C++】用红黑树模拟实现set、map
目录 前言及准备:一、红黑树接口1.1 begin1.2 end1.3 查找1.4 插入1.5 左单旋和右单旋 二、树形迭代器(正向)2.1 前置 三、模拟实现set四、模拟实现map 前言及准备: set、map的底层结构是红黑树,它们的函数通过调用红…...

实现:mysql-5.7.42 到 mysql-8.2.0 的升级(二进制方式)
实现:mysql-5.7.42 到 mysql-8.2.0 的升级(二进制方式) 1、操作环境1、查看当前数据库版本2、操作系统版本3、查看 Linux 系统上的 glibc(GNU C 库)版本(**这里很重要,要下载对应的内核mysql版本…...

深入探讨医保购药APP的技术架构与设计思路
随着移动互联网的发展,医疗保健行业也迎来了数字化转型的浪潮。医保购药APP作为医保体系数字化的一部分,其技术架构和设计思路至关重要。接下来,小编将为您讲解医保购药APP的技术架构与设计思路,为相关从业者提供参考和启发。 一、…...
react中点击按钮不能获取最新的state时候
在这个问题中,用户希望在点击确认按钮时触发handleChange函数,并且能够正确获取到最新的bzText值。最初的代码中,在handleOpen函数中弹出一个确认框,并在确认框的onOk回调函数中调用handleChange函数。然而,由于组件传…...

2、鸿蒙学习-申请调试证书和调试Profile文件
申请发布证书 发布证书由AGC颁发的、为HarmonyOS应用配置签名信息的数字证书,可保障软件代码完整性和发布者身份真实性。证书格式为.cer,包含公钥、证书指纹等信息。 说明 请确保您的开发者帐号已实名认证。每个帐号最多申请1个发布证书。 1、登录AppGa…...
蓝桥杯算法基础(13):十大排序算法(希尔排序) (快速排序)c语言版
希尔排序 优化版的插入排序,优化的地方就是步长(增量)增大了,原来的插入排序的步长(增量)是1,而希尔排序的步长(增量)可以很大,然后逐渐减小直到1形成插入排…...
web学习笔记(三十二)
目录 1.函数的call、apply、bind方法 1.1call、apply、bind的相同点 1.2call、apply、bind的不同点 1.3call、apply、bind的使用场景 2. 对象的深拷贝 2.1对象的浅拷贝 2.1对象的深拷贝 1.函数的call、apply、bind方法 1.1call、apply、bind的相同点 在没有传参数时&…...

Android 地图SDK 绘制点 删除 指定
问题 Android 地图SDK 删除指定绘制点 详细问题 笔者进行Android 项目开发,对于已标记的绘制点,提供撤回按钮,即删除绘制点,如何实现。 解决方案 新增绘制点 private List<Marker> markerList new ArrayList<>…...

Nodejs 第五十八章(大文件上传)
在现代网站中,越来越多的个性化图片,视频,去展示,因此我们的网站一般都会支持文件上传。 文件上传的方案 大文件上传:将大文件切分成较小的片段(通常称为分片或块),然后逐个上传这…...

Linux编译器--gcc/g++的使用
1. gcc与g gcc与g分别是c语言与c代码的编译器,但同时g也兼容c语言。 我们知道在Linux中,系统并不以文件后缀来区分文件类别。但对于gcc与g等编译器而言却是需要的。Linux中c代码文件的后缀是.c,c代码文件的后缀是.cpp(.cc)(.cxx)。 在Linu…...

苍穹外卖-day13:vue基础回顾+进阶
vue基础回顾进阶 课程内容 VUE 基础回顾路由 Vue-Router状态管理 vuexTypeScript 1. VUE 基础回顾 1.1 基于脚手架创建前端工程 1.1.1 环境要求 要想基于脚手架创建前端工程,需要具备如下环境要求: node.js 前端项目的运行环境 学习web阶段已安…...
蓝桥杯/慈善晚会/c\c++
问题描述 热心公益的G哥哥又来举办慈善晚会了,这次他邀请到了巴菲特、马云等巨富,还邀请到了大V、小C等算法界泰斗。晚会一共邀请了n位尊贵的客人,每位客人都位于不同的城市,也就是说每座城市都有且仅有一位客人。这些城市的编号为…...

2024.3.19
思维导图...

【Python】新手入门学习:详细介绍单一职责原则(SRP)及其作用、代码示例
【Python】新手入门学习:详细介绍单一职责原则(SRP)及其作用、代码示例 🌈 个人主页:高斯小哥 🔥 高质量专栏:Matplotlib之旅:零基础精通数据可视化、Python基础【高质量合集】、PyT…...
【DataWhale学习笔记】使用AgentScope调用qwen大模型
AgentScope AgentScope介绍 AgentScope是一款全新的Multi-Agent框架,专为应用开发者打造,旨在提供高易用、高可靠的编程体验! 高易用:AgentScope支持纯Python编程,提供多种语法工具实现灵活的应用流程编排ÿ…...

【C++】手撕AVL树
> 作者简介:დ旧言~,目前大二,现在学习Java,c,c,Python等 > 座右铭:松树千年终是朽,槿花一日自为荣。 > 目标:能直接手撕AVL树。 > 毒鸡汤:放弃自…...

探索 TorchRe-ID--基于 Python 的人员再识别库
导言 人员再识别(re-ID)是计算机视觉中的一项重要任务,在监控系统、零售分析和人机交互中有着广泛的应用。TorchRe-ID 是一个功能强大、用户友好的 Python 库,它为人员再识别任务提供了一套全面的工具和模型。在本文中࿰…...

鸿蒙Harmony应用开发—ArkTS声明式开发(容器组件:Flex)
以弹性方式布局子组件的容器组件。 说明: 该组件从API Version 7开始支持。后续版本如有新增内容,则采用上角标单独标记该内容的起始版本。Flex组件在渲染时存在二次布局过程,因此在对性能有严格要求的场景下建议使用Column、Row代替。Flex组…...

Flask RESTful 示例
目录 1. 环境准备2. 安装依赖3. 修改main.py4. 运行应用5. API使用示例获取所有任务获取单个任务创建新任务更新任务删除任务 中文乱码问题: 下面创建一个简单的Flask RESTful API示例。首先,我们需要创建环境,安装必要的依赖,然后…...

stm32G473的flash模式是单bank还是双bank?
今天突然有人stm32G473的flash模式是单bank还是双bank?由于时间太久,我真忘记了。搜搜发现,还真有人和我一样。见下面的链接:https://shequ.stmicroelectronics.cn/forum.php?modviewthread&tid644563 根据STM32G4系列参考手…...

大话软工笔记—需求分析概述
需求分析,就是要对需求调研收集到的资料信息逐个地进行拆分、研究,从大量的不确定“需求”中确定出哪些需求最终要转换为确定的“功能需求”。 需求分析的作用非常重要,后续设计的依据主要来自于需求分析的成果,包括: 项目的目的…...

CentOS下的分布式内存计算Spark环境部署
一、Spark 核心架构与应用场景 1.1 分布式计算引擎的核心优势 Spark 是基于内存的分布式计算框架,相比 MapReduce 具有以下核心优势: 内存计算:数据可常驻内存,迭代计算性能提升 10-100 倍(文档段落:3-79…...

深入解析C++中的extern关键字:跨文件共享变量与函数的终极指南
🚀 C extern 关键字深度解析:跨文件编程的终极指南 📅 更新时间:2025年6月5日 🏷️ 标签:C | extern关键字 | 多文件编程 | 链接与声明 | 现代C 文章目录 前言🔥一、extern 是什么?&…...

多种风格导航菜单 HTML 实现(附源码)
下面我将为您展示 6 种不同风格的导航菜单实现,每种都包含完整 HTML、CSS 和 JavaScript 代码。 1. 简约水平导航栏 <!DOCTYPE html> <html lang"zh-CN"> <head><meta charset"UTF-8"><meta name"viewport&qu…...

网络编程(UDP编程)
思维导图 UDP基础编程(单播) 1.流程图 服务器:短信的接收方 创建套接字 (socket)-----------------------------------------》有手机指定网络信息-----------------------------------------------》有号码绑定套接字 (bind)--------------…...
MySQL用户和授权
开放MySQL白名单 可以通过iptables-save命令确认对应客户端ip是否可以访问MySQL服务: test: # iptables-save | grep 3306 -A mp_srv_whitelist -s 172.16.14.102/32 -p tcp -m tcp --dport 3306 -j ACCEPT -A mp_srv_whitelist -s 172.16.4.16/32 -p tcp -m tcp -…...

以光量子为例,详解量子获取方式
光量子技术获取量子比特可在室温下进行。该方式有望通过与名为硅光子学(silicon photonics)的光波导(optical waveguide)芯片制造技术和光纤等光通信技术相结合来实现量子计算机。量子力学中,光既是波又是粒子。光子本…...
蓝桥杯 冶炼金属
原题目链接 🔧 冶炼金属转换率推测题解 📜 原题描述 小蓝有一个神奇的炉子用于将普通金属 O O O 冶炼成为一种特殊金属 X X X。这个炉子有一个属性叫转换率 V V V,是一个正整数,表示每 V V V 个普通金属 O O O 可以冶炼出 …...