简化数据流:Apache SeaTunnel实现多表同步的高效指南
Apache SeaTunnel除了单表之间的数据同步之外,也支持单表同步到多表,多表同步到单表,以及多表同步到多表,下面简单举例说明如何实现这些功能。
单表 to 单表
一个source,一个sink。
从mysql同步到mysql,中间不做区分
env {# You can set flink configuration hereexecution.parallelism = 2job.mode = "BATCH"
}
source{Jdbc {url = "jdbc:mysql://127.0.0.1:3306/test"driver = "com.mysql.cj.jdbc.Driver"connection_check_timeout_sec = 100user = "user"password = "password"query = "select * from base_region"}
}transform {# If you would like to get more information about how to configure seatunnel and see full list of transform plugins,# please go to https://seatunnel.apache.org/docs/transform/sql
}sink {jdbc {url = "jdbc:mysql://127.0.0.1:3306/dw"driver = "com.mysql.cj.jdbc.Driver"connection_check_timeout_sec = 100user = "user"password = "password"query = "insert into base_region(id,region_name) values(?,?)"}
}
执行任务
./bin/seatunnel.sh --config ./config/mysql2mysql_batch.conf
单表 to 多表
一个source,多个sink。
从MySQL同步到MySQL,将一个用户表数据同步过去,中间通过2个sql组件分布将男性用户和女性用户分开,在sink阶段分别插入到不同的表:
env {execution.parallelism = 2job.mode = "BATCH"
}
source {Jdbc {url = "jdbc:mysql://127.0.0.1:3306/test"driver = "com.mysql.cj.jdbc.Driver"connection_check_timeout_sec = 100user = "user"password = "password"result_table_name="t_user"query = "select * from t_user;"}
}transform {Sql {source_table_name = "t_user"result_table_name = "t_user_nan"query = "select id,name,birth,gender from t_user where gender ='男';"}Sql {source_table_name = "t_user"result_table_name = "t_user_nv"query = "select id,name,birth,gender from t_user where gender ='女';"}
}sink {jdbc {url = "jdbc:mysql://127.0.0.1:3306/dw"driver = "com.mysql.cj.jdbc.Driver"connection_check_timeout_sec = 100user = "user"password = "password"source_table_name = "t_user_nan"query = "insert into t_user_nan(id,name,birth,gender) values(?,?,?,?)"}jdbc {url = "jdbc:mysql://127.0.0.1:3306/dw"driver = "com.mysql.cj.jdbc.Driver"connection_check_timeout_sec = 100user = "user"password = "password"source_table_name = "t_user_nv"query = "insert into t_user_nv(id,name,birth,gender) values(?,?,?,?)"}
}
./bin/seatunnel.sh --config ./config/mysql2mysql_1n.conf
多表 to 单表
多个source,一个sink。
假如有一张交换器使用情况表,一张路由器使用情况表,目标表是将这种数据合在一起的olap表。
表结构如下:
-- dw 源表1
CREATE TABLE IF NOT EXISTS ads_device_switch_performance (`event_time` timestamp COMMENT '业务时间',`device_id` VARCHAR(32) COMMENT '设备id',`device_type` VARCHAR(32) COMMENT '设备类型',`device_name` VARCHAR(128) COMMENT '设备名称',`cpu_usage` INT COMMENT 'CPU使用率百分比'
) ;INSERT INTO `ads_device_switch_performance` VALUES ('2024-01-15 14:25:11', '2001', '2', '交换器1', 49);
INSERT INTO `ads_device_switch_performance` VALUES ('2024-01-17 22:25:40', '2002', '1', '交换器2', 65);-- dw 源表2
CREATE TABLE IF NOT EXISTS ads_device_router_performance (`event_time` timestamp COMMENT '业务时间',`device_id` VARCHAR(32) COMMENT '设备id',`device_type` VARCHAR(32) COMMENT '设备类型',`device_name` VARCHAR(128) COMMENT '设备名称',`cpu_usage` INT COMMENT 'CPU使用率百分比'
);INSERT INTO `ads_device_router_performance` VALUES ('2024-01-17 21:23:22', '1001', '1', '路由器1', 35);
INSERT INTO `ads_device_router_performance` VALUES ('2024-01-16 17:23:53', '1002', '2', '路由器2', 46);-------------------------------------------------------------------------------
-- olap 目标表
CREATE TABLE `device_performance` (`id` INT NOT NULL AUTO_INCREMENT COMMENT '表主键',`event_time` VARCHAR(32) NOT NULL COMMENT '业务时间',`device_id` VARCHAR(32) COMMENT '设备id',`device_type` VARCHAR(32) COMMENT '设备类型',`device_name` VARCHAR(128) NOT NULL COMMENT '设备名称',`cpu_usage` FLOAT NOT NULL COMMENT 'CPU利用率单位是%',`create_time` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',`update_time` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '更新时间',PRIMARY KEY (`id`)
) COMMENT='设备状态';
将交换器数据和路由器数据一起同步到olap目标表,总结通过sql组件处理:
env {job.mode="BATCH"job.name="device_performance"
}source {Jdbc {url="jdbc:mysql://127.0.0.1:3306/dw?allowMultiQueries=true&characterEncoding=utf-8"driver="com.mysql.cj.jdbc.Driver"user = "user"password = "password"result_table_name="switch_src"query="SELECT `event_time`, `device_id`, `device_type`, `device_name`, `cpu_usage` FROM ads_device_switch_performance;"}Jdbc {url="jdbc:mysql://127.0.0.1:3306/dw?allowMultiQueries=true&characterEncoding=utf-8"driver="com.mysql.cj.jdbc.Driver"user = "user"password = "password"result_table_name="router_src"query="SELECT `event_time`, `device_id`, `device_type`, `device_name`, `cpu_usage` FROM ads_device_router_performance;"}
}transform {Sql {source_table_name = "switch_src"result_table_name = "switch_dst"query = "SELECT event_time , device_id, device_type, device_name, cpu_usage, NOW() AS create_time, NOW() AS update_time FROM switch_src;"}Sql {source_table_name = "router_src"result_table_name = "router_dst"query = "SELECT event_time, device_id, device_type, device_name, cpu_usage, NOW() AS create_time, NOW() AS update_time FROM router_src;"}
}sink {Jdbc {url="jdbc:mysql://127.0.0.1:3306/olap?allowMultiQueries=true&characterEncoding=utf-8"driver="com.mysql.cj.jdbc.Driver"user = "user"password = "password"source_table_name = "switch_dst"query="INSERT INTO device_performance VALUES(null,?, ?, ?, ?, ?, ?, ?) ;"}Jdbc {url="jdbc:mysql://127.0.0.1:3306/olap?allowMultiQueries=true&characterEncoding=utf-8"driver="com.mysql.cj.jdbc.Driver"user = "user"password = "password"source_table_name = "router_dst"query="INSERT INTO device_performance VALUES(null,?, ?, ?, ?, ?, ?, ?) ;"}
}
执行任务:
./bin/seatunnel.sh --config ./syn_job/mysql2mysql_n1_batch.conf
作业成功!
多表 to 多表
多个source,多个sink。
将交换器使用情况数据和路由器使用情况数据分别同步到对应的目标表,中间sql组件处理
env {job.mode="BATCH"job.name="device_performance"
}source {Jdbc {url="jdbc:mysql://127.0.0.1:3306/dw?allowMultiQueries=true&characterEncoding=utf-8"driver="com.mysql.cj.jdbc.Driver"user = "user"password = "password"result_table_name="switch_src"query="SELECT `event_time`, `device_id`, `device_type`, `device_name`, `cpu_usage` FROM ads_device_switch_performance;"}Jdbc {url="jdbc:mysql://127.0.0.1:3306/dw?allowMultiQueries=true&characterEncoding=utf-8"driver="com.mysql.cj.jdbc.Driver"user = "user"password = "password"result_table_name="router_src"query="SELECT `event_time`, `device_id`, `device_type`, `device_name`, `cpu_usage` FROM ads_device_router_performance;"}
}transform {Sql {source_table_name = "switch_src"result_table_name = "switch_dst"query = "SELECT event_time , device_id, device_type, device_name, cpu_usage, NOW() AS create_time, NOW() AS update_time FROM switch_src;"}Sql {source_table_name = "router_src"result_table_name = "router_dst"query = "SELECT event_time, device_id, device_type, device_name, cpu_usage, NOW() AS create_time, NOW() AS update_time FROM router_src;"}
}sink {Jdbc {url="jdbc:mysql://127.0.0.1:3306/olap?allowMultiQueries=true&characterEncoding=utf-8"driver="com.mysql.cj.jdbc.Driver"user = "user"password = "password"source_table_name = "switch_dst"query="INSERT INTO device_performance_switch VALUES(null,?, ?, ?, ?, ?, ?, ?) ;"}Jdbc {url="jdbc:mysql://127.0.0.1:3306/olap?allowMultiQueries=true&characterEncoding=utf-8"driver="com.mysql.cj.jdbc.Driver"user = "user"password = "password"source_table_name = "router_dst"query="INSERT INTO device_performance_router VALUES(null,?, ?, ?, ?, ?, ?, ?) ;"}
}
结语
综上所述,Apache SeaTunnel多表同步技术具有高效、实时、可靠和灵活的特点,在企业的数据同步领域发挥着重要作用。借助Apache SeaTunnel多表同步功能,企业能够更好地实现不同系统和数据库之间数据的无缝流转,提升数据管理和利用的效率,为业务发展提供有力支持。希望本文能够帮助读者更好地了解和应用Apache SeaTunnel多表同步,从而为企业数据同步带来更多可能性。
原文链接:https://blog.csdn.net/weixin_44586883/article/details/136049897
本文由 白鲸开源科技 提供发布支持!
相关文章:
简化数据流:Apache SeaTunnel实现多表同步的高效指南
Apache SeaTunnel除了单表之间的数据同步之外,也支持单表同步到多表,多表同步到单表,以及多表同步到多表,下面简单举例说明如何实现这些功能。 单表 to 单表 一个source,一个sink。 从mysql同步到mysql,…...
均匀圆形阵列原理及MATLAB仿真
均匀圆形阵列原理及MATLAB仿真 目录 前言 一、均匀圆阵原理 二、圆心不存在阵元方向图仿真 三、圆心存在阵元方向图仿真 四、MATLAB仿真代码 总结 前言 本文详细推导了均匀圆形阵列的方向图函数,对圆心不放置阵元和圆心放置阵元的均匀圆形阵列方向图都进行了仿…...
vue2使用univerjs
1、univerjs Univer 提供了一个全面的企业级文档与数据协同的解决方案,支持电子表格、文本文档和演示幻灯片三大核心文档类型。通过灵活的 API 和插件机制,开发者可以在 Univer 的基础上进行个性化功能的定制和扩展,以适应不同用户在不同场景…...
VUE3 el-table-column header新增必填*
1.在需要加必填星号的el-table-column上添加render-header属性 <el-table-column :label"getName(产品代码)" :render-header"addRedStart" prop"MODELCODE" min-width“4.5%”> <template v-slot"scope"> <el-input …...
条件概率和贝叶斯公式
...
Kali中docker与docker-compose的配置
权限升级 sudo su 升级为root用户 更新软件 apt-get update安装HTTPS协议和CA证书 apt-get install -y apt-transport-https ca-certificates下载docker apt下载docker apt install docker.io 验证docker安装是否成功 查版本 docker -v 启动docker systemctl start …...
C++ | Leetcode C++题解之第283题移动零
题目: 题解: class Solution { public:void moveZeroes(vector<int>& nums) {int n nums.size(), left 0, right 0;while (right < n) {if (nums[right]) {swap(nums[left], nums[right]);left;}right;}} };...
Exponential Moving Average (EMA) in Stable Diffusion
1.Moving Average in Stable Diffusion (SMA&EMA) 1.Moving average 2.移动平均值 3.How We Trained Stable Diffusion for Less than $50k (Part 3) Moving Average 在统计学中,移动平均是通过创建整个数据集中不同选择的一系列平均值来分析数据点的计算。 …...
017、Vue动态tag标签
文章目录 1、先看效果2、代码 1、先看效果 2、代码 <template><div class "tags"><el-tag size"medium"closable v-for"item,index in tags":key"item.path":effect"item.title$route.name?dark:plain"cl…...
RocketMQ 架构概览
Apache RocketMQ 是一个分布式消息中间件和流计算平台,提供低延迟、高性能和可靠的队列服务,并且支持大规模的分布式系统。在详细介绍 RocketMQ 的整体架构之前,先了解其设计目标和核心特性是很重要的。RocketMQ 主要用于处理大规模的消息&am…...
优化医疗数据管理:Kettle ETL 数据采集方案详解
在现代医疗保健领域,数据的准确性、完整性和及时性对于提高医疗服务质量和患者护理至关重要。为了有效管理和利用医疗数据,Kettle ETL(Extract, Transform, Load)数据采集方案成为了许多医疗机构的首选工具之一。本文将深入探讨Ke…...
spring-from表单
在spring boot当中,from表单怎样开发(name=value) 先列出接口所需信息(抓包得到请求信息),将这些必要信息以注解的方式表达出来 步骤: 梳理前置条件(请求地址,请求header,请求方法,请求数据,响应结果)编辑一个普通类,在类上标记注解@Controller: 标记在类上,让类…...
【.NET】asp.net core 程序重启容器后redis无法连接,连接超时
环境是容器化部署asp.net core 程序当有大量请求打到容器如果此时重启容器会出现,redis无法连接情况。 使用 csredis 库报错: Status unavailable, waiting for recovery. Connect to server timeout 使用StackExchange.Redis 报错: Time…...
【vue前端项目实战案例】Vue3仿今日头条App
本文将开发一款仿“今日头条”的新闻App。该案例是基于 Vue3.0 Vue Router webpack TypeScript 等技术栈实现的一款新闻资讯类App,适合有一定Vue框架使用经验的开发者进行学习。 项目源码在文章末尾 1 项目概述 该项目是一款“今日头条”的新闻资讯App…...
常见的文心一言的指令
文心一言,作为百度研发的预训练语言模型“ERNIE 3.0”的一项功能,能够与人对话互动,回答问题,协助创作,高效便捷地帮助人们获取信息、知识和灵感。以下是一些常见的文心一言指令类型及其具体示例: 1. 查询…...
数字货币交易接口实现(含源代码)
数字货币交易接口实现(含源代码) 使用币安交易接口步骤1:注册API密钥步骤2:安装所需库步骤3:使用API进行交易获取市场数据查看账户信息执行交易错误处理安全提示 使用OKX交易接口步骤1:注册API密钥步骤2&am…...
c++函数以及函数分文件编写
1.函数 1.1格式 返回值类型 函数名 (参数列表)//返回值类型指的是return过去的类型 { 函数体语句 return 表达式 } 1.2常见的函数样式 1.无参返回 2.有参返回 3.无参有返 4.有参有返 #include<iostream> using namespace std; int add(int nu…...
【JVM基础06】——组成-直接内存详解
目录 1- 引言:直接内存概述1-1 直接内存是什么?直接内存的定义(What)1-2 为什么用直接内存?Java程序对直接内存的使用 (Why) 2- ⭐核心:详解直接内存(How)2-1 文件拷贝案例介绍对比常规 IO(BIO) 和 NIO常规 IO 的操作流程NIO 的操…...
学术研讨 | 区块链与隐私计算领域专用硬件研讨会顺利召开
学术研讨 近日,国家区块链技术创新中心主办,长安链开源社区支持的“区块链与隐私计算领域专用硬件研讨会”顺利召开,会议围绕基于区块链与隐私计算的生成式AI上链、硬件加速、软硬协同等主题展开讨论,来自复旦大学、清华大学、北京…...
AngularJS API 深入解析
AngularJS API 深入解析 AngularJS,作为一个强大且灵活的JavaScript框架,自从其诞生以来,就一直是前端开发者构建复杂Web应用的首选工具。本文将深入探讨AngularJS的API,帮助读者理解其核心功能和工作原理。 AngularJS简介 AngularJS由Google开发,并于2010年发布。它是…...
练习(含atoi的模拟实现,自定义类型等练习)
一、结构体大小的计算及位段 (结构体大小计算及位段 详解请看:自定义类型:结构体进阶-CSDN博客) 1.在32位系统环境,编译选项为4字节对齐,那么sizeof(A)和sizeof(B)是多少? #pragma pack(4)st…...
python/java环境配置
环境变量放一起 python: 1.首先下载Python Python下载地址:Download Python | Python.org downloads ---windows -- 64 2.安装Python 下面两个,然后自定义,全选 可以把前4个选上 3.环境配置 1)搜高级系统设置 2…...
工程地质软件市场:发展现状、趋势与策略建议
一、引言 在工程建设领域,准确把握地质条件是确保项目顺利推进和安全运营的关键。工程地质软件作为处理、分析、模拟和展示工程地质数据的重要工具,正发挥着日益重要的作用。它凭借强大的数据处理能力、三维建模功能、空间分析工具和可视化展示手段&…...
MySQL中【正则表达式】用法
MySQL 中正则表达式通过 REGEXP 或 RLIKE 操作符实现(两者等价),用于在 WHERE 子句中进行复杂的字符串模式匹配。以下是核心用法和示例: 一、基础语法 SELECT column_name FROM table_name WHERE column_name REGEXP pattern; …...
IT供电系统绝缘监测及故障定位解决方案
随着新能源的快速发展,光伏电站、储能系统及充电设备已广泛应用于现代能源网络。在光伏领域,IT供电系统凭借其持续供电性好、安全性高等优势成为光伏首选,但在长期运行中,例如老化、潮湿、隐裂、机械损伤等问题会影响光伏板绝缘层…...
selenium学习实战【Python爬虫】
selenium学习实战【Python爬虫】 文章目录 selenium学习实战【Python爬虫】一、声明二、学习目标三、安装依赖3.1 安装selenium库3.2 安装浏览器驱动3.2.1 查看Edge版本3.2.2 驱动安装 四、代码讲解4.1 配置浏览器4.2 加载更多4.3 寻找内容4.4 完整代码 五、报告文件爬取5.1 提…...
docker 部署发现spring.profiles.active 问题
报错: org.springframework.boot.context.config.InvalidConfigDataPropertyException: Property spring.profiles.active imported from location class path resource [application-test.yml] is invalid in a profile specific resource [origin: class path re…...
关键领域软件测试的突围之路:如何破解安全与效率的平衡难题
在数字化浪潮席卷全球的今天,软件系统已成为国家关键领域的核心战斗力。不同于普通商业软件,这些承载着国家安全使命的软件系统面临着前所未有的质量挑战——如何在确保绝对安全的前提下,实现高效测试与快速迭代?这一命题正考验着…...
在QWebEngineView上实现鼠标、触摸等事件捕获的解决方案
这个问题我看其他博主也写了,要么要会员、要么写的乱七八糟。这里我整理一下,把问题说清楚并且给出代码,拿去用就行,照着葫芦画瓢。 问题 在继承QWebEngineView后,重写mousePressEvent或event函数无法捕获鼠标按下事…...
LRU 缓存机制详解与实现(Java版) + 力扣解决
📌 LRU 缓存机制详解与实现(Java版) 一、📖 问题背景 在日常开发中,我们经常会使用 缓存(Cache) 来提升性能。但由于内存有限,缓存不可能无限增长,于是需要策略决定&am…...
