当前位置: 首页 > news >正文

Apache SeaTunnel MongoDB CDC 使用指南

随着数据驱动决策的重要性日益凸显,实时数据处理成为企业竞争力的关键。SeaTunnel MongoDB CDC(Change Data Capture) 源连接器的推出,为开发者提供了一个高效、灵活的工具,以实现对 MongoDB 数据库变更的实时捕获和处理。

file

本文将深入探讨该连接器的主要特性、支持的数据源信息、配置选项以及如何创建数据同步作业,助力开发者更好地利用 SeaTunnel 进行数据集成和实时数据分析。这些更新旨在为开发者提供更为丰富的数据处理能力,帮助他们更有效地捕获和处理来自 MongoDB 的变更数据。

支持的引擎

SeaTunnel Zeta
Flink

主要特性

  • 批处理
  • 流处理
  • 精确一次
  • 列投影
  • 并行度
  • 支持用户定义分片

功能描述

MongoDB CDC 源连接器允许从 MongoDB 数据库读取快照数据和增量数据。

支持的数据源信息

要使用 MongoDB CDC 连接器,需要以下依赖。它们可以通过 install-plugin.sh 脚本或从 Maven 中央仓库下载。

数据源支持的版本依赖
MongoDB通用下载

可用性设置

  1. MongoDB版本:MongoDB 版本 >= 4.0。
  2. 集群部署:副本集或分片集群。
  3. 存储引擎:WiredTiger 存储引擎。
  4. 权限:changeStream 和 read
use admin;
db.createRole({role: "strole",privileges: [{resource: { db: "", collection: "" },actions: ["splitVector","listDatabases","listCollections","collStats","find","changeStream" ]}],roles: [{ role: 'read', db: 'config' }]}
);db.createUser({user: 'stuser',pwd: 'stpw',roles: [{ role: 'strole', db: 'admin' }]}
);

数据类型映射

以下表格列出了从 MongoDB BSON 类型到 SeaTunnel 数据类型的字段数据类型映射。

MongoDB BSON 类型SeaTunnel 数据类型
ObjectIdSTRING
StringSTRING
BooleanBOOLEAN
BinaryBINARY
Int32INTEGER
Int64BIGINT
DoubleDOUBLE
Decimal128DECIMAL
DateDATE
TimestampTIMESTAMP
ObjectROW
ArrayARRAY

对于 MongoDB 中的特定类型,我们使用扩展 JSON 格式将它们映射到 SeaTunnel STRING 类型。

MongoDB BSON 类型SeaTunnel STRING 表示
Symbol{"_value": {"$symbol": "12"}}
RegularExpression{"_value": {"$regularExpression": {"pattern": "^9$", "options": "i"}}}
JavaScript{"_value": {"$code": "function() { return 10; }"}}
DbPointer{"_value": {"$dbPointer": {"$ref": "db.coll", "$id": {"$oid": "63932a00da01604af329e33c"}}}}
提示

在 SeaTunnel 中使用 DECIMAL 类型时,请注意最大范围不能超过 34 位数字,这意味着你应该使用 decimal(34, 18)。

名称类型必须默认值描述
hostsString-MongoDB 服务器的主机名和端口对的逗号分隔列表。例如:localhost:27017,localhost:27018
usernameString-连接 MongoDB 时使用的数据库用户名。
passwordString-连接 MongoDB 时使用的密码。
databaseList-要监视更改的数据库名称。如果未设置,则会捕获所有数据库。数据库还支持正则表达式,以监视与正则表达式匹配的多个数据库。例如:db1,db2。
collectionList-数据库中要监视更改的集合名称。如果未设置,则会捕获所有集合。集合也支持正则表达式,以监视与完全限定的集合标识符匹配的多个集合。例如:db1.coll1,db2.coll2。
connection.optionsString-MongoDB 的连接选项的和号分隔列表。例如:replicaSet=test&connectTimeoutMS=300000。
batch.sizeLong1024游标批大小。
poll.max.batch.sizeEnum1024轮询新数据时包含在单个批次中的更改流文档的最大数量。
poll.await.time.msLong1000等待检查更改流上的新结果之前的时间量。
heartbeat.interval.msString0发送心跳消息之间的时间长度(以毫秒为单位)。使用 0 禁用。
incremental.snapshot.chunk.size.mbLong64增量快照的块大小(MB)。
common-options-源插件通用参数,请参考源通用选项获取详情。

提示:

  • 如果集合变更速度较慢,强烈建议为 heartbeat.interval.ms 参数设置大于 0 的适当值。当我们从检查点或保存点恢复 SeaTunnel 作业时,心跳事件可以将 resumeToken 推进以避免其过期。
  • MongoDB 对单个文档有 16MB 的限制。更改文档包括附加信息,因此即使原始文档不大于 15MB,更改文档也可能超过 16MB 限制,导致更改流操作终止。
  • 建议使用不可变的分片键。在 MongoDB 中,分片键在启用事务后允许修改,但更改分片键可能导致频繁的分片迁移,造成额外的性能开销。此外,修改分片键还可能导致更新查找功能变得无效,在 CDC(更改数据捕获)场景中导致不一致的结果。

如何创建 MongoDB CDC 数据同步作业

将 CDC 数据打印到客户端

以下示例演示如何创建一个从 MongoDB 读取 CDC 数据并在本地客户端打印的数据同步作业:

env {# 您可以在此处设置引擎配置parallelism = 1job.mode = "STREAMING"checkpoint.interval = 5000
}source {MongoDB-CDC {hosts = "mongo0:27017"database = ["inventory"]collection = ["inventory.products"]username = stuserpassword = stpwschema = {fields {"_id" : string,"name" : string,"description" : string,"weight" : string}}}
}# 在本地客户端打印读取的 MongoDB 数据
sink {Console {parallelism = 1}
}

将 CDC 数据写入 MysqlDB

以下示例演示如何创建一个从 MongoDB 读取 CDC 数据并写入 mysql 数据库的数据同步作业:

env {# You can set engine configuration hereparallelism = 1job.mode = "STREAMING"checkpoint.interval = 5000
}source {MongoDB-CDC {hosts = "mongo0:27017"database = ["inventory"]collection = ["inventory.products"]username = stuserpassword = stpw}
}sink {jdbc {url = "jdbc:mysql://mysql_cdc_e2e:3306"driver = "com.mysql.cj.jdbc.Driver"user = "st_user"password = "seatunnel"generate_sink_sql = true# You need to configure both database and tabledatabase = mongodb_cdctable = productsprimary_keys = ["_id"]}
}

多表同步

以下示例演示如何创建一个读取 mongodb 多库表 CDC 数据并在本地客户端打印的数据同步作业:

env {# You can set engine configuration hereparallelism = 1job.mode = "STREAMING"checkpoint.interval = 5000
}source {MongoDB-CDC {hosts = "mongo0:27017"database = ["inventory","crm"]collection = ["inventory.products","crm.test"]username = stuserpassword = stpw}
}# Console printing of the read Mongodb data
sink {Console {parallelism = 1}
}

提示: 多库表 CDC 同步不能指定 schema,只能下游输出 json 数据。这是因为 MongoDB 不提供查询元数据信息,所以如果想支持多表,所有表只能作为一个结构读取。

使用正则表达式匹配多表

以下示例演示如何创建一个通过正则表达式读取 mongodb 多库表数据并在本地客户端打印的数据同步作业:

匹配示例表达式描述
前缀匹配^(test).*匹配数据库名或表名以 test 为前缀的,如 test1, test2 等。
后缀匹配.*[p$]匹配数据库名或表名以 p 为后缀的,如 cdcp, edcp 等。
```
env {
# You can set engine configuration here
parallelism = 1
job.mode = "STREAMING"
checkpoint.interval = 5000
}

source { MongoDB-CDC { hosts = "mongo0:27017" # So this example is used (^(test).|^(tpc).|txc|.[p$]|t{2}).(t[5-8]|tt),matching txc.tt、test2.test5. database = ["(^(test).|^(tpc).|txc|.[p$]|t{2})"] collection = ["(t[5-8]|tt)"] username = stuser password = stpw } }

Console printing of the read Mongodb data

sink { Console { parallelism = 1 } }


### 实时流数据格式

{ _id : { }, // Identifier of the open change stream, can be assigned to the 'resumeAfter' parameter for subsequent resumption of this change stream "operationType" : " ", // The type of change operation that occurred, such as: insert, delete, update, etc. "fullDocument" : { }, // The full document data involved in the change operation. This field does not exist in delete operations "ns" : {
"db" : " ", // The database where the change operation occurred "coll" : " " // The collection where the change operation occurred }, "to" : { // These fields are displayed only when the operation type is 'rename' "db" : " ", // The new database name after the change "coll" : " " // The new collection name after the change }, "source":{ "ts_ms":" ", // The timestamp when the change operation occurred "table":" " // The collection where the change operation occurred "db":" ", // The database where the change operation occurred "snapshot":"false" // Identify the current stage of data synchronization }, "documentKey" : { "_id" : }, // The _id field value of the document involved in the change operation "updateDescription" : { // Description of the update operation "updatedFields" : { }, // The fields and values that the update operation modified "removedFields" : [ " ", ... ] // The fields and values that the update operation removed } "clusterTime" : , // The timestamp of the Oplog log entry corresponding to the change operation "txnNumber" : , // If the change operation is executed in a multi-document transaction, this field and value are displayed, representing the transaction number "lsid" : { // Represents information related to the Session in which the transaction is located "id" : , "uid" : } }

```

到此本指南就结束了,MongoDB CDC Sink连接器的发布,不仅强化了 Apache SeaTunnel 在数据集成领域的地位,也为开发者提供了更多的可能性。

Apache SeaTunnel 社区也期待您的参与和贡献,共同迈向更广阔的数据处理未来,让我们携手共建一个更加强大、开放、互助的社区!

本文由 白鲸开源科技 提供发布支持!

相关文章:

Apache SeaTunnel MongoDB CDC 使用指南

随着数据驱动决策的重要性日益凸显,实时数据处理成为企业竞争力的关键。SeaTunnel MongoDB CDC(Change Data Capture) 源连接器的推出,为开发者提供了一个高效、灵活的工具,以实现对 MongoDB 数据库变更的实时捕获和处理。 本文将深入探讨该连…...

智能合约 之 部署ERC-20

Remix介绍 Remix是一个由以太坊社区开发的在线集成开发环境(IDE),旨在帮助开发者编写、测试和部署以太坊智能合约。它提供了一个简单易用的界面,使得开发者可以在浏览器中直接进行智能合约的开发,而无需安装任何额外的…...

【C++】用红黑树模拟实现set、map

目录 前言及准备:一、红黑树接口1.1 begin1.2 end1.3 查找1.4 插入1.5 左单旋和右单旋 二、树形迭代器(正向)2.1 前置 三、模拟实现set四、模拟实现map 前言及准备: set、map的底层结构是红黑树,它们的函数通过调用红…...

实现:mysql-5.7.42 到 mysql-8.2.0 的升级(二进制方式)

实现:mysql-5.7.42 到 mysql-8.2.0 的升级(二进制方式) 1、操作环境1、查看当前数据库版本2、操作系统版本3、查看 Linux 系统上的 glibc(GNU C 库)版本(**这里很重要,要下载对应的内核mysql版本…...

深入探讨医保购药APP的技术架构与设计思路

随着移动互联网的发展,医疗保健行业也迎来了数字化转型的浪潮。医保购药APP作为医保体系数字化的一部分,其技术架构和设计思路至关重要。接下来,小编将为您讲解医保购药APP的技术架构与设计思路,为相关从业者提供参考和启发。 一、…...

react中点击按钮不能获取最新的state时候

在这个问题中,用户希望在点击确认按钮时触发handleChange函数,并且能够正确获取到最新的bzText值。最初的代码中,在handleOpen函数中弹出一个确认框,并在确认框的onOk回调函数中调用handleChange函数。然而,由于组件传…...

2、鸿蒙学习-申请调试证书和调试Profile文件

申请发布证书 发布证书由AGC颁发的、为HarmonyOS应用配置签名信息的数字证书,可保障软件代码完整性和发布者身份真实性。证书格式为.cer,包含公钥、证书指纹等信息。 说明 请确保您的开发者帐号已实名认证。每个帐号最多申请1个发布证书。 1、登录AppGa…...

蓝桥杯算法基础(13):十大排序算法(希尔排序) (快速排序)c语言版

希尔排序 优化版的插入排序,优化的地方就是步长(增量)增大了,原来的插入排序的步长(增量)是1,而希尔排序的步长(增量)可以很大,然后逐渐减小直到1形成插入排…...

web学习笔记(三十二)

目录 1.函数的call、apply、bind方法 1.1call、apply、bind的相同点 1.2call、apply、bind的不同点 1.3call、apply、bind的使用场景 2. 对象的深拷贝 2.1对象的浅拷贝 2.1对象的深拷贝 1.函数的call、apply、bind方法 1.1call、apply、bind的相同点 在没有传参数时&…...

Android 地图SDK 绘制点 删除 指定

问题 Android 地图SDK 删除指定绘制点 详细问题 笔者进行Android 项目开发&#xff0c;对于已标记的绘制点&#xff0c;提供撤回按钮&#xff0c;即删除绘制点&#xff0c;如何实现。 解决方案 新增绘制点 private List<Marker> markerList new ArrayList<>…...

Nodejs 第五十八章(大文件上传)

在现代网站中&#xff0c;越来越多的个性化图片&#xff0c;视频&#xff0c;去展示&#xff0c;因此我们的网站一般都会支持文件上传。 文件上传的方案 大文件上传&#xff1a;将大文件切分成较小的片段&#xff08;通常称为分片或块&#xff09;&#xff0c;然后逐个上传这…...

Linux编译器--gcc/g++的使用

1. gcc与g gcc与g分别是c语言与c代码的编译器&#xff0c;但同时g也兼容c语言。 我们知道在Linux中&#xff0c;系统并不以文件后缀来区分文件类别。但对于gcc与g等编译器而言却是需要的。Linux中c代码文件的后缀是.c&#xff0c;c代码文件的后缀是.cpp(.cc)(.cxx)。 在Linu…...

苍穹外卖-day13:vue基础回顾+进阶

vue基础回顾进阶 课程内容 VUE 基础回顾路由 Vue-Router状态管理 vuexTypeScript 1. VUE 基础回顾 1.1 基于脚手架创建前端工程 1.1.1 环境要求 要想基于脚手架创建前端工程&#xff0c;需要具备如下环境要求&#xff1a; ​ node.js 前端项目的运行环境 学习web阶段已安…...

蓝桥杯/慈善晚会/c\c++

问题描述 热心公益的G哥哥又来举办慈善晚会了&#xff0c;这次他邀请到了巴菲特、马云等巨富&#xff0c;还邀请到了大V、小C等算法界泰斗。晚会一共邀请了n位尊贵的客人&#xff0c;每位客人都位于不同的城市&#xff0c;也就是说每座城市都有且仅有一位客人。这些城市的编号为…...

2024.3.19

思维导图...

【Python】新手入门学习:详细介绍单一职责原则(SRP)及其作用、代码示例

【Python】新手入门学习&#xff1a;详细介绍单一职责原则&#xff08;SRP&#xff09;及其作用、代码示例 &#x1f308; 个人主页&#xff1a;高斯小哥 &#x1f525; 高质量专栏&#xff1a;Matplotlib之旅&#xff1a;零基础精通数据可视化、Python基础【高质量合集】、PyT…...

【DataWhale学习笔记】使用AgentScope调用qwen大模型

AgentScope AgentScope介绍 AgentScope是一款全新的Multi-Agent框架&#xff0c;专为应用开发者打造&#xff0c;旨在提供高易用、高可靠的编程体验&#xff01; 高易用&#xff1a;AgentScope支持纯Python编程&#xff0c;提供多种语法工具实现灵活的应用流程编排&#xff…...

【C++】手撕AVL树

> 作者简介&#xff1a;დ旧言~&#xff0c;目前大二&#xff0c;现在学习Java&#xff0c;c&#xff0c;c&#xff0c;Python等 > 座右铭&#xff1a;松树千年终是朽&#xff0c;槿花一日自为荣。 > 目标&#xff1a;能直接手撕AVL树。 > 毒鸡汤&#xff1a;放弃自…...

探索 TorchRe-ID--基于 Python 的人员再识别库

导言 人员再识别&#xff08;re-ID&#xff09;是计算机视觉中的一项重要任务&#xff0c;在监控系统、零售分析和人机交互中有着广泛的应用。TorchRe-ID 是一个功能强大、用户友好的 Python 库&#xff0c;它为人员再识别任务提供了一套全面的工具和模型。在本文中&#xff0…...

鸿蒙Harmony应用开发—ArkTS声明式开发(容器组件:Flex)

以弹性方式布局子组件的容器组件。 说明&#xff1a; 该组件从API Version 7开始支持。后续版本如有新增内容&#xff0c;则采用上角标单独标记该内容的起始版本。Flex组件在渲染时存在二次布局过程&#xff0c;因此在对性能有严格要求的场景下建议使用Column、Row代替。Flex组…...

告别系统臃肿与隐私泄露:Win11Debloat让Windows效率提升80%

告别系统臃肿与隐私泄露&#xff1a;Win11Debloat让Windows效率提升80% 【免费下载链接】Win11Debloat A simple, lightweight PowerShell script that allows you to remove pre-installed apps, disable telemetry, as well as perform various other changes to declutter a…...

用Docker三分钟部署MetaGPT开发环境(附LLM本地化方案)

三分钟容器化部署MetaGPT全栈开发环境实战指南 容器化部署的价值与优势 在当今快速迭代的AI开发领域&#xff0c;环境配置一直是困扰开发者的首要难题。传统部署方式需要处理Python版本管理、依赖冲突、CUDA驱动兼容等复杂问题&#xff0c;而容器化技术为这一痛点提供了优雅的解…...

CenterPoint实战:基于热力图的3D目标检测与跟踪全解析

1. CenterPoint算法核心思想解析 第一次接触CenterPoint时&#xff0c;最让我惊讶的是它的简洁性。传统3D目标检测就像在游乐场玩"套圈"游戏——需要准备各种尺寸的圆圈&#xff08;锚框&#xff09;去匹配不同形状的奖品&#xff08;物体&#xff09;&#xff0c;而…...

Win11+Ubuntu22.04双系统避坑指南:如何正确分配分区空间(含CUDA安装建议)

Win11Ubuntu 22.04双系统分区策略与CUDA开发环境配置实战 作为一名长期在深度学习领域工作的开发者&#xff0c;我经历过无数次双系统安装的"血泪史"。特别是当项目 deadline 临近&#xff0c;却因为分区不当导致 CUDA 无法安装时&#xff0c;那种绝望感至今难忘。本…...

即时通信|自定义基于 Netty 的二进制协议(应用层协议)+心跳检测

基于IM仿微信聊天的场景&#xff1a;TCP&#xff08;传输层&#xff09;负责&#xff1a;把字节流可靠地从A送到B自定义协议&#xff08;应用层&#xff09;负责&#xff1a;规定字节流的含义┌──────────┬──────────┬─────────────────…...

SecGPT-14B知识库增强:让OpenClaw支持最新CVE漏洞库

SecGPT-14B知识库增强&#xff1a;让OpenClaw支持最新CVE漏洞库 1. 为什么需要给OpenClaw注入CVE知识库 去年处理Log4j2漏洞时&#xff0c;我遇到了一个尴尬场景&#xff1a;当我让OpenClaw帮我检查服务器是否存在CVE-2021-44228漏洞时&#xff0c;它给出的回答是"未找到…...

OpenClaw技能市场挖掘:千问3.5-9B增强插件TOP5

OpenClaw技能市场挖掘&#xff1a;千问3.5-9B增强插件TOP5 1. 为什么需要关注OpenClaw技能市场&#xff1f; 第一次接触OpenClaw时&#xff0c;我以为它只是个简单的自动化脚本工具。直到在项目里连续熬了三个深夜处理邮件分类和会议纪要&#xff0c;才意识到自己错过了什么—…...

002、环境搭建:Python虚拟环境、LangChain安装与核心依赖解析

002、环境搭建&#xff1a;Python虚拟环境、LangChain安装与核心依赖解析从一次深夜调试说起 上周三凌晨两点&#xff0c;我被一个诡异的错误钉在屏幕前&#xff1a;明明本地测试通过的LangChain智能体&#xff0c;在同事的机器上死活跑不起来。报错信息指向一个版本冲突——py…...

OpenClaw+千问3.5-9B学术助手:自动整理参考文献与生成综述

OpenClaw千问3.5-9B学术助手&#xff1a;自动整理参考文献与生成综述 1. 为什么需要自动化文献处理 去年冬天&#xff0c;当我面对堆积如山的PDF文献时&#xff0c;突然意识到传统文献管理方式已经跟不上现代研究的节奏。手动标注重点、复制粘贴引用、反复切换不同文献工具—…...

共享单车智能通信系统架构与技术解析

1. 共享单车通信系统架构解析共享单车的智能通信系统主要由四大核心模块构成&#xff1a;智能车锁、供电系统、通信模块和云端平台。这套系统设计最精妙之处在于&#xff0c;它完美结合了移动通信技术、蓝牙短距传输和GPS定位技术&#xff0c;构建了一个稳定可靠的物联网应用场…...