简化数据流:Apache SeaTunnel实现多表同步的高效指南
Apache SeaTunnel除了单表之间的数据同步之外,也支持单表同步到多表,多表同步到单表,以及多表同步到多表,下面简单举例说明如何实现这些功能。
单表 to 单表
一个source,一个sink。
从mysql同步到mysql,中间不做区分
env {# You can set flink configuration hereexecution.parallelism = 2job.mode = "BATCH"
}
source{Jdbc {url = "jdbc:mysql://127.0.0.1:3306/test"driver = "com.mysql.cj.jdbc.Driver"connection_check_timeout_sec = 100user = "user"password = "password"query = "select * from base_region"}
}transform {# If you would like to get more information about how to configure seatunnel and see full list of transform plugins,# please go to https://seatunnel.apache.org/docs/transform/sql
}sink {jdbc {url = "jdbc:mysql://127.0.0.1:3306/dw"driver = "com.mysql.cj.jdbc.Driver"connection_check_timeout_sec = 100user = "user"password = "password"query = "insert into base_region(id,region_name) values(?,?)"}
}
执行任务
./bin/seatunnel.sh --config ./config/mysql2mysql_batch.conf
单表 to 多表
一个source,多个sink。
从MySQL同步到MySQL,将一个用户表数据同步过去,中间通过2个sql组件分布将男性用户和女性用户分开,在sink阶段分别插入到不同的表:
env {execution.parallelism = 2job.mode = "BATCH"
}
source {Jdbc {url = "jdbc:mysql://127.0.0.1:3306/test"driver = "com.mysql.cj.jdbc.Driver"connection_check_timeout_sec = 100user = "user"password = "password"result_table_name="t_user"query = "select * from t_user;"}
}transform {Sql {source_table_name = "t_user"result_table_name = "t_user_nan"query = "select id,name,birth,gender from t_user where gender ='男';"}Sql {source_table_name = "t_user"result_table_name = "t_user_nv"query = "select id,name,birth,gender from t_user where gender ='女';"}
}sink {jdbc {url = "jdbc:mysql://127.0.0.1:3306/dw"driver = "com.mysql.cj.jdbc.Driver"connection_check_timeout_sec = 100user = "user"password = "password"source_table_name = "t_user_nan"query = "insert into t_user_nan(id,name,birth,gender) values(?,?,?,?)"}jdbc {url = "jdbc:mysql://127.0.0.1:3306/dw"driver = "com.mysql.cj.jdbc.Driver"connection_check_timeout_sec = 100user = "user"password = "password"source_table_name = "t_user_nv"query = "insert into t_user_nv(id,name,birth,gender) values(?,?,?,?)"}
}
./bin/seatunnel.sh --config ./config/mysql2mysql_1n.conf
多表 to 单表
多个source,一个sink。
假如有一张交换器使用情况表,一张路由器使用情况表,目标表是将这种数据合在一起的olap表。
表结构如下:
-- dw 源表1
CREATE TABLE IF NOT EXISTS ads_device_switch_performance (`event_time` timestamp COMMENT '业务时间',`device_id` VARCHAR(32) COMMENT '设备id',`device_type` VARCHAR(32) COMMENT '设备类型',`device_name` VARCHAR(128) COMMENT '设备名称',`cpu_usage` INT COMMENT 'CPU使用率百分比'
) ;INSERT INTO `ads_device_switch_performance` VALUES ('2024-01-15 14:25:11', '2001', '2', '交换器1', 49);
INSERT INTO `ads_device_switch_performance` VALUES ('2024-01-17 22:25:40', '2002', '1', '交换器2', 65);-- dw 源表2
CREATE TABLE IF NOT EXISTS ads_device_router_performance (`event_time` timestamp COMMENT '业务时间',`device_id` VARCHAR(32) COMMENT '设备id',`device_type` VARCHAR(32) COMMENT '设备类型',`device_name` VARCHAR(128) COMMENT '设备名称',`cpu_usage` INT COMMENT 'CPU使用率百分比'
);INSERT INTO `ads_device_router_performance` VALUES ('2024-01-17 21:23:22', '1001', '1', '路由器1', 35);
INSERT INTO `ads_device_router_performance` VALUES ('2024-01-16 17:23:53', '1002', '2', '路由器2', 46);-------------------------------------------------------------------------------
-- olap 目标表
CREATE TABLE `device_performance` (`id` INT NOT NULL AUTO_INCREMENT COMMENT '表主键',`event_time` VARCHAR(32) NOT NULL COMMENT '业务时间',`device_id` VARCHAR(32) COMMENT '设备id',`device_type` VARCHAR(32) COMMENT '设备类型',`device_name` VARCHAR(128) NOT NULL COMMENT '设备名称',`cpu_usage` FLOAT NOT NULL COMMENT 'CPU利用率单位是%',`create_time` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',`update_time` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '更新时间',PRIMARY KEY (`id`)
) COMMENT='设备状态';
将交换器数据和路由器数据一起同步到olap目标表,总结通过sql组件处理:
env {job.mode="BATCH"job.name="device_performance"
}source {Jdbc {url="jdbc:mysql://127.0.0.1:3306/dw?allowMultiQueries=true&characterEncoding=utf-8"driver="com.mysql.cj.jdbc.Driver"user = "user"password = "password"result_table_name="switch_src"query="SELECT `event_time`, `device_id`, `device_type`, `device_name`, `cpu_usage` FROM ads_device_switch_performance;"}Jdbc {url="jdbc:mysql://127.0.0.1:3306/dw?allowMultiQueries=true&characterEncoding=utf-8"driver="com.mysql.cj.jdbc.Driver"user = "user"password = "password"result_table_name="router_src"query="SELECT `event_time`, `device_id`, `device_type`, `device_name`, `cpu_usage` FROM ads_device_router_performance;"}
}transform {Sql {source_table_name = "switch_src"result_table_name = "switch_dst"query = "SELECT event_time , device_id, device_type, device_name, cpu_usage, NOW() AS create_time, NOW() AS update_time FROM switch_src;"}Sql {source_table_name = "router_src"result_table_name = "router_dst"query = "SELECT event_time, device_id, device_type, device_name, cpu_usage, NOW() AS create_time, NOW() AS update_time FROM router_src;"}
}sink {Jdbc {url="jdbc:mysql://127.0.0.1:3306/olap?allowMultiQueries=true&characterEncoding=utf-8"driver="com.mysql.cj.jdbc.Driver"user = "user"password = "password"source_table_name = "switch_dst"query="INSERT INTO device_performance VALUES(null,?, ?, ?, ?, ?, ?, ?) ;"}Jdbc {url="jdbc:mysql://127.0.0.1:3306/olap?allowMultiQueries=true&characterEncoding=utf-8"driver="com.mysql.cj.jdbc.Driver"user = "user"password = "password"source_table_name = "router_dst"query="INSERT INTO device_performance VALUES(null,?, ?, ?, ?, ?, ?, ?) ;"}
}
执行任务:
./bin/seatunnel.sh --config ./syn_job/mysql2mysql_n1_batch.conf
作业成功!
多表 to 多表
多个source,多个sink。
将交换器使用情况数据和路由器使用情况数据分别同步到对应的目标表,中间sql组件处理
env {job.mode="BATCH"job.name="device_performance"
}source {Jdbc {url="jdbc:mysql://127.0.0.1:3306/dw?allowMultiQueries=true&characterEncoding=utf-8"driver="com.mysql.cj.jdbc.Driver"user = "user"password = "password"result_table_name="switch_src"query="SELECT `event_time`, `device_id`, `device_type`, `device_name`, `cpu_usage` FROM ads_device_switch_performance;"}Jdbc {url="jdbc:mysql://127.0.0.1:3306/dw?allowMultiQueries=true&characterEncoding=utf-8"driver="com.mysql.cj.jdbc.Driver"user = "user"password = "password"result_table_name="router_src"query="SELECT `event_time`, `device_id`, `device_type`, `device_name`, `cpu_usage` FROM ads_device_router_performance;"}
}transform {Sql {source_table_name = "switch_src"result_table_name = "switch_dst"query = "SELECT event_time , device_id, device_type, device_name, cpu_usage, NOW() AS create_time, NOW() AS update_time FROM switch_src;"}Sql {source_table_name = "router_src"result_table_name = "router_dst"query = "SELECT event_time, device_id, device_type, device_name, cpu_usage, NOW() AS create_time, NOW() AS update_time FROM router_src;"}
}sink {Jdbc {url="jdbc:mysql://127.0.0.1:3306/olap?allowMultiQueries=true&characterEncoding=utf-8"driver="com.mysql.cj.jdbc.Driver"user = "user"password = "password"source_table_name = "switch_dst"query="INSERT INTO device_performance_switch VALUES(null,?, ?, ?, ?, ?, ?, ?) ;"}Jdbc {url="jdbc:mysql://127.0.0.1:3306/olap?allowMultiQueries=true&characterEncoding=utf-8"driver="com.mysql.cj.jdbc.Driver"user = "user"password = "password"source_table_name = "router_dst"query="INSERT INTO device_performance_router VALUES(null,?, ?, ?, ?, ?, ?, ?) ;"}
}
结语
综上所述,Apache SeaTunnel多表同步技术具有高效、实时、可靠和灵活的特点,在企业的数据同步领域发挥着重要作用。借助Apache SeaTunnel多表同步功能,企业能够更好地实现不同系统和数据库之间数据的无缝流转,提升数据管理和利用的效率,为业务发展提供有力支持。希望本文能够帮助读者更好地了解和应用Apache SeaTunnel多表同步,从而为企业数据同步带来更多可能性。
原文链接:https://blog.csdn.net/weixin_44586883/article/details/136049897
本文由 白鲸开源科技 提供发布支持!
相关文章:
简化数据流:Apache SeaTunnel实现多表同步的高效指南
Apache SeaTunnel除了单表之间的数据同步之外,也支持单表同步到多表,多表同步到单表,以及多表同步到多表,下面简单举例说明如何实现这些功能。 单表 to 单表 一个source,一个sink。 从mysql同步到mysql,…...
均匀圆形阵列原理及MATLAB仿真
均匀圆形阵列原理及MATLAB仿真 目录 前言 一、均匀圆阵原理 二、圆心不存在阵元方向图仿真 三、圆心存在阵元方向图仿真 四、MATLAB仿真代码 总结 前言 本文详细推导了均匀圆形阵列的方向图函数,对圆心不放置阵元和圆心放置阵元的均匀圆形阵列方向图都进行了仿…...
vue2使用univerjs
1、univerjs Univer 提供了一个全面的企业级文档与数据协同的解决方案,支持电子表格、文本文档和演示幻灯片三大核心文档类型。通过灵活的 API 和插件机制,开发者可以在 Univer 的基础上进行个性化功能的定制和扩展,以适应不同用户在不同场景…...
VUE3 el-table-column header新增必填*
1.在需要加必填星号的el-table-column上添加render-header属性 <el-table-column :label"getName(产品代码)" :render-header"addRedStart" prop"MODELCODE" min-width“4.5%”> <template v-slot"scope"> <el-input …...
条件概率和贝叶斯公式
...
Kali中docker与docker-compose的配置
权限升级 sudo su 升级为root用户 更新软件 apt-get update安装HTTPS协议和CA证书 apt-get install -y apt-transport-https ca-certificates下载docker apt下载docker apt install docker.io 验证docker安装是否成功 查版本 docker -v 启动docker systemctl start …...
C++ | Leetcode C++题解之第283题移动零
题目: 题解: class Solution { public:void moveZeroes(vector<int>& nums) {int n nums.size(), left 0, right 0;while (right < n) {if (nums[right]) {swap(nums[left], nums[right]);left;}right;}} };...
Exponential Moving Average (EMA) in Stable Diffusion
1.Moving Average in Stable Diffusion (SMA&EMA) 1.Moving average 2.移动平均值 3.How We Trained Stable Diffusion for Less than $50k (Part 3) Moving Average 在统计学中,移动平均是通过创建整个数据集中不同选择的一系列平均值来分析数据点的计算。 …...
017、Vue动态tag标签
文章目录 1、先看效果2、代码 1、先看效果 2、代码 <template><div class "tags"><el-tag size"medium"closable v-for"item,index in tags":key"item.path":effect"item.title$route.name?dark:plain"cl…...
RocketMQ 架构概览
Apache RocketMQ 是一个分布式消息中间件和流计算平台,提供低延迟、高性能和可靠的队列服务,并且支持大规模的分布式系统。在详细介绍 RocketMQ 的整体架构之前,先了解其设计目标和核心特性是很重要的。RocketMQ 主要用于处理大规模的消息&am…...
优化医疗数据管理:Kettle ETL 数据采集方案详解
在现代医疗保健领域,数据的准确性、完整性和及时性对于提高医疗服务质量和患者护理至关重要。为了有效管理和利用医疗数据,Kettle ETL(Extract, Transform, Load)数据采集方案成为了许多医疗机构的首选工具之一。本文将深入探讨Ke…...
spring-from表单
在spring boot当中,from表单怎样开发(name=value) 先列出接口所需信息(抓包得到请求信息),将这些必要信息以注解的方式表达出来 步骤: 梳理前置条件(请求地址,请求header,请求方法,请求数据,响应结果)编辑一个普通类,在类上标记注解@Controller: 标记在类上,让类…...
【.NET】asp.net core 程序重启容器后redis无法连接,连接超时
环境是容器化部署asp.net core 程序当有大量请求打到容器如果此时重启容器会出现,redis无法连接情况。 使用 csredis 库报错: Status unavailable, waiting for recovery. Connect to server timeout 使用StackExchange.Redis 报错: Time…...
【vue前端项目实战案例】Vue3仿今日头条App
本文将开发一款仿“今日头条”的新闻App。该案例是基于 Vue3.0 Vue Router webpack TypeScript 等技术栈实现的一款新闻资讯类App,适合有一定Vue框架使用经验的开发者进行学习。 项目源码在文章末尾 1 项目概述 该项目是一款“今日头条”的新闻资讯App…...
常见的文心一言的指令
文心一言,作为百度研发的预训练语言模型“ERNIE 3.0”的一项功能,能够与人对话互动,回答问题,协助创作,高效便捷地帮助人们获取信息、知识和灵感。以下是一些常见的文心一言指令类型及其具体示例: 1. 查询…...
数字货币交易接口实现(含源代码)
数字货币交易接口实现(含源代码) 使用币安交易接口步骤1:注册API密钥步骤2:安装所需库步骤3:使用API进行交易获取市场数据查看账户信息执行交易错误处理安全提示 使用OKX交易接口步骤1:注册API密钥步骤2&am…...
c++函数以及函数分文件编写
1.函数 1.1格式 返回值类型 函数名 (参数列表)//返回值类型指的是return过去的类型 { 函数体语句 return 表达式 } 1.2常见的函数样式 1.无参返回 2.有参返回 3.无参有返 4.有参有返 #include<iostream> using namespace std; int add(int nu…...
【JVM基础06】——组成-直接内存详解
目录 1- 引言:直接内存概述1-1 直接内存是什么?直接内存的定义(What)1-2 为什么用直接内存?Java程序对直接内存的使用 (Why) 2- ⭐核心:详解直接内存(How)2-1 文件拷贝案例介绍对比常规 IO(BIO) 和 NIO常规 IO 的操作流程NIO 的操…...
学术研讨 | 区块链与隐私计算领域专用硬件研讨会顺利召开
学术研讨 近日,国家区块链技术创新中心主办,长安链开源社区支持的“区块链与隐私计算领域专用硬件研讨会”顺利召开,会议围绕基于区块链与隐私计算的生成式AI上链、硬件加速、软硬协同等主题展开讨论,来自复旦大学、清华大学、北京…...
AngularJS API 深入解析
AngularJS API 深入解析 AngularJS,作为一个强大且灵活的JavaScript框架,自从其诞生以来,就一直是前端开发者构建复杂Web应用的首选工具。本文将深入探讨AngularJS的API,帮助读者理解其核心功能和工作原理。 AngularJS简介 AngularJS由Google开发,并于2010年发布。它是…...
提升code-server前端性能的终极指南:渐进式图片加载高级技巧
提升code-server前端性能的终极指南:渐进式图片加载高级技巧 【免费下载链接】code-server VS Code in the browser 项目地址: https://gitcode.com/GitHub_Trending/co/code-server code-server作为一款能在浏览器中运行的VS Code实现,让开发者可…...
超级AI数字员工源码系统,支持贴牌OEM,独立部署交付
温馨提示:文末有资源获取方式最近“龙虾AI”概念很火,到处都在讨论。但说实话,这类技术对普通用户而言存在明显门槛,部署要代码、配置要工程师、日常运行的Token成本也不低——轻度使用每月100-200元,重度甚至单日上千…...
蓝桥杯嵌入式备赛:STM32G431引脚复用功能表,一张图搞定定时器与ADC配置
蓝桥杯嵌入式备赛:STM32G431引脚复用功能实战指南 在蓝桥杯嵌入式赛场上,STM32G431作为官方指定开发平台的核心控制器,其引脚复用功能的灵活配置往往是决定项目成败的关键。许多参赛选手在紧张激烈的比赛中,常常因为引脚配置错误…...
掌握 AgentScope 与 Spring AI Alibaba:大模型多智能体实践指南(收藏版)
本文深入探讨了 AgentScope 与 Spring AI Alibaba 在大模型应用中的多智能体实践。从单智能体优先原则出发,详细解析了 Pipeline、Routing、Skills、Subagents、Supervisor、Handoffs 及 Custom Workflow 等多种多智能体模式,并提供了实用的架构选型指南…...
高效安全:从远程服务器到本地Windows的文件传输全攻略
1. 远程桌面连接:最直观的文件传输方式 远程桌面连接(RDP)是Windows系统自带的"杀手级"功能,我帮客户部署项目时90%的场景都会用它传文件。它的优势在于操作可视化程度高,就像直接在服务器桌面上操作本地文件…...
Pixel Dream Workshop生成图像的自动化软件测试方案
Pixel Dream Workshop生成图像的自动化软件测试方案 1. 当AI艺术遇上软件测试 最近在帮一个电商客户部署Pixel Dream Workshop时,遇到了一个有趣的问题:他们需要批量生成商品展示图,但发现AI生成的质量时好时坏。有时候图片完美符合要求&am…...
「webMAN-MOD」技术探索:构建PS3主机的多功能扩展生态
「webMAN-MOD」技术探索:构建PS3主机的多功能扩展生态 【免费下载链接】webMAN-MOD Extended services for PS3 console (web server, ftp server, netiso, ntfs, ps3mapi, etc.) 项目地址: https://gitcode.com/gh_mirrors/we/webMAN-MOD 一、基础认知&…...
避坑指南:SpringBoot整合Drools 7.20时热部署冲突的解决方案
SpringBoot与Drools 7.20热部署冲突深度排查指南 当SpringBoot的devtools热部署功能遇上Drools规则引擎,就像两个高效率的工人同时修改同一台机器——看似都能独立工作,组合时却可能引发难以察觉的运行时故障。本文将带您深入这个典型的技术冲突现场&…...
MBPFan技术解析:MacBook在Linux环境下的智能散热控制机制
MBPFan技术解析:MacBook在Linux环境下的智能散热控制机制 【免费下载链接】mbpfan 项目地址: https://gitcode.com/gh_mirrors/mb/mbpfan 在Linux系统上使用MacBook的用户经常面临散热管理的技术挑战,系统原生的温度控制策略往往无法充分发挥苹果…...
Java后端开发——真实面试汇总(持续更新)
一.浙江大学研究院一面(面试Time:1小时30分钟)1. 面试官自我介绍,同时我开始自我介绍2. 平时接触到哪些数据结构?3. ArrayList和LinkedList的主要区别是什么?4. 数组和链表的主要区别是什么?5.…...
