Apache SeaTunnel MongoDB CDC 使用指南
随着数据驱动决策的重要性日益凸显,实时数据处理成为企业竞争力的关键。SeaTunnel MongoDB CDC(Change Data Capture) 源连接器的推出,为开发者提供了一个高效、灵活的工具,以实现对 MongoDB 数据库变更的实时捕获和处理。

本文将深入探讨该连接器的主要特性、支持的数据源信息、配置选项以及如何创建数据同步作业,助力开发者更好地利用 SeaTunnel 进行数据集成和实时数据分析。这些更新旨在为开发者提供更为丰富的数据处理能力,帮助他们更有效地捕获和处理来自 MongoDB 的变更数据。
支持的引擎
SeaTunnel Zeta
Flink
主要特性
- 批处理
- 流处理
- 精确一次
- 列投影
- 并行度
- 支持用户定义分片
功能描述
MongoDB CDC 源连接器允许从 MongoDB 数据库读取快照数据和增量数据。
支持的数据源信息
要使用 MongoDB CDC 连接器,需要以下依赖。它们可以通过 install-plugin.sh 脚本或从 Maven 中央仓库下载。
| 数据源 | 支持的版本 | 依赖 |
|---|---|---|
| MongoDB | 通用 | 下载 |
可用性设置
- MongoDB版本:MongoDB 版本 >= 4.0。
- 集群部署:副本集或分片集群。
- 存储引擎:WiredTiger 存储引擎。
- 权限:changeStream 和 read
use admin;
db.createRole({role: "strole",privileges: [{resource: { db: "", collection: "" },actions: ["splitVector","listDatabases","listCollections","collStats","find","changeStream" ]}],roles: [{ role: 'read', db: 'config' }]}
);db.createUser({user: 'stuser',pwd: 'stpw',roles: [{ role: 'strole', db: 'admin' }]}
);
数据类型映射
以下表格列出了从 MongoDB BSON 类型到 SeaTunnel 数据类型的字段数据类型映射。
| MongoDB BSON 类型 | SeaTunnel 数据类型 |
|---|---|
| ObjectId | STRING |
| String | STRING |
| Boolean | BOOLEAN |
| Binary | BINARY |
| Int32 | INTEGER |
| Int64 | BIGINT |
| Double | DOUBLE |
| Decimal128 | DECIMAL |
| Date | DATE |
| Timestamp | TIMESTAMP |
| Object | ROW |
| Array | ARRAY |
对于 MongoDB 中的特定类型,我们使用扩展 JSON 格式将它们映射到 SeaTunnel STRING 类型。
| MongoDB BSON 类型 | SeaTunnel STRING 表示 |
|---|---|
| Symbol | {"_value": {"$symbol": "12"}} |
| RegularExpression | {"_value": {"$regularExpression": {"pattern": "^9$", "options": "i"}}} |
| JavaScript | {"_value": {"$code": "function() { return 10; }"}} |
| DbPointer | {"_value": {"$dbPointer": {"$ref": "db.coll", "$id": {"$oid": "63932a00da01604af329e33c"}}}} |
提示
在 SeaTunnel 中使用 DECIMAL 类型时,请注意最大范围不能超过 34 位数字,这意味着你应该使用 decimal(34, 18)。
| 名称 | 类型 | 必须 | 默认值 | 描述 |
|---|---|---|---|---|
| hosts | String | 是 | - | MongoDB 服务器的主机名和端口对的逗号分隔列表。例如:localhost:27017,localhost:27018 |
| username | String | 否 | - | 连接 MongoDB 时使用的数据库用户名。 |
| password | String | 否 | - | 连接 MongoDB 时使用的密码。 |
| database | List | 是 | - | 要监视更改的数据库名称。如果未设置,则会捕获所有数据库。数据库还支持正则表达式,以监视与正则表达式匹配的多个数据库。例如:db1,db2。 |
| collection | List | 是 | - | 数据库中要监视更改的集合名称。如果未设置,则会捕获所有集合。集合也支持正则表达式,以监视与完全限定的集合标识符匹配的多个集合。例如:db1.coll1,db2.coll2。 |
| connection.options | String | 否 | - | MongoDB 的连接选项的和号分隔列表。例如:replicaSet=test&connectTimeoutMS=300000。 |
| batch.size | Long | 否 | 1024 | 游标批大小。 |
| poll.max.batch.size | Enum | 否 | 1024 | 轮询新数据时包含在单个批次中的更改流文档的最大数量。 |
| poll.await.time.ms | Long | 否 | 1000 | 等待检查更改流上的新结果之前的时间量。 |
| heartbeat.interval.ms | String | 否 | 0 | 发送心跳消息之间的时间长度(以毫秒为单位)。使用 0 禁用。 |
| incremental.snapshot.chunk.size.mb | Long | 否 | 64 | 增量快照的块大小(MB)。 |
| common-options | 否 | - | 源插件通用参数,请参考源通用选项获取详情。 |
提示:
- 如果集合变更速度较慢,强烈建议为 heartbeat.interval.ms 参数设置大于 0 的适当值。当我们从检查点或保存点恢复 SeaTunnel 作业时,心跳事件可以将 resumeToken 推进以避免其过期。
- MongoDB 对单个文档有 16MB 的限制。更改文档包括附加信息,因此即使原始文档不大于 15MB,更改文档也可能超过 16MB 限制,导致更改流操作终止。
- 建议使用不可变的分片键。在 MongoDB 中,分片键在启用事务后允许修改,但更改分片键可能导致频繁的分片迁移,造成额外的性能开销。此外,修改分片键还可能导致更新查找功能变得无效,在 CDC(更改数据捕获)场景中导致不一致的结果。
如何创建 MongoDB CDC 数据同步作业
将 CDC 数据打印到客户端
以下示例演示如何创建一个从 MongoDB 读取 CDC 数据并在本地客户端打印的数据同步作业:
env {# 您可以在此处设置引擎配置parallelism = 1job.mode = "STREAMING"checkpoint.interval = 5000
}source {MongoDB-CDC {hosts = "mongo0:27017"database = ["inventory"]collection = ["inventory.products"]username = stuserpassword = stpwschema = {fields {"_id" : string,"name" : string,"description" : string,"weight" : string}}}
}# 在本地客户端打印读取的 MongoDB 数据
sink {Console {parallelism = 1}
}
将 CDC 数据写入 MysqlDB
以下示例演示如何创建一个从 MongoDB 读取 CDC 数据并写入 mysql 数据库的数据同步作业:
env {# You can set engine configuration hereparallelism = 1job.mode = "STREAMING"checkpoint.interval = 5000
}source {MongoDB-CDC {hosts = "mongo0:27017"database = ["inventory"]collection = ["inventory.products"]username = stuserpassword = stpw}
}sink {jdbc {url = "jdbc:mysql://mysql_cdc_e2e:3306"driver = "com.mysql.cj.jdbc.Driver"user = "st_user"password = "seatunnel"generate_sink_sql = true# You need to configure both database and tabledatabase = mongodb_cdctable = productsprimary_keys = ["_id"]}
}
多表同步
以下示例演示如何创建一个读取 mongodb 多库表 CDC 数据并在本地客户端打印的数据同步作业:
env {# You can set engine configuration hereparallelism = 1job.mode = "STREAMING"checkpoint.interval = 5000
}source {MongoDB-CDC {hosts = "mongo0:27017"database = ["inventory","crm"]collection = ["inventory.products","crm.test"]username = stuserpassword = stpw}
}# Console printing of the read Mongodb data
sink {Console {parallelism = 1}
}
提示: 多库表 CDC 同步不能指定 schema,只能下游输出 json 数据。这是因为 MongoDB 不提供查询元数据信息,所以如果想支持多表,所有表只能作为一个结构读取。
使用正则表达式匹配多表
以下示例演示如何创建一个通过正则表达式读取 mongodb 多库表数据并在本地客户端打印的数据同步作业:
| 匹配示例 | 表达式 | 描述 |
|---|---|---|
| 前缀匹配 | ^(test).* | 匹配数据库名或表名以 test 为前缀的,如 test1, test2 等。 |
| 后缀匹配 | .*[p$] | 匹配数据库名或表名以 p 为后缀的,如 cdcp, edcp 等。 |
| ``` | ||
| env { | ||
| # You can set engine configuration here | ||
| parallelism = 1 | ||
| job.mode = "STREAMING" | ||
| checkpoint.interval = 5000 | ||
| } |
source { MongoDB-CDC { hosts = "mongo0:27017" # So this example is used (^(test).|^(tpc).|txc|.[p$]|t{2}).(t[5-8]|tt),matching txc.tt、test2.test5. database = ["(^(test).|^(tpc).|txc|.[p$]|t{2})"] collection = ["(t[5-8]|tt)"] username = stuser password = stpw } }
Console printing of the read Mongodb data
sink { Console { parallelism = 1 } }
### 实时流数据格式
{ _id : { }, // Identifier of the open change stream, can be assigned to the 'resumeAfter' parameter for subsequent resumption of this change stream "operationType" : " ", // The type of change operation that occurred, such as: insert, delete, update, etc. "fullDocument" : { }, // The full document data involved in the change operation. This field does not exist in delete operations "ns" : {
"db" : " ", // The database where the change operation occurred "coll" : " " // The collection where the change operation occurred }, "to" : { // These fields are displayed only when the operation type is 'rename' "db" : " ", // The new database name after the change "coll" : " " // The new collection name after the change }, "source":{ "ts_ms":" ", // The timestamp when the change operation occurred "table":" " // The collection where the change operation occurred "db":" ", // The database where the change operation occurred "snapshot":"false" // Identify the current stage of data synchronization }, "documentKey" : { "_id" : }, // The _id field value of the document involved in the change operation "updateDescription" : { // Description of the update operation "updatedFields" : { }, // The fields and values that the update operation modified "removedFields" : [ " ", ... ] // The fields and values that the update operation removed } "clusterTime" : , // The timestamp of the Oplog log entry corresponding to the change operation "txnNumber" : , // If the change operation is executed in a multi-document transaction, this field and value are displayed, representing the transaction number "lsid" : { // Represents information related to the Session in which the transaction is located "id" : , "uid" : } }
```
到此本指南就结束了,MongoDB CDC Sink连接器的发布,不仅强化了 Apache SeaTunnel 在数据集成领域的地位,也为开发者提供了更多的可能性。
Apache SeaTunnel 社区也期待您的参与和贡献,共同迈向更广阔的数据处理未来,让我们携手共建一个更加强大、开放、互助的社区!
本文由 白鲸开源科技 提供发布支持!
相关文章:
Apache SeaTunnel MongoDB CDC 使用指南
随着数据驱动决策的重要性日益凸显,实时数据处理成为企业竞争力的关键。SeaTunnel MongoDB CDC(Change Data Capture) 源连接器的推出,为开发者提供了一个高效、灵活的工具,以实现对 MongoDB 数据库变更的实时捕获和处理。 本文将深入探讨该连…...
智能合约 之 部署ERC-20
Remix介绍 Remix是一个由以太坊社区开发的在线集成开发环境(IDE),旨在帮助开发者编写、测试和部署以太坊智能合约。它提供了一个简单易用的界面,使得开发者可以在浏览器中直接进行智能合约的开发,而无需安装任何额外的…...
【C++】用红黑树模拟实现set、map
目录 前言及准备:一、红黑树接口1.1 begin1.2 end1.3 查找1.4 插入1.5 左单旋和右单旋 二、树形迭代器(正向)2.1 前置 三、模拟实现set四、模拟实现map 前言及准备: set、map的底层结构是红黑树,它们的函数通过调用红…...
实现:mysql-5.7.42 到 mysql-8.2.0 的升级(二进制方式)
实现:mysql-5.7.42 到 mysql-8.2.0 的升级(二进制方式) 1、操作环境1、查看当前数据库版本2、操作系统版本3、查看 Linux 系统上的 glibc(GNU C 库)版本(**这里很重要,要下载对应的内核mysql版本…...
深入探讨医保购药APP的技术架构与设计思路
随着移动互联网的发展,医疗保健行业也迎来了数字化转型的浪潮。医保购药APP作为医保体系数字化的一部分,其技术架构和设计思路至关重要。接下来,小编将为您讲解医保购药APP的技术架构与设计思路,为相关从业者提供参考和启发。 一、…...
react中点击按钮不能获取最新的state时候
在这个问题中,用户希望在点击确认按钮时触发handleChange函数,并且能够正确获取到最新的bzText值。最初的代码中,在handleOpen函数中弹出一个确认框,并在确认框的onOk回调函数中调用handleChange函数。然而,由于组件传…...
2、鸿蒙学习-申请调试证书和调试Profile文件
申请发布证书 发布证书由AGC颁发的、为HarmonyOS应用配置签名信息的数字证书,可保障软件代码完整性和发布者身份真实性。证书格式为.cer,包含公钥、证书指纹等信息。 说明 请确保您的开发者帐号已实名认证。每个帐号最多申请1个发布证书。 1、登录AppGa…...
蓝桥杯算法基础(13):十大排序算法(希尔排序) (快速排序)c语言版
希尔排序 优化版的插入排序,优化的地方就是步长(增量)增大了,原来的插入排序的步长(增量)是1,而希尔排序的步长(增量)可以很大,然后逐渐减小直到1形成插入排…...
web学习笔记(三十二)
目录 1.函数的call、apply、bind方法 1.1call、apply、bind的相同点 1.2call、apply、bind的不同点 1.3call、apply、bind的使用场景 2. 对象的深拷贝 2.1对象的浅拷贝 2.1对象的深拷贝 1.函数的call、apply、bind方法 1.1call、apply、bind的相同点 在没有传参数时&…...
Android 地图SDK 绘制点 删除 指定
问题 Android 地图SDK 删除指定绘制点 详细问题 笔者进行Android 项目开发,对于已标记的绘制点,提供撤回按钮,即删除绘制点,如何实现。 解决方案 新增绘制点 private List<Marker> markerList new ArrayList<>…...
Nodejs 第五十八章(大文件上传)
在现代网站中,越来越多的个性化图片,视频,去展示,因此我们的网站一般都会支持文件上传。 文件上传的方案 大文件上传:将大文件切分成较小的片段(通常称为分片或块),然后逐个上传这…...
Linux编译器--gcc/g++的使用
1. gcc与g gcc与g分别是c语言与c代码的编译器,但同时g也兼容c语言。 我们知道在Linux中,系统并不以文件后缀来区分文件类别。但对于gcc与g等编译器而言却是需要的。Linux中c代码文件的后缀是.c,c代码文件的后缀是.cpp(.cc)(.cxx)。 在Linu…...
苍穹外卖-day13:vue基础回顾+进阶
vue基础回顾进阶 课程内容 VUE 基础回顾路由 Vue-Router状态管理 vuexTypeScript 1. VUE 基础回顾 1.1 基于脚手架创建前端工程 1.1.1 环境要求 要想基于脚手架创建前端工程,需要具备如下环境要求: node.js 前端项目的运行环境 学习web阶段已安…...
蓝桥杯/慈善晚会/c\c++
问题描述 热心公益的G哥哥又来举办慈善晚会了,这次他邀请到了巴菲特、马云等巨富,还邀请到了大V、小C等算法界泰斗。晚会一共邀请了n位尊贵的客人,每位客人都位于不同的城市,也就是说每座城市都有且仅有一位客人。这些城市的编号为…...
2024.3.19
思维导图...
【Python】新手入门学习:详细介绍单一职责原则(SRP)及其作用、代码示例
【Python】新手入门学习:详细介绍单一职责原则(SRP)及其作用、代码示例 🌈 个人主页:高斯小哥 🔥 高质量专栏:Matplotlib之旅:零基础精通数据可视化、Python基础【高质量合集】、PyT…...
【DataWhale学习笔记】使用AgentScope调用qwen大模型
AgentScope AgentScope介绍 AgentScope是一款全新的Multi-Agent框架,专为应用开发者打造,旨在提供高易用、高可靠的编程体验! 高易用:AgentScope支持纯Python编程,提供多种语法工具实现灵活的应用流程编排ÿ…...
【C++】手撕AVL树
> 作者简介:დ旧言~,目前大二,现在学习Java,c,c,Python等 > 座右铭:松树千年终是朽,槿花一日自为荣。 > 目标:能直接手撕AVL树。 > 毒鸡汤:放弃自…...
探索 TorchRe-ID--基于 Python 的人员再识别库
导言 人员再识别(re-ID)是计算机视觉中的一项重要任务,在监控系统、零售分析和人机交互中有着广泛的应用。TorchRe-ID 是一个功能强大、用户友好的 Python 库,它为人员再识别任务提供了一套全面的工具和模型。在本文中࿰…...
鸿蒙Harmony应用开发—ArkTS声明式开发(容器组件:Flex)
以弹性方式布局子组件的容器组件。 说明: 该组件从API Version 7开始支持。后续版本如有新增内容,则采用上角标单独标记该内容的起始版本。Flex组件在渲染时存在二次布局过程,因此在对性能有严格要求的场景下建议使用Column、Row代替。Flex组…...
Ubuntu系统下交叉编译openssl
一、参考资料 OpenSSL&&libcurl库的交叉编译 - hesetone - 博客园 二、准备工作 1. 编译环境 宿主机:Ubuntu 20.04.6 LTSHost:ARM32位交叉编译器:arm-linux-gnueabihf-gcc-11.1.0 2. 设置交叉编译工具链 在交叉编译之前&#x…...
K8S认证|CKS题库+答案| 11. AppArmor
目录 11. AppArmor 免费获取并激活 CKA_v1.31_模拟系统 题目 开始操作: 1)、切换集群 2)、切换节点 3)、切换到 apparmor 的目录 4)、执行 apparmor 策略模块 5)、修改 pod 文件 6)、…...
Appium+python自动化(十六)- ADB命令
简介 Android 调试桥(adb)是多种用途的工具,该工具可以帮助你你管理设备或模拟器 的状态。 adb ( Android Debug Bridge)是一个通用命令行工具,其允许您与模拟器实例或连接的 Android 设备进行通信。它可为各种设备操作提供便利,如安装和调试…...
Vue3 + Element Plus + TypeScript中el-transfer穿梭框组件使用详解及示例
使用详解 Element Plus 的 el-transfer 组件是一个强大的穿梭框组件,常用于在两个集合之间进行数据转移,如权限分配、数据选择等场景。下面我将详细介绍其用法并提供一个完整示例。 核心特性与用法 基本属性 v-model:绑定右侧列表的值&…...
Opencv中的addweighted函数
一.addweighted函数作用 addweighted()是OpenCV库中用于图像处理的函数,主要功能是将两个输入图像(尺寸和类型相同)按照指定的权重进行加权叠加(图像融合),并添加一个标量值&#x…...
CentOS下的分布式内存计算Spark环境部署
一、Spark 核心架构与应用场景 1.1 分布式计算引擎的核心优势 Spark 是基于内存的分布式计算框架,相比 MapReduce 具有以下核心优势: 内存计算:数据可常驻内存,迭代计算性能提升 10-100 倍(文档段落:3-79…...
在四层代理中还原真实客户端ngx_stream_realip_module
一、模块原理与价值 PROXY Protocol 回溯 第三方负载均衡(如 HAProxy、AWS NLB、阿里 SLB)发起上游连接时,将真实客户端 IP/Port 写入 PROXY Protocol v1/v2 头。Stream 层接收到头部后,ngx_stream_realip_module 从中提取原始信息…...
高等数学(下)题型笔记(八)空间解析几何与向量代数
目录 0 前言 1 向量的点乘 1.1 基本公式 1.2 例题 2 向量的叉乘 2.1 基础知识 2.2 例题 3 空间平面方程 3.1 基础知识 3.2 例题 4 空间直线方程 4.1 基础知识 4.2 例题 5 旋转曲面及其方程 5.1 基础知识 5.2 例题 6 空间曲面的法线与切平面 6.1 基础知识 6.2…...
MODBUS TCP转CANopen 技术赋能高效协同作业
在现代工业自动化领域,MODBUS TCP和CANopen两种通讯协议因其稳定性和高效性被广泛应用于各种设备和系统中。而随着科技的不断进步,这两种通讯协议也正在被逐步融合,形成了一种新型的通讯方式——开疆智能MODBUS TCP转CANopen网关KJ-TCPC-CANP…...
【2025年】解决Burpsuite抓不到https包的问题
环境:windows11 burpsuite:2025.5 在抓取https网站时,burpsuite抓取不到https数据包,只显示: 解决该问题只需如下三个步骤: 1、浏览器中访问 http://burp 2、下载 CA certificate 证书 3、在设置--隐私与安全--…...
