Apache SeaTunnel MongoDB CDC 使用指南
随着数据驱动决策的重要性日益凸显,实时数据处理成为企业竞争力的关键。SeaTunnel MongoDB CDC(Change Data Capture) 源连接器的推出,为开发者提供了一个高效、灵活的工具,以实现对 MongoDB 数据库变更的实时捕获和处理。

本文将深入探讨该连接器的主要特性、支持的数据源信息、配置选项以及如何创建数据同步作业,助力开发者更好地利用 SeaTunnel 进行数据集成和实时数据分析。这些更新旨在为开发者提供更为丰富的数据处理能力,帮助他们更有效地捕获和处理来自 MongoDB 的变更数据。
支持的引擎
SeaTunnel Zeta
Flink
主要特性
- 批处理
- 流处理
- 精确一次
- 列投影
- 并行度
- 支持用户定义分片
功能描述
MongoDB CDC 源连接器允许从 MongoDB 数据库读取快照数据和增量数据。
支持的数据源信息
要使用 MongoDB CDC 连接器,需要以下依赖。它们可以通过 install-plugin.sh 脚本或从 Maven 中央仓库下载。
| 数据源 | 支持的版本 | 依赖 |
|---|---|---|
| MongoDB | 通用 | 下载 |
可用性设置
- MongoDB版本:MongoDB 版本 >= 4.0。
- 集群部署:副本集或分片集群。
- 存储引擎:WiredTiger 存储引擎。
- 权限:changeStream 和 read
use admin;
db.createRole({role: "strole",privileges: [{resource: { db: "", collection: "" },actions: ["splitVector","listDatabases","listCollections","collStats","find","changeStream" ]}],roles: [{ role: 'read', db: 'config' }]}
);db.createUser({user: 'stuser',pwd: 'stpw',roles: [{ role: 'strole', db: 'admin' }]}
);
数据类型映射
以下表格列出了从 MongoDB BSON 类型到 SeaTunnel 数据类型的字段数据类型映射。
| MongoDB BSON 类型 | SeaTunnel 数据类型 |
|---|---|
| ObjectId | STRING |
| String | STRING |
| Boolean | BOOLEAN |
| Binary | BINARY |
| Int32 | INTEGER |
| Int64 | BIGINT |
| Double | DOUBLE |
| Decimal128 | DECIMAL |
| Date | DATE |
| Timestamp | TIMESTAMP |
| Object | ROW |
| Array | ARRAY |
对于 MongoDB 中的特定类型,我们使用扩展 JSON 格式将它们映射到 SeaTunnel STRING 类型。
| MongoDB BSON 类型 | SeaTunnel STRING 表示 |
|---|---|
| Symbol | {"_value": {"$symbol": "12"}} |
| RegularExpression | {"_value": {"$regularExpression": {"pattern": "^9$", "options": "i"}}} |
| JavaScript | {"_value": {"$code": "function() { return 10; }"}} |
| DbPointer | {"_value": {"$dbPointer": {"$ref": "db.coll", "$id": {"$oid": "63932a00da01604af329e33c"}}}} |
提示
在 SeaTunnel 中使用 DECIMAL 类型时,请注意最大范围不能超过 34 位数字,这意味着你应该使用 decimal(34, 18)。
| 名称 | 类型 | 必须 | 默认值 | 描述 |
|---|---|---|---|---|
| hosts | String | 是 | - | MongoDB 服务器的主机名和端口对的逗号分隔列表。例如:localhost:27017,localhost:27018 |
| username | String | 否 | - | 连接 MongoDB 时使用的数据库用户名。 |
| password | String | 否 | - | 连接 MongoDB 时使用的密码。 |
| database | List | 是 | - | 要监视更改的数据库名称。如果未设置,则会捕获所有数据库。数据库还支持正则表达式,以监视与正则表达式匹配的多个数据库。例如:db1,db2。 |
| collection | List | 是 | - | 数据库中要监视更改的集合名称。如果未设置,则会捕获所有集合。集合也支持正则表达式,以监视与完全限定的集合标识符匹配的多个集合。例如:db1.coll1,db2.coll2。 |
| connection.options | String | 否 | - | MongoDB 的连接选项的和号分隔列表。例如:replicaSet=test&connectTimeoutMS=300000。 |
| batch.size | Long | 否 | 1024 | 游标批大小。 |
| poll.max.batch.size | Enum | 否 | 1024 | 轮询新数据时包含在单个批次中的更改流文档的最大数量。 |
| poll.await.time.ms | Long | 否 | 1000 | 等待检查更改流上的新结果之前的时间量。 |
| heartbeat.interval.ms | String | 否 | 0 | 发送心跳消息之间的时间长度(以毫秒为单位)。使用 0 禁用。 |
| incremental.snapshot.chunk.size.mb | Long | 否 | 64 | 增量快照的块大小(MB)。 |
| common-options | 否 | - | 源插件通用参数,请参考源通用选项获取详情。 |
提示:
- 如果集合变更速度较慢,强烈建议为 heartbeat.interval.ms 参数设置大于 0 的适当值。当我们从检查点或保存点恢复 SeaTunnel 作业时,心跳事件可以将 resumeToken 推进以避免其过期。
- MongoDB 对单个文档有 16MB 的限制。更改文档包括附加信息,因此即使原始文档不大于 15MB,更改文档也可能超过 16MB 限制,导致更改流操作终止。
- 建议使用不可变的分片键。在 MongoDB 中,分片键在启用事务后允许修改,但更改分片键可能导致频繁的分片迁移,造成额外的性能开销。此外,修改分片键还可能导致更新查找功能变得无效,在 CDC(更改数据捕获)场景中导致不一致的结果。
如何创建 MongoDB CDC 数据同步作业
将 CDC 数据打印到客户端
以下示例演示如何创建一个从 MongoDB 读取 CDC 数据并在本地客户端打印的数据同步作业:
env {# 您可以在此处设置引擎配置parallelism = 1job.mode = "STREAMING"checkpoint.interval = 5000
}source {MongoDB-CDC {hosts = "mongo0:27017"database = ["inventory"]collection = ["inventory.products"]username = stuserpassword = stpwschema = {fields {"_id" : string,"name" : string,"description" : string,"weight" : string}}}
}# 在本地客户端打印读取的 MongoDB 数据
sink {Console {parallelism = 1}
}
将 CDC 数据写入 MysqlDB
以下示例演示如何创建一个从 MongoDB 读取 CDC 数据并写入 mysql 数据库的数据同步作业:
env {# You can set engine configuration hereparallelism = 1job.mode = "STREAMING"checkpoint.interval = 5000
}source {MongoDB-CDC {hosts = "mongo0:27017"database = ["inventory"]collection = ["inventory.products"]username = stuserpassword = stpw}
}sink {jdbc {url = "jdbc:mysql://mysql_cdc_e2e:3306"driver = "com.mysql.cj.jdbc.Driver"user = "st_user"password = "seatunnel"generate_sink_sql = true# You need to configure both database and tabledatabase = mongodb_cdctable = productsprimary_keys = ["_id"]}
}
多表同步
以下示例演示如何创建一个读取 mongodb 多库表 CDC 数据并在本地客户端打印的数据同步作业:
env {# You can set engine configuration hereparallelism = 1job.mode = "STREAMING"checkpoint.interval = 5000
}source {MongoDB-CDC {hosts = "mongo0:27017"database = ["inventory","crm"]collection = ["inventory.products","crm.test"]username = stuserpassword = stpw}
}# Console printing of the read Mongodb data
sink {Console {parallelism = 1}
}
提示: 多库表 CDC 同步不能指定 schema,只能下游输出 json 数据。这是因为 MongoDB 不提供查询元数据信息,所以如果想支持多表,所有表只能作为一个结构读取。
使用正则表达式匹配多表
以下示例演示如何创建一个通过正则表达式读取 mongodb 多库表数据并在本地客户端打印的数据同步作业:
| 匹配示例 | 表达式 | 描述 |
|---|---|---|
| 前缀匹配 | ^(test).* | 匹配数据库名或表名以 test 为前缀的,如 test1, test2 等。 |
| 后缀匹配 | .*[p$] | 匹配数据库名或表名以 p 为后缀的,如 cdcp, edcp 等。 |
| ``` | ||
| env { | ||
| # You can set engine configuration here | ||
| parallelism = 1 | ||
| job.mode = "STREAMING" | ||
| checkpoint.interval = 5000 | ||
| } |
source { MongoDB-CDC { hosts = "mongo0:27017" # So this example is used (^(test).|^(tpc).|txc|.[p$]|t{2}).(t[5-8]|tt),matching txc.tt、test2.test5. database = ["(^(test).|^(tpc).|txc|.[p$]|t{2})"] collection = ["(t[5-8]|tt)"] username = stuser password = stpw } }
Console printing of the read Mongodb data
sink { Console { parallelism = 1 } }
### 实时流数据格式
{ _id : { }, // Identifier of the open change stream, can be assigned to the 'resumeAfter' parameter for subsequent resumption of this change stream "operationType" : " ", // The type of change operation that occurred, such as: insert, delete, update, etc. "fullDocument" : { }, // The full document data involved in the change operation. This field does not exist in delete operations "ns" : {
"db" : " ", // The database where the change operation occurred "coll" : " " // The collection where the change operation occurred }, "to" : { // These fields are displayed only when the operation type is 'rename' "db" : " ", // The new database name after the change "coll" : " " // The new collection name after the change }, "source":{ "ts_ms":" ", // The timestamp when the change operation occurred "table":" " // The collection where the change operation occurred "db":" ", // The database where the change operation occurred "snapshot":"false" // Identify the current stage of data synchronization }, "documentKey" : { "_id" : }, // The _id field value of the document involved in the change operation "updateDescription" : { // Description of the update operation "updatedFields" : { }, // The fields and values that the update operation modified "removedFields" : [ " ", ... ] // The fields and values that the update operation removed } "clusterTime" : , // The timestamp of the Oplog log entry corresponding to the change operation "txnNumber" : , // If the change operation is executed in a multi-document transaction, this field and value are displayed, representing the transaction number "lsid" : { // Represents information related to the Session in which the transaction is located "id" : , "uid" : } }
```
到此本指南就结束了,MongoDB CDC Sink连接器的发布,不仅强化了 Apache SeaTunnel 在数据集成领域的地位,也为开发者提供了更多的可能性。
Apache SeaTunnel 社区也期待您的参与和贡献,共同迈向更广阔的数据处理未来,让我们携手共建一个更加强大、开放、互助的社区!
本文由 白鲸开源科技 提供发布支持!
相关文章:
Apache SeaTunnel MongoDB CDC 使用指南
随着数据驱动决策的重要性日益凸显,实时数据处理成为企业竞争力的关键。SeaTunnel MongoDB CDC(Change Data Capture) 源连接器的推出,为开发者提供了一个高效、灵活的工具,以实现对 MongoDB 数据库变更的实时捕获和处理。 本文将深入探讨该连…...
智能合约 之 部署ERC-20
Remix介绍 Remix是一个由以太坊社区开发的在线集成开发环境(IDE),旨在帮助开发者编写、测试和部署以太坊智能合约。它提供了一个简单易用的界面,使得开发者可以在浏览器中直接进行智能合约的开发,而无需安装任何额外的…...
【C++】用红黑树模拟实现set、map
目录 前言及准备:一、红黑树接口1.1 begin1.2 end1.3 查找1.4 插入1.5 左单旋和右单旋 二、树形迭代器(正向)2.1 前置 三、模拟实现set四、模拟实现map 前言及准备: set、map的底层结构是红黑树,它们的函数通过调用红…...
实现:mysql-5.7.42 到 mysql-8.2.0 的升级(二进制方式)
实现:mysql-5.7.42 到 mysql-8.2.0 的升级(二进制方式) 1、操作环境1、查看当前数据库版本2、操作系统版本3、查看 Linux 系统上的 glibc(GNU C 库)版本(**这里很重要,要下载对应的内核mysql版本…...
深入探讨医保购药APP的技术架构与设计思路
随着移动互联网的发展,医疗保健行业也迎来了数字化转型的浪潮。医保购药APP作为医保体系数字化的一部分,其技术架构和设计思路至关重要。接下来,小编将为您讲解医保购药APP的技术架构与设计思路,为相关从业者提供参考和启发。 一、…...
react中点击按钮不能获取最新的state时候
在这个问题中,用户希望在点击确认按钮时触发handleChange函数,并且能够正确获取到最新的bzText值。最初的代码中,在handleOpen函数中弹出一个确认框,并在确认框的onOk回调函数中调用handleChange函数。然而,由于组件传…...
2、鸿蒙学习-申请调试证书和调试Profile文件
申请发布证书 发布证书由AGC颁发的、为HarmonyOS应用配置签名信息的数字证书,可保障软件代码完整性和发布者身份真实性。证书格式为.cer,包含公钥、证书指纹等信息。 说明 请确保您的开发者帐号已实名认证。每个帐号最多申请1个发布证书。 1、登录AppGa…...
蓝桥杯算法基础(13):十大排序算法(希尔排序) (快速排序)c语言版
希尔排序 优化版的插入排序,优化的地方就是步长(增量)增大了,原来的插入排序的步长(增量)是1,而希尔排序的步长(增量)可以很大,然后逐渐减小直到1形成插入排…...
web学习笔记(三十二)
目录 1.函数的call、apply、bind方法 1.1call、apply、bind的相同点 1.2call、apply、bind的不同点 1.3call、apply、bind的使用场景 2. 对象的深拷贝 2.1对象的浅拷贝 2.1对象的深拷贝 1.函数的call、apply、bind方法 1.1call、apply、bind的相同点 在没有传参数时&…...
Android 地图SDK 绘制点 删除 指定
问题 Android 地图SDK 删除指定绘制点 详细问题 笔者进行Android 项目开发,对于已标记的绘制点,提供撤回按钮,即删除绘制点,如何实现。 解决方案 新增绘制点 private List<Marker> markerList new ArrayList<>…...
Nodejs 第五十八章(大文件上传)
在现代网站中,越来越多的个性化图片,视频,去展示,因此我们的网站一般都会支持文件上传。 文件上传的方案 大文件上传:将大文件切分成较小的片段(通常称为分片或块),然后逐个上传这…...
Linux编译器--gcc/g++的使用
1. gcc与g gcc与g分别是c语言与c代码的编译器,但同时g也兼容c语言。 我们知道在Linux中,系统并不以文件后缀来区分文件类别。但对于gcc与g等编译器而言却是需要的。Linux中c代码文件的后缀是.c,c代码文件的后缀是.cpp(.cc)(.cxx)。 在Linu…...
苍穹外卖-day13:vue基础回顾+进阶
vue基础回顾进阶 课程内容 VUE 基础回顾路由 Vue-Router状态管理 vuexTypeScript 1. VUE 基础回顾 1.1 基于脚手架创建前端工程 1.1.1 环境要求 要想基于脚手架创建前端工程,需要具备如下环境要求: node.js 前端项目的运行环境 学习web阶段已安…...
蓝桥杯/慈善晚会/c\c++
问题描述 热心公益的G哥哥又来举办慈善晚会了,这次他邀请到了巴菲特、马云等巨富,还邀请到了大V、小C等算法界泰斗。晚会一共邀请了n位尊贵的客人,每位客人都位于不同的城市,也就是说每座城市都有且仅有一位客人。这些城市的编号为…...
2024.3.19
思维导图...
【Python】新手入门学习:详细介绍单一职责原则(SRP)及其作用、代码示例
【Python】新手入门学习:详细介绍单一职责原则(SRP)及其作用、代码示例 🌈 个人主页:高斯小哥 🔥 高质量专栏:Matplotlib之旅:零基础精通数据可视化、Python基础【高质量合集】、PyT…...
【DataWhale学习笔记】使用AgentScope调用qwen大模型
AgentScope AgentScope介绍 AgentScope是一款全新的Multi-Agent框架,专为应用开发者打造,旨在提供高易用、高可靠的编程体验! 高易用:AgentScope支持纯Python编程,提供多种语法工具实现灵活的应用流程编排ÿ…...
【C++】手撕AVL树
> 作者简介:დ旧言~,目前大二,现在学习Java,c,c,Python等 > 座右铭:松树千年终是朽,槿花一日自为荣。 > 目标:能直接手撕AVL树。 > 毒鸡汤:放弃自…...
探索 TorchRe-ID--基于 Python 的人员再识别库
导言 人员再识别(re-ID)是计算机视觉中的一项重要任务,在监控系统、零售分析和人机交互中有着广泛的应用。TorchRe-ID 是一个功能强大、用户友好的 Python 库,它为人员再识别任务提供了一套全面的工具和模型。在本文中࿰…...
鸿蒙Harmony应用开发—ArkTS声明式开发(容器组件:Flex)
以弹性方式布局子组件的容器组件。 说明: 该组件从API Version 7开始支持。后续版本如有新增内容,则采用上角标单独标记该内容的起始版本。Flex组件在渲染时存在二次布局过程,因此在对性能有严格要求的场景下建议使用Column、Row代替。Flex组…...
【网络】每天掌握一个Linux命令 - iftop
在Linux系统中,iftop是网络管理的得力助手,能实时监控网络流量、连接情况等,帮助排查网络异常。接下来从多方面详细介绍它。 目录 【网络】每天掌握一个Linux命令 - iftop工具概述安装方式核心功能基础用法进阶操作实战案例面试题场景生产场景…...
【解密LSTM、GRU如何解决传统RNN梯度消失问题】
解密LSTM与GRU:如何让RNN变得更聪明? 在深度学习的世界里,循环神经网络(RNN)以其卓越的序列数据处理能力广泛应用于自然语言处理、时间序列预测等领域。然而,传统RNN存在的一个严重问题——梯度消失&#…...
Java毕业设计:WML信息查询与后端信息发布系统开发
JAVAWML信息查询与后端信息发布系统实现 一、系统概述 本系统基于Java和WML(无线标记语言)技术开发,实现了移动设备上的信息查询与后端信息发布功能。系统采用B/S架构,服务器端使用Java Servlet处理请求,数据库采用MySQL存储信息࿰…...
腾讯云V3签名
想要接入腾讯云的Api,必然先按其文档计算出所要求的签名。 之前也调用过腾讯云的接口,但总是卡在签名这一步,最后放弃选择SDK,这次终于自己代码实现。 可能腾讯云翻新了接口文档,现在阅读起来,清晰了很多&…...
Go语言多线程问题
打印零与奇偶数(leetcode 1116) 方法1:使用互斥锁和条件变量 package mainimport ("fmt""sync" )type ZeroEvenOdd struct {n intzeroMutex sync.MutexevenMutex sync.MutexoddMutex sync.Mutexcurrent int…...
给网站添加live2d看板娘
给网站添加live2d看板娘 参考文献: stevenjoezhang/live2d-widget: 把萌萌哒的看板娘抱回家 (ノ≧∇≦)ノ | Live2D widget for web platformEikanya/Live2d-model: Live2d model collectionzenghongtu/live2d-model-assets 前言 网站环境如下,文章也主…...
java高级——高阶函数、如何定义一个函数式接口类似stream流的filter
java高级——高阶函数、stream流 前情提要文章介绍一、函数伊始1.1 合格的函数1.2 有形的函数2. 函数对象2.1 函数对象——行为参数化2.2 函数对象——延迟执行 二、 函数编程语法1. 函数对象表现形式1.1 Lambda表达式1.2 方法引用(Math::max) 2 函数接口…...
PydanticAI快速入门示例
参考链接:https://ai.pydantic.dev/#why-use-pydanticai 示例代码 from pydantic_ai import Agent from pydantic_ai.models.openai import OpenAIModel from pydantic_ai.providers.openai import OpenAIProvider# 配置使用阿里云通义千问模型 model OpenAIMode…...
Ubuntu 可执行程序自启动方法
使用 autostart(适用于桌面环境) 适用于 GNOME/KDE 桌面环境(如 Ubuntu 图形界面) 1. 创建 .desktop 文件 sudo vi ~/.config/autostart/my_laser.desktop[Desktop Entry] TypeApplication NameMy Laser Program Execbash -c &…...
DJango知识-模型类
一.项目创建 在想要将项目创键的目录下,输入cmd (进入命令提示符)在cmd中输入:Django-admin startproject 项目名称 (创建项目)cd 项目名称 (进入项目)Django-admin startapp 程序名称 (创建程序)python manage.py runserver 8080 (运行程序)将弹出的网址复制到浏览器中…...
